actor-framework
caf::scheduled_actor Class Reference

A cooperatively scheduled, event-based actor implementation.

#include <scheduled_actor.hpp>


Classes

class  batch_forwarder
 
struct  stream_source_state
 

Public Types

enum  message_category { message_category::ordinary, message_category::internal, message_category::skipped }
 Categorizes incoming messages.
 
enum  activation_result { activation_result::success, activation_result::terminated, activation_result::skipped, activation_result::dropped }
 Result of one-shot activations.
 
using super = abstract_scheduled_actor
 Base type.
 
using batch_op_ptr = intrusive_ptr< flow::op::base< async::batch > >
 
using pending_response = std::tuple< const message_id, behavior, disposable >
 The message ID of an outstanding response with its callback and timeout.
 
using pointer = scheduled_actor *
 A pointer to a scheduled actor.
 
using default_handler = std::function< skippable_result(pointer, message &)>
 Function object for handling unmatched messages.
 
using error_handler = std::function< void(pointer, error &)>
 Function object for handling error messages.
 
using down_handler = std::function< void(pointer, down_msg &)>
 Function object for handling down messages.
 
using node_down_handler = std::function< void(pointer, node_down_msg &)>
 Function object for handling node down messages.
 
using exit_handler = std::function< void(pointer, exit_msg &)>
 Function object for handling exit messages.
 
using idle_handler = std::function< void()>
 Function object for handling idle timeouts.
 
using batch_publisher = flow::multicaster< async::batch >
 
using batch_forwarder_ptr = intrusive_ptr< batch_forwarder >
 
- Public Types inherited from caf::abstract_scheduled_actor
using super = local_actor
 
- Public Types inherited from caf::local_actor
using clock_type = std::chrono::steady_clock
 Defines a monotonic clock suitable for measuring intervals.
 
- Public Types inherited from caf::resumable
enum  resume_result { resume_later, awaiting_message, done, shutdown_execution_unit }
 Denotes the state in which a resumable returned from its last call to resume.
 
enum  subtype_t { unspecified, scheduled_actor, io_actor, function_object }
 Denotes common subtypes of resumable.
 
- Public Types inherited from caf::flow::coordinator
using steady_time_point = std::chrono::steady_clock::time_point
 A time point of the monotonic clock.
 

Public Member Functions

 scheduled_actor (actor_config &cfg)
 
 scheduled_actor (scheduled_actor &&)=delete
 
 scheduled_actor (const scheduled_actor &)=delete
 
scheduled_actor & operator= (scheduled_actor &&)=delete
 
scheduled_actor & operator= (const scheduled_actor &)=delete
 
bool enqueue (mailbox_element_ptr ptr, scheduler *sched) override
 Enqueues a new message wrapped in a mailbox_element to the actor.
 
mailbox_element * peek_at_next_mailbox_element () override
 Called by the testing DSL to peek at the next element in the mailbox.
 
const char * name () const override
 Returns an implementation-dependent name for logging purposes, which is only valid as long as the actor is running.
 
void launch (scheduler *sched, bool lazy, bool hide) override
 
void on_cleanup (const error &reason) override
 Called from cleanup to perform extra cleanup actions for this actor.
 
subtype_t subtype () const noexcept override
 Returns a subtype hint for this object.
 
void ref_resumable () const noexcept final
 Add a strong reference count to this object.
 
void deref_resumable () const noexcept final
 Remove a strong reference count from this object.
 
resume_result resume (scheduler *, size_t) override
 Resumes any pending computation until it is either finished or needs to be re-scheduled later.
 
virtual proxy_registry * proxy_registry_ptr ()
 Returns a factory for proxies created and managed by this actor or nullptr.
 
void quit (error x=error{})
 Finishes execution of this actor after any currently running message handler is done.
 
abstract_mailbox & mailbox () noexcept
 Returns the queue for storing incoming messages.
 
bool initialized () const noexcept
 Checks whether this actor is fully initialized.
 
bool inactive () const noexcept
 Checks whether this actor is currently inactive, i.e., not ready to run.
 
void set_default_handler (default_handler fun)
 Sets a custom handler for unexpected messages.
 
template<class F >
std::enable_if_t< std::is_invocable_r_v< skippable_result, F, message & > > set_default_handler (F fun)
 Sets a custom handler for unexpected messages.
 
void set_error_handler (error_handler fun)
 Sets a custom handler for error messages.
 
template<class F >
std::enable_if_t< std::is_invocable_v< F, error & > > set_error_handler (F fun)
 Sets a custom handler for error messages.
 
void set_down_handler (down_handler fun)
 Sets a custom handler for down messages.
 
template<class F >
std::enable_if_t< std::is_invocable_v< F, down_msg & > > set_down_handler (F fun)
 Sets a custom handler for down messages.
 
void set_node_down_handler (node_down_handler fun)
 Sets a custom handler for node down messages.
 
template<class F >
std::enable_if_t< std::is_invocable_v< F, node_down_msg & > > set_node_down_handler (F fun)
 Sets a custom handler for node down messages.
 
void set_exit_handler (exit_handler fun)
 Sets a custom handler for exit messages.
 
template<class F >
std::enable_if_t< std::is_invocable_v< F, exit_msg & > > set_exit_handler (F fun)
 Sets a custom handler for exit messages.
 
template<class RefType , class RepeatType >
void set_idle_handler (timespan delay, RefType, RepeatType, idle_handler fun)
 Sets a custom handler for timeouts that trigger after not receiving a message for a certain amount of time.
 
- Public Member Functions inherited from caf::abstract_scheduled_actor
virtual void add_awaited_response_handler (message_id response_id, behavior bhvr, disposable pending_timeout={})=0
 Adds a callback for an awaited response.
 
virtual void add_multiplexed_response_handler (message_id response_id, behavior bhvr, disposable pending_timeout={})=0
 Adds a callback for a multiplexed response.
 
virtual void call_error_handler (error &what)=0
 Calls the default error handler.
 
virtual void run_actions ()=0
 Runs all pending actions.
 
template<class... Ts>
auto response_to_flow_cell (message_id response_id, disposable pending_timeout={})
 Lifts a response message into a flow cell in order to allow the actor to turn a response into an observable or single.
 
- Public Member Functions inherited from caf::local_actor
 local_actor (actor_config &cfg)
 
void setup_metrics ()
 
clock_type::time_point now () const noexcept
 Returns the current time.
 
disposable request_response_timeout (timespan d, message_id mid)
 Requests a new timeout for mid.
 
template<class... Args>
void println (std::string_view fmt, Args &&... args)
 Adds a new line to stdout.
 
template<class... Args>
void println (term color, std::string_view fmt, Args &&... args)
 Adds a new line to stdout.
 
template<class T , spawn_options Os = no_spawn_options, class... Ts>
infer_handle_from_class_t< T > spawn (Ts &&... xs)
 
template<spawn_options Options = no_spawn_options, class CustomSpawn , class... Args>
CustomSpawn::handle_type spawn (CustomSpawn, Args &&... args)
 
template<spawn_options Os = no_spawn_options, class F , class... Ts>
infer_handle_from_fun_t< F > spawn (F fun, Ts &&... xs)
 
void send_exit (const actor_addr &receiver, error reason)
 Sends an exit message to receiver.
 
void send_exit (const strong_actor_ptr &receiver, error reason)
 Sends an exit message to receiver.
 
template<class Handle >
void send_exit (const Handle &receiver, error reason)
 Sends an exit message to receiver.
 
template<message_priority Priority = message_priority::normal, class Handle , class T , class... Ts>
void anon_send (const Handle &receiver, T &&arg, Ts &&... args)
 
template<message_priority Priority = message_priority::normal, class Handle , class T , class... Ts>
disposable scheduled_anon_send (const Handle &receiver, actor_clock::time_point timeout, T &&arg, Ts &&... args)
 
template<message_priority Priority = message_priority::normal, class Handle , class T , class... Ts>
disposable delayed_anon_send (const Handle &receiver, actor_clock::duration_type timeout, T &&arg, Ts &&... args)
 
scheduler * context () const noexcept
 Returns the execution unit currently used by this actor.
 
void context (scheduler *x) noexcept
 Sets the execution unit for this actor.
 
actor_system & system () const noexcept
 Returns the hosting actor system.
 
const actor_system_config & config () const noexcept
 Returns the config of the hosting actor system.
 
actor_clock & clock () const noexcept
 Returns the clock of the actor system.
 
strong_actor_ptr & current_sender () noexcept
 Returns a pointer to the sender of the current message.
 
message_id current_message_id () noexcept
 Returns the ID of the current message.
 
message_id take_current_message_id () noexcept
 Returns the ID of the current message and marks the ID stored in the current mailbox element as answered.
 
void drop_current_message_id () noexcept
 Marks the current message ID as answered.
 
mailbox_element * current_mailbox_element () noexcept
 Returns a pointer to the currently processed mailbox element.
 
void monitor (const node_id &node)
 Adds a unidirectional monitor to node.
 
template<message_priority P = message_priority::normal, class Handle >
void monitor (const Handle &whom)
 Adds a unidirectional monitor to whom.
 
template<typename Handle , typename Fn >
disposable monitor (Handle whom, Fn func)
 Adds a unidirectional monitor to whom with a custom callback.
 
void demonitor (const actor_addr &whom)
 Removes a monitor from whom.
 
void demonitor (const strong_actor_ptr &whom)
 Removes a monitor from whom.
 
void demonitor (const node_id &node)
 Removes a monitor from node.
 
template<class Handle >
void demonitor (const Handle &whom)
 Removes a monitor from whom.
 
virtual void on_exit ()
 Can be overridden to run cleanup code after an actor has finished execution.
 
template<class... Ts>
detail::response_promise_t< Ts... > make_response_promise ()
 Creates a typed_response_promise to respond to a request later on.
 
response_promise make_response_promise ()
 Creates a response_promise to respond to a request later on.
 
virtual error save_state (serializer &sink, unsigned int version)
 Serializes the state of this actor to sink.
 
virtual error load_state (deserializer &source, unsigned int version)
 Deserializes the state of this actor from source.
 
const error & fail_state () const noexcept
 Returns the currently defined fail state.
 
- Public Member Functions inherited from caf::abstract_actor
 abstract_actor (const abstract_actor &)=delete
 
abstract_actor & operator= (const abstract_actor &)=delete
 
void attach (attachable_ptr ptr)
 Attaches ptr to this actor.
 
template<class F >
void attach_functor (F f)
 Convenience function that attaches the functor f to this actor.
 
size_t detach (const attachable::token &what)
 Detaches the first attached object that matches what.
 
void link_to (const actor_addr &other)
 Links this actor to other.
 
template<class ActorHandle >
void link_to (const ActorHandle &other)
 Links this actor to other.
 
void unlink_from (const actor_addr &other)
 Unlinks this actor from other.
 
template<class ActorHandle >
void unlink_from (const ActorHandle &other)
 Unlinks this actor from other.
 
virtual std::set< std::string > message_types () const
 Returns the set of accepted message types as strings or an empty set if this actor is untyped.
 
actor_id id () const noexcept
 Returns the ID of this actor.
 
node_id node () const noexcept
 Returns the node this actor is living on.
 
actor_system & home_system () const noexcept
 Returns the system that created this actor (or proxy).
 
actor_control_block * ctrl () const
 Returns the control block for this actor.
 
actor_addr address () const noexcept
 Returns the logical actor address.
 
bool cleanup (error &&reason, scheduler *sched)
 Called by the runtime system to perform cleanup actions for this actor.
 
- Public Member Functions inherited from caf::flow::coordinator
observable_builder make_observable ()
 Returns a factory object for new observable objects on this coordinator.
 
template<class Impl , class... Args>
std::enable_if_t< std::is_base_of_v< coordinated, Impl >, intrusive_ptr< Impl > > add_child (std::in_place_type_t< Impl >, Args &&... args)
 Creates a new coordinated object on this coordinator.
 
template<class Impl , class... Args>
std::enable_if_t< std::is_base_of_v< coordinated, Impl >, typename Impl::handle_type > add_child_hdl (std::in_place_type_t< Impl > token, Args &&... args)
 Like add_child, but wraps the result in a handle type.
 
virtual void release_later (coordinated_ptr &child)=0
 Resets child and releases the reference count of the coordinated object at the end of the current cycle.
 
template<class T >
std::enable_if_t< std::is_base_of_v< coordinated, T > > release_later (intrusive_ptr< T > &child)
 Resets child and releases the reference count of the coordinated object at the end of the current cycle.
 
template<class Handle >
std::enable_if< Handle::holds_coordinated > release_later (Handle &hdl)
 Resets hdl and releases the reference count of the coordinated object at the end of the current cycle.
 
virtual steady_time_point steady_time ()=0
 Returns the current time on the monotonic clock of this coordinator.
 
virtual void delay (action what)=0
 Delays execution of an action until all pending actions were executed.
 
template<class F >
void delay_fn (F &&what)
 Delays execution of an action until all pending actions were executed.
 
virtual disposable delay_until (steady_time_point abs_time, action what)=0
 Delays execution of an action with an absolute timeout.
 
template<class F >
disposable delay_until_fn (steady_time_point abs_time, F &&what)
 Delays execution of an action with an absolute timeout.
 
disposable delay_for (timespan rel_time, action what)
 Delays execution of an action with a relative timeout.
 
template<class F >
disposable delay_for_fn (timespan rel_time, F &&what)
 Delays execution of an action with a relative timeout.
 
- Public Member Functions inherited from caf::async::execution_context
virtual void ref_execution_context () const noexcept=0
 Increases the reference count of the execution_context.
 
virtual void deref_execution_context () const noexcept=0
 Decreases the reference count of the execution context and destroys the object if necessary.
 
virtual void schedule (action what)=0
 Schedules what to run on the event loop of the execution context.
 
template<class F >
void schedule_fn (F &&what)
 Schedules what to run on the event loop of the execution context.
 
virtual void watch (disposable what)=0
 Asks the coordinator to keep its event loop running until what becomes disposed, since it depends on external events or produces events that are visible to outside observers.
 

Static Public Member Functions

static void default_error_handler (pointer ptr, error &x)
 
static void default_down_handler (pointer ptr, down_msg &x)
 
static void default_node_down_handler (pointer ptr, node_down_msg &x)
 
static void default_exit_handler (pointer ptr, exit_msg &x)
 

Protected Attributes

abstract_mailbox * mailbox_
 Stores incoming messages.
 
detail::behavior_stack bhvr_stack_
 Stores user-defined callbacks for message handling.
 
std::forward_list< pending_response > awaited_responses_
 Stores callbacks for awaited responses.
 
unordered_flat_map< message_id, std::pair< behavior, disposable > > multiplexed_responses_
 Stores callbacks for multiplexed responses.
 
default_handler default_handler_
 Customization point for setting a default message callback.
 
error_handler error_handler_
 Customization point for setting a default error callback.
 
down_handler down_handler_
 Customization point for setting a default down_msg callback.
 
node_down_handler node_down_handler_
 Customization point for setting a default node_down_msg callback.
 
exit_handler exit_handler_
 Customization point for setting a default exit_msg callback.
 
detail::private_thread * private_thread_
 Pointer to a private thread object associated with a detached actor.
 
- Protected Attributes inherited from caf::local_actor
scheduler * context_
 
mailbox_element * current_element_
 
message_id last_request_id_
 
detail::unique_function< behavior(local_actor *)> initial_behavior_fac_
 Factory function for returning initial behavior in function-based actors.
 
metrics_t metrics_
 
- Protected Attributes inherited from caf::abstract_actor
std::atomic< int > flags_
 Holds several state and type flags.
 
std::mutex mtx_
 Guards members that may be subject to concurrent access.
 
std::condition_variable cv_
 Allows blocking actors to actively wait for incoming messages.
 
error fail_state_
 Stores the user-defined exit reason if this actor has finished execution.
 
attachable_ptr attachables_head_
 Points to the first attachable in the linked list of attachables (if any).
 

Friends

class detail::batch_forwarder_impl
 
class detail::stream_bridge
 
class detail::stream_bridge_sub
 
template<class , class >
class response_handle
 

Related Functions

(Note that these are not member functions.)

CAF_CORE_EXPORT skippable_result reflect (scheduled_actor *, message &)
 
CAF_CORE_EXPORT skippable_result reflect_and_quit (scheduled_actor *, message &)
 
CAF_CORE_EXPORT skippable_result print_and_drop (scheduled_actor *, message &)
 
CAF_CORE_EXPORT skippable_result drop (scheduled_actor *, message &)
 

Additional Inherited Members

- Protected Member Functions inherited from caf::local_actor
void do_send (abstract_actor *receiver, message_priority priority, message &&msg)
 Sends msg as an asynchronous message to receiver.
 
disposable do_scheduled_send (strong_actor_ptr receiver, message_priority priority, actor_clock::time_point timeout, message &&msg)
 Sends msg as an asynchronous message to receiver after the timeout.
 
void do_anon_send (abstract_actor *receiver, message_priority priority, message &&msg)
 Sends msg as an asynchronous message to receiver without sender information.
 
disposable do_scheduled_anon_send (strong_actor_ptr receiver, message_priority priority, actor_clock::time_point timeout, message &&msg)
 Sends msg as an asynchronous message to receiver after the timeout without sender information.
 
- Protected Member Functions inherited from caf::abstract_actor
virtual void on_unreachable ()
 Called on actor if the last strong reference to it expired without a prior call to quit(exit_reason::not_exited).
 
int flags () const
 
void flags (int new_value)
 
bool is_terminated () const noexcept
 Checks whether this actor has terminated.
 
 abstract_actor (actor_config &cfg)
 
void attach_impl (attachable_ptr &ptr)
 
size_t detach_impl (const attachable::token &what, bool stop_on_hit=false, bool dry_run=false)
 
void add_link (abstract_actor *other)
 Causes the actor to establish a link to other.
 
void remove_link (abstract_actor *other)
 Causes the actor to remove any established link to other.
 
virtual bool add_backlink (abstract_actor *other)
 Adds an entry for other to the link table of this actor.
 
virtual bool remove_backlink (abstract_actor *other)
 Removes the entry for other from the link table of this actor.
 

Detailed Description

A cooperatively scheduled, event-based actor implementation.

Member Enumeration Documentation

◆ activation_result

Result of one-shot activations.

Enumerator
success 

Actor is still alive and handled the activation message.

terminated 

Actor handled the activation message and terminated.

skipped 

Actor skipped the activation message.

dropped 

Actor dropped the activation message.

◆ message_category

Categorizes incoming messages.

Enumerator
ordinary 

Triggers the current behavior.

internal 

Triggers handlers for system messages such as exit_msg or down_msg.

skipped 

Delays processing.
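
As a rough illustration of how the three categories partition incoming messages, the following self-contained sketch classifies toy messages. It does not use CAF itself: `toy_message`, `user_msg`, `categorize` and the `can_process_now` flag are hypothetical names, the `exit_msg` and `down_msg` structs are simplified stand-ins for the real types, and the condition that leads to `skipped` is an assumption made for the example.

```cpp
#include <cassert>
#include <string>
#include <variant>

// Simplified stand-ins for system and user messages (not the CAF types).
struct exit_msg { std::string reason; };
struct down_msg { std::string reason; };
struct user_msg { std::string text; };
using toy_message = std::variant<exit_msg, down_msg, user_msg>;

enum class message_category { ordinary, internal, skipped };

// Maps a message to a category. `can_process_now` is a hypothetical flag that
// models "the actor cannot handle this message yet".
inline message_category categorize(const toy_message& msg,
                                   bool can_process_now) {
  if (!can_process_now)
    return message_category::skipped; // delay processing
  if (std::holds_alternative<exit_msg>(msg)
      || std::holds_alternative<down_msg>(msg))
    return message_category::internal; // goes to a system message handler
  return message_category::ordinary;   // goes to the current behavior
}

// Usage example.
inline void categorize_example() {
  toy_message hello = user_msg{"hello"};
  toy_message bye = exit_msg{"shutdown"};
  assert(categorize(hello, true) == message_category::ordinary);
  assert(categorize(bye, true) == message_category::internal);
  assert(categorize(hello, false) == message_category::skipped);
}
```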

Member Function Documentation

◆ enqueue()

bool caf::scheduled_actor::enqueue ( mailbox_element_ptr what, scheduler * sched )
override virtual

Enqueues a new message wrapped in a mailbox_element to the actor.

This enqueue variant makes it possible to define forwarding chains.

Returns
true if the message has been added to the mailbox, false otherwise. In the latter case, the actor has terminated and the message has been dropped. Once this function returns false, it returns false for all future invocations.
Note
The returned value is purely informational and may be used to discard actor handles early. Messages may still get dropped later even if this function returns true, in particular when dealing with remote actors.

Implements caf::abstract_actor.

Reimplemented in caf::io::abstract_broker.
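
The return value contract described above can be modeled with a small toy mailbox. This is only a sketch of the documented behavior, not CAF code: `toy_mailbox`, `close` and `enqueue_example` are hypothetical names, and plain strings stand in for `mailbox_element_ptr`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Toy mailbox (hypothetical, not a CAF type): enqueue() succeeds until the
// owner terminates and fails for every call afterwards.
class toy_mailbox {
public:
  // Returns true if `msg` was stored, false if it was dropped.
  bool enqueue(std::string msg) {
    if (closed_)
      return false;
    queue_.push_back(std::move(msg));
    return true;
  }

  // Models actor termination: pending messages are discarded.
  void close() {
    closed_ = true;
    queue_.clear();
  }

  std::size_t size() const { return queue_.size(); }

private:
  bool closed_ = false;
  std::deque<std::string> queue_;
};

// Usage example: a sender can stop using a handle once enqueue() fails.
inline void enqueue_example() {
  toy_mailbox mbox;
  bool before = mbox.enqueue("ping");
  mbox.close();
  bool after = mbox.enqueue("pong");
  bool later = mbox.enqueue("pong again");
  assert(before);
  assert(!after);
  assert(!later);
}
```

As the note above says, a `true` result is only informational: in this toy model, a message accepted before `close()` is still discarded.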

◆ name()

const char * caf::scheduled_actor::name ( ) const
override virtual

Returns an implementation-dependent name for logging purposes, which is only valid as long as the actor is running.

The default implementation simply returns "actor".

Reimplemented from caf::local_actor.

Reimplemented in caf::io::abstract_broker, caf::io::basp_broker, and caf::detail::prometheus_broker.

◆ peek_at_next_mailbox_element()

mailbox_element * caf::scheduled_actor::peek_at_next_mailbox_element ( )
override virtual

Called by the testing DSL to peek at the next element in the mailbox.

Do not call this function in production code! The default implementation always returns nullptr.

Returns
A pointer to the next mailbox element or nullptr if the mailbox is empty or the actor does not have a mailbox.

Reimplemented from caf::abstract_actor.

◆ proxy_registry_ptr()

proxy_registry * caf::scheduled_actor::proxy_registry_ptr ( )
virtual

Returns a factory for proxies created and managed by this actor or nullptr.

Reimplemented in caf::io::basp_broker.

◆ quit()

void caf::scheduled_actor::quit ( error  x = error{})

Finishes execution of this actor after any currently running message handler is done.

This member function clears the behavior stack of the running actor and invokes on_exit(). The actor does not finish execution if the implementation of on_exit() sets a new behavior. When setting a new behavior in on_exit(), make sure not to produce an infinite recursion.

If on_exit() did not set a new behavior, the actor sends an exit message to all of its linked actors, sets its state to exited and finishes execution.
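
The shutdown sequence described above can be sketched with a toy actor. This is a simplified model rather than CAF code: `quitting_actor`, its members and `quit_example` are hypothetical, the exit reason is a plain string, and the sketch runs the sequence immediately instead of waiting for a running message handler to finish.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy actor (hypothetical, not a CAF type) following the documented steps.
struct quitting_actor {
  using behavior = std::function<void(const std::string&)>;

  std::vector<behavior> behavior_stack;
  std::function<void(quitting_actor&)> on_exit; // hook, may set a behavior
  std::vector<std::string> exit_messages; // stands in for messages to links
  bool exited = false;

  void quit(std::string reason) {
    behavior_stack.clear(); // 1. clear the behavior stack
    if (on_exit)
      on_exit(*this);       // 2. invoke the user-defined hook
    if (!behavior_stack.empty())
      return;               // 3. a new behavior was set: keep running
    exit_messages.push_back(std::move(reason)); // 4. notify linked actors
    exited = true;          // 5. mark the actor as exited
  }
};

// Usage example: the second actor survives because on_exit installs a
// behavior.
inline void quit_example() {
  quitting_actor plain;
  plain.behavior_stack.push_back([](const std::string&) {});
  plain.quit("normal");
  assert(plain.exited);
  assert(plain.exit_messages.size() == 1);

  quitting_actor stubborn;
  stubborn.on_exit = [](quitting_actor& self) {
    self.behavior_stack.push_back([](const std::string&) {});
  };
  stubborn.quit("normal");
  assert(!stubborn.exited);
  assert(stubborn.exit_messages.empty());
}
```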

◆ resume()

resumable::resume_result caf::scheduled_actor::resume ( scheduler *, size_t max_throughput )
override virtual

Resume any pending computation until it is either finished or needs to be re-scheduled later.

Implements caf::resumable.

Reimplemented in caf::io::abstract_broker, and caf::io::basp_broker.
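
To make the cooperative scheduling idea concrete, here is a minimal model of a resumable that handles at most `max_throughput` messages per call. It is a sketch under assumptions, not the CAF implementation: `toy_resumable` is hypothetical, integers stand in for messages, a negative value is an invented "quit" signal, and the scoped enum only mirrors three of the `resume_result` enumerators listed earlier.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

enum class resume_result { resume_later, awaiting_message, done };

// Toy resumable (hypothetical): consumes a bounded number of messages per
// call so that other resumables get a chance to run.
struct toy_resumable {
  std::deque<int> mailbox;
  int handled = 0;

  resume_result resume(std::size_t max_throughput) {
    std::size_t consumed = 0;
    while (consumed < max_throughput && !mailbox.empty()) {
      int msg = mailbox.front();
      mailbox.pop_front();
      ++handled;
      ++consumed;
      if (msg < 0)
        return resume_result::done; // invented "quit" signal
    }
    if (mailbox.empty())
      return resume_result::awaiting_message; // nothing left to do
    return resume_result::resume_later;       // yield, more work pending
  }
};

// Usage example: five messages with a throughput of two take three runs.
inline void resume_example() {
  toy_resumable r;
  r.mailbox = {1, 2, 3, 4, 5};
  auto first = r.resume(2);
  auto second = r.resume(2);
  auto third = r.resume(2);
  assert(first == resume_result::resume_later);
  assert(second == resume_result::resume_later);
  assert(third == resume_result::awaiting_message);
  assert(r.handled == 5);
}
```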

◆ set_idle_handler()

template<class RefType , class RepeatType >
void caf::scheduled_actor::set_idle_handler ( timespan delay, RefType, RepeatType, idle_handler fun )
inline

Sets a custom handler for timeouts that trigger after not receiving a message for a certain amount of time.
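
The idea of an idle timeout can be illustrated with a toy timer that is driven by a simulated clock. Everything here is hypothetical and not CAF code: `toy_idle_timer` and its members are invented for the example, time is a plain millisecond counter, and the `repeat` flag is only one plausible reading of what the `RepeatType` parameter selects.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Toy idle timer (hypothetical): fires `on_idle` when no message arrived for
// `delay_ms` milliseconds of simulated time.
struct toy_idle_timer {
  long long delay_ms;
  bool repeat;
  std::function<void()> on_idle;
  long long deadline_ms;
  bool armed = true;

  toy_idle_timer(long long delay, bool rep, std::function<void()> fn)
    : delay_ms(delay), repeat(rep), on_idle(std::move(fn)),
      deadline_ms(delay) {}

  // Receiving a message pushes the deadline back.
  void message_received(long long now_ms) { deadline_ms = now_ms + delay_ms; }

  // Called by the simulated clock.
  void tick(long long now_ms) {
    if (!armed || now_ms < deadline_ms)
      return;
    on_idle();
    if (repeat)
      deadline_ms = now_ms + delay_ms; // wait for the next idle period
    else
      armed = false;                   // fire only once
  }
};

// Usage example: a message at t=80 delays the timeout from t=100 to t=180.
inline void idle_example() {
  int fired = 0;
  toy_idle_timer timer{100, false, [&fired] { ++fired; }};
  timer.tick(50);
  timer.message_received(80);
  timer.tick(150);
  assert(fired == 0);
  timer.tick(180);
  timer.tick(1000);
  assert(fired == 1);
}
```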

◆ subtype()

resumable::subtype_t caf::scheduled_actor::subtype ( ) const
override virtual noexcept

Returns a subtype hint for this object.

This allows an execution unit to limit processing to a specific set of resumables and delegate other subtypes to dedicated workers.

Reimplemented from caf::resumable.

Reimplemented in caf::io::abstract_broker.

Friends And Related Function Documentation

◆ drop()

CAF_CORE_EXPORT skippable_result drop ( scheduled_actor *, message & )
related

Default handler function that simply drops messages.

◆ print_and_drop()

CAF_CORE_EXPORT skippable_result print_and_drop ( scheduled_actor *, message & )
related

Default handler function that prints messages via aout and drops them afterwards.

◆ reflect()

CAF_CORE_EXPORT skippable_result reflect ( scheduled_actor *, message & )
related

Default handler function that sends the message back to the sender.

◆ reflect_and_quit()

CAF_CORE_EXPORT skippable_result reflect_and_quit ( scheduled_actor *, message & )
related

Default handler function that sends the message back to the sender and then quits.
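
The default handlers above all share the shape `skippable_result(scheduled_actor *, message &)`, which is what makes them interchangeable. The following toy model shows that customization point in isolation. It is not CAF code: `mini_actor`, `text_message`, `handler_result` and the `*_handler` functions are hypothetical simplifications, and a string outbox stands in for sending a reply to the sender.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

struct mini_actor;
using text_message = std::string;
enum class handler_result { dropped, replied };
using unmatched_handler
  = std::function<handler_result(mini_actor*, text_message&)>;

// Toy actor (hypothetical) with a pluggable handler for unmatched messages.
struct mini_actor {
  unmatched_handler on_unmatched;
  std::vector<text_message> outbox; // replies sent back to the sender
  bool running = true;

  // Called for a message that no behavior matched.
  handler_result handle_unmatched(text_message msg) {
    assert(on_unmatched); // a handler must be installed first
    return on_unmatched(this, msg);
  }
};

// Simply drops the message.
inline handler_result drop_handler(mini_actor*, text_message&) {
  return handler_result::dropped;
}

// Sends the message back to the sender.
inline handler_result reflect_handler(mini_actor* self, text_message& msg) {
  self->outbox.push_back(msg);
  return handler_result::replied;
}

// Sends the message back to the sender and then stops the actor.
inline handler_result reflect_and_quit_handler(mini_actor* self,
                                               text_message& msg) {
  auto result = reflect_handler(self, msg);
  self->running = false;
  return result;
}

// Usage example: swapping the handler changes how stray messages are treated.
inline void default_handler_example() {
  mini_actor actor;
  actor.on_unmatched = drop_handler;
  auto first = actor.handle_unmatched("unexpected");
  assert(first == handler_result::dropped);
  assert(actor.outbox.empty());
  actor.on_unmatched = reflect_and_quit_handler;
  auto second = actor.handle_unmatched("echo me");
  assert(second == handler_result::replied);
  assert(actor.outbox.size() == 1);
  assert(!actor.running);
}
```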

Member Data Documentation

◆ default_mailbox_

detail::default_mailbox caf::scheduled_actor::default_mailbox_

The default mailbox instance that we use if the user does not configure a mailbox via the actor_config.

