9 #include "quill/core/Attributes.h" 10 #include "quill/core/BoundedSPSCQueue.h" 11 #include "quill/core/Common.h" 12 #include "quill/core/InlinedVector.h" 13 #include "quill/core/Spinlock.h" 14 #include "quill/core/UnboundedSPSCQueue.h" 23 #include <string_view> 24 #include <type_traits> 32 class TransitEventBuffer;
35 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__) 36 #pragma GCC diagnostic push 37 #pragma GCC diagnostic ignored "-Wredundant-decls" 41 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED
extern std::string
get_thread_name();
42 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED
extern uint32_t
get_thread_id() noexcept;
44 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__) 45 #pragma GCC diagnostic pop 62 ThreadContext(QueueType queue_type,
size_t initial_queue_capacity,
63 QUILL_MAYBE_UNUSED
size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
64 : _queue_type(queue_type)
66 if (has_unbounded_queue_type())
68 new (&_spsc_queue_union.unbounded_spsc_queue)
69 UnboundedSPSCQueue{initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy};
71 else if (has_bounded_queue_type())
73 new (&_spsc_queue_union.bounded_spsc_queue)
BoundedSPSCQueue{initial_queue_capacity, huge_pages_policy};
84 if (has_unbounded_queue_type())
86 _spsc_queue_union.unbounded_spsc_queue.~UnboundedSPSCQueue();
88 else if (has_bounded_queue_type())
90 _spsc_queue_union.bounded_spsc_queue.~BoundedSPSCQueue();
95 template <QueueType queue_type>
96 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue>& get_spsc_queue() noexcept
98 assert((_queue_type == queue_type) &&
"ThreadContext queue_type mismatch");
100 if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
102 return _spsc_queue_union.unbounded_spsc_queue;
106 return _spsc_queue_union.bounded_spsc_queue;
111 template <QueueType queue_type>
112 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue>
const& get_spsc_queue()
115 assert((_queue_type == queue_type) &&
"ThreadContext queue_type mismatch");
117 if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
119 return _spsc_queue_union.unbounded_spsc_queue;
123 return _spsc_queue_union.bounded_spsc_queue;
128 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
SizeCacheVector& get_conditional_arg_size_cache() noexcept
130 return _conditional_arg_size_cache;
134 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_bounded_queue_type()
const noexcept
136 return (_queue_type == QueueType::BoundedBlocking) || (_queue_type == QueueType::BoundedDropping);
140 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_unbounded_queue_type()
const noexcept
142 return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::UnboundedDropping);
146 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_dropping_queue()
const noexcept
148 return (_queue_type == QueueType::UnboundedDropping) || (_queue_type == QueueType::BoundedDropping);
152 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_blocking_queue()
const noexcept
154 return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::BoundedBlocking);
158 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion
const& get_spsc_queue_union()
const noexcept
160 return _spsc_queue_union;
164 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion& get_spsc_queue_union() noexcept
166 return _spsc_queue_union;
170 QUILL_NODISCARD std::string_view thread_id()
const noexcept {
return _thread_id; }
173 QUILL_NODISCARD std::string_view thread_name()
const noexcept {
return _thread_name; }
176 void mark_invalid() noexcept { _valid.store(
false, std::memory_order_relaxed); }
179 QUILL_NODISCARD
bool is_valid()
const noexcept {
return _valid.load(std::memory_order_relaxed); }
182 void increment_failure_counter() noexcept
184 _failure_counter.fetch_add(1, std::memory_order_relaxed);
188 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
size_t get_and_reset_failure_counter() noexcept
190 if (QUILL_LIKELY(_failure_counter.load(std::memory_order_relaxed) == 0))
194 return _failure_counter.exchange(0, std::memory_order_relaxed);
200 SpscQueueUnion _spsc_queue_union;
204 std::shared_ptr<TransitEventBuffer> _transit_event_buffer;
205 QueueType _queue_type;
206 std::atomic<bool> _valid{
true};
207 alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<size_t> _failure_counter{0};
225 template <
typename TCallback>
226 void for_each_thread_context(TCallback cb)
230 for (
auto const& elem : _thread_contexts)
237 void register_thread_context(std::shared_ptr<ThreadContext>
const& thread_context)
240 _thread_contexts.push_back(thread_context);
242 _new_thread_context_flag.store(
true, std::memory_order_release);
246 void add_invalid_thread_context() noexcept
248 _invalid_thread_context_count.fetch_add(1, std::memory_order_relaxed);
252 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool has_invalid_thread_context()
const noexcept
257 return _invalid_thread_context_count.load(std::memory_order_relaxed) != 0;
261 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
bool new_thread_context_flag() noexcept
264 if (_new_thread_context_flag.load(std::memory_order_relaxed))
269 _new_thread_context_flag.store(
false, std::memory_order_relaxed);
276 void remove_shared_invalidated_thread_context(
ThreadContext const* thread_context)
283 auto thread_context_it = _thread_contexts.end();
284 for (
auto it = _thread_contexts.begin(); it != _thread_contexts.end(); ++it)
286 if (it->get() == thread_context)
288 thread_context_it = it;
293 assert(thread_context_it != _thread_contexts.end() &&
294 "Attempting to remove a non existent thread context");
296 assert(!thread_context_it->get()->is_valid() &&
"Attempting to remove a valid thread context");
299 assert(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type());
301 if (thread_context->has_unbounded_queue_type())
303 assert(thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty() &&
304 "Attempting to remove a thread context with a non empty queue");
306 else if (thread_context->has_bounded_queue_type())
308 assert(thread_context->get_spsc_queue_union().bounded_spsc_queue.empty() &&
309 "Attempting to remove a thread context with a non empty queue");
313 _thread_contexts.erase(thread_context_it);
316 _invalid_thread_context_count.fetch_sub(1, std::memory_order_relaxed);
324 std::vector<std::shared_ptr<ThreadContext>> _thread_contexts;
326 std::atomic<bool> _new_thread_context_flag{
false};
327 std::atomic<uint8_t> _invalid_thread_context_count{0};
335 size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
336 : _thread_context(std::make_shared<ThreadContext>(
337 queue_type, initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy))
345 thread_local
bool thread_local_instance_created =
false;
347 assert(!thread_local_instance_created &&
348 R
"(ScopedThreadContext can only be instantiated once per thread. It appears you may be combining default FrontendOptions with custom FrontendOptions. Ensure only one set of FrontendOptions is used to maintain a single thread context per thread.)"); 350 thread_local_instance_created = true;
353 ThreadContextManager::instance().register_thread_context(_thread_context);
369 _thread_context->mark_invalid();
372 ThreadContextManager::instance().add_invalid_thread_context();
376 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
ThreadContext* get_thread_context()
const noexcept
378 assert(_thread_context &&
"_thread_context can not be null");
379 return _thread_context.get();
387 std::shared_ptr<ThreadContext> _thread_context;
391 template <
typename TFrontendOptions>
392 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT
ThreadContext* get_local_thread_context() noexcept
395 TFrontendOptions::queue_type, TFrontendOptions::initial_queue_capacity,
396 TFrontendOptions::unbounded_queue_max_capacity, TFrontendOptions::huge_pages_policy};
398 return scoped_thread_context.get_thread_context();
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:37
Definition: ThreadContextManager.h:330
Definition: ThreadContextManager.h:210
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED std::string get_thread_name()
Returns the name of the thread.
Definition: ThreadUtilities.h:147
Setups a signal handler to handle fatal signals.
Definition: BackendManager.h:24
Definition: Spinlock.h:18
Definition: Spinlock.h:58
Definition: BackendWorker.h:72
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED uint32_t get_thread_id() noexcept
Returns the os assigned ID of the thread.
Definition: ThreadUtilities.h:193
Definition: ThreadContextManager.h:48