quill
ThreadContextManager.h
1 
7 #pragma once
8 
9 #include "quill/core/Attributes.h"
10 #include "quill/core/BoundedSPSCQueue.h"
11 #include "quill/core/Common.h"
12 #include "quill/core/InlinedVector.h"
13 #include "quill/core/Spinlock.h"
14 #include "quill/core/UnboundedSPSCQueue.h"
15 
16 #include <atomic>
17 #include <cassert>
18 #include <cstdint>
19 #include <cstdlib>
20 #include <memory>
21 #include <new>
22 #include <string>
23 #include <string_view>
24 #include <type_traits>
25 #include <vector>
26 
27 QUILL_BEGIN_NAMESPACE
28 
29 namespace detail
30 {
32 class TransitEventBuffer;
33 class BackendWorker;
34 
35 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__)
36  #pragma GCC diagnostic push
37  #pragma GCC diagnostic ignored "-Wredundant-decls"
38 #endif
39 
41 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED extern std::string get_thread_name();
42 QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED extern uint32_t get_thread_id() noexcept;
43 
44 #if defined(__GNUC__) || defined(__clang__) || defined(__MINGW32__)
45  #pragma GCC diagnostic pop
46 #endif
47 
49 {
50 private:
  // Raw storage for exactly one of the two SPSC queue implementations.
  // The union deliberately has empty ctor/dtor: the enclosing ThreadContext
  // constructs the active member with placement-new and destroys it manually,
  // dispatching on _queue_type (see ThreadContext ctor/dtor).
  union SpscQueueUnion
  {
    UnboundedSPSCQueue unbounded_spsc_queue;
    BoundedSPSCQueue bounded_spsc_queue;

    SpscQueueUnion() {}
    ~SpscQueueUnion() {}
  };
59 
60 public:
  /**
   * Constructs the per-thread context and placement-news the queue variant
   * selected by `queue_type` into the storage union.
   *
   * @param queue_type which of the four queue flavours this thread will use
   * @param initial_queue_capacity initial queue capacity
   * @param unbounded_queue_max_capacity growth limit, used only by the
   *        unbounded variants (hence QUILL_MAYBE_UNUSED)
   * @param huge_pages_policy huge-pages allocation policy forwarded to the queue
   */
  ThreadContext(QueueType queue_type, size_t initial_queue_capacity,
                QUILL_MAYBE_UNUSED size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
    : _queue_type(queue_type)
  {
    if (has_unbounded_queue_type())
    {
      // The union only provides raw storage; construct the active member in place
      new (&_spsc_queue_union.unbounded_spsc_queue)
        UnboundedSPSCQueue{initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy};
    }
    else if (has_bounded_queue_type())
    {
      new (&_spsc_queue_union.bounded_spsc_queue) BoundedSPSCQueue{initial_queue_capacity, huge_pages_policy};
    }
  }
76 
  // Non-copyable: the context owns a manually-managed queue union and is
  // registered by address with the ThreadContextManager.
  ThreadContext(ThreadContext const&) = delete;
  ThreadContext& operator=(ThreadContext const&) = delete;
80 
  /**
   * Destroys whichever union member was constructed in the ctor, dispatching
   * on the stored _queue_type. Mirrors the placement-new in the constructor.
   */
  ~ThreadContext()
  {
    if (has_unbounded_queue_type())
    {
      _spsc_queue_union.unbounded_spsc_queue.~UnboundedSPSCQueue();
    }
    else if (has_bounded_queue_type())
    {
      _spsc_queue_union.bounded_spsc_queue.~BoundedSPSCQueue();
    }
  }
93 
94  /***/
95  template <QueueType queue_type>
96  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue>& get_spsc_queue() noexcept
97  {
98  assert((_queue_type == queue_type) && "ThreadContext queue_type mismatch");
99 
100  if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
101  {
102  return _spsc_queue_union.unbounded_spsc_queue;
103  }
104  else
105  {
106  return _spsc_queue_union.bounded_spsc_queue;
107  }
108  }
109 
110  /***/
111  template <QueueType queue_type>
112  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::conditional_t<(queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping), UnboundedSPSCQueue, BoundedSPSCQueue> const& get_spsc_queue()
113  const noexcept
114  {
115  assert((_queue_type == queue_type) && "ThreadContext queue_type mismatch");
116 
117  if constexpr ((queue_type == QueueType::UnboundedBlocking) || (queue_type == QueueType::UnboundedDropping))
118  {
119  return _spsc_queue_union.unbounded_spsc_queue;
120  }
121  else
122  {
123  return _spsc_queue_union.bounded_spsc_queue;
124  }
125  }
126 
  /**
   * Returns the per-thread scratch vector used to cache argument sizes.
   * Hot-path accessor; no synchronisation — only the owning thread uses it.
   */
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SizeCacheVector& get_conditional_arg_size_cache() noexcept
  {
    return _conditional_arg_size_cache;
  }
132 
133  /***/
134  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_bounded_queue_type() const noexcept
135  {
136  return (_queue_type == QueueType::BoundedBlocking) || (_queue_type == QueueType::BoundedDropping);
137  }
138 
139  /***/
140  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_unbounded_queue_type() const noexcept
141  {
142  return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::UnboundedDropping);
143  }
144 
145  /***/
146  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_dropping_queue() const noexcept
147  {
148  return (_queue_type == QueueType::UnboundedDropping) || (_queue_type == QueueType::BoundedDropping);
149  }
150 
151  /***/
152  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_blocking_queue() const noexcept
153  {
154  return (_queue_type == QueueType::UnboundedBlocking) || (_queue_type == QueueType::BoundedBlocking);
155  }
156 
  /**
   * Returns the raw queue union (const). Callers must check
   * has_bounded_queue_type()/has_unbounded_queue_type() before touching a member.
   */
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion const& get_spsc_queue_union() const noexcept
  {
    return _spsc_queue_union;
  }
162 
  /**
   * Returns the raw queue union. Callers must check
   * has_bounded_queue_type()/has_unbounded_queue_type() before touching a member.
   */
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT SpscQueueUnion& get_spsc_queue_union() noexcept
  {
    return _spsc_queue_union;
  }
168 
  /** @return the cached OS thread id, formatted as a string at construction time */
  QUILL_NODISCARD std::string_view thread_id() const noexcept { return _thread_id; }
171 
  /** @return the thread name cached from get_thread_name() at construction time */
  QUILL_NODISCARD std::string_view thread_name() const noexcept { return _thread_name; }
174 
  /** Flags this context as dead so the backend can drain and remove it (relaxed store). */
  void mark_invalid() noexcept { _valid.store(false, std::memory_order_relaxed); }
177 
  /** @return true while the owning thread is alive (relaxed load, see mark_invalid) */
  QUILL_NODISCARD bool is_valid() const noexcept { return _valid.load(std::memory_order_relaxed); }
180 
  /**
   * Records one dropped/failed message on this thread. Relaxed is sufficient:
   * the counter is an approximate statistic read by the backend.
   */
  void increment_failure_counter() noexcept
  {
    _failure_counter.fetch_add(1, std::memory_order_relaxed);
  }
186 
  /**
   * Atomically reads and zeroes the failure counter.
   * Fast path: a plain relaxed load avoids the RMW `exchange` in the common
   * case where nothing failed since the last poll.
   */
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT size_t get_and_reset_failure_counter() noexcept
  {
    if (QUILL_LIKELY(_failure_counter.load(std::memory_order_relaxed) == 0))
    {
      return 0;
    }
    return _failure_counter.exchange(0, std::memory_order_relaxed);
  }
196 
private:
  friend class detail::BackendWorker;

  SpscQueueUnion _spsc_queue_union;            // storage; active member selected by _queue_type
  SizeCacheVector _conditional_arg_size_cache; // per-thread scratch cache of argument sizes
  std::string _thread_id = std::to_string(get_thread_id()); // cached OS thread id as text
  std::string _thread_name = get_thread_name();             // cached thread name
  // NOTE(review): only declared here; presumably populated/used by BackendWorker
  // via the friend declaration above — confirm in BackendWorker.h
  std::shared_ptr<TransitEventBuffer> _transit_event_buffer;
  QueueType _queue_type;          // discriminant for _spsc_queue_union
  std::atomic<bool> _valid{true}; // cleared by mark_invalid() when the owner thread exits
  // Kept on its own cache line to avoid false sharing with the members above
  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<size_t> _failure_counter{0};
208 };
209 
211 {
212 public:
  /**
   * Returns the process-wide singleton (Meyers singleton: thread-safe
   * initialisation guaranteed since C++11).
   */
  QUILL_EXPORT static ThreadContextManager& instance() noexcept
  {
    static ThreadContextManager instance;
    return instance;
  }
219 
  // Non-copyable singleton. (The deleted copy constructor, original line 221,
  // was lost in extraction — TODO confirm against the upstream header.)
  ThreadContextManager& operator=(ThreadContextManager const&) = delete;
223 
224  /***/
225  template <typename TCallback>
226  void for_each_thread_context(TCallback cb)
227  {
228  LockGuard const lock{_spinlock};
229 
230  for (auto const& elem : _thread_contexts)
231  {
232  cb(elem.get());
233  }
234  }
235 
236  /***/
237  void register_thread_context(std::shared_ptr<ThreadContext> const& thread_context)
238  {
239  _spinlock.lock();
240  _thread_contexts.push_back(thread_context);
241  _spinlock.unlock();
242  _new_thread_context_flag.store(true, std::memory_order_release);
243  }
244 
  /**
   * Bumps the count of invalidated contexts awaiting cleanup by the backend
   * (relaxed: paired with the relaxed load in has_invalid_thread_context()).
   */
  void add_invalid_thread_context() noexcept
  {
    _invalid_thread_context_count.fetch_add(1, std::memory_order_relaxed);
  }
250 
  /**
   * @return true if at least one context has been invalidated and awaits removal.
   */
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool has_invalid_thread_context() const noexcept
  {
    // Relaxed is fine: when non-zero we go on to read the ThreadContext `_valid`
    // flag (also relaxed) and the SPSC queue size (atomic). Even if these reads
    // are observed out of order, the backend re-checks on its next cycle.
    return _invalid_thread_context_count.load(std::memory_order_relaxed) != 0;
  }
259 
  /**
   * Consumes the "a new context was registered" flag.
   * @return true exactly once per registration burst; resets the flag to false.
   */
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool new_thread_context_flag() noexcept
  {
    // Relaxed memory order: when the flag reads false the caller takes the lock anyway
    if (_new_thread_context_flag.load(std::memory_order_relaxed))
    {
      // This is the only place the flag is cleared, so there is no race:
      // we observed true and will return true regardless
      _new_thread_context_flag.store(false, std::memory_order_relaxed);
      return true;
    }
    return false;
  }
274 
  /**
   * Removes an already-invalidated, fully-drained thread context from the
   * registry and decrements the invalid-context counter.
   * Called by the backend after it has emptied the context's queue.
   * Debug builds assert the context exists, is invalid, and its queue is empty.
   */
  void remove_shared_invalidated_thread_context(ThreadContext const* thread_context)
  {
    LockGuard const lock{_spinlock};

    // We could use std::find_if, but since this header is included in Logger.h, which is essential
    // for logging purposes, we aim to minimize the number of includes in that path.
    // Therefore, we implement our own find_if loop here.
    auto thread_context_it = _thread_contexts.end();
    for (auto it = _thread_contexts.begin(); it != _thread_contexts.end(); ++it)
    {
      if (it->get() == thread_context)
      {
        thread_context_it = it;
        break;
      }
    }

    assert(thread_context_it != _thread_contexts.end() &&
           "Attempting to remove a non existent thread context");

    assert(!thread_context_it->get()->is_valid() && "Attempting to remove a valid thread context");

#ifndef NDEBUG
    assert(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type());

    if (thread_context->has_unbounded_queue_type())
    {
      assert(thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty() &&
             "Attempting to remove a thread context with a non empty queue");
    }
    else if (thread_context->has_bounded_queue_type())
    {
      assert(thread_context->get_spsc_queue_union().bounded_spsc_queue.empty() &&
             "Attempting to remove a thread context with a non empty queue");
    }
#endif

    _thread_contexts.erase(thread_context_it);

    // Decrement the counter since we found and removed an invalidated context
    _invalid_thread_context_count.fetch_sub(1, std::memory_order_relaxed);
  }
318 
private:
  // Construction/destruction only via instance()
  ThreadContextManager() = default;
  ~ThreadContextManager() = default;

private:
  std::vector<std::shared_ptr<ThreadContext>> _thread_contexts; // registry, guarded by _spinlock
  Spinlock _spinlock;
  std::atomic<bool> _new_thread_context_flag{false};    // consumed by new_thread_context_flag()
  std::atomic<uint8_t> _invalid_thread_context_count{0}; // contexts awaiting backend cleanup
328 };
329 
331 {
332 public:
  /**
   * Creates this thread's ThreadContext and registers it with the
   * ThreadContextManager singleton. Parameters are forwarded verbatim to the
   * ThreadContext constructor (see FrontendOptions for their origin).
   */
  ScopedThreadContext(QueueType queue_type, size_t initial_queue_capacity,
                      size_t unbounded_queue_max_capacity, HugePagesPolicy huge_pages_policy)
    : _thread_context(std::make_shared<ThreadContext>(
        queue_type, initial_queue_capacity, unbounded_queue_max_capacity, huge_pages_policy))
  {
#ifndef NDEBUG
    // Thread-local flag to track if an instance has been created for this thread.
    // This ensures that get_local_thread_context() is not called with different template arguments
    // when using custom FrontendOptions. Having multiple thread contexts in a single thread is fine
    // and functional but goes against the design principle of maintaining a single thread context
    // per thread.
    thread_local bool thread_local_instance_created = false;

    assert(!thread_local_instance_created &&
           R"(ScopedThreadContext can only be instantiated once per thread. It appears you may be combining default FrontendOptions with custom FrontendOptions. Ensure only one set of FrontendOptions is used to maintain a single thread context per thread.)");

    thread_local_instance_created = true;
#endif

    ThreadContextManager::instance().register_thread_context(_thread_context);
  }
355 
  // Non-copyable: exactly one instance per thread (see ctor assertion)
  ScopedThreadContext(ScopedThreadContext const&) = delete;
  ScopedThreadContext& operator=(ScopedThreadContext const&) = delete;
359 
  /**
   * Invalidates (does not destroy) the thread context when the owning thread exits.
   */
  ~ScopedThreadContext() noexcept
  {
    // This destructor runs when the thread that created this wrapper stops.
    // We only invalidate the thread context here; the backend thread will
    // drain the invalidated ThreadContext and then remove it from the
    // ThreadContextManager. The one exception is the thread that owns the
    // ThreadContextManager (the main thread): its thread context can get
    // deleted before getting invalidated.
    _thread_context->mark_invalid();

    // Notify the backend thread that one context has been removed
    ThreadContextManager::instance().add_invalid_thread_context();
  }
374 
  /**
   * @return the non-owning pointer to this thread's context; never null
   *         (asserted in debug builds)
   */
  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ThreadContext* get_thread_context() const noexcept
  {
    assert(_thread_context && "_thread_context can not be null");
    return _thread_context.get();
  }
381 
private:
  // Shared with ThreadContextManager's registry; the backend may outlive this thread
  std::shared_ptr<ThreadContext> _thread_context;
388 };
389 
/**
 * Returns the calling thread's ThreadContext, lazily creating and registering
 * it on first use. The thread_local ScopedThreadContext lives until thread
 * exit, at which point its destructor invalidates the context for the backend
 * to clean up. The queue configuration comes from TFrontendOptions' static
 * members, so mixing different TFrontendOptions in one thread is an error
 * (asserted in the ScopedThreadContext constructor in debug builds).
 */
template <typename TFrontendOptions>
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ThreadContext* get_local_thread_context() noexcept
{
  thread_local ScopedThreadContext scoped_thread_context{
    TFrontendOptions::queue_type, TFrontendOptions::initial_queue_capacity,
    TFrontendOptions::unbounded_queue_max_capacity, TFrontendOptions::huge_pages_policy};

  return scoped_thread_context.get_thread_context();
}
400 } // namespace detail
401 
402 QUILL_END_NAMESPACE
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:37
Definition: ThreadContextManager.h:330
Definition: ThreadContextManager.h:210
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED std::string get_thread_name()
Returns the name of the thread.
Definition: ThreadUtilities.h:147
Setups a signal handler to handle fatal signals.
Definition: BackendManager.h:24
Definition: Spinlock.h:18
Definition: Spinlock.h:58
Definition: BackendWorker.h:72
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED uint32_t get_thread_id() noexcept
Returns the os assigned ID of the thread.
Definition: ThreadUtilities.h:193
Definition: ThreadContextManager.h:48