21 #ifndef __TBB__concurrent_queue_impl_H 22 #define __TBB__concurrent_queue_impl_H 24 #ifndef __TBB_concurrent_queue_H 25 #error Do not #include this internal file directly; use public TBB headers instead. 28 #include "../tbb_stddef.h" 29 #include "../tbb_machine.h" 30 #include "../atomic.h" 31 #include "../spin_mutex.h" 32 #include "../cache_aligned_allocator.h" 33 #include "../tbb_exception.h" 34 #include "../tbb_profiling.h" 38 #if !TBB_USE_EXCEPTIONS && _MSC_VER 40 #pragma warning (push) 41 #pragma warning (disable: 4530) 46 #if !TBB_USE_EXCEPTIONS && _MSC_VER 52 #if !__TBB_TEMPLATE_FRIENDS_BROKEN 55 namespace strict_ppl {
//! Forward declaration of the unbounded queue, so internals below can befriend it.
template<typename T, typename A> class concurrent_queue;
} // namespace strict_ppl

//! Forward declaration of the bounded (capacity-limited) queue.
template<typename T, typename A> class concurrent_bounded_queue;
64 namespace strict_ppl {
//! For internal use only. Monotonically increasing sequence number of a queue slot.
typedef size_t ticket;

// Forward declarations of the internal machinery defined further below.
template<typename T> class micro_queue ;
template<typename T> class micro_queue_pop_finalizer ;
template<typename T> class concurrent_queue_base_v3;
template<typename T> struct concurrent_queue_rep;
82 struct concurrent_queue_rep_base : no_copy {
83 template<
typename T>
friend class micro_queue;
84 template<
typename T>
friend class concurrent_queue_base_v3;
88 static const size_t phi = 3;
92 static const size_t n_queue = 8;
100 atomic<ticket> head_counter;
101 char pad1[NFS_MaxLineSize-
sizeof(atomic<ticket>)];
102 atomic<ticket> tail_counter;
103 char pad2[NFS_MaxLineSize-
sizeof(atomic<ticket>)];
106 size_t items_per_page;
112 atomic<size_t> n_invalid_entries;
114 char pad3[NFS_MaxLineSize-
sizeof(size_t)-
sizeof(
size_t)-
sizeof(atomic<size_t>)];
117 inline bool is_valid_page(
const concurrent_queue_rep_base::page* p) {
118 return uintptr_t(p)>1;
125 class concurrent_queue_page_allocator
127 template<
typename T>
friend class micro_queue ;
128 template<
typename T>
friend class micro_queue_pop_finalizer ;
130 virtual ~concurrent_queue_page_allocator() {}
132 virtual concurrent_queue_rep_base::page* allocate_page() = 0;
133 virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
136 #if _MSC_VER && !defined(__INTEL_COMPILER) 138 #pragma warning( push ) 139 #pragma warning( disable: 4146 ) 146 class micro_queue : no_copy {
148 typedef void (*item_constructor_t)(T* location,
const void* src);
150 typedef concurrent_queue_rep_base::page page;
153 class destroyer: no_copy {
156 destroyer( T& value ) : my_value(value) {}
157 ~destroyer() {my_value.~T();}
160 void copy_item( page& dst,
size_t dindex,
const void* src, item_constructor_t construct_item ) {
161 construct_item( &get_ref(dst, dindex), src );
164 void copy_item( page& dst,
size_t dindex,
const page& src,
size_t sindex,
165 item_constructor_t construct_item )
167 T& src_item = get_ref( const_cast<page&>(src), sindex );
168 construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
171 void assign_and_destroy_item(
void* dst, page& src,
size_t index ) {
172 T& from = get_ref(src,index);
174 *
static_cast<T*
>(dst) = tbb::internal::move( from );
177 void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb )
const ;
180 friend class micro_queue_pop_finalizer<T>;
182 struct padded_page: page {
186 void operator=(
const padded_page& );
191 static T& get_ref( page& p,
size_t index ) {
192 return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
195 atomic<page*> head_page;
196 atomic<ticket> head_counter;
198 atomic<page*> tail_page;
199 atomic<ticket> tail_counter;
201 spin_mutex page_mutex;
203 void push(
const void* item, ticket k, concurrent_queue_base_v3<T>& base,
204 item_constructor_t construct_item ) ;
206 bool pop(
void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
208 micro_queue& assign(
const micro_queue& src, concurrent_queue_base_v3<T>& base,
209 item_constructor_t construct_item ) ;
211 page* make_copy( concurrent_queue_base_v3<T>& base,
const page* src_page,
size_t begin_in_page,
212 size_t end_in_page, ticket& g_index, item_constructor_t construct_item ) ;
214 void invalidate_page_and_rethrow( ticket k ) ;
218 void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb )
const {
219 for( atomic_backoff b(
true);;b.pause() ) {
223 ++rb.n_invalid_entries;
224 throw_exception( eid_bad_last_alloc );
230 void micro_queue<T>::push(
const void* item, ticket k, concurrent_queue_base_v3<T>& base,
231 item_constructor_t construct_item )
233 k &= -concurrent_queue_rep_base::n_queue;
235 size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page);
238 concurrent_queue_page_allocator& pa = base;
239 p = pa.allocate_page();
240 } __TBB_CATCH (...) {
241 ++base.my_rep->n_invalid_entries;
242 invalidate_page_and_rethrow( k );
248 if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
249 call_itt_notify(acquired, &tail_counter);
252 spin_mutex::scoped_lock lock( page_mutex );
254 if( is_valid_page(q) )
264 copy_item( *p, index, item, construct_item );
266 itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
267 call_itt_notify(releasing, &tail_counter);
268 tail_counter += concurrent_queue_rep_base::n_queue;
269 } __TBB_CATCH (...) {
270 ++base.my_rep->n_invalid_entries;
271 call_itt_notify(releasing, &tail_counter);
272 tail_counter += concurrent_queue_rep_base::n_queue;
278 bool micro_queue<T>::pop(
void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
279 k &= -concurrent_queue_rep_base::n_queue;
280 if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
281 call_itt_notify(acquired, &head_counter);
282 if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
283 call_itt_notify(acquired, &tail_counter);
284 page& p = *head_page;
285 __TBB_ASSERT( &p, NULL );
286 size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
287 bool success =
false;
289 micro_queue_pop_finalizer<T> finalizer( *
this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
290 if( p.mask & uintptr_t(1)<<index ) {
292 assign_and_destroy_item( dst, p, index );
294 --base.my_rep->n_invalid_entries;
301 micro_queue<T>& micro_queue<T>::assign(
const micro_queue<T>& src, concurrent_queue_base_v3<T>& base,
302 item_constructor_t construct_item )
304 head_counter = src.head_counter;
305 tail_counter = src.tail_counter;
307 const page* srcp = src.head_page;
308 if( is_valid_page(srcp) ) {
309 ticket g_index = head_counter;
311 size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
312 size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
313 size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
315 head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
316 page* cur_page = head_page;
318 if( srcp != src.tail_page ) {
319 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
320 cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
321 cur_page = cur_page->next;
324 __TBB_ASSERT( srcp==src.tail_page, NULL );
325 size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
326 if( last_index==0 ) last_index = base.my_rep->items_per_page;
328 cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
329 cur_page = cur_page->next;
331 tail_page = cur_page;
332 } __TBB_CATCH (...) {
333 invalidate_page_and_rethrow( g_index );
336 head_page = tail_page = NULL;
342 void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
344 page* invalid_page = (page*)uintptr_t(1);
346 spin_mutex::scoped_lock lock( page_mutex );
347 itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
349 if( is_valid_page(q) )
350 q->next = invalid_page;
352 head_page = invalid_page;
353 tail_page = invalid_page;
359 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base,
360 const concurrent_queue_rep_base::page* src_page,
size_t begin_in_page,
size_t end_in_page,
361 ticket& g_index, item_constructor_t construct_item )
363 concurrent_queue_page_allocator& pa = base;
364 page* new_page = pa.allocate_page();
365 new_page->next = NULL;
366 new_page->mask = src_page->mask;
367 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
368 if( new_page->mask & uintptr_t(1)<<begin_in_page )
369 copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
374 class micro_queue_pop_finalizer: no_copy {
375 typedef concurrent_queue_rep_base::page page;
377 micro_queue<T>& my_queue;
379 concurrent_queue_page_allocator& allocator;
381 micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
382 my_ticket(k), my_queue(queue), my_page(p), allocator(b)
384 ~micro_queue_pop_finalizer() ;
388 micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
390 if( is_valid_page(p) ) {
391 spin_mutex::scoped_lock lock( my_queue.page_mutex );
393 my_queue.head_page = q;
394 if( !is_valid_page(q) ) {
395 my_queue.tail_page = NULL;
398 itt_store_word_with_release(my_queue.head_counter, my_ticket);
399 if( is_valid_page(p) ) {
400 allocator.deallocate_page( p );
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

// Forward declarations of the iterator internals defined below.
template<typename T> class concurrent_queue_iterator_rep ;
template<typename T> class concurrent_queue_iterator_base_v3;
//! Internal representation of a concurrent queue.
/** Holds the fixed array of micro-queues over which tickets are striped. */
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map a ticket to a micro-queue index.
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    //! Pick the micro-queue responsible for ticket k.
    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};
436 class concurrent_queue_base_v3:
public concurrent_queue_page_allocator {
439 concurrent_queue_rep<T>* my_rep;
441 friend struct concurrent_queue_rep<T>;
442 friend class micro_queue<T>;
443 friend class concurrent_queue_iterator_rep<T>;
444 friend class concurrent_queue_iterator_base_v3<T>;
447 typedef typename concurrent_queue_rep<T>::page page;
450 typedef typename micro_queue<T>::padded_page padded_page;
451 typedef typename micro_queue<T>::item_constructor_t item_constructor_t;
453 virtual page *allocate_page() {
454 concurrent_queue_rep<T>& r = *my_rep;
455 size_t n =
sizeof(padded_page) + (r.items_per_page-1)*
sizeof(T);
456 return reinterpret_cast<page*
>(allocate_block ( n ));
459 virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
460 concurrent_queue_rep<T>& r = *my_rep;
461 size_t n =
sizeof(padded_page) + (r.items_per_page-1)*
sizeof(T);
462 deallocate_block( reinterpret_cast<void*>(p), n );
466 virtual void *allocate_block(
size_t n ) = 0;
469 virtual void deallocate_block(
void *p,
size_t n ) = 0;
472 concurrent_queue_base_v3();
474 virtual ~concurrent_queue_base_v3() {
476 size_t nq = my_rep->n_queue;
477 for(
size_t i=0; i<nq; i++ )
478 __TBB_ASSERT( my_rep->array[i].tail_page==NULL,
"pages were not freed properly" );
480 cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
484 void internal_push(
const void* src, item_constructor_t construct_item ) {
485 concurrent_queue_rep<T>& r = *my_rep;
486 ticket k = r.tail_counter++;
487 r.choose(k).push( src, k, *
this, construct_item );
492 bool internal_try_pop(
void* dst ) ;
495 size_t internal_size()
const ;
498 bool internal_empty()
const ;
502 void internal_finish_clear() ;
505 void internal_throw_exception()
const {
506 throw_exception( eid_bad_alloc );
510 void assign(
const concurrent_queue_base_v3& src, item_constructor_t construct_item ) ;
512 #if __TBB_CPP11_RVALUE_REF_PRESENT 513 void internal_swap( concurrent_queue_base_v3& src ) {
515 std::swap( my_rep, src.my_rep );
521 concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
522 const size_t item_size =
sizeof(T);
523 my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
524 __TBB_ASSERT( (
size_t)my_rep % NFS_GetLineSize()==0,
"alignment error" );
525 __TBB_ASSERT( (
size_t)&my_rep->head_counter % NFS_GetLineSize()==0,
"alignment error" );
526 __TBB_ASSERT( (
size_t)&my_rep->tail_counter % NFS_GetLineSize()==0,
"alignment error" );
527 __TBB_ASSERT( (
size_t)&my_rep->array % NFS_GetLineSize()==0,
"alignment error" );
528 memset(my_rep,0,
sizeof(concurrent_queue_rep<T>));
529 my_rep->item_size = item_size;
530 #pragma warning(suppress: 6326) 531 my_rep->items_per_page = item_size<= 8 ? 32 :
532 item_size<= 16 ? 16 :
540 bool concurrent_queue_base_v3<T>::internal_try_pop(
void* dst ) {
541 concurrent_queue_rep<T>& r = *my_rep;
546 if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
552 #if defined(_MSC_VER) && defined(_Wp64) 553 #pragma warning (push) 554 #pragma warning (disable: 4267) 556 k = r.head_counter.compare_and_swap( tk+1, tk );
557 #if defined(_MSC_VER) && defined(_Wp64) 558 #pragma warning (pop) 564 }
while( !r.choose( k ).pop( dst, k, *
this ) );
//! Snapshot of the queue size; may be stale under concurrent modification.
/** Invalid entries (failed pushes) are subtracted; a transiently negative
    difference is clamped to zero. */
template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
//! Thread-safe emptiness check.
template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // If tc!=r.tail_counter now, the queue was non-empty at some point
    // between the two reads, so report non-empty.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}
590 void concurrent_queue_base_v3<T>::internal_finish_clear() {
591 concurrent_queue_rep<T>& r = *my_rep;
592 size_t nq = r.n_queue;
593 for(
size_t i=0; i<nq; ++i ) {
594 page* tp = r.array[i].tail_page;
595 if( is_valid_page(tp) ) {
596 __TBB_ASSERT( r.array[i].head_page==tp,
"at most one page should remain" );
597 deallocate_page( tp );
598 r.array[i].tail_page = NULL;
600 __TBB_ASSERT( !is_valid_page(r.array[i].head_page),
"head page pointer corrupt?" );
//! Copy (or move, depending on construct_item) the contents of src into this queue.
/** The source must not be modified concurrently; the final assert checks this. */
template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src,
    item_constructor_t construct_item )
{
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // Copy the scalar bookkeeping first.
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // Then deep-copy each micro-queue's page list.
    for( size_t i = 0; i < r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this, construct_item);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}
//! Forward declaration of the iterator template defined below.
template<typename Container, typename Value> class concurrent_queue_iterator;
627 class concurrent_queue_iterator_rep: no_assign {
628 typedef typename micro_queue<T>::padded_page padded_page;
631 const concurrent_queue_base_v3<T>& my_queue;
632 typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
633 concurrent_queue_iterator_rep(
const concurrent_queue_base_v3<T>& queue ) :
634 head_counter(queue.my_rep->head_counter),
637 for(
size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
638 array[k] = queue.my_rep->array[k].head_page;
642 bool get_item( T*& item,
size_t k ) ;
646 bool concurrent_queue_iterator_rep<T>::get_item( T*& item,
size_t k ) {
647 if( k==my_queue.my_rep->tail_counter ) {
651 typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
652 __TBB_ASSERT(p,NULL);
653 size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
654 item = µ_queue<T>::get_ref(*p,i);
655 return (p->mask & uintptr_t(1)<<i)!=0;
661 template<
typename Value>
662 class concurrent_queue_iterator_base_v3 : no_assign {
665 concurrent_queue_iterator_rep<Value>* my_rep;
667 template<
typename C,
typename T,
typename U>
668 friend bool operator==(
const concurrent_queue_iterator<C,T>& i,
const concurrent_queue_iterator<C,U>& j );
670 template<
typename C,
typename T,
typename U>
671 friend bool operator!=(
const concurrent_queue_iterator<C,T>& i,
const concurrent_queue_iterator<C,U>& j );
677 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
678 #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN 679 __TBB_compiler_fence();
684 concurrent_queue_iterator_base_v3(
const concurrent_queue_iterator_base_v3& i )
685 : no_assign(), my_rep(NULL), my_item(NULL) {
690 concurrent_queue_iterator_base_v3(
const concurrent_queue_base_v3<Value>& queue ) ;
693 void assign(
const concurrent_queue_iterator_base_v3<Value>& other ) ;
699 ~concurrent_queue_iterator_base_v3() {
700 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
//! Construct an iterator positioned at the head of the queue.
/** Allocates the snapshot rep with placement new; if the head slot is not a
    valid item, advance() skips forward to the first valid one. */
template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}
713 template<
typename Value>
714 void concurrent_queue_iterator_base_v3<Value>::assign(
const concurrent_queue_iterator_base_v3<Value>& other ) {
715 if( my_rep!=other.my_rep ) {
717 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
721 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
722 new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
725 my_item = other.my_item;
728 template<
typename Value>
729 void concurrent_queue_iterator_base_v3<Value>::advance() {
730 __TBB_ASSERT( my_item,
"attempt to increment iterator past end of queue" );
731 size_t k = my_rep->head_counter;
732 const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
735 my_rep->get_item(tmp,k);
736 __TBB_ASSERT( my_item==tmp, NULL );
738 size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page );
739 if( i==queue.my_rep->items_per_page-1 ) {
740 typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
744 my_rep->head_counter = ++k;
745 if( !my_rep->get_item(my_item, k) ) advance();
//! Strips const/volatile qualifiers from T.
/** Pre-C++11 substitute for std::remove_cv; the "tbb_" prefix avoids overload
    confusion with standard-library implementations. */
template<typename T> struct tbb_remove_cv                   { typedef T type; };
//! Partial specialization: drop const.
template<typename T> struct tbb_remove_cv<const T>          { typedef T type; };
//! Partial specialization: drop volatile.
template<typename T> struct tbb_remove_cv<volatile T>       { typedef T type; };
//! Partial specialization: drop both qualifiers.
template<typename T> struct tbb_remove_cv<const volatile T> { typedef T type; };
758 template<
typename Container,
typename Value>
759 class concurrent_queue_iterator:
public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
760 public std::iterator<std::forward_iterator_tag,Value> {
761 #if !__TBB_TEMPLATE_FRIENDS_BROKEN 762 template<
typename T,
class A>
763 friend class ::tbb::strict_ppl::concurrent_queue;
767 concurrent_queue_iterator(
const concurrent_queue_base_v3<Value>& queue ) :
769 concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
774 concurrent_queue_iterator() {}
776 concurrent_queue_iterator(
const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
777 concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
781 concurrent_queue_iterator& operator=(
const concurrent_queue_iterator& other ) {
787 Value& operator*()
const {
788 return *
static_cast<Value*
>(this->my_item);
791 Value* operator->()
const {
return &operator*();}
794 concurrent_queue_iterator& operator++() {
800 Value* operator++(
int) {
801 Value* result = &operator*();
//! Equality: iterators are equal iff they reference the same item.
template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

//! Inequality in terms of the item pointers.
template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}
// Forward declarations for the old (pre-strict_ppl, exported-ABI) queue internals.
class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;
835 class concurrent_queue_base_v3: no_copy {
838 concurrent_queue_rep* my_rep;
840 friend class concurrent_queue_rep;
841 friend struct micro_queue;
842 friend class micro_queue_pop_finalizer;
843 friend class concurrent_queue_iterator_rep;
844 friend class concurrent_queue_iterator_base_v3;
853 ptrdiff_t my_capacity;
856 size_t items_per_page;
861 enum copy_specifics { copy, move };
863 #if __TBB_PROTECTED_NESTED_CLASS_BROKEN 867 struct padded_page: page {
871 void operator=(
const padded_page& );
877 virtual void copy_item( page& dst,
size_t index,
const void* src ) = 0;
878 virtual void assign_and_destroy_item(
void* dst, page& src,
size_t index ) = 0;
880 __TBB_EXPORTED_METHOD concurrent_queue_base_v3(
size_t item_size );
881 virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
884 void __TBB_EXPORTED_METHOD internal_push(
const void* src );
887 void __TBB_EXPORTED_METHOD internal_pop(
void* dst );
890 void __TBB_EXPORTED_METHOD internal_abort();
893 bool __TBB_EXPORTED_METHOD internal_push_if_not_full(
const void* src );
897 bool __TBB_EXPORTED_METHOD internal_pop_if_present(
void* dst );
900 ptrdiff_t __TBB_EXPORTED_METHOD internal_size()
const;
903 bool __TBB_EXPORTED_METHOD internal_empty()
const;
906 void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity,
size_t element_size );
909 virtual page *allocate_page() = 0;
912 virtual void deallocate_page( page *p ) = 0;
916 void __TBB_EXPORTED_METHOD internal_finish_clear() ;
919 void __TBB_EXPORTED_METHOD internal_throw_exception()
const;
922 void __TBB_EXPORTED_METHOD assign(
const concurrent_queue_base_v3& src ) ;
924 #if __TBB_CPP11_RVALUE_REF_PRESENT 925 void internal_swap( concurrent_queue_base_v3& src ) {
927 std::swap( my_capacity, src.my_capacity );
928 std::swap( items_per_page, src.items_per_page );
929 std::swap( item_size, src.item_size );
930 std::swap( my_rep, src.my_rep );
935 void internal_insert_item(
const void* src, copy_specifics op_type );
938 bool internal_insert_if_not_full(
const void* src, copy_specifics op_type );
941 void internal_assign(
const concurrent_queue_base_v3& src, copy_specifics op_type );
943 virtual void copy_page_item( page& dst,
size_t dindex,
const page& src,
size_t sindex ) = 0;
949 class concurrent_queue_base_v8:
public concurrent_queue_base_v3 {
951 concurrent_queue_base_v8(
size_t item_sz ) : concurrent_queue_base_v3( item_sz ) {}
954 void __TBB_EXPORTED_METHOD move_content( concurrent_queue_base_v8& src ) ;
957 bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(
const void* src );
960 void __TBB_EXPORTED_METHOD internal_push_move(
const void* src );
962 friend struct micro_queue;
963 virtual void move_page_item( page& dst,
size_t dindex,
const page& src,
size_t sindex ) = 0;
964 virtual void move_item( page& dst,
size_t index,
const void* src ) = 0;
969 class concurrent_queue_iterator_base_v3 {
972 concurrent_queue_iterator_rep* my_rep;
974 template<
typename C,
typename T,
typename U>
975 friend bool operator==(
const concurrent_queue_iterator<C,T>& i,
const concurrent_queue_iterator<C,U>& j );
977 template<
typename C,
typename T,
typename U>
978 friend bool operator!=(
const concurrent_queue_iterator<C,T>& i,
const concurrent_queue_iterator<C,U>& j );
980 void initialize(
const concurrent_queue_base_v3& queue,
size_t offset_of_data );
986 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
989 concurrent_queue_iterator_base_v3(
const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
995 __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3(
const concurrent_queue_base_v3& queue );
998 __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3(
const concurrent_queue_base_v3& queue,
size_t offset_of_data );
1001 void __TBB_EXPORTED_METHOD assign(
const concurrent_queue_iterator_base_v3& i );
1004 void __TBB_EXPORTED_METHOD advance();
1007 __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
1010 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
1015 template<
typename Container,
typename Value>
1016 class concurrent_queue_iterator:
public concurrent_queue_iterator_base,
1017 public std::iterator<std::forward_iterator_tag,Value> {
1019 #if !__TBB_TEMPLATE_FRIENDS_BROKEN 1020 template<
typename T,
class A>
1021 friend class ::tbb::concurrent_bounded_queue;
1027 concurrent_queue_iterator(
const concurrent_queue_base_v3& queue ) :
1028 concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
1033 concurrent_queue_iterator() {}
1037 concurrent_queue_iterator(
const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
1038 concurrent_queue_iterator_base_v3(other)
1042 concurrent_queue_iterator& operator=(
const concurrent_queue_iterator& other ) {
1048 Value& operator*()
const {
1049 return *
static_cast<Value*
>(my_item);
1052 Value* operator->()
const {
return &operator*();}
1055 concurrent_queue_iterator& operator++() {
1061 Value* operator++(
int) {
1062 Value* result = &operator*();
//! Equality: iterators are equal iff they reference the same item.
template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

//! Inequality in terms of the item pointers.
template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}
// Documentation-extraction residue (cross-reference footnotes, not code):
//   Definition: _flow_graph_async_msg_impl.h:32
//   The namespace tbb contains all components of the library. Definition: parallel_for.h:44