BRE12
_flow_graph_join_impl.h
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB__flow_graph_join_impl_H
22 #define __TBB__flow_graph_join_impl_H
23 
24 #ifndef __TBB_flow_graph_H
25 #error Do not #include this internal file directly; use public TBB headers instead.
26 #endif
27 
28 namespace internal {
29 
30  struct forwarding_base {
31  forwarding_base(graph &g) : graph_pointer(&g) {}
32  virtual ~forwarding_base() {}
33  // decrement_port_count may create a forwarding task. If we cannot handle the task
34  // ourselves, ask decrement_port_count to deal with it.
35  virtual task * decrement_port_count(bool handle_task) = 0;
36  virtual void increment_port_count() = 0;
37  // moved here so input ports can queue tasks
38  graph* graph_pointer;
39  };
40 
41  // specialization that lets us keep a copy of the current_key for building results.
42  // KeyType can be a reference type.
43  template<typename KeyType>
45  typedef typename tbb::internal::strip<KeyType>::type current_key_type;
46  matching_forwarding_base(graph &g) : forwarding_base(g) { }
47  virtual task * increment_key_count(current_key_type const & /*t*/, bool /*handle_task*/) = 0; // {return NULL;}
48  current_key_type current_key; // so ports can refer to FE's desired items
49  };
50 
51  template< int N >
52  struct join_helper {
53 
54  template< typename TupleType, typename PortType >
55  static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
56  tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
58  }
59  template< typename TupleType >
60  static inline void consume_reservations( TupleType &my_input ) {
61  tbb::flow::get<N-1>( my_input ).consume();
63  }
64 
65  template< typename TupleType >
66  static inline void release_my_reservation( TupleType &my_input ) {
67  tbb::flow::get<N-1>( my_input ).release();
68  }
69 
70  template <typename TupleType>
71  static inline void release_reservations( TupleType &my_input) {
73  release_my_reservation(my_input);
74  }
75 
76  template< typename InputTuple, typename OutputTuple >
77  static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
78  if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) ) return false;
79  if ( !join_helper<N-1>::reserve( my_input, out ) ) {
80  release_my_reservation( my_input );
81  return false;
82  }
83  return true;
84  }
85 
86  template<typename InputTuple, typename OutputTuple>
87  static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
88  bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) ); // may fail
89  return join_helper<N-1>::get_my_item(my_input, out) && res; // do get on other inputs before returning
90  }
91 
92  template<typename InputTuple, typename OutputTuple>
93  static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
94  return get_my_item(my_input, out);
95  }
96 
97  template<typename InputTuple>
98  static inline void reset_my_port(InputTuple &my_input) {
100  tbb::flow::get<N-1>(my_input).reset_port();
101  }
102 
103  template<typename InputTuple>
104  static inline void reset_ports(InputTuple& my_input) {
105  reset_my_port(my_input);
106  }
107 
108  template<typename InputTuple, typename KeyFuncTuple>
109  static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
110  tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
111  tbb::flow::get<N-1>(my_key_funcs) = NULL;
112  join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
113  }
114 
115  template< typename KeyFuncTuple>
116  static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
117  if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
118  tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
119  }
120  join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
121  }
122 
123  template<typename InputTuple>
124  static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
125  join_helper<N-1>::reset_inputs(my_input, f);
126  tbb::flow::get<N-1>(my_input).reset_receiver(f);
127  }
128 
129 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
130  template<typename InputTuple>
131  static inline void extract_inputs(InputTuple &my_input) {
133  tbb::flow::get<N-1>(my_input).extract_receiver();
134  }
135 #endif
136  }; // join_helper<N>
137 
138  template< >
139  struct join_helper<1> {
140 
141  template< typename TupleType, typename PortType >
142  static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
143  tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
144  }
145 
146  template< typename TupleType >
147  static inline void consume_reservations( TupleType &my_input ) {
148  tbb::flow::get<0>( my_input ).consume();
149  }
150 
151  template< typename TupleType >
152  static inline void release_my_reservation( TupleType &my_input ) {
153  tbb::flow::get<0>( my_input ).release();
154  }
155 
156  template<typename TupleType>
157  static inline void release_reservations( TupleType &my_input) {
158  release_my_reservation(my_input);
159  }
160 
161  template< typename InputTuple, typename OutputTuple >
162  static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
163  return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
164  }
165 
166  template<typename InputTuple, typename OutputTuple>
167  static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
168  return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
169  }
170 
171  template<typename InputTuple, typename OutputTuple>
172  static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
173  return get_my_item(my_input, out);
174  }
175 
176  template<typename InputTuple>
177  static inline void reset_my_port(InputTuple &my_input) {
178  tbb::flow::get<0>(my_input).reset_port();
179  }
180 
181  template<typename InputTuple>
182  static inline void reset_ports(InputTuple& my_input) {
183  reset_my_port(my_input);
184  }
185 
186  template<typename InputTuple, typename KeyFuncTuple>
187  static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
188  tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
189  tbb::flow::get<0>(my_key_funcs) = NULL;
190  }
191 
192  template< typename KeyFuncTuple>
193  static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
194  if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
195  tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
196  }
197  }
198  template<typename InputTuple>
199  static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
200  tbb::flow::get<0>(my_input).reset_receiver(f);
201  }
202 
203 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
204  template<typename InputTuple>
205  static inline void extract_inputs(InputTuple &my_input) {
206  tbb::flow::get<0>(my_input).extract_receiver();
207  }
208 #endif
209  }; // join_helper<1>
210 
212  template< typename T >
213  class reserving_port : public receiver<T> {
214  public:
215  typedef T input_type;
216  typedef typename receiver<input_type>::predecessor_type predecessor_type;
217 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
218  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
219  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
220 #endif
221  private:
222  // ----------- Aggregator ------------
223  enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
224 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
225  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
226 #endif
227  };
228  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
230 
231  class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
232  public:
233  char type;
234  union {
235  T *my_arg;
236  predecessor_type *my_pred;
237 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
238  size_t cnt_val;
239  predecessor_list_type *plist;
240 #endif
241  };
242  reserving_port_operation(const T& e, op_type t) :
243  type(char(t)), my_arg(const_cast<T*>(&e)) {}
244  reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
245  my_pred(const_cast<predecessor_type *>(&s)) {}
246  reserving_port_operation(op_type t) : type(char(t)) {}
247  };
248 
249  typedef internal::aggregating_functor<class_type, reserving_port_operation> handler_type;
250  friend class internal::aggregating_functor<class_type, reserving_port_operation>;
251  aggregator<handler_type, reserving_port_operation> my_aggregator;
252 
253  void handle_operations(reserving_port_operation* op_list) {
254  reserving_port_operation *current;
255  bool no_predecessors;
256  while(op_list) {
257  current = op_list;
258  op_list = op_list->next;
259  switch(current->type) {
260  case reg_pred:
261  no_predecessors = my_predecessors.empty();
262  my_predecessors.add(*(current->my_pred));
263  if ( no_predecessors ) {
264  (void) my_join->decrement_port_count(true); // may try to forward
265  }
266  __TBB_store_with_release(current->status, SUCCEEDED);
267  break;
268  case rem_pred:
269  my_predecessors.remove(*(current->my_pred));
270  if(my_predecessors.empty()) my_join->increment_port_count();
271  __TBB_store_with_release(current->status, SUCCEEDED);
272  break;
273  case res_item:
274  if ( reserved ) {
275  __TBB_store_with_release(current->status, FAILED);
276  }
277  else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
278  reserved = true;
279  __TBB_store_with_release(current->status, SUCCEEDED);
280  } else {
281  if ( my_predecessors.empty() ) {
282  my_join->increment_port_count();
283  }
284  __TBB_store_with_release(current->status, FAILED);
285  }
286  break;
287  case rel_res:
288  reserved = false;
289  my_predecessors.try_release( );
290  __TBB_store_with_release(current->status, SUCCEEDED);
291  break;
292  case con_res:
293  reserved = false;
294  my_predecessors.try_consume( );
295  __TBB_store_with_release(current->status, SUCCEEDED);
296  break;
297 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
298  case add_blt_pred:
299  my_predecessors.internal_add_built_predecessor(*(current->my_pred));
300  __TBB_store_with_release(current->status, SUCCEEDED);
301  break;
302  case del_blt_pred:
303  my_predecessors.internal_delete_built_predecessor(*(current->my_pred));
304  __TBB_store_with_release(current->status, SUCCEEDED);
305  break;
306  case blt_pred_cnt:
307  current->cnt_val = my_predecessors.predecessor_count();
308  __TBB_store_with_release(current->status, SUCCEEDED);
309  break;
310  case blt_pred_cpy:
311  my_predecessors.copy_predecessors(*(current->plist));
312  __TBB_store_with_release(current->status, SUCCEEDED);
313  break;
314 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
315  }
316  }
317  }
318 
319  protected:
320  template< typename R, typename B > friend class run_and_put_task;
321  template<typename X, typename Y> friend class internal::broadcast_cache;
322  template<typename X, typename Y> friend class internal::round_robin_cache;
323  task *try_put_task( const T & ) {
324  return NULL;
325  }
326 
327  public:
328 
330  reserving_port() : reserved(false) {
331  my_join = NULL;
332  my_predecessors.set_owner( this );
333  my_aggregator.initialize_handler(handler_type(this));
334  }
335 
336  // copy constructor
337  reserving_port(const reserving_port& /* other */) : receiver<T>() {
338  reserved = false;
339  my_join = NULL;
340  my_predecessors.set_owner( this );
341  my_aggregator.initialize_handler(handler_type(this));
342  }
343 
344  void set_join_node_pointer(forwarding_base *join) {
345  my_join = join;
346  }
347 
349  bool register_predecessor( predecessor_type &src ) {
350  reserving_port_operation op_data(src, reg_pred);
351  my_aggregator.execute(&op_data);
352  return op_data.status == SUCCEEDED;
353  }
354 
356  bool remove_predecessor( predecessor_type &src ) {
357  reserving_port_operation op_data(src, rem_pred);
358  my_aggregator.execute(&op_data);
359  return op_data.status == SUCCEEDED;
360  }
361 
363  bool reserve( T &v ) {
364  reserving_port_operation op_data(v, res_item);
365  my_aggregator.execute(&op_data);
366  return op_data.status == SUCCEEDED;
367  }
368 
370  void release( ) {
371  reserving_port_operation op_data(rel_res);
372  my_aggregator.execute(&op_data);
373  }
374 
376  void consume( ) {
377  reserving_port_operation op_data(con_res);
378  my_aggregator.execute(&op_data);
379  }
380 
381 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
382  /*override*/ built_predecessors_type &built_predecessors() { return my_predecessors.built_predecessors(); }
383  /*override*/void internal_add_built_predecessor(predecessor_type &src) {
384  reserving_port_operation op_data(src, add_blt_pred);
385  my_aggregator.execute(&op_data);
386  }
387 
388  /*override*/void internal_delete_built_predecessor(predecessor_type &src) {
389  reserving_port_operation op_data(src, del_blt_pred);
390  my_aggregator.execute(&op_data);
391  }
392 
393  /*override*/size_t predecessor_count() {
394  reserving_port_operation op_data(blt_pred_cnt);
395  my_aggregator.execute(&op_data);
396  return op_data.cnt_val;
397  }
398 
399  /*override*/void copy_predecessors(predecessor_list_type &l) {
400  reserving_port_operation op_data(blt_pred_cpy);
401  op_data.plist = &l;
402  my_aggregator.execute(&op_data);
403  }
404 
405  void extract_receiver() {
406  my_predecessors.built_predecessors().receiver_extract(*this);
407  }
408 
409 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
410 
411  /*override*/void reset_receiver( reset_flags f) {
412  if(f & rf_clear_edges) my_predecessors.clear();
413  else
414  my_predecessors.reset();
415  reserved = false;
416  __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
417  }
418 
419  private:
420  forwarding_base *my_join;
422  bool reserved;
423  }; // reserving_port
424 
426  template<typename T>
427  class queueing_port : public receiver<T>, public item_buffer<T> {
428  public:
429  typedef T input_type;
430  typedef typename receiver<input_type>::predecessor_type predecessor_type;
432 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
433  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
434  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
435 #endif
436 
437  // ----------- Aggregator ------------
438  private:
439  enum op_type { get__item, res_port, try__put_task
440 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
441  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
442 #endif
443  };
444  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
445 
446  class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
447  public:
448  char type;
449  T my_val;
450  T *my_arg;
451 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
452  predecessor_type *pred;
453  size_t cnt_val;
454  predecessor_list_type *plist;
455 #endif
456  task * bypass_t;
457  // constructor for value parameter
458  queueing_port_operation(const T& e, op_type t) :
459  type(char(t)), my_val(e)
460  , bypass_t(NULL)
461  {}
462  // constructor for pointer parameter
463  queueing_port_operation(const T* p, op_type t) :
464  type(char(t)), my_arg(const_cast<T*>(p))
465  , bypass_t(NULL)
466  {}
467  // constructor with no parameter
468  queueing_port_operation(op_type t) : type(char(t))
469  , bypass_t(NULL)
470  {}
471  };
472 
473  typedef internal::aggregating_functor<class_type, queueing_port_operation> handler_type;
474  friend class internal::aggregating_functor<class_type, queueing_port_operation>;
475  aggregator<handler_type, queueing_port_operation> my_aggregator;
476 
477  void handle_operations(queueing_port_operation* op_list) {
478  queueing_port_operation *current;
479  bool was_empty;
480  while(op_list) {
481  current = op_list;
482  op_list = op_list->next;
483  switch(current->type) {
484  case try__put_task: {
485  task *rtask = NULL;
486  was_empty = this->buffer_empty();
487  this->push_back(current->my_val);
488  if (was_empty) rtask = my_join->decrement_port_count(false);
489  else
490  rtask = SUCCESSFULLY_ENQUEUED;
491  current->bypass_t = rtask;
492  __TBB_store_with_release(current->status, SUCCEEDED);
493  }
494  break;
495  case get__item:
496  if(!this->buffer_empty()) {
497  *(current->my_arg) = this->front();
498  __TBB_store_with_release(current->status, SUCCEEDED);
499  }
500  else {
501  __TBB_store_with_release(current->status, FAILED);
502  }
503  break;
504  case res_port:
505  __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
506  this->destroy_front();
507  if(this->my_item_valid(this->my_head)) {
508  (void)my_join->decrement_port_count(true);
509  }
510  __TBB_store_with_release(current->status, SUCCEEDED);
511  break;
512 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
513  case add_blt_pred:
514  my_built_predecessors.add_edge(*(current->pred));
515  __TBB_store_with_release(current->status, SUCCEEDED);
516  break;
517  case del_blt_pred:
518  my_built_predecessors.delete_edge(*(current->pred));
519  __TBB_store_with_release(current->status, SUCCEEDED);
520  break;
521  case blt_pred_cnt:
522  current->cnt_val = my_built_predecessors.edge_count();
523  __TBB_store_with_release(current->status, SUCCEEDED);
524  break;
525  case blt_pred_cpy:
526  my_built_predecessors.copy_edges(*(current->plist));
527  __TBB_store_with_release(current->status, SUCCEEDED);
528  break;
529 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
530  }
531  }
532  }
533  // ------------ End Aggregator ---------------
534 
535  protected:
536  template< typename R, typename B > friend class run_and_put_task;
537  template<typename X, typename Y> friend class internal::broadcast_cache;
538  template<typename X, typename Y> friend class internal::round_robin_cache;
539  /*override*/task *try_put_task(const T &v) {
540  queueing_port_operation op_data(v, try__put_task);
541  my_aggregator.execute(&op_data);
542  __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
543  if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
544  return op_data.bypass_t;
545  }
546 
547  public:
548 
551  my_join = NULL;
552  my_aggregator.initialize_handler(handler_type(this));
553  }
554 
556  queueing_port(const queueing_port& /* other */) : receiver<T>(), item_buffer<T>() {
557  my_join = NULL;
558  my_aggregator.initialize_handler(handler_type(this));
559  }
560 
563  my_join = join;
564  }
565 
566  bool get_item( T &v ) {
567  queueing_port_operation op_data(&v, get__item);
568  my_aggregator.execute(&op_data);
569  return op_data.status == SUCCEEDED;
570  }
571 
572  // reset_port is called when item is accepted by successor, but
573  // is initiated by join_node.
574  void reset_port() {
575  queueing_port_operation op_data(res_port);
576  my_aggregator.execute(&op_data);
577  return;
578  }
579 
580 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
581  /*override*/ built_predecessors_type &built_predecessors() { return my_built_predecessors; }
582 
583  /*override*/void internal_add_built_predecessor(predecessor_type &p) {
584  queueing_port_operation op_data(add_blt_pred);
585  op_data.pred = &p;
586  my_aggregator.execute(&op_data);
587  }
588 
589  /*override*/void internal_delete_built_predecessor(predecessor_type &p) {
590  queueing_port_operation op_data(del_blt_pred);
591  op_data.pred = &p;
592  my_aggregator.execute(&op_data);
593  }
594 
595  /*override*/size_t predecessor_count() {
596  queueing_port_operation op_data(blt_pred_cnt);
597  my_aggregator.execute(&op_data);
598  return op_data.cnt_val;
599  }
600 
601  /*override*/void copy_predecessors(predecessor_list_type &l) {
602  queueing_port_operation op_data(blt_pred_cpy);
603  op_data.plist = &l;
604  my_aggregator.execute(&op_data);
605  }
606 
607  void extract_receiver() {
609  my_built_predecessors.receiver_extract(*this);
610  }
611 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
612 
613  /*override*/void reset_receiver(reset_flags f) {
614  tbb::internal::suppress_unused_warning(f);
616 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
617  if (f & rf_clear_edges)
618  my_built_predecessors.clear();
619 #endif
620  }
621 
622  private:
623  forwarding_base *my_join;
624 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
625  edge_container<predecessor_type> my_built_predecessors;
626 #endif
627  }; // queueing_port
628 
629 #include "_flow_graph_tagged_buffer_impl.h"
630 
//! Entry of the counting table: a key together with a counter for it.
template<typename K>
struct count_element
{
    K my_key;          // key the counter belongs to
    size_t my_value;   // counter associated with my_key
};
636 
637  // method to access the key in the counting table
638  // the ref has already been removed from K
639  template< typename K >
642  const K& operator()(const table_item_type& v) { return v.my_key; }
643  };
644 
645  // the ports can have only one template parameter. We wrap the types needed in
646  // a traits type
647  template< class TraitsType >
649  public receiver<typename TraitsType::T>,
650  public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
651  typename TraitsType::KHash > {
652  public:
653  typedef TraitsType traits;
655  typedef typename TraitsType::T input_type;
656  typedef typename TraitsType::K key_type;
657  typedef typename tbb::internal::strip<key_type>::type noref_key_type;
658  typedef typename receiver<input_type>::predecessor_type predecessor_type;
659  typedef typename TraitsType::TtoK type_to_key_func_type;
660  typedef typename TraitsType::KHash hash_compare_type;
662 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
663  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
664  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
665 #endif
666  private:
667 // ----------- Aggregator ------------
668  private:
669  enum op_type { try__put, get__item, res_port
670 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
671  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
672 #endif
673  };
674  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
675 
676  class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
677  public:
678  char type;
679  input_type my_val;
680  input_type *my_arg;
681 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
682  predecessor_type *pred;
683  size_t cnt_val;
684  predecessor_list_type *plist;
685 #endif
686  // constructor for value parameter
687  key_matching_port_operation(const input_type& e, op_type t) :
688  type(char(t)), my_val(e) {}
689  // constructor for pointer parameter
690  key_matching_port_operation(const input_type* p, op_type t) :
691  type(char(t)), my_arg(const_cast<input_type*>(p)) {}
692  // constructor with no parameter
693  key_matching_port_operation(op_type t) : type(char(t)) {}
694  };
695 
696  typedef internal::aggregating_functor<class_type, key_matching_port_operation> handler_type;
697  friend class internal::aggregating_functor<class_type, key_matching_port_operation>;
698  aggregator<handler_type, key_matching_port_operation> my_aggregator;
699 
700  void handle_operations(key_matching_port_operation* op_list) {
701  key_matching_port_operation *current;
702  while(op_list) {
703  current = op_list;
704  op_list = op_list->next;
705  switch(current->type) {
706  case try__put: {
707  bool was_inserted = this->insert_with_key(current->my_val);
708  // return failure if a duplicate insertion occurs
709  __TBB_store_with_release(current->status, was_inserted ? SUCCEEDED : FAILED);
710  }
711  break;
712  case get__item:
713  // use current_key from FE for item
714  if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
715  __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
716  }
717  __TBB_store_with_release(current->status, SUCCEEDED);
718  break;
719  case res_port:
720  // use current_key from FE for item
721  this->delete_with_key(my_join->current_key);
722  __TBB_store_with_release(current->status, SUCCEEDED);
723  break;
724 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
725  case add_blt_pred:
726  my_built_predecessors.add_edge(*(current->pred));
727  __TBB_store_with_release(current->status, SUCCEEDED);
728  break;
729  case del_blt_pred:
730  my_built_predecessors.delete_edge(*(current->pred));
731  __TBB_store_with_release(current->status, SUCCEEDED);
732  break;
733  case blt_pred_cnt:
734  current->cnt_val = my_built_predecessors.edge_count();
735  __TBB_store_with_release(current->status, SUCCEEDED);
736  break;
737  case blt_pred_cpy:
738  my_built_predecessors.copy_edges(*(current->plist));
739  __TBB_store_with_release(current->status, SUCCEEDED);
740  break;
741 #endif
742  }
743  }
744  }
745 // ------------ End Aggregator ---------------
746  protected:
747  template< typename R, typename B > friend class run_and_put_task;
748  template<typename X, typename Y> friend class internal::broadcast_cache;
749  template<typename X, typename Y> friend class internal::round_robin_cache;
750  /*override*/task *try_put_task(const input_type& v) {
751  key_matching_port_operation op_data(v, try__put);
752  task *rtask = NULL;
753  my_aggregator.execute(&op_data);
754  if(op_data.status == SUCCEEDED) {
755  rtask = my_join->increment_key_count((*(this->get_key_func()))(v), false); // may spawn
756  // rtask has to reflect the return status of the try_put
757  if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
758  }
759  return rtask;
760  }
761 
762  public:
763 
764  key_matching_port() : receiver<input_type>(), buffer_type() {
765  my_join = NULL;
766  my_aggregator.initialize_handler(handler_type(this));
767  }
768 
769  // copy constructor
770  key_matching_port(const key_matching_port& /*other*/) : receiver<input_type>(), buffer_type() {
771  my_join = NULL;
772  my_aggregator.initialize_handler(handler_type(this));
773  }
774 
775  ~key_matching_port() { }
776 
777  void set_join_node_pointer(forwarding_base *join) {
778  my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
779  }
780 
781  void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
782 
783  type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
784 
785  bool get_item( input_type &v ) {
786  // aggregator uses current_key from FE for Key
787  key_matching_port_operation op_data(&v, get__item);
788  my_aggregator.execute(&op_data);
789  return op_data.status == SUCCEEDED;
790  }
791 
792 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
793  /*override*/built_predecessors_type &built_predecessors() { return my_built_predecessors; }
794 
795  /*override*/void internal_add_built_predecessor(predecessor_type &p) {
796  key_matching_port_operation op_data(add_blt_pred);
797  op_data.pred = &p;
798  my_aggregator.execute(&op_data);
799  }
800 
801  /*override*/void internal_delete_built_predecessor(predecessor_type &p) {
802  key_matching_port_operation op_data(del_blt_pred);
803  op_data.pred = &p;
804  my_aggregator.execute(&op_data);
805  }
806 
807  /*override*/size_t predecessor_count() {
808  key_matching_port_operation op_data(blt_pred_cnt);
809  my_aggregator.execute(&op_data);
810  return op_data.cnt_val;
811  }
812 
813  /*override*/void copy_predecessors(predecessor_list_type &l) {
814  key_matching_port_operation op_data(blt_pred_cpy);
815  op_data.plist = &l;
816  my_aggregator.execute(&op_data);
817  }
818 #endif
819 
820  // reset_port is called when item is accepted by successor, but
821  // is initiated by join_node.
822  void reset_port() {
823  key_matching_port_operation op_data(res_port);
824  my_aggregator.execute(&op_data);
825  return;
826  }
827 
828 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
829  void extract_receiver() {
830  buffer_type::reset();
831  my_built_predecessors.receiver_extract(*this);
832  }
833 #endif
834  /*override*/void reset_receiver(reset_flags f ) {
835  tbb::internal::suppress_unused_warning(f);
836  buffer_type::reset();
837 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
838  if (f & rf_clear_edges)
839  my_built_predecessors.clear();
840 #endif
841  }
842 
843  private:
844  // my_join forwarding base used to count number of inputs that
845  // received key.
847 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
848  edge_container<predecessor_type> my_built_predecessors;
849 #endif
850  }; // key_matching_port
851 
852  using namespace graph_policy_namespace;
853 
//! Forward declaration; the front ends below store a pointer to this node type.
template<typename JP, typename InputTuple, typename OutputTuple>
class join_node_base;

//! join_node_FE : implements input port policy
/** Primary template; it is specialized per buffering policy JP. */
template<typename JP, typename InputTuple, typename OutputTuple>
class join_node_FE;
860 
861  template<typename InputTuple, typename OutputTuple>
862  class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
863  public:
864  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
865  typedef OutputTuple output_type;
866  typedef InputTuple input_type;
868 
869  join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
870  ports_with_no_inputs = N;
871  join_helper<N>::set_join_node_pointer(my_inputs, this);
872  }
873 
874  join_node_FE(const join_node_FE& other) : forwarding_base(*(other.forwarding_base::graph_pointer)), my_node(NULL) {
875  ports_with_no_inputs = N;
876  join_helper<N>::set_join_node_pointer(my_inputs, this);
877  }
878 
879  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
880 
881  void increment_port_count() {
882  ++ports_with_no_inputs;
883  }
884 
885  // if all input_ports have predecessors, spawn forward to try and consume tuples
886  task * decrement_port_count(bool handle_task) {
887  if(ports_with_no_inputs.fetch_and_decrement() == 1) {
888  if(this->graph_pointer->is_active()) {
889  task *rtask = new ( task::allocate_additional_child_of( *(this->graph_pointer->root_task()) ) )
891  if(!handle_task) return rtask;
892  FLOW_SPAWN(*rtask);
893  }
894  }
895  return NULL;
896  }
897 
898  input_type &input_ports() { return my_inputs; }
899 
900  protected:
901 
902  void reset( reset_flags f) {
903  // called outside of parallel contexts
904  ports_with_no_inputs = N;
905  join_helper<N>::reset_inputs(my_inputs, f);
906  }
907 
908 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
909  void extract( ) {
910  // called outside of parallel contexts
911  ports_with_no_inputs = N;
913  }
914 #endif
915 
916  // all methods on input ports should be called under mutual exclusion from join_node_base.
917 
918  bool tuple_build_may_succeed() {
919  return !ports_with_no_inputs;
920  }
921 
922  bool try_to_make_tuple(output_type &out) {
923  if(ports_with_no_inputs) return false;
924  return join_helper<N>::reserve(my_inputs, out);
925  }
926 
927  void tuple_accepted() {
929  }
930  void tuple_rejected() {
932  }
933 
934  input_type my_inputs;
935  base_node_type *my_node;
936  atomic<size_t> ports_with_no_inputs;
937  }; // join_node_FE<reserving, ... >
938 
939  template<typename InputTuple, typename OutputTuple>
940  class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
941  public:
942  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
943  typedef OutputTuple output_type;
944  typedef InputTuple input_type;
946 
947  join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
948  ports_with_no_items = N;
949  join_helper<N>::set_join_node_pointer(my_inputs, this);
950  }
951 
952  join_node_FE(const join_node_FE& other) : forwarding_base(*(other.forwarding_base::graph_pointer)), my_node(NULL) {
953  ports_with_no_items = N;
954  join_helper<N>::set_join_node_pointer(my_inputs, this);
955  }
956 
957  // needed for forwarding
958  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
959 
960  void reset_port_count() {
961  ports_with_no_items = N;
962  }
963 
964  // if all input_ports have items, spawn forward to try and consume tuples
965  task * decrement_port_count(bool handle_task)
966  {
967  if(ports_with_no_items.fetch_and_decrement() == 1) {
968  if(this->graph_pointer->is_active()) {
969  task *rtask = new ( task::allocate_additional_child_of( *(this->graph_pointer->root_task()) ) )
971  if(!handle_task) return rtask;
972  FLOW_SPAWN( *rtask);
973  }
974  }
975  return NULL;
976  }
977 
978  void increment_port_count() { __TBB_ASSERT(false, NULL); } // should never be called
979 
980  input_type &input_ports() { return my_inputs; }
981 
982  protected:
983 
984  void reset( reset_flags f) {
985  reset_port_count();
986  join_helper<N>::reset_inputs(my_inputs, f );
987  }
988 
989 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
990  void extract() {
991  reset_port_count();
993  }
994 #endif
995  // all methods on input ports should be called under mutual exclusion from join_node_base.
996 
997  bool tuple_build_may_succeed() {
998  return !ports_with_no_items;
999  }
1000 
1001  bool try_to_make_tuple(output_type &out) {
1002  if(ports_with_no_items) return false;
1003  return join_helper<N>::get_items(my_inputs, out);
1004  }
1005 
1006  void tuple_accepted() {
1007  reset_port_count();
1008  join_helper<N>::reset_ports(my_inputs);
1009  }
1010  void tuple_rejected() {
1011  // nothing to do.
1012  }
1013 
1014  input_type my_inputs;
1015  base_node_type *my_node;
1016  atomic<size_t> ports_with_no_items;
1017  }; // join_node_FE<queueing, ...>
1018 
1019  // key_matching join front-end.
1020  template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
1021  class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
1022  // buffer of key value counts
1023  public hash_buffer< // typedefed below to key_to_count_buffer_type
1024  typename tbb::internal::strip<K>::type&, // force ref type on K
1025  count_element<typename tbb::internal::strip<K>::type>,
1026  internal::type_to_key_function_body<
1027  count_element<typename tbb::internal::strip<K>::type>,
1028  typename tbb::internal::strip<K>::type& >,
1029  KHash >,
1030  // buffer of output items
1031  public item_buffer<OutputTuple> {
1032  public:
1033  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
1034  typedef OutputTuple output_type;
1035  typedef InputTuple input_type;
1036  typedef K key_type;
1037  typedef typename tbb::internal::strip<key_type>::type unref_key_type;
1038  typedef KHash key_hash_compare;
1039  // must use K without ref.
1041  // method that lets us refer to the key of this type.
1045  // this is the type of the special table that keeps track of the number of discrete
1046  // elements corresponding to each key that we've seen.
1050  typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
1052 
1053 // ----------- Aggregator ------------
1054  // the aggregator is only needed to serialize the access to the hash table.
1055  // and the output_buffer_type base class
1056  private:
1057  enum op_type { res_count, inc_count, may_succeed, try_make };
1058  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1059  typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type;
1060 
1061  class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
1062  public:
1063  char type;
1064  unref_key_type my_val;
1065  output_type* my_output;
1066  task *bypass_t;
1067  bool enqueue_task;
1068  // constructor for value parameter
1069  key_matching_FE_operation(const unref_key_type& e , bool q_task , op_type t) : type(char(t)), my_val(e),
1070  my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
1071  key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(NULL),
1072  enqueue_task(true) {}
1073  // constructor with no parameter
1074  key_matching_FE_operation(op_type t) : type(char(t)), my_output(NULL), bypass_t(NULL), enqueue_task(true) {}
1075  };
1076 
1077  typedef internal::aggregating_functor<class_type, key_matching_FE_operation> handler_type;
1078  friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1079  aggregator<handler_type, key_matching_FE_operation> my_aggregator;
1080 
1081  // called from aggregator, so serialized
1082  // returns a task pointer if a task would have been enqueued but we asked that
1083  // it be returned. Otherwise returns NULL.
1084  task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task) {
1085  output_type l_out;
1086  task *rtask = NULL;
1087  bool do_fwd = should_enqueue && this->buffer_empty() && this->graph_pointer->is_active();
1088  this->current_key = t;
1089  this->delete_with_key(this->current_key); // remove the key
1090  if(join_helper<N>::get_items(my_inputs, l_out)) { // <== call back
1091  this->push_back(l_out);
1092  if(do_fwd) { // we enqueue if receiving an item from predecessor, not if successor asks for item
1093  rtask = new ( task::allocate_additional_child_of( *(this->graph_pointer->root_task()) ) )
1095  if(handle_task) {
1096  FLOW_SPAWN(*rtask);
1097  rtask = NULL;
1098  }
1099  do_fwd = false;
1100  }
1101  // retire the input values
1102  join_helper<N>::reset_ports(my_inputs); // <== call back
1103  }
1104  else {
1105  __TBB_ASSERT(false, "should have had something to push");
1106  }
1107  return rtask;
1108  }
1109 
// Aggregator handler: walks the linked list of pending requests and serves each
// one according to its type. The result of a request is published by storing
// SUCCEEDED or FAILED into its status field with release semantics.
1110  void handle_operations(key_matching_FE_operation* op_list) {
1111  key_matching_FE_operation *current;
1112  while(op_list) {
1113  current = op_list;
// Advance first: `next` is read before the status of `current` is published.
1114  op_list = op_list->next;
1115  switch(current->type) {
1116  case res_count: // called from BE
1117  {
// Drop the tuple at the front of the output buffer.
1118  this->destroy_front();
1119  __TBB_store_with_release(current->status, SUCCEEDED);
1120  }
1121  break;
1122  case inc_count: { // called from input ports
1123  count_element_type *p = 0;
1124  unref_key_type &t = current->my_val;
1125  bool do_enqueue = current->enqueue_task;
// Look up the counter for this key; insert one starting at zero if absent.
1126  if(!(this->find_ref_with_key(t,p))) {
1127  count_element_type ev;
1128  ev.my_key = t;
1129  ev.my_value = 0;
1130  this->insert_with_key(ev);
1131  if(!(this->find_ref_with_key(t,p))) {
1132  __TBB_ASSERT(false, "should find key after inserting it");
1133  }
1134  }
// Once the counter for the key reaches N, build the output tuple.
1135  if(++(p->my_value) == size_t(N)) {
// do_enqueue is passed as fill_output_buffer's handle_task argument.
1136  task *rtask = fill_output_buffer(t, true, do_enqueue);
1137  __TBB_ASSERT(!rtask || !do_enqueue, "task should not be returned");
1138  current->bypass_t = rtask;
1139  }
1140  }
1141  __TBB_store_with_release(current->status, SUCCEEDED);
1142  break;
1143  case may_succeed: // called from BE
// Succeeds exactly when the output buffer holds at least one tuple.
1144  __TBB_store_with_release(current->status, this->buffer_empty() ? FAILED : SUCCEEDED);
1145  break;
1146  case try_make: // called from BE
1147  if(this->buffer_empty()) {
1148  __TBB_store_with_release(current->status, FAILED);
1149  }
1150  else {
// Copy the front tuple to the requester; it stays in the buffer
// until a res_count request destroys it.
1151  *(current->my_output) = this->front();
1152  __TBB_store_with_release(current->status, SUCCEEDED);
1153  }
1154  break;
1155  }
1156  }
1157  }
1158 // ------------ End Aggregator ---------------
1159 
1160  public:
1161  template<typename FunctionTuple>
1162  join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1163  join_helper<N>::set_join_node_pointer(my_inputs, this);
1164  join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
1165  my_aggregator.initialize_handler(handler_type(this));
1166  TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1167  this->set_key_func(cfb);
1168  }
1169 
1170  join_node_FE(const join_node_FE& other) : forwarding_base_type(*(other.forwarding_base_type::graph_pointer)), key_to_count_buffer_type(),
1171  output_buffer_type() {
1172  my_node = NULL;
1173  join_helper<N>::set_join_node_pointer(my_inputs, this);
1174  join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
1175  my_aggregator.initialize_handler(handler_type(this));
1176  TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1177  this->set_key_func(cfb);
1178  }
1179 
1180  // needed for forwarding
1181  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1182 
1183  void reset_port_count() { // called from BE
1184  key_matching_FE_operation op_data(res_count);
1185  my_aggregator.execute(&op_data);
1186  return;
1187  }
1188 
1189  // if all input_ports have items, spawn forward to try and consume tuples
1190  // return a task if we are asked and did create one.
1191  /*override*/ task *increment_key_count(unref_key_type const & t, bool handle_task) { // called from input_ports
1192  key_matching_FE_operation op_data(t, handle_task, inc_count);
1193  my_aggregator.execute(&op_data);
1194  return op_data.bypass_t;
1195  }
1196 
1197  /*override*/ task *decrement_port_count(bool /*handle_task*/) { __TBB_ASSERT(false, NULL); return NULL; }
1198 
1199  void increment_port_count() { __TBB_ASSERT(false, NULL); } // should never be called
1200 
1201  input_type &input_ports() { return my_inputs; }
1202 
1203  protected:
1204 
1205  void reset( reset_flags f ) {
1206  // called outside of parallel contexts
1207  join_helper<N>::reset_inputs(my_inputs, f);
1208 
1209  key_to_count_buffer_type::reset();
1210  output_buffer_type::reset();
1211  }
1212 
1213 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1214  void extract() {
1215  // called outside of parallel contexts
1216  join_helper<N>::extract_inputs(my_inputs);
1217  key_to_count_buffer_type::reset(); // have to reset the tag counts
1218  output_buffer_type::reset(); // also the queue of outputs
1219  // my_node->current_tag = NO_TAG;
1220  }
1221 #endif
1222  // all methods on input ports should be called under mutual exclusion from join_node_base.
1223 
1224  bool tuple_build_may_succeed() { // called from back-end
1225  key_matching_FE_operation op_data(may_succeed);
1226  my_aggregator.execute(&op_data);
1227  return op_data.status == SUCCEEDED;
1228  }
1229 
1230  // cannot lock while calling back to input_ports. current_key will only be set
1231  // and reset under the aggregator, so it will remain consistent.
1232  bool try_to_make_tuple(output_type &out) {
1233  key_matching_FE_operation op_data(&out,try_make);
1234  my_aggregator.execute(&op_data);
1235  return op_data.status == SUCCEEDED;
1236  }
1237 
1238  void tuple_accepted() {
1239  reset_port_count(); // reset current_key after ports reset.
1240  }
1241 
// The tuple was rejected downstream. Intentionally empty: nothing to do.
void tuple_rejected() {
}
1245 
1246  input_type my_inputs; // input ports
1247  base_node_type *my_node;
1248  }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
1249 
1251  template<typename JP, typename InputTuple, typename OutputTuple>
1252  class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1253  public sender<OutputTuple> {
1254  protected:
1255  using graph_node::my_graph;
1256  public:
1257  typedef OutputTuple output_type;
1258 
1259  typedef typename sender<output_type>::successor_type successor_type;
1260  typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
1261  using input_ports_type::tuple_build_may_succeed;
1262  using input_ports_type::try_to_make_tuple;
1263  using input_ports_type::tuple_accepted;
1264  using input_ports_type::tuple_rejected;
1265 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1266  typedef typename sender<output_type>::built_successors_type built_successors_type;
1267  typedef typename sender<output_type>::successor_list_type successor_list_type;
1268 #endif
1269 
1270  private:
1271  // ----------- Aggregator ------------
1272  enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1273 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1274  , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1275 #endif
1276  };
1277  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1279 
1280  class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1281  public:
1282  char type;
1283  union {
1284  output_type *my_arg;
1285  successor_type *my_succ;
1286 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1287  size_t cnt_val;
1288  successor_list_type *slist;
1289 #endif
1290  };
1291  task *bypass_t;
1292  join_node_base_operation(const output_type& e, op_type t) : type(char(t)),
1293  my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1294  join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1295  my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
1296  join_node_base_operation(op_type t) : type(char(t)), bypass_t(NULL) {}
1297  };
1298 
1299  typedef internal::aggregating_functor<class_type, join_node_base_operation> handler_type;
1300  friend class internal::aggregating_functor<class_type, join_node_base_operation>;
1301  bool forwarder_busy;
1302  aggregator<handler_type, join_node_base_operation> my_aggregator;
1303 
1304  void handle_operations(join_node_base_operation* op_list) {
1305  join_node_base_operation *current;
1306  while(op_list) {
1307  current = op_list;
1308  op_list = op_list->next;
1309  switch(current->type) {
1310  case reg_succ: {
1311  my_successors.register_successor(*(current->my_succ));
1312  if(tuple_build_may_succeed() && !forwarder_busy && this->graph_node::my_graph.is_active()) {
1313  task *rtask = new ( task::allocate_additional_child_of(*(this->graph_node::my_graph.root_task())) )
1316  FLOW_SPAWN(*rtask);
1317  forwarder_busy = true;
1318  }
1319  __TBB_store_with_release(current->status, SUCCEEDED);
1320  }
1321  break;
1322  case rem_succ:
1323  my_successors.remove_successor(*(current->my_succ));
1324  __TBB_store_with_release(current->status, SUCCEEDED);
1325  break;
1326  case try__get:
1327  if(tuple_build_may_succeed()) {
1328  if(try_to_make_tuple(*(current->my_arg))) {
1329  tuple_accepted();
1330  __TBB_store_with_release(current->status, SUCCEEDED);
1331  }
1332  else __TBB_store_with_release(current->status, FAILED);
1333  }
1334  else __TBB_store_with_release(current->status, FAILED);
1335  break;
1336  case do_fwrd_bypass: {
1337  bool build_succeeded;
1338  task *last_task = NULL;
1339  output_type out;
1340  if(tuple_build_may_succeed()) { // checks output queue of FE
1341  do {
1342  build_succeeded = try_to_make_tuple(out); // fetch front_end of queue
1343  if(build_succeeded) {
1344  task *new_task = my_successors.try_put_task(out);
1345  last_task = combine_tasks(last_task, new_task);
1346  if(new_task) {
1347  tuple_accepted();
1348  }
1349  else {
1350  tuple_rejected();
1351  build_succeeded = false;
1352  }
1353  }
1354  } while(build_succeeded);
1355  }
1356  current->bypass_t = last_task;
1357  __TBB_store_with_release(current->status, SUCCEEDED);
1358  forwarder_busy = false;
1359  }
1360  break;
1361 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1362  case add_blt_succ:
1363  my_successors.internal_add_built_successor(*(current->my_succ));
1364  __TBB_store_with_release(current->status, SUCCEEDED);
1365  break;
1366  case del_blt_succ:
1367  my_successors.internal_delete_built_successor(*(current->my_succ));
1368  __TBB_store_with_release(current->status, SUCCEEDED);
1369  break;
1370  case blt_succ_cnt:
1371  current->cnt_val = my_successors.successor_count();
1372  __TBB_store_with_release(current->status, SUCCEEDED);
1373  break;
1374  case blt_succ_cpy:
1375  my_successors.copy_successors(*(current->slist));
1376  __TBB_store_with_release(current->status, SUCCEEDED);
1377  break;
1378 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1379  }
1380  }
1381  }
1382  // ---------- end aggregator -----------
1383  public:
1384  join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1385  my_successors.set_owner(this);
1386  input_ports_type::set_my_node(this);
1387  my_aggregator.initialize_handler(handler_type(this));
1388  }
1389 
1390  join_node_base(const join_node_base& other) :
1391  graph_node(other.graph_node::my_graph), input_ports_type(other),
1392  sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1393  my_successors.set_owner(this);
1394  input_ports_type::set_my_node(this);
1395  my_aggregator.initialize_handler(handler_type(this));
1396  }
1397 
1398  template<typename FunctionTuple>
1399  join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1400  my_successors.set_owner(this);
1401  input_ports_type::set_my_node(this);
1402  my_aggregator.initialize_handler(handler_type(this));
1403  }
1404 
1405  bool register_successor(successor_type &r) {
1406  join_node_base_operation op_data(r, reg_succ);
1407  my_aggregator.execute(&op_data);
1408  return op_data.status == SUCCEEDED;
1409  }
1410 
1411  bool remove_successor( successor_type &r) {
1412  join_node_base_operation op_data(r, rem_succ);
1413  my_aggregator.execute(&op_data);
1414  return op_data.status == SUCCEEDED;
1415  }
1416 
1417  bool try_get( output_type &v) {
1418  join_node_base_operation op_data(v, try__get);
1419  my_aggregator.execute(&op_data);
1420  return op_data.status == SUCCEEDED;
1421  }
1422 
1423 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1424  /*override*/built_successors_type &built_successors() { return my_successors.built_successors(); }
1425 
1426  /*override*/void internal_add_built_successor( successor_type &r) {
1427  join_node_base_operation op_data(r, add_blt_succ);
1428  my_aggregator.execute(&op_data);
1429  }
1430 
1431  /*override*/void internal_delete_built_successor( successor_type &r) {
1432  join_node_base_operation op_data(r, del_blt_succ);
1433  my_aggregator.execute(&op_data);
1434  }
1435 
1436  /*override*/size_t successor_count() {
1437  join_node_base_operation op_data(blt_succ_cnt);
1438  my_aggregator.execute(&op_data);
1439  return op_data.cnt_val;
1440  }
1441 
1442  /*override*/ void copy_successors(successor_list_type &l) {
1443  join_node_base_operation op_data(blt_succ_cpy);
1444  op_data.slist = &l;
1445  my_aggregator.execute(&op_data);
1446  }
1447 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1448 
1449 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1450  /*override*/void extract() {
1451  input_ports_type::extract();
1452  my_successors.built_successors().sender_extract(*this);
1453  }
1454 #endif
1455 
1456  protected:
1457 
1458  /*override*/void reset_node(reset_flags f) {
1459  input_ports_type::reset(f);
1460  if(f & rf_clear_edges) my_successors.clear();
1461  }
1462 
1463  private:
1465 
1466  friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
1467  task *forward_task() {
1468  join_node_base_operation op_data(do_fwrd_bypass);
1469  my_aggregator.execute(&op_data);
1470  return op_data.bypass_t;
1471  }
1472 
1473  }; // join_node_base
1474 
1475  // join base class type generator
1476  template<int N, template<class> class PT, typename OutputTuple, typename JP>
1477  struct join_base {
1479  };
1480 
1481  template<int N, typename OutputTuple, typename K, typename KHash>
1482  struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1484  typedef K key_type;
1485  typedef KHash key_hash_compare;
1486  typedef typename internal::join_node_base< key_traits_type,
1487  // ports type
1489  OutputTuple > type;
1490  };
1491 
1493  // using tuple_element. The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1494  // and should match the typename.
1495 
1496  template<int N, template<class> class PT, typename OutputTuple, typename JP>
1497  class unfolded_join_node : public join_base<N,PT,OutputTuple,JP>::type {
1498  public:
1500  typedef OutputTuple output_type;
1501  private:
1503  public:
1504  unfolded_join_node(graph &g) : base_type(g) {}
1505  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1506  };
1507 
1508 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1509  template <typename K, typename T>
1510  struct key_from_message_body {
1511  K operator()(const T& t) const {
1512  using tbb::flow::key_from_message;
1513  return key_from_message<K>(t);
1514  }
1515  };
1516  // Adds const to reference type
1517  template <typename K, typename T>
1518  struct key_from_message_body<K&,T> {
1519  const K& operator()(const T& t) const {
1520  using tbb::flow::key_from_message;
1521  return key_from_message<const K&>(t);
1522  }
1523  };
1524 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1525  // key_matching unfolded_join_node. This must be a separate specialization because the constructors
1526  // differ.
1527 
1528  template<typename OutputTuple, typename K, typename KHash>
1529  class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1530  join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1531  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1532  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1533  public:
1535  typedef OutputTuple output_type;
1536  private:
1537  typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1540  typedef typename tbb::flow::tuple< f0_p, f1_p > func_initializer_type;
1541  public:
1542 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1543  unfolded_join_node(graph &g) : base_type(g,
1544  func_initializer_type(
1545  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1546  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1547  ) ) {
1548  }
1549 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1550  template<typename Body0, typename Body1>
1551  unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1552  func_initializer_type(
1555  ) ) {
1556  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1557  }
1558  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1559  };
1560 
1561  template<typename OutputTuple, typename K, typename KHash>
1562  class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1563  join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1564  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1565  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1566  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1567  public:
1569  typedef OutputTuple output_type;
1570  private:
1571  typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1575  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1576  public:
1577 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1578  unfolded_join_node(graph &g) : base_type(g,
1579  func_initializer_type(
1580  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1581  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1582  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1583  ) ) {
1584  }
1585 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1586  template<typename Body0, typename Body1, typename Body2>
1587  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1588  func_initializer_type(
1592  ) ) {
1593  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1594  }
1595  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1596  };
1597 
1598  template<typename OutputTuple, typename K, typename KHash>
1599  class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1600  join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1601  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1602  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1603  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1604  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1605  public:
1607  typedef OutputTuple output_type;
1608  private:
1609  typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1614  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1615  public:
1616 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1617  unfolded_join_node(graph &g) : base_type(g,
1618  func_initializer_type(
1619  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1620  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1621  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1622  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1623  ) ) {
1624  }
1625 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1626  template<typename Body0, typename Body1, typename Body2, typename Body3>
1627  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1628  func_initializer_type(
1633  ) ) {
1634  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1635  }
1636  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1637  };
1638 
1639  template<typename OutputTuple, typename K, typename KHash>
1640  class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1641  join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1642  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1643  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1644  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1645  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1646  typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1647  public:
1649  typedef OutputTuple output_type;
1650  private:
1651  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1657  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1658  public:
1659 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1660  unfolded_join_node(graph &g) : base_type(g,
1661  func_initializer_type(
1662  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1663  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1664  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1665  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1666  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1667  ) ) {
1668  }
1669 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1670  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1671  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1672  func_initializer_type(
1678  ) ) {
1679  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1680  }
1681  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1682  };
1683 
1684 #if __TBB_VARIADIC_MAX >= 6
1685  template<typename OutputTuple, typename K, typename KHash>
1686  class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1687  join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1688  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1689  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1690  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1691  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1692  typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1693  typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1694  public:
1696  typedef OutputTuple output_type;
1697  private:
1698  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1704  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1705  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1706  public:
1707 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1708  unfolded_join_node(graph &g) : base_type(g,
1709  func_initializer_type(
1710  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1711  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1712  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1713  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1714  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1715  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1716  ) ) {
1717  }
1718 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1719  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1720  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1721  : base_type(g, func_initializer_type(
1728  ) ) {
1729  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1730  }
1731  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1732  };
1733 #endif
1734 
1735 #if __TBB_VARIADIC_MAX >= 7
 // Partial specialization of unfolded_join_node for 7 input ports with the
 // key_matching<K,KHash> policy. It is compiled only when __TBB_VARIADIC_MAX >= 7
 // and derives from join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type.
1736  template<typename OutputTuple, typename K, typename KHash>
1737  class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1738  join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
 // T0..T6 are the element types of OutputTuple, one per input port.
1739  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1740  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1741  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1742  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1743  typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1744  typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1745  typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1746  public:
 // NOTE(review): the embedded listing numbers jump from 1746 to 1748; input_ports_type is
 // used below but its declaration is not visible here -- confirm against the original header.
1748  typedef OutputTuple output_type;
1749  private:
1750  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
 // fN_p is a pointer to the key-extraction body for port N (type_to_key_function_body<TN, K>).
 // NOTE(review): listing numbers jump from 1750 to 1756; f0_p..f4_p are referenced by
 // func_initializer_type but not declared in the visible text -- presumably they follow the
 // same pattern as f5_p/f6_p; confirm.
1756  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1757  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
 // Tuple of the seven key-function body pointers that is handed to base_type.
1758  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1759  public:
1760 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
 // Preview-only constructor taking just the graph. For each port N it allocates a
 // type_to_key_function_body_leaf wrapping a default-constructed key_from_message_body<K,TN>
 // and passes the seven pointers to base_type. The code that releases these allocations is
 // not visible in this section.
1761  unfolded_join_node(graph &g) : base_type(g,
1762  func_initializer_type(
1763  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1764  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1765  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1766  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1767  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1768  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1769  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1770  ) ) {
1771  }
1772 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
 // Constructor taking the graph and one user-supplied body per port (body0..body6).
 // NOTE(review): listing numbers jump from 1776 to 1784, so the arguments passed to
 // func_initializer_type are not visible here -- presumably one leaf per bodyN; confirm.
 // The static assertion rejects instantiation when OutputTuple does not have 7 elements.
1773  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1774  typename Body5, typename Body6>
1775  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1776  Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1784  ) ) {
1785  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1786  }
 // Copy constructor: forwards to the base_type copy constructor.
1787  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1788  };
1789 #endif
1790 
1791 #if __TBB_VARIADIC_MAX >= 8
 // Partial specialization of unfolded_join_node for 8 input ports with the
 // key_matching<K,KHash> policy. It is compiled only when __TBB_VARIADIC_MAX >= 8
 // and derives from join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type.
1792  template<typename OutputTuple, typename K, typename KHash>
1793  class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1794  join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
 // T0..T7 are the element types of OutputTuple, one per input port.
1795  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1796  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1797  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1798  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1799  typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1800  typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1801  typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1802  typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1803  public:
 // NOTE(review): the embedded listing numbers jump from 1803 to 1805; input_ports_type is
 // used below but its declaration is not visible here -- confirm against the original header.
1805  typedef OutputTuple output_type;
1806  private:
1807  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
 // fN_p is a pointer to the key-extraction body for port N (type_to_key_function_body<TN, K>).
 // NOTE(review): listing numbers jump from 1807 to 1813; f0_p..f4_p are referenced by
 // func_initializer_type but not declared in the visible text -- presumably they follow the
 // same pattern as f5_p..f7_p; confirm.
1813  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1814  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1815  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
 // Tuple of the eight key-function body pointers that is handed to base_type.
1816  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1817  public:
1818 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
 // Preview-only constructor taking just the graph. For each port N it allocates a
 // type_to_key_function_body_leaf wrapping a default-constructed key_from_message_body<K,TN>
 // and passes the eight pointers to base_type. The code that releases these allocations is
 // not visible in this section.
1819  unfolded_join_node(graph &g) : base_type(g,
1820  func_initializer_type(
1821  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1822  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1823  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1824  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1825  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1826  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1827  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1828  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1829  ) ) {
1830  }
1831 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
 // Constructor taking the graph and one user-supplied body per port (body0..body7).
 // NOTE(review): listing numbers jump from 1835 to 1844, so the arguments passed to
 // func_initializer_type are not visible here -- presumably one leaf per bodyN; confirm.
 // The static assertion rejects instantiation when OutputTuple does not have 8 elements.
1832  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1833  typename Body5, typename Body6, typename Body7>
1834  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1835  Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1844  ) ) {
1845  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1846  }
 // Copy constructor: forwards to the base_type copy constructor.
1847  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1848  };
1849 #endif
1850 
1851 #if __TBB_VARIADIC_MAX >= 9
 // Partial specialization of unfolded_join_node for 9 input ports with the
 // key_matching<K,KHash> policy. It is compiled only when __TBB_VARIADIC_MAX >= 9
 // and derives from join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type.
1852  template<typename OutputTuple, typename K, typename KHash>
1853  class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1854  join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
 // T0..T8 are the element types of OutputTuple, one per input port.
1855  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1856  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1857  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1858  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1859  typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1860  typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1861  typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1862  typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1863  typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
1864  public:
 // NOTE(review): the embedded listing numbers jump from 1864 to 1866; input_ports_type is
 // used below but its declaration is not visible here -- confirm against the original header.
1866  typedef OutputTuple output_type;
1867  private:
1868  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
 // fN_p is a pointer to the key-extraction body for port N (type_to_key_function_body<TN, K>).
 // NOTE(review): listing numbers jump from 1868 to 1874; f0_p..f4_p are referenced by
 // func_initializer_type but not declared in the visible text -- presumably they follow the
 // same pattern as f5_p..f8_p; confirm.
1874  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1875  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1876  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1877  typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
 // Tuple of the nine key-function body pointers that is handed to base_type.
1878  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1879  public:
1880 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
 // Preview-only constructor taking just the graph. For each port N it allocates a
 // type_to_key_function_body_leaf wrapping a default-constructed key_from_message_body<K,TN>
 // and passes the nine pointers to base_type. The code that releases these allocations is
 // not visible in this section.
1881  unfolded_join_node(graph &g) : base_type(g,
1882  func_initializer_type(
1883  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1884  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1885  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1886  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1887  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1888  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1889  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1890  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1891  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1892  ) ) {
1893  }
1894 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
 // Constructor taking the graph and one user-supplied body per port (body0..body8).
 // NOTE(review): listing numbers jump from 1898 to 1908, so the arguments passed to
 // func_initializer_type are not visible here -- presumably one leaf per bodyN; confirm.
 // The static assertion rejects instantiation when OutputTuple does not have 9 elements.
1895  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1896  typename Body5, typename Body6, typename Body7, typename Body8>
1897  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1898  Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1908  ) ) {
1909  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1910  }
 // Copy constructor: forwards to the base_type copy constructor.
1911  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1912  };
1913 #endif
1914 
1915 #if __TBB_VARIADIC_MAX >= 10
 // Partial specialization of unfolded_join_node for 10 input ports with the
 // key_matching<K,KHash> policy. It is compiled only when __TBB_VARIADIC_MAX >= 10
 // and derives from join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type.
1916  template<typename OutputTuple, typename K, typename KHash>
1917  class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1918  join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
 // T0..T9 are the element types of OutputTuple, one per input port.
1919  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1920  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1921  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1922  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1923  typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1924  typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1925  typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1926  typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1927  typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
1928  typedef typename tbb::flow::tuple_element<9, OutputTuple>::type T9;
1929  public:
 // NOTE(review): the embedded listing numbers jump from 1929 to 1931; input_ports_type is
 // used below but its declaration is not visible here -- confirm against the original header.
1931  typedef OutputTuple output_type;
1932  private:
1933  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
 // fN_p is a pointer to the key-extraction body for port N (type_to_key_function_body<TN, K>).
 // NOTE(review): listing numbers jump from 1933 to 1939; f0_p..f4_p are referenced by
 // func_initializer_type but not declared in the visible text -- presumably they follow the
 // same pattern as f5_p..f9_p; confirm.
1939  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1940  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1941  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1942  typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1943  typedef typename internal::type_to_key_function_body<T9, K> *f9_p;
 // Tuple of the ten key-function body pointers that is handed to base_type.
1944  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1945  public:
1946 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
 // Preview-only constructor taking just the graph. For each port N it allocates a
 // type_to_key_function_body_leaf wrapping a default-constructed key_from_message_body<K,TN>
 // and passes the ten pointers to base_type. The code that releases these allocations is
 // not visible in this section.
1947  unfolded_join_node(graph &g) : base_type(g,
1948  func_initializer_type(
1949  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1950  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1951  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1952  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1953  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1954  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1955  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1956  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1957  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1958  new internal::type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1959  ) ) {
1960  }
1961 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
 // Constructor taking the graph and one user-supplied body per port (body0..body9).
 // NOTE(review): listing numbers jump from 1965 to 1976, so the arguments passed to
 // func_initializer_type are not visible here -- presumably one leaf per bodyN; confirm.
 // The static assertion rejects instantiation when OutputTuple does not have 10 elements.
1962  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1963  typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1964  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1965  Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1976  ) ) {
1977  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1978  }
 // Copy constructor: forwards to the base_type copy constructor.
1979  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1980  };
1981 #endif
1982 
// Returns a reference to the N-th element of the tuple produced by jn.input_ports(),
// selected with tbb::flow::get<N>. JNT must provide an input_ports_type typedef and an
// input_ports() member; the return type is tuple_element<N, JNT::input_ports_type>::type&.
1984  template<size_t N, typename JNT>
1985  typename tbb::flow::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1986  return tbb::flow::get<N>(jn.input_ports());
1987  }
1988 
1989 }
1990 #endif // __TBB__flow_graph_join_impl_H
1991 
reserving_port()
Constructor.
Definition: _flow_graph_join_impl.h:330
queueing_port(const queueing_port &)
copy constructor
Definition: _flow_graph_join_impl.h:556
Definition: _flow_graph_join_impl.h:30
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
Definition: _flow_graph_join_impl.h:1497
bool register_predecessor(predecessor_type &src)
Add a predecessor.
Definition: _flow_graph_join_impl.h:349
void consume()
Complete use of the port.
Definition: _flow_graph_join_impl.h:376
void release()
Release the port.
Definition: _flow_graph_join_impl.h:370
Definition: _flow_graph_join_impl.h:1477
queueing_port()
Constructor.
Definition: _flow_graph_join_impl.h:550
bool remove_predecessor(predecessor_type &src)
Remove a predecessor.
Definition: _flow_graph_join_impl.h:356
Definition: _flow_graph_types_impl.h:58
Definition: _flow_graph_impl.h:40
Definition: _flow_graph_impl.h:218
A cache of successors that are put in a round-robin fashion.
Definition: _flow_graph_impl.h:796
A task that calls a node's forward_task function.
Definition: _flow_graph_impl.h:255
join_node_FE : implements input port policy
Definition: _flow_graph_join_impl.h:859
Definition: _flow_graph_join_impl.h:640
Definition: _flow_graph_join_impl.h:53
A cache of successors that are broadcast to.
Definition: _flow_graph_impl.h:751
bool reserve(T &v)
Reserve an item from the port.
Definition: _flow_graph_join_impl.h:363
The two-phase join port.
Definition: _flow_graph_join_impl.h:213
Definition: _flow_graph_async_msg_impl.h:32
Definition: _flow_graph_join_impl.h:632
queueing join_port
Definition: _flow_graph_join_impl.h:427
Definition: _flow_graph_impl.h:39
field of type K being used for matching.
Definition: _flow_graph_impl.h:46
Definition: _flow_graph_join_impl.h:648
Definition: _flow_graph_impl.h:201
Definition: _flow_graph_join_impl.h:52
void set_join_node_pointer(forwarding_base *join)
record parent for tallying available items
Definition: _flow_graph_join_impl.h:562
Definition: _flow_graph_types_impl.h:53
join_node_base
Definition: _flow_graph_join_impl.h:855
Definition: _flow_graph_item_buffer_impl.h:44
Definition: _flow_graph_join_impl.h:44