21 #ifndef __TBB__flow_graph_join_impl_H 22 #define __TBB__flow_graph_join_impl_H 24 #ifndef __TBB_flow_graph_H 25 #error Do not #include this internal file directly; use public TBB headers instead. 35 virtual task * decrement_port_count(
bool handle_task) = 0;
36 virtual void increment_port_count() = 0;
// Interface fragment for key-matching joins, parameterized on KeyType.
43 template<
typename KeyType>
// Key type obtained by applying tbb::internal::strip to KeyType.
45 typedef typename tbb::internal::strip<KeyType>::type current_key_type;
// Pure virtual hook: takes a key by const reference plus a bool flag and
// returns a task pointer. The parameters are unnamed in this declaration.
47 virtual task * increment_key_count(current_key_type
const & ,
bool ) = 0;
// Key stored on the interface; ports in this file read it through
// my_join->current_key when they look up and delete items.
48 current_key_type current_key;
// Hands `port` to element N-1 of the tuple my_input through that element's
// set_join_node_pointer(); N comes from the enclosing scope.
// NOTE(review): the rest of the body is not visible in this excerpt.
54 template<
typename TupleType,
typename PortType >
55 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
56 tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
// Calls consume() on element N-1 of my_input.
// NOTE(review): handling of the other elements is not visible in this excerpt.
59 template<
typename TupleType >
60 static inline void consume_reservations( TupleType &my_input ) {
61 tbb::flow::get<N-1>( my_input ).consume();
// Calls release() on element N-1 of my_input.
65 template<
typename TupleType >
66 static inline void release_my_reservation( TupleType &my_input ) {
67 tbb::flow::get<N-1>( my_input ).release();
// Releases the reservation of element N-1 by calling release_my_reservation().
// NOTE(review): a statement ahead of that call appears to be missing from
// this excerpt, so earlier work in the body cannot be confirmed.
70 template <
typename TupleType>
71 static inline void release_reservations( TupleType &my_input) {
73 release_my_reservation(my_input);
// Tries to reserve on element N-1 of my_input, targeting element N-1 of out.
// Returns false at once when that reserve() call fails.
// NOTE(review): the condition guarding the release_my_reservation() call
// below, and the final return, are not visible in this excerpt.
76 template<
typename InputTuple,
typename OutputTuple >
77 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
78 if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) )
return false;
80 release_my_reservation( my_input );
// Asks element N-1 of my_input for its item via get_item(), writing into
// element N-1 of out, and keeps the outcome in `res`.
// NOTE(review): the return statement is not visible in this excerpt.
86 template<
typename InputTuple,
typename OutputTuple>
87 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
88 bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) );
// Returns whatever get_my_item() reports for the same my_input/out pair.
92 template<
typename InputTuple,
typename OutputTuple>
93 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
94 return get_my_item(my_input, out);
// Calls reset_port() on element N-1 of my_input.
// NOTE(review): a statement ahead of this call is not visible in this excerpt.
97 template<
typename InputTuple>
98 static inline void reset_my_port(InputTuple &my_input) {
100 tbb::flow::get<N-1>(my_input).reset_port();
// Resets the ports of my_input by delegating to reset_my_port().
103 template<
typename InputTuple>
104 static inline void reset_ports(InputTuple& my_input) {
105 reset_my_port(my_input);
// Passes the functor held in element N-1 of my_key_funcs to element N-1 of
// my_input via set_my_key_func(), then sets that tuple slot to NULL.
// NOTE(review): presumably the port takes ownership of the functor - confirm.
108 template<
typename InputTuple,
typename KeyFuncTuple>
109 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
110 tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
111 tbb::flow::get<N-1>(my_key_funcs) = NULL;
// When element N-1 of other_inputs has a key functor, installs a clone() of
// it on element N-1 of my_inputs; nothing is installed when the source
// functor pointer is null.
115 template<
typename KeyFuncTuple>
116 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
117 if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
118 tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
// Forwards the reset flags f to reset_receiver() on element N-1 of my_input.
// NOTE(review): a statement ahead of this call is not visible in this excerpt.
123 template<
typename InputTuple>
124 static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
126 tbb::flow::get<N-1>(my_input).reset_receiver(f);
// Compiled only with TBB_PREVIEW_FLOW_GRAPH_FEATURES: calls
// extract_receiver() on element N-1 of my_input.
129 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 130 template<
typename InputTuple>
131 static inline void extract_inputs(InputTuple &my_input) {
133 tbb::flow::get<N-1>(my_input).extract_receiver();
// Index-0 variant: hands `port` to element 0 of my_input.
141 template<
typename TupleType,
typename PortType >
142 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
143 tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
// Index-0 variant: calls consume() on element 0 of my_input.
146 template<
typename TupleType >
147 static inline void consume_reservations( TupleType &my_input ) {
148 tbb::flow::get<0>( my_input ).consume();
// Index-0 variant: calls release() on element 0 of my_input.
151 template<
typename TupleType >
152 static inline void release_my_reservation( TupleType &my_input ) {
153 tbb::flow::get<0>( my_input ).release();
// Index-0 variant: releases element 0 through release_my_reservation().
156 template<
typename TupleType>
157 static inline void release_reservations( TupleType &my_input) {
158 release_my_reservation(my_input);
// Index-0 variant: returns the result of reserve() on element 0 of my_input,
// with element 0 of out as the destination.
161 template<
typename InputTuple,
typename OutputTuple >
162 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
163 return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
// Index-0 variant: returns the result of get_item() on element 0 of
// my_input, writing into element 0 of out.
166 template<
typename InputTuple,
typename OutputTuple>
167 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
168 return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
// Index-0 variant: returns the result of get_my_item().
171 template<
typename InputTuple,
typename OutputTuple>
172 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
173 return get_my_item(my_input, out);
// Index-0 variant: calls reset_port() on element 0 of my_input.
176 template<
typename InputTuple>
177 static inline void reset_my_port(InputTuple &my_input) {
178 tbb::flow::get<0>(my_input).reset_port();
// Index-0 variant: resets element 0 through reset_my_port().
181 template<
typename InputTuple>
182 static inline void reset_ports(InputTuple& my_input) {
183 reset_my_port(my_input);
// Index-0 variant: passes the functor in element 0 of my_key_funcs to
// element 0 of my_input, then sets that tuple slot to NULL.
// NOTE(review): presumably the port takes ownership of the functor - confirm.
186 template<
typename InputTuple,
typename KeyFuncTuple>
187 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
188 tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
189 tbb::flow::get<0>(my_key_funcs) = NULL;
// Index-0 variant: when element 0 of other_inputs has a key functor,
// installs a clone() of it on element 0 of my_inputs.
192 template<
typename KeyFuncTuple>
193 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
194 if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
195 tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
// Index-0 variant: forwards the reset flags f to reset_receiver() on
// element 0 of my_input.
198 template<
typename InputTuple>
199 static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
200 tbb::flow::get<0>(my_input).reset_receiver(f);
// Index-0 variant, compiled only with TBB_PREVIEW_FLOW_GRAPH_FEATURES:
// calls extract_receiver() on element 0 of my_input.
203 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 204 template<
typename InputTuple>
205 static inline void extract_inputs(InputTuple &my_input) {
206 tbb::flow::get<0>(my_input).extract_receiver();
// Port template fragment: T is the message type accepted by the port.
212 template<
typename T >
// Type aliases taken from receiver<input_type>; the list and built-edge
// aliases exist only with TBB_PREVIEW_FLOW_GRAPH_FEATURES.
215 typedef T input_type;
216 typedef typename receiver<input_type>::predecessor_type predecessor_type;
217 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 218 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
219 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
// Codes for the requests handled by this port's handle_operations(); the
// four blt_* codes are added only in preview builds.
223 enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
224 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 225 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
// Result states written to an operation's status; WAIT is the zero value.
228 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// One request record passed to the port's aggregator.
// NOTE(review): several member declarations (e.g. type, my_arg) are not
// visible in this excerpt.
231 class reserving_port_operation :
public aggregated_operation<reserving_port_operation> {
// Predecessor targeted by the request; set by the predecessor_type overload.
236 predecessor_type *my_pred;
// Preview builds only: list pointer read by the predecessor-copy request.
237 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 239 predecessor_list_type *plist;
// Item-carrying request: stores the address of e with constness cast away.
242 reserving_port_operation(
const T& e, op_type t) :
243 type(
char(t)), my_arg(const_cast<T*>(&e)) {}
// Predecessor-carrying request: stores the address of s with constness cast
// away.
244 reserving_port_operation(
const predecessor_type &s, op_type t) : type(
char(t)),
245 my_pred(const_cast<predecessor_type *>(&s)) {}
// Request with no payload; only the operation code is recorded.
246 reserving_port_operation(op_type t) : type(
char(t)) {}
// Aggregator plumbing: handler_type is the functor type my_aggregator is
// initialized with, and it is befriended so it can reach this class.
249 typedef internal::aggregating_functor<class_type, reserving_port_operation> handler_type;
250 friend class internal::aggregating_functor<class_type, reserving_port_operation>;
251 aggregator<handler_type, reserving_port_operation> my_aggregator;
// Processes queued operations, switching on current->type and publishing
// each outcome to current->status with __TBB_store_with_release.
// NOTE(review): the loop header, the assignment of `current`, and the case
// labels are not visible in this excerpt; the notes below describe each
// visible group by the calls it makes.
253 void handle_operations(reserving_port_operation* op_list) {
254 reserving_port_operation *current;
255 bool no_predecessors;
258 op_list = op_list->next;
259 switch(current->type) {
// Add a predecessor. If the predecessor set was empty beforehand, call
// my_join->decrement_port_count(true) and discard its result.
261 no_predecessors = my_predecessors.empty();
262 my_predecessors.add(*(current->my_pred));
263 if ( no_predecessors ) {
264 (void) my_join->decrement_port_count(
true);
266 __TBB_store_with_release(current->status, SUCCEEDED);
// Remove a predecessor. If none remain, call my_join->increment_port_count().
269 my_predecessors.remove(*(current->my_pred));
270 if(my_predecessors.empty()) my_join->increment_port_count();
271 __TBB_store_with_release(current->status, SUCCEEDED);
// Reservation: SUCCEEDED when my_predecessors.try_reserve() fills *my_arg.
// On the visible failure path, increment_port_count() is called if no
// predecessors remain and the status becomes FAILED.
// NOTE(review): the condition leading to the first FAILED store is missing.
275 __TBB_store_with_release(current->status, FAILED);
277 else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
279 __TBB_store_with_release(current->status, SUCCEEDED);
281 if ( my_predecessors.empty() ) {
282 my_join->increment_port_count();
284 __TBB_store_with_release(current->status, FAILED);
// Release the current reservation.
289 my_predecessors.try_release( );
290 __TBB_store_with_release(current->status, SUCCEEDED);
// Consume the current reservation.
294 my_predecessors.try_consume( );
295 __TBB_store_with_release(current->status, SUCCEEDED);
// Preview builds only: built-predecessor bookkeeping (add, delete, count,
// copy), each forwarded to my_predecessors.
297 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 299 my_predecessors.internal_add_built_predecessor(*(current->my_pred));
300 __TBB_store_with_release(current->status, SUCCEEDED);
303 my_predecessors.internal_delete_built_predecessor(*(current->my_pred));
304 __TBB_store_with_release(current->status, SUCCEEDED);
307 current->cnt_val = my_predecessors.predecessor_count();
308 __TBB_store_with_release(current->status, SUCCEEDED);
311 my_predecessors.copy_predecessors(*(current->plist));
312 __TBB_store_with_release(current->status, SUCCEEDED);
320 template<
typename R,
typename B >
friend class run_and_put_task;
323 task *try_put_task(
const T & ) {
// Constructor bodies (their signatures are not visible in this excerpt).
// Each makes this port the owner of my_predecessors and installs a handler
// bound to this port on my_aggregator.
332 my_predecessors.set_owner(
this );
333 my_aggregator.initialize_handler(handler_type(
this));
340 my_predecessors.set_owner(
this );
341 my_aggregator.initialize_handler(handler_type(
this));
// Bodies of the port's request methods (their signatures are not visible in
// this excerpt). Each builds an operation record and runs it through
// my_aggregator.
// reg_pred request for src; reports whether it SUCCEEDED.
350 reserving_port_operation op_data(src, reg_pred);
351 my_aggregator.execute(&op_data);
352 return op_data.status == SUCCEEDED;
// rem_pred request for src; reports whether it SUCCEEDED.
357 reserving_port_operation op_data(src, rem_pred);
358 my_aggregator.execute(&op_data);
359 return op_data.status == SUCCEEDED;
// res_item request with v as the item slot; reports whether it SUCCEEDED.
364 reserving_port_operation op_data(v, res_item);
365 my_aggregator.execute(&op_data);
366 return op_data.status == SUCCEEDED;
// rel_res request; no result is returned in the visible lines.
371 reserving_port_operation op_data(rel_res);
372 my_aggregator.execute(&op_data);
// con_res request; no result is returned in the visible lines.
377 reserving_port_operation op_data(con_res);
378 my_aggregator.execute(&op_data);
// Preview-only API (TBB_PREVIEW_FLOW_GRAPH_FEATURES) for built-predecessor
// bookkeeping.
// Returns the built-predecessor container held by my_predecessors.
381 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 382 built_predecessors_type &built_predecessors() {
return my_predecessors.built_predecessors(); }
// Runs an add_blt_pred request for src through the aggregator.
383 void internal_add_built_predecessor(predecessor_type &src) {
384 reserving_port_operation op_data(src, add_blt_pred);
385 my_aggregator.execute(&op_data);
// Runs a del_blt_pred request for src through the aggregator.
388 void internal_delete_built_predecessor(predecessor_type &src) {
389 reserving_port_operation op_data(src, del_blt_pred);
390 my_aggregator.execute(&op_data);
// Runs a blt_pred_cnt request and returns the count the handler stored in
// cnt_val.
393 size_t predecessor_count() {
394 reserving_port_operation op_data(blt_pred_cnt);
395 my_aggregator.execute(&op_data);
396 return op_data.cnt_val;
// Runs a blt_pred_cpy request.
// NOTE(review): `l` is not used in the visible lines although the handler
// dereferences current->plist; the assignment of plist appears to be missing
// from this excerpt - confirm against the full file.
399 void copy_predecessors(predecessor_list_type &l) {
400 reserving_port_operation op_data(blt_pred_cpy);
402 my_aggregator.execute(&op_data);
// Calls receiver_extract(*this) on the built-predecessor container of
// my_predecessors.
405 void extract_receiver() {
406 my_predecessors.built_predecessors().receiver_extract(*
this);
// Resets the port's predecessor state. With rf_clear_edges set in f the
// predecessor set is cleared, and the assertion checks it is then empty.
// NOTE(review): the condition under which my_predecessors.reset() runs is
// not visible in this excerpt.
411 void reset_receiver( reset_flags f) {
412 if(f & rf_clear_edges) my_predecessors.clear();
414 my_predecessors.reset();
416 __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(),
"port edges not removed");
// Type aliases for this port; predecessor types come from
// receiver<input_type>, with the built-edge and list aliases only in preview
// builds.
429 typedef T input_type;
430 typedef typename receiver<input_type>::predecessor_type predecessor_type;
432 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 433 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
434 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
// Codes for the requests handled by this port's handle_operations(); the
// four blt_* codes are added only in preview builds.
439 enum op_type { get__item, res_port, try__put_task
440 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 441 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
// Result states written to an operation's status; WAIT is the zero value.
444 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// One request record passed to the port's aggregator.
// NOTE(review): several members and all constructor bodies are not visible
// in this excerpt.
446 class queueing_port_operation :
public aggregated_operation<queueing_port_operation> {
// Preview builds only: predecessor and list pointers read by the edge
// bookkeeping requests.
451 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 452 predecessor_type *pred;
454 predecessor_list_type *plist;
// Value-carrying request: initializes my_val from e.
458 queueing_port_operation(
const T& e, op_type t) :
459 type(
char(t)), my_val(e)
// Pointer-carrying request: stores p with constness cast away.
463 queueing_port_operation(
const T* p, op_type t) :
464 type(
char(t)), my_arg(const_cast<T*>(p))
// Request with no payload; only the operation code is recorded here.
468 queueing_port_operation(op_type t) : type(
char(t))
// Aggregator plumbing: handler_type is the functor type my_aggregator is
// initialized with, and it is befriended so it can reach this class.
473 typedef internal::aggregating_functor<class_type, queueing_port_operation> handler_type;
474 friend class internal::aggregating_functor<class_type, queueing_port_operation>;
475 aggregator<handler_type, queueing_port_operation> my_aggregator;
// Processes queued operations, switching on current->type and publishing
// each outcome to current->status with __TBB_store_with_release.
// NOTE(review): the loop header, some local declarations, and most case
// labels are not visible in this excerpt.
477 void handle_operations(queueing_port_operation* op_list) {
478 queueing_port_operation *current;
482 op_list = op_list->next;
483 switch(current->type) {
// Put: append the value to the buffer. If the buffer was empty beforehand,
// call my_join->decrement_port_count(false) and keep the task it returns.
// The resulting task pointer is handed back through bypass_t.
// NOTE(review): the condition guarding the SUCCESSFULLY_ENQUEUED assignment
// is not visible.
484 case try__put_task: {
486 was_empty = this->buffer_empty();
487 this->push_back(current->my_val);
488 if (was_empty) rtask = my_join->decrement_port_count(
false);
490 rtask = SUCCESSFULLY_ENQUEUED;
491 current->bypass_t = rtask;
492 __TBB_store_with_release(current->status, SUCCEEDED);
// Get: copy the front item into *my_arg without removing it; FAILED on the
// other visible path.
496 if(!this->buffer_empty()) {
497 *(current->my_arg) = this->front();
498 __TBB_store_with_release(current->status, SUCCEEDED);
501 __TBB_store_with_release(current->status, FAILED);
// Reset: destroy the front item (asserting one exists). If the new head is
// still a valid item, call my_join->decrement_port_count(true).
505 __TBB_ASSERT(this->my_item_valid(this->my_head),
"No item to reset");
506 this->destroy_front();
507 if(this->my_item_valid(this->my_head)) {
508 (void)my_join->decrement_port_count(
true);
510 __TBB_store_with_release(current->status, SUCCEEDED);
// Preview builds only: edge bookkeeping (add, delete, count, copy) on
// my_built_predecessors.
512 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 514 my_built_predecessors.add_edge(*(current->pred));
515 __TBB_store_with_release(current->status, SUCCEEDED);
518 my_built_predecessors.delete_edge(*(current->pred));
519 __TBB_store_with_release(current->status, SUCCEEDED);
522 current->cnt_val = my_built_predecessors.edge_count();
523 __TBB_store_with_release(current->status, SUCCEEDED);
526 my_built_predecessors.copy_edges(*(current->plist));
527 __TBB_store_with_release(current->status, SUCCEEDED);
536 template<
typename R,
typename B >
friend class run_and_put_task;
// Submits v as a try__put_task request through the aggregator and returns
// the task pointer the handler stored in bypass_t. A null bypass_t is
// replaced by SUCCESSFULLY_ENQUEUED. The assertion requires that a non-null
// bypass_t only accompanies a SUCCEEDED status.
539 task *try_put_task(
const T &v) {
540 queueing_port_operation op_data(v, try__put_task);
541 my_aggregator.execute(&op_data);
542 __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t,
"inconsistent return from aggregator");
543 if(!op_data.bypass_t)
return SUCCESSFULLY_ENQUEUED;
544 return op_data.bypass_t;
552 my_aggregator.initialize_handler(handler_type(
this));
558 my_aggregator.initialize_handler(handler_type(
this));
// Runs a get__item request with &v as the destination and reports whether
// it SUCCEEDED.
566 bool get_item( T &v ) {
567 queueing_port_operation op_data(&v, get__item);
568 my_aggregator.execute(&op_data);
569 return op_data.status == SUCCEEDED;
// Body of the reset request (signature not visible in this excerpt): runs a
// res_port operation through the aggregator.
575 queueing_port_operation op_data(res_port);
576 my_aggregator.execute(&op_data);
// Preview-only API (TBB_PREVIEW_FLOW_GRAPH_FEATURES) for built-predecessor
// bookkeeping.
// Returns the port's edge container.
580 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 581 built_predecessors_type &built_predecessors() {
return my_built_predecessors; }
// Runs an add_blt_pred request.
// NOTE(review): `p` is not used in the visible lines although the handler
// dereferences current->pred; the assignment appears to be missing from this
// excerpt - confirm against the full file.
583 void internal_add_built_predecessor(predecessor_type &p) {
584 queueing_port_operation op_data(add_blt_pred);
586 my_aggregator.execute(&op_data);
// Runs a del_blt_pred request (same caveat about `p`).
589 void internal_delete_built_predecessor(predecessor_type &p) {
590 queueing_port_operation op_data(del_blt_pred);
592 my_aggregator.execute(&op_data);
// Runs a blt_pred_cnt request and returns the count stored in cnt_val.
595 size_t predecessor_count() {
596 queueing_port_operation op_data(blt_pred_cnt);
597 my_aggregator.execute(&op_data);
598 return op_data.cnt_val;
// Runs a blt_pred_cpy request (same caveat: `l` is unused in visible lines).
601 void copy_predecessors(predecessor_list_type &l) {
602 queueing_port_operation op_data(blt_pred_cpy);
604 my_aggregator.execute(&op_data);
// Calls receiver_extract(*this) on my_built_predecessors.
// NOTE(review): a statement ahead of this call is not visible in this excerpt.
607 void extract_receiver() {
609 my_built_predecessors.receiver_extract(*
this);
// Resets the port. f is passed to suppress_unused_warning because it is only
// read in preview builds, where rf_clear_edges clears the edge container.
// NOTE(review): other statements of the body are not visible in this excerpt.
613 void reset_receiver(reset_flags f) {
614 tbb::internal::suppress_unused_warning(f);
616 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 617 if (f & rf_clear_edges)
618 my_built_predecessors.clear();
// Preview builds only: container of this port's built predecessor edges.
624 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 625 edge_container<predecessor_type> my_built_predecessors;
// Pulls in the tagged-buffer implementation header, then a functor template
// fragment whose call operator returns the my_key member of a table item by
// const reference.
// NOTE(review): the enclosing struct and table_item_type are not visible in
// this excerpt.
629 #include "_flow_graph_tagged_buffer_impl.h" 639 template<
typename K >
642 const K& operator()(
const table_item_type& v) {
return v.my_key; }
// Key-matching port fragment (class name line not visible in this excerpt).
// It is a receiver of TraitsType::T and also derives from hash_buffer
// instantiated with the traits' K, T, TtoK and KHash types.
647 template<
class TraitsType >
649 public receiver<typename TraitsType::T>,
650 public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
651 typename TraitsType::KHash > {
// Aliases for the traits' member types; noref_key_type is key_type passed
// through tbb::internal::strip.
653 typedef TraitsType traits;
655 typedef typename TraitsType::T input_type;
656 typedef typename TraitsType::K key_type;
657 typedef typename tbb::internal::strip<key_type>::type noref_key_type;
658 typedef typename receiver<input_type>::predecessor_type predecessor_type;
659 typedef typename TraitsType::TtoK type_to_key_func_type;
660 typedef typename TraitsType::KHash hash_compare_type;
662 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 663 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
664 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
// Codes for the requests handled by this port's handle_operations(); the
// four blt_* codes are added only in preview builds.
669 enum op_type { try__put, get__item, res_port
670 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 671 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
// Result states written to an operation's status; WAIT is the zero value.
674 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// One request record passed to the port's aggregator.
// NOTE(review): some member declarations are not visible in this excerpt.
676 class key_matching_port_operation :
public aggregated_operation<key_matching_port_operation> {
// Preview builds only: predecessor and list pointers read by the edge
// bookkeeping requests.
681 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 682 predecessor_type *pred;
684 predecessor_list_type *plist;
// Value-carrying request: initializes my_val from e.
687 key_matching_port_operation(
const input_type& e, op_type t) :
688 type(
char(t)), my_val(e) {}
// Pointer-carrying request: stores p with constness cast away.
690 key_matching_port_operation(
const input_type* p, op_type t) :
691 type(
char(t)), my_arg(const_cast<input_type*>(p)) {}
// Request with no payload; only the operation code is recorded.
693 key_matching_port_operation(op_type t) : type(
char(t)) {}
// Aggregator plumbing: handler_type is the functor type my_aggregator is
// initialized with, and it is befriended so it can reach this class.
696 typedef internal::aggregating_functor<class_type, key_matching_port_operation> handler_type;
697 friend class internal::aggregating_functor<class_type, key_matching_port_operation>;
698 aggregator<handler_type, key_matching_port_operation> my_aggregator;
// Processes queued operations, switching on current->type and publishing
// each outcome to current->status with __TBB_store_with_release.
// NOTE(review): the loop header and the case labels are not visible in this
// excerpt.
700 void handle_operations(key_matching_port_operation* op_list) {
701 key_matching_port_operation *current;
704 op_list = op_list->next;
705 switch(current->type) {
// Put: insert the value by key; the status reflects whether the insert
// happened.
707 bool was_inserted = this->insert_with_key(current->my_val);
709 __TBB_store_with_release(current->status, was_inserted ? SUCCEEDED : FAILED);
// Get: look up the item for my_join->current_key into *my_arg; a miss trips
// the assertion.
714 if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
715 __TBB_ASSERT(
false,
"Failed to find item corresponding to current_key.");
717 __TBB_store_with_release(current->status, SUCCEEDED);
// Reset: delete the item stored under my_join->current_key.
721 this->delete_with_key(my_join->current_key);
722 __TBB_store_with_release(current->status, SUCCEEDED);
// Preview builds only: edge bookkeeping (add, delete, count, copy) on
// my_built_predecessors.
724 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 726 my_built_predecessors.add_edge(*(current->pred));
727 __TBB_store_with_release(current->status, SUCCEEDED);
730 my_built_predecessors.delete_edge(*(current->pred));
731 __TBB_store_with_release(current->status, SUCCEEDED);
734 current->cnt_val = my_built_predecessors.edge_count();
735 __TBB_store_with_release(current->status, SUCCEEDED);
738 my_built_predecessors.copy_edges(*(current->plist));
739 __TBB_store_with_release(current->status, SUCCEEDED);
747 template<
typename R,
typename B >
friend class run_and_put_task;
// Submits v as a try__put request. When the request SUCCEEDED, the key
// functor is applied to v and the result is passed to
// my_join->increment_key_count() with false as the flag; a null task from
// that call is replaced by SUCCESSFULLY_ENQUEUED.
// NOTE(review): the declaration of rtask and the return statement are not
// visible in this excerpt.
750 task *try_put_task(
const input_type& v) {
751 key_matching_port_operation op_data(v, try__put);
753 my_aggregator.execute(&op_data);
754 if(op_data.status == SUCCEEDED) {
755 rtask = my_join->increment_key_count((*(this->get_key_func()))(v),
false);
757 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
766 my_aggregator.initialize_handler(handler_type(
this));
772 my_aggregator.initialize_handler(handler_type(
this));
// Installs f as this port's key functor by forwarding to set_key_func().
781 void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
// Returns the pointer reported by get_key_func().
783 type_to_key_func_type* get_my_key_func() {
return this->get_key_func(); }
// Runs a get__item request with &v as the destination and reports whether
// it SUCCEEDED.
// NOTE(review): a statement ahead of the request is not visible in this
// excerpt.
785 bool get_item( input_type &v ) {
787 key_matching_port_operation op_data(&v, get__item);
788 my_aggregator.execute(&op_data);
789 return op_data.status == SUCCEEDED;
// Preview-only API (TBB_PREVIEW_FLOW_GRAPH_FEATURES) for built-predecessor
// bookkeeping.
// Returns the port's edge container.
792 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 793 built_predecessors_type &built_predecessors() {
return my_built_predecessors; }
// Runs an add_blt_pred request.
// NOTE(review): `p` is not used in the visible lines although the handler
// dereferences current->pred; the assignment appears to be missing from this
// excerpt - confirm against the full file.
795 void internal_add_built_predecessor(predecessor_type &p) {
796 key_matching_port_operation op_data(add_blt_pred);
798 my_aggregator.execute(&op_data);
// Runs a del_blt_pred request (same caveat about `p`).
801 void internal_delete_built_predecessor(predecessor_type &p) {
802 key_matching_port_operation op_data(del_blt_pred);
804 my_aggregator.execute(&op_data);
// Runs a blt_pred_cnt request and returns the count stored in cnt_val.
807 size_t predecessor_count() {
808 key_matching_port_operation op_data(blt_pred_cnt);
809 my_aggregator.execute(&op_data);
810 return op_data.cnt_val;
// Runs a blt_pred_cpy request (same caveat: `l` is unused in visible lines).
813 void copy_predecessors(predecessor_list_type &l) {
814 key_matching_port_operation op_data(blt_pred_cpy);
816 my_aggregator.execute(&op_data);
// Body of the reset request (signature not visible in this excerpt): runs a
// res_port operation through the aggregator.
823 key_matching_port_operation op_data(res_port);
824 my_aggregator.execute(&op_data);
// Preview builds only: resets the underlying buffer via buffer_type::reset()
// and then calls receiver_extract(*this) on my_built_predecessors.
828 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 829 void extract_receiver() {
830 buffer_type::reset();
831 my_built_predecessors.receiver_extract(*
this);
// Resets the underlying buffer. f is passed to suppress_unused_warning
// because it is only read in preview builds, where rf_clear_edges also
// clears the edge container.
834 void reset_receiver(reset_flags f ) {
835 tbb::internal::suppress_unused_warning(f);
836 buffer_type::reset();
837 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 838 if (f & rf_clear_edges)
839 my_built_predecessors.clear();
// Preview builds only: container of this port's built predecessor edges.
847 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 848 edge_container<predecessor_type> my_built_predecessors;
852 using namespace graph_policy_namespace;
854 template<
typename JP,
typename InputTuple,
typename OutputTuple>
858 template<
typename JP,
typename InputTuple,
typename OutputTuple>
// Front-end fragment that tracks ports through ports_with_no_inputs (the
// class header is not visible in this excerpt).
861 template<
typename InputTuple,
typename OutputTuple>
// N is the number of elements in OutputTuple.
864 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
865 typedef OutputTuple output_type;
866 typedef InputTuple input_type;
// Constructor residue (signatures not visible): the counter starts at N.
870 ports_with_no_inputs = N;
875 ports_with_no_inputs = N;
// Stores the node pointer in my_node.
879 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
// Increments the ports_with_no_inputs counter.
881 void increment_port_count() {
882 ++ports_with_no_inputs;
// Decrements ports_with_no_inputs with fetch_and_decrement(). When the
// previous value was 1 and the graph is active, a task is allocated as an
// additional child of the graph's root task; if handle_task is false that
// task is returned to the caller.
// NOTE(review): the task type, the handle_task == true path, and the default
// return value are not visible in this excerpt.
886 task * decrement_port_count(
bool handle_task) {
887 if(ports_with_no_inputs.fetch_and_decrement() == 1) {
888 if(this->graph_pointer->is_active()) {
889 task *rtask =
new ( task::allocate_additional_child_of( *(this->graph_pointer->root_task()) ) )
891 if(!handle_task)
return rtask;
// Gives access to the tuple of input ports.
898 input_type &input_ports() {
return my_inputs; }
// Reset: the counter returns to N in both the visible reset path and the
// preview-only path below it.
// NOTE(review): the other statements of these bodies are not visible.
902 void reset( reset_flags f) {
904 ports_with_no_inputs = N;
908 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 911 ports_with_no_inputs = N;
// True only when the ports_with_no_inputs counter is zero.
918 bool tuple_build_may_succeed() {
919 return !ports_with_no_inputs;
// Returns false at once while the counter is non-zero.
// NOTE(review): the rest of the body is not visible in this excerpt.
922 bool try_to_make_tuple(output_type &out) {
923 if(ports_with_no_inputs)
return false;
927 void tuple_accepted() {
930 void tuple_rejected() {
// Tuple of input ports returned by input_ports().
934 input_type my_inputs;
// Node pointer stored by set_my_node().
935 base_node_type *my_node;
// Atomic counter that is set to N, incremented by increment_port_count()
// and decremented by decrement_port_count().
936 atomic<size_t> ports_with_no_inputs;
// Front-end fragment that tracks ports through ports_with_no_items (the
// class header is not visible in this excerpt).
939 template<
typename InputTuple,
typename OutputTuple>
// N is the number of elements in OutputTuple.
942 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
943 typedef OutputTuple output_type;
944 typedef InputTuple input_type;
// Constructor residue (signatures not visible): the counter starts at N.
948 ports_with_no_items = N;
953 ports_with_no_items = N;
// Stores the node pointer in my_node.
958 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
// Puts the counter back to N.
960 void reset_port_count() {
961 ports_with_no_items = N;
// Decrements ports_with_no_items with fetch_and_decrement(). When the
// previous value was 1 and the graph is active, a task is allocated as an
// additional child of the graph's root task; if handle_task is false that
// task is returned to the caller.
// NOTE(review): the task type, the handle_task == true path, and the default
// return value are not visible in this excerpt.
965 task * decrement_port_count(
bool handle_task)
967 if(ports_with_no_items.fetch_and_decrement() == 1) {
968 if(this->graph_pointer->is_active()) {
969 task *rtask =
new ( task::allocate_additional_child_of( *(this->graph_pointer->root_task()) ) )
971 if(!handle_task)
return rtask;
// Not expected to be called on this front end: the body only asserts false.
978 void increment_port_count() { __TBB_ASSERT(
false, NULL); }
// Gives access to the tuple of input ports.
980 input_type &input_ports() {
return my_inputs; }
984 void reset( reset_flags f) {
// NOTE(review): the preview-only code that follows the #if on this line is
// not visible in this excerpt.
// tuple_build_may_succeed: true only when ports_with_no_items is zero.
989 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 997 bool tuple_build_may_succeed() {
998 return !ports_with_no_items;
// Returns false at once while the counter is non-zero.
// NOTE(review): the rest of the body is not visible in this excerpt.
1001 bool try_to_make_tuple(output_type &out) {
1002 if(ports_with_no_items)
return false;
1006 void tuple_accepted() {
1010 void tuple_rejected() {
// Tuple of input ports returned by input_ports().
1014 input_type my_inputs;
// Node pointer stored by set_my_node().
1015 base_node_type *my_node;
// Atomic counter that is set to N and decremented by decrement_port_count().
1016 atomic<size_t> ports_with_no_items;
1020 template<
typename InputTuple,
typename OutputTuple,
typename K,
typename KHash>
1024 typename tbb::internal::strip<K>::type&,
1025 count_element<typename tbb::internal::strip<K>::type>,
1026 internal::type_to_key_function_body<
1027 count_element<typename tbb::internal::strip<K>::type>,
1028 typename tbb::internal::strip<K>::type& >,
// N is the number of elements in OutputTuple.
1033 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
1034 typedef OutputTuple output_type;
1035 typedef InputTuple input_type;
// unref_key_type is key_type passed through tbb::internal::strip.
1037 typedef typename tbb::internal::strip<key_type>::type unref_key_type;
1038 typedef KHash key_hash_compare;
// Codes for the requests handled by this front end's handle_operations().
1057 enum op_type { res_count, inc_count, may_succeed, try_make };
// Result states written to an operation's status; WAIT is the zero value.
1058 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// One request record passed to the front end's aggregator.
// NOTE(review): some member declarations (type, bypass_t, enqueue_task) are
// not visible in this excerpt.
1061 class key_matching_FE_operation :
public aggregated_operation<key_matching_FE_operation> {
// Key carried by a key-count request.
1064 unref_key_type my_val;
// Destination tuple for a tuple-building request.
1065 output_type* my_output;
// Key-carrying request: copies the key and records q_task as enqueue_task.
1069 key_matching_FE_operation(
const unref_key_type& e ,
bool q_task , op_type t) : type(
char(t)), my_val(e),
1070 my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
// Output-carrying request: stores p; enqueue_task defaults to true.
1071 key_matching_FE_operation(output_type *p, op_type t) : type(
char(t)), my_output(p), bypass_t(NULL),
1072 enqueue_task(
true) {}
// Request with no payload; enqueue_task defaults to true.
1074 key_matching_FE_operation(op_type t) : type(
char(t)), my_output(NULL), bypass_t(NULL), enqueue_task(
true) {}
// Aggregator plumbing: handler_type is the functor type my_aggregator is
// initialized with, and it is befriended so it can reach this class.
1077 typedef internal::aggregating_functor<class_type, key_matching_FE_operation> handler_type;
1078 friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1079 aggregator<handler_type, key_matching_FE_operation> my_aggregator;
// Called for key t: records t as current_key, deletes the entry stored under
// that key, and pushes l_out onto the output buffer. do_fwd is true only when
// should_enqueue is set, the output buffer was empty beforehand, and the
// graph is active. A task allocated as an additional child of the root task
// is assigned to rtask.
// NOTE(review): how l_out is built, what do_fwd and handle_task control, the
// task type, the condition leading to the assertion, and the return value
// are not visible in this excerpt.
1084 task * fill_output_buffer(unref_key_type &t,
bool should_enqueue,
bool handle_task) {
1087 bool do_fwd = should_enqueue && this->buffer_empty() && this->graph_pointer->is_active();
1088 this->current_key = t;
1089 this->delete_with_key(this->current_key);
1091 this->push_back(l_out);
1093 rtask =
new ( task::allocate_additional_child_of( *(this->graph_pointer->root_task()) ) )
1105 __TBB_ASSERT(
false,
"should have had something to push");
// Processes queued operations, switching on current->type and publishing
// each outcome to current->status with __TBB_store_with_release.
// NOTE(review): the loop header and the case labels are not visible in this
// excerpt; the notes below describe each visible group by the calls it makes.
1110 void handle_operations(key_matching_FE_operation* op_list) {
1111 key_matching_FE_operation *current;
1114 op_list = op_list->next;
1115 switch(current->type) {
// Drop the front item of the output buffer.
1118 this->destroy_front();
1119 __TBB_store_with_release(current->status, SUCCEEDED);
// Count an arrival for key t: look up its counter, inserting a new element
// if the key is absent (the second lookup must then succeed). When the
// incremented count equals N, call fill_output_buffer() and pass its task
// back through bypass_t; the assertion requires a null task whenever
// do_enqueue is true.
1123 count_element_type *p = 0;
1124 unref_key_type &t = current->my_val;
1125 bool do_enqueue = current->enqueue_task;
1126 if(!(this->find_ref_with_key(t,p))) {
1127 count_element_type ev;
1130 this->insert_with_key(ev);
1131 if(!(this->find_ref_with_key(t,p))) {
1132 __TBB_ASSERT(
false,
"should find key after inserting it");
1135 if(++(p->my_value) ==
size_t(N)) {
1136 task *rtask = fill_output_buffer(t,
true, do_enqueue);
1137 __TBB_ASSERT(!rtask || !do_enqueue,
"task should not be returned");
1138 current->bypass_t = rtask;
1141 __TBB_store_with_release(current->status, SUCCEEDED);
// Report whether the output buffer holds anything.
1144 __TBB_store_with_release(current->status, this->buffer_empty() ? FAILED : SUCCEEDED);
// Copy the front of the output buffer into *my_output; FAILED when empty.
1147 if(this->buffer_empty()) {
1148 __TBB_store_with_release(current->status, FAILED);
1151 *(current->my_output) = this->front();
1152 __TBB_store_with_release(current->status, SUCCEEDED);
// Constructs the front end for graph g, with my_node initially NULL. It
// installs the aggregator handler and sets a newly allocated
// TtoK_function_body_leaf_type wrapping key_to_count_func() as the key
// function.
// NOTE(review): the use of TtoK_funcs is not visible in this excerpt.
1161 template<
typename FunctionTuple>
1162 join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1165 my_aggregator.initialize_handler(handler_type(
this));
1166 TtoK_function_body_type *cfb =
new TtoK_function_body_leaf_type(key_to_count_func());
1167 this->set_key_func(cfb);
// Copy constructor: attaches to the same graph as `other` but
// default-constructs both buffers. It then installs its own aggregator
// handler and a fresh key function, as the other constructor does.
// NOTE(review): further statements of the body are not visible here.
1170 join_node_FE(
const join_node_FE& other) : forwarding_base_type(*(other.forwarding_base_type::graph_pointer)), key_to_count_buffer_type(),
1171 output_buffer_type() {
1175 my_aggregator.initialize_handler(handler_type(
this));
1176 TtoK_function_body_type *cfb =
new TtoK_function_body_leaf_type(key_to_count_func());
1177 this->set_key_func(cfb);
// Stores the node pointer in my_node.
1181 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
// Runs a res_count request through the aggregator.
1183 void reset_port_count() {
1184 key_matching_FE_operation op_data(res_count);
1185 my_aggregator.execute(&op_data);
// Runs an inc_count request for key t through the aggregator, passing
// handle_task as the operation's enqueue flag, and returns the task pointer
// the handler stored in bypass_t.
1191 task *increment_key_count(unref_key_type
const & t,
bool handle_task) {
1192 key_matching_FE_operation op_data(t, handle_task, inc_count);
1193 my_aggregator.execute(&op_data);
1194 return op_data.bypass_t;
// Not expected to be called on this front end: asserts false, returns NULL.
1197 task *decrement_port_count(
bool ) { __TBB_ASSERT(
false, NULL);
return NULL; }
// Not expected to be called on this front end: asserts false.
1199 void increment_port_count() { __TBB_ASSERT(
false, NULL); }
// Gives access to the tuple of input ports.
1201 input_type &input_ports() {
return my_inputs; }
// Resets both the key-count buffer and the output buffer. The same two
// calls recur in the preview-only code below.
// NOTE(review): the other statements of these bodies, including any use of
// f, are not visible in this excerpt.
1205 void reset( reset_flags f ) {
1209 key_to_count_buffer_type::reset();
1210 output_buffer_type::reset();
1213 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1217 key_to_count_buffer_type::reset();
1218 output_buffer_type::reset();
// Runs a may_succeed request through the aggregator and reports whether it
// SUCCEEDED.
1224 bool tuple_build_may_succeed() {
1225 key_matching_FE_operation op_data(may_succeed);
1226 my_aggregator.execute(&op_data);
1227 return op_data.status == SUCCEEDED;
// Runs a try_make request with &out as the destination and reports whether
// it SUCCEEDED.
1232 bool try_to_make_tuple(output_type &out) {
1233 key_matching_FE_operation op_data(&out,try_make);
1234 my_aggregator.execute(&op_data);
1235 return op_data.status == SUCCEEDED;
1238 void tuple_accepted() {
1242 void tuple_rejected() {
// Tuple of input ports returned by input_ports().
1246 input_type my_inputs;
// Node pointer stored by set_my_node().
1247 base_node_type *my_node;
// Join node base fragment (class name line not visible in this excerpt).
// It is a sender of OutputTuple and reuses the tuple-building operations of
// input_ports_type through the using-declarations below.
1251 template<
typename JP,
typename InputTuple,
typename OutputTuple>
1253 public sender<OutputTuple> {
1255 using graph_node::my_graph;
1257 typedef OutputTuple output_type;
1259 typedef typename sender<output_type>::successor_type successor_type;
1261 using input_ports_type::tuple_build_may_succeed;
1262 using input_ports_type::try_to_make_tuple;
1263 using input_ports_type::tuple_accepted;
1264 using input_ports_type::tuple_rejected;
1265 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1266 typedef typename sender<output_type>::built_successors_type built_successors_type;
1267 typedef typename sender<output_type>::successor_list_type successor_list_type;
// Codes for the requests handled by this node's handle_operations(); the
// four blt_* codes are added only in preview builds.
1272 enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1273 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1274 , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
// Result states written to an operation's status; WAIT is the zero value.
1277 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
// One request record passed to the node's aggregator.
// NOTE(review): some member declarations (type, bypass_t, cnt_val) are not
// visible in this excerpt.
1280 class join_node_base_operation :
public aggregated_operation<join_node_base_operation> {
// Destination tuple for a try__get request.
1284 output_type *my_arg;
// Successor targeted by a successor request.
1285 successor_type *my_succ;
// Preview builds only: list pointer read by the successor-copy request.
1286 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1288 successor_list_type *slist;
// Output-carrying request: stores the address of e with constness cast away.
1292 join_node_base_operation(
const output_type& e, op_type t) : type(
char(t)),
1293 my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
// Successor-carrying request: stores the address of s with constness cast
// away.
1294 join_node_base_operation(
const successor_type &s, op_type t) : type(
char(t)),
1295 my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
// Request with no payload.
1296 join_node_base_operation(op_type t) : type(
char(t)), bypass_t(NULL) {}
// Aggregator plumbing. forwarder_busy is set when a forwarding task is
// created below and cleared at the end of the do_fwrd_bypass case.
1299 typedef internal::aggregating_functor<class_type, join_node_base_operation> handler_type;
1300 friend class internal::aggregating_functor<class_type, join_node_base_operation>;
1301 bool forwarder_busy;
1302 aggregator<handler_type, join_node_base_operation> my_aggregator;
// Processes queued operations, switching on current->type and publishing
// each outcome to current->status with __TBB_store_with_release.
// NOTE(review): the loop header and most case labels are not visible in this
// excerpt; the notes below describe each visible group by the calls it makes.
1304 void handle_operations(join_node_base_operation* op_list) {
1305 join_node_base_operation *current;
1308 op_list = op_list->next;
1309 switch(current->type) {
// Register a successor. If a tuple may be buildable, no forwarder is already
// busy, and the graph is active, allocate a task as an additional child of
// the root task and mark the forwarder busy.
// NOTE(review): the task type and how the task is launched are not visible.
1311 my_successors.register_successor(*(current->my_succ));
1312 if(tuple_build_may_succeed() && !forwarder_busy && this->graph_node::my_graph.is_active()) {
1313 task *rtask =
new ( task::allocate_additional_child_of(*(this->graph_node::my_graph.root_task())) )
1317 forwarder_busy =
true;
1319 __TBB_store_with_release(current->status, SUCCEEDED);
// Remove a successor.
1323 my_successors.remove_successor(*(current->my_succ));
1324 __TBB_store_with_release(current->status, SUCCEEDED);
// Pull request: SUCCEEDED only when a tuple may be built and
// try_to_make_tuple() fills *my_arg; FAILED otherwise.
1327 if(tuple_build_may_succeed()) {
1328 if(try_to_make_tuple(*(current->my_arg))) {
1330 __TBB_store_with_release(current->status, SUCCEEDED);
1332 else __TBB_store_with_release(current->status, FAILED);
1334 else __TBB_store_with_release(current->status, FAILED);
// Forwarding: repeat while tuples keep being built. Each built tuple is
// offered to the successors with try_put_task(), and the returned tasks are
// merged with combine_tasks(). The merged task goes back through bypass_t,
// and the forwarder is marked idle again.
// NOTE(review): the declaration of `out` and the handling of a rejected put
// are not visible in this excerpt.
1336 case do_fwrd_bypass: {
1337 bool build_succeeded;
1338 task *last_task = NULL;
1340 if(tuple_build_may_succeed()) {
1342 build_succeeded = try_to_make_tuple(out);
1343 if(build_succeeded) {
1344 task *new_task = my_successors.try_put_task(out);
1345 last_task = combine_tasks(last_task, new_task);
1351 build_succeeded =
false;
1354 }
while(build_succeeded);
1356 current->bypass_t = last_task;
1357 __TBB_store_with_release(current->status, SUCCEEDED);
1358 forwarder_busy =
false;
// Preview builds only: built-successor bookkeeping (add, delete, count,
// copy), each forwarded to my_successors.
1361 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1363 my_successors.internal_add_built_successor(*(current->my_succ));
1364 __TBB_store_with_release(current->status, SUCCEEDED);
1367 my_successors.internal_delete_built_successor(*(current->my_succ));
1368 __TBB_store_with_release(current->status, SUCCEEDED);
1371 current->cnt_val = my_successors.successor_count();
1372 __TBB_store_with_release(current->status, SUCCEEDED);
1375 my_successors.copy_successors(*(current->slist));
1376 __TBB_store_with_release(current->status, SUCCEEDED);
// Constructs the node in graph g with the forwarder idle. It makes this node
// the owner of my_successors, registers itself with the input-port front
// end, and installs the aggregator handler.
1384 join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(
false) {
1385 my_successors.set_owner(
this);
1386 input_ports_type::set_my_node(
this);
1387 my_aggregator.initialize_handler(handler_type(
this));
// Copy-constructor initializer list and body (the signature line is not
// visible in this excerpt). It attaches to the same graph as `other`, copies
// the input-port front end, and starts with a fresh sender base, an idle
// forwarder and an empty successor set. The setup steps match the other
// constructors.
1391 graph_node(other.graph_node::my_graph), input_ports_type(other),
1392 sender<OutputTuple>(), forwarder_busy(
false), my_successors() {
1393 my_successors.set_owner(
this);
1394 input_ports_type::set_my_node(
this);
1395 my_aggregator.initialize_handler(handler_type(
this));
// Constructs the node in graph g, passing the function tuple f on to the
// input-port front end. The remaining setup matches the graph-only
// constructor.
1398 template<
typename FunctionTuple>
1399 join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(
false) {
1400 my_successors.set_owner(
this);
1401 input_ports_type::set_my_node(
this);
1402 my_aggregator.initialize_handler(handler_type(
this));
// Runs a reg_succ request for r through the aggregator and reports whether
// it SUCCEEDED.
1405 bool register_successor(successor_type &r) {
1406 join_node_base_operation op_data(r, reg_succ);
1407 my_aggregator.execute(&op_data);
1408 return op_data.status == SUCCEEDED;
// Runs a rem_succ request for r and reports whether it SUCCEEDED.
1411 bool remove_successor( successor_type &r) {
1412 join_node_base_operation op_data(r, rem_succ);
1413 my_aggregator.execute(&op_data);
1414 return op_data.status == SUCCEEDED;
// Runs a try__get request with v as the destination tuple and reports
// whether it SUCCEEDED.
1417 bool try_get( output_type &v) {
1418 join_node_base_operation op_data(v, try__get);
1419 my_aggregator.execute(&op_data);
1420 return op_data.status == SUCCEEDED;
// Preview-only API (TBB_PREVIEW_FLOW_GRAPH_FEATURES) for built-successor
// bookkeeping.
// Returns the built-successor container held by my_successors.
1423 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1424 built_successors_type &built_successors() {
return my_successors.built_successors(); }
// Runs an add_blt_succ request for r through the aggregator.
1426 void internal_add_built_successor( successor_type &r) {
1427 join_node_base_operation op_data(r, add_blt_succ);
1428 my_aggregator.execute(&op_data);
// Runs a del_blt_succ request for r through the aggregator.
1431 void internal_delete_built_successor( successor_type &r) {
1432 join_node_base_operation op_data(r, del_blt_succ);
1433 my_aggregator.execute(&op_data);
// Runs a blt_succ_cnt request and returns the count stored in cnt_val.
1436 size_t successor_count() {
1437 join_node_base_operation op_data(blt_succ_cnt);
1438 my_aggregator.execute(&op_data);
1439 return op_data.cnt_val;
// Runs a blt_succ_cpy request.
// NOTE(review): `l` is not used in the visible lines although the handler
// dereferences current->slist; the assignment appears to be missing from
// this excerpt - confirm against the full file.
1442 void copy_successors(successor_list_type &l) {
1443 join_node_base_operation op_data(blt_succ_cpy);
1445 my_aggregator.execute(&op_data);
// Preview builds only (the enclosing signature is not visible in this
// excerpt): extracts the input-port front end, then calls
// sender_extract(*this) on the built-successor container.
1449 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 1451 input_ports_type::extract();
1452 my_successors.built_successors().sender_extract(*
this);
// Resets the input-port front end with flags f and, when rf_clear_edges is
// set, clears the successor set.
1458 void reset_node(reset_flags f) {
1459 input_ports_type::reset(f);
1460 if(f & rf_clear_edges) my_successors.clear();
// Runs a do_fwrd_bypass request through the aggregator and returns the task
// pointer the handler stored in bypass_t.
1467 task *forward_task() {
1468 join_node_base_operation op_data(do_fwrd_bypass);
1469 my_aggregator.execute(&op_data);
1470 return op_data.bypass_t;
1476 template<
int N,
template<
class>
class PT,
typename OutputTuple,
typename JP>
1481 template<
int N,
typename OutputTuple,
typename K,
typename KHash>
1485 typedef KHash key_hash_compare;
1496 template<
int N,
template<
class>
class PT,
typename OutputTuple,
typename JP>
1500 typedef OutputTuple output_type;
// Function object (only under __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING) that maps a
// message t of type T to a key of type K by calling key_from_message<K>(t).
1508 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1509 template <
typename K,
typename T>
1510 struct key_from_message_body {
1511 K operator()(
const T& t)
const {
// The using-declaration makes tbb::flow::key_from_message visible to the
// unqualified call on the next line.
1512 using tbb::flow::key_from_message;
1513 return key_from_message<K>(t);
// Partial specialization for reference key types (K&): the call operator returns
// const K& and requests key_from_message<const K&>(t), so the key is returned by
// reference instead of by value.
1517 template <
typename K,
typename T>
1518 struct key_from_message_body<K&,T> {
1519 const K& operator()(
const T& t)
const {
1520 using tbb::flow::key_from_message;
1521 return key_from_message<const K&>(t);
// Two-port key_matching case built on join_base<2, key_matching_port, OutputTuple,
// key_matching<K,KHash> >::type.
// NOTE(review): the class-head line is absent from this excerpt; presumably this is the
// unfolded_join_node<2, ...> specialization, by analogy with the 6..10-port ones - confirm.
1528 template<
typename OutputTuple,
typename K,
typename KHash>
1530 join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0, T1: element types of OutputTuple at indices 0 and 1.
1531 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1532 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1535 typedef OutputTuple output_type;
// Tuple of f0_p, f1_p (both declared on lines not present in this excerpt).
1540 typedef typename tbb::flow::tuple< f0_p, f1_p > func_initializer_type;
1542 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1544 func_initializer_type(
// Template over one Body type per port; its declaration line is absent from this excerpt.
1550 template<
typename Body0,
typename Body1>
1552 func_initializer_type(
// Compile-time guard: OutputTuple must have exactly 2 elements.
1556 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 2,
"wrong number of body initializers");
// Three-port key_matching case built on join_base<3, key_matching_port, OutputTuple,
// key_matching<K,KHash> >::type.
// NOTE(review): the class-head line is absent from this excerpt; presumably this is the
// unfolded_join_node<3, ...> specialization, by analogy with the 6..10-port ones - confirm.
1561 template<
typename OutputTuple,
typename K,
typename KHash>
1563 join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T2: element types of OutputTuple at indices 0..2.
1564 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1565 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1566 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1569 typedef OutputTuple output_type;
// Tuple of f0_p..f2_p (declared on lines not present in this excerpt).
1575 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1577 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1579 func_initializer_type(
// Template over one Body type per port; its declaration line is absent from this excerpt.
1586 template<
typename Body0,
typename Body1,
typename Body2>
1588 func_initializer_type(
// Compile-time guard: OutputTuple must have exactly 3 elements.
1593 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 3,
"wrong number of body initializers");
// Four-port key_matching case built on join_base<4, key_matching_port, OutputTuple,
// key_matching<K,KHash> >::type. The class-head line is absent from this excerpt; the
// constructor below shows the class name is unfolded_join_node.
1598 template<
typename OutputTuple,
typename K,
typename KHash>
1600 join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T3: element types of OutputTuple at indices 0..3.
1601 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1602 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1603 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1604 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1607 typedef OutputTuple output_type;
// Tuple of f0_p..f3_p (declared on lines not present in this excerpt).
1614 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1616 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1618 func_initializer_type(
// Constructor taking one body per port; it passes g and a func_initializer_type
// (whose arguments are on lines not shown) to base_type.
1626 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3>
1627 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1628 func_initializer_type(
// Compile-time guard: OutputTuple must have exactly 4 elements.
1634 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 4,
"wrong number of body initializers");
// Five-port key_matching case built on join_base<5, key_matching_port, OutputTuple,
// key_matching<K,KHash> >::type. The class-head line is absent from this excerpt; the
// constructor below shows the class name is unfolded_join_node.
1639 template<
typename OutputTuple,
typename K,
typename KHash>
1641 join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T4: element types of OutputTuple at indices 0..4.
1642 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1643 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1644 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1645 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1646 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1649 typedef OutputTuple output_type;
// Tuple of f0_p..f4_p (declared on lines not present in this excerpt).
1657 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1659 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1661 func_initializer_type(
// Constructor taking one body per port; it passes g and a func_initializer_type
// (whose arguments are on lines not shown) to base_type.
1670 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4>
1671 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1672 func_initializer_type(
// Compile-time guard: OutputTuple must have exactly 5 elements.
1679 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 5,
"wrong number of body initializers");
// Six-port key_matching specialization of unfolded_join_node, compiled only when
// __TBB_VARIADIC_MAX >= 6. Publicly derives from
// join_base<6, key_matching_port, OutputTuple, key_matching<K,KHash> >::type.
1684 #if __TBB_VARIADIC_MAX >= 6 1685 template<
typename OutputTuple,
typename K,
typename KHash>
1686 class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1687 join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T5: element types of OutputTuple at indices 0..5.
1688 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1689 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1690 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1691 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1692 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1693 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1696 typedef OutputTuple output_type;
// Tuple of f0_p..f5_p (declared on lines not present in this excerpt).
1705 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1707 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1709 func_initializer_type(
// Constructor taking one body per port; it passes g and a func_initializer_type
// (whose arguments are on lines not shown) to base_type.
1719 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
typename Body5>
1720 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1721 : base_type(g, func_initializer_type(
// Compile-time guard: OutputTuple must have exactly 6 elements.
1729 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 6,
"wrong number of body initializers");
// Seven-port key_matching specialization of unfolded_join_node, compiled only when
// __TBB_VARIADIC_MAX >= 7. Publicly derives from
// join_base<7, key_matching_port, OutputTuple, key_matching<K,KHash> >::type.
1735 #if __TBB_VARIADIC_MAX >= 7 1736 template<
typename OutputTuple,
typename K,
typename KHash>
1737 class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1738 join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T6: element types of OutputTuple at indices 0..6.
1739 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1740 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1741 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1742 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1743 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1744 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1745 typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1748 typedef OutputTuple output_type;
// Tuple of f0_p..f6_p (declared on lines not present in this excerpt).
1758 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1760 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1762 func_initializer_type(
// Constructor taking one body per port; it passes g and a func_initializer_type
// (whose arguments are on lines not shown) to base_type.
1773 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1774 typename Body5,
typename Body6>
1775 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1776 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
// Compile-time guard: OutputTuple must have exactly 7 elements.
1785 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 7,
"wrong number of body initializers");
// Copy constructor: forwards other to base_type and does nothing else.
1787 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
// Eight-port key_matching specialization of unfolded_join_node, compiled only when
// __TBB_VARIADIC_MAX >= 8. Publicly derives from
// join_base<8, key_matching_port, OutputTuple, key_matching<K,KHash> >::type.
1791 #if __TBB_VARIADIC_MAX >= 8 1792 template<
typename OutputTuple,
typename K,
typename KHash>
1793 class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1794 join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T7: element types of OutputTuple at indices 0..7.
1795 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1796 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1797 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1798 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1799 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1800 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1801 typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1802 typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1805 typedef OutputTuple output_type;
// Tuple of f0_p..f7_p (declared on lines not present in this excerpt).
1816 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1818 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1820 func_initializer_type(
// Constructor taking one body per port; it passes g and a func_initializer_type
// (whose arguments are on lines not shown) to base_type.
1832 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1833 typename Body5,
typename Body6,
typename Body7>
1834 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1835 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
// Compile-time guard: OutputTuple must have exactly 8 elements.
1845 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 8,
"wrong number of body initializers");
// Copy constructor: forwards other to base_type and does nothing else.
1847 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
// Nine-port key_matching specialization of unfolded_join_node, compiled only when
// __TBB_VARIADIC_MAX >= 9. Publicly derives from
// join_base<9, key_matching_port, OutputTuple, key_matching<K,KHash> >::type.
1851 #if __TBB_VARIADIC_MAX >= 9 1852 template<
typename OutputTuple,
typename K,
typename KHash>
1853 class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1854 join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T8: element types of OutputTuple at indices 0..8.
1855 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1856 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1857 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1858 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1859 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1860 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1861 typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1862 typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1863 typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
1866 typedef OutputTuple output_type;
// Tuple of f0_p..f8_p (declared on lines not present in this excerpt).
1878 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1880 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1882 func_initializer_type(
// Constructor taking one body per port; it passes g and a func_initializer_type
// (whose arguments are on lines not shown) to base_type.
1895 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1896 typename Body5,
typename Body6,
typename Body7,
typename Body8>
1897 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1898 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
// Compile-time guard: OutputTuple must have exactly 9 elements.
1909 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 9,
"wrong number of body initializers");
// Copy constructor: forwards other to base_type and does nothing else.
1911 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
// Ten-port key_matching specialization of unfolded_join_node, compiled only when
// __TBB_VARIADIC_MAX >= 10. Publicly derives from
// join_base<10, key_matching_port, OutputTuple, key_matching<K,KHash> >::type.
1915 #if __TBB_VARIADIC_MAX >= 10 1916 template<
typename OutputTuple,
typename K,
typename KHash>
1917 class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1918 join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T9: element types of OutputTuple at indices 0..9.
1919 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1920 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1921 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1922 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1923 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1924 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1925 typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1926 typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1927 typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
1928 typedef typename tbb::flow::tuple_element<9, OutputTuple>::type T9;
1931 typedef OutputTuple output_type;
// Tuple of f0_p..f9_p (declared on lines not present in this excerpt).
1944 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1946 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1948 func_initializer_type(
// Constructor taking one body per port; it passes g and a func_initializer_type
// (whose arguments are on lines not shown) to base_type.
1962 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1963 typename Body5,
typename Body6,
typename Body7,
typename Body8,
typename Body9>
1964 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1965 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
// Compile-time guard: OutputTuple must have exactly 10 elements.
1977 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 10,
"wrong number of body initializers");
// Copy constructor: forwards other to base_type and does nothing else.
1979 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
// Returns a reference to the N-th element of jn.input_ports(), i.e. the N-th entry of
// the tuple type JNT::input_ports_type. N is a compile-time index; JNT is the node type.
1984 template<
size_t N,
typename JNT>
1985 typename tbb::flow::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1986 return tbb::flow::get<N>(jn.input_ports());
1990 #endif // __TBB__flow_graph_join_impl_H reserving_port()
Constructor.
Definition: _flow_graph_join_impl.h:330
queueing_port(const queueing_port &)
copy constructor
Definition: _flow_graph_join_impl.h:556
Definition: _flow_graph_join_impl.h:30
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
Definition: _flow_graph_join_impl.h:1497
bool register_predecessor(predecessor_type &src)
Add a predecessor.
Definition: _flow_graph_join_impl.h:349
void consume()
Complete use of the port.
Definition: _flow_graph_join_impl.h:376
void release()
Release the port.
Definition: _flow_graph_join_impl.h:370
Definition: _flow_graph_join_impl.h:1477
queueing_port()
Constructor.
Definition: _flow_graph_join_impl.h:550
bool remove_predecessor(predecessor_type &src)
Remove a predecessor.
Definition: _flow_graph_join_impl.h:356
Definition: _flow_graph_types_impl.h:58
Definition: _flow_graph_impl.h:40
Definition: _flow_graph_impl.h:218
A cache of successors that are put in a round-robin fashion.
Definition: _flow_graph_impl.h:796
A task that calls a node's forward_task function.
Definition: _flow_graph_impl.h:255
join_node_FE : implements input port policy
Definition: _flow_graph_join_impl.h:859
Definition: _flow_graph_join_impl.h:640
Definition: _flow_graph_join_impl.h:53
A cache of successors that are broadcast to.
Definition: _flow_graph_impl.h:751
bool reserve(T &v)
Reserve an item from the port.
Definition: _flow_graph_join_impl.h:363
The two-phase join port.
Definition: _flow_graph_join_impl.h:213
Definition: _flow_graph_async_msg_impl.h:32
Definition: _flow_graph_join_impl.h:632
queueing join_port
Definition: _flow_graph_join_impl.h:427
Definition: _flow_graph_impl.h:39
field of type K being used for matching.
Definition: _flow_graph_impl.h:46
Definition: _flow_graph_join_impl.h:648
Definition: _flow_graph_impl.h:201
Definition: _flow_graph_join_impl.h:52
void set_join_node_pointer(forwarding_base *join)
record parent for tallying available items
Definition: _flow_graph_join_impl.h:562
Definition: _flow_graph_types_impl.h:53
join_node_base
Definition: _flow_graph_join_impl.h:855
Definition: _flow_graph_item_buffer_impl.h:44
Definition: _flow_graph_join_impl.h:44