BRE12
concurrent_priority_queue.h
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB_concurrent_priority_queue_H
22 #define __TBB_concurrent_priority_queue_H
23 
24 #include "atomic.h"
25 #include "cache_aligned_allocator.h"
26 #include "tbb_exception.h"
27 #include "tbb_stddef.h"
28 #include "tbb_profiling.h"
29 #include "internal/_aggregator_impl.h"
30 #include <vector>
31 #include <iterator>
32 #include <functional>
33 
34 #if __TBB_INITIALIZER_LISTS_PRESENT
35  #include <initializer_list>
36 #endif
37 
38 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
39  #include <type_traits>
40 #endif
41 
42 namespace tbb {
43 namespace interface5 {
44 namespace internal {
45 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
46  template<typename T, bool C = std::is_copy_constructible<T>::value>
47  struct use_element_copy_constructor {
48  typedef tbb::internal::true_type type;
49  };
50  template<typename T>
51  struct use_element_copy_constructor <T,false> {
52  typedef tbb::internal::false_type type;
53  };
54 #else
55  template<typename>
57  typedef tbb::internal::true_type type;
58  };
59 #endif
60 } // namespace internal
61 
62 using namespace tbb::internal;
63 
65 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
67  public:
69  typedef T value_type;
70 
72  typedef T& reference;
73 
75  typedef const T& const_reference;
76 
78  typedef size_t size_type;
79 
81  typedef ptrdiff_t difference_type;
82 
84  typedef A allocator_type;
85 
87  explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
88  {
89  my_aggregator.initialize_handler(my_functor_t(this));
90  }
91 
93  explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
94  mark(0), my_size(0), data(a)
95  {
96  data.reserve(init_capacity);
97  my_aggregator.initialize_handler(my_functor_t(this));
98  }
99 
101  template<typename InputIterator>
102  concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
103  mark(0), data(begin, end, a)
104  {
105  my_aggregator.initialize_handler(my_functor_t(this));
106  heapify();
107  my_size = data.size();
108  }
109 
110 #if __TBB_INITIALIZER_LISTS_PRESENT
111  concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
113  mark(0),data(init_list.begin(), init_list.end(), a)
114  {
115  my_aggregator.initialize_handler(my_functor_t(this));
116  heapify();
117  my_size = data.size();
118  }
119 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
120 
122 
123  explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
124  my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
125  {
126  my_aggregator.initialize_handler(my_functor_t(this));
127  heapify();
128  }
129 
131 
132  concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
133  my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
134  {
135  my_aggregator.initialize_handler(my_functor_t(this));
136  heapify();
137  }
138 
140 
142  if (this != &src) {
143  vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
144  mark = src.mark;
145  my_size = src.my_size;
146  }
147  return *this;
148  }
149 
150 #if __TBB_CPP11_RVALUE_REF_PRESENT
151 
154  my_size(src.my_size), data(std::move(src.data))
155  {
156  my_aggregator.initialize_handler(my_functor_t(this));
157  }
158 
160 
161  concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
162  my_size(src.my_size),
163 #if __TBB_ALLOCATOR_TRAITS_PRESENT
164  data(std::move(src.data), a)
165 #else
166  // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
167  // It seems that the reason is absence of support of allocator_traits (stateful allocators).
168  data(a)
169 #endif //__TBB_ALLOCATOR_TRAITS_PRESENT
170  {
171  my_aggregator.initialize_handler(my_functor_t(this));
172 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
173  if (a != src.data.get_allocator()){
174  data.reserve(src.data.size());
175  data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
176  }else{
177  data = std::move(src.data);
178  }
179 #endif
180  }
181 
183 
185  if (this != &src) {
186  mark = src.mark;
187  my_size = src.my_size;
188 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
189  if (data.get_allocator() != src.data.get_allocator()){
190  vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
191  }else
192 #endif
193  {
194  data = std::move(src.data);
195  }
196  }
197  return *this;
198  }
199 #endif //__TBB_CPP11_RVALUE_REF_PRESENT
200 
202  template<typename InputIterator>
203  void assign(InputIterator begin, InputIterator end) {
204  vector_t(begin, end, data.get_allocator()).swap(data);
205  mark = 0;
206  my_size = data.size();
207  heapify();
208  }
209 
210 #if __TBB_INITIALIZER_LISTS_PRESENT
211  void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
213 
215  concurrent_priority_queue& operator=(std::initializer_list<T> il) {
216  this->assign(il.begin(), il.end());
217  return *this;
218  }
219 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
220 
222 
224  bool empty() const { return size()==0; }
225 
227 
229  size_type size() const { return __TBB_load_with_acquire(my_size); }
230 
232 
233  void push(const_reference elem) {
234 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
235  __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
236 #endif
237  cpq_operation op_data(elem, PUSH_OP);
238  my_aggregator.execute(&op_data);
239  if (op_data.status == FAILED) // exception thrown
240  throw_exception(eid_bad_alloc);
241  }
242 
243 #if __TBB_CPP11_RVALUE_REF_PRESENT
244 
246  void push(value_type &&elem) {
247  cpq_operation op_data(elem, PUSH_RVALUE_OP);
248  my_aggregator.execute(&op_data);
249  if (op_data.status == FAILED) // exception thrown
250  throw_exception(eid_bad_alloc);
251  }
252 
253 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
254 
256  template<typename... Args>
257  void emplace(Args&&... args) {
258  push(value_type(std::forward<Args>(args)...));
259  }
260 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
261 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
262 
264 
267  bool try_pop(reference elem) {
268  cpq_operation op_data(POP_OP);
269  op_data.elem = &elem;
270  my_aggregator.execute(&op_data);
271  return op_data.status==SUCCEEDED;
272  }
273 
275 
278  void clear() {
279  data.clear();
280  mark = 0;
281  my_size = 0;
282  }
283 
285 
287  using std::swap;
288  data.swap(q.data);
289  swap(mark, q.mark);
290  swap(my_size, q.my_size);
291  }
292 
294  allocator_type get_allocator() const { return data.get_allocator(); }
295 
296  private:
//! Kind of request carried by a cpq_operation.
enum operation_type {
    INVALID_OP,
    PUSH_OP,
    POP_OP,
    PUSH_RVALUE_OP
};
//! Value stored in an operation's status field by handle_operations.
enum operation_status {
    WAIT = 0,
    SUCCEEDED,
    FAILED
};
299 
300  class cpq_operation : public aggregated_operation<cpq_operation> {
301  public:
302  operation_type type;
303  union {
304  value_type *elem;
305  size_type sz;
306  };
307  cpq_operation(const_reference e, operation_type t) :
308  type(t), elem(const_cast<value_type*>(&e)) {}
309  cpq_operation(operation_type t) : type(t) {}
310  };
311 
312  class my_functor_t {
314  public:
315  my_functor_t() {}
316  my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
317  void operator()(cpq_operation* op_list) {
318  cpq->handle_operations(op_list);
319  }
320  };
321 
323  aggregator_t my_aggregator;
325  char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
327  size_type mark;
328  __TBB_atomic size_type my_size;
329  Compare compare;
331  char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
333 
350  typedef std::vector<value_type, allocator_type> vector_t;
351  vector_t data;
352 
353  void handle_operations(cpq_operation *op_list) {
354  cpq_operation *tmp, *pop_list=NULL;
355 
356  __TBB_ASSERT(mark == data.size(), NULL);
357 
358  // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
359  while (op_list) {
360  // ITT note: &(op_list->status) tag is used to cover accesses to op_list
361  // node. This thread is going to handle the operation, and so will acquire it
362  // and perform the associated operation w/o triggering a race condition; the
363  // thread that created the operation is waiting on the status field, so when
364  // this thread is done with the operation, it will perform a
365  // store_with_release to give control back to the waiting thread in
366  // aggregator::insert_operation.
367  call_itt_notify(acquired, &(op_list->status));
368  __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
369  tmp = op_list;
370  op_list = itt_hide_load_word(op_list->next);
371  if (tmp->type == POP_OP) {
372  if (mark < data.size() &&
373  compare(data[0], data[data.size()-1])) {
374  // there are newly pushed elems and the last one
375  // is higher than top
376  *(tmp->elem) = move(data[data.size()-1]);
377  __TBB_store_with_release(my_size, my_size-1);
378  itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
379  data.pop_back();
380  __TBB_ASSERT(mark<=data.size(), NULL);
381  }
382  else { // no convenient item to pop; postpone
383  itt_hide_store_word(tmp->next, pop_list);
384  pop_list = tmp;
385  }
386  } else { // PUSH_OP or PUSH_RVALUE_OP
387  __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
388  __TBB_TRY{
389  if (tmp->type == PUSH_OP) {
390  push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
391  } else {
392  data.push_back(move(*(tmp->elem)));
393  }
394  __TBB_store_with_release(my_size, my_size + 1);
395  itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
396  } __TBB_CATCH(...) {
397  itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
398  }
399  }
400  }
401 
402  // second pass processes pop operations
403  while (pop_list) {
404  tmp = pop_list;
405  pop_list = itt_hide_load_word(pop_list->next);
406  __TBB_ASSERT(tmp->type == POP_OP, NULL);
407  if (data.empty()) {
408  itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
409  }
410  else {
411  __TBB_ASSERT(mark<=data.size(), NULL);
412  if (mark < data.size() &&
413  compare(data[0], data[data.size()-1])) {
414  // there are newly pushed elems and the last one is
415  // higher than top
416  *(tmp->elem) = move(data[data.size()-1]);
417  __TBB_store_with_release(my_size, my_size-1);
418  itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
419  data.pop_back();
420  }
421  else { // extract top and push last element down heap
422  *(tmp->elem) = move(data[0]);
423  __TBB_store_with_release(my_size, my_size-1);
424  itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
425  reheap();
426  }
427  }
428  }
429 
430  // heapify any leftover pushed elements before doing the next
431  // batch of operations
432  if (mark<data.size()) heapify();
433  __TBB_ASSERT(mark == data.size(), NULL);
434  }
435 
437  void heapify() {
438  if (!mark && data.size()>0) mark = 1;
439  for (; mark<data.size(); ++mark) {
440  // for each unheapified element under size
441  size_type cur_pos = mark;
442  value_type to_place = move(data[mark]);
443  do { // push to_place up the heap
444  size_type parent = (cur_pos-1)>>1;
445  if (!compare(data[parent], to_place)) break;
446  data[cur_pos] = move(data[parent]);
447  cur_pos = parent;
448  } while( cur_pos );
449  data[cur_pos] = move(to_place);
450  }
451  }
452 
454 
455  void reheap() {
456  size_type cur_pos=0, child=1;
457 
458  while (child < mark) {
459  size_type target = child;
460  if (child+1 < mark && compare(data[child], data[child+1]))
461  ++target;
462  // target now has the higher priority child
463  if (compare(data[target], data[data.size()-1])) break;
464  data[cur_pos] = move(data[target]);
465  cur_pos = target;
466  child = (cur_pos<<1)+1;
467  }
468  if (cur_pos != data.size()-1)
469  data[cur_pos] = move(data[data.size()-1]);
470  data.pop_back();
471  if (mark > data.size()) mark = data.size();
472  }
473 
474  void push_back_helper(const T& t, tbb::internal::true_type) {
475  data.push_back(t);
476  }
477 
478  void push_back_helper(const T&, tbb::internal::false_type) {
479  __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
480  }
481 };
482 
483 } // namespace interface5
484 
486 
487 } // namespace tbb
488 
489 #endif /* __TBB_concurrent_priority_queue_H */
size_type size() const
Returns the current number of elements contained in the queue.
Definition: concurrent_priority_queue.h:229
Definition: atomic.h:535
concurrent_priority_queue(const concurrent_priority_queue &src)
Copy constructor.
Definition: concurrent_priority_queue.h:123
T & reference
Reference type.
Definition: concurrent_priority_queue.h:72
allocator_type get_allocator() const
Return allocator object.
Definition: concurrent_priority_queue.h:294
aggregated_operation base class
Definition: _aggregator_impl.h:37
Definition: _aggregator_impl.h:144
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
Definition: concurrent_priority_queue.h:233
concurrent_priority_queue(size_type init_capacity, const allocator_type &a=allocator_type())
Constructs a new concurrent_priority_queue with init_capacity capacity.
Definition: concurrent_priority_queue.h:93
size_t size_type
Integral type for representing size of the queue.
Definition: concurrent_priority_queue.h:78
concurrent_priority_queue(const concurrent_priority_queue &src, const allocator_type &a)
Copy constructor with specific allocator.
Definition: concurrent_priority_queue.h:132
concurrent_priority_queue(const allocator_type &a=allocator_type())
Constructs a new concurrent_priority_queue with default capacity.
Definition: concurrent_priority_queue.h:87
void assign(InputIterator begin, InputIterator end)
Assign the queue from [begin,end) range, not thread-safe.
Definition: concurrent_priority_queue.h:203
T value_type
Element type in the queue.
Definition: concurrent_priority_queue.h:69
Definition: concurrent_priority_queue.h:56
bool empty() const
Returns true if empty, false otherwise.
Definition: concurrent_priority_queue.h:224
Concurrent priority queue.
Definition: concurrent_priority_queue.h:66
concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
Definition: concurrent_priority_queue.h:102
Definition: _flow_graph_async_msg_impl.h:32
concurrent_priority_queue & operator=(const concurrent_priority_queue &src)
Assignment operator.
Definition: concurrent_priority_queue.h:141
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44
void clear()
Clear the queue; not thread-safe.
Definition: concurrent_priority_queue.h:278
A allocator_type
Allocator type.
Definition: concurrent_priority_queue.h:84
void swap(concurrent_priority_queue &q)
Swap this queue with another; not thread-safe.
Definition: concurrent_priority_queue.h:286
bool try_pop(reference elem)
Gets a reference to and removes highest priority element.
Definition: concurrent_priority_queue.h:267
const T & const_reference
Const reference type.
Definition: concurrent_priority_queue.h:75
ptrdiff_t difference_type
Difference type for iterator.
Definition: concurrent_priority_queue.h:81