quill
UnboundedSPSCQueue.h
1 
7 #pragma once
8 
9 #include "quill/core/Attributes.h"
10 #include "quill/core/BoundedSPSCQueue.h"
11 #include "quill/core/Common.h"
12 #include "quill/core/QuillError.h"
13 
14 #include <atomic>
15 #include <cstddef>
16 #include <string>
17 
18 QUILL_BEGIN_NAMESPACE
19 
20 namespace detail
21 {
22 
23 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
24 #pragma warning(push)
25 #pragma warning(disable : 4324)
26 #endif
27 
43 {
44 private:
48  struct Node
49  {
55  explicit Node(size_t bounded_queue_capacity, HugePagesPolicy huge_pages_policy)
56  : bounded_queue(bounded_queue_capacity, huge_pages_policy)
57  {
58  }
59 
61  std::atomic<Node*> next{nullptr};
62  BoundedSPSCQueue bounded_queue;
63  };
64 
65 public:
/**
 * Value returned by prepare_read().
 * Besides the read position it reports whether the consumer moved on to another
 * node during the call and, if so, the capacities of the old and the new queue.
 */
struct ReadResult
{
  /**
   * @param read_position position to read from, or nullptr when nothing is readable
   */
  explicit ReadResult(std::byte* read_position) : read_pos{read_position} {}

  /** Position to read from; nullptr when there is nothing to read */
  std::byte* read_pos;

  /** Capacity of the queue that was left behind; only set when allocation is true */
  size_t previous_capacity = 0;

  /** Capacity of the queue that was switched to; only set when allocation is true */
  size_t new_capacity = 0;

  /** True when the consumer switched to the next node during the call */
  bool allocation = false;
};
75 
79  UnboundedSPSCQueue(size_t initial_bounded_queue_capacity, size_t max_capacity,
80  HugePagesPolicy huge_pages_policy = quill::HugePagesPolicy::Never)
81  : _max_capacity(max_capacity),
82  _producer(new Node(initial_bounded_queue_capacity, huge_pages_policy)),
83  _consumer(_producer)
84  {
85  }
86 
90  UnboundedSPSCQueue(UnboundedSPSCQueue const&) = delete;
91  UnboundedSPSCQueue& operator=(UnboundedSPSCQueue const&) = delete;
92 
97  {
98  // Get the current consumer node
99  Node const* current_node = _consumer;
100 
101  // Look for extra nodes to delete
102  while (current_node != nullptr)
103  {
104  auto const to_delete = current_node;
105  current_node = current_node->next;
106  delete to_delete;
107  }
108  }
109 
115  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(size_t nbytes)
116  {
117  // Try to reserve the bounded queue
118  std::byte* write_pos = _producer->bounded_queue.prepare_write(nbytes);
119 
120  if (QUILL_LIKELY(write_pos != nullptr))
121  {
122  return write_pos;
123  }
124 
125  return _handle_full_queue(nbytes);
126  }
127 
132  QUILL_ATTRIBUTE_HOT void finish_write(size_t nbytes) noexcept
133  {
134  _producer->bounded_queue.finish_write(nbytes);
135  }
136 
140  QUILL_ATTRIBUTE_HOT void commit_write() noexcept { _producer->bounded_queue.commit_write(); }
141 
145  QUILL_ATTRIBUTE_HOT void finish_and_commit_write(size_t nbytes) noexcept
146  {
147  finish_write(nbytes);
148  commit_write();
149  }
150 
156  QUILL_NODISCARD size_t producer_capacity() const noexcept
157  {
158  return _producer->bounded_queue.capacity();
159  }
160 
166  void shrink(size_t capacity)
167  {
168  if (capacity > (_producer->bounded_queue.capacity() >> 1))
169  {
170  // We should only shrink if the new capacity is less or at least equal to the previous_power_of_2
171  return;
172  }
173 
174  // We want to shrink the queue, we will create a new queue with a smaller size
175  // the consumer will switch to the newer queue after emptying and deallocating the older queue
176  auto const next_node = new Node{capacity, _producer->bounded_queue.huge_pages_policy()};
177 
178  // store the new node pointer as next in the current node
179  _producer->next.store(next_node, std::memory_order_release);
180 
181  // producer is now using the next node
182  _producer = next_node;
183  }
184 
190  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ReadResult prepare_read()
191  {
192  ReadResult read_result{_consumer->bounded_queue.prepare_read()};
193 
194  if (read_result.read_pos != nullptr)
195  {
196  return read_result;
197  }
198 
199  // the buffer is empty check if another buffer exists
200  Node* const next_node = _consumer->next.load(std::memory_order_acquire);
201 
202  if (next_node)
203  {
204  return _read_next_queue(next_node);
205  }
206 
207  // Queue is empty and no new queue exists
208  return read_result;
209  }
210 
215  QUILL_ATTRIBUTE_HOT void finish_read(size_t nbytes) noexcept
216  {
217  _consumer->bounded_queue.finish_read(nbytes);
218  }
219 
223  QUILL_ATTRIBUTE_HOT void commit_read() noexcept { _consumer->bounded_queue.commit_read(); }
224 
230  QUILL_NODISCARD size_t capacity() const noexcept { return _consumer->bounded_queue.capacity(); }
231 
237  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
238  {
239  return _consumer->bounded_queue.empty() && (_consumer->next.load(std::memory_order_relaxed) == nullptr);
240  }
241 
242 private:
243  /***/
244  QUILL_NODISCARD std::byte* _handle_full_queue(size_t nbytes)
245  {
246  // Then it means the queue doesn't have enough size
247  size_t capacity = _producer->bounded_queue.capacity() * 2ull;
248  while (capacity < nbytes)
249  {
250  capacity = capacity * 2ull;
251  }
252 
253  if (QUILL_UNLIKELY(capacity > _max_capacity))
254  {
255  if (nbytes > _max_capacity)
256  {
257  QUILL_THROW(
258  QuillError{"Logging single messages larger than the configured maximum queue capacity "
259  "is not possible.\n"
260  "To log single messages exceeding this limit, consider increasing "
261  "FrontendOptions::unbounded_queue_max_capacity.\n"
262  "Message size: " +
263  std::to_string(nbytes) +
264  " bytes\n"
265  "Required queue capacity: " +
266  std::to_string(capacity) +
267  " bytes\n"
268  "Configured maximum queue capacity: " +
269  std::to_string(_max_capacity) + " bytes"});
270  }
271 
272  // we reached the unbounded_queue_max_capacity we won't be allocating more
273  // instead return nullptr to block or drop
274  return nullptr;
275  }
276 
277  // commit previous write to the old queue before switching
278  _producer->bounded_queue.commit_write();
279 
280  // We failed to reserve because the queue was full, create a new node with a new queue
281  auto const next_node = new Node{capacity, _producer->bounded_queue.huge_pages_policy()};
282 
283  // store the new node pointer as next in the current node
284  _producer->next.store(next_node, std::memory_order_release);
285 
286  // producer is now using the next node
287  _producer = next_node;
288 
289  // reserve again, this time we know we will always succeed, cast to void* to ignore
290  std::byte* const write_pos = _producer->bounded_queue.prepare_write(nbytes);
291 
292  QUILL_ASSERT(
293  write_pos,
294  "write_pos is nullptr after allocating new node in UnboundedSPSCQueue::prepare_write()");
295 
296  return write_pos;
297  }
298 
299  /***/
300  QUILL_NODISCARD ReadResult _read_next_queue(Node* next_node)
301  {
302  // a new buffer was added by the producer, this happens only when we have allocated a new queue
303 
304  // try the existing buffer once more
305  ReadResult read_result{_consumer->bounded_queue.prepare_read()};
306 
307  if (read_result.read_pos)
308  {
309  return read_result;
310  }
311 
312  // Switch to the new buffer for reading
313  // commit the previous reads before deleting the queue
314  _consumer->bounded_queue.commit_read();
315 
316  // switch to the new buffer, existing one is deleted
317  auto const previous_capacity = _consumer->bounded_queue.capacity();
318  delete _consumer;
319 
320  _consumer = next_node;
321  read_result.read_pos = _consumer->bounded_queue.prepare_read();
322 
323  // we switched to a new here, so we store the capacity info to return it
324  read_result.allocation = true;
325  read_result.new_capacity = _consumer->bounded_queue.capacity();
326  read_result.previous_capacity = previous_capacity;
327 
328  return read_result;
329  }
330 
331 private:
332  size_t _max_capacity;
333 
335  alignas(QUILL_CACHE_LINE_ALIGNED) Node* _producer{nullptr};
336  alignas(QUILL_CACHE_LINE_ALIGNED) Node* _consumer{nullptr};
337 };
338 
339 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
340 #pragma warning(pop)
341 #endif
342 
343 } // namespace detail
344 
345 QUILL_END_NAMESPACE
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:42
~UnboundedSPSCQueue()
Destructor.
Definition: UnboundedSPSCQueue.h:96
Definition: UnboundedSPSCQueue.h:66
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte * prepare_write(size_t nbytes)
Reserve contiguous space for the producer without making it visible to the consumer.
Definition: UnboundedSPSCQueue.h:115
QUILL_NODISCARD size_t capacity() const noexcept
Return the current buffer's capacity.
Definition: UnboundedSPSCQueue.h:230
UnboundedSPSCQueue(size_t initial_bounded_queue_capacity, size_t max_capacity, HugePagesPolicy huge_pages_policy=quill::HugePagesPolicy::Never)
Constructor.
Definition: UnboundedSPSCQueue.h:79
QUILL_ATTRIBUTE_HOT void finish_write(size_t nbytes) noexcept
Complement to reserve producer space that makes nbytes starting from the return of reserve producer s...
Definition: UnboundedSPSCQueue.h:132
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
checks if the queue is empty
Definition: UnboundedSPSCQueue.h:237
Sets up a signal handler to handle fatal signals.
Definition: BackendManager.h:24
QUILL_ATTRIBUTE_HOT void finish_read(size_t nbytes) noexcept
Consumes the next nbytes in the buffer and frees it back for the producer to reuse.
Definition: UnboundedSPSCQueue.h:215
QUILL_NODISCARD size_t producer_capacity() const noexcept
Return the current buffer's capacity.
Definition: UnboundedSPSCQueue.h:156
QUILL_ATTRIBUTE_HOT void commit_write() noexcept
Commit write to notify the consumer bytes are ready to read.
Definition: UnboundedSPSCQueue.h:140
custom exception
Definition: QuillError.h:45
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ReadResult prepare_read()
Prepare to read from the buffer a callback used for notifications to the user.
Definition: UnboundedSPSCQueue.h:190
QUILL_ATTRIBUTE_HOT void commit_read() noexcept
Commit the read to indicate that the bytes are read and are now free to be reused.
Definition: UnboundedSPSCQueue.h:223
void shrink(size_t capacity)
Shrinks the queue if capacity is a valid smaller power of 2.
Definition: UnboundedSPSCQueue.h:166
QUILL_ATTRIBUTE_HOT void finish_and_commit_write(size_t nbytes) noexcept
Finish and commit write as a single function.
Definition: UnboundedSPSCQueue.h:145