Zero  0.1.0
logarchiver.h
Go to the documentation of this file.
1 #ifndef __LOGARCHIVER_H
2 #define __LOGARCHIVER_H
3 
4 #include "worker_thread.h"
5 #include "sm_base.h"
6 #include "log_consumer.h"
7 #include "logarchive_index.h"
8 #include "logarchive_writer.h"
9 #include "w_heap.h"
10 #include "log_storage.h"
11 #include "mem_mgmt.h"
12 
13 #include <queue>
14 #include <set>
15 
16 #define BOOST_FILESYSTEM_NO_DEPRECATED
17 
18 #include <boost/filesystem.hpp>
19 
20 namespace fs = boost::filesystem;
21 
22 class sm_options;
23 class LogScanner;
24 
54 class ArchiverHeap {
55 public:
56  ArchiverHeap(size_t workspaceSize);
57 
58  virtual ~ArchiverHeap();
59 
60  bool push(logrec_t* lr, bool duplicate);
61 
62  logrec_t* top();
63 
64  void pop();
65 
67  return w_heap.First().run;
68  }
69 
70  size_t size() {
71  return w_heap.NumElements();
72  }
73 
74 private:
76 
78 
80 
81  fixed_lists_mem_t::slot_t allocate(size_t length);
82 
83  struct HeapEntry {
85 
87 
89 
91 
94  : slot(slot),
95  lsn(lsn),
96  run(run),
97  pid(pid) {}
98 
100  : slot(nullptr, 0),
101  lsn(lsn_t::null),
102  run(0),
103  pid(0) {}
104 
105  friend std::ostream& operator<<(std::ostream& os,
106  const HeapEntry& e) {
107  os << "[run " << e.run << ", " << e.pid << ", " << e.lsn <<
108  ", slot(" << e.slot.address << ", " << e.slot.length << ")]";
109  return os;
110  }
111  };
112 
113  struct Cmp {
114  bool gt(const HeapEntry& a, const HeapEntry& b) const;
115  };
116 
118 
120 };
121 
141 public:
142  MergerDaemon(const sm_options&,
143  std::shared_ptr<ArchiveIndex> in,
144  std::shared_ptr<ArchiveIndex> ou = nullptr);
145 
146  virtual ~MergerDaemon() {}
147 
148  virtual void do_work();
149 
150  rc_t doMerge(unsigned level, unsigned fanin);
151 
152 private:
153  std::shared_ptr<ArchiveIndex> indir;
154 
155  std::shared_ptr<ArchiveIndex> outdir;
156 
157  unsigned _fanin;
158 
160 };
161 
226 public:
227  LogArchiver(const sm_options& options);
228 
229  LogArchiver(
230  ArchiveIndex*,
231  LogConsumer*,
232  ArchiverHeap*,
234  );
235 
236  virtual ~LogArchiver();
237 
238  virtual void run();
239 
240  void activate(lsn_t endLSN = lsn_t::null, bool wait = true);
241 
242  void shutdown();
243 
244  bool requestFlushAsync(lsn_t);
245 
246  void requestFlushSync(lsn_t);
247 
248  void archiveUntilLSN(lsn_t);
249 
250  std::shared_ptr<ArchiveIndex> getIndex() {
251  return index;
252  }
253 
255  return consumer->getNextLSN();
256  }
257 
258  void setEager(bool e) {
259  eager = e;
261  }
262 
263  bool getEager() const {
264  return eager;
265  }
266 
267  /*
268  * IMPORTANT: the block size must be a multiple of the log
269  * page size to ensure that logrec headers are not truncated
270  */
271  const static bool DFT_EAGER = true;
272 
273  const static bool DFT_READ_WHOLE_BLOCKS = true;
274 
275  const static int DFT_GRACE_PERIOD = 1000000; // 1 sec
276 
277 private:
278  std::shared_ptr<ArchiveIndex> index;
279 
281 
283 
285 
287 
288  std::atomic<bool> shutdownFlag;
289 
291 
293 
294  bool eager;
295 
297 
299 
301 
303 
304  void replacement();
305 
306  bool selection();
307 
308  void pushIntoHeap(logrec_t*, bool duplicate);
309 
310  bool waitForActivation();
311 
312  bool processFlushRequest();
313 
314  bool isLogTooSlow();
315 
316  bool shouldActivate(bool logTooSlow);
317 };
318 
319 #endif // __LOGARCHIVER_H
Definition: mem_mgmt.h:32
void atomic_thread_fence(memory_order order)
Definition: AtomicCounter.hpp:223
virtual ~ArchiverHeap()
Definition: logarchiver.cpp:175
Parses log records from a stream of binary data.
Definition: log_consumer.h:33
Definition: worker_thread.h:12
BlockAssembly * blkAssemb
Definition: logarchiver.h:284
size_t size()
Definition: logarchiver.h:70
lsn_t lsn
Definition: logarchiver.h:86
bool selfManaged
Definition: logarchiver.h:292
int slowLogGracePeriod
Definition: logarchiver.h:298
friend std::ostream & operator<<(std::ostream &os, const HeapEntry &e)
Definition: logarchiver.h:105
ArchiverControl control
Definition: logarchiver.h:290
bool readWholeBlocks
Definition: logarchiver.h:296
run_number_t run
Definition: logarchiver.h:88
lsn_t getNextConsumedLSN()
Definition: logarchiver.h:254
HeapEntry(run_number_t run, PageID pid, lsn_t lsn, fixed_lists_mem_t::slot_t slot)
Definition: logarchiver.h:92
HeapEntry()
Definition: logarchiver.h:99
Definition: thread_wrapper.h:16
virtual ~MergerDaemon()
Definition: logarchiver.h:146
fixed_lists_mem_t::slot_t slot
Definition: logarchiver.h:84
void pop()
Definition: logarchiver.cpp:266
std::atomic< bool > shutdownFlag
Definition: logarchiver.h:288
bool filledFirst
Definition: logarchiver.h:77
Start-up parameters for the storage engine. See OPTIONS.
Definition: sm_options.h:24
Definition: sm_base.h:240
static const lsn_t null
Definition: lsn.h:371
logrec_t * top()
Definition: logarchiver.cpp:282
int32_t run_number_t
Definition: basics.h:50
Component that consumes a partially-sorted log record stream and generates indexed runs from it...
Definition: logarchive_writer.h:95
fixed_lists_mem_t::slot_t allocate(size_t length)
Definition: logarchiver.cpp:179
Encapsulates all file and I/O operations on the log archive.
Definition: logarchive_index.h:94
ArchiverHeap(size_t workspaceSize)
Definition: logarchiver.cpp:168
Represents a transactional log record.
Definition: logrec.h:143
lsn_t flushReqLSN
Definition: logarchiver.h:302
MergerDaemon * merger
Definition: logarchiver.h:286
Definition: logarchiver.h:83
uint32_t PageID
Definition: basics.h:45
std::shared_ptr< ArchiveIndex > indir
Definition: logarchiver.h:153
General-purpose heap.
Definition: w_heap.h:78
Definition: logarchiver.h:113
Log Sequence Number. See Log Sequence Numbers (LSN).
Definition: lsn.h:243
lsn_t nextActLSN
Definition: logarchiver.h:300
void setEager(bool e)
Definition: logarchiver.h:258
fixed_lists_mem_t * workspace
Definition: logarchiver.h:79
bool _compression
Definition: logarchiver.h:159
Return code for most functions and methods.
Definition: w_rc.h:87
std::shared_ptr< ArchiveIndex > index
Definition: logarchiver.h:278
Cmp heapCmp
Definition: logarchiver.h:117
ArchiverHeap * heap
Definition: logarchiver.h:282
Heap< HeapEntry, Cmp > w_heap
Definition: logarchiver.h:119
Object to control execution of background threads.
Definition: log_consumer.h:134
char * address
Definition: mem_mgmt.h:33
std::shared_ptr< ArchiveIndex > getIndex()
Definition: logarchiver.h:250
PageID pid
Definition: logarchiver.h:90
run_number_t currentRun
Definition: logarchiver.h:75
Definition: AtomicCounter.hpp:114
Definition: logarchiver.h:140
bool getEager() const
Definition: logarchiver.h:263
Implementation of a log archiver using asynchronous reader and writer threads.
Definition: logarchiver.h:225
unsigned _fanin
Definition: logarchiver.h:157
bool eager
Definition: logarchiver.h:294
Provides a record-at-a-time interface to the recovery log using asynchronous read operations...
Definition: log_consumer.h:215
std::shared_ptr< ArchiveIndex > outdir
Definition: logarchiver.h:155
Heap data structure that supports log archive run generation.
Definition: logarchiver.h:54
LogConsumer * consumer
Definition: logarchiver.h:280
bool push(logrec_t *lr, bool duplicate)
Definition: logarchiver.cpp:198
size_t length
Definition: mem_mgmt.h:35
run_number_t topRun()
Definition: logarchiver.h:66
Definition: mem_mgmt.h:30