Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_cache_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB__flow_graph_cache_impl_H
22 #define __TBB__flow_graph_cache_impl_H
23 
24 #ifndef __TBB_flow_graph_H
25 #error Do not #include this internal file directly; use public TBB headers instead.
26 #endif
27 
28 // included in namespace tbb::flow::interfaceX (in flow_graph.h)
29 
30 namespace internal {
31 
33 template< typename T, typename M=spin_mutex >
34 class node_cache {
35  public:
36 
37  typedef size_t size_type;
38 
39  bool empty() {
40  typename mutex_type::scoped_lock lock( my_mutex );
41  return internal_empty();
42  }
43 
44  void add( T &n ) {
45  typename mutex_type::scoped_lock lock( my_mutex );
46  internal_push(n);
47  }
48 
49  void remove( T &n ) {
50  typename mutex_type::scoped_lock lock( my_mutex );
51  for ( size_t i = internal_size(); i != 0; --i ) {
52  T &s = internal_pop();
53  if ( &s == &n ) return; // only remove one predecessor per request
55  }
56  }
57 
58  void clear() {
59  while( !my_q.empty()) (void)my_q.pop();
60 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
61  my_built_predecessors.clear();
62 #endif
63  }
64 
65 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
66  typedef edge_container<T> built_predecessors_type;
67  built_predecessors_type &built_predecessors() { return my_built_predecessors; }
68 
69  typedef typename edge_container<T>::edge_list_type predecessor_list_type;
70  void internal_add_built_predecessor( T &n ) {
71  typename mutex_type::scoped_lock lock( my_mutex );
72  my_built_predecessors.add_edge(n);
73  }
74 
75  void internal_delete_built_predecessor( T &n ) {
76  typename mutex_type::scoped_lock lock( my_mutex );
77  my_built_predecessors.delete_edge(n);
78  }
79 
80  void copy_predecessors( predecessor_list_type &v) {
81  typename mutex_type::scoped_lock lock( my_mutex );
82  my_built_predecessors.copy_edges(v);
83  }
84 
85  size_t predecessor_count() {
86  typename mutex_type::scoped_lock lock(my_mutex);
87  return (size_t)(my_built_predecessors.edge_count());
88  }
89 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
90 
91 protected:
92 
93  typedef M mutex_type;
95  std::queue< T * > my_q;
96 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
97  built_predecessors_type my_built_predecessors;
98 #endif
99 
100  // Assumes lock is held
101  inline bool internal_empty( ) {
102  return my_q.empty();
103  }
104 
105  // Assumes lock is held
107  return my_q.size();
108  }
109 
110  // Assumes lock is held
111  inline void internal_push( T &n ) {
112  my_q.push(&n);
113  }
114 
115  // Assumes lock is held
116  inline T &internal_pop() {
117  T *v = my_q.front();
118  my_q.pop();
119  return *v;
120  }
121 
122 };
123 
125 template< typename T, typename M=spin_mutex >
126 #if __TBB_PREVIEW_ASYNC_MSG
127 // TODO: make predecessor_cache type T-independent when async_msg becomes regular feature
128 class predecessor_cache : public node_cache< untyped_sender, M > {
129 #else
130 class predecessor_cache : public node_cache< sender<T>, M > {
131 #endif // __TBB_PREVIEW_ASYNC_MSG
132 public:
133  typedef M mutex_type;
134  typedef T output_type;
135 #if __TBB_PREVIEW_ASYNC_MSG
136  typedef untyped_sender predecessor_type;
137  typedef untyped_receiver successor_type;
138 #else
139  typedef sender<output_type> predecessor_type;
140  typedef receiver<output_type> successor_type;
141 #endif // __TBB_PREVIEW_ASYNC_MSG
142 
143  predecessor_cache( ) : my_owner( NULL ) { }
144 
145  void set_owner( successor_type *owner ) { my_owner = owner; }
146 
147  bool get_item( output_type &v ) {
148 
149  bool msg = false;
150 
151  do {
152  predecessor_type *src;
153  {
154  typename mutex_type::scoped_lock lock(this->my_mutex);
155  if ( this->internal_empty() ) {
156  break;
157  }
158  src = &this->internal_pop();
159  }
160 
161  // Try to get from this sender
162  msg = src->try_get( v );
163 
164  if (msg == false) {
165  // Relinquish ownership of the edge
166  if (my_owner)
167  src->register_successor( *my_owner );
168  } else {
169  // Retain ownership of the edge
170  this->add(*src);
171  }
172  } while ( msg == false );
173  return msg;
174  }
175 
176  // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
177  void reset() {
178  if (my_owner) {
179  for(;;) {
180  predecessor_type *src;
181  {
182  if (this->internal_empty()) break;
183  src = &this->internal_pop();
184  }
185  src->register_successor( *my_owner );
186  }
187  }
188  }
189 
190 protected:
191 
192 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
194 #endif
196 };
197 
199 // TODO: make reservable_predecessor_cache type T-independent when async_msg becomes regular feature
200 template< typename T, typename M=spin_mutex >
202 public:
203  typedef M mutex_type;
204  typedef T output_type;
205 #if __TBB_PREVIEW_ASYNC_MSG
206  typedef untyped_sender predecessor_type;
207  typedef untyped_receiver successor_type;
208 #else
209  typedef sender<T> predecessor_type;
210  typedef receiver<T> successor_type;
211 #endif // __TBB_PREVIEW_ASYNC_MSG
212 
213  reservable_predecessor_cache( ) : reserved_src(NULL) { }
214 
215  bool
217  bool msg = false;
218 
219  do {
220  {
221  typename mutex_type::scoped_lock lock(this->my_mutex);
222  if ( reserved_src || this->internal_empty() )
223  return false;
224 
225  reserved_src = &this->internal_pop();
226  }
227 
228  // Try to get from this sender
229  msg = reserved_src->try_reserve( v );
230 
231  if (msg == false) {
232  typename mutex_type::scoped_lock lock(this->my_mutex);
233  // Relinquish ownership of the edge
234  reserved_src->register_successor( *this->my_owner );
235  reserved_src = NULL;
236  } else {
237  // Retain ownership of the edge
238  this->add( *reserved_src );
239  }
240  } while ( msg == false );
241 
242  return msg;
243  }
244 
245  bool
247  reserved_src->try_release( );
248  reserved_src = NULL;
249  return true;
250  }
251 
252  bool
254  reserved_src->try_consume( );
255  reserved_src = NULL;
256  return true;
257  }
258 
259  void reset( ) {
260  reserved_src = NULL;
262  }
263 
264  void clear() {
265  reserved_src = NULL;
267  }
268 
269 private:
271 };
272 
273 
275 // TODO: make successor_cache type T-independent when async_msg becomes regular feature
276 template<typename T, typename M=spin_rw_mutex >
278 protected:
279 
280  typedef M mutex_type;
282 
283 #if __TBB_PREVIEW_ASYNC_MSG
284  typedef untyped_receiver successor_type;
285  typedef untyped_receiver *pointer_type;
286  typedef untyped_sender owner_type;
287 #else
288  typedef receiver<T> successor_type;
289  typedef receiver<T> *pointer_type;
290  typedef sender<T> owner_type;
291 #endif // __TBB_PREVIEW_ASYNC_MSG
292  typedef std::list< pointer_type > successors_type;
293 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
294  edge_container<successor_type> my_built_successors;
295 #endif
297 
299 
300 public:
301 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
302  typedef typename edge_container<successor_type>::edge_list_type successor_list_type;
303 
304  edge_container<successor_type> &built_successors() { return my_built_successors; }
305 
306  void internal_add_built_successor( successor_type &r) {
307  typename mutex_type::scoped_lock l(my_mutex, true);
308  my_built_successors.add_edge( r );
309  }
310 
311  void internal_delete_built_successor( successor_type &r) {
312  typename mutex_type::scoped_lock l(my_mutex, true);
313  my_built_successors.delete_edge(r);
314  }
315 
316  void copy_successors( successor_list_type &v) {
317  typename mutex_type::scoped_lock l(my_mutex, false);
318  my_built_successors.copy_edges(v);
319  }
320 
321  size_t successor_count() {
322  typename mutex_type::scoped_lock l(my_mutex,false);
323  return my_built_successors.edge_count();
324  }
325 
326 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
327 
328  successor_cache( ) : my_owner(NULL) {}
329 
330  void set_owner( owner_type *owner ) { my_owner = owner; }
331 
332  virtual ~successor_cache() {}
333 
335  typename mutex_type::scoped_lock l(my_mutex, true);
336  my_successors.push_back( &r );
337  }
338 
340  typename mutex_type::scoped_lock l(my_mutex, true);
341  for ( typename successors_type::iterator i = my_successors.begin();
342  i != my_successors.end(); ++i ) {
343  if ( *i == & r ) {
344  my_successors.erase(i);
345  break;
346  }
347  }
348  }
349 
350  bool empty() {
351  typename mutex_type::scoped_lock l(my_mutex, false);
352  return my_successors.empty();
353  }
354 
355  void clear() {
356  my_successors.clear();
357 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
358  my_built_successors.clear();
359 #endif
360  }
361 
362 #if !__TBB_PREVIEW_ASYNC_MSG
363  virtual task * try_put_task( const T &t ) = 0;
364 #endif // __TBB_PREVIEW_ASYNC_MSG
365  }; // successor_cache<T>
366 
368 template<>
369 class successor_cache< continue_msg > : tbb::internal::no_copy {
370 protected:
371 
374 
375 #if __TBB_PREVIEW_ASYNC_MSG
376  typedef untyped_receiver successor_type;
377  typedef untyped_receiver *pointer_type;
378 #else
379  typedef receiver<continue_msg> successor_type;
380  typedef receiver<continue_msg> *pointer_type;
381 #endif // __TBB_PREVIEW_ASYNC_MSG
382  typedef std::list< pointer_type > successors_type;
384 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
385  edge_container<successor_type> my_built_successors;
386  typedef edge_container<successor_type>::edge_list_type successor_list_type;
387 #endif
388 
389  sender<continue_msg> *my_owner;
390 
391 public:
392 
393 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
394 
395  edge_container<successor_type> &built_successors() { return my_built_successors; }
396 
397  void internal_add_built_successor( successor_type &r) {
398  mutex_type::scoped_lock l(my_mutex, true);
399  my_built_successors.add_edge( r );
400  }
401 
402  void internal_delete_built_successor( successor_type &r) {
403  mutex_type::scoped_lock l(my_mutex, true);
404  my_built_successors.delete_edge(r);
405  }
406 
407  void copy_successors( successor_list_type &v) {
408  mutex_type::scoped_lock l(my_mutex, false);
409  my_built_successors.copy_edges(v);
410  }
411 
412  size_t successor_count() {
413  mutex_type::scoped_lock l(my_mutex,false);
414  return my_built_successors.edge_count();
415  }
416 
417 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
418 
419  successor_cache( ) : my_owner(NULL) {}
420 
421  void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
422 
423  virtual ~successor_cache() {}
424 
426  mutex_type::scoped_lock l(my_mutex, true);
427  my_successors.push_back( &r );
428  if ( my_owner && r.is_continue_receiver() ) {
429  r.register_predecessor( *my_owner );
430  }
431  }
432 
434  mutex_type::scoped_lock l(my_mutex, true);
435  for ( successors_type::iterator i = my_successors.begin();
436  i != my_successors.end(); ++i ) {
437  if ( *i == & r ) {
438  // TODO: Check if we need to test for continue_receiver before
439  // removing from r.
440  if ( my_owner )
441  r.remove_predecessor( *my_owner );
442  my_successors.erase(i);
443  break;
444  }
445  }
446  }
447 
448  bool empty() {
449  mutex_type::scoped_lock l(my_mutex, false);
450  return my_successors.empty();
451  }
452 
453  void clear() {
454  my_successors.clear();
455 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
456  my_built_successors.clear();
457 #endif
458  }
459 
460 #if !__TBB_PREVIEW_ASYNC_MSG
461  virtual task * try_put_task( const continue_msg &t ) = 0;
462 #endif // __TBB_PREVIEW_ASYNC_MSG
463 
464 }; // successor_cache< continue_msg >
465 
467 // TODO: make broadcast_cache type T-independent when async_msg becomes regular feature
468 template<typename T, typename M=spin_rw_mutex>
469 class broadcast_cache : public successor_cache<T, M> {
470  typedef M mutex_type;
472 
473 public:
474 
476 
477  // as above, but call try_put_task instead, and return the last task we received (if any)
478 #if __TBB_PREVIEW_ASYNC_MSG
479  template<typename X>
480  task * try_put_task( const X &t ) {
481 #else
482  task * try_put_task( const T &t ) __TBB_override {
483 #endif // __TBB_PREVIEW_ASYNC_MSG
484  task * last_task = NULL;
485  bool upgraded = true;
486  typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
487  typename successors_type::iterator i = this->my_successors.begin();
488  while ( i != this->my_successors.end() ) {
489  task *new_task = (*i)->try_put_task(t);
490  // workaround for icc bug
491  graph& graph_ref = (*i)->graph_reference();
492  last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary
493  if(new_task) {
494  ++i;
495  }
496  else { // failed
497  if ( (*i)->register_predecessor(*this->my_owner) ) {
498  if (!upgraded) {
499  l.upgrade_to_writer();
500  upgraded = true;
501  }
502  i = this->my_successors.erase(i);
503  } else {
504  ++i;
505  }
506  }
507  }
508  return last_task;
509  }
510 
511 };
512 
514 // TODO: make round_robin_cache type T-independent when async_msg becomes regular feature
515 template<typename T, typename M=spin_rw_mutex >
516 class round_robin_cache : public successor_cache<T, M> {
517  typedef size_t size_type;
518  typedef M mutex_type;
520 
521 public:
522 
524 
526  typename mutex_type::scoped_lock l(this->my_mutex, false);
527  return this->my_successors.size();
528  }
529 
530 #if __TBB_PREVIEW_ASYNC_MSG
531  template<typename X>
532  task * try_put_task( const X &t ) {
533 #else
535 #endif // __TBB_PREVIEW_ASYNC_MSG
536  bool upgraded = true;
537  typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
538  typename successors_type::iterator i = this->my_successors.begin();
539  while ( i != this->my_successors.end() ) {
540  task *new_task = (*i)->try_put_task(t);
541  if ( new_task ) {
542  return new_task;
543  } else {
544  if ( (*i)->register_predecessor(*this->my_owner) ) {
545  if (!upgraded) {
546  l.upgrade_to_writer();
547  upgraded = true;
548  }
549  i = this->my_successors.erase(i);
550  }
551  else {
552  ++i;
553  }
554  }
555  }
556  return NULL;
557  }
558 };
559 
560 } // namespace internal
561 
562 #endif // __TBB__flow_graph_cache_impl_H
#define __TBB_override
Definition: tbb_stddef.h:244
A cache of predecessors that only supports try_get.
void set_owner(successor_type *owner)
void set_owner(sender< continue_msg > *owner)
A cache of successors that are put in a round-robin fashion.
task * try_put_task(const T &t) __TBB_override
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
void register_successor(successor_type &r)
successor_cache< T, M >::successors_type successors_type
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:171
sender< output_type > predecessor_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
std::list< pointer_type > successors_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void set_owner(owner_type *owner)
spin_rw_mutex_v3 spin_rw_mutex
Definition: spin_rw_mutex.h:37
An abstract cache of successors.
A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
void const char const char int ITT_FORMAT __itt_group_sync s
void remove_successor(successor_type &r)
A cache of successors that are broadcast to.
successor_cache< T, M >::successors_type successors_type
receiver< output_type > successor_type
An cache of predecessors that supports requests and reservations.
task * try_put_task(const T &t) __TBB_override

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.