// ---------------------------------------------------------------------------
// Reviewer overview of the preamble and helper macros that follow.
//
// NOTE(review): this listing has lost source lines inside several macro
// bodies (the embedded numbering jumps, e.g. 99->101, 106->108, 113->115,
// 116->119, 139->142, 144->147, 171->175, 178->180). The declaration of
// xctLastLsn, some return statements and the closing braces are therefore
// not visible here; consult the real header before editing any macro.
//
// po
//     Namespace alias for boost::program_options.
//
// SHORE_TABLE_DATA_DIR
//     The string literal "databases".
//
// DECLARE_TRX(trxlid)
//     Declares, for one transaction id:
//       run_<trxlid>(Request*, <trxlid>_input_t&)   - run with caller input
//       run_<trxlid>(Request*)                      - run without input
//       xct_<trxlid>(const int, <trxlid>_input_t&)  - the transaction body
//       _inc_<trxlid>_att / _failed / _dld          - counter incrementers
//     The last declaration carries no ';', so the use site supplies it.
//
// DECLARE_TABLE(table, manimpl, abbrv)
//     Declares guard<manimpl> _p<abbrv>_man and guard<table> _p<abbrv>_desc
//     plus inline accessors. <abbrv>_man() returns the guard object itself
//     from a function typed manimpl* (so guard<> must convert to a raw
//     pointer), whereas <abbrv>_desc() calls .get() explicitly.
//
// DEFINE_RUN_WITH_INPUT_TRX_WRAPPER(cname, trxlid, trximpl)
//     Defines cname::run_<trximpl>(Request*, <trxlid>_input_t&). Two variants
//     are selected by CFG_FLUSHER. Both variants:
//       1. read xct_id from the request and bump _inc_<trxlid>_att();
//       2. call xct_<trximpl>(xct_id, in) and, if it did not fail, commit;
//       3. on error, bump _inc_<trxlid>_dld() when err_num() is eDEADLOCK and
//          _inc_<trxlid>_failed() otherwise, call _pssm->abort_xct() (a failed
//          abort is only traced), notify the client, return the error unless
//          _measure == MST_MEASURE, else also call _env_stats.inc_trx_att().
//     CFG_FLUSHER variant: commits with commit_xct(true, &xctLastLsn), stores
//       that LSN on the request, destroys the request through _request_pool
//       on the error path, and passes the request to to_base_flusher().
//     Non-flusher variant: commits with commit_xct(true) when
//       isAsynchCommit() is true and commit_xct() otherwise; after the error
//       block it notifies the client, returns RCOK unless measuring, and
//       calls _env_stats.inc_trx_com().
//     NOTE(review): the flusher variant compares against
//       smlevel_0::eDEADLOCK, the other against an unqualified eDEADLOCK -
//       confirm both resolve to the same constant.
//     NOTE(review): (*&_measure) designates the same object as _measure
//       (declared volatile uint per the index below); presumably written that
//       way on purpose - confirm before simplifying.
//
// DEFINE_RUN_WITHOUT_INPUT_TRX_WRAPPER(cname, trxlid, trximpl)
//     Defines cname::run_<trximpl>(Request*): builds the input with
//     create_<trxlid>_input(_queried_factor, prequest->selectedID(),
//     prequest->tspread()) and forwards to the two-argument overload.
//
// DEFINE_TRX_STATS(cname, trxlid)
//     Defines the three incrementers as pre-increments of
//     my_stats.attempted / failed / deadlocked .<trxlid>.
//
// DEFINE_TRX(cname, trx)
//     Expands the three DEFINE_* macros above with trxlid == trximpl == trx.
//     NOTE(review): the ';' written after the two wrapper macros follows a
//     function definition, so it expands to a stray empty declaration.
//
// CHECK_XCT_RETURN(rc, retry, ENV)
//     When rc.is_error(): traces the error number, W_COERCEs
//     ENV->db()->abort_xct(), then switches on rc.err_num(). The visible part
//     of the switch formats rc into a stringstream, traces it and calls
//     W_FATAL. How `retry` is used is not visible in this listing.
//     NOTE(review): the macro is a bare `if` (no do { } while (0) wrapper) and
//     uses rc / ENV unparenthesised, so pass plain identifiers and beware a
//     following `else`.
//
// The last item on the third line (`virtual int conf() = 0;`) is not a macro:
// it is the first member of the pure-virtual interface that continues below.
// ---------------------------------------------------------------------------
43 #include <boost/program_options.hpp> 48 namespace po = boost::program_options;
54 #define SHORE_TABLE_DATA_DIR "databases" 63 #define DECLARE_TRX(trxlid) \ 64 w_rc_t run_##trxlid(Request* prequest, trxlid##_input_t& in); \ 65 w_rc_t run_##trxlid(Request* prequest); \ 66 w_rc_t xct_##trxlid(const int xct_id, trxlid##_input_t& in); \ 67 void _inc_##trxlid##_att(); \ 68 void _inc_##trxlid##_failed(); \ 69 void _inc_##trxlid##_dld() 71 #define DECLARE_TABLE(table, manimpl, abbrv) \ 72 guard<manimpl> _p##abbrv##_man; \ 73 inline manimpl* abbrv##_man() { return (_p##abbrv##_man); } \ 74 guard<table> _p##abbrv##_desc; \ 75 inline table* abbrv##_desc() { return (_p##abbrv##_desc.get()); } 90 #ifdef CFG_FLUSHER // ***** Mainstream FLUSHER ***** // 93 #define DEFINE_RUN_WITH_INPUT_TRX_WRAPPER(cname,trxlid,trximpl) \ 94 w_rc_t cname::run_##trximpl(Request* prequest, trxlid##_input_t& in) { \ 95 int xct_id = prequest->xct_id(); \ 96 TRACE( TRACE_TRX_FLOW, "%d. %s ...\n", xct_id, #trximpl); \ 97 _inc_##trxlid##_att(); \ 98 w_rc_t e = xct_##trximpl(xct_id, in); \ 99 if (!e.is_error()) { \ 101 e = _pssm->commit_xct(true,&xctLastLsn); \ 102 prequest->set_last_lsn(xctLastLsn); } \ 103 if (e.is_error()) { \ 104 if (e.err_num() != smlevel_0::eDEADLOCK) \ 105 _inc_##trxlid##_failed(); \ 106 else _inc_##trxlid##_dld(); \ 108 w_rc_t e2 = _pssm->abort_xct(); \ 109 if(e2.is_error()) TRACE( TRACE_ALWAYS, "Xct (%d) abort failed [0x%x]\n", xct_id, e2.err_num()); \ 110 prequest->notify_client(); \ 111 _request_pool.destroy(prequest); \ 112 if ((*&_measure)!=MST_MEASURE) return (e); \ 113 _env_stats.inc_trx_att(); \ 115 TRACE( TRACE_TRX_FLOW, "Xct (%d) (%d) to flush\n", xct_id, prequest->tid().get_lo()); \ 116 to_base_flusher(prequest); \ 119 #else // ***** NO FLUSHER ***** // 121 #define DEFINE_RUN_WITH_INPUT_TRX_WRAPPER(cname, trxlid, trximpl) \ 122 w_rc_t cname::run_##trximpl(Request* prequest, trxlid##_input_t& in) { \ 123 int xct_id = prequest->xct_id(); \ 125 _inc_##trxlid##_att(); \ 126 w_rc_t e = xct_##trximpl(xct_id, in); \ 127 if (!e.is_error()) { \ 128 if
(isAsynchCommit()) e = _pssm->commit_xct(true); \ 129 else e = _pssm->commit_xct(); } \ 130 if (e.is_error()) { \ 131 if (e.err_num() != eDEADLOCK) \ 132 _inc_##trxlid##_failed(); \ 133 else _inc_##trxlid##_dld(); \ 135 w_rc_t e2 = _pssm->abort_xct(); \ 136 if(e2.is_error()) TRACE( TRACE_ALWAYS, "Xct (%d) abort failed [0x%x]\n", xct_id, e2.err_num()); \ 137 prequest->notify_client(); \ 138 if ((*&_measure)!=MST_MEASURE) return (e); \ 139 _env_stats.inc_trx_att(); \ 142 prequest->notify_client(); \ 143 if ((*&_measure)!=MST_MEASURE) return (RCOK); \ 144 _env_stats.inc_trx_com(); \ 147 #endif // ***** EOF: CFG_FLUSHER ***** // 149 #define DEFINE_RUN_WITHOUT_INPUT_TRX_WRAPPER(cname, trxlid, trximpl) \ 150 w_rc_t cname::run_##trximpl(Request* prequest) { \ 151 trxlid##_input_t in = create_##trxlid##_input(_queried_factor, \ 152 prequest->selectedID(), prequest->tspread()); \ 153 return (run_##trximpl(prequest, in)); } 155 #define DEFINE_TRX_STATS(cname, trxlid) \ 156 void cname::_inc_##trxlid##_att() { ++my_stats.attempted.trxlid; } \ 157 void cname::_inc_##trxlid##_failed() { ++my_stats.failed.trxlid; } \ 158 void cname::_inc_##trxlid##_dld() { ++my_stats.deadlocked.trxlid; } 162 #define DEFINE_TRX(cname, trx) \ 163 DEFINE_RUN_WITHOUT_INPUT_TRX_WRAPPER(cname,trx,trx); \ 164 DEFINE_RUN_WITH_INPUT_TRX_WRAPPER(cname,trx,trx); \ 165 DEFINE_TRX_STATS(cname,trx) 167 #define CHECK_XCT_RETURN(rc, retry, ENV) \ 168 if (rc.is_error()) { \ 169 TRACE( TRACE_ALWAYS, "Error %x\n", rc.err_num()); \ 170 W_COERCE(ENV->db()->abort_xct()); \ 171 switch(rc.err_num()) { \ 175 stringstream os; os << rc << ends; \ 176 string str = os.str(); \ 177 TRACE( TRACE_ALWAYS, \ 178 "Eek! Unable to populate db due to: \n%s\n", \ 180 W_FATAL(rc.err_num()); \ 278 virtual int conf() = 0;
// Pure-virtual control interface: every member in this group ends in `= 0`,
// so a concrete subclass has to override all of them.
// NOTE(review): the enclosing class header is outside this chunk; the index
// below places db_iface()/~db_iface() just before these lines, so this is
// presumably db_iface - confirm.
// NOTE(review): what the int return values mean is not visible here
// (presumably a status code) - confirm against an implementation.

// Receives a pointer to an envVarMap (a map<string, string> typedef).
280 virtual int set(envVarMap* vars) = 0;
// Unlike its int-returning neighbours, this one reports through w_rc_t.
282 virtual w_rc_t load_schema() = 0;
284 virtual int init() = 0;
286 virtual int open() = 0;
288 virtual int close() = 0;
290 virtual int start() = 0;
292 virtual int stop() = 0;
294 virtual int restart() = 0;
296 virtual int pause() = 0;
298 virtual int resume() = 0;
// Also reports through w_rc_t rather than int.
300 virtual w_rc_t newrun() = 0;
302 virtual int statistics() = 0;
304 virtual int dump() = 0;
// The only const-qualified member of the group.
306 virtual int info()
const = 0;
// Second group of virtual declarations, belonging to a later class in the
// header (presumably ShoreEnv, going by the index below - confirm).

// Virtual but not pure (no `= 0`): a definition must exist outside this chunk.
519 virtual int restart();
// Still pure virtual at this level; a further subclass has to supply it.
527 virtual w_rc_t newrun() = 0;
// Virtual but not pure, like restart().
529 virtual int statistics();
// Pure virtual and const-qualified.
533 virtual int info()
const = 0;
// The remaining hooks are all pure virtual and report through w_rc_t.
// NOTE(review): judging by the names these are the workload-specific
// schema/data set-up steps; their required call order is not visible here.
537 virtual w_rc_t load_schema() = 0;
539 virtual w_rc_t load_data() = 0;
541 virtual w_rc_t create_tables() = 0;
543 virtual w_rc_t warmup() = 0;
545 virtual w_rc_t check_consistency() = 0;
549 virtual w_rc_t load_and_register_fids() = 0;
// Declaration only; the definition is not part of this chunk.
556 bool is_initialized();
// The statements below are bodies of inline accessors whose signature lines
// were dropped from this listing. Each is paired with a signature using the
// definition line numbers in the cross-reference index further down; the
// pairing is a reviewer inference - confirm against the real header.

// Body of set_measure(const MeasurementState aMeasurementState) (defined at 562).
// _measure is declared `volatile uint` in the index.
567 _measure = aMeasurementState;
// Body of should_stop_benchmark() (defined at 574); stop_benchmark is a
// std::atomic<bool> member.
575 return stop_benchmark;
// Body of get_init_mutex() (defined at 582): address of the pthread mutex member.
583 return (&_init_mutex);
// Body of get_vol_mutex() (defined at 586).
587 return (&_vol_mutex);
// Body of get_load_mutex() (defined at 590).
591 return (&_load_mutex);
// Body of get_init_no_cs() (defined at 594): plain read of the flag.
// NOTE(review): "_no_cs" presumably means "no critical section", i.e. the
// caller is expected to hold the matching mutex - confirm.
595 return (_initialized);
// Body of set_init_no_cs(const bool b_is_init) (defined at 602): plain write.
603 _initialized = b_is_init;
// Body of set_loaded_no_cs(const bool b_is_loaded) (defined at 606): plain write.
607 _loaded = b_is_loaded;
// Declarations only; the definitions are outside this chunk.
// NOTE(review): the index lists double members _queried_factor and
// _scaling_factor with mutexes _queried_mutex and _scaling_mutex, so "qf"
// and "sf" presumably map to those - confirm in the implementation file.

// Setter taking the new value by const double.
619 void set_qf(
const double aQF);
// Const getter returning a double.
621 double get_qf()
const;
// Setter taking the new value by const double.
623 void set_sf(
const double aSF);
// Const getter returning a double.
625 double get_sf()
const;
// Const member with no return value; the output destination is not visible here.
627 void print_sf()
const;
// Declarations only; the definitions are outside this chunk.

// All four frequencies default to 0, so set_freqs() with no arguments passes
// zero for each.
// NOTE(review): presumably stored in _insert_freq / _delete_freq /
// _probe_freq / _update_freq (listed in the index); units not visible here.
637 void set_freqs(
int insert_freq = 0,
int delete_freq = 0,
int probe_freq = 0,
int update_freq = 0);
// Parameter is unnamed in the declaration.
// NOTE(review): presumably sets _chkpt_freq (see index) - confirm.
640 void set_chkpt_freq(
int);
// Both bool parameters are unnamed, so their order cannot be read off here.
// NOTE(review): the index lists _enable_archiver and _enable_merger;
// presumably (archiver, merger) - confirm against the definition.
646 void set_archiver_opts(
bool,
bool);
// Bodies of inline accessors whose signature lines were dropped from this
// listing; pairings come from the definition line numbers in the
// cross-reference index below (reviewer inference - confirm).

// Body of is_archiver_enabled() (defined at 648).
649 return _enable_archiver;
// Body of is_merger_enabled() (defined at 652).
653 return _enable_merger;
// Body of set_activation_delay(int d) (defined at 656).
657 _activation_delay = d;
// Body of get_activation_delay() (defined at 660).
661 return _activation_delay;
// Declarations only; parameters are unnamed and definitions are elsewhere.

// NOTE(review): presumably sets _crash_delay (int member in the index);
// units not visible here.
665 void set_crash_delay(
int);
// NOTE(review): "shudown" looks like a misspelling of "shutdown". The name
// is part of the public interface, so renaming it would break callers; left
// as is and only flagged.
672 void set_sm_shudown_filthy(
bool);
// Virtual (non-pure) skew controls; definitions are outside this chunk.

// Takes four ints plus an optional bool that defaults to false.
// NOTE(review): skew_type is an int here while the index lists a member
// _skew_type of type skew_type_t - presumably converted in the definition.
// NOTE(review): default arguments on a virtual function are chosen from the
// static type of the call expression, so an override that changes the
// default for `shifting` would behave surprisingly.
675 virtual void set_skew(
int area,
int load,
int start_imbalance,
int skew_type,
676 bool shifting =
false);
678 virtual void reset_skew();
680 virtual void start_load_imbalance();
// Declarations only; the definitions are outside this chunk.

// Returns a uint.
// NOTE(review): the index lists a uint member _worker_cnt; presumably this
// refreshes and returns it - confirm.
693 uint upd_worker_cnt();
// Pure-virtual per-thread hooks that a subclass must provide.
701 virtual void env_thread_init() = 0;
703 virtual void env_thread_fini() = 0;
// Pair of int-returning toggles; enable takes the delay as a const int.
// NOTE(review): the unit of adelay and the meaning of the return value are
// not visible here.
706 int disable_fake_disk_latency();
708 int enable_fake_disk_latency(
const int adelay);
// Takes the destination stream by reference.
711 void gatherstats_sm(ostream&);
716 void activate_archiver();
718 void wait_for_warmup();
// Body of get_env_stats() (defined at 724 per the index below); the signature
// line was dropped from this listing. Returns the address of the
// env_stats_t member.
725 return (&_env_stats);
// Declarations only; the definitions are outside this chunk.
// NOTE(review): by name these expose recovery / restore progress counters
// from the storage manager - confirm what each one counts.
729 size_t get_total_pages_to_recover();
731 size_t get_dirty_page_count();
733 bool has_log_analysis_finished();
735 size_t get_total_pages_to_restore();
737 size_t get_num_restored_pages();
// Const getters returning unsigned.
// NOTE(review): presumably the attempted / committed transaction counts.
740 unsigned get_trx_att()
const;
742 unsigned get_trx_com()
const;
// Pure-virtual hooks; a subclass has to implement all three.

// NOTE(review): the unit of `delay` is not visible here.
753 virtual void print_throughput(
const double iQueriedSF,
755 const int iNumOfThreads,
756 const double delay) = 0;
758 virtual void reset_stats() = 0;
// Request is a typedef for trx_request_t (see the index below).
761 virtual w_rc_t run_one_xct(Request* prequest) = 0;
// Lines 765, 802 and 806 are bodies of inline accessors whose signature lines
// were dropped from this listing; pairings come from the definition line
// numbers in the cross-reference index below (reviewer inference - confirm).

// Body of isAsynchCommit() const (defined at 764).
765 return (_asynch_commit);
// Declaration only; the definition is elsewhere.
768 void setAsynchCommit(
const bool bAsynch);
// Body of isFlusherEnabled() const (defined at 801).
802 return (_bUseFlusher);
// Body of setFlusherEnabled(const bool bUseFlusher) (defined at 805).
806 _bUseFlusher = bUseFlusher;
// Called with the request by the CFG_FLUSHER variant of
// DEFINE_RUN_WITH_INPUT_TRX_WRAPPER. Declaration only.
815 void to_base_flusher(Request* ar);
// Declaration only; the meaning of the int result is not visible here.
820 int _set_sys_params();
827 #endif // __SHORE_ENV_H
bool is_archiver_enabled()
Definition: shore_env.h:648
bool isAsynchCommit() const
Definition: shore_env.h:764
void inc_trx_com()
Definition: shore_env.h:748
std::vector< WorkerPtr >::iterator WorkerIt
Definition: shore_env.h:363
int _crash_delay
Definition: shore_env.h:461
pthread_mutex_t _queried_mutex
Definition: shore_env.h:418
const w_rc_t RCOK
Definition: w_rc.h:239
blob_pool RequestStack
Definition: shore_env.h:355
int _delete_freq
Definition: shore_env.h:442
Definition: shore_env.h:333
void set_measure(const MeasurementState aMeasurementState)
Definition: shore_env.h:562
int _chkpt_freq
Definition: shore_env.h:449
int ssm_max_small_rec
Definition: shore_env.cpp:710
Definition: trx_worker.h:596
envVarMap::iterator envVarIt
Definition: shore_env.h:247
pthread_mutex_t _statmap_mutex
Definition: shore_env.h:382
sm_options _popts
Definition: shore_env.h:392
pthread_mutex_t _scaling_mutex
Definition: shore_env.h:414
bool _asynch_commit
Definition: shore_env.h:822
virtual int post_init()
Definition: shore_env.h:508
eDBControl dbc()
Definition: shore_env.h:263
bool _bUseSLI
Definition: shore_env.h:782
void set_activation_delay(int d)
Definition: shore_env.h:656
void setFlusherEnabled(const bool bUseFlusher)
Definition: shore_env.h:805
pthread_mutex_t _load_mutex
Definition: shore_env.h:380
double _queried_factor
Definition: shore_env.h:416
po::variables_map optionValues
Definition: shore_env.h:484
virtual ~db_iface()
Definition: shore_env.h:259
uint32_t StoreID
Definition: basics.h:47
RequestStack _request_pool
Definition: shore_env.h:698
unsigned inc_trx_att()
Definition: shore_env.h:205
Definition: shore_env.h:336
std::map< string, string > ParamMap
Definition: shore_env.h:351
bool get_loaded_no_cs()
Definition: shore_env.h:598
Start-up parameters for the storage engine. See OPTIONS.
Definition: sm_options.h:24
int _insert_freq
Definition: shore_env.h:440
pthread_mutex_t _last_stats_mutex
Definition: shore_env.h:384
int get_crash_delay()
Definition: shore_env.h:667
virtual w_rc_t update_partitioning()
Definition: shore_env.h:632
WorkerPool _workers
Definition: shore_env.h:395
string _sysname
Definition: shore_env.h:434
int _update_freq
Definition: shore_env.h:446
MeasurementState get_measure()
Definition: shore_env.h:570
bool _enable_archiver
Definition: shore_env.h:452
Definition: shore_env.h:335
env_stats_t * get_env_stats()
Definition: shore_env.h:724
Base class for tables stored in Shore.
string sysname()
Definition: shore_env.h:720
bool _bUseELR
Definition: shore_env.h:796
bool _enable_merger
Definition: shore_env.h:454
Definition: shore_env.h:227
This is the SHORE Storage Manager API.
Definition: sm.h:405
void setELREnabled(const bool bUseELR)
Definition: shore_env.h:791
pthread_mutex_t * get_load_mutex()
Definition: shore_env.h:590
void set_loaded_no_cs(const bool b_is_loaded)
Definition: shore_env.h:606
envVarMap::const_iterator envVarConstIt
Definition: shore_env.h:249
sm_options & get_opts()
Definition: shore_env.h:492
ShoreEnv * _g_shore_env
Definition: shore_env.cpp:52
unsigned _ntrx_att
Definition: shore_env.h:193
T atomic_fetch_add(T *object, C operand)
Definition: AtomicCounter.hpp:406
Structures that represent user requests.
skew_type_t _skew_type
Definition: shore_env.h:480
Definition: shore_env.h:226
sm_stats_t _last_sm_stats
Definition: shore_env.h:428
tatas_lock _alarm_lock
Definition: shore_env.h:476
std::array< long, enum_to_base(sm_stat_id::stat_max)> sm_stats_t
Definition: smstats.h:205
Return code for most functions and methods.
Definition: w_rc.h:87
volatile uint _measure
Definition: shore_env.h:431
trx_worker_t * WorkerPtr
Definition: shore_env.h:359
virtual w_rc_t db_fetch()
Definition: shore_env.h:688
trx_worker_t Worker
Definition: shore_env.h:357
unsigned _ntrx_com
Definition: shore_env.h:195
env_stats_t _env_stats
Definition: shore_env.h:426
void set_loaders(int l)
Definition: shore_env.h:614
map< string, string > envVarMap
Definition: shore_env.h:245
void set_stop_benchmark(bool v)
Definition: shore_env.h:578
pthread_mutex_t _init_mutex
Definition: shore_env.h:376
bool _bUseFlusher
Definition: shore_env.h:810
pthread_mutex_t * get_vol_mutex()
Definition: shore_env.h:586
bool isFlusherEnabled() const
Definition: shore_env.h:801
const int SHORE_NUM_OF_RETRIES
Definition: shore_env.h:52
Definition: shore_env.h:229
bool _initialized
Definition: shore_env.h:374
virtual int open()
Definition: shore_env.h:510
A test-and-test-and-set spinlock.
Definition: tatas.h:25
db_iface()
Definition: shore_env.h:256
bool get_init_no_cs()
Definition: shore_env.h:594
volatile unsigned int _dbc
Definition: shore_env.h:252
Definition: shore_env.h:243
int _probe_freq
Definition: shore_env.h:444
int get_activation_delay()
Definition: shore_env.h:660
void setSLIEnabled(const bool bUseSLI)
Definition: shore_env.h:777
Definition for the class skewer.
void inc_trx_att()
Definition: shore_env.h:744
Definition: shore_env.h:337
eDBControl
Definition: shore_env.h:225
bool is_merger_enabled()
Definition: shore_env.h:652
pthread_mutex_t * get_init_mutex()
Definition: shore_env.h:582
~env_stats_t()
Definition: shore_env.h:201
int _start_imbalance
Definition: shore_env.h:478
Definition: shore_env.h:228
trx_request_t Request
Definition: shore_env.h:353
bool _clobber
Definition: shore_env.h:371
bool should_stop_benchmark()
Definition: shore_env.h:574
env_stats_t()
Definition: shore_env.h:197
unsigned inc_trx_com()
Definition: shore_env.h:209
StoreID _root_iid
Definition: shore_env.h:388
ss_m * db()
Definition: shore_env.h:552
ss_m * _pssm
Definition: shore_env.h:365
virtual int pause()
Definition: shore_env.h:521
bool _loaded
Definition: shore_env.h:378
void print_env_stats() const
Definition: shore_env.cpp:70
volatile bool _bAlarmSet
Definition: shore_env.h:474
void set_clobber(bool c)
Definition: shore_env.h:610
Definition: shore_env.h:349
Definition: shore_env.h:334
Definition: shore_env.h:192
void set_init_no_cs(const bool b_is_init)
Definition: shore_env.h:602
MeasurementState
Definition: shore_env.h:332
skew_type_t
Definition: skewer.h:49
bool isSLIEnabled() const
Definition: shore_env.h:773
void set_dbc(const eDBControl adbc)
Definition: shore_env.h:267
Definition: trx_worker.h:154
int get_chkpt_freq()
Definition: shore_env.h:642
uint _worker_cnt
Definition: shore_env.h:397
bool isELREnabled() const
Definition: shore_env.h:787
std::vector< WorkerPtr > WorkerPool
Definition: shore_env.h:361
virtual int resume()
Definition: shore_env.h:524
po::variables_map & get_optionValues()
Definition: shore_env.h:496
int _loaders_to_use
Definition: shore_env.h:420
int _activation_delay
Definition: shore_env.h:458
pthread_mutex_t _vol_mutex
Definition: shore_env.h:389
std::atomic< bool > stop_benchmark
Definition: shore_env.h:482
double _scaling_factor
Definition: shore_env.h:412