21 #ifndef __TBB__flow_graph_node_impl_H 22 #define __TBB__flow_graph_node_impl_H 24 #ifndef __TBB_flow_graph_H 25 #error Do not #include this internal file directly; use public TBB headers instead. 28 #include "_flow_graph_item_buffer_impl.h" 37 template<
typename T,
typename A >
38 class function_input_queue :
public item_buffer<T,A> {
41 return this->pop_front( t );
45 return this->push_back( t );
52 template<
typename Input,
typename A,
typename ImplType >
53 class function_input_base :
public receiver<Input>, tbb::internal::no_assign {
54 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
55 enum op_type {reg_pred, rem_pred, app_body, try_fwd, tryput_bypass, app_body_bypass
56 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 57 , add_blt_pred, del_blt_pred,
58 blt_pred_cnt, blt_pred_cpy
61 typedef function_input_base<Input, A, ImplType> class_type;
66 typedef Input input_type;
67 typedef typename receiver<input_type>::predecessor_type predecessor_type;
68 typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
69 typedef function_input_queue<input_type, A> input_queue_type;
70 typedef typename A::template rebind< input_queue_type >::other queue_allocator_type;
72 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 73 typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
74 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
78 function_input_base( graph &g,
size_t max_concurrency, input_queue_type *q = NULL)
79 : my_graph(g), my_max_concurrency(max_concurrency), my_concurrency(0),
80 my_queue(q), forwarder_busy(false) {
81 my_predecessors.set_owner(
this);
82 my_aggregator.initialize_handler(handler_type(
this));
86 function_input_base(
const function_input_base& src, input_queue_type *q = NULL) :
88 my_graph(src.my_graph), my_max_concurrency(src.my_max_concurrency),
89 my_concurrency(0), my_queue(q), forwarder_busy(false)
91 my_predecessors.set_owner(
this);
92 my_aggregator.initialize_handler(handler_type(
this));
99 virtual ~function_input_base() {
100 if ( my_queue )
delete my_queue;
104 virtual task * try_put_task(
const input_type &t ) {
105 if ( my_max_concurrency == 0 ) {
106 return create_body_task( t );
108 operation_type op_data(t, tryput_bypass);
109 my_aggregator.execute(&op_data);
110 if(op_data.status == SUCCEEDED ) {
111 return op_data.bypass_t;
118 bool register_predecessor( predecessor_type &src ) {
119 operation_type op_data(reg_pred);
121 my_aggregator.execute(&op_data);
126 bool remove_predecessor( predecessor_type &src ) {
127 operation_type op_data(rem_pred);
129 my_aggregator.execute(&op_data);
133 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 134 void internal_add_built_predecessor( predecessor_type &src) {
136 operation_type op_data(add_blt_pred);
138 my_aggregator.execute(&op_data);
142 void internal_delete_built_predecessor( predecessor_type &src) {
143 operation_type op_data(del_blt_pred);
145 my_aggregator.execute(&op_data);
148 size_t predecessor_count() {
149 operation_type op_data(blt_pred_cnt);
150 my_aggregator.execute(&op_data);
151 return op_data.cnt_val;
154 void copy_predecessors(predecessor_list_type &v) {
155 operation_type op_data(blt_pred_cpy);
157 my_aggregator.execute(&op_data);
160 built_predecessors_type &built_predecessors() {
161 return my_predecessors.built_predecessors();
167 void reset_function_input_base( reset_flags f) {
173 forwarder_busy =
false;
177 const size_t my_max_concurrency;
178 size_t my_concurrency;
179 input_queue_type *my_queue;
180 predecessor_cache<input_type, null_mutex > my_predecessors;
182 void reset_receiver( reset_flags f) {
183 if( f & rf_clear_edges) my_predecessors.clear();
185 my_predecessors.reset();
186 __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(),
"function_input_base reset failed");
191 friend class apply_body_task_bypass< class_type, input_type >;
192 friend class forward_task_bypass< class_type >;
194 class operation_type :
public aggregated_operation< operation_type > {
200 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 202 predecessor_list_type *predv;
// Constructs an operation that carries an input item.
//   e - the item; only its address is kept in elem (no copy is made), with
//       the const qualifier cast away to match the pointer stored there.
//   t - the op_type code, stored in type after conversion to char.
// NOTE(review): elem points at the caller's object, so that object presumably
// has to stay alive until the operation has been handled - confirm.
206 operation_type(
const input_type& e, op_type t) :
207 type(char(t)), elem(const_cast<input_type*>(&e)) {}
// Constructs an operation that carries no input item: stores the op_type code
// t in type (converted to char) and initializes r to NULL.
// NOTE(review): handle_operations dereferences tmp->r for some op codes, so r
// is presumably assigned by the caller before execution - confirm.
208 operation_type(op_type t) : type(char(t)), r(NULL) {}
212 typedef internal::aggregating_functor<class_type, operation_type> handler_type;
213 friend class internal::aggregating_functor<class_type, operation_type>;
214 aggregator< handler_type, operation_type > my_aggregator;
216 void handle_operations(operation_type *op_list) {
220 op_list = op_list->next;
223 my_predecessors.add(*(tmp->r));
224 __TBB_store_with_release(tmp->status, SUCCEEDED);
225 if (!forwarder_busy) {
226 forwarder_busy =
true;
227 spawn_forward_task();
231 my_predecessors.remove(*(tmp->r));
232 __TBB_store_with_release(tmp->status, SUCCEEDED);
235 __TBB_ASSERT(my_max_concurrency != 0, NULL);
237 __TBB_store_with_release(tmp->status, SUCCEEDED);
238 if (my_concurrency<my_max_concurrency) {
240 bool item_was_retrieved =
false;
242 item_was_retrieved = my_queue->pop(i);
244 item_was_retrieved = my_predecessors.get_item(i);
245 if (item_was_retrieved) {
251 case app_body_bypass: {
252 task * new_task = NULL;
253 __TBB_ASSERT(my_max_concurrency != 0, NULL);
255 if (my_concurrency<my_max_concurrency) {
257 bool item_was_retrieved =
false;
259 item_was_retrieved = my_queue->pop(i);
261 item_was_retrieved = my_predecessors.get_item(i);
262 if (item_was_retrieved) {
264 new_task = create_body_task(i);
267 tmp->bypass_t = new_task;
268 __TBB_store_with_release(tmp->status, SUCCEEDED);
271 case tryput_bypass: internal_try_put_task(tmp);
break;
272 case try_fwd: internal_forward(tmp);
break;
273 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 275 my_predecessors.internal_add_built_predecessor(*(tmp->r));
276 __TBB_store_with_release(tmp->status, SUCCEEDED);
280 my_predecessors.internal_delete_built_predecessor(*(tmp->r));
281 __TBB_store_with_release(tmp->status, SUCCEEDED);
284 tmp->cnt_val = my_predecessors.predecessor_count();
285 __TBB_store_with_release(tmp->status, SUCCEEDED);
288 my_predecessors.copy_predecessors( *(tmp->predv) );
289 __TBB_store_with_release(tmp->status, SUCCEEDED);
297 void internal_try_put_task(operation_type *op) {
298 __TBB_ASSERT(my_max_concurrency != 0, NULL);
299 if (my_concurrency < my_max_concurrency) {
301 task * new_task = create_body_task(*(op->elem));
302 op->bypass_t = new_task;
303 __TBB_store_with_release(op->status, SUCCEEDED);
304 }
else if ( my_queue && my_queue->push(*(op->elem)) ) {
305 op->bypass_t = SUCCESSFULLY_ENQUEUED;
306 __TBB_store_with_release(op->status, SUCCEEDED);
309 __TBB_store_with_release(op->status, FAILED);
314 void internal_forward(operation_type *op) {
316 if (my_concurrency<my_max_concurrency || !my_max_concurrency) {
318 bool item_was_retrieved =
false;
320 item_was_retrieved = my_queue->pop(i);
322 item_was_retrieved = my_predecessors.get_item(i);
323 if (item_was_retrieved) {
325 op->bypass_t = create_body_task(i);
326 __TBB_store_with_release(op->status, SUCCEEDED);
330 __TBB_store_with_release(op->status, FAILED);
331 forwarder_busy =
false;
336 task * apply_body_bypass( input_type &i ) {
337 task * new_task =
static_cast<ImplType *
>(
this)->apply_body_impl_bypass(i);
338 if ( my_max_concurrency != 0 ) {
339 operation_type op_data(app_body_bypass);
340 my_aggregator.execute(&op_data);
341 tbb::task *ttask = op_data.bypass_t;
342 new_task = combine_tasks(new_task, ttask);
348 inline task * create_body_task(
const input_type &input ) {
350 return (my_graph.is_active()) ?
351 new(task::allocate_additional_child_of(*(my_graph.root_task())))
352 apply_body_task_bypass < class_type, input_type >(*
this, input) :
357 inline void spawn_body_task(
const input_type &input ) {
358 task* tp = create_body_task(input);
366 task *forward_task() {
367 operation_type op_data(try_fwd);
370 op_data.status = WAIT;
371 my_aggregator.execute(&op_data);
372 if(op_data.status == SUCCEEDED) {
373 tbb::task *ttask = op_data.bypass_t;
374 rval = combine_tasks(rval, ttask);
376 }
while (op_data.status == SUCCEEDED);
380 inline task *create_forward_task() {
381 return (my_graph.is_active()) ?
382 new(task::allocate_additional_child_of(*(my_graph.root_task()))) forward_task_bypass< class_type >(*
this) :
387 inline void spawn_forward_task() {
388 task* tp = create_forward_task();
397 template<
typename Input,
typename Output,
typename A>
398 class function_input :
public function_input_base<Input, A, function_input<Input,Output,A> > {
400 typedef Input input_type;
401 typedef Output output_type;
402 typedef function_body<input_type, output_type> function_body_type;
403 typedef function_input<Input,Output,A> my_class;
404 typedef function_input_base<Input, A, my_class> base_type;
405 typedef function_input_queue<input_type, A> input_queue_type;
408 template<
typename Body>
409 function_input( graph &g,
size_t max_concurrency, Body& body, input_queue_type *q = NULL ) :
410 base_type(g, max_concurrency, q),
411 my_body( new
internal::function_body_leaf< input_type, output_type, Body>(body) ),
412 my_init_body( new
internal::function_body_leaf< input_type, output_type, Body>(body) ) {
416 function_input(
const function_input& src, input_queue_type *q = NULL ) :
418 my_body( src.my_init_body->clone() ),
419 my_init_body(src.my_init_body->clone() ) {
427 template<
typename Body >
428 Body copy_function_object() {
429 function_body_type &body_ref = *this->my_body;
433 task * apply_body_impl_bypass(
const input_type &i) {
434 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 437 tbb::internal::fgt_begin_body( my_body );
438 output_type v = (*my_body)(i);
439 tbb::internal::fgt_end_body( my_body );
440 task * new_task = successors().try_put_task( v );
442 task * new_task = successors().try_put_task( (*my_body)(i) );
449 void reset_function_input(reset_flags f) {
450 base_type::reset_function_input_base(f);
451 if(f & rf_reset_bodies) {
452 function_body_type *tmp = my_init_body->clone();
458 function_body_type *my_body;
459 function_body_type *my_init_body;
460 virtual broadcast_cache<output_type > &successors() = 0;
466 template<
int N>
struct clear_element {
467 template<
typename P>
static void clear_this(P &p) {
468 (void)tbb::flow::get<N-1>(p).successors().clear();
469 clear_element<N-1>::clear_this(p);
471 template<
typename P>
static bool this_empty(P &p) {
472 if(tbb::flow::get<N-1>(p).successors().empty())
473 return clear_element<N-1>::this_empty(p);
478 template<>
struct clear_element<1> {
479 template<
typename P>
static void clear_this(P &p) {
480 (void)tbb::flow::get<0>(p).successors().clear();
482 template<
typename P>
static bool this_empty(P &p) {
483 return tbb::flow::get<0>(p).successors().empty();
487 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 489 template<
int N>
struct extract_element {
490 template<
typename P>
static void extract_this(P &p) {
491 (void)tbb::flow::get<N-1>(p).successors().built_successors().sender_extract(tbb::flow::get<N-1>(p));
492 extract_element<N-1>::extract_this(p);
496 template<>
struct extract_element<1> {
497 template<
typename P>
static void extract_this(P &p) {
498 (void)tbb::flow::get<0>(p).successors().built_successors().sender_extract(tbb::flow::get<0>(p));
505 template<
typename Input,
typename OutputPortSet,
typename A>
506 class multifunction_input :
public function_input_base<Input, A, multifunction_input<Input,OutputPortSet,A> > {
508 static const int N = tbb::flow::tuple_size<OutputPortSet>::value;
509 typedef Input input_type;
510 typedef OutputPortSet output_ports_type;
511 typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
512 typedef multifunction_input<Input,OutputPortSet,A> my_class;
513 typedef function_input_base<Input, A, my_class> base_type;
514 typedef function_input_queue<input_type, A> input_queue_type;
517 template<
typename Body>
520 size_t max_concurrency,
522 input_queue_type *q = NULL ) :
523 base_type(g, max_concurrency, q),
524 my_body( new
internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) ),
525 my_init_body( new
internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) ) {
529 multifunction_input(
const multifunction_input& src, input_queue_type *q = NULL ) :
531 my_body( src.my_init_body->clone() ),
532 my_init_body(src.my_init_body->clone() ) {
535 ~multifunction_input() {
540 template<
typename Body >
541 Body copy_function_object() {
542 multifunction_body_type &body_ref = *this->my_body;
548 task * apply_body_impl_bypass(
const input_type &i) {
549 tbb::internal::fgt_begin_body( my_body );
550 (*my_body)(i, my_output_ports);
551 tbb::internal::fgt_end_body( my_body );
552 task * new_task = SUCCESSFULLY_ENQUEUED;
// Returns a mutable reference to my_output_ports, this node's
// output_ports_type member.
556 output_ports_type &output_ports(){
return my_output_ports; }
559 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 561 extract_element<N>::extract_this(my_output_ports);
565 void reset(reset_flags f) {
566 base_type::reset_function_input_base(f);
567 if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports);
568 if(f & rf_reset_bodies) {
569 multifunction_body_type *tmp = my_init_body->clone();
573 __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports),
"multifunction_node reset failed");
576 multifunction_body_type *my_body;
577 multifunction_body_type *my_init_body;
578 output_ports_type my_output_ports;
583 template<
size_t N,
typename MOP>
584 typename tbb::flow::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
585 return tbb::flow::get<N>(op.output_ports());
590 struct emit_element {
591 template<
typename T,
typename P>
592 static void emit_this(
const T &t, P &p) {
593 (void)tbb::flow::get<N-1>(p).try_put(tbb::flow::get<N-1>(t));
594 emit_element<N-1>::emit_this(t,p);
599 struct emit_element<1> {
600 template<
typename T,
typename P>
601 static void emit_this(
const T &t, P &p) {
602 (void)tbb::flow::get<0>(p).try_put(tbb::flow::get<0>(t));
607 template<
typename Output >
608 class continue_input :
public continue_receiver {
612 typedef continue_msg input_type;
615 typedef Output output_type;
616 typedef function_body<input_type, output_type> function_body_type;
618 template<
typename Body >
619 continue_input( graph &g, Body& body )
621 my_body( new
internal::function_body_leaf< input_type, output_type, Body>(body) ),
622 my_init_body( new
internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
624 template<
typename Body >
625 continue_input( graph &g,
int number_of_predecessors, Body& body )
626 : continue_receiver( number_of_predecessors ), my_graph_ptr(&g),
627 my_body( new
internal::function_body_leaf< input_type, output_type, Body>(body) ),
628 my_init_body( new
internal::function_body_leaf< input_type, output_type, Body>(body) )
// Copy constructor. Copy-constructs the continue_receiver base from src and
// copies src's graph pointer. Both my_body and my_init_body are separate
// clones of src.my_init_body (src.my_body is not consulted), so the new
// object's body is cloned from src's stored initial body.
631 continue_input(
const continue_input& src ) : continue_receiver(src),
632 my_graph_ptr(src.my_graph_ptr),
633 my_body( src.my_init_body->clone() ),
634 my_init_body( src.my_init_body->clone() ) {}
641 template<
typename Body >
642 Body copy_function_object() {
643 function_body_type &body_ref = *my_body;
647 void reset_receiver( reset_flags f) {
648 continue_receiver::reset_receiver(f);
649 if(f & rf_reset_bodies) {
650 function_body_type *tmp = my_init_body->clone();
659 function_body_type *my_body;
660 function_body_type *my_init_body;
662 virtual broadcast_cache<output_type > &successors() = 0;
664 friend class apply_body_task_bypass< continue_input< Output >, continue_msg >;
667 task *apply_body_bypass( input_type ) {
668 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 671 tbb::internal::fgt_begin_body( my_body );
672 output_type v = (*my_body)( continue_msg() );
673 tbb::internal::fgt_end_body( my_body );
674 return successors().try_put_task( v );
676 return successors().try_put_task( (*my_body)( continue_msg() ) );
682 return (my_graph_ptr->is_active()) ?
683 new ( task::allocate_additional_child_of( *(my_graph_ptr->root_task()) ) )
684 apply_body_task_bypass< continue_input< Output >, continue_msg >( *
this, continue_msg() ) :
691 template<
typename Output >
692 class function_output :
public sender<Output> {
695 template<
int N>
friend struct clear_element;
696 typedef Output output_type;
697 typedef typename sender<output_type>::successor_type successor_type;
698 typedef broadcast_cache<output_type> broadcast_cache_type;
699 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 700 typedef typename sender<output_type>::built_successors_type built_successors_type;
701 typedef typename sender<output_type>::successor_list_type successor_list_type;
// Default constructor: passes this object to my_successors.set_owner().
704 function_output() { my_successors.set_owner(
this); }
705 function_output(
const function_output & ) : sender<output_type>() {
706 my_successors.set_owner(
this);
710 bool register_successor( successor_type &r ) {
711 successors().register_successor( r );
716 bool remove_successor( successor_type &r ) {
717 successors().remove_successor( r );
721 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 722 built_successors_type &built_successors() {
return successors().built_successors(); }
725 void internal_add_built_successor( successor_type &r) {
726 successors().internal_add_built_successor( r );
729 void internal_delete_built_successor( successor_type &r) {
730 successors().internal_delete_built_successor( r );
733 size_t successor_count() {
734 return successors().successor_count();
737 void copy_successors( successor_list_type &v) {
738 successors().copy_successors(v);
// Hands item i to my_successors.try_put_task() and returns the task pointer
// that call produces, unchanged.
750 task *try_put_task(
const output_type &i) {
return my_successors.try_put_task(i); }
// Returns a mutable reference to the my_successors member.
752 broadcast_cache_type &successors() {
return my_successors; }
754 broadcast_cache_type my_successors;
758 template<
typename Output >
759 class multifunction_output :
public function_output<Output> {
761 typedef Output output_type;
762 typedef function_output<output_type> base_type;
763 using base_type::my_successors;
// Default constructor: default-constructs the function_output base, then
// passes this object to my_successors.set_owner().
765 multifunction_output() : base_type() {my_successors.set_owner(
this);}
// Copy constructor. The source object is ignored (the parameter is unnamed):
// the base is default-constructed rather than copied, and this object is
// passed to my_successors.set_owner().
766 multifunction_output(
const multifunction_output &) : base_type() { my_successors.set_owner(
this); }
768 bool try_put(
const output_type &i) {
769 task *res = my_successors.try_put_task(i);
770 if(!res)
return false;
771 if(res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
777 #if TBB_PREVIEW_FLOW_GRAPH_TRACE && __TBB_FLOW_GRAPH_CPP11_FEATURES 778 template<
typename CompositeType>
779 void add_nodes_impl(CompositeType*,
bool) {}
781 template<
typename CompositeType,
typename NodeType1,
typename... NodeTypes >
782 void add_nodes_impl(CompositeType *c_node,
bool visible,
const NodeType1& n1,
const NodeTypes&... n) {
783 void *addr =
const_cast<NodeType1 *
>(&n1);
786 tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_FLOW, c_node, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_parent_of, addr, tbb::internal::FLOW_NODE );
788 tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_child_of, c_node, tbb::internal::FLOW_NODE );
789 add_nodes_impl(c_node, visible, n...);
795 #endif // __TBB__flow_graph_node_impl_H Definition: _aggregator_impl.h:160
aggregated_operation base class
Definition: _aggregator_impl.h:37
Definition: _aggregator_impl.h:144
Definition: _flow_graph_async_msg_impl.h:32
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Definition: _flow_graph_impl.h:177
the leaf for function_body
Definition: _flow_graph_impl.h:92