quill
ThreadContextManager.h
1 
7 #pragma once
8 
9 #include "quill/core/Attributes.h"
10 #include "quill/core/BoundedSPSCQueue.h"
11 #include "quill/core/Common.h"
12 #include "quill/core/InlinedVector.h"
13 #include "quill/core/Spinlock.h"
14 #include "quill/core/UnboundedSPSCQueue.h"
15 
16 #include <atomic>
17 #include <cstdint>
18 #include <cstdlib>
19 #include <memory>
20 #include <new>
21 #include <string>
22 #include <string_view>
23 #include <type_traits>
24 #include <vector>
25 
26 QUILL_BEGIN_NAMESPACE
27 
28 namespace detail
29 {
30 
31 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
32 #pragma warning(push)
33 #pragma warning(disable : 4324)
34 #endif
35 
37 class TransitEventBuffer;
38 class BackendWorker;
39 
40 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__)
41  #pragma GCC diagnostic push
42  #pragma GCC diagnostic ignored "-Wredundant-decls"
43 #endif
44 
46 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED extern std::string get_thread_name();
47 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED extern uint32_t get_thread_id() noexcept;
48 
49 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__)
50  #pragma GCC diagnostic pop
51 #endif
52 
54 {
55 private:
56  union SpscQueueUnion
57  {
58  UnboundedSPSCQueue unbounded_spsc_queue;
59  BoundedSPSCQueue bounded_spsc_queue;
60 
61  SpscQueueUnion() {}
62  ~SpscQueueUnion() {}
63  };
64 
65 public:
66  /***/
67  ThreadContext(QueueType queue_type, size_t initial_queue_capacity,
68  QUILL_MAYBE_UNUSED size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
69  : _queue_type(queue_type)
70  {
71  if (has_unbounded_queue_type())
72  {
73  new (&_spsc_queue_union.unbounded_spsc_queue)
74  UnboundedSPSCQueue{initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy};
75  }
76  else if (has_bounded_queue_type())
77  {
78  new (&_spsc_queue_union.bounded_spsc_queue) BoundedSPSCQueue{initial_queue_capacity, huge_pages_policy};
79  }
80  }
81 
82  /***/
83  ThreadContext(ThreadContext const&) = delete;
84  ThreadContext& operator=(ThreadContext const&) = delete;
85 
86  /***/
87  ~ThreadContext()
88  {
89  if (has_unbounded_queue_type())
90  {
91  _spsc_queue_union.unbounded_spsc_queue.~UnboundedSPSCQueue();
92  }
93  else if (has_bounded_queue_type())
94  {
95  _spsc_queue_union.bounded_spsc_queue.~BoundedSPSCQueue();
96  }
97  }
98 
99  /***/
100  template <QueueType queue_type>
101  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue>& get_spsc_queue() noexcept
102  {
103  QUILL_ASSERT(_queue_type == queue_type, "ThreadContext queue_type mismatch in get_spsc_queue()");
104 
105  if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
106  {
107  return _spsc_queue_union.unbounded_spsc_queue;
108  }
109  else
110  {
111  return _spsc_queue_union.bounded_spsc_queue;
112  }
113  }
114 
115  /***/
116  template <QueueType queue_type>
117  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue> const& get_spsc_queue()
118  const noexcept
119  {
120  QUILL_ASSERT(_queue_type == queue_type,
121  "ThreadContext queue_type mismatch in get_spsc_queue() const");
122 
123  if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
124  {
125  return _spsc_queue_union.unbounded_spsc_queue;
126  }
127  else
128  {
129  return _spsc_queue_union.bounded_spsc_queue;
130  }
131  }
132 
133  /***/
134  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SizeCacheVector& get_conditional_arg_size_cache() noexcept
135  {
136  return _conditional_arg_size_cache;
137  }
138 
139  /***/
140  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_bounded_queue_type() const noexcept
141  {
142  return (_queue_type == QueueType::BoundedBlocking) || (_queue_type == QueueType::BoundedDropping);
143  }
144 
145  /***/
146  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_unbounded_queue_type() const noexcept
147  {
148  return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::UnboundedDropping);
149  }
150 
151  /***/
152  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_dropping_queue() const noexcept
153  {
154  return (_queue_type == QueueType::UnboundedDropping) || (_queue_type == QueueType::BoundedDropping);
155  }
156 
157  /***/
158  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_blocking_queue() const noexcept
159  {
160  return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::BoundedBlocking);
161  }
162 
163  /***/
164  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion const& get_spsc_queue_union() const noexcept
165  {
166  return _spsc_queue_union;
167  }
168 
169  /***/
170  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion& get_spsc_queue_union() noexcept
171  {
172  return _spsc_queue_union;
173  }
174 
175  /***/
176  QUILL_NODISCARD std::string_view thread_id() const noexcept { return _thread_id; }
177 
178  /***/
179  QUILL_NODISCARD std::string_view thread_name() const noexcept { return _thread_name; }
180 
181  /***/
182  void mark_invalid() noexcept { _valid.store(false, std::memory_order_relaxed); }
183 
184  /***/
185  QUILL_NODISCARD bool is_valid() const noexcept { return _valid.load(std::memory_order_relaxed); }
186 
187  /***/
188  void increment_failure_counter() noexcept
189  {
190  _failure_counter.fetch_add(1, std::memory_order_relaxed);
191  }
192 
193  /***/
194  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t get_and_reset_failure_counter() noexcept
195  {
196  if (QUILL_LIKELY(_failure_counter.load(std::memory_order_relaxed) == 0))
197  {
198  return 0;
199  }
200  return _failure_counter.exchange(0, std::memory_order_relaxed);
201  }
202 
// Private state of the thread context. Declaration order is also the initialisation order.
203 private:
// BackendWorker is granted access to the private members below.
204  friend class detail::BackendWorker;
205 
// Storage for whichever queue the constructor built (selected by _queue_type).
206  SpscQueueUnion _spsc_queue_union;
// Exposed through get_conditional_arg_size_cache(); its use is not visible in this header.
207  SizeCacheVector _conditional_arg_size_cache;
// Decimal string of get_thread_id(), captured while the context is being constructed.
208  std::string _thread_id = std::to_string(get_thread_id());
// Result of get_thread_name(), captured while the context is being constructed.
209  std::string _thread_name = get_thread_name();
// Not touched by any code in this header; presumably managed by the friend BackendWorker - TODO confirm.
210  std::shared_ptr<TransitEventBuffer> _transit_event_buffer;
// Set once in the constructor; tells which member of _spsc_queue_union is alive.
211  QueueType _queue_type;
// Starts true and is cleared by mark_invalid().
212  std::atomic<bool> _valid{true};
// Incremented by increment_failure_counter() and drained by get_and_reset_failure_counter();
// aligned to QUILL_CACHE_LINE_ALIGNED.
213  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<size_t> _failure_counter{0};
214 };
215 
217 {
218 public:
219  /***/
220  QUILL_EXPORT static ThreadContextManager& instance() noexcept
221  {
222  static ThreadContextManager instance;
223  return instance;
224  }
225 
226  /***/
228  ThreadContextManager& operator=(ThreadContextManager const&) = delete;
229 
230  /***/
231  template <typename TCallback>
232  void for_each_thread_context(TCallback cb)
233  {
234  LockGuard const lock{_spinlock};
235 
236  for (auto const& elem : _thread_contexts)
237  {
238  cb(elem.get());
239  }
240  }
241 
242  /***/
243  void register_thread_context(std::shared_ptr<ThreadContext> const& thread_context)
244  {
245  _spinlock.lock();
246  _thread_contexts.push_back(thread_context);
247  _spinlock.unlock();
248  _new_thread_context_flag.store(true, std::memory_order_release);
249  }
250 
251  /***/
252  void add_invalid_thread_context() noexcept
253  {
254  _invalid_thread_context_count.fetch_add(1, std::memory_order_relaxed);
255  }
256 
257  /***/
258  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_invalid_thread_context() const noexcept
259  {
260  // Here we do relaxed because if the value is not zero we will look inside ThreadContext invalid
261  // flag that is also a relaxed atomic, and then we will look into the SPSC queue size that is
262  // also atomic Even if we don't read everything in order we will check again in the next circle
263  return _invalid_thread_context_count.load(std::memory_order_relaxed) != 0;
264  }
265 
266  /***/
267  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool new_thread_context_flag() noexcept
268  {
269  // Again relaxed memory model as in case it is false we will acquire the lock
270  if (_new_thread_context_flag.load(std::memory_order_relaxed))
271  {
272  // if the variable was updated to true, set it to false,
273  // There should not be any race condition here as this is the only place _changed is set to
274  // false, and we will return true anyway
275  _new_thread_context_flag.store(false, std::memory_order_relaxed);
276  return true;
277  }
278  return false;
279  }
280 
281  /***/
282  void remove_shared_invalidated_thread_context(ThreadContext const* thread_context)
283  {
284  LockGuard const lock{_spinlock};
285 
286  // We could use std::find_if, but since this header is included in Logger.h, which is essential
287  // for logging purposes, we aim to minimize the number of includes in that path.
288  // Therefore, we implement our own find_if loop here.
289  auto thread_context_it = _thread_contexts.end();
290  for (auto it = _thread_contexts.begin(); it != _thread_contexts.end(); ++it)
291  {
292  if (it->get() == thread_context)
293  {
294  thread_context_it = it;
295  break;
296  }
297  }
298 
299  QUILL_ASSERT(thread_context_it != _thread_contexts.end(),
300  "Attempting to remove a non-existent thread context in "
301  "ThreadContextManager::unregister_thread_context()");
302 
303  QUILL_ASSERT(!thread_context_it->get()->is_valid(),
304  "Attempting to remove a valid thread context in "
305  "ThreadContextManager::unregister_thread_context()");
306 
307  QUILL_ASSERT(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type(),
308  "Invalid queue type in ThreadContextManager::unregister_thread_context()");
309 
310  if (thread_context->has_unbounded_queue_type())
311  {
312  QUILL_ASSERT(thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty(),
313  "Attempting to remove a thread context with a non-empty unbounded queue in "
314  "ThreadContextManager::unregister_thread_context()");
315  }
316  else if (thread_context->has_bounded_queue_type())
317  {
318  QUILL_ASSERT(thread_context->get_spsc_queue_union().bounded_spsc_queue.empty(),
319  "Attempting to remove a thread context with a non-empty bounded queue in "
320  "ThreadContextManager::unregister_thread_context()");
321  }
322 
323  _thread_contexts.erase(thread_context_it);
324 
325  // Decrement the counter since we found something to
326  _invalid_thread_context_count.fetch_sub(1, std::memory_order_relaxed);
327  }
328 
329 private:
330  ThreadContextManager() = default;
331  ~ThreadContextManager() = default;
332 
333 private:
334  std::vector<std::shared_ptr<ThreadContext>> _thread_contexts;
335  Spinlock _spinlock;
336  std::atomic<bool> _new_thread_context_flag{false};
337  std::atomic<uint8_t> _invalid_thread_context_count{0};
338 };
339 
341 {
342 public:
343  /***/
344  ScopedThreadContext(QueueType queue_type, size_t initial_queue_capacity,
345  size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
346  : _thread_context(std::make_shared<ThreadContext>(
347  queue_type, initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy))
348  {
349 #if defined(QUILL_ENABLE_ASSERTIONS) || !defined(NDEBUG)
350  // Thread-local flag to track if an instance has been created for this thread.
351  // This ensures that get_local_thread_context() is not called with different template arguments
352  // when using custom FrontendOptions. Having multiple thread contexts in a single thread is fine
353  // and functional but goes against the design principle of maintaining a single thread context
354  // per thread.
355  thread_local bool thread_local_instance_created = false;
356 
357  QUILL_ASSERT(!thread_local_instance_created,
358  R"(ScopedThreadContext can only be instantiated once per thread. It appears you may be combining default FrontendOptions with custom FrontendOptions. Ensure only one set of FrontendOptions is used to maintain a single thread context per thread.)");
359 
360  thread_local_instance_created = true;
361 #endif
362 
363  ThreadContextManager::instance().register_thread_context(_thread_context);
364  }
365 
366  /***/
367  ScopedThreadContext(ScopedThreadContext const&) = delete;
368  ScopedThreadContext& operator=(ScopedThreadContext const&) = delete;
369 
370  /***/
371  ~ScopedThreadContext() noexcept
372  {
373  // This destructor will get called when the thread that created this wrapper stops
374  // we will only invalidate the thread context
375  // The backend thread will empty an invalidated ThreadContext and then remove_file it from
376  // the ThreadContextCollection
377  // There is only exception for the thread who owns the ThreadContextCollection the
378  // main thread. The thread context of the main thread can get deleted before getting invalidated
379  _thread_context->mark_invalid();
380 
381  // Notify the backend thread that one context has been removed
382  ThreadContextManager::instance().add_invalid_thread_context();
383  }
384 
385  /***/
386  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ThreadContext* get_thread_context() const noexcept
387  {
388  QUILL_ASSERT(_thread_context,
389  "_thread_context cannot be null in ScopedThreadContext::get_thread_context()");
390  return _thread_context.get();
391  }
392 
393 private:
// Created in the constructor and also handed to ThreadContextManager::register_thread_context(),
// which stores its own copy of the shared_ptr, so the context can outlive this wrapper.
398  std::shared_ptr<ThreadContext> _thread_context;
399 };
400 
405 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT QUILL_EXPORT inline ThreadContext* get_scoped_thread_context_impl(
406  QueueType queue_type, size_t initial_queue_capacity, size_t unbounded_queue_max_capacity,
407  HugePagesPolicy huge_pages_policy) noexcept
408 {
409  thread_local ScopedThreadContext scoped_thread_context{
410  queue_type, initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy};
411 
412  return scoped_thread_context.get_thread_context();
413 }
414 
415 /***/
416 template <typename TFrontendOptions>
417 QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ThreadContext* get_local_thread_context() noexcept
418 {
420  TFrontendOptions::queue_type, TFrontendOptions::initial_queue_capacity,
421  TFrontendOptions::unbounded_queue_max_capacity, TFrontendOptions::huge_pages_policy);
422 }
423 
424 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
425 #pragma warning(pop)
426 #endif
427 
428 } // namespace detail
429 
430 QUILL_END_NAMESPACE
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:42
Definition: ThreadContextManager.h:340
Definition: ThreadContextManager.h:216
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED std::string get_thread_name()
Returns the name of the thread.
Definition: ThreadUtilities.h:148
Setups a signal handler to handle fatal signals.
Definition: BackendManager.h:24
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT QUILL_EXPORT ThreadContext * get_scoped_thread_context_impl(QueueType queue_type, size_t initial_queue_capacity, size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy) noexcept
Non-template implementation that owns the thread local context.
Definition: ThreadContextManager.h:405
Definition: Spinlock.h:18
Definition: Spinlock.h:58
Definition: BackendWorker.h:77
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED uint32_t get_thread_id() noexcept
Returns the os assigned ID of the thread.
Definition: ThreadUtilities.h:198
Definition: ThreadContextManager.h:53