Zero  0.1.0
daemons.h
Go to the documentation of this file.
1 /* -*- mode:C++; c-basic-offset:4 -*-
2  Shore-kits -- Benchmark implementations for Shore-MT
3 
4  Copyright (c) 2007-2009
5  Data Intensive Applications and Systems Labaratory (DIAS)
6  Ecole Polytechnique Federale de Lausanne
7 
8  All Rights Reserved.
9 
10  Permission to use, copy, modify and distribute this software and
11  its documentation is hereby granted, provided that both the
12  copyright notice and this permission notice appear in all copies of
13  the software, derivative works or modified versions, and any
14  portions thereof, and that both notices appear in supporting
15  documentation.
16 
17  This code is distributed in the hope that it will be useful, but
18  WITHOUT ANY WARRANTY; without even the implied warranty of
19  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. THE AUTHORS
20  DISCLAIM ANY LIABILITY OF ANY KIND FOR ANY DAMAGES WHATSOEVER
21  RESULTING FROM THE USE OF THIS SOFTWARE.
22 */
23 
31 #ifndef __DAEMONS_H
32 #define __DAEMONS_H
33 
34 #include "sm_vas.h"
35 
36 #include "table_man.h"
37 #include "shore_env.h"
38 #include "kits_thread.h"
39 
40 class ShoreEnv;
41 
42 /******************************************************************
43  *
44  * @class: db_init_smt_t
45  *
46  * @brief: An smthread-inherited class that is used for initiating
47  * the Shore environment
48  *
49  ******************************************************************/
50 
51 class db_init_smt_t : public thread_t {
52 private:
54 
55  int _rv;
56 
57 public:
58 
59  db_init_smt_t(std::string tname, ShoreEnv* db);
60 
62 
63  void work();
64 
65  int rv();
66 }; // EOF: db_init_smt_t
67 
68 
69 /******************************************************************
70  *
71  * @class: checkpointer_t
72  *
73  * @brief: An smthread-inherited class that is used for taking
74  * periodic checkpoints during loading and measurements.
75  * It is currently also used to activate the archiver and
76  * merger daemons. (TODO) In the future, we should use a
77  * generic "timer" service to control all system daemons:
78  * - Checkpointing
79  * - Page cleaner (or disk-to-disk propagation)
80  * - Log archiver
81  * - Archive merger
82  * - Backup
83  * - Space reclamation, etc.
84  *
85  ******************************************************************/
86 
87 
88 class checkpointer_t : public thread_t {
89 private:
91 
92  bool _active;
93 
94 public:
96  : thread_t("checkpointer"),
97  _env(env),
98  _active(true) {}
99 
100  void set_active(bool active) {
101  _active = active;
102  }
103 
104  void work();
105 };
106 
107 /******************************************************************
108  *
109  * @class: crasher_t
110  *
111  * @brief: An smthread-inherited class that is used for simulating
112  * a crash for recovery testing purposes. The thread simply
113  * calls abort after a certain number of seconds.
114  *
115  ******************************************************************/
116 
117 
118 class crasher_t : public thread_t {
119 private:
120  int _timeout;
121 
122 public:
123  crasher_t(int t)
124  : thread_t("crasher"),
125  _timeout(t) {}
126 
127  void work();
128 };
129 
130 /******************************************************************
131  *
132  * @class: table_loading_smt_t
133  *
134  * @brief: An smthread-inherited class that is used for spawning
135  * multiple table loading threads.
136  *
137  ******************************************************************/
138 
140 protected:
141 
143 
145 
146  const int _sf;
147 
148  const char* _datadir;
149 
150  int _rv;
151 
152 public:
153 
154  table_loading_smt_t(std::string tname, ss_m* assm,
155  table_desc_t* atable,
156  const int asf, const char* adatadir)
157  : thread_t(tname),
158  _pssm(assm),
159  _ptable(atable),
160  _sf(asf),
161  _datadir(adatadir) {
162  assert (_pssm);
163  assert (_ptable);
164  assert (_sf);
165  assert (_datadir);
166  }
167 
168  virtual ~table_loading_smt_t() {}
169 
170  // thread entrance
171  virtual void work() = 0;
172 
173  inline int rv() {
174  return (_rv);
175  }
176 
177  inline table_desc_t* table() {
178  return (_ptable);
179  }
180 }; // EOF: table_loading_smt_t
181 
182 
183 /******************************************************************
184  *
185  * @class: index_loading_smt_t
186  *
187  * @brief: An smthread-inherited class that is used for helping
188  * the index loading.
189  *
190  * @note: Thread for helping the index loading. In order to do the
191  * index loading we need to open an iterator on the main table.
192  * Unfortunately, we cannot commit while loading, because the
193  * iterator closes automatically.
194  *
195  ******************************************************************/
196 
197 #if 0 // CS TODO
198 
199 template <class TableDesc>
200 class index_loading_smt_t : public thread_t
201 {
202  typedef table_row_t table_tuple;
203  typedef table_man_t<TableDesc> table_manager;
204 
205 private:
206 
207  ss_m* _pssm;
208  table_manager* _pmanager;
209  index_desc_t* _pindex;
210  int _t_count;
211  int _rv;
212 
213 
214 public:
215 
216  table_tuple* _ptuple;
217  mcs_lock _cs_mutex; /* (?) */
218 
219  bool _has_to_consume;
220  bool _start;
221  bool _finish;
222 
223 
224  index_loading_smt_t(std::string tname, ss_m* assm, table_manager* aptable_manager,
225  index_desc_t* apindex, table_tuple* aptuple)
226  : thread_t(tname), _pssm(assm), _pmanager(aptable_manager),
227  _pindex(apindex), _t_count(0), _ptuple(aptuple),
228  _has_to_consume(false), _start(false), _finish(false)
229  {
230  assert (_pssm);
231  assert (_pmanager);
232  assert (_pindex);
233  assert (_ptuple);
234  }
235 
236  ~index_loading_smt_t()
237  {}
238 
239  inline int rv() { return (_rv); }
240 
241  w_rc_t do_help()
242  {
243  assert (_pmanager);
244  assert (_pindex);
245 
246  char* pdest = nullptr;
247  int bufsz = 0;
248  int key_sz = 0;
249  int mark = COMMIT_ACTION_COUNT;
250  bool cons_happened = false;
251  int ispin = 0;
252 
253  {
254  spinlock_write_critical_section cs(&_cs_mutex);
255  // CS: used to be CRITICAL_SECTION followed by pause()
256  // Since I don't understant the code completely, I just
257  // use the equivalent Zero idiom. Note that pause() and
258  // release() do nothing other than release and aquire,
259  // respectively
260  }
261 
262  while(!_start) {
263  ispin++;
264  }
265 
266  W_DO(_pssm->begin_xct());
267 
268  while (true) {
269 
270  {
271  spinlock_write_critical_section cs(&_cs_mutex);
272 
273  if (_has_to_consume) {
274  // if new row waiting
275 
276  // if signalled to finish
277  if (_finish)
278  break;
279 
280  //*** CONSUME ***//
281 
282  key_sz = _pmanager->format_key(_pindex, _ptuple, *_ptuple->_rep);
283  assert (pdest); // if nullptr invalid key
284 
285  W_DO(_pssm->create_assoc(_pindex->stid(),
286  vec_t(pdest, key_sz),
287  vec_t(&(_ptuple->_rid), sizeof(rid_t))));
288 
289  _has_to_consume = false;
290  cons_happened = true; // a consumption just happened
291  }
292 
293  }
294  //*** EOF: CS ***//
295 
296  if (cons_happened) {
297  // It just consumed a row, increase the counters
298  _t_count++;
299 
300  if (_t_count >= mark) {
301  W_DO(_pssm->commit_xct());
302 
303  if ((_t_count % 100000) == 0) { // every 100K
304  TRACE( TRACE_ALWAYS, "index(%s): %d\n",
305  _pindex->name(), _t_count);
306  }
307  else {
308  TRACE( TRACE_TRX_FLOW, "index(%s): %d\n",
309  _pindex->name(), _t_count);
310  }
311 
312  W_DO(_pssm->begin_xct());
313  mark += COMMIT_ACTION_COUNT;
314  }
315  cons_happened = false;
316  }
317  }
318  // final commit
319  W_DO(_pssm->commit_xct());
320 
321  // if we reached this point everything went ok
322  return (RCOK);
323  }
324 
325 
326  // thread entrance
327  void work() {
328  w_rc_t e = do_help();
329  if (e.is_error()) {
330  TRACE( TRACE_ALWAYS, "Index (%s) loading aborted [0x%x]\n",
331  _pindex->name(), e.err_num());
332 
333  int iretries = 0;
334  w_rc_t abrt_rc = _pssm->abort_xct();
335 
336  while (!abrt_rc.is_error()) {
337  iretries++;
338  abrt_rc = _pssm->abort_xct();
339  if (iretries > SHORE_NUM_OF_RETRIES)
340  break;
341  }
342 
343  _rv = 1;
344  return;
345  }
346 
347  // the do_help() function exits _finish should be set to true
348  assert (_finish);
349 
350  // if reached this point everything was ok
351  _rv = 0;
352  }
353 
354  int count() { return (_t_count); }
355 
356 }; // EOF: index_loading_smt_t
357 
358 
359 
360 /******************************************************************
361  *
362  * @class table_checking_smt_t
363  *
364  * @brief An smthread-inherited class that is used for spawning
365  * multiple table checking consistency threads.
366  *
367  ******************************************************************/
368 
369 class table_checking_smt_t : public thread_t
370 {
371 protected:
372 
373  ss_m* _pssm;
374  table_desc_t* _ptable;
375 
376 public:
377 
378  table_checking_smt_t(std::string tname, ss_m* pssm,
379  table_desc_t* atable)
380  : thread_t(tname), _pssm(pssm), _ptable(atable)
381  {
382  assert (_pssm);
383  assert (_ptable);
384  }
385 
386  virtual ~table_checking_smt_t() { }
387 
388  // thread entrance
389  virtual void work()=0;
390 
391 }; // EOF: table_checking_smt_t
392 
393 
394 template <class TableDesc>
395 class table_checking_smt_impl : public table_checking_smt_t
396 {
397 private:
398  table_man_t<TableDesc>* _pmanager;
399 
400 public:
401 
402  table_checking_smt_impl(std::string tname, ss_m* pssm,
403  table_man_t<TableDesc>* amanager,
404  TableDesc* atable)
405  : table_checking_smt_t(tname, pssm, atable), _pmanager(amanager)
406  {
407  assert (_pmanager);
408  }
409 
410  ~table_checking_smt_impl() { }
411 
412  // thread entrance
413  void work() {
414  TRACE( TRACE_DEBUG, "Checking (%s)\n", _ptable->name());
415 
416  //if (!_pmanager->check_all_indexes(_pssm)) {
417  // w_rc_t e = _pmanager->check_all_indexes_together(_pssm);
418  // if (e.is_error()) {
419  // TRACE( TRACE_DEBUG, "Inconsistency in (%s)\n", _ptable->name());
420  // }
421  // else {
422  // TRACE( TRACE_DEBUG, "(%s) OK...\n", _ptable->name());
423  // }
424  }
425 
426 }; // EOF: table_checking_smt_impl
427 
428 
429 
430 /******************************************************************
431  *
432  * @class close_smt_t
433  *
434  * @brief An smthread-inherited class that is used just for
435  * closing the database.
436  *
437  ******************************************************************/
438 
439 class close_smt_t : public thread_t {
440 private:
441  ShoreEnv* _env;
442 
443 public:
444  int _rv;
445 
446  close_smt_t(ShoreEnv* env, std::string tname)
447  : thread_t(tname),
448  _env(env), _rv(0)
449  {}
450 
451  ~close_smt_t() {}
452 
453  void work();
454 
458  inline int retval() { return (_rv); }
459 
460 }; // EOF: close_smt_t
461 
462 
463 
464 /******************************************************************
465  *
466  * @class dump_smt_t
467  *
468  * @brief An smthread-inherited class that is used just for
469  * dumping the database.
470  *
471  ******************************************************************/
472 
473 class dump_smt_t : public thread_t
474 {
475 private:
476  ShoreEnv* _env;
477 
478 public:
479  int _rv;
480 
481  dump_smt_t(ShoreEnv* env, std::string tname)
482  : thread_t(tname),
483  _env(env), _rv(0)
484  {}
485 
486  ~dump_smt_t() {}
487 
488  void work();
489 
493  inline int retval() { return (_rv); }
494 
495 }; // EOF: dump_smt_t
496 
497 #endif
498 
499 /******************************************************************
500  *
501  * @class: abort_smt_t
502  *
503  * @brief: An smthread-inherited class that is used just for
504  * aborting a list of transactions
505  *
506  ******************************************************************/
507 
508 class abort_smt_t : public thread_t {
509 private:
511 
512 public:
513 
514  vector<xct_t*>* _toabort;
515 
516  uint _aborted;
517 
518  abort_smt_t(std::string tname, ShoreEnv* env, vector<xct_t*>& toabort);
519 
520  ~abort_smt_t();
521 
522  void work();
523 }; // EOF: abort_smt_t
524 
525 #endif // __DAEMONS_H
526 
crasher_t(int t)
Definition: daemons.h:123
vector< xct_t * > * _toabort
Definition: daemons.h:514
w_error_codes err_num() const
Definition: w_rc.h:510
bool _active
Definition: daemons.h:92
Definition: table_desc.h:122
Definition: daemons.h:508
const w_rc_t RCOK
Definition: w_rc.h:239
bool is_error() const
True if this return code is not RCOK or equivalent. This must be called for every w_rc_t before destr...
Definition: w_rc.h:505
static rc_t begin_xct(int timeout=timeout_t::WAIT_SPECIFIED_BY_THREAD)
Begin a transaction.
Definition: sm.cpp:522
Definition: latches.h:461
static rc_t create_assoc(StoreID stid, const w_keystr_t &key, const vec_t &el)
Create an entry in a B+-Tree index.
Definition: smindex.cpp:52
int _timeout
Definition: daemons.h:120
int _rv
Definition: daemons.h:55
table_desc_t * table()
Definition: daemons.h:177
Definition: daemons.h:139
int _rv
Definition: daemons.h:150
const char * _datadir
Definition: daemons.h:148
Definition: kits_thread.h:134
static rc_t abort_xct(sm_stats_t *&stats)
Abort an instrumented transaction and get its statistics.
Definition: sm.cpp:583
: Definition of a Shore environment (database)
static rc_t commit_xct(bool lazy=false, lsn_t *plastlsn=nullptr)
Commit a transaction.
Definition: sm.cpp:567
#define TRACE_TRX_FLOW
Definition: trace_types.h:55
void set_active(bool active)
Definition: daemons.h:100
This is the SHORE Storage Manager API.
Definition: sm.h:405
Definition: daemons.h:118
ShoreEnv * _env
Definition: daemons.h:53
#define TRACE_DEBUG
Definition: trace_types.h:58
checkpointer_t(ShoreEnv *env)
Definition: daemons.h:95
ss_m * _pssm
Definition: daemons.h:142
db_init_smt_t(std::string tname, ShoreEnv *db)
Definition: daemons.cpp:43
void work()
Definition: daemons.cpp:51
thread_t(const std::string &name)
Definition: kits_thread.cpp:93
Definition: daemons.h:88
Return code for most functions and methods.
Definition: w_rc.h:87
~db_init_smt_t()
Definition: daemons.cpp:49
Definition: index_desc.h:60
StoreID & stid()
Definition: index_desc.h:117
An MCS queuing spinlock.
Definition: mcs_lock.h:61
table_desc_t * _ptable
Definition: daemons.h:144
const int SHORE_NUM_OF_RETRIES
Definition: shore_env.h:52
ShoreEnv * _env
Definition: daemons.h:90
table_loading_smt_t(std::string tname, ss_m *assm, table_desc_t *atable, const int asf, const char *adatadir)
Definition: daemons.h:154
#define TRACE
Other modules in our program use this macro for reporting. We can use preprocessor macros like FILE a...
Definition: trace.h:91
Vector: a set of {pointer,length} pairs for memory manipulation.
Definition: vec_t.h:313
ShoreEnv * _env
Definition: daemons.h:510
int rv()
Definition: daemons.h:173
#define W_DO(x)
Call a method or function x. This macro is the normal idiom for calling a method or function...
Definition: w_rc.h:304
: Base class for tables stored in Shore
string name() const
Definition: index_desc.h:101
#define TRACE_ALWAYS
Definition: trace_types.h:41
virtual ~table_loading_smt_t()
Definition: daemons.h:168
Definition: shore_env.h:349
uint _aborted
Definition: daemons.h:516
const int _sf
Definition: daemons.h:146
Definition: row.h:147
int rv()
Definition: daemons.cpp:67
Definition: daemons.h:51
Definition: table_man.h:117
const unsigned int COMMIT_ACTION_COUNT
Definition: file_desc.h:51