Intel(R) Threading Building Blocks Doxygen Documentation, version 4.2.3
_flow_graph_join_impl.h
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_join_impl_H
18 #define __TBB__flow_graph_join_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 namespace internal {
25 
26  struct forwarding_base : tbb::internal::no_copy {
27  forwarding_base(graph &g) : graph_ref(g) {}
28  virtual ~forwarding_base() {}
29  // decrement_port_count may create a forwarding task. If we cannot handle the task
30  // ourselves, ask decrement_port_count to deal with it.
31  virtual task * decrement_port_count(bool handle_task) = 0;
32  virtual void increment_port_count() = 0;
33  // moved here so input ports can queue tasks
34  graph& graph_ref;
35  };
36 
37  // specialization that lets us keep a copy of the current_key for building results.
38  // KeyType can be a reference type.
39  template<typename KeyType>
40  struct matching_forwarding_base : public forwarding_base {
41  typedef typename tbb::internal::strip<KeyType>::type current_key_type;
42  matching_forwarding_base(graph &g) : forwarding_base(g) {}
43  virtual task * increment_key_count(current_key_type const & /*t*/, bool /*handle_task*/) = 0; // {return NULL;}
44  current_key_type current_key; // so ports can refer to FE's desired items
45  };
46 
47  template< int N >
48  struct join_helper {
49 
50  template< typename TupleType, typename PortType >
51  static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
52  tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
53  join_helper<N-1>::set_join_node_pointer( my_input, port );
54  }
55  template< typename TupleType >
56  static inline void consume_reservations( TupleType &my_input ) {
57  tbb::flow::get<N-1>( my_input ).consume();
58  join_helper<N-1>::consume_reservations( my_input );
59  }
60 
61  template< typename TupleType >
62  static inline void release_my_reservation( TupleType &my_input ) {
63  tbb::flow::get<N-1>( my_input ).release();
64  }
65 
66  template <typename TupleType>
67  static inline void release_reservations( TupleType &my_input) {
68  join_helper<N-1>::release_reservations(my_input);
69  release_my_reservation(my_input);
70  }
71 
72  template< typename InputTuple, typename OutputTuple >
73  static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
74  if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) ) return false;
75  if ( !join_helper<N-1>::reserve( my_input, out ) ) {
76  release_my_reservation( my_input );
77  return false;
78  }
79  return true;
80  }
81 
82  template<typename InputTuple, typename OutputTuple>
83  static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
84  bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) ); // may fail
85  return join_helper<N-1>::get_my_item(my_input, out) && res; // do get on other inputs before returning
86  }
87 
88  template<typename InputTuple, typename OutputTuple>
89  static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
90  return get_my_item(my_input, out);
91  }
92 
93  template<typename InputTuple>
94  static inline void reset_my_port(InputTuple &my_input) {
95  join_helper<N-1>::reset_my_port(my_input);
96  tbb::flow::get<N-1>(my_input).reset_port();
97  }
98 
99  template<typename InputTuple>
100  static inline void reset_ports(InputTuple& my_input) {
101  reset_my_port(my_input);
102  }
103 
104  template<typename InputTuple, typename KeyFuncTuple>
105  static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
106  tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
107  tbb::flow::get<N-1>(my_key_funcs) = NULL;
108  join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
109  }
110 
111  template< typename KeyFuncTuple>
112  static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
113  if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
114  tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
115  }
116  join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
117  }
118 
119  template<typename InputTuple>
120  static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
121  join_helper<N-1>::reset_inputs(my_input, f);
122  tbb::flow::get<N-1>(my_input).reset_receiver(f);
123  }
124 
125 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
126  template<typename InputTuple>
127  static inline void extract_inputs(InputTuple &my_input) {
128  join_helper<N-1>::extract_inputs(my_input);
129  tbb::flow::get<N-1>(my_input).extract_receiver();
130  }
131 #endif
132  }; // join_helper<N>
133 
134  template< >
135  struct join_helper<1> {
136 
137  template< typename TupleType, typename PortType >
138  static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
139  tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
140  }
141 
142  template< typename TupleType >
143  static inline void consume_reservations( TupleType &my_input ) {
144  tbb::flow::get<0>( my_input ).consume();
145  }
146 
147  template< typename TupleType >
148  static inline void release_my_reservation( TupleType &my_input ) {
149  tbb::flow::get<0>( my_input ).release();
150  }
151 
152  template<typename TupleType>
153  static inline void release_reservations( TupleType &my_input) {
154  release_my_reservation(my_input);
155  }
156 
157  template< typename InputTuple, typename OutputTuple >
158  static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
159  return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
160  }
161 
162  template<typename InputTuple, typename OutputTuple>
163  static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
164  return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
165  }
166 
167  template<typename InputTuple, typename OutputTuple>
168  static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
169  return get_my_item(my_input, out);
170  }
171 
172  template<typename InputTuple>
173  static inline void reset_my_port(InputTuple &my_input) {
174  tbb::flow::get<0>(my_input).reset_port();
175  }
176 
177  template<typename InputTuple>
178  static inline void reset_ports(InputTuple& my_input) {
179  reset_my_port(my_input);
180  }
181 
182  template<typename InputTuple, typename KeyFuncTuple>
183  static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
184  tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
185  tbb::flow::get<0>(my_key_funcs) = NULL;
186  }
187 
188  template< typename KeyFuncTuple>
189  static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
190  if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
191  tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
192  }
193  }
194  template<typename InputTuple>
195  static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
196  tbb::flow::get<0>(my_input).reset_receiver(f);
197  }
198 
199 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
200  template<typename InputTuple>
201  static inline void extract_inputs(InputTuple &my_input) {
202  tbb::flow::get<0>(my_input).extract_receiver();
203  }
204 #endif
205  }; // join_helper<1>
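For orientation only (this sketch is not part of the TBB sources): join_helper<N> is the usual compile-time recursion over a tuple, applying an operation to element N-1 and then recursing until the join_helper<1> base case above. A minimal standalone illustration of the same pattern, with invented names:

    #include <tuple>
    #include <iostream>

    // Hypothetical stand-in for a join input port.
    struct demo_port { void reset_port() { std::cout << "reset\n"; } };

    // Same shape as join_helper<N>: act on element N-1, then recurse.
    template<int N>
    struct demo_helper {
        template<typename Tuple>
        static void reset_ports(Tuple &t) {
            std::get<N-1>(t).reset_port();
            demo_helper<N-1>::reset_ports(t);
        }
    };

    // Base case terminates the recursion at element 0.
    template<>
    struct demo_helper<1> {
        template<typename Tuple>
        static void reset_ports(Tuple &t) { std::get<0>(t).reset_port(); }
    };

    int main() {
        std::tuple<demo_port, demo_port, demo_port> ports;
        demo_helper<3>::reset_ports(ports);   // resets all three ports
        return 0;
    }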
206 
208  template< typename T >
209  class reserving_port : public receiver<T> {
210  public:
211  typedef T input_type;
212  typedef typename receiver<input_type>::predecessor_type predecessor_type;
213 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
214  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
215  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
216 #endif
217  private:
218  // ----------- Aggregator ------------
219  enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
220 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
221  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
222 #endif
223  };
224  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
225  typedef reserving_port<T> class_type;
226 
227  class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
228  public:
229  char type;
230  union {
231  T *my_arg;
232  predecessor_type *my_pred;
233 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
234  size_t cnt_val;
235  predecessor_list_type *plist;
236 #endif
237  };
238  reserving_port_operation(const T& e, op_type t) :
239  type(char(t)), my_arg(const_cast<T*>(&e)) {}
240  reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
241  my_pred(const_cast<predecessor_type *>(&s)) {}
242  reserving_port_operation(op_type t) : type(char(t)) {}
243  };
244 
245  typedef internal::aggregating_functor<class_type, reserving_port_operation> handler_type;
246  friend class internal::aggregating_functor<class_type, reserving_port_operation>;
247  aggregator<handler_type, reserving_port_operation> my_aggregator;
248 
249  void handle_operations(reserving_port_operation* op_list) {
250  reserving_port_operation *current;
251  bool no_predecessors;
252  while(op_list) {
253  current = op_list;
254  op_list = op_list->next;
255  switch(current->type) {
256  case reg_pred:
257  no_predecessors = my_predecessors.empty();
258  my_predecessors.add(*(current->my_pred));
259  if ( no_predecessors ) {
260  (void) my_join->decrement_port_count(true); // may try to forward
261  }
262  __TBB_store_with_release(current->status, SUCCEEDED);
263  break;
264  case rem_pred:
265  my_predecessors.remove(*(current->my_pred));
267  __TBB_store_with_release(current->status, SUCCEEDED);
268  break;
269  case res_item:
270  if ( reserved ) {
271  __TBB_store_with_release(current->status, FAILED);
272  }
273  else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
274  reserved = true;
275  __TBB_store_with_release(current->status, SUCCEEDED);
276  } else {
277  if ( my_predecessors.empty() ) {
278  my_join->increment_port_count();
279  }
280  __TBB_store_with_release(current->status, FAILED);
281  }
282  break;
283  case rel_res:
284  reserved = false;
285  my_predecessors.try_release();
286  __TBB_store_with_release(current->status, SUCCEEDED);
287  break;
288  case con_res:
289  reserved = false;
290  my_predecessors.try_consume();
291  __TBB_store_with_release(current->status, SUCCEEDED);
292  break;
293 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
294  case add_blt_pred:
295  my_predecessors.internal_add_built_predecessor(*(current->my_pred));
296  __TBB_store_with_release(current->status, SUCCEEDED);
297  break;
298  case del_blt_pred:
299  my_predecessors.internal_delete_built_predecessor(*(current->my_pred));
300  __TBB_store_with_release(current->status, SUCCEEDED);
301  break;
302  case blt_pred_cnt:
303  current->cnt_val = my_predecessors.predecessor_count();
304  __TBB_store_with_release(current->status, SUCCEEDED);
305  break;
306  case blt_pred_cpy:
307  my_predecessors.copy_predecessors(*(current->plist));
308  __TBB_store_with_release(current->status, SUCCEEDED);
309  break;
310 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
311  }
312  }
313  }
314 
315  protected:
316  template< typename R, typename B > friend class run_and_put_task;
317  template<typename X, typename Y> friend class internal::broadcast_cache;
318  template<typename X, typename Y> friend class internal::round_robin_cache;
319  task *try_put_task( const T & ) __TBB_override {
320  return NULL;
321  }
322 
323  graph& graph_reference() __TBB_override {
324  return my_join->graph_ref;
325  }
326 
327  public:
328 
330  reserving_port() : reserved(false) {
331  my_join = NULL;
332  my_predecessors.set_owner( this );
333  my_aggregator.initialize_handler(handler_type(this));
334  }
335 
336  // copy constructor
337  reserving_port(const reserving_port& /* other */) : receiver<T>() {
338  reserved = false;
339  my_join = NULL;
340  my_predecessors.set_owner( this );
341  my_aggregator.initialize_handler(handler_type(this));
342  }
343 
344  void set_join_node_pointer(forwarding_base *join) {
345  my_join = join;
346  }
347 
349  bool register_predecessor( predecessor_type &src ) __TBB_override {
350  reserving_port_operation op_data(src, reg_pred);
351  my_aggregator.execute(&op_data);
352  return op_data.status == SUCCEEDED;
353  }
354 
356  bool remove_predecessor( predecessor_type &src ) __TBB_override {
357  reserving_port_operation op_data(src, rem_pred);
358  my_aggregator.execute(&op_data);
359  return op_data.status == SUCCEEDED;
360  }
361 
363  bool reserve( T &v ) {
364  reserving_port_operation op_data(v, res_item);
365  my_aggregator.execute(&op_data);
366  return op_data.status == SUCCEEDED;
367  }
368 
370  void release( ) {
371  reserving_port_operation op_data(rel_res);
372  my_aggregator.execute(&op_data);
373  }
374 
376  void consume( ) {
377  reserving_port_operation op_data(con_res);
378  my_aggregator.execute(&op_data);
379  }
380 
381 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
382  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
383  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
384  reserving_port_operation op_data(src, add_blt_pred);
385  my_aggregator.execute(&op_data);
386  }
387 
388  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
389  reserving_port_operation op_data(src, del_blt_pred);
390  my_aggregator.execute(&op_data);
391  }
392 
393  size_t predecessor_count() __TBB_override {
394  reserving_port_operation op_data(blt_pred_cnt);
395  my_aggregator.execute(&op_data);
396  return op_data.cnt_val;
397  }
398 
399  void copy_predecessors(predecessor_list_type &l) __TBB_override {
400  reserving_port_operation op_data(blt_pred_cpy);
401  op_data.plist = &l;
402  my_aggregator.execute(&op_data);
403  }
404 
405  void extract_receiver() {
406  my_predecessors.built_predecessors().receiver_extract(*this);
407  }
408 
409 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
410 
411  void reset_receiver( reset_flags f) __TBB_override {
412  if(f & rf_clear_edges) my_predecessors.clear();
413  else
414  my_predecessors.reset();
415  reserved = false;
416  __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
417  }
418 
419  private:
420  forwarding_base *my_join;
421  reservable_predecessor_cache< T, null_mutex > my_predecessors;
422  bool reserved;
423  }; // reserving_port
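For context, a minimal usage sketch of the public join_node API that reserving_port ultimately serves (reserving policy, fed by reservable predecessors such as queue_node); illustrative only, assuming TBB 2019-era headers and the names shown:

    #include <tbb/flow_graph.h>
    #include <iostream>

    int main() {
        using namespace tbb::flow;
        graph g;
        // A reserving join pulls items by reserving them from buffering predecessors.
        queue_node<int>   qi(g);
        queue_node<float> qf(g);
        join_node< tuple<int, float>, reserving > j(g);
        function_node< tuple<int, float>, continue_msg > consumer(g, unlimited,
            [](const tuple<int, float> &t) -> continue_msg {
                std::cout << get<0>(t) << " " << get<1>(t) << std::endl;
                return continue_msg();
            });
        make_edge(qi, input_port<0>(j));
        make_edge(qf, input_port<1>(j));
        make_edge(j, consumer);
        qi.try_put(1);
        qf.try_put(2.5f);
        g.wait_for_all();   // prints "1 2.5" once both inputs are available
        return 0;
    }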
424 
426  template<typename T>
427  class queueing_port : public receiver<T>, public item_buffer<T> {
428  public:
429  typedef T input_type;
430  typedef typename receiver<input_type>::predecessor_type predecessor_type;
431  typedef queueing_port<T> class_type;
432 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
433  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
434  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
435 #endif
436 
437  // ----------- Aggregator ------------
438  private:
439  enum op_type { get__item, res_port, try__put_task
440 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
441  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
442 #endif
443  };
444  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
445 
446  class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
447  public:
448  char type;
449  T my_val;
450  T *my_arg;
451 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
452  predecessor_type *pred;
453  size_t cnt_val;
454  predecessor_list_type *plist;
455 #endif
456  task * bypass_t;
457  // constructor for value parameter
458  queueing_port_operation(const T& e, op_type t) :
459  type(char(t)), my_val(e)
460  , bypass_t(NULL)
461  {}
462  // constructor for pointer parameter
463  queueing_port_operation(const T* p, op_type t) :
464  type(char(t)), my_arg(const_cast<T*>(p))
465  , bypass_t(NULL)
466  {}
467  // constructor with no parameter
468  queueing_port_operation(op_type t) : type(char(t))
469  , bypass_t(NULL)
470  {}
471  };
472 
473  typedef internal::aggregating_functor<class_type, queueing_port_operation> handler_type;
474  friend class internal::aggregating_functor<class_type, queueing_port_operation>;
475  aggregator<handler_type, queueing_port_operation> my_aggregator;
476 
477  void handle_operations(queueing_port_operation* op_list) {
478  queueing_port_operation *current;
479  bool was_empty;
480  while(op_list) {
481  current = op_list;
482  op_list = op_list->next;
483  switch(current->type) {
484  case try__put_task: {
485  task *rtask = NULL;
486  was_empty = this->buffer_empty();
487  this->push_back(current->my_val);
488  if (was_empty) rtask = my_join->decrement_port_count(false);
489  else
490  rtask = SUCCESSFULLY_ENQUEUED;
491  current->bypass_t = rtask;
492  __TBB_store_with_release(current->status, SUCCEEDED);
493  }
494  break;
495  case get__item:
496  if(!this->buffer_empty()) {
497  *(current->my_arg) = this->front();
498  __TBB_store_with_release(current->status, SUCCEEDED);
499  }
500  else {
501  __TBB_store_with_release(current->status, FAILED);
502  }
503  break;
504  case res_port:
505  __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
506  this->destroy_front();
507  if(this->my_item_valid(this->my_head)) {
508  (void)my_join->decrement_port_count(true);
509  }
510  __TBB_store_with_release(current->status, SUCCEEDED);
511  break;
512 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
513  case add_blt_pred:
514  my_built_predecessors.add_edge(*(current->pred));
515  __TBB_store_with_release(current->status, SUCCEEDED);
516  break;
517  case del_blt_pred:
518  my_built_predecessors.delete_edge(*(current->pred));
519  __TBB_store_with_release(current->status, SUCCEEDED);
520  break;
521  case blt_pred_cnt:
522  current->cnt_val = my_built_predecessors.edge_count();
523  __TBB_store_with_release(current->status, SUCCEEDED);
524  break;
525  case blt_pred_cpy:
526  my_built_predecessors.copy_edges(*(current->plist));
527  __TBB_store_with_release(current->status, SUCCEEDED);
528  break;
529 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
530  }
531  }
532  }
533  // ------------ End Aggregator ---------------
534 
535  protected:
536  template< typename R, typename B > friend class run_and_put_task;
537  template<typename X, typename Y> friend class internal::broadcast_cache;
538  template<typename X, typename Y> friend class internal::round_robin_cache;
539  task *try_put_task(const T &v) __TBB_override {
540  queueing_port_operation op_data(v, try__put_task);
541  my_aggregator.execute(&op_data);
542  __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
543  if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
544  return op_data.bypass_t;
545  }
546 
547  graph& graph_reference() __TBB_override {
548  return my_join->graph_ref;
549  }
550 
551  public:
552 
554  queueing_port() : item_buffer<T>() {
555  my_join = NULL;
556  my_aggregator.initialize_handler(handler_type(this));
557  }
558 
560  queueing_port(const queueing_port& /* other */) : receiver<T>(), item_buffer<T>() {
561  my_join = NULL;
562  my_aggregator.initialize_handler(handler_type(this));
563  }
564 
566  void set_join_node_pointer(forwarding_base *join) {
567  my_join = join;
568  }
569 
570  bool get_item( T &v ) {
571  queueing_port_operation op_data(&v, get__item);
572  my_aggregator.execute(&op_data);
573  return op_data.status == SUCCEEDED;
574  }
575 
576  // reset_port is called when item is accepted by successor, but
577  // is initiated by join_node.
578  void reset_port() {
579  queueing_port_operation op_data(res_port);
580  my_aggregator.execute(&op_data);
581  return;
582  }
583 
584 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
585  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
586 
587  void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
588  queueing_port_operation op_data(add_blt_pred);
589  op_data.pred = &p;
590  my_aggregator.execute(&op_data);
591  }
592 
593  void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
594  queueing_port_operation op_data(del_blt_pred);
595  op_data.pred = &p;
596  my_aggregator.execute(&op_data);
597  }
598 
599  size_t predecessor_count() __TBB_override {
600  queueing_port_operation op_data(blt_pred_cnt);
601  my_aggregator.execute(&op_data);
602  return op_data.cnt_val;
603  }
604 
605  void copy_predecessors(predecessor_list_type &l) __TBB_override {
606  queueing_port_operation op_data(blt_pred_cpy);
607  op_data.plist = &l;
608  my_aggregator.execute(&op_data);
609  }
610 
611  void extract_receiver() {
612  item_buffer<T>::reset();
613  my_built_predecessors.receiver_extract(*this);
614  }
615 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
616 
617  void reset_receiver(reset_flags f) __TBB_override {
618  tbb::internal::suppress_unused_warning(f);
619  item_buffer<T>::reset();
620 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
621  if (f & rf_clear_edges)
622  my_built_predecessors.clear();
623 #endif
624  }
625 
626  private:
627  forwarding_base *my_join;
628 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
629  edge_container<predecessor_type> my_built_predecessors;
630 #endif
631  }; // queueing_port
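A corresponding sketch for the queueing policy (the default), where each port buffers items put to it directly; illustrative only:

    #include <tbb/flow_graph.h>
    #include <cassert>

    int main() {
        using namespace tbb::flow;
        graph g;
        join_node< tuple<int, char>, queueing > j(g);
        input_port<0>(j).try_put(42);
        input_port<0>(j).try_put(43);      // queued behind the first item
        input_port<1>(j).try_put('a');
        g.wait_for_all();
        tuple<int, char> result;
        bool ok = j.try_get(result);       // first complete tuple: (42, 'a')
        assert(ok && get<0>(result) == 42 && get<1>(result) == 'a');
        return 0;
    }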
632 
634 
635  template<typename K>
636  struct count_element {
637  K my_key;
638  size_t my_value;
639  };
640 
641  // method to access the key in the counting table
642  // the ref has already been removed from K
643  template< typename K >
644  struct key_to_count_functor {
645  typedef count_element<K> table_item_type;
646  const K& operator()(const table_item_type& v) { return v.my_key; }
647  };
648 
649  // the ports can have only one template parameter. We wrap the types needed in
650  // a traits type
651  template< class TraitsType >
652  class key_matching_port :
653  public receiver<typename TraitsType::T>,
654  public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
655  typename TraitsType::KHash > {
656  public:
657  typedef TraitsType traits;
658  typedef key_matching_port<traits> class_type;
659  typedef typename TraitsType::T input_type;
660  typedef typename TraitsType::K key_type;
661  typedef typename tbb::internal::strip<key_type>::type noref_key_type;
662  typedef typename receiver<input_type>::predecessor_type predecessor_type;
663  typedef typename TraitsType::TtoK type_to_key_func_type;
664  typedef typename TraitsType::KHash hash_compare_type;
665  typedef hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type;
666 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
667  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
668  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
669 #endif
670  private:
671 // ----------- Aggregator ------------
672  private:
673  enum op_type { try__put, get__item, res_port
674 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
675  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
676 #endif
677  };
678  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
679 
680  class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
681  public:
682  char type;
683  input_type my_val;
684  input_type *my_arg;
685 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
686  predecessor_type *pred;
687  size_t cnt_val;
688  predecessor_list_type *plist;
689 #endif
690  // constructor for value parameter
691  key_matching_port_operation(const input_type& e, op_type t) :
692  type(char(t)), my_val(e) {}
693  // constructor for pointer parameter
694  key_matching_port_operation(const input_type* p, op_type t) :
695  type(char(t)), my_arg(const_cast<input_type*>(p)) {}
696  // constructor with no parameter
697  key_matching_port_operation(op_type t) : type(char(t)) {}
698  };
699 
700  typedef internal::aggregating_functor<class_type, key_matching_port_operation> handler_type;
701  friend class internal::aggregating_functor<class_type, key_matching_port_operation>;
702  aggregator<handler_type, key_matching_port_operation> my_aggregator;
703 
704  void handle_operations(key_matching_port_operation* op_list) {
705  key_matching_port_operation *current;
706  while(op_list) {
707  current = op_list;
708  op_list = op_list->next;
709  switch(current->type) {
710  case try__put: {
711  bool was_inserted = this->insert_with_key(current->my_val);
712  // return failure if a duplicate insertion occurs
713  __TBB_store_with_release(current->status, was_inserted ? SUCCEEDED : FAILED);
714  }
715  break;
716  case get__item:
717  // use current_key from FE for item
718  if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
719  __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
720  }
721  __TBB_store_with_release(current->status, SUCCEEDED);
722  break;
723  case res_port:
724  // use current_key from FE for item
725  this->delete_with_key(my_join->current_key);
726  __TBB_store_with_release(current->status, SUCCEEDED);
727  break;
728 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
729  case add_blt_pred:
730  my_built_predecessors.add_edge(*(current->pred));
731  __TBB_store_with_release(current->status, SUCCEEDED);
732  break;
733  case del_blt_pred:
734  my_built_predecessors.delete_edge(*(current->pred));
735  __TBB_store_with_release(current->status, SUCCEEDED);
736  break;
737  case blt_pred_cnt:
738  current->cnt_val = my_built_predecessors.edge_count();
739  __TBB_store_with_release(current->status, SUCCEEDED);
740  break;
741  case blt_pred_cpy:
742  my_built_predecessors.copy_edges(*(current->plist));
743  __TBB_store_with_release(current->status, SUCCEEDED);
744  break;
745 #endif
746  }
747  }
748  }
749 // ------------ End Aggregator ---------------
750  protected:
751  template< typename R, typename B > friend class run_and_put_task;
752  template<typename X, typename Y> friend class internal::broadcast_cache;
753  template<typename X, typename Y> friend class internal::round_robin_cache;
754  task *try_put_task(const input_type& v) __TBB_override {
755  key_matching_port_operation op_data(v, try__put);
756  task *rtask = NULL;
757  my_aggregator.execute(&op_data);
758  if(op_data.status == SUCCEEDED) {
759  rtask = my_join->increment_key_count((*(this->get_key_func()))(v), false); // may spawn
760  // rtask has to reflect the return status of the try_put
761  if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
762  }
763  return rtask;
764  }
765 
766  graph& graph_reference() __TBB_override {
767  return my_join->graph_ref;
768  }
769 
770  public:
771 
772  key_matching_port() : receiver<input_type>(), buffer_type() {
773  my_join = NULL;
774  my_aggregator.initialize_handler(handler_type(this));
775  }
776 
777  // copy constructor
778  key_matching_port(const key_matching_port& /*other*/) : receiver<input_type>(), buffer_type() {
779  my_join = NULL;
780  my_aggregator.initialize_handler(handler_type(this));
781  }
782 
783  ~key_matching_port() { }
784 
785  void set_join_node_pointer(forwarding_base *join) {
786  my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
787  }
788 
789  void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
790 
791  type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
792 
793  bool get_item( input_type &v ) {
794  // aggregator uses current_key from FE for Key
795  key_matching_port_operation op_data(&v, get__item);
796  my_aggregator.execute(&op_data);
797  return op_data.status == SUCCEEDED;
798  }
799 
800 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
801  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
802 
803  void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
804  key_matching_port_operation op_data(add_blt_pred);
805  op_data.pred = &p;
806  my_aggregator.execute(&op_data);
807  }
808 
809  void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
810  key_matching_port_operation op_data(del_blt_pred);
811  op_data.pred = &p;
812  my_aggregator.execute(&op_data);
813  }
814 
815  size_t predecessor_count() __TBB_override {
816  key_matching_port_operation op_data(blt_pred_cnt);
817  my_aggregator.execute(&op_data);
818  return op_data.cnt_val;
819  }
820 
821  void copy_predecessors(predecessor_list_type &l) __TBB_override {
822  key_matching_port_operation op_data(blt_pred_cpy);
823  op_data.plist = &l;
824  my_aggregator.execute(&op_data);
825  }
826 #endif
827 
828  // reset_port is called when item is accepted by successor, but
829  // is initiated by join_node.
830  void reset_port() {
831  key_matching_port_operation op_data(res_port);
832  my_aggregator.execute(&op_data);
833  return;
834  }
835 
836 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
837  void extract_receiver() {
838  buffer_type::reset();
839  my_built_predecessors.receiver_extract(*this);
840  }
841 #endif
842  void reset_receiver(reset_flags f ) __TBB_override {
843  tbb::internal::suppress_unused_warning(f);
844  buffer_type::reset();
845 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
846  if (f & rf_clear_edges)
847  my_built_predecessors.clear();
848 #endif
849  }
850 
851  private:
852  // my_join forwarding base used to count number of inputs that
853  // received key.
854  matching_forwarding_base<key_type> *my_join;
855 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
856  edge_container<predecessor_type> my_built_predecessors;
857 #endif
858  }; // key_matching_port
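A usage sketch for the key_matching policy served by this port type: each input port is given a body that extracts a key from its message, and tuples are assembled from messages whose keys agree. Illustrative only; the struct and variable names are invented:

    #include <tbb/flow_graph.h>
    #include <iostream>
    #include <string>

    struct order   { int id; std::string item; };
    struct payment { int id; double amount; };

    int main() {
        using namespace tbb::flow;
        graph g;
        join_node< tuple<order, payment>, key_matching<int> > j(g,
            [](const order &o)   { return o.id; },     // key functor for port 0
            [](const payment &p) { return p.id; });    // key functor for port 1
        function_node< tuple<order, payment>, continue_msg > report(g, serial,
            [](const tuple<order, payment> &t) -> continue_msg {
                std::cout << get<0>(t).item << " paid " << get<1>(t).amount << std::endl;
                return continue_msg();
            });
        make_edge(j, report);
        input_port<0>(j).try_put(order{7, "book"});
        input_port<1>(j).try_put(payment{7, 12.99});
        g.wait_for_all();                               // prints "book paid 12.99"
        return 0;
    }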
859 
860  using namespace graph_policy_namespace;
861 
862  template<typename JP, typename InputTuple, typename OutputTuple>
863  class join_node_base;
864 
866  template<typename JP, typename InputTuple, typename OutputTuple>
867  class join_node_FE;
868 
869  template<typename InputTuple, typename OutputTuple>
870  class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
871  public:
872  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
873  typedef OutputTuple output_type;
874  typedef InputTuple input_type;
875  typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type; // for forwarding
876 
877  join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
878  ports_with_no_inputs = N;
879  join_helper<N>::set_join_node_pointer(my_inputs, this);
880  }
881 
882  join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
883  ports_with_no_inputs = N;
884  join_helper<N>::set_join_node_pointer(my_inputs, this);
885  }
886 
887  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
888 
889  void increment_port_count() __TBB_override {
890  ++ports_with_no_inputs;
891  }
892 
893  // if all input_ports have predecessors, spawn forward to try and consume tuples
894  task * decrement_port_count(bool handle_task) __TBB_override {
895  if(ports_with_no_inputs.fetch_and_decrement() == 1) {
896  if(internal::is_graph_active(this->graph_ref)) {
897  task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
898  forward_task_bypass<base_node_type>(*my_node);
899  if(!handle_task) return rtask;
900  internal::spawn_in_graph_arena(this->graph_ref, *rtask);
901  }
902  }
903  return NULL;
904  }
905 
906  input_type &input_ports() { return my_inputs; }
907 
908  protected:
909 
910  void reset( reset_flags f) {
911  // called outside of parallel contexts
912  ports_with_no_inputs = N;
913  join_helper<N>::reset_inputs(my_inputs, f);
914  }
915 
916 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
917  void extract( ) {
918  // called outside of parallel contexts
919  ports_with_no_inputs = N;
920  join_helper<N>::extract_inputs(my_inputs);
921  }
922 #endif
923 
924  // all methods on input ports should be called under mutual exclusion from join_node_base.
925 
926  bool tuple_build_may_succeed() {
927  return !ports_with_no_inputs;
928  }
929 
930  bool try_to_make_tuple(output_type &out) {
931  if(ports_with_no_inputs) return false;
932  return join_helper<N>::reserve(my_inputs, out);
933  }
934 
935  void tuple_accepted() {
936  join_helper<N>::consume_reservations(my_inputs);
937  }
938  void tuple_rejected() {
939  join_helper<N>::release_reservations(my_inputs);
940  }
941 
942  input_type my_inputs;
943  base_node_type *my_node;
944  atomic<size_t> ports_with_no_inputs;
945  }; // join_node_FE<reserving, ... >
946 
947  template<typename InputTuple, typename OutputTuple>
948  class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
949  public:
950  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
951  typedef OutputTuple output_type;
952  typedef InputTuple input_type;
953  typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type; // for forwarding
954 
955  join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
956  ports_with_no_items = N;
957  join_helper<N>::set_join_node_pointer(my_inputs, this);
958  }
959 
960  join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
961  ports_with_no_items = N;
962  join_helper<N>::set_join_node_pointer(my_inputs, this);
963  }
964 
965  // needed for forwarding
966  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
967 
968  void reset_port_count() {
969  ports_with_no_items = N;
970  }
971 
972  // if all input_ports have items, spawn forward to try and consume tuples
973  task * decrement_port_count(bool handle_task) __TBB_override
974  {
975  if(ports_with_no_items.fetch_and_decrement() == 1) {
976  if(internal::is_graph_active(this->graph_ref)) {
977  task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
978  forward_task_bypass<base_node_type>(*my_node);
979  if(!handle_task) return rtask;
980  internal::spawn_in_graph_arena(this->graph_ref, *rtask);
981  }
982  }
983  return NULL;
984  }
985 
986  void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); } // should never be called
987 
988  input_type &input_ports() { return my_inputs; }
989 
990  protected:
991 
992  void reset( reset_flags f) {
993  reset_port_count();
994  join_helper<N>::reset_inputs(my_inputs, f );
995  }
996 
997 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
998  void extract() {
999  reset_port_count();
1000  join_helper<N>::extract_inputs(my_inputs);
1001  }
1002 #endif
1003  // all methods on input ports should be called under mutual exclusion from join_node_base.
1004 
1005  bool tuple_build_may_succeed() {
1006  return !ports_with_no_items;
1007  }
1008 
1009  bool try_to_make_tuple(output_type &out) {
1010  if(ports_with_no_items) return false;
1011  return join_helper<N>::get_items(my_inputs, out);
1012  }
1013 
1014  void tuple_accepted() {
1015  reset_port_count();
1016  join_helper<N>::reset_ports(my_inputs);
1017  }
1018  void tuple_rejected() {
1019  // nothing to do.
1020  }
1021 
1022  input_type my_inputs;
1023  base_node_type *my_node;
1024  atomic<size_t> ports_with_no_items;
1025  }; // join_node_FE<queueing, ...>
1026 
1027  // key_matching join front-end.
1028  template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
1029  class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
1030  // buffer of key value counts
1031  public hash_buffer< // typedefed below to key_to_count_buffer_type
1032  typename tbb::internal::strip<K>::type&, // force ref type on K
1033  count_element<typename tbb::internal::strip<K>::type>,
1034  internal::type_to_key_function_body<
1035  count_element<typename tbb::internal::strip<K>::type>,
1036  typename tbb::internal::strip<K>::type& >,
1037  KHash >,
1038  // buffer of output items
1039  public item_buffer<OutputTuple> {
1040  public:
1041  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
1042  typedef OutputTuple output_type;
1043  typedef InputTuple input_type;
1044  typedef K key_type;
1045  typedef typename tbb::internal::strip<key_type>::type unref_key_type;
1046  typedef KHash key_hash_compare;
1047  // must use K without ref.
1048  typedef count_element<unref_key_type> count_element_type;
1049  // method that lets us refer to the key of this type.
1050  typedef key_to_count_functor<unref_key_type> key_to_count_func;
1051  typedef internal::type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type;
1052  typedef internal::type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type;
1053  // this is the type of the special table that keeps track of the number of discrete
1054  // elements corresponding to each key that we've seen.
1055  typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare >
1056  key_to_count_buffer_type;
1057  typedef item_buffer<output_type> output_buffer_type;
1058  typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
1059  typedef matching_forwarding_base<key_type> forwarding_base_type;
1060 
1061 // ----------- Aggregator ------------
1062  // the aggregator is only needed to serialize the access to the hash table.
1063  // and the output_buffer_type base class
1064  private:
1065  enum op_type { res_count, inc_count, may_succeed, try_make };
1066  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1067  typedef join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> class_type;
1068 
1069  class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
1070  public:
1071  char type;
1072  unref_key_type my_val;
1073  output_type* my_output;
1074  task *bypass_t;
1075  bool enqueue_task;
1076  // constructor for value parameter
1077  key_matching_FE_operation(const unref_key_type& e , bool q_task , op_type t) : type(char(t)), my_val(e),
1078  my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
1079  key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(NULL),
1080  enqueue_task(true) {}
1081  // constructor with no parameter
1082  key_matching_FE_operation(op_type t) : type(char(t)), my_output(NULL), bypass_t(NULL), enqueue_task(true) {}
1083  };
1084 
1085  typedef internal::aggregating_functor<class_type, key_matching_FE_operation> handler_type;
1086  friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1087  aggregator<handler_type, key_matching_FE_operation> my_aggregator;
1088 
1089  // called from aggregator, so serialized
1090  // returns a task pointer if the a task would have been enqueued but we asked that
1091  // it be returned. Otherwise returns NULL.
1092  task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task) {
1093  output_type l_out;
1094  task *rtask = NULL;
1095  bool do_fwd = should_enqueue && this->buffer_empty() && internal::is_graph_active(this->graph_ref);
1096  this->current_key = t;
1097  this->delete_with_key(this->current_key); // remove the key
1098  if(join_helper<N>::get_items(my_inputs, l_out)) { // <== call back
1099  this->push_back(l_out);
1100  if(do_fwd) { // we enqueue if receiving an item from predecessor, not if successor asks for item
1101  rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
1102  forward_task_bypass<base_node_type>(*my_node);
1103  if(handle_task) {
1104  internal::spawn_in_graph_arena(this->graph_ref, *rtask);
1105  rtask = NULL;
1106  }
1107  do_fwd = false;
1108  }
1109  // retire the input values
1110  join_helper<N>::reset_ports(my_inputs); // <== call back
1111  }
1112  else {
1113  __TBB_ASSERT(false, "should have had something to push");
1114  }
1115  return rtask;
1116  }
1117 
1118  void handle_operations(key_matching_FE_operation* op_list) {
1119  key_matching_FE_operation *current;
1120  while(op_list) {
1121  current = op_list;
1122  op_list = op_list->next;
1123  switch(current->type) {
1124  case res_count: // called from BE
1125  {
1126  this->destroy_front();
1127  __TBB_store_with_release(current->status, SUCCEEDED);
1128  }
1129  break;
1130  case inc_count: { // called from input ports
1131  count_element_type *p = 0;
1132  unref_key_type &t = current->my_val;
1133  bool do_enqueue = current->enqueue_task;
1134  if(!(this->find_ref_with_key(t,p))) {
1135  count_element_type ev;
1136  ev.my_key = t;
1137  ev.my_value = 0;
1138  this->insert_with_key(ev);
1139  if(!(this->find_ref_with_key(t,p))) {
1140  __TBB_ASSERT(false, "should find key after inserting it");
1141  }
1142  }
1143  if(++(p->my_value) == size_t(N)) {
1144  task *rtask = fill_output_buffer(t, true, do_enqueue);
1145  __TBB_ASSERT(!rtask || !do_enqueue, "task should not be returned");
1146  current->bypass_t = rtask;
1147  }
1148  }
1149  __TBB_store_with_release(current->status, SUCCEEDED);
1150  break;
1151  case may_succeed: // called from BE
1152  __TBB_store_with_release(current->status, this->buffer_empty() ? FAILED : SUCCEEDED);
1153  break;
1154  case try_make: // called from BE
1155  if(this->buffer_empty()) {
1156  __TBB_store_with_release(current->status, FAILED);
1157  }
1158  else {
1159  *(current->my_output) = this->front();
1160  __TBB_store_with_release(current->status, SUCCEEDED);
1161  }
1162  break;
1163  }
1164  }
1165  }
1166 // ------------ End Aggregator ---------------
1167 
1168  public:
1169  template<typename FunctionTuple>
1170  join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1171  join_helper<N>::set_join_node_pointer(my_inputs, this);
1172  join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
1173  my_aggregator.initialize_handler(handler_type(this));
1174  TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1175  this->set_key_func(cfb);
1176  }
1177 
1178  join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(),
1179  output_buffer_type() {
1180  my_node = NULL;
1181  join_helper<N>::set_join_node_pointer(my_inputs, this);
1182  join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
1183  my_aggregator.initialize_handler(handler_type(this));
1184  TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1185  this->set_key_func(cfb);
1186  }
1187 
1188  // needed for forwarding
1189  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1190 
1191  void reset_port_count() { // called from BE
1192  key_matching_FE_operation op_data(res_count);
1193  my_aggregator.execute(&op_data);
1194  return;
1195  }
1196 
1197  // if all input_ports have items, spawn forward to try and consume tuples
1198  // return a task if we are asked and did create one.
1199  task *increment_key_count(unref_key_type const & t, bool handle_task) __TBB_override { // called from input_ports
1200  key_matching_FE_operation op_data(t, handle_task, inc_count);
1201  my_aggregator.execute(&op_data);
1202  return op_data.bypass_t;
1203  }
1204 
1205  task *decrement_port_count(bool /*handle_task*/) __TBB_override { __TBB_ASSERT(false, NULL); return NULL; }
1206 
1207  void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); } // should never be called
1208 
1209  input_type &input_ports() { return my_inputs; }
1210 
1211  protected:
1212 
1213  void reset( reset_flags f ) {
1214  // called outside of parallel contexts
1215  join_helper<N>::reset_inputs(my_inputs, f);
1216 
1217  key_to_count_buffer_type::reset();
1218  output_buffer_type::reset();
1219  }
1220 
1221 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1222  void extract() {
1223  // called outside of parallel contexts
1224  join_helper<N>::extract_inputs(my_inputs);
1225  key_to_count_buffer_type::reset(); // have to reset the tag counts
1226  output_buffer_type::reset(); // also the queue of outputs
1227  // my_node->current_tag = NO_TAG;
1228  }
1229 #endif
1230  // all methods on input ports should be called under mutual exclusion from join_node_base.
1231 
1232  bool tuple_build_may_succeed() { // called from back-end
1233  key_matching_FE_operation op_data(may_succeed);
1234  my_aggregator.execute(&op_data);
1235  return op_data.status == SUCCEEDED;
1236  }
1237 
1238  // cannot lock while calling back to input_ports. current_key will only be set
1239  // and reset under the aggregator, so it will remain consistent.
1240  bool try_to_make_tuple(output_type &out) {
1241  key_matching_FE_operation op_data(&out,try_make);
1242  my_aggregator.execute(&op_data);
1243  return op_data.status == SUCCEEDED;
1244  }
1245 
1246  void tuple_accepted() {
1247  reset_port_count(); // reset current_key after ports reset.
1248  }
1249 
1250  void tuple_rejected() {
1251  // nothing to do.
1252  }
1253 
1254  input_type my_inputs; // input ports
1255  base_node_type *my_node;
1256  }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
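The essential bookkeeping in the inc_count path above (a per-key counter that fires once all N ports have reported the key) can be modeled in isolation as follows. This is only an illustrative sketch with invented names; the real front end serializes the table through the aggregator and then gathers one item per port via join_helper<N>::get_items:

    #include <unordered_map>
    #include <cstddef>
    #include <iostream>

    // Toy model of the key-to-count table used by the key_matching front end.
    class key_counter {
        std::unordered_map<int, std::size_t> counts_;
        const std::size_t nports_;
    public:
        explicit key_counter(std::size_t nports) : nports_(nports) {}
        // Returns true when every port has reported the key, i.e. a tuple can be built.
        bool report(int key) {
            std::size_t &c = counts_[key];             // starts at 0 on first sight
            if (++c == nports_) { counts_.erase(key); return true; }
            return false;
        }
    };

    int main() {
        key_counter table(3);                          // models a 3-input join
        for (int port = 0; port < 3; ++port)
            std::cout << table.report(7);              // prints 0, 0, then 1
        std::cout << std::endl;
        return 0;
    }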
1257 
1259  template<typename JP, typename InputTuple, typename OutputTuple>
1260  class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1261  public sender<OutputTuple> {
1262  protected:
1263  using graph_node::my_graph;
1264  public:
1265  typedef OutputTuple output_type;
1266 
1267  typedef typename sender<output_type>::successor_type successor_type;
1268  typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
1269  using input_ports_type::tuple_build_may_succeed;
1270  using input_ports_type::try_to_make_tuple;
1271  using input_ports_type::tuple_accepted;
1272  using input_ports_type::tuple_rejected;
1273 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1274  typedef typename sender<output_type>::built_successors_type built_successors_type;
1275  typedef typename sender<output_type>::successor_list_type successor_list_type;
1276 #endif
1277 
1278  private:
1279  // ----------- Aggregator ------------
1280  enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1281 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1282  , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1283 #endif
1284  };
1285  enum op_stat {WAIT=0, SUCCEEDED, FAILED};
1286  typedef join_node_base<JP,InputTuple,OutputTuple> class_type;
1287 
1288  class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1289  public:
1290  char type;
1291  union {
1292  output_type *my_arg;
1293  successor_type *my_succ;
1294 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1295  size_t cnt_val;
1296  successor_list_type *slist;
1297 #endif
1298  };
1299  task *bypass_t;
1300  join_node_base_operation(const output_type& e, op_type t) : type(char(t)),
1301  my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1302  join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1303  my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
1304  join_node_base_operation(op_type t) : type(char(t)), bypass_t(NULL) {}
1305  };
1306 
1307  typedef internal::aggregating_functor<class_type, join_node_base_operation> handler_type;
1308  friend class internal::aggregating_functor<class_type, join_node_base_operation>;
1309  bool forwarder_busy;
1310  aggregator<handler_type, join_node_base_operation> my_aggregator;
1311 
1312  void handle_operations(join_node_base_operation* op_list) {
1313  join_node_base_operation *current;
1314  while(op_list) {
1315  current = op_list;
1316  op_list = op_list->next;
1317  switch(current->type) {
1318  case reg_succ: {
1319  my_successors.register_successor(*(current->my_succ));
1320  if(tuple_build_may_succeed() && !forwarder_busy && internal::is_graph_active(my_graph)) {
1321  task *rtask = new ( task::allocate_additional_child_of(*(my_graph.root_task())) )
1322  forward_task_bypass
1323  <join_node_base<JP,InputTuple,OutputTuple> >(*this);
1324  internal::spawn_in_graph_arena(my_graph, *rtask);
1325  forwarder_busy = true;
1326  }
1327  __TBB_store_with_release(current->status, SUCCEEDED);
1328  }
1329  break;
1330  case rem_succ:
1331  my_successors.remove_successor(*(current->my_succ));
1332  __TBB_store_with_release(current->status, SUCCEEDED);
1333  break;
1334  case try__get:
1335  if(tuple_build_may_succeed()) {
1336  if(try_to_make_tuple(*(current->my_arg))) {
1337  tuple_accepted();
1338  __TBB_store_with_release(current->status, SUCCEEDED);
1339  }
1340  else __TBB_store_with_release(current->status, FAILED);
1341  }
1342  else __TBB_store_with_release(current->status, FAILED);
1343  break;
1344  case do_fwrd_bypass: {
1345  bool build_succeeded;
1346  task *last_task = NULL;
1347  output_type out;
1348  if(tuple_build_may_succeed()) { // checks output queue of FE
1349  do {
1350  build_succeeded = try_to_make_tuple(out); // fetch front_end of queue
1351  if(build_succeeded) {
1352  task *new_task = my_successors.try_put_task(out);
1353  last_task = combine_tasks(my_graph, last_task, new_task);
1354  if(new_task) {
1355  tuple_accepted();
1356  }
1357  else {
1358  tuple_rejected();
1359  build_succeeded = false;
1360  }
1361  }
1362  } while(build_succeeded);
1363  }
1364  current->bypass_t = last_task;
1365  __TBB_store_with_release(current->status, SUCCEEDED);
1366  forwarder_busy = false;
1367  }
1368  break;
1369 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1370  case add_blt_succ:
1371  my_successors.internal_add_built_successor(*(current->my_succ));
1372  __TBB_store_with_release(current->status, SUCCEEDED);
1373  break;
1374  case del_blt_succ:
1375  my_successors.internal_delete_built_successor(*(current->my_succ));
1376  __TBB_store_with_release(current->status, SUCCEEDED);
1377  break;
1378  case blt_succ_cnt:
1379  current->cnt_val = my_successors.successor_count();
1380  __TBB_store_with_release(current->status, SUCCEEDED);
1381  break;
1382  case blt_succ_cpy:
1383  my_successors.copy_successors(*(current->slist));
1384  __TBB_store_with_release(current->status, SUCCEEDED);
1385  break;
1386 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1387  }
1388  }
1389  }
1390  // ---------- end aggregator -----------
1391  public:
1392  join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1393  my_successors.set_owner(this);
1394  input_ports_type::set_my_node(this);
1395  my_aggregator.initialize_handler(handler_type(this));
1396  }
1397 
1398  join_node_base(const join_node_base& other) :
1399  graph_node(other.graph_node::my_graph), input_ports_type(other),
1400  sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1401  my_successors.set_owner(this);
1402  input_ports_type::set_my_node(this);
1403  my_aggregator.initialize_handler(handler_type(this));
1404  }
1405 
1406  template<typename FunctionTuple>
1407  join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1408  my_successors.set_owner(this);
1409  input_ports_type::set_my_node(this);
1410  my_aggregator.initialize_handler(handler_type(this));
1411  }
1412 
1413  bool register_successor(successor_type &r) __TBB_override {
1414  join_node_base_operation op_data(r, reg_succ);
1415  my_aggregator.execute(&op_data);
1416  return op_data.status == SUCCEEDED;
1417  }
1418 
1419  bool remove_successor( successor_type &r) __TBB_override {
1420  join_node_base_operation op_data(r, rem_succ);
1421  my_aggregator.execute(&op_data);
1422  return op_data.status == SUCCEEDED;
1423  }
1424 
1425  bool try_get( output_type &v) __TBB_override {
1426  join_node_base_operation op_data(v, try__get);
1427  my_aggregator.execute(&op_data);
1428  return op_data.status == SUCCEEDED;
1429  }
1430 
1431 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1432  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1433 
1434  void internal_add_built_successor( successor_type &r) __TBB_override {
1435  join_node_base_operation op_data(r, add_blt_succ);
1436  my_aggregator.execute(&op_data);
1437  }
1438 
1439  void internal_delete_built_successor( successor_type &r) __TBB_override {
1440  join_node_base_operation op_data(r, del_blt_succ);
1441  my_aggregator.execute(&op_data);
1442  }
1443 
1444  size_t successor_count() __TBB_override {
1445  join_node_base_operation op_data(blt_succ_cnt);
1446  my_aggregator.execute(&op_data);
1447  return op_data.cnt_val;
1448  }
1449 
1450  void copy_successors(successor_list_type &l) __TBB_override {
1451  join_node_base_operation op_data(blt_succ_cpy);
1452  op_data.slist = &l;
1453  my_aggregator.execute(&op_data);
1454  }
1455 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1456 
1457 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1458  void extract() __TBB_override {
1459  input_ports_type::extract();
1460  my_successors.built_successors().sender_extract(*this);
1461  }
1462 #endif
1463 
1464  protected:
1465 
1466  void reset_node(reset_flags f) __TBB_override {
1467  input_ports_type::reset(f);
1468  if(f & rf_clear_edges) my_successors.clear();
1469  }
1470 
1471  private:
1472  broadcast_cache<output_type, null_rw_mutex> my_successors;
1473 
1474  friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
1475  task *forward_task() {
1476  join_node_base_operation op_data(do_fwrd_bypass);
1477  my_aggregator.execute(&op_data);
1478  return op_data.bypass_t;
1479  }
1480 
1481  }; // join_node_base
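One observable consequence of the reg_succ handling above: if complete tuples are already buffered when a successor is connected, register_successor spawns a forwarding task so the new successor still receives them. A small behavioral sketch (illustrative, queueing policy assumed):

    #include <tbb/flow_graph.h>
    #include <iostream>

    int main() {
        using namespace tbb::flow;
        graph g;
        join_node< tuple<int, int>, queueing > j(g);
        // Fill both ports before any successor exists.
        input_port<0>(j).try_put(1);
        input_port<1>(j).try_put(2);
        g.wait_for_all();
        // Connecting a successor afterwards still delivers the buffered tuple.
        function_node< tuple<int, int>, continue_msg > sink(g, serial,
            [](const tuple<int, int> &t) -> continue_msg {
                std::cout << get<0>(t) + get<1>(t) << std::endl;   // prints 3
                return continue_msg();
            });
        make_edge(j, sink);
        g.wait_for_all();
        return 0;
    }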
1482 
1483  // join base class type generator
1484  template<int N, template<class> class PT, typename OutputTuple, typename JP>
1485  struct join_base {
1486  typedef typename internal::join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type;
1487  };
1488 
1489  template<int N, typename OutputTuple, typename K, typename KHash>
1490  struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1491  typedef key_matching<K,KHash> key_traits_type;
1492  typedef K key_type;
1493  typedef KHash key_hash_compare;
1494  typedef typename internal::join_node_base< key_traits_type,
1495  // ports type
1496  typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type,
1497  OutputTuple > type;
1498  };
1499 
1500  // unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type
1501  // using tuple_element. The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1502  // and should match the typename.
1503 
1504  template<int N, template<class> class PT, typename OutputTuple, typename JP>
1505  class unfolded_join_node : public join_base<N,PT,OutputTuple,JP>::type {
1506  public:
1507  typedef typename wrap_tuple_elements<N, PT, OutputTuple>::type input_ports_type;
1508  typedef OutputTuple output_type;
1509  private:
1510  typedef join_node_base<JP, input_ports_type, output_type > base_type;
1511  public:
1512  unfolded_join_node(graph &g) : base_type(g) {}
1513  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1514  };
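The join_base/unfolded_join_node layer computes the tuple of port types from the output tuple (wrap_tuple_elements maps the port template over every element type). The effect is the same as the standard map-over-a-tuple-of-types idiom sketched below in variadic form, with hypothetical names; the TBB implementation itself uses per-arity specializations:

    #include <tuple>
    #include <type_traits>

    // Stand-in for a port template such as queueing_port<T>.
    template<typename T> struct demo_port { T last; };

    // Map Port over every element type: tuple<A,B> -> tuple<Port<A>, Port<B>>.
    template<template<class> class Port, typename Tuple> struct wrap_elements;
    template<template<class> class Port, typename... Ts>
    struct wrap_elements<Port, std::tuple<Ts...> > {
        typedef std::tuple< Port<Ts>... > type;
    };

    typedef std::tuple<int, double> output_tuple;
    typedef wrap_elements<demo_port, output_tuple>::type port_tuple;
    static_assert(std::is_same<port_tuple,
                  std::tuple< demo_port<int>, demo_port<double> > >::value,
                  "one port is generated per element type");

    int main() { return 0; }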
1515 
1516 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1517  template <typename K, typename T>
1518  struct key_from_message_body {
1519  K operator()(const T& t) const {
1521  return key_from_message<K>(t);
1522  }
1523  };
1524  // Adds const to reference type
1525  template <typename K, typename T>
1526  struct key_from_message_body<K&,T> {
1527  const K& operator()(const T& t) const {
1529  return key_from_message<const K&>(t);
1530  }
1531  };
1532 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1533  // key_matching unfolded_join_node. This must be a separate specialization because the constructors
1534  // differ.
1535 
1536  template<typename OutputTuple, typename K, typename KHash>
1537  class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1538  join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1539  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1540  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1541  public:
1542  typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1543  typedef OutputTuple output_type;
1544  private:
1545  typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1546  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1547  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1548  typedef typename tbb::flow::tuple< f0_p, f1_p > func_initializer_type;
1549  public:
1550 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1551  unfolded_join_node(graph &g) : base_type(g,
1552  func_initializer_type(
1553  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1554  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1555  ) ) {
1556  }
1557 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1558  template<typename Body0, typename Body1>
1559  unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1560  func_initializer_type(
1561  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1562  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1)
1563  ) ) {
1564  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1565  }
1566  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1567  };
1568 
1569  template<typename OutputTuple, typename K, typename KHash>
1570  class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1571  join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1575  public:
1577  typedef OutputTuple output_type;
1578  private:
1583  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1584  public:
1585 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1586  unfolded_join_node(graph &g) : base_type(g,
1587  func_initializer_type(
1588  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1589  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1590  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1591  ) ) {
1592  }
1593 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1594  template<typename Body0, typename Body1, typename Body2>
1595  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1596  func_initializer_type(
1597  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1598  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1599  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2)
1600  ) ) {
1601  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1602  }
1603  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1604  };
1605 
1606  template<typename OutputTuple, typename K, typename KHash>
1607  class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1608  join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1613  public:
1615  typedef OutputTuple output_type;
1616  private:
1622  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1623  public:
1624 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1625  unfolded_join_node(graph &g) : base_type(g,
1626  func_initializer_type(
1627  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1628  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1629  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1630  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1631  ) ) {
1632  }
1633 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1634  template<typename Body0, typename Body1, typename Body2, typename Body3>
1635  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1636  func_initializer_type(
1637  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1638  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1639  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1640  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3)
1641  ) ) {
1642  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1643  }
1644  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1645  };
1646 
1647  template<typename OutputTuple, typename K, typename KHash>
1648  class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1649  join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1655  public:
1657  typedef OutputTuple output_type;
1658  private:
1665  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1666  public:
1667 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1668  unfolded_join_node(graph &g) : base_type(g,
1669  func_initializer_type(
1670  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1671  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1672  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1673  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1674  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1675  ) ) {
1676  }
1677 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1678  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1679  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1680  func_initializer_type(
1681  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1682  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1683  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1684  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1685  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4)
1686  ) ) {
1687  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1688  }
1689  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1690  };
1691 
1692 #if __TBB_VARIADIC_MAX >= 6
1693  template<typename OutputTuple, typename K, typename KHash>
1694  class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1695  join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1702  public:
1703  typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1704  typedef OutputTuple output_type;
1705  private:
1706  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1707  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1708  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1709  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1710  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1711  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1712  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1713  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1714  public:
1715 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1716  unfolded_join_node(graph &g) : base_type(g,
1717  func_initializer_type(
1718  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1719  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1720  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1721  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1722  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1723  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1724  ) ) {
1725  }
1726 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1727  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1728  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1729  : base_type(g, func_initializer_type(
1730  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1731  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1732  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1733  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1734  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1735  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5)
1736  ) ) {
1737  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1738  }
1739  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1740  };
1741 #endif
1742 
1743 #if __TBB_VARIADIC_MAX >= 7
1744  template<typename OutputTuple, typename K, typename KHash>
1745  class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1746  join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1747  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1748  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1749  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1750  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1751  typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1752  typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1753  typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1754  public:
1755  typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1756  typedef OutputTuple output_type;
1757  private:
1758  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1759  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1760  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1761  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1762  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1763  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1764  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1765  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1766  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1767  public:
1768 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1769  unfolded_join_node(graph &g) : base_type(g,
1770  func_initializer_type(
1771  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1772  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1773  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1774  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1775  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1776  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1777  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1778  ) ) {
1779  }
1780 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1781  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1782  typename Body5, typename Body6>
1783  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1784  Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1785  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1786  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1787  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1788  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1789  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1790  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1791  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6)
1792  ) ) {
1793  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1794  }
1795  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1796  };
1797 #endif
1798 
1799 #if __TBB_VARIADIC_MAX >= 8
1800  template<typename OutputTuple, typename K, typename KHash>
1801  class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1802  join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1803  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1804  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1805  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1806  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1807  typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1808  typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1809  typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1810  typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1811  public:
1812  typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1813  typedef OutputTuple output_type;
1814  private:
1815  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1816  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1817  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1818  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1819  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1820  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1821  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1822  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1823  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1824  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1825  public:
1826 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1827  unfolded_join_node(graph &g) : base_type(g,
1828  func_initializer_type(
1829  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1830  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1831  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1832  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1833  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1834  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1835  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1836  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1837  ) ) {
1838  }
1839 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1840  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1841  typename Body5, typename Body6, typename Body7>
1842  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1843  Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1844  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1845  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1846  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1847  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1848  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1849  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1850  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1851  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7)
1852  ) ) {
1853  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1854  }
1855  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1856  };
1857 #endif
1858 
1859 #if __TBB_VARIADIC_MAX >= 9
1860  template<typename OutputTuple, typename K, typename KHash>
1861  class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1862  join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1863  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1864  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1865  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1866  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1867  typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1868  typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1869  typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1870  typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1871  typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
1872  public:
1873  typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1874  typedef OutputTuple output_type;
1875  private:
1876  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1877  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1878  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1879  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1880  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1881  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1882  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1883  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1884  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1885  typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1886  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1887  public:
1888 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1889  unfolded_join_node(graph &g) : base_type(g,
1890  func_initializer_type(
1891  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1892  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1893  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1894  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1895  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1896  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1897  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1898  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1899  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1900  ) ) {
1901  }
1902 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1903  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1904  typename Body5, typename Body6, typename Body7, typename Body8>
1905  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1906  Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1907  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1908  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1909  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1910  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1911  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1912  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1913  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1914  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
1915  new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8)
1916  ) ) {
1917  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1918  }
1919  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1920  };
1921 #endif
1922 
1923 #if __TBB_VARIADIC_MAX >= 10
1924  template<typename OutputTuple, typename K, typename KHash>
1925  class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1926  join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1927  typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1928  typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1929  typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1930  typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1931  typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1932  typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1933  typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1934  typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1935  typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
1936  typedef typename tbb::flow::tuple_element<9, OutputTuple>::type T9;
1937  public:
1938  typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1939  typedef OutputTuple output_type;
1940  private:
1941  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1942  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1943  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1944  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1945  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1946  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1947  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1948  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1949  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1950  typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1951  typedef typename internal::type_to_key_function_body<T9, K> *f9_p;
1952  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1953  public:
1954 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1955  unfolded_join_node(graph &g) : base_type(g,
1956  func_initializer_type(
1957  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1958  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1959  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1960  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1961  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1962  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1963  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1964  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1965  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1966  new internal::type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1967  ) ) {
1968  }
1969 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1970  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1971  typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1972  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1973  Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1974  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1975  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1976  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1977  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1978  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1979  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1980  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1981  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
1982  new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8),
1983  new internal::type_to_key_function_body_leaf<T9, K, Body9>(body9)
1984  ) ) {
1985  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1986  }
1987  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1988  };
1989 #endif
1990 
1991  // templated function to refer to input ports of the join node
1992  template<size_t N, typename JNT>
1993  typename tbb::flow::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1994  return tbb::flow::get<N>(jn.input_ports());
1995  }
1996 
1997 }
1998 #endif // __TBB__flow_graph_join_impl_H
1999 
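The input_port<N>() helper defined at the end of this header is the public way to address an individual port of a join_node, most often as the target argument of make_edge. A brief sketch using the default queueing policy:

    #include "tbb/flow_graph.h"

    int main() {
        using namespace tbb::flow;
        graph g;

        broadcast_node<int>   ints(g);
        broadcast_node<float> floats(g);
        join_node< tuple<int, float> > j(g);   // queueing policy by default

        // input_port<N>(j) returns a reference to the N-th receiver of the join,
        // which is exactly what make_edge expects on the target side.
        make_edge(ints,   input_port<0>(j));
        make_edge(floats, input_port<1>(j));

        ints.try_put(1);
        floats.try_put(2.5f);
        g.wait_for_all();
        return 0;
    }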