3 #include "quill/core/Attributes.h" 4 #include "quill/core/Common.h" 5 #include "quill/core/MathUtilities.h" 6 #include "quill/core/QuillError.h" 22 #if defined(QUILL_X86ARCH) 26 #if __has_include(<x86gprintrin.h>) 27 #if defined(__GNUC__) && __GNUC__ > 10 28 #include <emmintrin.h> 29 #include <x86gprintrin.h> 30 #elif defined(__clang_major__) 32 #include <immintrin.h> 35 #include <immintrin.h> 36 #include <x86intrin.h> 46 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__) 48 #pragma warning(disable : 4324) 58 using integer_type = T;
61 HugePagesPolicy huge_pages_policy = HugePagesPolicy::Never,
62 integer_type reader_store_percent = 5)
65 _bytes_per_batch(static_cast<integer_type>(static_cast<double>(_capacity * reader_store_percent) / 100.0)),
66 _storage(static_cast<std::byte*>(_alloc_aligned(
67 2ull * static_cast<uint64_t>(_capacity), QUILL_CACHE_LINE_ALIGNED, huge_pages_policy))),
68 _huge_pages_policy(huge_pages_policy)
70 std::memset(_storage, 0, 2ull * static_cast<uint64_t>(_capacity));
72 _atomic_writer_pos.store(0);
73 _atomic_reader_pos.store(0);
75 #if defined(QUILL_X86ARCH) 77 for (uint64_t i = 0; i < (2ull * static_cast<uint64_t>(_capacity)); i += QUILL_CACHE_LINE_SIZE)
79 _mm_clflush(_storage + i);
85 QUILL_THROW(
QuillError{
"Capacity must be at least 1024"});
88 uint64_t
const cache_lines = (_capacity >= 2048) ? 32 : 16;
90 for (uint64_t i = 0; i < cache_lines; ++i)
92 _mm_prefetch(reinterpret_cast<char const*>(_storage + (QUILL_CACHE_LINE_SIZE * i)), _MM_HINT_T0);
97 ~BoundedSPSCQueueImpl() { _free_aligned(_storage); }
102 BoundedSPSCQueueImpl(BoundedSPSCQueueImpl
const&) =
delete;
103 BoundedSPSCQueueImpl& operator=(BoundedSPSCQueueImpl
const&) =
delete;
105 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_write(integer_type n) noexcept
107 if ((_capacity - static_cast<integer_type>(_writer_pos - _reader_pos_cache)) < n)
110 _reader_pos_cache = _atomic_reader_pos.load(std::memory_order_acquire);
112 if ((_capacity - static_cast<integer_type>(_writer_pos - _reader_pos_cache)) < n)
118 return _storage + (_writer_pos & _mask);
121 QUILL_ATTRIBUTE_HOT
void finish_write(integer_type n) noexcept { _writer_pos += n; }
123 QUILL_ATTRIBUTE_HOT
void commit_write() noexcept
126 _atomic_writer_pos.store(_writer_pos, std::memory_order_release);
128 #if defined(QUILL_X86ARCH) 130 _flush_cachelines(_last_flushed_writer_pos, _writer_pos);
134 reinterpret_cast<char const*>(_storage + (_writer_pos & _mask) + (QUILL_CACHE_LINE_SIZE * 10)), _MM_HINT_T0);
147 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* prepare_read() noexcept
154 return _storage + (_reader_pos & _mask);
157 QUILL_ATTRIBUTE_HOT
void finish_read(integer_type n) noexcept { _reader_pos += n; }
159 QUILL_ATTRIBUTE_HOT
void commit_read() noexcept
161 if (static_cast<integer_type>(_reader_pos - _atomic_reader_pos.load(std::memory_order_relaxed)) >= _bytes_per_batch)
163 _atomic_reader_pos.store(_reader_pos, std::memory_order_release);
165 #if defined(QUILL_X86ARCH) 166 _flush_cachelines(_last_flushed_reader_pos, _reader_pos);
175 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool empty() const noexcept
177 if (_writer_pos_cache == _reader_pos)
180 _writer_pos_cache = _atomic_writer_pos.load(std::memory_order_acquire);
182 if (_writer_pos_cache == _reader_pos)
191 QUILL_NODISCARD integer_type capacity()
const noexcept
193 return static_cast<integer_type
>(_capacity);
196 QUILL_NODISCARD HugePagesPolicy huge_pages_policy()
const noexcept {
return _huge_pages_policy; }
#if defined(QUILL_X86ARCH)
/**
 * Flushes the whole cache lines covered between `last` and `offset`, then
 * records the new flush watermark in `last` (taken by reference).
 *
 * Positions are rounded down to a cache-line boundary with
 * QUILL_CACHE_LINE_MASK; the buffer offset wraps with `& _mask`.
 *
 * NOTE(review): the flush intrinsic call and the `last = cur_diff;` update
 * were elided by extraction and have been reconstructed (the do-while shape
 * and `ptr` stepping are visible) — confirm intrinsic choice against the
 * original header.
 *
 * @param last   in/out: position up to which lines were already flushed
 * @param offset current writer/reader position
 */
QUILL_ATTRIBUTE_HOT void _flush_cachelines(integer_type& last, integer_type offset)
{
  integer_type last_diff = last - (last & QUILL_CACHE_LINE_MASK);
  integer_type const cur_diff = offset - (offset & QUILL_CACHE_LINE_MASK);

  if (cur_diff > last_diff)
  {
    last = cur_diff;

    std::byte* ptr = _storage + (last_diff & _mask);

    do
    {
      _mm_clflushopt(ptr);
      ptr += QUILL_CACHE_LINE_SIZE;
      last_diff += QUILL_CACHE_LINE_SIZE;
    } while (cur_diff > last_diff);
  }
}
#endif
228 QUILL_NODISCARD
static std::byte* _align_pointer(
void* pointer,
size_t alignment) noexcept
231 "alignment must be a power of two in BoundedSPSCQueue::_align_pointer()");
232 return reinterpret_cast<std::byte*
>((
reinterpret_cast<uintptr_t
>(pointer) + (alignment - 1ul)) &
246 QUILL_NODISCARD
static void* _alloc_aligned(
size_t size,
size_t alignment,
247 QUILL_MAYBE_UNUSED HugePagesPolicy huge_pages_policy)
250 void* p = _aligned_malloc(size, alignment);
254 QUILL_THROW(
QuillError{std::string{
"_alloc_aligned failed with error message errno: "} +
255 std::to_string(errno)});
261 constexpr
size_t metadata_size{2u *
sizeof(size_t)};
262 size_t const total_size{size + metadata_size + alignment};
265 int flags = MAP_PRIVATE | MAP_ANONYMOUS;
267 #if defined(__linux__) 268 if (huge_pages_policy != HugePagesPolicy::Never)
270 flags |= MAP_HUGETLB;
274 void* mem = ::mmap(
nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
276 #if defined(__linux__) 277 if ((mem == MAP_FAILED) && (huge_pages_policy == HugePagesPolicy::Try))
280 flags &= ~MAP_HUGETLB;
281 mem = ::mmap(
nullptr, total_size, PROT_READ | PROT_WRITE, flags, -1, 0);
285 if (mem == MAP_FAILED)
287 QUILL_THROW(
QuillError{std::string{
"mmap failed. errno: "} + std::to_string(errno) +
288 " error: " + std::strerror(errno)});
292 std::byte* aligned_address = _align_pointer(static_cast<std::byte*>(mem) + metadata_size, alignment);
295 auto const offset =
static_cast<size_t>(aligned_address -
static_cast<std::byte*
>(mem));
298 std::memcpy(aligned_address -
sizeof(
size_t), &total_size,
sizeof(total_size));
299 std::memcpy(aligned_address - (2u *
sizeof(
size_t)), &offset,
sizeof(offset));
301 return aligned_address;
309 void static _free_aligned(
void* ptr) noexcept
316 std::memcpy(&offset, static_cast<std::byte*>(ptr) - (2u *
sizeof(
size_t)),
sizeof(offset));
319 std::memcpy(&total_size, static_cast<std::byte*>(ptr) -
sizeof(
size_t),
sizeof(total_size));
322 void* mem =
static_cast<std::byte*
>(ptr) - offset;
324 ::munmap(mem, total_size);
// Mask used by _flush_cachelines to round a position down to a cache-line boundary.
329 static constexpr integer_type QUILL_CACHE_LINE_MASK{QUILL_CACHE_LINE_SIZE - 1};
// Usable capacity in bytes. Presumably a power of two so that _mask = _capacity - 1
// works for index wrapping — TODO confirm against the constructor.
331 integer_type
const _capacity;
// Wrap mask applied as `pos & _mask` throughout the read/write paths.
332 integer_type
const _mask;
// Byte threshold the reader must consume before commit_read publishes its position.
333 integer_type
const _bytes_per_batch;
// Aligned backing storage; sized 2 * _capacity (see constructor's memset) and
// released by the destructor via _free_aligned.
334 std::byte*
const _storage{
nullptr};
// Policy the storage was allocated with; exposed via huge_pages_policy().
335 HugePagesPolicy
const _huge_pages_policy;
// --- Writer-side state: on its own cache line to avoid false sharing with the reader. ---
// Shared writer position, published with release semantics in commit_write.
337 alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_writer_pos{0};
// Writer's private position, advanced by finish_write before being published.
338 alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _writer_pos{0};
// Writer's cached snapshot of the reader position; refreshed only when space runs out.
339 integer_type _reader_pos_cache{0};
// Watermark for commit_write's x86 cache-line flushing.
340 integer_type _last_flushed_writer_pos{0};
// --- Reader-side state: likewise isolated on separate cache lines. ---
// Shared reader position, published (batched) with release semantics in commit_read.
342 alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<integer_type> _atomic_reader_pos{0};
// Reader's private position, advanced by finish_read.
343 alignas(QUILL_CACHE_LINE_ALIGNED) integer_type _reader_pos{0};
// Reader's cached snapshot of the writer position; mutable because empty() is const.
344 mutable integer_type _writer_pos_cache{0};
// Watermark for commit_read's x86 cache-line flushing.
345 integer_type _last_flushed_reader_pos{0};
350 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__) A bounded single producer single consumer ring buffer.
Definition: BoundedSPSCQueue.h:55
QUILL_ATTRIBUTE_HOT void finish_and_commit_write(integer_type n) noexcept
Finish and commit write as a single function.
Definition: BoundedSPSCQueue.h:141
QUILL_NODISCARD T next_power_of_two(T n) noexcept
Round up to the next power of 2.
Definition: MathUtilities.h:45
Sets up a signal handler to handle fatal signals.
Definition: BackendManager.h:24
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool empty() const noexcept
Only meant to be called by the reader.
Definition: BoundedSPSCQueue.h:175
custom exception
Definition: QuillError.h:45
QUILL_NODISCARD constexpr bool is_power_of_two(uint64_t number) noexcept
Check if a number is a power of 2.
Definition: MathUtilities.h:24