Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_node_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_node_impl_H
18 #define __TBB__flow_graph_node_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
25 
27 namespace internal {
28 
32 
// function_input_queue: a thin wrapper that exposes queue-style member names
// (empty/front/pop/push) on top of item_buffer<T,A>. Every member below only
// forwards to an inherited item_buffer operation; no data members are added.
33  template< typename T, typename A >
34  class function_input_queue : public item_buffer<T,A> {
35  public:
    // Returns whatever the inherited buffer_empty() reports.
36  bool empty() const {
37  return this->buffer_empty();
38  }
39 
    // Read-only access to the front item. The explicit item_buffer<T, A>::
    // qualification selects the base-class front(); an unqualified call
    // would resolve to this member and recurse.
40  const T& front() const {
41  return this->item_buffer<T, A>::front();
42  }
43 
    // Forwards to pop_front(t) and returns its result; t is the
    // out-parameter handed to pop_front.
44  bool pop( T& t ) {
45  return this->pop_front( t );
46  }
47 
    // Calls destroy_front(); nothing is returned to the caller.
48  void pop() {
49  this->destroy_front();
50  }
51 
    // Forwards to push_back(t) and returns its result. The parameter is a
    // non-const reference, so temporaries cannot be passed.
52  bool push( T& t ) {
53  return this->push_back( t );
54  }
55  };
56 
58  // The only up-ref is apply_body_impl, which should implement the function
59  // call and any handling of the result.
60  template< typename Input, typename Policy, typename A, typename ImplType >
61  class function_input_base : public receiver<Input>, tbb::internal::no_assign {
63 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
64  , add_blt_pred, del_blt_pred,
65  blt_pred_cnt, blt_pred_cpy // create vector copies of preds and succs
66 #endif
67  };
69 
70  public:
71 
73  typedef Input input_type;
74  typedef typename receiver<input_type>::predecessor_type predecessor_type;
77  typedef typename A::template rebind< input_queue_type >::other queue_allocator_type;
79  "queueing and rejecting policies can't be specified simultaneously");
80 
81 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
82  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
83  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
84 #endif
85 
90  , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(priority))
91  , my_queue(!internal::has_policy<rejecting, Policy>::value ? new input_queue_type() : NULL)
92  , forwarder_busy(false)
93  {
95  my_aggregator.initialize_handler(handler_type(this));
96  }
97 
100  : receiver<Input>(), tbb::internal::no_assign()
102  , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(src.my_priority))
103  , my_queue(src.my_queue ? new input_queue_type() : NULL), forwarder_busy(false)
104  {
106  my_aggregator.initialize_handler(handler_type(this));
107  }
108 
110  // The queue is allocated by the constructor for {multi}function_node.
111  // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
112  // This would be an interface-breaking change.
114  if ( my_queue ) delete my_queue;
115  }
116 
119  }
120 
123  operation_type op_data(reg_pred);
124  op_data.r = &src;
125  my_aggregator.execute(&op_data);
126  return true;
127  }
128 
131  operation_type op_data(rem_pred);
132  op_data.r = &src;
133  my_aggregator.execute(&op_data);
134  return true;
135  }
136 
137 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
138  void internal_add_built_predecessor( predecessor_type &src) __TBB_override {
140  operation_type op_data(add_blt_pred);
141  op_data.r = &src;
142  my_aggregator.execute(&op_data);
143  }
144 
// Submits a del_blt_pred operation for src through my_aggregator. The
// handler (handle_operations, case del_blt_pred) responds by calling
// my_predecessors.internal_delete_built_predecessor on the referenced node.
146  void internal_delete_built_predecessor( predecessor_type &src) __TBB_override {
147  operation_type op_data(del_blt_pred);
    // The operation carries the predecessor by address in its r field.
148  op_data.r = &src;
149  my_aggregator.execute(&op_data);
150  }
151 
// Submits a blt_pred_cnt operation through my_aggregator and returns the
// value the handler stored in op_data.cnt_val (handle_operations fills it
// from my_predecessors.predecessor_count()).
152  size_t predecessor_count() __TBB_override {
153  operation_type op_data(blt_pred_cnt);
154  my_aggregator.execute(&op_data);
    // cnt_val is read only after execute() has returned.
155  return op_data.cnt_val;
156  }
157 
// Submits a blt_pred_cpy operation through my_aggregator. The handler
// (handle_operations, case blt_pred_cpy) calls
// my_predecessors.copy_predecessors on the list v supplied by the caller.
158  void copy_predecessors(predecessor_list_type &v) __TBB_override {
159  operation_type op_data(blt_pred_cpy);
    // The destination list is passed to the handler by address.
160  op_data.predv = &v;
161  my_aggregator.execute(&op_data);
162  }
163 
// Returns my_predecessors.built_predecessors() directly; unlike the
// neighbouring extraction helpers, this accessor does not go through
// my_aggregator.
164  built_predecessors_type &built_predecessors() __TBB_override {
165  return my_predecessors.built_predecessors();
166  }
167 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
168 
169  protected:
170 
172  my_concurrency = 0;
173  if(my_queue) {
174  my_queue->reset();
175  }
176  reset_receiver(f);
177  forwarder_busy = false;
178  }
179 
180  graph& my_graph_ref;
181  const size_t my_max_concurrency;
186 
189  else
191  __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
192  }
193 
195  return my_graph_ref;
196  }
197 
199  operation_type op_data(i, app_body_bypass); // tries to pop an item or get_item
200  my_aggregator.execute(&op_data);
201  return op_data.bypass_t;
202  }
203 
204  private:
205 
208 
209  class operation_type : public aggregated_operation< operation_type > {
210  public:
211  char type;
212  union {
215 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
216  size_t cnt_val;
217  predecessor_list_type *predv;
218 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
219  };
222  type(char(t)), elem(const_cast<input_type*>(&e)) {}
223  operation_type(op_type t) : type(char(t)), r(NULL) {}
224  };
225 
227  typedef internal::aggregating_functor<class_type, operation_type> handler_type;
228  friend class internal::aggregating_functor<class_type, operation_type>;
230 
232  task* new_task = NULL;
233  if(my_queue) {
234  if(!my_queue->empty()) {
235  ++my_concurrency;
236  new_task = create_body_task(my_queue->front());
237 
238  my_queue->pop();
239  }
240  }
241  else {
242  input_type i;
243  if(my_predecessors.get_item(i)) {
244  ++my_concurrency;
245  new_task = create_body_task(i);
246  }
247  }
248  return new_task;
249  }
250  void handle_operations(operation_type *op_list) {
251  operation_type *tmp;
252  while (op_list) {
253  tmp = op_list;
254  op_list = op_list->next;
255  switch (tmp->type) {
256  case reg_pred:
257  my_predecessors.add(*(tmp->r));
258  __TBB_store_with_release(tmp->status, SUCCEEDED);
259  if (!forwarder_busy) {
260  forwarder_busy = true;
262  }
263  break;
264  case rem_pred:
265  my_predecessors.remove(*(tmp->r));
266  __TBB_store_with_release(tmp->status, SUCCEEDED);
267  break;
268  case app_body_bypass: {
269  tmp->bypass_t = NULL;
270  __TBB_ASSERT(my_max_concurrency != 0, NULL);
271  --my_concurrency;
273  tmp->bypass_t = perform_queued_requests();
274 
275  __TBB_store_with_release(tmp->status, SUCCEEDED);
276  }
277  break;
278  case tryput_bypass: internal_try_put_task(tmp); break;
279  case try_fwd: internal_forward(tmp); break;
280  case occupy_concurrency:
282  ++my_concurrency;
283  __TBB_store_with_release(tmp->status, SUCCEEDED);
284  } else {
285  __TBB_store_with_release(tmp->status, FAILED);
286  }
287  break;
288 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
289  case add_blt_pred: {
290  my_predecessors.internal_add_built_predecessor(*(tmp->r));
291  __TBB_store_with_release(tmp->status, SUCCEEDED);
292  }
293  break;
294  case del_blt_pred:
295  my_predecessors.internal_delete_built_predecessor(*(tmp->r));
296  __TBB_store_with_release(tmp->status, SUCCEEDED);
297  break;
298  case blt_pred_cnt:
299  tmp->cnt_val = my_predecessors.predecessor_count();
300  __TBB_store_with_release(tmp->status, SUCCEEDED);
301  break;
302  case blt_pred_cpy:
303  my_predecessors.copy_predecessors( *(tmp->predv) );
304  __TBB_store_with_release(tmp->status, SUCCEEDED);
305  break;
306 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
307  }
308  }
309  }
310 
312  void internal_try_put_task(operation_type *op) {
313  __TBB_ASSERT(my_max_concurrency != 0, NULL);
315  ++my_concurrency;
316  task * new_task = create_body_task(*(op->elem));
317  op->bypass_t = new_task;
318  __TBB_store_with_release(op->status, SUCCEEDED);
319  } else if ( my_queue && my_queue->push(*(op->elem)) ) {
320  op->bypass_t = SUCCESSFULLY_ENQUEUED;
321  __TBB_store_with_release(op->status, SUCCEEDED);
322  } else {
323  op->bypass_t = NULL;
324  __TBB_store_with_release(op->status, FAILED);
325  }
326  }
327 
329  void internal_forward(operation_type *op) {
330  op->bypass_t = NULL;
332  op->bypass_t = perform_queued_requests();
333  if(op->bypass_t)
334  __TBB_store_with_release(op->status, SUCCEEDED);
335  else {
336  forwarder_busy = false;
337  __TBB_store_with_release(op->status, FAILED);
338  }
339  }
340 
342  operation_type op_data(t, tryput_bypass);
343  my_aggregator.execute(&op_data);
344  if( op_data.status == internal::SUCCEEDED ) {
345  return op_data.bypass_t;
346  }
347  return NULL;
348  }
349 
351  if( my_max_concurrency == 0 ) {
352  return apply_body_bypass(t);
353  } else {
354  operation_type check_op(t, occupy_concurrency);
355  my_aggregator.execute(&check_op);
356  if( check_op.status == internal::SUCCEEDED ) {
357  return apply_body_bypass(t);
358  }
359  return internal_try_put_bypass(t);
360  }
361  }
362 
364  if( my_max_concurrency == 0 ) {
365  return create_body_task(t);
366  } else {
367  return internal_try_put_bypass(t);
368  }
369  }
370 
372  // then decides if more work is available
374  return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
375  }
376 
378  inline task * create_body_task( const input_type &input ) {
380  new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
382  *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(input, my_priority))
383  : NULL;
384  }
385 
388  operation_type op_data(try_fwd);
389  task* rval = NULL;
390  do {
391  op_data.status = WAIT;
392  my_aggregator.execute(&op_data);
393  if(op_data.status == SUCCEEDED) {
394  task* ttask = op_data.bypass_t;
395  __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, NULL );
396  rval = combine_tasks(my_graph_ref, rval, ttask);
397  }
398  } while (op_data.status == SUCCEEDED);
399  return rval;
400  }
401 
404  new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
406  : NULL;
407  }
408 
410  inline void spawn_forward_task() {
411  task* tp = create_forward_task();
412  if(tp) {
414  }
415  }
416  }; // function_input_base
417 
419  // a type Output to its successors.
420  template< typename Input, typename Output, typename Policy, typename A>
421  class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
422  public:
423  typedef Input input_type;
424  typedef Output output_type;
429 
430  // constructor
431  template<typename Body>
433  graph &g, size_t max_concurrency,
434  __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
436  , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
437  , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) {
438  }
439 
442  base_type(src),
443  my_body( src.my_init_body->clone() ),
444  my_init_body(src.my_init_body->clone() ) {
445  }
446 
448  delete my_body;
449  delete my_init_body;
450  }
451 
452  template< typename Body >
454  function_body_type &body_ref = *this->my_body;
455  return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
456  }
457 
459  // An extra copy is needed to capture the
460  // body execution without the try_put
462  output_type v = (*my_body)(i);
464  return v;
465  }
466 
467  //TODO: consider moving into the base class
470 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
471  task* successor_task = successors().try_put_task(v);
472 #endif
473  task* postponed_task = NULL;
474  if( base_type::my_max_concurrency != 0 ) {
475  postponed_task = base_type::try_get_postponed_task(i);
476  __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, NULL );
477  }
478 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
479  graph& g = base_type::my_graph_ref;
480  return combine_tasks(g, successor_task, postponed_task);
481 #else
482  if( postponed_task ) {
483  // make the task available for other workers since we do not know successors'
484  // execution policy
486  }
487  task* successor_task = successors().try_put_task(v);
488 #if _MSC_VER && !__INTEL_COMPILER
489 #pragma warning (push)
490 #pragma warning (disable: 4127) /* suppress conditional expression is constant */
491 #endif
493 #if _MSC_VER && !__INTEL_COMPILER
494 #pragma warning (pop)
495 #endif
496  if(!successor_task) {
497  // Return confirmative status since current
498  // node's body has been executed anyway
499  successor_task = SUCCESSFULLY_ENQUEUED;
500  }
501  }
502  return successor_task;
503 #endif /* TBB_DEPRECATED_MESSAGE_FLOW_ORDER */
504  }
505 
506  protected:
507 
510  if(f & rf_reset_bodies) {
512  delete my_body;
513  my_body = tmp;
514  }
515  }
516 
520 
521  }; // function_input
522 
523 
524  // helper templates to clear the successor edges of the output ports of a multifunction_node
525  template<int N> struct clear_element {
526  template<typename P> static void clear_this(P &p) {
527  (void)tbb::flow::get<N-1>(p).successors().clear();
529  }
530  template<typename P> static bool this_empty(P &p) {
531  if(tbb::flow::get<N-1>(p).successors().empty())
533  return false;
534  }
535  };
536 
// Specialization of clear_element for N == 1: operates only on element 0 of
// the port tuple p and does not recurse any further.
537  template<> struct clear_element<1> {
    // Calls successors().clear() on element 0 of p; the result is discarded.
538  template<typename P> static void clear_this(P &p) {
539  (void)tbb::flow::get<0>(p).successors().clear();
540  }
    // Returns whether successors() of element 0 of p reports empty().
541  template<typename P> static bool this_empty(P &p) {
542  return tbb::flow::get<0>(p).successors().empty();
543  }
544  };
545 
546 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
547  // helper templates to extract the output ports of a multifunction_node from the graph
// Recursive helper over a tuple of output ports: handles element N-1 and
// then delegates to extract_element<N-1> for the remaining elements.
548  template<int N> struct extract_element {
549  template<typename P> static void extract_this(P &p) {
    // Calls sender_extract on the port's built_successors(), passing the
    // port itself as the argument; the result is discarded.
550  (void)tbb::flow::get<N-1>(p).successors().built_successors().sender_extract(tbb::flow::get<N-1>(p));
551  extract_element<N-1>::extract_this(p);
552  }
553  };
554 
// Specialization of extract_element for N == 1: performs the same
// sender_extract call on element 0 of p and ends the recursion.
555  template<> struct extract_element<1> {
556  template<typename P> static void extract_this(P &p) {
557  (void)tbb::flow::get<0>(p).successors().built_successors().sender_extract(tbb::flow::get<0>(p));
558  }
559  };
560 #endif
561 
563  // and has a tuple of output ports specified.
564  template< typename Input, typename OutputPortSet, typename Policy, typename A>
565  class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
566  public:
568  typedef Input input_type;
569  typedef OutputPortSet output_ports_type;
574 
575  // constructor
576  template<typename Body>
578  __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
580  , my_body( new internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
581  , my_init_body( new internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) ) {
582  }
583 
586  base_type(src),
587  my_body( src.my_init_body->clone() ),
588  my_init_body(src.my_init_body->clone() ) {
589  }
590 
592  delete my_body;
593  delete my_init_body;
594  }
595 
596  template< typename Body >
598  multifunction_body_type &body_ref = *this->my_body;
599  return *static_cast<Body*>(dynamic_cast< internal::multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
600  }
601 
602  // for multifunction nodes we do not have a single successor as such. So we just tell
603  // the task we were successful.
604  //TODO: consider moving common parts with implementation in function_input into separate function
607  (*my_body)(i, my_output_ports);
609  task* ttask = NULL;
612  }
613  return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
614  }
615 
617 
618  protected:
619 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
620  void extract() {
621  extract_element<N>::extract_this(my_output_ports);
622  }
623 #endif
624 
625  void reset(reset_flags f) {
628  if(f & rf_reset_bodies) {
630  delete my_body;
631  my_body = tmp;
632  }
633  __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
634  }
635 
639 
640  }; // multifunction_input
641 
642  // template to refer to an output port of a multifunction_node
643  template<size_t N, typename MOP>
645  return tbb::flow::get<N>(op.output_ports());
646  }
647 
648  inline void check_task_and_spawn(graph& g, task* t) {
649  if (t && t != SUCCESSFULLY_ENQUEUED) {
651  }
652  }
653 
654  // helper structs for split_node
// Recursive helper (used by split_node, per the comment preceding this
// struct): puts element N-1 of tuple t to port N-1 of p, then delegates the
// remaining elements to emit_element<N-1>.
655  template<int N>
656  struct emit_element {
657  template<typename T, typename P>
    // g is only forwarded to check_task_and_spawn and to the recursive call.
    // The return value is whatever emit_element<N-1>::emit_this returns.
658  static task* emit_this(graph& g, const T &t, P &p) {
659  // TODO: consider collecting all the tasks in a task_list and spawning them all at once
660  task* last_task = tbb::flow::get<N-1>(p).try_put_task(tbb::flow::get<N-1>(t));
    // The task returned by this port is handed to check_task_and_spawn
    // rather than being returned to the caller.
661  check_task_and_spawn(g, last_task);
662  return emit_element<N-1>::emit_this(g,t,p);
663  }
664  };
665 
// Specialization of emit_element for N == 1: puts element 0 of t to port 0
// of p and ends the recursion.
666  template<>
667  struct emit_element<1> {
668  template<typename T, typename P>
    // Always returns SUCCESSFULLY_ENQUEUED, regardless of what try_put_task
    // on port 0 returned.
669  static task* emit_this(graph& g, const T &t, P &p) {
670  task* last_task = tbb::flow::get<0>(p).try_put_task(tbb::flow::get<0>(t));
671  check_task_and_spawn(g, last_task);
672  return SUCCESSFULLY_ENQUEUED;
673  }
674  };
675 
677  template< typename Output, typename Policy>
678  class continue_input : public continue_receiver {
679  public:
680 
682  typedef continue_msg input_type;
683 
685  typedef Output output_type;
688 
689  template< typename Body >
691  : continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(/*number_of_predecessors=*/0, priority))
692  , my_graph_ref(g)
693  , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
694  , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
695  { }
696 
// Constructor taking an explicit predecessor count.
//  g                      - graph stored by reference in my_graph_ref
//  number_of_predecessors - forwarded to the continue_receiver base
//  body, priority         - passed through __TBB_FLOW_GRAPH_PRIORITY_ARG1
//                           (macro defined outside this listing)
// The user body is wrapped twice in function_body_leaf: once as my_body and
// once as my_init_body. The copy constructor of this class clones both of
// its bodies from src.my_init_body.
697  template< typename Body >
698  continue_input( graph &g, int number_of_predecessors,
699  __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
700  ) : continue_receiver( __TBB_FLOW_GRAPH_PRIORITY_ARG1(number_of_predecessors, priority) )
701  , my_graph_ref(g)
702  , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
703  , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
704  { }
705 
706  continue_input( const continue_input& src ) : continue_receiver(src),
708  my_body( src.my_init_body->clone() ),
709  my_init_body( src.my_init_body->clone() ) {}
710 
712  delete my_body;
713  delete my_init_body;
714  }
715 
716  template< typename Body >
718  function_body_type &body_ref = *my_body;
719  return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
720  }
721 
723  continue_receiver::reset_receiver(f);
724  if(f & rf_reset_bodies) {
726  delete my_body;
727  my_body = tmp;
728  }
729  }
730 
731  protected:
732 
733  graph& my_graph_ref;
736 
738 
739  friend class apply_body_task_bypass< class_type, continue_msg >;
740 
743  // An extra copy is needed to capture the
744  // body execution without the try_put
746  output_type v = (*my_body)( continue_msg() );
748  return successors().try_put_task( v );
749  }
750 
753  return NULL;
754  }
755 #if _MSC_VER && !__INTEL_COMPILER
756 #pragma warning (push)
757 #pragma warning (disable: 4127) /* suppress conditional expression is constant */
758 #endif
760 #if _MSC_VER && !__INTEL_COMPILER
761 #pragma warning (pop)
762 #endif
763  return apply_body_bypass( continue_msg() );
764  }
765  else {
766  return new ( task::allocate_additional_child_of( *(my_graph_ref.root_task()) ) )
768  *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(continue_msg(), my_priority) );
769  }
770  }
771 
773  return my_graph_ref;
774  }
775  }; // continue_input
776 
778  template< typename Output >
779  class function_output : public sender<Output> {
780  public:
781 
782  template<int N> friend struct clear_element;
783  typedef Output output_type;
784  typedef typename sender<output_type>::successor_type successor_type;
786 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
787  typedef typename sender<output_type>::built_successors_type built_successors_type;
788  typedef typename sender<output_type>::successor_list_type successor_list_type;
789 #endif
790 
// Copy constructor. The source object is deliberately unnamed and unused:
// the sender base is default-constructed and the only work done is to make
// this object the owner of its own my_successors.
792  function_output(const function_output & /*other*/) : sender<output_type>() {
793  my_successors.set_owner(this);
794  }
795 
799  return true;
800  }
801 
805  return true;
806  }
807 
808 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Returns the built_successors() of the object returned by successors().
809  built_successors_type &built_successors() __TBB_override { return successors().built_successors(); }
810 
811 
// Forwards r to successors().internal_add_built_successor().
812  void internal_add_built_successor( successor_type &r) __TBB_override {
813  successors().internal_add_built_successor( r );
814  }
815 
// Forwards r to successors().internal_delete_built_successor().
816  void internal_delete_built_successor( successor_type &r) __TBB_override {
817  successors().internal_delete_built_successor( r );
818  }
819 
// Returns the value reported by successors().successor_count().
820  size_t successor_count() __TBB_override {
821  return successors().successor_count();
822  }
823 
// Passes the caller's list v to successors().copy_successors().
824  void copy_successors( successor_list_type &v) __TBB_override {
825  successors().copy_successors(v);
826  }
827 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
828 
829  // For multifunction_node: the function_body that implements
830  // the node will have an input and an output tuple of ports. To put
831  // an item to a successor, the body should call
832  //
833  // get<I>(output_ports).try_put(output_value);
834  //
835  // If a task pointer is returned, it is always spawned and true is returned; otherwise
836  // the return value is the bool returned from successors.try_put.
    // try_put_task itself only forwards the item to
    // my_successors.try_put_task(i) and returns the resulting task pointer
    // unchanged.
837  task *try_put_task(const output_type &i) { // not a virtual method in this class
838  return my_successors.try_put_task(i);
839  }
840 
842  protected:
844 
845  }; // function_output
846 
847  template< typename Output >
848  class multifunction_output : public function_output<Output> {
849  public:
850  typedef Output output_type;
853 
856 
// Puts item i through try_put_task and converts the task-pointer result
// into a bool for the caller.
// Returns false when try_put_task yields a null pointer, true otherwise.
857  bool try_put(const output_type &i) {
858  task *res = try_put_task(i);
859  if(!res) return false;
    // A result other than the SUCCESSFULLY_ENQUEUED marker is passed to
    // FLOW_SPAWN; the marker itself is never spawned.
860  if(res != SUCCESSFULLY_ENQUEUED) {
861  FLOW_SPAWN(*res); // TODO: Spawn task inside arena
862  }
863  return true;
864  }
865 
866  protected:
867 
869  return my_successors.try_put_task(i);
870  }
871 
872  template <int N> friend struct emit_element;
873 
874  }; // multifunction_output
875 
876 //composite_node
877 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
// Base case of the add_nodes_impl recursion: called when no nodes remain in
// the argument pack, so it does nothing.
878  template<typename CompositeType>
879  void add_nodes_impl(CompositeType*, bool) {}
880 
// Recursive case: processes the first node n1, then calls itself with the
// remaining nodes until the no-node overload is reached.
//  c_node  - forwarded as the first argument of fgt_alias_port
//  visible - forwarded unchanged to fgt_alias_port and to the recursive call
//  n1, n   - nodes to process, taken by const reference
881  template< typename CompositeType, typename NodeType1, typename... NodeTypes >
882  void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
    // const is cast away so the node's address can be passed as a plain void*.
883  void *addr = const_cast<NodeType1 *>(&n1);
884 
885  fgt_alias_port(c_node, addr, visible);
886  add_nodes_impl(c_node, visible, n...);
887  }
888 #endif
889 
890 } // internal
891 
892 #endif // __TBB__flow_graph_node_impl_H
void set_owner(successor_type *owner)
broadcast_cache_type my_successors
continue_input< output_type, Policy > class_type
function_body_type * my_init_body
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
receiver< input_type >::predecessor_type predecessor_type
A task that calls a node's forward_task function.
predecessor_cache< input_type, null_mutex > my_predecessors
function_output< output_type > base_type
task * create_body_task(const input_type &input)
allocates a task to apply a body
continue_input(const continue_input &src)
function_body< input_type, output_type > function_body_type
Input input_type
The input type of this receiver.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
unsigned int node_priority_t
void reset_function_input(reset_flags f)
A functor that takes an Input and generates an Output.
task * apply_body_bypass(const input_type &i)
Applies the body to the provided input.
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
void reset_receiver(reset_flags f) __TBB_override
task * try_get_postponed_task(const input_type &i)
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
static task * emit_this(graph &g, const T &t, P &p)
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
continue_msg input_type
The input type of this receiver.
predecessor_cache< input_type, null_mutex > predecessor_cache_type
function_body that takes an Input and a set of output ports
function_input_queue< input_type, A > input_queue_type
function_body_type * my_init_body
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
aggregator< handler_type, operation_type > my_aggregator
#define __TBB_override
Definition: tbb_stddef.h:240
broadcast_cache< output_type > broadcast_cache_type
function_input_queue< input_type, A > input_queue_type
void internal_try_put_task(operation_type *op)
Put to the node, but return the task instead of enqueueing it.
function_body< input_type, output_type > function_body_type
task * try_put_task(const input_type &t) __TBB_override
Put item to successor; return task to run the successor if possible.
task * apply_body_impl_bypass(const input_type &i)
void reset_receiver(reset_flags f) __TBB_override
multifunction_body< input_type, output_ports_type > multifunction_body_type
Base class for user-defined tasks.
Definition: task.h:589
The graph class.
task * try_put_task_impl(const input_type &t, tbb::internal::true_type)
Implements methods for an executable node that takes continue_msg as input.
function_body_type * my_body
static void fgt_end_body(void *)
Implements methods for a function node that takes a type Input as input.
Implements methods for a function node that takes a type Input as input and sends.
#define FLOW_SPAWN(a)
Definition: flow_graph.h:49
multifunction_input< Input, OutputPortSet, Policy, A > my_class
void add_nodes_impl(CompositeType *, bool)
Input and scheduling for a function node that takes a type Input as input.
Base class for types that should not be assigned.
Definition: tbb_stddef.h:320
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
virtual ~function_input_base()
Destructor.
virtual multifunction_body * clone()=0
static tbb::task *const SUCCESSFULLY_ENQUEUED
function_input_base(const function_input_base &src)
Copy constructor.
function_input_base< Input, Policy, A, my_class > base_type
A task that calls a node's apply_body_bypass function, passing in an input of type Input.
task * try_put_task(const T &t) __TBB_override
internal::aggregating_functor< class_type, operation_type > handler_type
graph & graph_reference() __TBB_override
function_input_queue< input_type, A > input_queue_type
function_input_base< Input, Policy, A, my_class > base_type
multifunction_body_type * my_init_body
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
A::template rebind< input_queue_type >::other queue_allocator_type
multifunction_output(const multifunction_output &)
__TBB_STATIC_ASSERT(!((internal::has_policy< queueing, Policy >::value) &&(internal::has_policy< rejecting, Policy >::value)), "queueing and rejecting policies can't be specified simultaneously")
void remove_successor(successor_type &r)
Output output_type
The output type of this receiver.
Implements methods for both executable and function nodes that puts Output to its successors.
virtual broadcast_cache< output_type > & successors()=0
void check_task_and_spawn(graph &g, task *t)
void set_owner(owner_type *owner)
output_type apply_body_impl(const input_type &i)
continue_input(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body &body, node_priority_t priority))
task * try_put_task(const output_type &i)
void reset_function_input_base(reset_flags f)
virtual broadcast_cache< output_type > & successors()=0
function_input(graph &g, size_t max_concurrency,)
sender< output_type >::successor_type successor_type
static void fgt_begin_body(void *)
multifunction_body_type * my_body
void internal_forward(operation_type *op)
Creates tasks for postponed messages if available and if concurrency allows.
function_input_base< Input, Policy, A, ImplType > class_type
bool try_put(const output_type &i)
function_input_base(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(size_t max_concurrency, node_priority_t priority))
Constructor for function_input_base.
task * try_put_task_impl(const input_type &t, tbb::internal::false_type)
task * internal_try_put_bypass(const input_type &t)
multifunction_input(const multifunction_input &src)
Copy constructor.
static void fgt_alias_port(void *, void *, bool)
bool register_successor(successor_type &r) __TBB_override
Adds a new successor to this node.
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
Input input_type
The input type of this receiver.
Definition: flow_graph.h:436
task * execute() __TBB_override
void const char const char int ITT_FORMAT __itt_group_sync p
function_output(const function_output &)
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
void handle_operations(operation_type *op_list)
int max_concurrency()
Returns the maximal number of threads that can work inside the arena.
Definition: task_arena.h:413
continue_input(graph &g, int number_of_predecessors,)
function_body_type * my_body
broadcast_cache_type & successors()
task * forward_task()
This is executed by an enqueued task, the "forwarder".
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
the leaf for function_body
const item_type & front() const
void register_successor(successor_type &r)
void spawn_forward_task()
Spawns a task that calls forward()
virtual function_body * clone()=0
function_input(const function_input &src)
Copy constructor.
task * try_put_task(const output_type &i)
void * addr
graph & graph_reference() __TBB_override
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:167
task * apply_body_bypass(input_type)
Applies the body to the provided input.
static task * emit_this(graph &g, const T &t, P &p)
function_input< Input, Output, Policy, A > my_class
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
leaf for multifunction. OutputSet can be a std::tuple or a vector.
multifunction_input(graph &g, size_t max_concurrency,)
task * apply_body_impl_bypass(const input_type &i)

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.