Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
pipeline.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #include "tbb/pipeline.h"
22 #include "tbb/spin_mutex.h"
23 #include "tbb/cache_aligned_allocator.h"
24 #include "itt_notify.h"
25 #include "semaphore.h"
26 #include "tls.h" // for parallel filters that do not use NULL as end_of_input
27 
28 
29 namespace tbb {
30 
31 namespace internal {
32 
34 struct task_info {
35  void* my_object;
41  bool is_valid;
43  void reset() {
44  my_object = NULL;
45  my_token = 0;
46  my_token_ready = false;
47  is_valid = false;
48  }
49 };
51 
54  friend class tbb::filter;
57  friend class tbb::pipeline;
58 
59  typedef Token size_type;
60 
63 
66 
68 
70 
72 
74 
77 
79 
80  void grow( size_type minimum_size );
81 
83 
84  static const size_type initial_buffer_size = 4;
85 
88 
90  bool is_ordered;
91 
93  bool is_bound;
94 
98  bool end_of_input_tls_allocated; // no way to test pthread creation of TLS
99 
100  void create_sema(size_t initial_tokens) { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }
101  void free_sema() { __TBB_ASSERT(my_sem,NULL); delete my_sem; }
102  void sema_P() { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }
103  void sema_V() { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }
104 
105 public:
107  input_buffer( bool is_ordered_, bool is_bound_ ) :
108  array(NULL), my_sem(NULL), array_size(0),
109  low_token(0), high_token(0),
110  is_ordered(is_ordered_), is_bound(is_bound_),
113  __TBB_ASSERT( array, NULL );
114  if(is_bound) create_sema(0);
115  }
116 
119  __TBB_ASSERT( array, NULL );
122  if(my_sem) {
123  free_sema();
124  }
126  destroy_my_tls();
127  }
128  }
129 
131 
139  bool put_token( task_info& info_, bool force_put = false ) {
140  {
141  info_.is_valid = true;
143  Token token;
144  bool was_empty = !array[low_token&(array_size-1)].is_valid;
145  if( is_ordered ) {
146  if( !info_.my_token_ready ) {
147  info_.my_token = high_token++;
148  info_.my_token_ready = true;
149  }
150  token = info_.my_token;
151  } else
152  token = high_token++;
153  __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
154  if( token!=low_token || is_bound || force_put ) {
155  // Trying to put token that is beyond low_token.
156  // Need to wait until low_token catches up before dispatching.
157  if( token-low_token>=array_size )
158  grow( token-low_token+1 );
159  ITT_NOTIFY( sync_releasing, this );
160  array[token&(array_size-1)] = info_;
161  if(was_empty && is_bound) {
162  sema_V();
163  }
164  return true;
165  }
166  }
167  return false;
168  }
169 
171 
172  // Uses template to avoid explicit dependency on stage_task.
173  // This is only called for serial filters, and is the reason for the
174  // advance parameter in return_item (we're incrementing low_token here.)
175  // Non-TBF serial stages don't advance the token at the start because the presence
176  // of the current token in the buffer keeps another stage from being spawned.
177  template<typename StageTask>
178  void note_done( Token token, StageTask& spawner ) {
179  task_info wakee;
180  wakee.reset();
181  {
183  if( !is_ordered || token==low_token ) {
184  // Wake the next task
185  task_info& item = array[++low_token & (array_size-1)];
186  ITT_NOTIFY( sync_acquired, this );
187  wakee = item;
188  item.is_valid = false;
189  }
190  }
191  if( wakee.is_valid )
192  spawner.spawn_stage_task(wakee);
193  }
194 
195 #if __TBB_TASK_GROUP_CONTEXT
196  void clear( filter* my_filter ) {
198  long t=low_token;
199  for( size_type i=0; i<array_size; ++i, ++t ){
200  task_info& temp = array[t&(array_size-1)];
201  if (temp.is_valid ) {
202  my_filter->finalize(temp.my_object);
203  temp.is_valid = false;
204  }
205  }
206  }
207 #endif
208 
210  // is parallel (as indicated by advance == true). If the filter is serial, leave the
211  // item in the buffer to keep another stage from being spawned.
212  bool return_item(task_info& info, bool advance) {
214  task_info& item = array[low_token&(array_size-1)];
215  ITT_NOTIFY( sync_acquired, this );
216  if( item.is_valid ) {
217  info = item;
218  item.is_valid = false;
219  if (advance) low_token++;
220  return true;
221  }
222  return false;
223  }
224 
227 
228  // end_of_input signal for parallel_pipeline, parallel input filters with 0 tokens allowed.
229  void create_my_tls() { int status = end_of_input_tls.create(); if(status) handle_perror(status, "TLS not allocated for filter"); end_of_input_tls_allocated = true; }
230  void destroy_my_tls() { int status = end_of_input_tls.destroy(); if(status) handle_perror(status, "Failed to destroy filter TLS"); }
231  bool my_tls_end_of_input() { return end_of_input_tls.get() != 0; }
233 };
234 
235 void input_buffer::grow( size_type minimum_size ) {
236  size_type old_size = array_size;
237  size_type new_size = old_size ? 2*old_size : initial_buffer_size;
238  while( new_size<minimum_size )
239  new_size*=2;
241  task_info* old_array = array;
242  for( size_type i=0; i<new_size; ++i )
243  new_array[i].is_valid = false;
244  long t=low_token;
245  for( size_type i=0; i<old_size; ++i, ++t )
246  new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
247  array = new_array;
249  if( old_array )
250  cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
251 }
252 
253 class stage_task: public task, public task_info {
254 private:
255  friend class tbb::pipeline;
260 
261 public:
263 
267  my_at_start(true)
268  {
270  }
272  stage_task( pipeline& pipeline, filter* filter_, const task_info& info ) :
273  task_info(info),
275  my_filter(filter_),
276  my_at_start(false)
277  {}
279  void reset() {
282  my_at_start = true;
283  }
286 #if __TBB_TASK_GROUP_CONTEXT
287  ~stage_task()
288  {
290  __TBB_ASSERT(is_cancelled(), "Trying to finalize the task that wasn't cancelled");
292  my_object = NULL;
293  }
294  }
295 #endif // __TBB_TASK_GROUP_CONTEXT
296  void spawn_stage_task(const task_info& info)
298  {
299  stage_task* clone = new (allocate_additional_child_of(*parent()))
300  stage_task( my_pipeline, my_filter, info );
301  spawn(*clone);
302  }
303 };
304 
306  __TBB_ASSERT( !my_at_start || !my_object, NULL );
307  __TBB_ASSERT( !my_filter->is_bound(), NULL );
308  if( my_at_start ) {
309  if( my_filter->is_serial() ) {
310  my_object = (*my_filter)(my_object);
312  {
313  if( my_filter->is_ordered() ) {
314  my_token = my_pipeline.token_counter++; // ideally, with relaxed semantics
315  my_token_ready = true;
318  my_pipeline.token_counter++; // ideally, with relaxed semantics
319  }
320  if( !my_filter->next_filter_in_pipeline ) { // we're only filter in pipeline
321  reset();
322  goto process_another_stage;
323  } else {
325  if( --my_pipeline.input_tokens>0 )
326  spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
327  }
328  } else {
329  my_pipeline.end_of_input = true;
330  return NULL;
331  }
332  } else /*not is_serial*/ {
334  return NULL;
338  }
340  if( --my_pipeline.input_tokens>0 )
341  spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
342  my_object = (*my_filter)(my_object);
343  if( !my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) )
344  {
345  my_pipeline.end_of_input = true;
348  my_pipeline.token_counter--; // fix token_counter
349  }
350  return NULL;
351  }
352  }
353  my_at_start = false;
354  } else {
355  my_object = (*my_filter)(my_object);
356  if( my_filter->is_serial() )
357  my_filter->my_input_buffer->note_done(my_token, *this);
358  }
360  if( my_filter ) {
361  // There is another filter to execute.
362  if( my_filter->is_serial() ) {
363  // The next filter must execute tokens in order
364  if( my_filter->my_input_buffer->put_token(*this) ){
365  // Can't proceed with the same item
366  if( my_filter->is_bound() ) {
367  // Find the next non-thread-bound filter
368  do {
370  } while( my_filter && my_filter->is_bound() );
371  // Check if there is an item ready to process
372  if( my_filter && my_filter->my_input_buffer->return_item(*this, !my_filter->is_serial()))
373  goto process_another_stage;
374  }
375  my_filter = NULL; // To prevent deleting my_object twice if exception occurs
376  return NULL;
377  }
378  }
379  } else {
380  // Reached end of the pipe.
381  size_t ntokens_avail = ++my_pipeline.input_tokens;
383  if(ntokens_avail == 1) {
385  }
386  return NULL;
387  }
388  if( ntokens_avail>1 // Only recycle if there is one available token
390  return NULL; // No need to recycle for new input
391  }
392  ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
393  // Recycle as an input stage task.
394  reset();
395  }
396 process_another_stage:
397  /* A semi-hackish way to reexecute the same task object immediately without spawning.
398  recycle_as_continuation marks the task for future execution,
399  and then 'this' pointer is returned to bypass spawning. */
401  return this;
402 }
403 
404 class pipeline_root_task: public task {
407 
411  if( my_pipeline.input_tokens > 0 ) {
413  set_ref_count(1);
414  return new( allocate_child() ) stage_task( my_pipeline );
415  }
416  if( do_segment_scanning ) {
417  filter* current_filter = my_pipeline.filter_list->next_segment;
418  /* first non-thread-bound filter that follows thread-bound one
419  and may have valid items to process */
420  filter* first_suitable_filter = current_filter;
421  while( current_filter ) {
422  __TBB_ASSERT( !current_filter->is_bound(), "filter is thread-bound?" );
423  __TBB_ASSERT( current_filter->prev_filter_in_pipeline->is_bound(), "previous filter is not thread-bound?" );
424  if( !my_pipeline.end_of_input || current_filter->has_more_work())
425  {
426  task_info info;
427  info.reset();
428  task* bypass = NULL;
429  int refcnt = 0;
430  task_list list;
431  // No new tokens are created; it's OK to process all waiting tokens.
432  // If the filter is serial, the second call to return_item will return false.
433  while( current_filter->my_input_buffer->return_item(info, !current_filter->is_serial()) ) {
434  task* t = new( allocate_child() ) stage_task( my_pipeline, current_filter, info );
435  if( ++refcnt == 1 )
436  bypass = t;
437  else // there's more than one task
438  list.push_back(*t);
439  // TODO: limit the list size (to arena size?) to spawn tasks sooner
440  __TBB_ASSERT( refcnt <= int(my_pipeline.token_counter), "token counting error" );
441  info.reset();
442  }
443  if( refcnt ) {
444  set_ref_count( refcnt );
445  if( refcnt > 1 )
446  spawn(list);
448  return bypass;
449  }
450  current_filter = current_filter->next_segment;
451  if( !current_filter ) {
452  if( !my_pipeline.end_of_input ) {
454  return this;
455  }
456  current_filter = first_suitable_filter;
457  __TBB_Yield();
458  }
459  } else {
460  /* The preceding pipeline segment is empty.
461  Fast-forward to the next post-TBF segment. */
462  first_suitable_filter = first_suitable_filter->next_segment;
463  current_filter = first_suitable_filter;
464  }
465  } /* while( current_filter ) */
466  return NULL;
467  } else {
468  if( !my_pipeline.end_of_input ) {
470  return this;
471  }
472  return NULL;
473  }
474  }
475 public:
477  {
480  if( (first->my_filter_mode & first->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
481  // Scanning the pipeline for segments
482  filter* head_of_previous_segment = first;
483  for( filter* subfilter=first->next_filter_in_pipeline;
484  subfilter!=NULL;
485  subfilter=subfilter->next_filter_in_pipeline )
486  {
487  if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
488  do_segment_scanning = true;
489  head_of_previous_segment->next_segment = subfilter;
490  head_of_previous_segment = subfilter;
491  }
492  }
493  }
494  }
495 };
496 
497 #if _MSC_VER && !defined(__INTEL_COMPILER)
498  // Workaround for overzealous compiler warnings
499  // Suppress compiler warning about constant conditional expression
500  #pragma warning (disable: 4127)
501 #endif
502 
503 // The class destroys end_counter and clears all input buffers if pipeline was cancelled.
504 class pipeline_cleaner: internal::no_copy {
506 public:
508  my_pipeline(_pipeline)
509  {}
511 #if __TBB_TASK_GROUP_CONTEXT
512  if (my_pipeline.end_counter->is_cancelled()) // Pipeline was cancelled
514 #endif
515  my_pipeline.end_counter = NULL;
516  }
517 };
518 
519 } // namespace internal
520 
522  __TBB_ASSERT(false,"illegal call to inject_token");
523 }
524 
#if __TBB_TASK_GROUP_CONTEXT
//! Does clean up if pipeline is cancelled or exception occurred:
//! finalizes any items still queued in the filters' input buffers.
void pipeline::clear_filters() {
    for( filter* f = filter_list; f; f = f->next_filter_in_pipeline ) {
        if ((f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4))
            if( internal::input_buffer* b = f->my_input_buffer )
                b->clear(f);
    }
}
#endif
534 
536  filter_list(NULL),
537  filter_end(NULL),
538  end_counter(NULL),
539  end_of_input(false),
540  has_thread_bound_filters(false)
541 {
542  token_counter = 0;
543  input_tokens = 0;
544 }
545 
547  clear();
548 }
549 
551  filter* next;
552  for( filter* f = filter_list; f; f=next ) {
553  if( internal::input_buffer* b = f->my_input_buffer ) {
554  delete b;
555  f->my_input_buffer = NULL;
556  }
557  next=f->next_filter_in_pipeline;
559  if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
560  f->prev_filter_in_pipeline = filter::not_in_pipeline();
561  f->my_pipeline = NULL;
562  }
563  if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
564  f->next_segment = NULL;
565  }
566  filter_list = filter_end = NULL;
567 }
568 
569 void pipeline::add_filter( filter& filter_ ) {
570 #if TBB_USE_ASSERT
572  __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
573  __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
574  __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
575 #endif
577  filter_.my_pipeline = this;
579  if ( filter_list == NULL)
580  filter_list = &filter_;
581  else
583  filter_.next_filter_in_pipeline = NULL;
584  filter_end = &filter_;
585  } else {
586  if( !filter_end )
587  filter_end = reinterpret_cast<filter*>(&filter_list);
588 
589  *reinterpret_cast<filter**>(filter_end) = &filter_;
590  filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
591  *reinterpret_cast<filter**>(filter_end) = NULL;
592  }
593  if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
594  if( filter_.is_serial() ) {
595  if( filter_.is_bound() )
597  filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
598  } else {
599  if(filter_.prev_filter_in_pipeline) {
600  if(filter_.prev_filter_in_pipeline->is_bound()) {
601  // successors to bound filters must have an input_buffer
602  filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
603  }
604  } else { // input filter
605  if(filter_.object_may_be_null() ) {
606  //TODO: buffer only needed to hold TLS; could improve
607  filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
608  filter_.my_input_buffer->create_my_tls();
609  }
610  }
611  }
612  } else {
613  if( filter_.is_serial() ) {
614  filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
615  }
616  }
617 
618 }
619 
620 void pipeline::remove_filter( filter& filter_ ) {
621  __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
622  __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
623  __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
624  if (&filter_ == filter_list)
626  else {
627  __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
629  }
630  if (&filter_ == filter_end)
632  else {
633  __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
635  }
636  if( internal::input_buffer* b = filter_.my_input_buffer ) {
637  delete b;
638  filter_.my_input_buffer = NULL;
639  }
642  filter_.next_segment = NULL;
643  filter_.my_pipeline = NULL;
644 }
645 
646 void pipeline::run( size_t max_number_of_live_tokens
648  , tbb::task_group_context& context
649 #endif
650  ) {
651  __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
652  __TBB_ASSERT( !end_counter, "pipeline already running?" );
653  if( filter_list ) {
654  internal::pipeline_cleaner my_pipeline_cleaner(*this);
655  end_of_input = false;
656  input_tokens = internal::Token(max_number_of_live_tokens);
658  // release input filter if thread-bound
659  if(filter_list->is_bound()) {
660  filter_list->my_input_buffer->sema_V();
661  }
662  }
663 #if __TBB_TASK_GROUP_CONTEXT
665 #else
667 #endif
668  // Start execution of tasks
670 
673  if(f->is_bound()) {
674  f->my_input_buffer->sema_V(); // wake to end
675  }
676  }
677  }
678  }
679 }
680 
#if __TBB_TASK_GROUP_CONTEXT
//! Run the pipeline with a default-constructed task_group_context.
void pipeline::run( size_t max_number_of_live_tokens ) {
    if( filter_list ) {
        // Construct task group context with the exception propagation mode expected
        // by the pipeline caller.
        uintptr_t ctx_traits = filter_list->my_filter_mode & filter::exact_exception_propagation ?
                task_group_context::default_traits :
                task_group_context::default_traits & ~task_group_context::exact_exception;
        task_group_context context(task_group_context::bound, ctx_traits);
        run(max_number_of_live_tokens, context);
    }
}
#endif // __TBB_TASK_GROUP_CONTEXT
694 
696  __TBB_ASSERT(my_pipeline, NULL);
697  __TBB_ASSERT(my_input_buffer, "has_more_work() called for filter with no input buffer");
698  return (internal::tokendiff_t)(my_pipeline->token_counter - my_input_buffer->low_token) != 0;
699 }
700 
704  my_pipeline->remove_filter(*this);
705  else
706  __TBB_ASSERT( prev_filter_in_pipeline == filter::not_in_pipeline(), "probably filter list is broken" );
707  } else {
708  __TBB_ASSERT( next_filter_in_pipeline==filter::not_in_pipeline(), "cannot destroy filter that is part of pipeline" );
709  }
710 }
711 
715  if(is_serial()) {
716  my_pipeline->end_of_input = true;
717  } else {
718  __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
719  my_input_buffer->set_my_tls_end_of_input();
720  }
721 }
722 
724  return internal_process_item(true);
725 }
726 
728  return internal_process_item(false);
729 }
730 
732  __TBB_ASSERT(my_pipeline != NULL,"It's not supposed that process_item is called for a filter that is not in a pipeline.");
733  internal::task_info info;
734  info.reset();
735 
737  return end_of_stream;
738 
739  if( !prev_filter_in_pipeline ) {
741  return end_of_stream;
742  while( my_pipeline->input_tokens == 0 ) {
743  if( !is_blocking )
744  return item_not_available;
745  my_input_buffer->sema_P();
746  }
747  info.my_object = (*this)(info.my_object);
748  if( info.my_object ) {
749  __TBB_ASSERT(my_pipeline->input_tokens > 0, "Token failed in thread-bound filter");
751  if( is_ordered() ) {
752  info.my_token = my_pipeline->token_counter;
753  info.my_token_ready = true;
754  }
755  my_pipeline->token_counter++; // ideally, with relaxed semantics
756  } else {
757  my_pipeline->end_of_input = true;
758  return end_of_stream;
759  }
760  } else { /* this is not an input filter */
761  while( !my_input_buffer->has_item() ) {
762  if( !is_blocking ) {
763  return item_not_available;
764  }
765  my_input_buffer->sema_P();
766  if( my_pipeline->end_of_input && !has_more_work() ) {
767  return end_of_stream;
768  }
769  }
770  if( !my_input_buffer->return_item(info, /*advance*/true) ) {
771  __TBB_ASSERT(false,"return_item failed");
772  }
773  info.my_object = (*this)(info.my_object);
774  }
776  if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,/*force_put=*/true) ) {
777  __TBB_ASSERT(false, "Couldn't put token after thread-bound buffer");
778  }
779  } else {
780  size_t ntokens_avail = ++(my_pipeline->input_tokens);
781  if( my_pipeline->filter_list->is_bound() ) {
782  if( ntokens_avail == 1 ) {
784  }
785  }
786  }
787 
788  return success;
789 }
790 
791 } // tbb
792 
__TBB_EXPORTED_METHOD pipeline()
Construct empty pipeline.
Definition: pipeline.cpp:535
unsigned long Token
Definition: pipeline.h:44
bool has_more_work()
has the filter not yet processed all the tokens it will ever see?
Definition: pipeline.cpp:695
#define __TBB_override
Definition: tbb_stddef.h:244
internal::allocate_child_proxy & allocate_child()
Returns proxy for overloaded new that allocates a child task of *this.
Definition: task.h:654
A processing pipeline that applies filters to items.
Definition: pipeline.h:236
bool is_serial() const
True if filter is serial.
Definition: pipeline.h:129
task * end_counter
task whose reference count is used to determine when all stages are done.
Definition: pipeline.h:274
virtual __TBB_EXPORTED_METHOD ~filter()
Destroy filter.
Definition: pipeline.cpp:701
pipeline_cleaner(pipeline &_pipeline)
Definition: pipeline.cpp:507
Token low_token
Lowest token that can start executing.
Definition: pipeline.cpp:73
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
result_type __TBB_EXPORTED_METHOD try_process_item()
If a data item is available, invoke operator() on that item.
Definition: pipeline.cpp:727
bool is_valid
True if my_object is valid.
Definition: pipeline.cpp:41
virtual __TBB_EXPORTED_METHOD ~pipeline()
Definition: pipeline.cpp:546
A buffer of input items for a filter.
Definition: pipeline.cpp:52
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:68
basic_tls< intptr_t > end_of_input_tls_t
for parallel filters that accepts NULLs, thread-local flag for reaching end_of_input
Definition: pipeline.cpp:96
const unsigned char my_filter_mode
Storage for filter mode and dynamically checked implementation version.
Definition: pipeline.h:182
auto first(Container &c) -> decltype(begin(c))
Used to form groups of tasks.
Definition: task.h:335
A stage in a pipeline served by a user thread.
Definition: pipeline.h:197
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:160
bool my_at_start
True if this task has not yet read the input.
Definition: pipeline.cpp:259
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t new_size
static void spawn_root_and_wait(task &root)
Spawn task allocated by allocate_root, wait for it to complete, and deallocate it.
Definition: task.h:781
filter * next_segment
Pointer to the next "segment" of filters, or NULL if not required.
Definition: pipeline.h:192
void P()
wait/acquire
Definition: semaphore.h:109
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
void __TBB_EXPORTED_METHOD run(size_t max_number_of_live_tokens)
Run the pipeline to completion.
Definition: pipeline.cpp:646
bool is_ordered
True for ordered filter, false otherwise.
Definition: pipeline.cpp:90
void reset()
Roughly equivalent to the constructor of input stage task.
Definition: pipeline.cpp:279
A list of children.
Definition: task.h:990
#define __TBB_Yield()
Definition: ibm_aix51.h:48
Base class for user-defined tasks.
Definition: task.h:592
void reset()
Set to initial state (no object, no token)
Definition: pipeline.cpp:43
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
pointer allocate(size_type n, const void *hint=0)
Allocate space for n objects, starting on a cache/sector line.
~input_buffer()
Destroy the buffer.
Definition: pipeline.cpp:118
void remove_filter(filter &filter_)
Remove filter from pipeline.
Definition: pipeline.cpp:620
void deallocate(pointer p, size_type)
Free block of memory that starts on a cache line.
void clear_filters()
Does clean up if pipeline is cancelled or exception occurred.
size_type array_size
Size of array.
Definition: pipeline.cpp:69
task * execute() __TBB_override
Should be overridden by derived classes.
Definition: pipeline.cpp:408
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:271
A lock that occupies a single byte.
Definition: spin_mutex.h:40
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:286
bool return_item(task_info &info, bool advance)
return an item, invalidate the queued item, but only advance if the filter
Definition: pipeline.cpp:212
bool is_ordered() const
True if filter must receive stream in order.
Definition: pipeline.h:134
task * parent() const
task on whose behalf this task is working, or NULL if this is a root.
Definition: task.h:830
Token my_token
Invalid unless a task went through an ordered stage.
Definition: pipeline.cpp:37
bool has_item()
true if the current low_token is valid.
Definition: pipeline.cpp:226
void __TBB_EXPORTED_METHOD add_filter(filter &filter_)
Add filter to end of pipeline.
Definition: pipeline.cpp:569
result_type internal_process_item(bool is_blocking)
Internal routine for item processing.
Definition: pipeline.cpp:731
bool put_token(task_info &info_, bool force_put=false)
Put a token into the buffer.
Definition: pipeline.cpp:139
bool object_may_be_null()
true if an input filter can emit null
Definition: pipeline.h:144
void set(T value)
Definition: tls.h:60
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
filter * prev_filter_in_pipeline
Pointer to previous filter in the pipeline.
Definition: pipeline.h:185
result_type __TBB_EXPORTED_METHOD process_item()
Wait until a data item becomes available, and invoke operator() on that item.
Definition: pipeline.cpp:723
stage_task(pipeline &pipeline, filter *filter_, const task_info &info)
Construct stage_task for a subsequent stage in a pipeline.
Definition: pipeline.cpp:272
pipeline * my_pipeline
Pointer to the pipeline.
Definition: pipeline.h:188
bool my_token_ready
False until my_token is set.
Definition: pipeline.cpp:39
static const unsigned char exact_exception_propagation
7th bit defines exception propagation mode expected by the application.
Definition: pipeline.h:85
The graph class.
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:42
atomic< internal::Token > input_tokens
Number of idle tokens waiting for input stage.
Definition: pipeline.h:277
static const size_type initial_buffer_size
Initial size for "array".
Definition: pipeline.cpp:84
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
friend class internal::pipeline_cleaner
Definition: pipeline.h:264
semaphore * my_sem
for thread-bound filter, semaphore for waiting, NULL otherwise.
Definition: pipeline.cpp:65
task_info * array
Array of deferred tasks that cannot yet start executing.
Definition: pipeline.cpp:62
spin_mutex array_mutex
Serializes updates.
Definition: pipeline.cpp:76
void push_back(task &task)
Push task onto back of list.
Definition: task.h:1007
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p sync_releasing
void note_done(Token token, StageTask &spawner)
Note that processing of a token is finished.
Definition: pipeline.cpp:178
void poison_pointer(T *__TBB_atomic &)
Definition: tbb_stddef.h:309
#define ITT_NOTIFY(name, obj)
Definition: itt_notify.h:120
This structure is used to store task information in an input buffer.
Definition: pipeline.cpp:34
void __TBB_EXPORTED_METHOD inject_token(task &self)
Not used, but retained to satisfy old export files.
Definition: pipeline.cpp:521
pipeline_root_task(pipeline &pipeline)
Definition: pipeline.cpp:476
virtual void finalize(void *)
Destroys item if pipeline was cancelled.
Definition: pipeline.h:160
atomic< internal::Token > token_counter
Global counter of tokens.
Definition: pipeline.h:280
end_of_input_tls_t end_of_input_tls
Definition: pipeline.cpp:97
bool is_cancelled() const
Returns true if the context has received cancellation request.
Definition: task.h:913
void set_ref_count(int count)
Set reference count.
Definition: task.h:734
stage_task(pipeline &pipeline)
Construct stage_task for first stage in a pipeline.
Definition: pipeline.cpp:264
void grow(size_type minimum_size)
Resize "array".
Definition: pipeline.cpp:235
A stage in a pipeline.
Definition: pipeline.h:65
bool is_bound
True for thread-bound filter, false otherwise.
Definition: pipeline.cpp:93
void recycle_as_continuation()
Change this to be a continuation of its former self.
Definition: task.h:684
task * execute() __TBB_override
The virtual task execution method.
Definition: pipeline.cpp:305
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:636
bool is_bound() const
True if filter is thread-bound.
Definition: pipeline.h:139
void __TBB_EXPORTED_METHOD clear()
Remove all filters from the pipeline.
Definition: pipeline.cpp:550
void __TBB_EXPORTED_FUNC handle_perror(int error_code, const char *aux_info)
Throws std::runtime_error with what() returning error_code description prefixed with aux_info.
Definition: tbb_misc.cpp:78
void __TBB_EXPORTED_METHOD set_end_of_input()
Definition: pipeline.cpp:712
long tokendiff_t
Definition: pipeline.h:45
void V()
post/release
Definition: semaphore.h:114
Token high_token
Used for out of order buffer, and for assigning my_token if is_ordered and my_token not already assigned.
Definition: pipeline.cpp:87
static const unsigned char version_mask
Definition: pipeline.h:93
internal::input_buffer * my_input_buffer
Buffer for incoming tokens, or NULL if not required.
Definition: pipeline.h:174
void create_sema(size_t initial_tokens)
Definition: pipeline.cpp:100
void spawn_stage_task(const task_info &info)
Creates and spawns stage_task from task_info.
Definition: pipeline.cpp:297
input_buffer(bool is_ordered_, bool is_bound_)
Construct empty buffer.
Definition: pipeline.cpp:107
#define __TBB_TASK_GROUP_CONTEXT
Definition: tbb_config.h:546
friend class internal::pipeline_root_task
Definition: pipeline.h:261
Edsger Dijkstra's counting semaphore.
Definition: semaphore.h:98
bool end_of_input
False until fetch_input returns NULL.
Definition: pipeline.h:283

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.