Aruna
comm.cpp
Go to the documentation of this file.
1 //
2 // Created by noeel on 7-1-19.
3 //
4 
5 #include "aruna/comm.h"
6 #include "aruna/log.h"
7 #include <math.h>
8 #include <aruna/comm/Link.h>
9 #include <pthread.h>
10 #include <queue>
11 #include <set>
12 #include <aruna/comm/commTypes.h>
13 
14 namespace aruna {
15  namespace comm {
16 
17 // private
18  namespace {
19 // begin ahead declaration
20  // variables
21 
22 
24 
// Condition variable: broadcast by send() after it queues a package and waited on by
// transmissionQueueHandeler() while out_buffer is empty.
27  pthread_cond_t out_buffer_not_empty;
// Mutex locked around the out_buffer accesses in send() and transmissionQueueHandeler().
28  pthread_mutex_t out_buffer_critical;
// Queue of packages waiting to be handed to the driver.
29  std::queue<transmitpackage_t> out_buffer;
// Links registered as possible drivers; pickDriver() chooses among them.
30  std::set<Link *> driverCandidates;
31 
// All registered endpoints.
35  std::set<channel_t*, channel_t::compare_refrence> channels;
36 
41 
46 
47 // functions
48 
// Set the comm status.
53  void set_status(status_t status);
54 
// Transmit a single package through the current driver.
62  err_t transmit(transmitpackage_t transmitpackage);
63 
// Thread entry point that sends the packages queued in out_buffer.
68  void * transmissionQueueHandeler(void *);
69 
// Get the driver that is currently in use.
74  Link *getDriver();
75 
// Pick the best available driver among driverCandidates; returns the driver together with an error code.
83  std::tuple<Link *, err_t> pickDriver();
84 
// Set the driver that is currently in use.
89  void setDriver(Link &driver);
90 
// Score a driver; pickDriver() keeps the candidate with the highest score.
96  unsigned int rateDriver(Link &driver);
97 
// Thread entry point that picks a new driver; started by selectDriverTask().
101  void * _selectDriverTask(void *);
102 
// Start a task to select a driver; does not block.
106  void selectDriverTask();
107 
// Thread entry point that reads incoming packages from the driver.
112  void * receiveHandeler(void *);
113 
114 // end ahead declaration
115 
116  void set_status(status_t status) {
118  }
119 
120  err_t transmit(transmitpackage_t transmitpackage) {
121  err_t return_msg = err_t::OK;
122  size_t send;
123  uint8_t* data = (uint8_t*) malloc(transmitpackage.data_lenght + transmitpackage_t::HEADER_SIZE);
124 
125  transmitpackage_t::transmitpackage_to_binary(transmitpackage, data);
126  send = getDriver()->transmit(data, transmitpackage.size);
127  if (send != transmitpackage.size)
128  return_msg = err_t::FAIL;
129  free(data);
130  return return_msg;
131  }
132 
133  void * transmissionQueueHandeler(void *) {
134  err_t transmit_msg = err_t::FAIL;
135  while(1) {
136 
137  pthread_mutex_lock(&out_buffer_critical);
138  while(out_buffer.empty()){
139 // TODO cond_wait after mutex_lock can cause a deadlock.
140  pthread_cond_wait(&out_buffer_not_empty, &out_buffer_critical);
141  }
142  transmit_msg = transmit(out_buffer.front());
143  if (transmit_msg != err_t::OK) {
144  log->warning("transmit of: %d, to: %d, failed: %s", out_buffer.front().from_port, out_buffer.front().to_port,
145  err_to_char.at(transmit_msg));
146  log->dump(log::level_t::WARNING, out_buffer.front().data_transmitting, out_buffer.front().data_lenght);
147  } else {
148  out_buffer.pop();
149  }
150  pthread_mutex_unlock(&out_buffer_critical);
151 
152  }
153  }
155  return driver;
156  }
157  void setDriver(Link &driver) {
158  comm::driver = &driver;
159  }
160 
161  std::tuple<Link *, err_t> pickDriver() {
162 // bestpick initalisren omdat hij anders een lege terug kan geven.
163  Link *bestPick = nullptr;
164  unsigned int bestPickScore = 0;
165  unsigned int s = 0;
166  if (driverCandidates.empty())
167  return std::make_tuple(bestPick, err_t::NO_DRIVER);
168  for (auto driverCandidate : driverCandidates) {
169  s = rateDriver(*driverCandidate);
170  if (s > bestPickScore) {
171  bestPick = driverCandidate;
172  bestPickScore = s;
173  }
174 // TODO drivers die het niet zijn geworden moeten worden gedeleted.
175  }
176  return std::make_tuple(bestPick, err_t::OK);
177  }
178 
179  unsigned int rateDriver(Link &driver) {
180  unsigned int score = 0;
181  err_t drivstrt = driver.startup_error;
182  if (drivstrt != err_t::OK) {
183  log->warning("driver failed to start: %s", err_to_char.at(drivstrt));
184  return score;
185  }
186  if (!driver.is_connected())
187  return score;
188  else
189  score += 15;
190  score *= (int) driver.get_speed() / 100;
191 
192  return score;
193  }
194  void * _selectDriverTask(void *) {
195  auto dr = pickDriver();
196  int pret = 0;
197  if (std::get<1>(dr) == err_t::OK) {
198  setDriver(*std::get<0>(dr));
199  } else {
200  log->error("Failed to pick new driver: %s", err_to_char.at(std::get<1>(dr)));
201  comm::stop();
202  }
203  pthread_exit(&pret);
204  return nullptr;
205  }
206 
208  pthread_create(NULL, NULL, _selectDriverTask, NULL);
209  }
210 
211  bool register_log() {
212  static bool registerd = false;
213  if (!registerd){
214  log = new log::channel_t("comm");
215  registerd = true;
216  }
217  return registerd;
218  }
219 
220  void * receiveHandeler(void *) {
221  uint16_t bytes_read = 0;
222  uint8_t size = 0;
223  uint8_t *buff;
224  Link* driver;
225 // TODO make it less busyloopy.
226  while (1) {
227  driver = getDriver();
228 // read the size of the package
229  bytes_read = driver->receive(&size, 1);
230 // TODO if statement seems odd, `size = 1` should suffise.
231  if (bytes_read && size >= 2) {
232  log->verbose("new incoming connection, size: %i", size);
233  buff = (uint8_t *) malloc(size);
234  buff[0] = size;
235  bytes_read = driver->receive(&buff[1], (size - 1));
236  log->dump(aruna::log::level_t::VERBOSE, buff, size);
237 
238  incoming_connection(buff, size);
239 // cleanup
240  free(buff);
241  size = 0;
242  bytes_read = 0;
243  }
244  }
245  }
246  }
247 
248 
249 
250 
252 // pick a driver
253  auto dr = pickDriver();
254  if (std::get<1>(dr) == err_t::OK) {
255 // if driver is found start comm with driver as paramter
256  return start(std::get<0>(dr));
257  }
258 // return pickDriver error.
259  return std::get<1>(dr);
260  }
261 
// Body of the start variant that is given a driver: it stores the driver, initialises the
// out_buffer mutex/condition variable and starts the transmission and receive threads.
// NOTE(review): the signature line (listing line 262) is absent from this listing; the body
// reads a pointer named `driver`, and start() calls this as start(<Link *>).
263  switch (get_status()) {
264  case status_t::RUNNING:
 // already running: refuse to start again
265  return err_t::NOT_STOPPED;
266  case status_t::PAUSED:
267 // TODO is this wise? it may cause unwanted behaviour when you want to switch drivers.
268  return resume();
269  default:
270  break;
271  }
272  int err = 0;
273 // register com channel
274  register_log();
275 // set the driver
276  err_t driver_err;
 // NOTE(review): `driver` is dereferenced here before any null check (see the TODO below).
277  setDriver(*driver);
278 // TODO check if driver is not NULL
279  driver_err = getDriver()->startup_error;
280 // init pthread mutex and cond
281  pthread_cond_init(&out_buffer_not_empty, NULL);
282  pthread_mutex_init(&out_buffer_critical, NULL);
 // A driver that failed to start aborts the start-up; the mutex and condition
 // variable initialised just above are left initialised on this path.
283  if (driver_err != err_t::OK)
284  return driver_err;
285 
 // pthread_create() returns 0 on success and an error number otherwise.
286  err = pthread_create(&transmissionQueueHandeler_thread_handeler, NULL, transmissionQueueHandeler, NULL);
287  if (err) {
288  log->error("failed to start transmissionQueueHandeler: %i", err);
289 // stop when task failes
290 // TODO destroy driver object
291 // TODO unset and destroy the driver.
292  return err_t::TASK_FAILED;
293  }
294  err = pthread_create(&receiveHandeler_thread_handeler, NULL, receiveHandeler, NULL);
295  if (err) {
 // NOTE(review): the transmission thread started above keeps running on this path.
296  log->error("failed to start receiveHandeler: %i", err);
297 // stop when task failes
298 // TODO destroy driver object
299 // TODO unset and destroy the driver.
300  return err_t::TASK_FAILED;
301  }
302 // set status to running if all succeeded
 // NOTE(review): listing line 304 is absent here; presumably it is the statement that
 // sets the status to RUNNING - verify against the real file.
303 
305 
306  return err_t::OK;
307  }
308 
// Body of stop(): stop the communication.
// NOTE(review): the signature line (listing line 309) and listing lines 320 and 328 are
// absent from this listing, so the comments below only cover the visible statements.
310  switch (get_status()) {
311  case status_t::STOPPED:
 // nothing to stop
312  return err_t::NOT_STARTED;
313  default:
314  break;
315  }
316 // stop all
 // NOTE(review): driver_err is declared without an initialiser and no visible line assigns
 // it before it is compared further down.
317  err_t driver_err;
318 // TODO destroy driver object
319  pthread_cancel(receiveHandeler_thread_handeler);
321  int pret = 0;
322  pthread_cond_destroy(&out_buffer_not_empty);
323  pthread_mutex_destroy(&out_buffer_critical);
 // pthread_exit() terminates the calling thread and does not return, so the statements
 // below it are not executed by that thread. NOTE(review): the exit value is the address
 // of the local `pret`, which stops existing once the thread has ended.
324  pthread_exit(&pret);
325 // TODO is this code even reached if the thread is terminated?
 // drop every package that is still queued
326  while(!out_buffer.empty())
327  out_buffer.pop();
329 
330  if (driver_err != err_t::OK)
331  return driver_err;
332  return err_t::OK;
333  }
334 
// Body of pause(): pause all communication.
// NOTE(review): the signature line (listing line 335) and listing line 344 are absent from
// this listing.
336  switch (get_status()) {
337  case status_t::STOPPED:
338  case status_t::PAUSED:
 // both a stopped and an already paused comm are answered with NOT_STARTED
339  return err_t::NOT_STARTED;
340  default:
341  break;
342  }
343 // TODO suspend transmissionQueueHandeler_thread_handeler
 // NOTE(review): presumably the absent line 344 stores the PAUSED status - verify.
345  return err_t::OK;
346  }
347 
// Body of resume(): resume all communication.
// NOTE(review): the signature line (listing line 348) and listing line 358 are absent from
// this listing.
349  switch (get_status()) {
350  case status_t::STOPPED:
 // cannot resume something that was never started
351  return err_t::NOT_STARTED;
352  case status_t::RUNNING:
 // cannot resume something that is not paused
353  return err_t::NOT_PAUSED;
354  default:
355  break;
356  }
357 // TODO continue transmissionQueueHandeler_thread_handeler
 // NOTE(review): presumably the absent line 358 stores the RUNNING status - verify.
359  return err_t::OK;
360  }
361 
363 /*
364  * TODO paramters should consist of the paramters of channel_t together with a handeler.
365  * handeler can then later be used for sending and accessing Rx array
366  */
367 
368 // TODO testen of deze check wel werkt.
369  if (channels.find(channel) != channels.end())
370  return err_t::CHANNEL_EXISTS;
371  if (channels.insert(channel).second) {
372  return err_t::OK;
373  } else
374  return err_t::BUFFER_OVERFLOW;
375 // TODO handeler fixen.
376  }
377 
379  if (channels.erase(&channel))
380  return err_t::OK;
381  else
382  return err_t::NO_CHANNEL;
383  }
384 
// Send data.
// Splits `data` into packages of at most MAX_DATA_SIZE bytes and pushes each package onto
// out_buffer, waking the transmission thread after every push.
// Returns NO_CHANNEL when `channel` is not registered, NO_CONNECTION when the driver is not
// connected, otherwise OK once everything is queued.
// NOTE(review): listing line 387 (first line of the body) is absent from this listing.
385  err_t
386  send(channel_t *channel, port_t to_port, uint8_t *data, size_t data_size, bool wait_for_ack) {
388  if (channels.find(channel) == channels.end()) {
389  return err_t::NO_CHANNEL;
390  }
 // number of packages needed, rounded up
391  uint16_t packages = ceil((double) data_size / (double) MAX_DATA_SIZE);
392 // how many packages are we waiting for?
 // Also used as the index into `tp`.
 // NOTE(review): this counter is 8 bits wide while `packages` is 16 bits wide.
393  uint8_t n_to_wait_for = 0;
394  // datasize length
395  size_t ds_l;
396  // datasize pointer
397  size_t ds_p = 0;
398 
 // NOTE(review): the result of getDriver() is dereferenced without a null check.
399  if (!getDriver()->is_connected())
400  return err_t::NO_CONNECTION;
401 
402 // TODO check if tp is allocated
403  auto tp = (transmitpackage_t *) calloc(packages, sizeof(transmitpackage_t));
404 
405 // loop until all data in send
406  while (data_size > 0) {
407 // static data
408  tp[n_to_wait_for].to_port = to_port;
409  tp[n_to_wait_for].from_port = channel->port;
410  tp[n_to_wait_for].n = 0;
 // wait_for_ack is only recorded in the package here; nothing in this function waits.
411  tp[n_to_wait_for].notify_on_ack = wait_for_ack;
412 
 // The package points into the caller's buffer; the payload is not copied here.
 // NOTE(review): presumably `data` has to stay valid until the package is transmitted.
413  tp[n_to_wait_for].data_transmitting = &data[ds_p];
414 
 // payload size of this package: a full package, or whatever is left
415  ds_l = (data_size >= MAX_DATA_SIZE) ? MAX_DATA_SIZE : data_size;
416 
417  tp[n_to_wait_for].data_lenght = ds_l;
418  tp[n_to_wait_for].size = ds_l + transmitpackage_t::HEADER_SIZE;
419 
 // advance the read position and shrink the remaining amount
420  ds_p += ds_l;
421  data_size -= ds_l;
422  log->verbose("sending from: %d to: %d", tp[n_to_wait_for].from_port,
423  tp[n_to_wait_for].to_port);
424  log->dump(log::level_t::VERBOSE, tp[n_to_wait_for].data_transmitting, tp[n_to_wait_for].data_lenght);
 // queue the package under the lock, then wake the transmission thread
425  pthread_mutex_lock(&out_buffer_critical);
426  out_buffer.push(tp[n_to_wait_for]);
427  pthread_mutex_unlock(&out_buffer_critical);
428  pthread_cond_broadcast(&out_buffer_not_empty);
429 
430  n_to_wait_for++;
431  }
 // out_buffer received its elements through push(), so the scratch array can be released
432  free(tp);
433  return err_t::OK;
434  }
435 
436  bool is_connected() {
437  return getDriver()->is_connected();
438  }
439 
441  return status;
442  }
443 
444  err_t get_channels(char *buffer) {
445 // TODO
446  return err_t::OK;
447  }
448 
449  unsigned int get_speed() {
450  return getDriver()->get_speed();
451  }
452 
// Handle an incoming package: deliver it to the registered channel whose port matches the
// package's destination port.
// Returns OK when a channel took the package, NO_CHANNEL when no channel has that port.
// NOTE(review): listing lines 459-460 and 463-464 are absent from this listing. `tp` is
// used below without a visible declaration and the visible lines never read `package` or
// `package_size`; presumably the absent lines decode `package` into `tp` - verify.
453  err_t incoming_connection(uint8_t *package, uint8_t package_size) {
454 /*
455  * TODO O(N) is really too slow.
456  * If we could look it up directly by also keeping a set (or hash table) of the names,
457  * that would save us a lot of time because then it is O(1)!
458  */
461  log->verbose("incoming connection");
462 
465 
 // linear search for the channel that listens on the destination port
466  for (auto channel : channels) {
467  if (channel->port == tp.to_port) {
 // push under the channel's own lock, then wake whoever waits on its in_buffer
468  pthread_mutex_lock((pthread_mutex_t*)&channel->in_buffer_lock);
469  channel->in_buffer.push(tp);
470  pthread_mutex_unlock((pthread_mutex_t*)&channel->in_buffer_lock);
471  pthread_cond_broadcast((pthread_cond_t*)&channel->in_buffer_not_empty);
472  return err_t::OK;
473  }
474  }
475  return err_t::NO_CHANNEL;
476 
477  }
478 
// Body of register_candidate_driver(): register a driver to be a comm driver candidate.
// Returns DRIVER_EXISTS when the driver is already a candidate, otherwise OK once it has
// been added to driverCandidates.
// NOTE(review): the signature line (listing line 479) and listing line 491 are absent from
// this listing.
480 // comdriver should really be a reference and not a living object.
481  register_log();
482  log->debug("registering driver");
483  if (driverCandidates.find(driver) != driverCandidates.end()) {
484  return err_t::DRIVER_EXISTS;
485  }
 // insert() reports through .second whether the element was added
486  if (!driverCandidates.insert(driver).second)
487  return err_t::BUFFER_OVERFLOW;
488 // select new driver if comm is running
489  if (get_status() == status_t::RUNNING) {
490 // TODO its better to rate this new driver and compare its score to the current driver.
 // NOTE(review): this branch is empty in the listing; presumably the absent line 491
 // starts the driver selection - verify.
492  }
493  return err_t::OK;
494  }
495 
496 
// Body of unregister_candidate_driver(): remove a driver from driverCandidates.
// Returns NO_DRIVER when the driver was not a candidate.
// NOTE(review): the signature line (listing line 497) and listing line 501 are absent from
// this listing. Line 501 is presumably the statement governed by the `if` below (a new
// driver selection); as printed, the `if` governs the `return` instead - verify.
498  if (driverCandidates.erase(driver)) {
499 // if we delete our own driver, search for a new one
500  if (comm::driver == driver)
502  return err_t::OK;
503  } else
504  return err_t::NO_DRIVER;
505  }
506 
// Body of the channel_t constructor: initialises the lock and the condition variable that
// belong to this channel's in_buffer.
// NOTE(review): the signature line (listing line 507) and listing line 510 are absent from
// this listing.
508  pthread_mutex_init(&in_buffer_lock, NULL);
509  pthread_cond_init(&in_buffer_not_empty, NULL);
511  }
512 
513  err_t channel_t::send(port_t to_port, uint8_t *data, size_t data_size, bool wait_for_ack) {
514  return comm::send(this, to_port, data, data_size, wait_for_ack);
515  }
516 
// Destroys the lock and the condition variable that the channel_t constructor initialises.
// NOTE(review): presumably the body of channel_t's destructor; the signature line (listing
// line 517) and listing line 520 are absent from this listing.
518  pthread_mutex_destroy(&in_buffer_lock);
519  pthread_cond_destroy(&in_buffer_not_empty);
521  }
522 
523  }}
Definition: comm.cpp:14
std::set< channel_t *, channel_t::compare_refrence > channels
all endpoints
Definition: comm.cpp:35
int debug(const char *format,...)
log debug message
Definition: log.cpp:51
static void transmitpackage_to_binary(transmitpackage_t transp, uint8_t *bin)
Get binary array of transmitpackage, for sending over a link.
Definition: commTypes.h:95
void * transmissionQueueHandeler(void *)
Transmission handler.
Definition: comm.cpp:133
static constexpr size_t MAX_DATA_SIZE
Definition: commTypes.h:34
static log::channel_t * log
Definition: comm.cpp:23
const std::map< err_t, char * > err_to_char
Definition: arunaTypes.h:54
pthread_mutex_t in_buffer_lock
Definition: commTypes.h:152
void selectDriverTask()
start a task to select a driver, does not block.
Definition: comm.cpp:207
err_t transmit(transmitpackage_t transmitpackage)
Transmit a package.
Definition: comm.cpp:120
void setDriver(Link &driver)
set the driver
Definition: comm.cpp:157
port_t to_port
to whom to send it to.
Definition: commTypes.h:49
unsigned int rateDriver(Link &driver)
rate the driver on speed, errors, active connection, realtime, connection type etc.
Definition: comm.cpp:179
uint8_t port_t
Definition: commTypes.h:30
unsigned int get_speed()
Get the speed of the link to the other hosts in bits per second.
Definition: comm.cpp:449
err_t register_channel(channel_t *channel)
Register a new communication endpoint.
Definition: comm.cpp:362
status_t status
stores the comm status
Definition: comm.cpp:40
status_t get_status()
Get the running status of com.
Definition: comm.cpp:440
err_t send(channel_t *channel, port_t to_port, uint8_t *data, size_t data_size, bool wait_for_ack)
Send data.
Definition: comm.cpp:386
void set_status(status_t status)
set the comm status
Definition: comm.cpp:116
static constexpr uint8_t HEADER_SIZE
Definition: commTypes.h:40
uint8_t data_lenght
size of the data
Definition: commTypes.h:70
err_t send(port_t to_port, uint8_t *data, size_t data_size, bool wait_for_ack=false)
Definition: comm.cpp:513
uint8_t size
total size of package (+ the size of the size variable)
Definition: commTypes.h:75
channel_t(port_t port)
Definition: comm.cpp:507
std::queue< transmitpackage_t > out_buffer
Definition: comm.cpp:29
Link * getDriver()
get the driver
Definition: comm.cpp:154
void * receiveHandeler(void *)
Task to handle incoming connections.
Definition: comm.cpp:220
err_t unregister_candidate_driver(Link *driver)
Unregister a driver as a comm driver candidate.
Definition: comm.cpp:497
err_t stop()
Stop the communication, free all queues, channels and buffers.
Definition: comm.cpp:309
err_t resume()
resume all communication.
Definition: comm.cpp:348
Link * driver
stores the driver.
Definition: comm.cpp:45
int verbose(const char *format,...)
log verbose message
Definition: log.cpp:38
err_t incoming_connection(uint8_t *package, uint8_t package_size)
Interrupt handler for incoming connections.
Definition: comm.cpp:453
pthread_cond_t in_buffer_not_empty
Definition: commTypes.h:151
bool is_connected()
Get the status of the comm link.
Definition: comm.cpp:436
void * _selectDriverTask(void *)
Pick a new best driver; don't call this directly, it will delete your process.
Definition: comm.cpp:194
err_t register_candidate_driver(Link *driver)
register a driver to be a comm driver candidate.
Definition: comm.cpp:479
err_t start()
Start new communication.
Definition: comm.cpp:251
endpoint type of a comm channel
Definition: commTypes.h:150
std::tuple< Link *, err_t > pickDriver()
pick the best available driver
Definition: comm.cpp:161
int warning(const char *format,...)
log warning message
Definition: log.cpp:77
int error(const char *format,...)
log error message
Definition: log.cpp:90
err_t pause()
pause all communication.
Definition: comm.cpp:335
err_t get_channels(char *buffer)
get all names of the channels currently registered
Definition: comm.cpp:444
err_t unregister_channel(channel_t &channel)
unregister an endpoint
Definition: comm.cpp:378
static const comm::port_t port
Definition: reporter.h:15
const port_t port
port nr.
Definition: commTypes.h:164
static bool binary_to_transmitpackage(uint8_t *bin, transmitpackage_t &transp)
Get a transmitpackage from a binary array.
Definition: commTypes.h:125
int dump(level_t level, uint8_t *bin, size_t size)
dump array of bin data
Definition: log.cpp:135
transmit ready package.
Definition: commTypes.h:39