3 #include "quill/core/Attributes.h" 4 #include "quill/core/Common.h" 5 #include "quill/core/MathUtilities.h" 6 #include "quill/core/QuillError.h" 23 #if defined(QUILL_X86ARCH) 27 #if __has_include(<x86gprintrin.h>) 28 #if defined(__GNUC__) && __GNUC__ > 10 29 #include <emmintrin.h> 30 #include <x86gprintrin.h> 31 #elif defined(__clang_major__) 33 #include <immintrin.h> 36 #include <immintrin.h> 37 #include <x86intrin.h> 53 using integer_type = T;
56 HugePagesPolicy huge_pages_policy = HugePagesPolicy::Never,
57 integer_type reader_store_percent = 5)
60 _bytes_per_batch(static_cast<integer_type>(static_cast<double>(_capacity * reader_store_percent) / 100.0)),
61 _storage(static_cast<std::byte*>(_alloc_aligned(
62 2ull * static_cast<uint64_t>(_capacity), QUILL_CACHE_LINE_ALIGNED, huge_pages_policy))),
63 _huge_pages_policy(huge_pages_policy)
65 std::memset(_storage, 0, 2ull * static_cast<uint64_t>(_capacity));
67 _atomic_writer_pos.store(0);
68 _atomic_reader_pos.store(0);
70 #if defined(QUILL_X86ARCH) 72 for (uint64_t i = 0; i < (2ull * static_cast<uint64_t>(_capacity)); i += QUILL_CACHE_LINE_SIZE)
74 _mm_clflush(_storage + i);
80 QUILL_THROW(
QuillError{
"Capacity must be at least 1024"});
83 uint64_t
const cache_lines = (_capacity >= 2048) ? 32 : 16;
85 for (uint64_t i = 0; i < cache_lines; ++i)
87 _mm_prefetch(reinterpret_cast<char const*>(_storage + (QUILL_CACHE_LINE_SIZE * i)), _MM_HINT_T0);
92 ~BoundedSPSCQueueImpl() { _free_aligned(_storage); }
97 BoundedSPSCQueueImpl(BoundedSPSCQueueImpl
const&) =
delete;
98 BoundedSPSCQueueImpl& operator=(BoundedSPSCQueueImpl
const&) =
delete;
100 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(integer_type n) noexcept
102 if ((_capacity - static_cast<integer_type>(_writer_pos - _reader_pos_cache)) < n)
105 _reader_pos_cache = _atomic_reader_pos.load(std::memory_order_acquire);
107 if ((_capacity - static_cast<integer_type>(_writer_pos - _reader_pos_cache)) < n)
113 return _storage + (_writer_pos & _mask);
116 QUILL_ATTRIBUTE_HOT
void finish_write(integer_type n) noexcept { _writer_pos += n; }
118 QUILL_ATTRIBUTE_HOT
void commit_write() noexcept
121 _atomic_writer_pos.store(_writer_pos, std::memory_order_release);
123 #if defined(QUILL_X86ARCH) 125 _flush_cachelines(_last_flushed_writer_pos, _writer_pos);
129 reinterpret_cast<char const*>(_storage + (_writer_pos & _mask) + (QUILL_CACHE_LINE_SIZE * 10)), _MM_HINT_T0);
142 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_read() noexcept
149 return _storage + (_reader_pos & _mask);
152 QUILL_ATTRIBUTE_HOT
void finish_read(integer_type n) noexcept { _reader_pos += n; }
154 QUILL_ATTRIBUTE_HOT
void commit_read() noexcept
156 if (static_cast<integer_type>(_reader_pos - _atomic_reader_pos.load(std::memory_order_relaxed)) >= _bytes_per_batch)
158 _atomic_reader_pos.store(_reader_pos, std::memory_order_release);
160 #if defined(QUILL_X86ARCH) 161 _flush_cachelines(_last_flushed_reader_pos, _reader_pos);
170 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool empty() const noexcept
172 if (_writer_pos_cache == _reader_pos)
175 _writer_pos_cache = _atomic_writer_pos.load(std::memory_order_acquire);
177 if (_writer_pos_cache == _reader_pos)
186 QUILL_NODISCARD integer_type capacity()
const noexcept
188 return static_cast<integer_type
>(_capacity);
191 QUILL_NODISCARD HugePagesPolicy huge_pages_policy()
const noexcept {
return _huge_pages_policy; }
194 #if defined(QUILL_X86ARCH) 195 QUILL_ATTRIBUTE_HOT
void _flush_cachelines(integer_type& last, integer_type offset)
197 integer_type last_diff = last - (last & QUILL_CACHE_LINE_MASK);
198 integer_type
const cur_diff = offset - (offset & QUILL_CACHE_LINE_MASK);
200 while (cur_diff > last_diff)
202 _mm_clflushopt(_storage + (last_diff & _mask));
203 last_diff += QUILL_CACHE_LINE_SIZE;
215 QUILL_NODISCARD
static std::byte* _align_pointer(
void* pointer,
size_t alignment) noexcept
217 assert(
is_power_of_two(alignment) &&
"alignment must be a power of two");
218 return reinterpret_cast<std::byte*
>((
reinterpret_cast<uintptr_t
>(pointer) + (alignment - 1ul)) &
232 QUILL_NODISCARD
static void* _alloc_aligned(
size_t size,
size_t alignment,
233 QUILL_MAYBE_UNUSED HugePagesPolicy huge_pages_policy)
236 void* p = _aligned_malloc(size, alignment);
240 QUILL_THROW(
QuillError{std::string{
"_alloc_aligned failed with error message errno: "} +
241 std::to_string(errno)});
247 constexpr
size_t metadata_size{2u *
sizeof(size_t)};
248 size_t const total_size{size + metadata_size + alignment};
251 int flags = MAP_PRIVATE | MAP_ANONYMOUS;
253 #if defined(__linux__) 254 if (huge_pages_policy != HugePagesPolicy::Never)
256 flags |= MAP_HUGETLB;
260 void* mem = ::mmap(
nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
262 #if defined(__linux__) 263 if ((mem == MAP_FAILED) && (huge_pages_policy == HugePagesPolicy::Try))
266 flags &= ~MAP_HUGETLB;
267 mem = ::mmap(
nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
271 if (mem == MAP_FAILED)
273 QUILL_THROW(
QuillError{std::string{
"mmap failed. errno: "} + std::to_string(errno) +
274 " error: " + strerror(errno)});
278 std::byte* aligned_address = _align_pointer(static_cast<std::byte*>(mem) + metadata_size, alignment);
281 auto const offset =
static_cast<size_t>(aligned_address -
static_cast<std::byte*
>(mem));
284 std::memcpy(aligned_address -
sizeof(
size_t), &total_size,
sizeof(total_size));
285 std::memcpy(aligned_address - (2u *
sizeof(
size_t)), &offset,
sizeof(offset));
287 return aligned_address;
295 void static _free_aligned(
void* ptr) noexcept
302 std::memcpy(&offset, static_cast<std::byte*>(ptr) - (2u *
sizeof(
size_t)),
sizeof(offset));
305 std::memcpy(&total_size, static_cast<std::byte*>(ptr) -
sizeof(
size_t),
sizeof(total_size));
308 void* mem =
static_cast<std::byte*
>(ptr) - offset;
310 ::munmap(mem, total_size);
315 static constexpr integer_type QUILL_CACHE_LINE_MASK{QUILL_CACHE_LINE_SIZE - 1};
317 integer_type
const _capacity;
318 integer_type
const _mask;
319 integer_type
const _bytes_per_batch;
320 std::byte*
const _storage{
nullptr};
321 HugePagesPolicy
const _huge_pages_policy;
323 alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_writer_pos{0};
324 alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _writer_pos{0};
325 integer_type _reader_pos_cache{0};
326 integer_type _last_flushed_writer_pos{0};
328 alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_reader_pos{0};
329 alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _reader_pos{0};
330 mutable integer_type _writer_pos_cache{0};
331 integer_type _last_flushed_reader_pos{0};
A bounded single producer single consumer ring buffer.
Definition: BoundedSPSCQueue.h:50
QUILL_ATTRIBUTE_HOT void finish_and_commit_write(integer_type n) noexcept
Finish and commit write as a single function.
Definition: BoundedSPSCQueue.h:136
QUILL_NODISCARD T next_power_of_two(T n) noexcept
Round up to the next power of 2.
Definition: MathUtilities.h:46
Setups a signal handler to handle fatal signals.
Definition: BackendManager.h:24
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
Only meant to be called by the reader.
Definition: BoundedSPSCQueue.h:170
custom exception
Definition: QuillError.h:45
QUILL_NODISCARD constexpr bool is_power_of_two(uint64_t number) noexcept
Check if a number is a power of 2.
Definition: MathUtilities.h:25