BRE12
concurrent_queue.h
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB_concurrent_queue_H
22 #define __TBB_concurrent_queue_H
23 
24 #include "internal/_concurrent_queue_impl.h"
25 
26 namespace tbb {
27 
28 namespace strict_ppl {
29 
31 
34 template<typename T, typename A = cache_aligned_allocator<T> >
35 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
36  template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
37 
39  typedef typename A::template rebind<char>::other page_allocator_type;
40  page_allocator_type my_allocator;
41 
43  /*override*/ virtual void *allocate_block( size_t n ) {
44  void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
45  if( !b )
46  internal::throw_exception(internal::eid_bad_alloc);
47  return b;
48  }
49 
51  /*override*/ virtual void deallocate_block( void *b, size_t n ) {
52  my_allocator.deallocate( reinterpret_cast<char*>(b), n );
53  }
54 
55  static void copy_construct_item(T* location, const void* src){
56  new (location) T(*static_cast<const T*>(src));
57  }
58 
59 #if __TBB_CPP11_RVALUE_REF_PRESENT
60  static void move_construct_item(T* location, const void* src) {
61  new (location) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
62  }
63 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
64 public:
66  typedef T value_type;
67 
69  typedef T& reference;
70 
72  typedef const T& const_reference;
73 
75  typedef size_t size_type;
76 
78  typedef ptrdiff_t difference_type;
79 
81  typedef A allocator_type;
82 
84  explicit concurrent_queue(const allocator_type& a = allocator_type()) :
85  my_allocator( a )
86  {
87  }
88 
90  template<typename InputIterator>
91  concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
92  my_allocator( a )
93  {
94  for( ; begin != end; ++begin )
95  this->push(*begin);
96  }
97 
99  concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) :
100  internal::concurrent_queue_base_v3<T>(), my_allocator( a )
101  {
102  this->assign( src, copy_construct_item );
103  }
104 
105 #if __TBB_CPP11_RVALUE_REF_PRESENT
108  internal::concurrent_queue_base_v3<T>(), my_allocator( std::move(src.my_allocator) )
109  {
110  this->internal_swap( src );
111  }
112 
113  concurrent_queue( concurrent_queue&& src, const allocator_type& a ) :
114  internal::concurrent_queue_base_v3<T>(), my_allocator( a )
115  {
116  // checking that memory allocated by one instance of allocator can be deallocated
117  // with another
118  if( my_allocator == src.my_allocator) {
119  this->internal_swap( src );
120  } else {
121  // allocators are different => performing per-element move
122  this->assign( src, move_construct_item );
123  src.clear();
124  }
125  }
126 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
127 
130 
132  void push( const T& source ) {
133  this->internal_push( &source, copy_construct_item );
134  }
135 
136 #if __TBB_CPP11_RVALUE_REF_PRESENT
137  void push( T&& source ) {
138  this->internal_push( &source, move_construct_item );
139  }
140 
141 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
142  template<typename... Arguments>
143  void emplace( Arguments&&... args ) {
144  push( T(std::forward<Arguments>( args )...) );
145  }
146 #endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
147 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
148 
150 
152  bool try_pop( T& result ) {
153  return this->internal_try_pop( &result );
154  }
155 
157  size_type unsafe_size() const {return this->internal_size();}
158 
160  bool empty() const {return this->internal_empty();}
161 
163  void clear() ;
164 
166  allocator_type get_allocator() const { return this->my_allocator; }
167 
168  typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
169  typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
170 
171  //------------------------------------------------------------------------
172  // The iterators are intended only for debugging. They are slow and not thread safe.
173  //------------------------------------------------------------------------
174  iterator unsafe_begin() {return iterator(*this);}
175  iterator unsafe_end() {return iterator();}
176  const_iterator unsafe_begin() const {return const_iterator(*this);}
177  const_iterator unsafe_end() const {return const_iterator();}
178 } ;
179 
180 template<typename T, class A>
182  clear();
183  this->internal_finish_clear();
184 }
185 
186 template<typename T, class A>
187 void concurrent_queue<T,A>::clear() {
188  T value; // scratch object receiving each popped item; requires T to be default-constructible
189  while( !empty() ) try_pop(value); // pop until the queue reports empty
190 }
191 
192 } // namespace strict_ppl
193 
195 
200 template<typename T, class A = cache_aligned_allocator<T> >
201 class concurrent_bounded_queue: public internal::concurrent_queue_base_v8 {
202  template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
203 
205  typedef typename A::template rebind<char>::other page_allocator_type;
206  page_allocator_type my_allocator;
207 
208  typedef typename concurrent_queue_base_v3::padded_page<T> padded_page;
209  typedef typename concurrent_queue_base_v3::copy_specifics copy_specifics;
210 
212  class destroyer: internal::no_copy {
213  T& my_value;
214  public:
215  destroyer( T& value ) : my_value(value) {}
216  ~destroyer() {my_value.~T();}
217  };
218 
219  T& get_ref( page& p, size_t index ) {
220  __TBB_ASSERT( index<items_per_page, NULL );
221  return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
222  }
223 
224  /*override*/ virtual void copy_item( page& dst, size_t index, const void* src ) {
225  new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
226  }
227 
228 #if __TBB_CPP11_RVALUE_REF_PRESENT
229  /*override*/ virtual void move_item( page& dst, size_t index, const void* src ) {
230  new( &get_ref(dst,index) ) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
231  }
232 #else
233  /*override*/ virtual void move_item( page&, size_t, const void* ) {
234  __TBB_ASSERT( false, "Unreachable code" );
235  }
236 #endif
237 
238  /*override*/ virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
239  new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
240  }
241 
242 #if __TBB_CPP11_RVALUE_REF_PRESENT
243  /*override*/ virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
244  new( &get_ref(dst,dindex) ) T( std::move(get_ref( const_cast<page&>(src), sindex )) );
245  }
246 #else
247  /*override*/ virtual void move_page_item( page&, size_t, const page&, size_t ) {
248  __TBB_ASSERT( false, "Unreachable code" );
249  }
250 #endif
251 
252  /*override*/ virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) {
253  T& from = get_ref(src,index);
254  destroyer d(from);
255  *static_cast<T*>(dst) = tbb::internal::move( from );
256  }
257 
258  /*override*/ virtual page *allocate_page() {
259  size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
260  page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
261  if( !p )
262  internal::throw_exception(internal::eid_bad_alloc);
263  return p;
264  }
265 
266  /*override*/ virtual void deallocate_page( page *p ) {
267  size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
268  my_allocator.deallocate( reinterpret_cast<char*>(p), n );
269  }
270 
271 public:
273  typedef T value_type;
274 
276  typedef A allocator_type;
277 
279  typedef T& reference;
280 
282  typedef const T& const_reference;
283 
285 
287  typedef std::ptrdiff_t size_type;
288 
290  typedef std::ptrdiff_t difference_type;
291 
293  explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) :
294  concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
295  {
296  }
297 
299  concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type())
300  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
301  {
302  assign( src );
303  }
304 
305 #if __TBB_CPP11_RVALUE_REF_PRESENT
308  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( std::move(src.my_allocator) )
309  {
310  internal_swap( src );
311  }
312 
313  concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a )
314  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
315  {
316  // checking that memory allocated by one instance of allocator can be deallocated
317  // with another
318  if( my_allocator == src.my_allocator) {
319  this->internal_swap( src );
320  } else {
321  // allocators are different => performing per-element move
322  this->move_content( src );
323  src.clear();
324  }
325  }
326 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
327 
329  template<typename InputIterator>
330  concurrent_bounded_queue( InputIterator begin, InputIterator end,
331  const allocator_type& a = allocator_type())
332  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
333  {
334  for( ; begin != end; ++begin )
335  internal_push_if_not_full(&*begin);
336  }
337 
340 
342  void push( const T& source ) {
343  internal_push( &source );
344  }
345 
346 #if __TBB_CPP11_RVALUE_REF_PRESENT
347  void push( T&& source ) {
349  internal_push_move( &source );
350  }
351 
352 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
353  template<typename... Arguments>
354  void emplace( Arguments&&... args ) {
355  push( T(std::forward<Arguments>( args )...) );
356  }
357 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
358 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
359 
361 
362  void pop( T& destination ) {
363  internal_pop( &destination );
364  }
365 
366 #if TBB_USE_EXCEPTIONS
367  void abort() {
369  internal_abort();
370  }
371 #endif
372 
374 
376  bool try_push( const T& source ) {
377  return internal_push_if_not_full( &source );
378  }
379 
380 #if __TBB_CPP11_RVALUE_REF_PRESENT
381 
384  bool try_push( T&& source ) {
385  return internal_push_move_if_not_full( &source );
386  }
387 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
388  template<typename... Arguments>
389  bool try_emplace( Arguments&&... args ) {
390  return try_push( T(std::forward<Arguments>( args )...) );
391  }
392 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
393 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
394 
396 
398  bool try_pop( T& destination ) {
399  return internal_pop_if_present( &destination );
400  }
401 
403 
406  size_type size() const {return internal_size();}
407 
409  bool empty() const {return internal_empty();}
410 
412  size_type capacity() const {
413  return my_capacity;
414  }
415 
417 
419  void set_capacity( size_type new_capacity ) {
420  internal_set_capacity( new_capacity, sizeof(T) );
421  }
422 
424  allocator_type get_allocator() const { return this->my_allocator; }
425 
427  void clear() ;
428 
429  typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
430  typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
431 
432  //------------------------------------------------------------------------
433  // The iterators are intended only for debugging. They are slow and not thread safe.
434  //------------------------------------------------------------------------
435  iterator unsafe_begin() {return iterator(*this);}
436  iterator unsafe_end() {return iterator();}
437  const_iterator unsafe_begin() const {return const_iterator(*this);}
438  const_iterator unsafe_end() const {return const_iterator();}
439 
440 };
441 
442 template<typename T, class A>
444  clear();
445  internal_finish_clear();
446 }
447 
448 template<typename T, class A>
449 void concurrent_bounded_queue<T,A>::clear() {
450  T value; // scratch object receiving each popped item; requires T to be default-constructible
451  while( try_pop(value) ) /*noop*/; // keep popping until try_pop reports nothing was dequeued
452 }
453 
455 
456 } // namespace tbb
457 
458 #endif /* __TBB_concurrent_queue_H */
std::ptrdiff_t size_type
Integral type for representing size of the queue.
Definition: concurrent_queue.h:287
size_type unsafe_size() const
Return the number of items in the queue; thread unsafe.
Definition: concurrent_queue.h:157
T & reference
Reference type.
Definition: concurrent_queue.h:279
concurrent_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
Definition: concurrent_queue.h:91
size_type size() const
Return number of pushes minus number of pops.
Definition: concurrent_queue.h:406
~concurrent_queue()
Destroy queue.
Definition: concurrent_queue.h:181
concurrent_queue(const allocator_type &a=allocator_type())
Construct empty queue.
Definition: concurrent_queue.h:84
concurrent_bounded_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
Definition: concurrent_queue.h:330
void set_capacity(size_type new_capacity)
Set the capacity.
Definition: concurrent_queue.h:419
concurrent_bounded_queue(const concurrent_bounded_queue &src, const allocator_type &a=allocator_type())
Copy constructor.
Definition: concurrent_queue.h:299
void pop(T &destination)
Dequeue item from head of queue.
Definition: concurrent_queue.h:362
T & reference
Reference type.
Definition: concurrent_queue.h:69
std::ptrdiff_t difference_type
Difference type for iterator.
Definition: concurrent_queue.h:290
void push(const T &source)
Enqueue an item at tail of queue.
Definition: concurrent_queue.h:132
concurrent_bounded_queue(const allocator_type &a=allocator_type())
Construct empty queue.
Definition: concurrent_queue.h:293
A allocator_type
Allocator type.
Definition: concurrent_queue.h:81
const T & const_reference
Const reference type.
Definition: concurrent_queue.h:282
T value_type
Element type in the queue.
Definition: concurrent_queue.h:273
const T & const_reference
Const reference type.
Definition: concurrent_queue.h:72
allocator_type get_allocator() const
Return allocator object.
Definition: concurrent_queue.h:166
A allocator_type
Allocator type.
Definition: concurrent_queue.h:276
~concurrent_bounded_queue()
Destroy queue.
Definition: concurrent_queue.h:443
void push(const T &source)
Enqueue an item at tail of queue.
Definition: concurrent_queue.h:342
ptrdiff_t difference_type
Difference type for iterator.
Definition: concurrent_queue.h:78
A high-performance thread-safe non-blocking concurrent queue.
Definition: concurrent_queue.h:35
T value_type
Element type in the queue.
Definition: concurrent_queue.h:66
bool try_pop(T &destination)
Attempt to dequeue an item from head of queue.
Definition: concurrent_queue.h:398
bool empty() const
Equivalent to size()==0.
Definition: concurrent_queue.h:160
void clear()
Clear the queue. Not thread-safe.
Definition: concurrent_queue.h:449
concurrent_queue(const concurrent_queue &src, const allocator_type &a=allocator_type())
Copy constructor.
Definition: concurrent_queue.h:99
Definition: _flow_graph_async_msg_impl.h:32
bool try_push(const T &source)
Enqueue an item at tail of queue if queue is not already full.
Definition: concurrent_queue.h:376
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44
size_type capacity() const
Maximum number of allowed elements.
Definition: concurrent_queue.h:412
A high-performance thread-safe blocking concurrent bounded queue.
Definition: concurrent_queue.h:201
bool empty() const
Equivalent to size()<=0.
Definition: concurrent_queue.h:409
allocator_type get_allocator() const
Return allocator object.
Definition: concurrent_queue.h:424
size_t size_type
Integral type for representing size of the queue.
Definition: concurrent_queue.h:75
void clear()
Clear the queue. Not thread-safe.
Definition: concurrent_queue.h:187
bool try_pop(T &result)
Attempt to dequeue an item from head of queue.
Definition: concurrent_queue.h:152