21 #ifndef __TBB_flow_graph_H 22 #define __TBB_flow_graph_H 24 #include "tbb_stddef.h" 26 #include "spin_mutex.h" 27 #include "null_mutex.h" 28 #include "spin_rw_mutex.h" 29 #include "null_rw_mutex.h" 31 #include "cache_aligned_allocator.h" 32 #include "tbb_exception.h" 33 #include "internal/_template_helpers.h" 34 #include "internal/_aggregator_impl.h" 35 #include "tbb_profiling.h" 37 #if __TBB_PREVIEW_ASYNC_NODE 38 #include "task_arena.h" 41 #if __TBB_PREVIEW_ASYNC_MSG 46 #if TBB_DEPRECATED_FLOW_ENQUEUE 47 #define FLOW_SPAWN(a) tbb::task::enqueue((a)) 49 #define FLOW_SPAWN(a) tbb::task::spawn((a)) 53 #if __TBB_CPP11_TUPLE_PRESENT 58 using std::tuple_size;
59 using std::tuple_element;
64 #include "compat/tuple" 86 namespace interface8 {
// Sentinel task pointer, defined as (task *)-1. Code in this file compares
// the result of try_put_task() against it: a result equal to this sentinel is
// not spawned, any other non-NULL result is passed to FLOW_SPAWN.
109 static tbb::task *
const SUCCESSFULLY_ENQUEUED = (task *)-1;
113 rf_reset_protocol = 0,
114 rf_reset_bodies = 1<<0,
115 rf_clear_edges = 1<<1
118 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 123 class edge_container {
126 typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
128 void add_edge( C &s) {
129 built_edges.push_back( &s );
132 void delete_edge( C &s) {
133 for (
typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i ) {
135 (void)built_edges.erase(i);
141 void copy_edges( edge_list_type &v) {
145 size_t edge_count() {
146 return (
size_t)(built_edges.size());
155 template<
typename S >
void sender_extract( S &s );
156 template<
typename R >
void receiver_extract( R &r );
159 edge_list_type built_edges;
164 #if __TBB_PREVIEW_ASYNC_MSG 166 #include "internal/_flow_graph_async_msg_impl.h" 170 class untyped_receiver;
172 class untyped_sender {
177 typedef untyped_receiver successor_type;
179 virtual ~untyped_sender() {}
186 virtual bool register_successor( successor_type &r ) = 0;
189 virtual bool remove_successor( successor_type &r ) = 0;
// Default implementation: always returns false.
// NOTE(review): presumably overridden by senders that hold a reservation
// (a try_release() with real logic appears later in this file) -- confirm.
192 virtual bool try_release( ) {
return false; }
// Default implementation: always returns false.
// NOTE(review): presumably overridden by senders that hold a reservation
// (a try_consume() with real logic appears later in this file) -- confirm.
195 virtual bool try_consume( ) {
return false; }
197 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 198 typedef internal::edge_container<successor_type> built_successors_type;
200 typedef built_successors_type::edge_list_type successor_list_type;
201 virtual built_successors_type &built_successors() = 0;
202 virtual void internal_add_built_successor( successor_type & ) = 0;
203 virtual void internal_delete_built_successor( successor_type & ) = 0;
204 virtual void copy_successors( successor_list_type &) = 0;
205 virtual size_t successor_count() = 0;
209 template<
typename X >
210 bool try_get( X &t ) {
215 template<
typename X >
216 bool try_reserve( X &t ) {
220 virtual bool try_get_wrapper(
void* p,
bool is_async ) = 0;
221 virtual bool try_reserve_wrapper(
void* p,
bool is_async ) = 0;
224 class untyped_receiver {
232 #if __TBB_PREVIEW_OPENCL_NODE 233 template<
typename,
typename >
friend class proxy_dependency_receiver;
237 typedef untyped_sender predecessor_type;
240 virtual ~untyped_receiver() {}
244 bool try_put(
const X& t) {
245 task *res = try_put_task(t);
246 if (!res)
return false;
247 if (res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
// Default implementation: ignores the (unnamed) predecessor argument and
// returns false.
256 virtual bool register_predecessor( predecessor_type & ) {
return false; }
// Default implementation: ignores the (unnamed) predecessor argument and
// returns false.
259 virtual bool remove_predecessor( predecessor_type & ) {
return false; }
261 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 262 typedef internal::edge_container<predecessor_type> built_predecessors_type;
263 typedef built_predecessors_type::edge_list_type predecessor_list_type;
264 virtual built_predecessors_type &built_predecessors() = 0;
265 virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
266 virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
267 virtual void copy_predecessors( predecessor_list_type & ) = 0;
268 virtual size_t predecessor_count() = 0;
272 task *try_put_task(
const X& t) {
276 virtual task* try_put_task_wrapper(
const void* p,
bool is_async ) = 0;
281 virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
// Type query; this default answers false. Another definition of
// is_continue_receiver() later in this file returns true instead.
283 virtual bool is_continue_receiver() {
return false; }
289 template<
typename T >
290 class sender :
public internal::untyped_sender {
293 typedef T output_type;
295 typedef typename internal::async_helpers<T>::filtered_type filtered_type;
// Default implementation: leaves the (unnamed) output argument untouched and
// returns false.
298 virtual bool try_get( T & ) {
return false; }
// Default implementation: leaves the (unnamed) output argument untouched and
// returns false.
301 virtual bool try_reserve( T & ) {
return false; }
304 virtual bool try_get_wrapper(
void* p,
bool is_async ) {
310 __TBB_ASSERT(
false,
"async_msg interface does not support 'pull' protocol in try_get()");
314 virtual bool try_reserve_wrapper(
void* p,
bool is_async ) {
320 __TBB_ASSERT(
false,
"async_msg interface does not support 'pull' protocol in try_reserve()");
326 template<
typename T >
327 class receiver :
public internal::untyped_receiver {
332 typedef T input_type;
334 typedef typename internal::async_helpers<T>::filtered_type filtered_type;
337 bool try_put(
const typename internal::async_helpers<T>::filtered_type& t ) {
338 return internal::untyped_receiver::try_put(t);
342 return internal::untyped_receiver::try_put(t);
346 virtual task* try_put_task_wrapper(
const void *p,
bool is_async ) {
351 virtual task *try_put_task(
const T& t) = 0;
355 #else // __TBB_PREVIEW_ASYNC_MSG 358 template<
typename T >
372 virtual bool register_successor( successor_type &r ) = 0;
375 virtual bool remove_successor( successor_type &r ) = 0;
389 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 390 typedef typename internal::edge_container<successor_type> built_successors_type;
392 typedef typename built_successors_type::edge_list_type successor_list_type;
393 virtual built_successors_type &built_successors() = 0;
394 virtual void internal_add_built_successor( successor_type & ) = 0;
395 virtual void internal_delete_built_successor( successor_type & ) = 0;
396 virtual void copy_successors( successor_list_type &) = 0;
397 virtual size_t successor_count() = 0;
402 template<
typename T >
416 task *res = try_put_task(t);
417 if (!res)
return false;
418 if (res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
427 virtual task *try_put_task(
const T& t) = 0;
437 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 438 typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
439 typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
440 virtual built_predecessors_type &built_predecessors() = 0;
441 virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
442 virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
443 virtual void copy_predecessors( predecessor_list_type & ) = 0;
444 virtual size_t predecessor_count() = 0;
450 virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
// Type query; this default answers false. Another definition of
// is_continue_receiver() later in this file returns true instead.
453 virtual bool is_continue_receiver() {
return false; }
455 #if __TBB_PREVIEW_OPENCL_NODE 456 template<
typename,
typename >
friend class proxy_dependency_receiver;
460 #endif // __TBB_PREVIEW_ASYNC_MSG 463 static inline tbb::task *combine_tasks( tbb::task * left, tbb::task * right) {
465 if(right == NULL)
return left;
467 if(left == NULL)
return right;
468 if(left == SUCCESSFULLY_ENQUEUED)
return right;
470 if(right != SUCCESSFULLY_ENQUEUED) {
491 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
492 my_current_count = 0;
497 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
498 my_current_count = 0;
507 ++my_predecessor_count;
517 --my_predecessor_count;
521 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 522 typedef internal::edge_container<predecessor_type> built_predecessors_type;
523 typedef built_predecessors_type::edge_list_type predecessor_list_type;
// Returns a reference to this object's my_built_predecessors edge container.
524 built_predecessors_type &built_predecessors() {
return my_built_predecessors; }
526 void internal_add_built_predecessor( predecessor_type &s) {
528 my_built_predecessors.add_edge( s );
531 void internal_delete_built_predecessor( predecessor_type &s) {
533 my_built_predecessors.delete_edge(s);
536 void copy_predecessors( predecessor_list_type &v) {
538 my_built_predecessors.copy_edges(v);
541 size_t predecessor_count() {
543 return my_built_predecessors.edge_count();
553 task *try_put_task(
const input_type & ) {
556 if ( ++my_current_count < my_predecessor_count )
557 return SUCCESSFULLY_ENQUEUED;
559 my_current_count = 0;
561 task * res = execute();
562 return res? res : SUCCESSFULLY_ENQUEUED;
565 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 568 built_predecessors_type my_built_predecessors;
571 int my_predecessor_count;
572 int my_current_count;
573 int my_initial_predecessor_count;
578 void reset_receiver( reset_flags f ) {
579 my_current_count = 0;
580 if (f & rf_clear_edges) {
581 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 582 my_built_predecessors.clear();
584 my_predecessor_count = my_initial_predecessor_count;
591 virtual task * execute() = 0;
// Type query; this definition always answers true (the virtual defaults
// earlier in this file answer false).
593 bool is_continue_receiver() {
return true; }
598 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 599 template <
typename K,
typename T>
600 K key_from_message(
const T &t ) {
611 #include "internal/_flow_graph_trace_impl.h" 612 #include "internal/_tbb_hash_compare_impl.h" 616 namespace interface8 {
618 #include "internal/_flow_graph_impl.h" 619 #include "internal/_flow_graph_types_impl.h" 625 template <
typename GraphContainerType,
typename GraphNodeType>
626 class graph_iterator {
628 friend class graph_node;
630 typedef size_t size_type;
631 typedef GraphNodeType value_type;
632 typedef GraphNodeType* pointer;
633 typedef GraphNodeType& reference;
634 typedef const GraphNodeType& const_reference;
635 typedef std::forward_iterator_tag iterator_category;
638 graph_iterator() : my_graph(NULL), current_node(NULL) {}
641 graph_iterator(
const graph_iterator& other) :
642 my_graph(other.my_graph), current_node(other.current_node)
646 graph_iterator& operator=(
const graph_iterator& other) {
647 if (
this != &other) {
648 my_graph = other.my_graph;
649 current_node = other.current_node;
655 reference operator*()
const;
658 pointer operator->()
const;
661 bool operator==(
const graph_iterator& other)
const {
662 return ((my_graph == other.my_graph) && (current_node == other.current_node));
// Inequality: the logical negation of operator==(other).
666 bool operator!=(
const graph_iterator& other)
const {
return !(operator==(other)); }
669 graph_iterator& operator++() {
675 graph_iterator operator++(
int) {
676 graph_iterator result = *
this;
683 GraphContainerType *my_graph;
685 pointer current_node;
688 graph_iterator(GraphContainerType *g,
bool begin);
689 void internal_forward();
694 class graph : tbb::internal::no_copy {
695 friend class graph_node;
697 template<
typename Body >
698 class run_task :
public task {
700 run_task( Body& body ) : my_body(body) {}
709 template<
typename Receiver,
typename Body >
712 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
714 task *res = my_receiver.try_put_task( my_body() );
715 if (res == SUCCESSFULLY_ENQUEUED) res = NULL;
719 Receiver &my_receiver;
722 typedef std::list<task *> task_list_type;
724 #if __TBB_PREVIEW_ASYNC_NODE 726 task* graph_root_task;
728 wait_functor( task* t ) : graph_root_task(t) {}
729 void operator()()
const { graph_root_task->wait_for_all(); }
732 void prepare_task_arena(
bool reinit =
false ) {
734 __TBB_ASSERT( my_task_arena, NULL );
735 my_task_arena->terminate();
740 if (!my_task_arena->is_active())
741 my_task_arena->initialize();
742 __TBB_ASSERT(my_task_arena->is_active(), NULL);
748 graph() : my_nodes(NULL), my_nodes_last(NULL) {
749 #if __TBB_PREVIEW_ASYNC_NODE 750 prepare_task_arena();
754 caught_exception =
false;
755 my_context =
new task_group_context();
756 my_root_task = (
new ( task::allocate_root(*my_context) ) empty_task );
757 my_root_task->set_ref_count(1);
758 tbb::internal::fgt_graph(
this );
763 explicit graph(task_group_context& use_this_context) :
764 my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL) {
765 #if __TBB_PREVIEW_ASYNC_NODE 766 prepare_task_arena();
769 my_root_task = (
new ( task::allocate_root(*my_context) ) empty_task );
770 my_root_task->set_ref_count(1);
771 tbb::internal::fgt_graph(
this );
779 my_root_task->set_ref_count(0);
780 task::destroy( *my_root_task );
781 if (own_context)
delete my_context;
782 #if __TBB_PREVIEW_ASYNC_NODE 783 delete my_task_arena;
787 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 788 void set_name(
const char *name ) {
789 tbb::internal::fgt_graph_desc(
this, name );
796 void increment_wait_count() {
798 my_root_task->increment_ref_count();
804 void decrement_wait_count() {
806 my_root_task->decrement_ref_count();
812 template<
typename Receiver,
typename Body >
813 void run( Receiver &r, Body body ) {
815 FLOW_SPAWN( (*
new ( task::allocate_additional_child_of( *root_task() ) )
823 template<
typename Body >
824 void run( Body body ) {
826 FLOW_SPAWN( *
new ( task::allocate_additional_child_of( *root_task() ) ) run_task< Body >( body ) );
832 void wait_for_all() {
834 caught_exception =
false;
836 #if TBB_USE_EXCEPTIONS 839 #if __TBB_PREVIEW_ASYNC_NODE 840 my_task_arena->execute(wait_functor(my_root_task));
842 my_root_task->wait_for_all();
844 cancelled = my_context->is_group_execution_cancelled();
845 #if TBB_USE_EXCEPTIONS 848 my_root_task->set_ref_count(1);
850 caught_exception =
true;
858 if ( !(my_context->traits() & task_group_context::concurrent_wait) ) {
860 my_root_task->set_ref_count(1);
870 void set_active(
bool a =
true) {
878 void add_task_to_reset_list(task *tp) {
879 my_reset_task_list.push_back(tp);
883 template<
typename C,
typename N>
884 friend class graph_iterator;
887 typedef graph_iterator<graph,graph_node> iterator;
888 typedef graph_iterator<const graph,const graph_node> const_iterator;
// Returns an iterator built with begin == true; the graph_iterator
// constructor then sets its current node to this graph's my_nodes.
892 iterator begin() {
return iterator(
this,
true); }
// Returns an iterator built with begin == false; the graph_iterator
// constructor leaves its current node NULL in that case.
894 iterator end() {
return iterator(
this,
false); }
// Const overload: returns a const_iterator built with begin == true, i.e.
// starting at this graph's my_nodes.
896 const_iterator begin()
const {
return const_iterator(
this,
true); }
// Const overload: returns a const_iterator built with begin == false, i.e.
// with a NULL current node.
898 const_iterator end()
const {
return const_iterator(
this,
false); }
// Returns a const_iterator built with begin == true; same construction as
// the const begin() overload.
900 const_iterator cbegin()
const {
return const_iterator(
this,
true); }
// Returns a const_iterator built with begin == false; same construction as
// the const end() overload.
902 const_iterator cend()
const {
return const_iterator(
this,
false); }
// Returns the cancelled flag, which wait_for_all() assigns from
// my_context->is_group_execution_cancelled().
905 bool is_cancelled() {
return cancelled; }
// Returns the caught_exception flag. The flag is cleared by the default
// constructor, at the start of wait_for_all() and in reset(), and is set to
// true inside wait_for_all().
906 bool exception_thrown() {
return caught_exception; }
909 void reset(reset_flags f = rf_reset_protocol);
913 task_group_context *my_context;
916 bool caught_exception;
918 task_list_type my_reset_task_list;
920 graph_node *my_nodes, *my_nodes_last;
923 void register_node(graph_node *n);
924 void remove_node(graph_node *n);
926 #if __TBB_PREVIEW_ASYNC_NODE 927 template <
typename Input,
typename Output,
typename Policy,
typename Allocator >
928 friend class async_node;
933 template <
typename C,
typename N>
934 graph_iterator<C,N>::graph_iterator(C *g,
bool begin) : my_graph(g), current_node(NULL)
936 if (begin) current_node = my_graph->my_nodes;
940 template <
typename C,
typename N>
941 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*()
const {
942 __TBB_ASSERT(current_node,
"graph_iterator at end");
943 return *operator->();
946 template <
typename C,
typename N>
947 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->()
const {
952 template <
typename C,
typename N>
953 void graph_iterator<C,N>::internal_forward() {
954 if (current_node) current_node = current_node->next;
958 class graph_node : tbb::internal::no_copy {
960 template<
typename C,
typename N>
961 friend class graph_iterator;
964 graph_node *next, *prev;
966 graph_node(graph& g) : my_graph(g) {
967 my_graph.register_node(
this);
969 virtual ~graph_node() {
970 my_graph.remove_node(
this);
973 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 974 virtual void set_name(
const char *name ) = 0;
977 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 978 virtual void extract( ) = 0;
983 virtual void reset_node(reset_flags f=rf_reset_protocol) = 0;
986 inline void graph::register_node(graph_node *n) {
990 n->prev = my_nodes_last;
991 if (my_nodes_last) my_nodes_last->next = n;
993 if (!my_nodes) my_nodes = n;
// Unlinks node n from the graph's doubly linked node list (my_nodes is the
// first node, my_nodes_last the last one) and clears n's own link pointers.
997 inline void graph::remove_node(graph_node *n) {
// The list must not be empty when a node is being removed.
1000 __TBB_ASSERT(my_nodes && my_nodes_last,
"graph::remove_node: Error: no registered nodes");
// Make n's neighbours point past n in both directions.
1001 if (n->prev) n->prev->next = n->next;
1002 if (n->next) n->next->prev = n->prev;
// If n was the last or the first node, move that end to n's neighbour.
1003 if (my_nodes_last == n) my_nodes_last = n->prev;
1004 if (my_nodes == n) my_nodes = n->next;
// Detach n so it no longer refers to any node of the list.
1006 n->prev = n->next = NULL;
1009 inline void graph::reset( reset_flags f ) {
1012 if(my_context) my_context->reset();
1014 caught_exception =
false;
1016 for(iterator ii = begin(); ii != end(); ++ii) {
1017 graph_node *my_p = &(*ii);
1018 my_p->reset_node(f);
1020 #if __TBB_PREVIEW_ASYNC_NODE 1023 prepare_task_arena(
true );
1027 for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
1028 FLOW_SPAWN(*(*rti));
1030 my_reset_task_list.clear();
1034 #include "internal/_flow_graph_node_impl.h" 1037 template <
typename Output >
1038 class source_node :
public graph_node,
public sender< Output > {
1040 using graph_node::my_graph;
1043 typedef Output output_type;
1051 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1057 template<
typename Body >
1058 source_node( graph &g, Body body,
bool is_active =
true )
1059 : graph_node(g), my_active(is_active), init_my_active(is_active),
1062 my_reserved(
false), my_has_cached_item(
false)
1064 my_successors.set_owner(
this);
1065 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1070 source_node(
const source_node& src ) :
1072 my_active(src.init_my_active),
1073 init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
1074 my_reserved(
false), my_has_cached_item(
false)
1076 my_successors.set_owner(
this);
1077 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
// Destructor: deletes the two body objects held through my_body and
// my_init_body.
1082 ~source_node() {
delete my_body;
delete my_init_body; }
1084 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1085 void set_name(
const char *name ) {
1086 tbb::internal::fgt_node_desc(
this, name );
1091 bool register_successor( successor_type &r ) {
1093 my_successors.register_successor(r);
1100 bool remove_successor( successor_type &r ) {
1102 my_successors.remove_successor(r);
1106 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1108 built_successors_type &built_successors() {
return my_successors.built_successors(); }
1110 void internal_add_built_successor( successor_type &r) {
1112 my_successors.internal_add_built_successor(r);
1115 void internal_delete_built_successor( successor_type &r) {
1117 my_successors.internal_delete_built_successor(r);
1120 size_t successor_count() {
1122 return my_successors.successor_count();
1125 void copy_successors(successor_list_type &v) {
1127 my_successors.copy_successors(v);
1132 bool try_get( output_type &v ) {
1137 if ( my_has_cached_item ) {
1139 my_has_cached_item =
false;
1149 bool try_reserve( output_type &v ) {
1151 if ( my_reserved ) {
1155 if ( my_has_cached_item ) {
1166 bool try_release( ) {
1168 __TBB_ASSERT( my_reserved && my_has_cached_item,
"releasing non-existent reservation" );
1169 my_reserved =
false;
1170 if(!my_successors.empty())
1176 bool try_consume( ) {
1178 __TBB_ASSERT( my_reserved && my_has_cached_item,
"consuming non-existent reservation" );
1179 my_reserved =
false;
1180 my_has_cached_item =
false;
1181 if ( !my_successors.empty() ) {
1191 if ( !my_successors.empty() )
1195 template<
typename Body>
1196 Body copy_function_object() {
1201 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1203 my_successors.built_successors().sender_extract(*
this);
1204 my_active = init_my_active;
1205 my_reserved =
false;
1206 if(my_has_cached_item) my_has_cached_item =
false;
1213 void reset_node( reset_flags f) {
1214 my_active = init_my_active;
1216 if(my_has_cached_item) {
1217 my_has_cached_item =
false;
1219 if(f & rf_clear_edges) my_successors.clear();
1220 if(f & rf_reset_bodies) {
1226 this->my_graph.add_task_to_reset_list(create_put_task());
1232 bool init_my_active;
1237 bool my_has_cached_item;
1238 output_type my_cached_item;
1241 bool try_reserve_apply_body(output_type &v) {
1243 if ( my_reserved ) {
1246 if ( !my_has_cached_item ) {
1247 tbb::internal::fgt_begin_body( my_body );
1248 bool r = (*my_body)(my_cached_item);
1249 tbb::internal::fgt_end_body( my_body );
1251 my_has_cached_item =
true;
1254 if ( my_has_cached_item ) {
1267 task* create_put_task() {
1268 return (
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1274 if(this->my_graph.is_active()) {
1275 FLOW_SPAWN( *create_put_task());
1281 task * apply_body_bypass( ) {
1283 if ( !try_reserve_apply_body(v) )
1286 task *last_task = my_successors.try_put_task(v);
1295 template<
typename T>
1296 struct allocate_buffer {
1297 static const bool value =
false;
1301 struct allocate_buffer<queueing> {
1302 static const bool value =
true;
1306 template <
typename Input,
typename Output = continue_msg,
typename Policy = queueing,
typename Allocator=cache_aligned_allocator<Input> >
1307 class function_node :
public graph_node,
public internal::function_input<Input,Output,Allocator>,
public internal::function_output<Output> {
1310 typedef Output output_type;
1311 typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
1312 typedef internal::function_input_queue<input_type, Allocator> input_queue_type;
1313 typedef internal::function_output<output_type> fOutput_type;
1314 typedef typename fInput_type::predecessor_type predecessor_type;
1315 typedef typename fOutput_type::successor_type successor_type;
1316 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1317 typedef typename fInput_type::predecessor_list_type predecessor_list_type;
1318 typedef typename fOutput_type::successor_list_type successor_list_type;
1320 using fInput_type::my_predecessors;
1326 template<
typename Body >
1327 function_node( graph &g,
size_t concurrency, Body body ) :
1328 graph_node(g), fInput_type(g, concurrency, body, allocate_buffer<Policy>::value ?
1329 new input_queue_type( ) : NULL ) {
1330 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph,
1335 function_node(
const function_node& src ) :
1336 graph_node(src.graph_node::my_graph),
1337 fInput_type(src, allocate_buffer<Policy>::value ?
new input_queue_type : NULL),
1339 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph,
1343 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1344 void set_name(
const char *name ) {
1345 tbb::internal::fgt_node_desc(
this, name );
1349 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1351 my_predecessors.built_predecessors().receiver_extract(*
this);
1352 successors().built_successors().sender_extract(*
this);
1360 using fInput_type::try_put_task;
1365 void reset_node(reset_flags f) {
1366 fInput_type::reset_function_input(f);
1368 if(f & rf_clear_edges) {
1369 successors().clear();
1370 my_predecessors.clear();
1372 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(),
"function_node successors not empty");
1373 __TBB_ASSERT(this->my_predecessors.empty(),
"function_node predecessors not empty");
1381 template <
typename Input,
typename Output,
typename Policy = queueing,
typename Allocator=cache_aligned_allocator<Input> >
1382 class multifunction_node :
1384 public internal::multifunction_input
1387 typename internal::wrap_tuple_elements<
1388 tbb::flow::tuple_size<Output>::value,
1389 internal::multifunction_output,
1395 using graph_node::my_graph;
1396 static const int N = tbb::flow::tuple_size<Output>::value;
1401 typedef internal::multifunction_input<input_type, output_ports_type, Allocator> fInput_type;
1402 typedef internal::function_input_queue<input_type, Allocator> input_queue_type;
1404 typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
1405 using fInput_type::my_predecessors;
1407 template<
typename Body>
1408 multifunction_node( graph &g,
size_t concurrency, Body body ) :
1409 graph_node(g), base_type(g,concurrency, body, allocate_buffer<Policy>::value ?
new input_queue_type : NULL) {
1410 tbb::internal::fgt_multioutput_node_with_body<N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1412 this->output_ports(), this->my_body );
1415 multifunction_node(
const multifunction_node &other) :
1416 graph_node(other.graph_node::my_graph), base_type(other, allocate_buffer<Policy>::value ?
new input_queue_type : NULL) {
1417 tbb::internal::fgt_multioutput_node_with_body<N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1419 this->output_ports(), this->my_body );
1422 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1423 void set_name(
const char *name ) {
1424 tbb::internal::fgt_multioutput_node_desc(
this, name );
1428 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1430 my_predecessors.built_predecessors().receiver_extract(*
this);
1431 base_type::extract();
// Resets this node by forwarding the reset flags f to base_type::reset().
1436 void reset_node(reset_flags f) { base_type::reset(f); }
1441 template<
typename TupleType,
typename Allocator=cache_aligned_allocator<TupleType> >
1442 class split_node :
public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
1443 static const int N = tbb::flow::tuple_size<TupleType>::value;
1444 typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
1446 typedef typename base_type::output_ports_type output_ports_type;
1447 typedef typename base_type::output_type output_type;
1449 struct splitting_body {
1450 void operator()(
const TupleType& t, output_ports_type &p) {
1451 internal::emit_element<N>::emit_this(t, p);
1456 typedef Allocator allocator_type;
1457 split_node(graph &g) : base_type(g, unlimited, splitting_body()) {
1458 tbb::internal::fgt_multioutput_node<N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
1462 split_node(
const split_node & other) : base_type(other) {
1463 tbb::internal::fgt_multioutput_node<N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
1467 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1468 void set_name(
const char *name ) {
1469 tbb::internal::fgt_multioutput_node_desc(
this, name );
1476 template <
typename Output>
1477 class continue_node :
public graph_node,
public internal::continue_input<Output>,
public internal::function_output<Output> {
1479 using graph_node::my_graph;
1482 typedef Output output_type;
1483 typedef internal::continue_input<Output> fInput_type;
1484 typedef internal::function_output<output_type> fOutput_type;
1485 typedef typename fInput_type::predecessor_type predecessor_type;
1486 typedef typename fOutput_type::successor_type successor_type;
1489 template <
typename Body >
1490 continue_node( graph &g, Body body ) :
1491 graph_node(g), internal::continue_input<output_type>( g, body ) {
1492 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1499 template <
typename Body >
1500 continue_node( graph &g,
int number_of_predecessors, Body body ) :
1501 graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body ) {
1502 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1508 continue_node(
const continue_node& src ) :
1509 graph_node(src.graph_node::my_graph), internal::continue_input<output_type>(src),
1510 internal::function_output<Output>() {
1511 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1516 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1517 void set_name(
const char *name ) {
1518 tbb::internal::fgt_node_desc(
this, name );
1522 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1524 fInput_type::my_built_predecessors.receiver_extract(*
this);
1525 successors().built_successors().sender_extract(*
this);
1533 using fInput_type::try_put_task;
1536 void reset_node(reset_flags f) {
1537 fInput_type::reset_receiver(f);
1538 if(f & rf_clear_edges)successors().clear();
1539 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(),
"continue_node not reset");
1544 template<
typename T >
1545 class overwrite_node :
public graph_node,
public receiver<T>,
public sender<T> {
1547 using graph_node::my_graph;
1550 typedef T output_type;
1553 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1560 overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(
false) {
1561 my_successors.set_owner(
this );
1562 tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1567 overwrite_node(
const overwrite_node& src ) :
1570 my_successors.set_owner(
this );
1571 tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1575 ~overwrite_node() {}
1577 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1578 void set_name(
const char *name ) {
1579 tbb::internal::fgt_node_desc(
this, name );
1583 bool register_successor( successor_type &s ) {
1585 if (my_buffer_is_valid && this->my_graph.is_active()) {
1587 if ( s.try_put( my_buffer ) || !s.register_predecessor( *
this ) ) {
1589 my_successors.register_successor( s );
1596 my_successors.register_successor( s );
1601 bool remove_successor( successor_type &s ) {
1603 my_successors.remove_successor(s);
1607 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1608 built_predecessors_type &built_predecessors() {
return my_built_predecessors; }
1609 built_successors_type &built_successors() {
return my_successors.built_successors(); }
1611 void internal_add_built_successor( successor_type &s) {
1613 my_successors.internal_add_built_successor(s);
1616 void internal_delete_built_successor( successor_type &s) {
1618 my_successors.internal_delete_built_successor(s);
1621 size_t successor_count() {
1623 return my_successors.successor_count();
1626 void copy_successors(successor_list_type &v) {
1628 my_successors.copy_successors(v);
1631 void internal_add_built_predecessor( predecessor_type &p) {
1633 my_built_predecessors.add_edge(p);
1636 void internal_delete_built_predecessor( predecessor_type &p) {
1638 my_built_predecessors.delete_edge(p);
1641 size_t predecessor_count() {
1643 return my_built_predecessors.edge_count();
1646 void copy_predecessors(predecessor_list_type &v) {
1648 my_built_predecessors.copy_edges(v);
1652 my_buffer_is_valid =
false;
1653 built_successors().sender_extract(*
this);
1654 built_predecessors().receiver_extract(*
this);
1659 bool try_get( input_type &v ) {
1661 if ( my_buffer_is_valid ) {
1670 return my_buffer_is_valid;
1675 my_buffer_is_valid =
false;
1682 task * try_put_task(
const input_type &v ) {
1684 return try_put_task_impl(v);
1687 task * try_put_task_impl(
const input_type &v) {
1689 my_buffer_is_valid =
true;
1690 task * rtask = my_successors.try_put_task(v);
1691 if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
1697 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1698 internal::edge_container<predecessor_type> my_built_predecessors;
1700 input_type my_buffer;
1701 bool my_buffer_is_valid;
// Intentionally empty: the reset flags are ignored and nothing is done here.
1702 void reset_receiver(reset_flags ) {}
1704 void reset_node( reset_flags f) {
1705 my_buffer_is_valid =
false;
1706 if (f&rf_clear_edges) {
1707 my_successors.clear();
1712 template<
typename T >
1713 class write_once_node :
public overwrite_node<T> {
1716 typedef T output_type;
1721 write_once_node(graph& g) : overwrite_node<T>(g) {
1722 tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
1728 write_once_node(
const write_once_node& src ) : overwrite_node<T>(src) {
1729 tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
1734 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1735 void set_name(
const char *name ) {
1736 tbb::internal::fgt_node_desc(
this, name );
1744 task *try_put_task(
const T &v ) {
1746 return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
// broadcast_node<T>: a graph_node that is both receiver<T> and sender<T>.
// A value put to it is offered to its successor set (see try_put_task).
1751 template <
typename T>
1752 class broadcast_node :
public graph_node,
public receiver<T>,
public sender<T> {
1754 using graph_node::my_graph;
1757 typedef T output_type;
1760 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1766 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1767 internal::edge_container<predecessor_type> my_built_predecessors;
// Constructs the node in graph g, makes it the owner of its successor set
// and calls fgt_node with FLOW_BROADCAST_NODE.
1772 broadcast_node(graph& g) : graph_node(g) {
1773 my_successors.set_owner(
this );
1774 tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
// Copy constructor: performs the same owner registration and fgt_node call
// for the new node.
1779 broadcast_node(
const broadcast_node& src ) :
1782 my_successors.set_owner(
this );
1783 tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
// Passes this node and a name to fgt_node_desc (trace builds only).
1787 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1788 void set_name(
const char *name ) {
1789 tbb::internal::fgt_node_desc(
this, name );
// Adds r to the successor set.
1794 virtual bool register_successor( successor_type &r ) {
1795 my_successors.register_successor( r );
// Removes r from the successor set.
1800 virtual bool remove_successor( successor_type &r ) {
1801 my_successors.remove_successor( r );
// Preview-only edge bookkeeping: successor queries are forwarded to
// my_successors, predecessor queries to my_built_predecessors.
1805 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1808 built_successors_type &built_successors() {
return my_successors.built_successors(); }
1810 void internal_add_built_successor(successor_type &r) {
1811 my_successors.internal_add_built_successor(r);
1814 void internal_delete_built_successor(successor_type &r) {
1815 my_successors.internal_delete_built_successor(r);
1818 size_t successor_count() {
1819 return my_successors.successor_count();
1822 void copy_successors(successor_list_type &v) {
1823 my_successors.copy_successors(v);
1828 built_predecessors_type &built_predecessors() {
return my_built_predecessors; }
1830 void internal_add_built_predecessor( predecessor_type &p) {
1832 my_built_predecessors.add_edge(p);
1835 void internal_delete_built_predecessor( predecessor_type &p) {
1837 my_built_predecessors.delete_edge(p);
1840 size_t predecessor_count() {
1842 return my_built_predecessors.edge_count();
1845 void copy_predecessors(predecessor_list_type &v) {
1847 my_built_predecessors.copy_edges(v);
// Calls receiver_extract / sender_extract on the recorded predecessor and
// successor edge containers, passing this node.
1851 my_built_predecessors.receiver_extract(*
this);
1852 my_successors.built_successors().sender_extract(*
this);
// Offers t to the successors; a null result is replaced by the
// SUCCESSFULLY_ENQUEUED sentinel.
1861 task *try_put_task(
const T& t) {
1862 task *new_task = my_successors.try_put_task(t);
1863 if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
// Intentionally empty; the flags argument is unused.
1867 void reset_receiver(reset_flags ) {}
// With rf_clear_edges set, clears the successor set (and, in preview
// builds, the recorded predecessors).
1869 void reset_node(reset_flags f) {
1870 if (f&rf_clear_edges) {
1871 my_successors.clear();
1872 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1873 my_built_predecessors.clear();
// Sanity check: after an edge-clearing reset the successor set must be empty.
1876 __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(),
"Error resetting broadcast_node");
// buffer_node<T, A>: template header, type aliases, operation codes and the
// operation record passed to the aggregator.
1881 template <
typename T,
typename A=cache_aligned_allocator<T> >
1884 using graph_node::my_graph;
1887 typedef T output_type;
1890 typedef buffer_node<T, A> class_type;
1891 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1896 typedef size_t size_type;
1899 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1900 internal::edge_container<predecessor_type> my_built_predecessors;
// Operation codes dispatched by handle_operations_impl(); the *_blt_*
// codes exist only when TBB_PREVIEW_FLOW_GRAPH_FEATURES is enabled.
1905 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1906 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1907 , add_blt_succ, del_blt_succ,
1908 add_blt_pred, del_blt_pred,
1909 blt_succ_cnt, blt_pred_cnt,
1910 blt_succ_cpy, blt_pred_cpy
// Status of an operation; the internal_* handlers store SUCCEEDED or FAILED.
1913 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1919 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1924 predecessor_type *p;
1926 successor_list_type *svec;
1927 predecessor_list_type *pvec;
// Operation carrying an element: stores the op code as a char, a non-const
// pointer to e, and a null ltask. The two initializer orders differ by build
// flavour (presumably to match member declaration order - TODO confirm).
1934 buffer_operation(
const T& e, op_type t) : type(
char(t))
1936 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1937 , ltask(NULL), elem(const_cast<T*>(&e))
1939 , elem(const_cast<T*>(&e)) , ltask(NULL)
// Operation without an element: only the op code and a null ltask are set.
1942 buffer_operation(op_type t) : type(
char(t)), ltask(NULL) {}
// Set to true when handle_operations_impl() allocates a forwarding task;
// set back to false by the forwarding FAILED paths and by reset_node().
1945 bool forwarder_busy;
// Virtual handler for a list of operations: forwards to
// handle_operations_impl() with this node as the derived object.
1950 virtual void handle_operations(buffer_operation *op_list) {
1951 handle_operations_impl(op_list,
this);
// Dispatches each operation taken from op_list to the matching internal_*
// handler, then schedules a forwarding task if one is warranted. `derived`
// must be this same object viewed through its most-derived type.
1954 template<
typename derived_type>
1955 void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1956 __TBB_ASSERT(static_cast<class_type*>(derived) ==
this,
"'this' is not a base class for derived");
1958 buffer_operation *tmp = NULL;
// Becomes true when a handled operation may have made forwarding possible.
1959 bool try_forwarding=
false;
1962 op_list = op_list->next;
1963 switch (tmp->type) {
1964 case reg_succ: internal_reg_succ(tmp); try_forwarding =
true;
break;
1965 case rem_succ: internal_rem_succ(tmp);
break;
1966 case req_item: internal_pop(tmp);
break;
1967 case res_item: internal_reserve(tmp);
break;
1968 case rel_res: internal_release(tmp); try_forwarding =
true;
break;
1969 case con_res: internal_consume(tmp); try_forwarding =
true;
break;
// A put only triggers forwarding when the push itself succeeded.
1970 case put_item: internal_push(tmp); try_forwarding = (tmp->status == SUCCEEDED);
break;
1971 case try_fwd_task: internal_forward_task(tmp);
break;
1972 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1974 case add_blt_succ: internal_add_built_succ(tmp);
break;
1975 case del_blt_succ: internal_del_built_succ(tmp);
break;
1976 case add_blt_pred: internal_add_built_pred(tmp);
break;
1977 case del_blt_pred: internal_del_built_pred(tmp);
break;
1978 case blt_succ_cnt: internal_succ_cnt(tmp);
break;
1979 case blt_pred_cnt: internal_pred_cnt(tmp);
break;
1980 case blt_succ_cpy: internal_copy_succs(tmp);
break;
1981 case blt_pred_cpy: internal_copy_preds(tmp);
break;
// Allocate a forwarding task only if forwarding is wanted, none is already
// marked busy, and the graph is active. The task is allocated as an
// additional child of the graph's root task and merged into tmp->ltask.
1988 if (try_forwarding && !forwarder_busy) {
1989 if(this->my_graph.is_active()) {
1990 forwarder_busy =
true;
1991 task *new_task =
new(task::allocate_additional_child_of(*(this->my_graph.root_task())))
internal::
1993 < buffer_node<input_type, A> >(*
this);
1996 tbb::task *z = tmp->ltask;
1997 tmp->ltask = combine_tasks(z, new_task);
// Returns the task pointer recorded in the operation (op_data.ltask).
2002 inline task *grab_forwarding_task( buffer_operation &op_data) {
2003 return op_data.ltask;
// Fetches the forwarding task attached to op_data via grab_forwarding_task().
2006 inline bool enqueue_forwarding_task(buffer_operation &op_data) {
2007 task *ft = grab_forwarding_task(op_data);
// Repeatedly submits a try_fwd_task operation to the aggregator, merging each
// returned task into last_task, for as long as the operation reports
// SUCCEEDED.
2016 virtual task *forward_task() {
2017 buffer_operation op_data(try_fwd_task);
2018 task *last_task = NULL;
// The same operation record is reused, so status and ltask are reset before
// each submission.
2020 op_data.status = WAIT;
2021 op_data.ltask = NULL;
2022 my_aggregator.execute(&op_data);
2023 tbb::task *xtask = op_data.ltask;
2024 last_task = combine_tasks(last_task, xtask);
2025 }
while (op_data.status == SUCCEEDED);
// Handlers for edge-related operations. Each performs its update or query and
// then stores SUCCEEDED into op->status with __TBB_store_with_release.

// Registers the successor *(op->r).
2030 virtual void internal_reg_succ(buffer_operation *op) {
2031 my_successors.register_successor(*(op->r));
2032 __TBB_store_with_release(op->status, SUCCEEDED);
// Removes the successor *(op->r).
2036 virtual void internal_rem_succ(buffer_operation *op) {
2037 my_successors.remove_successor(*(op->r));
2038 __TBB_store_with_release(op->status, SUCCEEDED);
2041 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 2044 built_successors_type &built_successors() {
return my_successors.built_successors(); }
// Records *(op->r) as a built successor edge.
2046 virtual void internal_add_built_succ(buffer_operation *op) {
2047 my_successors.internal_add_built_successor(*(op->r));
2048 __TBB_store_with_release(op->status, SUCCEEDED);
// Deletes the built successor edge to *(op->r).
2051 virtual void internal_del_built_succ(buffer_operation *op) {
2052 my_successors.internal_delete_built_successor(*(op->r));
2053 __TBB_store_with_release(op->status, SUCCEEDED);
2058 built_predecessors_type &built_predecessors() {
return my_built_predecessors; }
// Records *(op->p) as a built predecessor edge.
2060 virtual void internal_add_built_pred(buffer_operation *op) {
2061 my_built_predecessors.add_edge(*(op->p));
2062 __TBB_store_with_release(op->status, SUCCEEDED);
// Deletes the built predecessor edge to *(op->p).
2065 virtual void internal_del_built_pred(buffer_operation *op) {
2066 my_built_predecessors.delete_edge(*(op->p));
2067 __TBB_store_with_release(op->status, SUCCEEDED);
// Writes the successor count into op->cnt_val.
2070 virtual void internal_succ_cnt(buffer_operation *op) {
2071 op->cnt_val = my_successors.successor_count();
2072 __TBB_store_with_release(op->status, SUCCEEDED);
// Writes the built-predecessor count into op->cnt_val.
2075 virtual void internal_pred_cnt(buffer_operation *op) {
2076 op->cnt_val = my_built_predecessors.edge_count();
2077 __TBB_store_with_release(op->status, SUCCEEDED);
// Copies the successors into the list pointed to by op->svec.
2080 virtual void internal_copy_succs(buffer_operation *op) {
2081 my_successors.copy_successors(*(op->svec));
2082 __TBB_store_with_release(op->status, SUCCEEDED);
// Copies the built predecessors into the list pointed to by op->pvec.
2085 virtual void internal_copy_preds(buffer_operation *op) {
2086 my_built_predecessors.copy_edges(*(op->pvec));
2087 __TBB_store_with_release(op->status, SUCCEEDED);
// True when the slot at my_tail - 1 holds a valid item.
2095 bool is_item_valid() {
2096 return this->my_item_valid(this->my_tail - 1);
// Offers the back item to the successors; combine_tasks() merges the result
// into last_task and destroy_back() removes the item.
2099 void try_put_and_add_task(task*& last_task) {
2100 task *new_task = my_successors.try_put_task(this->back());
2102 last_task = combine_tasks(last_task, new_task);
2103 this->destroy_back();
// Virtual handler for try_fwd_task: forwards to internal_forward_task_impl()
// with this node as the derived object.
2109 virtual void internal_forward_task(buffer_operation *op) {
2110 internal_forward_task_impl(op,
this);
// Forwarding step. is_item_valid() and try_put_and_add_task() are called
// through `derived`, so a derived node's versions are used.
2113 template<
typename derived_type>
2114 void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
2115 __TBB_ASSERT(static_cast<class_type*>(derived) ==
this,
"'this' is not a base class for derived");
// Nothing can be forwarded while an item is reserved or when no valid item
// exists: report FAILED and mark the forwarder idle.
2117 if (this->my_reserved || !derived->is_item_valid()) {
2118 __TBB_store_with_release(op->status, FAILED);
2119 this->forwarder_busy =
false;
2123 task * last_task = NULL;
// Make at most my_successors.size() put attempts per invocation.
2124 size_type counter = my_successors.size();
2125 for (; counter > 0 && derived->is_item_valid(); --counter)
2126 derived->try_put_and_add_task(last_task);
2128 op->ltask = last_task;
// SUCCEEDED when a task was produced and the attempt budget was used up.
2129 if (last_task && !counter) {
2130 __TBB_store_with_release(op->status, SUCCEEDED);
// The FAILED path also marks the forwarder idle.
2133 __TBB_store_with_release(op->status, FAILED);
2134 forwarder_busy =
false;
// Handlers for item operations. Each stores its outcome into op->status with
// __TBB_store_with_release.

// Appends *(op->elem) with push_back() and reports SUCCEEDED.
2138 virtual void internal_push(buffer_operation *op) {
2139 this->push_back(*(op->elem));
2140 __TBB_store_with_release(op->status, SUCCEEDED);
// Pops into *(op->elem) with pop_back(); SUCCEEDED when that returns true,
// with a FAILED store for the other outcome.
2143 virtual void internal_pop(buffer_operation *op) {
2144 if(this->pop_back(*(op->elem))) {
2145 __TBB_store_with_release(op->status, SUCCEEDED);
2148 __TBB_store_with_release(op->status, FAILED);
// Reserves via reserve_front() into *(op->elem); SUCCEEDED when that returns
// true, with a FAILED store for the other outcome.
2152 virtual void internal_reserve(buffer_operation *op) {
2153 if(this->reserve_front(*(op->elem))) {
2154 __TBB_store_with_release(op->status, SUCCEEDED);
2157 __TBB_store_with_release(op->status, FAILED);
// Calls consume_front() and reports SUCCEEDED.
2161 virtual void internal_consume(buffer_operation *op) {
2162 this->consume_front();
2163 __TBB_store_with_release(op->status, SUCCEEDED);
// Calls release_front() and reports SUCCEEDED.
2166 virtual void internal_release(buffer_operation *op) {
2167 this->release_front();
2168 __TBB_store_with_release(op->status, SUCCEEDED);
// Constructor tail: starts with the forwarder idle, makes this node the owner
// of its successor set, installs this node as the aggregator's handler, and
// calls fgt_node with FLOW_BUFFER_NODE.
2174 forwarder_busy(
false) {
2175 my_successors.set_owner(
this);
2176 my_aggregator.initialize_handler(handler_type(
this));
2177 tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
// Copy constructor: the new node is placed in src's graph and gets the same
// idle-forwarder, owner, handler and fgt_node setup.
2182 buffer_node(
const buffer_node& src ) : graph_node(src.my_graph),
2184 forwarder_busy =
false;
2185 my_successors.set_owner(
this);
2186 my_aggregator.initialize_handler(handler_type(
this));
2187 tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
// Virtual destructor with an empty body.
2191 virtual ~buffer_node() {}
// Passes this node and a name to fgt_node_desc (trace builds only).
2193 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2194 void set_name(
const char *name ) {
2195 tbb::internal::fgt_node_desc(
this, name );
// Public edge API. Each call builds a buffer_operation with the matching op
// code and runs it through my_aggregator.

// Submits a reg_succ operation, then calls enqueue_forwarding_task() on it.
2205 bool register_successor( successor_type &r ) {
2206 buffer_operation op_data(reg_succ);
2208 my_aggregator.execute(&op_data);
2209 (void)enqueue_forwarding_task(op_data);
// Preview-only bookkeeping of built edges.
2213 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 2214 void internal_add_built_successor( successor_type &r) {
2215 buffer_operation op_data(add_blt_succ);
2217 my_aggregator.execute(&op_data);
2220 void internal_delete_built_successor( successor_type &r) {
2221 buffer_operation op_data(del_blt_succ);
2223 my_aggregator.execute(&op_data);
2226 void internal_add_built_predecessor( predecessor_type &p) {
2227 buffer_operation op_data(add_blt_pred);
2229 my_aggregator.execute(&op_data);
2232 void internal_delete_built_predecessor( predecessor_type &p) {
2233 buffer_operation op_data(del_blt_pred);
2235 my_aggregator.execute(&op_data);
// The count queries return the cnt_val the handler wrote into the operation.
2238 size_t predecessor_count() {
2239 buffer_operation op_data(blt_pred_cnt);
2240 my_aggregator.execute(&op_data);
2241 return op_data.cnt_val;
2244 size_t successor_count() {
2245 buffer_operation op_data(blt_succ_cnt);
2246 my_aggregator.execute(&op_data);
2247 return op_data.cnt_val;
// The copy operations run blt_pred_cpy / blt_succ_cpy through the aggregator.
2250 void copy_predecessors( predecessor_list_type &v ) {
2251 buffer_operation op_data(blt_pred_cpy);
2253 my_aggregator.execute(&op_data);
2256 void copy_successors( successor_list_type &v ) {
2257 buffer_operation op_data(blt_succ_cpy);
2259 my_aggregator.execute(&op_data);
// Calls r.remove_predecessor(*this) first, then submits a rem_succ operation
// and calls enqueue_forwarding_task() on it.
2267 bool remove_successor( successor_type &r ) {
2268 r.remove_predecessor(*
this);
2269 buffer_operation op_data(rem_succ);
2271 my_aggregator.execute(&op_data);
2275 (void)enqueue_forwarding_task(op_data);
// Submits a req_item operation; returns true when it reports SUCCEEDED.
2282 bool try_get( T &v ) {
2283 buffer_operation op_data(req_item);
2285 my_aggregator.execute(&op_data);
2286 (void)enqueue_forwarding_task(op_data);
2287 return (op_data.status==SUCCEEDED);
// Submits a res_item operation; returns true when it reports SUCCEEDED.
2293 bool try_reserve( T &v ) {
2294 buffer_operation op_data(res_item);
2296 my_aggregator.execute(&op_data);
2297 (void)enqueue_forwarding_task(op_data);
2298 return (op_data.status==SUCCEEDED);
// Submits a rel_res operation, then calls enqueue_forwarding_task() on it.
2303 bool try_release() {
2304 buffer_operation op_data(rel_res);
2305 my_aggregator.execute(&op_data);
2306 (void)enqueue_forwarding_task(op_data);
// Submits a con_res operation, then calls enqueue_forwarding_task() on it.
2312 bool try_consume() {
2313 buffer_operation op_data(con_res);
2314 my_aggregator.execute(&op_data);
2315 (void)enqueue_forwarding_task(op_data);
// Receiver entry point: submits a put_item operation carrying t and takes the
// forwarding task the handler attached, if any.
2325 task *try_put_task(
const T &t) {
2326 buffer_operation op_data(t, put_item);
2327 my_aggregator.execute(&op_data);
2328 task *ft = grab_forwarding_task(op_data);
// A forwarding task exists but the put failed: spawn the task here and drop
// the local pointer.
2332 if(ft && op_data.status == FAILED) {
2336 FLOW_SPAWN(*ft); ft = NULL;
// The put succeeded without producing a task: use the SUCCESSFULLY_ENQUEUED
// sentinel.
2338 else if(!ft && op_data.status == SUCCEEDED) {
2339 ft = SUCCESSFULLY_ENQUEUED;
// Intentionally empty; the flags argument is unused.
2344 void reset_receiver(reset_flags ) { }
// Preview-only: calls receiver_extract / sender_extract on the recorded
// predecessor and successor edge containers, passing this node.
2346 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 2349 my_built_predecessors.receiver_extract(*
this);
2350 my_successors.built_successors().sender_extract(*
this);
// With rf_clear_edges set, clears the successor set (and the recorded
// predecessors in preview builds); also marks the forwarder idle.
2355 void reset_node( reset_flags f) {
2358 if (f&rf_clear_edges) {
2359 my_successors.clear();
2360 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 2361 my_built_predecessors.clear();
2364 forwarder_busy =
false;
// queue_node<T, A>: a buffer_node<T, A> whose handlers work on the head of the
// buffer (my_head, front(), pop_front(), destroy_front()).
2371 template <
typename T,
typename A=cache_aligned_allocator<T> >
2372 class queue_node :
public buffer_node<T, A> {
2374 typedef buffer_node<T, A> base_type;
2375 typedef typename base_type::size_type size_type;
2376 typedef typename base_type::buffer_operation queue_operation;
2377 typedef queue_node class_type;
2379 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// buffer_node is a friend, which lets its internal_forward_task_impl() call
// the two helpers below through the derived pointer.
2382 template<
typename,
typename>
friend class buffer_node;
// True when the slot at my_head holds a valid item.
2384 bool is_item_valid() {
2385 return this->my_item_valid(this->my_head);
// Offers the front item to the successors; combine_tasks() merges the result
// into last_task and destroy_front() removes the item.
2388 void try_put_and_add_task(task*& last_task) {
2389 task *new_task = this->my_successors.try_put_task(this->front());
2391 last_task = combine_tasks(last_task, new_task);
2392 this->destroy_front();
// Runs the shared forwarding implementation with this node as `derived`.
2397 void internal_forward_task(queue_operation *op) {
2398 this->internal_forward_task_impl(op,
this);
// FAILED while an item is reserved or the head slot is empty; the other path
// pops the front item into *(op->elem) and reports SUCCEEDED.
2401 void internal_pop(queue_operation *op) {
2402 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2403 __TBB_store_with_release(op->status, FAILED);
2406 this->pop_front(*(op->elem));
2407 __TBB_store_with_release(op->status, SUCCEEDED);
// Same precondition as internal_pop; the other path reserves the front item
// into *(op->elem) and reports SUCCEEDED.
2410 void internal_reserve(queue_operation *op) {
2411 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2412 __TBB_store_with_release(op->status, FAILED);
2415 this->reserve_front(*(op->elem));
2416 __TBB_store_with_release(op->status, SUCCEEDED);
// Calls consume_front() and reports SUCCEEDED.
2419 void internal_consume(queue_operation *op) {
2420 this->consume_front();
2421 __TBB_store_with_release(op->status, SUCCEEDED);
2426 typedef T output_type;
// Constructs the node in graph g and calls fgt_node with FLOW_QUEUE_NODE.
2431 queue_node( graph &g ) : base_type(g) {
2432 tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
// Copy constructor: forwards src to the base and makes the same fgt_node call.
2438 queue_node(
const queue_node& src) : base_type(src) {
2439 tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
// Passes this node and a name to fgt_node_desc (trace builds only).
2444 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2445 void set_name(
const char *name ) {
2446 tbb::internal::fgt_node_desc(
this, name );
// Delegates the reset to buffer_node.
2451 void reset_node( reset_flags f) {
2452 base_type::reset_node(f);
// sequencer_node<T, A>: a queue_node<T, A> that places each pushed item at the
// index computed by a user-supplied sequencer (see internal_push).
2457 template<
typename T,
typename A=cache_aligned_allocator<T> >
2458 class sequencer_node :
public queue_node<T, A> {
2464 typedef T output_type;
// Constructs the node in graph g from a sequencer s and calls fgt_node with
// FLOW_SEQUENCER_NODE.
2469 template<
typename Sequencer >
2470 sequencer_node( graph &g,
const Sequencer& s ) : queue_node<T, A>(g),
2472 tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
// Copy constructor: the new node gets its own sequencer, obtained by calling
// clone() on src's.
2478 sequencer_node(
const sequencer_node& src ) : queue_node<T, A>(src),
2479 my_sequencer( src.my_sequencer->clone() ) {
2480 tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
// The destructor deletes my_sequencer.
2486 ~sequencer_node() {
delete my_sequencer; }
// Passes this node and a name to fgt_node_desc (trace builds only).
2488 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2489 void set_name(
const char *name ) {
2490 tbb::internal::fgt_node_desc(
this, name );
2495 typedef typename buffer_node<T, A>::size_type size_type;
2496 typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
2498 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// Push handler: asks the sequencer for the item's tag and uses it as the
// slot index.
2501 void internal_push(sequencer_operation *op) {
2502 size_type tag = (*my_sequencer)(*(op->elem));
// Unless TBB_DEPRECATED_SEQUENCER_DUPLICATES is set, a tag below my_head is
// rejected with FAILED.
2503 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES 2504 if (tag < this->my_head) {
2506 __TBB_store_with_release(op->status, FAILED);
// Extend the tail so it covers the tag, growing the array when the new size
// exceeds the capacity.
2511 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2513 if (this->size(new_tail) > this->capacity()) {
2514 this->grow_my_array(this->size(new_tail));
2516 this->my_tail = new_tail;
// SUCCEEDED when place_item() returns true, with a FAILED store for the
// other outcome.
2517 if (this->place_item(tag, *(op->elem))) {
2518 __TBB_store_with_release(op->status, SUCCEEDED);
2522 __TBB_store_with_release(op->status, FAILED);
// priority_queue_node<T, Compare, A>: a buffer_node<T, A> whose handlers pick
// the item to hand out through prio() and insert through prio_push().
2528 template<
typename T,
typename Compare = std::less<T>,
typename A=cache_aligned_allocator<T> >
2529 class priority_queue_node :
public buffer_node<T, A> {
2532 typedef T output_type;
2533 typedef buffer_node<T,A> base_type;
2534 typedef priority_queue_node class_type;
// Constructs the node in graph g with mark = 0 and calls fgt_node with
// FLOW_PRIORITY_QUEUE_NODE.
2539 priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {
2540 tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
// Copy constructor: forwards src to the base; mark starts at 0 again.
2546 priority_queue_node(
const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
2547 tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
// Passes this node and a name to fgt_node_desc (trace builds only).
2552 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2553 void set_name(
const char *name ) {
2554 tbb::internal::fgt_node_desc(
this, name );
// Delegates the reset to buffer_node.
2561 void reset_node( reset_flags f) {
2563 base_type::reset_node(f);
2566 typedef typename buffer_node<T, A>::size_type size_type;
2567 typedef typename buffer_node<T, A>::item_type item_type;
2568 typedef typename buffer_node<T, A>::buffer_operation prio_operation;
2570 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// Runs the shared forwarding implementation with this node as `derived`.
2573 void internal_forward_task(prio_operation *op) {
2574 this->internal_forward_task_impl(op,
this);
// Runs the shared operation dispatcher with this node as `derived`.
2577 void handle_operations(prio_operation *op_list) {
2578 this->handle_operations_impl(op_list,
this);
// Inserts *(op->elem) via prio_push() and reports SUCCEEDED.
2581 void internal_push(prio_operation *op) {
2582 prio_push(*(op->elem));
2583 __TBB_store_with_release(op->status, SUCCEEDED);
// FAILED while an item is reserved or the node is empty (my_tail == 0); the
// other path copies prio() into *(op->elem) and reports SUCCEEDED.
2586 void internal_pop(prio_operation *op) {
2588 if ( this->my_reserved ==
true || this->my_tail == 0 ) {
2589 __TBB_store_with_release(op->status, FAILED);
2593 *(op->elem) = prio();
2594 __TBB_store_with_release(op->status, SUCCEEDED);
// Same precondition as internal_pop. The other path sets my_reserved, copies
// prio() into *(op->elem), keeps a second copy in reserved_item, and reports
// SUCCEEDED.
2600 void internal_reserve(prio_operation *op) {
2601 if (this->my_reserved ==
true || this->my_tail == 0) {
2602 __TBB_store_with_release(op->status, FAILED);
2605 this->my_reserved =
true;
2606 *(op->elem) = prio();
2607 reserved_item = *(op->elem);
2608 __TBB_store_with_release(op->status, SUCCEEDED);
// Reports SUCCEEDED, clears the reservation flag and resets reserved_item to
// a default-constructed value.
2612 void internal_consume(prio_operation *op) {
2613 __TBB_store_with_release(op->status, SUCCEEDED);
2614 this->my_reserved =
false;
2615 reserved_item = input_type();
// Reports SUCCEEDED, pushes reserved_item back in via prio_push(), then
// clears the reservation flag and resets reserved_item.
2618 void internal_release(prio_operation *op) {
2619 __TBB_store_with_release(op->status, SUCCEEDED);
2620 prio_push(reserved_item);
2621 this->my_reserved =
false;
2622 reserved_item = input_type();
// buffer_node is a friend, which lets its internal_forward_task_impl() call
// is_item_valid() and try_put_and_add_task() through the derived pointer.
2626 template<
typename,
typename>
friend class buffer_node;
// Runs heapify() while mark trails my_tail; afterwards mark must equal
// my_tail.
2629 if (mark < this->my_tail) heapify();
2630 __TBB_ASSERT(mark == this->my_tail,
"mark unequal after heapify");
// True when the node holds at least one item.
2633 bool is_item_valid() {
2634 return this->my_tail > 0;
// Offers the item selected by prio() to the successors and merges the result
// into last_task.
2637 void try_put_and_add_task(task*& last_task) {
2638 task * new_task = this->my_successors.try_put_task(this->prio());
2640 last_task = combine_tasks(last_task, new_task);
// Copy of the reserved item; set by internal_reserve(), pushed back by
// internal_release().
2649 input_type reserved_item;
// True when mark < my_tail and compare(item 0, last item) holds, i.e. the
// last item is preferred over the one at index 0.
2652 bool prio_use_tail() {
2653 __TBB_ASSERT(mark <= this->my_tail,
"mark outside bounds before test");
2654 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
// Places src at index my_tail, growing the array first when my_tail has
// reached my_array_size.
2658 void prio_push(
const T &src) {
2659 if ( this->my_tail >= this->my_array_size )
2660 this->grow_my_array( this->my_tail + 1 );
2661 (void) this->place_item(this->my_tail, src);
2663 __TBB_ASSERT(mark < this->my_tail,
"mark outside bounds after push");
// Removal: when prio_use_tail() holds, the last item is destroyed. The other
// path destroys item 0 and, if more than one item exists, moves the last
// item into slot 0.
2670 if (prio_use_tail()) {
2673 this->destroy_item(this->my_tail-1);
2675 __TBB_ASSERT(mark <= this->my_tail,
"mark outside bounds after pop");
2678 this->destroy_item(0);
2679 if(this->my_tail > 1) {
2681 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2682 this->move_item(0,this->my_tail - 1);
// Pull mark back when it has ended up beyond my_tail.
2685 if(mark > this->my_tail) --mark;
2686 if (this->my_tail > 1)
2688 __TBB_ASSERT(mark <= this->my_tail,
"mark outside bounds after pop");
// Returns the last item when prio_use_tail() holds, otherwise item 0.
2692 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2697 if(this->my_tail == 0) {
// For each position from mark up to my_tail, the item is fetched and parents
// are moved down along the parent chain ((pos-1)>>1), guarded by the
// !compare(parent, item) test; the item is then placed at cur_pos.
2701 if (!mark) mark = 1;
2702 for (; mark<this->my_tail; ++mark) {
2703 size_type cur_pos = mark;
2704 input_type to_place;
2705 this->fetch_item(mark,to_place);
2707 size_type parent = (cur_pos-1)>>1;
2708 if (!compare(this->get_my_item(parent), to_place))
2710 this->move_item(cur_pos, parent);
2713 (void) this->place_item(cur_pos, to_place);
// Walks down from index 0 over children below mark: compares child with
// child+1 and the chosen target with cur_pos, swaps cur_pos with target, and
// moves to the next child at (cur_pos<<1)+1.
2719 size_type cur_pos=0, child=1;
2720 while (child < mark) {
2721 size_type target = child;
2723 compare(this->get_my_item(child),
2724 this->get_my_item(child+1)))
2727 if (compare(this->get_my_item(target),
2728 this->get_my_item(cur_pos)))
2731 this->swap_items(cur_pos, target);
2733 child = (cur_pos<<1)+1;
// limiter_node<T>: template header, state, and the forwarding logic that
// compares my_count + my_tries with my_threshold.
2742 template<
typename T >
2745 using graph_node::my_graph;
2748 typedef T output_type;
2751 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 2759 size_t my_threshold;
2765 int init_decrement_predecessors;
// True when my_count + my_tries is still below the threshold and both the
// predecessor and the successor sets are non-empty.
2772 bool check_conditions() {
2773 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
// Tries to pull one item from a predecessor and pass it to the successors.
2777 task *forward_task() {
2780 bool reserved =
false;
2783 if ( check_conditions() )
// Reserve an item from a predecessor, then offer it to the successors.
2792 if ( (my_predecessors.try_reserve(v)) ==
true ){
2794 if ( (rval = my_successors.try_put_task(v)) != NULL ){
// The put was accepted: consume the reservation and, if forwarding is still
// allowed and the graph is active, spawn another task allocated as a child
// of the graph's root task.
2799 my_predecessors.try_consume();
2800 if ( check_conditions() ) {
2801 if ( this->my_graph.is_active() ) {
2802 task *rtask =
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2804 FLOW_SPAWN (*rtask);
// Hand a held reservation back to the predecessor, then allocate a retry task
// under the same conditions; rval must be null at that point.
2817 if (reserved) my_predecessors.try_release();
2818 if ( check_conditions() ) {
2819 if ( this->my_graph.is_active() ) {
2820 task *rtask =
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2822 __TBB_ASSERT(!rval,
"Have two tasks to handle");
// Assertion on a path that is expected never to execute.
2831 __TBB_ASSERT(
false,
"Should never be called");
// Decrements my_count when it is non-zero and returns the result of
// forward_task().
2835 task * decrement_counter() {
2838 if(my_count) --my_count;
2840 return forward_task();
// Constructs a limiter in graph g with the given threshold; both counters
// start at zero. num_decrement_predecessors is stored and also passed to the
// `decrement` member. The node becomes the owner of its predecessor set, its
// successor set and `decrement`, then calls fgt_node with FLOW_LIMITER_NODE.
2848 limiter_node(graph &g,
size_t threshold,
int num_decrement_predecessors=0) :
2849 graph_node(g), my_threshold(threshold), my_count(0), my_tries(0),
2850 init_decrement_predecessors(num_decrement_predecessors),
2851 decrement(num_decrement_predecessors)
2853 my_predecessors.set_owner(
this);
2854 my_successors.set_owner(
this);
2855 decrement.set_owner(
this);
2856 tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
// Copy constructor: copies src's threshold and initial decrement-predecessor
// count, but starts my_count and my_tries at zero.
2862 limiter_node(
const limiter_node& src ) :
2864 my_threshold(src.my_threshold), my_count(0), my_tries(0),
2865 init_decrement_predecessors(src.init_decrement_predecessors),
2866 decrement(src.init_decrement_predecessors)
2868 my_predecessors.set_owner(
this);
2869 my_successors.set_owner(
this);
2870 decrement.set_owner(
this);
2871 tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
// Passes this node and a name to fgt_node_desc (trace builds only).
2876 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2877 void set_name(
const char *name ) {
2878 tbb::internal::fgt_node_desc(
this, name );
// Adds r to the successors. If it is the first successor, predecessors exist,
// the threshold has not been reached and the graph is active, a task
// allocated under the graph's root task is spawned.
2883 bool register_successor( successor_type &r ) {
2885 bool was_empty = my_successors.empty();
2886 my_successors.register_successor(r);
2888 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2889 if ( this->my_graph.is_active() ) {
2890 FLOW_SPAWN( (*
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
// Calls r.remove_predecessor(*this) and then removes r from the successors.
2899 bool remove_successor( successor_type &r ) {
2900 r.remove_predecessor(*
this);
2901 my_successors.remove_successor(r);
// Preview-only edge bookkeeping, forwarded to my_successors and
// my_predecessors.
2905 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 2906 built_successors_type &built_successors() {
return my_successors.built_successors(); }
2907 built_predecessors_type &built_predecessors() {
return my_predecessors.built_predecessors(); }
2909 void internal_add_built_successor(successor_type &src) {
2910 my_successors.internal_add_built_successor(src);
2913 void internal_delete_built_successor(successor_type &src) {
2914 my_successors.internal_delete_built_successor(src);
2917 size_t successor_count() {
return my_successors.successor_count(); }
2919 void copy_successors(successor_list_type &v) {
2920 my_successors.copy_successors(v);
2923 void internal_add_built_predecessor(predecessor_type &src) {
2924 my_predecessors.internal_add_built_predecessor(src);
2927 void internal_delete_built_predecessor(predecessor_type &src) {
2928 my_predecessors.internal_delete_built_predecessor(src);
2931 size_t predecessor_count() {
return my_predecessors.predecessor_count(); }
2933 void copy_predecessors(predecessor_list_type &v) {
2934 my_predecessors.copy_predecessors(v);
// Calls sender_extract / receiver_extract on this node's recorded edges and
// receiver_extract on the `decrement` member's recorded predecessors.
2939 my_successors.built_successors().sender_extract(*
this);
2940 my_predecessors.built_predecessors().receiver_extract(*
this);
2941 decrement.built_predecessors().receiver_extract(decrement);
// Adds src to the predecessors. When the threshold has not been reached,
// successors exist and the graph is active, a task allocated under the
// graph's root task is spawned.
2946 bool register_predecessor( predecessor_type &src ) {
2948 my_predecessors.add( src );
2949 if ( my_count + my_tries < my_threshold && !my_successors.empty() && this->my_graph.is_active() ) {
2950 FLOW_SPAWN( (*
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
// Removes src from the predecessors.
2957 bool remove_predecessor( predecessor_type &src ) {
2958 my_predecessors.remove( src );
// Receiver entry point: tests my_count + my_tries against the threshold, then
// offers t to the successors.
2968 task *try_put_task(
const T &t ) {
2971 if ( my_count + my_tries >= my_threshold )
2977 task * rtask = my_successors.try_put_task(t);
// When forwarding conditions hold and the graph is active, rtask is set to a
// task allocated under the graph's root task.
2982 if ( check_conditions() && this->my_graph.is_active() ) {
2983 rtask =
new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
// Asserts unconditionally.
2995 void reset_receiver(reset_flags ) {
2996 __TBB_ASSERT(
false,NULL);
// With rf_clear_edges set, clears both edge sets. The visible code also
// resets my_predecessors and forwards f to decrement.reset_receiver().
2999 void reset_node( reset_flags f) {
3001 if(f & rf_clear_edges) {
3002 my_predecessors.clear();
3003 my_successors.clear();
3007 my_predecessors.reset( );
3009 decrement.reset_receiver(f);
3013 #include "internal/_flow_graph_join_impl.h" 3018 using internal::input_port;
3019 using internal::tag_value;
// Primary join_node template; the join policy JP defaults to queueing.
3021 template<
typename OutputTuple,
typename JP=queueing>
class join_node;
// Specialization for the reserving policy: derives from unfolded_join_node
// instantiated with reserving_port; N is the size of OutputTuple.
3023 template<
typename OutputTuple>
3024 class join_node<OutputTuple,reserving>:
public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
3026 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
3029 typedef OutputTuple output_type;
3030 typedef typename unfolded_type::input_ports_type input_ports_type;
// Constructs the node in graph g and calls fgt_multiinput_node<N> with
// FLOW_JOIN_NODE_RESERVING.
3031 join_node(graph &g) : unfolded_type(g) {
3032 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
// Copy constructor: forwards other to the base and makes the same
// fgt_multiinput_node call.
3035 join_node(
const join_node &other) : unfolded_type(other) {
3036 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
// Passes this node and a name to fgt_node_desc (trace builds only).
3040 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3041 void set_name(
const char *name ) {
3042 tbb::internal::fgt_node_desc(
this, name );
// Specialization for the queueing policy: derives from unfolded_join_node
// instantiated with queueing_port; N is the size of OutputTuple.
3048 template<
typename OutputTuple>
3049 class join_node<OutputTuple,queueing>:
public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
3051 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
3054 typedef OutputTuple output_type;
3055 typedef typename unfolded_type::input_ports_type input_ports_type;
// Constructs the node in graph g and calls fgt_multiinput_node<N> with
// FLOW_JOIN_NODE_QUEUEING.
3056 join_node(graph &g) : unfolded_type(g) {
3057 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
// Copy constructor: forwards other to the base and makes the same
// fgt_multiinput_node call.
3060 join_node(
const join_node &other) : unfolded_type(other) {
3061 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
// Passes this node and a name to fgt_node_desc (trace builds only).
3065 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3066 void set_name(
const char *name ) {
3067 tbb::internal::fgt_node_desc(
this, name );
// Key-matching join_node: built on key_matching_port with the
// key_matching<K,KHash> policy; N is the size of OutputTuple.
3075 template<
typename OutputTuple,
typename K,
typename KHash>
3077 key_matching_port, OutputTuple, key_matching<K,KHash> > {
3079 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
3082 typedef OutputTuple output_type;
3083 typedef typename unfolded_type::input_ports_type input_ports_type;
// Graph-only constructor, available when
// __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING is enabled.
3085 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 3086 join_node(graph &g) : unfolded_type(g) {}
// Constructor overloads taking 2 to 10 extra arguments b0..b9 (presumably
// one key-extraction body per input port - TODO confirm). Each forwards its
// arguments to unfolded_type and calls fgt_multiinput_node<N> with
// FLOW_JOIN_NODE_TAG_MATCHING. Overloads with 6 or more arguments are guarded
// by __TBB_VARIADIC_MAX.
3089 template<
typename __TBB_B0,
typename __TBB_B1>
3090 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
3091 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3094 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2>
3095 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
3096 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3099 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3>
3100 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
3101 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3104 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4>
3105 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
3106 unfolded_type(g, b0, b1, b2, b3, b4) {
3107 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3110 #if __TBB_VARIADIC_MAX >= 6 3111 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
3113 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
3114 unfolded_type(g, b0, b1, b2, b3, b4, b5) {
3115 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3119 #if __TBB_VARIADIC_MAX >= 7 3120 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
3121 typename __TBB_B5,
typename __TBB_B6>
3122 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
3123 unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
3124 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3128 #if __TBB_VARIADIC_MAX >= 8 3129 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
3130 typename __TBB_B5,
typename __TBB_B6,
typename __TBB_B7>
3131 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3132 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
3133 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3137 #if __TBB_VARIADIC_MAX >= 9 3138 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
3139 typename __TBB_B5,
typename __TBB_B6,
typename __TBB_B7,
typename __TBB_B8>
3140 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3141 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
3142 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3146 #if __TBB_VARIADIC_MAX >= 10 3147 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
3148 typename __TBB_B5,
typename __TBB_B6,
typename __TBB_B7,
typename __TBB_B8,
typename __TBB_B9>
3149 join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3150 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
3151 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
// Copy constructor: forwards other to the base and makes the same
// fgt_multiinput_node call.
3155 join_node(
const join_node &other) : unfolded_type(other) {
3156 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
// Passes this node and a name to fgt_node_desc (trace builds only).
3160 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3161 void set_name(
const char *name ) {
3162 tbb::internal::fgt_node_desc(
this, name );
// indexer_node with a single input type: N = 1 and InputTuple = tuple<T0>.
3169 #include "internal/_flow_graph_indexer_impl.h" 3176 template<
typename T0>
3179 static const int N = 1;
3181 typedef tuple<T0> InputTuple;
// Constructs the node in graph g and calls fgt_multiinput_node<N> with
// FLOW_INDEXER_NODE.
3184 indexer_node(graph& g) : unfolded_type(g) {
3185 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
// Copy constructor: forwards other to the base and makes the same
// fgt_multiinput_node call.
3189 indexer_node(
const indexer_node& other ) : unfolded_type(other) {
3190 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
// Passes this node and a name to fgt_node_desc (trace builds only).
3194 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3195 void set_name(
const char *name ) {
3196 tbb::internal::fgt_node_desc(
this, name );
3201 template<
typename T0,
typename T1>
3204 static const int N = 2;
3206 typedef tuple<T0, T1> InputTuple;
3209 indexer_node(graph& g) : unfolded_type(g) {
3210 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3214 indexer_node(
const indexer_node& other ) : unfolded_type(other) {
3215 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3219 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3220 void set_name(
const char *name ) {
3221 tbb::internal::fgt_node_desc(
this, name );
3226 template<
typename T0,
typename T1,
typename T2>
3229 static const int N = 3;
3231 typedef tuple<T0, T1, T2> InputTuple;
3234 indexer_node(graph& g) : unfolded_type(g) {
3235 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3239 indexer_node(
const indexer_node& other ) : unfolded_type(other) {
3240 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3244 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3245 void set_name(
const char *name ) {
3246 tbb::internal::fgt_node_desc(
this, name );
3251 template<
typename T0,
typename T1,
typename T2,
typename T3>
3254 static const int N = 4;
3256 typedef tuple<T0, T1, T2, T3> InputTuple;
3259 indexer_node(graph& g) : unfolded_type(g) {
3260 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3264 indexer_node(
const indexer_node& other ) : unfolded_type(other) {
3265 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3269 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3270 void set_name(
const char *name ) {
3271 tbb::internal::fgt_node_desc(
this, name );
3276 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4>
3279 static const int N = 5;
3281 typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3284 indexer_node(graph& g) : unfolded_type(g) {
3285 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3289 indexer_node(
const indexer_node& other ) : unfolded_type(other) {
3290 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3294 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3295 void set_name(
const char *name ) {
3296 tbb::internal::fgt_node_desc(
this, name );
3301 #if __TBB_VARIADIC_MAX >= 6 3302 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5>
3303 class indexer_node<T0, T1, T2, T3, T4, T5> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3305 static const int N = 6;
3307 typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3310 indexer_node(graph& g) : unfolded_type(g) {
3311 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3315 indexer_node(
const indexer_node& other ) : unfolded_type(other) {
3316 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3320 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3321 void set_name(
const char *name ) {
3322 tbb::internal::fgt_node_desc(
this, name );
3326 #endif //variadic max 6 3328 #if __TBB_VARIADIC_MAX >= 7 3329 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3331 class indexer_node<T0, T1, T2, T3, T4, T5, T6> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3333 static const int N = 7;
3335 typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3338 indexer_node(graph& g) : unfolded_type(g) {
3339 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3343 indexer_node(
const indexer_node& other ) : unfolded_type(other) {
3344 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3348 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3349 void set_name(
const char *name ) {
3350 tbb::internal::fgt_node_desc(
this, name );
3354 #endif //variadic max 7 3356 #if __TBB_VARIADIC_MAX >= 8 3357 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3358 typename T6,
typename T7>
3359 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3361 static const int N = 8;
3363 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3366 indexer_node(graph& g) : unfolded_type(g) {
3367 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3371 indexer_node(
const indexer_node& other ) : unfolded_type(other) {
3372 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3376 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3377 void set_name(
const char *name ) {
3378 tbb::internal::fgt_node_desc(
this, name );
3382 #endif //variadic max 8 3384 #if __TBB_VARIADIC_MAX >= 9 3385 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3386 typename T6,
typename T7,
typename T8>
3387 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3389 static const int N = 9;
3391 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3394 indexer_node(graph& g) : unfolded_type(g) {
3395 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3399 indexer_node(
const indexer_node& other ) : unfolded_type(other) {
3400 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3404 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3405 void set_name(
const char *name ) {
3406 tbb::internal::fgt_node_desc(
this, name );
3410 #endif //variadic max 9 3412 #if __TBB_VARIADIC_MAX >= 10 3413 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3414 typename T6,
typename T7,
typename T8,
typename T9>
3417 static const int N = 10;
3419 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3420 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
3422 indexer_node(graph& g) : unfolded_type(g) {
3423 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3427 indexer_node(
const indexer_node& other ) : unfolded_type(other) {
3428 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3432 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3433 void set_name(
const char *name ) {
3434 tbb::internal::fgt_node_desc(
this, name );
3438 #endif //variadic max 10 3440 #if __TBB_PREVIEW_ASYNC_MSG 3441 inline void internal_make_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
3443 template<
typename T >
3446 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 3447 s.internal_add_built_predecessor(p);
3448 p.internal_add_built_successor(s);
3451 tbb::internal::fgt_make_edge( &p, &s );
3455 template<
typename T >
3457 internal_make_edge( p, s );
3460 #if __TBB_PREVIEW_ASYNC_MSG 3461 template<
typename TS,
typename TR,
3464 inline void make_edge( TS &p, TR &s ) {
3465 internal_make_edge( p, s );
3468 template<
typename T >
3470 internal_make_edge( p, s );
3473 template<
typename T >
3475 internal_make_edge( p, s );
3478 #endif // __TBB_PREVIEW_ASYNC_MSG 3480 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3482 template<
typename T,
typename V,
3483 typename =
typename T::output_ports_type,
typename =
typename V::input_ports_type >
3484 inline void make_edge( T& output, V& input) {
3485 make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3489 template<
typename T,
typename R,
3490 typename =
typename T::output_ports_type >
3491 inline void make_edge( T& output,
receiver<R>& input) {
3492 make_edge(get<0>(output.output_ports()), input);
3496 template<
typename S,
typename V,
3497 typename =
typename V::input_ports_type >
3498 inline void make_edge(
sender<S>& output, V& input) {
3499 make_edge(output, get<0>(input.input_ports()));
3503 #if __TBB_PREVIEW_ASYNC_MSG 3504 inline void internal_remove_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
3506 template<
typename T >
3510 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 3512 p.internal_delete_built_successor(s);
3513 s.internal_delete_built_predecessor(p);
3515 tbb::internal::fgt_remove_edge( &p, &s );
3519 template<
typename T >
3521 internal_remove_edge( p, s );
3524 #if __TBB_PREVIEW_ASYNC_MSG 3525 template<
typename TS,
typename TR,
3528 inline void remove_edge( TS &p, TR &s ) {
3529 internal_remove_edge( p, s );
3532 template<
typename T >
3534 internal_remove_edge( p, s );
3537 template<
typename T >
3539 internal_remove_edge( p, s );
3541 #endif // __TBB_PREVIEW_ASYNC_MSG 3543 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3545 template<
typename T,
typename V,
3546 typename =
typename T::output_ports_type,
typename =
typename V::input_ports_type >
3547 inline void remove_edge( T& output, V& input) {
3548 remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3552 template<
typename T,
typename R,
3553 typename =
typename T::output_ports_type >
3554 inline void remove_edge( T& output,
receiver<R>& input) {
3555 remove_edge(get<0>(output.output_ports()), input);
3558 template<
typename S,
typename V,
3559 typename =
typename V::input_ports_type >
3560 inline void remove_edge(
sender<S>& output, V& input) {
3561 remove_edge(output, get<0>(input.input_ports()));
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
//! Removes the edge from sender s to every node recorded in built_edges.
/** The loop walks a copy of built_edges rather than the member itself;
    presumably remove_edge ends up erasing entries from built_edges, which
    would invalidate an iterator into the live list - confirm against
    internal_delete_built_successor. */
template< typename C >
template< typename S >
void internal::edge_container<C>::sender_extract( S &s ) {
    edge_list_type e = built_edges;
    for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
        remove_edge(s, **i);
    }
}

//! Removes the edge from every node recorded in built_edges to receiver r.
/** Mirrors sender_extract: iterates over a copy of built_edges and calls
    remove_edge with the recorded node as the predecessor. */
template< typename C >
template< typename R >
void internal::edge_container<C>::receiver_extract( R &r ) {
    edge_list_type e = built_edges;
    for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
        remove_edge(**i, r);
    }
}
#endif
//! Returns a copy of the body object held by node n.
/** Body must be named explicitly by the caller; the call is forwarded to
    the node's copy_function_object<Body>() member template. */
template< typename Body, typename Node >
Body copy_body( Node &n ) {
    return n.template copy_function_object<Body>();
}
3591 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3594 template<
typename InputTuple,
typename OutputTuple >
class composite_node;
3596 template<
typename... InputTypes,
typename... OutputTypes>
3597 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > :
public graph_node{
3600 typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3601 typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3604 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3605 const char *my_type_name;
3607 input_ports_type *my_input_ports;
3608 output_ports_type *my_output_ports;
3610 static const size_t NUM_INPUTS =
sizeof...(InputTypes);
3611 static const size_t NUM_OUTPUTS =
sizeof...(OutputTypes);
3614 void reset_node(reset_flags) {}
3617 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3618 composite_node( graph &g,
const char *type_name =
"composite_node") : graph_node(g), my_type_name(type_name), my_input_ports(NULL), my_output_ports(NULL) {
3619 tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW,
this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::internal::FLOW_COMPOSITE_NODE );
3620 tbb::internal::fgt_multiinput_multioutput_node_desc(
this, my_type_name );
3623 composite_node( graph &g) : graph_node(g), my_input_ports(NULL), my_output_ports(NULL) {}
3626 template<
typename T1,
typename T2>
3627 void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
3628 __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value,
"number of arguments does not match number of input ports");
3629 __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value,
"number of arguments does not match number of output ports");
3630 my_input_ports =
new input_ports_type(std::forward<T1>(input_ports_tuple));
3631 my_output_ports =
new output_ports_type(std::forward<T2>(output_ports_tuple));
3633 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3634 tbb::internal::fgt_internal_input_helper<T1, NUM_INPUTS>::register_port(
this, input_ports_tuple);
3635 tbb::internal::fgt_internal_output_helper<T2, NUM_OUTPUTS>::register_port(
this, output_ports_tuple);
3639 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3640 template<
typename... NodeTypes >
3641 void add_visible_nodes(
const NodeTypes&... n) { internal::add_nodes_impl(
this,
true, n...); }
3643 template<
typename... NodeTypes >
3644 void add_nodes(
const NodeTypes&... n) { internal::add_nodes_impl(
this,
false, n...); }
3646 template<
typename... Nodes>
void add_nodes(Nodes&...) { }
3647 template<
typename... Nodes>
void add_visible_nodes(Nodes&...) { }
3650 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3651 void set_name(
const char *name ) {
3652 tbb::internal::fgt_multiinput_multioutput_node_desc(
this, name );
3656 input_ports_type input_ports() {
3657 __TBB_ASSERT(my_input_ports,
"input ports not set, call set_external_ports to set input ports");
3658 return *my_input_ports;
3661 output_ports_type output_ports() {
3662 __TBB_ASSERT(my_output_ports,
"output ports not set, call set_external_ports to set output ports");
3663 return *my_output_ports;
3666 virtual ~composite_node() {
3667 if(my_input_ports)
delete my_input_ports;
3668 if(my_output_ports)
delete my_output_ports;
3671 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 3673 __TBB_ASSERT(
false,
"Current composite_node implementation does not support extract");
3679 template<
typename... InputTypes>
3680 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > :
public graph_node {
3682 typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3685 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3686 const char *my_type_name;
3688 input_ports_type *my_input_ports;
3689 static const size_t NUM_INPUTS =
sizeof...(InputTypes);
3692 void reset_node(reset_flags) {}
3695 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3696 composite_node( graph &g,
const char *type_name =
"composite_node") : graph_node(g), my_type_name(type_name), my_input_ports(NULL) {
3697 tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW,
this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::internal::FLOW_COMPOSITE_NODE );
3698 tbb::internal::fgt_multiinput_multioutput_node_desc(
this, my_type_name );
3701 composite_node( graph &g) : graph_node(g), my_input_ports(NULL) {}
3704 template<
typename T>
3705 void set_external_ports(T&& input_ports_tuple) {
3706 __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value,
"number of arguments does not match number of input ports");
3708 my_input_ports =
new input_ports_type(std::forward<T>(input_ports_tuple));
3710 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3711 tbb::internal::fgt_internal_input_helper<T, NUM_INPUTS>::register_port(
this, std::forward<T>(input_ports_tuple));
3715 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3716 template<
typename... NodeTypes >
3717 void add_visible_nodes(
const NodeTypes&... n) { internal::add_nodes_impl(
this,
true, n...); }
3719 template<
typename... NodeTypes >
3720 void add_nodes(
const NodeTypes&... n) { internal::add_nodes_impl(
this,
false, n...); }
3722 template<
typename... Nodes>
void add_nodes(Nodes&...) {}
3723 template<
typename... Nodes>
void add_visible_nodes(Nodes&...) {}
3726 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3727 void set_name(
const char *name ) {
3728 tbb::internal::fgt_multiinput_multioutput_node_desc(
this, name );
3732 input_ports_type input_ports() {
3733 __TBB_ASSERT(my_input_ports,
"input ports not set, call set_external_ports to set input ports");
3734 return *my_input_ports;
3737 virtual ~composite_node() {
3738 if(my_input_ports)
delete my_input_ports;
3741 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 3743 __TBB_ASSERT(
false,
"Current composite_node implementation does not support extract");
3750 template<
typename... OutputTypes>
3751 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > :
public graph_node {
3753 typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3756 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3757 const char *my_type_name;
3759 output_ports_type *my_output_ports;
3760 static const size_t NUM_OUTPUTS =
sizeof...(OutputTypes);
3763 void reset_node(reset_flags) {}
3766 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3767 composite_node( graph &g,
const char *type_name =
"composite_node") : graph_node(g), my_type_name(type_name), my_output_ports(NULL) {
3768 tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW,
this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::internal::FLOW_COMPOSITE_NODE );
3769 tbb::internal::fgt_multiinput_multioutput_node_desc(
this, my_type_name );
3772 composite_node( graph &g) : graph_node(g), my_output_ports(NULL) {}
3775 template<
typename T>
3776 void set_external_ports(T&& output_ports_tuple) {
3777 __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value,
"number of arguments does not match number of output ports");
3779 my_output_ports =
new output_ports_type(std::forward<T>(output_ports_tuple));
3781 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3782 tbb::internal::fgt_internal_output_helper<T, NUM_OUTPUTS>::register_port(
this, std::forward<T>(output_ports_tuple));
3786 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3787 template<
typename... NodeTypes >
3788 void add_visible_nodes(
const NodeTypes&... n) { internal::add_nodes_impl(
this,
true, n...); }
3790 template<
typename... NodeTypes >
3791 void add_nodes(
const NodeTypes&... n) { internal::add_nodes_impl(
this,
false, n...); }
3793 template<
typename... Nodes>
void add_nodes(Nodes&...) {}
3794 template<
typename... Nodes>
void add_visible_nodes(Nodes&...) {}
3797 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3798 void set_name(
const char *name ) {
3799 tbb::internal::fgt_multiinput_multioutput_node_desc(
this, name );
3803 output_ports_type output_ports() {
3804 __TBB_ASSERT(my_output_ports,
"output ports not set, call set_external_ports to set output ports");
3805 return *my_output_ports;
3808 virtual ~composite_node() {
3809 if(my_output_ports)
delete my_output_ports;
3812 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 3814 __TBB_ASSERT(
false,
"Current composite_node implementation does not support extract");
3820 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES 3822 #if __TBB_PREVIEW_ASYNC_NODE 3825 template <
typename Output >
3826 class async_gateway {
3828 typedef Output output_type;
3831 virtual bool async_try_put(
const output_type &i ) = 0;
3834 virtual void async_reserve() = 0;
3837 virtual void async_commit() = 0;
3839 virtual ~async_gateway() {}
3842 template<
typename Input,
typename Ports,
typename AsyncGateway,
typename Body>
3845 typedef AsyncGateway async_gateway_type;
3847 async_body(
const Body &body, async_gateway_type *gateway) : my_body(body), my_async_gateway(gateway) { }
3849 async_body(
const async_body &other) : my_body(other.my_body), my_async_gateway(other.my_async_gateway) { }
3851 void operator()(
const Input &v, Ports & ) {
3852 my_body(v, *my_async_gateway);
3855 Body get_body() {
return my_body; }
3857 void set_async_gateway(async_gateway_type *gateway) {
3858 my_async_gateway = gateway;
3863 async_gateway_type *my_async_gateway;
3869 template <
typename Input,
typename Output,
typename Policy = queueing,
typename Allocator=cache_aligned_allocator<Input> >
3870 class async_node :
public multifunction_node< Input, tuple< Output >, Policy, Allocator >,
public internal::async_gateway<Output>,
public sender< Output > {
3872 typedef multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type;
3876 typedef Output output_type;
3879 typedef internal::async_gateway< output_type > async_gateway_type;
3882 typedef typename internal::multifunction_input<Input, typename base_type::output_ports_type, Allocator> mfn_input_type;
3884 struct try_put_functor {
3885 typedef internal::multifunction_output<Output> output_port_type;
3886 output_port_type *port;
3887 const Output *value;
3889 try_put_functor(output_port_type &p,
const Output &v) : port(&p), value(&v), result(
false) { }
3891 result = port->try_put(*value);
3896 template<
typename Body>
3897 async_node( graph &g,
size_t concurrency, Body body ) :
3898 base_type( g, concurrency, internal::async_body<Input, typename base_type::output_ports_type, async_gateway_type, Body>(body,
this) ) {
3899 tbb::internal::fgt_multioutput_node<1>( tbb::internal::FLOW_ASYNC_NODE,
3900 &this->graph_node::my_graph,
3902 this->output_ports() );
3905 async_node(
const async_node &other ) : base_type(other) {
3907 mfn_body_type &body_ref = *this->my_body;
3908 body_ref.set_gateway(static_cast<async_gateway_type *>(
this));
3909 mfn_body_type &init_body_ref = *this->my_init_body;
3910 init_body_ref.set_gateway(static_cast<async_gateway_type *>(
this));
3911 tbb::internal::fgt_multioutput_node<1>( tbb::internal::FLOW_ASYNC_NODE, &this->graph_node::my_graph,
static_cast<receiver<input_type> *
>(
this), this->output_ports() );
3914 virtual ~async_node() {}
3916 async_gateway_type& async_gateway() {
3917 return static_cast< async_gateway_type&
>(*this);
3921 bool async_try_put(
const output_type &i ) {
3922 internal::multifunction_output<output_type> &port_0 = internal::output_port<0>(*this);
3923 graph &g = this->graph_node::my_graph;
3925 __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), NULL);
3926 try_put_functor tpf(port_0, i);
3927 g.my_task_arena->execute(tpf);
3932 void async_reserve() {
3933 this->graph_node::my_graph.increment_wait_count();
3934 tbb::internal::fgt_async_reserve(
static_cast<receiver<input_type> *
>(
this), &this->graph_node::my_graph);
3937 void async_commit() {
3938 this->graph_node::my_graph.decrement_wait_count();
3939 tbb::internal::fgt_async_commit(
static_cast<receiver<input_type> *
>(
this), &this->graph_node::my_graph);
3942 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3943 void set_name(
const char *name ) {
3944 tbb::internal::fgt_node_desc(
this, name );
3951 bool register_successor( successor_type &r ) {
3952 return internal::output_port<0>(*this).register_successor(r);
3956 bool remove_successor( successor_type &r ) {
3957 return internal::output_port<0>(*this).remove_successor(r);
3960 template<
typename Body>
3961 Body copy_function_object() {
3963 typedef internal::async_body<Input, typename base_type::output_ports_type, async_gateway_type, Body> async_body_type;
3964 mfn_body_type &body_ref = *this->my_body;
3966 return ab.get_body();
3969 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 3970 typedef typename internal::edge_container<successor_type> built_successors_type;
3972 typedef typename built_successors_type::edge_list_type successor_list_type;
3973 built_successors_type &built_successors() {
3974 return internal::output_port<0>(*this).built_successors();
3977 void internal_add_built_successor( successor_type &r ) {
3978 internal::output_port<0>(*this).internal_add_built_successor(r);
3981 void internal_delete_built_successor( successor_type &r ) {
3982 internal::output_port<0>(*this).internal_delete_built_successor(r);
3985 void copy_successors( successor_list_type &l ) {
3986 internal::output_port<0>(*this).copy_successors(l);
3989 size_t successor_count() {
3990 return internal::output_port<0>(*this).successor_count();
3996 void reset_node( reset_flags f) {
3997 base_type::reset_node(f);
4002 #endif // __TBB_PREVIEW_ASYNC_NODE 4006 using interface8::reset_flags;
4007 using interface8::rf_reset_protocol;
4008 using interface8::rf_reset_bodies;
4009 using interface8::rf_clear_edges;
4011 using interface8::graph;
4012 using interface8::graph_node;
4015 using interface8::source_node;
4016 using interface8::function_node;
4017 using interface8::multifunction_node;
4018 using interface8::split_node;
4019 using interface8::internal::output_port;
4020 using interface8::indexer_node;
4021 using interface8::internal::tagged_msg;
4022 using interface8::internal::cast_to;
4023 using interface8::internal::is_a;
4024 using interface8::continue_node;
4025 using interface8::overwrite_node;
4026 using interface8::write_once_node;
4027 using interface8::broadcast_node;
4028 using interface8::buffer_node;
4029 using interface8::queue_node;
4030 using interface8::sequencer_node;
4031 using interface8::priority_queue_node;
4033 using namespace interface8::internal::graph_policy_namespace;
4034 using interface8::join_node;
4035 using interface8::input_port;
4036 using interface8::copy_body;
4037 using interface8::make_edge;
4038 using interface8::remove_edge;
4039 using interface8::internal::tag_value;
4040 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 4041 using interface8::composite_node;
4043 #if __TBB_PREVIEW_ASYNC_NODE 4044 using interface8::async_node;
4046 #if __TBB_PREVIEW_ASYNC_MSG 4047 using interface8::async_msg;
4052 #undef __TBB_PFG_RESET_ARG 4055 #endif // __TBB_flow_graph_H Enables one or the other code branches.
Definition: _template_helpers.h:29
bool remove_predecessor(predecessor_type &)
Decrements the trigger threshold.
Definition: flow_graph.h:515
Definition: flow_graph.h:97
bool register_predecessor(predecessor_type &)
Increments the trigger threshold.
Definition: flow_graph.h:505
Definition: flow_graph.h:93
Definition: _aggregator_impl.h:160
bool try_put(const T &t)
Put an item to the receiver.
Definition: flow_graph.h:415
Detects whether two given types are the same.
Definition: _template_helpers.h:56
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:387
continue_msg input_type
The input type.
Definition: flow_graph.h:484
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
Definition: _flow_graph_join_impl.h:1497
The leaf for source_body.
Definition: _flow_graph_impl.h:69
item_buffer with reservable front-end. NOTE: if reserving, do not
Definition: _flow_graph_item_buffer_impl.h:244
Definition: flow_graph.h:92
Definition: flow_graph.h:107
Definition: _flow_graph_async_msg_impl.h:115
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:381
concurrency
An enumeration that provides the two most common concurrency levels: unlimited and serial...
Definition: flow_graph.h:84
aggregated_operation base class
Definition: _aggregator_impl.h:37
Definition: _aggregator_impl.h:144
An empty class used for messages that mean "I'm done".
Definition: flow_graph.h:100
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:435
A task that calls a node's apply_body_bypass function with no input.
Definition: _flow_graph_impl.h:291
Definition: _flow_graph_async_msg_impl.h:35
sender< T > predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:409
continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:496
Definition: flow_graph.h:90
A task that calls a node's forward_task function.
Definition: _flow_graph_impl.h:255
virtual ~continue_receiver()
Destructor.
Definition: flow_graph.h:502
virtual ~receiver()
Destructor.
Definition: flow_graph.h:412
Definition: _flow_graph_async_msg_impl.h:30
Definition: _flow_graph_indexer_impl.h:469
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:378
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
T output_type
The output type of this sender.
Definition: flow_graph.h:362
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
Definition: _flow_graph_types_impl.h:602
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:103
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
Definition: flow_graph.h:89
The two-phase join port.
Definition: _flow_graph_join_impl.h:213
A functor that takes no input and generates a value of type Output.
Definition: _flow_graph_impl.h:60
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:384
receiver< T > successor_type
The successor type for this node.
Definition: flow_graph.h:365
Definition: _flow_graph_async_msg_impl.h:32
A lock that occupies a single byte.
Definition: spin_mutex.h:40
T input_type
The input type of this receiver.
Definition: flow_graph.h:406
A functor that takes an Input and generates an Output.
Definition: _flow_graph_impl.h:83
Definition: _flow_graph_impl.h:841
queueing join_port
Definition: _flow_graph_join_impl.h:427
Base class for receivers of completion messages.
Definition: flow_graph.h:480
Definition: _flow_graph_impl.h:36
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44
1-to-1 proxy representation class of scheduler's arena Constructors set up settings only...
Definition: task_arena.h:120
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:432
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Definition: _flow_graph_impl.h:177
receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:487
Definition: _flow_graph_join_impl.h:648
Tag class used to indicate the "attaching" constructor.
Definition: task_arena.h:156
Definition: flow_graph.h:91
Pure virtual template class that defines a sender of messages of type T.
Definition: flow_graph.h:102
function_body that takes an Input and a set of output ports
Definition: _flow_graph_impl.h:165
continue_receiver(int number_of_predecessors=0)
Constructor.
Definition: flow_graph.h:490
Definition: flow_graph.h:106
Definition: _flow_graph_types_impl.h:53
the leaf for function_body
Definition: _flow_graph_impl.h:92