quill
BackendWorker.h
1 
7 #pragma once
8 
9 #if defined(_WIN32)
10  #include "quill/backend/Utf8Conv.h"
11 #endif
12 
13 #include "quill/backend/BackendOptions.h"
14 #include "quill/backend/BackendUtilities.h"
15 #include "quill/backend/BackendWorkerLock.h"
16 #include "quill/backend/BacktraceStorage.h"
17 #include "quill/backend/PatternFormatter.h"
18 #include "quill/backend/RdtscClock.h"
19 #include "quill/backend/ThreadUtilities.h"
20 #include "quill/backend/TransitEvent.h"
21 #include "quill/backend/TransitEventBuffer.h"
22 
23 #include "quill/core/Attributes.h"
24 #include "quill/core/BoundedSPSCQueue.h"
25 #include "quill/core/ChronoTimeUtils.h"
26 #include "quill/core/Codec.h"
27 #include "quill/core/Common.h"
28 #include "quill/core/DynamicFormatArgStore.h"
29 #include "quill/core/LogLevel.h"
30 #include "quill/core/LoggerBase.h"
31 #include "quill/core/LoggerManager.h"
32 #include "quill/core/MacroMetadata.h"
33 #include "quill/core/MathUtilities.h"
34 #include "quill/core/QuillError.h"
35 #include "quill/core/SinkManager.h"
36 #include "quill/core/ThreadContextManager.h"
37 #include "quill/core/TimeUtilities.h"
38 #include "quill/core/UnboundedSPSCQueue.h"
39 #include "quill/sinks/Sink.h"
40 
41 #include "quill/bundled/fmt/base.h"
42 
43 #include <algorithm>
44 #include <atomic>
45 #include <cassert>
46 #include <chrono>
47 #include <condition_variable>
48 #include <cstddef>
49 #include <cstdint>
50 #include <cstring>
51 #include <ctime>
52 #include <exception>
53 #include <functional>
54 #include <iterator>
55 #include <limits>
56 #include <memory>
57 #include <mutex>
58 #include <optional>
59 #include <string>
60 #include <string_view>
61 #include <thread>
62 #include <unordered_map>
63 #include <utility>
64 #include <vector>
65 
66 QUILL_BEGIN_NAMESPACE
67 
68 class ManualBackendWorker; // Forward declaration
69 
70 namespace detail
71 {
73 {
74 public:
78  BackendWorker() { _process_id = std::to_string(get_process_id()); }
79 
83  BackendWorker(BackendWorker const&) = delete;
84  BackendWorker& operator=(BackendWorker const&) = delete;
85 
90  {
91  // This destructor will run during static destruction as the thread is part of the singleton
92  stop();
93 
94  RdtscClock const* rdtsc_clock = _rdtsc_clock.exchange(nullptr);
95  delete rdtsc_clock;
96  }
97 
98  /***/
99  QUILL_NODISCARD bool is_running() const noexcept
100  {
101  return _is_worker_running.load(std::memory_order_acquire);
102  }
103 
107  QUILL_NODISCARD uint64_t time_since_epoch(uint64_t rdtsc_value) const
108  {
109  if (QUILL_UNLIKELY(_options.sleep_duration > _options.rdtsc_resync_interval))
110  {
111  QUILL_THROW(
112  QuillError{"Invalid config, When TSC clock is used backend_thread_sleep_duration should "
113  "not be higher than rdtsc_resync_interval"});
114  }
115 
116  RdtscClock const* rdtsc_clock = _rdtsc_clock.load(std::memory_order_acquire);
117  return rdtsc_clock ? rdtsc_clock->time_since_epoch_safe(rdtsc_value) : 0;
118  }
119 
124  QUILL_NODISCARD uint32_t get_backend_thread_id() const noexcept
125  {
126  return _worker_thread_id.load();
127  }
128 
133  QUILL_ATTRIBUTE_COLD void run(BackendOptions const& options)
134  {
135  _ensure_linker_retains_symbols();
136 
138  {
139  _backend_worker_lock = std::make_unique<BackendWorkerLock>(_process_id);
140  }
141 
142  std::thread worker(
143  [this, options]()
144  {
145  _init(options);
146 
147  QUILL_TRY
148  {
149  if (_options.cpu_affinity != (std::numeric_limits<uint16_t>::max)())
150  {
151  // Set cpu affinity if requested to cpu _backend_thread_cpu_affinity
152  set_cpu_affinity(_options.cpu_affinity);
153  }
154  }
155 #if !defined(QUILL_NO_EXCEPTIONS)
156  QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
157  QUILL_CATCH_ALL() { _options.error_notifier(std::string{"Caught unhandled exception."}); }
158 #endif
159 
160  QUILL_TRY
161  {
162  // Set the thread name to the desired name
163  set_thread_name(_options.thread_name.data());
164  }
165 #if !defined(QUILL_NO_EXCEPTIONS)
166  QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
167  QUILL_CATCH_ALL() { _options.error_notifier(std::string{"Caught unhandled exception."}); }
168 #endif
169 
170  // All okay, set the backend worker thread running flag
171  _is_worker_running.store(true);
172 
173  // Running
174  while (QUILL_LIKELY(_is_worker_running.load(std::memory_order_relaxed)))
175  {
176  // main loop
177  QUILL_TRY { _poll(); }
178 #if !defined(QUILL_NO_EXCEPTIONS)
179  QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
180  QUILL_CATCH_ALL()
181  {
182  _options.error_notifier(std::string{"Caught unhandled exception."});
183  } // clang-format on
184 #endif
185  }
186 
187  // exit
188  QUILL_TRY { _exit(); }
189 #if !defined(QUILL_NO_EXCEPTIONS)
190  QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
191  QUILL_CATCH_ALL()
192  {
193  _options.error_notifier(std::string{"Caught unhandled exception."});
194  } // clang-format on
195 #endif
196  });
197 
198  // Move the worker ownership to our class
199  _worker_thread.swap(worker);
200 
201  while (!_is_worker_running.load(std::memory_order_seq_cst))
202  {
203  // wait for the thread to start
204  std::this_thread::sleep_for(std::chrono::microseconds{100});
205  }
206  }
207 
211  QUILL_ATTRIBUTE_COLD void stop() noexcept
212  {
213  // Stop the backend worker
214  if (!_is_worker_running.exchange(false))
215  {
216  // already stopped
217  return;
218  }
219 
220  // signal wake up the backend worker thread
221  notify();
222 
223  // Wait the backend thread to join, if backend thread was never started it won't be joinable
224  if (_worker_thread.joinable())
225  {
226  _worker_thread.join();
227  }
228 
229  _worker_thread_id.store(0);
230  _backend_worker_lock.reset(nullptr);
231  }
232 
237  void notify()
238  {
239  // Set the flag to indicate that the data is ready
240  {
241  std::lock_guard<std::mutex> lock{_wake_up_mutex};
242  _wake_up_flag = true;
243  }
244 
245  // Signal the condition variable to wake up the worker thread
246  _wake_up_cv.notify_one();
247  }
248 
249 private:
254  QUILL_ATTRIBUTE_COLD static void _ensure_linker_retains_symbols()
255  {
256  // Calls to ensure it is retained by the linker.
257  QUILL_MAYBE_UNUSED static auto thread_name = get_thread_name();
258  (void)thread_name;
259 
260 #if defined(_WIN32)
261  std::wstring const dummy = L"dummy";
262  QUILL_MAYBE_UNUSED static auto encode1 = utf8_encode(dummy);
263  (void)encode1;
264 
265  QUILL_MAYBE_UNUSED static auto encode2 =
266  utf8_encode(reinterpret_cast<std::byte const*>(dummy.data()), dummy.size());
267  (void)encode2;
268 #endif
269  }
270 
274  QUILL_ATTRIBUTE_HOT void _poll()
275  {
276  // load all contexts locally
277  _update_active_thread_contexts_cache();
278 
279  // Read all frontend queues and cache the log statements and the metadata as TransitEvents
280  size_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues();
281 
282  if (cached_transit_events_count != 0)
283  {
284  // there are cached events to process
285  if (cached_transit_events_count < _options.transit_events_soft_limit)
286  {
287  // process a single transit event, then give priority to reading the frontend queues again
288  _process_lowest_timestamp_transit_event();
289  }
290  else
291  {
292  // we want to process a batch of events.
293  while (!has_pending_events_for_caching_when_transit_event_buffer_empty() &&
294  _process_lowest_timestamp_transit_event())
295  {
296  // We need to be cautious because there are log messages in the lock-free queues
297  // that have not yet been cached in the transit event buffer. Logging only the cached
298  // messages can result in out-of-order log entries, as messages with larger timestamps
299  // in the queue might be missed.
300  }
301  }
302  }
303  else
304  {
305  // No cached transit events to process, minimal thread workload.
306 
307  // force flush all remaining messages
308  _flush_and_run_active_sinks(true, _options.sink_min_flush_interval);
309 
310  // check for any dropped messages / blocked threads
311  _check_failure_counter(_options.error_notifier);
312 
313  // This is useful when BackendTscClock is used to keep it up to date
314  _resync_rdtsc_clock();
315 
316  // Also check if all queues are empty
317  bool const queues_and_events_empty = _check_frontend_queues_and_cached_transit_events_empty();
318  if (queues_and_events_empty)
319  {
320  _cleanup_invalidated_thread_contexts();
321  _cleanup_invalidated_loggers();
322  _try_shrink_empty_transit_event_buffers();
323 
324  // There is nothing left to do, and we can let this thread sleep for a while
325  // buffer events are 0 here and also all the producer queues are empty
326  if (_options.sleep_duration.count() != 0)
327  {
328  std::unique_lock<std::mutex> lock{_wake_up_mutex};
329 
330  // Wait for a timeout or a notification to wake up
331  _wake_up_cv.wait_for(lock, _options.sleep_duration, [this] { return _wake_up_flag; });
332 
333  // set the flag back to false since we woke up here
334  _wake_up_flag = false;
335 
336  // After waking up resync rdtsc clock again and resume
337  _resync_rdtsc_clock();
338  }
339  else if (_options.enable_yield_when_idle)
340  {
341  std::this_thread::yield();
342  }
343  }
344  }
345  }
346 
350  QUILL_ATTRIBUTE_COLD void _init(BackendOptions const& options)
351  {
352  _options = options;
353 
354  // Cache this thread's id
355  _worker_thread_id.store(get_thread_id());
356 
357  (void)get_thread_name();
358 
359  // Double check or modify some backend options before we start
360  if (_options.transit_events_hard_limit == 0)
361  {
362  // transit_events_hard_limit of 0 makes no sense as we can't process anything
363  _options.transit_events_hard_limit = 1;
364  }
365 
366  if (_options.transit_events_soft_limit == 0)
367  {
368  _options.transit_events_soft_limit = 1;
369  }
370 
372  {
373  QUILL_THROW(QuillError{fmtquill::format(
374  "transit_events_soft_limit ({}) cannot be greater than transit_events_hard_limit "
375  "({}). Please ensure that the soft limit is less than or equal to the hard limit.",
377  }
378  else if (!is_power_of_two(_options.transit_events_hard_limit))
379  {
380  QUILL_THROW(QuillError{fmtquill::format(
381  "transit_events_hard_limit ({}) must be a power of two", _options.transit_events_hard_limit)});
382  }
383  else if (!is_power_of_two(_options.transit_events_soft_limit))
384  {
385  QUILL_THROW(QuillError{fmtquill::format(
386  "transit_events_soft_limit ({}) must be a power of two", _options.transit_events_soft_limit)});
387  }
388  }
389 
393  QUILL_ATTRIBUTE_COLD void _exit()
394  {
395  while (true)
396  {
397  bool const queues_and_events_empty = (!_options.wait_for_queues_to_empty_before_exit) ||
398  _check_frontend_queues_and_cached_transit_events_empty();
399 
400  if (queues_and_events_empty)
401  {
402  // we are done, all queues are now empty
403  _check_failure_counter(_options.error_notifier);
404  _flush_and_run_active_sinks(false, std::chrono::milliseconds{0});
405  break;
406  }
407 
408  uint64_t const cached_transit_events_count = _populate_transit_events_from_frontend_queues();
409  if (cached_transit_events_count > 0)
410  {
411  while (!has_pending_events_for_caching_when_transit_event_buffer_empty() &&
412  _process_lowest_timestamp_transit_event())
413  {
414  // We need to be cautious because there are log messages in the lock-free queues
415  // that have not yet been cached in the transit event buffer. Logging only the cached
416  // messages can result in out-of-order log entries, as messages with larger timestamps
417  // in the queue might be missed.
418  }
419  }
420  }
421 
422  _cleanup_invalidated_thread_contexts();
423  _cleanup_invalidated_loggers();
424  }
425 
429  QUILL_ATTRIBUTE_HOT size_t _populate_transit_events_from_frontend_queues()
430  {
431  uint64_t const ts_now = _options.log_timestamp_ordering_grace_period.count()
432  ? static_cast<uint64_t>((detail::get_timestamp<std::chrono::system_clock>() - _options.log_timestamp_ordering_grace_period)
433  .count())
434  : std::numeric_limits<uint64_t>::max();
435 
436  size_t cached_transit_events_count{0};
437 
438  for (ThreadContext* thread_context : _active_thread_contexts_cache)
439  {
440  assert(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type());
441 
442  if (thread_context->has_unbounded_queue_type())
443  {
444  cached_transit_events_count += _read_and_decode_frontend_queue(
445  thread_context->get_spsc_queue_union().unbounded_spsc_queue, thread_context, ts_now);
446  }
447  else if (thread_context->has_bounded_queue_type())
448  {
449  cached_transit_events_count += _read_and_decode_frontend_queue(
450  thread_context->get_spsc_queue_union().bounded_spsc_queue, thread_context, ts_now);
451  }
452  }
453 
454  return cached_transit_events_count;
455  }
456 
464  template <typename TFrontendQueue>
465  QUILL_ATTRIBUTE_HOT size_t _read_and_decode_frontend_queue(TFrontendQueue& frontend_queue,
466  ThreadContext* thread_context, uint64_t ts_now)
467  {
468  // Note: The producer commits only complete messages to the queue.
469  // Therefore, if even a single byte is present in the queue, it signifies a full message.
470  size_t const queue_capacity = frontend_queue.capacity();
471  size_t total_bytes_read{0};
472 
473  do
474  {
475  std::byte* read_pos;
476 
477  if constexpr (std::is_same_v<TFrontendQueue, UnboundedSPSCQueue>)
478  {
479  read_pos = _read_unbounded_frontend_queue(frontend_queue, thread_context);
480  }
481  else
482  {
483  read_pos = frontend_queue.prepare_read();
484  }
485 
486  if (!read_pos)
487  {
488  // Exit loop nothing to read
489  break;
490  }
491 
492  std::byte const* const read_begin = read_pos;
493 
494  if (!_populate_transit_event_from_frontend_queue(read_pos, thread_context, ts_now))
495  {
496  // If _get_transit_event_from_queue returns false, stop reading
497  break;
498  }
499 
500  // Finish reading
501  assert((read_pos >= read_begin) && "read_buffer should be greater or equal to read_begin");
502  auto const bytes_read = static_cast<size_t>(read_pos - read_begin);
503  frontend_queue.finish_read(bytes_read);
504  total_bytes_read += bytes_read;
505  // Reads a maximum of one full frontend queue or the transit events' hard limit to prevent
506  // getting stuck on the same producer.
507  } while ((total_bytes_read < queue_capacity) &&
508  (thread_context->_transit_event_buffer->size() < _options.transit_events_hard_limit));
509 
510  if (total_bytes_read != 0)
511  {
512  // If we read something from the queue, we commit all the reads together at the end.
513  // This strategy enhances cache coherence performance by updating the shared atomic flag
514  // only once.
515  frontend_queue.commit_read();
516  }
517 
518  return thread_context->_transit_event_buffer->size();
519  }
520 
521  /***/
522  QUILL_ATTRIBUTE_HOT bool _populate_transit_event_from_frontend_queue(std::byte*& read_pos,
523  ThreadContext* thread_context,
524  uint64_t ts_now)
525  {
526  assert(thread_context->_transit_event_buffer);
527 
528  // Allocate a new TransitEvent or use an existing one to store the message from the queue
529  TransitEvent* transit_event = thread_context->_transit_event_buffer->back();
530 
531  assert(transit_event->formatted_msg);
532 
533  std::memcpy(&transit_event->timestamp, read_pos, sizeof(transit_event->timestamp));
534  read_pos += sizeof(transit_event->timestamp);
535 
536  std::memcpy(&transit_event->macro_metadata, read_pos, sizeof(transit_event->macro_metadata));
537  read_pos += sizeof(transit_event->macro_metadata);
538 
539  std::memcpy(&transit_event->logger_base, read_pos, sizeof(transit_event->logger_base));
540  read_pos += sizeof(transit_event->logger_base);
541 
542  if (transit_event->logger_base->_clock_source == ClockSourceType::Tsc)
543  {
544  // If using the rdtsc clock, convert the value to nanoseconds since epoch.
545  // This conversion ensures that every transit inserted in the buffer below has a timestamp in
546  // nanoseconds since epoch, allowing compatibility with Logger objects using different clocks.
547  if (QUILL_UNLIKELY(!_rdtsc_clock.load(std::memory_order_relaxed)))
548  {
549  // Lazy initialization of rdtsc clock on the backend thread only if the user decides to use
550  // it. The clock requires a few seconds to init as it is taking samples first.
551  _rdtsc_clock.store(new RdtscClock{_options.rdtsc_resync_interval}, std::memory_order_release);
552  _last_rdtsc_resync_time = std::chrono::steady_clock::now();
553  }
554 
555  // Convert the rdtsc value to nanoseconds since epoch.
556  transit_event->timestamp =
557  _rdtsc_clock.load(std::memory_order_relaxed)->time_since_epoch(transit_event->timestamp);
558  }
559 
560  // Check if strict log timestamp order is enabled and the clock source is not User
561  if ((transit_event->logger_base->_clock_source != ClockSourceType::User) &&
562  (ts_now != std::numeric_limits<uint64_t>::max()))
563  {
564  // We only check against `ts_now` for real timestamps, not custom timestamps by the user, and
565  // when the grace period is enabled
566 
567 #ifndef NDEBUG
568  // Check the timestamps we are comparing have the same digits
569  auto count_digits = [](uint64_t number)
570  {
571  uint32_t digits = 0;
572  do
573  {
574  digits++;
575  number /= 10;
576  } while (number != 0);
577  return digits;
578  };
579 
580  assert(count_digits(transit_event->timestamp) == count_digits(ts_now));
581 #endif
582 
583  // Ensure the message timestamp is not greater than ts_now.
584  if (QUILL_UNLIKELY(transit_event->timestamp > ts_now))
585  {
586  // If the message timestamp is ahead of the current time, temporarily halt processing.
587  // This guarantees the integrity of message order and avoids missed messages.
588  // We halt processing here to avoid introducing out-of-sequence messages.
589  // This scenario prevents potential race conditions where timestamps from
590  // the last queue could overwrite those from the first queue before they are included.
591  // We return at this point without adding the current event to the buffer.
592  return false;
593  }
594  }
595 
596  FormatArgsDecoder format_args_decoder;
597  std::memcpy(&format_args_decoder, read_pos, sizeof(format_args_decoder));
598  read_pos += sizeof(format_args_decoder);
599 
600  if ((transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataDeepCopy) ||
601  (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataShallowCopy) ||
602  (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataHybridCopy))
603  {
604  _apply_runtime_metadata(read_pos, transit_event);
605  }
606 
607  // we need to check and do not try to format the flush events as that wouldn't be valid
608  if ((transit_event->macro_metadata->event() != MacroMetadata::Event::Flush) &&
609  (transit_event->macro_metadata->event() != MacroMetadata::Event::LoggerRemovalRequest))
610  {
611  format_args_decoder(read_pos, _format_args_store);
612 
613  if (!transit_event->macro_metadata->has_named_args())
614  {
615  _populate_formatted_log_message(transit_event, transit_event->macro_metadata->message_format());
616  }
617  else
618  {
619  // using the message_format as key for lookups
620  _named_args_format_template.assign(transit_event->macro_metadata->message_format());
621 
622  if (auto const search = _named_args_templates.find(_named_args_format_template);
623  search != std::cend(_named_args_templates))
624  {
625  // process named args message when we already have parsed the format message once,
626  // and we have the names of each arg cached
627  auto const& [message_format, arg_names] = search->second;
628 
629  _populate_formatted_log_message(transit_event, message_format.data());
630  _populate_formatted_named_args(transit_event, arg_names);
631  }
632  else
633  {
634  // process named args log when the message format is processed for the first time
635  // parse name of each arg and stored them to our lookup map
636  auto const [res_it, inserted] = _named_args_templates.try_emplace(
637  _named_args_format_template,
638  _process_named_args_format_message(transit_event->macro_metadata->message_format()));
639 
640  auto const& [message_format, arg_names] = res_it->second;
641 
642  // suppress unused warnings
643  (void)inserted;
644 
645  _populate_formatted_log_message(transit_event, message_format.data());
646  _populate_formatted_named_args(transit_event, arg_names);
647  }
648  }
649  }
650  else if (transit_event->macro_metadata->event() == MacroMetadata::Event::Flush)
651  {
652  // if this is a flush event then we do not need to format anything for the
653  // transit_event, but we need to set the transit event's flush_flag pointer instead
654  uintptr_t flush_flag_tmp;
655  std::memcpy(&flush_flag_tmp, read_pos, sizeof(uintptr_t));
656  transit_event->flush_flag = reinterpret_cast<std::atomic<bool>*>(flush_flag_tmp);
657  read_pos += sizeof(uintptr_t);
658  }
659  else
660  {
661  // Store the logger name and the sync flag
662  assert(transit_event->macro_metadata->event() == MacroMetadata::Event::LoggerRemovalRequest);
663 
664  uintptr_t logger_removal_flag_tmp;
665  std::memcpy(&logger_removal_flag_tmp, read_pos, sizeof(uintptr_t));
666  read_pos += sizeof(uintptr_t);
667  std::string_view const logger_name = Codec<std::string>::decode_arg(read_pos);
668 
669  _logger_removal_flags.emplace(std::string{logger_name},
670  reinterpret_cast<std::atomic<bool>*>(logger_removal_flag_tmp));
671  }
672 
673  // commit this transit event
674  thread_context->_transit_event_buffer->push_back();
675 
676  return true;
677  }
678 
683  QUILL_ATTRIBUTE_HOT bool has_pending_events_for_caching_when_transit_event_buffer_empty() noexcept
684  {
685  _update_active_thread_contexts_cache();
686 
687  for (ThreadContext* thread_context : _active_thread_contexts_cache)
688  {
689  assert(thread_context->_transit_event_buffer &&
690  "transit_event_buffer should always be valid here as we always populate it with the "
691  "_active_thread_contexts_cache");
692 
693  if (thread_context->_transit_event_buffer->empty())
694  {
695  // if there is no _transit_event_buffer yet then check only the queue
696  if (thread_context->has_unbounded_queue_type() &&
697  !thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty())
698  {
699  return true;
700  }
701 
702  if (thread_context->has_bounded_queue_type() &&
703  !thread_context->get_spsc_queue_union().bounded_spsc_queue.empty())
704  {
705  return true;
706  }
707  }
708  }
709 
710  return false;
711  }
712 
716  QUILL_ATTRIBUTE_HOT bool _process_lowest_timestamp_transit_event()
717  {
718  // Get the lowest timestamp
719  uint64_t min_ts{std::numeric_limits<uint64_t>::max()};
720  ThreadContext* thread_context{nullptr};
721 
722  for (ThreadContext* tc : _active_thread_contexts_cache)
723  {
724  assert(tc->_transit_event_buffer &&
725  "transit_event_buffer should always be valid here as we always populate it with the "
726  "_active_thread_contexts_cache");
727 
728  TransitEvent const* te = tc->_transit_event_buffer->front();
729  if (te && (min_ts > te->timestamp))
730  {
731  min_ts = te->timestamp;
732  thread_context = tc;
733  }
734  }
735 
736  if (!thread_context)
737  {
738  // all transit event buffers are empty
739  return false;
740  }
741 
742  TransitEvent* transit_event = thread_context->_transit_event_buffer->front();
743  assert(transit_event && "transit_buffer is set only when transit_event is valid");
744 
745  std::atomic<bool>* flush_flag{nullptr};
746 
747  QUILL_TRY { _process_transit_event(*thread_context, *transit_event, flush_flag); }
748 #if !defined(QUILL_NO_EXCEPTIONS)
749  QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
750  QUILL_CATCH_ALL()
751  {
752  _options.error_notifier(std::string{"Caught unhandled exception."});
753  } // clang-format on
754 #endif
755 
756  // Finally, clean up any remaining fields in the transit event
757  if (transit_event->extra_data)
758  {
759  transit_event->extra_data->named_args.clear();
760  transit_event->extra_data->runtime_metadata.has_runtime_metadata = false;
761  }
762 
763  thread_context->_transit_event_buffer->pop_front();
764 
765  if (flush_flag)
766  {
767  // Process the second part of the flush event after it's been removed from the buffer,
768  // ensuring that we are no longer interacting with the thread_context or transit_event.
769 
770  // This is particularly important for handling cases when Quill is used as a DLL on Windows.
771  // If `FreeLibrary` is called, the backend thread may attempt to access an invalidated
772  // `ThreadContext`, which can lead to a crash due to invalid memory access.
773  //
774  // To prevent this, whenever we receive a Flush event, we clean up any invalidated thread contexts
775  // before notifying the caller. This ensures that when `flush_log()` is invoked in `DllMain`
776  // during `DLL_PROCESS_DETACH`, the `ThreadContext` is properly cleaned up before the DLL exits.
777  _cleanup_invalidated_thread_contexts();
778 
779  // Now it’s safe to notify the caller to continue execution.
780  flush_flag->store(true);
781  }
782 
783  return true;
784  }
785 
789  QUILL_ATTRIBUTE_HOT void _process_transit_event(ThreadContext const& thread_context,
790  TransitEvent& transit_event, std::atomic<bool>*& flush_flag)
791  {
792  // If backend_process(...) throws we want to skip this event and move to the next, so we catch
793  // the error here instead of catching it in the parent try/catch block of main_loop
794  if (transit_event.macro_metadata->event() == MacroMetadata::Event::Log)
795  {
796  if (transit_event.log_level() != LogLevel::Backtrace)
797  {
798  _dispatch_transit_event_to_sinks(transit_event, thread_context.thread_id(),
799  thread_context.thread_name());
800 
801  // We also need to check the severity of the log message here against the backtrace
802  // Check if we should also flush the backtrace messages:
803  // After we forwarded the message we will check the severity of this message for this logger
804  // If the severity of the message is higher than the backtrace flush severity we will also
805  // flush the backtrace of the logger
806  if (QUILL_UNLIKELY(transit_event.log_level() >=
807  transit_event.logger_base->_backtrace_flush_level.load(std::memory_order_relaxed)))
808  {
809  if (transit_event.logger_base->_backtrace_storage)
810  {
811  transit_event.logger_base->_backtrace_storage->process(
812  [this](TransitEvent const& te, std::string_view thread_id, std::string_view thread_name)
813  { _dispatch_transit_event_to_sinks(te, thread_id, thread_name); });
814  }
815  }
816  }
817  else
818  {
819  if (transit_event.logger_base->_backtrace_storage)
820  {
821  // this is a backtrace log and we will store the transit event
822  // we need to pass a copy of transit_event here and not move the existing
823  // the transit events are reused
824  TransitEvent transit_event_copy;
825  transit_event.copy_to(transit_event_copy);
826 
827  transit_event.logger_base->_backtrace_storage->store(
828  std::move(transit_event_copy), thread_context.thread_id(), thread_context.thread_name());
829  }
830  else
831  {
832  QUILL_THROW(
833  QuillError{"logger->init_backtrace(...) needs to be called first before using "
834  "LOG_BACKTRACE(...)."});
835  }
836  }
837  }
838  else if (transit_event.macro_metadata->event() == MacroMetadata::Event::InitBacktrace)
839  {
840  // we can just convert the capacity back to int here and use it
841  if (!transit_event.logger_base->_backtrace_storage)
842  {
843  // Lazy BacktraceStorage creation
844  transit_event.logger_base->_backtrace_storage = std::make_shared<BacktraceStorage>();
845  }
846 
847  transit_event.logger_base->_backtrace_storage->set_capacity(static_cast<uint32_t>(std::stoul(
848  std::string{transit_event.formatted_msg->begin(), transit_event.formatted_msg->end()})));
849  }
850  else if (transit_event.macro_metadata->event() == MacroMetadata::Event::FlushBacktrace)
851  {
852  if (transit_event.logger_base->_backtrace_storage)
853  {
854  // process all records in backtrace for this logger and log them
855  transit_event.logger_base->_backtrace_storage->process(
856  [this](TransitEvent const& te, std::string_view thread_id, std::string_view thread_name)
857  { _dispatch_transit_event_to_sinks(te, thread_id, thread_name); });
858  }
859  }
860  else if (transit_event.macro_metadata->event() == MacroMetadata::Event::Flush)
861  {
862  _flush_and_run_active_sinks(false, std::chrono::milliseconds{0});
863 
864  // This is a flush event, so we capture the flush flag to notify the caller after processing.
865  flush_flag = transit_event.flush_flag;
866 
867  // Reset the flush flag as TransitEvents are re-used, preventing incorrect flag reuse.
868  transit_event.flush_flag = nullptr;
869 
870  // We defer notifying the caller until after this function completes.
871  }
872  }
873 
877  QUILL_ATTRIBUTE_HOT void _dispatch_transit_event_to_sinks(TransitEvent const& transit_event,
878  std::string_view const& thread_id,
879  std::string_view const& thread_name)
880  {
881  // First check to see if we should init the pattern formatter on a new Logger
882  // Look up to see if we have the formatter and if not create it
883  if (QUILL_UNLIKELY(!transit_event.logger_base->_pattern_formatter))
884  {
885  // Search for an existing pattern_formatter in each logger
886  _logger_manager.for_each_logger(
887  [&transit_event](LoggerBase* logger)
888  {
889  if (logger->_pattern_formatter &&
890  (logger->_pattern_formatter->get_options() == transit_event.logger_base->_pattern_formatter_options))
891  {
892  // hold a copy of the shared_ptr of the same formatter
893  transit_event.logger_base->_pattern_formatter = logger->_pattern_formatter;
894  return true;
895  }
896 
897  return false;
898  });
899 
900  if (!transit_event.logger_base->_pattern_formatter)
901  {
902  // Didn't find an existing formatter need to create a new pattern formatter
903  transit_event.logger_base->_pattern_formatter =
904  std::make_shared<PatternFormatter>(transit_event.logger_base->_pattern_formatter_options);
905  }
906  }
907 
908  assert(transit_event.logger_base->_pattern_formatter &&
909  "transit_event->logger_base->pattern_formatter should be valid here");
910 
911  // proceed after ensuring a pattern formatter exists
912  std::string_view const log_level_description =
913  log_level_to_string(transit_event.log_level(), _options.log_level_descriptions.data(),
914  _options.log_level_descriptions.size());
915 
916  std::string_view const log_level_short_code =
917  log_level_to_string(transit_event.log_level(), _options.log_level_short_codes.data(),
918  _options.log_level_short_codes.size());
919 
920  _write_log_statement(
921  transit_event, thread_id, thread_name, log_level_description, log_level_short_code,
922  std::string_view{transit_event.formatted_msg->data(), transit_event.formatted_msg->size()});
923  }
924 
928  QUILL_ATTRIBUTE_HOT void _write_log_statement(TransitEvent const& transit_event,
929  std::string_view const& thread_id,
930  std::string_view const& thread_name,
931  std::string_view const& log_level_description,
932  std::string_view const& log_level_short_code,
933  std::string_view const& log_message) const
934  {
935  std::string_view default_log_statement;
936 
937  // Process each sink with the appropriate formatting and filtering
938  for (auto& sink : transit_event.logger_base->_sinks)
939  {
940  std::string_view log_to_write;
941 
942  // Determine which formatted log to use
943  if (!sink->_override_pattern_formatter_options)
944  {
945  if (default_log_statement.empty())
946  {
947  // Use the default formatted log statement, here by checking empty() we try to format once
948  // even for multiple sinks
949  default_log_statement = transit_event.logger_base->_pattern_formatter->format(
950  transit_event.timestamp, thread_id, thread_name, _process_id,
951  transit_event.logger_base->_logger_name, log_level_description, log_level_short_code,
952  *transit_event.macro_metadata, transit_event.get_named_args(), log_message);
953  }
954 
955  log_to_write = default_log_statement;
956  }
957  else
958  {
959  // Sink has override_pattern_formatter_options, we do not include PatternFormatter
960  // in the frontend fo this reason we init PatternFormatter here
961  if (!sink->_override_pattern_formatter)
962  {
963  // Initialize override formatter if needed
964  sink->_override_pattern_formatter =
965  std::make_shared<PatternFormatter>(*sink->_override_pattern_formatter_options);
966  }
967 
968  // Use the sink's override formatter
969  log_to_write = sink->_override_pattern_formatter->format(
970  transit_event.timestamp, thread_id, thread_name, _process_id,
971  transit_event.logger_base->_logger_name, log_level_description, log_level_short_code,
972  *transit_event.macro_metadata, transit_event.get_named_args(), log_message);
973  }
974 
975  // Apply filters now that we have the formatted log
976  if (sink->apply_all_filters(transit_event.macro_metadata, transit_event.timestamp, thread_id,
977  thread_name, transit_event.logger_base->_logger_name,
978  transit_event.log_level(), log_message, log_to_write))
979  {
980  // Forward the message using the computed log statement that passed the filter
981  sink->write_log(transit_event.macro_metadata, transit_event.timestamp, thread_id,
982  thread_name, _process_id, transit_event.logger_base->_logger_name,
983  transit_event.log_level(), log_level_description, log_level_short_code,
984  transit_event.get_named_args(), log_message, log_to_write);
985  }
986  }
987  }
988 
993  QUILL_ATTRIBUTE_HOT void _check_failure_counter(std::function<void(std::string const&)> const& error_notifier) noexcept
994  {
995  // UnboundedNoMaxLimit does not block or drop messages
996  for (ThreadContext* thread_context : _active_thread_contexts_cache)
997  {
998  if (thread_context->has_bounded_queue_type())
999  {
1000  size_t const failed_messages_cnt = thread_context->get_and_reset_failure_counter();
1001 
1002  if (QUILL_UNLIKELY(failed_messages_cnt > 0))
1003  {
1004  char timestamp[24];
1005  time_t now = time(nullptr);
1006  tm local_time;
1007  localtime_rs(&now, &local_time);
1008  strftime(timestamp, sizeof(timestamp), "%X", &local_time);
1009 
1010  if (thread_context->has_dropping_queue())
1011  {
1012  error_notifier(fmtquill::format("{} Quill INFO: Dropped {} log messages from thread {}",
1013  timestamp, failed_messages_cnt, thread_context->thread_id()));
1014  }
1015  else if (thread_context->has_blocking_queue())
1016  {
1017  error_notifier(
1018  fmtquill::format("{} Quill INFO: Experienced {} blocking occurrences on thread {}",
1019  timestamp, failed_messages_cnt, thread_context->thread_id()));
1020  }
1021  }
1022  }
1023  }
1024  }
1025 
/**
 * Splits a format template that uses named placeholders into a positional format string and
 * the list of placeholder names.
 *
 * Every unescaped `{name}` or `{name:spec}` is rewritten to `{}` or `{:spec}`; escaped `{{`
 * and `}}` pairs are copied through untouched. For example `"x {a} y {b:.2f}"` produces the
 * format string `"x {} y {:.2f}"` and the keys `("a", "")`, `("b", ":.2f")`.
 *
 * @param fmt_template the format template containing the named placeholders
 * @return the rewritten format string, paired with one (name, format spec) entry per
 * placeholder in order of appearance; the spec keeps its leading ':' and is empty when the
 * placeholder has none
 */
QUILL_ATTRIBUTE_HOT static std::pair<std::string, std::vector<std::pair<std::string, std::string>>> _process_named_args_format_message(
  std::string_view fmt_template) noexcept
{
  // It would be nice to do this at compile time and store it in macro metadata, but without
  // constexpr vector and string in c++17 it is not possible
  std::string fmt_str;
  std::vector<std::pair<std::string, std::string>> keys;

  // first character of fmt_template that has not been copied to fmt_str yet
  size_t cur_pos = 0;

  size_t open_bracket_pos = fmt_template.find('{');
  while (open_bracket_pos != std::string_view::npos)
  {
    // "{{" is an escaped bracket and not a placeholder: skip the pair
    if (fmt_template.find('{', open_bracket_pos + 1) == (open_bracket_pos + 1))
    {
      open_bracket_pos = fmt_template.find('{', open_bracket_pos + 2);
      continue;
    }

    // look for the bracket that closes the placeholder, stepping over escaped "}}" pairs
    size_t close_bracket_pos = fmt_template.find('}', open_bracket_pos + 1);
    while ((close_bracket_pos != std::string_view::npos) &&
           (fmt_template.find('}', close_bracket_pos + 1) == (close_bracket_pos + 1)))
    {
      close_bracket_pos = fmt_template.find('}', close_bracket_pos + 2);
    }

    if (close_bracket_pos != std::string_view::npos)
    {
      std::string_view const text_inside_placeholders =
        fmt_template.substr(open_bracket_pos + 1, close_bracket_pos - (open_bracket_pos + 1));

      // special format syntax can follow the name, e.g. arg:.2f - the ':' stays with the syntax
      size_t const syntax_separator = text_inside_placeholders.find(':');
      std::string_view const arg_name = text_inside_placeholders.substr(0, syntax_separator);
      std::string_view const arg_syntax = (syntax_separator != std::string_view::npos)
        ? text_inside_placeholders.substr(syntax_separator)
        : std::string_view{};

      // copy the text preceding the placeholder, then the placeholder without its name
      fmt_str.append(fmt_template.substr(cur_pos, open_bracket_pos - cur_pos));
      fmt_str += '{';
      fmt_str.append(arg_syntax);
      fmt_str += '}';
      cur_pos = close_bracket_pos + 1;

      // also add the keys to the vector
      keys.emplace_back(arg_name, arg_syntax);
    }

    // when no closing bracket was found this yields npos and ends the loop
    open_bracket_pos = fmt_template.find('{', close_bracket_pos);
  }

  // add anything remaining after the last bracket
  fmt_str.append(fmt_template.substr(cur_pos));

  // move the results out instead of deep-copying the string and the vector
  return std::make_pair(std::move(fmt_str), std::move(keys));
}
1108 
1115  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* _read_unbounded_frontend_queue(UnboundedSPSCQueue& frontend_queue,
1116  ThreadContext* thread_context) const
1117  {
1118  auto const read_result = frontend_queue.prepare_read();
1119 
1120  if (read_result.allocation)
1121  {
1122  if ((read_result.new_capacity < read_result.previous_capacity) && thread_context->_transit_event_buffer)
1123  {
1124  // The user explicitly requested to shrink the queue, indicating a preference for low memory
1125  // usage. To align with this intent, we also request shrinking the backend buffer.
1126  thread_context->_transit_event_buffer->request_shrink();
1127  }
1128 
1129  // When allocation_info has a value it means that the queue has re-allocated
1130  if (_options.error_notifier)
1131  {
1132  char ts[24];
1133  time_t t = time(nullptr);
1134  tm p;
1135  localtime_rs(std::addressof(t), std::addressof(p));
1136  strftime(ts, 24, "%X", std::addressof(p));
1137 
1138  // we switched to a new here, and we also notify the user of the allocation via the
1139  // error_notifier
1140  _options.error_notifier(
1141  fmtquill::format("{} Quill INFO: Allocated a new SPSC queue with a capacity of {} KiB "
1142  "(previously {} KiB) from thread {}",
1143  ts, (read_result.new_capacity / 1024),
1144  (read_result.previous_capacity / 1024), thread_context->thread_id()));
1145  }
1146  }
1147 
1148  return read_result.read_pos;
1149  }
1150 
1151  QUILL_NODISCARD QUILL_ATTRIBUTE_HOT bool _check_frontend_queues_and_cached_transit_events_empty()
1152  {
1153  _update_active_thread_contexts_cache();
1154 
1155  bool all_empty{true};
1156 
1157  for (ThreadContext* thread_context : _active_thread_contexts_cache)
1158  {
1159  assert(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type());
1160 
1161  if (thread_context->has_unbounded_queue_type())
1162  {
1163  all_empty &= thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty();
1164  }
1165  else if (thread_context->has_bounded_queue_type())
1166  {
1167  all_empty &= thread_context->get_spsc_queue_union().bounded_spsc_queue.empty();
1168  }
1169 
1170  assert(thread_context->_transit_event_buffer &&
1171  "transit_event_buffer should always be valid here as we always populate it with the "
1172  "_active_thread_contexts_cache");
1173 
1174  all_empty &= thread_context->_transit_event_buffer->empty();
1175  }
1176 
1177  return all_empty;
1178  }
1179 
1183  QUILL_ATTRIBUTE_HOT void _resync_rdtsc_clock()
1184  {
1185  if (_rdtsc_clock.load(std::memory_order_relaxed))
1186  {
1187  // resync in rdtsc if we are not logging so that time_since_epoch() still works
1188  if (auto const now = std::chrono::steady_clock::now();
1189  (now - _last_rdtsc_resync_time) > _options.rdtsc_resync_interval)
1190  {
1191  if (_rdtsc_clock.load(std::memory_order_relaxed)->resync(2500))
1192  {
1193  _last_rdtsc_resync_time = now;
1194  }
1195  }
1196  }
1197  }
1198 
1199  /***/
1200  QUILL_ATTRIBUTE_HOT void _flush_and_run_active_sinks(bool run_periodic_tasks, std::chrono::milliseconds sink_min_flush_interval)
1201  {
1202  // Populate the active sinks cache with unique sinks, consider only the valid loggers
1203  _logger_manager.for_each_logger(
1204  [this](LoggerBase* logger)
1205  {
1206  if (logger->is_valid_logger())
1207  {
1208  for (std::shared_ptr<Sink> const& sink : logger->_sinks)
1209  {
1210  Sink* logger_sink_ptr = sink.get();
1211  auto search_it = std::find_if(_active_sinks_cache.begin(), _active_sinks_cache.end(),
1212  [logger_sink_ptr](Sink* elem)
1213  {
1214  // no one else can remove the shared pointer as this is
1215  // only running on backend thread
1216  return elem == logger_sink_ptr;
1217  });
1218 
1219  if (search_it == std::end(_active_sinks_cache))
1220  {
1221  _active_sinks_cache.push_back(logger_sink_ptr);
1222  }
1223  }
1224  }
1225 
1226  // return false to never end the loop early
1227  return false;
1228  });
1229 
1230  bool should_flush_sinks{false};
1231  if (sink_min_flush_interval.count())
1232  {
1233  // conditional flush sinks
1234  if (auto const now = std::chrono::steady_clock::now(); (now - _last_sink_flush_time) > sink_min_flush_interval)
1235  {
1236  should_flush_sinks = true;
1237  _last_sink_flush_time = now;
1238  }
1239  }
1240  else
1241  {
1242  // sink_min_flush_interval == 0 - always flush sinks
1243  should_flush_sinks = true;
1244  }
1245 
1246  for (auto const& sink : _active_sinks_cache)
1247  {
1248  QUILL_TRY
1249  {
1250  if (should_flush_sinks)
1251  {
1252  // If an exception is thrown, catch it here to prevent it from propagating
1253  // to the outer function. This prevents potential infinite loops caused by failing
1254  // flush operations.
1255  sink->flush_sink();
1256  }
1257  }
1258 #if !defined(QUILL_NO_EXCEPTIONS)
1259  QUILL_CATCH(std::exception const& e) { _options.error_notifier(e.what()); }
1260  QUILL_CATCH_ALL() { _options.error_notifier(std::string{"Caught unhandled exception."}); }
1261 #endif
1262 
1263  if (run_periodic_tasks)
1264  {
1265  sink->run_periodic_tasks();
1266  }
1267  }
1268 
1269  _active_sinks_cache.clear();
1270  }
1271 
1275  QUILL_ATTRIBUTE_HOT void _update_active_thread_contexts_cache()
1276  {
1277  // Check if _thread_contexts has changed. This can happen only when a new thread context is added by any Logger
1278  if (QUILL_UNLIKELY(_thread_context_manager.new_thread_context_flag()))
1279  {
1280  _active_thread_contexts_cache.clear();
1281  _thread_context_manager.for_each_thread_context(
1282  [this](ThreadContext* thread_context)
1283  {
1284  if (!thread_context->_transit_event_buffer)
1285  {
1286  // Lazy initialise the _transit_event_buffer for this thread_context
1287  thread_context->_transit_event_buffer =
1288  std::make_shared<TransitEventBuffer>(_options.transit_event_buffer_initial_capacity);
1289  }
1290 
1291  // We do not skip invalidated && empty queue thread contexts as this is very rare,
1292  // so instead we just add them and expect them to be cleaned in the next iteration
1293  _active_thread_contexts_cache.push_back(thread_context);
1294  });
1295  }
1296  }
1297 
1304  QUILL_ATTRIBUTE_HOT void _cleanup_invalidated_thread_contexts()
1305  {
1306  if (!_thread_context_manager.has_invalid_thread_context())
1307  {
1308  return;
1309  }
1310 
1311  auto find_invalid_and_empty_thread_context_callback = [](ThreadContext* thread_context)
1312  {
1313  // If the thread context is invalid it means the thread that created it has now died.
1314  // We also want to empty the queue from all LogRecords before removing the thread context
1315  if (!thread_context->is_valid())
1316  {
1317  assert(thread_context->has_unbounded_queue_type() || thread_context->has_bounded_queue_type());
1318 
1319  assert(thread_context->_transit_event_buffer &&
1320  "transit_event_buffer should always be valid here as we always populate it with the "
1321  "_active_thread_contexts_cache");
1322 
1323  // detect empty queue
1324  if (thread_context->has_unbounded_queue_type())
1325  {
1326  return thread_context->get_spsc_queue_union().unbounded_spsc_queue.empty() &&
1327  thread_context->_transit_event_buffer->empty();
1328  }
1329 
1330  if (thread_context->has_bounded_queue_type())
1331  {
1332  return thread_context->get_spsc_queue_union().bounded_spsc_queue.empty() &&
1333  thread_context->_transit_event_buffer->empty();
1334  }
1335  }
1336 
1337  return false;
1338  };
1339 
1340  // First we iterate our existing cache and we look for any invalidated contexts
1341  auto found_invalid_and_empty_thread_context =
1342  std::find_if(_active_thread_contexts_cache.begin(), _active_thread_contexts_cache.end(),
1343  find_invalid_and_empty_thread_context_callback);
1344 
1345  while (QUILL_UNLIKELY(found_invalid_and_empty_thread_context != std::end(_active_thread_contexts_cache)))
1346  {
1347  // if we found anything then remove it - Here if we have more than one to remove we will
1348  // try to acquire the lock multiple times, but it should be fine as it is unlikely to have
1349  // that many to remove
1350  _thread_context_manager.remove_shared_invalidated_thread_context(*found_invalid_and_empty_thread_context);
1351 
1352  // We also need to remove it from _thread_context_cache, that is used only by the backend
1353  _active_thread_contexts_cache.erase(found_invalid_and_empty_thread_context);
1354 
1355  // And then look again
1356  found_invalid_and_empty_thread_context =
1357  std::find_if(_active_thread_contexts_cache.begin(), _active_thread_contexts_cache.end(),
1358  find_invalid_and_empty_thread_context_callback);
1359  }
1360  }
1361 
1365  QUILL_ATTRIBUTE_HOT void _cleanup_invalidated_loggers()
1366  {
1367  // since there are no messages we can check for invalidated loggers and clean them up
1368  std::vector<std::string> const removed_loggers = _logger_manager.cleanup_invalidated_loggers(
1369  [this]()
1370  {
1371  // check the queues are empty each time before removing a logger to avoid
1372  // potential race condition of the logger* still being in the queue
1373  return _check_frontend_queues_and_cached_transit_events_empty();
1374  });
1375 
1376  if (!removed_loggers.empty())
1377  {
1378  // if loggers were removed also check for sinks to remove
1379  // cleanup_unused_sinks is expensive and should be only called when it is needed
1380  _sink_manager.cleanup_unused_sinks();
1381 
1382  for (auto const& removed_logger_name : removed_loggers)
1383  {
1384  // Notify the user if the blocking call was used
1385  auto search_it = _logger_removal_flags.find(removed_logger_name);
1386  if (search_it != _logger_removal_flags.end())
1387  {
1388  search_it->second->store(true);
1389  _logger_removal_flags.erase(search_it);
1390  }
1391  }
1392  }
1393  }
1394 
1399  QUILL_ATTRIBUTE_HOT void _try_shrink_empty_transit_event_buffers()
1400  {
1401  for (ThreadContext* thread_context : _active_thread_contexts_cache)
1402  {
1403  if (thread_context->_transit_event_buffer)
1404  {
1405  thread_context->_transit_event_buffer->try_shrink();
1406  }
1407  }
1408  }
1409 
1416  static void _format_and_split_arguments(std::vector<std::pair<std::string, std::string>> const& orig_arg_names,
1417  std::vector<std::pair<std::string, std::string>>& named_args,
1418  DynamicFormatArgStore const& format_args_store,
1419  BackendOptions const& options)
1420  {
1421  // Generate a format string
1422  std::string format_string;
1423  static constexpr std::string_view delimiter{QUILL_MAGIC_SEPARATOR};
1424 
1425  for (size_t i = 0; i < named_args.size(); ++i)
1426  {
1427  // We need an additional check here because named_args can have a size greater than orig_arg_names
1428  // This is because we are adding the arguments without a name with a placeholder name
1429  if ((i < orig_arg_names.size()) && !orig_arg_names[i].second.empty())
1430  {
1431  // orig_arg_names[i].second is special format syntax for the named argument if provided, eg name:.2f
1432  format_string += fmtquill::format("{{{}}}", orig_arg_names[i].second);
1433  }
1434  else
1435  {
1436  format_string += "{}";
1437  }
1438 
1439  if (i < named_args.size() - 1)
1440  {
1441  format_string += delimiter;
1442  }
1443  }
1444 
1445  // Format all values to a single string
1446  std::string formatted_values_str;
1447  fmtquill::vformat_to(std::back_inserter(formatted_values_str), format_string,
1448  fmtquill::basic_format_args<fmtquill::format_context>{
1449  format_args_store.data(), format_args_store.size()});
1450 
1451  // Split the formatted_values to isolate each value
1452  size_t start = 0;
1453  size_t end = 0;
1454  size_t idx = 0;
1455 
1456  while ((end = formatted_values_str.find(delimiter, start)) != std::string::npos)
1457  {
1458  if (idx < named_args.size())
1459  {
1460  named_args[idx++].second = formatted_values_str.substr(start, end - start);
1461  }
1462  start = end + delimiter.length();
1463  }
1464 
1465  // last value
1466  if (idx < named_args.size())
1467  {
1468  named_args[idx].second = formatted_values_str.substr(start);
1469  }
1470 
1471  // We call sanitize_non_printable_chars for each value because formatted_values_str already
1472  // contains non-printable characters for the argument separation
1473  if (options.check_printable_char && format_args_store.has_string_related_type())
1474  {
1475  // if non-printable chars check is configured, or if any of the provided arguments are strings
1476  for (auto& named_arg : named_args)
1477  {
1478  sanitize_non_printable_chars(named_arg.second, options);
1479  }
1480  }
1481  }
1482 
1483  void _populate_formatted_named_args(TransitEvent* transit_event,
1484  std::vector<std::pair<std::string, std::string>> const& arg_names)
1485  {
1486  transit_event->ensure_extra_data();
1487 
1488  auto* named_args = &transit_event->extra_data->named_args;
1489 
1490  named_args->resize(arg_names.size());
1491 
1492  // We first populate the arg names in the transit buffer
1493  for (size_t i = 0; i < arg_names.size(); ++i)
1494  {
1495  (*named_args)[i].first = arg_names[i].first;
1496  }
1497 
1498  for (size_t i = arg_names.size(); i < static_cast<size_t>(_format_args_store.size()); ++i)
1499  {
1500  // we do not have a named_arg for the argument value here so we just append its index as a placeholder
1501  named_args->push_back(std::pair<std::string, std::string>(fmtquill::format("_{}", i), std::string{}));
1502  }
1503 
1504  // Then populate all the values of each arg
1505  QUILL_TRY { _format_and_split_arguments(arg_names, *named_args, _format_args_store, _options); }
1506 #if !defined(QUILL_NO_EXCEPTIONS)
1507  QUILL_CATCH(std::exception const&)
1508  {
1509  // This catch block simply catches the exception.
1510  // Since the error has already been handled in _populate_formatted_log_message,
1511  // there is no additional action required here.
1512  }
1513 #endif
1514  }
1515 
1516  QUILL_ATTRIBUTE_HOT void _populate_formatted_log_message(TransitEvent* transit_event, char const* message_format)
1517  {
1518  transit_event->formatted_msg->clear();
1519 
1520  QUILL_TRY
1521  {
1522  fmtquill::vformat_to(std::back_inserter(*transit_event->formatted_msg), message_format,
1523  fmtquill::basic_format_args<fmtquill::format_context>{
1524  _format_args_store.data(), _format_args_store.size()});
1525 
1526  if (_options.check_printable_char && _format_args_store.has_string_related_type())
1527  {
1528  sanitize_non_printable_chars(*transit_event->formatted_msg, _options);
1529  }
1530  }
1531 #if !defined(QUILL_NO_EXCEPTIONS)
1532  QUILL_CATCH(std::exception const& e)
1533  {
1534  transit_event->formatted_msg->clear();
1535  std::string const error =
1536  fmtquill::format(R"([Could not format log statement. message: "{}", location: "{}", error: "{}"])",
1537  transit_event->macro_metadata->message_format(),
1538  transit_event->macro_metadata->short_source_location(), e.what());
1539 
1540  transit_event->formatted_msg->append(error);
1541  _options.error_notifier(error);
1542  }
1543 #endif
1544  }
1545 
1546  void _apply_runtime_metadata(std::byte*& read_pos, TransitEvent* transit_event)
1547  {
1548  char const* fmt;
1549  char const* file;
1550  char const* function;
1551  char const* tags;
1552 
1553  if (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataDeepCopy)
1554  {
1555  fmt = Codec<char const*>::decode_arg(read_pos);
1556  file = Codec<char const*>::decode_arg(read_pos);
1557  function = Codec<char const*>::decode_arg(read_pos);
1558  tags = Codec<char const*>::decode_arg(read_pos);
1559  }
1560  else if (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataShallowCopy)
1561  {
1562  fmt = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
1563  file = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
1564  function = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
1565  tags = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
1566  }
1567  else if (transit_event->macro_metadata->event() == MacroMetadata::Event::LogWithRuntimeMetadataHybridCopy)
1568  {
1569  fmt = Codec<char const*>::decode_arg(read_pos);
1570  file = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
1571  function = static_cast<char const*>(Codec<void const*>::decode_arg(read_pos));
1572  tags = Codec<char const*>::decode_arg(read_pos);
1573  }
1574  else
1575  {
1576  QUILL_THROW(
1577  QuillError{"Unexpected event type in _apply_runtime_metadata. This should never happen."});
1578  }
1579 
1580  auto const line = Codec<uint32_t>::decode_arg(read_pos);
1581  auto const log_level = Codec<LogLevel>::decode_arg(read_pos);
1582 
1583  auto temp = TransitEvent::RuntimeMetadata{file, line, function, tags, fmt, log_level};
1584 
1585  transit_event->ensure_extra_data();
1586  transit_event->extra_data->runtime_metadata = temp;
1587 
1588  // point to the runtime metadata
1589  transit_event->macro_metadata = &transit_event->extra_data->runtime_metadata.macro_metadata;
1590  }
1591 
1592  template <typename TFormattedMsg>
1593  static void sanitize_non_printable_chars(TFormattedMsg& formatted_msg, BackendOptions const& options)
1594  {
1595  // check for non-printable characters in the formatted_msg
1596  bool contains_non_printable_char{false};
1597 
1598  for (char c : formatted_msg)
1599  {
1600  if (!options.check_printable_char(c))
1601  {
1602  contains_non_printable_char = true;
1603  break;
1604  }
1605  }
1606 
1607  if (contains_non_printable_char)
1608  {
1609  // in this rare event we will replace the non-printable chars with their hex values
1610  std::string const formatted_msg_copy = {formatted_msg.data(), formatted_msg.size()};
1611  formatted_msg.clear();
1612 
1613  for (char c : formatted_msg_copy)
1614  {
1615  if (options.check_printable_char(c))
1616  {
1617  formatted_msg.append(std::string{c});
1618  }
1619  else
1620  {
1621  // convert non-printable character to hex
1622  constexpr char hex[] = "0123456789ABCDEF";
1623  formatted_msg.append(std::string{'\\'});
1624  formatted_msg.append(std::string{'x'});
1625  formatted_msg.append(std::string{hex[(c >> 4) & 0xF]});
1626  formatted_msg.append(std::string{hex[c & 0xF]});
1627  }
1628  }
1629  }
1630  }
1631 
1632 private:
1633  friend class quill::ManualBackendWorker;
1634 
1635  std::unique_ptr<BackendWorkerLock> _backend_worker_lock;
// References to the manager singletons, obtained through their instance() accessors
1636  ThreadContextManager& _thread_context_manager = ThreadContextManager::instance();
1637  SinkManager& _sink_manager = SinkManager::instance();
1638  LoggerManager& _logger_manager = LoggerManager::instance();
1639  BackendOptions _options;
1640  std::thread _worker_thread;
1641 
// Arguments used when formatting the log message and the named argument values
1642  DynamicFormatArgStore _format_args_store;
// Rebuilt by _update_active_thread_contexts_cache(); entries are erased by
// _cleanup_invalidated_thread_contexts()
1643  std::vector<ThreadContext*> _active_thread_contexts_cache;
// Scratch list of unique sinks, filled and cleared inside _flush_and_run_active_sinks()
1644  std::vector<Sink*> _active_sinks_cache;
// NOTE(review): the mapped type matches the return type of _process_named_args_format_message();
// presumably a cache of its results - confirm against the code that fills it
1645  std::unordered_map<std::string, std::pair<std::string, std::vector<std::pair<std::string, std::string>>>> _named_args_templates;
// Logger name -> flag that _cleanup_invalidated_loggers() sets to true once that logger has been
// removed; the entry is erased afterwards
1646  std::unordered_map<std::string, std::atomic<bool>*> _logger_removal_flags;
1647  std::string _named_args_format_template;
// Set once in the constructor from get_process_id(); passed to the pattern formatters and sinks
1648  std::string _process_id;
// Time of the last successful resync, see _resync_rdtsc_clock()
1649  std::chrono::steady_clock::time_point _last_rdtsc_resync_time;
// Time of the last interval-based sink flush, see _flush_and_run_active_sinks()
1650  std::chrono::steady_clock::time_point _last_sink_flush_time;
1651  std::atomic<uint32_t> _worker_thread_id{0};
1652  std::atomic<bool> _is_worker_running{false};
// Starts as nullptr; _resync_rdtsc_clock() does nothing while it is null
1654  alignas(QUILL_CACHE_LINE_ALIGNED) std::atomic<RdtscClock*> _rdtsc_clock{
1655  nullptr};
// NOTE(review): mutex, condition variable and flag presumably implement the wake-up done by
// notify() - confirm against the worker loop
1656  alignas(QUILL_CACHE_LINE_ALIGNED) std::mutex _wake_up_mutex;
1657  std::condition_variable _wake_up_cv;
1658  bool _wake_up_flag{false};
1659 };
1660 } // namespace detail
1661 
1662 QUILL_END_NAMESPACE
Definition: base.h:1044
std::unique_ptr< ExtraData > extra_data
buffer for message
Definition: TransitEvent.h:216
A single-producer single-consumer FIFO circular buffer.
Definition: UnboundedSPSCQueue.h:37
void notify()
Wakes up the backend worker thread.
Definition: BackendWorker.h:237
Base class for sinks.
Definition: Sink.h:40
QUILL_ATTRIBUTE_COLD void run(BackendOptions const &options)
Starts the backend worker thread.
Definition: BackendWorker.h:133
bool enable_yield_when_idle
The backend employs "busy-waiting" by spinning around each frontend thread's queue.
Definition: BackendOptions.h:44
Definition: TransitEvent.h:32
~BackendWorker()
Destructor.
Definition: BackendWorker.h:89
Definition: TransitEvent.h:134
QUILL_NODISCARD uint64_t time_since_epoch(uint64_t rdtsc_value) const
Access the rdtsc class from any thread to convert an rdtsc value to wall time.
Definition: BackendWorker.h:107
Definition: ThreadContextManager.h:210
void(*)(std::byte *&data, DynamicFormatArgStore &args_store) FormatArgsDecoder
Decode functions.
Definition: Codec.h:382
uint16_t cpu_affinity
Pins the backend to the specified CPU.
Definition: BackendOptions.h:146
std::function< void(std::string const &)> error_notifier
The backend may encounter exceptions that cannot be caught within user threads.
Definition: BackendOptions.h:162
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED std::string get_thread_name()
Returns the name of the thread.
Definition: ThreadUtilities.h:147
std::chrono::nanoseconds sleep_duration
Specifies the duration the backend sleeps if there is no remaining work to process in the queues...
Definition: BackendOptions.h:49
Similar to fmt::dynamic_arg_store but better suited to our needs e.g does not include <functional> an...
Definition: DynamicFormatArgStore.h:60
tm * localtime_rs(time_t const *timer, tm *buf)
Portable localtime_r or _s per operating system.
Definition: TimeUtilities.h:59
BackendWorker()
Constructor.
Definition: BackendWorker.h:78
Definition: LogFunctions.h:261
QUILL_NODISCARD bool is_valid_logger() const noexcept
Checks if the logger is valid.
Definition: LoggerBase.h:111
void for_each_logger(TCallback cb) const
For backend use only.
Definition: LoggerManager.h:109
Sets up a signal handler to handle fatal signals.
Definition: BackendManager.h:24
typename = void for specializations with enable_if
Definition: Codec.h:142
Converts tsc ticks to nanoseconds since epoch.
Definition: RdtscClock.h:30
std::array< std::string, 11 > log_level_descriptions
Holds descriptive names for various log levels used in logging operations.
Definition: BackendOptions.h:225
size_t transit_events_hard_limit
The backend gives priority to reading messages from the frontend queues and temporarily buffers them...
Definition: BackendOptions.h:92
QUILL_ATTRIBUTE_COLD void stop() noexcept
Stops the backend worker thread.
Definition: BackendWorker.h:211
This class can be used when you want to run the backend worker on your own thread.
Definition: ManualBackendWorker.h:20
Definition: LoggerBase.h:36
custom exception
Definition: QuillError.h:45
QUILL_NODISCARD uint32_t get_backend_thread_id() const noexcept
Get the backend worker's thread id.
Definition: BackendWorker.h:124
std::array< std::string, 11 > log_level_short_codes
Short codes or identifiers for each log level.
Definition: BackendOptions.h:235
bool wait_for_queues_to_empty_before_exit
When this option is enabled and the application is terminating, the backend worker thread will not ex...
Definition: BackendOptions.h:137
size_t transit_events_soft_limit
The backend gives priority to reading messages from the frontend queues of all the hot threads and te...
Definition: BackendOptions.h:75
QUILL_NODISCARD constexpr bool is_power_of_two(uint64_t number) noexcept
Check if a number is a power of 2.
Definition: MathUtilities.h:25
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT ReadResult prepare_read()
Prepare to read from the buffer a callback used for notifications to the user.
Definition: UnboundedSPSCQueue.h:185
Definition: SinkManager.h:28
bool check_backend_singleton_instance
Enables a runtime check to detect multiple instances of the backend singleton.
Definition: BackendOptions.h:258
std::atomic< bool > * flush_flag
A unique ptr to save space as these fields not always used.
Definition: TransitEvent.h:217
std::string thread_name
The name assigned to the backend, visible during thread naming queries (e.g., pthread_getname_np) or ...
Definition: BackendOptions.h:36
Definition: BackendWorker.h:72
QUILL_NODISCARD QUILL_EXPORT QUILL_ATTRIBUTE_USED uint32_t get_thread_id() noexcept
Returns the os assigned ID of the thread.
Definition: ThreadUtilities.h:193
Definition: LoggerManager.h:34
Configuration options for the backend.
Definition: BackendOptions.h:30
std::chrono::microseconds log_timestamp_ordering_grace_period
The backend iterates through all frontend lock-free queues and pops all messages from each queue...
Definition: BackendOptions.h:124
std::function< bool(char c)> check_printable_char
This option enables a check that verifies the log message contains only printable characters before f...
Definition: BackendOptions.h:217
Definition: ThreadContextManager.h:48
std::chrono::milliseconds rdtsc_resync_interval
This option is only applicable if at least one frontend is using a Logger with ClockSourceType::Tsc.
Definition: BackendOptions.h:185
std::chrono::milliseconds sink_min_flush_interval
This option specifies the minimum time interval (in milliseconds) before the backend thread flushes t...
Definition: BackendOptions.h:202