Zero  0.1.0
trx_worker.h
Go to the documentation of this file.
1 /* -*- mode:C++; c-basic-offset:4 -*-
2  Shore-kits -- Benchmark implementations for Shore-MT
3 
4  Copyright (c) 2007-2009
5  Data Intensive Applications and Systems Labaratory (DIAS)
6  Ecole Polytechnique Federale de Lausanne
7 
8  All Rights Reserved.
9 
10  Permission to use, copy, modify and distribute this software and
11  its documentation is hereby granted, provided that both the
12  copyright notice and this permission notice appear in all copies of
13  the software, derivative works or modified versions, and any
14  portions thereof, and that both notices appear in supporting
15  documentation.
16 
17  This code is distributed in the hope that it will be useful, but
18  WITHOUT ANY WARRANTY; without even the implied warranty of
19  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. THE AUTHORS
20  DISCLAIM ANY LIABILITY OF ANY KIND FOR ANY DAMAGES WHATSOEVER
21  RESULTING FROM THE USE OF THIS SOFTWARE.
22 */
23 
33 #ifndef __TRX_WORKER_H
34 #define __TRX_WORKER_H
35 
36 #include <boost/program_options.hpp>
37 #include "kits_thread.h"
38 #include "reqs.h"
39 #include "util/stl_pooled_alloc.h"
40 // Use this to enable verbose stats for worker threads
41 #undef WORKER_VERBOSE_STATS
42 //#define WORKER_VERBOSE_STATS
43 
44 // ditto
45 #undef WORKER_VERY_VERBOSE_STATS
46 //#define WORKER_VERY_VERBOSE_STATS
47 
48 
49 const int WAITING_WINDOW = 5; // keep track of the last 5 seconds
50 
51 // A worker needs to have processed at least 10 packets to print its own stats
52 const uint MINIMUM_PROCESSED = 10;
53 
54 /********************************************************************
55  *
56  * @struct: worker_stats_t
57  *
58  * @brief: Worker statistics
59  *
60  * @note: Not lock-protected. Assumes that only the assigned worker thread
61  * will modify it.
62  *
63  ********************************************************************/
64 
66  uint _processed;
67 
68  uint _problems;
69 
71 
73 
75 
77 
79 
81 
82 #ifdef WORKER_VERBOSE_STATS
83  void update_served(const double serve_time_ms);
84  double _serving_total; // in msecs
85 
86  void update_rvp_exec_time(const double rvp_exec_time);
87  void update_rvp_notify_time(const double rvp_notify_time);
88  uint _rvp_exec;
89  double _rvp_exec_time;
90  double _rvp_notify_time;
91 
92  void update_waited(const double queue_time);
93  double _waiting_total; // not only the last WAITING_WINDOW secs
94 
95 #ifdef WORKER_VERY_VERBOSE_STATS
96  double _ww[WAITING_WINDOW];
97  uint _ww_idx; // index on the ww (waiting window) ring
98  double _last_change;
99  stopwatch_t _for_last_change;
100 #endif
101 #endif
102 
104  : _processed(0),
105  _problems(0),
106  _served_input(0),
107  _served_waiting(0),
108  _condex_sleep(0),
109  _failed_sleep(0),
110  _early_aborts(0),
111  _mid_aborts(0)
112 #ifdef WORKER_VERBOSE_STATS
113  , _serving_total(0),
114  _rvp_exec(0), _rvp_exec_time(0), _rvp_notify_time(0),
115  _waiting_total(0)
116 #ifdef WORKER_VERY_VERBOSE_STATS
117  , _ww_idx(0), _last_change(0)
118 #endif
119 #endif
120  {}
121 
123 
124  void print_stats() const;
125 
126  void reset();
127 
129  print_stats();
130  reset();
131  }
132 
134 }; // EOF: worker_stats_t
135 
136 
137 
138 /********************************************************************
139  *
140  * @class: base_worker_t
141  *
142  * @brief: An smthread-based non-template abstract base class for the
143  * Shore worker threads
144  *
145  * @note: By default the worker thread is not bound to any processor.
146  * The creator of the worker thread needs to
147  * decide whether, and to which processor, to bind it.
148  * @note: Implemented as a state machine.
149  *
150  ********************************************************************/
151 
152 class ShoreEnv;
153 
154 class base_worker_t : public thread_t {
155 protected:
156 
157  // status variables
158  unsigned _control;
159 
161 
162  unsigned _ws;
163 
164  // cond var for sleeping instead of looping after a while
166 
167  // data
169 
170  // needed for linked-list of workers
172 
174 
175  // statistics
177 
178  // processor binding
179  bool _is_bound;
180 
181  // sli
182  int _use_sli;
183 
184  // states
185  virtual int _work_PAUSED_impl();
186 
187  virtual int _work_ACTIVE_impl() = 0;
188 
189  virtual int _work_STOPPED_impl();
190 
191  virtual int _pre_STOP_impl() = 0;
192 
193  void _print_stats_impl() const;
194 
195 public:
196 
197  base_worker_t(ShoreEnv* env, std::string tname, const int use_sli)
198  : thread_t(tname),
199  _control(WC_PAUSED),
200  _data_owner(DOS_UNDEF),
201  _ws(WS_UNDEF),
202  _env(env),
203  _next(nullptr),
204  _is_bound(false),
205  _use_sli(use_sli) {}
206 
207  virtual ~base_worker_t() {}
208 
209  // access methods //
210 
211  // for the linked list
212  void set_next(base_worker_t* apworker) {
213  assert (apworker);
214  CRITICAL_SECTION(next_cs, _next_lock);
215  _next = apworker;
216  }
217 
219  return (_next);
220  }
221 
222  // data owner state
224  assert ((ados == DOS_ALONE) || (ados == DOS_MULTIPLE));
225  // CS TODO -- atomic necessary?
226  // atomic_swap(&_data_owner, ados);
227  _data_owner = ados;
228  }
229 
230  bool is_alone_owner() {
231  return (*&_data_owner == DOS_ALONE);
232  }
233 
234  // @brief: Set working state
235  // @note: This function can be called also by other threads
236  // (other than the worker)
237  inline unsigned set_ws(const unsigned new_ws) {
238  while (true) {
239  unsigned old_ws = *&_ws;
240  // Do not change WS, if it is already set to commit
241  if ((old_ws == WS_COMMIT_Q) && (new_ws != WS_LOOP)) {
242  return (old_ws);
243  }
244 
245  // Update WS
246  bool cas_ok =
247  lintel::unsafe::atomic_compare_exchange_strong(&_ws, &old_ws, new_ws);
248  if (cas_ok) {
249  // If cas successful, then wake up worker if sleeping
250  if ((old_ws == WS_SLEEP) && (new_ws != WS_SLEEP)) {
251  condex_wakeup();
252  }
253  return (old_ws);
254  }
255 
256  // Keep on trying
257  }
258  return _ws;
259  }
260 
261  inline unsigned get_ws() {
262  return (*&_ws);
263  }
264 
265  inline bool can_continue(const unsigned my_ws) {
266  return ((*&_ws == my_ws) || (*&_ws == WS_LOOP));
267  }
268 
269  inline bool is_sleeping(void) {
270  return (*&_ws == WS_SLEEP);
271  }
272 
273 
274 
275  // Condex functions in order to sleep/wakeup worker //
276 
277  // This function is called when the worker decides it is time to sleep
278  inline int condex_sleep() {
279  // can sleep only if in WS_LOOP
280  // (if on WS_COMMIT_Q or WS_INPUT_Q it means that a
281  // COMMIT or INPUT action was enqueued during this
282  // LOOP so there is no need to sleep).
283  while (true) {
284  unsigned old_ws = *&_ws;
285  bool cas_ok =
287  if (cas_ok) {
288  // If cas successful, then sleep
289  _notify.wait();
290  ++_stats._condex_sleep;
291  return (1);
292  }
293  }
294  ++_stats._failed_sleep;
295  return (0);
296  }
297 
298  // @note: The caller thread should have already changed the WS
299  // before calling this function
300  inline void condex_wakeup() {
301  //assert (*&_ws!=WS_SLEEP);
302  _notify.signal();
303  }
304 
305 
306 
307  // working states //
308 
309 
310  // thread control
311  inline unsigned get_control() {
312  return (*&_control);
313  }
314 
315  inline bool set_control(const unsigned awc) {
316  //
317  // Allowed transition matrix:
318  //
319  // |------------------------------------------------|
320  // |(new) | PAUSED | ACTIVE | STOPPED | RECOVERY |
321  // |(old) | | | | |
322  // |-------------------------------------|----------|
323  // |PAUSED | | Y | Y | Y |
324  // |ACTIVE | Y | | Y | Y |
325  // |STOPPED | | | Y | Y |
326  // |RECOVERY | Y | Y | Y | Y |
327  // |-------------------------------------|----------|
328  //
329  {
330  if ((*&_control == WC_PAUSED) &&
331  ((awc == WC_ACTIVE) || (awc == WC_STOPPED))) {
332  // CS TODO -- atomic necessary?
333  // atomic_swap(&_control, awc);
334  _control = awc;
335  return (true);
336  }
337 
338  if ((*&_control == WC_ACTIVE) &&
339  ((awc == WC_PAUSED) || (awc == WC_STOPPED))) {
340  // CS TODO -- atomic necessary?
341  // atomic_swap(&_control, awc);
342  _control = awc;
343  return (true);
344  }
345 
346  // Can go to recovery at any point
347  if ((*&_control == WC_RECOVERY) || (awc == WC_RECOVERY)) {
348  // CS TODO -- atomic necessary?
349  // atomic_swap(&_control, awc);
350  _control = awc;
351  return (true);
352  }
353  }
354  TRACE(TRACE_DEBUG, "Not allowed transition (%d)-->(%d)\n",
355  _control, awc);
356  return (false);
357  }
358 
359  // commands
360  void stop() {
361  set_control(WC_STOPPED);
362  if (is_sleeping()) {
363  _notify.signal();
364  }
365  }
366 
367  void start() {
368  set_control(WC_ACTIVE);
369  if (is_sleeping()) {
370  _notify.signal();
371  }
372  }
373 
374  void pause() {
375  set_control(WC_PAUSED);
376  if (is_sleeping()) {
377  _notify.signal();
378  }
379  }
380 
381  // state implementation
382  inline int work_PAUSED() {
383  return (_work_PAUSED_impl());
384  }
385 
386  inline int work_ACTIVE() {
387  return (_work_ACTIVE_impl());
388  }
389 
390  inline int work_STOPPED() {
391  return (_work_STOPPED_impl());
392  }
393 
394  // thread entrance
395  void work();
396 
397  // helper //
398 
399  bool abort_one_trx(xct_t* axct);
400 
401  void stats();
402 
403  worker_stats_t get_stats();
404 
405  void reset_stats() {
406  _stats.reset();
407  }
408 
409 private:
410 
411  // copying not allowed
413 
414  void operator=(base_worker_t const&);
415 }; // EOF: base_worker_t
416 
417 /********************************************************************
418  *
419  * @class: trx_worker_t
420  *
421  * @brief: The baseline system worker threads
422  *
423  ********************************************************************/
424 
425 template<class Action>
426 struct srmwqueue {
428 
429  typedef typename ActionVec::iterator ActionVecIt;
430 
431  // owner thread
433 
435 
437 
438  ActionVecIt _read_pos;
439 
441 
442  int _empty;
443 
445 
446  int _loops; // how many loops (spins) it will do before going to sleep (1=sleep immediately)
447  int _thres; // threshold value before waking up
448 
449  srmwqueue(Pool* actionPtrPool)
450  : _owner(nullptr),
451  _empty(true),
452  _my_ws(WS_UNDEF),
453  _loops(0),
454  _thres(0) {
455  assert (actionPtrPool);
456  _for_writers = new ActionVec(actionPtrPool);
457  _for_readers = new ActionVec(actionPtrPool);
458  _read_pos = _for_readers->begin();
459  }
460 
462 
463  // sets the pointer of the queue to the controls of a specific worker thread
464  void setqueue(eWorkingState aws, base_worker_t* owner, const int& loops, const int& thres) {
466  _my_ws = aws;
467  _owner = owner;
468  _loops = loops;
469  _thres = thres;
470  }
471 
472  // returns true if the passed control is the same
473  bool is_control(base_worker_t* athread) const {
474  return (_owner == athread);
475  }
476 
477  // !!! @note: should be called only by the reader !!!
478  inline int is_empty(void) const {
479  return ((_read_pos == _for_readers->end()) && (*&_empty));
480  }
481 
482  // The expensive version which first locks, and then checks if empty
483  bool is_really_empty(void) {
485  bool isEmpty = ((_read_pos == _for_readers->end()) && (*&_empty));
486  if (isEmpty) {
487  assert (_for_writers->empty());
488  }
489  return (isEmpty);
490  }
491 
492  // spins until new input is set
493  bool wait_for_input() {
494  assert (_owner);
495  int loopcnt = 0;
496  unsigned wc = WC_ACTIVE;
497 
498  // 1. start spinning
499  while (*&_empty) {
500 
501  wc = _owner->get_control();
502 
503  // 2. if thread was signalled to stop
504  //if ((wc != WC_ACTIVE) && (wc != WC_RECOVERY)) {
505  if (wc != WC_ACTIVE) {
506  _owner->set_ws(WS_FINISHED);
507  return (false);
508  }
509 
510  // 3. if thread was signalled to go to other queue
511  if (!_owner->can_continue(_my_ws)) {
512  return (false);
513  }
514 
515  // 4. if it has spun too much, start waiting on the condex
516  if (++loopcnt > _loops) {
517  loopcnt = 0;
518 
519  //TRACE( TRACE_TRX_FLOW, "Condex sleeping (%d)...\n", _my_ws);
520  //assert (_my_ws==WS_INPUT_Q); // can sleep only on input queue
521  loopcnt = _owner->condex_sleep();
522  //TRACE( TRACE_TRX_FLOW, "Condex woke (%d) (%d)...\n", _my_ws, loopcnt);
523 
524  // after it wakes up, should do the loop again.
525  // if something has been pushed then _empty will be false
526  // and it will proceed normally.
527  // if signalled because it should stop, it will do a loop
528  // and return false.
529  // if signalled because it should go to other queue, it will
530  // do a loop and return false.
531  }
532  }
533 
534  {
536  _for_readers->erase(_for_readers->begin(), _for_readers->end());
537  _for_writers->swap(*_for_readers);
538  _empty = true;
539  }
540 
541  _read_pos = _for_readers->begin();
542  return (true);
543  }
544 
545  inline Action* pop() {
546  // pops an action from the input vector, or waits for one to show up
547  if ((_read_pos == _for_readers->end()) && (!wait_for_input())) {
548  return (nullptr);
549  }
550  return (*(_read_pos++));
551  }
552 
553  inline void push(Action* a, const bool bWake) {
554  //assert (a);
555  int queue_sz;
556 
557  // push action
558  {
560  _for_writers->push_back(a);
561  _empty = false;
562  queue_sz = _for_writers->size();
563  }
564 
565  // don't try to wake on every call; allow some requests to batch up
566  if ((queue_sz >= _thres) || bWake) {
567  // wake up if assigned worker thread sleeping
568  _owner->set_ws(_my_ws);
569  }
570  }
571 
572  // resets queue
573  void clear(const bool removeOwner = true) {
575 
576  // clear owner
577  if (removeOwner) {
578  _owner = nullptr;
579  }
580 
581  // clear lists
582  _for_writers->erase(_for_writers->begin(), _for_writers->end());
583  _for_readers->erase(_for_readers->begin(), _for_readers->end());
584 
585  // set the reading position to the beginning
586  _read_pos = _for_readers->begin();
587 
588  // the queue is empty again
589  _empty = true;
590  }
591 }; // EOF: struct srmwqueue
592 
593 
595 
596 class trx_worker_t : public base_worker_t {
597 public:
599 
601 
602 private:
603 
605 
607 
608  // states
609  int _work_ACTIVE_impl();
610 
611  int _pre_STOP_impl();
612 
613  // serves one action
614  int _serve_action(Request* prequest);
615 
616 public:
617 
618  trx_worker_t(ShoreEnv* env, std::string tname,
619  const int use_sli = 0);
620 
621  ~trx_worker_t();
622 
623  // Enqueues a request to the queue of the worker thread
624  inline void enqueue(Request* arequest, const bool bWake = true) {
625  _pqueue->push(arequest, bWake);
626  }
627 
628  void init(const int lc);
629 }; // EOF: trx_worker_t
630 
631 #endif // __TRX_WORKER_H
worker_stats_t & operator+=(worker_stats_t const &rhs)
Definition: shore_worker.cpp:131
eWorkingState _my_ws
Definition: trx_worker.h:444
condex _notify
Definition: trx_worker.h:165
unsigned _control
Definition: trx_worker.h:158
Simple pool class.
Definition: stl_pool.h:40
eDataOwnerState _data_owner
Definition: trx_worker.h:160
void enqueue(Request *arequest, const bool bWake=true)
Definition: trx_worker.h:624
: A simple pooled allocator
ShoreEnv * _env
Definition: trx_worker.h:168
guard< ActionVec > _for_writers
Definition: trx_worker.h:434
tatas_lock _next_lock
Definition: trx_worker.h:173
Definition: reqs.h:71
unsigned get_ws()
Definition: trx_worker.h:261
int condex_sleep()
Definition: trx_worker.h:278
base_worker_t * get_next()
Definition: trx_worker.h:218
Definition: latches.h:461
unsigned _ws
Definition: trx_worker.h:162
trx_request_t Request
Definition: trx_worker.h:598
Definition: trx_worker.h:596
void pause()
Definition: trx_worker.h:374
bool _is_bound
Definition: trx_worker.h:179
Definition: trx_worker.h:65
A transaction. Internal to the storage manager.This class may be used in a limited way for the handli...
Definition: xct.h:185
Definition: reqs.h:73
Definition: trx_worker.h:426
int work_PAUSED()
Definition: trx_worker.h:382
int _empty
Definition: trx_worker.h:442
Definition: reqs.h:95
bool set_control(const unsigned awc)
Definition: trx_worker.h:315
void setqueue(eWorkingState aws, base_worker_t *owner, const int &loops, const int &thres)
Definition: trx_worker.h:464
void push(Action *a, const bool bWake)
Definition: trx_worker.h:553
Definition: reqs.h:278
void reset_stats()
Definition: trx_worker.h:405
Definition: kits_thread.h:134
int is_empty(void) const
Definition: trx_worker.h:478
void clear(const bool removeOwner=true)
Definition: trx_worker.h:573
Definition: reqs.h:72
PooledVec< Action * >::Type ActionVec
Definition: trx_worker.h:427
void stop()
Definition: trx_worker.h:360
Definition: reqs.h:96
srmwqueue(Pool *actionPtrPool)
Definition: trx_worker.h:449
~worker_stats_t()
Definition: trx_worker.h:122
void print_and_reset()
Definition: trx_worker.h:128
Definition: reqs.h:99
void set_next(base_worker_t *apworker)
Definition: trx_worker.h:212
bool is_alone_owner()
Definition: trx_worker.h:230
Definition: reqs.h:111
bool is_sleeping(void)
Definition: trx_worker.h:269
Definition: reqs.h:97
void wait()
Definition: condex.h:67
worker_stats_t()
Definition: trx_worker.h:103
bool is_really_empty(void)
Definition: trx_worker.h:483
uint _problems
Definition: trx_worker.h:68
int work_ACTIVE()
Definition: trx_worker.h:386
#define TRACE_DEBUG
Definition: trace_types.h:58
uint _served_waiting
Definition: trx_worker.h:72
void signal()
Definition: condex.h:61
Definition: reqs.h:94
uint _mid_aborts
Definition: trx_worker.h:80
base_worker_t * _next
Definition: trx_worker.h:171
int _thres
Definition: trx_worker.h:447
a timer object.
Definition: stopwatch.h:34
unsigned set_ws(const unsigned new_ws)
Definition: trx_worker.h:237
bool wait_for_input()
Definition: trx_worker.h:493
Definition: reqs.h:113
: Structures that represent user requests
virtual ~base_worker_t()
Definition: trx_worker.h:207
void print_stats() const
Definition: shore_worker.cpp:51
void reset()
Definition: shore_worker.cpp:162
ActionVecIt _read_pos
Definition: trx_worker.h:438
const int WAITING_WINDOW
Definition: trx_worker.h:49
A test-and-test-and-set spinlock.
Definition: tatas.h:25
ActionVec::iterator ActionVecIt
Definition: trx_worker.h:429
eDataOwnerState
Definition: reqs.h:110
guard< ActionVec > _for_readers
Definition: trx_worker.h:436
base_worker_t(ShoreEnv *env, std::string tname, const int use_sli)
Definition: trx_worker.h:197
worker_stats_t _stats
Definition: trx_worker.h:176
int _use_sli
Definition: trx_worker.h:182
bool is_control(base_worker_t *athread) const
Definition: trx_worker.h:473
void condex_wakeup()
Definition: trx_worker.h:300
#define TRACE
Other modules in our program use this macro for reporting. We can use preprocessor macros like FILE a...
Definition: trace.h:91
uint _processed
Definition: trx_worker.h:66
int work_STOPPED()
Definition: trx_worker.h:390
void start()
Definition: trx_worker.h:367
~srmwqueue()
Definition: trx_worker.h:461
uint _served_input
Definition: trx_worker.h:70
Definition: reqs.h:70
guard< Queue > _pqueue
Definition: trx_worker.h:604
unsigned get_control()
Definition: trx_worker.h:311
int _loops
Definition: trx_worker.h:446
bool atomic_compare_exchange_strong(T *object, T *expected, C desired)
Definition: AtomicCounter.hpp:401
mcs_rwlock _lock
Definition: trx_worker.h:440
Definition: condex.h:37
base_worker_t * _owner
Definition: trx_worker.h:432
const uint MINIMUM_PROCESSED
Definition: trx_worker.h:52
Definition: shore_env.h:349
const int REQUESTS_PER_WORKER_POOL_SZ
Definition: trx_worker.h:594
Action * pop()
Definition: trx_worker.h:545
uint _early_aborts
Definition: trx_worker.h:78
eWorkingState
Definition: reqs.h:93
std::vector< Value, PooledAllocator< Value > > Type
Definition: stl_pooled_alloc.h:214
uint _condex_sleep
Definition: trx_worker.h:74
uint _failed_sleep
Definition: trx_worker.h:76
bool can_continue(const unsigned my_ws)
Definition: trx_worker.h:265
Definition: trx_worker.h:154
void set_data_owner_state(const eDataOwnerState ados)
Definition: trx_worker.h:223
#define CRITICAL_SECTION(name, lock)
Definition: critical_section.h:75
Shore read-write lock:: many-reader/one-writer spin lock.
Definition: latches.h:350
guard< Pool > _actionpool
Definition: trx_worker.h:606
Definition: reqs.h:112
srmwqueue< Request > Queue
Definition: trx_worker.h:600