// _flow_graph_impl.h
/*
    Copyright 2005-2016 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks. Threading Building Blocks is free software;
    you can redistribute it and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.  Threading Building Blocks is
    distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See the GNU General Public License for more details.  You should have received a copy of
    the GNU General Public License along with Threading Building Blocks; if not, write to the
    Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software library without
    restriction.  Specifically, if other files instantiate templates or use macros or inline
    functions from this file, or you compile this file and link it with other files to produce
    an executable, this file does not by itself cause the resulting executable to be covered
    by the GNU General Public License.  This exception does not however invalidate any other
    reasons why the executable file might be covered by the GNU General Public License.
*/
20 
21 #ifndef __TBB__flow_graph_impl_H
22 #define __TBB__flow_graph_impl_H
23 
24 #ifndef __TBB_flow_graph_H
25 #error Do not #include this internal file directly; use public TBB headers instead.
26 #endif
27 
28 // included in namespace tbb::flow::interfaceX (in flow_graph.h)
29 
30 namespace internal {
31 
32  typedef tbb::internal::uint64_t tag_value;
33 
35 
36  namespace graph_policy_namespace {
37 
38  struct rejecting { };
39  struct reserving { };
40  struct queueing { };
41 
42  // K == type of field used for key-matching. Each tag-matching port will be provided
43  // functor that, given an object accepted by the port, will return the
45  template<typename K, typename KHash=tbb_hash_compare<typename strip<K>::type > >
46  struct key_matching {
47  typedef K key_type;
48  typedef typename strip<K>::type base_key_type;
49  typedef KHash hash_compare_type;
50  };
51 
52  // old tag_matching join's new specifier
54  }
55 
56 // -------------- function_body containers ----------------------
57 
59  template< typename Output >
60  class source_body : tbb::internal::no_assign {
61  public:
62  virtual ~source_body() {}
63  virtual bool operator()(Output &output) = 0;
64  virtual source_body* clone() = 0;
65  };
66 
68  template< typename Output, typename Body>
69  class source_body_leaf : public source_body<Output> {
70  public:
71  source_body_leaf( const Body &_body ) : body(_body) { }
72  /*override*/ bool operator()(Output &output) { return body( output ); }
73  /*override*/ source_body_leaf* clone() {
74  return new source_body_leaf< Output, Body >(body);
75  }
76  Body get_body() { return body; }
77  private:
78  Body body;
79  };
80 
82  template< typename Input, typename Output >
83  class function_body : tbb::internal::no_assign {
84  public:
85  virtual ~function_body() {}
86  virtual Output operator()(const Input &input) = 0;
87  virtual function_body* clone() = 0;
88  };
89 
91  template <typename Input, typename Output, typename B>
92  class function_body_leaf : public function_body< Input, Output > {
93  public:
94  function_body_leaf( const B &_body ) : body(_body) { }
95  Output operator()(const Input &i) { return body(i); }
96  B get_body() { return body; }
97  /*override*/ function_body_leaf* clone() {
99  }
100  private:
101  B body;
102  };
103 
105  template <typename B>
106  class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
107  public:
108  function_body_leaf( const B &_body ) : body(_body) { }
109  continue_msg operator()( const continue_msg &i ) {
110  body(i);
111  return i;
112  }
113  B get_body() { return body; }
114  /*override*/ function_body_leaf* clone() {
116  }
117  private:
118  B body;
119  };
120 
122  template <typename Input, typename B>
123  class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
124  public:
125  function_body_leaf( const B &_body ) : body(_body) { }
126  continue_msg operator()(const Input &i) {
127  body(i);
128  return continue_msg();
129  }
130  B get_body() { return body; }
131  /*override*/ function_body_leaf* clone() {
133  }
134  private:
135  B body;
136  };
137 
139  template <typename Output, typename B>
140  class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
141  public:
142  function_body_leaf( const B &_body ) : body(_body) { }
143  Output operator()(const continue_msg &i) {
144  return body(i);
145  }
146  B get_body() { return body; }
147  /*override*/ function_body_leaf* clone() {
149  }
150  private:
151  B body;
152  };
153 
154 #if __TBB_PREVIEW_ASYNC_NODE
155 template< typename T, typename = typename T::async_gateway_type >
156 void set_async_gateway(T *body, void *g) {
157  body->set_async_gateway(static_cast<typename T::async_gateway_type *>(g));
158 }
159 
160 inline void set_async_gateway(...) { }
161 #endif
162 
164  template<typename Input, typename OutputSet>
165  class multifunction_body : tbb::internal::no_assign {
166  public:
167  virtual ~multifunction_body () {}
168  virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/) = 0;
169  virtual multifunction_body* clone() = 0;
170 #if __TBB_PREVIEW_ASYNC_NODE
171  virtual void set_gateway(void *gateway) = 0;
172 #endif
173  };
174 
176  template<typename Input, typename OutputSet, typename B >
177  class multifunction_body_leaf : public multifunction_body<Input, OutputSet> {
178  public:
179  multifunction_body_leaf(const B &_body) : body(_body) { }
180  void operator()(const Input &input, OutputSet &oset) {
181  body(input, oset); // body may explicitly put() to one or more of oset.
182  }
183  B get_body() { return body; }
184 
185 #if __TBB_PREVIEW_ASYNC_NODE
186  /*override*/ void set_gateway(void *gateway) {
187  set_async_gateway(&body, gateway);
188  }
189 #endif
190  /*override*/ multifunction_body_leaf* clone() {
192  }
193 
194  private:
195  B body;
196  };
197 
198 // ------ function bodies for hash_buffers and key-matching joins.
199 
200 template<typename Input, typename Output>
201 class type_to_key_function_body : tbb::internal::no_assign {
202  public:
203  virtual ~type_to_key_function_body() {}
204  virtual Output operator()(const Input &input) = 0; // returns an Output
205  virtual type_to_key_function_body* clone() = 0;
206 };
207 
208 // specialization for ref output
209 template<typename Input, typename Output>
210 class type_to_key_function_body<Input,Output&> : tbb::internal::no_assign {
211  public:
212  virtual ~type_to_key_function_body() {}
213  virtual const Output & operator()(const Input &input) = 0; // returns a const Output&
214  virtual type_to_key_function_body* clone() = 0;
215 };
216 
217 template <typename Input, typename Output, typename B>
219 public:
220  type_to_key_function_body_leaf( const B &_body ) : body(_body) { }
221  /*override*/Output operator()(const Input &i) { return body(i); }
222  B get_body() { return body; }
223  /*override*/ type_to_key_function_body_leaf* clone() {
225  }
226 private:
227  B body;
228 };
229 
230 template <typename Input, typename Output, typename B>
232 public:
233  type_to_key_function_body_leaf( const B &_body ) : body(_body) { }
234 
235  /*override*/const Output& operator()(const Input &i) {
236  return body(i);
237  }
238 
239  B get_body() { return body; }
240 
241  /*override*/ type_to_key_function_body_leaf* clone() {
243  }
244 
245 private:
246  B body;
247 };
248 
249 // --------------------------- end of function_body containers ------------------------
250 
251 // --------------------------- node task bodies ---------------------------------------
252 
254  template< typename NodeType >
255  class forward_task_bypass : public task {
256 
257  NodeType &my_node;
258 
259  public:
260 
261  forward_task_bypass( NodeType &n ) : my_node(n) {}
262 
263  task *execute() {
264  task * new_task = my_node.forward_task();
265  if (new_task == SUCCESSFULLY_ENQUEUED) new_task = NULL;
266  return new_task;
267  }
268  };
269 
271  // return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return NULL
272  template< typename NodeType, typename Input >
273  class apply_body_task_bypass : public task {
274 
275  NodeType &my_node;
276  Input my_input;
277 
278  public:
279 
280  apply_body_task_bypass( NodeType &n, const Input &i ) : my_node(n), my_input(i) {}
281 
282  task *execute() {
283  task * next_task = my_node.apply_body_bypass( my_input );
284  if(next_task == SUCCESSFULLY_ENQUEUED) next_task = NULL;
285  return next_task;
286  }
287  };
288 
290  template< typename NodeType >
291  class source_task_bypass : public task {
292 
293  NodeType &my_node;
294 
295  public:
296 
297  source_task_bypass( NodeType &n ) : my_node(n) {}
298 
299  task *execute() {
300  task *new_task = my_node.apply_body_bypass( );
301  if(new_task == SUCCESSFULLY_ENQUEUED) return NULL;
302  return new_task;
303  }
304  };
305 
306 // ------------------------ end of node task bodies -----------------------------------
307 
309  template< typename Input, typename Output >
310  struct empty_body {
311  Output operator()( const Input & ) const { return Output(); }
312  };
313 
315  template< typename T, typename M=spin_mutex >
316  class node_cache {
317  public:
318 
319  typedef size_t size_type;
320 
321  bool empty() {
322  typename mutex_type::scoped_lock lock( my_mutex );
323  return internal_empty();
324  }
325 
326  void add( T &n ) {
327  typename mutex_type::scoped_lock lock( my_mutex );
328  internal_push(n);
329  }
330 
331  void remove( T &n ) {
332  typename mutex_type::scoped_lock lock( my_mutex );
333  for ( size_t i = internal_size(); i != 0; --i ) {
334  T &s = internal_pop();
335  if ( &s == &n ) return; // only remove one predecessor per request
336  internal_push(s);
337  }
338  }
339 
340  void clear() {
341  while( !my_q.empty()) (void)my_q.pop();
342 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
343  my_built_predecessors.clear();
344 #endif
345  }
346 
347 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
348  typedef edge_container<T> built_predecessors_type;
349  built_predecessors_type &built_predecessors() { return my_built_predecessors; }
350 
351  typedef typename edge_container<T>::edge_list_type predecessor_list_type;
352  void internal_add_built_predecessor( T &n ) {
353  typename mutex_type::scoped_lock lock( my_mutex );
354  my_built_predecessors.add_edge(n);
355  }
356 
357  void internal_delete_built_predecessor( T &n ) {
358  typename mutex_type::scoped_lock lock( my_mutex );
359  my_built_predecessors.delete_edge(n);
360  }
361 
362  void copy_predecessors( predecessor_list_type &v) {
363  typename mutex_type::scoped_lock lock( my_mutex );
364  my_built_predecessors.copy_edges(v);
365  }
366 
367  size_t predecessor_count() {
368  typename mutex_type::scoped_lock lock(my_mutex);
369  return (size_t)(my_built_predecessors.edge_count());
370  }
371 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
372 
373  protected:
374 
375  typedef M mutex_type;
376  mutex_type my_mutex;
377  std::queue< T * > my_q;
378 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
379  built_predecessors_type my_built_predecessors;
380 #endif
381 
382  // Assumes lock is held
383  inline bool internal_empty( ) {
384  return my_q.empty();
385  }
386 
387  // Assumes lock is held
388  inline size_type internal_size( ) {
389  return my_q.size();
390  }
391 
392  // Assumes lock is held
393  inline void internal_push( T &n ) {
394  my_q.push(&n);
395  }
396 
397  // Assumes lock is held
398  inline T &internal_pop() {
399  T *v = my_q.front();
400  my_q.pop();
401  return *v;
402  }
403 
404  };
405 
407  template< typename T, typename M=spin_mutex >
408 #if __TBB_PREVIEW_ASYNC_MSG
409  // TODO: make predecessor_cache type T-independent when async_msg becomes regular feature
410  class predecessor_cache : public node_cache< untyped_sender, M > {
411 #else
412  class predecessor_cache : public node_cache< sender<T>, M > {
413 #endif // __TBB_PREVIEW_ASYNC_MSG
414  public:
415  typedef M mutex_type;
416  typedef T output_type;
417 #if __TBB_PREVIEW_ASYNC_MSG
418  typedef untyped_sender predecessor_type;
419  typedef untyped_receiver successor_type;
420 #else
421  typedef sender<output_type> predecessor_type;
422  typedef receiver<output_type> successor_type;
423 #endif // __TBB_PREVIEW_ASYNC_MSG
424 
425  predecessor_cache( ) : my_owner( NULL ) { }
426 
427  void set_owner( successor_type *owner ) { my_owner = owner; }
428 
429  bool get_item( output_type &v ) {
430 
431  bool msg = false;
432 
433  do {
434  predecessor_type *src;
435  {
436  typename mutex_type::scoped_lock lock(this->my_mutex);
437  if ( this->internal_empty() ) {
438  break;
439  }
440  src = &this->internal_pop();
441  }
442 
443  // Try to get from this sender
444  msg = src->try_get( v );
445 
446  if (msg == false) {
447  // Relinquish ownership of the edge
448  if (my_owner)
449  src->register_successor( *my_owner );
450  } else {
451  // Retain ownership of the edge
452  this->add(*src);
453  }
454  } while ( msg == false );
455  return msg;
456  }
457 
458  // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
459  void reset() {
460  if (my_owner) {
461  for(;;) {
462  predecessor_type *src;
463  {
464  if (this->internal_empty()) break;
465  src = &this->internal_pop();
466  }
467  src->register_successor( *my_owner );
468  }
469  }
470  }
471 
472  protected:
473 
474 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
476 #endif
477  successor_type *my_owner;
478  };
479 
481  // TODO: make reservable_predecessor_cache type T-independent when async_msg becomes regular feature
482  template< typename T, typename M=spin_mutex >
484  public:
485  typedef M mutex_type;
486  typedef T output_type;
487 #if __TBB_PREVIEW_ASYNC_MSG
488  typedef untyped_sender predecessor_type;
489  typedef untyped_receiver successor_type;
490 #else
491  typedef sender<T> predecessor_type;
492  typedef receiver<T> successor_type;
493 #endif // __TBB_PREVIEW_ASYNC_MSG
494 
495  reservable_predecessor_cache( ) : reserved_src(NULL) { }
496 
497  bool
498  try_reserve( output_type &v ) {
499  bool msg = false;
500 
501  do {
502  {
503  typename mutex_type::scoped_lock lock(this->my_mutex);
504  if ( reserved_src || this->internal_empty() )
505  return false;
506 
507  reserved_src = &this->internal_pop();
508  }
509 
510  // Try to get from this sender
511  msg = reserved_src->try_reserve( v );
512 
513  if (msg == false) {
514  typename mutex_type::scoped_lock lock(this->my_mutex);
515  // Relinquish ownership of the edge
516  reserved_src->register_successor( *this->my_owner );
517  reserved_src = NULL;
518  } else {
519  // Retain ownership of the edge
520  this->add( *reserved_src );
521  }
522  } while ( msg == false );
523 
524  return msg;
525  }
526 
527  bool
528  try_release( ) {
529  reserved_src->try_release( );
530  reserved_src = NULL;
531  return true;
532  }
533 
534  bool
535  try_consume( ) {
536  reserved_src->try_consume( );
537  reserved_src = NULL;
538  return true;
539  }
540 
541  void reset( ) {
542  reserved_src = NULL;
544  }
545 
546  void clear() {
547  reserved_src = NULL;
549  }
550 
551  private:
552  predecessor_type *reserved_src;
553  };
554 
555 
557  // TODO: make successor_cache type T-independent when async_msg becomes regular feature
558  template<typename T, typename M=spin_rw_mutex >
559  class successor_cache : tbb::internal::no_copy {
560  protected:
561 
562  typedef M mutex_type;
563  mutex_type my_mutex;
564 
565 #if __TBB_PREVIEW_ASYNC_MSG
566  typedef untyped_receiver successor_type;
567  typedef untyped_receiver *pointer_type;
568  typedef untyped_sender owner_type;
569 #else
570  typedef receiver<T> successor_type;
571  typedef receiver<T> *pointer_type;
572  typedef sender<T> owner_type;
573 #endif // __TBB_PREVIEW_ASYNC_MSG
574  typedef std::list< pointer_type > successors_type;
575 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
576  edge_container<successor_type> my_built_successors;
577 #endif
578  successors_type my_successors;
579 
580  owner_type *my_owner;
581 
582  public:
583 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
584  typedef typename edge_container<successor_type>::edge_list_type successor_list_type;
585 
586  edge_container<successor_type> &built_successors() { return my_built_successors; }
587 
588  void internal_add_built_successor( successor_type &r) {
589  typename mutex_type::scoped_lock l(my_mutex, true);
590  my_built_successors.add_edge( r );
591  }
592 
593  void internal_delete_built_successor( successor_type &r) {
594  typename mutex_type::scoped_lock l(my_mutex, true);
595  my_built_successors.delete_edge(r);
596  }
597 
598  void copy_successors( successor_list_type &v) {
599  typename mutex_type::scoped_lock l(my_mutex, false);
600  my_built_successors.copy_edges(v);
601  }
602 
603  size_t successor_count() {
604  typename mutex_type::scoped_lock l(my_mutex,false);
605  return my_built_successors.edge_count();
606  }
607 
608 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
609 
610  successor_cache( ) : my_owner(NULL) {}
611 
612  void set_owner( owner_type *owner ) { my_owner = owner; }
613 
614  virtual ~successor_cache() {}
615 
616  void register_successor( successor_type &r ) {
617  typename mutex_type::scoped_lock l(my_mutex, true);
618  my_successors.push_back( &r );
619  }
620 
621  void remove_successor( successor_type &r ) {
622  typename mutex_type::scoped_lock l(my_mutex, true);
623  for ( typename successors_type::iterator i = my_successors.begin();
624  i != my_successors.end(); ++i ) {
625  if ( *i == & r ) {
626  my_successors.erase(i);
627  break;
628  }
629  }
630  }
631 
632  bool empty() {
633  typename mutex_type::scoped_lock l(my_mutex, false);
634  return my_successors.empty();
635  }
636 
637  void clear() {
638  my_successors.clear();
639 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
640  my_built_successors.clear();
641 #endif
642  }
643 
644 #if !__TBB_PREVIEW_ASYNC_MSG
645  virtual task * try_put_task( const T &t ) = 0;
646 #endif // __TBB_PREVIEW_ASYNC_MSG
647  }; // successor_cache<T>
648 
650  template<>
651  class successor_cache< continue_msg > : tbb::internal::no_copy {
652  protected:
653 
654  typedef spin_rw_mutex mutex_type;
655  mutex_type my_mutex;
656 
657 #if __TBB_PREVIEW_ASYNC_MSG
658  typedef untyped_receiver successor_type;
659  typedef untyped_receiver *pointer_type;
660 #else
661  typedef receiver<continue_msg> successor_type;
662  typedef receiver<continue_msg> *pointer_type;
663 #endif // __TBB_PREVIEW_ASYNC_MSG
664  typedef std::list< pointer_type > successors_type;
665  successors_type my_successors;
666 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
667  edge_container<successor_type> my_built_successors;
668  typedef edge_container<successor_type>::edge_list_type successor_list_type;
669 #endif
670 
671  sender<continue_msg> *my_owner;
672 
673  public:
674 
675 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
676 
677  edge_container<successor_type> &built_successors() { return my_built_successors; }
678 
679  void internal_add_built_successor( successor_type &r) {
680  mutex_type::scoped_lock l(my_mutex, true);
681  my_built_successors.add_edge( r );
682  }
683 
684  void internal_delete_built_successor( successor_type &r) {
685  mutex_type::scoped_lock l(my_mutex, true);
686  my_built_successors.delete_edge(r);
687  }
688 
689  void copy_successors( successor_list_type &v) {
690  mutex_type::scoped_lock l(my_mutex, false);
691  my_built_successors.copy_edges(v);
692  }
693 
694  size_t successor_count() {
695  mutex_type::scoped_lock l(my_mutex,false);
696  return my_built_successors.edge_count();
697  }
698 
699 #endif /* TBB_PREVIEW_FLOW_GRAPH_FEATURES */
700 
701  successor_cache( ) : my_owner(NULL) {}
702 
703  void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
704 
705  virtual ~successor_cache() {}
706 
707  void register_successor( successor_type &r ) {
708  mutex_type::scoped_lock l(my_mutex, true);
709  my_successors.push_back( &r );
710  if ( my_owner && r.is_continue_receiver() ) {
711  r.register_predecessor( *my_owner );
712  }
713  }
714 
715  void remove_successor( successor_type &r ) {
716  mutex_type::scoped_lock l(my_mutex, true);
717  for ( successors_type::iterator i = my_successors.begin();
718  i != my_successors.end(); ++i ) {
719  if ( *i == & r ) {
720  // TODO: Check if we need to test for continue_receiver before
721  // removing from r.
722  if ( my_owner )
723  r.remove_predecessor( *my_owner );
724  my_successors.erase(i);
725  break;
726  }
727  }
728  }
729 
730  bool empty() {
731  mutex_type::scoped_lock l(my_mutex, false);
732  return my_successors.empty();
733  }
734 
735  void clear() {
736  my_successors.clear();
737 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES
738  my_built_successors.clear();
739 #endif
740  }
741 
742 #if !__TBB_PREVIEW_ASYNC_MSG
743  virtual task * try_put_task( const continue_msg &t ) = 0;
744 #endif // __TBB_PREVIEW_ASYNC_MSG
745 
746  }; // successor_cache< continue_msg >
747 
749  // TODO: make broadcast_cache type T-independent when async_msg becomes regular feature
750  template<typename T, typename M=spin_rw_mutex>
751  class broadcast_cache : public successor_cache<T, M> {
752  typedef M mutex_type;
753  typedef typename successor_cache<T,M>::successors_type successors_type;
754 
755  public:
756 
757  broadcast_cache( ) {}
758 
759  // as above, but call try_put_task instead, and return the last task we received (if any)
760 #if __TBB_PREVIEW_ASYNC_MSG
761  template<typename X>
762  task * try_put_task( const X &t ) {
763 #else
764  /*override*/ task * try_put_task( const T &t ) {
765 #endif // __TBB_PREVIEW_ASYNC_MSG
766  task * last_task = NULL;
767  bool upgraded = true;
768  typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
769  typename successors_type::iterator i = this->my_successors.begin();
770  while ( i != this->my_successors.end() ) {
771  task *new_task = (*i)->try_put_task(t);
772  last_task = combine_tasks(last_task, new_task); // enqueue if necessary
773  if(new_task) {
774  ++i;
775  }
776  else { // failed
777  if ( (*i)->register_predecessor(*this->my_owner) ) {
778  if (!upgraded) {
779  l.upgrade_to_writer();
780  upgraded = true;
781  }
782  i = this->my_successors.erase(i);
783  } else {
784  ++i;
785  }
786  }
787  }
788  return last_task;
789  }
790 
791  };
792 
794  // TODO: make round_robin_cache type T-independent when async_msg becomes regular feature
795  template<typename T, typename M=spin_rw_mutex >
796  class round_robin_cache : public successor_cache<T, M> {
797  typedef size_t size_type;
798  typedef M mutex_type;
799  typedef typename successor_cache<T,M>::successors_type successors_type;
800 
801  public:
802 
803  round_robin_cache( ) {}
804 
805  size_type size() {
806  typename mutex_type::scoped_lock l(this->my_mutex, false);
807  return this->my_successors.size();
808  }
809 
810 #if __TBB_PREVIEW_ASYNC_MSG
811  template<typename X>
812  task * try_put_task( const X &t ) {
813 #else
814  /*override*/task *try_put_task( const T &t ) {
815 #endif // __TBB_PREVIEW_ASYNC_MSG
816  bool upgraded = true;
817  typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
818  typename successors_type::iterator i = this->my_successors.begin();
819  while ( i != this->my_successors.end() ) {
820  task *new_task = (*i)->try_put_task(t);
821  if ( new_task ) {
822  return new_task;
823  } else {
824  if ( (*i)->register_predecessor(*this->my_owner) ) {
825  if (!upgraded) {
826  l.upgrade_to_writer();
827  upgraded = true;
828  }
829  i = this->my_successors.erase(i);
830  }
831  else {
832  ++i;
833  }
834  }
835  }
836  return NULL;
837  }
838  };
839 
840  template<typename T>
841  class decrementer : public continue_receiver, tbb::internal::no_copy {
842 
843  T *my_node;
844 
845  task *execute() {
846  return my_node->decrement_counter();
847  }
848 
849  public:
850 
851  typedef continue_msg input_type;
852  typedef continue_msg output_type;
853  decrementer( int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
854  void set_owner( T *node ) { my_node = node; }
855  };
856 
857 }
858 
859 #endif // __TBB__flow_graph_impl_H
860 
/* Doxygen extraction residue (class brief descriptions and definition anchors),
   retained verbatim for reference:

The leaf for source_body.
A task that calls a node's apply_body_bypass function with no input.
the leaf for function_body specialized for Input and output of continue_msg
A cache of successors that are put in a round-robin fashion.
A task that calls a node's apply_body_bypass function, passing in an input of type Input.
A task that calls a node's forward_task function.
An empty functor that takes an Input and returns a default constructed Output.
An cache of predecessors that supports requests and reservations.
A cache of successors that are broadcast to.
A cache of predecessors that only supports try_get.
A functor that takes no input and generates a value of type Output.
the leaf for function_body specialized for Input of continue_msg
An abstract cache of successors.
A functor that takes an Input and generates an Output.
field of type K being used for matching.
leaf for multifunction. OutputSet can be a std::tuple or a vector.
the leaf for function_body specialized for Output of continue_msg
function_body that takes an Input and a set of output ports
the leaf for function_body
Strips its template type argument from cv- and ref-qualifiers.
A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
*/