4 #include <boost/interprocess/ipc/message_queue.hpp> 8 #include "ipc_message_queue.hpp" 10 #include <boost/shared_ptr.hpp> 12 #include <boost/logic/tribool.hpp> 13 #include <boost/tuple/tuple.hpp> 14 #include <boost/array.hpp> 15 #include "util/Mutex.hpp" 19 #define snprintf _snprintf 25 #pragma region Msg Header 61 #pragma endregion Msg Header 63 #pragma region Incomming msg 70 std::string m_codes[3];
71 int m_code_lengths[3];
83 m_code_lengths[i] = 0;
142 void reset(){state_ = method_start;};
148 template <
typename InputIterator>
150 InputIterator begin, InputIterator end)
152 Consume_Result result = c_res_indeterminate;
153 while (begin != end && (result == c_res_indeterminate))
155 result = consume(req, *begin++);
157 if(begin != end && result >= c_res_code_body1)
160 int nIndex = result - c_res_code_body1;
161 int nOldSize = req.m_codes[nIndex].size();
162 int nByteCount = end-begin;
163 if(req.m_code_lengths[nIndex] < (nOldSize+nByteCount))
165 nByteCount = req.m_code_lengths[nIndex] - nOldSize;
167 req.m_codes[nIndex].resize(nOldSize+nByteCount);
168 memcpy(&(req.m_codes[nIndex][nOldSize]), begin, nByteCount);
169 begin = begin + nByteCount;
171 if (req.m_code_lengths[nIndex]==(
int)req.m_codes[nIndex].size())
173 if(nIndex == (code_body1-code_length1-1)){
177 state_ = (state)(code_length1 + nIndex + 1);
178 result = c_res_indeterminate;
182 if(result == c_res_true)
187 boost::tribool result_ =
true;
188 return boost::make_tuple(result_, begin);
190 else if(result == c_res_false)
192 boost::tribool result_ =
false;
193 return boost::make_tuple(result_, begin);
197 boost::tribool result_ = boost::indeterminate;
198 return boost::make_tuple(result_, begin);
218 return c_res_indeterminate;
224 return c_res_indeterminate;
226 else if (!is_char(input) || is_ctl(input))
233 return c_res_indeterminate;
240 state_ = (state)(state_+1);
242 return c_res_indeterminate;
249 state_ = (state)(state_+1);
251 return c_res_indeterminate;
258 state_ = (state)(state_+1);
260 return c_res_indeterminate;
266 int nIndex = (state_ - code_length1);
269 req.m_code_lengths[nIndex] = req.m_code_lengths[nIndex] * 10 + input -
'0';
270 return c_res_indeterminate;
272 else if(input ==
':' || input ==
'>')
274 m_bCompressed = (input ==
'>');
275 if(req.m_code_lengths[nIndex]== 0)
277 state_ = (state)(state_+1);
278 return (state_ == code_body1) ? c_res_true : c_res_indeterminate;
282 state_ = (state)(state_ + (code_body1-code_length1));
283 req.m_codes[nIndex].reserve(req.m_code_lengths[nIndex]+1);
284 return (Consume_Result)(c_res_code_body1+nIndex);
292 int nIndex = (state_ - code_body1);
293 if(req.m_code_lengths[nIndex]>(
int)req.m_codes[nIndex].size())
297 req.m_codes[nIndex].push_back(input);
298 if (req.m_code_lengths[nIndex] > (
int)req.m_codes[nIndex].size())
300 return (Consume_Result)(c_res_code_body1+nIndex);
304 if(nIndex == (code_body1-code_length1-1)){
308 state_ = (state)(code_length1 + nIndex + 1);
309 return c_res_indeterminate;
327 static bool is_char(
int c){
328 return c >= 0 && c <= 127;
332 static bool is_ctl(
int c){
333 return c >= 0 && c <= 31 || c == 127;
337 static bool is_digit(
int c){
338 return c >=
'0' && c <=
'9';
369 #pragma endregion Incomming msg 371 #pragma region Out Msg 376 typedef std::string BufferType_t;
385 m_buffer.append(
" ");
392 return GetBufferSize();
395 const char* GetBuffer() {
return m_buffer.c_str();};
396 int GetBufferSize(){
return (
int)(m_buffer.size());};
403 void append(
const std::string & str)
405 append(str.c_str(), (int)(str.size()));
407 void append(
const char* pData,
int nLength)
411 nLength = (int)strlen(pData);
413 snprintf(tmp, 15,
"%d:", nLength);
414 m_buffer.append(tmp);
416 m_buffer.append(pData, nLength);
418 void append(DWORD dwValue)
420 unsigned char value[4];
422 for (
int i=0;i<4;++i)
424 value[i] = (dwValue >> ((3-i)*8)) & 0xff;
426 m_buffer.append((
const char*)value, 4);
429 BufferType_t m_buffer;
432 #pragma endregion Out Msg 434 #pragma region InterprocessQueue 438 IPQU_create_only = 0,
442 IPQU_open_copy_on_write,
452 IPRC_QUEUE_NOT_FOUND,
513 template <
int MAX_QUEUE_SIZE = 2000,
int MAX_PACKET_SIZE = 256,
typename MessageQueueType = boost::
interprocess::message_queue>
518 typedef boost::array<char, MAX_PACKET_SIZE> Buffer_Type;
527 :m_sQueueName(sQueueName), m_usage(usage), m_queue_size(MAX_QUEUE_SIZE), m_max_packet_size(MAX_PACKET_SIZE)
529 using namespace boost::interprocess;
534 case IPQU_create_only:
537 typename MessageQueueType::remove(m_sQueueName.c_str());
539 m_msg_queue.reset(
new (
typename MessageQueueType)(create_only
540 ,m_sQueueName.c_str()
550 m_msg_queue.reset(
new (
typename MessageQueueType)(open_only,m_sQueueName.c_str()));
553 case IPQU_open_or_create:
556 m_msg_queue.reset(
new (
typename MessageQueueType)(open_or_create
557 ,m_sQueueName.c_str()
602 typename MessageQueueType::remove(m_sQueueName.c_str());
619 std::size_t bytes_transferred = 0;
620 unsigned int nPriority = 0;
621 while (m_msg_queue->try_receive(m_buffer.c_array(), m_buffer.size(), bytes_transferred, nPriority) || m_msg_queue->get_num_msg()>0)
642 if( m_out_gen.GenerateMsg(msg) > 0)
644 int nSize = m_out_gen.GetBufferSize();
645 const char* pBuffer = m_out_gen.GetBuffer();
647 int nMaxSize = (int)(m_msg_queue->get_max_msg_size());
649 return IPRC_QUEUE_NOT_FOUND;
651 assert(nMaxSize == m_max_packet_size);
659 m_msg_queue->send(pBuffer, nMaxSize, nPriority);
665 m_msg_queue->send(pBuffer, nSize, nPriority);
679 return IPRC_QUEUE_NOT_FOUND;
691 if( m_out_gen.GenerateMsg(msg) > 0)
693 int nSize = m_out_gen.GetBufferSize();
694 const char* pBuffer = m_out_gen.GetBuffer();
698 int nMaxSize = (int)(m_msg_queue->get_max_msg_size());
700 return IPRC_QUEUE_NOT_FOUND;
702 assert(nMaxSize == m_max_packet_size);
704 int nMaxMessageLength = (int)(m_msg_queue->get_max_msg() -m_msg_queue->get_num_msg())*nMaxSize;
705 if(nMaxMessageLength < nSize)
706 return IPRC_QUEUE_IS_FULL;
712 m_msg_queue->send(pBuffer, nMaxSize, nPriority);
718 m_msg_queue->send(pBuffer, nSize, nPriority);
732 return IPRC_QUEUE_NOT_FOUND;
749 std::size_t bytes_transferred = 0;
750 m_msg_queue->receive(m_buffer.c_array(), m_buffer.size(), bytes_transferred, nPriority);
752 boost::tribool result =
true;
753 typename Buffer_Type::iterator curIt = m_buffer.begin();
754 typename Buffer_Type::iterator curEnd = m_buffer.begin() + bytes_transferred;
756 while (curIt!=curEnd)
758 boost::tie(result, curIt) = m_parser.parse(m_input_msg, curIt, curEnd);
762 m_input_msg.ToMessage(msg);
794 return IPRC_QUEUE_NOT_FOUND;
807 std::size_t bytes_transferred = 0;
810 while (m_msg_queue->try_receive(m_buffer.c_array(), m_buffer.size(), bytes_transferred, nPriority))
812 boost::tribool result =
true;
813 typename Buffer_Type::iterator curIt = m_buffer.begin();
814 typename Buffer_Type::iterator curEnd = m_buffer.begin() + bytes_transferred;
816 while (curIt!=curEnd)
818 boost::tie(result, curIt) = m_parser.parse(m_input_msg, curIt, curEnd);
822 m_input_msg.ToMessage(msg);
851 return IPRC_QUEUE_IS_EMPTY;
855 return IPRC_QUEUE_NOT_FOUND;
860 const std::string& GetName() {
return m_sQueueName;}
862 std::string m_sQueueName;
864 int m_max_packet_size;
867 message_queue_t m_msg_queue;
885 #pragma endregion InterprocessQueue IPQueueReturnCodeEnum send(const InterProcessMessage &msg, unsigned int nPriority=0)
send a message and block if queue is full until message is sent out.
Definition: InterprocessQueue.hpp:636
DWORD m_nMsgType
this is a optional message type, which could mean how the m_code is interpreted. However its meaning ...
Definition: InterprocessQueue.hpp:73
std::string m_filename
the destination file name as in NPL.activate(filename, ...)
Definition: InterprocessQueue.hpp:38
std::string m_method
the method name string. It must not contain space ' '. if empty, it will be assigned as "NPL"...
Definition: InterprocessQueue.hpp:32
IPQueueReturnCodeEnum try_receive(InterProcessMessage &msg, unsigned int &nPriority)
non-blocking call
Definition: InterprocessQueue.hpp:802
std::string m_to
the receiver message queue name, this is only assigned and used when sending a message.
Definition: InterprocessQueue.hpp:36
DWORD m_nParam2
this is another field to ease sending certain messages.
Definition: InterprocessQueue.hpp:46
IPQueueReturnCodeEnum
possible return code.
Definition: InterprocessQueue.hpp:446
Buffer_Type m_buffer
Buffer for incoming data.
Definition: InterprocessQueue.hpp:873
DWORD m_nParam2
this is another field to ease sending certain messages.
Definition: InterprocessQueue.hpp:77
different physics engine has different winding order.
Definition: EventBinding.h:32
It is for sending and receiving InterProcessMessage.
Definition: InterprocessQueue.hpp:514
void ToMessage(InterProcessMessage &msg)
convert to message
Definition: InterprocessQueue.hpp:92
CInterProcessMessageIn_parser()
Construct ready to parse the InterProcessMessageIn method.
Definition: InterprocessQueue.hpp:139
simple scoped lock function
Definition: Mutex.hpp:14
IPQueueReturnCodeEnum try_send(const InterProcessMessage &msg, unsigned int nPriority=0)
only send if queue is not full.
Definition: InterprocessQueue.hpp:685
for parsing incoming messages.
Definition: InterprocessQueue.hpp:125
Definition: class.hpp:124
DWORD m_nMsgType
this is a optional message type, which could mean how the m_code is interpreted. However its meaning ...
Definition: InterprocessQueue.hpp:42
boost::tuple< boost::tribool, InputIterator > parse(InterProcessMessageIn &req, InputIterator begin, InputIterator end)
Parse some data.
Definition: InterprocessQueue.hpp:149
IPQueueUsageEnum
how the message queue is used.
Definition: InterprocessQueue.hpp:436
IPQueueReturnCodeEnum receive(InterProcessMessage &msg, unsigned int &nPriority)
blocking call to force receive a message.
Definition: InterprocessQueue.hpp:740
cross platform mutex
Definition: Mutex.hpp:88
Definition: enum_maker.hpp:46
int GenerateMsg(const InterProcessMessage &msg)
call this function to generate a new message.
Definition: InterprocessQueue.hpp:381
it represents an interprocess message
Definition: InterprocessQueue.hpp:27
CInterprocessQueueT(const char *sQueueName, IPQueueUsageEnum usage=IPQU_open_or_create)
Since the message queue is a global object, it is not removed even the queue object is deleted...
Definition: InterprocessQueue.hpp:526
bool IsValid()
if this is valid.
Definition: InterprocessQueue.hpp:579
std::string m_from
the sender message queue name, this is only assigned and used when receiving a message.
Definition: InterprocessQueue.hpp:34
std::string m_method
number of bytes in m_code
Definition: InterprocessQueue.hpp:67
void Clear()
clear all messages.
Definition: InterprocessQueue.hpp:612
std::string m_code
the code (data) as in NPL.activate(..., scode), so in most cases it is a message table "msg={}" ...
Definition: InterprocessQueue.hpp:40
for reading an input message.
Definition: InterprocessQueue.hpp:65
bool Remove()
remove all messages.
Definition: InterprocessQueue.hpp:598
DWORD m_nParam1
this is another field to ease sending certain messages.
Definition: InterprocessQueue.hpp:44
generate an output message to be send via network.
Definition: InterprocessQueue.hpp:373
DWORD m_nParam1
this is another field to ease sending certain messages.
Definition: InterprocessQueue.hpp:75
void reset()
Reset to initial parser state.
Definition: InterprocessQueue.hpp:142