Fleet  0.0.9
Inference in the LOT
ConcurrentQueue.h
#pragma once

#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <optional>
#include <atomic>
#include <cassert>

#include "OrderedLock.h" // a FIFO mutex: lock requests are granted in arrival order

template<typename T>
class ConcurrentQueue {
	// A fixed-capacity ring buffer that multiple threads may push to and pop from;
	// push blocks when the buffer is full and pop blocks when it is empty.

	static const size_t DEFAULT_CONCURRENT_QUEUE_SIZE = 64;

	size_t N; // capacity of the ring buffer (it holds at most N-1 items)

	// NOTE: We have tried to align to_yield better for multithreading, but nothing seems to improve speed
	std::vector<T> to_yield;

	std::atomic<size_t> push_idx;
	std::atomic<size_t> pop_idx;

	std::condition_variable_any full_cv; // _any is needed so we can wait on an OrderedLock rather than a std::mutex
	std::condition_variable_any empty_cv;

	mutable OrderedLock lock;

public:

	ConcurrentQueue(size_t n=DEFAULT_CONCURRENT_QUEUE_SIZE) : N(n), to_yield(n), push_idx(0), pop_idx(0) {

	}

	// NOTE: not thread-safe and does not remap the indices, so this should only be
	// called before the queue is in use
	void resize(size_t n) {
		assert(n >= 2);
		N = n;
		to_yield.resize(n);
	}

	bool empty() { return push_idx == pop_idx; }
	bool full()  { return (push_idx+1) % N == pop_idx; }
	size_t size() { return (push_idx + N - pop_idx) % N; } // e.g. with N=8, push_idx=2, pop_idx=6: (2+8-6)%8 = 4 items

	void push(const T& x) {

		std::unique_lock lck(lock);

		// if the buffer is full, we must wait until a spot frees up
		while(full()) full_cv.wait(lck);

		to_yield[push_idx] = x;

		push_idx = (push_idx + 1) % N;
		assert(push_idx != pop_idx); // the full() check above guarantees we have not wrapped onto pop_idx

		empty_cv.notify_one();
	}

	T pop() {
		std::unique_lock lck(lock);

		// if the buffer is empty, we must wait until something is pushed
		while(empty()) empty_cv.wait(lck);

		assert(push_idx != pop_idx); // cannot be empty at this point
		T ret = std::move(to_yield[pop_idx]);
		pop_idx = (pop_idx + 1) % N;

		full_cv.notify_one();

		return ret;
	}

};

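A rough usage sketch (not part of ConcurrentQueue.h): one producer and one consumer sharing a bounded ConcurrentQueue<int>. The function name and the -1 sentinel below are illustrative assumptions, not Fleet API.

#include <thread>
#include "ConcurrentQueue.h"

void example_concurrent_queue() {
	ConcurrentQueue<int> q(8); // small capacity so push() actually blocks

	std::thread producer([&]{
		for(int i = 0; i < 100; i++)
			q.push(i); // blocks whenever the buffer is full
		q.push(-1);    // illustrative sentinel telling the consumer to stop
	});

	std::thread consumer([&]{
		while(true) {
			int x = q.pop(); // blocks whenever the buffer is empty
			if(x == -1) break;
			// ... process x ...
		}
	});

	producer.join();
	consumer.join();
}

Because push() waits on full_cv and pop() waits on empty_cv under the same OrderedLock, this behaves like a standard bounded-buffer producer/consumer.
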
template<typename T>
class ConcurrentQueueRing {
	// A ring of per-thread ConcurrentQueues: each producer thread pushes into its own
	// queue (indexed by thread number), so producers do not contend with each other,
	// while a consumer drains the queues round-robin.

	size_t nthreads;
	std::vector<ConcurrentQueue<T>> QS;
	std::atomic<size_t> pop_index;
public:
	ConcurrentQueueRing(size_t t) : nthreads(t), QS(nthreads), pop_index(0) {

	}

	void push(const T& item, size_t thr) {
		assert(thr < nthreads); // thr is the calling thread's index into the ring
		QS[thr].push(item);
	}

	std::optional<T> pop() {

		// CTRL_C is Fleet's global volatile sig_atomic_t interrupt flag (declared elsewhere)
		while(not CTRL_C) {
			// on empty, we just move on to the next slot -- no waiting here
			if(QS[pop_index].empty()) {
				pop_index = (pop_index + 1) % nthreads;
			}
			else {
				return QS[pop_index].pop();
			}
		}

		return std::nullopt; // only reached once CTRL_C has been set
	}

	bool empty() {
		// Helpfully, when this returns false it leaves pop_index sitting on the next
		// non-empty queue.

		auto start_index = size_t(pop_index); // remember where we started so we can tell when we have looped all the way around

		while(true) {
			if(not QS[pop_index].empty()) {
				return false;
			}
			else {
				pop_index = (pop_index + 1) % nthreads;

				if(pop_index == start_index) // if we make it all the way around, every queue was empty
					return true;
			}
		}

		assert(false && "*** Should not get here"); return false; // unreachable; kept to satisfy compilers about the missing return
	}

};
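
A similar sketch for ConcurrentQueueRing (again, not part of the header; the names and counts are illustrative assumptions): each worker thread pushes into its own slot, so producers never share a lock, while a single consumer drains the ring round-robin. pop() returns std::nullopt only once the global CTRL_C flag has been set.

#include <thread>
#include <vector>
#include "ConcurrentQueue.h"

void example_concurrent_queue_ring() {
	const size_t nthreads   = 4;
	const size_t per_thread = 1000;
	ConcurrentQueueRing<int> ring(nthreads);

	// each worker pushes only into its own queue, indexed by its thread number
	std::vector<std::thread> workers;
	for(size_t t = 0; t < nthreads; t++) {
		workers.emplace_back([=, &ring]{
			for(size_t i = 0; i < per_thread; i++)
				ring.push(int(t*per_thread + i), t);
		});
	}

	// consume a known number of items; pop() only returns std::nullopt on CTRL_C
	size_t received = 0;
	while(received < nthreads*per_thread) {
		if(auto x = ring.pop()) {
			// ... process *x ...
			received++;
		}
		else break; // CTRL_C was pressed
	}

	for(auto& w : workers) w.join();
}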