BRE12
_flow_graph_async_msg_impl.h
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB__flow_graph_async_msg_impl_H
22 #define __TBB__flow_graph_async_msg_impl_H
23 
24 #ifndef __TBB_flow_graph_H
25 #error Do not #include this internal file directly; use public TBB headers instead.
26 #endif
27 
28 // included in namespace tbb::flow::interfaceX (in flow_graph.h)
29 
30 template< typename T > class async_msg;
31 
32 namespace internal {
33 
34 template< typename T, typename = void >
35 struct async_helpers {
36  typedef async_msg<T> async_type;
37  typedef T filtered_type;
38 
39  static const bool is_async_type = false;
40 
41  static const void* to_void_ptr(const T& t) {
42  return static_cast<const void*>(&t);
43  }
44 
45  static void* to_void_ptr(T& t) {
46  return static_cast<void*>(&t);
47  }
48 
49  static const T& from_void_ptr(const void* p) {
50  return *static_cast<const T*>(p);
51  }
52 
53  static T& from_void_ptr(void* p) {
54  return *static_cast<T*>(p);
55  }
56 
57  static task* try_put_task_wrapper_impl( receiver<T>* const this_recv, const void *p, bool is_async ) {
58  if ( is_async ) {
59  // This (T) is NOT async and incoming 'A<X> t' IS async
60  // Get data from async_msg
62  task* const new_task = msg.my_storage->subscribe(*this_recv);
63  // finalize() must be called after subscribe() because set() can be called in finalize()
64  // and 'this_recv' client must be subscribed by this moment
65  msg.finalize();
66  return new_task;
67  } else {
68  // Incoming 't' is NOT async
69  return this_recv->try_put_task( from_void_ptr(p) );
70  }
71  }
72 };
73 
74 template< typename T >
75 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
76  typedef T async_type;
77  typedef typename T::async_msg_data_type filtered_type;
78 
79  static const bool is_async_type = true;
80 
81  // Receiver-classes use const interfaces
82  static const void* to_void_ptr(const T& t) {
83  return static_cast<const void*>( &static_cast<const async_msg<filtered_type>&>(t) );
84  }
85 
86  static void* to_void_ptr(T& t) {
87  return static_cast<void*>( &static_cast<async_msg<filtered_type>&>(t) );
88  }
89 
90  // Sender-classes use non-const interfaces
91  static const T& from_void_ptr(const void* p) {
92  return *static_cast<const T*>( static_cast<const async_msg<filtered_type>*>(p) );
93  }
94 
95  static T& from_void_ptr(void* p) {
96  return *static_cast<T*>( static_cast<async_msg<filtered_type>*>(p) );
97  }
98 
99  // Used in receiver<T> class
100  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
101  if ( is_async ) {
102  // Both are async
103  return this_recv->try_put_task( from_void_ptr(p) );
104  } else {
105  // This (T) is async and incoming 'X t' is NOT async
106  // Create async_msg for X
107  const filtered_type& t = async_helpers<filtered_type>::from_void_ptr(p);
108  const T msg(t);
109  return this_recv->try_put_task(msg);
110  }
111  }
112 };
113 
114 template <typename T>
116 public:
117  typedef receiver<T> async_storage_client;
118 
119  async_storage() { my_data_ready.store<tbb::relaxed>(false); }
120 
121  template<typename C>
122  async_storage(C&& data) : my_data( std::forward<C>(data) ) {
123  using namespace tbb::internal;
124  __TBB_STATIC_ASSERT( (is_same_type<typename strip<C>::type, typename strip<T>::type>::value), "incoming type must be T" );
125 
126  my_data_ready.store<tbb::relaxed>(true);
127  }
128 
129  template<typename C>
130  bool set(C&& data) {
131  using namespace tbb::internal;
132  __TBB_STATIC_ASSERT( (is_same_type<typename strip<C>::type, typename strip<T>::type>::value), "incoming type must be T" );
133 
134  {
135  tbb::spin_mutex::scoped_lock locker(my_mutex);
136 
137  if (my_data_ready.load<tbb::relaxed>()) {
138  __TBB_ASSERT(false, "double set() call");
139  return false;
140  }
141 
142  my_data = std::forward<C>(data);
143  my_data_ready.store<tbb::release>(true);
144  }
145 
146  // Thread sync is on my_data_ready flag
147  for (typename subscriber_list_type::iterator it = my_clients.begin(); it != my_clients.end(); ++it) {
148  (*it)->try_put(my_data);
149  }
150 
151  return true;
152  }
153 
154  task* subscribe(async_storage_client& client) {
155  if (! my_data_ready.load<tbb::acquire>())
156  {
157  tbb::spin_mutex::scoped_lock locker(my_mutex);
158 
159  if (! my_data_ready.load<tbb::relaxed>()) {
160 #if TBB_USE_ASSERT
161  for (typename subscriber_list_type::iterator it = my_clients.begin(); it != my_clients.end(); ++it) {
162  __TBB_ASSERT(*it != &client, "unexpected double subscription");
163  }
164 #endif // TBB_USE_ASSERT
165 
166  // Subscribe
167  my_clients.push_back(&client);
168  return SUCCESSFULLY_ENQUEUED;
169  }
170  }
171 
172  __TBB_ASSERT(my_data_ready.load<tbb::relaxed>(), "data is NOT ready");
173  return client.try_put_task(my_data);
174  }
175 
176 private:
177  tbb::spin_mutex my_mutex;
178 
179  tbb::atomic<bool> my_data_ready;
180  T my_data;
181 
182  typedef std::vector<async_storage_client*> subscriber_list_type;
183  subscriber_list_type my_clients;
184 };
185 
186 } // namespace internal
187 
188 template <typename T>
189 class async_msg {
190  template< typename > friend class receiver;
191  template< typename, typename > friend struct internal::async_helpers;
192 public:
193  typedef T async_msg_data_type;
194 
195  async_msg() : my_storage(std::make_shared< internal::async_storage<T> >()) {}
196 
197  async_msg(const T& t) : my_storage(std::make_shared< internal::async_storage<T> >(t)) {}
198 
199  async_msg(T&& t) : my_storage(std::make_shared< internal::async_storage<T> >( std::move(t) )) {}
200 
201  virtual ~async_msg() {}
202 
203  void set(const T& t) {
204  my_storage->set(t);
205  }
206 
207  void set(T&& t) {
208  my_storage->set( std::move(t) );
209  }
210 
211 protected:
212  // Can be overridden in derived class to inform that
213  // async calculation chain is over
214  virtual void finalize() const {}
215 
216 private:
217  typedef std::shared_ptr< internal::async_storage<T> > async_storage_ptr;
218  async_storage_ptr my_storage;
219 };
220 
221 #endif // __TBB__flow_graph_async_msg_impl_H
Detects whether two given types are the same.
Definition: _template_helpers.h:56
Definition: atomic.h:535
Definition: _flow_graph_async_msg_impl.h:115
Definition: _tbb_windef.h:37
Acquire.
Definition: atomic.h:47
Definition: _flow_graph_async_msg_impl.h:35
No ordering.
Definition: atomic.h:51
Definition: _flow_graph_async_msg_impl.h:30
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
Definition: _flow_graph_async_msg_impl.h:32
A lock that occupies a single byte.
Definition: spin_mutex.h:40
Release.
Definition: atomic.h:49