BRE12
aggregator.h
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB__aggregator_H
22 #define __TBB__aggregator_H
23 
24 #if !TBB_PREVIEW_AGGREGATOR
25 #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
26 #endif
27 
28 #include "atomic.h"
29 #include "tbb_profiling.h"
30 
31 namespace tbb {
32 namespace interface6 {
33 
34 using namespace tbb::internal;
35 
37  template<typename handler_type> friend class aggregator_ext;
38  uintptr_t status;
39  aggregator_operation* my_next;
40 public:
41  enum aggregator_operation_status { agg_waiting=0, agg_finished };
42  aggregator_operation() : status(agg_waiting), my_next(NULL) {}
44  void start() { call_itt_notify(acquired, &status); }
46 
47  void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
48  aggregator_operation* next() { return itt_hide_load_word(my_next);}
49  void set_next(aggregator_operation* n) { itt_hide_store_word(my_next, n); }
50 };
51 
52 namespace internal {
53 
55  friend class basic_handler;
56  virtual void apply_body() = 0;
57 public:
59  virtual ~basic_operation_base() {}
60 };
61 
62 template<typename Body>
63 class basic_operation : public basic_operation_base, no_assign {
64  const Body& my_body;
65  /*override*/ void apply_body() { my_body(); }
66 public:
67  basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
68 };
69 
71 public:
72  basic_handler() {}
73  void operator()(aggregator_operation* op_list) const {
74  while (op_list) {
75  // ITT note: &(op_list->status) tag is used to cover accesses to the operation data.
76  // The executing thread "acquires" the tag (see start()) and then performs
77  // the associated operation w/o triggering a race condition diagnostics.
78  // A thread that created the operation is waiting for its status (see execute_impl()),
79  // so when this thread is done with the operation, it will "release" the tag
80  // and update the status (see finish()) to give control back to the waiting thread.
81  basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
82  // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish()
83  op_list = op_list->next();
84  request.start();
85  request.apply_body();
86  request.finish();
87  }
88  }
89 };
90 
91 } // namespace internal
92 
94 
96 template <typename handler_type>
97 class aggregator_ext : tbb::internal::no_copy {
98 public:
99  aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
100 
102 
103  void process(aggregator_operation *op) { execute_impl(*op); }
104 
105  protected:
110 
111  // ITT note: &(op.status) tag is used to cover accesses to this operation. This
112  // thread has created the operation, and now releases it so that the handler
113  // thread may handle the associated operation w/o triggering a race condition;
114  // thus this tag will be acquired just before the operation is handled in the
115  // handle_operations functor.
116  call_itt_notify(releasing, &(op.status));
117  // insert the operation in the queue
118  do {
119  // ITT may flag the following line as a race; it is a false positive:
120  // This is an atomic read; we don't provide itt_hide_load_word for atomics
121  op.my_next = res = mailbox; // NOT A RACE
122  } while (mailbox.compare_and_swap(&op, res) != res);
123  if (!res) { // first in the list; handle the operations
124  // ITT note: &mailbox tag covers access to the handler_busy flag, which this
125  // waiting handler thread will try to set before entering handle_operations.
126  call_itt_notify(acquired, &mailbox);
127  start_handle_operations();
128  __TBB_ASSERT(op.status, NULL);
129  }
130  else { // not first; wait for op to be ready
131  call_itt_notify(prepare, &(op.status));
132  spin_wait_while_eq(op.status, uintptr_t(aggregator_operation::agg_waiting));
133  itt_load_word_with_acquire(op.status);
134  }
135  }
136 
137 
138  private:
141 
143 
144  uintptr_t handler_busy;
145 
146  handler_type handle_operations;
147 
149  void start_handle_operations() {
150  aggregator_operation *pending_operations;
151 
152  // ITT note: &handler_busy tag covers access to mailbox as it is passed
153  // between active and waiting handlers. Below, the waiting handler waits until
154  // the active handler releases, and the waiting handler acquires &handler_busy as
155  // it becomes the active_handler. The release point is at the end of this
156  // function, when all operations in mailbox have been handled by the
157  // owner of this aggregator.
158  call_itt_notify(prepare, &handler_busy);
159  // get handler_busy: only one thread can possibly spin here at a time
160  spin_wait_until_eq(handler_busy, uintptr_t(0));
161  call_itt_notify(acquired, &handler_busy);
162  // acquire fence not necessary here due to causality rule and surrounding atomics
163  __TBB_store_with_release(handler_busy, uintptr_t(1));
164 
165  // ITT note: &mailbox tag covers access to the handler_busy flag itself.
166  // Capturing the state of the mailbox signifies that handler_busy has been
167  // set and a new active handler will now process that list's operations.
168  call_itt_notify(releasing, &mailbox);
169  // grab pending_operations
170  pending_operations = mailbox.fetch_and_store(NULL);
171 
172  // handle all the operations
173  handle_operations(pending_operations);
174 
175  // release the handler
176  itt_store_word_with_release(handler_busy, uintptr_t(0));
177  }
178 };
179 
181 class aggregator : private aggregator_ext<internal::basic_handler> {
182 public:
185 
187  template<typename Body>
188  void execute(const Body& b) {
190  this->execute_impl(op);
191  }
192 };
193 
194 } // namespace interface6
195 
199 
200 } // namespace tbb
201 
202 #endif // __TBB__aggregator_H
Definition: atomic.h:535
Definition: aggregator.h:70
void finish()
Call finish when done handling this operation.
Definition: aggregator.h:47
Definition: aggregator.h:36
void process(aggregator_operation *op)
EXPERT INTERFACE: Enter a user-made operation into the aggregator&#39;s mailbox.
Definition: aggregator.h:103
void execute(const Body &b)
BASIC INTERFACE: Enter a function for exclusive execution by the aggregator.
Definition: aggregator.h:188
Primary template for atomic.
Definition: atomic.h:405
Definition: _flow_graph_async_msg_impl.h:32
Basic aggregator interface.
Definition: aggregator.h:181
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44
Aggregator base class and expert interface.
Definition: aggregator.h:97
void execute_impl(aggregator_operation &op)
Place operation in mailbox, then either handle mailbox or wait for the operation to be completed by a...
Definition: aggregator.h:108
void start()
Call start before handling this operation.
Definition: aggregator.h:44