flow_graph.h — Intel Threading Building Blocks flow graph interface (extracted source listing).
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB_flow_graph_H
22 #define __TBB_flow_graph_H
23 
24 #include "tbb_stddef.h"
25 #include "atomic.h"
26 #include "spin_mutex.h"
27 #include "null_mutex.h"
28 #include "spin_rw_mutex.h"
29 #include "null_rw_mutex.h"
30 #include "task.h"
31 #include "cache_aligned_allocator.h"
32 #include "tbb_exception.h"
33 #include "internal/_template_helpers.h"
34 #include "internal/_aggregator_impl.h"
35 #include "tbb_profiling.h"
36 
37 #if __TBB_PREVIEW_ASYNC_NODE
38 #include "task_arena.h"
39 #endif
40 
41 #if __TBB_PREVIEW_ASYNC_MSG
42 #include <vector> // std::vector in internal::async_storage
43 #include <memory> // std::shared_ptr in async_msg
44 #endif
45 
46 #if TBB_DEPRECATED_FLOW_ENQUEUE
47 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
48 #else
49 #define FLOW_SPAWN(a) tbb::task::spawn((a))
50 #endif
51 
52 // use the VC10 or gcc version of tuple if it is available.
53 #if __TBB_CPP11_TUPLE_PRESENT
54  #include <tuple>
55 namespace tbb {
56  namespace flow {
57  using std::tuple;
58  using std::tuple_size;
59  using std::tuple_element;
60  using std::get;
61  }
62 }
63 #else
64  #include "compat/tuple"
65 #endif
66 
67 #include<list>
68 #include<queue>
69 
80 namespace tbb {
81 namespace flow {
82 
84 enum concurrency { unlimited = 0, serial = 1 };
85 
86 namespace interface8 {
87 
88 namespace internal {
89  template<typename T, typename M> class successor_cache;
90  template<typename T, typename M> class broadcast_cache;
91  template<typename T, typename M> class round_robin_cache;
92  template<typename T, typename M> class predecessor_cache;
93  template<typename T, typename M> class reservable_predecessor_cache;
94 }
95 
96 //A generic null type
97 struct null_type {};
98 
100 class continue_msg {};
101 
102 template< typename T > class sender;
103 template< typename T > class receiver;
104 class continue_receiver;
105 
106 template< typename T > class limiter_node; // needed for resetting decrementer
107 template< typename R, typename B > class run_and_put_task;
108 
109 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
110 
111 // flags to modify the behavior of the graph reset(). Can be combined.
112 enum reset_flags {
113  rf_reset_protocol = 0,
114  rf_reset_bodies = 1<<0, // delete the current node body, reset to a copy of the initial node body.
115  rf_clear_edges = 1<<1 // delete edges
116 };
117 
118 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
//! Holder of edges, both for caches and for those nodes which do not have predecessor caches.
//  C == receiver< ... > or sender< ... >, depending on which side of the edge this holds.
121 namespace internal {
122 template<typename C>
123 class edge_container {
124 
125 public:
126  typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
127 
128  void add_edge( C &s) {
129  built_edges.push_back( &s );
130  }
131 
132  void delete_edge( C &s) {
133  for ( typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i ) {
134  if ( *i == &s ) {
135  (void)built_edges.erase(i);
136  return; // only remove one predecessor per request
137  }
138  }
139  }
140 
141  void copy_edges( edge_list_type &v) {
142  v = built_edges;
143  }
144 
145  size_t edge_count() {
146  return (size_t)(built_edges.size());
147  }
148 
149  void clear() {
150  built_edges.clear();
151  }
152 
153  // methods remove the statement from all predecessors/successors liste in the edge
154  // container.
155  template< typename S > void sender_extract( S &s );
156  template< typename R > void receiver_extract( R &r );
157 
158 private:
159  edge_list_type built_edges;
160 }; // class edge_container
161 } // namespace internal
162 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
163 
164 #if __TBB_PREVIEW_ASYNC_MSG
165 
166 #include "internal/_flow_graph_async_msg_impl.h"
167 
168 namespace internal {
169 
170 class untyped_receiver;
171 
// Type-erased base for senders in the async_msg machinery: exposes successor
// management plus void*-based pull hooks (try_get_wrapper / try_reserve_wrapper)
// that the typed sender<T> below implements.
// NOTE(review): this is an extracted listing; the leading numbers on each line
// are extraction artifacts, and the bodies of try_get()/try_reserve() lost one
// or more lines (inner lines 211 and 217). Presumably they forward to the
// corresponding *_wrapper virtuals — verify against the original header.
172 class untyped_sender {
173  template< typename, typename > friend class internal::predecessor_cache;
174  template< typename, typename > friend class internal::reservable_predecessor_cache;
175 public:
// The successor type for this sender (type-erased).
177  typedef untyped_receiver successor_type;
178 
179  virtual ~untyped_sender() {}
180 
181  // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
182 
183  // TODO: Prevent untyped successor registration
184 
// Add a new successor to this node.
186  virtual bool register_successor( successor_type &r ) = 0;
187 
// Removes a successor from this node.
189  virtual bool remove_successor( successor_type &r ) = 0;
190 
// Releases the reserved item; default: nothing reserved.
192  virtual bool try_release( ) { return false; }
193 
// Consumes the reserved item; default: nothing reserved.
195  virtual bool try_consume( ) { return false; }
196 
197 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
198  typedef internal::edge_container<successor_type> built_successors_type;
200  typedef built_successors_type::edge_list_type successor_list_type;
201  virtual built_successors_type &built_successors() = 0;
202  virtual void internal_add_built_successor( successor_type & ) = 0;
203  virtual void internal_delete_built_successor( successor_type & ) = 0;
204  virtual void copy_successors( successor_list_type &) = 0;
205  virtual size_t successor_count() = 0;
206 #endif
207 protected:
// Typed pull entry point; body truncated in this listing (inner line 211 missing).
209  template< typename X >
210  bool try_get( X &t ) {
212  }
213 
// Typed reserve entry point; body truncated in this listing (inner line 217 missing).
215  template< typename X >
216  bool try_reserve( X &t ) {
218  }
219 
220  virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
221  virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
222 };
223 
// Type-erased base for receivers in the async_msg machinery: try_put() funnels
// through the typed try_put_task() and spawns any task returned (unless it is
// the SUCCESSFULLY_ENQUEUED sentinel).
// NOTE(review): extracted listing — leading numbers are artifacts, and the body
// of the templated try_put_task() lost a line (inner line 273); presumably it
// forwards to try_put_task_wrapper(). Verify against the original header.
224 class untyped_receiver {
225  template< typename, typename > friend class run_and_put_task;
226  template< typename > friend class limiter_node;
227 
228  template< typename, typename > friend class internal::broadcast_cache;
229  template< typename, typename > friend class internal::round_robin_cache;
230  template< typename, typename > friend class internal::successor_cache;
231 
232 #if __TBB_PREVIEW_OPENCL_NODE
233  template< typename, typename > friend class proxy_dependency_receiver;
234 #endif /* __TBB_PREVIEW_OPENCL_NODE */
235 public:
// The predecessor type for this receiver (type-erased).
237  typedef untyped_sender predecessor_type;
238 
240  virtual ~untyped_receiver() {}
241 
// Put an item to the receiver: returns false on rejection; otherwise spawns
// the continuation task unless it was already enqueued.
243  template<typename X>
244  bool try_put(const X& t) {
245  task *res = try_put_task(t);
246  if (!res) return false;
247  if (res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
248  return true;
249  }
250 
251  // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
252 
253  // TODO: Prevent untyped predecessor registration
254 
// Default: predecessors are not tracked.
256  virtual bool register_predecessor( predecessor_type & ) { return false; }
257 
259  virtual bool remove_predecessor( predecessor_type & ) { return false; }
260 
261 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
262  typedef internal::edge_container<predecessor_type> built_predecessors_type;
263  typedef built_predecessors_type::edge_list_type predecessor_list_type;
264  virtual built_predecessors_type &built_predecessors() = 0;
265  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
266  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
267  virtual void copy_predecessors( predecessor_list_type & ) = 0;
268  virtual size_t predecessor_count() = 0;
269 #endif
270 protected:
// Typed put entry point; body truncated in this listing (inner line 273 missing).
271  template<typename X>
272  task *try_put_task(const X& t) {
274  }
275 
276  virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
277 
278  // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
279 
281  virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
282 
283  virtual bool is_continue_receiver() { return false; }
284 };
285 
286 } // namespace internal
287 
289 template< typename T >
290 class sender : public internal::untyped_sender {
291 public:
293  typedef T output_type;
294 
295  typedef typename internal::async_helpers<T>::filtered_type filtered_type;
296 
298  virtual bool try_get( T & ) { return false; }
299 
301  virtual bool try_reserve( T & ) { return false; }
302 
303 protected:
304  virtual bool try_get_wrapper( void* p, bool is_async ) {
305  // Both async OR both are NOT async
306  if ( internal::async_helpers<T>::is_async_type == is_async ) {
307  return try_get( internal::async_helpers<T>::from_void_ptr(p) );
308  }
309  // Else: this (T) is async OR incoming 't' is async
310  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
311  return false;
312  }
313 
314  virtual bool try_reserve_wrapper( void* p, bool is_async ) {
315  // Both async OR both are NOT async
316  if ( internal::async_helpers<T>::is_async_type == is_async ) {
317  return try_reserve( internal::async_helpers<T>::from_void_ptr(p) );
318  }
319  // Else: this (T) is async OR incoming 't' is async
320  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
321  return false;
322  }
323 }; // class sender<T>
324 
// Typed receiver layered over the type-erased untyped_receiver (async_msg build).
// Accepts either the filtered (plain) type or the matching async_msg type.
// NOTE(review): the body of try_put_task_wrapper() lost a line in this
// extracted listing (inner line 347); presumably it unwraps p via
// internal::async_helpers<T> and forwards to try_put_task() — verify against
// the original header.
326 template< typename T >
327 class receiver : public internal::untyped_receiver {
328  template< typename > friend class internal::async_storage;
329  template< typename, typename > friend struct internal::async_helpers;
330 public:
// The input type of this receiver.
332  typedef T input_type;
333 
334  typedef typename internal::async_helpers<T>::filtered_type filtered_type;
335 
// Put a plain (filtered) value; delegates to the type-erased base.
337  bool try_put( const typename internal::async_helpers<T>::filtered_type& t ) {
338  return internal::untyped_receiver::try_put(t);
339  }
340 
// Put an async_msg value; delegates to the type-erased base.
341  bool try_put( const typename internal::async_helpers<T>::async_type& t ) {
342  return internal::untyped_receiver::try_put(t);
343  }
344 
345 protected:
// Type-erased put hook; body truncated in this listing (inner line 347 missing).
346  virtual task* try_put_task_wrapper( const void *p, bool is_async ) {
348  }
349 
// Typed put: implemented by concrete nodes.
351  virtual task *try_put_task(const T& t) = 0;
352 
353 }; // class receiver<T>
354 
355 #else // __TBB_PREVIEW_ASYNC_MSG
356 
358 template< typename T >
359 class sender {
360 public:
362  typedef T output_type;
363 
366 
367  virtual ~sender() {}
368 
369  // NOTE: Folowing part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
370 
372  virtual bool register_successor( successor_type &r ) = 0;
373 
375  virtual bool remove_successor( successor_type &r ) = 0;
376 
378  virtual bool try_get( T & ) { return false; }
379 
381  virtual bool try_reserve( T & ) { return false; }
382 
384  virtual bool try_release( ) { return false; }
385 
387  virtual bool try_consume( ) { return false; }
388 
389 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
390  typedef typename internal::edge_container<successor_type> built_successors_type;
392  typedef typename built_successors_type::edge_list_type successor_list_type;
393  virtual built_successors_type &built_successors() = 0;
394  virtual void internal_add_built_successor( successor_type & ) = 0;
395  virtual void internal_delete_built_successor( successor_type & ) = 0;
396  virtual void copy_successors( successor_list_type &) = 0;
397  virtual size_t successor_count() = 0;
398 #endif
399 }; // class sender<T>
400 
402 template< typename T >
403 class receiver {
404 public:
406  typedef T input_type;
407 
410 
412  virtual ~receiver() {}
413 
415  bool try_put( const T& t ) {
416  task *res = try_put_task(t);
417  if (!res) return false;
418  if (res != SUCCESSFULLY_ENQUEUED) FLOW_SPAWN(*res);
419  return true;
420  }
421 
423 protected:
424  template< typename R, typename B > friend class run_and_put_task;
425  template< typename X, typename Y > friend class internal::broadcast_cache;
426  template< typename X, typename Y > friend class internal::round_robin_cache;
427  virtual task *try_put_task(const T& t) = 0;
428 public:
429  // NOTE: Folowing part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
430 
432  virtual bool register_predecessor( predecessor_type & ) { return false; }
433 
435  virtual bool remove_predecessor( predecessor_type & ) { return false; }
436 
437 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
438  typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
439  typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
440  virtual built_predecessors_type &built_predecessors() = 0;
441  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
442  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
443  virtual void copy_predecessors( predecessor_list_type & ) = 0;
444  virtual size_t predecessor_count() = 0;
445 #endif
446 
447 protected:
449  template<typename U> friend class limiter_node;
450  virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
451 
452  template<typename TT, typename M> friend class internal::successor_cache;
453  virtual bool is_continue_receiver() { return false; }
454 
455 #if __TBB_PREVIEW_OPENCL_NODE
456  template< typename, typename > friend class proxy_dependency_receiver;
457 #endif /* __TBB_PREVIEW_OPENCL_NODE */
458 }; // class receiver<T>
459 
460 #endif // __TBB_PREVIEW_ASYNC_MSG
461 
462 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
463 static inline tbb::task *combine_tasks( tbb::task * left, tbb::task * right) {
464  // if no RHS task, don't change left.
465  if(right == NULL) return left;
466  // right != NULL
467  if(left == NULL) return right;
468  if(left == SUCCESSFULLY_ENQUEUED) return right;
469  // left contains a task
470  if(right != SUCCESSFULLY_ENQUEUED) {
471  // both are valid tasks
472  FLOW_SPAWN(*left);
473  return right;
474  }
475  return left;
476 }
477 
479 
480 class continue_receiver : public receiver< continue_msg > {
481 public:
482 
485 
488 
490  continue_receiver( int number_of_predecessors = 0 ) {
491  my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
492  my_current_count = 0;
493  }
494 
497  my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
498  my_current_count = 0;
499  }
500 
502  virtual ~continue_receiver() { }
503 
505  /* override */ bool register_predecessor( predecessor_type & ) {
506  spin_mutex::scoped_lock l(my_mutex);
507  ++my_predecessor_count;
508  return true;
509  }
510 
512 
515  /* override */ bool remove_predecessor( predecessor_type & ) {
516  spin_mutex::scoped_lock l(my_mutex);
517  --my_predecessor_count;
518  return true;
519  }
520 
521 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
522  typedef internal::edge_container<predecessor_type> built_predecessors_type;
523  typedef built_predecessors_type::edge_list_type predecessor_list_type;
524  /*override*/ built_predecessors_type &built_predecessors() { return my_built_predecessors; }
525 
526  /*override*/ void internal_add_built_predecessor( predecessor_type &s) {
527  spin_mutex::scoped_lock l(my_mutex);
528  my_built_predecessors.add_edge( s );
529  }
530 
531  /*override*/ void internal_delete_built_predecessor( predecessor_type &s) {
532  spin_mutex::scoped_lock l(my_mutex);
533  my_built_predecessors.delete_edge(s);
534  }
535 
536  /*override*/ void copy_predecessors( predecessor_list_type &v) {
537  spin_mutex::scoped_lock l(my_mutex);
538  my_built_predecessors.copy_edges(v);
539  }
540 
541  /*override*/ size_t predecessor_count() {
542  spin_mutex::scoped_lock l(my_mutex);
543  return my_built_predecessors.edge_count();
544  }
545 
546 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
547 
548 protected:
549  template< typename R, typename B > friend class run_and_put_task;
550  template<typename X, typename Y> friend class internal::broadcast_cache;
551  template<typename X, typename Y> friend class internal::round_robin_cache;
552  // execute body is supposed to be too small to create a task for.
553  /* override */ task *try_put_task( const input_type & ) {
554  {
555  spin_mutex::scoped_lock l(my_mutex);
556  if ( ++my_current_count < my_predecessor_count )
557  return SUCCESSFULLY_ENQUEUED;
558  else
559  my_current_count = 0;
560  }
561  task * res = execute();
562  return res? res : SUCCESSFULLY_ENQUEUED;
563  }
564 
565 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
566  // continue_receiver must contain its own built_predecessors because it does
567  // not have a node_cache.
568  built_predecessors_type my_built_predecessors;
569 #endif
570  spin_mutex my_mutex;
571  int my_predecessor_count;
572  int my_current_count;
573  int my_initial_predecessor_count;
574  // the friend declaration in the base class did not eliminate the "protected class"
575  // error in gcc 4.1.2
576  template<typename U> friend class limiter_node;
577 
578  /*override*/void reset_receiver( reset_flags f ) {
579  my_current_count = 0;
580  if (f & rf_clear_edges) {
581 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
582  my_built_predecessors.clear();
583 #endif
584  my_predecessor_count = my_initial_predecessor_count;
585  }
586  }
587 
589 
591  virtual task * execute() = 0;
592  template<typename TT, typename M> friend class internal::successor_cache;
593  /*override*/ bool is_continue_receiver() { return true; }
594 
595 }; // class continue_receiver
596 } // interface8
597 
598 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
//! Default key extractor for message-based key matching: returns t.key() as a K.
template <typename K, typename T>
K key_from_message( const T &t ) {
    return t.key();
}
603 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
604 
605  using interface8::sender;
606  using interface8::receiver;
608 } // flow
609 } // tbb
610 
611 #include "internal/_flow_graph_trace_impl.h"
612 #include "internal/_tbb_hash_compare_impl.h"
613 
614 namespace tbb {
615 namespace flow {
616 namespace interface8 {
617 
618 #include "internal/_flow_graph_impl.h"
619 #include "internal/_flow_graph_types_impl.h"
620 using namespace internal::graph_policy_namespace;
621 
622 class graph;
623 class graph_node;
624 
//! Forward iterator over the intrusive node list owned by a graph.
template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
    friend class graph;
    friend class graph_node;
public:
    typedef size_t size_type;
    typedef GraphNodeType value_type;
    typedef GraphNodeType* pointer;
    typedef GraphNodeType& reference;
    typedef const GraphNodeType& const_reference;
    typedef std::forward_iterator_tag iterator_category;

    //! Default constructor: a past-the-end iterator over no graph.
    graph_iterator() : my_graph(NULL), current_node(NULL) {}

    //! Copy constructor
    graph_iterator(const graph_iterator& other)
        : my_graph(other.my_graph), current_node(other.current_node) {}

    //! Assignment
    graph_iterator& operator=(const graph_iterator& other) {
        if (&other == this) return *this;
        my_graph = other.my_graph;
        current_node = other.current_node;
        return *this;
    }

    //! Dereference (defined out of line; asserts not-at-end)
    reference operator*() const;

    //! Member access (defined out of line)
    pointer operator->() const;

    //! Equality: same graph, same position.
    bool operator==(const graph_iterator& other) const {
        return my_graph == other.my_graph && current_node == other.current_node;
    }

    //! Inequality
    bool operator!=(const graph_iterator& other) const { return !(*this == other); }

    //! Pre-increment
    graph_iterator& operator++() {
        internal_forward();
        return *this;
    }

    //! Post-increment
    graph_iterator operator++(int) {
        graph_iterator prev = *this;
        internal_forward();
        return prev;
    }

private:
    GraphContainerType *my_graph;   // the graph over which we are iterating
    pointer current_node;           // pointer into my_graph's my_nodes list

    //! Private begin/end constructor (defined out of line)
    graph_iterator(GraphContainerType *g, bool begin);
    void internal_forward();
}; // class graph_iterator
691 
693 
694 class graph : tbb::internal::no_copy {
695  friend class graph_node;
696 
697  template< typename Body >
698  class run_task : public task {
699  public:
700  run_task( Body& body ) : my_body(body) {}
701  task *execute() {
702  my_body();
703  return NULL;
704  }
705  private:
706  Body my_body;
707  };
708 
709  template< typename Receiver, typename Body >
710  class run_and_put_task : public task {
711  public:
712  run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
713  task *execute() {
714  task *res = my_receiver.try_put_task( my_body() );
715  if (res == SUCCESSFULLY_ENQUEUED) res = NULL;
716  return res;
717  }
718  private:
719  Receiver &my_receiver;
720  Body my_body;
721  };
722  typedef std::list<task *> task_list_type;
723 
724 #if __TBB_PREVIEW_ASYNC_NODE
725  class wait_functor {
726  task* graph_root_task;
727  public:
728  wait_functor( task* t ) : graph_root_task(t) {}
729  void operator()() const { graph_root_task->wait_for_all(); }
730  };
731 
732  void prepare_task_arena( bool reinit = false ) {
733  if (reinit) {
734  __TBB_ASSERT( my_task_arena, NULL );
735  my_task_arena->terminate();
736  my_task_arena->initialize(tbb::task_arena::attach());
737  } else {
738  my_task_arena = new tbb::task_arena(tbb::task_arena::attach());
739  }
740  if (!my_task_arena->is_active()) // failed to attach
741  my_task_arena->initialize(); // create a new, default-initialized arena
742  __TBB_ASSERT(my_task_arena->is_active(), NULL);
743  }
744 #endif
745 
746 public:
748  graph() : my_nodes(NULL), my_nodes_last(NULL) {
749 #if __TBB_PREVIEW_ASYNC_NODE
750  prepare_task_arena();
751 #endif
752  own_context = true;
753  cancelled = false;
754  caught_exception = false;
755  my_context = new task_group_context();
756  my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
757  my_root_task->set_ref_count(1);
758  tbb::internal::fgt_graph( this );
759  my_is_active = true;
760  }
761 
763  explicit graph(task_group_context& use_this_context) :
764  my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL) {
765 #if __TBB_PREVIEW_ASYNC_NODE
766  prepare_task_arena();
767 #endif
768  own_context = false;
769  my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
770  my_root_task->set_ref_count(1);
771  tbb::internal::fgt_graph( this );
772  my_is_active = true;
773  }
774 
776 
777  ~graph() {
778  wait_for_all();
779  my_root_task->set_ref_count(0);
780  task::destroy( *my_root_task );
781  if (own_context) delete my_context;
782 #if __TBB_PREVIEW_ASYNC_NODE
783  delete my_task_arena;
784 #endif
785  }
786 
787 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
788  void set_name( const char *name ) {
789  tbb::internal::fgt_graph_desc( this, name );
790  }
791 #endif
792 
794 
796  void increment_wait_count() {
797  if (my_root_task)
798  my_root_task->increment_ref_count();
799  }
800 
802 
804  void decrement_wait_count() {
805  if (my_root_task)
806  my_root_task->decrement_ref_count();
807  }
808 
810 
812  template< typename Receiver, typename Body >
813  void run( Receiver &r, Body body ) {
814  if(is_active()) {
815  FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *root_task() ) )
817  }
818  }
819 
821 
823  template< typename Body >
824  void run( Body body ) {
825  if(is_active()) {
826  FLOW_SPAWN( * new ( task::allocate_additional_child_of( *root_task() ) ) run_task< Body >( body ) );
827  }
828  }
829 
831 
832  void wait_for_all() {
833  cancelled = false;
834  caught_exception = false;
835  if (my_root_task) {
836 #if TBB_USE_EXCEPTIONS
837  try {
838 #endif
839 #if __TBB_PREVIEW_ASYNC_NODE
840  my_task_arena->execute(wait_functor(my_root_task));
841 #else
842  my_root_task->wait_for_all();
843 #endif
844  cancelled = my_context->is_group_execution_cancelled();
845 #if TBB_USE_EXCEPTIONS
846  }
847  catch(...) {
848  my_root_task->set_ref_count(1);
849  my_context->reset();
850  caught_exception = true;
851  cancelled = true;
852  throw;
853  }
854 #endif
855  // TODO: the "if" condition below is just a work-around to support the concurrent wait
856  // mode. The cancelation and exception mechanisms are still broken in this mode.
857  // Consider using task group not to re-implement the same functionality.
858  if ( !(my_context->traits() & task_group_context::concurrent_wait) ) {
859  my_context->reset(); // consistent with behavior in catch()
860  my_root_task->set_ref_count(1);
861  }
862  }
863  }
864 
866  task * root_task() {
867  return my_root_task;
868  }
869 
870  void set_active(bool a = true) {
871  my_is_active = a;
872  }
873 
874  bool is_active() {
875  return my_is_active;
876  }
877 
878  void add_task_to_reset_list(task *tp) {
879  my_reset_task_list.push_back(tp);
880  }
881 
882  // ITERATORS
883  template<typename C, typename N>
884  friend class graph_iterator;
885 
886  // Graph iterator typedefs
887  typedef graph_iterator<graph,graph_node> iterator;
888  typedef graph_iterator<const graph,const graph_node> const_iterator;
889 
890  // Graph iterator constructors
892  iterator begin() { return iterator(this, true); }
894  iterator end() { return iterator(this, false); }
896  const_iterator begin() const { return const_iterator(this, true); }
898  const_iterator end() const { return const_iterator(this, false); }
900  const_iterator cbegin() const { return const_iterator(this, true); }
902  const_iterator cend() const { return const_iterator(this, false); }
903 
905  bool is_cancelled() { return cancelled; }
906  bool exception_thrown() { return caught_exception; }
907 
908  // thread-unsafe state reset.
909  void reset(reset_flags f = rf_reset_protocol);
910 
911 private:
912  task *my_root_task;
913  task_group_context *my_context;
914  bool own_context;
915  bool cancelled;
916  bool caught_exception;
917  bool my_is_active;
918  task_list_type my_reset_task_list;
919 
920  graph_node *my_nodes, *my_nodes_last;
921 
922  spin_mutex nodelist_mutex;
923  void register_node(graph_node *n);
924  void remove_node(graph_node *n);
925 
926 #if __TBB_PREVIEW_ASYNC_NODE
927  template < typename Input, typename Output, typename Policy, typename Allocator >
928  friend class async_node;
929  task_arena* my_task_arena;
930 #endif
931 }; // class graph
932 
933 template <typename C, typename N>
934 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
935 {
936  if (begin) current_node = my_graph->my_nodes;
937  //else it is an end iterator by default
938 }
939 
940 template <typename C, typename N>
941 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
942  __TBB_ASSERT(current_node, "graph_iterator at end");
943  return *operator->();
944 }
945 
946 template <typename C, typename N>
947 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
948  return current_node;
949 }
950 
951 
952 template <typename C, typename N>
953 void graph_iterator<C,N>::internal_forward() {
954  if (current_node) current_node = current_node->next;
955 }
956 
958 class graph_node : tbb::internal::no_copy {
959  friend class graph;
960  template<typename C, typename N>
961  friend class graph_iterator;
962 protected:
963  graph& my_graph;
964  graph_node *next, *prev;
965 public:
966  graph_node(graph& g) : my_graph(g) {
967  my_graph.register_node(this);
968  }
969  virtual ~graph_node() {
970  my_graph.remove_node(this);
971  }
972 
973 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
974  virtual void set_name( const char *name ) = 0;
975 #endif
976 
977 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
978  virtual void extract( ) = 0;
979 #endif
980 
981 protected:
982  // performs the reset on an individual node.
983  virtual void reset_node(reset_flags f=rf_reset_protocol) = 0;
984 }; // class graph_node
985 
986 inline void graph::register_node(graph_node *n) {
987  n->next = NULL;
988  {
989  spin_mutex::scoped_lock lock(nodelist_mutex);
990  n->prev = my_nodes_last;
991  if (my_nodes_last) my_nodes_last->next = n;
992  my_nodes_last = n;
993  if (!my_nodes) my_nodes = n;
994  }
995 }
996 
997 inline void graph::remove_node(graph_node *n) {
998  {
999  spin_mutex::scoped_lock lock(nodelist_mutex);
1000  __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
1001  if (n->prev) n->prev->next = n->next;
1002  if (n->next) n->next->prev = n->prev;
1003  if (my_nodes_last == n) my_nodes_last = n->prev;
1004  if (my_nodes == n) my_nodes = n->next;
1005  }
1006  n->prev = n->next = NULL;
1007 }
1008 
1009 inline void graph::reset( reset_flags f ) {
1010  // reset context
1011  set_active(false);
1012  if(my_context) my_context->reset();
1013  cancelled = false;
1014  caught_exception = false;
1015  // reset all the nodes comprising the graph
1016  for(iterator ii = begin(); ii != end(); ++ii) {
1017  graph_node *my_p = &(*ii);
1018  my_p->reset_node(f);
1019  }
1020 #if __TBB_PREVIEW_ASYNC_NODE
1021  // Reattach the arena. Might be useful to run the graph in a particular task_arena
1022  // while not limiting graph lifetime to a single task_arena::execute() call.
1023  prepare_task_arena( /*reinit=*/true );
1024 #endif
1025  set_active(true);
1026  // now spawn the tasks necessary to start the graph
1027  for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
1028  FLOW_SPAWN(*(*rti));
1029  }
1030  my_reset_task_list.clear();
1031 }
1032 
1033 
1034 #include "internal/_flow_graph_node_impl.h"
1035 
1037 template < typename Output >
1038 class source_node : public graph_node, public sender< Output > {
1039 protected:
1040  using graph_node::my_graph;
1041 public:
1043  typedef Output output_type;
1044 
1046  typedef typename sender<output_type>::successor_type successor_type;
1047 
1048  //Source node has no input type
1049  typedef null_type input_type;
1050 
1051 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1052  typedef typename sender<output_type>::built_successors_type built_successors_type;
1053  typedef typename sender<output_type>::successor_list_type successor_list_type;
1054 #endif
1055 
1057  template< typename Body >
1058  source_node( graph &g, Body body, bool is_active = true )
1059  : graph_node(g), my_active(is_active), init_my_active(is_active),
1061  my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
1062  my_reserved(false), my_has_cached_item(false)
1063  {
1064  my_successors.set_owner(this);
1065  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1066  static_cast<sender<output_type> *>(this), this->my_body );
1067  }
1068 
1070  source_node( const source_node& src ) :
1071  graph_node(src.my_graph), sender<Output>(),
1072  my_active(src.init_my_active),
1073  init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
1074  my_reserved(false), my_has_cached_item(false)
1075  {
1076  my_successors.set_owner(this);
1077  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1078  static_cast<sender<output_type> *>(this), this->my_body );
1079  }
1080 
1082  ~source_node() { delete my_body; delete my_init_body; }
1083 
1084 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1085  /* override */ void set_name( const char *name ) {
1086  tbb::internal::fgt_node_desc( this, name );
1087  }
1088 #endif
1089 
1091  /* override */ bool register_successor( successor_type &r ) {
1092  spin_mutex::scoped_lock lock(my_mutex);
1093  my_successors.register_successor(r);
1094  if ( my_active )
1095  spawn_put();
1096  return true;
1097  }
1098 
1100  /* override */ bool remove_successor( successor_type &r ) {
1101  spin_mutex::scoped_lock lock(my_mutex);
1102  my_successors.remove_successor(r);
1103  return true;
1104  }
1105 
1106 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1107 
1108  /*override*/ built_successors_type &built_successors() { return my_successors.built_successors(); }
1109 
1110  /*override*/void internal_add_built_successor( successor_type &r) {
1111  spin_mutex::scoped_lock lock(my_mutex);
1112  my_successors.internal_add_built_successor(r);
1113  }
1114 
1115  /*override*/void internal_delete_built_successor( successor_type &r) {
1116  spin_mutex::scoped_lock lock(my_mutex);
1117  my_successors.internal_delete_built_successor(r);
1118  }
1119 
1120  /*override*/size_t successor_count() {
1121  spin_mutex::scoped_lock lock(my_mutex);
1122  return my_successors.successor_count();
1123  }
1124 
1125  /*override*/void copy_successors(successor_list_type &v) {
1126  spin_mutex::scoped_lock l(my_mutex);
1127  my_successors.copy_successors(v);
1128  }
1129 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1130 
1132  /*override */ bool try_get( output_type &v ) {
1133  spin_mutex::scoped_lock lock(my_mutex);
1134  if ( my_reserved )
1135  return false;
1136 
1137  if ( my_has_cached_item ) {
1138  v = my_cached_item;
1139  my_has_cached_item = false;
1140  return true;
1141  }
1142  // we've been asked to provide an item, but we have none. enqueue a task to
1143  // provide one.
1144  spawn_put();
1145  return false;
1146  }
1147 
1149  /* override */ bool try_reserve( output_type &v ) {
1150  spin_mutex::scoped_lock lock(my_mutex);
1151  if ( my_reserved ) {
1152  return false;
1153  }
1154 
1155  if ( my_has_cached_item ) {
1156  v = my_cached_item;
1157  my_reserved = true;
1158  return true;
1159  } else {
1160  return false;
1161  }
1162  }
1163 
1165 
1166  /* override */ bool try_release( ) {
1167  spin_mutex::scoped_lock lock(my_mutex);
1168  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1169  my_reserved = false;
1170  if(!my_successors.empty())
1171  spawn_put();
1172  return true;
1173  }
1174 
1176  /* override */ bool try_consume( ) {
1177  spin_mutex::scoped_lock lock(my_mutex);
1178  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1179  my_reserved = false;
1180  my_has_cached_item = false;
1181  if ( !my_successors.empty() ) {
1182  spawn_put();
1183  }
1184  return true;
1185  }
1186 
1188  void activate() {
1189  spin_mutex::scoped_lock lock(my_mutex);
1190  my_active = true;
1191  if ( !my_successors.empty() )
1192  spawn_put();
1193  }
1194 
1195  template<typename Body>
1196  Body copy_function_object() {
1197  internal::source_body<output_type> &body_ref = *this->my_body;
1198  return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1199  }
1200 
1201 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1202  /*override*/void extract( ) {
1203  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1204  my_active = init_my_active;
1205  my_reserved = false;
1206  if(my_has_cached_item) my_has_cached_item = false;
1207  }
1208 #endif
1209 
1210 protected:
1211 
1213  /*override*/void reset_node( reset_flags f) {
1214  my_active = init_my_active;
1215  my_reserved =false;
1216  if(my_has_cached_item) {
1217  my_has_cached_item = false;
1218  }
1219  if(f & rf_clear_edges) my_successors.clear();
1220  if(f & rf_reset_bodies) {
1221  internal::source_body<output_type> *tmp = my_init_body->clone();
1222  delete my_body;
1223  my_body = tmp;
1224  }
1225  if(my_active)
1226  this->my_graph.add_task_to_reset_list(create_put_task());
1227  }
1228 
1229 private:
1230  spin_mutex my_mutex;
1231  bool my_active;
1232  bool init_my_active;
1234  internal::source_body<output_type> *my_init_body;
1236  bool my_reserved;
1237  bool my_has_cached_item;
1238  output_type my_cached_item;
1239 
1240  // used by apply_body_bypass, can invoke body of node.
1241  bool try_reserve_apply_body(output_type &v) {
1242  spin_mutex::scoped_lock lock(my_mutex);
1243  if ( my_reserved ) {
1244  return false;
1245  }
1246  if ( !my_has_cached_item ) {
1247  tbb::internal::fgt_begin_body( my_body );
1248  bool r = (*my_body)(my_cached_item);
1249  tbb::internal::fgt_end_body( my_body );
1250  if (r) {
1251  my_has_cached_item = true;
1252  }
1253  }
1254  if ( my_has_cached_item ) {
1255  v = my_cached_item;
1256  my_reserved = true;
1257  return true;
1258  } else {
1259  return false;
1260  }
1261  }
1262 
1263  // when resetting, and if the source_node was created with my_active == true, then
1264  // when we reset the node we must store a task to run the node, and spawn it only
1265  // after the reset is complete and is_active() is again true. This is why we don't
1266  // test for is_active() here.
1267  task* create_put_task() {
1268  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1269  internal:: source_task_bypass < source_node< output_type > >( *this ) );
1270  }
1271 
1273  /* override */ void spawn_put( ) {
1274  if(this->my_graph.is_active()) {
1275  FLOW_SPAWN( *create_put_task());
1276  }
1277  }
1278 
1279  friend class internal::source_task_bypass< source_node< output_type > >;
1281  /* override */ task * apply_body_bypass( ) {
1282  output_type v;
1283  if ( !try_reserve_apply_body(v) )
1284  return NULL;
1285 
1286  task *last_task = my_successors.try_put_task(v);
1287  if ( last_task )
1288  try_consume();
1289  else
1290  try_release();
1291  return last_task;
1292  }
1293 }; // class source_node
1294 
//! Policy trait: does a node with buffer policy T need an internal input queue?
//  Primary template: by default no queue is allocated.
template<typename T>
struct allocate_buffer {
    static const bool value = false;
};
1299 
//! Specialization for the queueing policy: inputs are buffered, so the node
//  must allocate an internal input queue.
template<>
struct allocate_buffer<queueing> {
    static const bool value = true;
};
1304 
//! Implements a node that applies a user body to each Input message, producing an
//  Output, with a user-settable concurrency limit.  The queueing policy buffers
//  inputs that arrive while the node is at its concurrency limit.
template < typename Input, typename Output = continue_msg, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
    typedef internal::function_input_queue<input_type, Allocator> input_queue_type;
    typedef internal::function_output<output_type> fOutput_type;
    typedef typename fInput_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    typedef typename fInput_type::predecessor_list_type predecessor_list_type;
    typedef typename fOutput_type::successor_list_type successor_list_type;
#endif
    using fInput_type::my_predecessors;

    //! Constructor.  An input queue is allocated only for policies whose
    //  allocate_buffer trait is true (e.g. queueing).
    // input_queue_type is allocated here, but destroyed in the function_input_base.
    // TODO: pass the graph_buffer_policy to the function_input_base so it can all
    // be done in one place.  This would be an interface-breaking change.
    template< typename Body >
    function_node( graph &g, size_t concurrency, Body body ) :
        graph_node(g), fInput_type(g, concurrency, body, allocate_buffer<Policy>::value ?
               new input_queue_type( ) : NULL ) {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Copy constructor; the new node shares no run-time state with src.
    function_node( const function_node& src ) :
        graph_node(src.graph_node::my_graph),
        fInput_type(src, allocate_buffer<Policy>::value ? new input_queue_type : NULL),
        fOutput_type() {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->graph_node::my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    //! Disconnects this node from all recorded predecessors and successors.
    /*override*/void extract( ) {
        my_predecessors.built_predecessors().receiver_extract(*this);
        successors().built_successors().sender_extract(*this);
    }
#endif

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using fInput_type::try_put_task;

    /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }

    // override of graph_node's reset.
    /*override*/void reset_node(reset_flags f) {
        fInput_type::reset_function_input(f);
        // TODO: use clear() instead.
        if(f & rf_clear_edges) {
            successors().clear();
            my_predecessors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
    }

};  // class function_node
1377 
1378 
1380 // Output is a tuple of output types.
1381 template < typename Input, typename Output, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1382 class multifunction_node :
1383  public graph_node,
1384  public internal::multifunction_input
1385  <
1386  Input,
1387  typename internal::wrap_tuple_elements<
1388  tbb::flow::tuple_size<Output>::value, // #elements in tuple
1389  internal::multifunction_output, // wrap this around each element
1390  Output // the tuple providing the types
1391  >::type,
1392  Allocator
1393  > {
1394 protected:
1395  using graph_node::my_graph;
1396  static const int N = tbb::flow::tuple_size<Output>::value;
1397 public:
1398  typedef Input input_type;
1399  typedef null_type output_type;
1401  typedef internal::multifunction_input<input_type, output_ports_type, Allocator> fInput_type;
1402  typedef internal::function_input_queue<input_type, Allocator> input_queue_type;
1403 private:
1404  typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
1405  using fInput_type::my_predecessors;
1406 public:
1407  template<typename Body>
1408  multifunction_node( graph &g, size_t concurrency, Body body ) :
1409  graph_node(g), base_type(g,concurrency, body, allocate_buffer<Policy>::value ? new input_queue_type : NULL) {
1410  tbb::internal::fgt_multioutput_node_with_body<N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1411  &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1412  this->output_ports(), this->my_body );
1413  }
1414 
1415  multifunction_node( const multifunction_node &other) :
1416  graph_node(other.graph_node::my_graph), base_type(other, allocate_buffer<Policy>::value ? new input_queue_type : NULL) {
1417  tbb::internal::fgt_multioutput_node_with_body<N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1418  &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this),
1419  this->output_ports(), this->my_body );
1420  }
1421 
1422 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1423  /* override */ void set_name( const char *name ) {
1424  tbb::internal::fgt_multioutput_node_desc( this, name );
1425  }
1426 #endif
1427 
1428 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1429  void extract( ) {
1430  my_predecessors.built_predecessors().receiver_extract(*this);
1431  base_type::extract();
1432  }
1433 #endif
1434  // all the guts are in multifunction_input...
1435 protected:
1436  /*override*/void reset_node(reset_flags f) { base_type::reset(f); }
1437 }; // multifunction_node
1438 
1440 // successors. The node has unlimited concurrency, so it does not reject inputs.
//! Splits an incoming tuple, sending element i to output port i.  Implemented
//  as a multifunction_node with unlimited concurrency and a rejecting policy.
template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
    static const int N = tbb::flow::tuple_size<TupleType>::value;
    typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
public:
    typedef typename base_type::output_ports_type output_ports_type;
    typedef typename base_type::output_type output_type;
private:
    //! Body that forwards each tuple element to the corresponding output port.
    struct splitting_body {
        void operator()(const TupleType& t, output_ports_type &p) {
            internal::emit_element<N>::emit_this(t, p);
        }
    };
public:
    typedef TupleType input_type;
    typedef Allocator allocator_type;

    //! Constructor
    split_node(graph &g) : base_type(g, unlimited, splitting_body()) {
        tbb::internal::fgt_multioutput_node<N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
                static_cast<receiver<input_type> *>(this), this->output_ports() );
    }

    //! Copy constructor
    split_node( const split_node & other) : base_type(other) {
        tbb::internal::fgt_multioutput_node<N>( tbb::internal::FLOW_SPLIT_NODE, &this->graph_node::my_graph,
                static_cast<receiver<input_type> *>(this), this->output_ports() );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_multioutput_node_desc( this, name );
    }
#endif

};
1474 
//! Executes its body when it has received continue_msgs from all of its
//  registered predecessors, then broadcasts the resulting Output to successors.
template <typename Output>
class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
protected:
    using graph_node::my_graph;
public:
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef internal::continue_input<Output> fInput_type;
    typedef internal::function_output<output_type> fOutput_type;
    typedef typename fInput_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    //! Constructor; the predecessor count starts at zero and grows as edges are made.
    template <typename Body >
    continue_node( graph &g, Body body ) :
        graph_node(g), internal::continue_input<output_type>( g, body ) {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Constructor with an explicit number of predecessors to wait for.
    template <typename Body >
    continue_node( graph &g, int number_of_predecessors, Body body ) :
        graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body ) {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Copy constructor; shares no run-time state with src.
    continue_node( const continue_node& src ) :
        graph_node(src.graph_node::my_graph), internal::continue_input<output_type>(src),
        internal::function_output<Output>() {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    //! Disconnects this node from all recorded predecessors and successors.
    /*override graph_node*/ void extract() {
        fInput_type::my_built_predecessors.receiver_extract(*this);
        successors().built_successors().sender_extract(*this);
    }
#endif

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using fInput_type::try_put_task;
    /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }

    /*override*/void reset_node(reset_flags f) {
        fInput_type::reset_receiver(f);
        if(f & rf_clear_edges)successors().clear();
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
    }

};  // continue_node
1543 
1544 template< typename T >
1545 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
1546 protected:
1547  using graph_node::my_graph;
1548 public:
1549  typedef T input_type;
1550  typedef T output_type;
1551  typedef typename receiver<input_type>::predecessor_type predecessor_type;
1552  typedef typename sender<output_type>::successor_type successor_type;
1553 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1554  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
1555  typedef typename sender<output_type>::built_successors_type built_successors_type;
1556  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1557  typedef typename sender<output_type>::successor_list_type successor_list_type;
1558 #endif
1559 
1560  overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
1561  my_successors.set_owner( this );
1562  tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1563  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1564  }
1565 
1566  // Copy constructor; doesn't take anything from src; default won't work
1567  overwrite_node( const overwrite_node& src ) :
1568  graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
1569  {
1570  my_successors.set_owner( this );
1571  tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
1572  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1573  }
1574 
1575  ~overwrite_node() {}
1576 
1577 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1578  /* override */ void set_name( const char *name ) {
1579  tbb::internal::fgt_node_desc( this, name );
1580  }
1581 #endif
1582 
1583  /* override */ bool register_successor( successor_type &s ) {
1584  spin_mutex::scoped_lock l( my_mutex );
1585  if (my_buffer_is_valid && this->my_graph.is_active()) {
1586  // We have a valid value that must be forwarded immediately.
1587  if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
1588  // We add the successor: it accepted our put or it rejected it but won't let us become a predecessor
1589  my_successors.register_successor( s );
1590  } else {
1591  // We don't add the successor: it rejected our put and we became its predecessor instead
1592  return false;
1593  }
1594  } else {
1595  // No valid value yet, just add as successor
1596  my_successors.register_successor( s );
1597  }
1598  return true;
1599  }
1600 
1601  /* override */ bool remove_successor( successor_type &s ) {
1602  spin_mutex::scoped_lock l( my_mutex );
1603  my_successors.remove_successor(s);
1604  return true;
1605  }
1606 
1607 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1608  /*override*/built_predecessors_type &built_predecessors() { return my_built_predecessors; }
1609  /*override*/built_successors_type &built_successors() { return my_successors.built_successors(); }
1610 
1611  /*override*/void internal_add_built_successor( successor_type &s) {
1612  spin_mutex::scoped_lock l( my_mutex );
1613  my_successors.internal_add_built_successor(s);
1614  }
1615 
1616  /*override*/void internal_delete_built_successor( successor_type &s) {
1617  spin_mutex::scoped_lock l( my_mutex );
1618  my_successors.internal_delete_built_successor(s);
1619  }
1620 
1621  /*override*/size_t successor_count() {
1622  spin_mutex::scoped_lock l( my_mutex );
1623  return my_successors.successor_count();
1624  }
1625 
1626  /*override*/ void copy_successors(successor_list_type &v) {
1627  spin_mutex::scoped_lock l( my_mutex );
1628  my_successors.copy_successors(v);
1629  }
1630 
1631  /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1632  spin_mutex::scoped_lock l( my_mutex );
1633  my_built_predecessors.add_edge(p);
1634  }
1635 
1636  /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1637  spin_mutex::scoped_lock l( my_mutex );
1638  my_built_predecessors.delete_edge(p);
1639  }
1640 
1641  /*override*/size_t predecessor_count() {
1642  spin_mutex::scoped_lock l( my_mutex );
1643  return my_built_predecessors.edge_count();
1644  }
1645 
1646  /*override*/void copy_predecessors(predecessor_list_type &v) {
1647  spin_mutex::scoped_lock l( my_mutex );
1648  my_built_predecessors.copy_edges(v);
1649  }
1650 
1651  /*override*/ void extract() {
1652  my_buffer_is_valid = false;
1653  built_successors().sender_extract(*this);
1654  built_predecessors().receiver_extract(*this);
1655  }
1656 
1657 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1658 
1659  /* override */ bool try_get( input_type &v ) {
1660  spin_mutex::scoped_lock l( my_mutex );
1661  if ( my_buffer_is_valid ) {
1662  v = my_buffer;
1663  return true;
1664  }
1665  return false;
1666  }
1667 
1668  bool is_valid() {
1669  spin_mutex::scoped_lock l( my_mutex );
1670  return my_buffer_is_valid;
1671  }
1672 
1673  void clear() {
1674  spin_mutex::scoped_lock l( my_mutex );
1675  my_buffer_is_valid = false;
1676  }
1677 
1678 protected:
1679  template< typename R, typename B > friend class run_and_put_task;
1680  template<typename X, typename Y> friend class internal::broadcast_cache;
1681  template<typename X, typename Y> friend class internal::round_robin_cache;
1682  /* override */ task * try_put_task( const input_type &v ) {
1683  spin_mutex::scoped_lock l( my_mutex );
1684  return try_put_task_impl(v);
1685  }
1686 
1687  task * try_put_task_impl(const input_type &v) {
1688  my_buffer = v;
1689  my_buffer_is_valid = true;
1690  task * rtask = my_successors.try_put_task(v);
1691  if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
1692  return rtask;
1693  }
1694 
1695  spin_mutex my_mutex;
1697 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1698  internal::edge_container<predecessor_type> my_built_predecessors;
1699 #endif
1700  input_type my_buffer;
1701  bool my_buffer_is_valid;
1702  /*override*/void reset_receiver(reset_flags /*f*/) {}
1703 
1704  /*override*/void reset_node( reset_flags f) {
1705  my_buffer_is_valid = false;
1706  if (f&rf_clear_edges) {
1707  my_successors.clear();
1708  }
1709  }
1710 }; // overwrite_node
1711 
//! Like overwrite_node, but accepts only the first item put to it: while the
//  buffer holds a valid item, try_put_task rejects further inputs (until the
//  buffer is invalidated, e.g. via the base class's clear()/reset).
template< typename T >
class write_once_node : public overwrite_node<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    write_once_node(graph& g) : overwrite_node<T>(g) {
        tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    //! Copy constructor: the copy starts with an empty buffer (base behavior).
    write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {
        tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! Accept v only while the buffer is empty; otherwise reject (return NULL).
    /* override */ task *try_put_task( const T &v ) {
        spin_mutex::scoped_lock l( this->my_mutex );
        return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
    }
};
1749 
1751 template <typename T>
1752 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1753 protected:
1754  using graph_node::my_graph;
1755 public:
1756  typedef T input_type;
1757  typedef T output_type;
1758  typedef typename receiver<input_type>::predecessor_type predecessor_type;
1759  typedef typename sender<output_type>::successor_type successor_type;
1760 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1761  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1762  typedef typename sender<output_type>::successor_list_type successor_list_type;
1763 #endif
1764 private:
1766 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1767  internal::edge_container<predecessor_type> my_built_predecessors;
1768  spin_mutex pred_mutex; // serialize accesses on edge_container
1769 #endif
1770 public:
1771 
1772  broadcast_node(graph& g) : graph_node(g) {
1773  my_successors.set_owner( this );
1774  tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1775  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1776  }
1777 
1778  // Copy constructor
1779  broadcast_node( const broadcast_node& src ) :
1780  graph_node(src.my_graph), receiver<T>(), sender<T>()
1781  {
1782  my_successors.set_owner( this );
1783  tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1784  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1785  }
1786 
1787 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1788  /* override */ void set_name( const char *name ) {
1789  tbb::internal::fgt_node_desc( this, name );
1790  }
1791 #endif
1792 
1794  virtual bool register_successor( successor_type &r ) {
1795  my_successors.register_successor( r );
1796  return true;
1797  }
1798 
1800  virtual bool remove_successor( successor_type &r ) {
1801  my_successors.remove_successor( r );
1802  return true;
1803  }
1804 
1805 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1806  typedef typename sender<T>::built_successors_type built_successors_type;
1807 
1808  /*override sender*/ built_successors_type &built_successors() { return my_successors.built_successors(); }
1809 
1810  /*override sender*/ void internal_add_built_successor(successor_type &r) {
1811  my_successors.internal_add_built_successor(r);
1812  }
1813 
1814  /*override sender*/ void internal_delete_built_successor(successor_type &r) {
1815  my_successors.internal_delete_built_successor(r);
1816  }
1817 
1818  /*override sender*/ size_t successor_count() {
1819  return my_successors.successor_count();
1820  }
1821 
1822  /*override*/ void copy_successors(successor_list_type &v) {
1823  my_successors.copy_successors(v);
1824  }
1825 
1826  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1827 
1828  /*override receiver*/ built_predecessors_type &built_predecessors() { return my_built_predecessors; }
1829 
1830  /*override*/ void internal_add_built_predecessor( predecessor_type &p) {
1831  spin_mutex::scoped_lock l(pred_mutex);
1832  my_built_predecessors.add_edge(p);
1833  }
1834 
1835  /*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
1836  spin_mutex::scoped_lock l(pred_mutex);
1837  my_built_predecessors.delete_edge(p);
1838  }
1839 
1840  /*override*/ size_t predecessor_count() {
1841  spin_mutex::scoped_lock l(pred_mutex);
1842  return my_built_predecessors.edge_count();
1843  }
1844 
1845  /*override*/ void copy_predecessors(predecessor_list_type &v) {
1846  spin_mutex::scoped_lock l(pred_mutex);
1847  my_built_predecessors.copy_edges(v);
1848  }
1849 
1850  /*override graph_node*/ void extract() {
1851  my_built_predecessors.receiver_extract(*this);
1852  my_successors.built_successors().sender_extract(*this);
1853  }
1854 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
1855 
1856 protected:
1857  template< typename R, typename B > friend class run_and_put_task;
1858  template<typename X, typename Y> friend class internal::broadcast_cache;
1859  template<typename X, typename Y> friend class internal::round_robin_cache;
1861  /*override*/ task *try_put_task(const T& t) {
1862  task *new_task = my_successors.try_put_task(t);
1863  if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1864  return new_task;
1865  }
1866 
1867  /*override*/void reset_receiver(reset_flags /*f*/) {}
1868 
1869  /*override*/void reset_node(reset_flags f) {
1870  if (f&rf_clear_edges) {
1871  my_successors.clear();
1872 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1873  my_built_predecessors.clear();
1874 #endif
1875  }
1876  __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1877  }
1878 }; // broadcast_node
1879 
1881 template <typename T, typename A=cache_aligned_allocator<T> >
1882 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1883 protected:
1884  using graph_node::my_graph;
1885 public:
1886  typedef T input_type;
1887  typedef T output_type;
1888  typedef typename receiver<input_type>::predecessor_type predecessor_type;
1889  typedef typename sender<output_type>::successor_type successor_type;
1890  typedef buffer_node<T, A> class_type;
1891 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1892  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1893  typedef typename sender<output_type>::successor_list_type successor_list_type;
1894 #endif
1895 protected:
1896  typedef size_t size_type;
1898 
1899 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1900  internal::edge_container<predecessor_type> my_built_predecessors;
1901 #endif
1902 
1903  friend class internal::forward_task_bypass< buffer_node< T, A > >;
1904 
1905  enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1906 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1907  , add_blt_succ, del_blt_succ,
1908  add_blt_pred, del_blt_pred,
1909  blt_succ_cnt, blt_pred_cnt,
1910  blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
1911 #endif
1912  };
1913  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1914 
1915  // implements the aggregator_operation concept
1916  class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1917  public:
1918  char type;
1919 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1920  task * ltask;
1921  union {
1922  input_type *elem;
1923  successor_type *r;
1924  predecessor_type *p;
1925  size_t cnt_val;
1926  successor_list_type *svec;
1927  predecessor_list_type *pvec;
1928  };
1929 #else
1930  T *elem;
1931  task * ltask;
1932  successor_type *r;
1933 #endif
1934  buffer_operation(const T& e, op_type t) : type(char(t))
1935 
1936 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
1937  , ltask(NULL), elem(const_cast<T*>(&e))
1938 #else
1939  , elem(const_cast<T*>(&e)) , ltask(NULL)
1940 #endif
1941  {}
1942  buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
1943  };
1944 
1945  bool forwarder_busy;
1947  friend class internal::aggregating_functor<class_type, buffer_operation>;
1949 
1950  virtual void handle_operations(buffer_operation *op_list) {
1951  handle_operations_impl(op_list, this);
1952  }
1953 
// Shared batch-dispatch loop for all buffer variants. 'derived' is the
// most-derived node ('this' seen through its real type) so that
// derived->order() statically binds to the subclass hook (e.g. the
// priority queue's heapify) without virtual dispatch.
template<typename derived_type>
void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
    __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

    buffer_operation *tmp = NULL;
    bool try_forwarding=false;
    // Walk the intrusive op list; each handler stores op->status with a
    // release fence to wake the thread that submitted the op.
    while (op_list) {
        tmp = op_list;
        op_list = op_list->next;
        switch (tmp->type) {
        case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
        case rem_succ: internal_rem_succ(tmp); break;
        case req_item: internal_pop(tmp); break;
        case res_item: internal_reserve(tmp); break;
        case rel_res: internal_release(tmp); try_forwarding = true; break;
        case con_res: internal_consume(tmp); try_forwarding = true; break;
        // a put only enables forwarding if it actually stored the item
        case put_item: internal_push(tmp); try_forwarding = (tmp->status == SUCCEEDED); break;
        case try_fwd_task: internal_forward_task(tmp); break;
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        // edge recording
        case add_blt_succ: internal_add_built_succ(tmp); break;
        case del_blt_succ: internal_del_built_succ(tmp); break;
        case add_blt_pred: internal_add_built_pred(tmp); break;
        case del_blt_pred: internal_del_built_pred(tmp); break;
        case blt_succ_cnt: internal_succ_cnt(tmp); break;
        case blt_pred_cnt: internal_pred_cnt(tmp); break;
        case blt_succ_cpy: internal_copy_succs(tmp); break;
        case blt_pred_cpy: internal_copy_preds(tmp); break;
#endif
        }
    }

    // subclass hook: restore any ordering invariant after the batch
    derived->order();

    // If some op made an item forwardable and no forwarder task is already
    // in flight, enqueue one (only while the graph is active).
    if (try_forwarding && !forwarder_busy) {
        if(this->my_graph.is_active()) {
            forwarder_busy = true;
            task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
                forward_task_bypass
                < buffer_node<input_type, A> >(*this);
            // tmp should point to the last item handled by the aggregator. This is the operation
            // the handling thread enqueued. So modifying that record will be okay.
            tbb::task *z = tmp->ltask;
            tmp->ltask = combine_tasks(z, new_task); // in case the op generated a task
        }
    }
} // handle_operations
2001 
// Retrieve (without spawning) the task, if any, that handling op_data produced.
inline task *grab_forwarding_task( buffer_operation &op_data) {
    return op_data.ltask;
}
2005 
2006  inline bool enqueue_forwarding_task(buffer_operation &op_data) {
2007  task *ft = grab_forwarding_task(op_data);
2008  if(ft) {
2009  FLOW_SPAWN(*ft);
2010  return true;
2011  }
2012  return false;
2013  }
2014 
// Body of the forwarder task: repeatedly submit try_fwd_task ops through
// the aggregator, accumulating any tasks produced, until a forward attempt
// fails (status != SUCCEEDED). Returns the combined task for bypassing.
virtual task *forward_task() {
    buffer_operation op_data(try_fwd_task);
    task *last_task = NULL;
    do {
        // reset per-iteration: the op struct is reused across submissions
        op_data.status = WAIT;
        op_data.ltask = NULL;
        my_aggregator.execute(&op_data);
        tbb::task *xtask = op_data.ltask;
        last_task = combine_tasks(last_task, xtask);
    } while (op_data.status == SUCCEEDED);
    return last_task;
}
2028 
// Aggregator handler: register the successor carried in op->r, then
// release the submitting thread via the status store.
virtual void internal_reg_succ(buffer_operation *op) {
    my_successors.register_successor(*(op->r));
    __TBB_store_with_release(op->status, SUCCEEDED);
}
2034 
// Aggregator handler: remove the successor carried in op->r.
virtual void internal_rem_succ(buffer_operation *op) {
    my_successors.remove_successor(*(op->r));
    __TBB_store_with_release(op->status, SUCCEEDED);
}
2040 
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// --- edge-recording support (preview feature) ---
// Aggregator handlers mirroring the public introspection API; each performs
// its bookkeeping on the successor/predecessor caches and then releases the
// submitting thread via the status store.
typedef typename sender<T>::built_successors_type built_successors_type;

/*override sender*/ built_successors_type &built_successors() { return my_successors.built_successors(); }

virtual void internal_add_built_succ(buffer_operation *op) {
    my_successors.internal_add_built_successor(*(op->r));
    __TBB_store_with_release(op->status, SUCCEEDED);
}

virtual void internal_del_built_succ(buffer_operation *op) {
    my_successors.internal_delete_built_successor(*(op->r));
    __TBB_store_with_release(op->status, SUCCEEDED);
}

typedef typename receiver<T>::built_predecessors_type built_predecessors_type;

/*override receiver*/ built_predecessors_type &built_predecessors() { return my_built_predecessors; }

virtual void internal_add_built_pred(buffer_operation *op) {
    my_built_predecessors.add_edge(*(op->p));
    __TBB_store_with_release(op->status, SUCCEEDED);
}

virtual void internal_del_built_pred(buffer_operation *op) {
    my_built_predecessors.delete_edge(*(op->p));
    __TBB_store_with_release(op->status, SUCCEEDED);
}

// count queries return through the op's cnt_val field
virtual void internal_succ_cnt(buffer_operation *op) {
    op->cnt_val = my_successors.successor_count();
    __TBB_store_with_release(op->status, SUCCEEDED);
}

virtual void internal_pred_cnt(buffer_operation *op) {
    op->cnt_val = my_built_predecessors.edge_count();
    __TBB_store_with_release(op->status, SUCCEEDED);
}

// list copies fill the caller-supplied vector referenced by svec/pvec
virtual void internal_copy_succs(buffer_operation *op) {
    my_successors.copy_successors(*(op->svec));
    __TBB_store_with_release(op->status, SUCCEEDED);
}

virtual void internal_copy_preds(buffer_operation *op) {
    my_built_predecessors.copy_edges(*(op->pvec));
    __TBB_store_with_release(op->status, SUCCEEDED);
}

#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
2091 
2092 private:
// Post-batch ordering hook; the plain buffer imposes no order, so no-op.
void order() {}
2094 
// True when a forwardable item exists; the plain buffer forwards from
// the tail (LIFO-style), so check the last slot.
bool is_item_valid() {
    return this->my_item_valid(this->my_tail - 1);
}
2098 
// Offer the back item to successors; if one accepts (non-NULL task),
// fold the task into last_task and remove the item from the buffer.
void try_put_and_add_task(task*& last_task) {
    task *new_task = my_successors.try_put_task(this->back());
    if (new_task) {
        last_task = combine_tasks(last_task, new_task);
        this->destroy_back();
    }
}
2106 
2107 protected:
// Aggregator handler for try_fwd_task: delegate to the shared CRTP
// implementation with 'this' as the derived node.
virtual void internal_forward_task(buffer_operation *op) {
    internal_forward_task_impl(op, this);
}
2112 
// Shared forwarding attempt used by buffer/queue/priority variants.
// 'derived' provides the variant-specific is_item_valid() and
// try_put_and_add_task() via static dispatch.
template<typename derived_type>
void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
    __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

    // nothing forwardable while reserved or empty; also retire the forwarder
    if (this->my_reserved || !derived->is_item_valid()) {
        __TBB_store_with_release(op->status, FAILED);
        this->forwarder_busy = false;
        return;
    }
    // Try forwarding, giving each successor a chance
    task * last_task = NULL;
    size_type counter = my_successors.size();
    for (; counter > 0 && derived->is_item_valid(); --counter)
        derived->try_put_and_add_task(last_task);

    op->ltask = last_task; // return task
    // SUCCEEDED only if every successor slot was tried and at least one
    // accepted; otherwise fail and let the forwarder be re-spawned later.
    if (last_task && !counter) {
        __TBB_store_with_release(op->status, SUCCEEDED);
    }
    else {
        __TBB_store_with_release(op->status, FAILED);
        forwarder_busy = false;
    }
}
2137 
// Aggregator handler for put_item: append to the buffer (always succeeds
// for the unbounded plain buffer).
virtual void internal_push(buffer_operation *op) {
    this->push_back(*(op->elem));
    __TBB_store_with_release(op->status, SUCCEEDED);
}
2142 
2143  virtual void internal_pop(buffer_operation *op) {
2144  if(this->pop_back(*(op->elem))) {
2145  __TBB_store_with_release(op->status, SUCCEEDED);
2146  }
2147  else {
2148  __TBB_store_with_release(op->status, FAILED);
2149  }
2150  }
2151 
2152  virtual void internal_reserve(buffer_operation *op) {
2153  if(this->reserve_front(*(op->elem))) {
2154  __TBB_store_with_release(op->status, SUCCEEDED);
2155  }
2156  else {
2157  __TBB_store_with_release(op->status, FAILED);
2158  }
2159  }
2160 
// Aggregator handler for con_res: discard the reserved front item.
virtual void internal_consume(buffer_operation *op) {
    this->consume_front();
    __TBB_store_with_release(op->status, SUCCEEDED);
}
2165 
// Aggregator handler for rel_res: cancel the reservation, making the
// front item available again.
virtual void internal_release(buffer_operation *op) {
    this->release_front();
    __TBB_store_with_release(op->status, SUCCEEDED);
}
2170 
2171 public:
// Constructor: bind to graph g, wire up the successor cache and the
// aggregator handler, and register the node with the profiling/trace layer.
buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
                          forwarder_busy(false) {
    my_successors.set_owner(this);
    my_aggregator.initialize_handler(handler_type(this));
    tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
                             static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
}
2180 
// Copy constructor: shares only the graph binding; buffered contents and
// edges are NOT copied. Re-initializes caches/aggregator for the new node.
// NOTE(review): this listing hides one original line here (the remaining
// base-class initializers and opening brace) — verify against the
// upstream header before editing.
buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
    forwarder_busy = false;
    my_successors.set_owner(this);
    my_aggregator.initialize_handler(handler_type(this));
    tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
                             static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
}
2190 
// Virtual destructor: buffer_node is a polymorphic base for queue/sequencer/
// priority-queue nodes.
virtual ~buffer_node() {}
2192 
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Attach a human-readable name to this node for trace/profiling tools.
/* override */ void set_name( const char *name ) {
    tbb::internal::fgt_node_desc( this, name );
}
#endif
2198 
2199  //
2200  // message sender implementation
2201  //
2202 
2204 
// sender interface: add a successor edge. Marshalled through the aggregator;
// registering may enable forwarding, so spawn any resulting task.
/* override */ bool register_successor( successor_type &r ) {
    buffer_operation op_data(reg_succ);
    op_data.r = &r;
    my_aggregator.execute(&op_data);
    (void)enqueue_forwarding_task(op_data);
    return true;
}
2212 
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// --- public edge-introspection API (preview feature) ---
// Each call is marshalled through the aggregator as a buffer_operation so
// it serializes with regular buffer traffic; the matching internal_* handler
// runs on the aggregator thread.
/*override*/ void internal_add_built_successor( successor_type &r) {
    buffer_operation op_data(add_blt_succ);
    op_data.r = &r;
    my_aggregator.execute(&op_data);
}

/*override*/ void internal_delete_built_successor( successor_type &r) {
    buffer_operation op_data(del_blt_succ);
    op_data.r = &r;
    my_aggregator.execute(&op_data);
}

/*override*/ void internal_add_built_predecessor( predecessor_type &p) {
    buffer_operation op_data(add_blt_pred);
    op_data.p = &p;
    my_aggregator.execute(&op_data);
}

/*override*/ void internal_delete_built_predecessor( predecessor_type &p) {
    buffer_operation op_data(del_blt_pred);
    op_data.p = &p;
    my_aggregator.execute(&op_data);
}

// counts are returned through the op's cnt_val field
/*override*/ size_t predecessor_count() {
    buffer_operation op_data(blt_pred_cnt);
    my_aggregator.execute(&op_data);
    return op_data.cnt_val;
}

/*override*/ size_t successor_count() {
    buffer_operation op_data(blt_succ_cnt);
    my_aggregator.execute(&op_data);
    return op_data.cnt_val;
}

// list copies fill the caller-supplied vector
/*override*/ void copy_predecessors( predecessor_list_type &v ) {
    buffer_operation op_data(blt_pred_cpy);
    op_data.pvec = &v;
    my_aggregator.execute(&op_data);
}

/*override*/ void copy_successors( successor_list_type &v ) {
    buffer_operation op_data(blt_succ_cpy);
    op_data.svec = &v;
    my_aggregator.execute(&op_data);
}

#endif
2263 
2265 
// sender interface: remove a successor edge (and drop the reverse
// predecessor link on the successor side first).
/* override */ bool remove_successor( successor_type &r ) {
    r.remove_predecessor(*this);
    buffer_operation op_data(rem_succ);
    op_data.r = &r;
    my_aggregator.execute(&op_data);
    // even though this operation does not cause a forward, if we are the handler, and
    // a forward is scheduled, we may be the first to reach this point after the aggregator,
    // and so should check for the task.
    (void)enqueue_forwarding_task(op_data);
    return true;
}
2278 
2280 
// sender interface: pop an item into v. Returns false when the buffer is
// empty (or the pop otherwise failed).
/* override */ bool try_get( T &v ) {
    buffer_operation op_data(req_item);
    op_data.elem = &v;
    my_aggregator.execute(&op_data);
    (void)enqueue_forwarding_task(op_data);
    return (op_data.status==SUCCEEDED);
}
2289 
2291 
// sender interface: reserve an item, copying it into v; the item stays in
// the buffer until try_consume/try_release resolves the reservation.
/* override */ bool try_reserve( T &v ) {
    buffer_operation op_data(res_item);
    op_data.elem = &v;
    my_aggregator.execute(&op_data);
    (void)enqueue_forwarding_task(op_data);
    return (op_data.status==SUCCEEDED);
}
2300 
2302 
// sender interface: abandon the current reservation, returning the item to
// the buffer; always reports success.
/* override */ bool try_release() {
    buffer_operation op_data(rel_res);
    my_aggregator.execute(&op_data);
    (void)enqueue_forwarding_task(op_data);
    return true;
}
2309 
2311 
// sender interface: commit the current reservation, discarding the reserved
// item; always reports success.
/* override */ bool try_consume() {
    buffer_operation op_data(con_res);
    my_aggregator.execute(&op_data);
    (void)enqueue_forwarding_task(op_data);
    return true;
}
2318 
2319 protected:
2320 
// task-bypass machinery needs access to try_put_task
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class internal::broadcast_cache;
template<typename X, typename Y> friend class internal::round_robin_cache;
// receiver interface: accept item t. Returns a task to bypass-schedule,
// SUCCESSFULLY_ENQUEUED when the item was stored with no task to run, or
// NULL when the put failed.
/* override */ task *try_put_task(const T &t) {
    buffer_operation op_data(t, put_item);
    my_aggregator.execute(&op_data);
    task *ft = grab_forwarding_task(op_data);
    // sequencer_nodes can return failure (if an item has been previously inserted)
    // We have to spawn the returned task if our own operation fails.

    if(ft && op_data.status == FAILED) {
        // we haven't succeeded queueing the item, but for some reason the
        // call returned a task (if another request resulted in a successful
        // forward this could happen.) Queue the task and reset the pointer.
        FLOW_SPAWN(*ft); ft = NULL;
    }
    else if(!ft && op_data.status == SUCCEEDED) {
        ft = SUCCESSFULLY_ENQUEUED;
    }
    return ft;
}
2343 
// receiver-side reset: nothing to do; buffer state is reset via reset_node.
/*override*/void reset_receiver(reset_flags /*f*/) { }
2345 
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
public:
    // Detach this node from all recorded predecessor and successor edges
    // (preview edge-extraction feature).
    /* override*/ void extract() {
        my_built_predecessors.receiver_extract(*this);
        my_successors.built_successors().sender_extract(*this);
    }
#endif
2353 
2354 protected:
// graph reset: optionally clear edges and always clear the forwarder flag
// so a stale 'busy' state cannot suppress future forwarding.
/*override*/void reset_node( reset_flags f) {
    // TODO: just clear structures
    if (f&rf_clear_edges) {
        my_successors.clear();
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
        my_built_predecessors.clear();
#endif
    }
    forwarder_busy = false;
}
2366 
2367 
2368 }; // buffer_node
2369 
// Buffer node specialization that forwards messages in FIFO order: items
// are pushed at the tail (inherited internal_push) and popped/forwarded
// from the head via the overrides below.
template <typename T, typename A=cache_aligned_allocator<T> >
class queue_node : public buffer_node<T, A> {
protected:
    typedef buffer_node<T, A> base_type;
    typedef typename base_type::size_type size_type;
    typedef typename base_type::buffer_operation queue_operation;
    typedef queue_node class_type;

    enum op_stat {WAIT=0, SUCCEEDED, FAILED};

private:
    // buffer_node's CRTP helpers call these through the derived pointer
    template<typename, typename> friend class buffer_node;

    // FIFO: the forwardable item is at the head (the base checks the tail)
    bool is_item_valid() {
        return this->my_item_valid(this->my_head);
    }

    // Offer the front item to successors; on acceptance destroy it.
    void try_put_and_add_task(task*& last_task) {
        task *new_task = this->my_successors.try_put_task(this->front());
        if (new_task) {
            last_task = combine_tasks(last_task, new_task);
            this->destroy_front();
        }
    }

protected:
    // Route forwarding through the shared impl so the FIFO helpers above
    // are used instead of the base's tail-oriented ones.
    /* override */ void internal_forward_task(queue_operation *op) {
        this->internal_forward_task_impl(op, this);
    }

    // Pop from the head; fails while reserved or empty.
    /* override */ void internal_pop(queue_operation *op) {
        if ( this->my_reserved || !this->my_item_valid(this->my_head)){
            __TBB_store_with_release(op->status, FAILED);
        }
        else {
            this->pop_front(*(op->elem));
            __TBB_store_with_release(op->status, SUCCEEDED);
        }
    }
    // Reserve the head item; fails while reserved or empty.
    /* override */ void internal_reserve(queue_operation *op) {
        if (this->my_reserved || !this->my_item_valid(this->my_head)) {
            __TBB_store_with_release(op->status, FAILED);
        }
        else {
            this->reserve_front(*(op->elem));
            __TBB_store_with_release(op->status, SUCCEEDED);
        }
    }
    // Commit the reservation, discarding the reserved head item.
    /* override */ void internal_consume(queue_operation *op) {
        this->consume_front();
        __TBB_store_with_release(op->status, SUCCEEDED);
    }

public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    // Constructor: bind to graph g and register with the trace layer.
    queue_node( graph &g ) : base_type(g) {
        tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    // Copy constructor: copies configuration only (see base copy ctor).
    queue_node( const queue_node& src) : base_type(src) {
        tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:
    /*override*/void reset_node( reset_flags f) {
        base_type::reset_node(f);
    }
}; // queue_node
2455 
// Queue node that releases items in sequence order: a user-supplied
// Sequencer functor maps each item to its slot index, and items are
// forwarded FIFO starting from the lowest index.
template< typename T, typename A=cache_aligned_allocator<T> >
class sequencer_node : public queue_node<T, A> {
    // my_sequencer should be a benign function and must be callable
    // from a parallel context. Does this mean it needn't be reset?
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    // Constructor: s is copied into a heap-allocated type-erased body.
    // NOTE(review): this listing hides one original line here (presumably
    // the my_sequencer(...) initializer and opening brace) — verify against
    // the upstream header before editing.
    template< typename Sequencer >
    sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
        tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    // Copy constructor: clones the sequencer body so each node owns its own.
    sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
        my_sequencer( src.my_sequencer->clone() ) {
        tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    // Destructor: releases the owned sequencer body.
    ~sequencer_node() { delete my_sequencer; }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:
    typedef typename buffer_node<T, A>::size_type size_type;
    typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;

    enum op_stat {WAIT=0, SUCCEEDED, FAILED};

private:
    // Place the item at the slot chosen by the sequencer rather than at the
    // tail; duplicate tags are rejected (status FAILED).
    /* override */ void internal_push(sequencer_operation *op) {
        size_type tag = (*my_sequencer)(*(op->elem));
#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
        if (tag < this->my_head) {
            // have already emitted a message with this tag
            __TBB_store_with_release(op->status, FAILED);
            return;
        }
#endif
        // cannot modify this->my_tail now; the buffer would be inconsistent.
        size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;

        if (this->size(new_tail) > this->capacity()) {
            this->grow_my_array(this->size(new_tail));
        }
        this->my_tail = new_tail;
        if (this->place_item(tag, *(op->elem))) {
            __TBB_store_with_release(op->status, SUCCEEDED);
        }
        else {
            // already have a message with this tag
            __TBB_store_with_release(op->status, FAILED);
        }
    }
}; // sequencer_node
2526 
// Buffer node that forwards the largest item first, as ordered by Compare
// (default std::less<T>). Items [0, mark) form a max-heap; items
// [mark, my_tail) are unheaped recent pushes folded in by order()/heapify()
// after each aggregator batch.
template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
class priority_queue_node : public buffer_node<T, A> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef buffer_node<T,A> base_type;
    typedef priority_queue_node class_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    // Constructor: bind to graph g and register with the trace layer.
    priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {
        tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    // Copy constructor: copies configuration only; contents are not copied.
    priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
        tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif


protected:

    /*override*/void reset_node( reset_flags f) {
        mark = 0;
        base_type::reset_node(f);
    }

    typedef typename buffer_node<T, A>::size_type size_type;
    typedef typename buffer_node<T, A>::item_type item_type;
    typedef typename buffer_node<T, A>::buffer_operation prio_operation;

    enum op_stat {WAIT=0, SUCCEEDED, FAILED};

    // Route forwarding through the shared impl so the heap-aware helpers
    // below are used.
    /* override */ void internal_forward_task(prio_operation *op) {
        this->internal_forward_task_impl(op, this);
    }

    /* override */ void handle_operations(prio_operation *op_list) {
        this->handle_operations_impl(op_list, this);
    }

    // put: append without reheaping; order() fixes the heap after the batch
    /* override */ void internal_push(prio_operation *op) {
        prio_push(*(op->elem));
        __TBB_store_with_release(op->status, SUCCEEDED);
    }

    /* override */ void internal_pop(prio_operation *op) {
        // if empty or already reserved, don't pop
        if ( this->my_reserved == true || this->my_tail == 0 ) {
            __TBB_store_with_release(op->status, FAILED);
            return;
        }

        *(op->elem) = prio();
        __TBB_store_with_release(op->status, SUCCEEDED);
        prio_pop();

    }

    // pops the highest-priority item, saves copy
    /* override */ void internal_reserve(prio_operation *op) {
        if (this->my_reserved == true || this->my_tail == 0) {
            __TBB_store_with_release(op->status, FAILED);
            return;
        }
        this->my_reserved = true;
        *(op->elem) = prio();
        // keep a copy so try_release can push it back into the heap
        reserved_item = *(op->elem);
        __TBB_store_with_release(op->status, SUCCEEDED);
        prio_pop();
    }

    // commit the reservation: drop the saved copy
    /* override */ void internal_consume(prio_operation *op) {
        __TBB_store_with_release(op->status, SUCCEEDED);
        this->my_reserved = false;
        reserved_item = input_type();
    }

    // abandon the reservation: return the saved copy to the heap
    /* override */ void internal_release(prio_operation *op) {
        __TBB_store_with_release(op->status, SUCCEEDED);
        prio_push(reserved_item);
        this->my_reserved = false;
        reserved_item = input_type();
    }

private:
    template<typename, typename> friend class buffer_node;

    // post-batch hook: fold unheaped pushes into the heap
    void order() {
        if (mark < this->my_tail) heapify();
        __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
    }

    bool is_item_valid() {
        return this->my_tail > 0;
    }

    // Offer the current highest-priority item; on acceptance remove it.
    void try_put_and_add_task(task*& last_task) {
        task * new_task = this->my_successors.try_put_task(this->prio());
        if (new_task) {
            last_task = combine_tasks(last_task, new_task);
            prio_pop();
        }
    }

private:
    Compare compare;
    size_type mark;       // items below this index form a valid heap

    input_type reserved_item;

    // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
    bool prio_use_tail() {
        __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
        return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
    }

    // prio_push: checks that the item will fit, expand array if necessary, put at end
    void prio_push(const T &src) {
        if ( this->my_tail >= this->my_array_size )
            this->grow_my_array( this->my_tail + 1 );
        (void) this->place_item(this->my_tail, src);
        ++(this->my_tail);
        __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
    }

    // prio_pop: deletes highest priority item from the array, and if it is item
    // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
    // and mark. Assumes the array has already been tested for emptiness; no failure.
    void prio_pop() {
        if (prio_use_tail()) {
            // there are newly pushed elems; last one higher than top
            // copy the data
            this->destroy_item(this->my_tail-1);
            --(this->my_tail);
            __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
            return;
        }
        this->destroy_item(0);
        if(this->my_tail > 1) {
            // push the last element down heap
            __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
            this->move_item(0,this->my_tail - 1);
        }
        --(this->my_tail);
        if(mark > this->my_tail) --mark;
        if (this->my_tail > 1) // don't reheap for heap of size 1
            reheap();
        __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
    }

    // current highest-priority item: either the heap root or an unheaped
    // tail item that outranks it (see prio_use_tail)
    const T& prio() {
        return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
    }

    // turn array into heap
    void heapify() {
        if(this->my_tail == 0) {
            mark = 0;
            return;
        }
        if (!mark) mark = 1;
        for (; mark<this->my_tail; ++mark) { // for each unheaped element
            size_type cur_pos = mark;
            input_type to_place;
            this->fetch_item(mark,to_place);
            do { // push to_place up the heap
                size_type parent = (cur_pos-1)>>1;
                if (!compare(this->get_my_item(parent), to_place))
                    break;
                this->move_item(cur_pos, parent);
                cur_pos = parent;
            } while( cur_pos );
            (void) this->place_item(cur_pos, to_place);
        }
    }

    // otherwise heapified array with new root element; rearrange to heap
    void reheap() {
        size_type cur_pos=0, child=1;
        while (child < mark) {
            size_type target = child;
            if (child+1<mark &&
                compare(this->get_my_item(child),
                        this->get_my_item(child+1)))
                ++target;
            // target now has the higher priority child
            if (compare(this->get_my_item(target),
                        this->get_my_item(cur_pos)))
                break;
            // swap
            this->swap_items(cur_pos, target);
            cur_pos = target;
            child = (cur_pos<<1)+1;
        }
    }
}; // priority_queue_node
2737 
2739 
2742 template< typename T >
2743 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2744 protected:
2745  using graph_node::my_graph;
2746 public:
2747  typedef T input_type;
2748  typedef T output_type;
2749  typedef typename receiver<input_type>::predecessor_type predecessor_type;
2750  typedef typename sender<output_type>::successor_type successor_type;
2751 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
2752  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2753  typedef typename sender<output_type>::built_successors_type built_successors_type;
2754  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2755  typedef typename sender<output_type>::successor_list_type successor_list_type;
2756 #endif
2757 
2758 private:
2759  size_t my_threshold;
2760  size_t my_count; //number of successful puts
2761  size_t my_tries; //number of active put attempts
2763  spin_mutex my_mutex;
2764  internal::broadcast_cache< T > my_successors;
2765  int init_decrement_predecessors;
2766 
2767  friend class internal::forward_task_bypass< limiter_node<T> >;
2768 
2769  // Let decrementer call decrement_counter()
2770  friend class internal::decrementer< limiter_node<T> >;
2771 
// True when another item may flow: under threshold (counting in-flight
// tries) and edges exist on both sides. Caller must hold my_mutex.
bool check_conditions() { // always called under lock
    return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
}
2775 
// only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
// Pull one item from a predecessor and push it to successors, respecting
// the threshold; my_tries counts the in-flight attempt between the two
// locked sections.
// NOTE(review): this listing hides two original lines below (the
// constructed task type after each placement-new, presumably
// internal::forward_task_bypass< limiter_node<T> >(*this)) — verify
// against the upstream header before editing.
task *forward_task() {
    input_type v;
    task *rval = NULL;
    bool reserved = false;
    {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( check_conditions() )
            ++my_tries;
        else
            return NULL;
    }

    //SUCCESS
    // if we can reserve and can put, we consume the reservation
    // we increment the count and decrement the tries
    if ( (my_predecessors.try_reserve(v)) == true ){
        reserved=true;
        if ( (rval = my_successors.try_put_task(v)) != NULL ){
            {
                spin_mutex::scoped_lock lock(my_mutex);
                ++my_count;
                --my_tries;
                my_predecessors.try_consume();
                if ( check_conditions() ) {
                    if ( this->my_graph.is_active() ) {
                        task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                        FLOW_SPAWN (*rtask);
                    }
                }
            }
            return rval;
        }
    }
    //FAILURE
    //if we can't reserve, we decrement the tries
    //if we can reserve but can't put, we decrement the tries and release the reservation
    {
        spin_mutex::scoped_lock lock(my_mutex);
        --my_tries;
        if (reserved) my_predecessors.try_release();
        if ( check_conditions() ) {
            if ( this->my_graph.is_active() ) {
                task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                __TBB_ASSERT(!rval, "Have two tasks to handle");
                return rtask;
            }
        }
        return rval;
    }
}
2828  }
2829 
// Legacy entry point; the task-bypass path (forward_task) replaced it.
void forward() {
    __TBB_ASSERT(false, "Should never be called");
    return;
}
2834 
// Called by the decrement port: free one unit of the threshold budget,
// then attempt to forward another item.
task * decrement_counter() {
    {
        spin_mutex::scoped_lock lock(my_mutex);
        if(my_count) --my_count;
    }
    return forward_task();
}
2842 
2843 public:
2846 
// Constructor: 'threshold' caps concurrent in-flight items;
// num_decrement_predecessors presets the decrement port's predecessor count.
limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
    graph_node(g), my_threshold(threshold), my_count(0), my_tries(0),
    init_decrement_predecessors(num_decrement_predecessors),
    decrement(num_decrement_predecessors)
{
    my_predecessors.set_owner(this);
    my_successors.set_owner(this);
    decrement.set_owner(this);
    tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
                             static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
                             static_cast<sender<output_type> *>(this) );
}
2860 
// Copy constructor: copies threshold/configuration; counters restart at 0
// and edges are not copied.
limiter_node( const limiter_node& src ) :
    graph_node(src.my_graph), receiver<T>(), sender<T>(),
    my_threshold(src.my_threshold), my_count(0), my_tries(0),
    init_decrement_predecessors(src.init_decrement_predecessors),
    decrement(src.init_decrement_predecessors)
{
    my_predecessors.set_owner(this);
    my_successors.set_owner(this);
    decrement.set_owner(this);
    tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
                             static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
                             static_cast<sender<output_type> *>(this) );
}
2875 
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
// Attach a human-readable name for trace/profiling tools.
/* override */ void set_name( const char *name ) {
    tbb::internal::fgt_node_desc( this, name );
}
#endif
2881 
// sender interface: add a successor; if it is the first and flow is
// possible, spawn a forwarding task.
// NOTE(review): this listing hides one original line (the tail of the
// spawned-task expression, presumably a forward_task_bypass of this node
// and closing parentheses) — verify against the upstream header.
/* override */ bool register_successor( successor_type &r ) {
    spin_mutex::scoped_lock lock(my_mutex);
    bool was_empty = my_successors.empty();
    my_successors.register_successor(r);
    //spawn a forward task if this is the only successor
    if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
        if ( this->my_graph.is_active() ) {
            FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
        }
    }
    return true;
}
2896 
2898 
2899  /* override */ bool remove_successor( successor_type &r ) {
2900  r.remove_predecessor(*this);
2901  my_successors.remove_successor(r);
2902  return true;
2903  }
2904 
#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
// --- edge-introspection API (preview feature): thin delegations to the
// successor/predecessor caches ---
/*override*/ built_successors_type &built_successors() { return my_successors.built_successors(); }
/*override*/ built_predecessors_type &built_predecessors() { return my_predecessors.built_predecessors(); }

/*override*/void internal_add_built_successor(successor_type &src) {
    my_successors.internal_add_built_successor(src);
}

/*override*/void internal_delete_built_successor(successor_type &src) {
    my_successors.internal_delete_built_successor(src);
}

/*override*/size_t successor_count() { return my_successors.successor_count(); }

/*override*/ void copy_successors(successor_list_type &v) {
    my_successors.copy_successors(v);
}

/*override*/void internal_add_built_predecessor(predecessor_type &src) {
    my_predecessors.internal_add_built_predecessor(src);
}

/*override*/void internal_delete_built_predecessor(predecessor_type &src) {
    my_predecessors.internal_delete_built_predecessor(src);
}

/*override*/size_t predecessor_count() { return my_predecessors.predecessor_count(); }

/*override*/ void copy_predecessors(predecessor_list_type &v) {
    my_predecessors.copy_predecessors(v);
}

// Detach all edges (both directions, including the decrement port) and
// reset the successful-put counter.
/*override*/void extract() {
    my_count = 0;
    my_successors.built_successors().sender_extract(*this);
    my_predecessors.built_predecessors().receiver_extract(*this);
    decrement.built_predecessors().receiver_extract(decrement);
}
#endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
2944 
 //! Adds src to the predecessor cache; always reports success.
 //! If the node can currently accept items (below threshold) and has at
 //! least one successor, a forwarding task is spawned to pull from src.
 // NOTE(review): the FLOW_SPAWN argument below is truncated in this listing
 // (original line 2951 is elided) -- confirm against the upstream header.
 2946  /* override */ bool register_predecessor( predecessor_type &src ) {
 2947  spin_mutex::scoped_lock lock(my_mutex);
 2948  my_predecessors.add( src );
 2949  if ( my_count + my_tries < my_threshold && !my_successors.empty() && this->my_graph.is_active() ) {
 2950  FLOW_SPAWN( (* new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
 2952  }
 2953  return true;
 2954  }
2955 
 //! Removes src from the predecessor cache. Always returns true.
 2957  /* override */ bool remove_predecessor( predecessor_type &src ) {
 2958  my_predecessors.remove( src );
 2959  return true;
 2960  }
2961 
2962 protected:
2963 
2964  template< typename R, typename B > friend class run_and_put_task;
2965  template<typename X, typename Y> friend class internal::broadcast_cache;
2966  template<typename X, typename Y> friend class internal::round_robin_cache;
 //! Attempts to forward t to the successors, enforcing the threshold.
 //! my_tries counts in-flight attempts so that concurrent puts cannot
 //! collectively exceed my_threshold; the reservation is taken under the
 //! lock, then released (and my_count incremented on success) after the
 //! unlocked forwarding attempt. Returns NULL when the item is rejected.
 // NOTE(review): the allocation below is truncated in this listing (original
 // line 2984, presumably constructing a forward_task, is elided) -- confirm
 // against the upstream header.
 2968  /* override */ task *try_put_task( const T &t ) {
 2969  {
 // Reserve a slot, or reject outright if the threshold is already reached.
 2970  spin_mutex::scoped_lock lock(my_mutex);
 2971  if ( my_count + my_tries >= my_threshold )
 2972  return NULL;
 2973  else
 2974  ++my_tries;
 2975  }
 2976 
 // Forward outside the lock to avoid holding my_mutex during downstream work.
 2977  task * rtask = my_successors.try_put_task(t);
 2978 
 2979  if ( !rtask ) { // try_put_task failed.
 2980  spin_mutex::scoped_lock lock(my_mutex);
 2981  --my_tries;
 // If forwarding is still possible, schedule another attempt.
 2982  if ( check_conditions() && this->my_graph.is_active() ) {
 2983  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
 2985  }
 2986  }
 2987  else {
 // Success: convert the reservation into a counted forwarded item.
 2988  spin_mutex::scoped_lock lock(my_mutex);
 2989  ++my_count;
 2990  --my_tries;
 2991  }
 2992  return rtask;
 2993  }
2994 
 //! limiter_node's receiver side is reset via reset_node(), never directly.
 2995  /*override*/void reset_receiver(reset_flags /*f*/) {
 2996  __TBB_ASSERT(false,NULL); // should never be called
 2997  }
2998 
2999  /*override*/void reset_node( reset_flags f) {
3000  my_count = 0;
3001  if(f & rf_clear_edges) {
3002  my_predecessors.clear();
3003  my_successors.clear();
3004  }
3005  else
3006  {
3007  my_predecessors.reset( );
3008  }
3009  decrement.reset_receiver(f);
3010  }
3011 }; // limiter_node
3012 
3013 #include "internal/_flow_graph_join_impl.h"
3014 
3018 using internal::input_port;
3019 using internal::tag_value;
3020 
3021 template<typename OutputTuple, typename JP=queueing> class join_node;
3022 
 //! join_node with the reserving policy: inputs are reserved at the
 //! predecessors and consumed only when every port can supply an item.
 //! Constructors register the node with the flow-graph tracing layer.
 // NOTE(review): this listing elides original line 3027, presumably the
 // typedef of unfolded_type used below -- confirm against the upstream header.
 3023 template<typename OutputTuple>
 3024 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
 3025 private:
 3026  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
 3028 public:
 3029  typedef OutputTuple output_type;
 3030  typedef typename unfolded_type::input_ports_type input_ports_type;
 3031  join_node(graph &g) : unfolded_type(g) {
 3032  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
 3033  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3034  }
 3035  join_node(const join_node &other) : unfolded_type(other) {
 3036  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
 3037  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3038  }
 3039 
 3040 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Attaches a user-visible name to this node in trace output.
 3041  /* override */ void set_name( const char *name ) {
 3042  tbb::internal::fgt_node_desc( this, name );
 3043  }
 3044 #endif
 3045 
 3046 };
3047 
 //! join_node with the queueing policy: each port buffers arriving items
 //! and a tuple is emitted when every port has at least one item queued.
 //! Constructors register the node with the flow-graph tracing layer.
 // NOTE(review): this listing elides original line 3052, presumably the
 // typedef of unfolded_type used below -- confirm against the upstream header.
 3048 template<typename OutputTuple>
 3049 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
 3050 private:
 3051  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
 3053 public:
 3054  typedef OutputTuple output_type;
 3055  typedef typename unfolded_type::input_ports_type input_ports_type;
 3056  join_node(graph &g) : unfolded_type(g) {
 3057  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
 3058  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3059  }
 3060  join_node(const join_node &other) : unfolded_type(other) {
 3061  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
 3062  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3063  }
 3064 
 3065 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Attaches a user-visible name to this node in trace output.
 3066  /* override */ void set_name( const char *name ) {
 3067  tbb::internal::fgt_node_desc( this, name );
 3068  }
 3069 #endif
 3070 
 3071 };
3072 
 3073 // template for key_matching join_node
 3074 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
 //! join_node with the key_matching policy: items are grouped across ports
 //! by a key computed by the per-port body functors b0..bN; a tuple is
 //! emitted when all ports hold an item with the same key. Every constructor
 //! registers the node with the flow-graph tracing layer. The __TBB_VARIADIC_MAX
 //! blocks provide constructors for up to 10 input ports.
 // NOTE(review): this listing elides original line 3080, presumably the
 // typedef of unfolded_type used below -- confirm against the upstream header.
 3075 template<typename OutputTuple, typename K, typename KHash>
 3076 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
 3077  key_matching_port, OutputTuple, key_matching<K,KHash> > {
 3078 private:
 3079  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
 3081 public:
 3082  typedef OutputTuple output_type;
 3083  typedef typename unfolded_type::input_ports_type input_ports_type;
 3084 
 3085 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
 // Message-based key matching: keys travel with the messages, so no body
 // functors are needed.
 3086  join_node(graph &g) : unfolded_type(g) {}
 3087 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
 3088 
 3089  template<typename __TBB_B0, typename __TBB_B1>
 3090  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
 3091  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
 3092  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3093  }
 3094  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
 3095  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
 3096  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
 3097  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3098  }
 3099  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
 3100  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
 3101  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
 3102  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3103  }
 3104  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
 3105  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
 3106  unfolded_type(g, b0, b1, b2, b3, b4) {
 3107  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
 3108  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3109  }
 3110 #if __TBB_VARIADIC_MAX >= 6
 3111  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
 3112  typename __TBB_B5>
 3113  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
 3114  unfolded_type(g, b0, b1, b2, b3, b4, b5) {
 3115  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
 3116  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3117  }
 3118 #endif
 3119 #if __TBB_VARIADIC_MAX >= 7
 3120  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
 3121  typename __TBB_B5, typename __TBB_B6>
 3122  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
 3123  unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
 3124  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
 3125  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3126  }
 3127 #endif
 3128 #if __TBB_VARIADIC_MAX >= 8
 3129  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
 3130  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
 3131  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
 3132  __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
 3133  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
 3134  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3135  }
 3136 #endif
 3137 #if __TBB_VARIADIC_MAX >= 9
 3138  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
 3139  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
 3140  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
 3141  __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
 3142  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
 3143  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3144  }
 3145 #endif
 3146 #if __TBB_VARIADIC_MAX >= 10
 3147  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
 3148  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
 3149  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
 3150  __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
 3151  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
 3152  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3153  }
 3154 #endif
 3155  join_node(const join_node &other) : unfolded_type(other) {
 3156  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
 3157  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3158  }
 3159 
 3160 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Attaches a user-visible name to this node in trace output.
 3161  /* override */ void set_name( const char *name ) {
 3162  tbb::internal::fgt_node_desc( this, name );
 3163  }
 3164 #endif
 3165 
 3166 };
3167 
3168 // indexer node
3169 #include "internal/_flow_graph_indexer_impl.h"
3170 
3171 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3172  typename T4=null_type, typename T5=null_type, typename T6=null_type,
3173  typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3174 
3175 //indexer node specializations
3176 template<typename T0>
3177 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
3178 private:
3179  static const int N = 1;
3180 public:
3181  typedef tuple<T0> InputTuple;
3182  typedef typename internal::tagged_msg<size_t, T0> output_type;
3183  typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3184  indexer_node(graph& g) : unfolded_type(g) {
3185  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3186  this->input_ports(), static_cast< sender< output_type > *>(this) );
3187  }
3188  // Copy constructor
3189  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3190  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3191  this->input_ports(), static_cast< sender< output_type > *>(this) );
3192  }
3193 
3194 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3195  void set_name( const char *name ) {
3196  tbb::internal::fgt_node_desc( this, name );
3197  }
3198 #endif
3199 };
3200 
3201 template<typename T0, typename T1>
3202 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
3203 private:
3204  static const int N = 2;
3205 public:
3206  typedef tuple<T0, T1> InputTuple;
3207  typedef typename internal::tagged_msg<size_t, T0, T1> output_type;
3208  typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3209  indexer_node(graph& g) : unfolded_type(g) {
3210  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3211  this->input_ports(), static_cast< sender< output_type > *>(this) );
3212  }
3213  // Copy constructor
3214  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3215  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3216  this->input_ports(), static_cast< sender< output_type > *>(this) );
3217  }
3218 
3219 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3220  void set_name( const char *name ) {
3221  tbb::internal::fgt_node_desc( this, name );
3222  }
3223 #endif
3224 };
3225 
3226 template<typename T0, typename T1, typename T2>
3227 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
3228 private:
3229  static const int N = 3;
3230 public:
3231  typedef tuple<T0, T1, T2> InputTuple;
3232  typedef typename internal::tagged_msg<size_t, T0, T1, T2> output_type;
3233  typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3234  indexer_node(graph& g) : unfolded_type(g) {
3235  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3236  this->input_ports(), static_cast< sender< output_type > *>(this) );
3237  }
3238  // Copy constructor
3239  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3240  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3241  this->input_ports(), static_cast< sender< output_type > *>(this) );
3242  }
3243 
3244 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3245  void set_name( const char *name ) {
3246  tbb::internal::fgt_node_desc( this, name );
3247  }
3248 #endif
3249 };
3250 
3251 template<typename T0, typename T1, typename T2, typename T3>
3252 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
3253 private:
3254  static const int N = 4;
3255 public:
3256  typedef tuple<T0, T1, T2, T3> InputTuple;
3257  typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3> output_type;
3258  typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3259  indexer_node(graph& g) : unfolded_type(g) {
3260  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3261  this->input_ports(), static_cast< sender< output_type > *>(this) );
3262  }
3263  // Copy constructor
3264  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3265  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3266  this->input_ports(), static_cast< sender< output_type > *>(this) );
3267  }
3268 
3269 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3270  /* override */ void set_name( const char *name ) {
3271  tbb::internal::fgt_node_desc( this, name );
3272  }
3273 #endif
3274 };
3275 
3276 template<typename T0, typename T1, typename T2, typename T3, typename T4>
3277 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3278 private:
3279  static const int N = 5;
3280 public:
3281  typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3282  typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
3283  typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3284  indexer_node(graph& g) : unfolded_type(g) {
3285  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3286  this->input_ports(), static_cast< sender< output_type > *>(this) );
3287  }
3288  // Copy constructor
3289  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3290  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3291  this->input_ports(), static_cast< sender< output_type > *>(this) );
3292  }
3293 
3294 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3295  /* override */ void set_name( const char *name ) {
3296  tbb::internal::fgt_node_desc( this, name );
3297  }
3298 #endif
3299 };
3300 
3301 #if __TBB_VARIADIC_MAX >= 6
3302 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3303 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3304 private:
3305  static const int N = 6;
3306 public:
3307  typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3308  typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
3309  typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3310  indexer_node(graph& g) : unfolded_type(g) {
3311  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3312  this->input_ports(), static_cast< sender< output_type > *>(this) );
3313  }
3314  // Copy constructor
3315  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3316  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3317  this->input_ports(), static_cast< sender< output_type > *>(this) );
3318  }
3319 
3320 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3321  /* override */ void set_name( const char *name ) {
3322  tbb::internal::fgt_node_desc( this, name );
3323  }
3324 #endif
3325 };
3326 #endif //variadic max 6
3327 
3328 #if __TBB_VARIADIC_MAX >= 7
3329 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3330  typename T6>
3331 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3332 private:
3333  static const int N = 7;
3334 public:
3335  typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3336  typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
3337  typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3338  indexer_node(graph& g) : unfolded_type(g) {
3339  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3340  this->input_ports(), static_cast< sender< output_type > *>(this) );
3341  }
3342  // Copy constructor
3343  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3344  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3345  this->input_ports(), static_cast< sender< output_type > *>(this) );
3346  }
3347 
3348 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3349  /* override */ void set_name( const char *name ) {
3350  tbb::internal::fgt_node_desc( this, name );
3351  }
3352 #endif
3353 };
3354 #endif //variadic max 7
3355 
3356 #if __TBB_VARIADIC_MAX >= 8
 //! Eight-input indexer_node: tags each message with the index of the port
 //! it arrived on and broadcasts the tagged message to its successors.
 // NOTE(review): this listing elides original line 3364, presumably the
 // output_type typedef used below -- confirm against the upstream header.
 3357 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
 3358  typename T6, typename T7>
 3359 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
 3360 private:
 3361  static const int N = 8;
 3362 public:
 3363  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
 3365  typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
 3366  indexer_node(graph& g) : unfolded_type(g) {
 3367  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
 3368  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3369  }
 3370  // Copy constructor
 3371  indexer_node( const indexer_node& other ) : unfolded_type(other) {
 3372  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
 3373  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3374  }
 3375 
 3376 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Attaches a user-visible name to this node in trace output.
 3377  /* override */ void set_name( const char *name ) {
 3378  tbb::internal::fgt_node_desc( this, name );
 3379  }
 3380 #endif
 3381 };
3382 #endif //variadic max 8
3383 
3384 #if __TBB_VARIADIC_MAX >= 9
 //! Nine-input indexer_node: tags each message with the index of the port
 //! it arrived on and broadcasts the tagged message to its successors.
 // NOTE(review): this listing elides original line 3392, presumably the
 // output_type typedef used below -- confirm against the upstream header.
 3385 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
 3386  typename T6, typename T7, typename T8>
 3387 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
 3388 private:
 3389  static const int N = 9;
 3390 public:
 3391  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
 3393  typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
 3394  indexer_node(graph& g) : unfolded_type(g) {
 3395  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
 3396  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3397  }
 3398  // Copy constructor
 3399  indexer_node( const indexer_node& other ) : unfolded_type(other) {
 3400  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
 3401  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3402  }
 3403 
 3404 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Attaches a user-visible name to this node in trace output.
 3405  /* override */ void set_name( const char *name ) {
 3406  tbb::internal::fgt_node_desc( this, name );
 3407  }
 3408 #endif
 3409 };
3410 #endif //variadic max 9
3411 
3412 #if __TBB_VARIADIC_MAX >= 10
 //! Ten-input indexer_node (the primary/default template form): tags each
 //! message with the index of the port it arrived on and broadcasts it.
 // NOTE(review): this listing elides original line 3420, presumably the
 // output_type typedef used below -- confirm against the upstream header.
 3413 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
 3414  typename T6, typename T7, typename T8, typename T9>
 3415 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
 3416 private:
 3417  static const int N = 10;
 3418 public:
 3419  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
 3421  typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
 3422  indexer_node(graph& g) : unfolded_type(g) {
 3423  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
 3424  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3425  }
 3426  // Copy constructor
 3427  indexer_node( const indexer_node& other ) : unfolded_type(other) {
 3428  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
 3429  this->input_ports(), static_cast< sender< output_type > *>(this) );
 3430  }
 3431 
 3432 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 // Attaches a user-visible name to this node in trace output.
 3433  /* override */ void set_name( const char *name ) {
 3434  tbb::internal::fgt_node_desc( this, name );
 3435  }
 3436 #endif
 3437 };
3438 #endif //variadic max 10
3439 
 //! Connects sender p to receiver s: records the built edge (preview
 //! feature), registers s as a successor of p, and notifies tracing.
 //! Under __TBB_PREVIEW_ASYNC_MSG the untyped (type-erased) endpoints are
 //! used; otherwise a single function template over the message type T.
 3440 #if __TBB_PREVIEW_ASYNC_MSG
 3441 inline void internal_make_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
 3442 #else
 3443 template< typename T >
 3444 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
 3445 #endif
 3446 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
 3447  s.internal_add_built_predecessor(p);
 3448  p.internal_add_built_successor(s);
 3449 #endif
 3450  p.register_successor( s );
 3451  tbb::internal::fgt_make_edge( &p, &s );
 3452 }
3453 
 //! Makes an edge from sender p to receiver s over message type T.
 3455 template< typename T >
 3456 inline void make_edge( sender<T> &p, receiver<T> &s ) {
 3457  internal_make_edge( p, s );
 3458 }
3459 
 // Extra make_edge overloads for the async_msg preview: allow connecting an
 // async_msg sender/receiver to a plain endpoint of the underlying data type.
 // NOTE(review): the first template below is truncated in this listing
 // (original lines 3462-3463, presumably its constraints, are elided) --
 // confirm against the upstream header.
 3460 #if __TBB_PREVIEW_ASYNC_MSG
 3461 template< typename TS, typename TR,
 3464 inline void make_edge( TS &p, TR &s ) {
 3465  internal_make_edge( p, s );
 3466 }
 3467 
 3468 template< typename T >
 3469 inline void make_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
 3470  internal_make_edge( p, s );
 3471 }
 3472 
 3473 template< typename T >
 3474 inline void make_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
 3475  internal_make_edge( p, s );
 3476 }
 3477 
 3478 #endif // __TBB_PREVIEW_ASYNC_MSG
3479 
 // C++11 convenience overloads: deduce port 0 of multi-port nodes via SFINAE
 // on the presence of output_ports_type / input_ports_type.
 3480 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
 3481 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
 3482 template< typename T, typename V,
 3483  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
 3484 inline void make_edge( T& output, V& input) {
 3485  make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
 3486 }
 3487 
 3488 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
 3489 template< typename T, typename R,
 3490  typename = typename T::output_ports_type >
 3491 inline void make_edge( T& output, receiver<R>& input) {
 3492  make_edge(get<0>(output.output_ports()), input);
 3493 }
 3494 
 3495 //Makes an edge from a sender to port 0 of a multi-input successor.
 3496 template< typename S, typename V,
 3497  typename = typename V::input_ports_type >
 3498 inline void make_edge( sender<S>& output, V& input) {
 3499  make_edge(output, get<0>(input.input_ports()));
 3500 }
 3501 #endif
3502 
 //! Disconnects sender p from receiver s: deregisters s as a successor of p,
 //! removes the built-edge bookkeeping (preview feature), notifies tracing.
 3503 #if __TBB_PREVIEW_ASYNC_MSG
 3504 inline void internal_remove_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
 3505 #else
 3506 template< typename T >
 3507 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
 3508 #endif
 3509  p.remove_successor( s );
 3510 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
 3511  // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
 3512  p.internal_delete_built_successor(s);
 3513  s.internal_delete_built_predecessor(p);
 3514 #endif
 3515  tbb::internal::fgt_remove_edge( &p, &s );
 3516 }
3517 
 //! Removes the edge from sender p to receiver s over message type T.
 3519 template< typename T >
 3520 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
 3521  internal_remove_edge( p, s );
 3522 }
3523 
 // Extra remove_edge overloads for the async_msg preview, mirroring the
 // make_edge overloads above.
 // NOTE(review): the first template below is truncated in this listing
 // (original lines 3526-3527, presumably its constraints, are elided) --
 // confirm against the upstream header.
 3524 #if __TBB_PREVIEW_ASYNC_MSG
 3525 template< typename TS, typename TR,
 3528 inline void remove_edge( TS &p, TR &s ) {
 3529  internal_remove_edge( p, s );
 3530 }
 3531 
 3532 template< typename T >
 3533 inline void remove_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
 3534  internal_remove_edge( p, s );
 3535 }
 3536 
 3537 template< typename T >
 3538 inline void remove_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
 3539  internal_remove_edge( p, s );
 3540 }
 3541 #endif // __TBB_PREVIEW_ASYNC_MSG
3542 
 // C++11 convenience overloads: deduce port 0 of multi-port nodes via SFINAE,
 // mirroring the make_edge overloads above.
 3543 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
 3544 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
 3545 template< typename T, typename V,
 3546  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
 3547 inline void remove_edge( T& output, V& input) {
 3548  remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
 3549 }
 3550 
 3551 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
 3552 template< typename T, typename R,
 3553  typename = typename T::output_ports_type >
 3554 inline void remove_edge( T& output, receiver<R>& input) {
 3555  remove_edge(get<0>(output.output_ports()), input);
 3556 }
 3557 //Removes an edge between a sender and port 0 of a multi-input successor.
 3558 template< typename S, typename V,
 3559  typename = typename V::input_ports_type >
 3560 inline void remove_edge( sender<S>& output, V& input) {
 3561  remove_edge(output, get<0>(input.input_ports()));
 3562 }
 3563 #endif
3564 
3565 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3566 template<typename C >
3567 template< typename S >
3568 void internal::edge_container<C>::sender_extract( S &s ) {
3569  edge_list_type e = built_edges;
3570  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3571  remove_edge(s, **i);
3572  }
3573 }
3574 
3575 template<typename C >
3576 template< typename R >
3577 void internal::edge_container<C>::receiver_extract( R &r ) {
3578  edge_list_type e = built_edges;
3579  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3580  remove_edge(**i, r);
3581  }
3582 }
3583 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
3584 
 //! Returns a copy of the Body functor held by node n.
 3586 template< typename Body, typename Node >
 3587 Body copy_body( Node &n ) {
 3588  return n.template copy_function_object<Body>();
 3589 }
3590 
3591 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3592 
3593 //composite_node
3594 template< typename InputTuple, typename OutputTuple > class composite_node;
3595 
 //! A node that encapsulates a subgraph behind tuples of exposed input and
 //! output ports. The user builds the inner nodes, then calls
 //! set_external_ports() to publish which inner ports form the composite's
 //! interface. Port tuples hold references and are heap-allocated copies of
 //! the tuples passed in; they are released in the destructor.
 3596 template< typename... InputTypes, typename... OutputTypes>
 3597 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
 3598 
 3599 public:
 3600  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
 3601  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
 3602 
 3603 private:
 3604 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 3605  const char *my_type_name;
 3606 #endif
 // Owned copies of the externally-published port tuples; NULL until
 // set_external_ports() is called.
 3607  input_ports_type *my_input_ports;
 3608  output_ports_type *my_output_ports;
 3609 
 3610  static const size_t NUM_INPUTS = sizeof...(InputTypes);
 3611  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
 3612 
 3613 protected:
 // The composite itself holds no state to reset; inner nodes are reset by
 // the graph they belong to.
 3614  /*override*/void reset_node(reset_flags) {}
 3615 
 3616 public:
 3617 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 3618  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g), my_type_name(type_name), my_input_ports(NULL), my_output_ports(NULL) {
 3619  tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW, this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::internal::FLOW_COMPOSITE_NODE );
 3620  tbb::internal::fgt_multiinput_multioutput_node_desc( this, my_type_name );
 3621  }
 3622 #else
 3623  composite_node( graph &g) : graph_node(g), my_input_ports(NULL), my_output_ports(NULL) {}
 3624 #endif
 3625 
 //! Publishes the composite's externally-visible ports. The tuple sizes must
 //! match the template parameter packs (checked at compile time).
 3626  template<typename T1, typename T2>
 3627  void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
 3628  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
 3629  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
 3630  my_input_ports = new input_ports_type(std::forward<T1>(input_ports_tuple));
 3631  my_output_ports = new output_ports_type(std::forward<T2>(output_ports_tuple));
 3632 
 3633 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 3634  tbb::internal::fgt_internal_input_helper<T1, NUM_INPUTS>::register_port( this, input_ports_tuple);
 3635  tbb::internal::fgt_internal_output_helper<T2, NUM_OUTPUTS>::register_port( this, output_ports_tuple);
 3636 #endif
 3637  }
 3638 
 // Trace-only grouping of inner nodes under this composite; no-ops when
 // tracing is disabled.
 3639 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 3640  template< typename... NodeTypes >
 3641  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
 3642 
 3643  template< typename... NodeTypes >
 3644  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
 3645 #else
 3646  template<typename... Nodes> void add_nodes(Nodes&...) { }
 3647  template<typename... Nodes> void add_visible_nodes(Nodes&...) { }
 3648 #endif
 3649 
 3650 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
 3651  /* override */ void set_name( const char *name ) {
 3652  tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
 3653  }
 3654 #endif
 3655 
 //! Returns the published input ports; asserts if set_external_ports() has
 //! not been called yet.
 3656  input_ports_type input_ports() {
 3657  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
 3658  return *my_input_ports;
 3659  }
 3660 
 //! Returns the published output ports; asserts if set_external_ports() has
 //! not been called yet.
 3661  output_ports_type output_ports() {
 3662  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
 3663  return *my_output_ports;
 3664  }
 3665 
 3666  virtual ~composite_node() {
 3667  if(my_input_ports) delete my_input_ports;
 3668  if(my_output_ports) delete my_output_ports;
 3669  }
 3670 
 3671 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
 3672  /*override*/void extract() {
 3673  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
 3674  }
 3675 #endif
 3676 }; // class composite_node
3677 
3678 //composite_node with only input ports
3679 template< typename... InputTypes>
3680 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
3681 public:
3682  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3683 
3684 private:
3685 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3686  const char *my_type_name;
3687 #endif
3688  input_ports_type *my_input_ports;
3689  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3690 
3691 protected:
3692  /*override*/void reset_node(reset_flags) {}
3693 
3694 public:
3695 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3696  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g), my_type_name(type_name), my_input_ports(NULL) {
3697  tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW, this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::internal::FLOW_COMPOSITE_NODE );
3698  tbb::internal::fgt_multiinput_multioutput_node_desc( this, my_type_name );
3699  }
3700 #else
3701  composite_node( graph &g) : graph_node(g), my_input_ports(NULL) {}
3702 #endif
3703 
3704  template<typename T>
3705  void set_external_ports(T&& input_ports_tuple) {
3706  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3707 
3708  my_input_ports = new input_ports_type(std::forward<T>(input_ports_tuple));
3709 
3710 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3711  tbb::internal::fgt_internal_input_helper<T, NUM_INPUTS>::register_port( this, std::forward<T>(input_ports_tuple));
3712 #endif
3713  }
3714 
3715 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3716  template< typename... NodeTypes >
3717  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3718 
3719  template< typename... NodeTypes >
3720  void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3721 #else
3722  template<typename... Nodes> void add_nodes(Nodes&...) {}
3723  template<typename... Nodes> void add_visible_nodes(Nodes&...) {}
3724 #endif
3725 
3726 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3727  /* override */ void set_name( const char *name ) {
3728  tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
3729  }
3730 #endif
3731 
3732  input_ports_type input_ports() {
3733  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3734  return *my_input_ports;
3735  }
3736 
3737  virtual ~composite_node() {
3738  if(my_input_ports) delete my_input_ports;
3739  }
3740 
3741 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3742  /*override*/void extract() {
3743  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3744  }
3745 #endif
3746 
3747 }; // class composite_node
3748 
3749 //composite_nodes with only output_ports
3750 template<typename... OutputTypes>
3751 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
3752 public:
3753  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3754 
3755 private:
3756 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3757  const char *my_type_name;
3758 #endif
3759  output_ports_type *my_output_ports;
3760  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3761 
3762 protected:
3763  /*override*/void reset_node(reset_flags) {}
3764 
3765 public:
3766 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3767  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g), my_type_name(type_name), my_output_ports(NULL) {
3768  tbb::internal::itt_make_task_group( tbb::internal::ITT_DOMAIN_FLOW, this, tbb::internal::FLOW_NODE, &g, tbb::internal::FLOW_GRAPH, tbb::internal::FLOW_COMPOSITE_NODE );
3769  tbb::internal::fgt_multiinput_multioutput_node_desc( this, my_type_name );
3770  }
3771 #else
3772  composite_node( graph &g) : graph_node(g), my_output_ports(NULL) {}
3773 #endif
3774 
3775  template<typename T>
3776  void set_external_ports(T&& output_ports_tuple) {
3777  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3778 
3779  my_output_ports = new output_ports_type(std::forward<T>(output_ports_tuple));
3780 
3781 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3782  tbb::internal::fgt_internal_output_helper<T, NUM_OUTPUTS>::register_port( this, std::forward<T>(output_ports_tuple));
3783 #endif
3784  }
3785 
3786 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3787  template<typename... NodeTypes >
3788  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3789 
3790  template<typename... NodeTypes >
3791  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3792 #else
3793  template<typename... Nodes> void add_nodes(Nodes&...) {}
3794  template<typename... Nodes> void add_visible_nodes(Nodes&...) {}
3795 #endif
3796 
3797 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3798  /* override */ void set_name( const char *name ) {
3799  tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
3800  }
3801 #endif
3802 
3803  output_ports_type output_ports() {
3804  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3805  return *my_output_ports;
3806  }
3807 
3808  virtual ~composite_node() {
3809  if(my_output_ports) delete my_output_ports;
3810  }
3811 
3812 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
3813  /*override*/void extract() {
3814  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3815  }
3816 #endif
3817 
3818 }; // class composite_node
3819 
3820 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
3821 
3822 #if __TBB_PREVIEW_ASYNC_NODE
namespace internal {

//! Pure-virtual interface through which an external (asynchronous) activity
//! hands results back into the flow graph.
template < typename Output >
class async_gateway {
public:
    typedef Output output_type;

    //! Submit an asynchronously produced item; returns whether it was accepted.
    virtual bool async_try_put( const output_type &i ) = 0;

    //! Announce pending async work (increments the graph's wait count).
    virtual void async_reserve() = 0;

    //! Announce completion of previously reserved async work
    //! (decrements the graph's wait count).
    virtual void async_commit() = 0;

    virtual ~async_gateway() {}
};

//! Adapter that wraps a user body so the enclosing multifunction_node can
//! invoke it: the body receives the input item and the gateway pointer
//! rather than the node's output ports.
template<typename Input, typename Ports, typename AsyncGateway, typename Body>
class async_body {
public:
    typedef AsyncGateway async_gateway_type;

    async_body(const Body &body, async_gateway_type *gateway) : my_body(body), my_async_gateway(gateway) { }

    async_body(const async_body &other) : my_body(other.my_body), my_async_gateway(other.my_async_gateway) { }

    // Called by the enclosing node; the Ports argument is intentionally
    // unused -- output goes through the gateway instead.
    void operator()( const Input &v, Ports & ) {
        my_body(v, *my_async_gateway);
    }

    //! Returns a copy of the wrapped user body.
    Body get_body() { return my_body; }

    // Re-point the gateway, e.g. after the owning node is copy-constructed.
    void set_async_gateway(async_gateway_type *gateway) {
        my_async_gateway = gateway;
    }

private:
    Body my_body;
    async_gateway_type *my_async_gateway;
};

}
3867 
//! Node for communicating with an external asynchronous activity: consumes
//! Input items like a multifunction_node and lets the external activity push
//! Output items back into the graph through the async_gateway interface.
template < typename Input, typename Output, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
class async_node : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public internal::async_gateway<Output>, public sender< Output > {
protected:
    typedef multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef internal::async_gateway< output_type > async_gateway_type;

protected:
    typedef typename internal::multifunction_input<Input, typename base_type::output_ports_type, Allocator> mfn_input_type;

    // Functor executed inside the graph's task arena to forward an async
    // result to output port 0 (see async_try_put below).
    struct try_put_functor {
        typedef internal::multifunction_output<Output> output_port_type;
        output_port_type *port;
        const Output *value;
        bool result;
        try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
        void operator()() {
            result = port->try_put(*value);
        }
    };

public:
    //! Wraps the user body in an internal::async_body that hands the body
    //! this node's gateway in place of the output ports.
    template<typename Body>
    async_node( graph &g, size_t concurrency, Body body ) :
        base_type( g, concurrency, internal::async_body<Input, typename base_type::output_ports_type, async_gateway_type, Body>(body, this) ) {
        tbb::internal::fgt_multioutput_node<1>( tbb::internal::FLOW_ASYNC_NODE,
                                                &this->graph_node::my_graph,
                                                static_cast<receiver<input_type> *>(this),
                                                this->output_ports() );
    }

    //! Copy constructor: the copied bodies still point at the source node's
    //! gateway, so they are re-pointed at this node.
    // NOTE(review): the mfn_body_type typedef is not visible in this excerpt;
    // presumably declared earlier in the file or in base_type -- verify.
    async_node( const async_node &other ) : base_type(other) {
        mfn_body_type &body_ref = *this->my_body;
        body_ref.set_gateway(static_cast<async_gateway_type *>(this));
        mfn_body_type &init_body_ref = *this->my_init_body;
        init_body_ref.set_gateway(static_cast<async_gateway_type *>(this));
        tbb::internal::fgt_multioutput_node<1>( tbb::internal::FLOW_ASYNC_NODE, &this->graph_node::my_graph, static_cast<receiver<input_type> *>(this), this->output_ports() );
    }

    virtual ~async_node() {}

    //! View of this node as its gateway interface.
    /* override */ async_gateway_type& async_gateway() {
        return static_cast< async_gateway_type& >(*this);
    }

    //! Called by the external activity to inject a result into the graph.
    //! The put is executed inside the graph's task arena so that any tasks
    //! it spawns run in the correct arena.
    /*override*/ bool async_try_put(const output_type &i ) {
        internal::multifunction_output<output_type> &port_0 = internal::output_port<0>(*this);
        graph &g = this->graph_node::my_graph;
        tbb::internal::fgt_async_try_put_begin(static_cast<receiver<input_type> *>(this), &port_0);
        __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), NULL);
        try_put_functor tpf(port_0, i);
        g.my_task_arena->execute(tpf);
        tbb::internal::fgt_async_try_put_end(static_cast<receiver<input_type> *>(this), &port_0);
        return tpf.result;
    }

    //! Announce pending async work: keeps graph::wait_for_all from returning.
    /*override*/ void async_reserve() {
        this->graph_node::my_graph.increment_wait_count();
        tbb::internal::fgt_async_reserve(static_cast<receiver<input_type> *>(this), &this->graph_node::my_graph);
    }

    //! Announce completion of previously reserved async work.
    /*override*/ void async_commit() {
        this->graph_node::my_graph.decrement_wait_count();
        tbb::internal::fgt_async_commit(static_cast<receiver<input_type> *>(this), &this->graph_node::my_graph);
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    // Set the name reported for this node by the tracing tools.
    /* override */ void set_name( const char *name ) {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

    // sender< Output > implementation -- delegates to output port 0.

    /* override */ bool register_successor( successor_type &r ) {
        return internal::output_port<0>(*this).register_successor(r);
    }

    /* override */ bool remove_successor( successor_type &r ) {
        return internal::output_port<0>(*this).remove_successor(r);
    }

    //! Extract a copy of the user-supplied body from the wrapping async_body.
    template<typename Body>
    Body copy_function_object() {
        typedef internal::async_body<Input, typename base_type::output_ports_type, async_gateway_type, Body> async_body_type;
        mfn_body_type &body_ref = *this->my_body;
        async_body_type ab = dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body();
        return ab.get_body();
    }

#if TBB_PREVIEW_FLOW_GRAPH_FEATURES
    // Built-edge bookkeeping, all delegated to output port 0.
    typedef typename internal::edge_container<successor_type> built_successors_type;
    typedef typename built_successors_type::edge_list_type successor_list_type;
    /* override */ built_successors_type &built_successors() {
        return internal::output_port<0>(*this).built_successors();
    }

    /* override */ void internal_add_built_successor( successor_type &r ) {
        internal::output_port<0>(*this).internal_add_built_successor(r);
    }

    /* override */ void internal_delete_built_successor( successor_type &r ) {
        internal::output_port<0>(*this).internal_delete_built_successor(r);
    }

    /* override */ void copy_successors( successor_list_type &l ) {
        internal::output_port<0>(*this).copy_successors(l);
    }

    /* override */ size_t successor_count() {
        return internal::output_port<0>(*this).successor_count();
    }
#endif

protected:

    // Delegates reset to the multifunction_node base.
    /*override*/ void reset_node( reset_flags f) {
        base_type::reset_node(f);
    }

};
4001 
4002 #endif // __TBB_PREVIEW_ASYNC_NODE
4003 
4004 } // interface8
4005 
    // Export the public flow-graph names from the versioned interface8
    // namespace into tbb::flow.
    using interface8::reset_flags;
    using interface8::rf_reset_protocol;
    using interface8::rf_reset_bodies;
    using interface8::rf_clear_edges;

    using interface8::graph;
    using interface8::graph_node;

    using interface8::source_node;
    using interface8::function_node;
    using interface8::multifunction_node;
    using interface8::split_node;
    using interface8::internal::output_port;
    using interface8::indexer_node;
    using interface8::internal::tagged_msg;
    using interface8::internal::cast_to;
    using interface8::internal::is_a;
    using interface8::continue_node;
    using interface8::overwrite_node;
    using interface8::write_once_node;
    using interface8::broadcast_node;
    using interface8::buffer_node;
    using interface8::queue_node;
    using interface8::sequencer_node;
    using interface8::priority_queue_node;
    using namespace interface8::internal::graph_policy_namespace;
    using interface8::join_node;
    using interface8::input_port;
    using interface8::copy_body;
    using interface8::make_edge;
    using interface8::remove_edge;
    using interface8::internal::tag_value;
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
    using interface8::composite_node;
#endif
#if __TBB_PREVIEW_ASYNC_NODE
    using interface8::async_node;
#endif
#if __TBB_PREVIEW_ASYNC_MSG
    using interface8::async_msg;
#endif
4049 } // flow
4050 } // tbb
4051 
4052 #undef __TBB_PFG_RESET_ARG
4053 #undef __TBB_COMMA
4054 
4055 #endif // __TBB_flow_graph_H
Enables one or the other code branches.
Definition: _template_helpers.h:29
bool remove_predecessor(predecessor_type &)
Decrements the trigger threshold.
Definition: flow_graph.h:515
Definition: flow_graph.h:97
bool register_predecessor(predecessor_type &)
Increments the trigger threshold.
Definition: flow_graph.h:505
Definition: _aggregator_impl.h:160
bool try_put(const T &t)
Put an item to the receiver.
Definition: flow_graph.h:415
Detects whether two given types are the same.
Definition: _template_helpers.h:56
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:387
continue_msg input_type
The input type.
Definition: flow_graph.h:484
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
Definition: _flow_graph_join_impl.h:1497
The leaf for source_body.
Definition: _flow_graph_impl.h:69
item_buffer with reservable front-end. NOTE: if reserving, do not
Definition: _flow_graph_item_buffer_impl.h:244
Definition: flow_graph.h:107
Definition: _flow_graph_async_msg_impl.h:115
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:381
concurrency
An enumeration the provides the two most common concurrency levels: unlimited and serial...
Definition: flow_graph.h:84
aggregated_operation base class
Definition: _aggregator_impl.h:37
Definition: _aggregator_impl.h:144
An empty class used for messages that mean "I'm done".
Definition: flow_graph.h:100
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:435
A task that calls a node's apply_body_bypass function with no input.
Definition: _flow_graph_impl.h:291
Definition: _flow_graph_async_msg_impl.h:35
sender< T > predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:409
continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:496
A task that calls a node's forward_task function.
Definition: _flow_graph_impl.h:255
virtual ~continue_receiver()
Destructor.
Definition: flow_graph.h:502
virtual ~receiver()
Destructor.
Definition: flow_graph.h:412
Definition: _flow_graph_async_msg_impl.h:30
Definition: _flow_graph_indexer_impl.h:469
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:378
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
T output_type
The output type of this sender.
Definition: flow_graph.h:362
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
Definition: _flow_graph_types_impl.h:602
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:103
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
The two-phase join port.
Definition: _flow_graph_join_impl.h:213
A functor that takes no input and generates a value of type Output.
Definition: _flow_graph_impl.h:60
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:384
receiver< T > successor_type
The successor type for this node.
Definition: flow_graph.h:365
Definition: _flow_graph_async_msg_impl.h:32
A lock that occupies a single byte.
Definition: spin_mutex.h:40
T input_type
The input type of this receiver.
Definition: flow_graph.h:406
A functor that takes an Input and generates an Output.
Definition: _flow_graph_impl.h:83
Definition: _flow_graph_impl.h:841
queueing join_port
Definition: _flow_graph_join_impl.h:427
Base class for receivers of completion messages.
Definition: flow_graph.h:480
Definition: _flow_graph_impl.h:36
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44
1-to-1 proxy representation class of a scheduler's arena. Constructors set up settings only...
Definition: task_arena.h:120
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:432
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Definition: _flow_graph_impl.h:177
receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:487
Definition: _flow_graph_join_impl.h:648
Tag class used to indicate the "attaching" constructor.
Definition: task_arena.h:156
Pure virtual template class that defines a sender of messages of type T.
Definition: flow_graph.h:102
function_body that takes an Input and a set of output ports
Definition: _flow_graph_impl.h:165
continue_receiver(int number_of_predecessors=0)
Constructor.
Definition: flow_graph.h:490
Definition: flow_graph.h:106
Definition: _flow_graph_types_impl.h:53
the leaf for function_body
Definition: _flow_graph_impl.h:92