// BRE12
// _flow_graph_node_impl.h
/*
    Copyright 2005-2016 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks. Threading Building Blocks is free software;
    you can redistribute it and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.  Threading Building Blocks is
    distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See the GNU General Public License for more details.  You should have received a copy of
    the GNU General Public License along with Threading Building Blocks; if not, write to the
    Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software library without
    restriction.  Specifically, if other files instantiate templates or use macros or inline
    functions from this file, or you compile this file and link it with other files to produce
    an executable, this file does not by itself cause the resulting executable to be covered
    by the GNU General Public License.  This exception does not however invalidate any other
    reasons why the executable file might be covered by the GNU General Public License.
*/

#ifndef __TBB__flow_graph_node_impl_H
#define __TBB__flow_graph_node_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "_flow_graph_item_buffer_impl.h"

namespace internal {

37  template< typename T, typename A >
38  class function_input_queue : public item_buffer<T,A> {
39  public:
40  bool pop( T& t ) {
41  return this->pop_front( t );
42  }
43 
44  bool push( T& t ) {
45  return this->push_back( t );
46  }
47  };
48 
    //! Input side of a function node: accepts items of type Input and turns each into a
    //  task that runs the node's body, honoring a concurrency limit.  All state changes
    //  are serialized through an aggregator.
    //  The only up-ref is apply_body_impl, which should implement the function
    //  call and any handling of the result.
    template< typename Input, typename A, typename ImplType >
    class function_input_base : public receiver<Input>, tbb::internal::no_assign {
        // Status values written into operation_type::status by the aggregator handler.
        enum op_stat {WAIT=0, SUCCEEDED, FAILED};
        // The operations that the aggregator serializes on behalf of concurrent callers.
        enum op_type {reg_pred, rem_pred, app_body, try_fwd, tryput_bypass, app_body_bypass
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
            , add_blt_pred, del_blt_pred,
            blt_pred_cnt, blt_pred_cpy   // create vector copies of preds and succs
#endif
        };
        typedef function_input_base<Input, A, ImplType> class_type;

    public:

        //! The input type of this receiver
        typedef Input input_type;
        typedef typename receiver<input_type>::predecessor_type predecessor_type;
        typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
        typedef function_input_queue<input_type, A> input_queue_type;
        typedef typename A::template rebind< input_queue_type >::other queue_allocator_type;

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
        typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
#endif

        //! Constructor for function_input_base
        function_input_base( graph &g, size_t max_concurrency, input_queue_type *q = NULL)
            : my_graph(g), my_max_concurrency(max_concurrency), my_concurrency(0),
              my_queue(q), forwarder_busy(false) {
            my_predecessors.set_owner(this);
            my_aggregator.initialize_handler(handler_type(this));
        }

        //! Copy constructor; starts with zero concurrency and its own queue (if any).
        function_input_base( const function_input_base& src, input_queue_type *q = NULL) :
            receiver<Input>(), tbb::internal::no_assign(),
            my_graph(src.my_graph), my_max_concurrency(src.my_max_concurrency),
            my_concurrency(0), my_queue(q), forwarder_busy(false)
        {
            my_predecessors.set_owner(this);
            my_aggregator.initialize_handler(handler_type(this));
        }

        //! Destructor; owns and frees the input queue, if one was supplied.
        // The queue is allocated by the constructor for {multi}function_node.
        // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
        // This would be an interface-breaking change.
        virtual ~function_input_base() {
            if ( my_queue ) delete my_queue;
        }

        //! Puts an item; returns a task to execute the body, NULL on rejection,
        //  or SUCCESSFULLY_ENQUEUED (via bypass_t) when the item was buffered.
        virtual task * try_put_task( const input_type &t ) {
            if ( my_max_concurrency == 0 ) {
                // unlimited concurrency: no bookkeeping needed, create the task directly
                return create_body_task( t );
            } else {
                operation_type op_data(t, tryput_bypass);
                my_aggregator.execute(&op_data);
                if(op_data.status == SUCCEEDED ) {
                    return op_data.bypass_t;
                }
                return NULL;
            }
        }

        //! Adds src to the set of cached predecessors.
        /* override */ bool register_predecessor( predecessor_type &src ) {
            operation_type op_data(reg_pred);
            op_data.r = &src;
            my_aggregator.execute(&op_data);
            return true;
        }

        //! Removes src from the set of cached predecessors.
        /* override */ bool remove_predecessor( predecessor_type &src ) {
            operation_type op_data(rem_pred);
            op_data.r = &src;
            my_aggregator.execute(&op_data);
            return true;
        }

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        //! Records src as an edge-built predecessor (serialized via the aggregator).
        /*override*/ void internal_add_built_predecessor( predecessor_type &src) {
            operation_type op_data(add_blt_pred);
            op_data.r = &src;
            my_aggregator.execute(&op_data);
        }

        //! Removes src from the edge-built predecessors.
        /*override*/ void internal_delete_built_predecessor( predecessor_type &src) {
            operation_type op_data(del_blt_pred);
            op_data.r = &src;
            my_aggregator.execute(&op_data);
        }

        //! Returns the number of edge-built predecessors.
        /*override*/ size_t predecessor_count() {
            operation_type op_data(blt_pred_cnt);
            my_aggregator.execute(&op_data);
            return op_data.cnt_val;
        }

        //! Copies the edge-built predecessors into v.
        /*override*/ void copy_predecessors(predecessor_list_type &v) {
            operation_type op_data(blt_pred_cpy);
            op_data.predv = &v;
            my_aggregator.execute(&op_data);
        }

        /*override*/built_predecessors_type &built_predecessors() {
            return my_predecessors.built_predecessors();
        }
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */

    protected:

        //! Resets the concurrency count, the input queue (if any), and the receiver state.
        void reset_function_input_base( reset_flags f) {
            my_concurrency = 0;
            if(my_queue) {
                my_queue->reset();
            }
            reset_receiver(f);
            forwarder_busy = false;
        }

        graph& my_graph;                   // graph whose root task parents spawned tasks
        const size_t my_max_concurrency;   // 0 is treated as "no limit" (see try_put_task)
        size_t my_concurrency;             // number of body tasks currently in flight
        input_queue_type *my_queue;        // owned; NULL when items are not buffered
        predecessor_cache<input_type, null_mutex > my_predecessors;

        //! Clears or resets the predecessor cache according to the reset flags.
        /*override*/void reset_receiver( reset_flags f) {
            if( f & rf_clear_edges) my_predecessors.clear();
            else
                my_predecessors.reset();
            __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
        }

    private:

        friend class apply_body_task_bypass< class_type, input_type >;
        friend class forward_task_bypass< class_type >;

        //! Operation record passed through the aggregator.
        class operation_type : public aggregated_operation< operation_type > {
        public:
            char type;   // one of op_type
            union {
                input_type *elem;
                predecessor_type *r;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
                size_t cnt_val;
                predecessor_list_type *predv;
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
            };
            tbb::task *bypass_t;   // task handed back to the caller for bypassing
            operation_type(const input_type& e, op_type t) :
                type(char(t)), elem(const_cast<input_type*>(&e)) {}
            operation_type(op_type t) : type(char(t)), r(NULL) {}
        };

        bool forwarder_busy;   // true while a forward task is outstanding
        typedef internal::aggregating_functor<class_type, operation_type> handler_type;
        friend class internal::aggregating_functor<class_type, operation_type>;
        aggregator< handler_type, operation_type > my_aggregator;

        //! Dispatches each batched operation; runs inside the aggregator, so it is serialized.
        void handle_operations(operation_type *op_list) {
            operation_type *tmp;
            while (op_list) {
                tmp = op_list;
                op_list = op_list->next;
                switch (tmp->type) {
                case reg_pred:
                    my_predecessors.add(*(tmp->r));
                    __TBB_store_with_release(tmp->status, SUCCEEDED);
                    // the new predecessor may have items to offer; wake the forwarder
                    if (!forwarder_busy) {
                        forwarder_busy = true;
                        spawn_forward_task();
                    }
                    break;
                case rem_pred:
                    my_predecessors.remove(*(tmp->r));
                    __TBB_store_with_release(tmp->status, SUCCEEDED);
                    break;
                case app_body:
                    // a body finished; if below the limit, pull the next item and spawn it
                    __TBB_ASSERT(my_max_concurrency != 0, NULL);
                    --my_concurrency;
                    __TBB_store_with_release(tmp->status, SUCCEEDED);
                    if (my_concurrency<my_max_concurrency) {
                        input_type i;
                        bool item_was_retrieved = false;
                        if ( my_queue )
                            item_was_retrieved = my_queue->pop(i);
                        else
                            item_was_retrieved = my_predecessors.get_item(i);
                        if (item_was_retrieved) {
                            ++my_concurrency;
                            spawn_body_task(i);
                        }
                    }
                    break;
                case app_body_bypass: {
                    // like app_body, but the next task is returned in bypass_t, not spawned
                    task * new_task = NULL;
                    __TBB_ASSERT(my_max_concurrency != 0, NULL);
                    --my_concurrency;
                    if (my_concurrency<my_max_concurrency) {
                        input_type i;
                        bool item_was_retrieved = false;
                        if ( my_queue )
                            item_was_retrieved = my_queue->pop(i);
                        else
                            item_was_retrieved = my_predecessors.get_item(i);
                        if (item_was_retrieved) {
                            ++my_concurrency;
                            new_task = create_body_task(i);
                        }
                    }
                    tmp->bypass_t = new_task;
                    __TBB_store_with_release(tmp->status, SUCCEEDED);
                }
                    break;
                case tryput_bypass: internal_try_put_task(tmp); break;
                case try_fwd: internal_forward(tmp); break;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
                case add_blt_pred: {
                    my_predecessors.internal_add_built_predecessor(*(tmp->r));
                    __TBB_store_with_release(tmp->status, SUCCEEDED);
                }
                    break;
                case del_blt_pred:
                    my_predecessors.internal_delete_built_predecessor(*(tmp->r));
                    __TBB_store_with_release(tmp->status, SUCCEEDED);
                    break;
                case blt_pred_cnt:
                    tmp->cnt_val = my_predecessors.predecessor_count();
                    __TBB_store_with_release(tmp->status, SUCCEEDED);
                    break;
                case blt_pred_cpy:
                    my_predecessors.copy_predecessors( *(tmp->predv) );
                    __TBB_store_with_release(tmp->status, SUCCEEDED);
                    break;
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
                }
            }
        }

        //! Handles a tryput_bypass op: start a body, buffer the item, or fail.
        void internal_try_put_task(operation_type *op) {
            __TBB_ASSERT(my_max_concurrency != 0, NULL);
            if (my_concurrency < my_max_concurrency) {
                ++my_concurrency;
                task * new_task = create_body_task(*(op->elem));
                op->bypass_t = new_task;
                __TBB_store_with_release(op->status, SUCCEEDED);
            } else if ( my_queue && my_queue->push(*(op->elem)) ) {
                // at the concurrency limit, but the item was buffered for later
                op->bypass_t = SUCCESSFULLY_ENQUEUED;
                __TBB_store_with_release(op->status, SUCCEEDED);
            } else {
                op->bypass_t = NULL;
                __TBB_store_with_release(op->status, FAILED);
            }
        }

        //! Handles a try_fwd op: pull one more item if capacity allows; a FAILED
        //  status also marks the forwarder idle again.
        void internal_forward(operation_type *op) {
            op->bypass_t = NULL;
            if (my_concurrency<my_max_concurrency || !my_max_concurrency) {
                input_type i;
                bool item_was_retrieved = false;
                if ( my_queue )
                    item_was_retrieved = my_queue->pop(i);
                else
                    item_was_retrieved = my_predecessors.get_item(i);
                if (item_was_retrieved) {
                    ++my_concurrency;
                    op->bypass_t = create_body_task(i);
                    __TBB_store_with_release(op->status, SUCCEEDED);
                    return;
                }
            }
            __TBB_store_with_release(op->status, FAILED);
            forwarder_busy = false;
        }

        //! Applies the body via the CRTP-derived class,
        //  then decides if more work is available
        task * apply_body_bypass( input_type &i ) {
            task * new_task = static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
            if ( my_max_concurrency != 0 ) {
                operation_type op_data(app_body_bypass);  // tries to pop an item or get_item, enqueues another apply_body
                my_aggregator.execute(&op_data);
                tbb::task *ttask = op_data.bypass_t;
                new_task = combine_tasks(new_task, ttask);
            }
            return new_task;
        }

        //! Allocates a task to apply the body to input; NULL when the graph is inactive.
        inline task * create_body_task( const input_type &input ) {
            return (my_graph.is_active()) ?
                new(task::allocate_additional_child_of(*(my_graph.root_task())))
                    apply_body_task_bypass < class_type, input_type >(*this, input) :
                NULL;
        }

        //! Spawns a task that applies the body to input.
        inline void spawn_body_task( const input_type &input ) {
            task* tp = create_body_task(input);
            // tp == NULL => g.reset(), which shouldn't occur in concurrent context
            if(tp) {
                FLOW_SPAWN(*tp);
            }
        }

        //! Pulls items until no more can be started, combining the bypass tasks.
        task *forward_task() {
            operation_type op_data(try_fwd);
            task *rval = NULL;
            do {
                op_data.status = WAIT;
                my_aggregator.execute(&op_data);
                if(op_data.status == SUCCEEDED) {
                    tbb::task *ttask = op_data.bypass_t;
                    rval = combine_tasks(rval, ttask);
                }
            } while (op_data.status == SUCCEEDED);
            return rval;
        }

        //! Allocates a forwarding task; NULL when the graph is inactive.
        inline task *create_forward_task() {
            return (my_graph.is_active()) ?
                new(task::allocate_additional_child_of(*(my_graph.root_task()))) forward_task_bypass< class_type >(*this) :
                NULL;
        }

        //! Spawns a task that calls forward_task().
        inline void spawn_forward_task() {
            task* tp = create_forward_task();
            if(tp) {
                FLOW_SPAWN(*tp);
            }
        }
    };  // function_input_base
394 
    //! Implements methods for a function node that takes a type Input and forwards
    //  a type Output to its successors.
    template< typename Input, typename Output, typename A>
    class function_input : public function_input_base<Input, A, function_input<Input,Output,A> > {
    public:
        typedef Input input_type;
        typedef Output output_type;
        typedef function_body<input_type, output_type> function_body_type;
        typedef function_input<Input,Output,A> my_class;
        typedef function_input_base<Input, A, my_class> base_type;
        typedef function_input_queue<input_type, A> input_queue_type;

        // constructor
        template<typename Body>
        function_input( graph &g, size_t max_concurrency, Body& body, input_queue_type *q = NULL ) :
            base_type(g, max_concurrency, q),
            my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ),
            my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) {
        }

        //! Copy constructor; clones the pristine initial body, not the possibly-mutated current one.
        function_input( const function_input& src, input_queue_type *q = NULL ) :
            base_type(src, q),
            my_body( src.my_init_body->clone() ),
            my_init_body(src.my_init_body->clone() ) {
        }

        ~function_input() {
            delete my_body;
            delete my_init_body;
        }

        //! Returns a copy of the user's body object (Body must match the stored leaf type).
        template< typename Body >
        Body copy_function_object() {
            function_body_type &body_ref = *this->my_body;
            return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
        }

        //! Applies the body to i and forwards the result to successors, returning
        //  any task produced by the successor cache.
        task * apply_body_impl_bypass( const input_type &i) {
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
            // There is an extra copy needed to capture the
            // body execution without the try_put
            tbb::internal::fgt_begin_body( my_body );
            output_type v = (*my_body)(i);
            tbb::internal::fgt_end_body( my_body );
            task * new_task = successors().try_put_task( v );
#else
            task * new_task = successors().try_put_task( (*my_body)(i) );
#endif
            return new_task;
        }

    protected:

        //! Resets the base input state and, when requested, restores the initial body.
        void reset_function_input(reset_flags f) {
            base_type::reset_function_input_base(f);
            if(f & rf_reset_bodies) {
                function_body_type *tmp = my_init_body->clone();
                delete my_body;
                my_body = tmp;
            }
        }

        function_body_type *my_body;        // body actually invoked
        function_body_type *my_init_body;   // pristine copy used for reset and copy
        virtual broadcast_cache<output_type > &successors() = 0;

    };  // function_input
463 
464 
465  // helper templates to clear the successor edges of the output ports of an multifunction_node
466  template<int N> struct clear_element {
467  template<typename P> static void clear_this(P &p) {
468  (void)tbb::flow::get<N-1>(p).successors().clear();
469  clear_element<N-1>::clear_this(p);
470  }
471  template<typename P> static bool this_empty(P &p) {
472  if(tbb::flow::get<N-1>(p).successors().empty())
473  return clear_element<N-1>::this_empty(p);
474  return false;
475  }
476  };
477 
478  template<> struct clear_element<1> {
479  template<typename P> static void clear_this(P &p) {
480  (void)tbb::flow::get<0>(p).successors().clear();
481  }
482  template<typename P> static bool this_empty(P &p) {
483  return tbb::flow::get<0>(p).successors().empty();
484  }
485  };
486 
487 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
488  // helper templates to extract the output ports of an multifunction_node from graph
489  template<int N> struct extract_element {
490  template<typename P> static void extract_this(P &p) {
491  (void)tbb::flow::get<N-1>(p).successors().built_successors().sender_extract(tbb::flow::get<N-1>(p));
492  extract_element<N-1>::extract_this(p);
493  }
494  };
495 
496  template<> struct extract_element<1> {
497  template<typename P> static void extract_this(P &p) {
498  (void)tbb::flow::get<0>(p).successors().built_successors().sender_extract(tbb::flow::get<0>(p));
499  }
500  };
501 #endif
502 
    //! Implements methods for a function node that takes a type Input as input
    //  and has a tuple of output ports specified.
    template< typename Input, typename OutputPortSet, typename A>
    class multifunction_input : public function_input_base<Input, A, multifunction_input<Input,OutputPortSet,A> > {
    public:
        static const int N = tbb::flow::tuple_size<OutputPortSet>::value;
        typedef Input input_type;
        typedef OutputPortSet output_ports_type;
        typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
        typedef multifunction_input<Input,OutputPortSet,A> my_class;
        typedef function_input_base<Input, A, my_class> base_type;
        typedef function_input_queue<input_type, A> input_queue_type;

        // constructor
        template<typename Body>
        multifunction_input(
                graph &g,
                size_t max_concurrency,
                Body& body,
                input_queue_type *q = NULL ) :
            base_type(g, max_concurrency, q),
            my_body( new internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) ),
            my_init_body( new internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) ) {
        }

        //! Copy constructor; clones the pristine initial body, not the possibly-mutated current one.
        multifunction_input( const multifunction_input& src, input_queue_type *q = NULL ) :
            base_type(src, q),
            my_body( src.my_init_body->clone() ),
            my_init_body(src.my_init_body->clone() ) {
        }

        ~multifunction_input() {
            delete my_body;
            delete my_init_body;
        }

        //! Returns a copy of the user's body object (Body must match the stored leaf type).
        template< typename Body >
        Body copy_function_object() {
            multifunction_body_type &body_ref = *this->my_body;
            return dynamic_cast< internal::multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body();
        }

        // for multifunction nodes we do not have a single successor as such.  So we just tell
        // the task we were successful.
        task * apply_body_impl_bypass( const input_type &i) {
            tbb::internal::fgt_begin_body( my_body );
            (*my_body)(i, my_output_ports);
            tbb::internal::fgt_end_body( my_body );
            task * new_task = SUCCESSFULLY_ENQUEUED;
            return new_task;
        }

        output_ports_type &output_ports(){ return my_output_ports; }

    protected:
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        //! Disconnects every output port from the graph's built-edge bookkeeping.
        /*override*/void extract() {
            extract_element<N>::extract_this(my_output_ports);
        }
#endif

        //! Resets the base input state, output-port edges, and (optionally) the body.
        /*override*/void reset(reset_flags f) {
            base_type::reset_function_input_base(f);
            if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports);
            if(f & rf_reset_bodies) {
                multifunction_body_type *tmp = my_init_body->clone();
                delete my_body;
                my_body = tmp;
            }
            __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
        }

        multifunction_body_type *my_body;        // body actually invoked
        multifunction_body_type *my_init_body;   // pristine copy used for reset and copy
        output_ports_type my_output_ports;

    };  // multifunction_input
581 
582  // template to refer to an output port of a multifunction_node
583  template<size_t N, typename MOP>
584  typename tbb::flow::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
585  return tbb::flow::get<N>(op.output_ports());
586  }
587 
588 // helper structs for split_node
589  template<int N>
590  struct emit_element {
591  template<typename T, typename P>
592  static void emit_this(const T &t, P &p) {
593  (void)tbb::flow::get<N-1>(p).try_put(tbb::flow::get<N-1>(t));
594  emit_element<N-1>::emit_this(t,p);
595  }
596  };
597 
598  template<>
599  struct emit_element<1> {
600  template<typename T, typename P>
601  static void emit_this(const T &t, P &p) {
602  (void)tbb::flow::get<0>(p).try_put(tbb::flow::get<0>(t));
603  }
604  };
605 
    //! A receiver of continue_msg that runs a body producing an Output when triggered.
    template< typename Output >
    class continue_input : public continue_receiver {
    public:

        //! The input type of this receiver
        typedef continue_msg input_type;

        //! The output type of this node
        typedef Output output_type;
        typedef function_body<input_type, output_type> function_body_type;

        template< typename Body >
        continue_input( graph &g, Body& body )
            : my_graph_ptr(&g),
              my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ),
              my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }

        template< typename Body >
        continue_input( graph &g, int number_of_predecessors, Body& body )
            : continue_receiver( number_of_predecessors ), my_graph_ptr(&g),
              my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ),
              my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
        { }

        //! Copy constructor; clones the pristine initial body for both body copies.
        continue_input( const continue_input& src ) : continue_receiver(src),
            my_graph_ptr(src.my_graph_ptr),
            my_body( src.my_init_body->clone() ),
            my_init_body( src.my_init_body->clone() ) {}

        ~continue_input() {
            delete my_body;
            delete my_init_body;
        }

        //! Returns a copy of the user's body object (Body must match the stored leaf type).
        template< typename Body >
        Body copy_function_object() {
            function_body_type &body_ref = *my_body;
            return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
        }

        //! Resets base receiver state and, when requested, restores the initial body.
        /*override*/void reset_receiver( reset_flags f) {
            continue_receiver::reset_receiver(f);
            if(f & rf_reset_bodies) {
                function_body_type *tmp = my_init_body->clone();
                delete my_body;
                my_body = tmp;
            }
        }

    protected:

        graph* my_graph_ptr;                // graph providing the root task for spawned bodies
        function_body_type *my_body;        // body actually invoked
        function_body_type *my_init_body;   // pristine copy used for reset and copy

        virtual broadcast_cache<output_type > &successors() = 0;

        friend class apply_body_task_bypass< continue_input< Output >, continue_msg >;

        //! Applies the body and forwards the result to successors, returning any bypass task.
        task *apply_body_bypass( input_type ) {
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
            // There is an extra copy needed to capture the
            // body execution without the try_put
            tbb::internal::fgt_begin_body( my_body );
            output_type v = (*my_body)( continue_msg() );
            tbb::internal::fgt_end_body( my_body );
            return successors().try_put_task( v );
#else
            return successors().try_put_task( (*my_body)( continue_msg() ) );
#endif
        }

        //! Called when the predecessor count is reached; allocates the body task,
        //  or returns NULL when the graph is inactive.
        /* override */ task *execute( ) {
            return (my_graph_ptr->is_active()) ?
                new ( task::allocate_additional_child_of( *(my_graph_ptr->root_task()) ) )
                    apply_body_task_bypass< continue_input< Output >, continue_msg >( *this, continue_msg() ) :
                NULL;
        }

    };  // continue_input
689 
691  template< typename Output >
692  class function_output : public sender<Output> {
693  public:
694 
695  template<int N> friend struct clear_element;
696  typedef Output output_type;
697  typedef typename sender<output_type>::successor_type successor_type;
698  typedef broadcast_cache<output_type> broadcast_cache_type;
699 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
700  typedef typename sender<output_type>::built_successors_type built_successors_type;
701  typedef typename sender<output_type>::successor_list_type successor_list_type;
702 #endif
703 
704  function_output() { my_successors.set_owner(this); }
705  function_output(const function_output & /*other*/) : sender<output_type>() {
706  my_successors.set_owner(this);
707  }
708 
710  /* override */ bool register_successor( successor_type &r ) {
711  successors().register_successor( r );
712  return true;
713  }
714 
716  /* override */ bool remove_successor( successor_type &r ) {
717  successors().remove_successor( r );
718  return true;
719  }
720 
721 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
722  built_successors_type &built_successors() { return successors().built_successors(); }
723 
724 
725  /*override*/ void internal_add_built_successor( successor_type &r) {
726  successors().internal_add_built_successor( r );
727  }
728 
729  /*override*/ void internal_delete_built_successor( successor_type &r) {
730  successors().internal_delete_built_successor( r );
731  }
732 
733  /*override*/ size_t successor_count() {
734  return successors().successor_count();
735  }
736 
737  /*override*/ void copy_successors( successor_list_type &v) {
738  successors().copy_successors(v);
739  }
740 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
741 
742  // for multifunction_node. The function_body that implements
743  // the node will have an input and an output tuple of ports. To put
744  // an item to a successor, the body should
745  //
746  // get<I>(output_ports).try_put(output_value);
747  //
748  // if task pointer is returned will always spawn and return true, else
749  // return value will be bool returned from successors.try_put.
750  task *try_put_task(const output_type &i) { return my_successors.try_put_task(i); }
751 
752  broadcast_cache_type &successors() { return my_successors; }
753  protected:
754  broadcast_cache_type my_successors;
755 
756  }; // function_output
757 
758  template< typename Output >
759  class multifunction_output : public function_output<Output> {
760  public:
761  typedef Output output_type;
762  typedef function_output<output_type> base_type;
763  using base_type::my_successors;
764 
765  multifunction_output() : base_type() {my_successors.set_owner(this);}
766  multifunction_output( const multifunction_output &/*other*/) : base_type() { my_successors.set_owner(this); }
767 
768  bool try_put(const output_type &i) {
769  task *res = my_successors.try_put_task(i);
770  if(!res) return false;
771  if(res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
772  return true;
773  }
774  }; // multifunction_output
775 
//composite_node
#if TBB_PREVIEW_FLOW_GRAPH_TRACE && __TBB_FLOW_GRAPH_CPP11_FEATURES
    //! Recursion terminator: no more nodes to relate to the composite node.
    template<typename CompositeType>
    void add_nodes_impl(CompositeType*, bool) {}

    //! Records a tracing relation between c_node and each of the nodes n1, n...
    //  When visible, the composite is reported as parent of each node; otherwise
    //  each node is reported as child of the composite.
    template< typename CompositeType, typename NodeType1, typename... NodeTypes >
    void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
        void *addr = const_cast<NodeType1 *>(&n1);

        if(visible)
            tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_FLOW, c_node, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_parent_of, addr, tbb::internal::FLOW_NODE );
        else
            tbb::internal::itt_relation_add( tbb::internal::ITT_DOMAIN_FLOW, addr, tbb::internal::FLOW_NODE, tbb::internal::__itt_relation_is_child_of, c_node, tbb::internal::FLOW_NODE );
        add_nodes_impl(c_node, visible, n...);
    }
#endif
792 
} // internal

#endif // __TBB__flow_graph_node_impl_H
// Definition: _aggregator_impl.h:160
// aggregated_operation base class
// Definition: _aggregator_impl.h:37
// Definition: _aggregator_impl.h:144
// Definition: _flow_graph_async_msg_impl.h:32
// The namespace tbb contains all components of the library.
// Definition: parallel_for.h:44
// leaf for multifunction. OutputSet can be a std::tuple or a vector.
// Definition: _flow_graph_impl.h:177
// the leaf for function_body
// Definition: _flow_graph_impl.h:92