Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
flow_graph.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #include "tbb_stddef.h"
21 #include "atomic.h"
22 #include "spin_mutex.h"
23 #include "null_mutex.h"
24 #include "spin_rw_mutex.h"
25 #include "null_rw_mutex.h"
26 #include "task.h"
28 #include "tbb_exception.h"
31 #include "tbb_profiling.h"
32 #include "task_arena.h"
33 
34 #if __TBB_PREVIEW_ASYNC_MSG
35 #include <vector> // std::vector in internal::async_storage
36 #include <memory> // std::shared_ptr in async_msg
37 #endif
38 
39 #if __TBB_PREVIEW_STREAMING_NODE
40 // For streaming_node
41 #include <array> // std::array
42 #include <unordered_map> // std::unordered_map
43 #include <type_traits> // std::decay, std::true_type, std::false_type
44 #endif // __TBB_PREVIEW_STREAMING_NODE
45 
46 #if TBB_DEPRECATED_FLOW_ENQUEUE
47 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
48 #else
49 #define FLOW_SPAWN(a) tbb::task::spawn((a))
50 #endif
51 
52 // use the VC10 or gcc version of tuple if it is available.
53 #if __TBB_CPP11_TUPLE_PRESENT
54  #include <tuple>
55 namespace tbb {
56  namespace flow {
57  using std::tuple;
58  using std::tuple_size;
59  using std::tuple_element;
60  using std::get;
61  }
62 }
63 #else
64  #include "compat/tuple"
65 #endif
66 
67 #include<list>
68 #include<queue>
69 
80 namespace tbb {
81 namespace flow {
82 
84 enum concurrency { unlimited = 0, serial = 1 };
85 
86 namespace interface10 {
87 
89 struct null_type {};
90 
92 class continue_msg {};
93 
95 template< typename T > class sender;
96 template< typename T > class receiver;
98 template< typename T > class limiter_node; // needed for resetting decrementer
99 template< typename R, typename B > class run_and_put_task;
101 namespace internal {
103 template<typename T, typename M> class successor_cache;
104 template<typename T, typename M> class broadcast_cache;
105 template<typename T, typename M> class round_robin_cache;
106 template<typename T, typename M> class predecessor_cache;
107 template<typename T, typename M> class reservable_predecessor_cache;
109 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
110 // Holder of edges both for caches and for those nodes which do not have predecessor caches.
111 // C == receiver< ... > or sender< ... >, depending.
112 template<typename C>
113 class edge_container {
115 public:
116  typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
118  void add_edge(C &s) {
119  built_edges.push_back(&s);
120  }
122  void delete_edge(C &s) {
123  for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
124  if (*i == &s) {
125  (void)built_edges.erase(i);
126  return; // only remove one predecessor per request
127  }
128  }
129  }
131  void copy_edges(edge_list_type &v) {
132  v = built_edges;
133  }
135  size_t edge_count() {
136  return (size_t)(built_edges.size());
137  }
139  void clear() {
140  built_edges.clear();
141  }
143  // methods remove the statement from all predecessors/successors liste in the edge
144  // container.
145  template< typename S > void sender_extract(S &s);
146  template< typename R > void receiver_extract(R &r);
148 private:
149  edge_list_type built_edges;
150 }; // class edge_container
151 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
152 
153 } // namespace internal
155 } // namespace interface10
156 } // namespace flow
157 } // namespace tbb
158 
161 
162 namespace tbb {
163 namespace flow {
164 namespace interface10 {
165 
166 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
167 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
168  // if no RHS task, don't change left.
169  if (right == NULL) return left;
170  // right != NULL
171  if (left == NULL) return right;
172  if (left == SUCCESSFULLY_ENQUEUED) return right;
173  // left contains a task
174  if (right != SUCCESSFULLY_ENQUEUED) {
175  // both are valid tasks
177  return right;
178  }
179  return left;
180 }
182 #if __TBB_PREVIEW_ASYNC_MSG
184 template < typename T > class async_msg;
186 namespace internal {
188 template < typename T > class async_storage;
190 template< typename T, typename = void >
193  typedef T filtered_type;
195  static const bool is_async_type = false;
197  static const void* to_void_ptr(const T& t) {
198  return static_cast<const void*>(&t);
199  }
201  static void* to_void_ptr(T& t) {
202  return static_cast<void*>(&t);
203  }
205  static const T& from_void_ptr(const void* p) {
206  return *static_cast<const T*>(p);
207  }
209  static T& from_void_ptr(void* p) {
210  return *static_cast<T*>(p);
211  }
213  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
214  if (is_async) {
215  // This (T) is NOT async and incoming 'A<X> t' IS async
216  // Get data from async_msg
218  task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
219  // finalize() must be called after subscribe() because set() can be called in finalize()
220  // and 'this_recv' client must be subscribed by this moment
221  msg.finalize();
222  return new_task;
223  }
224  else {
225  // Incoming 't' is NOT async
226  return this_recv->try_put_task(from_void_ptr(p));
227  }
228  }
229 };
231 template< typename T >
232 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
233  typedef T async_type;
234  typedef typename T::async_msg_data_type filtered_type;
236  static const bool is_async_type = true;
238  // Receiver-classes use const interfaces
239  static const void* to_void_ptr(const T& t) {
240  return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
241  }
243  static void* to_void_ptr(T& t) {
244  return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
245  }
247  // Sender-classes use non-const interfaces
248  static const T& from_void_ptr(const void* p) {
249  return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
250  }
252  static T& from_void_ptr(void* p) {
253  return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
254  }
256  // Used in receiver<T> class
257  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
258  if (is_async) {
259  // Both are async
260  return this_recv->try_put_task(from_void_ptr(p));
261  }
262  else {
263  // This (T) is async and incoming 'X t' is NOT async
264  // Create async_msg for X
266  const T msg(t);
267  return this_recv->try_put_task(msg);
268  }
269  }
270 };
275  template< typename, typename > friend class internal::predecessor_cache;
276  template< typename, typename > friend class internal::reservable_predecessor_cache;
277 public:
281  virtual ~untyped_sender() {}
283  // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
285  // TODO: Prevent untyped successor registration
288  virtual bool register_successor( successor_type &r ) = 0;
291  virtual bool remove_successor( successor_type &r ) = 0;
294  virtual bool try_release( ) { return false; }
297  virtual bool try_consume( ) { return false; }
299 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
300  typedef internal::edge_container<successor_type> built_successors_type;
302  typedef built_successors_type::edge_list_type successor_list_type;
303  virtual built_successors_type &built_successors() = 0;
304  virtual void internal_add_built_successor( successor_type & ) = 0;
305  virtual void internal_delete_built_successor( successor_type & ) = 0;
306  virtual void copy_successors( successor_list_type &) = 0;
307  virtual size_t successor_count() = 0;
308 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
309 protected:
311  template< typename X >
312  bool try_get( X &t ) {
314  }
317  template< typename X >
318  bool try_reserve( X &t ) {
320  }
322  virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
323  virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
324 };
326 class untyped_receiver {
327  template< typename, typename > friend class run_and_put_task;
328  template< typename > friend class limiter_node;
330  template< typename, typename > friend class internal::broadcast_cache;
331  template< typename, typename > friend class internal::round_robin_cache;
332  template< typename, typename > friend class internal::successor_cache;
334 #if __TBB_PREVIEW_OPENCL_NODE
335  template< typename, typename > friend class proxy_dependency_receiver;
336 #endif /* __TBB_PREVIEW_OPENCL_NODE */
337 public:
342  virtual ~untyped_receiver() {}
345  template<typename X>
346  bool try_put(const X& t) {
347  task *res = try_put_task(t);
348  if (!res) return false;
350  return true;
351  }
353  // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
355  // TODO: Prevent untyped predecessor registration
356 
358  virtual bool register_predecessor( predecessor_type & ) { return false; }
361  virtual bool remove_predecessor( predecessor_type & ) { return false; }
363 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
364  typedef internal::edge_container<predecessor_type> built_predecessors_type;
365  typedef built_predecessors_type::edge_list_type predecessor_list_type;
366  virtual built_predecessors_type &built_predecessors() = 0;
367  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
368  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
369  virtual void copy_predecessors( predecessor_list_type & ) = 0;
370  virtual size_t predecessor_count() = 0;
371 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
372 protected:
373  template<typename X>
374  task *try_put_task(const X& t) {
376  }
378  virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
380  virtual graph& graph_reference() = 0;
382  // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
387  virtual bool is_continue_receiver() { return false; }
388 };
389 
390 } // namespace internal
393 template< typename T >
395 public:
397  typedef T output_type;
402  virtual bool try_get( T & ) { return false; }
405  virtual bool try_reserve( T & ) { return false; }
407 protected:
408  virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
409  // Both async OR both are NOT async
412  }
413  // Else: this (T) is async OR incoming 't' is async
414  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
415  return false;
416  }
418  virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
419  // Both async OR both are NOT async
422  }
423  // Else: this (T) is async OR incoming 't' is async
424  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
425  return false;
426  }
427 }; // class sender<T>
430 template< typename T >
432  template< typename > friend class internal::async_storage;
433  template< typename, typename > friend struct internal::async_helpers;
434 public:
436  typedef T input_type;
437 
443  }
447  }
449 protected:
450  virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
452  }
455  virtual task *try_put_task(const T& t) = 0;
457 }; // class receiver<T>
459 #else // __TBB_PREVIEW_ASYNC_MSG
462 template< typename T >
463 class sender {
464 public:
466  typedef T output_type;
471  virtual ~sender() {}
473  // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
476  virtual bool register_successor( successor_type &r ) = 0;
479  virtual bool remove_successor( successor_type &r ) = 0;
482  virtual bool try_get( T & ) { return false; }
483 
485  virtual bool try_reserve( T & ) { return false; }
486 
488  virtual bool try_release( ) { return false; }
491  virtual bool try_consume( ) { return false; }
492 
493 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
494  typedef typename internal::edge_container<successor_type> built_successors_type;
496  typedef typename built_successors_type::edge_list_type successor_list_type;
497  virtual built_successors_type &built_successors() = 0;
498  virtual void internal_add_built_successor( successor_type & ) = 0;
499  virtual void internal_delete_built_successor( successor_type & ) = 0;
500  virtual void copy_successors( successor_list_type &) = 0;
501  virtual size_t successor_count() = 0;
502 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
503 }; // class sender<T>
504 
506 template< typename T >
507 class receiver {
508 public:
510  typedef T input_type;
511 
513  typedef sender<T> predecessor_type;
516  virtual ~receiver() {}
519  bool try_put( const T& t ) {
520  task *res = try_put_task(t);
521  if (!res) return false;
523  return true;
524  }
527 protected:
528  template< typename R, typename B > friend class run_and_put_task;
529  template< typename X, typename Y > friend class internal::broadcast_cache;
530  template< typename X, typename Y > friend class internal::round_robin_cache;
531  virtual task *try_put_task(const T& t) = 0;
532  virtual graph& graph_reference() = 0;
533 public:
534  // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
537  virtual bool register_predecessor( predecessor_type & ) { return false; }
540  virtual bool remove_predecessor( predecessor_type & ) { return false; }
542 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
543  typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
544  typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
545  virtual built_predecessors_type &built_predecessors() = 0;
546  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
547  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
548  virtual void copy_predecessors( predecessor_list_type & ) = 0;
549  virtual size_t predecessor_count() = 0;
550 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
552 protected:
554  template<typename U> friend class limiter_node;
556 
557  template<typename TT, typename M> friend class internal::successor_cache;
558  virtual bool is_continue_receiver() { return false; }
560 #if __TBB_PREVIEW_OPENCL_NODE
561  template< typename, typename > friend class proxy_dependency_receiver;
562 #endif /* __TBB_PREVIEW_OPENCL_NODE */
563 }; // class receiver<T>
565 #endif // __TBB_PREVIEW_ASYNC_MSG
569 class continue_receiver : public receiver< continue_msg > {
570 public:
574 
580  __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
583  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
584  }
589  my_current_count = 0;
590  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
591  }
597  return true;
598  }
601 
607  return true;
608  }
609 
610 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
611  typedef internal::edge_container<predecessor_type> built_predecessors_type;
612  typedef built_predecessors_type::edge_list_type predecessor_list_type;
613  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
614 
615  void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
617  my_built_predecessors.add_edge( s );
618  }
619 
620  void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
622  my_built_predecessors.delete_edge(s);
623  }
625  void copy_predecessors( predecessor_list_type &v) __TBB_override {
627  my_built_predecessors.copy_edges(v);
628  }
630  size_t predecessor_count() __TBB_override {
632  return my_built_predecessors.edge_count();
633  }
635 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
637 protected:
638  template< typename R, typename B > friend class run_and_put_task;
639  template<typename X, typename Y> friend class internal::broadcast_cache;
640  template<typename X, typename Y> friend class internal::round_robin_cache;
641  // execute body is supposed to be too small to create a task for.
643  {
647  else
649  }
650  task * res = execute();
651  return res? res : SUCCESSFULLY_ENQUEUED;
652  }
654 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
655  // continue_receiver must contain its own built_predecessors because it does
656  // not have a node_cache.
657  built_predecessors_type my_built_predecessors;
658 #endif
664  // the friend declaration in the base class did not eliminate the "protected class"
665  // error in gcc 4.1.2
666  template<typename U> friend class limiter_node;
670  if (f & rf_clear_edges) {
671 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
672  my_built_predecessors.clear();
673 #endif
675  }
676  }
679 
681  virtual task * execute() = 0;
682  template<typename TT, typename M> friend class internal::successor_cache;
683  bool is_continue_receiver() __TBB_override { return true; }
685 }; // class continue_receiver
687 } // interfaceX
689 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
690  template <typename K, typename T>
691  K key_from_message( const T &t ) {
692  return t.key();
693  }
694 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
695 
699 } // flow
700 } // tbb
705 namespace tbb {
706 namespace flow {
707 namespace interface10 {
708 
712 #if __TBB_PREVIEW_ASYNC_MSG
714 #endif
716 
717 template <typename C, typename N>
718 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
719 {
720  if (begin) current_node = my_graph->my_nodes;
721  //else it is an end iterator by default
722 }
723 
724 template <typename C, typename N>
726  __TBB_ASSERT(current_node, "graph_iterator at end");
727  return *operator->();
728 }
730 template <typename C, typename N>
732  return current_node;
733 }
735 template <typename C, typename N>
737  if (current_node) current_node = current_node->next;
738 }
741 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
743  own_context = true;
744  cancelled = false;
745  caught_exception = false;
746  my_context = new task_group_context(tbb::internal::FLOW_TASKS);
750  my_is_active = true;
751 }
753 inline graph::graph(task_group_context& use_this_context) :
754  my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
755  prepare_task_arena();
756  own_context = false;
757  my_root_task = (new (task::allocate_root(*my_context)) empty_task);
758  my_root_task->set_ref_count(1);
760  my_is_active = true;
761 }
762 
763 inline graph::~graph() {
764  wait_for_all();
766  tbb::task::destroy(*my_root_task);
767  if (own_context) delete my_context;
768  delete my_task_arena;
769 }
770 
771 inline void graph::reserve_wait() {
775  }
776 }
777 
778 inline void graph::release_wait() {
782  }
783 }
786  n->next = NULL;
787  {
790  if (my_nodes_last) my_nodes_last->next = n;
792  if (!my_nodes) my_nodes = n;
793  }
794 }
795 
797  {
799  __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
800  if (n->prev) n->prev->next = n->next;
801  if (n->next) n->next->prev = n->prev;
802  if (my_nodes_last == n) my_nodes_last = n->prev;
803  if (my_nodes == n) my_nodes = n->next;
804  }
805  n->prev = n->next = NULL;
806 }
807 
808 inline void graph::reset( reset_flags f ) {
809  // reset context
811 
812  if(my_context) my_context->reset();
813  cancelled = false;
814  caught_exception = false;
815  // reset all the nodes comprising the graph
816  for(iterator ii = begin(); ii != end(); ++ii) {
817  graph_node *my_p = &(*ii);
818  my_p->reset_node(f);
819  }
820  // Reattach the arena. Might be useful to run the graph in a particular task_arena
821  // while not limiting graph lifetime to a single task_arena::execute() call.
822  prepare_task_arena( /*reinit=*/true );
824  // now spawn the tasks necessary to start the graph
825  for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
826  internal::spawn_in_graph_arena(*this, *(*rti));
827  }
828  my_reset_task_list.clear();
829 }
831 inline graph::iterator graph::begin() { return iterator(this, true); }
832 
833 inline graph::iterator graph::end() { return iterator(this, false); }
834 
835 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
836 
837 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
838 
839 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
840 
841 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
843 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
844 inline void graph::set_name(const char *name) {
846 }
847 #endif
849 inline graph_node::graph_node(graph& g) : my_graph(g) {
851 }
852 
855 }
856 
858 
860 template < typename Output >
861 class source_node : public graph_node, public sender< Output > {
862 public:
864  typedef Output output_type;
865 
869  //Source node has no input type
871 
872 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
873  typedef typename sender<output_type>::built_successors_type built_successors_type;
874  typedef typename sender<output_type>::successor_list_type successor_list_type;
875 #endif
876 
878  template< typename Body >
879  source_node( graph &g, Body body, bool is_active = true )
880  : graph_node(g), my_active(is_active), init_my_active(is_active),
881  my_body( new internal::source_body_leaf< output_type, Body>(body) ),
882  my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
883  my_reserved(false), my_has_cached_item(false)
884  {
885  my_successors.set_owner(this);
886  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
887  static_cast<sender<output_type> *>(this), this->my_body );
888  }
891  source_node( const source_node& src ) :
892  graph_node(src.my_graph), sender<Output>(),
895  my_reserved(false), my_has_cached_item(false)
896  {
897  my_successors.set_owner(this);
898  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
899  static_cast<sender<output_type> *>(this), this->my_body );
900  }
901 
903  ~source_node() { delete my_body; delete my_init_body; }
904 
905 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
906  void set_name( const char *name ) __TBB_override {
908  }
909 #endif
915  if ( my_active )
916  spawn_put();
917  return true;
918  }
919 
924  return true;
925  }
927 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
928 
929  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
931  void internal_add_built_successor( successor_type &r) __TBB_override {
933  my_successors.internal_add_built_successor(r);
934  }
936  void internal_delete_built_successor( successor_type &r) __TBB_override {
938  my_successors.internal_delete_built_successor(r);
939  }
940 
941  size_t successor_count() __TBB_override {
943  return my_successors.successor_count();
944  }
945 
946  void copy_successors(successor_list_type &v) __TBB_override {
948  my_successors.copy_successors(v);
949  }
950 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
955  if ( my_reserved )
956  return false;
957 
958  if ( my_has_cached_item ) {
959  v = my_cached_item;
961  return true;
962  }
963  // we've been asked to provide an item, but we have none. enqueue a task to
964  // provide one.
965  spawn_put();
966  return false;
967  }
972  if ( my_reserved ) {
973  return false;
974  }
975 
976  if ( my_has_cached_item ) {
977  v = my_cached_item;
978  my_reserved = true;
979  return true;
980  } else {
981  return false;
982  }
983  }
984 
989  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
990  my_reserved = false;
991  if(!my_successors.empty())
993  return true;
994  }
995 
999  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1000  my_reserved = false;
1001  my_has_cached_item = false;
1002  if ( !my_successors.empty() ) {
1003  spawn_put();
1004  }
1005  return true;
1006  }
1007 
1009  void activate() {
1011  my_active = true;
1012  if (!my_successors.empty())
1013  spawn_put();
1014  }
1015 
1016  template<typename Body>
1019  return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1020  }
1021 
1022 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1023  void extract( ) __TBB_override {
1024  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1026  my_reserved = false;
1028  }
1029 #endif
1030 
1031 protected:
1032 
1036  my_reserved =false;
1037  if(my_has_cached_item) {
1038  my_has_cached_item = false;
1039  }
1043  delete my_body;
1044  my_body = tmp;
1045  }
1048  }
1049 
1050 private:
1060 
1061  // used by apply_body_bypass, can invoke body of node.
1064  if ( my_reserved ) {
1065  return false;
1066  }
1069  bool r = (*my_body)(my_cached_item);
1071  if (r) {
1073  }
1074  }
1076  v = my_cached_item;
1077  my_reserved = true;
1078  return true;
1079  } else {
1080  return false;
1081  }
1082  }
1083 
1084  // when resetting, and if the source_node was created with my_active == true, then
1085  // when we reset the node we must store a task to run the node, and spawn it only
1086  // after the reset is complete and is_active() is again true. This is why we don't
1087  // test for is_active() here.
1089  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1091  }
1094  void spawn_put( ) {
1095  if(internal::is_graph_active(this->my_graph)) {
1097  }
1098  }
1099 
1103  output_type v;
1104  if ( !try_reserve_apply_body(v) )
1105  return NULL;
1106 
1107  task *last_task = my_successors.try_put_task(v);
1108  if ( last_task )
1109  try_consume();
1110  else
1111  try_release();
1112  return last_task;
1113  }
1114 }; // class source_node
1115 
1117 template < typename Input, typename Output = continue_msg, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1118 class function_node : public graph_node, public internal::function_input<Input,Output,Policy,Allocator>, public internal::function_output<Output> {
1119 public:
1120  typedef Input input_type;
1121  typedef Output output_type;
1127 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1128  typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1129  typedef typename fOutput_type::successor_list_type successor_list_type;
1130 #endif
1132 
1134  // input_queue_type is allocated here, but destroyed in the function_input_base.
1135  // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1136  // be done in one place. This would be an interface-breaking change.
1137  template< typename Body >
1139  graph &g, size_t concurrency,
1142  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1143  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1144  }
1145 
1148  graph_node(src.my_graph),
1149  input_impl_type(src),
1150  fOutput_type() {
1151  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1152  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1153  }
1154 
1155 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1156  void set_name( const char *name ) __TBB_override {
1158  }
1159 #endif
1160 
1161 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1162  void extract( ) __TBB_override {
1163  my_predecessors.built_predecessors().receiver_extract(*this);
1164  successors().built_successors().sender_extract(*this);
1165  }
1166 #endif
1167 
1168 protected:
1169  template< typename R, typename B > friend class run_and_put_task;
1170  template<typename X, typename Y> friend class internal::broadcast_cache;
1171  template<typename X, typename Y> friend class internal::round_robin_cache;
1173 
1175 
1178  // TODO: use clear() instead.
1179  if(f & rf_clear_edges) {
1180  successors().clear();
1182  }
1183  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1184  __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1185  }
1186 
1187 }; // class function_node
1188 
1190 // Output is a tuple of output types.
1191 template < typename Input, typename Output, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1193  public graph_node,
1195  <
1196  Input,
1197  typename internal::wrap_tuple_elements<
1198  tbb::flow::tuple_size<Output>::value, // #elements in tuple
1199  internal::multifunction_output, // wrap this around each element
1200  Output // the tuple providing the types
1201  >::type,
1202  Policy,
1203  Allocator
1204  > {
1205 protected:
1207 public:
1208  typedef Input input_type;
1213 private:
1216 public:
1217  template<typename Body>
1219  graph &g, size_t concurrency,
1221  ) : graph_node(g), base_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
1222  tbb::internal::fgt_multioutput_node_with_body<N>(
1223  tbb::internal::FLOW_MULTIFUNCTION_NODE,
1224  &this->my_graph, static_cast<receiver<input_type> *>(this),
1225  this->output_ports(), this->my_body
1226  );
1227  }
1228 
1230  graph_node(other.my_graph), base_type(other) {
1231  tbb::internal::fgt_multioutput_node_with_body<N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1232  &this->my_graph, static_cast<receiver<input_type> *>(this),
1233  this->output_ports(), this->my_body );
1234  }
1235 
1236 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1237  void set_name( const char *name ) __TBB_override {
1239  }
1240 #endif
1241 
1242 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1243  void extract( ) __TBB_override {
1244  my_predecessors.built_predecessors().receiver_extract(*this);
1245  base_type::extract();
1246  }
1247 #endif
1248  // all the guts are in multifunction_input...
1249 protected:
1251 }; // multifunction_node
1252 
1254 // successors. The node has unlimited concurrency, so it does not reject inputs.
1255 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
1256 class split_node : public graph_node, public receiver<TupleType> {
1259 public:
1260  typedef TupleType input_type;
1261  typedef Allocator allocator_type;
1262 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1264  typedef typename base_type::predecessor_list_type predecessor_list_type;
1266  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1267 #endif
1269  typedef typename internal::wrap_tuple_elements<
1270  N, // #elements in tuple
1271  internal::multifunction_output, // wrap this around each element
1272  TupleType // the tuple providing the types
1274 
1275  explicit split_node(graph &g) : graph_node(g)
1276  {
1277  tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1278  static_cast<receiver<input_type> *>(this), this->output_ports());
1279  }
1280  split_node( const split_node & other) : graph_node(other.my_graph), base_type(other)
1281  {
1282  tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1283  static_cast<receiver<input_type> *>(this), this->output_ports());
1284  }
1286 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1287  void set_name( const char *name ) __TBB_override {
1289  }
1290 #endif
1291 
1294 protected:
1295  task *try_put_task(const TupleType& t) __TBB_override {
1296  // Sending split messages in parallel is not justified, as overheads would prevail.
1297  // Also, we do not have successors here. So we just tell the task returned here is successful.
1299  }
1301  if (f & rf_clear_edges)
1303 
1305  }
1308  return my_graph;
1309  }
1310 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1311 private:
1312  void extract() __TBB_override {}
1313 
1315  void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1316 
1318  void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1319 
1320  size_t predecessor_count() __TBB_override { return 0; }
1321 
1322  void copy_predecessors(predecessor_list_type&) __TBB_override {}
1323 
1324  built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1325 
1327  built_predecessors_type my_predessors;
1328 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1329 
1330 private:
1332 };
1333 
1335 template <typename Output, typename Policy = internal::Policy<void> >
1336 class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1337  public internal::function_output<Output> {
1338 public:
1340  typedef Output output_type;
1345 
1347  template <typename Body >
1349  graph &g,
1351  ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ) {
1352  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1353  static_cast<receiver<input_type> *>(this),
1354  static_cast<sender<output_type> *>(this), this->my_body );
1355  }
1356 
1358  template <typename Body >
1360  graph &g, int number_of_predecessors,
1362  ) : graph_node(g)
1363  , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
1364  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1365  static_cast<receiver<input_type> *>(this),
1366  static_cast<sender<output_type> *>(this), this->my_body );
1367  }
1368 
1371  graph_node(src.my_graph), input_impl_type(src),
1372  internal::function_output<Output>() {
1373  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1374  static_cast<receiver<input_type> *>(this),
1375  static_cast<sender<output_type> *>(this), this->my_body );
1376  }
1377 
1378 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1379  void set_name( const char *name ) __TBB_override {
1381  }
1382 #endif
1383 
1384 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1385  void extract() __TBB_override {
1386  input_impl_type::my_built_predecessors.receiver_extract(*this);
1387  successors().built_successors().sender_extract(*this);
1388  }
1389 #endif
1390 
1391 protected:
1392  template< typename R, typename B > friend class run_and_put_task;
1393  template<typename X, typename Y> friend class internal::broadcast_cache;
1394  template<typename X, typename Y> friend class internal::round_robin_cache;
1397 
1400  if(f & rf_clear_edges)successors().clear();
1401  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1402  }
1403 }; // continue_node
1404 
1406 template <typename T>
1407 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1408 public:
1409  typedef T input_type;
1410  typedef T output_type;
1413 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1414  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1415  typedef typename sender<output_type>::successor_list_type successor_list_type;
1416 #endif
1417 private:
1419 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1420  internal::edge_container<predecessor_type> my_built_predecessors;
1421  spin_mutex pred_mutex; // serialize accesses on edge_container
1422 #endif
1423 public:
1424 
1425  explicit broadcast_node(graph& g) : graph_node(g) {
1426  my_successors.set_owner( this );
1427  tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1428  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1429  }
1430 
1431  // Copy constructor
1433  graph_node(src.my_graph), receiver<T>(), sender<T>()
1434  {
1435  my_successors.set_owner( this );
1436  tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1437  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1438  }
1439 
1440 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1441  void set_name( const char *name ) __TBB_override {
1443  }
1444 #endif
1445 
1449  return true;
1450  }
1451 
1455  return true;
1456  }
1457 
1458 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1459  typedef typename sender<T>::built_successors_type built_successors_type;
1460 
1461  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1462 
1463  void internal_add_built_successor(successor_type &r) __TBB_override {
1464  my_successors.internal_add_built_successor(r);
1465  }
1467  void internal_delete_built_successor(successor_type &r) __TBB_override {
1468  my_successors.internal_delete_built_successor(r);
1469  }
1470 
1471  size_t successor_count() __TBB_override {
1472  return my_successors.successor_count();
1473  }
1475  void copy_successors(successor_list_type &v) __TBB_override {
1476  my_successors.copy_successors(v);
1477  }
1478 
1479  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1480 
1481  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1482 
1483  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1484  spin_mutex::scoped_lock l(pred_mutex);
1485  my_built_predecessors.add_edge(p);
1486  }
1487 
1488  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1489  spin_mutex::scoped_lock l(pred_mutex);
1490  my_built_predecessors.delete_edge(p);
1491  }
1493  size_t predecessor_count() __TBB_override {
1494  spin_mutex::scoped_lock l(pred_mutex);
1495  return my_built_predecessors.edge_count();
1496  }
1498  void copy_predecessors(predecessor_list_type &v) __TBB_override {
1499  spin_mutex::scoped_lock l(pred_mutex);
1500  my_built_predecessors.copy_edges(v);
1501  }
1502 
1503  void extract() __TBB_override {
1504  my_built_predecessors.receiver_extract(*this);
1505  my_successors.built_successors().sender_extract(*this);
1506  }
1507 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1509 protected:
1510  template< typename R, typename B > friend class run_and_put_task;
1511  template<typename X, typename Y> friend class internal::broadcast_cache;
1512  template<typename X, typename Y> friend class internal::round_robin_cache;
1515  task *new_task = my_successors.try_put_task(t);
1516  if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1517  return new_task;
1518  }
1521  return my_graph;
1522  }
1523 
1525 
1528  my_successors.clear();
1529 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1530  my_built_predecessors.clear();
1531 #endif
1532  }
1533  __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1534  }
1535 }; // broadcast_node
1536 
1538 template <typename T, typename A=cache_aligned_allocator<T> >
1539 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1540 public:
1541  typedef T input_type;
1542  typedef T output_type;
1546 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1547  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1548  typedef typename sender<output_type>::successor_list_type successor_list_type;
1549 #endif
1550 protected:
1551  typedef size_t size_type;
1553 
1554 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1555  internal::edge_container<predecessor_type> my_built_predecessors;
1556 #endif
1557 
1561 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1562  , add_blt_succ, del_blt_succ,
1563  add_blt_pred, del_blt_pred,
1564  blt_succ_cnt, blt_pred_cnt,
1565  blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
1566 #endif
1567  };
1568 
1569  // implements the aggregator_operation concept
1570  class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1571  public:
1572  char type;
1573 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1575  union {
1579  size_t cnt_val;
1580  successor_list_type *svec;
1581  predecessor_list_type *pvec;
1582  };
1583 #else
1584  T *elem;
1587 #endif
1588  buffer_operation(const T& e, op_type t) : type(char(t))
1589 
1590 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1591  , ltask(NULL), elem(const_cast<T*>(&e))
1592 #else
1593  , elem(const_cast<T*>(&e)) , ltask(NULL)
1594 #endif
1595  {}
1596  buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
1597  };
1598 
1600  typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
1601  friend class internal::aggregating_functor<class_type, buffer_operation>;
1602  internal::aggregator< handler_type, buffer_operation> my_aggregator;
1604  virtual void handle_operations(buffer_operation *op_list) {
1605  handle_operations_impl(op_list, this);
1606  }
1608  template<typename derived_type>
1609  void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1610  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1612  buffer_operation *tmp = NULL;
1613  bool try_forwarding = false;
1614  while (op_list) {
1615  tmp = op_list;
1616  op_list = op_list->next;
1617  switch (tmp->type) {
1618  case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1619  case rem_succ: internal_rem_succ(tmp); break;
1620  case req_item: internal_pop(tmp); break;
1621  case res_item: internal_reserve(tmp); break;
1622  case rel_res: internal_release(tmp); try_forwarding = true; break;
1623  case con_res: internal_consume(tmp); try_forwarding = true; break;
1624  case put_item: try_forwarding = internal_push(tmp); break;
1626 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1627  // edge recording
1628  case add_blt_succ: internal_add_built_succ(tmp); break;
1629  case del_blt_succ: internal_del_built_succ(tmp); break;
1630  case add_blt_pred: internal_add_built_pred(tmp); break;
1631  case del_blt_pred: internal_del_built_pred(tmp); break;
1632  case blt_succ_cnt: internal_succ_cnt(tmp); break;
1633  case blt_pred_cnt: internal_pred_cnt(tmp); break;
1634  case blt_succ_cpy: internal_copy_succs(tmp); break;
1635  case blt_pred_cpy: internal_copy_preds(tmp); break;
1636 #endif
1637  }
1638  }
1639 
1640  derived->order();
1641 
1642  if (try_forwarding && !forwarder_busy) {
1643  if(internal::is_graph_active(this->my_graph)) {
1644  forwarder_busy = true;
1645  task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
1647  < buffer_node<input_type, A> >(*this);
1648  // tmp should point to the last item handled by the aggregator. This is the operation
1649  // the handling thread enqueued. So modifying that record will be okay.
1650  // workaround for icc bug
1651  tbb::task *z = tmp->ltask;
1652  graph &g = this->my_graph;
1653  tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
1654  }
1655  }
1656  } // handle_operations
1659  return op_data.ltask;
1660  }
1662  inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1663  task *ft = grab_forwarding_task(op_data);
1664  if(ft) {
1666  return true;
1667  }
1668  return false;
1669  }
1670 
1672  virtual task *forward_task() {
1673  buffer_operation op_data(try_fwd_task);
1674  task *last_task = NULL;
1675  do {
1676  op_data.status = internal::WAIT;
1677  op_data.ltask = NULL;
1678  my_aggregator.execute(&op_data);
1680  // workaround for icc bug
1681  tbb::task *xtask = op_data.ltask;
1682  graph& g = this->my_graph;
1683  last_task = combine_tasks(g, last_task, xtask);
1684  } while (op_data.status ==internal::SUCCEEDED);
1685  return last_task;
1686  }
1687 
1689  virtual void internal_reg_succ(buffer_operation *op) {
1690  my_successors.register_successor(*(op->r));
1692  }
1693 
1696  my_successors.remove_successor(*(op->r));
1698  }
1700 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1701  typedef typename sender<T>::built_successors_type built_successors_type;
1702 
1703  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1705  virtual void internal_add_built_succ(buffer_operation *op) {
1706  my_successors.internal_add_built_successor(*(op->r));
1708  }
1710  virtual void internal_del_built_succ(buffer_operation *op) {
1711  my_successors.internal_delete_built_successor(*(op->r));
1713  }
1714 
1715  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1717  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1718 
1719  virtual void internal_add_built_pred(buffer_operation *op) {
1720  my_built_predecessors.add_edge(*(op->p));
1722  }
1723 
1724  virtual void internal_del_built_pred(buffer_operation *op) {
1725  my_built_predecessors.delete_edge(*(op->p));
1727  }
1729  virtual void internal_succ_cnt(buffer_operation *op) {
1730  op->cnt_val = my_successors.successor_count();
1732  }
1733 
1734  virtual void internal_pred_cnt(buffer_operation *op) {
1735  op->cnt_val = my_built_predecessors.edge_count();
1737  }
1738 
1739  virtual void internal_copy_succs(buffer_operation *op) {
1740  my_successors.copy_successors(*(op->svec));
1742  }
1743 
1744  virtual void internal_copy_preds(buffer_operation *op) {
1745  my_built_predecessors.copy_edges(*(op->pvec));
1747  }
1749 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1751 private:
1752  void order() {}
1754  bool is_item_valid() {
1755  return this->my_item_valid(this->my_tail - 1);
1756  }
1757 
1758  void try_put_and_add_task(task*& last_task) {
1759  task *new_task = my_successors.try_put_task(this->back());
1760  if (new_task) {
1761  // workaround for icc bug
1762  graph& g = this->my_graph;
1763  last_task = combine_tasks(g, last_task, new_task);
1764  this->destroy_back();
1765  }
1766  }
1767 
1768 protected:
1771  internal_forward_task_impl(op, this);
1772  }
1773 
1774  template<typename derived_type>
1775  void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1776  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1777 
1778  if (this->my_reserved || !derived->is_item_valid()) {
1780  this->forwarder_busy = false;
1781  return;
1782  }
1783  // Try forwarding, giving each successor a chance
1784  task * last_task = NULL;
1785  size_type counter = my_successors.size();
1786  for (; counter > 0 && derived->is_item_valid(); --counter)
1787  derived->try_put_and_add_task(last_task);
1788 
1789  op->ltask = last_task; // return task
1790  if (last_task && !counter) {
1792  }
1793  else {
1796  }
1797  }
1798 
1799  virtual bool internal_push(buffer_operation *op) {
1800  this->push_back(*(op->elem));
1802  return true;
1803  }
1805  virtual void internal_pop(buffer_operation *op) {
1806  if(this->pop_back(*(op->elem))) {
1808  }
1809  else {
1811  }
1812  }
1815  if(this->reserve_front(*(op->elem))) {
1817  }
1818  else {
1820  }
1821  }
1824  this->consume_front();
1826  }
1829  this->release_front();
1831  }
1832 
1833 public:
1835  explicit buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
1836  forwarder_busy(false) {
1837  my_successors.set_owner(this);
1838  my_aggregator.initialize_handler(handler_type(this));
1839  tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1840  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1841  }
1845  internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
1846  forwarder_busy = false;
1847  my_successors.set_owner(this);
1848  my_aggregator.initialize_handler(handler_type(this));
1849  tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1850  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1851  }
1852 
1853 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1854  void set_name( const char *name ) __TBB_override {
1856  }
1857 #endif
1858 
1859  //
1860  // message sender implementation
1861  //
1862 
1866  buffer_operation op_data(reg_succ);
1867  op_data.r = &r;
1868  my_aggregator.execute(&op_data);
1870  return true;
1871  }
1872 
1873 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1874  void internal_add_built_successor( successor_type &r) __TBB_override {
1875  buffer_operation op_data(add_blt_succ);
1876  op_data.r = &r;
1877  my_aggregator.execute(&op_data);
1878  }
1880  void internal_delete_built_successor( successor_type &r) __TBB_override {
1881  buffer_operation op_data(del_blt_succ);
1882  op_data.r = &r;
1883  my_aggregator.execute(&op_data);
1884  }
1886  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1887  buffer_operation op_data(add_blt_pred);
1888  op_data.p = &p;
1889  my_aggregator.execute(&op_data);
1890  }
1891 
1892  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1893  buffer_operation op_data(del_blt_pred);
1894  op_data.p = &p;
1895  my_aggregator.execute(&op_data);
1896  }
1897 
1898  size_t predecessor_count() __TBB_override {
1899  buffer_operation op_data(blt_pred_cnt);
1900  my_aggregator.execute(&op_data);
1901  return op_data.cnt_val;
1902  }
1903 
1904  size_t successor_count() __TBB_override {
1905  buffer_operation op_data(blt_succ_cnt);
1906  my_aggregator.execute(&op_data);
1907  return op_data.cnt_val;
1908  }
1909 
1910  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
1911  buffer_operation op_data(blt_pred_cpy);
1912  op_data.pvec = &v;
1913  my_aggregator.execute(&op_data);
1914  }
1915 
1916  void copy_successors( successor_list_type &v ) __TBB_override {
1917  buffer_operation op_data(blt_succ_cpy);
1918  op_data.svec = &v;
1919  my_aggregator.execute(&op_data);
1920  }
1921 
1922 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1923 
1928  r.remove_predecessor(*this);
1929  buffer_operation op_data(rem_succ);
1930  op_data.r = &r;
1931  my_aggregator.execute(&op_data);
1932  // even though this operation does not cause a forward, if we are the handler, and
1933  // a forward is scheduled, we may be the first to reach this point after the aggregator,
1934  // and so should check for the task.
1936  return true;
1937  }
1940 
1942  bool try_get( T &v ) __TBB_override {
1943  buffer_operation op_data(req_item);
1944  op_data.elem = &v;
1945  my_aggregator.execute(&op_data);
1947  return (op_data.status==internal::SUCCEEDED);
1948  }
1954  buffer_operation op_data(res_item);
1955  op_data.elem = &v;
1956  my_aggregator.execute(&op_data);
1957  (void)enqueue_forwarding_task(op_data);
1958  return (op_data.status==internal::SUCCEEDED);
1959  }
1960 
1962 
1964  buffer_operation op_data(rel_res);
1965  my_aggregator.execute(&op_data);
1966  (void)enqueue_forwarding_task(op_data);
1967  return true;
1968  }
1969 
1971 
1973  buffer_operation op_data(con_res);
1974  my_aggregator.execute(&op_data);
1975  (void)enqueue_forwarding_task(op_data);
1976  return true;
1977  }
1978 
1979 protected:
1980 
1981  template< typename R, typename B > friend class run_and_put_task;
1982  template<typename X, typename Y> friend class internal::broadcast_cache;
1983  template<typename X, typename Y> friend class internal::round_robin_cache;
1986  buffer_operation op_data(t, put_item);
1987  my_aggregator.execute(&op_data);
1988  task *ft = grab_forwarding_task(op_data);
1989  // sequencer_nodes can return failure (if an item has been previously inserted)
1990  // We have to spawn the returned task if our own operation fails.
1991 
1992  if(ft && op_data.status ==internal::FAILED) {
1993  // we haven't succeeded queueing the item, but for some reason the
1994  // call returned a task (if another request resulted in a successful
1995  // forward this could happen.) Queue the task and reset the pointer.
1997  }
1998  else if(!ft && op_data.status ==internal::SUCCEEDED) {
1999  ft = SUCCESSFULLY_ENQUEUED;
2000  }
2001  return ft;
2002  }
2003 
2005  return my_graph;
2006  }
2007 
2009 
2010 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2011 public:
2012  void extract() __TBB_override {
2013  my_built_predecessors.receiver_extract(*this);
2014  my_successors.built_successors().sender_extract(*this);
2015  }
2016 #endif
2017 
2018 protected:
2021  // TODO: just clear structures
2022  if (f&rf_clear_edges) {
2023  my_successors.clear();
2024 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2025  my_built_predecessors.clear();
2026 #endif
2027  }
2028  forwarder_busy = false;
2029  }
2030 }; // buffer_node
2031 
2033 template <typename T, typename A=cache_aligned_allocator<T> >
2034 class queue_node : public buffer_node<T, A> {
2035 protected:
2040 
2041 private:
2042  template<typename, typename> friend class buffer_node;
2043 
2044  bool is_item_valid() {
2045  return this->my_item_valid(this->my_head);
2046  }
2047 
2048  void try_put_and_add_task(task*& last_task) {
2049  task *new_task = this->my_successors.try_put_task(this->front());
2050  if (new_task) {
2051  // workaround for icc bug
2052  graph& graph_ref = this->graph_reference();
2053  last_task = combine_tasks(graph_ref, last_task, new_task);
2054  this->destroy_front();
2055  }
2056  }
2057 
2058 protected:
2059  void internal_forward_task(queue_operation *op) __TBB_override {
2060  this->internal_forward_task_impl(op, this);
2061  }
2062 
2064  if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2066  }
2067  else {
2068  this->pop_front(*(op->elem));
2070  }
2071  }
2073  if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2075  }
2076  else {
2077  this->reserve_front(*(op->elem));
2079  }
2080  }
2082  this->consume_front();
2084  }
2085 
2086 public:
2087  typedef T input_type;
2088  typedef T output_type;
2091 
2093  explicit queue_node( graph &g ) : base_type(g) {
2094  tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2095  static_cast<receiver<input_type> *>(this),
2096  static_cast<sender<output_type> *>(this) );
2097  }
2098 
2100  queue_node( const queue_node& src) : base_type(src) {
2101  tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2102  static_cast<receiver<input_type> *>(this),
2103  static_cast<sender<output_type> *>(this) );
2104  }
2105 
2106 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2107  void set_name( const char *name ) __TBB_override {
2109  }
2110 #endif
2111 
2112 protected:
2115  }
2116 }; // queue_node
2117 
2119 template< typename T, typename A=cache_aligned_allocator<T> >
2120 class sequencer_node : public queue_node<T, A> {
2122  // my_sequencer should be a benign function and must be callable
2123  // from a parallel context. Does this mean it needn't be reset?
2124 public:
2125  typedef T input_type;
2126  typedef T output_type;
2129 
2131  template< typename Sequencer >
2132  sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
2133  my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2134  tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2135  static_cast<receiver<input_type> *>(this),
2136  static_cast<sender<output_type> *>(this) );
2137  }
2138 
2140  sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
2141  my_sequencer( src.my_sequencer->clone() ) {
2142  tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2143  static_cast<receiver<input_type> *>(this),
2144  static_cast<sender<output_type> *>(this) );
2145  }
2146 
2149 
2150 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2151  void set_name( const char *name ) __TBB_override {
2153  }
2154 #endif
2155 
2156 protected:
2159 
2160 private:
2162  size_type tag = (*my_sequencer)(*(op->elem));
2163 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2164  if (tag < this->my_head) {
2165  // have already emitted a message with this tag
2167  return false;
2168  }
2169 #endif
2170  // cannot modify this->my_tail now; the buffer would be inconsistent.
2171  size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2172 
2173  if (this->size(new_tail) > this->capacity()) {
2174  this->grow_my_array(this->size(new_tail));
2175  }
2176  this->my_tail = new_tail;
2177 
2178  const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2179  __TBB_store_with_release(op->status, res);
2180  return res ==internal::SUCCEEDED;
2181  }
2182 }; // sequencer_node
2183 
2185 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
2186 class priority_queue_node : public buffer_node<T, A> {
2187 public:
2188  typedef T input_type;
2189  typedef T output_type;
2194 
2196  explicit priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {
2197  tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2198  static_cast<receiver<input_type> *>(this),
2199  static_cast<sender<output_type> *>(this) );
2200  }
2201 
2203  priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
2204  tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2205  static_cast<receiver<input_type> *>(this),
2206  static_cast<sender<output_type> *>(this) );
2207  }
2208 
2209 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2210  void set_name( const char *name ) __TBB_override {
2212  }
2213 #endif
2214 
2215 protected:
2216 
2218  mark = 0;
2220  }
2221 
2225 
2228  this->internal_forward_task_impl(op, this);
2229  }
2230 
2232  this->handle_operations_impl(op_list, this);
2233  }
2234 
2236  prio_push(*(op->elem));
2238  return true;
2239  }
2240 
2242  // if empty or already reserved, don't pop
2243  if ( this->my_reserved == true || this->my_tail == 0 ) {
2245  return;
2246  }
2247 
2248  *(op->elem) = prio();
2250  prio_pop();
2251 
2252  }
2253 
2254  // pops the highest-priority item, saves copy
2256  if (this->my_reserved == true || this->my_tail == 0) {
2258  return;
2259  }
2260  this->my_reserved = true;
2261  *(op->elem) = prio();
2262  reserved_item = *(op->elem);
2264  prio_pop();
2265  }
2266 
2269  this->my_reserved = false;
2271  }
2272 
2276  this->my_reserved = false;
2278  }
2279 
2280 private:
2281  template<typename, typename> friend class buffer_node;
2282 
2283  void order() {
2284  if (mark < this->my_tail) heapify();
2285  __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2286  }
2287 
2288  bool is_item_valid() {
2289  return this->my_tail > 0;
2290  }
2291 
2292  void try_put_and_add_task(task*& last_task) {
2293  task * new_task = this->my_successors.try_put_task(this->prio());
2294  if (new_task) {
2295  // workaround for icc bug
2296  graph& graph_ref = this->graph_reference();
2297  last_task = combine_tasks(graph_ref, last_task, new_task);
2298  prio_pop();
2299  }
2300  }
2301 
2302 private:
2303  Compare compare;
2305 
2307 
2308  // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2309  bool prio_use_tail() {
2310  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2311  return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2312  }
2313 
2314  // prio_push: checks that the item will fit, expand array if necessary, put at end
2315  void prio_push(const T &src) {
2316  if ( this->my_tail >= this->my_array_size )
2317  this->grow_my_array( this->my_tail + 1 );
2318  (void) this->place_item(this->my_tail, src);
2319  ++(this->my_tail);
2320  __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2321  }
2322 
2323  // prio_pop: deletes highest priority item from the array, and if it is item
2324  // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
2325  // and mark. Assumes the array has already been tested for emptiness; no failure.
2326  void prio_pop() {
2327  if (prio_use_tail()) {
2328  // there are newly pushed elements; last one higher than top
2329  // copy the data
2330  this->destroy_item(this->my_tail-1);
2331  --(this->my_tail);
2332  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2333  return;
2334  }
2335  this->destroy_item(0);
2336  if(this->my_tail > 1) {
2337  // push the last element down heap
2338  __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2339  this->move_item(0,this->my_tail - 1);
2340  }
2341  --(this->my_tail);
2342  if(mark > this->my_tail) --mark;
2343  if (this->my_tail > 1) // don't reheap for heap of size 1
2344  reheap();
2345  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2346  }
2347 
2348  const T& prio() {
2349  return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2350  }
2351 
2352  // turn array into heap
2353  void heapify() {
2354  if(this->my_tail == 0) {
2355  mark = 0;
2356  return;
2357  }
2358  if (!mark) mark = 1;
2359  for (; mark<this->my_tail; ++mark) { // for each unheaped element
2360  size_type cur_pos = mark;
2361  input_type to_place;
2362  this->fetch_item(mark,to_place);
2363  do { // push to_place up the heap
2364  size_type parent = (cur_pos-1)>>1;
2365  if (!compare(this->get_my_item(parent), to_place))
2366  break;
2367  this->move_item(cur_pos, parent);
2368  cur_pos = parent;
2369  } while( cur_pos );
2370  (void) this->place_item(cur_pos, to_place);
2371  }
2372  }
2373 
2374  // otherwise heapified array with new root element; rearrange to heap
2375  void reheap() {
2376  size_type cur_pos=0, child=1;
2377  while (child < mark) {
2378  size_type target = child;
2379  if (child+1<mark &&
2380  compare(this->get_my_item(child),
2381  this->get_my_item(child+1)))
2382  ++target;
2383  // target now has the higher priority child
2384  if (compare(this->get_my_item(target),
2385  this->get_my_item(cur_pos)))
2386  break;
2387  // swap
2388  this->swap_items(cur_pos, target);
2389  cur_pos = target;
2390  child = (cur_pos<<1)+1;
2391  }
2392  }
2393 }; // priority_queue_node
2394 
2396 
2399 template< typename T >
2400 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2401 public:
2402  typedef T input_type;
2403  typedef T output_type;
2406 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2407  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2408  typedef typename sender<output_type>::built_successors_type built_successors_type;
2409  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2410  typedef typename sender<output_type>::successor_list_type successor_list_type;
2411 #endif
2412  //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2413 
2414 private:
2416  size_t my_count; //number of successful puts
2417  size_t my_tries; //number of active put attempts
2422 
2424 
2425  // Let decrementer call decrement_counter()
2427 
2428  bool check_conditions() { // always called under lock
2429  return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2430  }
2431 
2432  // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
2434  input_type v;
2435  task *rval = NULL;
2436  bool reserved = false;
2437  {
2439  if ( check_conditions() )
2440  ++my_tries;
2441  else
2442  return NULL;
2443  }
2444 
2445  //SUCCESS
2446  // if we can reserve and can put, we consume the reservation
2447  // we increment the count and decrement the tries
2448  if ( (my_predecessors.try_reserve(v)) == true ){
2449  reserved=true;
2450  if ( (rval = my_successors.try_put_task(v)) != NULL ){
2451  {
2453  ++my_count;
2454  --my_tries;
2455  my_predecessors.try_consume();
2456  if ( check_conditions() ) {
2457  if ( internal::is_graph_active(this->my_graph) ) {
2458  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2461  }
2462  }
2463  }
2464  return rval;
2465  }
2466  }
2467  //FAILURE
2468  //if we can't reserve, we decrement the tries
2469  //if we can reserve but can't put, we decrement the tries and release the reservation
2470  {
2472  --my_tries;
2473  if (reserved) my_predecessors.try_release();
2474  if ( check_conditions() ) {
2475  if ( internal::is_graph_active(this->my_graph) ) {
2476  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2478  __TBB_ASSERT(!rval, "Have two tasks to handle");
2479  return rtask;
2480  }
2481  }
2482  return rval;
2483  }
2484  }
2485 
2486  void forward() {
2487  __TBB_ASSERT(false, "Should never be called");
2488  return;
2489  }
2490 
2492  {
2494  if(my_count) --my_count;
2495  }
2496  return forward_task();
2497  }
2498 
2499 public:
2502 
2504  limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
2505  graph_node(g), my_threshold(threshold), my_count(0), my_tries(0),
2506  init_decrement_predecessors(num_decrement_predecessors),
2507  decrement(num_decrement_predecessors)
2508  {
2509  my_predecessors.set_owner(this);
2510  my_successors.set_owner(this);
2511  decrement.set_owner(this);
2512  tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2513  static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2514  static_cast<sender<output_type> *>(this) );
2515  }
2516 
2518  limiter_node( const limiter_node& src ) :
2519  graph_node(src.my_graph), receiver<T>(), sender<T>(),
2523  {
2524  my_predecessors.set_owner(this);
2525  my_successors.set_owner(this);
2526  decrement.set_owner(this);
2527  tbb::internal::fgt_node( tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2528  static_cast<receiver<input_type> *>(this), static_cast<receiver<continue_msg> *>(&decrement),
2529  static_cast<sender<output_type> *>(this) );
2530  }
2531 
2532 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2533  void set_name( const char *name ) __TBB_override {
2535  }
2536 #endif
2537 
2541  bool was_empty = my_successors.empty();
2543  //spawn a forward task if this is the only successor
2544  if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2545  if ( internal::is_graph_active(this->my_graph) ) {
2546  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2549  }
2550  }
2551  return true;
2552  }
2553 
2555 
2557  r.remove_predecessor(*this);
2559  return true;
2560  }
2561 
2562 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2563  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2564  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
2565 
2566  void internal_add_built_successor(successor_type &src) __TBB_override {
2567  my_successors.internal_add_built_successor(src);
2568  }
2569 
2570  void internal_delete_built_successor(successor_type &src) __TBB_override {
2571  my_successors.internal_delete_built_successor(src);
2572  }
2573 
2574  size_t successor_count() __TBB_override { return my_successors.successor_count(); }
2575 
2576  void copy_successors(successor_list_type &v) __TBB_override {
2577  my_successors.copy_successors(v);
2578  }
2579 
2580  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
2581  my_predecessors.internal_add_built_predecessor(src);
2582  }
2583 
2584  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
2585  my_predecessors.internal_delete_built_predecessor(src);
2586  }
2587 
2588  size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
2589 
2590  void copy_predecessors(predecessor_list_type &v) __TBB_override {
2591  my_predecessors.copy_predecessors(v);
2592  }
2593 
2594  void extract() __TBB_override {
2595  my_count = 0;
2596  my_successors.built_successors().sender_extract(*this);
2597  my_predecessors.built_predecessors().receiver_extract(*this);
2598  decrement.built_predecessors().receiver_extract(decrement);
2599  }
2600 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2601 
2605  my_predecessors.add( src );
2607  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2610  }
2611  return true;
2612  }
2613 
2616  my_predecessors.remove( src );
2617  return true;
2618  }
2619 
2620 protected:
2621 
2622  template< typename R, typename B > friend class run_and_put_task;
2623  template<typename X, typename Y> friend class internal::broadcast_cache;
2624  template<typename X, typename Y> friend class internal::round_robin_cache;
2627  {
2629  if ( my_count + my_tries >= my_threshold )
2630  return NULL;
2631  else
2632  ++my_tries;
2633  }
2634 
2635  task * rtask = my_successors.try_put_task(t);
2636 
2637  if ( !rtask ) { // try_put_task failed.
2639  --my_tries;
2641  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2643  }
2644  }
2645  else {
2647  ++my_count;
2648  --my_tries;
2649  }
2650  return rtask;
2651  }
2652 
2654  return my_graph;
2655  }
2656 
2658  __TBB_ASSERT(false,NULL); // should never be called
2659  }
2660 
2662  my_count = 0;
2663  if(f & rf_clear_edges) {
2664  my_predecessors.clear();
2665  my_successors.clear();
2666  }
2667  else
2668  {
2669  my_predecessors.reset( );
2670  }
2671  decrement.reset_receiver(f);
2672  }
2673 }; // limiter_node
2674 
2676 
2680 using internal::input_port;
2681 using internal::tag_value;
2682 
2683 template<typename OutputTuple, typename JP=queueing> class join_node;
2684 
2685 template<typename OutputTuple>
2686 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2687 private:
2690 public:
2691  typedef OutputTuple output_type;
2693  explicit join_node(graph &g) : unfolded_type(g) {
2694  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2695  this->input_ports(), static_cast< sender< output_type > *>(this) );
2696  }
2697  join_node(const join_node &other) : unfolded_type(other) {
2698  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2699  this->input_ports(), static_cast< sender< output_type > *>(this) );
2700  }
2701 
2702 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2703  void set_name( const char *name ) __TBB_override {
2705  }
2706 #endif
2707 
2708 };
2709 
2710 template<typename OutputTuple>
2711 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2712 private:
2715 public:
2716  typedef OutputTuple output_type;
2718  explicit join_node(graph &g) : unfolded_type(g) {
2719  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2720  this->input_ports(), static_cast< sender< output_type > *>(this) );
2721  }
2722  join_node(const join_node &other) : unfolded_type(other) {
2723  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2724  this->input_ports(), static_cast< sender< output_type > *>(this) );
2725  }
2726 
2727 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2728  void set_name( const char *name ) __TBB_override {
2730  }
2731 #endif
2732 
2733 };
2734 
2735 // template for key_matching join_node
2736 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
2737 template<typename OutputTuple, typename K, typename KHash>
2738 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
2739  key_matching_port, OutputTuple, key_matching<K,KHash> > {
2740 private:
2743 public:
2744  typedef OutputTuple output_type;
2746 
2747 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2749 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
2750 
2751  template<typename __TBB_B0, typename __TBB_B1>
2752  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2753  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2754  this->input_ports(), static_cast< sender< output_type > *>(this) );
2755  }
2756  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2757  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2758  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2759  this->input_ports(), static_cast< sender< output_type > *>(this) );
2760  }
2761  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2762  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2763  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2764  this->input_ports(), static_cast< sender< output_type > *>(this) );
2765  }
2766  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2767  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2768  unfolded_type(g, b0, b1, b2, b3, b4) {
2769  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2770  this->input_ports(), static_cast< sender< output_type > *>(this) );
2771  }
2772 #if __TBB_VARIADIC_MAX >= 6
2773  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2774  typename __TBB_B5>
2775  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2776  unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2777  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2778  this->input_ports(), static_cast< sender< output_type > *>(this) );
2779  }
2780 #endif
2781 #if __TBB_VARIADIC_MAX >= 7
2782  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2783  typename __TBB_B5, typename __TBB_B6>
2784  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2785  unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2786  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2787  this->input_ports(), static_cast< sender< output_type > *>(this) );
2788  }
2789 #endif
2790 #if __TBB_VARIADIC_MAX >= 8
2791  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2792  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
2793  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2794  __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2795  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2796  this->input_ports(), static_cast< sender< output_type > *>(this) );
2797  }
2798 #endif
2799 #if __TBB_VARIADIC_MAX >= 9
2800  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2801  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
2802  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2803  __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2804  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2805  this->input_ports(), static_cast< sender< output_type > *>(this) );
2806  }
2807 #endif
2808 #if __TBB_VARIADIC_MAX >= 10
2809  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2810  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
2811  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2812  __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2813  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2814  this->input_ports(), static_cast< sender< output_type > *>(this) );
2815  }
2816 #endif
2817  join_node(const join_node &other) : unfolded_type(other) {
2818  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2819  this->input_ports(), static_cast< sender< output_type > *>(this) );
2820  }
2821 
2822 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2823  void set_name( const char *name ) __TBB_override {
2825  }
2826 #endif
2827 
2828 };
2829 
2830 // indexer node
2832 
2833 // TODO: Implement interface with variadic template or tuple
2834 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
2835  typename T4=null_type, typename T5=null_type, typename T6=null_type,
2836  typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2837 
2838 //indexer node specializations
2839 template<typename T0>
2840 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
2841 private:
2842  static const int N = 1;
2843 public:
2844  typedef tuple<T0> InputTuple;
2848  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2849  this->input_ports(), static_cast< sender< output_type > *>(this) );
2850  }
2851  // Copy constructor
2852  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2853  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2854  this->input_ports(), static_cast< sender< output_type > *>(this) );
2855  }
2856 
2857 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2858  void set_name( const char *name ) __TBB_override {
2860  }
2861 #endif
2862 };
2863 
2864 template<typename T0, typename T1>
2865 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
2866 private:
2867  static const int N = 2;
2868 public:
2869  typedef tuple<T0, T1> InputTuple;
2873  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2874  this->input_ports(), static_cast< sender< output_type > *>(this) );
2875  }
2876  // Copy constructor
2877  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2878  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2879  this->input_ports(), static_cast< sender< output_type > *>(this) );
2880  }
2881 
2882 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2883  void set_name( const char *name ) __TBB_override {
2885  }
2886 #endif
2887 };
2888 
2889 template<typename T0, typename T1, typename T2>
2890 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
2891 private:
2892  static const int N = 3;
2893 public:
2894  typedef tuple<T0, T1, T2> InputTuple;
2898  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2899  this->input_ports(), static_cast< sender< output_type > *>(this) );
2900  }
2901  // Copy constructor
2902  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2903  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2904  this->input_ports(), static_cast< sender< output_type > *>(this) );
2905  }
2906 
2907 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2908  void set_name( const char *name ) __TBB_override {
2910  }
2911 #endif
2912 };
2913 
2914 template<typename T0, typename T1, typename T2, typename T3>
2915 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
2916 private:
2917  static const int N = 4;
2918 public:
2919  typedef tuple<T0, T1, T2, T3> InputTuple;
2923  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2924  this->input_ports(), static_cast< sender< output_type > *>(this) );
2925  }
2926  // Copy constructor
2927  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2928  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2929  this->input_ports(), static_cast< sender< output_type > *>(this) );
2930  }
2931 
2932 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2933  void set_name( const char *name ) __TBB_override {
2935  }
2936 #endif
2937 };
2938 
2939 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2940 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
2941 private:
2942  static const int N = 5;
2943 public:
2944  typedef tuple<T0, T1, T2, T3, T4> InputTuple;
2948  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2949  this->input_ports(), static_cast< sender< output_type > *>(this) );
2950  }
2951  // Copy constructor
2952  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2953  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2954  this->input_ports(), static_cast< sender< output_type > *>(this) );
2955  }
2956 
2957 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2958  void set_name( const char *name ) __TBB_override {
2960  }
2961 #endif
2962 };
2963 
2964 #if __TBB_VARIADIC_MAX >= 6
2965 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
2966 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
2967 private:
2968  static const int N = 6;
2969 public:
2970  typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
2974  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2975  this->input_ports(), static_cast< sender< output_type > *>(this) );
2976  }
2977  // Copy constructor
2978  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2979  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2980  this->input_ports(), static_cast< sender< output_type > *>(this) );
2981  }
2982 
2983 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2984  void set_name( const char *name ) __TBB_override {
2986  }
2987 #endif
2988 };
2989 #endif //variadic max 6
2990 
2991 #if __TBB_VARIADIC_MAX >= 7
2992 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2993  typename T6>
2994 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
2995 private:
2996  static const int N = 7;
2997 public:
2998  typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3002  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3003  this->input_ports(), static_cast< sender< output_type > *>(this) );
3004  }
3005  // Copy constructor
3006  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3007  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3008  this->input_ports(), static_cast< sender< output_type > *>(this) );
3009  }
3010 
3011 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3012  void set_name( const char *name ) __TBB_override {
3014  }
3015 #endif
3016 };
3017 #endif //variadic max 7
3018 
3019 #if __TBB_VARIADIC_MAX >= 8
3020 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3021  typename T6, typename T7>
3022 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3023 private:
3024  static const int N = 8;
3025 public:
3026  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3030  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3031  this->input_ports(), static_cast< sender< output_type > *>(this) );
3032  }
3033  // Copy constructor
3034  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3035  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3036  this->input_ports(), static_cast< sender< output_type > *>(this) );
3037  }
3038 
3039 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3040  void set_name( const char *name ) __TBB_override {
3042  }
3043 #endif
3044 };
3045 #endif //variadic max 8
3046 
3047 #if __TBB_VARIADIC_MAX >= 9
3048 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3049  typename T6, typename T7, typename T8>
3050 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3051 private:
3052  static const int N = 9;
3053 public:
3054  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3058  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3059  this->input_ports(), static_cast< sender< output_type > *>(this) );
3060  }
3061  // Copy constructor
3062  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3063  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3064  this->input_ports(), static_cast< sender< output_type > *>(this) );
3065  }
3066 
3067 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3068  void set_name( const char *name ) __TBB_override {
3070  }
3071 #endif
3072 };
3073 #endif //variadic max 9
3074 
3075 #if __TBB_VARIADIC_MAX >= 10
3076 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3077  typename T6, typename T7, typename T8, typename T9>
3078 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3079 private:
3080  static const int N = 10;
3081 public:
3082  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3086  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3087  this->input_ports(), static_cast< sender< output_type > *>(this) );
3088  }
3089  // Copy constructor
3090  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3091  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3092  this->input_ports(), static_cast< sender< output_type > *>(this) );
3093  }
3094 
3095 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3096  void set_name( const char *name ) __TBB_override {
3098  }
3099 #endif
3100 };
3101 #endif //variadic max 10
3102 
3103 #if __TBB_PREVIEW_ASYNC_MSG
3105 #else
3106 template< typename T >
3107 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3108 #endif
3109 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3110  s.internal_add_built_predecessor(p);
3111  p.internal_add_built_successor(s);
3112 #endif
3113  p.register_successor( s );
3115 }
3116 
3118 template< typename T >
3119 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3120  internal_make_edge( p, s );
3121 }
3122 
3123 #if __TBB_PREVIEW_ASYNC_MSG
3124 template< typename TS, typename TR,
3127 inline void make_edge( TS &p, TR &s ) {
3128  internal_make_edge( p, s );
3129 }
3130 
3131 template< typename T >
3133  internal_make_edge( p, s );
3134 }
3135 
3136 template< typename T >
3138  internal_make_edge( p, s );
3139 }
3140 
3141 #endif // __TBB_PREVIEW_ASYNC_MSG
3142 
3143 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3144 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
3145 template< typename T, typename V,
3146  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3147 inline void make_edge( T& output, V& input) {
3148  make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3149 }
3150 
3151 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
3152 template< typename T, typename R,
3153  typename = typename T::output_ports_type >
3154 inline void make_edge( T& output, receiver<R>& input) {
3155  make_edge(get<0>(output.output_ports()), input);
3156 }
3157 
3158 //Makes an edge from a sender to port 0 of a multi-input successor.
3159 template< typename S, typename V,
3160  typename = typename V::input_ports_type >
3161 inline void make_edge( sender<S>& output, V& input) {
3162  make_edge(output, get<0>(input.input_ports()));
3163 }
3164 #endif
3165 
3166 #if __TBB_PREVIEW_ASYNC_MSG
3168 #else
3169 template< typename T >
3170 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3171 #endif
3172  p.remove_successor( s );
3173 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3174  // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3175  p.internal_delete_built_successor(s);
3176  s.internal_delete_built_predecessor(p);
3177 #endif
3179 }
3180 
3182 template< typename T >
3183 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3184  internal_remove_edge( p, s );
3185 }
3186 
3187 #if __TBB_PREVIEW_ASYNC_MSG
3188 template< typename TS, typename TR,
3191 inline void remove_edge( TS &p, TR &s ) {
3192  internal_remove_edge( p, s );
3193 }
3194 
3195 template< typename T >
3197  internal_remove_edge( p, s );
3198 }
3199 
3200 template< typename T >
3202  internal_remove_edge( p, s );
3203 }
3204 #endif // __TBB_PREVIEW_ASYNC_MSG
3205 
3206 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3207 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
3208 template< typename T, typename V,
3209  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3210 inline void remove_edge( T& output, V& input) {
3211  remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3212 }
3213 
3214 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
3215 template< typename T, typename R,
3216  typename = typename T::output_ports_type >
3217 inline void remove_edge( T& output, receiver<R>& input) {
3218  remove_edge(get<0>(output.output_ports()), input);
3219 }
3220 //Removes an edge between a sender and port 0 of a multi-input successor.
3221 template< typename S, typename V,
3222  typename = typename V::input_ports_type >
3223 inline void remove_edge( sender<S>& output, V& input) {
3224  remove_edge(output, get<0>(input.input_ports()));
3225 }
3226 #endif
3227 
3228 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3229 template<typename C >
3230 template< typename S >
3231 void internal::edge_container<C>::sender_extract( S &s ) {
3232  edge_list_type e = built_edges;
3233  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3234  remove_edge(s, **i);
3235  }
3236 }
3237 
3238 template<typename C >
3239 template< typename R >
3240 void internal::edge_container<C>::receiver_extract( R &r ) {
3241  edge_list_type e = built_edges;
3242  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3243  remove_edge(**i, r);
3244  }
3245 }
3246 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3247 
3249 template< typename Body, typename Node >
3250 Body copy_body( Node &n ) {
3251  return n.template copy_function_object<Body>();
3252 }
3253 
3254 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3255 
3256 //composite_node
3257 template< typename InputTuple, typename OutputTuple > class composite_node;
3258 
3259 template< typename... InputTypes, typename... OutputTypes>
3260 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3261 
3262 public:
3263  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3264  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3265 
3266 private:
3267  std::unique_ptr<input_ports_type> my_input_ports;
3268  std::unique_ptr<output_ports_type> my_output_ports;
3269 
3270  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3271  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3272 
3273 protected:
3275 
3276 public:
3277 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3278  composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3279  tbb::internal::fgt_multiinput_multioutput_node( tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3281  }
3282 #else
3284  tbb::internal::fgt_multiinput_multioutput_node( tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3285  }
3286 #endif
3287 
3288  template<typename T1, typename T2>
3289  void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
3290  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3291  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3292  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
3293  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
3294 
3297  }
3298 
3299  template< typename... NodeTypes >
3300  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3301 
3302  template< typename... NodeTypes >
3303  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3304 
3305 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3306  void set_name( const char *name ) __TBB_override {
3308  }
3309 #endif
3310 
3312  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3313  return *my_input_ports;
3314  }
3315 
3317  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3318  return *my_output_ports;
3319  }
3320 
3321 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3322  void extract() __TBB_override {
3323  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3324  }
3325 #endif
3326 }; // class composite_node
3327 
3328 //composite_node with only input ports
3329 template< typename... InputTypes>
3330 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
3331 public:
3332  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3333 
3334 private:
3335  std::unique_ptr<input_ports_type> my_input_ports;
3336  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3337 
3338 protected:
3340 
3341 public:
3342 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3343  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3344  tbb::internal::fgt_composite( this, &g );
3346  }
3347 #else
3349  tbb::internal::fgt_composite( this, &g );
3350  }
3351 #endif
3352 
3353  template<typename T>
3354  void set_external_ports(T&& input_ports_tuple) {
3355  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3356 
3357  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
3358 
3359  tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
3360  }
3361 
3362  template< typename... NodeTypes >
3363  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3364 
3365  template< typename... NodeTypes >
3366  void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3367 
3368 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3369  void set_name( const char *name ) __TBB_override {
3371  }
3372 #endif
3373 
3375  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3376  return *my_input_ports;
3377  }
3378 
3379 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3380  void extract() __TBB_override {
3381  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3382  }
3383 #endif
3384 
3385 }; // class composite_node
3386 
3387 //composite_nodes with only output_ports
3388 template<typename... OutputTypes>
3389 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
3390 public:
3391  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3392 
3393 private:
3394  std::unique_ptr<output_ports_type> my_output_ports;
3395  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3396 
3397 protected:
3399 
3400 public:
3401 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3402  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3403  tbb::internal::fgt_composite( this, &g );
3405  }
3406 #else
3408  tbb::internal::fgt_composite( this, &g );
3409  }
3410 #endif
3411 
3412  template<typename T>
3413  void set_external_ports(T&& output_ports_tuple) {
3414  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3415 
3416  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
3417 
3418  tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
3419  }
3420 
3421  template<typename... NodeTypes >
3422  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3423 
3424  template<typename... NodeTypes >
3425  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3426 
3427 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3428  void set_name( const char *name ) __TBB_override {
3430  }
3431 #endif
3432 
3434  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3435  return *my_output_ports;
3436  }
3437 
3438 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3439  void extract() __TBB_override {
3440  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3441  }
3442 #endif
3443 
3444 }; // class composite_node
3445 
3446 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
3447 
3448 namespace internal {
3449 
3450 template<typename Gateway>
3452 public:
3453  typedef Gateway gateway_type;
3454 
3455  async_body_base(gateway_type *gateway): my_gateway(gateway) { }
3456  void set_gateway(gateway_type *gateway) {
3457  my_gateway = gateway;
3458  }
3459 
3460 protected:
3462 };
3463 
3464 template<typename Input, typename Ports, typename Gateway, typename Body>
3465 class async_body: public async_body_base<Gateway> {
3466 public:
3468  typedef Gateway gateway_type;
3469 
3470  async_body(const Body &body, gateway_type *gateway)
3471  : base_type(gateway), my_body(body) { }
3472 
3473  void operator()( const Input &v, Ports & ) {
3474  my_body(v, *this->my_gateway);
3475  }
3476 
3477  Body get_body() { return my_body; }
3478 
3479 private:
3480  Body my_body;
3481 };
3482 
3483 }
3484 
3486 template < typename Input, typename Output,
3487  typename Policy = queueing_lightweight,
3488  typename Allocator=cache_aligned_allocator<Input> >
3489 class async_node : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output > {
3492 
3493 public:
3494  typedef Input input_type;
3495  typedef Output output_type;
3502 
3503 private:
3507  // TODO: pass value by copy since we do not want to block asynchronous thread.
3508  const Output *value;
3509  bool result;
3510  try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
3511  void operator()() {
3512  result = port->try_put(*value);
3513  }
3514  };
3515 
3516  class receiver_gateway_impl: public receiver_gateway<Output> {
3517  public:
3520  tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3522  }
3523 
3526  tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3527  }
3528 
3530  bool try_put(const Output &i) __TBB_override {
3531  return my_node->try_put_impl(i);
3532  }
3533 
3534  private:
3536  } my_gateway;
3537 
3538  //The substitute of 'this' for member construction, to prevent compiler warnings
3539  async_node* self() { return this; }
3540 
3542  bool try_put_impl(const Output &i) {
3543  internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
3545  try_put_functor tpf(port_0, i);
3547  tbb::internal::fgt_async_try_put_end(this, &port_0);
3548  return tpf.result;
3549  }
3550 
3551 public:
3552  template<typename Body>
3554  graph &g, size_t concurrency,
3556  ) : base_type(
3557  g, concurrency,
3558  internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
3559  (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
3560  tbb::internal::fgt_multioutput_node_with_body<1>(
3561  tbb::internal::FLOW_ASYNC_NODE,
3562  &this->my_graph, static_cast<receiver<input_type> *>(this),
3563  this->output_ports(), this->my_body
3564  );
3565  }
3566 
3567  async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
3568  static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
3569  static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
3570 
3571  tbb::internal::fgt_multioutput_node_with_body<1>( tbb::internal::FLOW_ASYNC_NODE,
3572  &this->my_graph, static_cast<receiver<input_type> *>(this),
3573  this->output_ports(), this->my_body );
3574  }
3575 
3577  return my_gateway;
3578  }
3579 
3580 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3581  void set_name( const char *name ) __TBB_override {
3583  }
3584 #endif
3585 
3586  // Define sender< Output >
3587 
3590  return internal::output_port<0>(*this).register_successor(r);
3591  }
3592 
3595  return internal::output_port<0>(*this).remove_successor(r);
3596  }
3597 
3598  template<typename Body>
3602  mfn_body_type &body_ref = *this->my_body;
3603  async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
3604  return ab.get_body();
3605  }
3606 
3607 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3608  typedef typename internal::edge_container<successor_type> built_successors_type;
3610  typedef typename built_successors_type::edge_list_type successor_list_type;
3611  built_successors_type &built_successors() __TBB_override {
3612  return internal::output_port<0>(*this).built_successors();
3613  }
3614 
3615  void internal_add_built_successor( successor_type &r ) __TBB_override {
3616  internal::output_port<0>(*this).internal_add_built_successor(r);
3617  }
3618 
3619  void internal_delete_built_successor( successor_type &r ) __TBB_override {
3620  internal::output_port<0>(*this).internal_delete_built_successor(r);
3621  }
3622 
3623  void copy_successors( successor_list_type &l ) __TBB_override {
3624  internal::output_port<0>(*this).copy_successors(l);
3625  }
3626 
3627  size_t successor_count() __TBB_override {
3628  return internal::output_port<0>(*this).successor_count();
3629  }
3630 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3631 
3632 protected:
3633 
3636  }
3637 };
3638 
3639 #if __TBB_PREVIEW_STREAMING_NODE
3641 #endif // __TBB_PREVIEW_STREAMING_NODE
3642 
3643 } // interfaceX
3644 
3645 
3646 namespace interface10a {
3647 
3648 using namespace interface10;
3649 namespace internal = interface10::internal;
3650 
3651 template< typename T >
3652 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
3653 public:
3654  typedef T input_type;
3655  typedef T output_type;
3658 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3659  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
3660  typedef typename sender<output_type>::built_successors_type built_successors_type;
3661  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
3662  typedef typename sender<output_type>::successor_list_type successor_list_type;
3663 #endif
3664 
3665  explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
3666  my_successors.set_owner( this );
3667  tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
3668  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3669  }
3670 
3673  graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
3674  {
3675  my_successors.set_owner( this );
3676  tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
3677  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3678  }
3679 
3681 
3682 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3683  void set_name( const char *name ) __TBB_override {
3685  }
3686 #endif
3687 
3689  spin_mutex::scoped_lock l( my_mutex );
3690  if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
3691  // We have a valid value that must be forwarded immediately.
3692  bool ret = s.try_put( my_buffer );
3693  if ( ret ) {
3694  // We add the successor that accepted our put
3695  my_successors.register_successor( s );
3696  } else {
3697  // In case of reservation a race between the moment of reservation and register_successor can appear,
3698  // because failed reserve does not mean that register_successor is not ready to put a message immediately.
3699  // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
3700  // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
3701  task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
3702  register_predecessor_task( *this, s );
3703  internal::spawn_in_graph_arena( my_graph, *rtask );
3704  }
3705  } else {
3706  // No valid value yet, just add as successor
3707  my_successors.register_successor( s );
3708  }
3709  return true;
3710  }
3711 
3713  spin_mutex::scoped_lock l( my_mutex );
3714  my_successors.remove_successor(s);
3715  return true;
3716  }
3717 
3718 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3719  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
3720  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
3721 
3722  void internal_add_built_successor( successor_type &s) __TBB_override {
3723  spin_mutex::scoped_lock l( my_mutex );
3724  my_successors.internal_add_built_successor(s);
3725  }
3726 
3727  void internal_delete_built_successor( successor_type &s) __TBB_override {
3728  spin_mutex::scoped_lock l( my_mutex );
3729  my_successors.internal_delete_built_successor(s);
3730  }
3731 
3732  size_t successor_count() __TBB_override {
3733  spin_mutex::scoped_lock l( my_mutex );
3734  return my_successors.successor_count();
3735  }
3736 
3737  void copy_successors(successor_list_type &v) __TBB_override {
3738  spin_mutex::scoped_lock l( my_mutex );
3739  my_successors.copy_successors(v);
3740  }
3741 
3742  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
3743  spin_mutex::scoped_lock l( my_mutex );
3744  my_built_predecessors.add_edge(p);
3745  }
3746 
3747  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
3748  spin_mutex::scoped_lock l( my_mutex );
3749  my_built_predecessors.delete_edge(p);
3750  }
3751 
3752  size_t predecessor_count() __TBB_override {
3753  spin_mutex::scoped_lock l( my_mutex );
3754  return my_built_predecessors.edge_count();
3755  }
3756 
3757  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
3758  spin_mutex::scoped_lock l( my_mutex );
3759  my_built_predecessors.copy_edges(v);
3760  }
3761 
3762  void extract() __TBB_override {
3763  my_buffer_is_valid = false;
3764  built_successors().sender_extract(*this);
3765  built_predecessors().receiver_extract(*this);
3766  }
3767 
3768 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3769 
3771  spin_mutex::scoped_lock l( my_mutex );
3772  if ( my_buffer_is_valid ) {
3773  v = my_buffer;
3774  return true;
3775  }
3776  return false;
3777  }
3778 
3781  return try_get(v);
3782  }
3783 
3785  bool try_release() __TBB_override { return true; }
3786 
3788  bool try_consume() __TBB_override { return true; }
3789 
3790  bool is_valid() {
3791  spin_mutex::scoped_lock l( my_mutex );
3792  return my_buffer_is_valid;
3793  }
3794 
3795  void clear() {
3796  spin_mutex::scoped_lock l( my_mutex );
3797  my_buffer_is_valid = false;
3798  }
3799 
3800 protected:
3801 
3802  template< typename R, typename B > friend class run_and_put_task;
3803  template<typename X, typename Y> friend class internal::broadcast_cache;
3804  template<typename X, typename Y> friend class internal::round_robin_cache;
3807  return try_put_task_impl(v);
3808  }
3809 
3811  my_buffer = v;
3812  my_buffer_is_valid = true;
3813  task * rtask = my_successors.try_put_task(v);
3814  if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
3815  return rtask;
3816  }
3817 
3819  return my_graph;
3820  }
3821 
3824 
3826  o(owner), s(succ) {};
3827 
3829  if (!s.register_predecessor(o)) {
3830  o.register_successor(s);
3831  }
3832  return NULL;
3833  }
3834 
3837  };
3838 
3841 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3842  internal::edge_container<predecessor_type> my_built_predecessors;
3843 #endif
3847 
3849  my_buffer_is_valid = false;
3850  if (f&rf_clear_edges) {
3851  my_successors.clear();
3852  }
3853  }
3854 }; // overwrite_node
3855 
3856 template< typename T >
3857 class write_once_node : public overwrite_node<T> {
3858 public:
3859  typedef T input_type;
3860  typedef T output_type;
3864 
3866  explicit write_once_node(graph& g) : base_type(g) {
3867  tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3868  static_cast<receiver<input_type> *>(this),
3869  static_cast<sender<output_type> *>(this) );
3870  }
3871 
3874  tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3875  static_cast<receiver<input_type> *>(this),
3876  static_cast<sender<output_type> *>(this) );
3877  }
3878 
3879 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3880  void set_name( const char *name ) __TBB_override {
3882  }
3883 #endif
3884 
3885 protected:
3886  template< typename R, typename B > friend class run_and_put_task;
3887  template<typename X, typename Y> friend class internal::broadcast_cache;
3888  template<typename X, typename Y> friend class internal::round_robin_cache;
3890  spin_mutex::scoped_lock l( this->my_mutex );
3891  return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
3892  }
3893 };
3894 } // interfaceX
3895 
3900 
3901  using interface10::graph;
3902  using interface10::graph_node;
3903  using interface10::continue_msg;
3904 
3905  using interface10::source_node;
3906  using interface10::function_node;
3907  using interface10::multifunction_node;
3908  using interface10::split_node;
3910  using interface10::indexer_node;
3911  using interface10::internal::tagged_msg;
3914  using interface10::continue_node;
3915  using interface10a::overwrite_node;
3916  using interface10a::write_once_node;
3917  using interface10::broadcast_node;
3918  using interface10::buffer_node;
3919  using interface10::queue_node;
3920  using interface10::sequencer_node;
3921  using interface10::priority_queue_node;
3922  using interface10::limiter_node;
3923  using namespace interface10::internal::graph_policy_namespace;
3924  using interface10::join_node;
3926  using interface10::copy_body;
3927  using interface10::make_edge;
3930 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3931  using interface10::composite_node;
3932 #endif
3933  using interface10::async_node;
3934 #if __TBB_PREVIEW_ASYNC_MSG
3935  using interface10::async_msg;
3936 #endif
3937 #if __TBB_PREVIEW_STREAMING_NODE
3938  using interface10::port_ref;
3939  using interface10::streaming_node;
3940 #endif // __TBB_PREVIEW_STREAMING_NODE
3941 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3943  using internal::no_priority;
3944 #endif
3945 
3946 
3947 } // flow
3948 } // tbb
3949 
3950 #undef __TBB_PFG_RESET_ARG
3951 #undef __TBB_COMMA
3952 
3953 #endif // __TBB_flow_graph_H
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
Definition: flow_graph.h:3250
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
Definition: flow_graph.h:1942
reference operator*() const
Dereference.
Definition: flow_graph.h:725
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
sequencer_node(const sequencer_node &src)
Copy constructor.
Definition: flow_graph.h:2140
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:3634
buffer_node< T, A >::buffer_operation prio_operation
Definition: flow_graph.h:2224
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2405
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
A task that calls a node's apply_body_bypass function with no input.
Definition: flow_graph.h:321
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0)
Constructor.
Definition: flow_graph.h:2504
void internal_consume(prio_operation *op) __TBB_override
Definition: flow_graph.h:2267
bool remove_successor(successor_type &r) __TBB_override
Removes s as a successor.
Definition: flow_graph.h:1453
Implements a function node that supports Input -> Output.
Definition: flow_graph.h:1118
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2089
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:987
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
Definition: flow_graph.h:2920
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:867
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:778
int decrement_ref_count()
Atomically decrement reference count and returns its new value.
Definition: task.h:758
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:3805
tbb::flow::interface10::async_node::receiver_gateway_impl my_gateway
base_type::size_type size_type
Definition: flow_graph.h:2037
virtual void handle_operations(buffer_operation *op_list)
Definition: flow_graph.h:1604
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:3589
Enables one or the other code branches.
bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
Definition: flow_graph.h:594
async_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:3553
bool internal_push(prio_operation *op) __TBB_override
Definition: flow_graph.h:2235
Represents acquisition of a mutex.
Definition: spin_mutex.h:50
async_body_base< Gateway > base_type
Definition: flow_graph.h:3467
~graph()
Destroys the graph.
Definition: flow_graph.h:763
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
Definition: flow_graph.h:2811
priority_queue_node(const priority_queue_node &src)
Copy constructor.
Definition: flow_graph.h:2203
function_body that takes an Input and a set of output ports
Definition: flow_graph.h:193
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1543
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
Definition: flow_graph.h:3082
An cache of predecessors that supports requests and reservations.
Definition: flow_graph.h:107
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1972
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2193
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2113
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
Definition: flow_graph.h:3840
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2019
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:3862
void operator()(const Input &v, Ports &)
Definition: flow_graph.h:3473
task * try_put_task(const input_type &) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:642
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
Definition: flow_graph.h:2539
graph & graph_reference() __TBB_override
Definition: flow_graph.h:3818
Forwards messages in arbitrary order.
Definition: flow_graph.h:1539
A cache of successors that are broadcast to.
Definition: flow_graph.h:104
broadcast_node(const broadcast_node &src)
Definition: flow_graph.h:1432
virtual void internal_forward_task(buffer_operation *op)
Tries to forward valid items to successors.
Definition: flow_graph.h:1770
Breaks an infinite loop between the node reservation and register_successor call.
Definition: flow_graph.h:3823
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:408
unsigned int node_priority_t
buffer_node< T, A >::size_type size_type
Definition: flow_graph.h:2222
receiver_gateway< output_type > gateway_type
Definition: flow_graph.h:3499
void const char const char int ITT_FORMAT __itt_group_sync s
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:3846
bool try_get(X &t)
Request an item from the sender.
Definition: flow_graph.h:312
void prepare_task_arena(bool reinit=false)
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
Definition: flow_graph.h:450
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
Definition: flow_graph.h:1034
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2946
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
Definition: flow_graph.h:3770
size_type size(size_t new_tail=0)
Definition: flow_graph.h:153
task that does nothing. Useful for synchronization.
Definition: task.h:964
continue_msg input_type
The input type.
Definition: flow_graph.h:573
virtual void internal_consume(buffer_operation *op)
Definition: flow_graph.h:1823
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:358
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2227
async_body(const Body &body, gateway_type *gateway)
Definition: flow_graph.h:3470
void move_item(size_t to, size_t from)
Definition: flow_graph.h:99
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
Definition: flow_graph.h:2945
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1125
void handle_operations(prio_operation *op_list) __TBB_override
Definition: flow_graph.h:2231
Used to form groups of tasks.
Definition: task.h:332
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1963
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3000
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
Definition: flow_graph.h:2775
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > input_impl_type
Definition: flow_graph.h:1211
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:96
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3056
static void fgt_composite(void *, void *)
Implements methods for both executable and function nodes that puts Output to its successors.
Definition: flow_graph.h:779
task * try_put_task(const T &t) __TBB_override
receive an item, return a task *if possible
Definition: flow_graph.h:1985
item_buffer with reservable front-end. NOTE: if reserving, do not
Definition: flow_graph.h:249
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1009
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
virtual void internal_reserve(buffer_operation *op)
Definition: flow_graph.h:1814
receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:576
interface10::internal::Policy< queueing, lightweight > queueing_lightweight
Definition: flow_graph.h:88
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
Definition: flow_graph.h:3524
receiver_type::predecessor_type predecessor_type
Definition: flow_graph.h:3497
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:3848
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
Definition: flow_graph.h:1507
A cache of predecessors that only supports try_get.
Definition: flow_graph.h:106
virtual bool try_reserve_wrapper(void *p, bool is_async)=0
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:864
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
Definition: flow_graph.h:1273
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:997
static void fgt_graph_desc(void *, const char *)
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
Definition: flow_graph.h:2615
internal::round_robin_cache< T, null_rw_mutex > my_successors
Definition: flow_graph.h:1552
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
An abstract cache of successors.
Definition: flow_graph.h:103
Detects whether two given types are the same.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
Definition: flow_graph.h:2757
virtual void reset_receiver(reset_flags f=rf_reset_protocol)=0
put receiver back in initial state
void internal_forward_task_impl(buffer_operation *op, derived_type *derived)
Definition: flow_graph.h:1775
internal::continue_input< Output, Policy > input_impl_type
Definition: flow_graph.h:1341
void increment_ref_count()
Atomically increment reference count.
Definition: task.h:741
#define __TBB_override
Definition: tbb_stddef.h:240
Implements async node.
Definition: flow_graph.h:3489
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:668
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1544
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
Definition: flow_graph.h:2626
tbb::internal::uint64_t tag_value
Definition: flow_graph.h:29
register_predecessor_task(predecessor_type &owner, successor_type &succ)
Definition: flow_graph.h:3825
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:921
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:2418
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2090
tbb::task * root_task()
Returns the root task of the graph.
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:970
split_node: accepts a tuple as input, forwards each element of the tuple to its
Definition: flow_graph.h:1256
Base class for user-defined tasks.
Definition: task.h:589
graph & graph_reference() __TBB_override
Definition: flow_graph.h:1520
internal::decrementer< limiter_node< T > > decrement
The internal receiver< continue_msg > that decrements the count.
Definition: flow_graph.h:2501
task * try_put_task(const T &t) __TBB_override
build a task to run the successor if possible. Default is old behavior.
Definition: flow_graph.h:1514
internal::tagged_msg< size_t, T0, T1 > output_type
Definition: flow_graph.h:2870
Base class for receivers of completion messages.
Definition: flow_graph.h:569
virtual bool try_get_wrapper(void *p, bool is_async)=0
continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority))
Constructor.
Definition: flow_graph.h:579
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3712
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:861
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:3863
The graph class.
buffer_node< T, A >::buffer_operation sequencer_operation
Definition: flow_graph.h:2158
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3594
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:1295
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1102
static void fgt_node_with_body(string_index, void *, void *, void *)
Implements methods for an executable node that takes continue_msg as input.
Definition: flow_graph.h:678
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:297
static void fgt_multiinput_multioutput_node(string_index, void *, void *)
static void fgt_end_body(void *)
void wait_for_all()
Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
iterator begin()
start iterator
Definition: flow_graph.h:831
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Definition: flow_graph.h:203
static void fgt_async_commit(void *, void *)
void execute_in_graph_arena(graph &g, F &f)
Executes custom functor inside graph arena.
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:361
internal::multifunction_output< Output > output_port_type
Definition: flow_graph.h:3505
Implements methods for a function node that takes a type Input as input.
Definition: flow_graph.h:565
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
Definition: flow_graph.h:3519
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:2556
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2217
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
Definition: flow_graph.h:2793
The base of all graph nodes.
static void fgt_graph(void *)
static void fgt_async_try_put_end(void *, void *)
function_node(graph &g, size_t concurrency,)
Constructor.
Definition: flow_graph.h:1138
A lock that occupies a single byte.
Definition: spin_mutex.h:36
A generic null type.
Definition: flow_graph.h:89
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2292
iterator end()
end iterator
Definition: flow_graph.h:833
static const T & from_void_ptr(const void *p)
Definition: flow_graph.h:205
Base class for types that should not be assigned.
Definition: tbb_stddef.h:320
function_node(const function_node &src)
Copy constructor.
Definition: flow_graph.h:1147
Forwards messages in FIFO order.
Definition: flow_graph.h:2034
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1300
source_node(graph &g, Body body, bool is_active=true)
Constructor for a node with a successor.
Definition: flow_graph.h:879
bool place_item(size_t here, const item_type &me)
Definition: flow_graph.h:108
Forwards messages of type T to all successors.
Definition: flow_graph.h:1407
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
internal::aggregating_functor< class_type, buffer_operation > handler_type
Definition: flow_graph.h:1600
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:1758
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1953
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1396
buffer_node(const buffer_node &src)
Copy constructor.
Definition: flow_graph.h:1844
bool enqueue_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:1662
implements a function node that supports Input -> (set of outputs)
Definition: flow_graph.h:1192
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1250
receiver< TupleType > base_type
Definition: flow_graph.h:1258
static tbb::task *const SUCCESSFULLY_ENQUEUED
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:3656
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
Definition: flow_graph.h:2689
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
Definition: flow_graph.h:441
A task that calls a node's forward_task function.
Definition: flow_graph.h:271
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:722
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1056
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
Definition: flow_graph.h:2971
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:294
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2127
virtual void finalize() const
Definition: flow_graph.h:147
static void fgt_release_wait(void *)
void add_task_to_graph_reset_list(graph &g, tbb::task *tp)
output_ports_type & output_ports()
Definition: flow_graph.h:1292
internal::source_body< output_type > * my_body
Definition: flow_graph.h:1054
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:3780
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1524
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2048
Input and scheduling for a function node that takes a type Input as input.
Definition: flow_graph.h:61
multifunction_node(const multifunction_node &other)
Definition: flow_graph.h:1229
internal::function_input_queue< input_type, Allocator > input_queue_type
Definition: flow_graph.h:1123
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1124
const V & cast_to(T const &t)
Definition: flow_graph.h:701
bool register_successor(successor_type &r) __TBB_override
Adds a successor.
Definition: flow_graph.h:1447
sequencer_node(graph &g, const Sequencer &s)
Constructor.
Definition: flow_graph.h:2132
T output_type
The output type of this sender.
Definition: flow_graph.h:397
Forward declaration section.
Definition: flow_graph.h:95
static void fgt_async_try_put_begin(void *, void *)
static const void * to_void_ptr(const T &t)
Definition: flow_graph.h:197
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
bool internal_push(sequencer_operation *op) __TBB_override
Definition: flow_graph.h:2161
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1174
virtual task * try_put_task_wrapper(const void *p, bool is_async)=0
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:741
Base class for tasks generated by graph nodes.
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:2420
virtual void internal_reg_succ(buffer_operation *op)
Register successor.
Definition: flow_graph.h:1689
Forwards messages in priority order.
Definition: flow_graph.h:2186
void remove_node(graph_node *n)
Definition: flow_graph.h:796
static void fgt_async_reserve(void *, void *)
internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:399
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3167
indexer_node(const indexer_node &other)
Definition: flow_graph.h:2852
internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:438
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2921
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2972
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:771
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:953
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
Definition: flow_graph.h:3490
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3119
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2896
queue_node(const queue_node &src)
Copy constructor.
Definition: flow_graph.h:2100
write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
Definition: flow_graph.h:3873
virtual bool internal_push(buffer_operation *op)
Definition: flow_graph.h:1799
continue_node(graph &g, int number_of_predecessors,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1359
Implements an executable node that supports continue_msg -> Output.
Definition: flow_graph.h:1336
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
Definition: flow_graph.h:2603
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1176
void internal_pop(prio_operation *op) __TBB_override
Definition: flow_graph.h:2241
buffer_node< T, A >::item_type item_type
Definition: flow_graph.h:2223
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2008
static task * emit_this(graph &g, const T &t, P &p)
Definition: flow_graph.h:658
static const node_priority_t no_priority
static void fgt_remove_edge(void *, void *)
virtual void internal_release(buffer_operation *op)
Definition: flow_graph.h:1828
virtual task * execute()=0
Does whatever should happen when the threshold is reached.
Forwards messages only if the threshold has not been reached.
Definition: flow_graph.h:98
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
Definition: flow_graph.h:2784
static void fgt_reserve_wait(void *)
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
Definition: flow_graph.h:2742
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
Definition: flow_graph.h:2762
multifunction_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:1218
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
Definition: flow_graph.h:3055
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type
Definition: flow_graph.h:1505
internal::port_ref_impl< N1, N2 > port_ref()
Definition: flow_graph.h:42
An empty class used for messages that mean "I'm done".
Definition: flow_graph.h:92
untyped_receiver successor_type
The successor type for this node.
Definition: flow_graph.h:279
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
Definition: flow_graph.h:3083
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
Definition: flow_graph.h:3027
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3028
indexer_node(const indexer_node &other)
Definition: flow_graph.h:3090
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
Definition: flow_graph.h:3828
Forwards messages in sequence order.
Definition: flow_graph.h:2120
internal::tagged_msg< size_t, T0, T1, T2 > output_type
Definition: flow_graph.h:2895
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3183
void fetch_item(size_t i, item_type &o)
Definition: flow_graph.h:90
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
Definition: flow_graph.h:2999
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
buffer_node(graph &g)
Constructor.
Definition: flow_graph.h:1835
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
Definition: flow_graph.h:2767
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1306
virtual void internal_rem_succ(buffer_operation *op)
Remove successor.
Definition: flow_graph.h:1695
bool try_reserve(X &t)
Reserves an item in the sender.
Definition: flow_graph.h:318
base_type::output_ports_type output_ports_type
Definition: flow_graph.h:3501
static void fgt_node(string_index, void *, void *)
virtual void internal_pop(buffer_operation *op)
Definition: flow_graph.h:1805
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2657
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2404
K key_from_message(const T &t)
Definition: flow_graph.h:691
static void fgt_begin_body(void *)
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
graph & graph_reference() __TBB_override
Definition: flow_graph.h:1307
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
Definition: flow_graph.h:2714
internal::function_input< input_type, output_type, Policy, Allocator > input_impl_type
Definition: flow_graph.h:1122
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1126
task * try_put_task_impl(const input_type &v)
Definition: flow_graph.h:3810
try_put_functor(output_port_type &p, const Output &v)
Definition: flow_graph.h:3510
receiver< input_type > receiver_type
Definition: flow_graph.h:3496
bool try_consume() __TBB_override
Consumes the reserved item.
Definition: flow_graph.h:3788
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > base_type
Definition: flow_graph.h:1214
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:405
source_node(const source_node &src)
Copy constructor.
Definition: flow_graph.h:891
internal::broadcast_cache< input_type > my_successors
Definition: flow_graph.h:1418
void internal_forward_task(queue_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2059
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:418
bool remove_successor(successor_type &r) __TBB_override
Removes a successor.
Definition: flow_graph.h:1927
tbb::task_group_context * my_context
graph_iterator< const graph, const graph_node > const_iterator
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2192
void register_node(graph_node *n)
Definition: flow_graph.h:785
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2846
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1344
concurrency
An enumeration the provides the two most common concurrency levels: unlimited and serial.
Definition: flow_graph.h:84
internal::tagged_msg< size_t, T0 > output_type
Definition: flow_graph.h:2845
bool try_put(const X &t)
Put an item to the receiver.
Definition: flow_graph.h:346
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2661
void add_nodes_impl(CompositeType *, bool)
Definition: flow_graph.h:879
T input_type
The input type of this receiver.
Definition: flow_graph.h:436
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:3688
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:839
void const char const char int ITT_FORMAT __itt_group_sync p
const_iterator cend() const
end const iterator
Definition: flow_graph.h:841
continue_node(const continue_node &src)
Copy constructor.
Definition: flow_graph.h:1370
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
async_node(const async_node &other)
Definition: flow_graph.h:3567
overwrite_node(const overwrite_node &src)
Copy constructor; doesn't take anything from src; default won't work.
Definition: flow_graph.h:3672
queue_node(graph &g)
Constructor.
Definition: flow_graph.h:2093
graph_iterator< graph, graph_node > iterator
void spawn_put()
Spawns a task that applies the body.
Definition: flow_graph.h:1094
static void fgt_node_desc(const NodeType *, const char *)
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
Definition: flow_graph.h:1210
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:3889
bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
Definition: flow_graph.h:604
base_type::buffer_operation queue_operation
Definition: flow_graph.h:2038
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:2871
internal::function_input_queue< input_type, Allocator > input_queue_type
Definition: flow_graph.h:1212
buffer_node< T, A >::size_type size_type
Definition: flow_graph.h:2157
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:3530
A cache of successors that are put in a round-robin fashion.
Definition: flow_graph.h:105
graph & graph_reference() __TBB_override
Definition: flow_graph.h:2004
Implements methods for a function node that takes a type Input as input and sends.
Definition: flow_graph.h:421
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
void internal_reserve(prio_operation *op) __TBB_override
Definition: flow_graph.h:2255
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1526
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:339
friend class scoped_lock
Definition: spin_mutex.h:176
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
Definition: flow_graph.h:1993
internal::async_body_base< gateway_type > async_body_base_type
Definition: flow_graph.h:3500
graph & graph_reference() __TBB_override
Definition: flow_graph.h:2653
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
Definition: flow_graph.h:3491
bool register_successor(successor_type &r) __TBB_override
Adds a new successor.
Definition: flow_graph.h:1865
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1411
void reset(reset_flags f=rf_reset_protocol)
Definition: flow_graph.h:808
void internal_reserve(queue_operation *op) __TBB_override
Definition: flow_graph.h:2072
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:3657
virtual task * forward_task()
This is executed by an enqueued task, the "forwarder".
Definition: flow_graph.h:1672
void internal_consume(queue_operation *op) __TBB_override
Definition: flow_graph.h:2081
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void internal_release(prio_operation *op) __TBB_override
Definition: flow_graph.h:2273
const item_type & get_my_item(size_t i) const
Definition: flow_graph.h:74
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:167
static task * try_put_task_wrapper_impl(receiver< T > *const this_recv, const void *p, bool is_async)
Definition: flow_graph.h:213
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1412
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:3542
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3084
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1342
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:402
continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1348
void set_ref_count(int count)
Set reference count.
Definition: task.h:731
void grow_my_array(size_t minimum_size)
Grows the internal array.
Definition: flow_graph.h:160
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
Definition: flow_graph.h:2802
limiter_node(const limiter_node &src)
Copy constructor.
Definition: flow_graph.h:2518
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3104
pointer operator->() const
Dereference.
Definition: flow_graph.h:731
task * grab_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:1658
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:3498
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1398
internal::function_body< T, size_t > * my_sequencer
Definition: flow_graph.h:2121
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
Definition: flow_graph.h:644
bool is_continue_receiver() __TBB_override
Definition: flow_graph.h:683
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:633
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:912
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2128
continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:587
static void fgt_multioutput_node_desc(const NodeType *, const char *)
internal::source_body< output_type > * my_init_body
Definition: flow_graph.h:1055
bool try_release() __TBB_override
Releases the reserved item.
Definition: flow_graph.h:3785
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:532
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1343
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1062
void internal_pop(queue_operation *op) __TBB_override
Definition: flow_graph.h:2063
internal::aggregator< handler_type, buffer_operation > my_aggregator
Definition: flow_graph.h:1602
void handle_operations_impl(buffer_operation *op_list, derived_type *derived)
Definition: flow_graph.h:1609
static void fgt_make_edge(void *, void *)

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.