quill
BoundedSPSCQueue.h
1 #pragma once
2 
3 #include "quill/core/Attributes.h"
4 #include "quill/core/Common.h"
5 #include "quill/core/MathUtilities.h"
6 #include "quill/core/QuillError.h"
7 
8 #include <atomic>
9 #include <cerrno>
10 #include <cstddef>
11 #include <cstdint>
12 #include <cstring>
13 #include <string>
14 
15 #if defined(_WIN32)
16  #include <malloc.h>
17 #else
18  #include <sys/mman.h>
19  #include <unistd.h>
20 #endif
21 
22 #if defined(QUILL_X86ARCH)
23  #if defined(_WIN32)
24  #include <intrin.h>
25  #else
26  #if __has_include(<x86gprintrin.h>)
27  #if defined(__GNUC__) && __GNUC__ > 10
28  #include <emmintrin.h>
29  #include <x86gprintrin.h>
30  #elif defined(__clang_major__)
31  // clang needs immintrin for _mm_clflushopt
32  #include <immintrin.h>
33  #endif
34  #else
35  #include <immintrin.h>
36  #include <x86intrin.h>
37  #endif
38  #endif
39 #endif
40 
41 QUILL_BEGIN_NAMESPACE
42 
43 namespace detail
44 {
45 
46 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
47 #pragma warning(push)
48 #pragma warning(disable : 4324)
49 #endif
50 
54 template <typename T>
56 {
57 public:
58  using integer_type = T;
59 
  /**
   * Constructs the queue and allocates its backing storage.
   *
   * @param capacity requested capacity in bytes; rounded up to the next power
   *        of two so wrap-around can be done with `& _mask`
   * @param huge_pages_policy whether the storage should be backed by huge
   *        pages (linux only; see _alloc_aligned)
   * @param reader_store_percent percentage of the capacity the reader may
   *        consume before publishing its position to the producer
   *        (see commit_read)
   * @throws QuillError when the allocation fails, or (x86 builds only) when
   *         the rounded capacity is below 1024 bytes
   */
  QUILL_ATTRIBUTE_HOT explicit BoundedSPSCQueueImpl(integer_type capacity,
                                                    HugePagesPolicy huge_pages_policy = HugePagesPolicy::Never,
                                                    integer_type reader_store_percent = 5)
    : _capacity(next_power_of_two(capacity)),
      _mask(_capacity - 1),
      _bytes_per_batch(static_cast<integer_type>(static_cast<double>(_capacity * reader_store_percent) / 100.0)),
      // twice the capacity is allocated: prepare_write() hands out a single
      // contiguous range even when a write crosses the ring boundary
      _storage(static_cast<std::byte*>(_alloc_aligned(
        2ull * static_cast<uint64_t>(_capacity), QUILL_CACHE_LINE_ALIGNED, huge_pages_policy))),
      _huge_pages_policy(huge_pages_policy)
  {
    std::memset(_storage, 0, 2ull * static_cast<uint64_t>(_capacity));

    _atomic_writer_pos.store(0);
    _atomic_reader_pos.store(0);

#if defined(QUILL_X86ARCH)
    // remove log memory from cache
    for (uint64_t i = 0; i < (2ull * static_cast<uint64_t>(_capacity)); i += QUILL_CACHE_LINE_SIZE)
    {
      _mm_clflush(_storage + i);
    }

    // load cache lines into memory
    if (_capacity < 1024)
    {
      QUILL_THROW(QuillError{"Capacity must be at least 1024"});
    }

    uint64_t const cache_lines = (_capacity >= 2048) ? 32 : 16;

    for (uint64_t i = 0; i < cache_lines; ++i)
    {
      _mm_prefetch(reinterpret_cast<char const*>(_storage + (QUILL_CACHE_LINE_SIZE * i)), _MM_HINT_T0);
    }
#endif
  }
96 
  /** Releases the backing storage. */
  ~BoundedSPSCQueueImpl() { _free_aligned(_storage); }

  /** Non-copyable: the queue owns raw storage and is bound to two threads. */
  BoundedSPSCQueueImpl(BoundedSPSCQueueImpl const&) = delete;
  BoundedSPSCQueueImpl& operator=(BoundedSPSCQueueImpl const&) = delete;
104 
  /**
   * Called by the producer to reserve n contiguous bytes.
   *
   * The reader's atomic position is only loaded when the locally cached value
   * suggests there is not enough free space, keeping the common path free of
   * cross-core cache traffic.
   *
   * @param n number of bytes to reserve
   * @return pointer to the writable region, or nullptr when fewer than n
   *         bytes are free
   */
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(integer_type n) noexcept
  {
    if ((_capacity - static_cast<integer_type>(_writer_pos - _reader_pos_cache)) < n)
    {
      // not enough space, we need to load reader and re-check
      _reader_pos_cache = _atomic_reader_pos.load(std::memory_order_acquire);

      if ((_capacity - static_cast<integer_type>(_writer_pos - _reader_pos_cache)) < n)
      {
        return nullptr;
      }
    }

    return _storage + (_writer_pos & _mask);
  }
120 
  /** Advances the local writer position by n bytes; commit_write() must follow to make them visible to the reader. */
  QUILL_ATTRIBUTE_HOT void finish_write(integer_type n) noexcept { _writer_pos += n; }
122 
  /**
   * Publishes the writer position (release store) so the reader can observe
   * all bytes written since the last commit.
   */
  QUILL_ATTRIBUTE_HOT void commit_write() noexcept
  {
    // set the atomic flag so the reader can see write
    _atomic_writer_pos.store(_writer_pos, std::memory_order_release);

#if defined(QUILL_X86ARCH)
    // flush written cache lines
    _flush_cachelines(_last_flushed_writer_pos, _writer_pos);

    // prefetch a future cache line
    _mm_prefetch(
      reinterpret_cast<char const*>(_storage + (_writer_pos & _mask) + (QUILL_CACHE_LINE_SIZE * 10)), _MM_HINT_T0);
#endif
  }
137 
  /**
   * Finish and commit write as a single function.
   * @param n number of bytes written into the region returned by prepare_write()
   */
  QUILL_ATTRIBUTE_HOT void finish_and_commit_write(integer_type n) noexcept
  {
    finish_write(n);
    commit_write();
  }
146 
147  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_read() noexcept
148  {
149  if (empty())
150  {
151  return nullptr;
152  }
153 
154  return _storage + (_reader_pos & _mask);
155  }
156 
  /** Advances the local reader position by n consumed bytes; commit_read() later publishes it to the producer. */
  QUILL_ATTRIBUTE_HOT void finish_read(integer_type n) noexcept { _reader_pos += n; }
158 
159  QUILL_ATTRIBUTE_HOT void commit_read() noexcept
160  {
161  if (static_cast<integer_type>(_reader_pos - _atomic_reader_pos.load(std::memory_order_relaxed)) >= _bytes_per_batch)
162  {
163  _atomic_reader_pos.store(_reader_pos, std::memory_order_release);
164 
165 #if defined(QUILL_X86ARCH)
166  _flush_cachelines(_last_flushed_reader_pos, _reader_pos);
167 #endif
168  }
169  }
170 
175  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
176  {
177  if (_writer_pos_cache == _reader_pos)
178  {
179  // if we think the queue is empty we also load the atomic variable to check further
180  _writer_pos_cache = _atomic_writer_pos.load(std::memory_order_acquire);
181 
182  if (_writer_pos_cache == _reader_pos)
183  {
184  return true;
185  }
186  }
187 
188  return false;
189  }
190 
  /** @return the usable capacity of the queue in bytes (power of two) */
  QUILL_NODISCARD integer_type capacity() const noexcept
  {
    return static_cast<integer_type>(_capacity);
  }

  /** @return the huge pages policy the storage was allocated with */
  QUILL_NODISCARD HugePagesPolicy huge_pages_policy() const noexcept { return _huge_pages_policy; }
197 
198 private:
199 #if defined(QUILL_X86ARCH)
200  QUILL_ATTRIBUTE_HOT void _flush_cachelines(integer_type& last, integer_type offset)
201  {
202  integer_type last_diff = last - (last & QUILL_CACHE_LINE_MASK);
203  integer_type const cur_diff = offset - (offset & QUILL_CACHE_LINE_MASK);
204 
205  if (cur_diff > last_diff)
206  {
207  // Compute masked base pointer once before loop to avoid repeated mask operations
208  std::byte* ptr = _storage + (last_diff & _mask);
209 
210  do
211  {
212  _mm_clflushopt(ptr);
213  ptr += QUILL_CACHE_LINE_SIZE;
214  last_diff += QUILL_CACHE_LINE_SIZE;
215  } while (cur_diff > last_diff);
216 
217  last = last_diff;
218  }
219  }
220 #endif
221 
228  QUILL_NODISCARD static std::byte* _align_pointer(void* pointer, size_t alignment) noexcept
229  {
230  QUILL_ASSERT(is_power_of_two(alignment),
231  "alignment must be a power of two in BoundedSPSCQueue::_align_pointer()");
232  return reinterpret_cast<std::byte*>((reinterpret_cast<uintptr_t>(pointer) + (alignment - 1ul)) &
233  ~(alignment - 1ul));
234  }
235 
  /**
   * Allocates `size` bytes aligned to `alignment`.
   *
   * On Windows this delegates to _aligned_malloc. On POSIX the block is
   * obtained with mmap and two size_t metadata slots are stored immediately
   * before the returned address — the total mapping size and the offset back
   * to the start of the mapping — so _free_aligned() can munmap the whole
   * region from just the aligned pointer.
   *
   * @param size number of usable bytes to allocate
   * @param alignment required alignment of the returned pointer
   * @param huge_pages_policy linux only: Always/Try request MAP_HUGETLB;
   *        Try falls back to normal pages when huge-page allocation fails
   * @return pointer to the aligned usable region
   * @throws QuillError when the allocation fails
   */
  QUILL_NODISCARD static void* _alloc_aligned(size_t size, size_t alignment,
                                              QUILL_MAYBE_UNUSED HugePagesPolicy huge_pages_policy)
  {
#if defined(_WIN32)
    void* p = _aligned_malloc(size, alignment);

    if (!p)
    {
      QUILL_THROW(QuillError{std::string{"_alloc_aligned failed with error message errno: "} +
                             std::to_string(errno)});
    }

    return p;
#else
    // Calculate the total size including the metadata and alignment
    constexpr size_t metadata_size{2u * sizeof(size_t)};
    size_t const total_size{size + metadata_size + alignment};

    // Allocate the memory
    int flags = MAP_PRIVATE | MAP_ANONYMOUS;

  #if defined(__linux__)
    if (huge_pages_policy != HugePagesPolicy::Never)
    {
      flags |= MAP_HUGETLB;
    }
  #endif

    void* mem = ::mmap(nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);

  #if defined(__linux__)
    if ((mem == MAP_FAILED) && (huge_pages_policy == HugePagesPolicy::Try))
    {
      // we tried but failed allocating huge pages, try normal pages instead
      flags &= ~MAP_HUGETLB;
      mem = ::mmap(nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
    }
  #endif

    if (mem == MAP_FAILED)
    {
      QUILL_THROW(QuillError{std::string{"mmap failed. errno: "} + std::to_string(errno) +
                             " error: " + std::strerror(errno)});
    }

    // Calculate the aligned address after the metadata
    std::byte* aligned_address = _align_pointer(static_cast<std::byte*>(mem) + metadata_size, alignment);

    // Calculate the offset from the original memory location
    auto const offset = static_cast<size_t>(aligned_address - static_cast<std::byte*>(mem));

    // Store the size and offset information in the metadata
    std::memcpy(aligned_address - sizeof(size_t), &total_size, sizeof(total_size));
    std::memcpy(aligned_address - (2u * sizeof(size_t)), &offset, sizeof(offset));

    return aligned_address;
#endif
  }
304 
309  void static _free_aligned(void* ptr) noexcept
310  {
311 #if defined(_WIN32)
312  _aligned_free(ptr);
313 #else
314  // Retrieve the size and offset information from the metadata
315  size_t offset;
316  std::memcpy(&offset, static_cast<std::byte*>(ptr) - (2u * sizeof(size_t)), sizeof(offset));
317 
318  size_t total_size;
319  std::memcpy(&total_size, static_cast<std::byte*>(ptr) - sizeof(size_t), sizeof(total_size));
320 
321  // Calculate the original memory block address
322  void* mem = static_cast<std::byte*>(ptr) - offset;
323 
324  ::munmap(mem, total_size);
325 #endif
326  }
327 
private:
  static constexpr integer_type QUILL_CACHE_LINE_MASK{QUILL_CACHE_LINE_SIZE - 1};

  // immutable configuration, read-only for both threads
  integer_type const _capacity;        // power-of-two usable size in bytes
  integer_type const _mask;            // _capacity - 1; wrap-around via bit-and
  integer_type const _bytes_per_batch; // reader publish threshold (see commit_read)
  std::byte* const _storage{nullptr};  // 2 * _capacity bytes, cache-line aligned
  HugePagesPolicy const _huge_pages_policy;

  // producer-side state, cache-line aligned to avoid false sharing with the reader
  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_writer_pos{0};
  alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _writer_pos{0};
  integer_type _reader_pos_cache{0};
  integer_type _last_flushed_writer_pos{0};

  // consumer-side state, likewise isolated on its own cache lines
  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_reader_pos{0};
  alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _reader_pos{0};
  mutable integer_type _writer_pos_cache{0}; // mutable: refreshed inside const empty()
  integer_type _last_flushed_reader_pos{0};
};
347 
349 
350 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
351 #pragma warning(pop)
352 #endif
353 
354 } // namespace detail
355 
356 QUILL_END_NAMESPACE
A bounded single producer single consumer ring buffer.
Definition: BoundedSPSCQueue.h:55
QUILL_ATTRIBUTE_HOT void finish_and_commit_write(integer_type n) noexcept
Finish and commit write as a single function.
Definition: BoundedSPSCQueue.h:141
QUILL_NODISCARD T next_power_of_two(T n) noexcept
Round up to the next power of 2.
Definition: MathUtilities.h:45
Sets up a signal handler to handle fatal signals.
Definition: BackendManager.h:24
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
Only meant to be called by the reader.
Definition: BoundedSPSCQueue.h:175
custom exception
Definition: QuillError.h:45
QUILL_NODISCARD constexpr bool is_power_of_two(uint64_t number) noexcept
Check if a number is a power of 2.
Definition: MathUtilities.h:24