My Project
InterprocessQueue.hpp
1 #pragma once
2 // Author: LiXizhi
3 // Date: 2010.4.21
4 #include <boost/interprocess/ipc/message_queue.hpp>
5 #ifdef WIN32
6  // I used to write my own implementation of message queue, since the default one used by boost under win32 has some securities issues that does not work under win7.
7  // this allows us to use IPC even in low integrity level process in vista and win7.
8  #include "ipc_message_queue.hpp"
9 #endif
10 #include <boost/shared_ptr.hpp>
11 
12 #include <boost/logic/tribool.hpp>
13 #include <boost/tuple/tuple.hpp>
14 #include <boost/array.hpp>
15 #include "util/Mutex.hpp"
16 
17 #ifdef WIN32
18 #ifndef snprintf
19 #define snprintf _snprintf
20 #endif
21 #endif
22 
23 namespace ParaEngine
24 {
25 #pragma region Msg Header
26 
28  {
29  public:
32  std::string m_method;
34  std::string m_from;
36  std::string m_to;
38  std::string m_filename;
40  std::string m_code;
42  DWORD m_nMsgType;
44  DWORD m_nParam1;
46  DWORD m_nParam2;
47 
48  void reset()
49  {
50  m_method.clear();
51  m_from.clear();
52  m_to.clear();
53  m_filename.clear();
54  m_code.clear();
55  m_nMsgType = 0;
56  m_nParam1 = 0;
57  m_nParam2 = 0;
58  }
59  };
61 #pragma endregion Msg Header
62 
63 #pragma region Incomming msg
64 
66  {
67  InterProcessMessageIn(){reset();};
69  std::string m_method;
70  std::string m_codes[3];
71  int m_code_lengths[3];
73  DWORD m_nMsgType;
75  DWORD m_nParam1;
77  DWORD m_nParam2;
78 
79  void reset(){
80  m_method.clear();
81  for (int i=0;i<3;++i)
82  {
83  m_code_lengths[i] = 0;
84  m_codes[i].clear();
85  }
86  m_nMsgType = 0;
87  m_nParam1 = 0;
88  m_nParam2 = 0;
89  };
90 
93  {
94  msg.m_method = m_method;
95  msg.m_nMsgType = m_nMsgType;
96  msg.m_nParam1 = m_nParam1;
97  msg.m_nParam2 = m_nParam2;
98  msg.m_from = m_codes[0];
99  msg.m_filename = m_codes[1];
100  msg.m_code = m_codes[2];
101  }
102  };
104 
105 
126  {
127  public:
128  enum Consume_Result
129  {
130  c_res_indeterminate,
131  c_res_true,
132  c_res_false,
133  c_res_code_body1,
134  c_res_code_body2,
135  c_res_code_body3,
136  };
137 
139  CInterProcessMessageIn_parser() : state_(method_start){};
140 
142  void reset(){state_ = method_start;};
143 
148  template <typename InputIterator>
149  boost::tuple<boost::tribool, InputIterator> parse(InterProcessMessageIn& req,
150  InputIterator begin, InputIterator end)
151  {
152  Consume_Result result = c_res_indeterminate;
153  while (begin != end && (result == c_res_indeterminate))
154  {
155  result = consume(req, *begin++);
156  }
157  if(begin != end && result >= c_res_code_body1)
158  {
159  // we can read to end or length instead of doing the lexical work one char at a time.
160  int nIndex = result - c_res_code_body1;
161  int nOldSize = req.m_codes[nIndex].size();
162  int nByteCount = end-begin;
163  if(req.m_code_lengths[nIndex] < (nOldSize+nByteCount))
164  {
165  nByteCount = req.m_code_lengths[nIndex] - nOldSize;
166  }
167  req.m_codes[nIndex].resize(nOldSize+nByteCount);
168  memcpy(&(req.m_codes[nIndex][nOldSize]), begin, nByteCount);
169  begin = begin + nByteCount;
170 
171  if (req.m_code_lengths[nIndex]==(int)req.m_codes[nIndex].size())
172  {
173  if(nIndex == (code_body1-code_length1-1)){
174  result = c_res_true;
175  }
176  else{
177  state_ = (state)(code_length1 + nIndex + 1);
178  result = c_res_indeterminate;
179  }
180  }
181  }
182  if(result == c_res_true)
183  {
184  if(m_bCompressed)
185  Decompress(req);
186  reset();
187  boost::tribool result_ = true;
188  return boost::make_tuple(result_, begin);
189  }
190  else if(result == c_res_false)
191  {
192  boost::tribool result_ = false;
193  return boost::make_tuple(result_, begin);
194  }
195  else
196  {
197  boost::tribool result_ = boost::indeterminate;
198  return boost::make_tuple(result_, begin);
199  }
200  }
201 
202  private:
204  Consume_Result consume(InterProcessMessageIn& req, char input)
205  {
206  switch (state_)
207  {
208  case method_start:
209  if (!is_char(input))
210  {
211  return c_res_false;
212  }
213  else
214  {
215  state_ = method;
216  req.reset();
217  req.m_method.push_back(input);
218  return c_res_indeterminate;
219  }
220  case method:
221  if (input == ' ')
222  {
223  state_ = msg_type_1;
224  return c_res_indeterminate;
225  }
226  else if (!is_char(input) || is_ctl(input))
227  {
228  return c_res_false;
229  }
230  else
231  {
232  req.m_method.push_back(input);
233  return c_res_indeterminate;
234  }
235  case msg_type_1:
236  case msg_type_2:
237  case msg_type_3:
238  case msg_type_4:
239  {
240  state_ = (state)(state_+1);
241  req.m_nMsgType = ((req.m_nMsgType)<<8) + ((unsigned char)input);
242  return c_res_indeterminate;
243  }
244  case msg_param1_1:
245  case msg_param1_2:
246  case msg_param1_3:
247  case msg_param1_4:
248  {
249  state_ = (state)(state_+1);
250  req.m_nParam1 = ((req.m_nParam1)<<8) + ((unsigned char)input);
251  return c_res_indeterminate;
252  }
253  case msg_param2_1:
254  case msg_param2_2:
255  case msg_param2_3:
256  case msg_param2_4:
257  {
258  state_ = (state)(state_+1);
259  req.m_nParam2 = ((req.m_nParam2)<<8) + ((unsigned char)input);
260  return c_res_indeterminate;
261  }
262  case code_length1:
263  case code_length2:
264  case code_length3:
265  {
266  int nIndex = (state_ - code_length1);
267  if(is_digit(input))
268  {
269  req.m_code_lengths[nIndex] = req.m_code_lengths[nIndex] * 10 + input - '0';
270  return c_res_indeterminate;
271  }
272  else if(input == ':' || input == '>')
273  {
274  m_bCompressed = (input == '>');
275  if(req.m_code_lengths[nIndex]== 0)
276  {
277  state_ = (state)(state_+1);
278  return (state_ == code_body1) ? c_res_true : c_res_indeterminate;
279  }
280  else
281  {
282  state_ = (state)(state_ + (code_body1-code_length1)); // change to body
283  req.m_codes[nIndex].reserve(req.m_code_lengths[nIndex]+1);
284  return (Consume_Result)(c_res_code_body1+nIndex);
285  }
286  }
287  }
288  case code_body1:
289  case code_body2:
290  case code_body3:
291  {
292  int nIndex = (state_ - code_body1);
293  if(req.m_code_lengths[nIndex]>(int)req.m_codes[nIndex].size())
294  {
295  // I used a special return value c_res_code_body to indicate that code body is met,
296  // so that we can read to end or length instead of doing the lexical work by char at a time.
297  req.m_codes[nIndex].push_back(input);
298  if (req.m_code_lengths[nIndex] > (int)req.m_codes[nIndex].size())
299  {
300  return (Consume_Result)(c_res_code_body1+nIndex);
301  }
302  else
303  {
304  if(nIndex == (code_body1-code_length1-1)){
305  return c_res_true;
306  }
307  else{
308  state_ = (state)(code_length1 + nIndex + 1);
309  return c_res_indeterminate;
310  }
311  }
312  }
313  }
314  default:
315  return c_res_false;
316  }
317  return c_res_false;
318  }
319 
321  void Decompress(InterProcessMessageIn& req) {};
322 
324  void Decode(InterProcessMessageIn& req) {};
325 
327  static bool is_char(int c){
328  return c >= 0 && c <= 127;
329  }
330 
332  static bool is_ctl(int c){
333  return c >= 0 && c <= 31 || c == 127;
334  }
335 
337  static bool is_digit(int c){
338  return c >= '0' && c <= '9';
339  }
340 
342  enum state
343  {
344  method_start,
345  method,
346  msg_type_1,
347  msg_type_2,
348  msg_type_3,
349  msg_type_4,
350  msg_param1_1,
351  msg_param1_2,
352  msg_param1_3,
353  msg_param1_4,
354  msg_param2_1,
355  msg_param2_2,
356  msg_param2_3,
357  msg_param2_4,
358  code_length1,
359  code_length2,
360  code_length3,
361  code_body1,
362  code_body2,
363  code_body3,
364  } state_;
365 
367  bool m_bCompressed;
368  };
369 #pragma endregion Incomming msg
370 
371 #pragma region Out Msg
372 
374  {
375  public:
376  typedef std::string BufferType_t;
377 
382  {
383  reset();
384  m_buffer.append(msg.m_method.empty() ? "NPL" : msg.m_method);
385  m_buffer.append(" ");
386  append(msg.m_nMsgType);
387  append(msg.m_nParam1);
388  append(msg.m_nParam2);
389  append(msg.m_from);
390  append(msg.m_filename);
391  append(msg.m_code);
392  return GetBufferSize();
393  }
394 
395  const char* GetBuffer() {return m_buffer.c_str();};
396  int GetBufferSize(){return (int)(m_buffer.size());};
397  public:
398  void reset()
399  {
400  m_buffer.clear();
401  }
402 
403  void append(const std::string & str)
404  {
405  append(str.c_str(), (int)(str.size()));
406  }
407  void append(const char* pData, int nLength)
408  {
409  assert(pData!=0);
410  if(nLength<=0)
411  nLength = (int)strlen(pData);
412  char tmp[16];
413  snprintf(tmp, 15, "%d:", nLength);
414  m_buffer.append(tmp);
415  if(nLength>0)
416  m_buffer.append(pData, nLength);
417  }
418  void append(DWORD dwValue)
419  {
420  unsigned char value[4];
421 
422  for (int i=0;i<4;++i)
423  {
424  value[i] = (dwValue >> ((3-i)*8)) & 0xff;
425  }
426  m_buffer.append((const char*)value, 4);
427  }
428  private:
429  BufferType_t m_buffer;
430  };
431 
432 #pragma endregion Out Msg
433 
434 #pragma region InterprocessQueue
435 
437  {
438  IPQU_create_only = 0,
439  IPQU_open_only,
440  IPQU_open_or_create,
441  IPQU_open_read_only, // not supported
442  IPQU_open_copy_on_write, // not supported
443  };
444 
447  {
448  IPRC_OK = 0,
449  IPRC_FAILED,
450  IPRC_QUEUE_IS_FULL,
451  IPRC_QUEUE_IS_EMPTY,
452  IPRC_QUEUE_NOT_FOUND,
453  IPRC_TIMEDOUT,
454  };
455 
513  template <int MAX_QUEUE_SIZE = 2000, int MAX_PACKET_SIZE = 256, typename MessageQueueType = boost::interprocess::message_queue>
515  {
516  public:
518  typedef boost::array<char, MAX_PACKET_SIZE> Buffer_Type;
519 
526  CInterprocessQueueT(const char* sQueueName, IPQueueUsageEnum usage = IPQU_open_or_create)
527  :m_sQueueName(sQueueName), m_usage(usage), m_queue_size(MAX_QUEUE_SIZE), m_max_packet_size(MAX_PACKET_SIZE)
528  {
529  using namespace boost::interprocess;
530  try
531  {
532  switch(m_usage)
533  {
534  case IPQU_create_only:
535  {
536  //Erase previous message queue
537  typename MessageQueueType::remove(m_sQueueName.c_str());
538  //Create a message_queue.
539  m_msg_queue.reset(new (typename MessageQueueType)(create_only //only create
540  ,m_sQueueName.c_str() //name
541  ,m_queue_size //max message number
542  ,m_max_packet_size //max message size
543  ));
544 
545  break;
546  }
547  case IPQU_open_only:
548  {
549  //Opens a message_queue. If the queue, does not exist throws an exception.
550  m_msg_queue.reset(new (typename MessageQueueType)(open_only,m_sQueueName.c_str()));
551  break;
552  }
553  case IPQU_open_or_create:
554  {
555  //Create a message_queue.
556  m_msg_queue.reset(new (typename MessageQueueType)(open_or_create
557  ,m_sQueueName.c_str() //name
558  ,m_queue_size //max message number
559  ,m_max_packet_size //max message size
560  ));
561  break;
562  }
563  default:
564  break;
565  }
566  }
567  catch (...)
568  {
569  m_msg_queue.reset();
570  }
571  };
573  {
574  };
575 
576  public:
577 
579  bool IsValid()
580  {
581  if(m_msg_queue)
582  return true;
583  else
584  return false;
585  }
586 
587  void Cleanup()
588  {
589  // in case of error, just clear the queue and reset parser.
590  Clear();
591  m_out_gen.reset();
592  m_input_msg.reset();
593  m_parser.reset();
594  }
595 
596 
598  bool Remove()
599  {
600  try
601  {
602  typename MessageQueueType::remove(m_sQueueName.c_str());
603  }
604  catch ( ... )
605  {
606  return false;
607  }
608  return true;
609  }
610 
612  void Clear()
613  {
614  if(m_msg_queue)
615  {
616  ParaEngine::Mutex::ScopedLock lock_(m_mutex);
617  try
618  {
619  std::size_t bytes_transferred = 0;
620  unsigned int nPriority = 0;
621  while (m_msg_queue->try_receive(m_buffer.c_array(), m_buffer.size(), bytes_transferred, nPriority) || m_msg_queue->get_num_msg()>0)
622  {
623  }
624  m_out_gen.reset();
625  m_input_msg.reset();
626  m_parser.reset();
627  }
628  catch (...)
629  {
630  }
631  }
632  }
633 
636  IPQueueReturnCodeEnum send(const InterProcessMessage& msg, unsigned int nPriority = 0)
637  {
638  if(m_msg_queue)
639  {
640  ParaEngine::Mutex::ScopedLock lock_(m_mutex);
641 
642  if( m_out_gen.GenerateMsg(msg) > 0)
643  {
644  int nSize = m_out_gen.GetBufferSize();
645  const char* pBuffer = m_out_gen.GetBuffer();
646 
647  int nMaxSize = (int)(m_msg_queue->get_max_msg_size());
648  if(nMaxSize == 0)
649  return IPRC_QUEUE_NOT_FOUND;
650 
651  assert(nMaxSize == m_max_packet_size);
652 
653  try
654  {
655  while(nSize > 0)
656  {
657  if(nSize > nMaxSize)
658  {
659  m_msg_queue->send(pBuffer, nMaxSize, nPriority);
660  nSize -= nMaxSize;
661  pBuffer += nMaxSize;
662  }
663  else
664  {
665  m_msg_queue->send(pBuffer, nSize, nPriority);
666  nSize = 0;
667  }
668  }
669  }
670  catch (...)
671  {
672  Cleanup();
673  return IPRC_FAILED;
674  }
675  }
676  }
677  else
678  {
679  return IPRC_QUEUE_NOT_FOUND;
680  }
681  return IPRC_OK;
682  }
683 
685  IPQueueReturnCodeEnum try_send(const InterProcessMessage& msg, unsigned int nPriority = 0)
686  {
687  if(m_msg_queue)
688  {
689  ParaEngine::Mutex::ScopedLock lock_(m_mutex);
690 
691  if( m_out_gen.GenerateMsg(msg) > 0)
692  {
693  int nSize = m_out_gen.GetBufferSize();
694  const char* pBuffer = m_out_gen.GetBuffer();
695 
696  try
697  {
698  int nMaxSize = (int)(m_msg_queue->get_max_msg_size());
699  if(nMaxSize == 0)
700  return IPRC_QUEUE_NOT_FOUND;
701 
702  assert(nMaxSize == m_max_packet_size);
703 
704  int nMaxMessageLength = (int)(m_msg_queue->get_max_msg() -m_msg_queue->get_num_msg())*nMaxSize;
705  if(nMaxMessageLength < nSize)
706  return IPRC_QUEUE_IS_FULL;
707 
708  while(nSize > 0)
709  {
710  if(nSize > nMaxSize)
711  {
712  m_msg_queue->send(pBuffer, nMaxSize, nPriority);
713  nSize -= nMaxSize;
714  pBuffer += nMaxSize;
715  }
716  else
717  {
718  m_msg_queue->send(pBuffer, nSize, nPriority);
719  nSize = 0;
720  }
721  }
722  }
723  catch (...)
724  {
725  Cleanup();
726  return IPRC_FAILED;
727  }
728  }
729  }
730  else
731  {
732  return IPRC_QUEUE_NOT_FOUND;
733  }
734  return IPRC_OK;
735  }
736 
740  IPQueueReturnCodeEnum receive(InterProcessMessage& msg, unsigned int & nPriority)
741  {
742  // m_in_parser
743  if(m_msg_queue)
744  {
745  try
746  {
747  while (true)
748  {
749  std::size_t bytes_transferred = 0;
750  m_msg_queue->receive(m_buffer.c_array(), m_buffer.size(), bytes_transferred, nPriority);
751 
752  boost::tribool result = true;
753  typename Buffer_Type::iterator curIt = m_buffer.begin();
754  typename Buffer_Type::iterator curEnd = m_buffer.begin() + bytes_transferred;
755 
756  while (curIt!=curEnd)
757  {
758  boost::tie(result, curIt) = m_parser.parse(m_input_msg, curIt, curEnd);
759  if (result)
760  {
761  // a complete message is read to m_input_msg.
762  m_input_msg.ToMessage(msg);
763  return IPRC_OK;
764  }
765  else if (!result)
766  {
767  m_input_msg.reset();
768  m_parser.reset();
769  break;
770  }
771  }
772 
773  if (!result)
774  {
775  // message parsing failed. the message format is not supported.
776  // This is a stream error, we shall close the connection
777  Cleanup();
778  return IPRC_FAILED;
779  }
780  else
781  {
782  // the message has not been received completely, needs to receive more data.
783  }
784  }
785  }
786  catch (...)
787  {
788  Cleanup();
789  return IPRC_FAILED;
790  }
791  }
792  else
793  {
794  return IPRC_QUEUE_NOT_FOUND;
795  }
796  return IPRC_OK;
797  }
798 
802  IPQueueReturnCodeEnum try_receive(InterProcessMessage& msg, unsigned int & nPriority)
803  {
804  // m_in_parser
805  if(m_msg_queue)
806  {
807  std::size_t bytes_transferred = 0;
808  try
809  {
810  while (m_msg_queue->try_receive(m_buffer.c_array(), m_buffer.size(), bytes_transferred, nPriority))
811  {
812  boost::tribool result = true;
813  typename Buffer_Type::iterator curIt = m_buffer.begin();
814  typename Buffer_Type::iterator curEnd = m_buffer.begin() + bytes_transferred;
815 
816  while (curIt!=curEnd)
817  {
818  boost::tie(result, curIt) = m_parser.parse(m_input_msg, curIt, curEnd);
819  if (result)
820  {
821  // a complete message is read to m_input_msg.
822  m_input_msg.ToMessage(msg);
823  return IPRC_OK;
824  }
825  else if (!result)
826  {
827  m_input_msg.reset();
828  m_parser.reset();
829  break;
830  }
831  }
832 
833  if (!result)
834  {
835  // message parsing failed. the message format is not supported.
836  // This is a stream error, we shall close the connection
837  Cleanup();
838  return IPRC_FAILED;
839  }
840  else
841  {
842  // the message has not been received completely, needs to receive more data.
843  }
844  }
845  }
846  catch (...)
847  {
848  Cleanup();
849  return IPRC_FAILED;
850  }
851  return IPRC_QUEUE_IS_EMPTY;
852  }
853  else
854  {
855  return IPRC_QUEUE_NOT_FOUND;
856  }
857  return IPRC_OK;
858  }
859 
860  const std::string& GetName() {return m_sQueueName;}
861  protected:
862  std::string m_sQueueName;
863  int m_queue_size;
864  int m_max_packet_size;
865  IPQueueUsageEnum m_usage;
866 
867  message_queue_t m_msg_queue;
868  CInterProcessMessageOut_gen m_out_gen;
869  InterProcessMessageIn m_input_msg;
871 
873  Buffer_Type m_buffer;
874 
875  ParaEngine::Mutex m_mutex;
876  };
877 
878 #ifdef WIN32
880 #else
882 #endif
883 
885 #pragma endregion InterprocessQueue
886 }
IPQueueReturnCodeEnum send(const InterProcessMessage &msg, unsigned int nPriority=0)
send a message and block if queue is full until message is sent out.
Definition: InterprocessQueue.hpp:636
DWORD m_nMsgType
this is a optional message type, which could mean how the m_code is interpreted. However its meaning ...
Definition: InterprocessQueue.hpp:73
std::string m_filename
the destination file name as in NPL.activate(filename, ...)
Definition: InterprocessQueue.hpp:38
std::string m_method
the method name string. It must not contain space &#39; &#39;. if empty, it will be assigned as "NPL"...
Definition: InterprocessQueue.hpp:32
IPQueueReturnCodeEnum try_receive(InterProcessMessage &msg, unsigned int &nPriority)
non-blocking call
Definition: InterprocessQueue.hpp:802
std::string m_to
the receiver message queue name, this is only assigned and used when sending a message.
Definition: InterprocessQueue.hpp:36
DWORD m_nParam2
this is another field to ease sending certain messages.
Definition: InterprocessQueue.hpp:46
IPQueueReturnCodeEnum
possible return code.
Definition: InterprocessQueue.hpp:446
Buffer_Type m_buffer
Buffer for incoming data.
Definition: InterprocessQueue.hpp:873
DWORD m_nParam2
this is another field to ease sending certain messages.
Definition: InterprocessQueue.hpp:77
different physics engine has different winding order.
Definition: EventBinding.h:32
It is for sending and receiving InterProcessMessage.
Definition: InterprocessQueue.hpp:514
void ToMessage(InterProcessMessage &msg)
convert to message
Definition: InterprocessQueue.hpp:92
CInterProcessMessageIn_parser()
Construct ready to parse the InterProcessMessageIn method.
Definition: InterprocessQueue.hpp:139
simple scoped lock function
Definition: Mutex.hpp:14
IPQueueReturnCodeEnum try_send(const InterProcessMessage &msg, unsigned int nPriority=0)
only send if queue is not full.
Definition: InterprocessQueue.hpp:685
for parsing incoming messages.
Definition: InterprocessQueue.hpp:125
Definition: class.hpp:124
DWORD m_nMsgType
this is a optional message type, which could mean how the m_code is interpreted. However its meaning ...
Definition: InterprocessQueue.hpp:42
boost::tuple< boost::tribool, InputIterator > parse(InterProcessMessageIn &req, InputIterator begin, InputIterator end)
Parse some data.
Definition: InterprocessQueue.hpp:149
IPQueueUsageEnum
how the message queue is used.
Definition: InterprocessQueue.hpp:436
IPQueueReturnCodeEnum receive(InterProcessMessage &msg, unsigned int &nPriority)
blocking call to force receive a message.
Definition: InterprocessQueue.hpp:740
cross platform mutex
Definition: Mutex.hpp:88
Definition: enum_maker.hpp:46
int GenerateMsg(const InterProcessMessage &msg)
call this function to generate a new message.
Definition: InterprocessQueue.hpp:381
it represents an interprocess message
Definition: InterprocessQueue.hpp:27
CInterprocessQueueT(const char *sQueueName, IPQueueUsageEnum usage=IPQU_open_or_create)
Since the message queue is a global object, it is not removed even the queue object is deleted...
Definition: InterprocessQueue.hpp:526
bool IsValid()
if this is valid.
Definition: InterprocessQueue.hpp:579
std::string m_from
the sender message queue name, this is only assigned and used when receiving a message.
Definition: InterprocessQueue.hpp:34
std::string m_method
number of bytes in m_code
Definition: InterprocessQueue.hpp:67
void Clear()
clear all messages.
Definition: InterprocessQueue.hpp:612
std::string m_code
the code (data) as in NPL.activate(..., scode), so in most cases it is a message table "msg={}" ...
Definition: InterprocessQueue.hpp:40
for reading an input message.
Definition: InterprocessQueue.hpp:65
bool Remove()
remove all messages.
Definition: InterprocessQueue.hpp:598
DWORD m_nParam1
this is another field to ease sending certain messages.
Definition: InterprocessQueue.hpp:44
generate an output message to be send via network.
Definition: InterprocessQueue.hpp:373
DWORD m_nParam1
this is another field to ease sending certain messages.
Definition: InterprocessQueue.hpp:75
void reset()
Reset to initial parser state.
Definition: InterprocessQueue.hpp:142