BRE12
flow_graph_opencl_node.h
1 /*
2  Copyright 2005-2016 Intel Corporation. All Rights Reserved.
3 
4  This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5  you can redistribute it and/or modify it under the terms of the GNU General Public License
6  version 2 as published by the Free Software Foundation. Threading Building Blocks is
7  distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9  See the GNU General Public License for more details. You should have received a copy of
10  the GNU General Public License along with Threading Building Blocks; if not, write to the
11  Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12 
13  As a special exception, you may use this file as part of a free software library without
14  restriction. Specifically, if other files instantiate templates or use macros or inline
15  functions from this file, or you compile this file and link it with other files to produce
16  an executable, this file does not by itself cause the resulting executable to be covered
17  by the GNU General Public License. This exception does not however invalidate any other
18  reasons why the executable file might be covered by the GNU General Public License.
19 */
20 
21 #ifndef __TBB_flow_graph_opencl_node_H
22 #define __TBB_flow_graph_opencl_node_H
23 
24 #include "tbb/tbb_config.h"
25 #if __TBB_PREVIEW_OPENCL_NODE
26 
27 #include "flow_graph.h"
28 
29 #include <vector>
30 #include <string>
31 #include <algorithm>
32 #include <iostream>
33 #include <fstream>
34 #include <map>
35 #include <array>
36 #include <mutex>
37 #include <unordered_map>
38 
39 #ifdef __APPLE__
40 #include <OpenCL/opencl.h>
41 #else
42 #include <CL/cl.h>
43 #endif
44 
45 namespace tbb {
46 namespace flow {
47 
48 namespace interface8 {
49 
50 class opencl_foundation;
51 class opencl_device_list;
52 
53 template <typename Factory>
54 class opencl_buffer_impl;
55 
56 class default_opencl_factory;
57 
58 class opencl_graph : public graph {
59 public:
61  opencl_graph() : my_opencl_foundation( NULL ) {}
63  explicit opencl_graph( task_group_context& context ) : graph( context ), my_opencl_foundation( NULL ) {}
65  ~opencl_graph();
67  const opencl_device_list& available_devices();
68  default_opencl_factory& opencl_factory();
69 protected:
70  opencl_foundation *my_opencl_foundation;
71  opencl_foundation &get_opencl_foundation();
72 
73  template <typename T, typename Factory>
74  friend class opencl_buffer;
75  template <cl_channel_order channel_order, cl_channel_type channel_type, typename Factory>
76  friend class opencl_image2d;
77  template<typename... Args>
78  friend class opencl_node;
79  template <typename DeviceFilter>
80  friend class opencl_factory;
81 };
82 
83 template <typename T, typename Factory>
84 class dependency_msg;
85 
86 template< typename T, typename Factory >
87 class proxy_dependency_receiver;
88 
89 template< typename T, typename Factory >
90 class receiver<dependency_msg<T, Factory>> {
91 public:
93  typedef sender<dependency_msg<T, Factory>> predecessor_type;
94  typedef proxy_dependency_receiver<T, Factory> proxy;
95 
96  receiver() : my_ordinary_receiver( *this ) {}
97 
99  bool try_put( const T& t ) {
100  return my_ordinary_receiver.try_put(t);
101  }
102 
104  virtual task *try_put_task( const dependency_msg<T, Factory>& ) = 0;
105 
107  virtual bool register_predecessor( predecessor_type & ) { return false; }
108 
110  virtual bool remove_predecessor( predecessor_type & ) { return false; }
111 
112 protected:
114  virtual void reset_receiver( reset_flags f = rf_reset_protocol ) = 0;
115  virtual bool is_continue_receiver() { return false; }
116 private:
117  class ordinary_receiver : public receiver < T >, tbb::internal::no_assign {
119  typedef sender<T> predecessor_type;
120  typedef sender<dependency_msg<T, Factory>> dependency_predecessor_type;
121  public:
122  ordinary_receiver(receiver<dependency_msg<T, Factory>>& owner) : my_owner(owner) {}
123 
125  /* override */ task *try_put_task( const T& t ) {
126  return my_owner.try_put_task( dependency_msg<T, Factory>( t ) );
127  }
128 
130  /* override */ bool register_predecessor( predecessor_type &p ) {
131  tbb::spin_mutex::scoped_lock lock( my_predecessor_map_mutex );
132  typename predecessor_map_type::iterator it = my_predecessor_map.emplace( std::piecewise_construct_t(), std::make_tuple( &p ), std::tie( p ) );
133  if ( !my_owner.register_predecessor( it->second ) ) {
134  my_predecessor_map.erase( it );
135  return false;
136  }
137  return true;
138  }
139 
141  /* override */ bool remove_predecessor( predecessor_type &p ) {
142  tbb::spin_mutex::scoped_lock lock( my_predecessor_map_mutex );
143  typename predecessor_map_type::iterator it = my_predecessor_map.find( &p );
144  __TBB_ASSERT( it != my_predecessor_map.end(), "Failed to find the predecessor" );
145  if ( !my_owner.remove_predecessor( it->second ) )
146  return false;
147  my_predecessor_map.erase( it );
148  return true;
149  }
150 
151  protected:
153  /* override */ void reset_receiver( reset_flags f = rf_reset_protocol ) {
154  my_owner.reset_receiver( f );
155  };
156  /* override */ bool is_continue_receiver() {
157  return my_owner.is_continue_receiver();
158  }
159 
160  private:
161  receiver<dependency_msg<T, Factory>>& my_owner;
162 
163  typedef std::multimap<predecessor_type*, typename dependency_predecessor_type::proxy> predecessor_map_type;
164  predecessor_map_type my_predecessor_map;
165  tbb::spin_mutex my_predecessor_map_mutex;
166  };
167  ordinary_receiver my_ordinary_receiver;
168 public:
169  ordinary_receiver& ordinary_receiver() { return my_ordinary_receiver; }
170 };
171 
172 template< typename T, typename Factory >
173 class proxy_dependency_sender;
174 
175 template< typename T, typename Factory >
176 class proxy_dependency_receiver : public receiver < dependency_msg<T, Factory> >, tbb::internal::no_assign {
177 public:
178  typedef sender<dependency_msg<T, Factory>> predecessor_type;
179 
180  proxy_dependency_receiver( receiver<T>& r ) : my_r( r ) {}
181 
183  /* override */ task *try_put_task( const dependency_msg<T, Factory> &d ) {
184  receive_if_memory_object( d );
185  receiver<T> *r = &my_r;
186  d.register_callback( [r]( const T& t ) {
187  r->try_put( t );
188  } );
189  d.clear_event();
190  return SUCCESSFULLY_ENQUEUED;
191  }
192 
194  /* override */ bool register_predecessor( predecessor_type &s ) {
195  return my_r.register_predecessor( s.ordinary_sender() );
196  }
198  /* override */ bool remove_predecessor( predecessor_type &s ) {
199  return my_r.remove_predecessor( s.ordinary_sender() );
200  }
201 protected:
203  /* override */ void reset_receiver( reset_flags f = rf_reset_protocol ) {
204  my_r.reset_receiver( f );
205  };
206 
207  /* override */ bool is_continue_receiver() {
208  return my_r.is_continue_receiver();
209  }
210 private:
211  receiver<T> &my_r;
212 };
213 
214 template< typename T, typename Factory >
215 class sender<dependency_msg<T, Factory>> {
216 public:
217  sender() : my_ordinary_sender( *this ) {}
218 
220  typedef receiver<dependency_msg<T, Factory>> successor_type;
221  typedef proxy_dependency_sender<T, Factory> proxy;
222 
224  virtual bool register_successor( successor_type &r ) = 0;
225 
227  virtual bool remove_successor( successor_type &r ) = 0;
228 
230  virtual bool try_get( dependency_msg<T, Factory> & ) { return false; }
231 
233  virtual bool try_reserve( dependency_msg<T, Factory> & ) { return false; }
234 private:
235  class ordinary_sender : public sender < T >, tbb::internal::no_assign {
237  typedef receiver<T> successor_type;
238  typedef receiver<dependency_msg<T, Factory>> dependency_successor_type;
239  public:
240  ordinary_sender(sender<dependency_msg<T, Factory>>& owner) : my_owner(owner) {}
241 
243  /* override */ bool register_successor( successor_type &r ) {
244  tbb::spin_mutex::scoped_lock lock( my_successor_map_mutex );
245  typename successor_map_type::iterator it = my_successor_map.emplace( std::piecewise_construct_t(), std::make_tuple( &r ), std::tie( r ) );
246  if ( !my_owner.register_successor( it->second ) ) {
247  my_successor_map.erase( it );
248  return false;
249  }
250  return true;
251  }
252 
254  /* override */ bool remove_successor( successor_type &r ) {
255  tbb::spin_mutex::scoped_lock lock( my_successor_map_mutex );
256  typename successor_map_type::iterator it = my_successor_map.find( &r );
257  __TBB_ASSERT( it != my_successor_map.end(), "The predecessor has already been registered" );
258  if ( !my_owner.remove_successor( it->second ) )
259  return false;
260  my_successor_map.erase( it );
261  return true;
262  }
263 
265  /* override */ bool try_get( T &t ) {
266  dependency_msg<T, Factory> d;
267  if ( my_owner.try_get( d ) ) {
268  t = d.data();
269  return true;
270  }
271  return false;
272  }
273 
274  /* override */ bool try_reserve( T &t ) {
275  dependency_msg<T, Factory> d;
276  if ( my_owner.try_reserve( d ) ) {
277  t = d.data();
278  return true;
279  }
280  return false;
281  }
282 
283  bool has_host_successors() {
284  tbb::spin_mutex::scoped_lock lock( my_successor_map_mutex );
285  return !my_successor_map.empty();
286  }
287  private:
288  sender<dependency_msg<T, Factory>>& my_owner;
289 
290  typedef std::multimap<successor_type*, typename dependency_successor_type::proxy> successor_map_type;
291  successor_map_type my_successor_map;
292  tbb::spin_mutex my_successor_map_mutex;
293  };
294  ordinary_sender my_ordinary_sender;
295 public:
296  ordinary_sender& ordinary_sender() { return my_ordinary_sender; }
297 
298  bool has_host_successors() {
299  return my_ordinary_sender.has_host_successors();
300  }
301 };
302 
303 template< typename T, typename Factory >
304 class proxy_dependency_sender : public sender < dependency_msg<T, Factory> >, tbb::internal::no_assign {
305 public:
306  typedef receiver<dependency_msg<T, Factory>> successor_type;
307 
308  proxy_dependency_sender( sender<T>& s ) : my_s( s ) {}
309 
311  /* override */ bool register_successor( successor_type &r ) {
312  return my_s.register_successor( r.ordinary_receiver() );
313  }
314 
316  /* override */ bool remove_successor( successor_type &r ) {
317  return my_s.remove_successor( r.ordinary_receiver() );
318  }
319 
321  /* override */ bool try_get( dependency_msg<T, Factory> &d ) {
322  return my_s.try_get( d.data() );
323  }
324 
326  /* override */ bool try_reserve( dependency_msg<T, Factory> &d ) {
327  return my_s.try_reserve( d.data() );
328  }
329 
331  /* override */ bool try_release() {
332  return my_s.try_release();
333  }
334 
336  /* override */ bool try_consume() {
337  return my_s.try_consume();
338  }
339 private:
340  sender<T> &my_s;
341 };
342 
343 template<typename T, typename Factory>
344 inline void make_edge( sender<T> &s, receiver<dependency_msg<T, Factory>> &r ) {
345  make_edge( s, r.ordinary_receiver() );
346 }
347 
348 template<typename T, typename Factory>
349 inline void make_edge( sender<dependency_msg<T, Factory>> &s, receiver<T> &r ) {
350  make_edge( s.ordinary_sender(), r );
351 }
352 
353 template<typename T, typename Factory>
354 inline void remove_edge( sender<T> &s, receiver<dependency_msg<T, Factory>> &r ) {
355  remove_edge( s, r.ordinary_receiver() );
356 }
357 
358 template<typename T, typename Factory>
359 inline void remove_edge( sender<dependency_msg<T, Factory>> &s, receiver<T> &r ) {
360  remove_edge( s.ordinary_sender(), r );
361 }
362 
363 inline void enforce_cl_retcode( cl_int err, std::string msg ) {
364  if ( err != CL_SUCCESS ) {
365  std::cerr << msg << std::endl;
366  throw msg;
367  }
368 }
369 
370 template <typename T>
371 T event_info( cl_event e, cl_event_info i ) {
372  T res;
373  enforce_cl_retcode( clGetEventInfo( e, i, sizeof( res ), &res, NULL ), "Failed to get OpenCL event information" );
374  return res;
375 }
376 
377 template <typename T>
378 T device_info( cl_device_id d, cl_device_info i ) {
379  T res;
380  enforce_cl_retcode( clGetDeviceInfo( d, i, sizeof( res ), &res, NULL ), "Failed to get OpenCL device information" );
381  return res;
382 }
383 template <>
384 std::string device_info<std::string>( cl_device_id d, cl_device_info i ) {
385  size_t required;
386  enforce_cl_retcode( clGetDeviceInfo( d, i, 0, NULL, &required ), "Failed to get OpenCL device information" );
387 
388  char *buff = (char*)alloca( required );
389  enforce_cl_retcode( clGetDeviceInfo( d, i, required, buff, NULL ), "Failed to get OpenCL device information" );
390 
391  return buff;
392 }
393 template <typename T>
394 T platform_info( cl_platform_id p, cl_platform_info i ) {
395  T res;
396  enforce_cl_retcode( clGetPlatformInfo( p, i, sizeof( res ), &res, NULL ), "Failed to get OpenCL platform information" );
397  return res;
398 }
399 template <>
400 std::string platform_info<std::string>( cl_platform_id p, cl_platform_info i ) {
401  size_t required;
402  enforce_cl_retcode( clGetPlatformInfo( p, i, 0, NULL, &required ), "Failed to get OpenCL platform information" );
403 
404  char *buff = (char*)alloca( required );
405  enforce_cl_retcode( clGetPlatformInfo( p, i, required, buff, NULL ), "Failed to get OpenCL platform information" );
406 
407  return buff;
408 }
409 
410 
411 class opencl_device {
412 public:
413  typedef size_t device_id_type;
414  enum : device_id_type {
415  unknown = device_id_type( -2 ),
416  host = device_id_type( -1 )
417  };
418 
419  opencl_device() : my_device_id( unknown ) {}
420 
421  opencl_device( cl_device_id cl_d_id, device_id_type device_id ) : my_device_id( device_id ), my_cl_device_id( cl_d_id ) {}
422 
423  std::string platform_profile() const {
424  return platform_info<std::string>( platform(), CL_PLATFORM_PROFILE );
425  }
426  std::string platform_version() const {
427  return platform_info<std::string>( platform(), CL_PLATFORM_VERSION );
428  }
429  std::string platform_name() const {
430  return platform_info<std::string>( platform(), CL_PLATFORM_NAME );
431  }
432  std::string platform_vendor() const {
433  return platform_info<std::string>( platform(), CL_PLATFORM_VENDOR );
434  }
435  std::string platform_extensions() const {
436  return platform_info<std::string>( platform(), CL_PLATFORM_EXTENSIONS );
437  }
438 
439  template <typename T>
440  void info( cl_device_info i, T &t ) const {
441  t = device_info<T>( my_cl_device_id, i );
442  }
443  std::string version() const {
444  // The version string format: OpenCL<space><major_version.minor_version><space><vendor-specific information>
445  return device_info<std::string>( my_cl_device_id, CL_DEVICE_VERSION );
446  }
447  int major_version() const {
448  int major;
449  std::sscanf( version().c_str(), "OpenCL %d", &major );
450  return major;
451  }
452  int minor_version() const {
453  int major, minor;
454  std::sscanf( version().c_str(), "OpenCL %d.%d", &major, &minor );
455  return minor;
456  }
457  bool out_of_order_exec_mode_on_host_present() const {
458 #if CL_VERSION_2_0
459  if ( major_version() >= 2 )
460  return (device_info<cl_command_queue_properties>( my_cl_device_id, CL_DEVICE_QUEUE_ON_HOST_PROPERTIES ) & CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE) != 0;
461  else
462 #endif /* CL_VERSION_2_0 */
463  return (device_info<cl_command_queue_properties>( my_cl_device_id, CL_DEVICE_QUEUE_PROPERTIES ) & CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE) != 0;
464  }
465  bool out_of_order_exec_mode_on_device_present() const {
466 #if CL_VERSION_2_0
467  if ( major_version() >= 2 )
468  return (device_info<cl_command_queue_properties>( my_cl_device_id, CL_DEVICE_QUEUE_ON_DEVICE_PROPERTIES ) & CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE) != 0;
469  else
470 #endif /* CL_VERSION_2_0 */
471  return false;
472  }
473  std::array<size_t, 3> max_work_item_sizes() const {
474  return device_info<std::array<size_t, 3>>( my_cl_device_id, CL_DEVICE_MAX_WORK_ITEM_SIZES );
475  }
476  size_t max_work_group_size() const {
477  return device_info<size_t>( my_cl_device_id, CL_DEVICE_MAX_WORK_GROUP_SIZE );
478  }
479  bool built_in_kernel_available( const std::string& k ) const {
480  const std::string semi = ";";
481  // Added semicolumns to force an exact match (to avoid a partial match, e.g. "add" is partly matched with "madd").
482  return (semi + built_in_kernels() + semi).find( semi + k + semi ) != std::string::npos;
483  }
484  std::string built_in_kernels() const {
485  return device_info<std::string>( my_cl_device_id, CL_DEVICE_BUILT_IN_KERNELS );
486  }
487  std::string name() const {
488  return device_info<std::string>( my_cl_device_id, CL_DEVICE_NAME );
489  }
490  cl_bool available() const {
491  return device_info<cl_bool>( my_cl_device_id, CL_DEVICE_AVAILABLE );
492  }
493  cl_bool compiler_available() const {
494  return device_info<cl_bool>( my_cl_device_id, CL_DEVICE_COMPILER_AVAILABLE );
495  }
496  cl_bool linker_available() const {
497  return device_info<cl_bool>( my_cl_device_id, CL_DEVICE_LINKER_AVAILABLE );
498  }
499  bool extension_available( const std::string &ext ) const {
500  const std::string space = " ";
501  // Added space to force an exact match (to avoid a partial match, e.g. "ext" is partly matched with "ext2").
502  return (space + extensions() + space).find( space + ext + space ) != std::string::npos;
503  }
504  std::string extensions() const {
505  return device_info<std::string>( my_cl_device_id, CL_DEVICE_EXTENSIONS );
506  }
507 
508  cl_device_type type() const {
509  return device_info<cl_device_type>( my_cl_device_id, CL_DEVICE_TYPE );
510  }
511 
512  std::string vendor() const {
513  return device_info<std::string>( my_cl_device_id, CL_DEVICE_VENDOR );
514  }
515 
516  cl_uint address_bits() const {
517  return device_info<cl_uint>( my_cl_device_id, CL_DEVICE_ADDRESS_BITS );
518  }
519 
520  cl_device_id device_id() const {
521  return my_cl_device_id;
522  }
523 
524  cl_command_queue command_queue() const {
525  return my_cl_command_queue;
526  }
527 
528  void set_command_queue( cl_command_queue cmd_queue ) {
529  my_cl_command_queue = cmd_queue;
530  }
531 
532 private:
533  opencl_device( cl_device_id d_id ) : my_device_id( unknown ), my_cl_device_id( d_id ) {}
534 
535  cl_platform_id platform() const {
536  return device_info<cl_platform_id>( my_cl_device_id, CL_DEVICE_PLATFORM );
537  }
538 
539  device_id_type my_device_id;
540  cl_device_id my_cl_device_id;
541  cl_command_queue my_cl_command_queue;
542 
543  friend bool operator==(opencl_device d1, opencl_device d2) { return d1.my_cl_device_id == d2.my_cl_device_id; }
544 
545  template <typename DeviceFilter>
546  friend class opencl_factory;
547  template <typename Factory>
548  friend class opencl_memory;
549  template <typename Factory>
550  friend class opencl_program;
551  friend class opencl_foundation;
552 
553 #if TBB_USE_ASSERT
554  template <typename T, typename Factory>
555  friend class opencl_buffer;
556 #endif
557 };
558 
559 class opencl_device_list {
560  typedef std::vector<opencl_device> container_type;
561 public:
562  typedef container_type::iterator iterator;
563  typedef container_type::const_iterator const_iterator;
564  typedef container_type::size_type size_type;
565 
566  opencl_device_list() {}
567  opencl_device_list( std::initializer_list<opencl_device> il ) : my_container( il ) {}
568 
569  void add( opencl_device d ) { my_container.push_back( d ); }
570  size_type size() const { return my_container.size(); }
571  iterator begin() { return my_container.begin(); }
572  iterator end() { return my_container.end(); }
573  const_iterator begin() const { return my_container.begin(); }
574  const_iterator end() const { return my_container.end(); }
575  const_iterator cbegin() const { return my_container.cbegin(); }
576  const_iterator cend() const { return my_container.cend(); }
577 private:
578  container_type my_container;
579 };
580 
581 class callback_base : tbb::internal::no_copy {
582 public:
583  virtual void call() const = 0;
584  virtual ~callback_base() {}
585 };
586 
587 template <typename Callback, typename T>
588 class callback : public callback_base {
589  graph &my_graph;
590  Callback my_callback;
591  T my_data;
592 public:
593  callback( graph &g, Callback c, const T& t ) : my_graph( g ), my_callback( c ), my_data( t ) {
594  // Extend the graph lifetime until the callback completion.
595  my_graph.increment_wait_count();
596  }
597  ~callback() {
598  // Release the reference to the graph.
599  my_graph.decrement_wait_count();
600  }
601  /* override */ void call() const {
602  my_callback( my_data );
603  }
604 };
605 
606 template <typename T, typename Factory = default_opencl_factory>
607 class dependency_msg {
608 public:
609  typedef T value_type;
610 
611  dependency_msg() = default;
612  explicit dependency_msg( const T& data ) : my_data( data ) {}
613  dependency_msg( opencl_graph &g, const T& data ) : my_data( data ), my_graph( &g ) {}
614  dependency_msg( const T& data, cl_event event ) : my_data( data ), my_event( event ), my_is_event( true ) {
615  enforce_cl_retcode( clRetainEvent( my_event ), "Failed to retain an event" );
616  }
617  T& data( bool wait = true ) {
618  if ( my_is_event && wait ) {
619  enforce_cl_retcode( clWaitForEvents( 1, &my_event ), "Failed to wait for an event" );
620  enforce_cl_retcode( clReleaseEvent( my_event ), "Failed to release an event" );
621  my_is_event = false;
622  }
623  return my_data;
624  }
625 
626  const T& data( bool wait = true ) const {
627  if ( my_is_event && wait ) {
628  enforce_cl_retcode( clWaitForEvents( 1, &my_event ), "Failed to wait for an event" );
629  enforce_cl_retcode( clReleaseEvent( my_event ), "Failed to release an event" );
630  my_is_event = false;
631  }
632  return my_data;
633  }
634 
635  dependency_msg( const dependency_msg &dmsg ) : my_data( dmsg.my_data ), my_event( dmsg.my_event ), my_is_event( dmsg.my_is_event ), my_graph( dmsg.my_graph ) {
636  if ( my_is_event )
637  enforce_cl_retcode( clRetainEvent( my_event ), "Failed to retain an event" );
638  }
639 
640  dependency_msg( dependency_msg &&dmsg ) : my_data( std::move(dmsg.my_data) ), my_event( dmsg.my_event ), my_is_event( dmsg.my_is_event ), my_graph( dmsg.my_graph ) {
641  dmsg.my_is_event = false;
642  }
643 
644  dependency_msg& operator=(const dependency_msg &dmsg) {
645  my_data = dmsg.my_data;
646  my_event = dmsg.my_event;
647  my_is_event = dmsg.my_is_event;
648  my_graph = dmsg.my_graph;
649  if ( my_is_event )
650  enforce_cl_retcode( clRetainEvent( my_event ), "Failed to retain an event" );
651  return *this;
652  }
653 
654  ~dependency_msg() {
655  if ( my_is_event )
656  enforce_cl_retcode( clReleaseEvent( my_event ), "Failed to release an event" );
657  }
658 
659  cl_event const * get_event() const { return my_is_event ? &my_event : NULL; }
660  void set_event( cl_event e ) const {
661  if ( my_is_event ) {
662  cl_command_queue cq = event_info<cl_command_queue>( my_event, CL_EVENT_COMMAND_QUEUE );
663  if ( cq != event_info<cl_command_queue>( e, CL_EVENT_COMMAND_QUEUE ) )
664  enforce_cl_retcode( clFlush( cq ), "Failed to flush an OpenCL command queue" );
665  enforce_cl_retcode( clReleaseEvent( my_event ), "Failed to release an event" );
666  }
667  my_is_event = true;
668  my_event = e;
669  clRetainEvent( my_event );
670  }
671 
672  void set_graph( graph &g ) {
673  my_graph = &g;
674  }
675 
676  void clear_event() const {
677  if ( my_is_event ) {
678  enforce_cl_retcode( clFlush( event_info<cl_command_queue>( my_event, CL_EVENT_COMMAND_QUEUE ) ), "Failed to flush an OpenCL command queue" );
679  enforce_cl_retcode( clReleaseEvent( my_event ), "Failed to release an event" );
680  }
681  my_is_event = false;
682  }
683 
684  template <typename Callback>
685  void register_callback( Callback c ) const {
686  __TBB_ASSERT( my_is_event, "The OpenCL event is not set" );
687  __TBB_ASSERT( my_graph, "The graph is not set" );
688  enforce_cl_retcode( clSetEventCallback( my_event, CL_COMPLETE, register_callback_func, new callback<Callback, T>( *my_graph, c, my_data ) ), "Failed to set an OpenCL callback" );
689  }
690 
691  operator T&() { return data(); }
692  operator const T&() const { return data(); }
693 
694 private:
695  static void CL_CALLBACK register_callback_func( cl_event, cl_int event_command_exec_status, void *data ) {
696  tbb::internal::suppress_unused_warning( event_command_exec_status );
697  __TBB_ASSERT( event_command_exec_status == CL_COMPLETE, NULL );
698  __TBB_ASSERT( data, NULL );
699  callback_base *c = static_cast<callback_base*>(data);
700  c->call();
701  delete c;
702  }
703 
704  T my_data;
705  mutable cl_event my_event;
706  mutable bool my_is_event = false;
707  graph *my_graph = NULL;
708 };
709 
710 template <typename K, typename T, typename Factory>
711 K key_from_message( const dependency_msg<T, Factory> &dmsg ) {
712  using tbb::flow::key_from_message;
713  const T &t = dmsg.data( false );
714  __TBB_STATIC_ASSERT( true, "" );
715  return key_from_message<K, T>( t );
716 }
717 
// Base class of OpenCL memory objects: holds the cl_mem handle, the host pointer,
// the owning factory and the state of a pending "sending" event.
// send() and receive() are only declared here; map_memory() must be provided by derived classes.
718 template <typename Factory>
719 class opencl_memory {
720 public:
// NOTE(review): this constructor initializes no member, yet the destructor below
// unconditionally releases my_cl_mem - confirm that it is never used to create live objects.
721  opencl_memory() {}
// Binds the object to factory f: no host pointer and no sending event yet; the current
// device id is taken from the first device of the factory.
// NOTE(review): my_curr_device_id is not declared in the visible part of this class
// (the listing skips original line 751) - confirm its declaration.
722  opencl_memory( Factory &f ) : my_host_ptr( NULL ), my_factory( &f ), my_sending_event_present( false ) {
723  my_curr_device_id = my_factory->devices().begin()->my_device_id;
724  }
725 
// Releases the pending sending event (if present) and the memory object.
// Failures are reported through enforce_cl_retcode.
726  ~opencl_memory() {
727  if ( my_sending_event_present ) enforce_cl_retcode( clReleaseEvent( my_sending_event ), "Failed to release an event for the OpenCL buffer" );
728  enforce_cl_retcode( clReleaseMemObject( my_cl_mem ), "Failed to release an memory object" );
729  }
730 
// Returns the underlying OpenCL memory object handle.
731  cl_mem get_cl_mem() const {
732  return my_cl_mem;
733  }
734 
// Returns the host pointer. If it is not set yet, receive( NULL ) is called and the
// returned message is waited on via data(); the assertion expects my_host_ptr to be
// set to the same pointer afterwards (presumably by receive() - defined elsewhere).
735  void* get_host_ptr() {
736  if ( !my_host_ptr ) {
737  dependency_msg<void*, Factory> d = receive( NULL );
738  d.data();
739  __TBB_ASSERT( d.data() == my_host_ptr, NULL );
740  }
741  return my_host_ptr;
742  }
743 
// Returns the factory this memory object was created with.
744  Factory *factory() const { return my_factory; }
745 
// Declared here, defined outside of the visible part of the file.
746  dependency_msg<void*, Factory> send( opencl_device d, const cl_event *e );
747  dependency_msg<void*, Factory> receive( const cl_event *e );
// Maps the memory object for the given device; implemented by derived classes.
748  virtual void map_memory( opencl_device, dependency_msg<void*, Factory> & ) = 0;
749 protected:
750  cl_mem my_cl_mem;
752  void* my_host_ptr;
753  Factory *my_factory;
754 
// State of the sending event released by the destructor; my_sending_event is only
// released when my_sending_event_present is true.
755  tbb::spin_mutex my_sending_lock;
756  bool my_sending_event_present;
757  cl_event my_sending_event;
758 };
759 
760 template <typename Factory>
761 class opencl_buffer_impl : public opencl_memory<Factory> {
762  size_t my_size;
763 public:
764  opencl_buffer_impl( size_t size, Factory& f ) : opencl_memory<Factory>( f ), my_size( size ) {
765  cl_int err;
766  this->my_cl_mem = clCreateBuffer( this->my_factory->context(), CL_MEM_ALLOC_HOST_PTR, size, NULL, &err );
767  enforce_cl_retcode( err, "Failed to create an OpenCL buffer" );
768  }
769 
770  // The constructor for subbuffers.
771  opencl_buffer_impl( cl_mem m, size_t index, size_t size, Factory& f ) : opencl_memory<Factory>( f ), my_size( size ) {
772  cl_int err;
773  cl_buffer_region region = { index, size };
774  this->my_cl_mem = clCreateSubBuffer( m, 0, CL_BUFFER_CREATE_TYPE_REGION, &region, &err );
775  enforce_cl_retcode( err, "Failed to create an OpenCL subbuffer" );
776  }
777 
778  size_t size() const {
779  return my_size;
780  }
781 
782  /* override */ void map_memory( opencl_device device, dependency_msg<void*, Factory> &dmsg ) {
783  this->my_factory->enque_map_buffer( device, *this, dmsg );
784  }
785 
786 #if TBB_USE_ASSERT
787  template <typename, typename>
788  friend class opencl_buffer;
789 #endif
790 };
791 
//! Access mode tag used as the template argument of the opencl_buffer accessors.
enum access_type {
    read_write,  // first enumerator, value 0
    write_only,  // value 1
    read_only    // value 2
};
797 
798 template <typename T, typename Factory = default_opencl_factory>
799 class opencl_subbuffer;
800 
801 template <typename T, typename Factory = default_opencl_factory>
802 class opencl_buffer {
803 public:
804  typedef cl_mem native_object_type;
805  typedef opencl_buffer memory_object_type;
806  typedef Factory opencl_factory_type;
807 
808  template<access_type a> using iterator = T*;
809 
810  template <access_type a>
811  iterator<a> access() const {
812  T* ptr = (T*)my_impl->get_host_ptr();
813  __TBB_ASSERT( ptr, NULL );
814  return iterator<a>( ptr );
815  }
816 
817  T* data() const { return &access<read_write>()[0]; }
818 
819  template <access_type a = read_write>
820  iterator<a> begin() const { return access<a>(); }
821 
822  template <access_type a = read_write>
823  iterator<a> end() const { return access<a>()+my_impl->size()/sizeof(T); }
824 
825  size_t size() const { return my_impl->size()/sizeof(T); }
826 
827  T& operator[] ( ptrdiff_t k ) { return begin()[k]; }
828 
829  opencl_buffer() {}
830  opencl_buffer( opencl_graph &g, size_t size );
831  opencl_buffer( Factory &f, size_t size ) : my_impl( std::make_shared<impl_type>( size*sizeof(T), f ) ) {}
832 
833  cl_mem native_object() const {
834  return my_impl->get_cl_mem();
835  }
836 
837  const opencl_buffer& memory_object() const {
838  return *this;
839  }
840 
841  void send( opencl_device device, dependency_msg<opencl_buffer, Factory> &dependency ) const {
842  __TBB_ASSERT( dependency.data( /*wait = */false ) == *this, NULL );
843  dependency_msg<void*, Factory> d = my_impl->send( device, dependency.get_event() );
844  const cl_event *e = d.get_event();
845  if ( e ) dependency.set_event( *e );
846  else dependency.clear_event();
847  }
848  void receive( const dependency_msg<opencl_buffer, Factory> &dependency ) const {
849  __TBB_ASSERT( dependency.data( /*wait = */false ) == *this, NULL );
850  dependency_msg<void*, Factory> d = my_impl->receive( dependency.get_event() );
851  const cl_event *e = d.get_event();
852  if ( e ) dependency.set_event( *e );
853  else dependency.clear_event();
854  }
855 
856  opencl_subbuffer<T, Factory> subbuffer( size_t index, size_t size ) const;
857 private:
858  // The constructor for subbuffers.
859  opencl_buffer( Factory &f, cl_mem m, size_t index, size_t size ) : my_impl( std::make_shared<impl_type>( m, index*sizeof(T), size*sizeof(T), f ) ) {}
860 
861  typedef opencl_buffer_impl<Factory> impl_type;
862 
863  std::shared_ptr<impl_type> my_impl;
864 
865  friend bool operator==(const opencl_buffer<T, Factory> &lhs, const opencl_buffer<T, Factory> &rhs) {
866  return lhs.my_impl == rhs.my_impl;
867  }
868 
869  template <typename>
870  friend class opencl_factory;
871  template <typename, typename>
872  friend class opencl_subbuffer;
873 };
874 
875 template <typename T, typename Factory>
876 class opencl_subbuffer : public opencl_buffer<T, Factory> {
877  opencl_buffer<T, Factory> my_owner;
878 public:
879  opencl_subbuffer() {}
880  opencl_subbuffer( const opencl_buffer<T, Factory> &owner, size_t index, size_t size ) :
881  opencl_buffer<T, Factory>( *owner.my_impl->factory(), owner.native_object(), index, size ), my_owner( owner ) {}
882 };
883 
884 template <typename T, typename Factory>
885 opencl_subbuffer<T, Factory> opencl_buffer<T, Factory>::subbuffer( size_t index, size_t size ) const {
886  return opencl_subbuffer<T, Factory>( *this, index, size );
887 }
888 
889 
890 template <typename DeviceFilter>
891 class opencl_factory {
892 public:
893  opencl_factory( opencl_graph &g ) : my_graph( g ) {}
894  ~opencl_factory() {
895  if ( my_devices.size() ) {
896  for ( opencl_device d : my_devices ) {
897  enforce_cl_retcode( clReleaseCommandQueue( d.my_cl_command_queue ), "Failed to release a command queue" );
898  }
899  enforce_cl_retcode( clReleaseContext( my_cl_context ), "Failed to release a context" );
900  }
901  }
902 
903  bool init( const opencl_device_list &device_list ) {
904  tbb::spin_mutex::scoped_lock lock( my_devices_mutex );
905  if ( !my_devices.size() ) {
906  my_devices = device_list;
907  return true;
908  }
909  return false;
910  }
911 
912 
913 private:
914  template <typename Factory>
915  void enque_map_buffer( opencl_device device, opencl_buffer_impl<Factory> &buffer, dependency_msg<void*, Factory>& dmsg ) {
916  cl_event const* e1 = dmsg.get_event();
917  cl_event e2;
918  cl_int err;
919  void *ptr = clEnqueueMapBuffer( device.my_cl_command_queue, buffer.get_cl_mem(), false, CL_MAP_READ | CL_MAP_WRITE, 0, buffer.size(),
920  e1 == NULL ? 0 : 1, e1, &e2, &err );
921  enforce_cl_retcode( err, "Failed to map a buffer" );
922  dmsg.data( false ) = ptr;
923  dmsg.set_event( e2 );
924  enforce_cl_retcode( clReleaseEvent( e2 ), "Failed to release an event" );
925  }
926 
927 
928  template <typename Factory>
929  void enque_unmap_buffer( opencl_device device, opencl_memory<Factory> &memory, dependency_msg<void*, Factory>& dmsg ) {
930  cl_event const* e1 = dmsg.get_event();
931  cl_event e2;
932  enforce_cl_retcode(
933  clEnqueueUnmapMemObject( device.my_cl_command_queue, memory.get_cl_mem(), memory.get_host_ptr(), e1 == NULL ? 0 : 1, e1, &e2 ),
934  "Failed to unmap a buffer" );
935  dmsg.set_event( e2 );
936  enforce_cl_retcode( clReleaseEvent( e2 ), "Failed to release an event" );
937  }
938 
939  template <typename GlbNDRange, typename LclNDRange>
940  cl_event enqueue_kernel( opencl_device device, cl_kernel kernel,
941  GlbNDRange&& global_work_size, LclNDRange&& local_work_size, cl_uint num_events, cl_event* event_list ) {
942  auto g_it = global_work_size.begin();
943  auto l_it = local_work_size.begin();
944  __TBB_ASSERT( g_it != global_work_size.end() , "Empty global work size" );
945  __TBB_ASSERT( l_it != local_work_size.end() , "Empty local work size" );
946  std::array<size_t, 3> g_size, l_size, g_offset = { { 0, 0, 0 } };
947  cl_uint s;
948  for ( s = 0; s < 3 && g_it != global_work_size.end() && l_it != local_work_size.end(); ++s ) {
949  g_size[s] = *g_it++;
950  l_size[s] = *l_it++;
951  }
952  cl_event event;
953  enforce_cl_retcode(
954  clEnqueueNDRangeKernel( device.my_cl_command_queue, kernel, s,
955  g_offset.data(), g_size.data(), l_size[0] ? l_size.data() : NULL, num_events, num_events ? event_list : NULL, &event ),
956  "Failed to enqueue a kernel" );
957  return event;
958  }
959 
960  void flush( opencl_device device ) {
961  enforce_cl_retcode( clFlush( device.my_cl_command_queue ), "Failed to flush an OpenCL command queue" );
962  }
963 
964  const opencl_device_list& devices() {
965  std::call_once( my_once_flag, &opencl_factory::init_once, this );
966  return my_devices;
967  }
968 
969  bool is_same_context( opencl_device::device_id_type d1, opencl_device::device_id_type d2 ) {
970  __TBB_ASSERT( d1 != opencl_device::unknown && d2 != opencl_device::unknown, NULL );
971  // Currently, factory supports only one context so if the both devices are not host it means the are in the same context.
972  if ( d1 != opencl_device::host && d2 != opencl_device::host )
973  return true;
974  return d1 == d2;
975  }
976 
977  opencl_factory( const opencl_factory& );
978  opencl_factory& operator=(const opencl_factory&);
979 
980  cl_context context() {
981  std::call_once( my_once_flag, &opencl_factory::init_once, this );
982  return my_cl_context;
983  }
984 
985  void init_once();
986 
987  std::once_flag my_once_flag;
988  opencl_device_list my_devices;
989  cl_context my_cl_context;
990  opencl_graph &my_graph;
991 
992  tbb::spin_mutex my_devices_mutex;
993 
994  template <typename Factory>
995  friend class opencl_program;
996  template <typename Factory>
997  friend class opencl_buffer_impl;
998  template <typename Factory>
999  friend class opencl_memory;
1000  template <typename... Args>
1001  friend class opencl_node;
1002 };
1003 
1004 template <typename Factory>
1005 dependency_msg<void*, Factory> opencl_memory<Factory>::receive( const cl_event *e ) {
1006  dependency_msg<void*, Factory> d = e ? dependency_msg<void*, Factory>( my_host_ptr, *e ) : dependency_msg<void*, Factory>( my_host_ptr );
1007  // Concurrent receives are prohibited so we do not worry about synchronization.
1008  if ( my_curr_device_id.load<tbb::relaxed>() != opencl_device::host ) {
1009  map_memory( *my_factory->devices().begin(), d );
1010  my_curr_device_id.store<tbb::relaxed>( opencl_device::host );
1011  my_host_ptr = d.data( false );
1012  }
1013  // Release the sending event
1014  if ( my_sending_event_present ) {
1015  enforce_cl_retcode( clReleaseEvent( my_sending_event ), "Failed to release an event" );
1016  my_sending_event_present = false;
1017  }
1018  return d;
1019 }
1020 
1021 template <typename Factory>
1022 dependency_msg<void*, Factory> opencl_memory<Factory>::send( opencl_device device, const cl_event *e ) {
1023  opencl_device::device_id_type device_id = device.my_device_id;
1024  if ( !my_factory->is_same_context( my_curr_device_id.load<tbb::acquire>(), device_id ) ) {
1025  __TBB_ASSERT( !e, "The buffer has come from another opencl_node but it is not on a device" );
1026  {
1027  tbb::spin_mutex::scoped_lock lock( my_sending_lock );
1028  if ( !my_factory->is_same_context( my_curr_device_id.load<tbb::relaxed>(), device_id ) ) {
1029  __TBB_ASSERT( my_host_ptr, "The buffer has not been mapped" );
1030  dependency_msg<void*, Factory> d( my_host_ptr );
1031  my_factory->enque_unmap_buffer( device, *this, d );
1032  my_sending_event = *d.get_event();
1033  my_sending_event_present = true;
1034  enforce_cl_retcode( clRetainEvent( my_sending_event ), "Failed to retain an event" );
1035  my_host_ptr = NULL;
1036  my_curr_device_id.store<tbb::release>(device_id);
1037  }
1038  }
1039  __TBB_ASSERT( my_sending_event_present, NULL );
1040  }
1041 
1042  // !e means that buffer has come from the host
1043  if ( !e && my_sending_event_present ) e = &my_sending_event;
1044 
1045  __TBB_ASSERT( !my_host_ptr, "The buffer has not been unmapped" );
1046  return e ? dependency_msg<void*, Factory>( NULL, *e ) : dependency_msg<void*, Factory>( NULL );
1047 }
1048 
1049 struct default_opencl_factory_device_filter {
1050  opencl_device_list operator()( const opencl_device_list &devices ) {
1051  opencl_device_list dl;
1052  dl.add( *devices.begin() );
1053  return dl;
1054  }
1055 };
1056 
1057 class default_opencl_factory : public opencl_factory < default_opencl_factory_device_filter > {
1058 public:
1059  default_opencl_factory( opencl_graph &g ) : opencl_factory( g ) {}
1060 private:
1061  default_opencl_factory( const default_opencl_factory& );
1062  default_opencl_factory& operator=(const default_opencl_factory&);
1063 };
1064 
1065 class opencl_foundation : tbb::internal::no_assign {
1066  struct default_device_selector_type {
1067  opencl_device operator()( const opencl_device_list& devices ) {
1068  return *devices.begin();
1069  }
1070  };
1071 public:
1072  opencl_foundation( opencl_graph &g ) : my_default_opencl_factory( g ), my_default_device_selector() {
1073  cl_uint num_platforms;
1074  enforce_cl_retcode( clGetPlatformIDs( 0, NULL, &num_platforms ), "clGetPlatformIDs failed" );
1075 
1076  std::vector<cl_platform_id> platforms( num_platforms );
1077  enforce_cl_retcode( clGetPlatformIDs( num_platforms, platforms.data(), NULL ), "clGetPlatformIDs failed" );
1078 
1079  cl_uint num_all_devices = 0;
1080  for ( cl_platform_id p : platforms ) {
1081  cl_uint num_devices;
1082  enforce_cl_retcode( clGetDeviceIDs( p, CL_DEVICE_TYPE_ALL, 0, NULL, &num_devices ), "clGetDeviceIDs failed" );
1083  num_all_devices += num_devices;
1084  }
1085 
1086  std::vector<cl_device_id> devices( num_all_devices );
1087  std::vector<cl_device_id>::iterator it = devices.begin();
1088  for ( cl_platform_id p : platforms ) {
1089  cl_uint num_devices;
1090  enforce_cl_retcode( clGetDeviceIDs( p, CL_DEVICE_TYPE_ALL, (cl_uint)std::distance( it, devices.end() ), &*it, &num_devices ), "clGetDeviceIDs failed" );
1091  it += num_devices;
1092  }
1093 
1094  for ( cl_device_id d : devices ) my_devices.add( opencl_device( d ) );
1095  }
1096 
1097  default_opencl_factory &get_default_opencl_factory() {
1098  return my_default_opencl_factory;
1099  }
1100 
1101  const opencl_device_list &get_all_devices() {
1102  return my_devices;
1103  }
1104 
1105  default_device_selector_type get_default_device_selector() { return my_default_device_selector; }
1106 
1107 private:
1108  default_opencl_factory my_default_opencl_factory;
1109  opencl_device_list my_devices;
1110 
1111  const default_device_selector_type my_default_device_selector;
1112 };
1113 
1114 opencl_foundation &opencl_graph::get_opencl_foundation() {
1115  opencl_foundation* INITIALIZATION = (opencl_foundation*)1;
1116  if ( my_opencl_foundation <= INITIALIZATION ) {
1117  if ( tbb::internal::as_atomic( my_opencl_foundation ).compare_and_swap( INITIALIZATION, NULL ) == 0 ) {
1118  my_opencl_foundation = new opencl_foundation( *this );
1119  }
1120  else {
1121  tbb::internal::spin_wait_while_eq( my_opencl_foundation, INITIALIZATION );
1122  }
1123  }
1124 
1125  __TBB_ASSERT( my_opencl_foundation > INITIALIZATION, "opencl_foundation is not initialized");
1126  return *my_opencl_foundation;
1127 }
1128 
1129 opencl_graph::~opencl_graph() {
1130  if ( my_opencl_foundation )
1131  delete my_opencl_foundation;
1132 }
1133 
1134 template <typename DeviceFilter>
1135 void opencl_factory<DeviceFilter>::init_once() {
1136  {
1137  tbb::spin_mutex::scoped_lock lock( my_devices_mutex );
1138  if ( !my_devices.size() )
1139  my_devices = DeviceFilter()(my_graph.get_opencl_foundation().get_all_devices());
1140  }
1141 
1142  enforce_cl_retcode( my_devices.size() ? CL_SUCCESS : CL_INVALID_DEVICE, "No devices in the device list" );
1143  cl_platform_id platform_id = my_devices.begin()->platform();
1144  for ( opencl_device_list::iterator it = ++my_devices.begin(); it != my_devices.end(); ++it )
1145  enforce_cl_retcode( it->platform() == platform_id ? CL_SUCCESS : CL_INVALID_PLATFORM, "All devices should be in the same platform" );
1146 
1147  std::vector<cl_device_id> cl_device_ids;
1148  for ( opencl_device d : my_devices ) cl_device_ids.push_back( d.my_cl_device_id );
1149 
1150  cl_context_properties context_properties[3] = { CL_CONTEXT_PLATFORM, (cl_context_properties)platform_id, (cl_context_properties)NULL };
1151  cl_int err;
1152  cl_context ctx = clCreateContext( context_properties,
1153  (cl_uint)cl_device_ids.size(),
1154  cl_device_ids.data(),
1155  NULL, NULL, &err );
1156  enforce_cl_retcode( err, "Failed to create context" );
1157  my_cl_context = ctx;
1158 
1159  size_t device_counter = 0;
1160  for ( opencl_device &d : my_devices ) {
1161  d.my_device_id = device_counter++;
1162  cl_int err2;
1163  cl_command_queue cq;
1164 #if CL_VERSION_2_0
1165  if ( d.major_version() >= 2 ) {
1166  if ( d.out_of_order_exec_mode_on_host_present() ) {
1167  cl_queue_properties props[] = { CL_QUEUE_PROPERTIES, CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE, 0 };
1168  cq = clCreateCommandQueueWithProperties( ctx, d.my_cl_device_id, props, &err2 );
1169  } else {
1170  cl_queue_properties props[] = { 0 };
1171  cq = clCreateCommandQueueWithProperties( ctx, d.my_cl_device_id, props, &err2 );
1172  }
1173  } else
1174 #endif
1175  {
1176  cl_command_queue_properties props = d.out_of_order_exec_mode_on_host_present() ? CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE : 0;
1177  // Suppress "declared deprecated" warning for the next line.
1178 #if __TBB_GCC_WARNING_SUPPRESSION_PRESENT
1179 #pragma GCC diagnostic push
1180 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1181 #endif
1182 #if _MSC_VER || __INTEL_COMPILER
1183 #pragma warning( push )
1184 #if __INTEL_COMPILER
1185 #pragma warning (disable: 1478)
1186 #else
1187 #pragma warning (disable: 4996)
1188 #endif
1189 #endif
1190  cq = clCreateCommandQueue( ctx, d.my_cl_device_id, props, &err2 );
1191 #if _MSC_VER || __INTEL_COMPILER
1192 #pragma warning( pop )
1193 #endif
1194 #if __TBB_GCC_WARNING_SUPPRESSION_PRESENT
1195 #pragma GCC diagnostic pop
1196 #endif
1197  }
1198  enforce_cl_retcode( err2, "Failed to create command queue" );
1199  d.my_cl_command_queue = cq;
1200  }
1201 }
1202 
1203 const opencl_device_list &opencl_graph::available_devices() {
1204  return get_opencl_foundation().get_all_devices();
1205 }
1206 
1207 default_opencl_factory &opencl_graph::opencl_factory() {
1208  return get_opencl_foundation().get_default_opencl_factory();
1209 }
1210 
1211 template <typename T, typename Factory>
1212 opencl_buffer<T, Factory>::opencl_buffer( opencl_graph &g, size_t size ) : my_impl( std::make_shared<impl_type>( size*sizeof(T), g.get_opencl_foundation().get_default_opencl_factory() ) ) {}
1213 
1214 
// The kind of input an opencl_program is created from.
enum class opencl_program_type {
    SOURCE,       // program text, created with clCreateProgramWithSource
    PRECOMPILED,  // binary, created with clCreateProgramWithBinary
    SPIR          // binary, created with clCreateProgramWithBinary and built with "-x spir"
};
1220 
1221 template <typename Factory = default_opencl_factory>
1222 class opencl_program : tbb::internal::no_assign {
1223 public:
1224  opencl_program( opencl_program_type type, const std::string& program_name ) : my_type(type) , my_arg_str( program_name) {}
1225  opencl_program( const char* program_name ) : opencl_program( std::string( program_name ) ) {}
1226  opencl_program( const std::string& program_name ) : opencl_program( opencl_program_type::SOURCE, program_name ) {}
1227 
1228  opencl_program( const opencl_program &src ) : my_type( src.type ), my_arg_str( src.my_arg_str ), my_cl_program( src.my_cl_program ) {
1229  // Set my_do_once_flag to the called state.
1230  std::call_once( my_do_once_flag, [](){} );
1231  }
1232 private:
1233  opencl_program( cl_program program ) : my_cl_program( program ) {
1234  // Set my_do_once_flag to the called state.
1235  std::call_once( my_do_once_flag, [](){} );
1236  }
1237 
1238  cl_kernel get_kernel( const std::string& k, Factory &f ) const {
1239  std::call_once( my_do_once_flag, [this, &k, &f](){ this->init( f, k ); } );
1240  cl_int err;
1241  cl_kernel kernel = clCreateKernel( my_cl_program, k.c_str(), &err );
1242  enforce_cl_retcode( err, std::string( "Failed to create kernel: " ) + k );
1243  return kernel;
1244  }
1245 
1246  class file_reader {
1247  public:
1248  file_reader( const std::string& filepath ) {
1249  std::ifstream file_descriptor( filepath, std::ifstream::binary );
1250  if ( !file_descriptor.is_open() ) {
1251  std::string str = std::string( "Could not open file: " ) + filepath;
1252  std::cerr << str << std::endl;
1253  throw str;
1254  }
1255  file_descriptor.seekg( 0, file_descriptor.end );
1256  size_t length = size_t( file_descriptor.tellg() );
1257  file_descriptor.seekg( 0, file_descriptor.beg );
1258  my_content.resize( length );
1259  char* begin = &*my_content.begin();
1260  file_descriptor.read( begin, length );
1261  file_descriptor.close();
1262  }
1263  const char* content() { return &*my_content.cbegin(); }
1264  size_t length() { return my_content.length(); }
1265  private:
1266  std::string my_content;
1267  };
1268 
1269  class opencl_program_builder {
1270  public:
1271  typedef void (CL_CALLBACK *cl_callback_type)(cl_program, void*);
1272  opencl_program_builder( Factory& f, const std::string& name, cl_program program,
1273  cl_uint num_devices, cl_device_id* device_list,
1274  const char* options, cl_callback_type callback,
1275  void* user_data ) {
1276  cl_int err = clBuildProgram( program, num_devices, device_list, options,
1277  callback, user_data );
1278  if( err == CL_SUCCESS )
1279  return;
1280  std::string str = std::string( "Failed to build program: " ) + name;
1281  if ( err == CL_BUILD_PROGRAM_FAILURE ) {
1282  const opencl_device_list &devices = f.devices();
1283  for ( opencl_device d : devices ) {
1284  std::cerr << "Build log for device: " << d.name() << std::endl;
1285  size_t log_size;
1286  cl_int query_err = clGetProgramBuildInfo(
1287  program, d.my_cl_device_id, CL_PROGRAM_BUILD_LOG, 0, NULL,
1288  &log_size );
1289  enforce_cl_retcode( query_err, "Failed to get build log size" );
1290  if( log_size ) {
1291  std::vector<char> output;
1292  output.resize( log_size );
1293  query_err = clGetProgramBuildInfo(
1294  program, d.my_cl_device_id, CL_PROGRAM_BUILD_LOG,
1295  output.size(), output.data(), NULL );
1296  enforce_cl_retcode( query_err, "Failed to get build output" );
1297  std::cerr << output.data() << std::endl;
1298  } else {
1299  std::cerr << "No build log available" << std::endl;
1300  }
1301  }
1302  }
1303  enforce_cl_retcode( err, str );
1304  }
1305  };
1306 
1307  class opencl_device_filter {
1308  public:
1309  template<typename Filter>
1310  opencl_device_filter( cl_uint& num_devices, cl_device_id* device_list,
1311  Filter filter, const char* message ) {
1312  for ( cl_uint i = 0; i < num_devices; ++i )
1313  if ( filter(device_list[i]) ) {
1314  device_list[i--] = device_list[--num_devices];
1315  }
1316  if ( !num_devices )
1317  enforce_cl_retcode( CL_DEVICE_NOT_AVAILABLE, message );
1318  }
1319  };
1320 
1321  void init( Factory &f, const std::string& ) const {
1322  cl_uint num_devices;
1323  enforce_cl_retcode( clGetContextInfo( f.context(), CL_CONTEXT_NUM_DEVICES, sizeof( num_devices ), &num_devices, NULL ),
1324  "Failed to get OpenCL context info" );
1325  if ( !num_devices )
1326  enforce_cl_retcode( CL_DEVICE_NOT_FOUND, "No supported devices found" );
1327  cl_device_id *device_list = (cl_device_id *)alloca( num_devices*sizeof( cl_device_id ) );
1328  enforce_cl_retcode( clGetContextInfo( f.context(), CL_CONTEXT_DEVICES, num_devices*sizeof( cl_device_id ), device_list, NULL ),
1329  "Failed to get OpenCL context info" );
1330  const char *options = NULL;
1331  switch ( my_type ) {
1332  case opencl_program_type::SOURCE: {
1333  file_reader fr( my_arg_str );
1334  const char *s[] = { fr.content() };
1335  const size_t l[] = { fr.length() };
1336  cl_int err;
1337  my_cl_program = clCreateProgramWithSource( f.context(), 1, s, l, &err );
1338  enforce_cl_retcode( err, std::string( "Failed to create program: " ) + my_arg_str );
1339  opencl_device_filter(
1340  num_devices, device_list,
1341  []( const opencl_device& d ) -> bool {
1342  return !d.compiler_available() || !d.linker_available();
1343  }, "No one device supports building program from sources" );
1344  opencl_program_builder(
1345  f, my_arg_str, my_cl_program, num_devices, device_list,
1346  options, /*callback*/ NULL, /*user data*/NULL );
1347  break;
1348  }
1349  case opencl_program_type::SPIR:
1350  options = "-x spir";
1351  case opencl_program_type::PRECOMPILED: {
1352  file_reader fr( my_arg_str );
1353  std::vector<const unsigned char*> s(
1354  num_devices, reinterpret_cast<const unsigned char*>(fr.content()) );
1355  std::vector<size_t> l( num_devices, fr.length() );
1356  std::vector<cl_int> bin_statuses( num_devices, -1 );
1357  cl_int err;
1358  my_cl_program = clCreateProgramWithBinary( f.context(), num_devices,
1359  device_list, l.data(), s.data(),
1360  bin_statuses.data(), &err );
1361  if( err != CL_SUCCESS ) {
1362  std::string statuses_str;
1363  for( cl_int st : bin_statuses )
1364  statuses_str += std::to_string( st );
1365  enforce_cl_retcode( err, std::string( "Failed to create program, error " + std::to_string( err ) + " : " ) + my_arg_str +
1366  std::string( ", binary_statuses = " ) + statuses_str );
1367  }
1368  opencl_program_builder(
1369  f, my_arg_str, my_cl_program, num_devices, device_list,
1370  options, /*callback*/ NULL, /*user data*/NULL );
1371  break;
1372  }
1373  default:
1374  __TBB_ASSERT( false, "Unsupported program type" );
1375  }
1376  }
1377 
1378  opencl_program_type my_type;
1379  std::string my_arg_str;
1380  mutable cl_program my_cl_program;
1381  mutable std::once_flag my_do_once_flag;
1382 
1383  template<typename... Args>
1384  friend class opencl_node;
1385 };
1386 
// Compile-time description of the closed range of ports [N1, N2].
template <int N1,int N2>
struct port_ref_impl {
    // Number of ports in the range; both endpoints are included, hence the "+1".
    static const int size = (N2 - N1) + 1;
};
1393 
1394 // The purpose of the port_ref_impl is the pretty syntax: the deduction of a compile-time constant is processed from the return type.
1395 // So it is possible to use this helper without parentheses, e.g. "port_ref<0>".
1396 template <int N1, int N2 = N1>
1397 port_ref_impl<N1,N2> port_ref() {
1398  return port_ref_impl<N1,N2>();
1399 };
1400 
// Number of kernel arguments an object of type T stands for; a single one in the general case.
template <typename T>
struct num_arguments {
    static const int value = 1;
};
1405 
1406 template <int N1, int N2>
1407 struct num_arguments<port_ref_impl<N1,N2>(*)()> {
1408  static const int value = port_ref_impl<N1,N2>::size;
1409 };
1410 
1411 template <int N1, int N2>
1412 struct num_arguments<port_ref_impl<N1,N2>> {
1413  static const int value = port_ref_impl<N1,N2>::size;
1414 };
1415 
1416 template<typename... Args>
1417 class opencl_node;
1418 
// Accepts any arguments and does nothing with them.
// Useful for expanding a parameter pack of calls whose results are not needed.
template <typename... Args>
void ignore_return_values( Args&&... /*unused*/ ) {
}
1421 
// Combines all the arguments with operator| and returns the result as the first argument's type.
// Single-argument case: the value itself.
template <typename T>
T or_return_values( T&& t ) {
    return t;
}

// General case: the first value OR-ed with the combination of the remaining ones.
template <typename T, typename... Rest>
T or_return_values( T&& first, Rest&&... others ) {
    return first | or_return_values( std::forward<Rest>( others )... );
}
1428 
1429 
// is_typedef( name ) defines the trait is_<name><T>, whose `value` is true when
// T has a nested type called <name> and false otherwise.
// The pointer overload of check() is viable only if `typename U::name` is well-formed;
// otherwise the variadic overload is selected.
#define is_typedef(member_name)                                         \
    template <typename T>                                               \
    struct is_##member_name {                                           \
        template <typename U>                                           \
        static std::true_type check( typename U::member_name* );        \
        template <typename U>                                           \
        static std::false_type check( ... );                            \
                                                                        \
        static const bool value = decltype(check<T>(0))::value;         \
    }

is_typedef( native_object_type );
is_typedef( memory_object_type );
1443 
1444 template <typename T>
1445 typename std::enable_if<is_native_object_type<T>::value, typename T::native_object_type>::type get_native_object( const T &t ) {
1446  return t.native_object();
1447 }
1448 
1449 template <typename T>
1450 typename std::enable_if<!is_native_object_type<T>::value, T>::type get_native_object( T t ) {
1451  return t;
1452 }
1453 
1454 // send_if_memory_object checks if the T type has memory_object_type and call the send method for the object.
1455 template <typename T, typename Factory>
1456 typename std::enable_if<is_memory_object_type<T>::value>::type send_if_memory_object( opencl_device device, dependency_msg<T, Factory> &dmsg ) {
1457  const T &t = dmsg.data( false );
1458  typedef typename T::memory_object_type mem_obj_t;
1459  mem_obj_t mem_obj = t.memory_object();
1460  dependency_msg<mem_obj_t, Factory> d( mem_obj );
1461  if ( dmsg.get_event() ) d.set_event( *dmsg.get_event() );
1462  mem_obj.send( device, d );
1463  if ( d.get_event() ) dmsg.set_event( *d.get_event() );
1464 }
1465 
1466 template <typename T>
1467 typename std::enable_if<is_memory_object_type<T>::value>::type send_if_memory_object( opencl_device device, const T &t ) {
1468  typedef typename T::memory_object_type mem_obj_t;
1469  mem_obj_t mem_obj = t.memory_object();
1470  dependency_msg<mem_obj_t, typename mem_obj_t::opencl_factory_type> dmsg( mem_obj );
1471  mem_obj.send( device, dmsg );
1472 }
1473 
1474 template <typename T>
1475 typename std::enable_if<!is_memory_object_type<T>::value>::type send_if_memory_object( opencl_device, const T& ) {};
1476 
1477 // receive_if_memory_object checks if the T type has memory_object_type and call the receive method for the object.
1478 template <typename T, typename Factory>
1479 typename std::enable_if<is_memory_object_type<T>::value>::type receive_if_memory_object( const dependency_msg<T, Factory> &dmsg ) {
1480  const T &t = dmsg.data( false );
1481  typedef typename T::memory_object_type mem_obj_t;
1482  mem_obj_t mem_obj = t.memory_object();
1483  dependency_msg<mem_obj_t, Factory> d( mem_obj );
1484  if ( dmsg.get_event() ) d.set_event( *dmsg.get_event() );
1485  mem_obj.receive( d );
1486  if ( d.get_event() ) dmsg.set_event( *d.get_event() );
1487 }
1488 
1489 template <typename T>
1490 typename std::enable_if<!is_memory_object_type<T>::value>::type receive_if_memory_object( const T& ) {}
1491 
// Maps a join policy to the key type used by opencl_node.
// General case: the policy is not key matching and the key is a size_t.
template<typename JP>
struct key_from_policy {
    using type = size_t;
    using is_key_matching = std::false_type;
};
1497 
1498 template<typename Key>
1499 struct key_from_policy< key_matching<Key> > {
1500  typedef Key type;
1501  typedef std::true_type is_key_matching;
1502 };
1503 
1504 template<typename Key>
1505 struct key_from_policy< key_matching<Key&> > {
1506  typedef const Key &type;
1507  typedef std::true_type is_key_matching;
1508 };
1509 
1510 template<typename Key>
1511 class opencl_device_with_key {
1512  opencl_device my_device;
1513  typename std::decay<Key>::type my_key;
1514 public:
1515  // TODO: investigate why defaul ctor is required
1516  opencl_device_with_key() {}
1517  opencl_device_with_key( opencl_device d, Key k ) : my_device( d ), my_key( k ) {}
1518  Key key() const { return my_key; }
1519  opencl_device device() const { return my_device; }
1520 };
1521 
/*
    /---------------------------------------- opencl_node ---------------------------------------\
    |                                                                                            |
    |   /--------------\   /----------------------\   /-----------\   /----------------------\   |
    |   |              |   |    (device_with_key) O---O           |   |                      |   |
    |   |              |   |                      |   |           |   |                      |   |
    O---O indexer_node O---O device_selector_node O---O join_node O---O      kernel_node     O---O
    |   |              |   | (multifunction_node) |   |           |   | (multifunction_node) |   |
    O---O              |   |                      O---O           |   |                      O---O
    |   \--------------/   \----------------------/   \-----------/   \----------------------/   |
    |                                                                                            |
    \--------------------------------------------------------------------------------------------/
*/
1535 
1536 template<typename JP, typename Factory, typename... Ports>
1537 class opencl_node< tuple<Ports...>, JP, Factory > : public composite_node < tuple<dependency_msg<Ports, Factory>...>, tuple<dependency_msg<Ports, Factory>...> >{
1538  typedef tuple<dependency_msg<Ports, Factory>...> input_tuple;
1539  typedef input_tuple output_tuple;
1540  typedef typename key_from_policy<JP>::type key_type;
1541  typedef composite_node<input_tuple, output_tuple> base_type;
1542  static const size_t NUM_INPUTS = tuple_size<input_tuple>::value;
1543  static const size_t NUM_OUTPUTS = tuple_size<output_tuple>::value;
1544 
1545  typedef typename internal::make_sequence<NUM_INPUTS>::type input_sequence;
1546  typedef typename internal::make_sequence<NUM_OUTPUTS>::type output_sequence;
1547 
1548  typedef indexer_node<dependency_msg<Ports, Factory>...> indexer_node_type;
1549  typedef typename indexer_node_type::output_type indexer_node_output_type;
1550  typedef tuple<opencl_device_with_key<key_type>, dependency_msg<Ports, Factory>...> kernel_input_tuple;
1551  typedef multifunction_node<indexer_node_output_type, kernel_input_tuple> device_selector_node;
1552  typedef multifunction_node<kernel_input_tuple, output_tuple> kernel_multifunction_node;
1553 
1554  template <int... S>
1555  typename base_type::input_ports_type get_input_ports( internal::sequence<S...> ) {
1556  return std::tie( internal::input_port<S>( my_indexer_node )... );
1557  }
1558 
1559  template <int... S>
1560  typename base_type::output_ports_type get_output_ports( internal::sequence<S...> ) {
1561  return std::tie( internal::output_port<S>( my_kernel_node )... );
1562  }
1563 
1564  typename base_type::input_ports_type get_input_ports() {
1565  return get_input_ports( input_sequence() );
1566  }
1567 
1568  typename base_type::output_ports_type get_output_ports() {
1569  return get_output_ports( output_sequence() );
1570  }
1571 
1572  template <int N>
1573  int make_Nth_edge() {
1574  make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
1575  return 0;
1576  }
1577 
1578  template <int... S>
1579  void make_edges( internal::sequence<S...> ) {
1580  make_edge( my_indexer_node, my_device_selector_node );
1581  make_edge( my_device_selector_node, my_join_node );
1582  ignore_return_values( make_Nth_edge<S + 1>()... );
1583  make_edge( my_join_node, my_kernel_node );
1584  }
1585 
1586  void make_edges() {
1587  make_edges( input_sequence() );
1588  }
1589 
1590  class device_selector_base {
1591  public:
1592  virtual void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) = 0;
1593  virtual device_selector_base *clone( opencl_node &n ) const = 0;
1594  virtual ~device_selector_base() {}
1595  };
1596 
1597  template <typename UserFunctor>
1598  class device_selector : public device_selector_base, tbb::internal::no_assign {
1599  public:
1600  device_selector( UserFunctor uf, opencl_node &n, Factory &f ) : my_user_functor( uf ), my_node(n), my_factory( f ) {
1601  my_port_epoches.fill( 0 );
1602  }
1603 
1604  /* override */ void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
1605  send_and_put( my_port_epoches[v.tag()], v, op, input_sequence() );
1606  __TBB_ASSERT( (std::is_same<typename key_from_policy<JP>::is_key_matching, std::false_type>::value) || my_port_epoches[v.tag()] == 0, "Epoch is changed when key matching is requested" );
1607  }
1608 
1609  /* override */ device_selector_base *clone( opencl_node &n ) const {
1610  return new device_selector( my_user_functor, n, my_factory );
1611  }
1612  private:
1613  template <int... S>
1614  void send_and_put( size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op, internal::sequence<S...> ) {
1615  typedef void(device_selector<UserFunctor>::*send_and_put_fn)(size_t &, const indexer_node_output_type &, typename device_selector_node::output_ports_type &);
1616  static std::array <send_and_put_fn, NUM_INPUTS > dispatch = { { &device_selector<UserFunctor>::send_and_put_impl<S>... } };
1617  (this->*dispatch[v.tag()])( epoch, v, op );
1618  }
1619 
1620  template <typename T>
1621  key_type get_key( std::false_type, const T &, size_t &epoch ) {
1622  __TBB_STATIC_ASSERT( (std::is_same<key_type, size_t>::value), "" );
1623  return epoch++;
1624  }
1625 
1626  template <typename T>
1627  key_type get_key( std::true_type, const T &t, size_t &/*epoch*/ ) {
1628  using tbb::flow::key_from_message;
1629  return key_from_message<key_type>( t );
1630  }
1631 
1632  template <int N>
1633  void send_and_put_impl( size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
1634  typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
1635  elem_type e = internal::cast_to<elem_type>( v );
1636  opencl_device device = get_device( get_key( typename key_from_policy<JP>::is_key_matching(), e, epoch ), get<0>( op ) );
1637  send_if_memory_object( device, e );
1638  get<N + 1>( op ).try_put( e );
1639  }
1640 
1641  template< typename DevicePort >
1642  opencl_device get_device( key_type key, DevicePort& dp ) {
1643  typename std::unordered_map<typename std::decay<key_type>::type, epoch_desc>::iterator it = my_devices.find( key );
1644  if ( it == my_devices.end() ) {
1645  opencl_device d = my_user_functor( my_factory.devices() );
1646  std::tie( it, std::ignore ) = my_devices.insert( std::make_pair( key, d ) );
1647  bool res = dp.try_put( opencl_device_with_key<key_type>( d, key ) );
1648  __TBB_ASSERT_EX( res, NULL );
1649  my_node.notify_new_device( d );
1650  }
1651  epoch_desc &e = it->second;
1652  opencl_device d = e.my_device;
1653  if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
1654  return d;
1655  }
1656 
1657  struct epoch_desc {
1658  epoch_desc( opencl_device d ) : my_device( d ), my_request_number( 0 ) {}
1659  opencl_device my_device;
1660  size_t my_request_number;
1661  };
1662 
1663  std::unordered_map<typename std::decay<key_type>::type, epoch_desc> my_devices;
1664  std::array<size_t, NUM_INPUTS> my_port_epoches;
1665  UserFunctor my_user_functor;
1666  opencl_node &my_node;
1667  Factory &my_factory;
1668  };
1669 
1670  class device_selector_body {
1671  public:
1672  device_selector_body( device_selector_base *d ) : my_device_selector( d ) {}
1673 
1674  /* override */ void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
1675  (*my_device_selector)(v, op);
1676  }
1677  private:
1678  device_selector_base *my_device_selector;
1679  };
1680 
1681  // Forward declaration.
1682  class ndranges_mapper_base;
1683 
1684  class opencl_kernel_base : tbb::internal::no_copy {
1685  cl_kernel clone_kernel() const {
1686  size_t ret_size;
1687 
1688  std::vector<char> kernel_name;
1689  for ( size_t curr_size = 32;; curr_size <<= 1 ) {
1690  kernel_name.resize( curr_size <<= 1 );
1691  enforce_cl_retcode( clGetKernelInfo( my_kernel, CL_KERNEL_FUNCTION_NAME, curr_size, kernel_name.data(), &ret_size ), "Failed to get kernel info" );
1692  if ( ret_size < curr_size ) break;
1693  }
1694 
1695  cl_program program;
1696  enforce_cl_retcode( clGetKernelInfo( my_kernel, CL_KERNEL_PROGRAM, sizeof( program ), &program, &ret_size ), "Failed to get kernel info" );
1697  __TBB_ASSERT( ret_size == sizeof( program ), NULL );
1698 
1699  return opencl_program<Factory>( program ).get_kernel( kernel_name.data(), my_factory );
1700  }
1701 
1702  // ------------- NDRange getters ------------- //
1703  template <typename NDRange>
1704  NDRange ndrange_value( NDRange&& r, const kernel_input_tuple& ) const { return r; }
1705  template <int N>
1706  typename tuple_element<N+1,kernel_input_tuple>::type::value_type ndrange_value( port_ref_impl<N,N>, const kernel_input_tuple& ip ) const {
1707  // "+1" since get<0>(ip) is opencl_device.
1708  return get<N+1>(ip).data(false);
1709  }
1710  template <int N1,int N2>
1711  void ndrange_value( port_ref_impl<N1,N2>, const kernel_input_tuple& ip ) const {
1712  __TBB_STATIC_ASSERT( N1==N2, "Do not use a port_ref range (e.g. port_ref<0,2>) as an argument for the set_ndranges routine" );
1713  }
1714  template <int N>
1715  typename tuple_element<N+1,kernel_input_tuple>::type::value_type ndrange_value( port_ref_impl<N,N>(*)(), const kernel_input_tuple& ip ) const {
1716  return ndrange_value(port_ref<N,N>(), ip);
1717  }
1718  template <int N1,int N2>
1719  void ndrange_value( port_ref_impl<N1,N2>(*)(), const kernel_input_tuple& ip ) const {
1720  return ndrange_value(port_ref<N1,N2>(), ip);
1721  }
1722  // ------------------------------------------- //
1723  public:
1724  typedef typename kernel_multifunction_node::output_ports_type output_ports_type;
1725 
1726  virtual void enqueue( const ndranges_mapper_base *ndranges_mapper, const kernel_input_tuple &ip, output_ports_type &op, graph &g ) = 0;
1727  virtual void send_memory_objects( opencl_device d ) = 0;
1728  virtual opencl_kernel_base *clone() const = 0;
1729  virtual ~opencl_kernel_base () {
1730  enforce_cl_retcode( clReleaseKernel( my_kernel ), "Failed to release a kernel" );
1731  }
1732 
1733  template <typename GlbNDRange, typename LclNDRange>
1734  cl_event enqueue( GlbNDRange&& glb_range, LclNDRange&& lcl_range, int num_events, std::array<cl_event, NUM_INPUTS> events, const kernel_input_tuple& ip ) {
1735  return my_factory.enqueue_kernel( get<0>( ip ).device(), my_kernel, ndrange_value( glb_range, ip ), ndrange_value( lcl_range, ip ), num_events, events.data() );
1736  }
1737  protected:
1738  opencl_kernel_base( const opencl_program<Factory>& p, const std::string& kernel_name, Factory &f )
1739  : my_kernel( p.get_kernel( kernel_name, f ) ), my_factory( f )
1740  {}
1741 
1742  opencl_kernel_base( const opencl_kernel_base &k )
1743  : my_kernel( k.clone_kernel() ), my_factory( k.my_factory )
1744  {}
1745 
1746  const cl_kernel my_kernel;
1747  Factory &my_factory;
1748  };
1749 
1750  // Container for ndrandes. It can contain either port references or real ndranges.
1751  class ndranges_mapper_base {
1752  public:
1753  virtual cl_event enqueue_kernel( opencl_kernel_base *k, const kernel_input_tuple& ip, int num_events, const std::array<cl_event, NUM_INPUTS> &events ) const = 0;
1754  virtual ndranges_mapper_base *clone() const = 0;
1755  virtual ~ndranges_mapper_base() {}
1756  };
1757 
1758  template <typename... Args>
1759  class opencl_kernel : public opencl_kernel_base {
1760  typedef typename opencl_kernel_base::output_ports_type output_ports_type;
1761  // --------- Kernel argument helpers --------- //
1762  template <int Place, typename T>
1763  void set_one_kernel_arg(const T& t) {
1764  auto p = get_native_object( t );
1765  enforce_cl_retcode( clSetKernelArg( this->my_kernel, Place, sizeof( p ), &p ), "Failed to set a kernel argument" );
1766  }
1767 
1768  template <int Place, int N>
1769  int set_one_arg_from_range( const kernel_input_tuple& ip ) {
1770  // "+1" since get<0>(ip) is opencl_device
1771  set_one_kernel_arg<Place>( get<N + 1>( ip ).data( false ) );
1772  return 0;
1773  }
1774 
1775  template <int Place, int Start, int... S>
1776  void set_args_range(const kernel_input_tuple& ip, internal::sequence<S...>) {
1777  ignore_return_values( set_one_arg_from_range<Place + S, Start + S>( ip )... );
1778  }
1779 
1780  template <int Place, int N1, int N2>
1781  void set_arg_impl( const kernel_input_tuple& ip, port_ref_impl<N1, N2> ) {
1782  set_args_range<Place,N1>( ip, typename internal::make_sequence<port_ref_impl<N1, N2>::size>::type() );
1783  }
1784 
1785  template <int Place, int N1, int N2>
1786  void set_arg_impl( const kernel_input_tuple& ip, port_ref_impl<N1, N2>(*)() ) {
1787  set_arg_impl<Place>( ip, port_ref<N1, N2>() );
1788  }
1789 
1790  template <int Place, typename T>
1791  void set_arg_impl( const kernel_input_tuple&, const T& t ) {
1792  set_one_kernel_arg<Place>( t );
1793  }
1794 
1795  template <int>
1796  void set_args( const kernel_input_tuple& ) {}
1797 
1798  template <int Place, typename T, typename... Rest>
1799  void set_args( const kernel_input_tuple& ip, const T& t, Rest&&... rest ) {
1800  set_arg_impl<Place>( ip, t );
1801  set_args<Place+num_arguments<T>::value>( ip, std::forward<Rest>(rest)... );
1802  }
1803  // ------------------------------------------- //
1804 
1805  // -------- Kernel event list helpers -------- //
1806  int add_event_to_list( std::array<cl_event, NUM_INPUTS> &events, int &num_events, const cl_event *e ) {
1807  __TBB_ASSERT( (static_cast<typename std::array<cl_event, NUM_INPUTS>::size_type>(num_events) < events.size()), NULL );
1808  if ( e ) events[num_events++] = *e;
1809  return 0;
1810  }
1811 
1812  template <int... S>
1813  int generate_event_list( std::array<cl_event, NUM_INPUTS> &events, const kernel_input_tuple& ip, internal::sequence<S...> ) {
1814  int num_events = 0;
1815  ignore_return_values( add_event_to_list( events, num_events, get<S + 1>( ip ).get_event() )... );
1816  return num_events;
1817  }
1818  // ------------------------------------------- //
1819 
1820  // ---------- Update events helpers ---------- //
1821  template <int N>
1822  bool update_event_and_try_put( graph &g, cl_event e, const kernel_input_tuple& ip, output_ports_type &op ) {
1823  auto t = get<N + 1>( ip );
1824  t.set_event( e );
1825  t.set_graph( g );
1826  auto &port = get<N>( op );
1827  return port.try_put( t );
1828  }
1829 
1830  template <int... S>
1831  bool update_events_and_try_put( graph &g, cl_event e, const kernel_input_tuple& ip, output_ports_type &op, internal::sequence<S...> ) {
1832  return or_return_values( update_event_and_try_put<S>( g, e, ip, op )... );
1833  }
1834  // ------------------------------------------- //
1835 
1836  class set_args_func : tbb::internal::no_assign {
1837  public:
1838  set_args_func( opencl_kernel &k, const kernel_input_tuple &ip ) : my_opencl_kernel( k ), my_ip( ip ) {}
1839  // It is immpossible to use Args... because a function pointer cannot be casted to a function reference implicitly.
1840  // Allow the compiler to deduce types for function pointers automatically.
1841  template <typename... A>
1842  void operator()( A&&... a ) {
1843  my_opencl_kernel.set_args<0>( my_ip, std::forward<A>( a )... );
1844  }
1845  private:
1846  opencl_kernel &my_opencl_kernel;
1847  const kernel_input_tuple &my_ip;
1848  };
1849 
1850  class send_func : tbb::internal::no_assign {
1851  public:
1852  send_func( opencl_device d ) : my_device( d ) {}
1853  void operator()() {}
1854  template <typename T, typename... Rest>
1855  void operator()( T &&t, Rest&&... rest ) {
1856  send_if_memory_object( my_device, std::forward<T>( t ) );
1857  (*this)( std::forward<Rest>( rest )... );
1858  }
1859  private:
1860  opencl_device my_device;
1861  };
1862 
1863  static void CL_CALLBACK decrement_wait_count_callback( cl_event, cl_int event_command_exec_status, void *data ) {
1864  tbb::internal::suppress_unused_warning( event_command_exec_status );
1865  __TBB_ASSERT( event_command_exec_status == CL_COMPLETE, NULL );
1866  graph &g = *static_cast<graph*>(data);
1867  g.decrement_wait_count();
1868  }
1869 
1870  public:
1871  opencl_kernel( const opencl_program<Factory>& p, const std::string &kernel_name, Factory &f, Args&&... args )
1872  : opencl_kernel_base( p, kernel_name, f )
1873  , my_args_pack( std::forward<Args>( args )... )
1874  {}
1875 
1876  opencl_kernel( const opencl_kernel_base &k ) : opencl_kernel_base( k ), my_args_pack( k.my_args_pack ) {}
1877 
1878  opencl_kernel( const opencl_kernel_base &k, Args&&... args ) : opencl_kernel_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
1879 
1880  /* override */ void enqueue( const ndranges_mapper_base *ndrange_mapper, const kernel_input_tuple &ip, output_ports_type &op, graph &g ) {
1881  // Set arguments for the kernel.
1882  tbb::internal::call( set_args_func( *this, ip ), my_args_pack );
1883 
1884  // Gather events from all ports to an array.
1885  std::array<cl_event, NUM_INPUTS> events;
1886  int num_events = generate_event_list( events, ip, input_sequence() );
1887 
1888  // Enqueue the kernel. ndrange_mapper is used only to obtain ndrange. Actually, it calls opencl_kernel_base::enqueue.
1889  cl_event e = ndrange_mapper->enqueue_kernel( this, ip, num_events, events );
1890 
1891  // Update events in dependency messages and try_put them to the output ports.
1892  if ( !update_events_and_try_put( g, e, ip, op, input_sequence() ) ) {
1893  // No one message was passed to successors so set a callback to extend the graph lifetime until the kernel completion.
1894  g.increment_wait_count();
1895  enforce_cl_retcode( clSetEventCallback( e, CL_COMPLETE, decrement_wait_count_callback, &g ), "Failed to set a callback" );
1896  this->my_factory.flush( get<0>( ip ).device() );
1897  }
1898  // Release our own reference to cl_event.
1899  enforce_cl_retcode( clReleaseEvent( e ), "Failed to release an event" );
1900  }
1901 
1902  virtual void send_memory_objects( opencl_device d ) {
1903  // Find opencl_buffer and send them to the devece.
1904  tbb::internal::call( send_func( d ), my_args_pack );
1905  }
1906 
1907  /* override */ opencl_kernel_base *clone() const {
1908  // Create new opencl_kernel with copying constructor.
1909  return new opencl_kernel<Args...>( *this );
1910  }
1911 
1912  private:
1913  tbb::internal::stored_pack<Args...> my_args_pack;
1914  };
1915 
1916  template <typename GlbNDRange, typename LclNDRange>
1917  class ndranges_mapper : public ndranges_mapper_base, tbb::internal::no_assign {
1918  public:
1919  template <typename GRange, typename LRange>
1920  ndranges_mapper( GRange&& glb, LRange&& lcl ) : my_global_work_size( glb ), my_local_work_size( lcl ) {}
1921 
1922  /*override*/ cl_event enqueue_kernel( opencl_kernel_base *k, const kernel_input_tuple &ip, int num_events, const std::array<cl_event, NUM_INPUTS> &events ) const {
1923  return k->enqueue( my_global_work_size, my_local_work_size, num_events, events, ip );
1924  }
1925 
1926  /*override*/ ndranges_mapper_base *clone() const {
1927  return new ndranges_mapper<GlbNDRange, LclNDRange>( my_global_work_size, my_local_work_size );
1928  }
1929 
1930  private:
1931  GlbNDRange my_global_work_size;
1932  LclNDRange my_local_work_size;
1933  };
1934 
1935  void enqueue_kernel( const kernel_input_tuple &ip, typename opencl_kernel_base::output_ports_type &op ) const {
1936  __TBB_ASSERT(my_ndranges_mapper, "NDRanges are not set. Call set_ndranges before running opencl_node.");
1937  my_opencl_kernel->enqueue( my_ndranges_mapper, ip, op, this->my_graph );
1938  }
1939 
1940  // Body for kernel_multifunction_node.
1941  class kernel_body : tbb::internal::no_assign {
1942  public:
1943  kernel_body( const opencl_node &node ) : my_opencl_node( node ) {}
1944  void operator()( const kernel_input_tuple &ip, typename opencl_kernel_base::output_ports_type &op ) {
1945  my_opencl_node.enqueue_kernel( ip, op );
1946  }
1947  private:
1948  const opencl_node &my_opencl_node;
1949  };
1950 
1951  template <typename... Args>
1952  opencl_kernel_base *make_opencl_kernel( const opencl_program<Factory> &p, const std::string &kernel_name, Factory &f, Args&&... args ) const {
1953  return new opencl_kernel<Args...>( p, kernel_name, f, std::forward<Args>( args )... );
1954  }
1955 
1956  template <typename GlobalNDRange, typename LocalNDRange = std::array<size_t,3>>
1957  void set_ndranges_impl( GlobalNDRange&& global_work_size, LocalNDRange&& local_work_size = std::array<size_t, 3>( { { 0, 0, 0 } } ) ) {
1958  if ( my_ndranges_mapper ) delete my_ndranges_mapper;
1959  my_ndranges_mapper = new ndranges_mapper<typename std::decay<GlobalNDRange>::type, typename std::decay<LocalNDRange>::type>
1960  ( std::forward<GlobalNDRange>( global_work_size ), std::forward<LocalNDRange>( local_work_size ) );
1961  }
1962 
1963  void notify_new_device( opencl_device d ) {
1964  my_opencl_kernel->send_memory_objects( d );
1965  }
1966 
1967 public:
1968  template <typename DeviceSelector>
1969  opencl_node( opencl_graph &g, const opencl_program<Factory> &p, const std::string &kernel_name, DeviceSelector d, Factory &f )
1970  : base_type( g )
1971  , my_indexer_node( g )
1972  , my_device_selector( new device_selector<DeviceSelector>( d, *this, f ) )
1973  , my_device_selector_node( g, serial, device_selector_body( my_device_selector ) )
1974  , my_join_node( g )
1975  , my_kernel_node( g, serial, kernel_body( *this ) )
1976  // By default, opencl_node maps all its ports to the kernel arguments on a one-to-one basis.
1977  , my_opencl_kernel( make_opencl_kernel( p , kernel_name, f, port_ref<0, NUM_INPUTS - 1>()) )
1978  , my_ndranges_mapper( NULL )
1979  {
1980  base_type::set_external_ports( get_input_ports(), get_output_ports() );
1981  make_edges();
1982  }
1983 
1984  opencl_node( opencl_graph &g, const opencl_program<Factory> &p, const std::string &kernel_name, Factory &f )
1985  : opencl_node( g, p, kernel_name, g.get_opencl_foundation().get_default_device_selector(), f )
1986  {}
1987 
1988 
1989  opencl_node( const opencl_node &node )
1990  : base_type( node.my_graph )
1991  , my_indexer_node( node.my_indexer_node )
1992  , my_device_selector( node.my_device_selector->clone( *this ) )
1993  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
1994  , my_join_node( node.my_join_node )
1995  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
1996  , my_opencl_kernel( node.my_opencl_kernel->clone() )
1997  , my_ndranges_mapper( node.my_ndranges_mapper ? node.my_ndranges_mapper->clone() : NULL )
1998  {
1999  base_type::set_external_ports( get_input_ports(), get_output_ports() );
2000  make_edges();
2001  }
2002 
2003  opencl_node( opencl_node &&node )
2004  : base_type( node.my_graph )
2005  , my_indexer_node( std::move( node.my_indexer_node ) )
2006  , my_device_selector( node.my_device_selector->clone(*this) )
2007  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
2008  , my_join_node( std::move( node.my_join_node ) )
2009  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
2010  , my_opencl_kernel( node.my_opencl_kernel )
2011  , my_ndranges_mapper( node.my_ndranges_mapper )
2012  {
2013  base_type::set_external_ports( get_input_ports(), get_output_ports() );
2014  make_edges();
2015  // Set moving node mappers to NULL to prevent double deallocation.
2016  node.my_opencl_kernel = NULL;
2017  node.my_ndranges_mapper = NULL;
2018  }
2019 
2020  ~opencl_node() {
2021  if ( my_opencl_kernel ) delete my_opencl_kernel;
2022  if ( my_ndranges_mapper ) delete my_ndranges_mapper;
2023  if ( my_device_selector ) delete my_device_selector;
2024  }
2025 
2026  template <typename T>
2027  void set_ndranges( std::initializer_list<T> global_work_size ) {
2028  set_ndranges_impl( internal::initializer_list_wrapper<T>( global_work_size ) );
2029  }
2030 
2031  template <typename GlobalNDRange>
2032  void set_ndranges( GlobalNDRange&& global_work_size ) {
2033  set_ndranges_impl( std::forward<GlobalNDRange>( global_work_size ) );
2034  }
2035 
2036  template <typename T, typename LocalNDRange>
2037  void set_ndranges( std::initializer_list<T> global_work_size, LocalNDRange&& local_work_size ) {
2038  set_ndranges_impl( internal::initializer_list_wrapper<T>(global_work_size), std::forward<LocalNDRange>( local_work_size ) );
2039  }
2040 
2041  template <typename T1, typename T2 = T1>
2042  void set_ndranges( std::initializer_list<T1> global_work_size, std::initializer_list<T2> local_work_size ) {
2043  set_ndranges_impl( internal::initializer_list_wrapper<T1>(global_work_size), internal::initializer_list_wrapper<T2>(local_work_size) );
2044  }
2045 
2046  template <typename GlobalNDRange, typename LocalNDRange>
2047  void set_ndranges( GlobalNDRange&& global_work_size, LocalNDRange&& local_work_size ) {
2048  set_ndranges_impl( std::forward<GlobalNDRange>(global_work_size), std::forward<LocalNDRange>(local_work_size) );
2049  }
2050 
2051  template <typename GlobalNDRange, typename T>
2052  void set_ndranges( GlobalNDRange&& global_work_size, std::initializer_list<T> local_work_size ) {
2053  set_ndranges_impl( std::forward<GlobalNDRange>( global_work_size ), internal::initializer_list_wrapper<T>( local_work_size ) );
2054  }
2055 
2056  template <typename... Args>
2057  void set_args( Args&&... args ) {
2058  // Copy the base class of opencl_kernal and create new storage for "Args...".
2059  opencl_kernel_base *new_opencl_kernel = new opencl_kernel<Args...>( *my_opencl_kernel, std::forward<Args>( args )... );
2060  delete my_opencl_kernel;
2061  my_opencl_kernel = new_opencl_kernel;
2062  }
2063 
2064 protected:
2065  /* override */ void reset_node( reset_flags = rf_reset_protocol ) { __TBB_ASSERT( false, "Not implemented yet" ); }
2066 
2067 private:
2068  indexer_node_type my_indexer_node; // Constructed first; copied or moved by the copy/move constructors.
2069  device_selector_base *my_device_selector; // Heap object deleted in ~opencl_node; must precede my_device_selector_node, whose body stores this pointer.
2070  device_selector_node my_device_selector_node; // Serial node whose body is a device_selector_body.
2071  join_node<kernel_input_tuple, JP> my_join_node; // Join with policy JP producing kernel_input_tuple.
2072  kernel_multifunction_node my_kernel_node; // Serial node whose body is a kernel_body bound to this opencl_node.
2073 
2074  opencl_kernel_base *my_opencl_kernel; // Heap object; replaced by set_args, deleted in ~opencl_node, NULL after being moved from.
2075  ndranges_mapper_base *my_ndranges_mapper; // Heap object installed by set_ndranges_impl; NULL when no ndranges have been set.
2076 };
2077 
2078 template<typename JP, typename... Ports>
2079 class opencl_node< tuple<Ports...>, JP > : public opencl_node < tuple<Ports...>, JP, default_opencl_factory > {
2080  typedef opencl_node < tuple<Ports...>, JP, default_opencl_factory > base_type;
2081 public:
2082  opencl_node( opencl_graph &g, const std::string &kernel )
2083  : base_type( g, kernel, g.get_opencl_foundation().get_default_device_selector(), g.get_opencl_foundation().get_default_opencl_factory() )
2084  {}
2085  opencl_node( opencl_graph &g, const opencl_program<default_opencl_factory> &p, const std::string &kernel )
2086  : base_type( g, p, kernel,
2087  g.get_opencl_foundation().get_default_device_selector(), g.get_opencl_foundation().get_default_opencl_factory() )
2088  {}
2089  template <typename DeviceSelector>
2090  opencl_node( opencl_graph &g, const opencl_program<default_opencl_factory> &p, const std::string &kernel, DeviceSelector d )
2091  : base_type( g, p , kernel, d, g.get_opencl_foundation().get_default_opencl_factory() )
2092  {}
2093 };
2094 
2095 template<typename... Ports>
2096 class opencl_node< tuple<Ports...> > : public opencl_node < tuple<Ports...>, queueing, default_opencl_factory > {
2097  typedef opencl_node < tuple<Ports...>, queueing, default_opencl_factory > base_type;
2098 public:
2099  opencl_node( opencl_graph &g, const std::string &kernel )
2100  : base_type( g, kernel, g.get_opencl_foundation().get_default_device_selector(), g.get_opencl_foundation().get_default_opencl_factory() )
2101  {}
2102  opencl_node( opencl_graph &g, const opencl_program<default_opencl_factory> &p, const std::string &kernel )
2103  : base_type( g, p, kernel,
2104  g.get_opencl_foundation().get_default_device_selector(), g.get_opencl_foundation().get_default_opencl_factory() )
2105  {}
2106  template <typename DeviceSelector>
2107  opencl_node( opencl_graph &g, const opencl_program<default_opencl_factory> &p, const std::string &kernel, DeviceSelector d )
2108  : base_type( g, p, kernel, d, g.get_opencl_foundation().get_default_opencl_factory() )
2109  {}
2110 };
2111 
2112 } // namespace interface8
2113 
2114 using interface8::opencl_graph;
2115 using interface8::opencl_node;
2116 using interface8::read_only;
2117 using interface8::read_write;
2118 using interface8::write_only;
2119 using interface8::opencl_buffer;
2120 using interface8::opencl_subbuffer;
2121 using interface8::opencl_device;
2122 using interface8::opencl_device_list;
2123 using interface8::opencl_program;
2124 using interface8::opencl_program_type;
2125 using interface8::dependency_msg;
2126 using interface8::port_ref;
2127 using interface8::opencl_factory;
2128 
2129 } // namespace flow
2130 } // namespace tbb
2131 #endif /* __TBB_PREVIEW_OPENCL_NODE */
2132 
2133 #endif // __TBB_flow_graph_opencl_node_H
bool try_put(const continue_msg &t)
Put an item to the receiver.
Definition: flow_graph.h:415
Definition: _tbb_windef.h:37
Acquire.
Definition: atomic.h:47
The graph related classes and functions.
No ordering.
Definition: atomic.h:51
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
Primary template for atomic.
Definition: atomic.h:405
A lock that occupies a single byte.
Definition: spin_mutex.h:40
Release.
Definition: atomic.h:49
The namespace tbb contains all components of the library.
Definition: parallel_for.h:44