21 #ifndef __TBB_concurrent_priority_queue_H 22 #define __TBB_concurrent_priority_queue_H 25 #include "cache_aligned_allocator.h" 26 #include "tbb_exception.h" 27 #include "tbb_stddef.h" 28 #include "tbb_profiling.h" 29 #include "internal/_aggregator_impl.h" 34 #if __TBB_INITIALIZER_LISTS_PRESENT 35 #include <initializer_list> 38 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT 39 #include <type_traits> 43 namespace interface5 {
45 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT 46 template<typename T, bool C = std::is_copy_constructible<T>::value>
47 struct use_element_copy_constructor {
48 typedef tbb::internal::true_type type;
51 struct use_element_copy_constructor <T,false> {
52 typedef tbb::internal::false_type type;
57 typedef tbb::internal::true_type type;
65 template <
typename T,
typename Compare=std::less<T>,
typename A=cache_aligned_allocator<T> >
89 my_aggregator.initialize_handler(my_functor_t(
this));
94 mark(0), my_size(0), data(a)
96 data.reserve(init_capacity);
97 my_aggregator.initialize_handler(my_functor_t(
this));
101 template<
typename InputIterator>
103 mark(0), data(begin, end, a)
105 my_aggregator.initialize_handler(my_functor_t(
this));
107 my_size = data.size();
110 #if __TBB_INITIALIZER_LISTS_PRESENT 113 mark(0),data(init_list.begin(), init_list.end(), a)
115 my_aggregator.initialize_handler(my_functor_t(
this));
117 my_size = data.size();
119 #endif //# __TBB_INITIALIZER_LISTS_PRESENT 124 my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
126 my_aggregator.initialize_handler(my_functor_t(
this));
133 my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
135 my_aggregator.initialize_handler(my_functor_t(
this));
143 vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
145 my_size = src.my_size;
150 #if __TBB_CPP11_RVALUE_REF_PRESENT 154 my_size(src.my_size), data(std::move(src.data))
156 my_aggregator.initialize_handler(my_functor_t(
this));
162 my_size(src.my_size),
163 #if __TBB_ALLOCATOR_TRAITS_PRESENT 164 data(std::move(src.data), a)
169 #endif //__TBB_ALLOCATOR_TRAITS_PRESENT 171 my_aggregator.initialize_handler(my_functor_t(
this));
172 #if !__TBB_ALLOCATOR_TRAITS_PRESENT 173 if (a != src.data.get_allocator()){
174 data.reserve(src.data.size());
175 data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
177 data = std::move(src.data);
187 my_size = src.my_size;
188 #if !__TBB_ALLOCATOR_TRAITS_PRESENT 189 if (data.get_allocator() != src.data.get_allocator()){
190 vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
194 data = std::move(src.data);
199 #endif //__TBB_CPP11_RVALUE_REF_PRESENT 202 template<
typename InputIterator>
203 void assign(InputIterator begin, InputIterator end) {
204 vector_t(begin, end, data.get_allocator()).swap(data);
206 my_size = data.size();
// Assigns the queue from an initializer list by forwarding il.begin()/il.end()
// to the iterator-range assign() overload. Only present when
// __TBB_INITIALIZER_LISTS_PRESENT is non-zero.
// NOTE(review): the range overload is described as not thread-safe, so this
// one presumably is not either - confirm.
210 #if __TBB_INITIALIZER_LISTS_PRESENT 211 void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
216 this->assign(il.begin(), il.end());
// empty(): returns true when size() reports zero elements, false otherwise.
// NOTE(review): size() is a single acquire load of the element counter, so the
// answer is presumably only a snapshot while other threads push/pop - confirm.
219 #endif //# __TBB_INITIALIZER_LISTS_PRESENT 224 bool empty()
const {
return size()==0; }
229 size_type
size()
const {
return __TBB_load_with_acquire(my_size); }
233 void push(const_reference elem) {
234 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT 235 __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value,
"The type is not copy constructible. Copying push operation is impossible." );
237 cpq_operation op_data(elem, PUSH_OP);
238 my_aggregator.execute(&op_data);
239 if (op_data.status == FAILED)
240 throw_exception(eid_bad_alloc);
243 #if __TBB_CPP11_RVALUE_REF_PRESENT 246 void push(value_type &&elem) {
247 cpq_operation op_data(elem, PUSH_RVALUE_OP);
248 my_aggregator.execute(&op_data);
249 if (op_data.status == FAILED)
250 throw_exception(eid_bad_alloc);
253 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT 256 template<
typename... Args>
257 void emplace(Args&&... args) {
258 push(value_type(std::forward<Args>(args)...));
268 cpq_operation op_data(POP_OP);
269 op_data.elem = &elem;
270 my_aggregator.execute(&op_data);
271 return op_data.status==SUCCEEDED;
290 swap(my_size, q.my_size);
// Kinds of request handed to the aggregator. push(const_reference) issues
// PUSH_OP, push(value_type&&) issues PUSH_RVALUE_OP and try_pop issues POP_OP.
// handle_operations asserts that INVALID_OP never reaches it.
297 enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
// Outcome of a request. handle_operations stores SUCCEEDED or FAILED into the
// request's status word with release semantics; push() throws eid_bad_alloc on
// FAILED and try_pop returns whether the status is SUCCEEDED.
// NOTE(review): WAIT (0) is presumably the pending state before the handler
// has processed the request - confirm against the aggregator implementation.
298 enum operation_status { WAIT=0, SUCCEEDED, FAILED };
// Builds a request of kind t that refers to element e by address.
// const is cast away so the address fits the value_type* member; for PUSH_OP
// handle_operations hands *elem to push_back_helper, otherwise it moves from
// *elem into the underlying vector.
307 cpq_operation(const_reference e, operation_type t) :
308 type(t), elem(const_cast<value_type*>(&e)) {}
// Builds a request of kind t without an element pointer; elem is not
// initialized here. try_pop assigns elem to the caller's output object before
// executing the request.
309 cpq_operation(operation_type t) : type(t) {}
317 void operator()(cpq_operation* op_list) {
318 cpq->handle_operations(op_list);
// Aggregator through which push/try_pop requests are executed; the
// constructors install its handler via initialize_handler().
323 aggregator_t my_aggregator;
// Filler sized so that sizeof(aggregator_t) plus this array sum to
// NFS_MaxLineSize bytes.
// NOTE(review): presumably keeps the aggregator on its own cache line to avoid
// false sharing - confirm.
325 char padding1[NFS_MaxLineSize -
sizeof(aggregator_t)];
// Element count: read with acquire semantics by size() and written with
// release semantics by handle_operations.
328 __TBB_atomic size_type my_size;
// Filler whose length is NFS_MaxLineSize minus two size_type objects and one
// Compare object.
// NOTE(review): presumably accounts for my_size plus the mark and compare
// members, whose declarations are not visible in this listing - confirm.
331 char padding2[NFS_MaxLineSize - (2*
sizeof(size_type)) -
sizeof(Compare)];
// Container type backing the queue: a std::vector of value_type that uses the
// queue's allocator_type. Temporaries of this type are built and swapped into
// data by the assignment paths.
350 typedef std::vector<value_type, allocator_type> vector_t;
353 void handle_operations(cpq_operation *op_list) {
354 cpq_operation *tmp, *pop_list=NULL;
356 __TBB_ASSERT(mark == data.size(), NULL);
367 call_itt_notify(acquired, &(op_list->status));
368 __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
370 op_list = itt_hide_load_word(op_list->next);
371 if (tmp->type == POP_OP) {
372 if (mark < data.size() &&
373 compare(data[0], data[data.size()-1])) {
376 *(tmp->elem) = move(data[data.size()-1]);
377 __TBB_store_with_release(my_size, my_size-1);
378 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
380 __TBB_ASSERT(mark<=data.size(), NULL);
383 itt_hide_store_word(tmp->next, pop_list);
387 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP,
"Unknown operation" );
389 if (tmp->type == PUSH_OP) {
390 push_back_helper(*(tmp->elem),
typename internal::use_element_copy_constructor<value_type>::type());
392 data.push_back(move(*(tmp->elem)));
394 __TBB_store_with_release(my_size, my_size + 1);
395 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
397 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
405 pop_list = itt_hide_load_word(pop_list->next);
406 __TBB_ASSERT(tmp->type == POP_OP, NULL);
408 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
411 __TBB_ASSERT(mark<=data.size(), NULL);
412 if (mark < data.size() &&
413 compare(data[0], data[data.size()-1])) {
416 *(tmp->elem) = move(data[data.size()-1]);
417 __TBB_store_with_release(my_size, my_size-1);
418 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
422 *(tmp->elem) = move(data[0]);
423 __TBB_store_with_release(my_size, my_size-1);
424 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
432 if (mark<data.size()) heapify();
433 __TBB_ASSERT(mark == data.size(), NULL);
438 if (!mark && data.size()>0) mark = 1;
439 for (; mark<data.size(); ++mark) {
441 size_type cur_pos = mark;
442 value_type to_place = move(data[mark]);
444 size_type parent = (cur_pos-1)>>1;
445 if (!compare(data[parent], to_place))
break;
446 data[cur_pos] = move(data[parent]);
449 data[cur_pos] = move(to_place);
456 size_type cur_pos=0, child=1;
458 while (child < mark) {
459 size_type target = child;
460 if (child+1 < mark && compare(data[child], data[child+1]))
463 if (compare(data[target], data[data.size()-1]))
break;
464 data[cur_pos] = move(data[target]);
466 child = (cur_pos<<1)+1;
468 if (cur_pos != data.size()-1)
469 data[cur_pos] = move(data[data.size()-1]);
471 if (mark > data.size()) mark = data.size();
474 void push_back_helper(
const T& t, tbb::internal::true_type) {
478 void push_back_helper(
const T&, tbb::internal::false_type) {
479 __TBB_ASSERT(
false,
"The type is not copy constructible. Copying push operation is impossible." );
size_type size() const
Returns the current number of elements contained in the queue.
Definition: concurrent_priority_queue.h:229
concurrent_priority_queue(const concurrent_priority_queue &src)
Copy constructor.
Definition: concurrent_priority_queue.h:123
T & reference
Reference type.
Definition: concurrent_priority_queue.h:72
allocator_type get_allocator() const
Return allocator object.
Definition: concurrent_priority_queue.h:294
aggregated_operation base class
Definition: _aggregator_impl.h:37
Definition: _aggregator_impl.h:144
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
Definition: concurrent_priority_queue.h:233
concurrent_priority_queue(size_type init_capacity, const allocator_type &a=allocator_type())
Constructs a new concurrent_priority_queue with an initial capacity of init_capacity.
Definition: concurrent_priority_queue.h:93
size_t size_type
Integral type for representing size of the queue.
Definition: concurrent_priority_queue.h:78
concurrent_priority_queue(const concurrent_priority_queue &src, const allocator_type &a)
Copy constructor with specific allocator.
Definition: concurrent_priority_queue.h:132
concurrent_priority_queue(const allocator_type &a=allocator_type())
Constructs a new concurrent_priority_queue with default capacity.
Definition: concurrent_priority_queue.h:87
void assign(InputIterator begin, InputIterator end)
Assign the queue from [begin,end) range, not thread-safe.
Definition: concurrent_priority_queue.h:203
T value_type
Element type in the queue.
Definition: concurrent_priority_queue.h:69
Definition: concurrent_priority_queue.h:56
bool empty() const
Returns true if empty, false otherwise.
Definition: concurrent_priority_queue.h:224
Concurrent priority queue.
Definition: concurrent_priority_queue.h:66
concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
Definition: concurrent_priority_queue.h:102
Definition: _flow_graph_async_msg_impl.h:32
concurrent_priority_queue & operator=(const concurrent_priority_queue &src)
Assignment operator.
Definition: concurrent_priority_queue.h:141
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44
void clear()
Clear the queue; not thread-safe.
Definition: concurrent_priority_queue.h:278
A allocator_type
Allocator type.
Definition: concurrent_priority_queue.h:84
void swap(concurrent_priority_queue &q)
Swap this queue with another; not thread-safe.
Definition: concurrent_priority_queue.h:286
bool try_pop(reference elem)
Removes the highest-priority element and assigns it to elem; returns true on success and false otherwise.
Definition: concurrent_priority_queue.h:267
const T & const_reference
Const reference type.
Definition: concurrent_priority_queue.h:75
ptrdiff_t difference_type
Difference type for iterator.
Definition: concurrent_priority_queue.h:81