// quill - BackendWorker.h
// Backend worker thread: drains frontend SPSC queues, orders and formats log
// statements, and dispatches them to the configured sinks.
7 #pragma once
8 
9 #if defined(_WIN32)
10  #include "quill/backend/Utf8Conv.h"
11 #endif
12 
13 #include "quill/backend/BackendOptions.h"
14 #include "quill/backend/BackendUtilities.h"
15 #include "quill/backend/BackendWorkerLock.h"
16 #include "quill/backend/BacktraceStorage.h"
17 #include "quill/backend/PatternFormatter.h"
18 #include "quill/backend/RdtscClock.h"
19 #include "quill/backend/ThreadUtilities.h"
20 #include "quill/backend/TransitEvent.h"
21 #include "quill/backend/TransitEventBuffer.h"
22 
23 #include "quill/core/Attributes.h"
24 #include "quill/core/BoundedSPSCQueue.h"
25 #include "quill/core/ChronoTimeUtils.h"
26 #include "quill/core/Codec.h"
27 #include "quill/core/Common.h"
28 #include "quill/core/DynamicFormatArgStore.h"
29 #include "quill/core/LogLevel.h"
30 #include "quill/core/LoggerBase.h"
31 #include "quill/core/LoggerManager.h"
32 #include "quill/core/MacroMetadata.h"
33 #include "quill/core/MathUtilities.h"
34 #include "quill/core/QuillError.h"
35 #include "quill/core/SinkManager.h"
36 #include "quill/core/ThreadContextManager.h"
37 #include "quill/core/TimeUtilities.h"
38 #include "quill/core/UnboundedSPSCQueue.h"
39 #include "quill/sinks/Sink.h"
40 
41 #include "quill/bundled/fmt/base.h"
42 
43 #include <algorithm>
44 #include <atomic>
45 #include <chrono>
46 #include <condition_variable>
47 #include <cstddef>
48 #include <cstdint>
49 #include <cstring>
50 #include <ctime>
51 #include <exception>
52 #include <functional>
53 #include <iterator>
54 #include <limits>
55 #include <memory>
56 #include <mutex>
57 #include <optional>
58 #include <string>
59 #include <string_view>
60 #include <thread>
61 #include <unordered_map>
62 #include <utility>
63 #include <vector>
64 
65 QUILL_BEGIN_NAMESPACE
66 
67 class ManualBackendWorker; // Forward declaration
68 
69 namespace detail
70 {
71 
72 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
73  #pragma warning(push)
74  #pragma warning(disable : 4324)
75 #endif
76 
78 {
79 public:
  /** Default constructor; the worker thread is not started until run() is called. */
  BackendWorker() = default;

  /** Non-copyable: owns the worker thread, the singleton lock and atomic state. */
  BackendWorker(BackendWorker const&) = delete;
  BackendWorker& operator=(BackendWorker const&) = delete;
90 
95  {
96  // This destructor will run during static destruction as the thread is part of the singleton
97  stop();
98 
99  RdtscClock const* rdtsc_clock = _rdtsc_clock.exchange(nullptr);
100  delete rdtsc_clock;
101  }
102 
  /**
   * Checks whether the backend worker thread is currently running.
   * @return true once run() has started the thread and stop() has not been called
   */
  QUILL_NODISCARD bool is_running() const noexcept
  {
    return _is_worker_running.load(std::memory_order_acquire);
  }
108 
112  QUILL_NODISCARD uint64_t time_since_epoch(uint64_t rdtsc_value) const
113  {
114  if (QUILL_UNLIKELY(_options.sleep_duration > _options.rdtsc_resync_interval))
115  {
116  QUILL_THROW(
117  QuillError{"Invalid config, When TSC clock is used backend_thread_sleep_duration should "
118  "not be higher than rdtsc_resync_interval"});
119  }
120 
121  RdtscClock const* rdtsc_clock = _rdtsc_clock.load(std::memory_order_acquire);
122  return rdtsc_clock ? rdtsc_clock->time_since_epoch_safe(rdtsc_value) : 0;
123  }
124 
  /**
   * Returns the OS thread id of the backend worker thread.
   * @return the cached thread id, or 0 if the worker has not started (or was stopped)
   */
  QUILL_NODISCARD uint32_t get_backend_thread_id() const noexcept
  {
    return _worker_thread_id.load();
  }
133 
  /**
   * Spawns the backend worker thread and blocks until the thread signals it has started.
   * The options are copied into the worker thread and sanitised in _init().
   * @param options backend configuration to use for this run
   */
  QUILL_ATTRIBUTE_COLD void run(BackendOptions const& options)
  {
    _ensure_linker_retains_symbols();

    _process_id = std::to_string(get_process_id());

    // NOTE(review): the guarding condition for this scope is truncated in this copy of
    // the file; it appears to gate creation of the per-process backend worker lock
    // (singleton enforcement) - verify against the upstream header.
    {
      _backend_worker_lock = std::make_unique<BackendWorkerLock>(_process_id);
    }

    std::thread worker(
      [this, options]()
      {
        // Copies options into _options and caches this thread's id
        _init(options);

        QUILL_TRY
        {
          if (_options.cpu_affinity != (std::numeric_limits<uint16_t>::max)())
          {
            // Set cpu affinity if requested to cpu _backend_thread_cpu_affinity
            set_cpu_affinity(_options.cpu_affinity);
          }
        }
#if !defined(QUILL_NO_EXCEPTIONS)
        QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
        QUILL_CATCH_ALL() { _options.error_notifier(std::string{"Caught unhandled exception."}); }
#endif

        QUILL_TRY
        {
          // Set the thread name to the desired name
          set_thread_name(_options.thread_name.data());
        }
#if !defined(QUILL_NO_EXCEPTIONS)
        QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
        QUILL_CATCH_ALL() { _options.error_notifier(std::string{"Caught unhandled exception."}); }
#endif

        // All okay, set the backend worker thread running flag
        _is_worker_running.store(true);

        // Running
        while (QUILL_LIKELY(_is_worker_running.load(std::memory_order_relaxed)))
        {
          // main loop
          QUILL_TRY { _poll(); }
#if !defined(QUILL_NO_EXCEPTIONS)
          QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
          QUILL_CATCH_ALL() { _options.error_notifier(std::string{"Caught unhandled exception."}); }
#endif
        }

        // exit
        QUILL_TRY { _exit(); }
#if !defined(QUILL_NO_EXCEPTIONS)
        QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
        QUILL_CATCH_ALL() { _options.error_notifier(std::string{"Caught unhandled exception."}); }
#endif
      });

    // Move the worker ownership to our class
    _worker_thread.swap(worker);

    // Busy-wait until the worker publishes _is_worker_running so that callers of run()
    // observe a fully started backend on return
    while (!_is_worker_running.load(std::memory_order_seq_cst))
    {
      // wait for the thread to start
      std::this_thread::sleep_for(std::chrono::microseconds{100});
    }
  }
208 
212  QUILL_ATTRIBUTE_COLD void stop() noexcept
213  {
214  // Stop the backend worker
215  if (!_is_worker_running.exchange(false))
216  {
217  // already stopped
218  return;
219  }
220 
221  // signal wake up the backend worker thread
222  notify();
223 
224  // Wait the backend thread to join, if backend thread was never started it won't be joinable
225  if (_worker_thread.joinable())
226  {
227  _worker_thread.join();
228  }
229 
230  _worker_thread_id.store(0);
231  _backend_worker_lock.reset(nullptr);
232  }
233 
238  void notify()
239  {
240 #ifdef __MINGW32__
241  // MinGW can deadlock if the mutex is released before cv.notify_one(),
242  // so keep notify_one() inside the lock for MinGW
243  std::lock_guard<std::mutex> lock{_wake_up_mutex};
244  _wake_up_flag = true;
245  _wake_up_cv.notify_one();
246 #else
247  // Set the flag to indicate that the data is ready
248  {
249  std::lock_guard<std::mutex> lock{_wake_up_mutex};
250  _wake_up_flag = true;
251  }
252 
253  // Signal the condition variable to wake up the worker thread
254  _wake_up_cv.notify_one();
255 #endif
256  }
257 
258 private:
  /**
   * Invokes a user supplied poll hook, routing any thrown exception to the
   * configured error notifier so it cannot escape the worker loop.
   * @param hook user callback (callers check it is non-empty before invoking)
   */
  QUILL_ATTRIBUTE_HOT void _invoke_poll_hook(std::function<void()> const& hook) const
  {
    QUILL_TRY { hook(); }
#if !defined(QUILL_NO_EXCEPTIONS)
    QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
    QUILL_CATCH_ALL() { _options.error_notifier(std::string{"Caught unhandled exception."}); }
#endif
  }
268 
269  /***/
270  QUILL_ATTRIBUTE_HOT void _invoke_poll_end_once(bool& poll_end_called) const
271  {
272  if (poll_end_called)
273  {
274  return;
275  }
276 
277  poll_end_called = true;
278  _invoke_poll_hook(_options.backend_worker_on_poll_end);
279  }
280 
  /**
   * References selected helper functions so the linker retains their symbols
   * even though nothing else in this translation unit calls them directly.
   */
  QUILL_ATTRIBUTE_COLD static void _ensure_linker_retains_symbols()
  {
    // Calls to ensure it is retained by the linker.
    QUILL_MAYBE_UNUSED static auto thread_name = get_thread_name();
    (void)thread_name;

#if defined(_WIN32)
    // Touch both utf8_encode overloads so they are kept on Windows builds
    std::wstring const dummy = L"dummy";
    QUILL_MAYBE_UNUSED static auto encode1 = utf8_encode(dummy);
    (void)encode1;

    QUILL_MAYBE_UNUSED static auto encode2 =
      utf8_encode(reinterpret_cast<std::byte const*>(dummy.data()), dummy.size());
    (void)encode2;
#endif
  }
301 
  /**
   * One iteration of the backend worker main loop.
   *
   * Drains the frontend queues into per-thread transit event buffers, then either
   * processes cached events (one at a time below the soft limit, in a batch above it)
   * or - when there is nothing cached - flushes sinks, runs housekeeping, and sleeps
   * or yields until new work arrives.
   */
  QUILL_ATTRIBUTE_HOT void _poll()
  {
    // poll hook
    bool poll_end_called = false;
    if (QUILL_UNLIKELY(static_cast<bool>(_options.backend_worker_on_poll_begin)))
    {
      _invoke_poll_hook(_options.backend_worker_on_poll_begin);
    }

    // load all contexts locally
    _update_active_thread_contexts_cache();

    // Read all frontend queues and cache the log statements and the metadata as TransitEvents
    size_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues();

    if (cached_transit_events_count != 0)
    {
      // there are cached events to process
      if (cached_transit_events_count < _options.transit_events_soft_limit)
      {
        // process a single transit event, then give priority to reading the frontend queues again
        _process_lowest_timestamp_transit_event();
      }
      else
      {
        // we want to process a batch of events.
        while (!has_pending_events_for_caching_when_transit_event_buffer_empty() &&
               _process_lowest_timestamp_transit_event())
        {
          // We need to be cautious because there are log messages in the lock-free queues
          // that have not yet been cached in the transit event buffer. Logging only the cached
          // messages can result in out-of-order log entries, as messages with larger timestamps
          // in the queue might be missed.
        }
      }
    }
    else
    {
      // No cached transit events to process, minimal thread workload.

      // force flush all remaining messages
      _flush_and_run_active_sinks(true, _options.sink_min_flush_interval);

      // check for any dropped messages / blocked threads
      _check_failure_counter(_options.error_notifier);

      // This is useful when BackendTscClock is used to keep it up to date
      _resync_rdtsc_clock();

      // Also check if all queues are empty
      bool const queues_and_events_empty = _check_frontend_queues_and_cached_transit_events_empty();
      if (queues_and_events_empty)
      {
        // Fully idle: safe to reclaim resources belonging to exited threads/loggers
        _cleanup_invalidated_thread_contexts();
        _cleanup_invalidated_loggers();
        _try_shrink_empty_transit_event_buffers();

        // poll hook - invoked before sleeping so user code observes the idle point
        if (QUILL_UNLIKELY(static_cast<bool>(_options.backend_worker_on_poll_end)))
        {
          _invoke_poll_end_once(poll_end_called);
        }

        // There is nothing left to do, and we can let this thread sleep for a while
        // buffer events are 0 here and also all the producer queues are empty
        if (_options.sleep_duration.count() != 0)
        {
          std::unique_lock<std::mutex> lock{_wake_up_mutex};

          // Wait for a timeout or a notification to wake up
          _wake_up_cv.wait_for(lock, _options.sleep_duration, [this] { return _wake_up_flag; });

          // set the flag back to false since we woke up here
          _wake_up_flag = false;

          // After waking up resync rdtsc clock again and resume
          _resync_rdtsc_clock();
        }
        else if (_options.enable_yield_when_idle)
        {
          std::this_thread::yield();
        }
      }
    }

    // poll hook - _invoke_poll_end_once guarantees it runs at most once per iteration
    if (QUILL_UNLIKELY(static_cast<bool>(_options.backend_worker_on_poll_end)))
    {
      _invoke_poll_end_once(poll_end_called);
    }
  }
396 
  /**
   * Initialises worker state on the backend thread; called once at thread start.
   * Copies and sanitises the user supplied options (zero limits are bumped to 1,
   * limits are validated against each other and checked to be powers of two).
   * @param options user supplied configuration
   * @throws QuillError when the transit event limits are invalid
   */
  QUILL_ATTRIBUTE_COLD void _init(BackendOptions const& options)
  {
    _options = options;

    // Cache this thread's id
    _worker_thread_id.store(get_thread_id());

    (void)get_thread_name();

    // Double check or modify some backend options before we start
    if (_options.transit_events_hard_limit == 0)
    {
      // transit_events_hard_limit of 0 makes no sense as we can't process anything
      _options.transit_events_hard_limit = 1;
    }

    if (_options.transit_events_soft_limit == 0)
    {
      _options.transit_events_soft_limit = 1;
    }

    // NOTE(review): the condition for this branch is truncated in this copy of the file;
    // it presumably checks transit_events_soft_limit > transit_events_hard_limit, and the
    // fmtquill::format() argument list below is also cut short - verify against upstream.
    {
      QUILL_THROW(QuillError{fmtquill::format(
        "transit_events_soft_limit ({}) cannot be greater than transit_events_hard_limit "
        "({}). Please ensure that the soft limit is less than or equal to the hard limit.",
    }
    else if (!is_power_of_two(_options.transit_events_hard_limit))
    {
      QUILL_THROW(QuillError{fmtquill::format(
        "transit_events_hard_limit ({}) must be a power of two", _options.transit_events_hard_limit)});
    }
    else if (!is_power_of_two(_options.transit_events_soft_limit))
    {
      QUILL_THROW(QuillError{fmtquill::format(
        "transit_events_soft_limit ({}) must be a power of two", _options.transit_events_soft_limit)});
    }
  }
439 
443  QUILL_ATTRIBUTE_COLD void _exit()
444  {
445  while (true)
446  {
447  bool const queues_and_events_empty = (!_options.wait_for_queues_to_empty_before_exit) ||
448  _check_frontend_queues_and_cached_transit_events_empty();
449 
450  if (queues_and_events_empty)
451  {
452  // we are done, all queues are now empty
453  _check_failure_counter(_options.error_notifier);
454  _flush_and_run_active_sinks(false, std::chrono::milliseconds{0});
455  break;
456  }
457 
458  uint64_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues();
459  if (cached_transit_events_count > 0)
460  {
461  while (!has_pending_events_for_caching_when_transit_event_buffer_empty() &&
462  _process_lowest_timestamp_transit_event())
463  {
464  // We need to be cautious because there are log messages in the lock-free queues
465  // that have not yet been cached in the transit event buffer. Logging only the cached
466  // messages can result in out-of-order log entries, as messages with larger timestamps
467  // in the queue might be missed.
468  }
469  }
470  }
471 
472  _cleanup_invalidated_thread_contexts();
473  _cleanup_invalidated_loggers();
474  }
475 
479  QUILL_ATTRIBUTE_HOT size_t _populate_transit_events_from_frontend_queues()
480  {
481  uint64_t const ts_now = _options.log_timestamp_ordering_grace_period.count()
482  ? static_cast<uint64_t>((detail::get_timestamp<std::chrono::system_clock>() - _options.log_timestamp_ordering_grace_period)
483  .count())
484  : std::numeric_limits<uint64_t>::max();
485 
486  size_t cached_transit_events_count{0};
487 
488  for (ThreadContext* thread_context : _active_thread_contexts_cache)
489  {
490  QUILL_ASSERT(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type(),
491  "Invalid queue type in BackendWorker::_read_and_decode_frontend_queues()");
492 
493  if (thread_context->has_unbounded_queue_type())
494  {
495  cached_transit_events_count += _read_and_decode_frontend_queue(
496  thread_context->get_spsc_queue_union().unbounded_spsc_queue, thread_context, ts_now);
497  }
498  else if (thread_context->has_bounded_queue_type())
499  {
500  cached_transit_events_count += _read_and_decode_frontend_queue(
501  thread_context->get_spsc_queue_union().bounded_spsc_queue, thread_context, ts_now);
502  }
503  }
504 
505  return cached_transit_events_count;
506  }
507 
  /**
   * Reads and decodes messages from one frontend SPSC queue into the owning
   * thread's transit event buffer.
   *
   * Stops after reading at most one full queue capacity worth of bytes, or when
   * the transit events hard limit is reached, so a single busy producer cannot
   * starve the other frontend queues.
   *
   * @tparam TFrontendQueue UnboundedSPSCQueue or the bounded queue type
   * @param frontend_queue queue to drain
   * @param thread_context context owning both the queue and the transit event buffer
   * @param ts_now upper timestamp bound for accepting messages (max() disables it)
   * @return current size of the thread's transit event buffer
   */
  template <typename TFrontendQueue>
  QUILL_ATTRIBUTE_HOT size_t _read_and_decode_frontend_queue(TFrontendQueue& frontend_queue,
                                                             ThreadContext* thread_context, uint64_t ts_now)
  {
    // Note: The producer commits only complete messages to the queue.
    // Therefore, if even a single byte is present in the queue, it signifies a full message.
    size_t const queue_capacity = frontend_queue.capacity();
    size_t total_bytes_read{0};

    do
    {
      std::byte* read_pos;

      if constexpr (std::is_same_v<TFrontendQueue, UnboundedSPSCQueue>)
      {
        // Unbounded queues have a dedicated read path (they can grow/reallocate)
        read_pos = _read_unbounded_frontend_queue(frontend_queue, thread_context);
      }
      else
      {
        read_pos = frontend_queue.prepare_read();
      }

      if (!read_pos)
      {
        // Exit loop nothing to read
        break;
      }

      std::byte const* const read_begin = read_pos;

      // Decoding advances read_pos past the consumed message bytes
      if (!_populate_transit_event_from_frontend_queue(read_pos, thread_context, ts_now))
      {
        // If _get_transit_event_from_queue returns false, stop reading
        break;
      }

      // Finish reading
      QUILL_ASSERT(
        read_pos >= read_begin,
        "read_pos must be >= read_begin in BackendWorker::_read_and_decode_frontend_queue()");

      auto const bytes_read = static_cast<size_t>(read_pos - read_begin);
      frontend_queue.finish_read(bytes_read);
      total_bytes_read += bytes_read;
      // Reads a maximum of one full frontend queue or the transit events' hard limit to prevent
      // getting stuck on the same producer.
    } while ((total_bytes_read < queue_capacity) &&
             (thread_context->_transit_event_buffer->size() < _options.transit_events_hard_limit));

    if (total_bytes_read != 0)
    {
      // If we read something from the queue, we commit all the reads together at the end.
      // This strategy enhances cache coherence performance by updating the shared atomic flag
      // only once.
      frontend_queue.commit_read();
    }

    return thread_context->_transit_event_buffer->size();
  }
574 
  /**
   * Decodes a single serialized log message from the frontend queue into a TransitEvent.
   *
   * The wire layout read here (in order) is: timestamp, MacroMetadata pointer,
   * LoggerBase pointer, format-args decoder function pointer, then event specific
   * payload (format args, flush flag, or logger removal data).
   *
   * @param read_pos in/out cursor into the queue; advanced past the consumed bytes
   * @param thread_context context owning the transit event buffer to populate
   * @param ts_now upper timestamp bound; messages newer than this are deferred
   * @return true if the event was cached, false when processing must pause because
   *         the message timestamp is still inside the ordering grace period
   */
  QUILL_ATTRIBUTE_HOT bool _populate_transit_event_from_frontend_queue(std::byte*& read_pos,
                                                                       ThreadContext* thread_context,
                                                                       uint64_t ts_now)
  {
    QUILL_ASSERT(thread_context->_transit_event_buffer,
                 "transit_event_buffer is nullptr in "
                 "BackendWorker::_populate_transit_event_from_frontend_queue()");

    // Allocate a new TransitEvent or use an existing one to store the message from the queue
    TransitEvent* transit_event = thread_context->_transit_event_buffer->back();

    QUILL_ASSERT(
      transit_event,
      "transit_event is nullptr in BackendWorker::_populate_transit_event_from_frontend_queue()");

    QUILL_ASSERT(
      transit_event->formatted_msg,
      "formatted_msg is nullptr in BackendWorker::_populate_transit_event_from_frontend_queue()");

    // Fixed-size header: timestamp, metadata pointer, logger pointer (memcpy avoids
    // unaligned-access UB when reading from the byte buffer)
    std::memcpy(&transit_event->timestamp, read_pos, sizeof(transit_event->timestamp));
    read_pos += sizeof(transit_event->timestamp);

    std::memcpy(&transit_event->macro_metadata, read_pos, sizeof(transit_event->macro_metadata));
    read_pos += sizeof(transit_event->macro_metadata);

    std::memcpy(&transit_event->logger_base, read_pos, sizeof(transit_event->logger_base));
    read_pos += sizeof(transit_event->logger_base);

    QUILL_ASSERT(transit_event->logger_base,
                 "transit_event->logger_base is nullptr after memcpy from queue");

    QUILL_ASSERT(transit_event->logger_base->_clock_source == ClockSourceType::Tsc ||
                 transit_event->logger_base->_clock_source == ClockSourceType::System ||
                 transit_event->logger_base->_clock_source == ClockSourceType::User,
                 "transit_event->logger_base->_clock_source has invalid enum value - possible "
                 "memory corruption");

    if (transit_event->logger_base->_clock_source == ClockSourceType::Tsc)
    {
      // If using the rdtsc clock, convert the value to nanoseconds since epoch.
      // This conversion ensures that every transit inserted in the buffer below has a timestamp in
      // nanoseconds since epoch, allowing compatibility with Logger objects using different clocks.
      if (QUILL_UNLIKELY(!_rdtsc_clock.load(std::memory_order_relaxed)))
      {
        // Lazy initialization of rdtsc clock on the backend thread only if the user decides to use
        // it. The clock requires a few seconds to init as it is taking samples first.
        _rdtsc_clock.store(new RdtscClock{_options.rdtsc_resync_interval}, std::memory_order_release);
        _last_rdtsc_resync_time = std::chrono::steady_clock::now();
      }

      // Convert the rdtsc value to nanoseconds since epoch.
      transit_event->timestamp =
        _rdtsc_clock.load(std::memory_order_relaxed)->time_since_epoch(transit_event->timestamp);
    }

    // Check if strict log timestamp order is enabled and the clock source is not User
    if ((transit_event->logger_base->_clock_source != ClockSourceType::User) &&
        (ts_now != std::numeric_limits<uint64_t>::max()))
    {
      // We only check against `ts_now` for real timestamps, not custom timestamps by the user, and
      // when the grace period is enabled

#if defined(QUILL_ENABLE_ASSERTIONS) || !defined(NDEBUG)
      // Check the timestamps we are comparing have the same digits
      auto count_digits = [](uint64_t number)
      {
        uint32_t digits = 0;
        do
        {
          digits++;
          number /= 10;
        } while (number != 0);
        return digits;
      };

      QUILL_ASSERT(count_digits(transit_event->timestamp) == count_digits(ts_now),
                   "Timestamp digits mismatch in "
                   "BackendWorker::_populate_transit_event_from_frontend_queue(), log timestamp "
                   "has different magnitude than current time");
#endif

      // Ensure the message timestamp is not greater than ts_now.
      if (QUILL_UNLIKELY(transit_event->timestamp > ts_now))
      {
        // If the message timestamp is ahead of the current time, temporarily halt processing.
        // This guarantees the integrity of message order and avoids missed messages.
        // We halt processing here to avoid introducing out-of-sequence messages.
        // This scenario prevents potential race conditions where timestamps from
        // the last queue could overwrite those from the first queue before they are included.
        // We return at this point without adding the current event to the buffer.
        return false;
      }
    }

    // Function pointer that knows how to decode this message's format arguments
    FormatArgsDecoder format_args_decoder;
    std::memcpy(&format_args_decoder, read_pos, sizeof(format_args_decoder));
    read_pos += sizeof(format_args_decoder);

    if ((transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataDeepCopy) ||
        (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataShallowCopy) ||
        (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataHybridCopy))
    {
      _apply_runtime_metadata(read_pos, transit_event);
    }

    // we need to check and do not try to format the flush events as that wouldn't be valid
    if ((transit_event->macro_metadata->event() != MacroMetadata::Event::Flush) &&
        (transit_event->macro_metadata->event() != MacroMetadata::Event::LoggerRemovalRequest))
    {
      format_args_decoder(read_pos, _format_args_store);

      if (!transit_event->macro_metadata->has_named_args())
      {
        _populate_formatted_log_message(transit_event, transit_event->macro_metadata->message_format());
      }
      else
      {
        // using the message_format as key for lookups
        _named_args_format_template.assign(transit_event->macro_metadata->message_format());

        if (auto const search = _named_args_templates.find(_named_args_format_template);
            search != std::cend(_named_args_templates))
        {
          // process named args message when we already have parsed the format message once,
          // and we have the names of each arg cached
          auto const& [message_format, arg_names] = search->second;

          _populate_formatted_log_message(transit_event, message_format.data());
          _populate_formatted_named_args(transit_event, arg_names);
        }
        else
        {
          // process named args log when the message format is processed for the first time
          // parse name of each arg and stored them to our lookup map
          auto const [res_it, inserted] = _named_args_templates.try_emplace(
            _named_args_format_template,
            _process_named_args_format_message(transit_event->macro_metadata->message_format()));

          auto const& [message_format, arg_names] = res_it->second;

          // suppress unused warnings
          (void)inserted;

          _populate_formatted_log_message(transit_event, message_format.data());
          _populate_formatted_named_args(transit_event, arg_names);
        }
      }
    }
    else if (transit_event->macro_metadata->event() == MacroMetadata::Event::Flush)
    {
      // if this is a flush event then we do not need to format anything for the
      // transit_event, but we need to set the transit event's flush_flag pointer instead
      uintptr_t flush_flag_tmp;
      std::memcpy(&flush_flag_tmp, read_pos, sizeof(uintptr_t));
      transit_event->flush_flag = reinterpret_cast<std::atomic<bool>*>(flush_flag_tmp);
      read_pos += sizeof(uintptr_t);
    }
    else
    {
      // Store the logger name and the sync flag
      QUILL_ASSERT(
        transit_event->macro_metadata->event() == MacroMetadata::Event::LoggerRemovalRequest,
        "Unexpected event type in BackendWorker::_populate_transit_event_from_frontend_queue(), "
        "expected LoggerRemovalRequest");

      uintptr_t logger_removal_flag_tmp;
      std::memcpy(&logger_removal_flag_tmp, read_pos, sizeof(uintptr_t));
      read_pos += sizeof(uintptr_t);
      std::string_view const logger_name = Codec<std::string>::decode_arg(read_pos);

      _logger_removal_flags.emplace(std::string{logger_name},
                                    reinterpret_cast<std::atomic<bool>*>(logger_removal_flag_tmp));
    }

    // commit this transit event
    thread_context->_transit_event_buffer->push_back();
    _format_args_store.clear();

    return true;
  }
756 
761  QUILL_ATTRIBUTE_HOT bool has_pending_events_for_caching_when_transit_event_buffer_empty() noexcept
762  {
763  _update_active_thread_contexts_cache();
764 
765  for (ThreadContext* thread_context : _active_thread_contexts_cache)
766  {
767  QUILL_ASSERT(thread_context->_transit_event_buffer,
768  "transit_event_buffer is nullptr in "
769  "BackendWorker::has_pending_events_for_caching_when_transit_event_buffer_empty()"
770  ", should be valid in _active_thread_contexts_cache");
771 
772  if (thread_context->_transit_event_buffer->empty())
773  {
774  // if there is no _transit_event_buffer yet then check only the queue
775  if (thread_context->has_unbounded_queue_type() &&
776  !thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty())
777  {
778  return true;
779  }
780 
781  if (thread_context->has_bounded_queue_type() &&
782  !thread_context->get_spsc_queue_union().bounded_spsc_queue.empty())
783  {
784  return true;
785  }
786  }
787  }
788 
789  return false;
790  }
791 
  /**
   * Processes the single cached transit event with the smallest timestamp across
   * all thread contexts, preserving global timestamp order.
   * @return true if an event was processed, false when every buffer was empty
   */
  QUILL_ATTRIBUTE_HOT bool _process_lowest_timestamp_transit_event()
  {
    // Get the lowest timestamp
    uint64_t min_ts{std::numeric_limits<uint64_t>::max()};
    ThreadContext* thread_context{nullptr};

    for (ThreadContext* tc : _active_thread_contexts_cache)
    {
      QUILL_ASSERT(tc->_transit_event_buffer,
                   "transit_event_buffer is nullptr in "
                   "BackendWorker::_process_lowest_timestamp_transit_event(), should be valid in "
                   "_active_thread_contexts_cache");

      TransitEvent const* te = tc->_transit_event_buffer->front();
      if (te && (min_ts > te->timestamp))
      {
        min_ts = te->timestamp;
        thread_context = tc;
      }
    }

    if (!thread_context)
    {
      // all transit event buffers are empty
      return false;
    }

    TransitEvent* transit_event = thread_context->_transit_event_buffer->front();
    QUILL_ASSERT(
      transit_event,
      "transit_event is nullptr in BackendWorker::_process_lowest_timestamp_transit_event() when "
      "transit_buffer is set");

    // Filled in by _process_transit_event when the event is a Flush request
    std::atomic<bool>* flush_flag{nullptr};

    QUILL_TRY { _process_transit_event(*thread_context, *transit_event, flush_flag); }
#if !defined(QUILL_NO_EXCEPTIONS)
    QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
    QUILL_CATCH_ALL() { _options.error_notifier(std::string{"Caught unhandled exception."}); }
#endif

    // Finally, clean up any remaining fields in the transit event
    // (TransitEvents are recycled, so stale extra data must not leak into the next use)
    if (transit_event->extra_data)
    {
      transit_event->extra_data->named_args.clear();
      transit_event->extra_data->runtime_metadata.has_runtime_metadata = false;
    }

    thread_context->_transit_event_buffer->pop_front();

    if (flush_flag)
    {
      // Process the second part of the flush event after it's been removed from the buffer,
      // ensuring that we are no longer interacting with the thread_context or transit_event.

      // This is particularly important for handling cases when Quill is used as a DLL on Windows.
      // If `FreeLibrary` is called, the backend thread may attempt to access an invalidated
      // `ThreadContext`, which can lead to a crash due to invalid memory access.
      //
      // To prevent this, whenever we receive a Flush event, we clean up any invalidated thread contexts
      // before notifying the caller. This ensures that when `flush_log()` is invoked in `DllMain`
      // during `DLL_PROCESS_DETACH`, the `ThreadContext` is properly cleaned up before the DLL exits.
      _cleanup_invalidated_thread_contexts();

      // Now it's safe to notify the caller to continue execution.
      flush_flag->store(true);
    }

    return true;
  }
865 
  /**
   * Dispatches one transit event according to its metadata event type: regular log,
   * backtrace store/flush, backtrace init, or flush request.
   *
   * @param thread_context context that produced the event (supplies thread id/name)
   * @param transit_event the event to process; may be mutated (flush_flag reset)
   * @param flush_flag out parameter: set to the event's flush flag for Flush events
   *        so the caller can notify the frontend after the event is popped
   * @throws QuillError when a backtrace log arrives before init_backtrace() was called
   */
  QUILL_ATTRIBUTE_HOT void _process_transit_event(ThreadContext const& thread_context,
                                                  TransitEvent& transit_event, std::atomic<bool>*& flush_flag)
  {
    // If backend_process(...) throws we want to skip this event and move to the next, so we catch
    // the error here instead of catching it in the parent try/catch block of main_loop
    if (transit_event.macro_metadata->event() == MacroMetadata::Event::Log)
    {
      if (transit_event.log_level() != LogLevel::Backtrace)
      {
        _dispatch_transit_event_to_sinks(transit_event, thread_context.thread_id(),
                                         thread_context.thread_name());

        // We also need to check the severity of the log message here against the backtrace
        // Check if we should also flush the backtrace messages:
        // After we forwarded the message we will check the severity of this message for this logger
        // If the severity of the message is higher than the backtrace flush severity we will also
        // flush the backtrace of the logger
        if (QUILL_UNLIKELY(transit_event.log_level() >=
                           transit_event.logger_base->_backtrace_flush_level.load(std::memory_order_relaxed)))
        {
          if (transit_event.logger_base->_backtrace_storage)
          {
            transit_event.logger_base->_backtrace_storage->process(
              [this](TransitEvent const& te, std::string_view thread_id, std::string_view thread_name)
              { _dispatch_transit_event_to_sinks(te, thread_id, thread_name); });
          }
        }
      }
      else
      {
        if (transit_event.logger_base->_backtrace_storage)
        {
          // this is a backtrace log and we will store the transit event
          // we need to pass a copy of transit_event here and not move the existing
          // the transit events are reused
          TransitEvent transit_event_copy;
          transit_event.copy_to(transit_event_copy);

          transit_event.logger_base->_backtrace_storage->store(
            std::move(transit_event_copy), thread_context.thread_id(), thread_context.thread_name());
        }
        else
        {
          QUILL_THROW(
            QuillError{"logger->init_backtrace(...) needs to be called first before using "
                       "LOG_BACKTRACE(...)."});
        }
      }
    }
    else if (transit_event.macro_metadata->event() == MacroMetadata::Event::InitBacktrace)
    {
      // we can just convert the capacity back to int here and use it
      // (the requested capacity travels through the pipeline as a formatted string)
      if (!transit_event.logger_base->_backtrace_storage)
      {
        // Lazy BacktraceStorage creation
        transit_event.logger_base->_backtrace_storage = std::make_shared<BacktraceStorage>();
      }

      transit_event.logger_base->_backtrace_storage->set_capacity(static_cast<uint32_t>(std::stoul(
        std::string{transit_event.formatted_msg->begin(), transit_event.formatted_msg->end()})));
    }
    else if (transit_event.macro_metadata->event() == MacroMetadata::Event::FlushBacktrace)
    {
      if (transit_event.logger_base->_backtrace_storage)
      {
        // process all records in backtrace for this logger and log them
        transit_event.logger_base->_backtrace_storage->process(
          [this](TransitEvent const& te, std::string_view thread_id, std::string_view thread_name)
          { _dispatch_transit_event_to_sinks(te, thread_id, thread_name); });
      }
    }
    else if (transit_event.macro_metadata->event() == MacroMetadata::Event::Flush)
    {
      _flush_and_run_active_sinks(false, std::chrono::milliseconds{0});

      // This is a flush event, so we capture the flush flag to notify the caller after processing.
      flush_flag = transit_event.flush_flag;

      // Reset the flush flag as TransitEvents are re-used, preventing incorrect flag reuse.
      transit_event.flush_flag = nullptr;

      // We defer notifying the caller until after this function completes.
    }
  }
953 
  /**
   * Resolves the logger's PatternFormatter (lazily, sharing an existing instance from another
   * logger with identical options) and then formats and dispatches the event to the sinks.
   *
   * @param transit_event the event to dispatch
   * @param thread_id id of the thread that produced the event
   * @param thread_name name of the thread that produced the event
   */
  QUILL_ATTRIBUTE_HOT void _dispatch_transit_event_to_sinks(TransitEvent const& transit_event,
                                                            std::string_view const& thread_id,
                                                            std::string_view const& thread_name)
  {
    // First check to see if we should init the pattern formatter on a new Logger
    // Look up to see if we have the formatter and if not create it
    if (QUILL_UNLIKELY(!transit_event.logger_base->_pattern_formatter))
    {
      // Search for an existing pattern_formatter in each logger
      _logger_manager.for_each_logger(
        [&transit_event](LoggerBase* logger)
        {
          if (logger->_pattern_formatter &&
              (logger->_pattern_formatter->get_options() == transit_event.logger_base->_pattern_formatter_options))
          {
            // hold a copy of the shared_ptr of the same formatter
            transit_event.logger_base->_pattern_formatter = logger->_pattern_formatter;
            return true; // stop iterating, a matching formatter was found
          }

          return false;
        });

      if (!transit_event.logger_base->_pattern_formatter)
      {
        // Didn't find an existing formatter need to create a new pattern formatter
        transit_event.logger_base->_pattern_formatter =
          std::make_shared<PatternFormatter>(transit_event.logger_base->_pattern_formatter_options);
      }
    }

    QUILL_ASSERT(transit_event.logger_base->_pattern_formatter,
                 "pattern_formatter is nullptr in BackendWorker::_process_transit_event(), should "
                 "have been created earlier");

    // proceed after ensuring a pattern formatter exists
    std::string_view const log_level_description =
      log_level_to_string(transit_event.log_level(), _options.log_level_descriptions.data(),
                          _options.log_level_descriptions.size());

    std::string_view const log_level_short_code =
      log_level_to_string(transit_event.log_level(), _options.log_level_short_codes.data(),
                          _options.log_level_short_codes.size());

    _write_log_statement(
      transit_event, thread_id, thread_name, log_level_description, log_level_short_code,
      std::string_view{transit_event.formatted_msg->data(), transit_event.formatted_msg->size()});
  }
1005 
  /**
   * Formats the log statement for each sink of the logger (formatting at most once per distinct
   * PatternFormatter), applies each sink's filters and forwards the statement that passed.
   *
   * @param transit_event the event being written
   * @param thread_id id of the originating thread
   * @param thread_name name of the originating thread
   * @param log_level_description descriptive log level string
   * @param log_level_short_code short log level identifier
   * @param log_message the already formatted log message body
   */
  QUILL_ATTRIBUTE_HOT void _write_log_statement(TransitEvent const& transit_event,
                                                std::string_view const& thread_id,
                                                std::string_view const& thread_name,
                                                std::string_view const& log_level_description,
                                                std::string_view const& log_level_short_code,
                                                std::string_view const& log_message) const
  {
    // Lazily-formatted statement via the logger's default PatternFormatter, shared by every
    // sink that does not override the formatter options
    std::string_view default_log_statement;

    // Process each sink with the appropriate formatting and filtering
    for (auto& sink : transit_event.logger_base->_sinks)
    {
      std::string_view log_to_write;

      // Determine which formatted log to use
      if (!sink->_override_pattern_formatter_options)
      {
        if (default_log_statement.empty())
        {
          // Use the default formatted log statement, here by checking empty() we try to format once
          // even for multiple sinks
          default_log_statement = transit_event.logger_base->_pattern_formatter->format(
            transit_event.timestamp, thread_id, thread_name, _process_id,
            transit_event.logger_base->_logger_name, log_level_description, log_level_short_code,
            *transit_event.macro_metadata, transit_event.get_named_args(), log_message);
        }

        log_to_write = default_log_statement;
      }
      else
      {
        // Sink has override_pattern_formatter_options, we do not include PatternFormatter
        // in the frontend, for this reason we init PatternFormatter here
        if (!sink->_override_pattern_formatter)
        {
          // Initialize override formatter if needed
          sink->_override_pattern_formatter =
            std::make_shared<PatternFormatter>(*sink->_override_pattern_formatter_options);
        }

        // Use the sink's override formatter
        log_to_write = sink->_override_pattern_formatter->format(
          transit_event.timestamp, thread_id, thread_name, _process_id,
          transit_event.logger_base->_logger_name, log_level_description, log_level_short_code,
          *transit_event.macro_metadata, transit_event.get_named_args(), log_message);
      }

      // Apply filters now that we have the formatted log
      if (sink->apply_all_filters(transit_event.macro_metadata, transit_event.timestamp, thread_id,
                                  thread_name, transit_event.logger_base->_logger_name,
                                  transit_event.log_level(), log_message, log_to_write))
      {
        // Forward the message using the computed log statement that passed the filter
        sink->write_log(transit_event.macro_metadata, transit_event.timestamp, thread_id,
                        thread_name, _process_id, transit_event.logger_base->_logger_name,
                        transit_event.log_level(), log_level_description, log_level_short_code,
                        transit_event.get_named_args(), log_message, log_to_write);
      }
    }
  }
1069 
1074  QUILL_ATTRIBUTE_HOT void _check_failure_counter(std::function<void(std::string const&)> const& error_notifier) noexcept
1075  {
1076  // UnboundedNoMaxLimit does not block or drop messages
1077  for (ThreadContext* thread_context : _active_thread_contexts_cache)
1078  {
1079  if (thread_context->has_bounded_queue_type())
1080  {
1081  size_t const failed_messages_cnt = thread_context->get_and_reset_failure_counter();
1082 
1083  if (QUILL_UNLIKELY(failed_messages_cnt > 0))
1084  {
1085  char timestamp[24];
1086  time_t now = time(nullptr);
1087  tm local_time;
1088  localtime_rs(&now, &local_time);
1089  strftime(timestamp, sizeof(timestamp), "%X", &local_time);
1090 
1091  if (thread_context->has_dropping_queue())
1092  {
1093  error_notifier(fmtquill::format("{} Quill INFO: Dropped {} log messages from thread {}",
1094  timestamp, failed_messages_cnt, thread_context->thread_id()));
1095  }
1096  else if (thread_context->has_blocking_queue())
1097  {
1098  error_notifier(
1099  fmtquill::format("{} Quill INFO: Experienced {} blocking occurrences on thread {}",
1100  timestamp, failed_messages_cnt, thread_context->thread_id()));
1101  }
1102  }
1103  }
1104  }
1105  }
1106 
1112  QUILL_ATTRIBUTE_HOT static std::pair<std::string, std::vector<std::pair<std::string, std::string>>> _process_named_args_format_message(
1113  std::string_view fmt_template) noexcept
1114  {
1115  // It would be nice to do this at compile time and store it in macro metadata, but without
1116  // constexpr vector and string in c++17 it is not possible
1117  std::string fmt_str;
1118  std::vector<std::pair<std::string, std::string>> keys;
1119 
1120  size_t cur_pos = 0;
1121 
1122  size_t open_bracket_pos = fmt_template.find_first_of('{');
1123  while (open_bracket_pos != std::string::npos)
1124  {
1125  // found an open bracket
1126  if (size_t const open_bracket_2_pos = fmt_template.find_first_of('{', open_bracket_pos + 1);
1127  open_bracket_2_pos != std::string::npos)
1128  {
1129  // found another open bracket
1130  if ((open_bracket_2_pos - 1) == open_bracket_pos)
1131  {
1132  open_bracket_pos = fmt_template.find_first_of('{', open_bracket_2_pos + 1);
1133  continue;
1134  }
1135  }
1136 
1137  // look for the next close bracket
1138  size_t close_bracket_pos = fmt_template.find_first_of('}', open_bracket_pos + 1);
1139  while (close_bracket_pos != std::string::npos)
1140  {
1141  // found closed bracket
1142  if (size_t const close_bracket_2_pos = fmt_template.find_first_of('}', close_bracket_pos + 1);
1143  close_bracket_2_pos != std::string::npos)
1144  {
1145  // found another open bracket
1146  if ((close_bracket_2_pos - 1) == close_bracket_pos)
1147  {
1148  close_bracket_pos = fmt_template.find_first_of('}', close_bracket_2_pos + 1);
1149  continue;
1150  }
1151  }
1152 
1153  // construct a fmt string excluding the characters inside the brackets { }
1154  std::string_view const text_inside_placeholders =
1155  fmt_template.substr(open_bracket_pos + 1, close_bracket_pos - (open_bracket_pos + 1));
1156  std::string_view arg_syntax;
1157  std::string_view arg_name;
1158 
1159  // look in text_inside_placeholders for special syntax formating following the named arg e.g. arg:.2f
1160  if (size_t const syntax_separator = text_inside_placeholders.find(':');
1161  syntax_separator != std::string_view::npos)
1162  {
1163  arg_syntax = text_inside_placeholders.substr(
1164  syntax_separator, text_inside_placeholders.size() - syntax_separator);
1165  arg_name = text_inside_placeholders.substr(0, syntax_separator);
1166  }
1167  else
1168  {
1169  arg_name = text_inside_placeholders;
1170  }
1171 
1172  fmt_str += fmtquill::format(
1173  "{}{{{}}}", fmt_template.substr(cur_pos, open_bracket_pos - cur_pos), arg_syntax);
1174  cur_pos = close_bracket_pos + 1;
1175 
1176  // also add the keys to the vector
1177  keys.emplace_back(arg_name, arg_syntax);
1178 
1179  break;
1180  }
1181 
1182  open_bracket_pos = fmt_template.find_first_of('{', close_bracket_pos);
1183  }
1184 
1185  // add anything remaining after the last bracket
1186  fmt_str += std::string{fmt_template.substr(cur_pos, fmt_template.length() - cur_pos)};
1187  return std::make_pair(fmt_str, keys);
1188  }
1189 
1196  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* _read_unbounded_frontend_queue(UnboundedSPSCQueue& frontend_queue,
1197  ThreadContext* thread_context) const
1198  {
1199  auto const read_result = frontend_queue.prepare_read();
1200 
1201  if (read_result.allocation)
1202  {
1203  if ((read_result.new_capacity < read_result.previous_capacity) && thread_context->_transit_event_buffer)
1204  {
1205  // The user explicitly requested to shrink the queue, indicating a preference for low memory
1206  // usage. To align with this intent, we also request shrinking the backend buffer.
1207  thread_context->_transit_event_buffer->request_shrink();
1208  }
1209 
1210  // When allocation_info has a value it means that the queue has re-allocated
1211  if (_options.error_notifier)
1212  {
1213  char ts[24];
1214  time_t t = time(nullptr);
1215  tm p;
1216  localtime_rs(std::addressof(t), std::addressof(p));
1217  strftime(ts, 24, "%X", std::addressof(p));
1218 
1219  // we switched to a new here, and we also notify the user of the allocation via the
1220  // error_notifier
1221  _options.error_notifier(
1222  fmtquill::format("{} Quill INFO: Allocated a new SPSC queue with a capacity of {} KiB "
1223  "(previously {} KiB) from thread {}",
1224  ts, (read_result.new_capacity / 1024),
1225  (read_result.previous_capacity / 1024), thread_context->thread_id()));
1226  }
1227  }
1228 
1229  return read_result.read_pos;
1230  }
1231 
1232  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _check_frontend_queues_and_cached_transit_events_empty()
1233  {
1234  _update_active_thread_contexts_cache();
1235 
1236  bool all_empty{true};
1237 
1238  for (ThreadContext* thread_context : _active_thread_contexts_cache)
1239  {
1240  QUILL_ASSERT(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type(),
1241  "Invalid queue type in "
1242  "BackendWorker::_check_frontend_queues_and_cached_transit_events_empty()");
1243 
1244  if (thread_context->has_unbounded_queue_type())
1245  {
1246  all_empty &= thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty();
1247  }
1248  else if (thread_context->has_bounded_queue_type())
1249  {
1250  all_empty &= thread_context->get_spsc_queue_union().bounded_spsc_queue.empty();
1251  }
1252 
1253  QUILL_ASSERT(thread_context->_transit_event_buffer,
1254  "transit_event_buffer is nullptr in "
1255  "BackendWorker::_check_frontend_queues_and_cached_transit_events_empty(), "
1256  "should be valid in _active_thread_contexts_cache");
1257 
1258  all_empty &= thread_context->_transit_event_buffer->empty();
1259  }
1260 
1261  return all_empty;
1262  }
1263 
1267  QUILL_ATTRIBUTE_HOT void _resync_rdtsc_clock()
1268  {
1269  if (_rdtsc_clock.load(std::memory_order_relaxed))
1270  {
1271  // resync in rdtsc if we are not logging so that time_since_epoch() still works
1272  if (auto const now = std::chrono::steady_clock::now();
1273  (now - _last_rdtsc_resync_time) > _options.rdtsc_resync_interval)
1274  {
1275  if (_rdtsc_clock.load(std::memory_order_relaxed)->resync(2500))
1276  {
1277  _last_rdtsc_resync_time = now;
1278  }
1279  }
1280  }
1281  }
1282 
1283  /***/
1284  QUILL_ATTRIBUTE_HOT void _flush_and_run_active_sinks(bool run_periodic_tasks, std::chrono::milliseconds sink_min_flush_interval)
1285  {
1286  // Populate the active sinks cache with unique sinks, consider only the valid loggers
1287  _logger_manager.for_each_logger(
1288  [this](LoggerBase* logger)
1289  {
1290  if (logger->is_valid_logger())
1291  {
1292  for (std::shared_ptr<Sink> const& sink : logger->_sinks)
1293  {
1294  Sink* logger_sink_ptr = sink.get();
1295  auto search_it = std::find_if(_active_sinks_cache.begin(), _active_sinks_cache.end(),
1296  [logger_sink_ptr](Sink* elem)
1297  {
1298  // no one else can remove the shared pointer as this is
1299  // only running on backend thread
1300  return elem == logger_sink_ptr;
1301  });
1302 
1303  if (search_it == std::end(_active_sinks_cache))
1304  {
1305  _active_sinks_cache.push_back(logger_sink_ptr);
1306  }
1307  }
1308  }
1309 
1310  // return false to never end the loop early
1311  return false;
1312  });
1313 
1314  bool should_flush_sinks{false};
1315  std::chrono::steady_clock::time_point now;
1316 
1317  if (sink_min_flush_interval.count())
1318  {
1319  // conditional flush sinks
1320  now = std::chrono::steady_clock::now();
1321  if ((now - _last_sink_flush_time) > sink_min_flush_interval)
1322  {
1323  should_flush_sinks = true;
1324  }
1325  }
1326  else
1327  {
1328  // sink_min_flush_interval == 0 - always flush sinks
1329  should_flush_sinks = true;
1330  }
1331 
1332  for (auto const& sink : _active_sinks_cache)
1333  {
1334  QUILL_TRY
1335  {
1336  if (should_flush_sinks)
1337  {
1338  // If an exception is thrown, catch it here to prevent it from propagating
1339  // to the outer function. This prevents potential infinite loops caused by failing
1340  // flush operations.
1341  sink->flush_sink();
1342  }
1343  }
1344 #if !defined(QUILL_NO_EXCEPTIONS)
1345  QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
1346  QUILL_CATCH_ALL() { _options.error_notifier(std::string{"Caught unhandled exception."}); }
1347 #endif
1348 
1349  if (run_periodic_tasks)
1350  {
1351  sink->run_periodic_tasks();
1352  }
1353  }
1354 
1355  // Only update the timestamp after we actually attempted to flush
1356  if (should_flush_sinks && sink_min_flush_interval.count() && !_active_sinks_cache.empty())
1357  {
1358  _last_sink_flush_time = now;
1359  }
1360 
1361  _active_sinks_cache.clear();
1362  }
1363 
1367  QUILL_ATTRIBUTE_HOT void _update_active_thread_contexts_cache()
1368  {
1369  // Check if _thread_contexts has changed. This can happen only when a new thread context is added by any Logger
1370  if (QUILL_UNLIKELY(_thread_context_manager.new_thread_context_flag()))
1371  {
1372  _active_thread_contexts_cache.clear();
1373  _thread_context_manager.for_each_thread_context(
1374  [this](ThreadContext* thread_context)
1375  {
1376  if (!thread_context->_transit_event_buffer)
1377  {
1378  // Lazy initialise the _transit_event_buffer for this thread_context
1379  thread_context->_transit_event_buffer =
1380  std::make_shared<TransitEventBuffer>(_options.transit_event_buffer_initial_capacity);
1381  }
1382 
1383  // We do not skip invalidated && empty queue thread contexts as this is very rare,
1384  // so instead we just add them and expect them to be cleaned in the next iteration
1385  _active_thread_contexts_cache.push_back(thread_context);
1386  });
1387  }
1388  }
1389 
1396  QUILL_ATTRIBUTE_HOT void _cleanup_invalidated_thread_contexts()
1397  {
1398  if (!_thread_context_manager.has_invalid_thread_context())
1399  {
1400  return;
1401  }
1402 
1403  auto find_invalid_and_empty_thread_context_callback = [](ThreadContext* thread_context)
1404  {
1405  // If the thread context is invalid it means the thread that created it has now died.
1406  // We also want to empty the queue from all LogRecords before removing the thread context
1407  if (!thread_context->is_valid())
1408  {
1409  QUILL_ASSERT(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type(),
1410  "Invalid queue type in BackendWorker::_update_active_thread_contexts_cache()");
1411 
1412  QUILL_ASSERT(thread_context->_transit_event_buffer,
1413  "transit_event_buffer is nullptr in "
1414  "BackendWorker::_update_active_thread_contexts_cache(), should be valid in "
1415  "_active_thread_contexts_cache");
1416 
1417  // detect empty queue
1418  if (thread_context->has_unbounded_queue_type())
1419  {
1420  return thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty() &&
1421  thread_context->_transit_event_buffer->empty();
1422  }
1423 
1424  if (thread_context->has_bounded_queue_type())
1425  {
1426  return thread_context->get_spsc_queue_union().bounded_spsc_queue.empty() &&
1427  thread_context->_transit_event_buffer->empty();
1428  }
1429  }
1430 
1431  return false;
1432  };
1433 
1434  // First we iterate our existing cache and we look for any invalidated contexts
1435  auto found_invalid_and_empty_thread_context =
1436  std::find_if(_active_thread_contexts_cache.begin(), _active_thread_contexts_cache.end(),
1437  find_invalid_and_empty_thread_context_callback);
1438 
1439  while (QUILL_UNLIKELY(found_invalid_and_empty_thread_context != std::end(_active_thread_contexts_cache)))
1440  {
1441  // if we found anything then remove it - Here if we have more than one to remove we will
1442  // try to acquire the lock multiple times, but it should be fine as it is unlikely to have
1443  // that many to remove
1444  _thread_context_manager.remove_shared_invalidated_thread_context(*found_invalid_and_empty_thread_context);
1445 
1446  // We also need to remove it from _thread_context_cache, that is used only by the backend
1447  _active_thread_contexts_cache.erase(found_invalid_and_empty_thread_context);
1448 
1449  // And then look again
1450  found_invalid_and_empty_thread_context =
1451  std::find_if(_active_thread_contexts_cache.begin(), _active_thread_contexts_cache.end(),
1452  find_invalid_and_empty_thread_context_callback);
1453  }
1454  }
1455 
1459  QUILL_ATTRIBUTE_HOT void _cleanup_invalidated_loggers()
1460  {
1461  // since there are no messages we can check for invalidated loggers and clean them up
1462  _logger_manager.cleanup_invalidated_loggers(
1463  [this]()
1464  {
1465  // check the queues are empty each time before removing a logger to avoid
1466  // potential race condition of the logger* still being in the queue
1467  return _check_frontend_queues_and_cached_transit_events_empty();
1468  },
1469  _removed_loggers);
1470 
1471  if (!_removed_loggers.empty())
1472  {
1473  // if loggers were removed also check for sinks to remove
1474  // cleanup_unused_sinks is expensive and should be only called when it is needed
1475  _sink_manager.cleanup_unused_sinks();
1476 
1477  for (auto const& removed_logger_name : _removed_loggers)
1478  {
1479  // Notify the user if the blocking call was used
1480  auto search_it = _logger_removal_flags.find(removed_logger_name);
1481  if (search_it != _logger_removal_flags.end())
1482  {
1483  search_it->second->store(true);
1484  _logger_removal_flags.erase(search_it);
1485  }
1486  }
1487 
1488  _removed_loggers.clear();
1489  }
1490  }
1491 
1496  QUILL_ATTRIBUTE_HOT void _try_shrink_empty_transit_event_buffers()
1497  {
1498  for (ThreadContext* thread_context : _active_thread_contexts_cache)
1499  {
1500  if (thread_context->_transit_event_buffer)
1501  {
1502  thread_context->_transit_event_buffer->try_shrink();
1503  }
1504  }
1505  }
1506 
  /**
   * Formats every named argument's value into named_args[i].second.
   *
   * All values are formatted in a single vformat_to call using QUILL_MAGIC_SEPARATOR as a
   * delimiter, and the formatted output is then split back into the individual values.
   *
   * @param orig_arg_names (name, format-syntax) pairs parsed from the message template; may be
   *        smaller than named_args when unnamed placeholder arguments were appended
   * @param named_args output vector; .first already holds the names, .second receives the
   *        formatted values
   * @param format_args_store the decoded argument values
   * @param options backend options, used for the printable-character check
   */
  static void _format_and_split_arguments(std::vector<std::pair<std::string, std::string>> const& orig_arg_names,
                                          std::vector<std::pair<std::string, std::string>>& named_args,
                                          DynamicFormatArgStore const& format_args_store,
                                          BackendOptions const& options)
  {
    // Generate a format string
    std::string format_string;
    static constexpr std::string_view delimiter{QUILL_MAGIC_SEPARATOR};

    for (size_t i = 0; i < named_args.size(); ++i)
    {
      // We need an additional check here because named_args can have a size greater than orig_arg_names
      // This is because we are adding the arguments without a name with a placeholder name
      if ((i < orig_arg_names.size()) && !orig_arg_names[i].second.empty())
      {
        // orig_arg_names[i].second is special format syntax for the named argument if provided, eg name:.2f
        format_string += fmtquill::format("{{{}}}", orig_arg_names[i].second);
      }
      else
      {
        format_string += "{}";
      }

      // separate consecutive values with the magic delimiter so they can be split apart below
      if (i < named_args.size() - 1)
      {
        format_string += delimiter;
      }
    }

    // Format all values to a single string
    std::string formatted_values_str;
    fmtquill::vformat_to(std::back_inserter(formatted_values_str), format_string,
                         fmtquill::basic_format_args<fmtquill::format_context>{
                           format_args_store.data(), format_args_store.size()});

    // Split the formatted_values to isolate each value
    size_t start = 0;
    size_t end = 0;
    size_t idx = 0;

    while ((end = formatted_values_str.find(delimiter, start)) != std::string::npos)
    {
      if (idx < named_args.size())
      {
        named_args[idx++].second = formatted_values_str.substr(start, end - start);
      }
      start = end + delimiter.length();
    }

    // last value
    if (idx < named_args.size())
    {
      named_args[idx].second = formatted_values_str.substr(start);
    }

    // We call sanitize_non_printable_chars for each value because formatted_values_str already
    // contains non-printable characters for the argument separation
    if (options.check_printable_char && format_args_store.has_string_related_type())
    {
      // if non-printable chars check is configured, or if any of the provided arguments are strings
      for (auto& named_arg : named_args)
      {
        sanitize_non_printable_chars(named_arg.second, options);
      }
    }
  }
1579 
1580  void _populate_formatted_named_args(TransitEvent* transit_event,
1581  std::vector<std::pair<std::string, std::string>> const& arg_names)
1582  {
1583  transit_event->ensure_extra_data();
1584 
1585  auto* named_args = &transit_event->extra_data->named_args;
1586 
1587  named_args->resize(arg_names.size());
1588 
1589  // We first populate the arg names in the transit buffer
1590  for (size_t i = 0; i < arg_names.size(); ++i)
1591  {
1592  (*named_args)[i].first = arg_names[i].first;
1593  }
1594 
1595  for (size_t i = arg_names.size(); i < static_cast<size_t>(_format_args_store.size()); ++i)
1596  {
1597  // we do not have a named_arg for the argument value here so we just append its index as a placeholder
1598  named_args->push_back(std::pair<std::string, std::string>(fmtquill::format("_{}", i), std::string{}));
1599  }
1600 
1601  // Then populate all the values of each arg
1602  QUILL_TRY { _format_and_split_arguments(arg_names, *named_args, _format_args_store, _options); }
1603 #if !defined(QUILL_NO_EXCEPTIONS)
1604  QUILL_CATCH(std::exception const&)
1605  {
1606  // This catch block simply catches the exception.
1607  // Since the error has already been handled in _populate_formatted_log_message,
1608  // there is no additional action required here.
1609  }
1610 #endif
1611  }
1612 
  /**
   * Formats the log message into transit_event->formatted_msg using the previously decoded
   * arguments in _format_args_store. On a formatting failure the message is replaced with an
   * error description and the error notifier is invoked.
   *
   * @param transit_event event whose formatted_msg buffer is cleared and (re)written
   * @param message_format the fmt format string to apply
   */
  QUILL_ATTRIBUTE_HOT void _populate_formatted_log_message(TransitEvent* transit_event, char const* message_format)
  {
    transit_event->formatted_msg->clear();

    QUILL_TRY
    {
      fmtquill::vformat_to(std::back_inserter(*transit_event->formatted_msg), message_format,
                           fmtquill::basic_format_args<fmtquill::format_context>{
                             _format_args_store.data(), _format_args_store.size()});

      // only scan for non-printable chars when configured and when a string-like arg exists
      if (_options.check_printable_char && _format_args_store.has_string_related_type())
      {
        sanitize_non_printable_chars(*transit_event->formatted_msg, _options);
      }
    }
#if !defined(QUILL_NO_EXCEPTIONS)
    QUILL_CATCH(std::exception const& e)
    {
      // replace the partially formatted output with a self-describing error message
      transit_event->formatted_msg->clear();
      std::string const error =
        fmtquill::format(R"([Could not format log statement. message: "{}", location: "{}", error: "{}"])",
                         transit_event->macro_metadata->message_format(),
                         transit_event->macro_metadata->short_source_location(), e.what());

      transit_event->formatted_msg->append(error);
      _options.error_notifier(error);
    }
#endif
  }
1642 
  /**
   * Decodes runtime-supplied metadata (format string, file, function, tags, line, log level)
   * that the frontend encoded ahead of the log arguments, and points the transit event's
   * macro_metadata at a MacroMetadata rebuilt from it.
   *
   * The decode calls below advance read_pos and therefore their order must exactly mirror the
   * frontend's encode order for each copy mode:
   *  - DeepCopy:     all four strings were copied into the queue (decoded as char const*)
   *  - ShallowCopy:  only raw pointers were copied (decoded as void const*)
   *  - HybridCopy:   fmt and tags copied, file and function as raw pointers
   *
   * @param read_pos queue read position; advanced past the decoded metadata
   * @param transit_event event receiving the runtime metadata
   */
  void _apply_runtime_metadata(std::byte*& read_pos, TransitEvent* transit_event)
  {
    char const* fmt;
    char const* file;
    char const* function;
    char const* tags;

    if (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataDeepCopy)
    {
      fmt = Codec<char const*>::decode_arg(read_pos);
      file = Codec<char const*>::decode_arg(read_pos);
      function = Codec<char const*>::decode_arg(read_pos);
      tags = Codec<char const*>::decode_arg(read_pos);
    }
    else if (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataShallowCopy)
    {
      fmt = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
      file = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
      function = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
      tags = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
    }
    else if (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataHybridCopy)
    {
      fmt = Codec<char const*>::decode_arg(read_pos);
      file = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
      function = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
      tags = Codec<char const*>::decode_arg(read_pos);
    }
    else
    {
      QUILL_THROW(
        QuillError{"Unexpected event type in _apply_runtime_metadata. This should never happen."});
    }

    // line and level always follow the four strings
    auto const line = Codec<uint32_t>::decode_arg(read_pos);
    auto const log_level = Codec<LogLevel>::decode_arg(read_pos);

    auto temp = TransitEvent::RuntimeMetadata{file, line, function, tags, fmt, log_level};

    transit_event->ensure_extra_data();
    transit_event->extra_data->runtime_metadata = temp;

    // point to the runtime metadata
    transit_event->macro_metadata = &transit_event->extra_data->runtime_metadata.macro_metadata;
  }
1688 
1689  template <typename TFormattedMsg>
1690  static void sanitize_non_printable_chars(TFormattedMsg& formatted_msg, BackendOptions const& options)
1691  {
1692  // check for non-printable characters in the formatted_msg
1693  bool contains_non_printable_char{false};
1694 
1695  for (char c : formatted_msg)
1696  {
1697  if (!options.check_printable_char(c))
1698  {
1699  contains_non_printable_char = true;
1700  break;
1701  }
1702  }
1703 
1704  if (contains_non_printable_char)
1705  {
1706  // in this rare event we will replace the non-printable chars with their hex values
1707  std::string const formatted_msg_copy = {formatted_msg.data(), formatted_msg.size()};
1708  formatted_msg.clear();
1709 
1710  for (char c : formatted_msg_copy)
1711  {
1712  if (options.check_printable_char(c))
1713  {
1714  formatted_msg.append(std::string{c});
1715  }
1716  else
1717  {
1718  // convert non-printable character to hex
1719  constexpr char hex[] = "0123456789ABCDEF";
1720  formatted_msg.append(std::string{'\\'});
1721  formatted_msg.append(std::string{'x'});
1722  formatted_msg.append(std::string{hex[(c >> 4) & 0xF]});
1723  formatted_msg.append(std::string{hex[c & 0xF]});
1724  }
1725  }
1726  }
1727  }
1728 
private:
  friend class quill::ManualBackendWorker;

  std::unique_ptr<BackendWorkerLock> _backend_worker_lock; // presumably guards against a second backend instance - TODO confirm
  ThreadContextManager& _thread_context_manager = ThreadContextManager::instance(); // singleton reference
  SinkManager& _sink_manager = SinkManager::instance();                             // singleton reference
  LoggerManager& _logger_manager = LoggerManager::instance();                       // singleton reference
  BackendOptions _options;      // copy of the options the backend was started with
  std::thread _worker_thread;   // the backend worker thread itself

  DynamicFormatArgStore _format_args_store; // reused store for the decoded format arguments of each event
  std::vector<std::string> _removed_loggers; // names of loggers removed in the last cleanup pass
  std::vector<ThreadContext*> _active_thread_contexts_cache; // backend-only cache of registered thread contexts
  std::vector<Sink*> _active_sinks_cache; // scratch list of unique sinks, rebuilt per flush
  // cache: message template -> (rewritten fmt string, (arg name, syntax) pairs) for named args
  std::unordered_map<std::string, std::pair<std::string, std::vector<std::pair<std::string, std::string>>>> _named_args_templates;
  std::unordered_map<std::string, std::atomic<bool>*> _logger_removal_flags; // logger name -> flag set when its removal completed
  std::string _named_args_format_template;
  std::string _process_id; // cached process id passed to the pattern formatters
  std::chrono::steady_clock::time_point _last_rdtsc_resync_time; // last time _rdtsc_clock was resynced
  std::chrono::steady_clock::time_point _last_sink_flush_time;   // last time the sinks were flushed
  std::atomic<uint32_t> _worker_thread_id{0};   // id of the backend worker thread
  std::atomic<bool> _is_worker_running{false};  // set while the worker thread is alive
  // on its own cache line to avoid false sharing with the wake-up members below
  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<RdtscClock*> _rdtsc_clock{nullptr};
  alignas(QUILL_CACHE_LINE_ALIGNED) std::mutex _wake_up_mutex; // protects _wake_up_flag for the cv below
  std::condition_variable _wake_up_cv;                         // used to wake the sleeping worker
  bool _wake_up_flag{false};                                   // guarded by _wake_up_mutex
1757 };
1758 
1759 #if defined(_WIN32) && defined(_MSC_VER) && !defined(__GNUC__)
1760  #pragma warning(pop)
1761 #endif
1762 
1763 } // namespace detail
1764 
1765 QUILL_END_NAMESPACE
Definition: base.h:1070
std::unique_ptr< ExtraData > extra_data
buffer for message
Definition: TransitEvent.h:217
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:42
void clear()
Erase all elements from the store.
Definition: DynamicFormatArgStore.h:147
void notify()
Wakes up the backend worker thread.
Definition: BackendWorker.h:238
std::function< void()> backend_worker_on_poll_end
Optional hook executed by the backend worker thread at the end of each poll iteration.
Definition: BackendOptions.h:192
Base class for sinks.
Definition: Sink.h:40
QUILL_ATTRIBUTE_COLD void run(BackendOptions const &options)
Starts the backend worker thread.
Definition: BackendWorker.h:138
bool enable_yield_when_idle
The backend employs "busy-waiting" by spinning around each frontend thread&#39;s queue.
Definition: BackendOptions.h:44
Definition: TransitEvent.h:32
~BackendWorker()
Destructor.
Definition: BackendWorker.h:94
Definition: TransitEvent.h:135
QUILL_NODISCARD uint64_t time_since_epoch(uint64_t rdtsc_value) const
Access the rdtsc class from any thread to convert an rdtsc value to wall time.
Definition: BackendWorker.h:112
Definition: ThreadContextManager.h:216
void(*)(std::byte *&data, DynamicFormatArgStore &args_store) FormatArgsDecoder
Decode functions.
Definition: Codec.h:399
uint16_t cpu_affinity
Pins the backend to the specified CPU.
Definition: BackendOptions.h:154
std::function< void(std::string const &)> error_notifier
The backend may encounter exceptions that cannot be caught within user threads.
Definition: BackendOptions.h:170
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED std::string get_thread_name()
Returns the name of the thread.
Definition: ThreadUtilities.h:148
std::chrono::nanoseconds sleep_duration
Specifies the duration the backend sleeps if there is no remaining work to process in the queues...
Definition: BackendOptions.h:49
Similar to fmt::dynamic_arg_store but better suited to our needs e.g does not include <functional> an...
Definition: DynamicFormatArgStore.h:77
tm * localtime_rs(time_t const *timer, tm *buf)
Portable localtime_r or _s per operating system.
Definition: TimeUtilities.h:59
Definition: LogFunctions.h:261
QUILL_NODISCARD bool is_valid_logger() const noexcept
Checks if the logger is valid.
Definition: LoggerBase.h:110
void for_each_logger(TCallback cb) const
For backend use only.
Definition: LoggerManager.h:135
Setups a signal handler to handle fatal signals.
Definition: BackendManager.h:24
BackendWorker()=default
Constructor.
typename = void for specializations with enable_if
Definition: Codec.h:144
Converts tsc ticks to nanoseconds since epoch.
Definition: RdtscClock.h:36
std::array< std::string, 11 > log_level_descriptions
Holds descriptive names for various log levels used in logging operations.
Definition: BackendOptions.h:247
size_t transit_events_hard_limit
The backend gives priority to reading messages from the frontend queues and temporarily buffers them...
Definition: BackendOptions.h:92
QUILL_ATTRIBUTE_COLD void stop() noexcept
Stops the backend worker thread.
Definition: BackendWorker.h:212
This class can be used when you want to run the backend worker on your own thread.
Definition: ManualBackendWorker.h:19
Definition: LoggerBase.h:35
custom exception
Definition: QuillError.h:45
QUILL_NODISCARD uint32_t get_backend_thread_id() const noexcept
Get the backend worker&#39;s thread id.
Definition: BackendWorker.h:129
std::array< std::string, 11 > log_level_short_codes
Short codes or identifiers for each log level.
Definition: BackendOptions.h:257
bool wait_for_queues_to_empty_before_exit
When this option is enabled and the application is terminating, the backend worker thread will not ex...
Definition: BackendOptions.h:145
size_t transit_events_soft_limit
The backend gives priority to reading messages from the frontend queues of all the hot threads and te...
Definition: BackendOptions.h:75
QUILL_NODISCARD constexpr bool is_power_of_two(uint64_t number) noexcept
Check if a number is a power of 2.
Definition: MathUtilities.h:24
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ReadResult prepare_read()
Prepare to read from the buffer a callback used for notifications to the user.
Definition: UnboundedSPSCQueue.h:190
Definition: SinkManager.h:28
bool check_backend_singleton_instance
Enables a runtime check to detect multiple instances of the backend singleton.
Definition: BackendOptions.h:280
std::atomic< bool > * flush_flag
A unique ptr to save space as these fields not always used.
Definition: TransitEvent.h:218
std::string thread_name
The name assigned to the backend, visible during thread naming queries (e.g., pthread_getname_np) or ...
Definition: BackendOptions.h:36
std::function< void()> backend_worker_on_poll_begin
Optional hook executed by the backend worker thread at the start of each poll iteration.
Definition: BackendOptions.h:185
Definition: BackendWorker.h:77
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED uint32_t get_thread_id() noexcept
Returns the os assigned ID of the thread.
Definition: ThreadUtilities.h:198
Definition: LoggerManager.h:33
Configuration options for the backend.
Definition: BackendOptions.h:30
std::chrono::microseconds log_timestamp_ordering_grace_period
The backend iterates through all frontend lock-free queues and pops all messages from each queue...
Definition: BackendOptions.h:132
std::function< bool(char c)> check_printable_char
This option enables a check that verifies the log message contains only printable characters before f...
Definition: BackendOptions.h:239
Definition: ThreadContextManager.h:53
std::chrono::milliseconds rdtsc_resync_interval
This option is only applicable if at least one frontend is using a Logger with ClockSourceType::Tsc.
Definition: BackendOptions.h:207
std::chrono::milliseconds sink_min_flush_interval
This option specifies the minimum time interval (in milliseconds) before the backend thread flushes t...
Definition: BackendOptions.h:224