9 #include "quill/core/Attributes.h" 10 #include "quill/core/BoundedSPSCQueue.h" 11 #include "quill/core/Common.h" 12 #include "quill/core/InlinedVector.h" 13 #include "quill/core/Spinlock.h" 14 #include "quill/core/UnboundedSPSCQueue.h" 22 #include <string_view> 23 #include <type_traits> 31 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__) 33 #pragma warning(disable : 4324) 37 class TransitEventBuffer;
40 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__) 41 #pragma GCC diagnostic push 42 #pragma GCC diagnostic ignored "-Wredundant-decls" 46 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED
extern std::string
get_thread_name();
47 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED
extern uint32_t
get_thread_id() noexcept;
49 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__) 50 #pragma GCC diagnostic pop 67 ThreadContext(QueueType queue_type,
size_t initial_queue_capacity,
68 QUILL_MAYBE_UNUSED
size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
69 : _queue_type(queue_type)
71 if (has_unbounded_queue_type())
73 new (&_spsc_queue_union.unbounded_spsc_queue)
74 UnboundedSPSCQueue{initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy};
76 else if (has_bounded_queue_type())
78 new (&_spsc_queue_union.bounded_spsc_queue)
BoundedSPSCQueue{initial_queue_capacity, huge_pages_policy};
89 if (has_unbounded_queue_type())
91 _spsc_queue_union.unbounded_spsc_queue.~UnboundedSPSCQueue();
93 else if (has_bounded_queue_type())
95 _spsc_queue_union.bounded_spsc_queue.~BoundedSPSCQueue();
100 template <QueueType queue_type>
101 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue>& get_spsc_queue() noexcept
103 QUILL_ASSERT(_queue_type == queue_type,
"ThreadContext queue_type mismatch in get_spsc_queue()");
105 if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
107 return _spsc_queue_union.unbounded_spsc_queue;
111 return _spsc_queue_union.bounded_spsc_queue;
116 template <QueueType queue_type>
117 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue>
const& get_spsc_queue()
120 QUILL_ASSERT(_queue_type == queue_type,
121 "ThreadContext queue_type mismatch in get_spsc_queue() const");
123 if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
125 return _spsc_queue_union.unbounded_spsc_queue;
129 return _spsc_queue_union.bounded_spsc_queue;
134 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
SizeCacheVector& get_conditional_arg_size_cache() noexcept
136 return _conditional_arg_size_cache;
140 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_bounded_queue_type()
const noexcept
142 return (_queue_type == QueueType::BoundedBlocking) || (_queue_type == QueueType::BoundedDropping);
146 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_unbounded_queue_type()
const noexcept
148 return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::UnboundedDropping);
152 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_dropping_queue()
const noexcept
154 return (_queue_type == QueueType::UnboundedDropping) || (_queue_type == QueueType::BoundedDropping);
158 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_blocking_queue()
const noexcept
160 return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::BoundedBlocking);
164 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion
const& get_spsc_queue_union()
const noexcept
166 return _spsc_queue_union;
170 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion& get_spsc_queue_union() noexcept
172 return _spsc_queue_union;
176 QUILL_NODISCARD std::string_view thread_id()
const noexcept {
return _thread_id; }
179 QUILL_NODISCARD std::string_view thread_name()
const noexcept {
return _thread_name; }
182 void mark_invalid() noexcept { _valid.store(
false, std::memory_order_relaxed); }
185 QUILL_NODISCARD
bool is_valid()
const noexcept {
return _valid.load(std::memory_order_relaxed); }
188 void increment_failure_counter() noexcept
190 _failure_counter.fetch_add(1, std::memory_order_relaxed);
194 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
size_t get_and_reset_failure_counter() noexcept
196 if (QUILL_LIKELY(_failure_counter.load(std::memory_order_relaxed) == 0))
200 return _failure_counter.exchange(0, std::memory_order_relaxed);
206 SpscQueueUnion _spsc_queue_union;
210 std::shared_ptr<TransitEventBuffer> _transit_event_buffer;
211 QueueType _queue_type;
212 std::atomic<bool> _valid{
true};
213 alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<size_t> _failure_counter{0};
231 template <
typename TCallback>
232 void for_each_thread_context(TCallback cb)
236 for (
auto const& elem : _thread_contexts)
243 void register_thread_context(std::shared_ptr<ThreadContext>
const& thread_context)
246 _thread_contexts.push_back(thread_context);
248 _new_thread_context_flag.store(
true, std::memory_order_release);
252 void add_invalid_thread_context() noexcept
254 _invalid_thread_context_count.fetch_add(1, std::memory_order_relaxed);
258 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_invalid_thread_context()
const noexcept
263 return _invalid_thread_context_count.load(std::memory_order_relaxed) != 0;
267 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool new_thread_context_flag() noexcept
270 if (_new_thread_context_flag.load(std::memory_order_relaxed))
275 _new_thread_context_flag.store(
false, std::memory_order_relaxed);
282 void remove_shared_invalidated_thread_context(
ThreadContext const* thread_context)
289 auto thread_context_it = _thread_contexts.end();
290 for (
auto it = _thread_contexts.begin(); it != _thread_contexts.end(); ++it)
292 if (it->get() == thread_context)
294 thread_context_it = it;
299 QUILL_ASSERT(thread_context_it != _thread_contexts.end(),
300 "Attempting to remove a non-existent thread context in " 301 "ThreadContextManager::unregister_thread_context()");
303 QUILL_ASSERT(!thread_context_it->get()->is_valid(),
304 "Attempting to remove a valid thread context in " 305 "ThreadContextManager::unregister_thread_context()");
307 QUILL_ASSERT(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type(),
308 "Invalid queue type in ThreadContextManager::unregister_thread_context()");
310 if (thread_context->has_unbounded_queue_type())
312 QUILL_ASSERT(thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty(),
313 "Attempting to remove a thread context with a non-empty unbounded queue in " 314 "ThreadContextManager::unregister_thread_context()");
316 else if (thread_context->has_bounded_queue_type())
318 QUILL_ASSERT(thread_context->get_spsc_queue_union().bounded_spsc_queue.empty(),
319 "Attempting to remove a thread context with a non-empty bounded queue in " 320 "ThreadContextManager::unregister_thread_context()");
323 _thread_contexts.erase(thread_context_it);
326 _invalid_thread_context_count.fetch_sub(1, std::memory_order_relaxed);
334 std::vector<std::shared_ptr<ThreadContext>> _thread_contexts;
336 std::atomic<bool> _new_thread_context_flag{
false};
337 std::atomic<uint8_t> _invalid_thread_context_count{0};
345 size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
346 : _thread_context(std::make_shared<ThreadContext>(
347 queue_type, initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy))
349 #if defined(QUILL_ENABLE_ASSERTIONS) || !defined(NDEBUG) 355 thread_local
bool thread_local_instance_created =
false;
357 QUILL_ASSERT(!thread_local_instance_created,
358 R
"(ScopedThreadContext can only be instantiated once per thread. It appears you may be combining default FrontendOptions with custom FrontendOptions. Ensure only one set of FrontendOptions is used to maintain a single thread context per thread.)"); 360 thread_local_instance_created = true;
363 ThreadContextManager::instance().register_thread_context(_thread_context);
379 _thread_context->mark_invalid();
382 ThreadContextManager::instance().add_invalid_thread_context();
386 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
ThreadContext* get_thread_context()
const noexcept
388 QUILL_ASSERT(_thread_context,
389 "_thread_context cannot be null in ScopedThreadContext::get_thread_context()");
390 return _thread_context.get();
398 std::shared_ptr<ThreadContext> _thread_context;
406 QueueType queue_type,
size_t initial_queue_capacity,
size_t unbounded_queue_max_capacity,
407 HugePagesPolicy huge_pages_policy) noexcept
410 queue_type, initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy};
412 return scoped_thread_context.get_thread_context();
416 template <
typename TFrontendOptions>
417 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
ThreadContext* get_local_thread_context() noexcept
420 TFrontendOptions::queue_type, TFrontendOptions::initial_queue_capacity,
421 TFrontendOptions::unbounded_queue_max_capacity, TFrontendOptions::huge_pages_policy);
424 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:42
Definition: ThreadContextManager.h:340
Definition: ThreadContextManager.h:216
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED std::string get_thread_name()
Returns the name of the thread.
Definition: ThreadUtilities.h:148
Setups a signal handler to handle fatal signals.
Definition: BackendManager.h:24
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT QUILL_EXPORT ThreadContext * get_scoped_thread_context_impl(QueueType queue_type, size_t initial_queue_capacity, size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy) noexcept
Non-template implementation that owns the thread local context.
Definition: ThreadContextManager.h:405
Definition: Spinlock.h:18
Definition: Spinlock.h:58
Definition: BackendWorker.h:77
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED uint32_t get_thread_id() noexcept
Returns the os assigned ID of the thread.
Definition: ThreadUtilities.h:198
Definition: ThreadContextManager.h:53