Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
task_stream.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef _TBB_task_stream_H
22 #define _TBB_task_stream_H
23 
24 #include "tbb/tbb_stddef.h"
25 #include <deque>
26 #include <climits>
27 #include "tbb/atomic.h" // for __TBB_Atomic*
28 #include "tbb/spin_mutex.h"
29 #include "tbb/tbb_allocator.h"
30 #include "scheduler_common.h"
31 #include "tbb_misc.h" // for FastRandom
32 
33 namespace tbb {
34 namespace internal {
35 
37 
39 template< typename T, typename mutex_t >
41  typedef std::deque< T, tbb_allocator<T> > queue_base_t;
42 
45 
48 };
49 
//! Bit mask type used by the bit helpers below; bit positions range over its full width.
typedef uintptr_t population_t;
//! The value 1 in population_t width, shifted to build single-bit masks.
const population_t one = population_t(1);
52 
53 inline void set_one_bit( population_t& dest, int pos ) {
54  __TBB_ASSERT( pos>=0, NULL );
55  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
56  __TBB_AtomicOR( &dest, one<<pos );
57 }
58 
59 inline void clear_one_bit( population_t& dest, int pos ) {
60  __TBB_ASSERT( pos>=0, NULL );
61  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
62  __TBB_AtomicAND( &dest, ~(one<<pos) );
63 }
64 
65 inline bool is_bit_set( population_t val, int pos ) {
66  __TBB_ASSERT( pos>=0, NULL );
67  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
68  return (val & (one<<pos)) != 0;
69 }
70 
72 template<int Levels>
77  unsigned N;
78 
79 public:
80  task_stream() : N() {
81  for(int level = 0; level < Levels; level++) {
82  population[level] = 0;
83  lanes[level] = NULL;
84  }
85  }
86 
87  void initialize( unsigned n_lanes ) {
88  const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
89 
90  N = n_lanes>=max_lanes ? max_lanes : n_lanes>2 ? 1<<(__TBB_Log2(n_lanes-1)+1) : 2;
91  __TBB_ASSERT( N==max_lanes || N>=n_lanes && ((N-1)&N)==0, "number of lanes miscalculated");
92  __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
93  for(int level = 0; level < Levels; level++) {
94  lanes[level] = new padded<lane_t>[N];
95  __TBB_ASSERT( !population[level], NULL );
96  }
97  }
98 
100  for(int level = 0; level < Levels; level++)
101  if (lanes[level]) delete[] lanes[level];
102  }
103 
105  void push( task* source, int level, FastRandom& random ) {
106  // Lane selection is random. Each thread should keep a separate seed value.
107  unsigned idx;
108  for( ; ; ) {
109  idx = random.get() & (N-1);
111  if( lock.try_acquire(lanes[level][idx].my_mutex) ) {
112  lanes[level][idx].my_queue.push_back(source);
113  set_one_bit( population[level], idx ); //TODO: avoid atomic op if the bit is already set
114  break;
115  }
116  }
117  }
118 
120  task* pop( int level, unsigned& last_used_lane ) {
121  task* result = NULL;
122  // Lane selection is round-robin. Each thread should keep its last used lane.
123  unsigned idx = (last_used_lane+1)&(N-1);
124  for( ; population[level]; idx=(idx+1)&(N-1) ) {
125  if( is_bit_set( population[level], idx ) ) {
126  lane_t& lane = lanes[level][idx];
128  if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
129  result = lane.my_queue.front();
130  lane.my_queue.pop_front();
131  if( lane.my_queue.empty() )
132  clear_one_bit( population[level], idx );
133  break;
134  }
135  }
136  }
137  last_used_lane = idx;
138  return result;
139  }
140 
142  bool empty(int level) {
143  return !population[level];
144  }
145 
147 
149  intptr_t drain() {
150  intptr_t result = 0;
151  for(int level = 0; level < Levels; level++)
152  for(unsigned i=0; i<N; ++i) {
153  lane_t& lane = lanes[level][i];
155  for(lane_t::queue_base_t::iterator it=lane.my_queue.begin();
156  it!=lane.my_queue.end(); ++it, ++result)
157  {
158  __TBB_ASSERT( is_bit_set( population[level], i ), NULL );
159  task* t = *it;
160  tbb::task::destroy(*t);
161  }
162  lane.my_queue.clear();
163  clear_one_bit( population[level], i );
164  }
165  return result;
166  }
167 }; // task_stream
168 
169 } // namespace internal
170 } // namespace tbb
171 
172 #endif /* _TBB_task_stream_H */
intptr_t drain()
Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
Definition: task_stream.h:149
The container for "fairness-oriented" aka "enqueued" tasks.
Definition: task_stream.h:73
void set_one_bit(population_t &dest, int pos)
Definition: task_stream.h:53
void initialize(unsigned n_lanes)
Definition: task_stream.h:87
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
A fast random number generator.
Definition: tbb_misc.h:132
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
void __TBB_AtomicAND(volatile void *operand, uintptr_t addend)
Definition: tbb_machine.h:892
Base class for user-defined tasks.
Definition: task.h:592
padded< lane_t > * lanes[Levels]
Definition: task_stream.h:76
uintptr_t population_t
Definition: task_stream.h:50
queue_and_mutex< task *, spin_mutex > lane_t
Definition: task_stream.h:74
population_t population[Levels]
Definition: task_stream.h:75
intptr_t __TBB_Log2(uintptr_t x)
Definition: tbb_machine.h:864
bool empty(int level)
Checks existence of a task.
Definition: task_stream.h:142
const population_t one
Definition: task_stream.h:51
bool is_bit_set(population_t val, int pos)
Definition: task_stream.h:65
void * lock
The graph class.
unsigned short get()
Get a random number.
Definition: tbb_misc.h:143
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
void clear_one_bit(population_t &dest, int pos)
Definition: task_stream.h:59
Essentially, this is just a pair of a queue and a mutex to protect the queue.
Definition: task_stream.h:40
std::deque< T, tbb_allocator< T > > queue_base_t
Definition: task_stream.h:41
void push(task *source, int level, FastRandom &random)
Push a task into a lane.
Definition: task_stream.h:105
task * pop(int level, unsigned &last_used_lane)
Try finding and popping a task.
Definition: task_stream.h:120
CRITICAL_SECTION mutex_t
void __TBB_AtomicOR(volatile void *operand, uintptr_t addend)
Definition: tbb_machine.h:882
Pads type T to fill out to a multiple of cache line size.
Definition: tbb_stddef.h:265

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.