// BRE12
// _concurrent_queue_impl.h
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB__concurrent_queue_impl_H
22 #define __TBB__concurrent_queue_impl_H
23 
24 #ifndef __TBB_concurrent_queue_H
25 #error Do not #include this internal file directly; use public TBB headers instead.
26 #endif
27 
28 #include "../tbb_stddef.h"
29 #include "../tbb_machine.h"
30 #include "../atomic.h"
31 #include "../spin_mutex.h"
32 #include "../cache_aligned_allocator.h"
33 #include "../tbb_exception.h"
34 #include "../tbb_profiling.h"
35 #include <new>
36 #include <utility>
37 
38 #if !TBB_USE_EXCEPTIONS && _MSC_VER
39  // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers
40  #pragma warning (push)
41  #pragma warning (disable: 4530)
42 #endif
43 
44 #include <iterator>
45 
46 #if !TBB_USE_EXCEPTIONS && _MSC_VER
47  #pragma warning (pop)
48 #endif
49 
50 namespace tbb {
51 
52 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
53 
54 // forward declaration
55 namespace strict_ppl {
56 template<typename T, typename A> class concurrent_queue;
57 }
58 
59 template<typename T, typename A> class concurrent_bounded_queue;
60 
61 #endif
62 
64 namespace strict_ppl {
65 
67 namespace internal {
68 
69 using namespace tbb::internal;
70 
71 typedef size_t ticket;
72 
73 template<typename T> class micro_queue ;
74 template<typename T> class micro_queue_pop_finalizer ;
75 template<typename T> class concurrent_queue_base_v3;
76 template<typename T> struct concurrent_queue_rep;
77 
79 
//! Parts of the queue representation that do not depend on the item type T.
/** Split out so the non-template portion can be shared by all instantiations. */
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Multiplier used to scatter successive tickets over the micro-queues
    //! (see concurrent_queue_rep::index: k*phi%n_queue; 3 is coprime with 8).
    static const size_t phi = 3;

public:
    // must be power of 2
    static const size_t n_queue = 8;

    //! Prefix of a page of items; the item slots follow it in the same allocation.
    struct page {
        page* next;      // next page in this micro-queue's singly-linked list
        uintptr_t mask;  // bit i set iff slot i of this page holds a valid item
    };

    atomic<ticket> head_counter;
    // Padding keeps head_counter and tail_counter on separate cache lines
    // to avoid false sharing between pushers and poppers.
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Number of item slots per page (always a power of 2, see the constructor).
    size_t items_per_page;

    //! Size of an item in bytes.
    size_t item_size;

    //! Number of entries that were marked invalid by a failed push.
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
} ;
116 
117 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
118  return uintptr_t(p)>1;
119 }
120 
122 
//! Abstract interface for page allocation/deallocation.
/** Implemented by concurrent_queue_base_v3; micro_queue and
    micro_queue_pop_finalizer reach the allocator only through this interface. */
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;
135 
136 #if _MSC_VER && !defined(__INTEL_COMPILER)
137 // unary minus operator applied to unsigned type, result still unsigned
138 #pragma warning( push )
139 #pragma warning( disable: 4146 )
140 #endif
141 
143 
//! One of the n_queue sub-queues that together form a concurrent queue.
/** Has no constructor: concurrent_queue_base_v3's constructor zero-fills the
    whole representation (see its memset), which is this class's required
    initial state. */
template<typename T>
class micro_queue : no_copy {
public:
    //! Callback that constructs a T at 'location' from the raw bytes at 'src'.
    typedef void (*item_constructor_t)(T* location, const void* src);
private:
    typedef concurrent_queue_rep_base::page page;

    //! RAII guard that runs ~T() on my_value when it goes out of scope,
    //! even if the enclosing operation throws.
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    //! Construct the item in slot 'dindex' of 'dst' from raw source bytes.
    void copy_item( page& dst, size_t dindex, const void* src, item_constructor_t construct_item ) {
        construct_item( &get_ref(dst, dindex), src );
    }

    //! Construct the item in slot 'dindex' of 'dst' from slot 'sindex' of 'src'.
    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex,
        item_constructor_t construct_item )
    {
        T& src_item = get_ref( const_cast<page&>(src), sindex );
        construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
    }

    //! Move-assign slot 'index' of 'src' into '*dst'; the source item is
    //! destroyed (via destroyer) even if the assignment throws.
    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = tbb::internal::move( from );
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    //! Page header followed by item storage; a real page is allocated with
    //! room for items_per_page items (see concurrent_queue_base_v3::allocate_page).
    struct padded_page: page {
        //! Not defined anywhere - only to quiet compiler warnings.
        padded_page();
        //! Not defined anywhere - only to quiet compiler warnings.
        void operator=( const padded_page& );
        //! Must be last member: further items are addressed past it via get_ref.
        T last;
    };

    //! Reference to the item in slot 'index' of page 'p'.
    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    //! Protects the head_page/tail_page linkage.
    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
            item_constructor_t construct_item ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base,
            item_constructor_t construct_item ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page,
            size_t end_in_page, ticket& g_index, item_constructor_t construct_item ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};
216 
//! Spin (with exponential backoff) until 'counter' reaches ticket k.
/** An odd counter value marks the micro-queue as broken by a failed push
    (invalidate_page_and_rethrow stores k+n_queue+1, which is odd since k is a
    multiple of the even n_queue); in that case give up and throw. */
template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    for( atomic_backoff b(true);;b.pause() ) {
        ticket c = counter;
        if( c==k ) return;
        else if( c&1 ) {
            // Queue is broken; count our abandoned slot as invalid.
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    }
}
228 
//! Push an item with ticket k into this micro-queue.
/** Waits for the pusher's turn (tail_counter==k), constructs the item via
    construct_item, then advances tail_counter to hand the turn to the next
    pusher. On construction failure the slot is counted as invalid but the
    tail still advances so other pushers are not blocked. */
template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
    item_constructor_t construct_item )
{
    // Round the ticket down to the base ticket of this micro-queue's slot.
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    // Slot within the current page for this ticket.
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page);
    if( !index ) {
        // First slot of a new page: allocate it before waiting for our turn.
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );  // marks the queue broken, rethrows
        }
        p->mask = 0;
        p->next = NULL;
    }

    // Wait until it is this ticket's turn to write.
    if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
    call_itt_notify(acquired, &tail_counter);

    if( p ) {
        // Link the freshly allocated page at the tail under the page mutex.
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    __TBB_TRY {
        copy_item( *p, index, item, construct_item );
        // If no exception was thrown, mark item as present.
        itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
        call_itt_notify(releasing, &tail_counter);
        // Advance the tail: pass the turn to the next pusher of this micro-queue.
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        // Construction failed: the mask bit stays clear, so pop will treat
        // this slot as an invalid entry.
        ++base.my_rep->n_invalid_entries;
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}
276 
277 template<typename T>
278 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
279  k &= -concurrent_queue_rep_base::n_queue;
280  if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
281  call_itt_notify(acquired, &head_counter);
282  if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
283  call_itt_notify(acquired, &tail_counter);
284  page& p = *head_page;
285  __TBB_ASSERT( &p, NULL );
286  size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
287  bool success = false;
288  {
289  micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
290  if( p.mask & uintptr_t(1)<<index ) {
291  success = true;
292  assign_and_destroy_item( dst, p, index );
293  } else {
294  --base.my_rep->n_invalid_entries;
295  }
296  }
297  return success;
298 }
299 
//! Copy the contents of micro-queue 'src' into this (empty) micro-queue.
/** construct_item decides copy vs move per element. On failure this queue is
    marked invalid via invalidate_page_and_rethrow. Not thread safe w.r.t. src. */
template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base,
    item_constructor_t construct_item )
{
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            // The first page may start mid-page at 'index'.
            head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                // Copy the full middle pages verbatim.
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                // The last page may end mid-page; 0 means it is completely full.
                size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        // Source micro-queue has no pages.
        head_page = tail_page = NULL;
    }
    return *this;
}
340 
//! Mark this micro-queue as permanently broken after a failed operation, then rethrow.
template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        // Store an odd tail value: k is a multiple of the even n_queue, so
        // k+n_queue+1 is odd, and spin_wait_until_my_turn treats an odd
        // counter as "queue broken" and throws.
        itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}
357 
//! Allocate a new page and copy slots [begin_in_page, end_in_page) of src_page into it.
/** Only slots whose mask bit is set (valid items) are constructed; g_index is
    advanced by one for every slot examined. Returns the new page. */
template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base,
    const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page,
    ticket& g_index, item_constructor_t construct_item )
{
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;  // validity bits carry over unchanged
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
    return new_page;
}
372 
//! RAII helper that finishes a micro_queue<T>::pop on scope exit.
/** Its destructor advances head_counter to my_ticket and, if my_page is a
    valid page (i.e. the pop consumed its last slot), unlinks and frees it. */
template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;                          // value head_counter is advanced to
    micro_queue<T>& my_queue;
    page* my_page;                             // page to retire, or NULL if none
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};
386 
//! Finish a pop: retire the consumed page (if any) and publish the new head counter.
template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        // The pop consumed the last slot of page p: unlink it under the mutex.
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    // Publish the advanced head counter so the next popper may proceed.
    itt_store_word_with_release(my_queue.head_counter, my_ticket);
    if( is_valid_page(p) ) {
        // Free the detached page outside the page mutex.
        allocator.deallocate_page( p );
    }
}
403 
404 #if _MSC_VER && !defined(__INTEL_COMPILER)
405 #pragma warning( pop )
406 #endif // warning 4146 is back
407 
408 template<typename T> class concurrent_queue_iterator_rep ;
409 template<typename T> class concurrent_queue_iterator_base_v3;
410 
412 
//! Representation of a concurrent queue: the shared counters plus n_queue micro-queues.
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket k to the index of the micro-queue it belongs to.
    /** phi (3) is coprime with n_queue (8), so consecutive tickets are
        scattered across different micro-queues. */
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    //! Micro-queue that ticket k belongs to.
    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};
429 
431 
//! Type-generic base class of strict_ppl::concurrent_queue.
/** Owns the shared representation (concurrent_queue_rep). Derived classes
    supply raw storage through allocate_block/deallocate_block. */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
private:
    //! Internal representation.
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;
    typedef typename micro_queue<T>::item_constructor_t item_constructor_t;

    //! Allocate a page: padded_page header plus items_per_page items
    //! (padded_page already contains storage for one item, hence the -1).
    /* override */ virtual page *allocate_page() {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
        concurrent_queue_rep<T>& r = *my_rep;
        // Must mirror the size computation in allocate_page.
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! Custom allocator, supplied by the derived class.
    virtual void *allocate_block( size_t n ) = 0;

    //! Custom de-allocator, supplied by the derived class.
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    /* override */ virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue.
    void internal_push( const void* src, item_constructor_t construct_item ) {
        concurrent_queue_rep<T>& r = *my_rep;
        // Claim a ticket, then push into the micro-queue it maps to.
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this, construct_item );
    }

    //! Attempt to dequeue an item; false if the queue appeared empty.
    bool internal_try_pop( void* dst ) ;

    //! Get size of queue; result may be invalid if queue is modified concurrently.
    size_t internal_size() const ;

    //! Check if the queue is empty; thread safe.
    bool internal_empty() const ;

    //! Free any remaining pages.
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void internal_finish_clear() ;

    //! Throw eid_bad_alloc.
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! Copy or move (depending on construct_item) the internal representation of src.
    void assign( const concurrent_queue_base_v3& src, item_constructor_t construct_item ) ;

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Swap internal representations.
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
};
519 
//! Allocate and zero-fill the representation; derive page capacity from item size.
template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    // Zero-fill: micro_queue has no constructor and relies on this initial state.
    memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    // Pick a power-of-two page capacity so item payload per page stays at
    // (up to) 256 bytes: 8*32 == 16*16 == ... == 128*2 == 256.
#pragma warning(suppress: 6326)
    my_rep->items_per_page = item_size<= 8 ? 32 :
                             item_size<= 16 ? 16 :
                             item_size<= 32 ? 8 :
                             item_size<= 64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}
538 
//! Attempt to dequeue one item; returns false if the queue appeared empty.
template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            // Claim ticket k via CAS; on failure k holds the fresh head value.
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
        // pop returns false if slot k held an invalid entry (its push failed);
        // in that case loop around and claim the next ticket.
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
567 
//! Get size of queue; result may be invalid if queue is modified concurrently.
template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    // NOTE(review): the three reads are in a deliberate order (head, invalid
    // count, tail) — presumably to bound the result under concurrent updates;
    // preserve this order when modifying.
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    // Size = pushed - popped - invalid; clamp negative transients to 0.
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
579 
//! Check if the queue is empty; thread safe.
template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    // Empty means every pushed ticket was either popped or is an invalid entry.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}
588 
//! Free any remaining pages.
/* note that the name may be misleading, but it remains so due to a historical accident. */
template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            // After all items are popped, each micro-queue keeps at most its
            // final page (pop frees a page only when consuming its last slot).
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}
603 
//! Copy or move (depending on construct_item) the internal representation of src.
/** Not thread safe with respect to src (see the trailing assertion). */
template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src,
    item_constructor_t construct_item )
{
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep data
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy or move micro_queues
    for( size_t i = 0; i < r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this, construct_item);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
        "the source concurrent queue should not be concurrently modified." );
}
623 
624 template<typename Container, typename Value> class concurrent_queue_iterator;
625 
//! Type-dependent part of a concurrent_queue iterator.
/** Snapshots head_counter and the head page of each micro-queue at
    construction time; advance() walks the pages from there. */
template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    //! Ticket of the first item to visit.
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    //! Current page of each micro-queue; advanced as iteration proceeds.
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to the address of the item for ticket k, or NULL at end of queue.
    /** Returns false for an invalid entry that the caller must skip. */
    bool get_item( T*& item, size_t k ) ;
};
644 
645 template<typename T>
646 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
647  if( k==my_queue.my_rep->tail_counter ) {
648  item = NULL;
649  return true;
650  } else {
651  typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
652  __TBB_ASSERT(p,NULL);
653  size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
654  item = &micro_queue<T>::get_ref(*p,i);
655  return (p->mask & uintptr_t(1)<<i)!=0;
656  }
657 }
658 
660 
//! Type-independent part of concurrent_queue_iterator.
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Snapshot of the queue over which we are iterating.
    /** NULL for a default-constructed (end) iterator. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item, or NULL when past the last element.
    Value* my_item;

    //! Default constructor.
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
        __TBB_compiler_fence();
#endif
    }

    //! Copy constructor.
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : no_assign(), my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment.
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards tail of queue.
    void advance() ;

    //! Destructor.
    /** my_rep has no non-trivial destructor (its members are a ticket, a
        reference and raw pointers), so plain deallocation suffices. */
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};
704 
//! Construct an iterator pointing at the queue's first valid item.
template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    // Snapshot the queue into a freshly allocated rep.
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    // If the head slot holds an invalid entry, advance() skips past it.
    if( !my_rep->get_item(my_item, k) ) advance();
}
712 
//! Replace this iterator's snapshot with a copy of other's.
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            // Discard our old snapshot (rep has no non-trivial destructor).
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            // Deep-copy the other iterator's snapshot.
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}
727 
//! Advance the iterator to the next valid item (or end of queue).
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    // Sanity check: my_item must still agree with the snapshot at ticket k.
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page );
    if( i==queue.my_rep->items_per_page-1 ) {
        // Stepping off the last slot of a page: move this micro-queue's
        // cached page pointer to the next page.
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    // Recurse past invalid entries until a valid item or end of queue is found.
    if( !my_rep->get_item(my_item, k) ) advance();
}
747 
749 
//! Strips top-level const/volatile qualifiers from T
//! (local equivalent of C++11 std::remove_cv, usable without <type_traits>).
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
754 
756 
//! Meets requirements of a forward iterator for STL.
/** Value is either T or const T, matching iterator/const_iterator of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public:
#endif
    //! Construct iterator pointing to head of queue (invoked by the container).
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    //! Conversion from non-const iterator (Container::value_type) to this iterator type.
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment.
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        this->assign(other);
        return *this;
    }

    //! Reference to current item.
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue.
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment: returns the address of the item before advancing.
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
806 
807 
808 template<typename C, typename T, typename U>
809 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
810  return i.my_item==j.my_item;
811 }
812 
813 template<typename C, typename T, typename U>
814 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
815  return i.my_item!=j.my_item;
816 }
817 
818 } // namespace internal
819 
821 
822 } // namespace strict_ppl
823 
825 namespace internal {
826 
827 class concurrent_queue_rep;
828 class concurrent_queue_iterator_rep;
829 class concurrent_queue_iterator_base_v3;
830 template<typename Container, typename Value> class concurrent_queue_iterator;
831 
833 
//! For internal use only.
/** Type-independent portion of a capacity-bounded concurrent queue.
    The __TBB_EXPORTED_METHOD members are implemented in the TBB library
    binary, not in this header. */
class concurrent_queue_base_v3: no_copy {
private:
    //! Internal representation.
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix of a page of items.
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue.
    ptrdiff_t my_capacity;

    //! Number of item slots per page (always a power of 2).
    size_t items_per_page;

    //! Size of an item in bytes.
    size_t item_size;

    //! Selects copy vs. move semantics for item-transfer operations below.
    enum copy_specifics { copy, move };

#if __TBB_PROTECTED_NESTED_CLASS_BROKEN
public:
#endif
    //! Page header plus storage for one item; further items follow 'last'.
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - only to quiet compiler warnings.
        padded_page();
        //! Not defined anywhere - only to quiet compiler warnings.
        void operator=( const padded_page& );
        //! Must be last member of class.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue.
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue.
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Abort pending queue operations.
    void __TBB_EXPORTED_METHOD internal_abort();

    //! Attempt to enqueue item; may fail (e.g. queue at capacity).
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item; fails if none is present.
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue.
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty.
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity.
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! Custom allocator, supplied by the derived class.
    virtual page *allocate_page() = 0;

    //! Custom de-allocator, supplied by the derived class.
    virtual void deallocate_page( page *p ) = 0;

    //! Free any remaining pages.
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! Throw an exception (implemented in the library).
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! Copy internal representation from src.
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Swap internal representations.
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_capacity, src.my_capacity );
        std::swap( items_per_page, src.items_per_page );
        std::swap( item_size, src.item_size );
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */

    //! Enqueue item at tail of queue using the specified operation (copy or move).
    void internal_insert_item( const void* src, copy_specifics op_type );

    //! Attempt to enqueue at tail of queue using the specified operation (copy or move).
    bool internal_insert_if_not_full( const void* src, copy_specifics op_type );

    //! Assign one queue to another using the specified operation (copy or move).
    void internal_assign( const concurrent_queue_base_v3& src, copy_specifics op_type );
private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};
945 
947 
//! For internal use only.
/** Extension of concurrent_queue_base_v3 that adds move-aware entry points. */
class concurrent_queue_base_v8: public concurrent_queue_base_v3 {
protected:
    concurrent_queue_base_v8( size_t item_sz ) : concurrent_queue_base_v3( item_sz ) {}

    //! Move content of src queue into this queue.
    void __TBB_EXPORTED_METHOD move_content( concurrent_queue_base_v8& src ) ;

    //! Attempt to enqueue item using move semantics; may fail.
    bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full( const void* src );

    //! Enqueue item at tail of queue using move semantics.
    void __TBB_EXPORTED_METHOD internal_push_move( const void* src );
private:
    friend struct micro_queue;
    virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
    virtual void move_item( page& dst, size_t index, const void* src ) = 0;
};
966 
968 
//! Type-independent part of concurrent_queue_iterator.
/** The __TBB_EXPORTED_METHOD members are implemented in the TBB library binary. */
class concurrent_queue_iterator_base_v3 {
    //! Snapshot of the queue over which we are iterating.
    /** NULL for a default-constructed (end) iterator. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item, or NULL when past the last element.
    void* my_item;

    //! Default constructor.
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor.
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue (legacy overload without item offset).
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to head of queue; offset_of_data locates
    //! the item storage within a page (see the __TBB_offsetof use at the call site).
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment.
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor.
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};
1009 
1010 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
1011 
1013 
//! Meets requirements of a forward iterator for STL.
/** Value is either T or const T, matching iterator/const_iterator of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;
#else
public:
#endif

    //! Construct iterator pointing to head of queue (invoked by the container);
    //! passes the offset of the item storage within a padded_page<Value>.
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }

public:
    concurrent_queue_iterator() {}

    //! Conversion from non-const iterator (Container::value_type) to this iterator type.
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment.
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item.
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue.
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment: returns the address of the item before advancing.
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
1067 
1068 
1069 template<typename C, typename T, typename U>
1070 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
1071  return i.my_item==j.my_item;
1072 }
1073 
1074 template<typename C, typename T, typename U>
1075 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
1076  return i.my_item!=j.my_item;
1077 }
1078 
1079 } // namespace internal;
1080 
1082 
1083 } // namespace tbb
1084 
1085 #endif /* __TBB__concurrent_queue_impl_H */
// Definition: atomic.h:535
// Definition: _flow_graph_async_msg_impl.h:32
// The namespace tbb contains all components of the library.
// Definition: parallel_for.h:44