BoundedSPSCQueue.h
#pragma once

#include "quill/core/Attributes.h"
#include "quill/core/Common.h"
#include "quill/core/MathUtilities.h"
#include "quill/core/QuillError.h"

#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

#if defined(_WIN32)
  #include <malloc.h>
#else
  #include <sys/mman.h>
  #include <unistd.h>
#endif

#if defined(QUILL_X86ARCH)
  #if defined(_WIN32)
    #include <intrin.h>
  #else
    #if __has_include(<x86gprintrin.h>)
      #if defined(__GNUC__) && __GNUC__ > 10
        #include <emmintrin.h>
        #include <x86gprintrin.h>
      #elif defined(__clang_major__)
        // clang needs immintrin for _mm_clflushopt
        #include <immintrin.h>
      #endif
    #else
      #include <immintrin.h>
      #include <x86intrin.h>
    #endif
  #endif
#endif

QUILL_BEGIN_NAMESPACE

namespace detail
{
/**
 * A bounded single producer single consumer ring buffer.
 */
template <typename T>
class BoundedSPSCQueueImpl
{
public:
  using integer_type = T;

  QUILL_ATTRIBUTE_HOT explicit BoundedSPSCQueueImpl(integer_type capacity,
                                                    HugePagesPolicy huge_pages_policy = HugePagesPolicy::Never,
                                                    integer_type reader_store_percent = 5)
    : _capacity(next_power_of_two(capacity)),
      _mask(_capacity - 1),
      _bytes_per_batch(static_cast<integer_type>(static_cast<double>(_capacity * reader_store_percent) / 100.0)),
      _storage(static_cast<std::byte*>(_alloc_aligned(
        2ull * static_cast<uint64_t>(_capacity), QUILL_CACHE_LINE_ALIGNED, huge_pages_policy))),
      _huge_pages_policy(huge_pages_policy)
  {
    std::memset(_storage, 0, 2ull * static_cast<uint64_t>(_capacity));

    _atomic_writer_pos.store(0);
    _atomic_reader_pos.store(0);

#if defined(QUILL_X86ARCH)
    // evict the queue's memory from the cache
    for (uint64_t i = 0; i < (2ull * static_cast<uint64_t>(_capacity)); i += QUILL_CACHE_LINE_SIZE)
    {
      _mm_clflush(_storage + i);
    }

    if (_capacity < 1024)
    {
      QUILL_THROW(QuillError{"Capacity must be at least 1024"});
    }

    // prefetch the first cache lines
    uint64_t const cache_lines = (_capacity >= 2048) ? 32 : 16;

    for (uint64_t i = 0; i < cache_lines; ++i)
    {
      _mm_prefetch(reinterpret_cast<char const*>(_storage + (QUILL_CACHE_LINE_SIZE * i)), _MM_HINT_T0);
    }
#endif
  }

  ~BoundedSPSCQueueImpl() { _free_aligned(_storage); }

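  /**
   * Deleted
   */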
  BoundedSPSCQueueImpl(BoundedSPSCQueueImpl const&) = delete;
  BoundedSPSCQueueImpl& operator=(BoundedSPSCQueueImpl const&) = delete;

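  /**
   * Reserve contiguous space for the producer. Returns the current write
   * position, or nullptr if there is not enough free space for n bytes.
   */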
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(integer_type n) noexcept
  {
    if ((_capacity - static_cast<integer_type>(_writer_pos - _reader_pos_cache)) < n)
    {
      // not enough space; load the reader position and re-check
      _reader_pos_cache = _atomic_reader_pos.load(std::memory_order_acquire);

      if ((_capacity - static_cast<integer_type>(_writer_pos - _reader_pos_cache)) < n)
      {
        return nullptr;
      }
    }

    return _storage + (_writer_pos & _mask);
  }

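  /**
   * Advance the local writer position after copying n bytes. The write is
   * not visible to the reader until commit_write() is called.
   */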
  QUILL_ATTRIBUTE_HOT void finish_write(integer_type n) noexcept { _writer_pos += n; }

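  /**
   * Publish the writer position to the reader with release semantics.
   */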
  QUILL_ATTRIBUTE_HOT void commit_write() noexcept
  {
    // set the atomic flag so the reader can see the write
    _atomic_writer_pos.store(_writer_pos, std::memory_order_release);

#if defined(QUILL_X86ARCH)
    // flush written cache lines
    _flush_cachelines(_last_flushed_writer_pos, _writer_pos);

    // prefetch a future cache line
    _mm_prefetch(
      reinterpret_cast<char const*>(_storage + (_writer_pos & _mask) + (QUILL_CACHE_LINE_SIZE * 10)), _MM_HINT_T0);
#endif
  }

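  /**
   * Finish and commit write as a single function.
   */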
  QUILL_ATTRIBUTE_HOT void finish_and_commit_write(integer_type n) noexcept
  {
    finish_write(n);
    commit_write();
  }

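  /**
   * Return the current read position, or nullptr if the queue is empty.
   */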
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_read() noexcept
  {
    if (empty())
    {
      return nullptr;
    }

    return _storage + (_reader_pos & _mask);
  }

  QUILL_ATTRIBUTE_HOT void finish_read(integer_type n) noexcept { _reader_pos += n; }

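  /**
   * Publish the reader position. The store is batched to reduce cache-line
   * traffic: it only happens once at least _bytes_per_batch bytes have been
   * consumed since the last publish.
   */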
  QUILL_ATTRIBUTE_HOT void commit_read() noexcept
  {
    if (static_cast<integer_type>(_reader_pos - _atomic_reader_pos.load(std::memory_order_relaxed)) >= _bytes_per_batch)
    {
      _atomic_reader_pos.store(_reader_pos, std::memory_order_release);

#if defined(QUILL_X86ARCH)
      _flush_cachelines(_last_flushed_reader_pos, _reader_pos);
#endif
    }
  }

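  /**
   * Only meant to be called by the reader.
   */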
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
  {
    if (_writer_pos_cache == _reader_pos)
    {
      // if the queue looks empty, also load the atomic writer position to check further
      _writer_pos_cache = _atomic_writer_pos.load(std::memory_order_acquire);

      if (_writer_pos_cache == _reader_pos)
      {
        return true;
      }
    }

    return false;
  }

  QUILL_NODISCARD integer_type capacity() const noexcept
  {
    return static_cast<integer_type>(_capacity);
  }

  QUILL_NODISCARD HugePagesPolicy huge_pages_policy() const noexcept { return _huge_pages_policy; }

private:
#if defined(QUILL_X86ARCH)
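  /**
   * Flush (evict) the cache lines between the last flushed position and
   * offset, one QUILL_CACHE_LINE_SIZE step at a time.
   */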
  QUILL_ATTRIBUTE_HOT void _flush_cachelines(integer_type& last, integer_type offset)
  {
    integer_type last_diff = last - (last & QUILL_CACHE_LINE_MASK);
    integer_type const cur_diff = offset - (offset & QUILL_CACHE_LINE_MASK);

    while (cur_diff > last_diff)
    {
      _mm_clflushopt(_storage + (last_diff & _mask));
      last_diff += QUILL_CACHE_LINE_SIZE;
      last = last_diff;
    }
  }
#endif

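  /**
   * Round a pointer up to the next multiple of alignment, which must be a
   * power of two.
   */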
  QUILL_NODISCARD static std::byte* _align_pointer(void* pointer, size_t alignment) noexcept
  {
    assert(is_power_of_two(alignment) && "alignment must be a power of two");
    return reinterpret_cast<std::byte*>((reinterpret_cast<uintptr_t>(pointer) + (alignment - 1ul)) &
                                        ~(alignment - 1ul));
  }

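  /**
   * Allocate aligned memory. On Windows this wraps _aligned_malloc; on POSIX
   * it uses mmap (optionally with huge pages on Linux) and stores the total
   * size and the alignment offset as metadata just in front of the returned
   * pointer, so that _free_aligned can later unmap the whole block.
   */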
  QUILL_NODISCARD static void* _alloc_aligned(size_t size, size_t alignment,
                                              QUILL_MAYBE_UNUSED HugePagesPolicy huge_pages_policy)
  {
#if defined(_WIN32)
    void* p = _aligned_malloc(size, alignment);

    if (!p)
    {
      QUILL_THROW(QuillError{std::string{"_alloc_aligned failed with error message errno: "} +
                             std::to_string(errno)});
    }

    return p;
#else
    // Calculate the total size including the metadata and alignment
    constexpr size_t metadata_size{2u * sizeof(size_t)};
    size_t const total_size{size + metadata_size + alignment};

    // Allocate the memory
    int flags = MAP_PRIVATE | MAP_ANONYMOUS;

  #if defined(__linux__)
    if (huge_pages_policy != HugePagesPolicy::Never)
    {
      flags |= MAP_HUGETLB;
    }
  #endif

    void* mem = ::mmap(nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);

  #if defined(__linux__)
    if ((mem == MAP_FAILED) && (huge_pages_policy == HugePagesPolicy::Try))
    {
      // we tried but failed to allocate huge pages; fall back to normal pages
      flags &= ~MAP_HUGETLB;
      mem = ::mmap(nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
    }
  #endif

    if (mem == MAP_FAILED)
    {
      QUILL_THROW(QuillError{std::string{"mmap failed. errno: "} + std::to_string(errno) +
                             " error: " + strerror(errno)});
    }

    // Calculate the aligned address after the metadata
    std::byte* aligned_address = _align_pointer(static_cast<std::byte*>(mem) + metadata_size, alignment);

    // Calculate the offset from the original memory location
    auto const offset = static_cast<size_t>(aligned_address - static_cast<std::byte*>(mem));

    // Store the size and offset information in the metadata
    std::memcpy(aligned_address - sizeof(size_t), &total_size, sizeof(total_size));
    std::memcpy(aligned_address - (2u * sizeof(size_t)), &offset, sizeof(offset));

    return aligned_address;
#endif
  }

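  /**
   * Free memory obtained from _alloc_aligned, reading back the size and
   * offset metadata stored in front of the pointer.
   */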
  static void _free_aligned(void* ptr) noexcept
  {
#if defined(_WIN32)
    _aligned_free(ptr);
#else
    // Retrieve the size and offset information from the metadata
    size_t offset;
    std::memcpy(&offset, static_cast<std::byte*>(ptr) - (2u * sizeof(size_t)), sizeof(offset));

    size_t total_size;
    std::memcpy(&total_size, static_cast<std::byte*>(ptr) - sizeof(size_t), sizeof(total_size));

    // Calculate the original memory block address
    void* mem = static_cast<std::byte*>(ptr) - offset;

    ::munmap(mem, total_size);
#endif
  }

private:
  static constexpr integer_type QUILL_CACHE_LINE_MASK{QUILL_CACHE_LINE_SIZE - 1};

  integer_type const _capacity;
  integer_type const _mask;
  integer_type const _bytes_per_batch;
  std::byte* const _storage{nullptr};
  HugePagesPolicy const _huge_pages_policy;

  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_writer_pos{0};
  alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _writer_pos{0};
  integer_type _reader_pos_cache{0};
  integer_type _last_flushed_writer_pos{0};

  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_reader_pos{0};
  alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _reader_pos{0};
  mutable integer_type _writer_pos_cache{0};
  integer_type _last_flushed_reader_pos{0};
};

using BoundedSPSCQueue = BoundedSPSCQueueImpl<size_t>;
} // namespace detail

QUILL_END_NAMESPACE
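Usage sketch (illustrative, not part of the header): the producer reserves space with prepare_write(), copies its payload, and publishes with finish_and_commit_write() (or separate finish_write()/commit_write() calls); the reader mirrors this with prepare_read(), finish_read() and commit_read(). The sketch below assumes a fixed-size message so the reader knows how many bytes to consume; the real producer and consumer must agree on an encoding that carries sizes in the payload. The two sides would normally run on one producer thread and one consumer thread, as the SPSC name implies; they are shown sequentially here only to illustrate the API.

#include <cstring>

void spsc_example()
{
  // capacity is rounded up to the next power of two; the x86-optimised
  // build additionally requires at least 1024 bytes
  quill::detail::BoundedSPSCQueueImpl<size_t> queue{4096};

  // --- producer side ---
  char const msg[] = "hello";
  constexpr size_t n = sizeof(msg);

  if (std::byte* dst = queue.prepare_write(n)) // nullptr when the queue is full
  {
    std::memcpy(dst, msg, n);
    queue.finish_and_commit_write(n); // advance and publish the writer position
  }

  // --- consumer side ---
  if (std::byte* src = queue.prepare_read()) // nullptr when the queue is empty
  {
    char out[n];
    std::memcpy(out, src, n);
    queue.finish_read(n); // advance the local reader position
    queue.commit_read();  // publish it back to the producer (batched)
  }
}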