33 #ifndef __TRX_WORKER_H 34 #define __TRX_WORKER_H 36 #include <boost/program_options.hpp> 41 #undef WORKER_VERBOSE_STATS 45 #undef WORKER_VERY_VERBOSE_STATS 82 #ifdef WORKER_VERBOSE_STATS 83 void update_served(
const double serve_time_ms);
84 double _serving_total;
86 void update_rvp_exec_time(
const double rvp_exec_time);
87 void update_rvp_notify_time(
const double rvp_notify_time);
89 double _rvp_exec_time;
90 double _rvp_notify_time;
92 void update_waited(
const double queue_time);
93 double _waiting_total;
95 #ifdef WORKER_VERY_VERBOSE_STATS 112 #ifdef WORKER_VERBOSE_STATS
114 _rvp_exec(0), _rvp_exec_time(0), _rvp_notify_time(0),
116 #ifdef WORKER_VERY_VERBOSE_STATS
117 , _ww_idx(0), _last_change(0)
185 virtual int _work_PAUSED_impl();
187 virtual int _work_ACTIVE_impl() = 0;
189 virtual int _work_STOPPED_impl();
191 virtual int _pre_STOP_impl() = 0;
193 void _print_stats_impl()
const;
237 inline unsigned set_ws(
const unsigned new_ws) {
239 unsigned old_ws = *&_ws;
266 return ((*&_ws == my_ws) || (*&_ws ==
WS_LOOP));
284 unsigned old_ws = *&_ws;
383 return (_work_PAUSED_impl());
387 return (_work_ACTIVE_impl());
391 return (_work_STOPPED_impl());
399 bool abort_one_trx(
xct_t* axct);
425 template<
class Action>
455 assert (actionPtrPool);
456 _for_writers =
new ActionVec(actionPtrPool);
457 _for_readers =
new ActionVec(actionPtrPool);
458 _read_pos = _for_readers->begin();
474 return (_owner == athread);
479 return ((_read_pos == _for_readers->end()) && (*&_empty));
485 bool isEmpty = ((_read_pos == _for_readers->end()) && (*&_empty));
487 assert (_for_writers->empty());
516 if (++loopcnt > _loops) {
536 _for_readers->erase(_for_readers->begin(), _for_readers->end());
537 _for_writers->swap(*_for_readers);
541 _read_pos = _for_readers->begin();
547 if ((_read_pos == _for_readers->end()) && (!wait_for_input())) {
550 return (*(_read_pos++));
553 inline void push(Action* a,
const bool bWake) {
560 _for_writers->push_back(a);
562 queue_sz = _for_writers->size();
566 if ((queue_sz >= _thres) || bWake) {
573 void clear(
const bool removeOwner =
true) {
582 _for_writers->erase(_for_writers->begin(), _for_writers->end());
583 _for_readers->erase(_for_readers->begin(), _for_readers->end());
586 _read_pos = _for_readers->begin();
609 int _work_ACTIVE_impl();
611 int _pre_STOP_impl();
614 int _serve_action(Request* prequest);
619 const int use_sli = 0);
624 inline void enqueue(Request* arequest,
const bool bWake =
true) {
625 _pqueue->push(arequest, bWake);
628 void init(
const int lc);
631 #endif // __TRX_WORKER_H worker_stats_t & operator+=(worker_stats_t const &rhs)
Definition: shore_worker.cpp:131
eWorkingState _my_ws
Definition: trx_worker.h:444
condex _notify
Definition: trx_worker.h:165
unsigned _control
Definition: trx_worker.h:158
Simple pool class.
Definition: stl_pool.h:40
eDataOwnerState _data_owner
Definition: trx_worker.h:160
void enqueue(Request *arequest, const bool bWake=true)
Definition: trx_worker.h:624
: A simple pooled allocator
ShoreEnv * _env
Definition: trx_worker.h:168
guard< ActionVec > _for_writers
Definition: trx_worker.h:434
tatas_lock _next_lock
Definition: trx_worker.h:173
unsigned get_ws()
Definition: trx_worker.h:261
int condex_sleep()
Definition: trx_worker.h:278
base_worker_t * get_next()
Definition: trx_worker.h:218
Definition: latches.h:461
unsigned _ws
Definition: trx_worker.h:162
trx_request_t Request
Definition: trx_worker.h:598
Definition: trx_worker.h:596
void pause()
Definition: trx_worker.h:374
bool _is_bound
Definition: trx_worker.h:179
Definition: trx_worker.h:65
A transaction. Internal to the storage manager.This class may be used in a limited way for the handli...
Definition: xct.h:185
Definition: trx_worker.h:426
int work_PAUSED()
Definition: trx_worker.h:382
int _empty
Definition: trx_worker.h:442
bool set_control(const unsigned awc)
Definition: trx_worker.h:315
void setqueue(eWorkingState aws, base_worker_t *owner, const int &loops, const int &thres)
Definition: trx_worker.h:464
void push(Action *a, const bool bWake)
Definition: trx_worker.h:553
void reset_stats()
Definition: trx_worker.h:405
Definition: kits_thread.h:134
int is_empty(void) const
Definition: trx_worker.h:478
void clear(const bool removeOwner=true)
Definition: trx_worker.h:573
PooledVec< Action * >::Type ActionVec
Definition: trx_worker.h:427
void stop()
Definition: trx_worker.h:360
srmwqueue(Pool *actionPtrPool)
Definition: trx_worker.h:449
~worker_stats_t()
Definition: trx_worker.h:122
void print_and_reset()
Definition: trx_worker.h:128
void set_next(base_worker_t *apworker)
Definition: trx_worker.h:212
bool is_alone_owner()
Definition: trx_worker.h:230
bool is_sleeping(void)
Definition: trx_worker.h:269
void wait()
Definition: condex.h:67
worker_stats_t()
Definition: trx_worker.h:103
bool is_really_empty(void)
Definition: trx_worker.h:483
uint _problems
Definition: trx_worker.h:68
int work_ACTIVE()
Definition: trx_worker.h:386
#define TRACE_DEBUG
Definition: trace_types.h:58
uint _served_waiting
Definition: trx_worker.h:72
void signal()
Definition: condex.h:61
uint _mid_aborts
Definition: trx_worker.h:80
base_worker_t * _next
Definition: trx_worker.h:171
int _thres
Definition: trx_worker.h:447
a timer object.
Definition: stopwatch.h:34
unsigned set_ws(const unsigned new_ws)
Definition: trx_worker.h:237
bool wait_for_input()
Definition: trx_worker.h:493
: Structures that represent user requests
virtual ~base_worker_t()
Definition: trx_worker.h:207
void print_stats() const
Definition: shore_worker.cpp:51
void reset()
Definition: shore_worker.cpp:162
ActionVecIt _read_pos
Definition: trx_worker.h:438
const int WAITING_WINDOW
Definition: trx_worker.h:49
A test-and-test-and-set spinlock.
Definition: tatas.h:25
ActionVec::iterator ActionVecIt
Definition: trx_worker.h:429
eDataOwnerState
Definition: reqs.h:110
guard< ActionVec > _for_readers
Definition: trx_worker.h:436
base_worker_t(ShoreEnv *env, std::string tname, const int use_sli)
Definition: trx_worker.h:197
worker_stats_t _stats
Definition: trx_worker.h:176
int _use_sli
Definition: trx_worker.h:182
bool is_control(base_worker_t *athread) const
Definition: trx_worker.h:473
void condex_wakeup()
Definition: trx_worker.h:300
#define TRACE
Other modules in our program use this macro for reporting. We can use preprocessor macros like FILE a...
Definition: trace.h:91
uint _processed
Definition: trx_worker.h:66
int work_STOPPED()
Definition: trx_worker.h:390
void start()
Definition: trx_worker.h:367
~srmwqueue()
Definition: trx_worker.h:461
uint _served_input
Definition: trx_worker.h:70
guard< Queue > _pqueue
Definition: trx_worker.h:604
unsigned get_control()
Definition: trx_worker.h:311
int _loops
Definition: trx_worker.h:446
bool atomic_compare_exchange_strong(T *object, T *expected, C desired)
Definition: AtomicCounter.hpp:401
mcs_rwlock _lock
Definition: trx_worker.h:440
base_worker_t * _owner
Definition: trx_worker.h:432
const uint MINIMUM_PROCESSED
Definition: trx_worker.h:52
Definition: shore_env.h:349
const int REQUESTS_PER_WORKER_POOL_SZ
Definition: trx_worker.h:594
Action * pop()
Definition: trx_worker.h:545
uint _early_aborts
Definition: trx_worker.h:78
eWorkingState
Definition: reqs.h:93
std::vector< Value, PooledAllocator< Value > > Type
Definition: stl_pooled_alloc.h:214
uint _condex_sleep
Definition: trx_worker.h:74
uint _failed_sleep
Definition: trx_worker.h:76
bool can_continue(const unsigned my_ws)
Definition: trx_worker.h:265
Definition: trx_worker.h:154
void set_data_owner_state(const eDataOwnerState ados)
Definition: trx_worker.h:223
#define CRITICAL_SECTION(name, lock)
Definition: critical_section.h:75
Shore read-write lock:: many-reader/one-writer spin lock.
Definition: latches.h:350
guard< Pool > _actionpool
Definition: trx_worker.h:606
srmwqueue< Request > Queue
Definition: trx_worker.h:600