Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
tbb::pipeline Class Reference

A processing pipeline that applies filters to items. More...

#include <pipeline.h>

Collaboration diagram for tbb::pipeline:

Public Member Functions

__TBB_EXPORTED_METHOD pipeline ()
 Construct empty pipeline. More...
 
virtual __TBB_EXPORTED_METHOD ~pipeline ()
 
void __TBB_EXPORTED_METHOD add_filter (filter &filter_)
 Add filter to end of pipeline. More...
 
void __TBB_EXPORTED_METHOD run (size_t max_number_of_live_tokens)
 Run the pipeline to completion. More...
 
void __TBB_EXPORTED_METHOD run (size_t max_number_of_live_tokens, tbb::task_group_context &context)
 Run the pipeline to completion with user-supplied context. More...
 
void __TBB_EXPORTED_METHOD clear ()
 Remove all filters from the pipeline. More...
 

Private Member Functions

void remove_filter (filter &filter_)
 Remove filter from pipeline. More...
 
void __TBB_EXPORTED_METHOD inject_token (task &self)
 Not used, but retained to satisfy old export files. More...
 
void clear_filters ()
 Does clean up if pipeline is cancelled or exception occurred. More...
 

Private Attributes

filterfilter_list
 Pointer to first filter in the pipeline. More...
 
filterfilter_end
 Pointer to location where address of next filter to be added should be stored. More...
 
taskend_counter
 task who's reference count is used to determine when all stages are done. More...
 
atomic< internal::Tokeninput_tokens
 Number of idle tokens waiting for input stage. More...
 
atomic< internal::Tokentoken_counter
 Global counter of tokens. More...
 
bool end_of_input
 False until fetch_input returns NULL. More...
 
bool has_thread_bound_filters
 True if the pipeline contains a thread-bound filter; false otherwise. More...
 

Friends

class internal::stage_task
 
class internal::pipeline_root_task
 
class filter
 
class thread_bound_filter
 
class internal::pipeline_cleaner
 
class tbb::interface6::internal::pipeline_proxy
 

Detailed Description

A processing pipeline that applies filters to items.

Definition at line 232 of file pipeline.h.

Constructor & Destructor Documentation

◆ pipeline()

tbb::pipeline::pipeline ( )

Construct empty pipeline.

Definition at line 531 of file pipeline.cpp.

531  :
532  filter_list(NULL),
533  filter_end(NULL),
534  end_counter(NULL),
535  end_of_input(false),
537 {
538  token_counter = 0;
539  input_tokens = 0;
540 }
task * end_counter
task who's reference count is used to determine when all stages are done.
Definition: pipeline.h:270
bool end_of_input
False until fetch_input returns NULL.
Definition: pipeline.h:279
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:267
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:282
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:264
atomic< internal::Token > token_counter
Global counter of tokens.
Definition: pipeline.h:276
atomic< internal::Token > input_tokens
Number of idle tokens waiting for input stage.
Definition: pipeline.h:273

References input_tokens, and token_counter.

◆ ~pipeline()

tbb::pipeline::~pipeline ( )
virtual

Though the current implementation declares the destructor virtual, do not rely on this detail. The virtualness is deprecated and may disappear in future versions of TBB.

Definition at line 542 of file pipeline.cpp.

542  {
543  clear();
544 }
void __TBB_EXPORTED_METHOD clear()
Remove all filters from the pipeline.
Definition: pipeline.cpp:546

References clear().

Here is the call graph for this function:

Member Function Documentation

◆ add_filter()

void tbb::pipeline::add_filter ( filter filter_)

Add filter to end of pipeline.

Definition at line 565 of file pipeline.cpp.

565  {
566 #if TBB_USE_ASSERT
567  if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) )
568  __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
569  __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
570  __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
571 #endif
572  if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
573  filter_.my_pipeline = this;
574  filter_.prev_filter_in_pipeline = filter_end;
575  if ( filter_list == NULL)
576  filter_list = &filter_;
577  else
579  filter_.next_filter_in_pipeline = NULL;
580  filter_end = &filter_;
581  } else {
582  if( !filter_end )
583  filter_end = reinterpret_cast<filter*>(&filter_list);
584 
585  *reinterpret_cast<filter**>(filter_end) = &filter_;
586  filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
587  *reinterpret_cast<filter**>(filter_end) = NULL;
588  }
589  if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
590  if( filter_.is_serial() ) {
591  if( filter_.is_bound() )
593  filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
594  } else {
595  if(filter_.prev_filter_in_pipeline) {
596  if(filter_.prev_filter_in_pipeline->is_bound()) {
597  // successors to bound filters must have an input_buffer
598  filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
599  }
600  } else { // input filter
601  if(filter_.object_may_be_null() ) {
602  //TODO: buffer only needed to hold TLS; could improve
603  filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
604  filter_.my_input_buffer->create_my_tls();
605  }
606  }
607  }
608  } else {
609  if( filter_.is_serial() ) {
610  filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
611  }
612  }
613 
614 }
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:38
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
static const unsigned char version_mask
Definition: pipeline.h:89
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:64
task * end_counter
task who's reference count is used to determine when all stages are done.
Definition: pipeline.h:270
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:267
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:282
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:156
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:264

References __TBB_ASSERT, __TBB_PIPELINE_VERSION, end_counter, filter_end, filter_list, has_thread_bound_filters, tbb::filter::is_bound(), tbb::filter::is_ordered(), tbb::filter::is_serial(), tbb::filter::my_filter_mode, tbb::filter::my_input_buffer, tbb::filter::my_pipeline, tbb::filter::next_filter_in_pipeline, tbb::filter::not_in_pipeline(), tbb::filter::object_may_be_null(), tbb::filter::prev_filter_in_pipeline, and tbb::filter::version_mask.

Here is the call graph for this function:

◆ clear()

void tbb::pipeline::clear ( )

Remove all filters from the pipeline.

Definition at line 546 of file pipeline.cpp.

546  {
547  filter* next;
548  for( filter* f = filter_list; f; f=next ) {
549  if( internal::input_buffer* b = f->my_input_buffer ) {
550  delete b;
551  f->my_input_buffer = NULL;
552  }
553  next=f->next_filter_in_pipeline;
554  f->next_filter_in_pipeline = filter::not_in_pipeline();
555  if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
556  f->prev_filter_in_pipeline = filter::not_in_pipeline();
557  f->my_pipeline = NULL;
558  }
559  if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
560  f->next_segment = NULL;
561  }
562  filter_list = filter_end = NULL;
563 }
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:38
static const unsigned char version_mask
Definition: pipeline.h:89
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:64
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:267
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:264
friend class filter
Definition: pipeline.h:258

References __TBB_PIPELINE_VERSION, filter_end, filter_list, tbb::filter::my_input_buffer, tbb::filter::next_filter_in_pipeline, tbb::filter::not_in_pipeline(), and tbb::filter::version_mask.

Referenced by ~pipeline().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ clear_filters()

void tbb::pipeline::clear_filters ( )
private

Does clean up if pipeline is cancelled or exception occurred.

Referenced by tbb::internal::pipeline_cleaner::~pipeline_cleaner().

Here is the caller graph for this function:

◆ inject_token()

void tbb::pipeline::inject_token ( task self)
private

Not used, but retained to satisfy old export files.

Definition at line 517 of file pipeline.cpp.

517  {
518  __TBB_ASSERT(false,"illegal call to inject_token");
519 }
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165

References __TBB_ASSERT.

◆ remove_filter()

void tbb::pipeline::remove_filter ( filter filter_)
private

Remove filter from pipeline.

Definition at line 616 of file pipeline.cpp.

616  {
617  __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
618  __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
619  __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
620  if (&filter_ == filter_list)
622  else {
623  __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
624  filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
625  }
626  if (&filter_ == filter_end)
628  else {
629  __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
630  filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
631  }
632  if( internal::input_buffer* b = filter_.my_input_buffer ) {
633  delete b;
634  filter_.my_input_buffer = NULL;
635  }
636  filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline();
637  if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
638  filter_.next_segment = NULL;
639  filter_.my_pipeline = NULL;
640 }
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:38
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
filter * prev_filter_in_pipeline
Pointer to previous filter in the pipeline.
Definition: pipeline.h:181
static const unsigned char version_mask
Definition: pipeline.h:89
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:64
task * end_counter
task who's reference count is used to determine when all stages are done.
Definition: pipeline.h:270
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:267
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:156
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:264

References __TBB_ASSERT, __TBB_PIPELINE_VERSION, end_counter, filter_end, filter_list, tbb::filter::my_filter_mode, tbb::filter::my_input_buffer, tbb::filter::my_pipeline, tbb::filter::next_filter_in_pipeline, tbb::filter::next_segment, tbb::filter::not_in_pipeline(), tbb::filter::prev_filter_in_pipeline, and tbb::filter::version_mask.

Referenced by tbb::filter::~filter().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ run() [1/2]

void tbb::pipeline::run ( size_t  max_number_of_live_tokens)

Run the pipeline to completion.

Definition at line 642 of file pipeline.cpp.

646  {
647  __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
648  __TBB_ASSERT( !end_counter, "pipeline already running?" );
649  if( filter_list ) {
650  internal::pipeline_cleaner my_pipeline_cleaner(*this);
651  end_of_input = false;
652  input_tokens = internal::Token(max_number_of_live_tokens);
654  // release input filter if thread-bound
655  if(filter_list->is_bound()) {
656  filter_list->my_input_buffer->sema_V();
657  }
658  }
659 #if __TBB_TASK_GROUP_CONTEXT
660  end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this );
661 #else
662  end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this );
663 #endif
664  // Start execution of tasks
666 
669  if(f->is_bound()) {
670  f->my_input_buffer->sema_V(); // wake to end
671  }
672  }
673  }
674  }
675 }
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
bool is_bound() const
True if filter is thread-bound.
Definition: pipeline.h:135
internal::input_buffer * my_input_buffer
Buffer for incoming tokens, or NULL if not required.
Definition: pipeline.h:170
task * end_counter
task who's reference count is used to determine when all stages are done.
Definition: pipeline.h:270
bool end_of_input
False until fetch_input returns NULL.
Definition: pipeline.h:279
static void spawn_root_and_wait(task &root)
Spawn task allocated by allocate_root, wait for it to complete, and deallocate it.
Definition: task.h:778
unsigned long Token
Definition: pipeline.h:40
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:282
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:156
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:264
friend class internal::pipeline_cleaner
Definition: pipeline.h:260
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:633
friend class filter
Definition: pipeline.h:258
atomic< internal::Token > input_tokens
Number of idle tokens waiting for input stage.
Definition: pipeline.h:273

References __TBB_ASSERT, tbb::task::allocate_root(), end_counter, end_of_input, filter_list, has_thread_bound_filters, input_tokens, internal::pipeline_cleaner, internal::pipeline_root_task, tbb::filter::is_bound(), tbb::filter::my_input_buffer, tbb::filter::next_filter_in_pipeline, and tbb::task::spawn_root_and_wait().

Here is the call graph for this function:

◆ run() [2/2]

void __TBB_EXPORTED_METHOD tbb::pipeline::run ( size_t  max_number_of_live_tokens,
tbb::task_group_context context 
)

Run the pipeline to completion with user-supplied context.

Friends And Related Function Documentation

◆ filter

friend class filter
friend

Definition at line 258 of file pipeline.h.

◆ internal::pipeline_cleaner

friend class internal::pipeline_cleaner
friend

Definition at line 260 of file pipeline.h.

Referenced by run().

◆ internal::pipeline_root_task

friend class internal::pipeline_root_task
friend

Definition at line 257 of file pipeline.h.

Referenced by run().

◆ internal::stage_task

friend class internal::stage_task
friend

Definition at line 256 of file pipeline.h.

◆ tbb::interface6::internal::pipeline_proxy

Definition at line 261 of file pipeline.h.

◆ thread_bound_filter

friend class thread_bound_filter
friend

Definition at line 259 of file pipeline.h.

Member Data Documentation

◆ end_counter

task* tbb::pipeline::end_counter
private

task who's reference count is used to determine when all stages are done.

Definition at line 270 of file pipeline.h.

Referenced by add_filter(), remove_filter(), run(), and tbb::internal::pipeline_cleaner::~pipeline_cleaner().

◆ end_of_input

bool tbb::pipeline::end_of_input
private

◆ filter_end

filter* tbb::pipeline::filter_end
private

Pointer to location where address of next filter to be added should be stored.

Definition at line 267 of file pipeline.h.

Referenced by add_filter(), clear(), and remove_filter().

◆ filter_list

◆ has_thread_bound_filters

bool tbb::pipeline::has_thread_bound_filters
private

True if the pipeline contains a thread-bound filter; false otherwise.

Definition at line 282 of file pipeline.h.

Referenced by add_filter(), tbb::internal::stage_task::execute(), and run().

◆ input_tokens

atomic<internal::Token> tbb::pipeline::input_tokens
private

Number of idle tokens waiting for input stage.

Definition at line 273 of file pipeline.h.

Referenced by tbb::internal::stage_task::execute(), tbb::internal::pipeline_root_task::execute(), tbb::thread_bound_filter::internal_process_item(), pipeline(), and run().

◆ token_counter


The documentation for this class was generated from the following files:

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.