C++ Actor Framework  0.18
caf::scheduled_actor Class Reference

A cooperatively scheduled, event-based actor implementation. More...

#include <scheduled_actor.hpp>


Classes

struct  mailbox_policy
 Configures the FIFO inbox with four nested queues. More...
 

Public Types

enum  message_category {
  message_category::ordinary,
  message_category::internal,
  message_category::skipped
}
 Categorizes incoming messages. More...
 
enum  activation_result {
  activation_result::success,
  activation_result::terminated,
  activation_result::skipped,
  activation_result::dropped
}
 Result of one-shot activations. More...
 
using super = local_actor
 Base type.
 
using inbound_stream_metrics_map = std::unordered_map< type_id_t, inbound_stream_metrics_t >
 Maps types to metric objects for inbound stream traffic.
 
using outbound_stream_metrics_map = std::unordered_map< type_id_t, outbound_stream_metrics_t >
 Maps types to metric objects for outbound stream traffic.
 
using stream_manager_map = std::map< stream_slot, stream_manager_ptr >
 Maps slot IDs to stream managers.
 
using normal_queue = intrusive::drr_cached_queue< policy::normal_messages >
 Stores asynchronous messages with default priority.
 
using urgent_queue = intrusive::drr_cached_queue< policy::urgent_messages >
 Stores asynchronous messages with high priority.
 
using upstream_queue = intrusive::drr_queue< policy::upstream_messages >
 Stores upstream messages.
 
using downstream_queue = intrusive::wdrr_dynamic_multiplexed_queue< policy::downstream_messages >
 Stores downstream messages.
 
using mailbox_type = intrusive::fifo_inbox< mailbox_policy >
 A queue optimized for single-reader-many-writers.
 
using pending_response = std::pair< const message_id, behavior >
 The message ID of an outstanding response with its callback.
 
using pointer = scheduled_actor *
 A pointer to a scheduled actor.
 
using default_handler = std::function< skippable_result(pointer, message &)>
 Function object for handling unmatched messages.
 
using error_handler = std::function< void(pointer, error &)>
 Function object for handling error messages.
 
using down_handler = std::function< void(pointer, down_msg &)>
 Function object for handling down messages.
 
using node_down_handler = std::function< void(pointer, node_down_msg &)>
 Function object for handling node down messages.
 
using exit_handler = std::function< void(pointer, exit_msg &)>
 Function object for handling exit messages.
 
- Public Types inherited from caf::local_actor
using clock_type = std::chrono::steady_clock
 Defines a monotonic clock suitable for measuring intervals.
 
- Public Types inherited from caf::resumable
enum  resume_result {
  resume_later,
  awaiting_message,
  done,
  shutdown_execution_unit
}
 Denotes the state in which a resumable returned from its last call to resume. More...
 
enum  subtype_t {
  unspecified,
  scheduled_actor,
  io_actor,
  function_object
}
 Denotes common subtypes of resumable. More...
 

Public Member Functions

 scheduled_actor (actor_config &cfg)
 
 scheduled_actor (scheduled_actor &&)=delete
 
 scheduled_actor (const scheduled_actor &)=delete
 
scheduled_actor & operator= (scheduled_actor &&)=delete
 
scheduled_actor & operator= (const scheduled_actor &)=delete
 
void enqueue (mailbox_element_ptr ptr, execution_unit *eu) override
 Enqueues a new message wrapped in a mailbox_element to the actor. More...
 
mailbox_element * peek_at_next_mailbox_element () override
 
const char * name () const override
 Returns an implementation-dependent name for logging purposes, which is only valid as long as the actor is running. More...
 
void launch (execution_unit *eu, bool lazy, bool hide) override
 
bool cleanup (error &&fail_state, execution_unit *host) override
 
subtype_t subtype () const override
 Returns a subtype hint for this object. More...
 
void intrusive_ptr_add_ref_impl () override
 Add a strong reference count to this object.
 
void intrusive_ptr_release_impl () override
 Remove a strong reference count from this object.
 
resume_result resume (execution_unit *, size_t) override
 Resume any pending computation until it is either finished or needs to be re-scheduled later. More...
 
virtual proxy_registry * proxy_registry_ptr ()
 Returns a factory for proxies created and managed by this actor or nullptr. More...
 
void quit (error x=error{})
 Finishes execution of this actor after any currently running message handler is done. More...
 
mailbox_type & mailbox () noexcept
 Returns the queue for storing incoming messages.
 
stream_manager_map & stream_managers () noexcept
 Returns map for all active streams.
 
stream_manager_map & pending_stream_managers () noexcept
 Returns map for all pending streams.
 
inbound_stream_metrics_t inbound_stream_metrics (type_id_t type)
 
outbound_stream_metrics_t outbound_stream_metrics (type_id_t type)
 
void set_default_handler (default_handler fun)
 Sets a custom handler for unexpected messages.
 
template<class F >
std::enable_if< std::is_convertible< F, std::function< skippable_result(message &)> >::value >::type set_default_handler (F fun)
 Sets a custom handler for unexpected messages.
 
void set_error_handler (error_handler fun)
 Sets a custom handler for error messages.
 
template<class T >
auto set_error_handler (T fun) -> decltype(fun(std::declval< error &>()))
 Sets a custom handler for error messages.
 
void set_down_handler (down_handler fun)
 Sets a custom handler for down messages.
 
template<class T >
auto set_down_handler (T fun) -> decltype(fun(std::declval< down_msg &>()))
 Sets a custom handler for down messages.
 
void set_node_down_handler (node_down_handler fun)
 Sets a custom handler for node down messages.
 
template<class T >
auto set_node_down_handler (T fun) -> decltype(fun(std::declval< node_down_msg &>()))
 Sets a custom handler for node down messages.
 
void set_exit_handler (exit_handler fun)
 Sets a custom handler for exit messages.
 
template<class T >
auto set_exit_handler (T fun) -> decltype(fun(std::declval< exit_msg &>()))
 Sets a custom handler for exit messages.
 
- Public Member Functions inherited from caf::local_actor
 local_actor (actor_config &cfg)
 
void on_destroy () override
 Cleans up any remaining state before the destructor is called. More...
 
void setup_metrics ()
 
clock_type::time_point now () const noexcept
 Returns the current time.
 
void request_response_timeout (timespan d, message_id mid)
 Requests a new timeout for mid. More...
 
template<class T , spawn_options Os = no_spawn_options, class... Ts>
infer_handle_from_class_t< T > spawn (Ts &&... xs)
 
template<spawn_options Os = no_spawn_options, class F , class... Ts>
infer_handle_from_fun_t< F > spawn (F fun, Ts &&... xs)
 
template<class T , spawn_options Os = no_spawn_options, class Groups , class... Ts>
actor spawn_in_groups (const Groups &gs, Ts &&... xs)
 
template<class T , spawn_options Os = no_spawn_options, class... Ts>
actor spawn_in_groups (std::initializer_list< group > gs, Ts &&... xs)
 
template<class T , spawn_options Os = no_spawn_options, class... Ts>
actor spawn_in_group (const group &grp, Ts &&... xs)
 
template<spawn_options Os = no_spawn_options, class Groups , class F , class... Ts>
actor spawn_in_groups (const Groups &gs, F fun, Ts &&... xs)
 
template<spawn_options Os = no_spawn_options, class F , class... Ts>
actor spawn_in_groups (std::initializer_list< group > gs, F fun, Ts &&... xs)
 
template<spawn_options Os = no_spawn_options, class F , class... Ts>
actor spawn_in_group (const group &grp, F fun, Ts &&... xs)
 
void send_exit (const actor_addr &whom, error reason)
 Sends an exit message to whom.
 
void send_exit (const strong_actor_ptr &whom, error reason)
 Sends an exit message to whom.
 
template<class ActorHandle >
void send_exit (const ActorHandle &whom, error reason)
 Sends an exit message to whom.
 
execution_unit * context () const noexcept
 Returns the execution unit currently used by this actor.
 
void context (execution_unit *x) noexcept
 Sets the execution unit for this actor.
 
actor_system & system () const noexcept
 Returns the hosting actor system.
 
const actor_system_config & config () const noexcept
 Returns the config of the hosting actor system.
 
actor_clock & clock () const noexcept
 Returns the clock of the actor system.
 
strong_actor_ptr & current_sender () noexcept
 Returns a pointer to the sender of the current message. More...
 
message_id current_message_id () noexcept
 Returns the ID of the current message.
 
message_id take_current_message_id () noexcept
 Returns the ID of the current message and marks the ID stored in the current mailbox element as answered. More...
 
void drop_current_message_id () noexcept
 Marks the current message ID as answered.
 
strong_actor_ptr current_next_stage () noexcept
 Returns a pointer to the next stage from the forwarding path of the current message or nullptr if the path is empty. More...
 
strong_actor_ptr take_current_next_stage ()
 Returns a pointer to the next stage from the forwarding path of the current message and removes it from the path. More...
 
const mailbox_element::forwarding_stack & current_forwarding_stack () noexcept
 Returns the forwarding stack from the current mailbox element.
 
mailbox_element::forwarding_stack take_current_forwarding_stack () noexcept
 Moves the forwarding stack from the current mailbox element.
 
mailbox_element * current_mailbox_element () noexcept
 Returns a pointer to the currently processed mailbox element.
 
void monitor (const node_id &node)
 Adds a unidirectional monitor to node. More...
 
template<message_priority P = message_priority::normal, class Handle >
void monitor (const Handle &whom)
 Adds a unidirectional monitor to whom. More...
 
void demonitor (const actor_addr &whom)
 Removes a monitor from whom.
 
void demonitor (const strong_actor_ptr &whom)
 Removes a monitor from whom.
 
void demonitor (const node_id &node)
 Removes a monitor from node.
 
template<class Handle >
void demonitor (const Handle &whom)
 Removes a monitor from whom.
 
virtual void on_exit ()
 Can be overridden to perform cleanup code after an actor finished execution. More...
 
template<class... Ts>
detail::response_promise_t< Ts... > make_response_promise ()
 Creates a typed_response_promise to respond to a request later on. More...
 
response_promise make_response_promise ()
 Creates a response_promise to respond to a request later on.
 
template<class... Ts>
detail::response_promise_t< std::decay_t< Ts >... > response (Ts &&... xs)
 
virtual error save_state (serializer &sink, unsigned int version)
 Serializes the state of this actor to sink. More...
 
virtual error load_state (deserializer &source, unsigned int version)
 Deserializes the state of this actor from source. More...
 
const error & fail_state () const noexcept
 Returns the currently defined fail state. More...
 
- Public Member Functions inherited from caf::monitorable_actor
void attach (attachable_ptr ptr) override
 Attaches ptr to this actor. More...
 
size_t detach (const attachable::token &what) override
 Detaches the first attached object that matches what.
 
void link_to (const actor_addr &x)
 Links this actor to x.
 
template<class ActorHandle >
void link_to (const ActorHandle &x)
 Links this actor to x.
 
void unlink_from (const actor_addr &x)
 Unlinks this actor from x.
 
template<class ActorHandle >
void unlink_from (const ActorHandle &x)
 Unlinks this actor from x.
 
- Public Member Functions inherited from caf::abstract_actor
void * operator new (std::size_t, void *ptr)
 
actor_control_block * ctrl () const
 
void enqueue (strong_actor_ptr sender, message_id mid, message msg, execution_unit *host) override
 Enqueues a new message without forwarding stack to the channel.
 
template<class F >
void attach_functor (F f)
 Convenience function that attaches the functor f to this actor. More...
 
actor_addr address () const noexcept
 Returns the logical actor address.
 
virtual std::set< std::string > message_types () const
 Returns the set of accepted messages types as strings or an empty set if this actor is untyped. More...
 
actor_id id () const noexcept
 Returns the ID of this actor.
 
node_id node () const noexcept
 Returns the node this actor is living on.
 
actor_system & home_system () const noexcept
 Returns the system that created this actor (or proxy).
 
- Public Member Functions inherited from caf::abstract_channel
bool is_abstract_actor () const
 
bool is_abstract_group () const
 
bool is_actor_decorator () const
 

Static Public Member Functions

static void default_error_handler (pointer ptr, error &x)
 
static void default_down_handler (pointer ptr, down_msg &x)
 
static void default_node_down_handler (pointer ptr, node_down_msg &x)
 
static void default_exit_handler (pointer ptr, exit_msg &x)
 

Static Public Attributes

static constexpr size_t urgent_queue_index = 0
 
static constexpr size_t normal_queue_index = 1
 
static constexpr size_t upstream_queue_index = 2
 
static constexpr size_t downstream_queue_index = 3
 
- Static Public Attributes inherited from caf::abstract_channel
static constexpr int is_abstract_actor_flag = 0x01000000
 
static constexpr int is_abstract_group_flag = 0x02000000
 
static constexpr int is_actor_bind_decorator_flag = 0x04000000
 
static constexpr int is_actor_dot_decorator_flag = 0x08000000
 
static constexpr int is_actor_decorator_mask = 0x0C000000
 
static constexpr int is_hidden_flag = 0x10000000
 

Protected Attributes

mailbox_type mailbox_
 Stores incoming messages.
 
detail::behavior_stack bhvr_stack_
 Stores user-defined callbacks for message handling.
 
uint64_t timeout_id_
 Identifies the timeout messages we are currently waiting for.
 
std::forward_list< pending_response > awaited_responses_
 Stores callbacks for awaited responses.
 
detail::unordered_flat_map< message_id, behavior > multiplexed_responses_
 Stores callbacks for multiplexed responses.
 
default_handler default_handler_
 Customization point for setting a default message callback.
 
error_handler error_handler_
 Customization point for setting a default error callback.
 
down_handler down_handler_
 Customization point for setting a default down_msg callback.
 
node_down_handler node_down_handler_
 Customization point for setting a default node_down_msg callback.
 
exit_handler exit_handler_
 Customization point for setting a default exit_msg callback.
 
stream_manager_map stream_managers_
 Stores stream managers for established streams.
 
stream_manager_map pending_stream_managers_
 Stores stream managers for pending streams, i.e., streams that have not yet received an ACK. More...
 
timespan max_batch_delay_
 Stores how long the actor should try to accumulate more items in order to send a full stream batch. More...
 
size_t max_batch_delay_ticks_
 Number of ticks per batch delay.
 
detail::private_thread * private_thread_
 Pointer to a private thread object associated with a detached actor.
 
inbound_stream_metrics_map inbound_stream_metrics_
 Caches metric objects for inbound stream traffic.
 
outbound_stream_metrics_map outbound_stream_metrics_
 Caches metric objects for outbound stream traffic.
 
- Protected Attributes inherited from caf::local_actor
execution_unit * context_
 
mailbox_element * current_element_
 
message_id last_request_id_
 
detail::unique_function< behavior(local_actor *)> initial_behavior_fac_
 Factory function for returning initial behavior in function-based actors.
 
metrics_t metrics_
 
- Protected Attributes inherited from caf::monitorable_actor
error fail_state_
 
std::condition_variable cv_
 
attachable_ptr attachables_head_
 
- Protected Attributes inherited from caf::abstract_actor
std::mutex mtx_
 

Related Functions

(Note that these are not member functions.)

CAF_CORE_EXPORT skippable_result reflect (scheduled_actor *, message &)
 
CAF_CORE_EXPORT skippable_result reflect_and_quit (scheduled_actor *, message &)
 
CAF_CORE_EXPORT skippable_result print_and_drop (scheduled_actor *, message &)
 
CAF_CORE_EXPORT skippable_result drop (scheduled_actor *, message &)
 

Additional Inherited Members

- Protected Member Functions inherited from caf::monitorable_actor
virtual void on_cleanup (const error &reason)
 Allows subclasses to add additional cleanup code to the critical section in cleanup. More...
 
void bounce (mailbox_element_ptr &what)
 Sends a response message if what is a request.
 
void bounce (mailbox_element_ptr &what, const error &err)
 Sends a response message if what is a request.
 
 monitorable_actor (actor_config &cfg)
 Creates a new actor instance.
 
void attach_impl (attachable_ptr &ptr)
 
size_t detach_impl (const attachable::token &what, bool stop_on_hit=false, bool dry_run=false)
 
bool handle_system_message (mailbox_element &x, execution_unit *ctx, bool trap_exit)
 
template<class F >
bool handle_system_message (mailbox_element &x, execution_unit *context, bool trap_exit, F &down_msg_handler)
 
- Protected Member Functions inherited from caf::abstract_actor
 abstract_actor (actor_config &cfg)
 Creates a new actor instance.
 
- Protected Member Functions inherited from caf::abstract_channel
int flags () const
 
void flags (int new_value)
 

Detailed Description

A cooperatively scheduled, event-based actor implementation.

Member Enumeration Documentation

◆ activation_result

Result of one-shot activations.

Enumerator
success 

Actor is still alive and handled the activation message.

terminated 

Actor handled the activation message and terminated.

skipped 

Actor skipped the activation message.

dropped 

Actor dropped the activation message.
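
The four enumerators above can be read as the possible outcomes of handing a single message to an actor. The following is a minimal stand-alone sketch of that mapping. It does not use CAF: handler_outcome, activate and sample_handler are hypothetical names introduced only for illustration, and the real activation logic inside scheduled_actor is more involved.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Mirrors the enumerator names documented above (model only, not the CAF type).
enum class activation_result { success, terminated, skipped, dropped };

// Hypothetical: what a message handler reports back after seeing a message.
enum class handler_outcome { handled, handled_and_quit, skip, drop };

// Hypothetical helper: runs the handler once and maps its outcome to an
// activation result.
activation_result activate(
    const std::function<handler_outcome(const std::string&)>& handler,
    const std::string& msg) {
  switch (handler(msg)) {
    case handler_outcome::handled:
      return activation_result::success;    // actor is still alive
    case handler_outcome::handled_and_quit:
      return activation_result::terminated; // handled, then terminated
    case handler_outcome::skip:
      return activation_result::skipped;    // message stays for later
    case handler_outcome::drop:
      return activation_result::dropped;    // message is discarded
  }
  return activation_result::dropped; // not reached; keeps compilers quiet
}

// Hypothetical handler used to exercise all four outcomes.
handler_outcome sample_handler(const std::string& msg) {
  if (msg == "ping")
    return handler_outcome::handled;
  if (msg == "stop")
    return handler_outcome::handled_and_quit;
  if (msg == "later")
    return handler_outcome::skip;
  return handler_outcome::drop;
}
```

Calling activate(sample_handler, "ping") yields activation_result::success, while an unknown message such as "???" yields activation_result::dropped.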

◆ message_category

Categorizes incoming messages.

Enumerator
ordinary 

Triggers the current behavior.

internal 

Triggers handlers for system messages such as exit_msg or down_msg.

skipped 

Delays processing.
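
As a rough illustration of the three categories, the sketch below classifies a simplified message record. It is a stand-alone model under stated assumptions, not CAF code: msg_kind, incoming and categorize are hypothetical names, and the handler_ready flag is an invented stand-in for whatever condition causes the real actor to skip a message.

```cpp
#include <cassert>

// Mirrors the enumerator names documented above (model only).
enum class message_category { ordinary, internal, skipped };

// Hypothetical message kinds: user traffic plus two system messages.
enum class msg_kind { user, exit, down };

// Hypothetical message record. handler_ready is an assumption that stands in
// for "the current behavior can process this message now".
struct incoming {
  msg_kind kind;
  bool handler_ready;
};

// Hypothetical classifier following the enumerator descriptions.
message_category categorize(const incoming& m) {
  // System messages such as exit or down go to dedicated handlers.
  if (m.kind == msg_kind::exit || m.kind == msg_kind::down)
    return message_category::internal;
  // Everything else either triggers the current behavior or is delayed.
  return m.handler_ready ? message_category::ordinary
                         : message_category::skipped;
}
```

In this model, an exit or down message is always internal, regardless of handler_ready.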

Member Function Documentation

◆ enqueue()

void caf::scheduled_actor::enqueue ( mailbox_element_ptr  what,
execution_unit *  host 
)
overridevirtual

Enqueues a new message wrapped in a mailbox_element to the actor.

This enqueue variant allows defining forwarding chains.

Implements caf::abstract_actor.

Reimplemented in caf::io::abstract_broker.
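
To illustrate what a forwarding chain means here, the following stand-alone sketch models a mailbox element that carries a stack of stages still to be visited. It does not use CAF: element, take_next_stage and route are hypothetical names, actors are represented by plain strings, and taking the next stage from the back of the vector is an assumption of this model.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a mailbox element with a forwarding stack.
struct element {
  std::string sender;
  std::vector<std::string> stages; // next stage sits at the back (assumption)
  std::string content;
};

// Returns the next stage and removes it from the path, or returns an empty
// string if the path is empty.
std::string take_next_stage(element& e) {
  if (e.stages.empty())
    return std::string();
  std::string next = e.stages.back();
  e.stages.pop_back();
  return next;
}

// Walks the whole chain on a copy of the element and records the visit order.
std::vector<std::string> route(element e) {
  std::vector<std::string> hops;
  while (!e.stages.empty())
    hops.push_back(take_next_stage(e));
  return hops;
}
```

For an element with stages {"logger", "worker"}, route returns "worker" followed by "logger", because the model consumes the stack from the back.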

◆ name()

const char* caf::scheduled_actor::name ( ) const
overridevirtual

Returns an implementation-dependent name for logging purposes, which is only valid as long as the actor is running.

The default implementation simply returns "actor".

Reimplemented from caf::local_actor.

Reimplemented in caf::io::abstract_broker, caf::io::basp_broker, and caf::detail::prometheus_broker.

◆ proxy_registry_ptr()

virtual proxy_registry* caf::scheduled_actor::proxy_registry_ptr ( )
virtual

Returns a factory for proxies created and managed by this actor or nullptr.

Reimplemented in caf::io::basp_broker.

◆ quit()

void caf::scheduled_actor::quit ( error  x = error{})

Finishes execution of this actor after any currently running message handler is done.

This member function clears the behavior stack of the running actor and invokes on_exit(). The actor does not finish execution if the implementation of on_exit() sets a new behavior. When setting a new behavior in on_exit(), make sure not to cause infinite recursion.

If on_exit() did not set a new behavior, the actor sends an exit message to all of its linked actors, sets its state to exited and finishes execution.
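
The sequence described above can be sketched as a small stand-alone model. This is not the CAF implementation: model_actor and its members are hypothetical, behaviors and linked actors are represented by strings, and the model runs quit() immediately instead of waiting for a running message handler to finish.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical, simplified actor that models only the quit() sequence.
struct model_actor {
  std::vector<std::string> behavior_stack;     // names of active behaviors
  std::vector<std::string> links;              // names of linked actors
  std::vector<std::string> exit_notifications; // peers that got an exit message
  std::function<void(model_actor&)> on_exit;   // optional cleanup hook
  bool exited = false;

  void quit() {
    // Step 1: clear the behavior stack.
    behavior_stack.clear();
    // Step 2: invoke the on_exit hook, if any.
    if (on_exit)
      on_exit(*this);
    // Step 3: if the hook installed a new behavior, keep running.
    if (!behavior_stack.empty())
      return;
    // Step 4: otherwise notify all linked actors and mark the actor as exited.
    for (const auto& peer : links)
      exit_notifications.push_back(peer);
    exited = true;
  }
};
```

An on_exit hook that pushes a new behavior keeps the model actor alive. If that new behavior calls quit() again and the hook installs yet another behavior each time, the actor never terminates, which is the recursion hazard the text warns about.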

◆ resume()

resume_result caf::scheduled_actor::resume ( execution_unit * ,
size_t  max_throughput 
)
overridevirtual

Resume any pending computation until it is either finished or needs to be re-scheduled later.

Implements caf::resumable.

Reimplemented in caf::io::abstract_broker, and caf::io::basp_broker.
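
The idea behind a cooperative resume() with a throughput limit can be sketched without CAF as follows. model_resumable is a hypothetical type, messages are plain integers, and treating a negative value as a termination request is an assumption made only for this model. The shutdown_execution_unit result is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Subset of the resume_result enumerators listed for caf::resumable.
enum class resume_result { resume_later, awaiting_message, done };

// Hypothetical resumable that processes integers from a mailbox.
struct model_resumable {
  std::deque<int> mailbox;
  int processed = 0;

  // Handles at most max_throughput messages, then yields to the scheduler.
  resume_result resume(std::size_t max_throughput) {
    std::size_t handled = 0;
    while (handled < max_throughput && !mailbox.empty()) {
      int msg = mailbox.front();
      mailbox.pop_front();
      ++processed;
      ++handled;
      if (msg < 0) // assumption: negative values ask the actor to terminate
        return resume_result::done;
    }
    // Empty mailbox: wait for new input. Otherwise ask to be re-scheduled.
    return mailbox.empty() ? resume_result::awaiting_message
                           : resume_result::resume_later;
  }
};
```

With three queued messages and max_throughput set to 2, the first call returns resume_later and the second returns awaiting_message. The limit keeps one busy actor from monopolizing a worker.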

◆ subtype()

subtype_t caf::scheduled_actor::subtype ( ) const
overridevirtual

Returns a subtype hint for this object.

This allows an execution unit to limit processing to a specific set of resumables and delegate other subtypes to dedicated workers.

Reimplemented from caf::resumable.

Reimplemented in caf::io::abstract_broker.
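
The following stand-alone sketch shows how a subtype hint could be used to split work between a general worker and a dedicated one. It is a model only: job, dispatch_result and dispatch are hypothetical names, and the choice to delegate only io_actor jobs is an assumption for illustration, not a statement about how CAF schedulers behave.

```cpp
#include <cassert>
#include <vector>

// Mirrors the subtype_t enumerators listed for caf::resumable (scoped here to
// avoid name clashes in a stand-alone file).
enum class subtype_t { unspecified, scheduled_actor, io_actor, function_object };

// Hypothetical unit of work carrying a subtype hint.
struct job {
  int id;
  subtype_t subtype;
};

// Hypothetical result: IDs kept by this worker and IDs handed elsewhere.
struct dispatch_result {
  std::vector<int> local;
  std::vector<int> delegated;
};

// Keeps everything except I/O actors, which go to a dedicated worker.
dispatch_result dispatch(const std::vector<job>& jobs) {
  dispatch_result out;
  for (const auto& j : jobs) {
    if (j.subtype == subtype_t::io_actor)
      out.delegated.push_back(j.id);
    else
      out.local.push_back(j.id);
  }
  return out;
}
```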

Friends And Related Function Documentation

◆ drop()

CAF_CORE_EXPORT skippable_result drop ( scheduled_actor * ,
message &  
)
related

Default handler function that simply drops messages.

◆ print_and_drop()

CAF_CORE_EXPORT skippable_result print_and_drop ( scheduled_actor * ,
message &  
)
related

Default handler function that prints messages via aout and drops them afterwards.

◆ reflect()

CAF_CORE_EXPORT skippable_result reflect ( scheduled_actor * ,
message &  
)
related

Default handler function that sends the message back to the sender.

◆ reflect_and_quit()

CAF_CORE_EXPORT skippable_result reflect_and_quit ( scheduled_actor * ,
message &  
)
related

Default handler function that sends the message back to the sender and then quits.
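
The four default handlers differ only in what they do with an unmatched message. The sketch below models those differences without CAF. Everything in namespace model is hypothetical: messages are strings, "sending back to the sender" is modelled as appending to an outbox, printing via aout is modelled as appending to a log, and the handlers return void rather than skippable_result.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

namespace model {

using message = std::string;

// Hypothetical actor that records the visible effects of a default handler.
struct actor {
  std::vector<message> outbox; // messages sent back to the sender
  std::vector<message> log;    // messages that were printed
  bool quit_requested = false;
  std::function<void(actor&, message&)> default_handler;

  // Called for messages that no regular handler matched.
  void on_unmatched(message& m) {
    if (default_handler)
      default_handler(*this, m);
  }
};

// Drops the message without any side effect.
void drop(actor&, message&) {}

// Prints the message, then drops it.
void print_and_drop(actor& self, message& m) { self.log.push_back(m); }

// Sends the message back to the sender.
void reflect(actor& self, message& m) { self.outbox.push_back(m); }

// Sends the message back to the sender, then asks the actor to quit.
void reflect_and_quit(actor& self, message& m) {
  reflect(self, m);
  self.quit_requested = true;
}

} // namespace model
```

In CAF itself, the signatures listed on this page (skippable_result(scheduled_actor *, message &)) match the default_handler type, which suggests these functions are meant to be passed to set_default_handler.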

Member Data Documentation

◆ max_batch_delay_

timespan caf::scheduled_actor::max_batch_delay_
protected

Stores how long the actor should try to accumulate more items in order to send a full stream batch.

◆ pending_stream_managers_

stream_manager_map caf::scheduled_actor::pending_stream_managers_
protected

Stores stream managers for pending streams, i.e., streams that have not yet received an ACK.

