Zero  0.1.0
log_consumer.h
Go to the documentation of this file.
1 #ifndef __LOG_CONSUMER_H
2 #define __LOG_CONSUMER_H
3 
4 #include <bitset>
5 
6 #include "worker_thread.h"
7 #include "ringbuffer.h"
8 #include "log_storage.h"
9 
33 class LogScanner {
34 public:
35  bool nextLogrec(char* src, size_t& pos, logrec_t*& lr,
36  lsn_t* nextLSN = nullptr, lsn_t* stopLSN = nullptr,
37  int* lrLength = nullptr);
38 
39  bool hasPartialLogrec();
40 
41  void reset();
42 
43  LogScanner(size_t blockSize)
44  : truncCopied(0),
45  truncMissing(0),
46  toSkip(0),
47  blockSize(blockSize) {
48  // maximum logrec size = 3 pages
49  truncBuf = new char[3 * log_storage::BLOCK_SIZE];
50  }
51 
52  ~LogScanner() {
53  delete[] truncBuf;
54  }
55 
56  size_t getBlockSize() {
57  return blockSize;
58  }
59 
60  void setIgnore(logrec_t::kind_t type) {
61  ignore.set(type);
62  }
63 
64  void ignoreAll() {
65  ignore.set();
66  }
67 
68  void unsetIgnore(logrec_t::kind_t type) {
69  ignore.reset(type);
70  }
71 
72  bool isIgnored(logrec_t::kind_t type) {
73  return ignore[type];
74  }
75 
76 private:
77  size_t truncCopied;
78 
79  size_t truncMissing;
80 
81  size_t toSkip;
82 
83  const size_t blockSize;
84 
85  char* truncBuf;
86 
87  bitset<logrec_t::t_max_logrec> ignore;
88 };
89 
134 struct ArchiverControl {
135  pthread_mutex_t mutex;
136 
137  pthread_cond_t activateCond;
138 
139  lsn_t endLSN;
140 
141  bool activated;
142 
143  bool listening;
144 
145  std::atomic<bool>* shutdownFlag;
146 
147  ArchiverControl(std::atomic<bool>* shutdown);
148 
149  ~ArchiverControl();
150 
151  bool activate(bool wait, lsn_t lsn = lsn_t::null);
152 
153  bool waitForActivation();
154 };
155 
173 protected:
174  uint nextPartition;
175 
176  rc_t openPartition();
177 
178  AsyncRingBuffer* buf;
179 
180  int currentFd;
181 
182  off_t pos;
183 
184  lsn_t localEndLSN;
185 
186 public:
187  virtual void do_work();
188 
189  ReaderThread(AsyncRingBuffer* readbuf, lsn_t startLSN);
190 
191  virtual ~ReaderThread() {}
192 
193  size_t getBlockSize() {
194  return buf->getBlockSize();
195  }
196 };
197 
215 class LogConsumer {
216 public:
217  LogConsumer(lsn_t startLSN, size_t blockSize, bool ignore = true);
218 
219  virtual ~LogConsumer();
220 
221  void shutdown();
222 
223  void open(lsn_t endLSN, bool readWholeBlocks = false);
224 
225  bool next(logrec_t*& lr);
226 
227  lsn_t getNextLSN() {
228  return nextLSN;
229  }
230 
231  static void initLogScanner(LogScanner* logScanner);
232 
233 private:
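234  AsyncRingBuffer* readbuf;
235 
236  ReaderThread* reader;
237 
238  LogScanner* logScanner;
239 
240  lsn_t nextLSN;
241 
242  lsn_t endLSN;
243 
244  char* currentBlock;
245 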
246  size_t blockSize;
247 
248  size_t pos;
249 
250  bool readWholeBlocks;
251 
252  bool nextBlock();
253 };
254 
255 #endif // __LOG_CONSUMER_H
size_t pos
Definition: log_consumer.h:248
Parses log records from a stream of binary data.
Definition: log_consumer.h:33
Definition: ringbuffer.h:38
Definition: log_storage.h:160
bool isIgnored(logrec_t::kind_t type)
Definition: log_consumer.h:72
void setIgnore(logrec_t::kind_t type)
Definition: log_consumer.h:60
size_t getBlockSize()
Definition: log_consumer.h:56
bool readWholeBlocks
Definition: log_consumer.h:250
ReaderThread * reader
Definition: log_consumer.h:236
LogScanner(size_t blockSize)
Definition: log_consumer.h:43
size_t truncCopied
Definition: log_consumer.h:77
std::atomic< bool > * shutdownFlag
Definition: log_consumer.h:145
size_t blockSize
Definition: log_consumer.h:246
bool listening
Definition: log_consumer.h:143
size_t toSkip
Definition: log_consumer.h:81
static const lsn_t null
Definition: lsn.h:371
uint nextPartition
Definition: log_consumer.h:174
pthread_mutex_t mutex
Definition: log_consumer.h:135
Represents a transactional log record.
Definition: logrec.h:143
char * currentBlock
Definition: log_consumer.h:244
void unsetIgnore(logrec_t::kind_t type)
Definition: log_consumer.h:68
int currentFd
Definition: log_consumer.h:180
virtual ~ReaderThread()
Definition: log_consumer.h:191
Log Sequence Number. See Log Sequence Numbers (LSN).
Definition: lsn.h:243
bool nextLogrec(char *src, size_t &pos, logrec_t *&lr, lsn_t *nextLSN=nullptr, lsn_t *stopLSN=nullptr, int *lrLength=nullptr)
Definition: log_consumer.cpp:401
off_t pos
Definition: log_consumer.h:182
lsn_t nextLSN
Definition: log_consumer.h:240
lsn_t endLSN
Definition: log_consumer.h:242
void ignoreAll()
Definition: log_consumer.h:64
bool hasPartialLogrec()
Definition: log_consumer.cpp:384
Return code for most functions and methods.
Definition: w_rc.h:87
LogScanner * logScanner
Definition: log_consumer.h:238
bool activated
Definition: log_consumer.h:141
Object to control execution of background threads.
Definition: log_consumer.h:134
const size_t blockSize
Definition: log_consumer.h:83
lsn_t getNextLSN()
Definition: log_consumer.h:227
~LogScanner()
Definition: log_consumer.h:52
void reset()
Definition: log_consumer.cpp:388
Asynchronous reader thread for the recovery log.
Definition: log_consumer.h:172
Provides a record-at-a-time interface to the recovery log using asynchronous read operations...
Definition: log_consumer.h:215
lsn_t localEndLSN
Definition: log_consumer.h:184
pthread_cond_t activateCond
Definition: log_consumer.h:137
AsyncRingBuffer * buf
Definition: log_consumer.h:178
size_t truncMissing
Definition: log_consumer.h:79
size_t getBlockSize()
Definition: log_consumer.h:193
AsyncRingBuffer * readbuf
Definition: log_consumer.h:234
size_t getBlockSize()
Definition: ringbuffer.h:56
bitset< logrec_t::t_max_logrec > ignore
Definition: log_consumer.h:87
kind_t
Definition: logrec.h:149
lsn_t endLSN
Definition: log_consumer.h:139
char * truncBuf
Definition: log_consumer.h:85
Definition: worker_thread.h:110