Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_impl_H
18 #define __TBB_flow_graph_impl_H
19 
20 #include "../tbb_stddef.h"
21 #include "../task.h"
22 #include "../task_arena.h"
23 #include "../flow_graph_abstractions.h"
24 
25 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
26 #include "../concurrent_priority_queue.h"
27 #endif
28 
29 #include <list>
30 
// FLOW_SPAWN(a): how the flow graph hands task `a` over for execution.
// Expands to tbb::task::enqueue((a)) when TBB_DEPRECATED_FLOW_ENQUEUE is set,
// and to tbb::task::spawn((a)) otherwise.
31 #if TBB_DEPRECATED_FLOW_ENQUEUE
32 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
33 #else
34 #define FLOW_SPAWN(a) tbb::task::spawn((a))
35 #endif
36 
// Helper macros that let priority-related code be written once and compiled
// in or out depending on __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES:
//   __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )           -> expr            | (nothing)
//   __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )       -> , priority      | (nothing)
//   __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) -> arg1, priority  | arg1
// The left expansion applies when the preview feature is enabled, the right
// one when it is disabled. Note that ARG0 supplies its own leading comma.
37 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
38 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
39 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
40 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
41 #else
42 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
43 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
44 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
45 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
46 
47 namespace tbb {
48 namespace flow {
49 
50 namespace internal {
// Sentinel task pointer (the value -1 cast to task*); it is never dereferenced
// in this file. run_and_put_task::execute compares the result of try_put_task
// against it and substitutes NULL, so the sentinel is not handed back as a
// task to run.
51 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
52 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
53 typedef unsigned int node_priority_t;
55 #endif
56 }
57 
58 namespace interface10 {
59 
61 
62 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
66 struct graph_task : public task {
67  graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
69 };
70 #else
71 typedef task graph_task;
72 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
73 
74 
75 class graph;
76 class graph_node;
77 
78 template <typename GraphContainerType, typename GraphNodeType>
80  friend class graph;
81  friend class graph_node;
82 public:
83  typedef size_t size_type;
84  typedef GraphNodeType value_type;
85  typedef GraphNodeType* pointer;
86  typedef GraphNodeType& reference;
87  typedef const GraphNodeType& const_reference;
88  typedef std::forward_iterator_tag iterator_category;
89 
91  graph_iterator() : my_graph(NULL), current_node(NULL) {}
92 
96  {}
97 
100  if (this != &other) {
101  my_graph = other.my_graph;
102  current_node = other.current_node;
103  }
104  return *this;
105  }
106 
108  reference operator*() const;
109 
111  pointer operator->() const;
112 
114  bool operator==(const graph_iterator& other) const {
115  return ((my_graph == other.my_graph) && (current_node == other.current_node));
116  }
117 
119  bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
120 
124  return *this;
125  }
126 
129  graph_iterator result = *this;
130  operator++();
131  return result;
132  }
133 
134 private:
135  // the graph over which we are iterating
136  GraphContainerType *my_graph;
137  // pointer into my_graph's my_nodes list
139 
141  graph_iterator(GraphContainerType *g, bool begin);
142  void internal_forward();
143 }; // class graph_iterator
144 
145 // flags to modify the behavior of the graph reset(). Can be combined.
148  rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body.
149  rf_clear_edges = 1 << 1 // delete edges
150 };
151 
152 namespace internal {
153 
154 void activate_graph(graph& g);
155 void deactivate_graph(graph& g);
156 bool is_graph_active(graph& g);
157 void spawn_in_graph_arena(graph& g, tbb::task& arena_task);
159 template<typename F> void execute_in_graph_arena(graph& g, F& f);
160 
161 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
163  bool operator()(const graph_task* left, const graph_task* right) {
164  return left->priority < right->priority;
165  }
166 };
167 
169 
170 class priority_task_selector : public task {
171 public:
173  : my_priority_queue(priority_queue) {}
175  graph_task* t = NULL;
176  bool result = my_priority_queue.try_pop(t);
177  __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
178  " in graph's priority queue mismatched" );
180  "Incorrect task submitted to graph priority queue" );
182  "Tasks from graph's priority queue must have priority" );
183  task* t_next = t->execute();
184  task::destroy(*t);
185  return t_next;
186  }
187 private:
189 };
190 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
191 
192 }
193 
195 
197  friend class graph_node;
198 
199  template< typename Body >
200  class run_task : public graph_task {
201  public:
202  run_task(Body& body
204  , node_priority_t node_priority = no_priority
205  ) : graph_task(node_priority),
206 #else
207  ) :
208 #endif
209  my_body(body) { }
211  my_body();
212  return NULL;
213  }
214  private:
215  Body my_body;
216  };
217 
218  template< typename Receiver, typename Body >
219  class run_and_put_task : public graph_task {
220  public:
221  run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
223  tbb::task *res = my_receiver.try_put_task(my_body());
224  if (res == SUCCESSFULLY_ENQUEUED) res = NULL;
225  return res;
226  }
227  private:
228  Receiver &my_receiver;
229  Body my_body;
230  };
231  typedef std::list<tbb::task *> task_list_type;
232 
233  class wait_functor {
235  public:
238  };
239 
243  public:
245  void operator()() const {
247  }
248  };
249 
250  void prepare_task_arena(bool reinit = false) {
251  if (reinit) {
252  __TBB_ASSERT(my_task_arena, "task arena is NULL");
255  }
256  else {
257  __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
259  }
260  if (!my_task_arena->is_active()) // failed to attach
261  my_task_arena->initialize(); // create a new, default-initialized arena
262  __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
263  }
264 
265 public:
267  graph();
268 
270  explicit graph(tbb::task_group_context& use_this_context);
271 
273 
274  ~graph();
275 
276 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
277  void set_name(const char *name);
278 #endif
279 
281  reserve_wait();
282  }
283 
285  release_wait();
286  }
287 
289 
292 
294 
297 
299 
301  template< typename Receiver, typename Body >
302  void run(Receiver &r, Body body) {
303  if (internal::is_graph_active(*this)) {
304  task* rtask = new (task::allocate_additional_child_of(*root_task()))
307  }
308  }
309 
311 
313  template< typename Body >
314  void run(Body body) {
315  if (internal::is_graph_active(*this)) {
316  task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
318  }
319  }
320 
322 
323  void wait_for_all() {
324  cancelled = false;
325  caught_exception = false;
326  if (my_root_task) {
327 #if TBB_USE_EXCEPTIONS
328  try {
329 #endif
332 #if TBB_USE_EXCEPTIONS
333  }
334  catch (...) {
336  my_context->reset();
337  caught_exception = true;
338  cancelled = true;
339  throw;
340  }
341 #endif
342  // TODO: the "if" condition below is just a work-around to support the concurrent wait
343  // mode. The cancellation and exception mechanisms are still broken in this mode.
344  // Consider using task group not to re-implement the same functionality.
346  my_context->reset(); // consistent with behavior in catch()
348  }
349  }
350  }
351 
354  return my_root_task;
355  }
356 
357  // ITERATORS
358  template<typename C, typename N>
359  friend class graph_iterator;
360 
361  // Graph iterator typedefs
364 
365  // Graph iterator constructors
367  iterator begin();
369  iterator end();
371  const_iterator begin() const;
373  const_iterator end() const;
375  const_iterator cbegin() const;
377  const_iterator cend() const;
378 
380  bool is_cancelled() { return cancelled; }
382 
383  // thread-unsafe state reset.
385 
386 private:
390  bool cancelled;
394 
396 
398  void register_node(graph_node *n);
399  void remove_node(graph_node *n);
400 
402 
403 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
405 #endif
406 
407  friend void internal::activate_graph(graph& g);
408  friend void internal::deactivate_graph(graph& g);
409  friend bool internal::is_graph_active(graph& g);
410  friend void internal::spawn_in_graph_arena(graph& g, tbb::task& arena_task);
412  template<typename F> friend void internal::execute_in_graph_arena(graph& g, F& f);
413 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
414  template<typename Input, typename Output, typename Policy, typename Allocator>
415  friend class async_node;
416 #endif
417 
419 
420 }; // class graph
421 
424  friend class graph;
425  template<typename C, typename N>
426  friend class graph_iterator;
427 protected:
430 public:
431  explicit graph_node(graph& g);
432 
433  virtual ~graph_node();
434 
435 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
436  virtual void set_name(const char *name) = 0;
437 #endif
438 
439 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
440  virtual void extract() = 0;
441 #endif
442 
443 protected:
444  // performs the reset on an individual node.
445  virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
446 }; // class graph_node
447 
448 namespace internal {
449 
// Marks graph g as active by setting g.my_is_active to true.
// Declared as a friend of graph, which is how it reaches that member.
450 inline void activate_graph(graph& g) {
451  g.my_is_active = true;
452 }
453 
// Marks graph g as inactive by setting g.my_is_active to false.
// While the flag is false, execute_in_graph_arena (and therefore
// spawn_in_graph_arena) does not run anything for this graph.
454 inline void deactivate_graph(graph& g) {
455  g.my_is_active = false;
456 }
457 
// Returns the current value of g.my_is_active, i.e. the flag toggled by
// activate_graph / deactivate_graph.
458 inline bool is_graph_active(graph& g) {
459  return g.my_is_active;
460 }
461 
// Executes custom functor f inside the graph's task arena.
//   g - graph whose arena (g.my_task_arena) is used
//   f - functor passed by reference to task_arena::execute
// Does nothing when the graph is not active; any result of execute() is
// discarded.
// NOTE(review): listing line 466 is absent from this extract, so one statement
// inside the `if` may be missing here - confirm against the real header.
463 template<typename F>
464 inline void execute_in_graph_arena(graph& g, F& f) {
465  if (is_graph_active(g)) {
467  g.my_task_arena->execute(f);
468  }
469 }
470 
// Spawns a task inside the graph's arena.
//   g          - graph whose arena receives the task
//   arena_task - task to run; with priorities enabled it is treated as a
//                graph_task (see the static_cast below)
// The task that is finally spawned is wrapped in a graph::spawn_functor and
// passed to execute_in_graph_arena, which runs it only if the graph is active.
472 inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
473  task* task_to_spawn = &arena_task;
474 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
475  // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
476  graph_task* t = static_cast<graph_task*>(&arena_task);
477  if( t->priority != no_priority ) {
 // Prioritized task: it is not spawned directly. It is pushed onto the
 // graph's priority queue, and a priority_task_selector - allocated as a
 // continuation of t and marked critical - is spawned in its place.
 // NOTE(review): if the graph is inactive, execute_in_graph_arena below is a
 // no-op, yet t has already been queued and the selector allocated - confirm
 // that callers only reach this branch while the graph is active.
482  task_to_spawn = new( t->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
483  tbb::internal::make_critical( *task_to_spawn );
484  g.my_priority_queue.push(t);
485  }
486 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
 // Hand the chosen task (original or selector) to the arena via a functor.
487  graph::spawn_functor s_fn(*task_to_spawn);
488  execute_in_graph_arena(g, s_fn);
489 }
490 
492  g.my_reset_task_list.push_back(tp);
493 }
494 
495 } // namespace internal
496 
497 } // namespace interface10
498 } // namespace flow
499 } // namespace tbb
500 
501 #endif // __TBB_flow_graph_impl_H
reference operator*() const
Dereference.
Definition: flow_graph.h:725
graph_iterator & operator++()
Pre-increment.
bool operator==(const graph_iterator &other) const
Equality.
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:778
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:167
~graph()
Destroys the graph.
Definition: flow_graph.h:763
internal::graph_task_priority_queue_t my_priority_queue
internal::allocate_continuation_proxy & allocate_continuation()
Returns proxy for overloaded new that allocates a continuation task of *this.
Definition: task.h:646
unsigned int node_priority_t
void prepare_task_arena(bool reinit=false)
void run(Receiver &r, Body body)
Spawns a task that runs a body and puts its output to a specific receiver.
Used to form groups of tasks.
Definition: task.h:332
uintptr_t traits() const
Returns the context's trait.
Definition: task.h:552
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:331
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
graph_iterator operator++(int)
Post-increment.
#define __TBB_override
Definition: tbb_stddef.h:240
Implements async node.
Definition: flow_graph.h:3489
tbb::task * root_task()
Returns the root task of the graph.
bool operator!=(const graph_iterator &other) const
Inequality.
bool operator()(const graph_task *left, const graph_task *right)
Base class for user-defined tasks.
Definition: task.h:589
task * execute() __TBB_override
Should be overridden by derived classes.
The graph class.
#define FLOW_SPAWN(a)
void wait_for_all()
Wait until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
iterator begin()
start iterator
Definition: flow_graph.h:831
void execute_in_graph_arena(graph &g, F &f)
Executes custom functor inside graph arena.
The base of all graph nodes.
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
A lock that occupies a single byte.
Definition: spin_mutex.h:36
iterator end()
end iterator
Definition: flow_graph.h:833
Base class for types that should not be assigned.
Definition: tbb_stddef.h:320
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
bool __TBB_EXPORTED_METHOD is_group_execution_cancelled() const
Returns true if the context received cancellation request.
Pure virtual template classes that define interfaces for async communication.
static tbb::task *const SUCCESSFULLY_ENQUEUED
bool try_pop(reference elem)
Gets a reference to and removes highest priority element.
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
void run(Body body)
Spawns a task that runs a function object.
void add_task_to_graph_reset_list(graph &g, tbb::task *tp)
graph_iterator(const graph_iterator &other)
Copy constructor.
Tag class used to indicate the "attaching" constructor.
Definition: task_arena.h:236
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
graph_task(node_priority_t node_priority=no_priority)
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:741
Base class for tasks generated by graph nodes.
void remove_node(graph_node *n)
Definition: flow_graph.h:796
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:771
priority_task_selector(graph_task_priority_queue_t &priority_queue)
#define __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
Definition: tbb_config.h:811
tbb::concurrent_priority_queue< graph_task *, graph_task_comparator > graph_task_priority_queue_t
bool is_cancelled()
Returns the cancellation status of the graph's execution.
void make_critical(task &t)
Definition: task.h:957
static const node_priority_t no_priority
std::list< tbb::task * > task_list_type
graph_iterator & operator=(const graph_iterator &other)
Assignment.
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
void wait_for_all()
Wait for reference count to become one, and set reference count to zero.
Definition: task.h:789
tbb::task_group_context * my_context
graph_iterator< const graph, const graph_node > const_iterator
void register_node(graph_node *n)
Definition: flow_graph.h:785
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:839
const_iterator cend() const
end const iterator
Definition: flow_graph.h:841
graph_iterator< graph, graph_node > iterator
internal::return_type_or_void< F >::type execute(F &f)
Definition: task_arena.h:343
std::forward_iterator_tag iterator_category
void reset(reset_flags f=rf_reset_protocol)
Definition: flow_graph.h:808
void set_ref_count(int count)
Set reference count.
Definition: task.h:731
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
pointer operator->() const
Dereference.
Definition: flow_graph.h:731
run_task(Body &body, node_priority_t node_priority=no_priority)
virtual task * execute()=0
Should be overridden by derived classes.
void initialize()
Forces allocation of the resources for the task_arena as specified in constructor arguments.
Definition: task_arena.h:248

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.