BRE12
pipeline.h
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB_pipeline_H
22 #define __TBB_pipeline_H
23 
24 #include "atomic.h"
25 #include "task.h"
26 #include "tbb_allocator.h"
27 #include <cstddef>
28 
29 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT || __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT
30 #include <type_traits>
31 #endif
32 
33 namespace tbb {
34 
35 class pipeline;
36 class filter;
37 
39 namespace internal {
40 
41 // The argument for PIPELINE_VERSION should be an integer between 2 and 9
42 #define __TBB_PIPELINE_VERSION(x) ((unsigned char)(x-2)<<1)
43 
44 typedef unsigned long Token;
45 typedef long tokendiff_t;
46 class stage_task;
47 class input_buffer;
48 class pipeline_root_task;
49 class pipeline_cleaner;
50 
51 } // namespace internal
52 
53 namespace interface6 {
54  template<typename T, typename U> class filter_t;
55 
56  namespace internal {
57  class pipeline_proxy;
58  }
59 }
60 
62 
64 
65 class filter: internal::no_copy {
66 private:
68  static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
69 protected:
71  static const unsigned char filter_is_serial = 0x1;
72 
74 
76  static const unsigned char filter_is_out_of_order = 0x1<<4;
77 
79  static const unsigned char filter_is_bound = 0x1<<5;
80 
82  static const unsigned char filter_may_emit_null = 0x1<<6;
83 
85  static const unsigned char exact_exception_propagation =
86 #if TBB_USE_CAPTURED_EXCEPTION
87  0x0;
88 #else
89  0x1<<7;
90 #endif /* TBB_USE_CAPTURED_EXCEPTION */
91 
92  static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
93  static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
94 public:
95  enum mode {
97  parallel = current_version | filter_is_out_of_order,
99  serial_in_order = current_version | filter_is_serial,
101  serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
103  serial = serial_in_order
104  };
105 protected:
106  filter( bool is_serial_ ) :
107  next_filter_in_pipeline(not_in_pipeline()),
108  my_input_buffer(NULL),
109  my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
110  prev_filter_in_pipeline(not_in_pipeline()),
111  my_pipeline(NULL),
112  next_segment(NULL)
113  {}
114 
115  filter( mode filter_mode ) :
116  next_filter_in_pipeline(not_in_pipeline()),
117  my_input_buffer(NULL),
118  my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
119  prev_filter_in_pipeline(not_in_pipeline()),
120  my_pipeline(NULL),
121  next_segment(NULL)
122  {}
123 
124  // signal end-of-input for concrete_filters
125  void __TBB_EXPORTED_METHOD set_end_of_input();
126 
127 public:
129  bool is_serial() const {
130  return bool( my_filter_mode & filter_is_serial );
131  }
132 
134  bool is_ordered() const {
135  return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
136  }
137 
139  bool is_bound() const {
140  return ( my_filter_mode & filter_is_bound )==filter_is_bound;
141  }
142 
145  return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
146  }
147 
149 
150  virtual void* operator()( void* item ) = 0;
151 
153 
154  virtual __TBB_EXPORTED_METHOD ~filter();
155 
156 #if __TBB_TASK_GROUP_CONTEXT
157 
160  virtual void finalize( void* /*item*/ ) {};
161 #endif
162 
163 private:
165  filter* next_filter_in_pipeline;
166 
168  // (pipeline has not yet reached end_of_input or this filter has not yet
169  // seen the last token produced by input_filter)
170  bool has_more_work();
171 
173 
174  internal::input_buffer* my_input_buffer;
175 
176  friend class internal::stage_task;
177  friend class internal::pipeline_root_task;
178  friend class pipeline;
179  friend class thread_bound_filter;
180 
182  const unsigned char my_filter_mode;
183 
185  filter* prev_filter_in_pipeline;
186 
188  pipeline* my_pipeline;
189 
191 
192  filter* next_segment;
193 };
194 
196 
198 public:
199  enum result_type {
200  // item was processed
201  success,
202  // item is currently not available
203  item_not_available,
204  // there are no more items to process
205  end_of_stream
206  };
207 protected:
208  thread_bound_filter(mode filter_mode):
209  filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
210  {
211  __TBB_ASSERT(filter_mode & filter::filter_is_serial, "thread-bound filters must be serial");
212  }
213 public:
215 
220  result_type __TBB_EXPORTED_METHOD try_process_item();
221 
223 
227  result_type __TBB_EXPORTED_METHOD process_item();
228 
229 private:
231  result_type internal_process_item(bool is_blocking);
232 };
233 
235 
236 class pipeline {
237 public:
239  __TBB_EXPORTED_METHOD pipeline();
240 
243  virtual __TBB_EXPORTED_METHOD ~pipeline();
244 
246  void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
247 
249  void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
250 
251 #if __TBB_TASK_GROUP_CONTEXT
252  void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
254 #endif
255 
257  void __TBB_EXPORTED_METHOD clear();
258 
259 private:
260  friend class internal::stage_task;
261  friend class internal::pipeline_root_task;
262  friend class filter;
263  friend class thread_bound_filter;
264  friend class internal::pipeline_cleaner;
265  friend class tbb::interface6::internal::pipeline_proxy;
266 
268  filter* filter_list;
269 
271  filter* filter_end;
272 
274  task* end_counter;
275 
277  atomic<internal::Token> input_tokens;
278 
280  atomic<internal::Token> token_counter;
281 
283  bool end_of_input;
284 
286  bool has_thread_bound_filters;
287 
289  void remove_filter( filter& filter_ );
290 
292  void __TBB_EXPORTED_METHOD inject_token( task& self );
293 
294 #if __TBB_TASK_GROUP_CONTEXT
295  void clear_filters();
297 #endif
298 };
299 
300 //------------------------------------------------------------------------
301 // Support for lambda-friendly parallel_pipeline interface
302 //------------------------------------------------------------------------
303 
304 namespace interface6 {
305 
306 namespace internal {
307  template<typename T, typename U, typename Body> class concrete_filter;
308 }
309 
312  bool is_pipeline_stopped;
313  flow_control() { is_pipeline_stopped = false; }
314  template<typename T, typename U, typename Body> friend class internal::concrete_filter;
315 public:
316  void stop() { is_pipeline_stopped = true; }
317 };
318 
320 namespace internal {
321 
322 template<typename T> struct tbb_large_object {enum { value = sizeof(T) > sizeof(void *) }; };
323 
324 // Obtain type properties in one or another way
325 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT
326 template<typename T> struct tbb_trivially_copyable { enum { value = std::is_trivially_copyable<T>::value }; };
327 #elif __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT
328 template<typename T> struct tbb_trivially_copyable { enum { value = std::has_trivial_copy_constructor<T>::value }; };
329 #else
330 // Explicitly list the types we wish to be placed as-is in the pipeline input_buffers.
331 template<typename T> struct tbb_trivially_copyable { enum { value = false }; };
332 template<typename T> struct tbb_trivially_copyable <T*> { enum { value = true }; };
333 template<> struct tbb_trivially_copyable <short> { enum { value = true }; };
334 template<> struct tbb_trivially_copyable <unsigned short> { enum { value = true }; };
335 template<> struct tbb_trivially_copyable <int> { enum { value = !tbb_large_object<int>::value }; };
336 template<> struct tbb_trivially_copyable <unsigned int> { enum { value = !tbb_large_object<int>::value }; };
337 template<> struct tbb_trivially_copyable <long> { enum { value = !tbb_large_object<long>::value }; };
338 template<> struct tbb_trivially_copyable <unsigned long> { enum { value = !tbb_large_object<long>::value }; };
339 template<> struct tbb_trivially_copyable <float> { enum { value = !tbb_large_object<float>::value }; };
340 template<> struct tbb_trivially_copyable <double> { enum { value = !tbb_large_object<double>::value }; };
341 #endif // Obtaining type properties
342 
343 template<typename T> struct is_large_object {enum { value = tbb_large_object<T>::value || !tbb_trivially_copyable<T>::value }; };
344 
345 template<typename T, bool> class token_helper;
346 
347 // large object helper (uses tbb_allocator)
348 template<typename T>
349 class token_helper<T, true> {
350  public:
351  typedef typename tbb::tbb_allocator<T> allocator;
352  typedef T* pointer;
353  typedef T value_type;
354  static pointer create_token(const value_type & source) {
355  pointer output_t = allocator().allocate(1);
356  return new (output_t) T(source);
357  }
358  static value_type & token(pointer & t) { return *t;}
359  static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
360  static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
361  static void destroy_token(pointer token) {
362  allocator().destroy(token);
363  allocator().deallocate(token,1);
364  }
365 };
366 
367 // pointer specialization
368 template<typename T>
369 class token_helper<T*, false > {
370  public:
371  typedef T* pointer;
372  typedef T* value_type;
373  static pointer create_token(const value_type & source) { return source; }
374  static value_type & token(pointer & t) { return t;}
375  static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
376  static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
377  static void destroy_token( pointer /*token*/) {}
378 };
379 
380 // small object specialization (converts void* to the correct type, passes objects directly.)
381 template<typename T>
382 class token_helper<T, false> {
383  typedef union {
384  T actual_value;
385  void * void_overlay;
386  } type_to_void_ptr_map;
387  public:
388  typedef T pointer; // not really a pointer in this case.
389  typedef T value_type;
390  static pointer create_token(const value_type & source) {
391  return source; }
392  static value_type & token(pointer & t) { return t;}
393  static void * cast_to_void_ptr(pointer ref) {
394  type_to_void_ptr_map mymap;
395  mymap.void_overlay = NULL;
396  mymap.actual_value = ref;
397  return mymap.void_overlay;
398  }
399  static pointer cast_from_void_ptr(void * ref) {
400  type_to_void_ptr_map mymap;
401  mymap.void_overlay = ref;
402  return mymap.actual_value;
403  }
404  static void destroy_token( pointer /*token*/) {}
405 };
406 
407 template<typename T, typename U, typename Body>
408 class concrete_filter: public tbb::filter {
409  const Body& my_body;
410  typedef token_helper<T,is_large_object<T>::value > t_helper;
411  typedef typename t_helper::pointer t_pointer;
412  typedef token_helper<U,is_large_object<U>::value > u_helper;
413  typedef typename u_helper::pointer u_pointer;
414 
415  /*override*/ void* operator()(void* input) {
416  t_pointer temp_input = t_helper::cast_from_void_ptr(input);
417  u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
418  t_helper::destroy_token(temp_input);
419  return u_helper::cast_to_void_ptr(output_u);
420  }
421 
422  /*override*/ void finalize(void * input) {
423  t_pointer temp_input = t_helper::cast_from_void_ptr(input);
424  t_helper::destroy_token(temp_input);
425  }
426 
427 public:
428  concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
429 };
430 
431 // input
432 template<typename U, typename Body>
433 class concrete_filter<void,U,Body>: public filter {
434  const Body& my_body;
435  typedef token_helper<U, is_large_object<U>::value > u_helper;
436  typedef typename u_helper::pointer u_pointer;
437 
438  /*override*/void* operator()(void*) {
439  flow_control control;
440  u_pointer output_u = u_helper::create_token(my_body(control));
441  if(control.is_pipeline_stopped) {
442  u_helper::destroy_token(output_u);
443  set_end_of_input();
444  return NULL;
445  }
446  return u_helper::cast_to_void_ptr(output_u);
447  }
448 
449 public:
450  concrete_filter(tbb::filter::mode filter_mode, const Body& body) :
451  filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
452  my_body(body)
453  {}
454 };
455 
456 template<typename T, typename Body>
457 class concrete_filter<T,void,Body>: public filter {
458  const Body& my_body;
459  typedef token_helper<T, is_large_object<T>::value > t_helper;
460  typedef typename t_helper::pointer t_pointer;
461 
462  /*override*/ void* operator()(void* input) {
463  t_pointer temp_input = t_helper::cast_from_void_ptr(input);
464  my_body(t_helper::token(temp_input));
465  t_helper::destroy_token(temp_input);
466  return NULL;
467  }
468  /*override*/ void finalize(void* input) {
469  t_pointer temp_input = t_helper::cast_from_void_ptr(input);
470  t_helper::destroy_token(temp_input);
471  }
472 
473 public:
474  concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
475 };
476 
477 template<typename Body>
478 class concrete_filter<void,void,Body>: public filter {
479  const Body& my_body;
480 
482  /*override*/ void* operator()(void*) {
483  flow_control control;
484  my_body(control);
485  void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
486  return output;
487  }
488 public:
489  concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
490 };
491 
493 
494 class pipeline_proxy {
495  tbb::pipeline my_pipe;
496 public:
497  pipeline_proxy( const filter_t<void,void>& filter_chain );
498  ~pipeline_proxy() {
499  while( filter* f = my_pipe.filter_list )
500  delete f; // filter destructor removes it from the pipeline
501  }
502  tbb::pipeline* operator->() { return &my_pipe; }
503 };
504 
506 
507 class filter_node: tbb::internal::no_copy {
509  tbb::atomic<intptr_t> ref_count;
510 protected:
511  filter_node() {
512  ref_count = 0;
513 #ifdef __TBB_TEST_FILTER_NODE_COUNT
514  ++(__TBB_TEST_FILTER_NODE_COUNT);
515 #endif
516  }
517 public:
519  virtual void add_to( pipeline& ) = 0;
521  void add_ref() {++ref_count;}
523  void remove_ref() {
524  __TBB_ASSERT(ref_count>0,"ref_count underflow");
525  if( --ref_count==0 )
526  delete this;
527  }
528  virtual ~filter_node() {
529 #ifdef __TBB_TEST_FILTER_NODE_COUNT
530  --(__TBB_TEST_FILTER_NODE_COUNT);
531 #endif
532  }
533 };
534 
536 template<typename T, typename U, typename Body>
537 class filter_node_leaf: public filter_node {
538  const tbb::filter::mode mode;
539  const Body body;
540  /*override*/void add_to( pipeline& p ) {
541  concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
542  p.add_filter( *f );
543  }
544 public:
545  filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
546 };
547 
549 class filter_node_join: public filter_node {
550  friend class filter_node; // to suppress GCC 3.2 warnings
551  filter_node& left;
552  filter_node& right;
553  /*override*/~filter_node_join() {
554  left.remove_ref();
555  right.remove_ref();
556  }
557  /*override*/void add_to( pipeline& p ) {
558  left.add_to(p);
559  right.add_to(p);
560  }
561 public:
562  filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
563  left.add_ref();
564  right.add_ref();
565  }
566 };
567 
568 } // namespace internal
570 
572 template<typename T, typename U, typename Body>
573 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
574  return new internal::filter_node_leaf<T,U,Body>(mode, body);
575 }
576 
577 template<typename T, typename V, typename U>
578 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
579  __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
580  __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
581  return new internal::filter_node_join(*left.root,*right.root);
582 }
583 
585 template<typename T, typename U>
586 class filter_t {
587  typedef internal::filter_node filter_node;
588  filter_node* root;
589  filter_t( filter_node* root_ ) : root(root_) {
590  root->add_ref();
591  }
592  friend class internal::pipeline_proxy;
593  template<typename T_, typename U_, typename Body>
594  friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
595  template<typename T_, typename V_, typename U_>
596  friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
597 public:
598  // TODO: add move-constructors, move-assignment, etc. where C++11 is available.
599  filter_t() : root(NULL) {}
600  filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
601  if( root ) root->add_ref();
602  }
603  template<typename Body>
604  filter_t( tbb::filter::mode mode, const Body& body ) :
605  root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
606  root->add_ref();
607  }
608 
609  void operator=( const filter_t<T,U>& rhs ) {
610  // Order of operations below carefully chosen so that reference counts remain correct
611  // in unlikely event that remove_ref throws exception.
612  filter_node* old = root;
613  root = rhs.root;
614  if( root ) root->add_ref();
615  if( old ) old->remove_ref();
616  }
617  ~filter_t() {
618  if( root ) root->remove_ref();
619  }
620  void clear() {
621  // Like operator= with filter_t() on right side.
622  if( root ) {
623  filter_node* old = root;
624  root = NULL;
625  old->remove_ref();
626  }
627  }
628 };
629 
630 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
631  __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" );
632  filter_chain.root->add_to(my_pipe);
633 }
634 
635 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
636 #if __TBB_TASK_GROUP_CONTEXT
637  , tbb::task_group_context& context
638 #endif
639  ) {
640  internal::pipeline_proxy pipe(filter_chain);
641  // tbb::pipeline::run() is called via the proxy
642  pipe->run(max_number_of_live_tokens
643 #if __TBB_TASK_GROUP_CONTEXT
644  , context
645 #endif
646  );
647 }
648 
649 #if __TBB_TASK_GROUP_CONTEXT
650 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
651  tbb::task_group_context context;
652  parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
653 }
654 #endif // __TBB_TASK_GROUP_CONTEXT
655 
656 } // interface6
657 
660 using interface6::make_filter;
661 using interface6::parallel_pipeline;
662 
663 } // tbb
664 
665 #endif /* __TBB_pipeline_H */
A processing pipeline that applies filters to items.
Definition: pipeline.h:236
virtual void finalize(void *)
Destroys item if pipeline was cancelled.
Definition: pipeline.h:160
static const unsigned char filter_is_serial
The lowest bit 0 is for parallel vs. serial.
Definition: pipeline.h:71
static const unsigned char filter_is_bound
5th bit distinguishes thread-bound and regular filters.
Definition: pipeline.h:79
Class representing a chain of type-safe pipeline filters.
Definition: pipeline.h:586
input_filter control to signal end-of-input for parallel_pipeline
Definition: pipeline.h:311
bool is_serial() const
True if filter is serial.
Definition: pipeline.h:129
bool object_may_be_null()
true if an input filter can emit null
Definition: pipeline.h:144
A stage in a pipeline served by a user thread.
Definition: pipeline.h:197
mode
Definition: pipeline.h:95
A stage in a pipeline.
Definition: pipeline.h:65
pointer allocate(size_type n, const void *=0)
Allocate space for n objects.
Definition: tbb_allocator.h:100
*/
Definition: material.h:665
bool is_ordered() const
True if filter must receive stream in order.
Definition: pipeline.h:134
Definition: _flow_graph_async_msg_impl.h:32
bool is_bound() const
True if filter is thread-bound.
Definition: pipeline.h:139
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
Definition: tbb_allocator.h:73
void __TBB_EXPORTED_METHOD add_filter(filter &filter_)
Add filter to end of pipeline.