Processor Counter Monitor
threadpool.h
1 // SPDX-License-Identifier: BSD-3-Clause
2 // Copyright (c) 2020, Intel Corporation
3 
4 #pragma once
5 
6 #include "debug.h"
7 
8 #include <thread>
9 #include <future>
10 #include <functional>
11 #include <mutex>
12 #include <condition_variable>
13 #include <queue>
14 
15 namespace pcm {
16 
17 class Work {
18 public:
19  Work() {}
20  virtual ~Work() {}
21  virtual void execute() = 0;
22 };
23 
24 template<class ReturnType>
25 class LambdaJob : public Work {
26 public:
27  template<class F, class ... Args>
28  LambdaJob( F&& f, Args&& ... args )
29  //: task_( std::forward<F>(f)(std::forward<Args>( args )... ) ) {
30  : task_(std::bind( f, args... ) ) {
31  }
32 
33  virtual void execute() override {
34  task_();
35  }
36 
37  std::future<ReturnType> getFuture() {
38  return task_.get_future();
39  }
40 
41 private:
42  std::packaged_task<ReturnType()> task_;
43 };
44 
45 class WorkQueue;
46 
47 class ThreadPool {
48 private:
49  ThreadPool( const int n ) {
50  for ( int i = 0; i < n; ++i )
51  addThread();
52  }
53 
54  ThreadPool( ThreadPool const& ) = delete;
55  ThreadPool & operator = ( ThreadPool const& ) = delete;
56 
57 public:
58  ~ThreadPool() {
59  for ( size_t i = 0; i < threads_.size(); ++i )
60  addWork( nullptr );
61  for ( size_t i = 0; i < threads_.size(); ++i )
62  threads_[i].join();
63  threads_.clear();
64  }
65 
66 public:
67  static ThreadPool& getInstance() {
68  static ThreadPool tp_(64);
69  return tp_;
70  }
71 
72  void addWork( Work* w ) {
73  DBG( 3, "WQ: Adding work" );
74  std::lock_guard<std::mutex> lg( qMutex_ );
75  workQ_.push( w );
76  queueCV_.notify_one();
77  DBG( 3, "WQ: Work available" );
78  }
79 
80  Work* retrieveWork() {
81  DBG( 3, "WQ: Retrieving work" );
82  std::unique_lock<std::mutex> lock( qMutex_ );
83  queueCV_.wait( lock, [this]{ return !workQ_.empty(); } );
84  Work* w = workQ_.front();
85  workQ_.pop();
86  lock.unlock();
87  DBG( 3, "WQ: Work retrieved" );
88 
89  return w;
90  }
91 
92 private:
93  void addThread() {
94  threads_.push_back( std::thread( std::bind( &this->execute, this ) ) );
95  }
96 
97  // Executes work items from a std::thread, do not call manually
98  static void execute( ThreadPool* );
99 
100 private:
101  std::vector<std::thread> threads_;
102  std::queue<Work*> workQ_;
103  std::mutex qMutex_;
104  std::condition_variable queueCV_;
105 };
106 
107 class WorkQueue {
108 public:
109  WorkQueue() : tp_( ThreadPool::getInstance() ), workProcessed_(0) {}
110  WorkQueue( WorkQueue const& ) = delete;
111  WorkQueue & operator = ( WorkQueue const& ) = delete;
112  ~WorkQueue() = default;
113 
114  // Just forwarding to the threadpool
115  void addWork( Work* w ) {
116  ++workProcessed_;
117  tp_.addWork( w );
118  }
119 
120 private:
121  ThreadPool& tp_;
122  size_t workProcessed_;
123 };
124 
125 } // namespace pcm
Definition: threadpool.h:107
Definition: threadpool.h:47
Definition: threadpool.h:17
Definition: bw.cpp:12
Definition: threadpool.h:25