21 #ifndef __TBB__flow_graph_impl_H 22 #define __TBB__flow_graph_impl_H 24 #ifndef __TBB_flow_graph_H 25 #error Do not #include this internal file directly; use public TBB headers instead. 32 typedef tbb::internal::uint64_t tag_value;
36 namespace graph_policy_namespace {
45 template<typename K, typename KHash=tbb_hash_compare<typename strip<K>::type > >
49 typedef KHash hash_compare_type;
59 template<
typename Output >
63 virtual bool operator()(Output &output) = 0;
68 template<
typename Output,
typename Body>
72 bool operator()(Output &output) {
return body( output ); }
76 Body get_body() {
return body; }
82 template<
typename Input,
typename Output >
86 virtual Output operator()(
const Input &input) = 0;
91 template <
typename Input,
typename Output,
typename B>
95 Output operator()(
const Input &i) {
return body(i); }
96 B get_body() {
return body; }
105 template <
typename B>
109 continue_msg operator()(
const continue_msg &i ) {
113 B get_body() {
return body; }
122 template <
typename Input,
typename B>
126 continue_msg operator()(
const Input &i) {
128 return continue_msg();
130 B get_body() {
return body; }
139 template <
typename Output,
typename B>
143 Output operator()(
const continue_msg &i) {
146 B get_body() {
return body; }
154 #if __TBB_PREVIEW_ASYNC_NODE 155 template<
typename T,
typename =
typename T::async_gateway_type >
156 void set_async_gateway(T *body,
void *g) {
157 body->set_async_gateway(static_cast<typename T::async_gateway_type *>(g));
160 inline void set_async_gateway(...) { }
164 template<
typename Input,
typename OutputSet>
168 virtual void operator()(
const Input &, OutputSet &) = 0;
170 #if __TBB_PREVIEW_ASYNC_NODE 171 virtual void set_gateway(
void *gateway) = 0;
176 template<
typename Input,
typename OutputSet,
typename B >
180 void operator()(
const Input &input, OutputSet &oset) {
183 B get_body() {
return body; }
185 #if __TBB_PREVIEW_ASYNC_NODE 186 void set_gateway(
void *gateway) {
187 set_async_gateway(&body, gateway);
200 template<
typename Input,
typename Output>
204 virtual Output operator()(
const Input &input) = 0;
209 template<
typename Input,
typename Output>
213 virtual const Output & operator()(
const Input &input) = 0;
217 template <
typename Input,
typename Output,
typename B>
221 Output operator()(
const Input &i) {
return body(i); }
222 B get_body() {
return body; }
230 template <
typename Input,
typename Output,
typename B>
235 const Output& operator()(
const Input &i) {
239 B get_body() {
return body; }
254 template<
typename NodeType >
264 task * new_task = my_node.forward_task();
265 if (new_task == SUCCESSFULLY_ENQUEUED) new_task = NULL;
272 template<
typename NodeType,
typename Input >
283 task * next_task = my_node.apply_body_bypass( my_input );
284 if(next_task == SUCCESSFULLY_ENQUEUED) next_task = NULL;
290 template<
typename NodeType >
300 task *new_task = my_node.apply_body_bypass( );
301 if(new_task == SUCCESSFULLY_ENQUEUED)
return NULL;
309 template<
typename Input,
typename Output >
311 Output operator()(
const Input & )
const {
return Output(); }
315 template<
typename T,
typename M=spin_mutex >
319 typedef size_t size_type;
322 typename mutex_type::scoped_lock lock( my_mutex );
323 return internal_empty();
327 typename mutex_type::scoped_lock lock( my_mutex );
331 void remove( T &n ) {
332 typename mutex_type::scoped_lock lock( my_mutex );
333 for (
size_t i = internal_size(); i != 0; --i ) {
334 T &s = internal_pop();
335 if ( &s == &n )
return;
341 while( !my_q.empty()) (
void)my_q.pop();
342 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 343 my_built_predecessors.clear();
347 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 348 typedef edge_container<T> built_predecessors_type;
349 built_predecessors_type &built_predecessors() {
return my_built_predecessors; }
351 typedef typename edge_container<T>::edge_list_type predecessor_list_type;
352 void internal_add_built_predecessor( T &n ) {
353 typename mutex_type::scoped_lock lock( my_mutex );
354 my_built_predecessors.add_edge(n);
357 void internal_delete_built_predecessor( T &n ) {
358 typename mutex_type::scoped_lock lock( my_mutex );
359 my_built_predecessors.delete_edge(n);
362 void copy_predecessors( predecessor_list_type &v) {
363 typename mutex_type::scoped_lock lock( my_mutex );
364 my_built_predecessors.copy_edges(v);
367 size_t predecessor_count() {
368 typename mutex_type::scoped_lock lock(my_mutex);
369 return (
size_t)(my_built_predecessors.edge_count());
375 typedef M mutex_type;
377 std::queue< T * > my_q;
378 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 379 built_predecessors_type my_built_predecessors;
383 inline bool internal_empty( ) {
388 inline size_type internal_size( ) {
393 inline void internal_push( T &n ) {
398 inline T &internal_pop() {
407 template<
typename T,
typename M=spin_mutex >
408 #if __TBB_PREVIEW_ASYNC_MSG 413 #endif // __TBB_PREVIEW_ASYNC_MSG 415 typedef M mutex_type;
416 typedef T output_type;
417 #if __TBB_PREVIEW_ASYNC_MSG 418 typedef untyped_sender predecessor_type;
419 typedef untyped_receiver successor_type;
421 typedef sender<output_type> predecessor_type;
422 typedef receiver<output_type> successor_type;
423 #endif // __TBB_PREVIEW_ASYNC_MSG 427 void set_owner( successor_type *owner ) { my_owner = owner; }
429 bool get_item( output_type &v ) {
434 predecessor_type *src;
436 typename mutex_type::scoped_lock lock(this->my_mutex);
437 if ( this->internal_empty() ) {
440 src = &this->internal_pop();
444 msg = src->try_get( v );
449 src->register_successor( *my_owner );
454 }
while ( msg ==
false );
462 predecessor_type *src;
464 if (this->internal_empty())
break;
465 src = &this->internal_pop();
467 src->register_successor( *my_owner );
474 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 477 successor_type *my_owner;
482 template<
typename T,
typename M=spin_mutex >
485 typedef M mutex_type;
486 typedef T output_type;
487 #if __TBB_PREVIEW_ASYNC_MSG 488 typedef untyped_sender predecessor_type;
489 typedef untyped_receiver successor_type;
491 typedef sender<T> predecessor_type;
492 typedef receiver<T> successor_type;
493 #endif // __TBB_PREVIEW_ASYNC_MSG 498 try_reserve( output_type &v ) {
503 typename mutex_type::scoped_lock lock(this->my_mutex);
504 if ( reserved_src || this->internal_empty() )
507 reserved_src = &this->internal_pop();
511 msg = reserved_src->try_reserve( v );
514 typename mutex_type::scoped_lock lock(this->my_mutex);
516 reserved_src->register_successor( *this->my_owner );
520 this->add( *reserved_src );
522 }
while ( msg ==
false );
529 reserved_src->try_release( );
536 reserved_src->try_consume( );
552 predecessor_type *reserved_src;
558 template<
typename T,
typename M=spin_rw_mutex >
562 typedef M mutex_type;
565 #if __TBB_PREVIEW_ASYNC_MSG 566 typedef untyped_receiver successor_type;
567 typedef untyped_receiver *pointer_type;
568 typedef untyped_sender owner_type;
570 typedef receiver<T> successor_type;
571 typedef receiver<T> *pointer_type;
572 typedef sender<T> owner_type;
573 #endif // __TBB_PREVIEW_ASYNC_MSG 574 typedef std::list< pointer_type > successors_type;
575 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 576 edge_container<successor_type> my_built_successors;
578 successors_type my_successors;
580 owner_type *my_owner;
583 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 584 typedef typename edge_container<successor_type>::edge_list_type successor_list_type;
586 edge_container<successor_type> &built_successors() {
return my_built_successors; }
588 void internal_add_built_successor( successor_type &r) {
589 typename mutex_type::scoped_lock l(my_mutex,
true);
590 my_built_successors.add_edge( r );
593 void internal_delete_built_successor( successor_type &r) {
594 typename mutex_type::scoped_lock l(my_mutex,
true);
595 my_built_successors.delete_edge(r);
598 void copy_successors( successor_list_type &v) {
599 typename mutex_type::scoped_lock l(my_mutex,
false);
600 my_built_successors.copy_edges(v);
603 size_t successor_count() {
604 typename mutex_type::scoped_lock l(my_mutex,
false);
605 return my_built_successors.edge_count();
612 void set_owner( owner_type *owner ) { my_owner = owner; }
616 void register_successor( successor_type &r ) {
617 typename mutex_type::scoped_lock l(my_mutex,
true);
618 my_successors.push_back( &r );
621 void remove_successor( successor_type &r ) {
622 typename mutex_type::scoped_lock l(my_mutex,
true);
623 for (
typename successors_type::iterator i = my_successors.begin();
624 i != my_successors.end(); ++i ) {
626 my_successors.erase(i);
633 typename mutex_type::scoped_lock l(my_mutex,
false);
634 return my_successors.empty();
638 my_successors.clear();
639 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 640 my_built_successors.clear();
644 #if !__TBB_PREVIEW_ASYNC_MSG 645 virtual task * try_put_task(
const T &t ) = 0;
646 #endif // __TBB_PREVIEW_ASYNC_MSG 654 typedef spin_rw_mutex mutex_type;
657 #if __TBB_PREVIEW_ASYNC_MSG 658 typedef untyped_receiver successor_type;
659 typedef untyped_receiver *pointer_type;
661 typedef receiver<continue_msg> successor_type;
662 typedef receiver<continue_msg> *pointer_type;
663 #endif // __TBB_PREVIEW_ASYNC_MSG 664 typedef std::list< pointer_type > successors_type;
665 successors_type my_successors;
666 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 667 edge_container<successor_type> my_built_successors;
668 typedef edge_container<successor_type>::edge_list_type successor_list_type;
671 sender<continue_msg> *my_owner;
675 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 677 edge_container<successor_type> &built_successors() {
return my_built_successors; }
679 void internal_add_built_successor( successor_type &r) {
680 mutex_type::scoped_lock l(my_mutex,
true);
681 my_built_successors.add_edge( r );
684 void internal_delete_built_successor( successor_type &r) {
685 mutex_type::scoped_lock l(my_mutex,
true);
686 my_built_successors.delete_edge(r);
689 void copy_successors( successor_list_type &v) {
690 mutex_type::scoped_lock l(my_mutex,
false);
691 my_built_successors.copy_edges(v);
694 size_t successor_count() {
695 mutex_type::scoped_lock l(my_mutex,
false);
696 return my_built_successors.edge_count();
703 void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
707 void register_successor( successor_type &r ) {
708 mutex_type::scoped_lock l(my_mutex,
true);
709 my_successors.push_back( &r );
710 if ( my_owner && r.is_continue_receiver() ) {
711 r.register_predecessor( *my_owner );
715 void remove_successor( successor_type &r ) {
716 mutex_type::scoped_lock l(my_mutex,
true);
717 for ( successors_type::iterator i = my_successors.begin();
718 i != my_successors.end(); ++i ) {
723 r.remove_predecessor( *my_owner );
724 my_successors.erase(i);
731 mutex_type::scoped_lock l(my_mutex,
false);
732 return my_successors.empty();
736 my_successors.clear();
737 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 738 my_built_successors.clear();
742 #if !__TBB_PREVIEW_ASYNC_MSG 743 virtual task * try_put_task(
const continue_msg &t ) = 0;
744 #endif // __TBB_PREVIEW_ASYNC_MSG 750 template<
typename T,
typename M=spin_rw_mutex>
752 typedef M mutex_type;
753 typedef typename successor_cache<T,M>::successors_type successors_type;
760 #if __TBB_PREVIEW_ASYNC_MSG 762 task * try_put_task(
const X &t ) {
764 task * try_put_task(
const T &t ) {
765 #endif // __TBB_PREVIEW_ASYNC_MSG 766 task * last_task = NULL;
767 bool upgraded =
true;
768 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
769 typename successors_type::iterator i = this->my_successors.begin();
770 while ( i != this->my_successors.end() ) {
771 task *new_task = (*i)->try_put_task(t);
772 last_task = combine_tasks(last_task, new_task);
777 if ( (*i)->register_predecessor(*this->my_owner) ) {
779 l.upgrade_to_writer();
782 i = this->my_successors.erase(i);
795 template<
typename T,
typename M=spin_rw_mutex >
797 typedef size_t size_type;
798 typedef M mutex_type;
799 typedef typename successor_cache<T,M>::successors_type successors_type;
806 typename mutex_type::scoped_lock l(this->my_mutex,
false);
807 return this->my_successors.size();
810 #if __TBB_PREVIEW_ASYNC_MSG 812 task * try_put_task(
const X &t ) {
814 task *try_put_task(
const T &t ) {
815 #endif // __TBB_PREVIEW_ASYNC_MSG 816 bool upgraded =
true;
817 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
818 typename successors_type::iterator i = this->my_successors.begin();
819 while ( i != this->my_successors.end() ) {
820 task *new_task = (*i)->try_put_task(t);
824 if ( (*i)->register_predecessor(*this->my_owner) ) {
826 l.upgrade_to_writer();
829 i = this->my_successors.erase(i);
841 class decrementer :
public continue_receiver, tbb::internal::no_copy {
846 return my_node->decrement_counter();
851 typedef continue_msg input_type;
852 typedef continue_msg output_type;
853 decrementer(
int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
854 void set_owner( T *node ) { my_node = node; }
859 #endif // __TBB__flow_graph_impl_H The leaf for source_body.
Definition: _flow_graph_impl.h:69
Definition: _flow_graph_impl.h:210
A task that calls a node's apply_body_bypass function with no input.
Definition: _flow_graph_impl.h:291
the leaf for function_body specialized for Input and output of continue_msg
Definition: _flow_graph_impl.h:106
Definition: _flow_graph_impl.h:40
Definition: _flow_graph_impl.h:218
A cache of successors that are put in a round-robin fashion.
Definition: _flow_graph_impl.h:796
A task that calls a node's apply_body_bypass function, passing in an input of type Input...
Definition: _flow_graph_impl.h:273
A task that calls a node's forward_task function.
Definition: _flow_graph_impl.h:255
An empty functor that takes an Input and returns a default constructed Output.
Definition: _flow_graph_impl.h:310
An cache of predecessors that supports requests and reservations.
Definition: _flow_graph_impl.h:483
Definition: _flow_graph_impl.h:231
A cache of successors that are broadcast to.
Definition: _flow_graph_impl.h:751
A cache of predecessors that only supports try_get.
Definition: _flow_graph_impl.h:412
Definition: _flow_graph_impl.h:38
A functor that takes no input and generates a value of type Output.
Definition: _flow_graph_impl.h:60
the leaf for function_body specialized for Input of continue_msg
Definition: _flow_graph_impl.h:140
An abstract cache of successors.
Definition: _flow_graph_impl.h:559
Definition: _flow_graph_async_msg_impl.h:32
A functor that takes an Input and generates an Output.
Definition: _flow_graph_impl.h:83
Definition: _flow_graph_impl.h:841
Definition: _flow_graph_impl.h:39
field of type K being used for matching.
Definition: _flow_graph_impl.h:46
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Definition: _flow_graph_impl.h:177
the leaf for function_body specialized for Output of continue_msg
Definition: _flow_graph_impl.h:123
Definition: _flow_graph_impl.h:201
function_body that takes an Input and a set of output ports
Definition: _flow_graph_impl.h:165
the leaf for function_body
Definition: _flow_graph_impl.h:92
Strips its template type argument from cv- and ref-qualifiers.
Definition: _template_helpers.h:33
A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock...
Definition: _flow_graph_impl.h:316