Zero  0.1.0
restore.h
Go to the documentation of this file.
1 #include "w_defines.h"
2 
3 #ifndef __RESTORE_H
4 #define __RESTORE_H
5 
6 #include "worker_thread.h"
7 #include "sm_base.h"
8 #include "logarchive_scanner.h"
9 
10 #include <queue>
11 #include <map>
12 #include <vector>
13 #include <unordered_map>
14 #include <atomic>
15 #include <mutex>
16 #include <chrono>
17 #include <condition_variable>
18 
19 class sm_options;
20 class RestoreBitmap;
21 class ArchiveIndex;
22 
32 public:
33 
/// Restore progress of one segment.
/// attempt_restore() moves a segment from UNRESTORED to RESTORING;
/// mark_restored() is expected to move it on to RESTORED.
enum class State {
    UNRESTORED = 0, ///< not yet claimed by any restorer
    RESTORING  = 1, ///< claimed; restore in progress
    RESTORED   = 2  ///< restore finished
};
39 
40  RestoreBitmap(size_t size)
41  : _size(size) {
42  states = new std::atomic<State>[size];
43  for (size_t i = 0; i < size; i++) {
45  }
46  }
47 
49  delete[] states;
50  }
51 
52  size_t get_size() {
53  return _size;
54  }
55 
56  bool is_unrestored(unsigned i) const {
57  return states[i] == State::UNRESTORED;
58  }
59 
60  bool is_restoring(unsigned i) const {
61  return states[i] == State::RESTORING;
62  }
63 
64  bool is_restored(unsigned i) const {
65  return states[i] == State::RESTORED;
66  }
67 
68  bool attempt_restore(unsigned i) {
69  auto expected = State::UNRESTORED;
70  return states[i].compare_exchange_strong(expected, State::RESTORING);
71  }
72 
73  void mark_restored(unsigned i) {
76  }
77 
78  unsigned get_first_unrestored() const {
79  for (unsigned i = 0; i < _size; i++) {
80  if (states[i] == State::UNRESTORED) {
81  return i;
82  }
83  }
84  return _size;
85  }
86 
87  unsigned get_first_restoring() const {
88  for (unsigned i = 0; i < _size; i++) {
89  if (states[i] == State::RESTORING) {
90  return i;
91  }
92  }
93  return _size;
94  }
95 
96  // TODO: implement these to checkpoint bitmap state
97  // void serialize(char* buf, size_t from, size_t to);
98  // void deserialize(char* buf, size_t from, size_t to);
99 
106  // void getBoundaries(size_t& lowestFalse, size_t& highestTrue);
107 
108 protected:
109  std::atomic<State>* states;  // array of _size entries, one per segment; allocated with new[] in the ctor, freed with delete[] in the dtor. NOTE(review): no deleted copy operations are visible here -- copying a bitmap would double-delete[]
110 
111  const size_t _size;  // number of entries in states (segment count)
112 };
113 
116 template<typename RestoreFunctor>
118 public:
119 
120  RestoreCoordinator(size_t segSize, size_t segCount, RestoreFunctor f,
121  bool virgin_pages, bool on_demand = true, bool start_locked = false)
122  : _segment_size{segSize},
123  _bitmap{new RestoreBitmap{segCount}},
124  _restoreFunctor{f},
125  _virgin_pages{virgin_pages},
126  _on_demand(on_demand),
127  _start_locked(start_locked),
128  _begin_lsn(lsn_t::null),
129  _end_lsn(lsn_t::null) {
130  if (_start_locked) {
131  _mutex.lock();
132  }
133  }
134 
135  void set_lsns(lsn_t begin, lsn_t end) {
136  _begin_lsn = begin;
137  _end_lsn = end;
138  }
139 
140  void fetch(PageID pid) {
141  using namespace std::chrono_literals;
142 
143  auto segment = pid / _segment_size;
144  if (segment >= _bitmap->get_size() || _bitmap->is_restored(segment)) {
145  return;
146  }
147 
148  std::unique_lock<std::mutex> lck{_mutex};
149 
150  // check again in critical section
151  if (_bitmap->is_restored(segment)) {
152  return;
153  }
154 
155  // Segment not restored yet: we must attempt to restore it ourselves or
156  // wait on a ticket if it's already being restored
157  auto ticket = getWaitingTicket(segment);
158 
159  if (_on_demand && _bitmap->attempt_restore(segment)) {
160  lck.unlock();
161  doRestore(segment, segment + 1, ticket);
162  } else {
163  constexpr auto sleep_time = 10ms;
164  auto pred = [this, segment] {
165  return _bitmap->is_restored(segment);
166  };
167  while (!pred()) {
168  ticket->wait_for(lck, sleep_time, pred);
169  }
170  }
171  }
172 
173  bool tryBackgroundRestore(bool& done) {
174  done = false;
175 
176  // If no restore requests are pending, restore the first
177  // not-yet-restored segment.
178  if (_on_demand && !_waiting_table.empty()) {
179  return false;
180  }
181 
182  std::unique_lock<std::mutex> lck{_mutex};
183  auto segment_begin = _bitmap->get_first_unrestored();
184 
185  if (segment_begin == _bitmap->get_size()) {
186  // All segments in either "restoring" or "restored" state
187  done = true;
188  return false;
189  }
190 
191  // Try to restore multiple segments with a single call
192  size_t restore_size = 0;
193  unsigned segment_end = segment_begin;
194  while (true) {
195  if (!_bitmap->is_unrestored(segment_end)) {
196  break;
197  }
198 
199  getWaitingTicket(segment_end);
200  if (_bitmap->attempt_restore(segment_end)) {
201  segment_end++;
202  restore_size += _segment_size;
203  } else {
204  break;
205  }
206 
207  if (restore_size > MaxRestorePages - _segment_size) {
208  break;
209  }
210  }
211 
212  if (segment_end > segment_begin) {
213  lck.unlock();
214  // ticket is ignored here -- threads just wait for timeout
215  ERROUT(<< "background restore: " << segment_begin << " - " <<
216  segment_end);
217  doRestore(segment_begin, segment_end, nullptr);
218  return true;
219  }
220 
221  return false;
222  }
223 
224  bool isPidRestored(PageID pid) const {
225  auto segment = pid / _segment_size;
226  return segment >= _bitmap->get_size() || _bitmap->is_restored(segment);
227  }
228 
229  bool allDone() const {
230  return _bitmap->get_first_restoring() >= _bitmap->get_size();
231  }
232 
233  void start() {
234  if (_start_locked) {
235  _mutex.unlock();
236  }
237  }
238 
239 private:
240  using Ticket = std::shared_ptr<std::condition_variable>;  // shared so a waiter's copy outlives its removal from _waiting_table
241 
242  const size_t _segment_size;  // pages per segment (PageID / _segment_size = segment index)
243 
244  std::mutex _mutex;  // guards _waiting_table; segments are claimed (attempt_restore) while holding it
245 
246  std::unordered_map<unsigned, Ticket> _waiting_table;  // segment -> ticket for segments being restored or awaited; entries are erased by doRestore()
247 
248  std::unique_ptr<RestoreBitmap> _bitmap;  // per-segment restore state
249 
250  RestoreFunctor _restoreFunctor;  // performs the actual restore of a segment range in doRestore()
251 
252  const bool _virgin_pages;  // forwarded unchanged to _restoreFunctor
253 
254  const bool _on_demand;  // if true, fetch() restores missing segments itself and background restore yields to pending tickets
255 
256  // If set, the constructor locks _mutex and start() unlocks it; this is used to make threads wait for the log archiver to reach a certain LSN
257  const bool _start_locked;
258 
260 
262 
263  // Not customizable for now (should be at most IOV_MAX, which is 1024)
264  static constexpr size_t MaxRestorePages = 1024;  // upper bound, in pages, on one background doRestore() call
265 
266  Ticket getWaitingTicket(unsigned segment) {
267  // caller must hold mutex
268  auto it = _waiting_table.find(segment);
269  if (it == _waiting_table.end()) {
270  auto ticket = make_shared<std::condition_variable>();
271  _waiting_table[segment] = ticket;
272  w_assert0(_bitmap->is_unrestored(segment));
273  return ticket;
274  } else {
275  return it->second;
276  }
277  }
278 
279  void doRestore(unsigned segment_begin, unsigned segment_end, Ticket ticket) {
280  _restoreFunctor(segment_begin, segment_end, _segment_size,
281  _virgin_pages, _begin_lsn, _end_lsn);
282 
283  for (auto s = segment_begin; s < segment_end; s++) {
284  _bitmap->mark_restored(s);
285  }
286 
287  // If multiple segments given, no notify is sent -- rely on timeout
288  if (ticket) {
289  ticket->notify_all();
290  }
291 
292  std::unique_lock<std::mutex> lck{_mutex};
293  for (auto s = segment_begin; s < segment_end; s++) {
294  bool erased = _waiting_table.erase(s);
295  w_assert0(erased);
296  }
297  }
298 };
299 
/// Stateless holder for replay(), which presumably applies the log records
/// from @p logs to the pages in [pagesBegin, pagesEnd). Only the declaration
/// is in this header; the definition lives elsewhere.
struct LogReplayer
{
    template<typename LogScan, typename PageIter>
    static void replay(LogScan logs, PageIter& pagesBegin, PageIter pagesEnd);
};
304 
306  static void bf_restore(unsigned segment_begin, unsigned segment_end,
307  size_t segment_size, bool virgin_pages, lsn_t begin_lsn, lsn_t end_lsn);
308 };
309 
311 template<class Coordinator, class OnDoneCallback>
313 public:
314  BackgroundRestorer(std::shared_ptr<Coordinator> coord, OnDoneCallback callback)
315  : _coord(coord),
316  _notify_done(callback) {}
317 
318  virtual void do_work() {
319  using namespace std::chrono_literals;
320 
321  bool no_segments_left = false;
322  bool restored_last = false;
323 
324  auto do_sleep = [] {
325  constexpr auto sleep_time = 100ms;
326  std::this_thread::sleep_for(sleep_time);
327  };
328 
329  while (true) {
330  if (!restored_last) {
331  do_sleep();
332  }
333  restored_last = _coord->tryBackgroundRestore(no_segments_left);
334 
335  if (no_segments_left || should_exit()) {
336  break;
337  }
338  }
339 
340  while (!should_exit() && no_segments_left && !_coord->allDone()) {
341  do_sleep();
342  }
343 
344  if (_coord->allDone()) {
345  _notify_done();
346  }
347 
348  _coord = nullptr;
349  quit();
350  }
351 
352 private:
353  std::shared_ptr<Coordinator> _coord;  // coordinator being driven; reset to nullptr at the end of do_work()
354 
355  OnDoneCallback _notify_done;  // completion callback invoked from do_work()
356 };
357 
358 #endif // __RESTORE_H
Definition: restore.h:305
const size_t _segment_size
Definition: restore.h:242
void start()
Definition: restore.h:233
Bitmap data structure that controls the progress of restore.
Definition: restore.h:31
Definition: worker_thread.h:12
#define w_assert1(x)
Level 1 should not add significant extra time.
Definition: w_base.h:198
std::atomic< State > * states
Definition: restore.h:109
OnDoneCallback _notify_done
Definition: restore.h:355
bool attempt_restore(unsigned i)
Definition: restore.h:68
std::unordered_map< unsigned, Ticket > _waiting_table
Definition: restore.h:246
Definition: restore.h:312
lsn_t _begin_lsn
Definition: restore.h:259
bool is_restoring(unsigned i) const
Definition: restore.h:60
const bool _on_demand
Definition: restore.h:254
bool tryBackgroundRestore(bool &done)
Definition: restore.h:173
RestoreFunctor _restoreFunctor
Definition: restore.h:250
Coordinator that synchronizes multi-threaded decentralized restore.
Definition: restore.h:117
Start-up parameters for the storage engine. See OPTIONS.
Definition: sm_options.h:24
bool is_unrestored(unsigned i) const
Definition: restore.h:56
static const lsn_t null
Definition: lsn.h:371
lsn_t _end_lsn
Definition: restore.h:261
std::unique_ptr< RestoreBitmap > _bitmap
Definition: restore.h:248
Encapsulates all file and I/O operations on the log archive.
Definition: logarchive_index.h:94
bool is_restored(unsigned i) const
Definition: restore.h:64
void doRestore(unsigned segment_begin, unsigned segment_end, Ticket ticket)
Definition: restore.h:279
std::mutex _mutex
Definition: restore.h:244
BackgroundRestorer(std::shared_ptr< Coordinator > coord, OnDoneCallback callback)
Definition: restore.h:314
void mark_restored(unsigned i)
Definition: restore.h:73
uint32_t PageID
Definition: basics.h:45
bool allDone() const
Definition: restore.h:229
#define w_assert0(x)
Default assert/debug level is 0.
Definition: w_base.h:175
Log Sequence Number. See Log Sequence Numbers (LSN).
Definition: lsn.h:243
void set_lsns(lsn_t begin, lsn_t end)
Definition: restore.h:135
void fetch(PageID pid)
Definition: restore.h:140
Ticket getWaitingTicket(unsigned segment)
Definition: restore.h:266
const bool _start_locked
Definition: restore.h:257
#define ERROUT(a)
Definition: w_debug.h:175
size_t get_size()
Definition: restore.h:52
const size_t _size
Definition: restore.h:111
std::shared_ptr< Coordinator > _coord
Definition: restore.h:353
State
Definition: restore.h:34
unsigned get_first_restoring() const
Definition: restore.h:87
Definition: restore.h:300
const bool _virgin_pages
Definition: restore.h:252
RestoreCoordinator(size_t segSize, size_t segCount, RestoreFunctor f, bool virgin_pages, bool on_demand=true, bool start_locked=false)
Definition: restore.h:120
std::shared_ptr< std::condition_variable > Ticket
Definition: restore.h:240
virtual void do_work()
Definition: restore.h:318
~RestoreBitmap()
Definition: restore.h:48
RestoreBitmap(size_t size)
Definition: restore.h:40
bool isPidRestored(PageID pid) const
Definition: restore.h:224
unsigned get_first_unrestored() const
Definition: restore.h:78