BRE12
parallel_while.h
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB_parallel_while
22 #define __TBB_parallel_while
23 
24 #include "task.h"
25 #include <new>
26 
27 namespace tbb {
28 
//! Parallel iteration over a stream, with optional addition of more work.
/** Forward declaration only: the internal task classes below name
    tbb::parallel_while<Body> as a friend before the class itself is defined. */
template<typename Body>
class parallel_while;
31 
33 namespace internal {
34 
35  template<typename Stream, typename Body> class while_task;
36 
38 
40  template<typename Body>
41  class while_iteration_task: public task {
42  const Body& my_body;
43  typename Body::argument_type my_value;
44  /*override*/ task* execute() {
45  my_body(my_value);
46  return NULL;
47  }
48  while_iteration_task( const typename Body::argument_type& value, const Body& body ) :
49  my_body(body), my_value(value)
50  {}
51  template<typename Body_> friend class while_group_task;
52  friend class tbb::parallel_while<Body>;
53  };
54 
56 
58  template<typename Body>
59  class while_group_task: public task {
60  static const size_t max_arg_size = 4;
61  const Body& my_body;
62  size_t size;
63  typename Body::argument_type my_arg[max_arg_size];
64  while_group_task( const Body& body ) : my_body(body), size(0) {}
65  /*override*/ task* execute() {
66  typedef while_iteration_task<Body> iteration_type;
67  __TBB_ASSERT( size>0, NULL );
68  task_list list;
69  task* t;
70  size_t k=0;
71  for(;;) {
72  t = new( allocate_child() ) iteration_type(my_arg[k],my_body);
73  if( ++k==size ) break;
74  list.push_back(*t);
75  }
76  set_ref_count(int(k+1));
77  spawn(list);
78  spawn_and_wait_for_all(*t);
79  return NULL;
80  }
81  template<typename Stream, typename Body_> friend class while_task;
82  };
83 
85 
87  template<typename Stream, typename Body>
88  class while_task: public task {
89  Stream& my_stream;
90  const Body& my_body;
91  empty_task& my_barrier;
92  /*override*/ task* execute() {
93  typedef while_group_task<Body> block_type;
94  block_type& t = *new( allocate_additional_child_of(my_barrier) ) block_type(my_body);
95  size_t k=0;
96  while( my_stream.pop_if_present(t.my_arg[k]) ) {
97  if( ++k==block_type::max_arg_size ) {
98  // There might be more iterations.
99  recycle_to_reexecute();
100  break;
101  }
102  }
103  if( k==0 ) {
104  destroy(t);
105  return NULL;
106  } else {
107  t.size = k;
108  return &t;
109  }
110  }
111  while_task( Stream& stream, const Body& body, empty_task& barrier ) :
112  my_stream(stream),
113  my_body(body),
114  my_barrier(barrier)
115  {}
116  friend class tbb::parallel_while<Body>;
117  };
118 
119 } // namespace internal
121 
123 
128 template<typename Body>
129 class parallel_while: internal::no_copy {
130 public:
132  parallel_while() : my_body(NULL), my_barrier(NULL) {}
133 
136  if( my_barrier ) {
137  my_barrier->destroy(*my_barrier);
138  my_barrier = NULL;
139  }
140  }
141 
143  typedef typename Body::argument_type value_type;
144 
146 
149  template<typename Stream>
150  void run( Stream& stream, const Body& body );
151 
153 
154  void add( const value_type& item );
155 
156 private:
157  const Body* my_body;
158  empty_task* my_barrier;
159 };
160 
161 template<typename Body>
162 template<typename Stream>
163 void parallel_while<Body>::run( Stream& stream, const Body& body ) {
164  using namespace internal;
165  empty_task& barrier = *new( task::allocate_root() ) empty_task();
166  my_body = &body;
167  my_barrier = &barrier;
168  my_barrier->set_ref_count(2);
169  while_task<Stream,Body>& w = *new( my_barrier->allocate_child() ) while_task<Stream,Body>( stream, body, barrier );
170  my_barrier->spawn_and_wait_for_all(w);
171  my_barrier->destroy(*my_barrier);
172  my_barrier = NULL;
173  my_body = NULL;
174 }
175 
176 template<typename Body>
178  __TBB_ASSERT(my_barrier,"attempt to add to parallel_while that is not running");
179  typedef internal::while_iteration_task<Body> iteration_type;
180  iteration_type& i = *new( task::allocate_additional_child_of(*my_barrier) ) iteration_type(item,*my_body);
181  task::self().spawn( i );
182 }
183 
184 } // namespace
185 
186 #endif /* __TBB_parallel_while */
~parallel_while()
Destructor cleans up data members before returning.
Definition: parallel_while.h:135
void add(const value_type &item)
Add a work item while running.
Definition: parallel_while.h:177
Parallel iteration over a stream, with optional addition of more work.
Definition: parallel_while.h:30
Body::argument_type value_type
Type of items.
Definition: parallel_while.h:143
parallel_while()
Construct empty non-running parallel while.
Definition: parallel_while.h:132
void run(Stream &stream, const Body &body)
Apply body.apply to each item in the stream.
Definition: parallel_while.h:163
Definition: _flow_graph_async_msg_impl.h:32
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44