xbmc
JobManager.h
1 /*
2  * Copyright (C) 2005-2018 Team Kodi
3  * This file is part of Kodi - https://kodi.tv
4  *
5  * SPDX-License-Identifier: GPL-2.0-or-later
6  * See LICENSES/README.md for more information.
7  */
8 
9 #pragma once
10 
#include "Job.h"
#include "threads/CriticalSection.h"
#include "threads/Thread.h"

#include <deque>
#include <queue>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
18 
19 class CJobManager;
20 
21 class CJobWorker : public CThread
22 {
23 public:
24  explicit CJobWorker(CJobManager *manager);
25  ~CJobWorker() override;
26 
27  void Process() override;
28 private:
29  CJobManager *m_jobManager;
30 };
31 
32 template<typename F>
33 class CLambdaJob : public CJob
34 {
35 public:
36  CLambdaJob(F&& f) : m_f(std::forward<F>(f)) {}
37  bool DoWork() override
38  {
39  m_f();
40  return true;
41  }
42  bool operator==(const CJob *job) const override
43  {
44  return this == job;
45  };
46 private:
47  F m_f;
48 };
49 
63 class CJobQueue: public IJobCallback
64 {
65  class CJobPointer
66  {
67  public:
68  explicit CJobPointer(CJob *job)
69  {
70  m_job = job;
71  m_id = 0;
72  };
73  void CancelJob();
74  void FreeJob()
75  {
76  delete m_job;
77  m_job = NULL;
78  };
79  bool operator==(const CJob *job) const
80  {
81  if (m_job)
82  return *m_job == job;
83  return false;
84  };
85  CJob *m_job;
86  unsigned int m_id;
87  };
88 public:
96  CJobQueue(bool lifo = false, unsigned int jobsAtOnce = 1, CJob::PRIORITY priority = CJob::PRIORITY_LOW);
97 
103  ~CJobQueue() override;
104 
113  bool AddJob(CJob *job);
114 
118  template<typename F>
119  void Submit(F&& f)
120  {
121  AddJob(new CLambdaJob<F>(std::forward<F>(f)));
122  }
123 
132  void CancelJob(const CJob *job);
133 
140  void CancelJobs();
141 
145  bool IsProcessing() const;
146 
158  void OnJobComplete(unsigned int jobID, bool success, CJob *job) override;
159 
170  void OnJobAbort(unsigned int jobID, CJob* job) override;
171 
172 protected:
177  bool QueueEmpty() const;
178 
179 private:
180  void OnJobNotify(CJob* job);
181  void QueueNextJob();
182 
183  typedef std::deque<CJobPointer> Queue;
184  typedef std::vector<CJobPointer> Processing;
185  Queue m_jobQueue;
186  Processing m_processing;
187 
188  unsigned int m_jobsAtOnce;
189  CJob::PRIORITY m_priority;
190  mutable CCriticalSection m_section;
191  bool m_lifo;
192 };
193 
205 class CJobManager final
206 {
207  class CWorkItem
208  {
209  public:
210  CWorkItem(CJob *job, unsigned int id, CJob::PRIORITY priority, IJobCallback *callback)
211  {
212  m_job = job;
213  m_id = id;
214  m_callback = callback;
215  m_priority = priority;
216  }
217  bool operator==(unsigned int jobID) const
218  {
219  return m_id == jobID;
220  };
221  bool operator==(const CJob *job) const
222  {
223  return m_job == job;
224  };
225  void FreeJob()
226  {
227  delete m_job;
228  m_job = NULL;
229  };
230  void Cancel()
231  {
232  m_callback = NULL;
233  };
234  CJob *m_job;
235  unsigned int m_id;
236  IJobCallback *m_callback;
237  CJob::PRIORITY m_priority;
238  };
239 
240 public:
241  CJobManager();
242 
253  unsigned int AddJob(CJob *job, IJobCallback *callback, CJob::PRIORITY priority = CJob::PRIORITY_LOW);
254 
258  template<typename F>
259  void Submit(F&& f, CJob::PRIORITY priority = CJob::PRIORITY_LOW)
260  {
261  AddJob(new CLambdaJob<F>(std::forward<F>(f)), nullptr, priority);
262  }
263 
267  template<typename F>
268  void Submit(F&& f, IJobCallback *callback, CJob::PRIORITY priority = CJob::PRIORITY_LOW)
269  {
270  AddJob(new CLambdaJob<F>(std::forward<F>(f)), callback, priority);
271  }
272 
278  void CancelJob(unsigned int jobID);
279 
285  void CancelJobs();
286 
293  void Restart();
294 
300  int IsProcessing(const std::string &type) const;
301 
308  void PauseJobs();
309 
314  void UnPauseJobs();
315 
321  bool IsProcessing(const CJob::PRIORITY &priority) const;
322 
323 protected:
324  friend class CJobWorker;
325  friend class CJob;
326  friend class CJobQueue;
327 
332  CJob* GetNextJob();
333 
341  void OnJobComplete(bool success, CJob *job);
342 
352  bool OnJobProgress(unsigned int progress, unsigned int total, const CJob *job) const;
353 
354 private:
355  CJobManager(const CJobManager&) = delete;
356  CJobManager const& operator=(CJobManager const&) = delete;
357 
361  CJob *PopJob();
362 
363  void StartWorkers(CJob::PRIORITY priority);
364  void RemoveWorker(const CJobWorker *worker);
365  static unsigned int GetMaxWorkers(CJob::PRIORITY priority);
366 
367  unsigned int m_jobCounter;
368 
369  typedef std::deque<CWorkItem> JobQueue;
370  typedef std::vector<CWorkItem> Processing;
371  typedef std::vector<CJobWorker*> Workers;
372 
373  JobQueue m_jobQueue[CJob::PRIORITY_DEDICATED + 1];
374  bool m_pauseJobs;
375  Processing m_processing;
376  Workers m_workers;
377 
378  mutable CCriticalSection m_section;
379  CEvent m_jobEvent;
380  bool m_running;
381 };
void Submit(F &&f)
Add a function f to this job queue.
Definition: JobManager.h:119
This is an Event class built from a ConditionVariable.
Definition: Event.h:35
virtual void OnJobProgress(unsigned int jobID, unsigned int progress, unsigned int total, const CJob *job)
An optional callback function that a job may call while processing.
Definition: Job.h:80
Definition: JobManager.h:21
void Submit(F &&f, IJobCallback *callback, CJob::PRIORITY priority=CJob::PRIORITY_LOW)
Add a function f to this job manager for asynchronous execution.
Definition: JobManager.h:268
Definition: Thread.h:44
Base class for jobs that are executed asynchronously.
Definition: Job.h:109
void Submit(F &&f, CJob::PRIORITY priority=CJob::PRIORITY_LOW)
Add a function f to this job manager for asynchronous execution.
Definition: JobManager.h:259
Job Manager class for scheduling asynchronous jobs.
Definition: JobManager.h:205
Definition: JobManager.h:33
bool DoWork() override
Main workhorse function of CJob instances.
Definition: JobManager.h:37
Callback interface for asynchronous jobs.
Definition: Job.h:31
void OnJobComplete(unsigned int jobID, bool success, CJob *job) override
The callback used when a job completes.
Definition: JobManager.cpp:81
PRIORITY
Priority levels for jobs, specified by clients when adding jobs to the CJobManager.
Definition: Job.h:116
Job Queue class to handle a queue of unique jobs to be processed sequentially.
Definition: JobManager.h:63