My Project
NPLMessageQueue.h
1 #pragma once
2 #include "NPLMessage.h"
3 
4 #include <boost/circular_buffer.hpp>
5 #include <boost/thread.hpp>
6 #include "util/mutex.h"
7 #include <queue>
8 
9 namespace NPL
10 {
15  {
16  public:
/** Dummy notification: intentionally does nothing. */
void notify_one() {}
19 
/**
* Dummy wait: accepts an argument of any type and returns immediately
* without blocking.
*/
template <typename LockType>
void wait(LockType /*unused*/) {}
22  };
23 
24  // forward declare
25  template <typename Data, typename Condition = boost::condition_variable>
27 
35  template<typename Data, typename Condition>
37  {
38  protected:
// Underlying storage: a boost::circular_buffer of Data, plus its size/value types.
39  typedef boost::circular_buffer<Data> container_type;
40  typedef typename container_type::size_type size_type;
41  typedef typename container_type::value_type value_type;
42 
// Mutex locked by the member functions before touching m_container; mutable so
// that const accessors (empty/full/size) can lock it as well.
43  mutable boost::mutex m_mutex;
// Notified by the push functions when m_use_event is set; waited on by
// wait_and_pop() and wait().
44  Condition m_condition_variable;
45 
51 
// The buffer that actually stores the queued items.
52  container_type m_container;
53  public:
/**
* Status code returned by try_push() / try_push_get_front().
* Note that BufferEmpty and BufferFirst share the same numeric value.
*/
enum BufferStatus
{
	/** the buffer was already full before the push; the item was not stored. */
	BufferOverFlow = 0,
	/** the item was stored and the buffer is full afterwards. (value 1) */
	BufferFull,
	/** the item was stored; the buffer was neither empty before nor full afterwards. (value 2) */
	BufferNormal,
	/** (value 3) */
	BufferEmpty,
	/** the item was stored into a buffer that was empty before the push. (value 3) */
	BufferFirst = BufferEmpty,
};
62 
63  explicit concurrent_ptr_queue(size_type capacity) : m_container(capacity),m_use_event(true) {}
64 
69  void SetUseEvent(bool bUseEvent)
70  {
71  boost::mutex::scoped_lock lock(m_mutex);
72  m_use_event = bUseEvent;
73  }
74 
81  BufferStatus try_push(value_type& item)
82  {
83  boost::mutex::scoped_lock lock(m_mutex);
84  BufferStatus bufferStatus = m_container.empty() ? BufferFirst : BufferNormal;
85  if(m_container.full())
86  {
87  bufferStatus = BufferOverFlow;
88  }
89  else
90  {
91  m_container.push_back(item);
92  bufferStatus = m_container.full() ? BufferFull : bufferStatus;
93  }
94  item.reset();
95  lock.unlock();
96  if(m_use_event)
97  m_condition_variable.notify_one();
98  return bufferStatus;
99  }
100 
107  BufferStatus try_push_get_front(value_type& item, value_type** ppFrontItem)
108  {
109  boost::mutex::scoped_lock lock(m_mutex);
110  BufferStatus bufferStatus = m_container.empty() ? BufferFirst : BufferNormal;
111  if(m_container.full())
112  {
113  bufferStatus = BufferOverFlow;
114  }
115  else
116  {
117  m_container.push_back(item);
118  *ppFrontItem = &(m_container.front());
119  bufferStatus = m_container.full() ? BufferFull : bufferStatus;
120  }
121  item.reset();
122  lock.unlock();
123  if(m_use_event)
124  m_condition_variable.notify_one();
125  return bufferStatus;
126  }
127 
132  void push(value_type& data)
133  {
134  boost::mutex::scoped_lock lock(m_mutex);
135  m_container.push_back(data);
136  data.reset();
137  lock.unlock();
138  if(m_use_event)
139  m_condition_variable.notify_one();
140  }
141 
147  void push_front(value_type & data)
148  {
149  boost::mutex::scoped_lock lock(m_mutex);
150  m_container.push_front(data);
151  data.reset();
152  lock.unlock();
153  if(m_use_event)
154  m_condition_variable.notify_one();
155  }
156 
157  bool try_pop(value_type& popped_value)
158  {
159  boost::mutex::scoped_lock lock(m_mutex);
160  if(m_container.empty())
161  {
162  return false;
163  }
164 
165  popped_value=m_container.front();
166  m_container.pop_front();
167  return true;
168  }
169 
170  void wait_and_pop(value_type& popped_value)
171  {
172  boost::mutex::scoped_lock lock(m_mutex);
173  while(m_container.empty())
174  {
175  m_condition_variable.wait(lock);
176  }
177  popped_value=m_container.front();
178  m_container.pop_front();
179  }
180 
185  void wait(int nMessageCount = -1)
186  {
187  boost::mutex::scoped_lock lock(m_mutex);
188  if (nMessageCount >= 0 && (nMessageCount != (int)m_container.size() || nMessageCount>= (int)m_container.capacity()))
189  return;
190  m_condition_variable.wait(lock);
191  }
192 
197  value_type peek(size_type nIndex)
198  {
199  boost::mutex::scoped_lock lock(m_mutex);
200  if (nIndex < m_container.size())
201  {
202  return m_container.at(nIndex);
203  }
204  return value_type();
205  }
206 
210  bool try_pop_at(size_type nIndex, value_type& popped_value)
211  {
212  boost::mutex::scoped_lock lock(m_mutex);
213  if (nIndex < m_container.size())
214  {
215  if (nIndex == 0)
216  {
217  popped_value = m_container.front();
218  }
219  else
220  {
221  popped_value = m_container[nIndex];
222  for (size_type i = nIndex; i >= 1; i--)
223  {
224  m_container[i] = m_container[i-1];
225  }
226  }
227  m_container.pop_front();
228  return true;
229  }
230  return false;
231  }
232 
237  value_type * try_front()
238  {
239  boost::mutex::scoped_lock lock(m_mutex);
240  return m_container.empty() ? NULL : &(m_container.front());
241  }
242 
249  bool try_next(value_type** ppValueFront)
250  {
251  boost::mutex::scoped_lock lock(m_mutex);
252  if(m_container.empty())
253  {
254  return false;
255  }
256  else
257  {
258  m_container.pop_front();
259  if(ppValueFront!=0 && !m_container.empty())
260  {
261  *ppValueFront = &(m_container.front());
262  }
263  return true;
264  }
265  }
266 
267  bool empty() const
268  {
269  boost::mutex::scoped_lock lock(m_mutex);
270  return m_container.empty();
271  }
272  bool full() const {
273  boost::mutex::scoped_lock lock(m_mutex);
274  return m_container.full();
275  };
276 
278  size_type size() const
279  {
280  boost::mutex::scoped_lock lock(m_mutex);
281  return m_container.size();
282  }
283 
285  size_type capacity() const
286  {
287  return m_container.capacity();
288  }
289 
291  void set_capacity(size_type new_capacity)
292  {
293  boost::mutex::scoped_lock lock(m_mutex);
294  m_container.set_capacity(new_capacity);
295  }
296  };
297 
301  class CNPLMessageQueue : public concurrent_ptr_queue<NPLMessage_ptr>
302  {
303  public:
305  CNPLMessageQueue(int capacity);
306  ~CNPLMessageQueue();
307  public:
308 
309  };
310 
311 
312 }
define this to enable debugging of NPL code in visual studio
Definition: INPL.h:9
bool try_next(value_type **ppValueFront)
try pop from the front of the queue and return the front object after the pop.
Definition: NPLMessageQueue.h:249
size_type size() const
Get the number of elements currently stored in the circular_buffer.
Definition: NPLMessageQueue.h:278
It implements the producer/consumer(s) queue design pattern.
Definition: NPLMessageQueue.h:26
void set_capacity(size_type new_capacity)
Set the max number of elements that can be stored in the circular_buffer.
Definition: NPLMessageQueue.h:291
void push(value_type &data)
add a data item to the back of the queue.
Definition: NPLMessageQueue.h:132
void wait(int nMessageCount=-1)
simply wait for the next message to arrive.
Definition: NPLMessageQueue.h:185
bool m_use_event
Whether to use an event to inform the consumer when new data items are added to the queue.
Definition: NPLMessageQueue.h:50
value_type * try_front()
Definition: NPLMessageQueue.h:237
Message queue implementation.
Definition: NPLMessageQueue.h:301
size_type capacity() const
Get the number of elements that can be stored in the circular_buffer.
Definition: NPLMessageQueue.h:285
dummy condition variable
Definition: NPLMessageQueue.h:14
void notify_one()
notify one dummy
Definition: NPLMessageQueue.h:18
void push_front(value_type &data)
add a data item to the front of the queue.
Definition: NPLMessageQueue.h:147
bool try_pop_at(size_type nIndex, value_type &popped_value)
pop message at given index.
Definition: NPLMessageQueue.h:210
BufferStatus try_push(value_type &item)
try push to back of the queue.
Definition: NPLMessageQueue.h:81
void SetUseEvent(bool bUseEvent)
Whether to use an event to inform the consumer when new data items are added to the queue.
Definition: NPLMessageQueue.h:69
BufferStatus try_push_get_front(value_type &item, value_type **ppFrontItem)
same as try_push, except that it also returns pointer to the front object.
Definition: NPLMessageQueue.h:107
value_type peek(size_type nIndex)
Definition: NPLMessageQueue.h:197