Zero  0.1.0
logarchive_index.h
Go to the documentation of this file.
1 #ifndef __LOGARCHIVE_INDEX_H
2 #define __LOGARCHIVE_INDEX_H
3 
4 #include <vector>
5 #include <list>
6 #include <unordered_map>
7 
8 #define BOOST_FILESYSTEM_NO_DEPRECATED
9 
10 #include <boost/filesystem.hpp>
11 
12 namespace fs = boost::filesystem;
13 
14 #include "basics.h"
15 #include "latches.h"
16 #include "lsn.h"
17 #include "sm_options.h"
18 
19 class RunRecycler;
20 
21 struct RunId {
23 
25 
26  unsigned level;
27 
28  bool operator==(const RunId& other) const {
29  return beginLSN == other.beginLSN && endLSN == other.endLSN
30  && level == other.level;
31  }
32 };
33 
34 // Controls access to a single run file through mmap
35 struct RunFile {
37 
38  int fd;
39 
40  int refcount;
41 
42  char* data;
43 
44  size_t length;
45 
46  RunFile() : fd(-1),
47  refcount(0),
48  data(nullptr),
49  length(0) {}
50 
51  char* getOffset(off_t offset) const {
52  return data + offset;
53  }
54 };
55 
56 namespace std {
59  template<>
60  struct hash<RunId> {
62  using result_type = std::size_t;
63 
65  result_type const h1(std::hash<lsn_t>()(a.beginLSN));
66  result_type const h2(std::hash<lsn_t>()(a.endLSN));
67  result_type const h3(std::hash<unsigned>()(a.level));
68  return ((h1 ^ (h2 << 1)) >> 1) ^ (h3 << 1);
69  }
70  };
71 }
72 
94 class ArchiveIndex {
95 public:
96  ArchiveIndex(const sm_options& options);
97 
98  virtual ~ArchiveIndex();
99 
100  struct BlockEntry {
101  size_t offset;
103  };
104 
105  struct BlockHeader {
106  uint32_t entries;
107  uint32_t blockNumber;
108  };
109 
110  struct RunInfo {
112 
113  // lastLSN must be equal to firstLSN of the following run. We keep
114  // it redundantly so that index probes don't have to look beyond
115  // the last finished run. We used to keep a global lastLSN field in
116  // the index, but there can be a race between the writer thread
117  // inserting new runs and probes on the last finished, so it was
118  // removed.
120 
121  // Used as a filter to avoid unneccessary probes on older runs
123 
124  std::vector<BlockEntry> entries;
125 
126  bool operator<(const RunInfo& other) const {
127  return firstLSN < other.firstLSN;
128  }
129  };
130 
131  size_t getBlockSize() const {
132  return blockSize;
133  }
134 
135  std::string getArchDir() const {
136  return archdir;
137  }
138 
139  lsn_t getLastLSN();
140 
141  lsn_t getLastLSN(unsigned level);
142 
143  lsn_t getFirstLSN(unsigned level);
144 
145  // run generation methods
146  rc_t openNewRun(unsigned level);
147 
148  rc_t append(char* data, size_t length, unsigned level);
149 
150  rc_t closeCurrentRun(lsn_t runEndLSN, unsigned level, PageID maxPID = 0);
151 
152  // run scanning methods
153  RunFile* openForScan(const RunId& runid);
154 
155  void closeScan(const RunId& runid);
156 
157  rc_t readBlock(int fd, char* buf, size_t& offset, size_t readSize = 0);
158 
159  void listFiles(std::vector<std::string>& list, int level = -1);
160 
161  void listFileStats(std::list<RunId>& list, int level = -1);
162 
163  void deleteRuns(unsigned replicationFactor = 0);
164 
165  size_t getSkipLogrecSize() const;
166 
167  static bool parseRunFileName(string fname, RunId& fstats);
168 
169  static size_t getFileSize(int fd);
170 
171  void newBlock(const vector<pair<PageID, size_t>>& buckets, unsigned level);
172 
173  rc_t finishRun(lsn_t first, lsn_t last, PageID maxPID,
174  int fd, off_t offset, unsigned level);
175 
176  template<class Input>
177  void probe(std::vector<Input>&, PageID, PageID, lsn_t startLSN,
179 
180  void getBlockCounts(RunFile*, size_t* indexBlocks, size_t* dataBlocks);
181 
182  void loadRunInfo(RunFile*, const RunId&);
183 
184  void startNewRun(unsigned level);
185 
186  unsigned getMaxLevel() const {
187  return maxLevel;
188  }
189 
190  size_t getBucketSize() {
191  return bucketSize;
192  }
193 
194  size_t getRunCount(unsigned level) {
195  if (level > maxLevel) {
196  return 0;
197  }
198  return runs[level].size();
199  }
200 
201  void dumpIndex(ostream& out);
202 
203  void dumpIndex(ostream& out, const RunId& runid);
204 
205  template<class OutputIter>
206  void listRunsNonOverlapping(OutputIter out) {
207  auto level = maxLevel;
208  auto startLSN = lsn_t::null;
209 
210  // Start collecting runs on the max level, which has the largest runs
211  // and therefore requires the least random reads
212  while (level > 0) {
213  auto index = findRun(startLSN, level);
214 
215  while ((int)index <= lastFinished[level]) {
216  auto& run = runs[level][index];
217  out = RunId{run.firstLSN, run.lastLSN, level};
218  startLSN = run.lastLSN;
219  index++;
220  }
221 
222  level--;
223  }
224  }
225 
226 private:
227 
228  void appendNewRun(unsigned level);
229 
230  size_t findRun(lsn_t lsn, unsigned level);
231 
232  // binary search
233  size_t findEntry(RunInfo* run, PageID pid,
234  int from = -1, int to = -1);
235 
236  rc_t serializeRunInfo(RunInfo&, int fd, off_t);
237 
238  lsn_t roundToEndLSN(lsn_t lsn, unsigned level);
239 
240 private:
241  std::string archdir;
242 
243  std::vector<int> appendFd;
244 
245  std::vector<off_t> appendPos;
246 
247  size_t blockSize;
248 
249  fs::path archpath;
250 
251  // Run information for each level of the index
252  std::vector<std::vector<RunInfo>> runs;
253 
254  // Last finished run on each level -- this is required because runs are
255  // generated asynchronously, so that a new one may be appended to the
256  // index before the last one is finished. Thus, when calling finishRun(),
257  // we cannot simply take the last run in the vector.
258  std::vector<int> lastFinished;
259 
268  size_t bucketSize;
269 
270  unsigned maxLevel;
271 
272  std::unique_ptr<RunRecycler> runRecycler;
273 
274  mutable srwlock_t _mutex;
275 
277  std::unordered_map<RunId, RunFile> _open_files;
278 
280 
281  bool directIO;
282 
283  fs::path make_run_path(lsn_t begin, lsn_t end, unsigned level = 1) const;
284 
285  fs::path make_current_run_path(unsigned level) const;
286 
287 public:
288  const static string RUN_PREFIX;
289 
290  const static string CURR_RUN_PREFIX;
291 
292  const static string run_regex;
293 
294  const static string current_regex;
295 };
296 
297 template<class Input>
298 void ArchiveIndex::probe(std::vector<Input>& inputs,
299  PageID startPID, PageID endPID, lsn_t startLSN, lsn_t endLSN) {
300  spinlock_read_critical_section cs(&_mutex);
301 
302  Input input;
303  input.endPID = endPID;
304  unsigned level = maxLevel;
305  inputs.clear();
306 
307  while (level > 0) {
308  size_t index = findRun(startLSN, level);
309 
310  while ((int)index <= lastFinished[level]) {
311  auto& run = runs[level][index];
312  index++;
313  startLSN = run.lastLSN;
314 
315  if (!endLSN.is_null() && startLSN >= endLSN) {
316  return;
317  }
318 
319  if (startPID > run.maxPID) {
320  continue;
321  }
322 
323  if (run.entries.size() > 0) {
324  size_t entryBegin = findEntry(&run, startPID);
325 
326  // CS TODO this if could just be run.entris[entryBEgin].pid >= endPID
327  if (bucketSize == 1 && startPID == endPID - 1 &&
328  run.entries[entryBegin].pid != startPID) {
329  // With bucket size one, we know precisely which PIDs are contained
330  // in this run, so what we have is a filter with 100% precision
331  continue;
332  }
333 
334  input.pos = run.entries[entryBegin].offset;
335  input.runFile =
336  openForScan(RunId{run.firstLSN, run.lastLSN, level});
337  w_assert1(input.pos < input.runFile->length);
338  inputs.push_back(input);
339  }
340  }
341 
342  level--;
343  }
344 }
345 
346 #endif // __LOGARCHIVE_INDEX_H
unsigned maxLevel
Definition: logarchive_index.h:270
PageID maxPID
Definition: logarchive_index.h:122
static const string RUN_PREFIX
Definition: logarchive_index.h:288
static const string run_regex
Definition: logarchive_index.h:292
size_t getRunCount(unsigned level)
Definition: logarchive_index.h:194
size_t blockSize
Definition: logarchive_index.h:247
unsigned level
Definition: logarchive_index.h:26
RunFile()
Definition: logarchive_index.h:46
size_t length
Definition: logarchive_index.h:44
static const string current_regex
Definition: logarchive_index.h:294
#define w_assert1(x)
Level 1 should not add significant extra time.
Definition: w_base.h:198
unsigned getMaxLevel() const
Definition: logarchive_index.h:186
std::string getArchDir() const
Definition: logarchive_index.h:135
std::size_t result_type
Definition: logarchive_index.h:62
STL namespace.
char * getOffset(off_t offset) const
Definition: logarchive_index.h:51
int fd
Definition: logarchive_index.h:38
std::vector< BlockEntry > entries
Definition: logarchive_index.h:124
Definition: latches.h:447
lsn_t lastLSN
Definition: logarchive_index.h:119
Start-up parameters for the storage engine. See OPTIONS.
Definition: sm_options.h:24
std::unordered_map< RunId, RunFile > _open_files
Cache for open files (for scans only)
Definition: logarchive_index.h:277
static const lsn_t null
Definition: lsn.h:371
lsn_t endLSN
Definition: logarchive_index.h:24
Definition: logarchive_index.cpp:20
Encapsulates all file and I/O operations on the log archive.
Definition: logarchive_index.h:94
std::string archdir
Definition: logarchive_index.h:241
Definition: lsn.h:422
lsn_t beginLSN
Definition: logarchive_index.h:22
std::vector< int > lastFinished
Definition: logarchive_index.h:258
uint32_t PageID
Definition: basics.h:45
uint32_t blockNumber
Definition: logarchive_index.h:107
PageID pid
Definition: logarchive_index.h:102
std::vector< off_t > appendPos
Definition: logarchive_index.h:245
void listRunsNonOverlapping(OutputIter out)
Definition: logarchive_index.h:206
RunId runid
Definition: logarchive_index.h:36
Log Sequence Number. See Log Sequence Numbers (LSN).
Definition: lsn.h:243
void probe(std::vector< Input > &, PageID, PageID, lsn_t startLSN, lsn_t endLSN=lsn_t::null)
Definition: logarchive_index.h:298
std::vector< std::vector< RunInfo > > runs
Definition: logarchive_index.h:252
size_t getBlockSize() const
Definition: logarchive_index.h:131
bool directIO
Definition: logarchive_index.h:281
lsn_t firstLSN
Definition: logarchive_index.h:111
size_t offset
Definition: logarchive_index.h:101
srwlock_t _open_file_mutex
Definition: logarchive_index.h:279
Return code for most functions and methods.
Definition: w_rc.h:87
Definition: logarchive_index.h:21
uint32_t entries
Definition: logarchive_index.h:106
srwlock_t _mutex
Definition: logarchive_index.h:274
bool operator==(const RunId &other) const
Definition: logarchive_index.h:28
Definition: logarchive_index.h:35
std::vector< int > appendFd
Definition: logarchive_index.h:243
char * data
Definition: logarchive_index.h:42
int refcount
Definition: logarchive_index.h:40
bool operator<(const RunInfo &other) const
Definition: logarchive_index.h:126
bool is_null() const
Definition: lsn.h:357
Definition: logarchive_index.h:110
static const string CURR_RUN_PREFIX
Definition: logarchive_index.h:290
result_type operator()(argument_type const &a) const
Definition: logarchive_index.h:64
size_t bucketSize
Definition: logarchive_index.h:268
Definition: logarchive_index.h:100
fs::path archpath
Definition: logarchive_index.h:249
Definition: logarchive_index.h:105
Shore read-write lock:: many-reader/one-writer spin lock.
Definition: latches.h:350
std::unique_ptr< RunRecycler > runRecycler
Definition: logarchive_index.h:272
const auto & parseRunFileName
Definition: scanner.cpp:13
size_t getBucketSize()
Definition: logarchive_index.h:190