21 #ifndef __TBB_pipeline_H 22 #define __TBB_pipeline_H 26 #include "tbb_allocator.h" 29 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT || __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT 30 #include <type_traits> 42 #define __TBB_PIPELINE_VERSION(x) ((unsigned char)(x-2)<<1) 44 typedef unsigned long Token;
45 typedef long tokendiff_t;
48 class pipeline_root_task;
49 class pipeline_cleaner;
53 namespace interface6 {
54 template<
typename T,
typename U>
class filter_t;
68 static filter* not_in_pipeline() {
return reinterpret_cast<filter*
>(intptr_t(-1));}
71 static const unsigned char filter_is_serial = 0x1;
76 static const unsigned char filter_is_out_of_order = 0x1<<4;
79 static const unsigned char filter_is_bound = 0x1<<5;
82 static const unsigned char filter_may_emit_null = 0x1<<6;
85 static const unsigned char exact_exception_propagation =
86 #if TBB_USE_CAPTURED_EXCEPTION 92 static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
93 static const unsigned char version_mask = 0x7<<1;
97 parallel = current_version | filter_is_out_of_order,
99 serial_in_order = current_version | filter_is_serial,
101 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
103 serial = serial_in_order
106 filter(
bool is_serial_ ) :
107 next_filter_in_pipeline(not_in_pipeline()),
108 my_input_buffer(NULL),
109 my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
110 prev_filter_in_pipeline(not_in_pipeline()),
116 next_filter_in_pipeline(not_in_pipeline()),
117 my_input_buffer(NULL),
118 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
119 prev_filter_in_pipeline(not_in_pipeline()),
125 void __TBB_EXPORTED_METHOD set_end_of_input();
130 return bool( my_filter_mode & filter_is_serial );
135 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
140 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
145 return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
150 virtual void* operator()(
void* item ) = 0;
154 virtual __TBB_EXPORTED_METHOD ~
filter();
156 #if __TBB_TASK_GROUP_CONTEXT 165 filter* next_filter_in_pipeline;
170 bool has_more_work();
174 internal::input_buffer* my_input_buffer;
176 friend class internal::stage_task;
177 friend class internal::pipeline_root_task;
182 const unsigned char my_filter_mode;
185 filter* prev_filter_in_pipeline;
220 result_type __TBB_EXPORTED_METHOD try_process_item();
227 result_type __TBB_EXPORTED_METHOD process_item();
231 result_type internal_process_item(
bool is_blocking);
243 virtual __TBB_EXPORTED_METHOD ~pipeline();
246 void __TBB_EXPORTED_METHOD add_filter(
filter& filter_ );
249 void __TBB_EXPORTED_METHOD run(
size_t max_number_of_live_tokens );
251 #if __TBB_TASK_GROUP_CONTEXT 252 void __TBB_EXPORTED_METHOD run(
size_t max_number_of_live_tokens, tbb::task_group_context& context );
257 void __TBB_EXPORTED_METHOD clear();
260 friend class internal::stage_task;
261 friend class internal::pipeline_root_task;
264 friend class internal::pipeline_cleaner;
265 friend class tbb::interface6::internal::pipeline_proxy;
286 bool has_thread_bound_filters;
289 void remove_filter(
filter& filter_ );
292 void __TBB_EXPORTED_METHOD inject_token( task&
self );
294 #if __TBB_TASK_GROUP_CONTEXT 295 void clear_filters();
304 namespace interface6 {
312 bool is_pipeline_stopped;
316 void stop() { is_pipeline_stopped =
true; }
322 template<
typename T>
struct tbb_large_object {
enum { value =
sizeof(T) >
sizeof(
void *) }; };
325 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT 326 template<
typename T>
struct tbb_trivially_copyable {
enum { value = std::is_trivially_copyable<T>::value }; };
327 #elif __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT 328 template<
typename T>
struct tbb_trivially_copyable {
enum { value = std::has_trivial_copy_constructor<T>::value }; };
331 template<
typename T>
struct tbb_trivially_copyable {
enum { value =
false }; };
332 template<
typename T>
struct tbb_trivially_copyable <T*> {
enum { value =
true }; };
333 template<>
struct tbb_trivially_copyable <short> {
enum { value =
true }; };
334 template<>
struct tbb_trivially_copyable <unsigned short> {
enum { value =
true }; };
335 template<>
struct tbb_trivially_copyable <int> {
enum { value = !tbb_large_object<int>::value }; };
336 template<>
struct tbb_trivially_copyable <unsigned int> {
enum { value = !tbb_large_object<int>::value }; };
337 template<>
struct tbb_trivially_copyable <long> {
enum { value = !tbb_large_object<long>::value }; };
338 template<>
struct tbb_trivially_copyable <unsigned long> {
enum { value = !tbb_large_object<long>::value }; };
339 template<>
struct tbb_trivially_copyable <float> {
enum { value = !tbb_large_object<float>::value }; };
340 template<>
struct tbb_trivially_copyable <double> {
enum { value = !tbb_large_object<double>::value }; };
341 #endif // Obtaining type properties 343 template<
typename T>
struct is_large_object {
enum { value = tbb_large_object<T>::value || !tbb_trivially_copyable<T>::value }; };
345 template<
typename T,
bool>
class token_helper;
349 class token_helper<T, true> {
353 typedef T value_type;
354 static pointer create_token(
const value_type & source) {
355 pointer output_t = allocator().
allocate(1);
356 return new (output_t) T(source);
358 static value_type & token(pointer & t) {
return *t;}
359 static void * cast_to_void_ptr(pointer ref) {
return (
void *) ref; }
360 static pointer cast_from_void_ptr(
void * ref) {
return (pointer)ref; }
361 static void destroy_token(pointer token) {
362 allocator().destroy(token);
363 allocator().deallocate(token,1);
369 class token_helper<T*,
false > {
372 typedef T* value_type;
373 static pointer create_token(
const value_type & source) {
return source; }
374 static value_type & token(pointer & t) {
return t;}
375 static void * cast_to_void_ptr(pointer ref) {
return (
void *)ref; }
376 static pointer cast_from_void_ptr(
void * ref) {
return (pointer)ref; }
377 static void destroy_token( pointer ) {}
382 class token_helper<T, false> {
386 } type_to_void_ptr_map;
389 typedef T value_type;
390 static pointer create_token(
const value_type & source) {
392 static value_type & token(pointer & t) {
return t;}
393 static void * cast_to_void_ptr(pointer ref) {
394 type_to_void_ptr_map mymap;
395 mymap.void_overlay = NULL;
396 mymap.actual_value = ref;
397 return mymap.void_overlay;
399 static pointer cast_from_void_ptr(
void * ref) {
400 type_to_void_ptr_map mymap;
401 mymap.void_overlay = ref;
402 return mymap.actual_value;
404 static void destroy_token( pointer ) {}
407 template<
typename T,
typename U,
typename Body>
410 typedef token_helper<T,is_large_object<T>::value > t_helper;
411 typedef typename t_helper::pointer t_pointer;
412 typedef token_helper<U,is_large_object<U>::value > u_helper;
413 typedef typename u_helper::pointer u_pointer;
415 void* operator()(
void* input) {
416 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
417 u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
418 t_helper::destroy_token(temp_input);
419 return u_helper::cast_to_void_ptr(output_u);
422 void finalize(
void * input) {
423 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
424 t_helper::destroy_token(temp_input);
432 template<
typename U,
typename Body>
433 class concrete_filter<void,U,Body>:
public filter {
435 typedef token_helper<U, is_large_object<U>::value > u_helper;
436 typedef typename u_helper::pointer u_pointer;
438 void* operator()(
void*) {
440 u_pointer output_u = u_helper::create_token(my_body(control));
441 if(control.is_pipeline_stopped) {
442 u_helper::destroy_token(output_u);
446 return u_helper::cast_to_void_ptr(output_u);
451 filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
456 template<
typename T,
typename Body>
457 class concrete_filter<T,void,Body>:
public filter {
459 typedef token_helper<T, is_large_object<T>::value > t_helper;
460 typedef typename t_helper::pointer t_pointer;
462 void* operator()(
void* input) {
463 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
464 my_body(t_helper::token(temp_input));
465 t_helper::destroy_token(temp_input);
468 void finalize(
void* input) {
469 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
470 t_helper::destroy_token(temp_input);
477 template<
typename Body>
478 class concrete_filter<void,void,Body>:
public filter {
482 void* operator()(
void*) {
485 void* output = control.is_pipeline_stopped ? NULL : (
void*)(intptr_t)-1;
489 concrete_filter(
filter::mode filter_mode,
const Body& body) :
filter(filter_mode), my_body(body) {}
494 class pipeline_proxy {
499 while(
filter* f = my_pipe.filter_list )
507 class filter_node: tbb::internal::no_copy {
513 #ifdef __TBB_TEST_FILTER_NODE_COUNT 514 ++(__TBB_TEST_FILTER_NODE_COUNT);
519 virtual void add_to(
pipeline& ) = 0;
521 void add_ref() {++ref_count;}
524 __TBB_ASSERT(ref_count>0,
"ref_count underflow");
528 virtual ~filter_node() {
529 #ifdef __TBB_TEST_FILTER_NODE_COUNT 530 --(__TBB_TEST_FILTER_NODE_COUNT);
536 template<
typename T,
typename U,
typename Body>
537 class filter_node_leaf:
public filter_node {
541 concrete_filter<T,U,Body>* f =
new concrete_filter<T,U,Body>(mode,body);
549 class filter_node_join:
public filter_node {
550 friend class filter_node;
553 ~filter_node_join() {
562 filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
572 template<
typename T,
typename U,
typename Body>
574 return new internal::filter_node_leaf<T,U,Body>(mode, body);
577 template<
typename T,
typename V,
typename U>
579 __TBB_ASSERT(left.root,
"cannot use default-constructed filter_t as left argument of '&'");
580 __TBB_ASSERT(right.root,
"cannot use default-constructed filter_t as right argument of '&'");
581 return new internal::filter_node_join(*left.root,*right.root);
585 template<
typename T,
typename U>
587 typedef internal::filter_node filter_node;
589 filter_t( filter_node* root_ ) : root(root_) {
592 friend class internal::pipeline_proxy;
593 template<
typename T_,
typename U_,
typename Body>
595 template<
typename T_,
typename V_,
typename U_>
601 if( root ) root->add_ref();
603 template<
typename Body>
605 root(
new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
612 filter_node* old = root;
614 if( root ) root->add_ref();
615 if( old ) old->remove_ref();
618 if( root ) root->remove_ref();
623 filter_node* old = root;
630 inline internal::pipeline_proxy::pipeline_proxy(
const filter_t<void,void>& filter_chain ) : my_pipe() {
631 __TBB_ASSERT( filter_chain.root,
"cannot apply parallel_pipeline to default-constructed filter_t" );
632 filter_chain.root->add_to(my_pipe);
635 inline void parallel_pipeline(
size_t max_number_of_live_tokens,
const filter_t<void,void>& filter_chain
636 #
if __TBB_TASK_GROUP_CONTEXT
637 , tbb::task_group_context& context
640 internal::pipeline_proxy pipe(filter_chain);
642 pipe->run(max_number_of_live_tokens
643 #
if __TBB_TASK_GROUP_CONTEXT
649 #if __TBB_TASK_GROUP_CONTEXT 650 inline void parallel_pipeline(
size_t max_number_of_live_tokens,
const filter_t<void,void>& filter_chain) {
651 tbb::task_group_context context;
652 parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
654 #endif // __TBB_TASK_GROUP_CONTEXT 660 using interface6::make_filter;
661 using interface6::parallel_pipeline;
A processing pipeline that applies filters to items.
Definition: pipeline.h:236
virtual void finalize(void *)
Destroys item if pipeline was cancelled.
Definition: pipeline.h:160
static const unsigned char filter_is_serial
The lowest bit 0 is for parallel vs. serial.
Definition: pipeline.h:71
static const unsigned char filter_is_bound
5th bit distinguishes thread-bound and regular filters.
Definition: pipeline.h:79
Class representing a chain of type-safe pipeline filters.
Definition: pipeline.h:586
input_filter control to signal end-of-input for parallel_pipeline
Definition: pipeline.h:311
bool is_serial() const
True if filter is serial.
Definition: pipeline.h:129
bool object_may_be_null()
true if an input filter can emit null
Definition: pipeline.h:144
A stage in a pipeline served by a user thread.
Definition: pipeline.h:197
mode
Definition: pipeline.h:95
Definition: pipeline.h:307
A stage in a pipeline.
Definition: pipeline.h:65
pointer allocate(size_type n, const void *=0)
Allocate space for n objects.
Definition: tbb_allocator.h:100
*/
Definition: material.h:665
bool is_ordered() const
True if filter must receive stream in order.
Definition: pipeline.h:134
Definition: _flow_graph_async_msg_impl.h:32
bool is_bound() const
True if filter is thread-bound.
Definition: pipeline.h:139
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
Definition: tbb_allocator.h:73
void __TBB_EXPORTED_METHOD add_filter(filter &filter_)
Add filter to end of pipeline.