quill
UnboundedSPSCQueue.h
1 
7 #pragma once
8 
9 #include "quill/core/Attributes.h"
10 #include "quill/core/BoundedSPSCQueue.h"
11 #include "quill/core/Common.h"
12 #include "quill/core/QuillError.h"
13 
14 #include <atomic>
15 #include <cassert>
16 #include <cstddef>
17 #include <string>
18 
19 QUILL_BEGIN_NAMESPACE
20 
21 namespace detail
22 {
38 {
39 private:
43  struct Node
44  {
50  explicit Node(size_t bounded_queue_capacity, HugePagesPolicy huge_pages_policy)
51  : bounded_queue(bounded_queue_capacity, huge_pages_policy)
52  {
53  }
54 
56  std::atomic<Node*> next{nullptr};
57  BoundedSPSCQueue bounded_queue;
58  };
59 
60 public:
/**
 * Value returned by prepare_read().
 * The capacity fields and the allocation flag keep their defaults unless the
 * consumer switched to a newer bounded queue while serving the read.
 */
struct ReadResult
{
  /**
   * @param read_position position to read from, or nullptr when nothing is available
   */
  explicit ReadResult(std::byte* read_position) : read_pos{read_position} {}

  /** Position to read from, nullptr when nothing can be read */
  std::byte* read_pos;

  /** Capacity of the queue that was left behind, 0 when no switch happened */
  size_t previous_capacity = 0;

  /** Capacity of the queue that was switched to, 0 when no switch happened */
  size_t new_capacity = 0;

  /** True when the consumer switched to a newer queue during this read */
  bool allocation = false;
};
70 
74  UnboundedSPSCQueue(size_t initial_bounded_queue_capacity, size_t max_capacity,
75  HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never)
76  : _max_capacity(max_capacity),
77  _producer(new Node(initial_bounded_queue_capacity, huge_pages_policy)),
78  _consumer(_producer)
79  {
80  }
81 
/**
 * Deleted copy operations: the queue is not copyable.
 */
UnboundedSPSCQueue(UnboundedSPSCQueue const&) = delete;
UnboundedSPSCQueue& operator=(UnboundedSPSCQueue const&) = delete;
87 
92  {
93  // Get the current consumer node
94  Node const* current_node = _consumer;
95 
96  // Look for extra nodes to delete
97  while (current_node != nullptr)
98  {
99  auto const to_delete = current_node;
100  current_node = current_node->next;
101  delete to_delete;
102  }
103  }
104 
110  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(size_t nbytes)
111  {
112  // Try to reserve the bounded queue
113  std::byte* write_pos = _producer->bounded_queue.prepare_write(nbytes);
114 
115  if (QUILL_LIKELY(write_pos != nullptr))
116  {
117  return write_pos;
118  }
119 
120  return _handle_full_queue(nbytes);
121  }
122 
127  QUILL_ATTRIBUTE_HOT void finish_write(size_t nbytes) noexcept
128  {
129  _producer->bounded_queue.finish_write(nbytes);
130  }
131 
135  QUILL_ATTRIBUTE_HOT void commit_write() noexcept { _producer->bounded_queue.commit_write(); }
136 
140  QUILL_ATTRIBUTE_HOT void finish_and_commit_write(size_t nbytes) noexcept
141  {
142  finish_write(nbytes);
143  commit_write();
144  }
145 
151  QUILL_NODISCARD size_t producer_capacity() const noexcept
152  {
153  return _producer->bounded_queue.capacity();
154  }
155 
161  void shrink(size_t capacity)
162  {
163  if (capacity > (_producer->bounded_queue.capacity() >> 1))
164  {
165  // We should only shrink if the new capacity is less or at least equal to the previous_power_of_2
166  return;
167  }
168 
169  // We want to shrink the queue, we will create a new queue with a smaller size
170  // the consumer will switch to the newer queue after emptying and deallocating the older queue
171  auto const next_node = new Node{capacity, _producer->bounded_queue.huge_pages_policy()};
172 
173  // store the new node pointer as next in the current node
174  _producer->next.store(next_node, std::memory_order_release);
175 
176  // producer is now using the next node
177  _producer = next_node;
178  }
179 
185  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ReadResult prepare_read()
186  {
187  ReadResult read_result{_consumer->bounded_queue.prepare_read()};
188 
189  if (read_result.read_pos != nullptr)
190  {
191  return read_result;
192  }
193 
194  // the buffer is empty check if another buffer exists
195  Node* const next_node = _consumer->next.load(std::memory_order_acquire);
196 
197  if (next_node)
198  {
199  return _read_next_queue(next_node);
200  }
201 
202  // Queue is empty and no new queue exists
203  return read_result;
204  }
205 
210  QUILL_ATTRIBUTE_HOT void finish_read(size_t nbytes) noexcept
211  {
212  _consumer->bounded_queue.finish_read(nbytes);
213  }
214 
218  QUILL_ATTRIBUTE_HOT void commit_read() noexcept { _consumer->bounded_queue.commit_read(); }
219 
225  QUILL_NODISCARD size_t capacity() const noexcept { return _consumer->bounded_queue.capacity(); }
226 
232  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
233  {
234  return _consumer->bounded_queue.empty() && (_consumer->next.load(std::memory_order_relaxed) == nullptr);
235  }
236 
237 private:
238  /***/
239  QUILL_NODISCARD std::byte* _handle_full_queue(size_t nbytes)
240  {
241  // Then it means the queue doesn't have enough size
242  size_t capacity = _producer->bounded_queue.capacity() * 2ull;
243  while (capacity < nbytes)
244  {
245  capacity = capacity * 2ull;
246  }
247 
248  if (QUILL_UNLIKELY(capacity > _max_capacity))
249  {
250  if (nbytes > _max_capacity)
251  {
252  QUILL_THROW(
253  QuillError{"Logging single messages larger than the configured maximum queue capacity "
254  "is not possible.\n"
255  "To log single messages exceeding this limit, consider increasing "
256  "FrontendOptions::unbounded_queue_max_capacity.\n"
257  "Message size: " +
258  std::to_string(nbytes) +
259  " bytes\n"
260  "Required queue capacity: " +
261  std::to_string(capacity) +
262  " bytes\n"
263  "Configured maximum queue capacity: " +
264  std::to_string(_max_capacity) + " bytes"});
265  }
266 
267  // we reached the unbounded_queue_max_capacity we won't be allocating more
268  // instead return nullptr to block or drop
269  return nullptr;
270  }
271 
272  // commit previous write to the old queue before switching
273  _producer->bounded_queue.commit_write();
274 
275  // We failed to reserve because the queue was full, create a new node with a new queue
276  auto const next_node = new Node{capacity, _producer->bounded_queue.huge_pages_policy()};
277 
278  // store the new node pointer as next in the current node
279  _producer->next.store(next_node, std::memory_order_release);
280 
281  // producer is now using the next node
282  _producer = next_node;
283 
284  // reserve again, this time we know we will always succeed, cast to void* to ignore
285  std::byte* const write_pos = _producer->bounded_queue.prepare_write(nbytes);
286 
287  assert(write_pos && "write_pos is nullptr");
288 
289  return write_pos;
290  }
291 
292  /***/
293  QUILL_NODISCARD ReadResult _read_next_queue(Node* next_node)
294  {
295  // a new buffer was added by the producer, this happens only when we have allocated a new queue
296 
297  // try the existing buffer once more
298  ReadResult read_result{_consumer->bounded_queue.prepare_read()};
299 
300  if (read_result.read_pos)
301  {
302  return read_result;
303  }
304 
305  // Switch to the new buffer for reading
306  // commit the previous reads before deleting the queue
307  _consumer->bounded_queue.commit_read();
308 
309  // switch to the new buffer, existing one is deleted
310  auto const previous_capacity = _consumer->bounded_queue.capacity();
311  delete _consumer;
312 
313  _consumer = next_node;
314  read_result.read_pos = _consumer->bounded_queue.prepare_read();
315 
316  // we switched to a new here, so we store the capacity info to return it
317  read_result.allocation = true;
318  read_result.new_capacity = _consumer->bounded_queue.capacity();
319  read_result.previous_capacity = previous_capacity;
320 
321  return read_result;
322  }
323 
324 private:
325  size_t _max_capacity;
326 
328  alignas(QUILL_CACHE_LINE_ALIGNED) Node* _producer{nullptr};
329  alignas(QUILL_CACHE_LINE_ALIGNED) Node* _consumer{nullptr};
330 };
331 
332 } // namespace detail
333 
334 QUILL_END_NAMESPACE
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:37
~UnboundedSPSCQueue()
Destructor.
Definition: UnboundedSPSCQueue.h:91
Definition: UnboundedSPSCQueue.h:61
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte * prepare_write(size_t nbytes)
Reserve contiguous space for the producer without making it visible to the consumer.
Definition: UnboundedSPSCQueue.h:110
QUILL_NODISCARD size_t capacity() const noexcept
Return the current buffer's capacity.
Definition: UnboundedSPSCQueue.h:225
UnboundedSPSCQueue(size_t initial_bounded_queue_capacity, size_t max_capacity, HugePagesPolicy huge_pages_policy=quill::HugePagesPolicy::Never)
Constructor.
Definition: UnboundedSPSCQueue.h:74
QUILL_ATTRIBUTE_HOT void finish_write(size_t nbytes) noexcept
Complement to reserve producer space that makes nbytes starting from the return of reserve producer s...
Definition: UnboundedSPSCQueue.h:127
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
Checks if the queue is empty.
Definition: UnboundedSPSCQueue.h:232
Sets up a signal handler to handle fatal signals.
Definition: BackendManager.h:24
QUILL_ATTRIBUTE_HOT void finish_read(size_t nbytes) noexcept
Consumes the next nbytes in the buffer and frees it back for the producer to reuse.
Definition: UnboundedSPSCQueue.h:210
QUILL_NODISCARD size_t producer_capacity() const noexcept
Return the current buffer's capacity.
Definition: UnboundedSPSCQueue.h:151
QUILL_ATTRIBUTE_HOT void commit_write() noexcept
Commit write to notify the consumer bytes are ready to read.
Definition: UnboundedSPSCQueue.h:135
custom exception
Definition: QuillError.h:45
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ReadResult prepare_read()
Prepare to read from the buffer a callback used for notifications to the user.
Definition: UnboundedSPSCQueue.h:185
QUILL_ATTRIBUTE_HOT void commit_read() noexcept
Commit the read to indicate that the bytes are read and are now free to be reused.
Definition: UnboundedSPSCQueue.h:218
void shrink(size_t capacity)
Shrinks the queue if capacity is a valid smaller power of 2.
Definition: UnboundedSPSCQueue.h:161
QUILL_ATTRIBUTE_HOT void finish_and_commit_write(size_t nbytes) noexcept
Finish and commit write as a single function.
Definition: UnboundedSPSCQueue.h:140