Zero  0.1.0
ringbuffer.h
Go to the documentation of this file.
1 #ifndef __RINGBUFFER_H
2 #define __RINGBUFFER_H
3 
4 #include "w_defines.h"
5 #include "basics.h"
6 #include "w_debug.h"
7 #include "vec_t.h"
8 #include "smthread.h"
9 #include "sm_base.h"
10 
39 public:
40  char* producerRequest();
41 
42  void producerRelease();
43 
44  char* consumerRequest();
45 
46  void consumerRelease();
47 
48  bool isFull() {
49  return begin == end && bparity != eparity;
50  }
51 
52  bool isEmpty() {
53  return begin == end && bparity == eparity;
54  }
55 
56  size_t getBlockSize() {
57  return blockSize;
58  }
59 
60  size_t getBlockCount() {
61  return blockCount;
62  }
63 
64  void set_finished(bool f = true) {
65  finished = f;
66  }
67 
68  bool* get_finished() {
69  return &finished;
70  } // not thread-safe
71  bool isFinished(); // thread-safe
72 
73 
74  AsyncRingBuffer(size_t bsize, size_t bcount)
75  : begin(0),
76  end(0),
77  bparity(true),
78  eparity(true),
79  finished(false),
80  blockSize(bsize),
81  blockCount(bcount) {
82  buf = new char[blockCount * blockSize];
83  DO_PTHREAD(pthread_mutex_init(&mutex, nullptr));
84  DO_PTHREAD(pthread_cond_init(&notEmpty, nullptr));
85  DO_PTHREAD(pthread_cond_init(&notFull, nullptr));
86  }
87 
89  delete buf;
90  DO_PTHREAD(pthread_mutex_destroy(&mutex));
91  DO_PTHREAD(pthread_cond_destroy(&notEmpty));
92  DO_PTHREAD(pthread_cond_destroy(&notFull));
93  }
94 
95 private:
96  char* buf;
97 
98  int begin;
99 
100  int end;
101 
102  bool bparity;
103 
104  bool eparity;
105 
106  bool finished;
107 
108  const size_t blockSize;
109 
110  const size_t blockCount;
111 
112  pthread_mutex_t mutex;
113 
114  pthread_cond_t notEmpty;
115 
116  pthread_cond_t notFull;
117 
118  bool wait(pthread_cond_t*);
119 
120  void increment(int& p, bool& parity) {
121  p = (p + 1) % blockCount;
122  if (p == 0) {
123  parity = !parity;
124  }
125  }
126 };
127 
128 inline bool AsyncRingBuffer::wait(pthread_cond_t* cond) {
129  struct timespec timeout;
130  smthread_t::timeout_to_timespec(100, timeout); // 100ms
131  // caller must have locked mutex!
132  int code = pthread_cond_timedwait(cond, &mutex, &timeout);
133  if (code == ETIMEDOUT) {
134  //DBGTHRD(<< "Wait timed out -- try again");
135  if (finished && isEmpty()) {
136  DBGTHRD(<< "Wait aborted: finished flag is set");
137  return false;
138  }
139  }
140  DO_PTHREAD_TIMED(code);
141  return true;
142 }
143 
145  CRITICAL_SECTION(cs, mutex);
146  while (isFull()) {
147  DBGTHRD(<< "Waiting for condition notFull ...");
148  if (!wait(&notFull)) {
149  DBGTHRD(<< "Produce request failed!");
150  return nullptr;
151  }
152  }
153  DBGTHRD(<< "Producer request: block " << end);
154  return buf + (end * blockSize);
155 }
156 
158  CRITICAL_SECTION(cs, mutex);
159  bool wasEmpty = isEmpty();
161  DBGTHRD(<< "Producer release, new end is " << end);
162 
163  if (wasEmpty) {
164  DBGTHRD(<< "Signal buffer not empty");
165  DO_PTHREAD(pthread_cond_signal(&notEmpty));
166  }
167 }
168 
170  CRITICAL_SECTION(cs, mutex);
171  while (isEmpty()) {
172  DBGTHRD(<< "Waiting for condition notEmpty ...");
173  if (!wait(&notEmpty)) {
174  DBGTHRD(<< "Consume request failed!");
175  return nullptr;
176  }
177  }
178  DBGTHRD(<< "Consumer request: block " << begin);
179  return buf + (begin * blockSize);
180 }
181 
183  CRITICAL_SECTION(cs, mutex);
184  bool wasFull = isFull();
186  DBGTHRD(<< "Consumer release, new begin is " << begin);
187 
188  if (wasFull) {
189  DBGTHRD(<< "Signal buffer not full");
190  DO_PTHREAD(pthread_cond_signal(&notFull));
191  }
192 }
193 
195  /*
196  * Acquiring the mutex here is done to ensure consistency
197  * of modifications to the finished flag. It creates a memory
198  * fence to ensure the correct order of operations in the case
199  * where one thread reading the flag after another one has set it.
200  *
201  * Caution! This does not mean that there are no blocks left for
202  * consumption---just that someone set the finished flag. The former
203  * case must be checked by calling producerRequest()
204  */
205  CRITICAL_SECTION(cs, mutex);
206  return finished;
207 }
208 
209 #endif // __RINGBUFFER_H
pthread_cond_t notEmpty
Definition: ringbuffer.h:114
Definition: ringbuffer.h:38
bool wait(pthread_cond_t *)
Definition: ringbuffer.h:128
bool bparity
Definition: ringbuffer.h:102
void set_finished(bool f=true)
Definition: ringbuffer.h:64
~AsyncRingBuffer()
Definition: ringbuffer.h:88
size_t getBlockCount()
Definition: ringbuffer.h:60
int end
Definition: ringbuffer.h:100
#define DO_PTHREAD(x)
Definition: w_pthread.h:74
bool isFull()
Definition: ringbuffer.h:48
#define DBGTHRD(arg)
Definition: w_debug.h:297
AsyncRingBuffer(size_t bsize, size_t bcount)
Definition: ringbuffer.h:74
bool isFinished()
Definition: ringbuffer.h:194
char * consumerRequest()
Definition: ringbuffer.h:169
void producerRelease()
Definition: ringbuffer.h:157
pthread_cond_t notFull
Definition: ringbuffer.h:116
bool eparity
Definition: ringbuffer.h:104
char * producerRequest()
Definition: ringbuffer.h:144
void consumerRelease()
Definition: ringbuffer.h:182
const size_t blockSize
Definition: ringbuffer.h:108
void increment(int &p, bool &parity)
Definition: ringbuffer.h:120
bool finished
Definition: ringbuffer.h:106
const size_t blockCount
Definition: ringbuffer.h:110
static void timeout_to_timespec(int timeout_ms, struct timespec &when)
Definition: smthread.cpp:335
#define DO_PTHREAD_TIMED(x)
Definition: w_pthread.h:87
int begin
Definition: ringbuffer.h:98
bool isEmpty()
Definition: ringbuffer.h:52
size_t getBlockSize()
Definition: ringbuffer.h:56
pthread_mutex_t mutex
Definition: ringbuffer.h:112
#define CRITICAL_SECTION(name, lock)
Definition: critical_section.h:75
bool * get_finished()
Definition: ringbuffer.h:68
char * buf
Definition: ringbuffer.h:96