Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_streaming_node.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_streaming_H
18 #define __TBB_flow_graph_streaming_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 #if __TBB_PREVIEW_STREAMING_NODE
25 
26 // Included in namespace tbb::flow::interfaceX (in flow_graph.h)
27 
28 namespace internal {
29 
// Compile-time descriptor of the closed integer interval [N1, N2].
// `size` is the number of indices contained in that interval.
30 template <int N1, int N2>
31 struct port_ref_impl {
32  // "+1" since the port_ref range is a closed interval (includes its endpoints).
33  static const int size = N2 - N1 + 1;
34 };
35 
36 } // internal
37 
38 // port_ref_impl exists to enable a concise syntax: the compile-time constants are deduced from the helper's return type,
39 // so the helper can be named without parentheses, e.g. "port_ref<0>".
40 template <int N1, int N2 = N1>
43 };
44 
45 namespace internal {
46 
// Trait that reports a count for the type T through `value`.
// Primary template: any type not matched by a specialization counts as 1.
47 template <typename T>
48 struct num_arguments {
49  static const int value = 1;
50 };
51 
// Specialization for a pointer to a nullary function returning port_ref_impl<N1,N2>:
// the count is the size of the closed range [N1, N2].
52 template <int N1, int N2>
53 struct num_arguments<port_ref_impl<N1,N2>(*)()> {
54  static const int value = port_ref_impl<N1,N2>::size;
55 };
56 
// Specialization for a port_ref_impl<N1,N2> object itself:
// the count is the size of the closed range [N1, N2].
57 template <int N1, int N2>
58 struct num_arguments<port_ref_impl<N1,N2>> {
59  static const int value = port_ref_impl<N1,N2>::size;
60 };
61 
// Accepts any number of arguments and does nothing with them.
// Used in this file to expand a parameter pack of call expressions whose
// results are not needed (see the make_Nth_edge<S + 1>()... expansion).
62 template <typename... Args>
63 void ignore_return_values( Args&&... ) {}
64 
// Combines all arguments with operator| and returns the result as type T
// (the deduced type of the first argument).
// One-argument overload: terminates the recursion by returning its argument.
65 template <typename T>
66 T or_return_values( T&& t ) { return t; }
// Variadic overload: ors the first argument with the combined result of the rest.
67 template <typename T, typename... Rest>
68 T or_return_values( T&& t, Rest&&... rest ) {
69  return t | or_return_values( std::forward<Rest>(rest)... );
70 }
71 
72 template<typename JP>
74  typedef size_t type;
76 };
77 
78 template<typename Key>
79 struct key_from_policy< key_matching<Key> > {
80  typedef Key type;
82 };
83 
84 template<typename Key>
85 struct key_from_policy< key_matching<Key&> > {
86  typedef const Key &type;
88 };
89 
90 template<typename Device, typename Key>
92  Device my_device;
94 public:
95  // TODO: investigate why default constructor is required
97  streaming_device_with_key( const Device& d, Key k ) : my_device( d ), my_key( k ) {}
98  Key key() const { return my_key; }
99  const Device& device() const { return my_device; }
100 };
101 
102 // --------- Kernel argument helpers --------- //
103 template <typename T>
106 };
107 
108 template <int N1, int N2>
109 struct is_port_ref_impl< port_ref_impl<N1, N2> > {
111 };
112 
113 template <int N1, int N2>
114 struct is_port_ref_impl< port_ref_impl<N1, N2>( * )() > {
116 };
117 
118 template <typename T>
119 struct is_port_ref {
121 };
122 
123 template <typename ...Args1>
125 
// Recursive step of the argument conversion.
// <A1, Args1...> are the arguments not processed yet; Args2 (deduced per call)
// are the already processed arguments, accumulated at the tail of the call.
// Each step removes a1 from the front and appends either a1 itself or, for a
// port_ref, the referenced elements of tuple t to the tail.
126 template <typename A1, typename ...Args1>
127 struct convert_and_call_impl<A1, Args1...> {
128  static const size_t my_delta = 1; // Index 0 contains device
129 
 // Entry point of a step: selects a doit_impl overload by tag dispatch on is_port_ref<A1>::type.
130  template <typename F, typename Tuple, typename ...Args2>
131  static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
132  convert_and_call_impl<A1, Args1...>::doit_impl(typename is_port_ref<A1>::type(), f, t, a1, args1..., args2...);
133  }
 // a1 is not a port_ref: append it unchanged to the processed arguments and continue with Args1.
134  template <typename F, typename Tuple, typename ...Args2>
135  static void doit_impl(std::false_type, F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
136  convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., a1);
137  }
 // a1 is a port_ref range [N1, N2]: append tuple element N1 + my_delta to the
 // processed arguments and recurse on the remaining range [N1 + 1, N2].
138  template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
139  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>, Args1&... args1, Args2&... args2) {
140  convert_and_call_impl<port_ref_impl<N1 + 1,N2>, Args1...>::doit_impl(x, f, t, port_ref<N1 + 1, N2>(), args1...,
141  args2..., std::get<N1 + my_delta>(t));
142  }
 // a1 is a single-element port_ref range [N, N]: append tuple element N + my_delta
 // and continue with the remaining unprocessed arguments Args1.
143  template <typename F, typename Tuple, int N, typename ...Args2>
144  static void doit_impl(std::true_type, F& f, Tuple& t, port_ref_impl<N, N>, Args1&... args1, Args2&... args2) {
145  convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., std::get<N + my_delta>(t));
146  }
147 
 // a1 is a pointer to a function returning port_ref_impl<N1, N2>: call it to get
 // the port_ref_impl value and forward to the by-value overloads above.
148  template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
149  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>(* fn)(), Args1&... args1, Args2&... args2) {
150  doit_impl(x, f, t, fn(), args1..., args2...);
151  }
 // Same as above for a function pointer returning a single-element range port_ref_impl<N, N>.
152  template <typename F, typename Tuple, int N, typename ...Args2>
153  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N, N>(* fn)(), Args1&... args1, Args2&... args2) {
154  doit_impl(x, f, t, fn(), args1..., args2...);
155  }
156 };
157 
158 template <>
160  template <typename F, typename Tuple, typename ...Args2>
161  static void doit(F& f, Tuple&, Args2&... args2) {
162  f(args2...);
163  }
164 };
165 // ------------------------------------------- //
166 
167 template<typename JP, typename StreamFactory, typename... Ports>
169  // Do not use 'using' instead of 'struct' because Microsoft Visual C++ 12.0 fails to compile.
170  template <typename T>
171  struct async_msg_type {
172  typedef typename StreamFactory::template async_msg_type<T> type;
173  };
174 
179 
180  // indexer_node parameters pack expansion workaround for VS2013 for streaming_node
182 };
183 
184 // Default empty implementation
185 template<typename StreamFactory, typename KernelInputTuple, typename = void>
187  typedef typename StreamFactory::device_type device_type;
188  typedef typename StreamFactory::kernel_type kernel_type;
189  typedef KernelInputTuple kernel_input_tuple;
190 protected:
191  template <typename ...Args>
192  void enqueue_kernel_impl( kernel_input_tuple&, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
193  factory.send_kernel( device, kernel, args... );
194  }
195 };
196 
197 // Implementation for StreamFactory supporting range
198 template<typename StreamFactory, typename KernelInputTuple>
199 class kernel_executor_helper<StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type > {
200  typedef typename StreamFactory::device_type device_type;
201  typedef typename StreamFactory::kernel_type kernel_type;
202  typedef KernelInputTuple kernel_input_tuple;
203 
204  typedef typename StreamFactory::range_type range_type;
205 
206  // Container for a range. It can hold either a port reference or an actual range value.
207  struct range_wrapper {
208  virtual range_type get_range( const kernel_input_tuple &ip ) const = 0;
209  virtual range_wrapper *clone() const = 0;
210  virtual ~range_wrapper() {}
211  };
212 
213  struct range_value : public range_wrapper {
214  range_value( const range_type& value ) : my_value(value) {}
215 
216  range_value( range_type&& value ) : my_value(std::move(value)) {}
217 
219  return my_value;
220  }
221 
222  range_wrapper *clone() const __TBB_override {
223  return new range_value(my_value);
224  }
225  private:
227  };
228 
229  template <int N>
230  struct range_mapper : public range_wrapper {
232 
234  // "+1" since get<0>(ip) is StreamFactory::device.
235  return get<N + 1>(ip).data(false);
236  }
237 
238  range_wrapper *clone() const __TBB_override {
239  return new range_mapper<N>;
240  }
241  };
242 
243 protected:
244  template <typename ...Args>
245  void enqueue_kernel_impl( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
246  __TBB_ASSERT(my_range_wrapper, "Range is not set. Call set_range() before running streaming_node.");
247  factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
248  }
249 
250 public:
251  kernel_executor_helper() : my_range_wrapper(NULL) {}
252 
253  kernel_executor_helper(const kernel_executor_helper& executor) : my_range_wrapper(executor.my_range_wrapper ? executor.my_range_wrapper->clone() : NULL) {}
254 
255  kernel_executor_helper(kernel_executor_helper&& executor) : my_range_wrapper(executor.my_range_wrapper) {
256  // Reset the moved-from executor's range wrapper to NULL to prevent double deallocation
257  executor.my_range_wrapper = NULL;
258  }
259 
261  if (my_range_wrapper) delete my_range_wrapper;
262  }
263 
264  void set_range(const range_type& work_size) {
265  my_range_wrapper = new range_value(work_size);
266  }
267 
268  void set_range(range_type&& work_size) {
269  my_range_wrapper = new range_value(std::move(work_size));
270  }
271 
272  template <int N>
274  my_range_wrapper = new range_mapper<N>;
275  }
276 
277  template <int N>
279  my_range_wrapper = new range_mapper<N>;
280  }
281 
282 private:
283  range_wrapper* my_range_wrapper;
284 };
285 
286 } // internal
287 
288 /*
289 /---------------------------------------- streaming_node ------------------------------------\
290 | |
291 | /--------------\ /----------------------\ /-----------\ /----------------------\ |
292 | | | | (device_with_key) O---O | | | |
293 | | | | | | | | | |
294 O---O indexer_node O---O device_selector_node O---O join_node O---O kernel_node O---O
295 | | | | (multifunction_node) | | | | (multifunction_node) | |
296 O---O | | O---O | | O---O
297 | \--------------/ \----------------------/ \-----------/ \----------------------/ |
298 | |
299 \--------------------------------------------------------------------------------------------/
300 */
301 template<typename... Args>
303 
304 template<typename... Ports, typename JP, typename StreamFactory>
305 class streaming_node< tuple<Ports...>, JP, StreamFactory >
306  : public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
307  typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
308  , public internal::kernel_executor_helper< StreamFactory, typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple >
309 {
310  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple input_tuple;
311  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple output_tuple;
313 protected:
314  typedef typename StreamFactory::device_type device_type;
315  typedef typename StreamFactory::kernel_type kernel_type;
316 private:
318  typedef composite_node<input_tuple, output_tuple> base_type;
319  static const size_t NUM_INPUTS = tuple_size<input_tuple>::value;
320  static const size_t NUM_OUTPUTS = tuple_size<output_tuple>::value;
321 
324 
325  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::indexer_node_type indexer_node_type;
326  typedef typename indexer_node_type::output_type indexer_node_output_type;
327  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple kernel_input_tuple;
328  typedef multifunction_node<indexer_node_output_type, kernel_input_tuple> device_selector_node;
329  typedef multifunction_node<kernel_input_tuple, output_tuple> kernel_multifunction_node;
330 
331  template <int... S>
332  typename base_type::input_ports_type get_input_ports( internal::sequence<S...> ) {
333  return std::tie( internal::input_port<S>( my_indexer_node )... );
334  }
335 
336  template <int... S>
337  typename base_type::output_ports_type get_output_ports( internal::sequence<S...> ) {
338  return std::tie( internal::output_port<S>( my_kernel_node )... );
339  }
340 
341  typename base_type::input_ports_type get_input_ports() {
342  return get_input_ports( input_sequence() );
343  }
344 
345  typename base_type::output_ports_type get_output_ports() {
346  return get_output_ports( output_sequence() );
347  }
348 
349  template <int N>
351  make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
352  return 0;
353  }
354 
355  template <int... S>
357  make_edge( my_indexer_node, my_device_selector_node );
358  make_edge( my_device_selector_node, my_join_node );
359  internal::ignore_return_values( make_Nth_edge<S + 1>()... );
360  make_edge( my_join_node, my_kernel_node );
361  }
362 
363  void make_edges() {
364  make_edges( input_sequence() );
365  }
366 
367  class device_selector_base {
368  public:
369  virtual void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) = 0;
370  virtual device_selector_base *clone( streaming_node &n ) const = 0;
372  };
373 
374  template <typename UserFunctor>
375  class device_selector : public device_selector_base, tbb::internal::no_assign {
376  public:
377  device_selector( UserFunctor uf, streaming_node &n, StreamFactory &f )
378  : my_dispatch_funcs( create_dispatch_funcs( input_sequence() ) )
379  , my_user_functor( uf ), my_node(n), my_factory( f )
380  {
381  my_port_epoches.fill( 0 );
382  }
383 
384  void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) __TBB_override {
385  (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
387  || my_port_epoches[v.tag()] == 0, "Epoch is changed when key matching is requested" );
388  }
389 
390  device_selector_base *clone( streaming_node &n ) const __TBB_override {
391  return new device_selector( my_user_functor, n, my_factory );
392  }
393  private:
394  typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(size_t &, const indexer_node_output_type &, typename device_selector_node::output_ports_type &);
395  typedef std::array < send_and_put_fn_type, NUM_INPUTS > dispatch_funcs_type;
396 
397  template <int... S>
399  dispatch_funcs_type dispatch = { { &device_selector<UserFunctor>::send_and_put_impl<S>... } };
400  return dispatch;
401  }
402 
403  template <typename T>
404  key_type get_key( std::false_type, const T &, size_t &epoch ) {
406  return epoch++;
407  }
408 
409  template <typename T>
410  key_type get_key( std::true_type, const T &t, size_t &/*epoch*/ ) {
412  return key_from_message<key_type>( t );
413  }
414 
415  template <int N>
416  void send_and_put_impl( size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
417  typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
418  elem_type e = internal::cast_to<elem_type>( v );
419  device_type device = get_device( get_key( typename internal::key_from_policy<JP>::is_key_matching(), e, epoch ), get<0>( op ) );
420  my_factory.send_data( device, e );
421  get<N + 1>( op ).try_put( e );
422  }
423 
424  template< typename DevicePort >
425  device_type get_device( key_type key, DevicePort& dp ) {
426  typename std::unordered_map<typename std::decay<key_type>::type, epoch_desc>::iterator it = my_devices.find( key );
427  if ( it == my_devices.end() ) {
428  device_type d = my_user_functor( my_factory );
429  std::tie( it, std::ignore ) = my_devices.insert( std::make_pair( key, d ) );
430  bool res = dp.try_put( device_with_key_type( d, key ) );
431  __TBB_ASSERT_EX( res, NULL );
432  my_node.notify_new_device( d );
433  }
434  epoch_desc &e = it->second;
435  device_type d = e.my_device;
436  if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
437  return d;
438  }
439 
440  struct epoch_desc {
441  epoch_desc(device_type d ) : my_device( d ), my_request_number( 0 ) {}
444  };
445 
447  std::array<size_t, NUM_INPUTS> my_port_epoches;
449  UserFunctor my_user_functor;
451  StreamFactory &my_factory;
452  };
453 
454  class device_selector_body {
455  public:
456  device_selector_body( device_selector_base *d ) : my_device_selector( d ) {}
457 
458  void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
459  (*my_device_selector)(v, op);
460  }
461  private:
462  device_selector_base *my_device_selector;
463  };
464 
465  class args_storage_base : tbb::internal::no_copy {
466  public:
467  typedef typename kernel_multifunction_node::output_ports_type output_ports_type;
468 
469  virtual void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) = 0;
470  virtual void send( device_type d ) = 0;
471  virtual args_storage_base *clone() const = 0;
472  virtual ~args_storage_base () {}
473 
474  protected:
475  args_storage_base( const kernel_type& kernel, StreamFactory &f )
476  : my_kernel( kernel ), my_factory( f )
477  {}
478 
479  args_storage_base( const args_storage_base &k )
480  : my_kernel( k.my_kernel ), my_factory( k.my_factory )
481  {}
482 
484  StreamFactory &my_factory;
485  };
486 
487  template <typename... Args>
488  class args_storage : public args_storage_base {
490 
491  // ---------- Update events helpers ---------- //
492  template <int N>
493  bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op ) const {
494  const auto& t = get<N + 1>( ip );
495  auto &port = get<N>( op );
496  return port.try_put( t );
497  }
498 
499  template <int... S>
501  return internal::or_return_values( do_try_put<S>( ip, op )... );
502  }
503 
504  // ------------------------------------------- //
505  class run_kernel_func : tbb::internal::no_assign {
506  public:
507  run_kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage )
508  : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
509 
510  // It is impossible to use Args... because a function pointer cannot be implicitly converted to a function reference.
511  // Allow the compiler to deduce types for function pointers automatically.
512  template <typename... FnArgs>
513  void operator()( FnArgs&... args ) {
514  internal::convert_and_call_impl<FnArgs...>::doit( my_kernel_func, my_kernel_func.my_ip, args... );
515  }
516  private:
517  struct kernel_func : tbb::internal::no_copy {
520  const args_storage& my_storage;
522 
523  kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage, device_type device )
524  : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
525  {}
526 
527  template <typename... FnArgs>
528  void operator()( FnArgs&... args ) {
529  my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
530  }
531  } my_kernel_func;
532  };
533 
534  template<typename FinalizeFn>
535  class run_finalize_func : tbb::internal::no_assign {
536  public:
537  run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn )
538  : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(), fn ) {}
539 
540  // It is impossible to use Args... because a function pointer cannot be implicitly converted to a function reference.
541  // Allow the compiler to deduce types for function pointers automatically.
542  template <typename... FnArgs>
543  void operator()( FnArgs&... args ) {
544  internal::convert_and_call_impl<FnArgs...>::doit( my_finalize_func, my_ip, args... );
545  }
546  private:
548 
549  struct finalize_func : tbb::internal::no_assign {
550  StreamFactory &my_factory;
552  FinalizeFn my_fn;
553 
554  finalize_func( StreamFactory &factory, device_type device, FinalizeFn fn )
555  : my_factory(factory), my_device(device), my_fn(fn) {}
556 
557  template <typename... FnArgs>
558  void operator()( FnArgs&... args ) {
559  my_factory.finalize( my_device, my_fn, args... );
560  }
561  } my_finalize_func;
562  };
563 
564  template<typename FinalizeFn>
565  static run_finalize_func<FinalizeFn> make_run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn ) {
566  return run_finalize_func<FinalizeFn>( ip, factory, fn );
567  }
568 
569  class send_func : tbb::internal::no_assign {
570  public:
571  send_func( StreamFactory &factory, device_type d )
572  : my_factory(factory), my_device( d ) {}
573 
574  template <typename... FnArgs>
575  void operator()( FnArgs&... args ) {
576  my_factory.send_data( my_device, args... );
577  }
578  private:
579  StreamFactory &my_factory;
581  };
582 
583  public:
584  args_storage( const kernel_type& kernel, StreamFactory &f, Args&&... args )
585  : args_storage_base( kernel, f )
586  , my_args_pack( std::forward<Args>(args)... )
587  {}
588 
589  args_storage( const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
590 
591  args_storage( const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
592 
594  // Make const qualified args_pack (from non-const)
595  const args_pack_type& const_args_pack = my_args_pack;
596  // enqueue_kernel() (which forwards to factory.send_kernel()) gets
597  // - 'ip' tuple elements by reference and updates it (and 'ip') with dependencies
598  // - arguments (from my_args_pack) by const-reference via const_args_pack
599  tbb::internal::call( run_kernel_func( ip, n, *this ), const_args_pack );
600 
601  if (! do_try_put( ip, op, input_sequence() ) ) {
602  graph& g = n.my_graph;
603  // No message was passed to any successor, so set a callback to extend the graph lifetime until the kernel completes.
604  g.increment_wait_count();
605 
606  // factory.finalize() gets
607  // - 'ip' tuple elements by reference, so 'ip' might be changed
608  // - arguments (from my_args_pack) by const-reference via const_args_pack
609  tbb::internal::call( make_run_finalize_func(ip, this->my_factory, [&g] {
610  g.decrement_wait_count();
611  }), const_args_pack );
612  }
613  }
614 
616  // factory.send_data() gets arguments by reference and updates these arguments with dependencies
617  // (it gets but usually ignores port_ref-s)
618  tbb::internal::call( send_func( this->my_factory, d ), my_args_pack );
619  }
620 
621  args_storage_base *clone() const __TBB_override {
622  // Create a new args_storage using the copy constructor.
623  return new args_storage<Args...>( *this );
624  }
625 
626  private:
629  };
630 
631  // Body for kernel_multifunction_node.
632  class kernel_body : tbb::internal::no_assign {
633  public:
634  kernel_body( const streaming_node &node ) : my_node( node ) {}
635 
637  __TBB_ASSERT( (my_node.my_args_storage != NULL), "No arguments storage" );
638  // 'ip' is passed by value to create local copy for updating inside enqueue_kernel()
639  my_node.my_args_storage->enqueue( ip, op, my_node );
640  }
641  private:
643  };
644 
646  struct wrap_to_async {
647  typedef T type; // Keep port_ref as it is
648  };
649 
650  template <typename T>
651  struct wrap_to_async<T, std::false_type> {
652  typedef typename StreamFactory::template async_msg_type< typename tbb::internal::strip<T>::type > type;
653  };
654 
655  template <typename... Args>
656  args_storage_base *make_args_storage(const args_storage_base& storage, Args&&... args) const {
657  // In this variadic template convert all simple types 'T' into 'async_msg_type<T>'
658  return new args_storage<Args...>(storage, std::forward<Args>(args)...);
659  }
660 
662  my_args_storage->send( d );
663  }
664 
665  template <typename ...Args>
666  void enqueue_kernel( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
667  this->enqueue_kernel_impl( ip, factory, device, kernel, args... );
668  }
669 
670 public:
671  template <typename DeviceSelector>
672  streaming_node( graph &g, const kernel_type& kernel, DeviceSelector d, StreamFactory &f )
673  : base_type( g )
674  , my_indexer_node( g )
675  , my_device_selector( new device_selector<DeviceSelector>( d, *this, f ) )
676  , my_device_selector_node( g, serial, device_selector_body( my_device_selector ) )
677  , my_join_node( g )
678  , my_kernel_node( g, serial, kernel_body( *this ) )
679  // By default, streaming_node maps all its ports to the kernel arguments on a one-to-one basis.
680  , my_args_storage( make_args_storage( args_storage<>(kernel, f), port_ref<0, NUM_INPUTS - 1>() ) )
681  {
682  base_type::set_external_ports( get_input_ports(), get_output_ports() );
683  make_edges();
684  }
685 
687  : base_type( node.my_graph )
688  , my_indexer_node( node.my_indexer_node )
689  , my_device_selector( node.my_device_selector->clone( *this ) )
690  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
691  , my_join_node( node.my_join_node )
692  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
693  , my_args_storage( node.my_args_storage->clone() )
694  {
695  base_type::set_external_ports( get_input_ports(), get_output_ports() );
696  make_edges();
697  }
698 
700  : base_type( node.my_graph )
701  , my_indexer_node( std::move( node.my_indexer_node ) )
702  , my_device_selector( node.my_device_selector->clone(*this) )
703  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
704  , my_join_node( std::move( node.my_join_node ) )
705  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
706  , my_args_storage( node.my_args_storage )
707  {
708  base_type::set_external_ports( get_input_ports(), get_output_ports() );
709  make_edges();
710  // Reset the moved-from node's args storage to NULL to prevent double deallocation.
711  node.my_args_storage = NULL;
712  }
713 
715  if ( my_args_storage ) delete my_args_storage;
716  if ( my_device_selector ) delete my_device_selector;
717  }
718 
719  template <typename... Args>
720  void set_args( Args&&... args ) {
721  // Copy the base class of args_storage and create new storage for "Args...".
722  args_storage_base * const new_args_storage = make_args_storage( *my_args_storage, typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
723  delete my_args_storage;
724  my_args_storage = new_args_storage;
725  }
726 
727 protected:
728  void reset_node( reset_flags = rf_reset_protocol ) __TBB_override { __TBB_ASSERT( false, "Not implemented yet" ); }
729 
730 private:
732  device_selector_base *my_device_selector;
734  join_node<kernel_input_tuple, JP> my_join_node;
736 
737  args_storage_base *my_args_storage;
738 };
739 
740 #endif // __TBB_PREVIEW_STREAMING_NODE
741 #endif // __TBB_flow_graph_streaming_H
void enqueue(kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n) __TBB_override
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op) const
static void doit_impl(std::true_type, F &f, Tuple &t, port_ref_impl< N, N >, Args1 &... args1, Args2 &... args2)
device_selector_base * clone(streaming_node &n) const __TBB_override
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
StreamFactory::template async_msg_type< T > type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle * key
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:167
internal::make_sequence< NUM_OUTPUTS >::type output_sequence
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:305
multifunction_node< indexer_node_output_type, kernel_input_tuple > device_selector_node
internal::streaming_device_with_key< device_type, key_type > device_with_key_type
tuple< typename async_msg_type< Ports >::type... > input_tuple
void ignore_return_values(Args &&...)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::indexer_node_type indexer_node_type
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:331
Detects whether two given types are the same.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
void reset_node(reset_flags=rf_reset_protocol) __TBB_override
#define __TBB_override
Definition: tbb_stddef.h:240
void enqueue_kernel(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
internal::make_sequence< NUM_INPUTS >::type input_sequence
The graph class.
kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage, device_type device)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::kernel_input_tuple kernel_input_tuple
Base class for types that should not be assigned.
Definition: tbb_stddef.h:320
task * do_try_put(const T &v, void *p)
static void doit_impl(std::false_type, F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
std::unordered_map< typename std::decay< key_type >::type, epoch_desc > my_devices
internal::streaming_node_traits< JP, StreamFactory, Ports... >::output_tuple output_tuple
void operator()(kernel_input_tuple ip, typename args_storage_base::output_ports_type &op)
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op) __TBB_override
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >, Args1 &... args1, Args2 &... args2)
void send_and_put_impl(size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
static dispatch_funcs_type create_dispatch_funcs(internal::sequence< S... >)
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
base_type::input_ports_type get_input_ports(internal::sequence< S... >)
void enqueue_kernel_impl(kernel_input_tuple &, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
void enqueue_kernel_impl(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3119
internal::streaming_node_traits< JP, StreamFactory, Ports... >::input_tuple input_tuple
tuple< streaming_device_with_key< typename StreamFactory::device_type, typename key_from_policy< JP >::type >, typename async_msg_type< Ports >::type... > kernel_input_tuple
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark d __itt_event ITT_FORMAT __itt_group_mark d void const wchar_t const wchar_t int ITT_FORMAT __itt_group_sync __itt_group_fsync x void const wchar_t int const wchar_t int int ITT_FORMAT __itt_group_sync __itt_group_fsync x void ITT_FORMAT __itt_group_sync __itt_group_fsync p void ITT_FORMAT __itt_group_sync __itt_group_fsync p void size_t ITT_FORMAT lu no args __itt_obj_prop_t __itt_obj_state_t ITT_FORMAT d const char ITT_FORMAT s __itt_frame ITT_FORMAT p 
const char const char ITT_FORMAT s __itt_counter ITT_FORMAT p __itt_counter unsigned long long ITT_FORMAT lu const wchar_t ITT_FORMAT S __itt_mark_type const wchar_t ITT_FORMAT S __itt_mark_type const char ITT_FORMAT s __itt_mark_type ITT_FORMAT d __itt_caller ITT_FORMAT p __itt_caller ITT_FORMAT p no args const __itt_domain __itt_clock_domain unsigned long long __itt_id ITT_FORMAT lu const __itt_domain __itt_clock_domain unsigned long long __itt_id __itt_id void * fn
is_port_ref_impl< typename tbb::internal::strip< T >::type >::type type
StreamFactory::template async_msg_type< typename tbb::internal::strip< T >::type > type
internal::port_ref_impl< N1, N2 > port_ref()
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
static void doit(F &f, Tuple &, Args2 &... args2)
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >(*fn)(), Args1 &... args1, Args2 &... args2)
K key_from_message(const T &t)
Definition: flow_graph.h:691
args_storage_base * make_args_storage(const args_storage_base &storage, Args &&... args) const
T or_return_values(T &&t)
composite_node< input_tuple, output_tuple > base_type
static void doit(F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
run_kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage)
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op, internal::sequence< S... >) const
bool_constant< false > false_type
Definition: tbb_stddef.h:469
multifunction_node< kernel_input_tuple, output_tuple > kernel_multifunction_node
device_selector(UserFunctor uf, streaming_node &n, StreamFactory &f)
bool_constant< true > true_type
Definition: tbb_stddef.h:468
streaming_node(graph &g, const kernel_type &kernel, DeviceSelector d, StreamFactory &f)
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N, N >(*fn)(), Args1 &... args1, Args2 &... args2)
args_storage(const kernel_type &kernel, StreamFactory &f, Args &&... args)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
static run_finalize_func< FinalizeFn > make_run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
base_type::output_ports_type get_output_ports(internal::sequence< S... >)
indexer_node< typename async_msg_type< Ports >::type... > indexer_node_type
run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:532

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.