5 #include <condition_variable> 23 static const size_t DEFAULT_CONCURRENT_QUEUE_SIZE = 64;
28 std::vector<T> to_yield;
30 std::atomic<size_t> push_idx;
31 std::atomic<size_t> pop_idx;
33 std::condition_variable_any full_cv;
34 std::condition_variable_any empty_cv;
40 ConcurrentQueue(
size_t n=DEFAULT_CONCURRENT_QUEUE_SIZE) : N(n), to_yield(n), push_idx(0), pop_idx(0) {
50 bool empty() {
return push_idx == pop_idx; }
51 bool full() {
return (push_idx+1) % N == pop_idx; }
52 size_t size() {
return (pop_idx - push_idx + N) % N; }
56 std::unique_lock lck(lock);
59 while(
full()) full_cv.wait(lck);
61 to_yield[push_idx] = x;
63 push_idx = (push_idx + 1) % N;
64 assert(push_idx != pop_idx);
66 empty_cv.notify_one();
70 std::unique_lock lck(lock);
73 while(
empty()) empty_cv.wait(lck);
75 assert(push_idx != pop_idx);
76 T ret = std::move(to_yield[pop_idx]);
77 pop_idx = (pop_idx + 1) % N;;
109 std::vector<ConcurrentQueue<T>> QS;
110 std::atomic<size_t> pop_index;
116 void push(
const T& item,
size_t thr) {
117 assert(thr < nthreads);
125 if(QS[pop_index].
empty()) {
126 pop_index = (pop_index + 1) % nthreads;
129 return QS[pop_index].pop();
141 auto start_index = size_t(pop_index);
144 if(not QS[pop_index].
empty()) {
148 pop_index = (pop_index + 1) % nthreads;
150 if(pop_index == start_index)
157 assert(
false &&
"*** Should not get here");
return false;
Definition: OrderedLock.h:16
ConcurrentQueue(size_t n=DEFAULT_CONCURRENT_QUEUE_SIZE)
Definition: ConcurrentQueue.h:40
void push(const T &item, size_t thr)
Definition: ConcurrentQueue.h:116
bool empty()
Definition: ConcurrentQueue.h:137
T pop()
Definition: ConcurrentQueue.h:69
void resize(size_t n)
Definition: ConcurrentQueue.h:44
bool full()
Definition: ConcurrentQueue.h:51
volatile sig_atomic_t CTRL_C
A FIFO mutex (from stackoverflow) https://stackoverflow.com/questions/14792016/creating-a-lock-that-p...
bool empty()
Definition: ConcurrentQueue.h:50
void push(const T &x)
Definition: ConcurrentQueue.h:54
Definition: ConcurrentQueue.h:106
size_t size()
Definition: ConcurrentQueue.h:52
size_t nthreads
Definition: FleetArgs.h:20
Definition: ConcurrentQueue.h:22
std::optional< T > pop()
Definition: ConcurrentQueue.h:121
ConcurrentQueueRing(size_t t)
Definition: ConcurrentQueue.h:112