BRE12
_aggregator_impl.h
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB__aggregator_impl_H
22 #define __TBB__aggregator_impl_H
23 
24 #include "../atomic.h"
25 #if !__TBBMALLOC_BUILD
26 #include "../tbb_profiling.h"
27 #endif
28 
29 namespace tbb {
30 namespace interface6 {
31 namespace internal {
32 
33 using namespace tbb::internal;
34 
//! aggregated_operation base class
/** Holds the two fields the aggregator code in this file reads and writes on
    every queued operation. Derived is the concrete operation type; it is only
    used to give the intrusive list link its type. */
template <typename Derived>
class aggregated_operation {
public:
    //! Completion flag. Constructed as 0; aggregator_generic::execute spins
    //! while it is 0 when the operation is submitted as a blocking one.
    uintptr_t status;
    //! Link to the next operation in an aggregator's pending list.
    Derived *next;
    //! Creates an unlinked operation whose status is 0.
    aggregated_operation() : status(0), next(NULL) {}
};
43 
45 
50 template < typename operation_type >
52 public:
53  aggregator_generic() : handler_busy(false) { pending_operations = NULL; }
54 
56 
63  template < typename handler_type >
64  void execute(operation_type *op, handler_type &handle_operations, bool long_life_time = true) {
65  operation_type *res;
66  // op->status should be read before inserting the operation in the
67  // aggregator queue since it can become invalid after executing a
68  // handler (if the operation has 'short' life time.)
69  const uintptr_t status = op->status;
70 
71  // ITT note: &(op->status) tag is used to cover accesses to this op node. This
72  // thread has created the operation, and now releases it so that the handler
73  // thread may handle the associated operation w/o triggering a race condition;
74  // thus this tag will be acquired just before the operation is handled in the
75  // handle_operations functor.
76  call_itt_notify(releasing, &(op->status));
77  // insert the operation in the queue.
78  do {
79  // ITT may flag the following line as a race; it is a false positive:
80  // This is an atomic read; we don't provide itt_hide_load_word for atomics
81  op->next = res = pending_operations; // NOT A RACE
82  } while (pending_operations.compare_and_swap(op, res) != res);
83  if (!res) { // first in the list; handle the operations.
84  // ITT note: &pending_operations tag covers access to the handler_busy flag,
85  // which this waiting handler thread will try to set before entering
86  // handle_operations.
87  call_itt_notify(acquired, &pending_operations);
88  start_handle_operations(handle_operations);
89  // The operation with 'short' life time can already be destroyed.
90  if (long_life_time)
91  __TBB_ASSERT(op->status, NULL);
92  }
93  // not first; wait for op to be ready.
94  else if (!status) { // operation is blocking here.
95  __TBB_ASSERT(long_life_time, "The blocking operation cannot have 'short' life time. Since it can already be destroyed.");
96  call_itt_notify(prepare, &(op->status));
97  spin_wait_while_eq(op->status, uintptr_t(0));
98  itt_load_word_with_acquire(op->status);
99  }
100  }
101 
102  private:
104  atomic<operation_type *> pending_operations;
106  uintptr_t handler_busy;
107 
109  template < typename handler_type >
110  void start_handle_operations( handler_type &handle_operations ) {
111  operation_type *op_list;
112 
113  // ITT note: &handler_busy tag covers access to pending_operations as it is passed
114  // between active and waiting handlers. Below, the waiting handler waits until
115  // the active handler releases, and the waiting handler acquires &handler_busy as
116  // it becomes the active_handler. The release point is at the end of this
117  // function, when all operations in pending_operations have been handled by the
118  // owner of this aggregator.
119  call_itt_notify(prepare, &handler_busy);
120  // get the handler_busy:
121  // only one thread can possibly spin here at a time
122  spin_wait_until_eq(handler_busy, uintptr_t(0));
123  call_itt_notify(acquired, &handler_busy);
124  // acquire fence not necessary here due to causality rule and surrounding atomics
125  __TBB_store_with_release(handler_busy, uintptr_t(1));
126 
127  // ITT note: &pending_operations tag covers access to the handler_busy flag
128  // itself. Capturing the state of the pending_operations signifies that
129  // handler_busy has been set and a new active handler will now process that list's
130  // operations.
131  call_itt_notify(releasing, &pending_operations);
132  // grab pending_operations
133  op_list = pending_operations.fetch_and_store(NULL);
134 
135  // handle all the operations
136  handle_operations(op_list);
137 
138  // release the handler
139  itt_store_word_with_release(handler_busy, uintptr_t(0));
140  }
141 };
142 
143 template < typename handler_type, typename operation_type >
144 class aggregator : public aggregator_generic<operation_type> {
145  handler_type handle_operations;
146 public:
147  aggregator() {}
148  explicit aggregator(handler_type h) : handle_operations(h) {}
149 
150  void initialize_handler(handler_type h) { handle_operations = h; }
151 
152  void execute(operation_type *op) {
153  aggregator_generic<operation_type>::execute(op, handle_operations);
154  }
155 };
156 
// the most-compatible friend declaration (vs, gcc, icc) is
//    template<class U, class V> friend class aggregating_functor;

//! Functor that forwards an operation list to its target's handle_operations().
/** aggregating_class must provide a handle_operations(operation_list*) member
    that this functor can access (see the friend declaration noted above). */
template<typename aggregating_class, typename operation_list>
class aggregating_functor {
    //! Target object; stored as a raw pointer that the functor never deletes.
    aggregating_class *fi;
public:
    //! Binds the functor to fi_.
    aggregating_functor(aggregating_class *fi_) : fi(fi_) {}
    //! Calls fi->handle_operations(op_list).
    void operator()(operation_list* op_list) { fi->handle_operations(op_list); }
};
167 
168 } // namespace internal
169 } // namespace interface6
170 
171 namespace internal {
176 } // namespace internal
177 
178 } // namespace tbb
179 
180 #endif // __TBB__aggregator_impl_H
Definition: _aggregator_impl.h:160
Definition: atomic.h:535
Aggregator base class.
Definition: _aggregator_impl.h:51
aggregated_operation base class
Definition: _aggregator_impl.h:37
Definition: _aggregator_impl.h:144
void execute(operation_type *op, handler_type &handle_operations, bool long_life_time=true)
Place operation in list.
Definition: _aggregator_impl.h:64
Primary template for atomic.
Definition: atomic.h:405
Definition: _flow_graph_async_msg_impl.h:32
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44