41 #include <boost/bind.hpp> 46 DefaultMessageHandler Comm::default_message_handler_;
49 message_handler_(message_handler),
52 shutdown_requested_(false),
53 write_in_progress_(false)
66 callback_thread_ = std::thread(std::bind(&Comm::process_callbacks,
this));
69 io_thread_ = std::thread(boost::bind(&boost::asio::io_service::run, &this->io_service_));
78 std::unique_lock<std::mutex> lock(callback_mutex_);
79 shutdown_requested_ =
true;
81 condition_variable_.notify_one();
86 if (io_thread_.joinable())
91 if (callback_thread_.joinable())
93 callback_thread_.join();
99 mutex_lock lock(write_mutex_);
101 for (
size_t pos = 0; pos < len; pos += WRITE_BUFFER_SIZE)
103 size_t num_bytes = (len - pos) > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : (len - pos);
104 write_queue_.emplace_back(src + pos, num_bytes);
112 receive_callback_ = fun;
117 listeners_.push_back(listener);
// Issues one asynchronous read into read_buffer_ (at most READ_BUFFER_SIZE bytes).
// Returns without doing anything when is_open() is false.
// Completion is delivered to Comm::async_read_end with the error code and the
// number of bytes transferred.
120 void Comm::async_read()
122 if (!is_open())
return;
// do_async_read is defined outside this view; it receives the destination buffer
// and the bound completion handler.
124 do_async_read(boost::asio::buffer(read_buffer_, READ_BUFFER_SIZE),
125 boost::bind(&Comm::async_read_end,
127 boost::asio::placeholders::error,
128 boost::asio::placeholders::bytes_transferred));
// Completion handler bound by async_read().
//   error             - result of the read operation
//   bytes_transferred - number of bytes placed in read_buffer_
// NOTE(review): the condition guarding the error report below, and whatever
// follows it on the error path, are not visible in this listing - confirm
// against the full source.
131 void Comm::async_read_end(
const boost::system::error_code &error,
size_t bytes_transferred)
// Forward the error text to the message handler.
135 message_handler_.error(error.message());
// With callback_mutex_ held, queue the received bytes for the callback thread.
141 std::unique_lock<std::mutex> lock(callback_mutex_);
142 read_queue_.emplace_back(read_buffer_, bytes_transferred);
// Wake the thread waiting on condition_variable_ (see process_callbacks()).
145 condition_variable_.notify_one();
// Starts an asynchronous write of the buffer at the front of write_queue_.
//   check_write_state - when true, the write_in_progress_ flag is consulted first
// Completion is delivered to Comm::async_write_end.
// NOTE(review): the statements guarded by the two `if`s below are not visible
// in this listing; presumably both are early returns - confirm.
150 void Comm::async_write(
bool check_write_state)
// NOTE(review): write_in_progress_ is read here before write_mutex_ is taken;
// confirm the flag is atomic or otherwise safe to read unlocked.
152 if (check_write_state && write_in_progress_)
155 mutex_lock lock(write_mutex_);
156 if (write_queue_.empty())
// Mark a write as outstanding before handing the buffer to do_async_write.
159 write_in_progress_ =
true;
160 WriteBuffer& buffer = write_queue_.front();
// dpos()/nbytes() are WriteBuffer members defined outside this view; presumably
// the current data position and the number of bytes still to send - confirm.
161 do_async_write(boost::asio::buffer(buffer.dpos(), buffer.nbytes()),
162 boost::bind(&Comm::async_write_end,
164 boost::asio::placeholders::error,
165 boost::asio::placeholders::bytes_transferred));
// Completion handler bound by async_write().
//   error             - result of the write operation
//   bytes_transferred - number of bytes written from the front buffer
// NOTE(review): the condition guarding the error report and the control flow
// between the queue checks are not visible in this listing - confirm against
// the full source before relying on the ordering described here.
168 void Comm::async_write_end(
const boost::system::error_code &error,
size_t bytes_transferred)
// Forward the error text to the message handler.
172 message_handler_.error(error.message());
// The write-queue bookkeeping below follows the acquisition of write_mutex_.
177 mutex_lock lock(write_mutex_);
// Nothing queued: clear the in-progress flag.
178 if (write_queue_.empty())
180 write_in_progress_ =
false;
// Advance the front buffer by the amount just written.
184 WriteBuffer& buffer = write_queue_.front();
185 buffer.pos += bytes_transferred;
// Drop the front buffer once nbytes() reports nothing left.
186 if (buffer.nbytes() == 0)
188 write_queue_.pop_front();
// Queue drained: clear the in-progress flag.
191 if (write_queue_.empty())
192 write_in_progress_ =
false;
// Body of callback_thread_: waits for queued read data and delivers it to the
// registered receive callback and to every registered listener.
// NOTE(review): the enclosing loop, the lock's scope, and the handling of
// new_data_ are not visible in this listing - confirm against the full source.
197 void Comm::process_callbacks()
// Private staging list that read_queue_ entries are moved into.
199 std::list<ReadBuffer> local_queue;
// Block on condition_variable_ until new_data_ or shutdown_requested_ is set.
204 std::unique_lock<std::mutex> lock(callback_mutex_);
205 condition_variable_.wait(lock, [
this]{
return new_data_ || shutdown_requested_; });
// Shutdown path; the guarded statement is not shown here.
208 if (shutdown_requested_)
// Move every pending entry out of read_queue_ (splice leaves read_queue_ empty).
214 local_queue.splice(local_queue.end(), read_queue_);
// Deliver the staged buffers in order, removing each once delivered.
221 while (!local_queue.empty())
// NOTE(review): this copies the front ReadBuffer; a const reference looks
// sufficient since pop_front() only runs after the last use - confirm.
223 ReadBuffer buffer = local_queue.front();
// The function-style callback is optional; skipped when empty.
224 if (receive_callback_)
226 receive_callback_(buffer.data, buffer.len);
// Each listener receives the same data pointer and length.
228 for (std::reference_wrapper<CommListener> listener_ref : listeners_)
230 listener_ref.get().receive_callback(buffer.data, buffer.len);
232 local_queue.pop_front();
Abstract base class for getting comm events via a listener interface.
void send_bytes(const uint8_t *src, size_t len)
Send bytes from a buffer over the port.
bool init()
Initializes and opens the port.
void register_receive_callback(std::function< void(const uint8_t *, size_t)> fun)
Register a callback function for when bytes are received on the port.
Comm(MessageHandler &message_handler=default_message_handler_)
Set up asynchronous communication base class.
void close()
Closes the port.
Abstract base class for message handler.
void register_listener(CommListener &listener)
Register a listener for when bytes are received on the port.