10 #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H 11 #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H 16 template <
typename Environment>
19 typedef typename Environment::Task Task;
24 threads_(num_threads),
26 coprimes_(num_threads),
27 waiters_(num_threads),
32 waiters_.resize(num_threads);
41 for (
int i = 1; i <= num_threads; i++) {
43 unsigned b = num_threads;
51 coprimes_.push_back(i);
54 for (
int i = 0; i < num_threads; i++) {
55 queues_.push_back(
new Queue());
57 for (
int i = 0; i < num_threads; i++) {
58 threads_.push_back(env_.CreateThread([
this, i]() { WorkerLoop(i); }));
70 for (
size_t i = 0; i < threads_.size(); i++)
delete threads_[i];
71 for (
size_t i = 0; i < threads_.size(); i++)
delete queues_[i];
// Schedule: wraps `fn` into a Task via env_.CreateTask and pushes it onto one
// of the pool's queues.
//   fn - callable taking no arguments and returning void; moved into the Task.
// Queue choice depends on the calling thread's PerThread record (see below).
// The Task returned by PushFront/PushBack is stored back into `t`; what is
// done with it afterwards is not visible in this excerpt.
// NOTE(review): the embedded listing numbers skip 78, 81-83 and everything
// after 85, so the else-branch and the end of the function are missing here.
74 void Schedule(std::function<
void()> fn) {
75 Task t = env_.CreateTask(std::move(fn));
76 PerThread* pt = GetPerThread();
77 if (pt->pool ==
this) {
// The caller's record points at this pool (presumably a worker thread of this
// pool - confirm): push to the FRONT of the queue indexed by its thread_id.
79 Queue* q = queues_[pt->thread_id];
80 t = q->PushFront(std::move(t));
// Presumably the other branch (lines 81-83 are missing): pick a queue
// pseudo-randomly via Rand() and push to its BACK.
84 Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
85 t = q->PushBack(std::move(t));
// NumThreads: returns threads_.size() converted to int.
// Marked `final`, so it overrides a virtual declared in a base class that is
// not visible in this excerpt.
100 int NumThreads()
const final {
101 return static_cast<int>(threads_.size());
// CurrentThreadId: returns the thread_id stored in the PerThread record `pt`
// when that record's pool pointer equals this pool.
// NOTE(review): the initializer of `pt` (line 106) and the value returned when
// the record does not match this pool are missing from this excerpt.
// Marked `final`, so it overrides a virtual declared in a base class that is
// not visible here.
104 int CurrentThreadId()
const final {
105 const PerThread* pt =
107 if (pt->pool ==
this) {
108 return pt->thread_id;
115 typedef typename Environment::EnvThread Thread;
// Default-constructs a per-thread record: null pool pointer, RNG state 0 and
// thread_id -1. A null pool never compares equal to `this`, so a fresh record
// matches no pool in the `pt->pool == this` checks.
118 constexpr PerThread() : pool(nullptr), rand(0), thread_id(-1) { }
// Counter compared against threads_.size() (together with done_) in the wait
// path of this file; where it is incremented is not visible in this excerpt.
129 std::atomic<unsigned> blocked_;
// Flag claimed with exchange(true) in WorkerLoop before its bounded 1000-step
// loop; where it is cleared is not visible in this excerpt.
130 std::atomic<bool> spinning_;
// Flag read together with blocked_ in the wait path; presumably set on
// shutdown - confirm against the destructor.
131 std::atomic<bool> done_;
// WorkerLoop: body executed by each pool thread (the constructor starts one
// thread per index with a lambda calling WorkerLoop(i)).
//   thread_id - index of this worker; also selects its own queue in queues_.
// NOTE(review): large parts of the body are missing from this excerpt (the
// embedded listing numbers jump 140 -> 143 -> 153 -> 154 -> 160), so the loop
// structure, the task execution and the exit path are not visible here.
135 void WorkerLoop(
int thread_id) {
136 PerThread* pt = GetPerThread();
// Seed this thread's RNG state from the hash of its std::thread::id.
138 pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
139 pt->thread_id = thread_id;
140 Queue* q = queues_[thread_id];
// Take work from the front of this worker's own queue.
143 Task t = q->PopFront();
// The plain load skips the exchange when the flag is already set;
// exchange(true) returning false means this thread is the one that flipped
// spinning_ from false to true.
153 if (!spinning_ && !spinning_.exchange(
true)) {
// Bounded loop: at most 1000 iterations, ending early once t.f is set
// (loop body not visible in this excerpt).
154 for (
int i = 0; i < 1000 && !t.f; i++) {
// WaitForWork receives `waiter` (declared in a missing line) and may fill
// `t`; the handling of a false return is not visible in this excerpt.
160 if (!WaitForWork(waiter, &t)) {
174 PerThread* pt = GetPerThread();
175 const size_t size = queues_.size();
176 unsigned r = Rand(&pt->rand);
177 unsigned inc = coprimes_[r % coprimes_.size()];
178 unsigned victim = r % size;
179 for (
unsigned i = 0; i < size; i++) {
180 Task t = queues_[victim]->PopBack();
185 if (victim >= size) {
201 int victim = NonEmptyQueueIndex();
203 ec_.CancelWait(waiter);
204 *t = queues_[victim]->PopBack();
211 if (done_ && blocked_ == threads_.size()) {
212 ec_.CancelWait(waiter);
218 if (NonEmptyQueueIndex() != -1) {
232 ec_.CommitWait(waiter);
// NonEmptyQueueIndex: probes up to queues_.size() queues, starting at a
// pseudo-random index, and tests each with Empty().
// The caller visible in this file compares the result against -1 and
// otherwise uses it as an index into queues_, so the result is presumably the
// index of a non-empty queue, or -1 when none was found - the return
// statements themselves are missing from this excerpt.
237 int NonEmptyQueueIndex() {
238 PerThread* pt = GetPerThread();
239 const size_t size = queues_.size();
240 unsigned r = Rand(&pt->rand);
// `inc` is drawn from coprimes_; presumably the step added to `victim` on each
// iteration (the advance statement is not visible) - TODO confirm.
241 unsigned inc = coprimes_[r % coprimes_.size()];
242 unsigned victim = r % size;
243 for (
unsigned i = 0; i < size; i++) {
244 if (!queues_[victim]->Empty()) {
// Range check on the probe index; presumably wraps it back below `size`
// (the body of this branch is not visible).
248 if (victim >= size) {
// GetPerThread: takes the address of a function-local PerThread object
// declared with EIGEN_THREAD_LOCAL (macro defined outside this excerpt;
// presumably thread-local storage, giving one record per calling thread).
// NOTE(review): the return statement is missing from this excerpt.
255 static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
256 EIGEN_THREAD_LOCAL PerThread per_thread_;
257 PerThread* pt = &per_thread_;
// Rand: one step of a pseudo-random generator.
//   state - in/out pointer to the 64-bit generator state; updated in place.
// Returns an unsigned value derived from the state as it was BEFORE the update.
// NOTE(review): the embedded listing numbers skip 263 and 265; those lines are
// not visible in this excerpt.
261 static EIGEN_STRONG_INLINE
unsigned Rand(uint64_t* state) {
262 uint64_t current = *state;
// State update: multiply-add with fixed 64-bit constants; unsigned arithmetic,
// so the result wraps modulo 2^64.
264 *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
// Output: xor the old state with itself shifted right by 22, then shift right
// by 22 plus the top three bits of the old state (current >> 61 is 0..7), and
// truncate to unsigned.
// NOTE(review): this has the shape of the PCG XSH-RS output function - confirm.
266 return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
274 #endif // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H Definition: EventCount.h:170
Namespace containing all symbols from the Eigen library.
Definition: bench_norm.cpp:85
Definition: EventCount.h:49
Definition: RunQueue.h:39
The MaxSizeVector class.
Definition: MaxSizeVector.h:31
Definition: NonBlockingThreadPool.h:17
Definition: ThreadPoolInterface.h:17