Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
arena.cpp
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #include "tbb/global_control.h" // thread_stack_size
18 
19 #include "scheduler.h"
20 #include "governor.h"
21 #include "arena.h"
22 #include "itt_notify.h"
23 #include "semaphore.h"
24 #include "tbb/internal/_flow_graph_impl.h"
25 
26 #include <functional>
27 
28 #if __TBB_STATISTICS_STDOUT
29 #include <cstdio>
30 #endif
31 
32 namespace tbb {
33 namespace internal {
34 
35 // Defined here to enable the compiler to inline it into arena::process and nested_arena_entry
36 void generic_scheduler::attach_arena( arena* a, size_t index, bool is_master ) {
37  __TBB_ASSERT( a->my_market == my_market, NULL );
38  my_arena = a;
39  my_arena_index = index;
40  my_arena_slot = a->my_slots + index;
41  attach_mailbox( affinity_id(index+1) );
42  if ( is_master && my_inbox.is_idle_state( true ) ) {
43  // The master enters the arena with its own task to be executed, which means it is not
44  // going to enter the stealing loop and take affinity tasks.
45  my_inbox.set_is_idle( false );
46  }
47 #if __TBB_TASK_GROUP_CONTEXT
48  // Context to be used by root tasks by default (if the user has not specified one).
49  if( !is_master )
50  my_dummy_task->prefix().context = a->my_default_ctx;
51 #endif /* __TBB_TASK_GROUP_CONTEXT */
52 #if __TBB_TASK_PRIORITY
53  // In the current implementation master threads continue processing even when
54  // there are other masters with higher priority. Only TBB worker threads are
55  // redistributed between arenas based on those arenas' priorities. Thus master
56  // threads use arena's top priority as a reference point (in contrast to workers
57  // that use my_market->my_global_top_priority).
58  if( is_master ) {
59  my_ref_top_priority = &a->my_top_priority;
60  my_ref_reload_epoch = &a->my_reload_epoch;
61  }
62  my_local_reload_epoch = *my_ref_reload_epoch;
63  __TBB_ASSERT( !my_offloaded_tasks, NULL );
64 #endif /* __TBB_TASK_PRIORITY */
65 }
66 
67 inline static bool occupy_slot( generic_scheduler*& slot, generic_scheduler& s ) {
68  return !slot && as_atomic( slot ).compare_and_swap( &s, NULL ) == NULL;
69 }
70 
71 size_t arena::occupy_free_slot_in_range( generic_scheduler& s, size_t lower, size_t upper ) {
72  if ( lower >= upper ) return out_of_arena;
73  // Start search for an empty slot from the one we occupied the last time
74  size_t index = s.my_arena_index;
75  if ( index < lower || index >= upper ) index = s.my_random.get() % (upper - lower) + lower;
76  __TBB_ASSERT( index >= lower && index < upper, NULL );
77  // Find a free slot
78  for ( size_t i = index; i < upper; ++i )
79  if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i;
80  for ( size_t i = lower; i < index; ++i )
81  if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i;
82  return out_of_arena;
83 }
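// Illustrative sketch (not part of arena.cpp): the search above starts at a preferred or random
// index, wraps around once, and claims a slot with a single compare-and-swap so that two threads
// can never occupy the same slot. The names Slot and claim_slot are invented for this standalone
// analogue; the real code uses arena_slot and occupy_slot().
#include <atomic>
#include <cstddef>

struct Slot { std::atomic<void*> owner{nullptr}; };

// Returns the claimed index, or `count` when every slot is taken (cf. out_of_arena).
std::size_t claim_slot(Slot* slots, std::size_t count, std::size_t start, void* self) {
    for (std::size_t n = 0; n < count; ++n) {
        std::size_t i = (start + n) % count;            // wrap around, like the two loops above
        void* expected = nullptr;
        if (slots[i].owner.compare_exchange_strong(expected, self))
            return i;                                   // slot claimed atomically
    }
    return count;
}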
84 
85 template <bool as_worker>
86 size_t arena::occupy_free_slot( generic_scheduler& s ) {
87  // Firstly, masters try to occupy reserved slots
88  size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( s, 0, my_num_reserved_slots );
89  if ( index == out_of_arena ) {
90  // Secondly, all threads try to occupy all non-reserved slots
91  index = occupy_free_slot_in_range( s, my_num_reserved_slots, my_num_slots );
92  // Likely this arena is already saturated
93  if ( index == out_of_arena )
94  return out_of_arena;
95  }
96 
97  ITT_NOTIFY(sync_acquired, my_slots + index);
98  atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
99  return index;
100 }
101 
102 void arena::process( generic_scheduler& s ) {
103  __TBB_ASSERT( is_alive(my_guard), NULL );
104  __TBB_ASSERT( governor::is_set(&s), NULL );
105  __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
106  __TBB_ASSERT( s.worker_outermost_level(), NULL );
107 
108  __TBB_ASSERT( my_num_slots > 1, NULL );
109 
110  size_t index = occupy_free_slot</*as_worker*/true>( s );
111  if ( index == out_of_arena )
112  goto quit;
113 
114  __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
115  s.attach_arena( this, index, /*is_master*/false );
116 
117 #if !__TBB_FP_CONTEXT
118  my_cpu_ctl_env.set_env();
119 #endif
120 
121 #if __TBB_ARENA_OBSERVER
122  __TBB_ASSERT( !s.my_last_local_observer, "There cannot be notified local observers when entering arena" );
123  my_observers.notify_entry_observers( s.my_last_local_observer, /*worker=*/true );
124 #endif /* __TBB_ARENA_OBSERVER */
125 
126  // Task pool can be marked as non-empty if the worker occupies the slot left by a master.
127  if ( s.my_arena_slot->task_pool != EmptyTaskPool ) {
128  __TBB_ASSERT( s.my_inbox.is_idle_state(false), NULL );
129  s.local_wait_for_all( *s.my_dummy_task, NULL );
130  __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
131  }
132 
133  for ( ;; ) {
134  __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
135  __TBB_ASSERT( s.worker_outermost_level(), NULL );
136  __TBB_ASSERT( is_alive(my_guard), NULL );
137  __TBB_ASSERT( s.is_quiescent_local_task_pool_reset(),
138  "Worker cannot leave arena while its task pool is not reset" );
139  __TBB_ASSERT( s.my_arena_slot->task_pool == EmptyTaskPool, "Empty task pool is not marked appropriately" );
140  // This check prevents relinquishing more than necessary workers because
141  // of the non-atomicity of the decision making procedure
142  if ( num_workers_active() > my_num_workers_allotted
143 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
144  || recall_by_mandatory_request()
145 #endif
146  )
147  break;
148  // Try to steal a task.
149  // Passing reference count is technically unnecessary in this context,
150  // but omitting it here would add checks inside the function.
151  task* t = s.receive_or_steal_task( __TBB_ISOLATION_ARG( s.my_dummy_task->prefix().ref_count, no_isolation ) );
152  if (t) {
153  // A side effect of receive_or_steal_task is that my_innermost_running_task can be set.
154  // But for the outermost dispatch loop it has to be a dummy task.
155  s.my_innermost_running_task = s.my_dummy_task;
156  s.local_wait_for_all(*s.my_dummy_task,t);
157  }
158  }
159 #if __TBB_ARENA_OBSERVER
160  my_observers.notify_exit_observers( s.my_last_local_observer, /*worker=*/true );
161  s.my_last_local_observer = NULL;
162 #endif /* __TBB_ARENA_OBSERVER */
163 #if __TBB_TASK_PRIORITY
164  if ( s.my_offloaded_tasks )
165  orphan_offloaded_tasks( s );
166 #endif /* __TBB_TASK_PRIORITY */
167 #if __TBB_STATISTICS
168  ++s.my_counters.arena_roundtrips;
169  *my_slots[index].my_counters += s.my_counters;
170  s.my_counters.reset();
171 #endif /* __TBB_STATISTICS */
172  __TBB_store_with_release( my_slots[index].my_scheduler, (generic_scheduler*)NULL );
173  s.my_arena_slot = 0; // detached from slot
174  s.my_inbox.detach();
175  __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
176  __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
177  __TBB_ASSERT( s.worker_outermost_level(), NULL );
178  __TBB_ASSERT( is_alive(my_guard), NULL );
179 quit:
180  // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible
181  // that arena may be temporarily left unpopulated by threads. See comments in
182  // arena::on_thread_leaving() for more details.
183  on_thread_leaving<ref_worker>();
184 }
185 
186 arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots ) {
187  __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
188  __TBB_ASSERT( sizeof(my_slots[0]) % NFS_GetLineSize()==0, "arena::slot size not multiple of cache line size" );
189  __TBB_ASSERT( (uintptr_t)this % NFS_GetLineSize()==0, "arena misaligned" );
190 #if __TBB_TASK_PRIORITY
191  __TBB_ASSERT( !my_reload_epoch && !my_orphaned_tasks && !my_skipped_fifo_priority, "New arena object is not zeroed" );
192 #endif /* __TBB_TASK_PRIORITY */
193  my_market = &m;
194  my_limit = 1;
195  // Two slots are mandatory: for the master, and for 1 worker (required to support starvation resistant tasks).
196  my_num_slots = num_arena_slots(num_slots);
197  my_num_reserved_slots = num_reserved_slots;
198  my_max_num_workers = num_slots-num_reserved_slots;
199  my_references = ref_external; // accounts for the master
200 #if __TBB_TASK_PRIORITY
201  my_bottom_priority = my_top_priority = normalized_normal_priority;
202 #endif /* __TBB_TASK_PRIORITY */
204 #if __TBB_ARENA_OBSERVER
205  my_observers.my_arena = this;
206 #endif
208  // Construct slots. Mark internal synchronization elements for the tools.
209  for( unsigned i = 0; i < my_num_slots; ++i ) {
210  __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
211  __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
212  __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
213  ITT_SYNC_CREATE(my_slots + i, SyncType_Scheduler, SyncObj_WorkerTaskPool);
214  mailbox(i+1).construct();
215  ITT_SYNC_CREATE(&mailbox(i+1), SyncType_Scheduler, SyncObj_Mailbox);
216  my_slots[i].hint_for_pop = i;
217 #if __TBB_PREVIEW_CRITICAL_TASKS
218  my_slots[i].hint_for_critical = i;
219 #endif
220 #if __TBB_STATISTICS
221  my_slots[i].my_counters = new ( NFS_Allocate(1, sizeof(statistics_counters), NULL) ) statistics_counters;
222 #endif /* __TBB_STATISTICS */
223  }
224  my_task_stream.initialize(my_num_slots);
225  ITT_SYNC_CREATE(&my_task_stream, SyncType_Scheduler, SyncObj_TaskStream);
226 #if __TBB_PREVIEW_CRITICAL_TASKS
227  my_critical_task_stream.initialize(my_num_slots);
228  ITT_SYNC_CREATE(&my_critical_task_stream, SyncType_Scheduler, SyncObj_CriticalTaskStream);
229 #endif
230 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
231  my_concurrency_mode = cm_normal;
232 #endif
233 #if !__TBB_FP_CONTEXT
234  my_cpu_ctl_env.get_env();
235 #endif
236 }
237 
238 arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots ) {
239  __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
240  __TBB_ASSERT( sizeof(base_type) % NFS_GetLineSize() == 0, "arena slots area misaligned: wrong padding" );
241  __TBB_ASSERT( sizeof(mail_outbox) == NFS_MaxLineSize, "Mailbox padding is wrong" );
242  size_t n = allocation_size(num_arena_slots(num_slots));
243  unsigned char* storage = (unsigned char*)NFS_Allocate( 1, n, NULL );
244  // Zero all slots to indicate that they are empty
245  memset( storage, 0, n );
246  return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) ) arena(m, num_slots, num_reserved_slots);
247 }
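// Illustrative sketch (not part of arena.cpp) of the single-block layout that allocate_arena()
// and free_arena() rely on: one cache-aligned allocation holds the per-slot mailboxes, then the
// arena object (arena_base plus its first slot), then the remaining slots. The names and members
// below are invented for the illustration; the sizes stand in for sizeof(mail_outbox), the padded
// arena header, and sizeof(arena_slot).
#include <cstddef>

struct arena_layout_sketch {
    std::size_t mailbox_size;  // one mail_outbox per slot, NFS_MaxLineSize in the real code
    std::size_t header_size;   // padded arena_base, a multiple of the cache-line size
    std::size_t slot_size;     // one cache-aligned arena_slot per slot

    // Offset of the arena object itself: it is constructed right after the mailboxes,
    // matching the placement-new expression in allocate_arena() above.
    std::size_t arena_offset(std::size_t num_slots) const { return num_slots * mailbox_size; }

    // Total bytes requested from NFS_Allocate(), the analogue of allocation_size().
    std::size_t total_size(std::size_t num_slots) const {
        return num_slots * mailbox_size + header_size + num_slots * slot_size;
    }
};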
248 
249 void arena::free_arena () {
250  __TBB_ASSERT( is_alive(my_guard), NULL );
251  __TBB_ASSERT( !my_references, "There are threads in the dying arena" );
252  __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
253  __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, "Inconsistent state of a dying arena" );
254 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
255  __TBB_ASSERT( my_concurrency_mode != cm_enforced_global, NULL );
256 #endif
257 #if !__TBB_STATISTICS_EARLY_DUMP
258  GATHER_STATISTIC( dump_arena_statistics() );
259 #endif
260  poison_value( my_guard );
261  intptr_t drained = 0;
262  for ( unsigned i = 0; i < my_num_slots; ++i ) {
263  __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
264  // TODO: understand the assertion and modify
265  // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
266  __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
267  my_slots[i].free_task_pool();
268 #if __TBB_STATISTICS
269  NFS_Free( my_slots[i].my_counters );
270 #endif /* __TBB_STATISTICS */
271  drained += mailbox(i+1).drain();
272  }
273  __TBB_ASSERT( my_task_stream.drain()==0, "Not all enqueued tasks were executed");
274 #if __TBB_PREVIEW_CRITICAL_TASKS
275  __TBB_ASSERT( my_critical_task_stream.drain()==0, "Not all critical tasks were executed");
276 #endif
277 #if __TBB_COUNT_TASK_NODES
278  my_market->update_task_node_count( -drained );
279 #endif /* __TBB_COUNT_TASK_NODES */
280  // remove an internal reference
281  my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
282 #if __TBB_TASK_GROUP_CONTEXT
283  __TBB_ASSERT( my_default_ctx, "Master thread never entered the arena?" );
284  my_default_ctx->~task_group_context();
285  NFS_Free(my_default_ctx);
286 #endif /* __TBB_TASK_GROUP_CONTEXT */
287 #if __TBB_ARENA_OBSERVER
288  if ( !my_observers.empty() )
289  my_observers.clear();
290 #endif /* __TBB_ARENA_OBSERVER */
291  void* storage = &mailbox(my_num_slots);
292  __TBB_ASSERT( my_references == 0, NULL );
294  this->~arena();
295 #if TBB_USE_ASSERT > 1
296  memset( storage, 0, allocation_size(my_num_slots) );
297 #endif /* TBB_USE_ASSERT */
298  NFS_Free( storage );
299 }
300 
301 #if __TBB_STATISTICS
302 void arena::dump_arena_statistics () {
303  statistics_counters total;
304  for( unsigned i = 0; i < my_num_slots; ++i ) {
305 #if __TBB_STATISTICS_EARLY_DUMP
306  generic_scheduler* s = my_slots[i].my_scheduler;
307  if ( s )
308  *my_slots[i].my_counters += s->my_counters;
309 #else
310  __TBB_ASSERT( !my_slots[i].my_scheduler, NULL );
311 #endif
312  if ( i != 0 ) {
313  total += *my_slots[i].my_counters;
314  dump_statistics( *my_slots[i].my_counters, i );
315  }
316  }
317  dump_statistics( *my_slots[0].my_counters, 0 );
318 #if __TBB_STATISTICS_STDOUT
319 #if !__TBB_STATISTICS_TOTALS_ONLY
320  printf( "----------------------------------------------\n" );
321 #endif
322  dump_statistics( total, workers_counters_total );
323  total += *my_slots[0].my_counters;
324  dump_statistics( total, arena_counters_total );
325 #if !__TBB_STATISTICS_TOTALS_ONLY
326  printf( "==============================================\n" );
327 #endif
328 #endif /* __TBB_STATISTICS_STDOUT */
329 }
330 #endif /* __TBB_STATISTICS */
331 
332 #if __TBB_TASK_PRIORITY
333 // The method inspects a scheduler to determine:
334 // 1. if it has tasks that can be retrieved and executed (via the return value);
335 // 2. if it has any tasks at all, including those of lower priority (via tasks_present);
336 // 3. if it is able to work with enqueued tasks (via dequeuing_possible).
337 inline bool arena::may_have_tasks ( generic_scheduler* s, bool& tasks_present, bool& dequeuing_possible ) {
338  if ( !s || s->my_arena != this )
339  return false;
340  dequeuing_possible |= s->worker_outermost_level();
341  if ( s->my_pool_reshuffling_pending ) {
342  // This primary task pool is nonempty and may contain tasks at the current
343  // priority level. Its owner is winnowing lower priority tasks at the moment.
344  tasks_present = true;
345  return true;
346  }
347  if ( s->my_offloaded_tasks ) {
348  tasks_present = true;
349  if ( s->my_local_reload_epoch < *s->my_ref_reload_epoch ) {
350  // This scheduler's offload area is nonempty and may contain tasks at the
351  // current priority level.
352  return true;
353  }
354  }
355  return false;
356 }
357 
358 void arena::orphan_offloaded_tasks(generic_scheduler& s) {
359  __TBB_ASSERT( s.my_offloaded_tasks, NULL );
360  GATHER_STATISTIC( ++s.my_counters.prio_orphanings );
361  ++my_abandonment_epoch;
362  __TBB_ASSERT( s.my_offloaded_task_list_tail_link && !*s.my_offloaded_task_list_tail_link, NULL );
363  task* orphans;
364  do {
365  orphans = const_cast<task*>(my_orphaned_tasks);
366  *s.my_offloaded_task_list_tail_link = orphans;
367  } while ( as_atomic(my_orphaned_tasks).compare_and_swap(s.my_offloaded_tasks, orphans) != orphans );
368  s.my_offloaded_tasks = NULL;
369 #if TBB_USE_ASSERT
370  s.my_offloaded_task_list_tail_link = NULL;
371 #endif /* TBB_USE_ASSERT */
372 }
373 #endif /* __TBB_TASK_PRIORITY */
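// Illustrative sketch (not part of arena.cpp): the CAS loop in orphan_offloaded_tasks() splices an
// entire singly linked chain onto a shared lock-free list in one step: point the chain's tail at
// the current head, then CAS the head to the chain's first node, retrying if the head moved. The
// names Node and push_chain are invented for this standalone analogue using std::atomic.
#include <atomic>

struct Node { Node* next; };

// Prepend the whole chain starting at 'first' onto 'head'; 'tail_next' refers to the chain's
// last link (analogous to *my_offloaded_task_list_tail_link above).
void push_chain(std::atomic<Node*>& head, Node* first, Node*& tail_next) {
    Node* old_head = head.load(std::memory_order_relaxed);
    do {
        tail_next = old_head;       // hook the current list behind the chain being published
    } while (!head.compare_exchange_weak(old_head, first,
                                         std::memory_order_release,
                                         std::memory_order_relaxed));
}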
374 
375 bool arena::has_enqueued_tasks() {
376  // Look for enqueued tasks at all priority levels
377  for ( int p = 0; p < num_priority_levels; ++p )
378  if ( !my_task_stream.empty(p) )
379  return true;
380  return false;
381 }
382 
383 void arena::restore_priority_if_need() {
384  // Check for the presence of enqueued tasks "lost" on some of
385  // priority levels because updating arena priority and switching
386  // arena into "populated" (FULL) state happen non-atomically.
387  // Imposing atomicity would require task::enqueue() to use a lock,
388  // which is unacceptable.
389  if ( has_enqueued_tasks() ) {
390  advertise_new_work<work_enqueued>();
391 #if __TBB_TASK_PRIORITY
392  // update_arena_priority() expects non-zero arena::my_num_workers_requested,
393  // so must be called after advertise_new_work<work_enqueued>()
394  for ( int p = 0; p < num_priority_levels; ++p )
395  if ( !my_task_stream.empty(p) ) {
396  if ( p < my_bottom_priority || p > my_top_priority )
397  my_market->update_arena_priority(*this, p);
398  }
399 #endif
400  }
401 }
402 
403 bool arena::is_out_of_work() {
404  // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
405  for(;;) {
406  pool_state_t snapshot = my_pool_state;
407  switch( snapshot ) {
408  case SNAPSHOT_EMPTY:
409  return true;
410  case SNAPSHOT_FULL: {
411  // Use unique id for "busy" in order to avoid ABA problems.
412  const pool_state_t busy = pool_state_t(&busy);
413  // Request permission to take snapshot
414  if( my_pool_state.compare_and_swap( busy, SNAPSHOT_FULL )==SNAPSHOT_FULL ) {
415  // Got permission. Take the snapshot.
416  // NOTE: This is not a lock, as the state can be set to FULL at
417  // any moment by a thread that spawns/enqueues new task.
418  size_t n = my_limit;
419  // Make local copies of volatile parameters. Their change during
420  // snapshot taking procedure invalidates the attempt, and returns
421  // this thread into the dispatch loop.
422 #if __TBB_TASK_PRIORITY
423  uintptr_t reload_epoch = __TBB_load_with_acquire( my_reload_epoch );
424  intptr_t top_priority = my_top_priority;
425  // Inspect primary task pools first
426 #endif /* __TBB_TASK_PRIORITY */
427  size_t k;
428  for( k=0; k<n; ++k ) {
429  if( my_slots[k].task_pool != EmptyTaskPool &&
430  __TBB_load_relaxed(my_slots[k].head) < __TBB_load_relaxed(my_slots[k].tail) )
431  {
432  // k-th primary task pool is nonempty and does contain tasks.
433  break;
434  }
435  if( my_pool_state!=busy )
436  return false; // the work was published
437  }
438  __TBB_ASSERT( k <= n, NULL );
439  bool work_absent = k == n;
440 #if __TBB_PREVIEW_CRITICAL_TASKS
441  bool no_critical_tasks = my_critical_task_stream.empty(0);
442  work_absent &= no_critical_tasks;
443 #endif
444 #if __TBB_TASK_PRIORITY
445  // Variable tasks_present indicates presence of tasks at any priority
446  // level, while work_absent refers only to the current priority.
447  bool tasks_present = !work_absent || my_orphaned_tasks;
448  bool dequeuing_possible = false;
449  if ( work_absent ) {
450  // Check for the possibility that recent priority changes
451  // brought some tasks to the current priority level
452 
453  uintptr_t abandonment_epoch = my_abandonment_epoch;
454  // Master thread's scheduler needs special handling as it
455  // may be destroyed at any moment (workers' schedulers are
456  // guaranteed to be alive while at least one thread is in arena).
457  // The lock below excludes concurrency with task group state change
458  // propagation and guarantees lifetime of the master thread.
459  the_context_state_propagation_mutex.lock();
460  work_absent = !may_have_tasks( my_slots[0].my_scheduler, tasks_present, dequeuing_possible );
461  the_context_state_propagation_mutex.unlock();
462  // The following loop is subject to data races. While k-th slot's
463  // scheduler is being examined, corresponding worker can either
464  // leave to RML or migrate to another arena.
465  // But the races are not prevented because all of them are benign.
466  // First, the code relies on the fact that worker thread's scheduler
467  // object persists until the whole library is deinitialized.
468  // Second, in the worst case the races can only cause another
469  // round of stealing attempts to be undertaken. Introducing complex
470  // synchronization into this coldest part of the scheduler's control
471  // flow does not seem to make sense because it both is unlikely to
472  // ever have any observable performance effect, and will require
473  // additional synchronization code on the hotter paths.
474  for( k = 1; work_absent && k < n; ++k ) {
475  if( my_pool_state!=busy )
476  return false; // the work was published
477  work_absent = !may_have_tasks( my_slots[k].my_scheduler, tasks_present, dequeuing_possible );
478  }
479  // Preclude premature switching arena off because of a race in the previous loop.
480  work_absent = work_absent
481  && !__TBB_load_with_acquire(my_orphaned_tasks)
482  && abandonment_epoch == my_abandonment_epoch;
483  }
484 #endif /* __TBB_TASK_PRIORITY */
485  // Test and test-and-set.
486  if( my_pool_state==busy ) {
487 #if __TBB_TASK_PRIORITY
488  bool no_fifo_tasks = my_task_stream.empty(top_priority);
489  work_absent = work_absent && (!dequeuing_possible || no_fifo_tasks)
490  && top_priority == my_top_priority && reload_epoch == my_reload_epoch;
491 #else
492  bool no_fifo_tasks = my_task_stream.empty(0);
493  work_absent = work_absent && no_fifo_tasks;
494 #endif /* __TBB_TASK_PRIORITY */
495  if( work_absent ) {
496 #if __TBB_TASK_PRIORITY
497  if ( top_priority > my_bottom_priority ) {
498  if ( my_market->lower_arena_priority(*this, top_priority - 1, reload_epoch)
499  && !my_task_stream.empty(top_priority) )
500  {
501  atomic_update( my_skipped_fifo_priority, top_priority, std::less<intptr_t>());
502  }
503  }
504  else if ( !tasks_present && !my_orphaned_tasks && no_fifo_tasks ) {
505 #endif /* __TBB_TASK_PRIORITY */
506  // save current demand value before setting SNAPSHOT_EMPTY,
507  // to avoid race with advertise_new_work.
508  int current_demand = (int)my_max_num_workers;
509  if( my_pool_state.compare_and_swap( SNAPSHOT_EMPTY, busy )==busy ) {
510 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
511  if( my_concurrency_mode==cm_enforced_global ) {
512  // adjust_demand() called inside, if needed
513  my_market->mandatory_concurrency_disable( this );
514  } else
515 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
516  {
517  // This thread transitioned pool to empty state, and thus is
518  // responsible for telling the market that there is no work to do.
519  my_market->adjust_demand( *this, -current_demand );
520  }
521  restore_priority_if_need();
522  return true;
523  }
524  return false;
525 #if __TBB_TASK_PRIORITY
526  }
527 #endif /* __TBB_TASK_PRIORITY */
528  }
529  // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
530  my_pool_state.compare_and_swap( SNAPSHOT_FULL, busy );
531  }
532  }
533  return false;
534  }
535  default:
536  // Another thread is taking a snapshot.
537  return false;
538  }
539  }
540 }
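// Condensed standalone sketch (not part of arena.cpp) of the snapshot protocol implemented by
// is_out_of_work() above. The pool state is a small state machine: FULL -> busy (a per-call unique
// value) grants permission to scan for work; if the scan finds nothing and the state is still busy,
// busy -> EMPTY lets this thread tell the market that the arena needs no workers; otherwise busy is
// rolled back to FULL. All names below are invented for the illustration.
#include <atomic>
#include <cstdint>

typedef std::uintptr_t pool_state;
const pool_state kEmpty = 0;
const pool_state kFull  = pool_state(-1);

template <typename ScanFn>   // ScanFn: () -> bool, returns true when no work was found
bool try_switch_to_empty(std::atomic<pool_state>& state, ScanFn scan) {
    pool_state snapshot = state.load();
    if (snapshot == kEmpty) return true;
    if (snapshot != kFull)  return false;                 // another thread is taking a snapshot
    int tag;                                              // its address gives a unique "busy" value
    const pool_state busy = reinterpret_cast<pool_state>(&tag);
    pool_state expected = kFull;
    if (!state.compare_exchange_strong(expected, busy))   // request permission to take a snapshot
        return false;
    bool work_absent = scan();                            // work may be published concurrently
    expected = busy;
    if (work_absent && state.compare_exchange_strong(expected, kEmpty))
        return true;                                      // this thread transitioned the pool to empty
    expected = busy;
    state.compare_exchange_strong(expected, kFull);       // undo FULL -> busy unless someone already did
    return false;
}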
541 
542 #if __TBB_COUNT_TASK_NODES
543 intptr_t arena::workers_task_node_count() {
544  intptr_t result = 0;
545  for( unsigned i = 1; i < my_num_slots; ++i ) {
546  generic_scheduler* s = my_slots[i].my_scheduler;
547  if( s )
548  result += s->my_task_node_count;
549  }
550  return result;
551 }
552 #endif /* __TBB_COUNT_TASK_NODES */
553 
554 void arena::enqueue_task( task& t, intptr_t prio, FastRandom &random )
555 {
556 #if __TBB_RECYCLE_TO_ENQUEUE
557  __TBB_ASSERT( t.state()==task::allocated || t.state()==task::to_enqueue, "attempt to enqueue task with inappropriate state" );
558 #else
559  __TBB_ASSERT( t.state()==task::allocated, "attempt to enqueue task that is not in 'allocated' state" );
560 #endif
561  t.prefix().state = task::ready;
562  t.prefix().extra_state |= es_task_enqueued; // enqueued task marker
563 
564 #if TBB_USE_ASSERT
565  if( task* parent = t.parent() ) {
566  internal::reference_count ref_count = parent->prefix().ref_count;
567  __TBB_ASSERT( ref_count!=0, "attempt to enqueue task whose parent has a ref_count==0 (forgot to set_ref_count?)" );
568  __TBB_ASSERT( ref_count>0, "attempt to enqueue task whose parent has a ref_count<0" );
569  parent->prefix().extra_state |= es_ref_count_active;
570  }
571  __TBB_ASSERT(t.prefix().affinity==affinity_id(0), "affinity is ignored for enqueued tasks");
572 #endif /* TBB_USE_ASSERT */
573 #if __TBB_PREVIEW_CRITICAL_TASKS
574  if( prio == internal::priority_critical || internal::is_critical( t ) ) {
575  // TODO: consider using of 'scheduler::handled_as_critical'
577 #if __TBB_TASK_ISOLATION
579  __TBB_ASSERT( s, "Scheduler must be initialized at this moment" );
580  // propagate isolation level to critical task
581  t.prefix().isolation = s->my_innermost_running_task->prefix().isolation;
582 #endif
583  ITT_NOTIFY(sync_releasing, &my_critical_task_stream);
584  if( !s || !s->my_arena_slot ) {
585  // Either scheduler is not initialized or it is not attached to the arena, use random
586  // lane for the task.
587  my_critical_task_stream.push( &t, 0, internal::random_lane_selector(random) );
588  } else {
589  unsigned& lane = s->my_arena_slot->hint_for_critical;
590  my_critical_task_stream.push( &t, 0, tbb::internal::subsequent_lane_selector(lane) );
591  }
592  advertise_new_work<work_spawned>();
593  return;
594  }
595 #endif /* __TBB_PREVIEW_CRITICAL_TASKS */
596 
597  ITT_NOTIFY(sync_releasing, &my_task_stream);
598 #if __TBB_TASK_PRIORITY
599  intptr_t p = prio ? normalize_priority(priority_t(prio)) : normalized_normal_priority;
600  assert_priority_valid(p);
601 #if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD
602  my_task_stream.push( &t, p, internal::random_lane_selector(random) );
603 #else
604  my_task_stream.push( &t, p, random );
605 #endif
606  if ( p != my_top_priority )
607  my_market->update_arena_priority( *this, p );
608 #else /* !__TBB_TASK_PRIORITY */
609  __TBB_ASSERT_EX(prio == 0, "the library is not configured to respect the task priority");
610 #if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD
611  my_task_stream.push( &t, 0, internal::random_lane_selector(random) );
612 #else
613  my_task_stream.push( &t, 0, random );
614 #endif
615 #endif /* !__TBB_TASK_PRIORITY */
616  advertise_new_work<work_enqueued>();
617 #if __TBB_TASK_PRIORITY
618  if ( p != my_top_priority )
619  my_market->update_arena_priority( *this, p );
620 #endif /* __TBB_TASK_PRIORITY */
621 }
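// Illustrative usage sketch (not part of arena.cpp): arena::enqueue_task() is what ultimately runs
// when user code enqueues work into a tbb::task_arena. Enqueued (FIFO) tasks go into my_task_stream
// and advertise work to the market even when no thread is currently inside the arena, which is why
// advertise_new_work<work_enqueued>() is called above.
#include "tbb/task_arena.h"
#include <cstdio>

void enqueue_example() {
    tbb::task_arena arena(4);      // at most 4 threads, one slot reserved for a master by default
    arena.enqueue([] {             // fire-and-forget: some thread of the arena will run this
        std::printf("hello from an enqueued task\n");
    });
    // enqueue() returns immediately; use execute() or other synchronization to wait for the work.
}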
622 
623 class nested_arena_context : no_copy {
624 public:
625  nested_arena_context(generic_scheduler *s, arena* a, size_t slot_index, bool type, bool same)
626  : my_scheduler(*s), my_orig_ctx(NULL), same_arena(same) {
627  if (same_arena) {
628  my_orig_state.my_properties = my_scheduler.my_properties;
629  my_orig_state.my_innermost_running_task = my_scheduler.my_innermost_running_task;
630  mimic_outermost_level(a, type);
631  } else {
632  my_orig_state = *s;
633  mimic_outermost_level(a, type);
634  s->nested_arena_entry(a, slot_index);
635  }
636  }
637  ~nested_arena_context () {
638 #if __TBB_TASK_GROUP_CONTEXT
639  my_scheduler.my_dummy_task->prefix().context = my_orig_ctx; // restore context of dummy task
640 #endif
641  if (same_arena) {
642  my_scheduler.my_properties = my_orig_state.my_properties;
643  my_scheduler.my_innermost_running_task = my_orig_state.my_innermost_running_task;
644  } else {
645  my_scheduler.nested_arena_exit();
646  static_cast<scheduler_state&>(my_scheduler) = my_orig_state; // restore arena settings
647 #if __TBB_TASK_PRIORITY
648  my_scheduler.my_local_reload_epoch = *my_orig_state.my_ref_reload_epoch;
649 #endif
651  }
652  }
653 
654 private:
655  generic_scheduler &my_scheduler;
656  scheduler_state my_orig_state;
657  task_group_context *my_orig_ctx;
658  const bool same_arena;
659 
660  void mimic_outermost_level(arena* a, bool type) {
661  my_scheduler.my_properties.outermost = true;
662  my_scheduler.my_properties.type = type;
663  my_scheduler.my_innermost_running_task = my_scheduler.my_dummy_task;
664 #if __TBB_PREVIEW_CRITICAL_TASKS
665  my_scheduler.my_properties.has_taken_critical_task = false;
666 #endif
667 #if __TBB_TASK_GROUP_CONTEXT
668  // Save dummy's context and replace it by arena's context
669  my_orig_ctx = my_scheduler.my_dummy_task->prefix().context;
670  my_scheduler.my_dummy_task->prefix().context = a->my_default_ctx;
671 #endif
672  }
673 };
674 
675 void generic_scheduler::nested_arena_entry(arena* a, size_t slot_index) {
676  __TBB_ASSERT( is_alive(a->my_guard), NULL );
677  __TBB_ASSERT( a!=my_arena, NULL);
678 
679  // overwrite arena settings
680 #if __TBB_TASK_PRIORITY
681  if ( my_offloaded_tasks )
682  my_arena->orphan_offloaded_tasks( *this );
683  my_offloaded_tasks = NULL;
684 #endif /* __TBB_TASK_PRIORITY */
685  attach_arena( a, slot_index, /*is_master*/true );
686  __TBB_ASSERT( my_arena == a, NULL );
688  // TODO? ITT_NOTIFY(sync_acquired, a->my_slots + index);
689  // TODO: it requires market to have P workers (not P-1)
690  // TODO: a preempted worker should be excluded from assignment to other arenas e.g. my_slack--
691  if( !is_worker() && slot_index >= my_arena->my_num_reserved_slots )
692  my_arena->my_market->adjust_demand(*my_arena, -1);
693 #if __TBB_ARENA_OBSERVER
694  my_last_local_observer = 0; // TODO: try optimize number of calls
695  my_arena->my_observers.notify_entry_observers( my_last_local_observer, /*worker=*/false );
696 #endif
697 }
698 
699 void generic_scheduler::nested_arena_exit() {
700 #if __TBB_ARENA_OBSERVER
701  my_arena->my_observers.notify_exit_observers( my_last_local_observer, /*worker=*/false );
702 #endif /* __TBB_ARENA_OBSERVER */
703 #if __TBB_TASK_PRIORITY
704  if ( my_offloaded_tasks )
705  my_arena->orphan_offloaded_tasks( *this );
706 #endif
707  if( !is_worker() && my_arena_index >= my_arena->my_num_reserved_slots )
708  my_arena->my_market->adjust_demand(*my_arena, 1);
709  // Free the master slot.
710  __TBB_ASSERT(my_arena->my_slots[my_arena_index].my_scheduler, "A slot is already empty");
711  __TBB_store_with_release(my_arena->my_slots[my_arena_index].my_scheduler, (generic_scheduler*)NULL);
712  my_arena->my_exit_monitors.notify_one(); // do not relax!
713 }
714 
715 void generic_scheduler::wait_until_empty() {
716  my_dummy_task->prefix().ref_count++; // prevents exit from local_wait_for_all when local work is done, enforcing stealing
717  while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY )
718  local_wait_for_all(*my_dummy_task, NULL);
719  my_dummy_task->prefix().ref_count--;
720 }
721 
722 } // namespace internal
723 } // namespace tbb
724 
725 #include "scheduler_utility.h"
726 #include "tbb/task_arena.h" // task_arena_base
727 
728 namespace tbb {
729 namespace interface7 {
730 namespace internal {
731 
732 void task_arena_base::internal_initialize( ) {
733  governor::one_time_init();
734  if( my_max_concurrency < 1 )
735  my_max_concurrency = (int)governor::default_num_threads();
736  __TBB_ASSERT( my_master_slots <= (unsigned)my_max_concurrency, "Number of slots reserved for master should not exceed arena concurrency");
737  arena* new_arena = market::create_arena( my_max_concurrency, my_master_slots, 0 );
738  // add an internal market reference; a public reference was added in create_arena
739  market &m = market::global_market( /*is_public=*/false );
740  // allocate default context for task_arena
741 #if __TBB_TASK_GROUP_CONTEXT
742  new_arena->my_default_ctx = new ( NFS_Allocate(1, sizeof(task_group_context), NULL) )
744 #if __TBB_FP_CONTEXT
745  new_arena->my_default_ctx->capture_fp_settings();
746 #endif
747 #endif /* __TBB_TASK_GROUP_CONTEXT */
748  // threads might race to initialize the arena
749  if(as_atomic(my_arena).compare_and_swap(new_arena, NULL) != NULL) {
750  __TBB_ASSERT(my_arena, NULL); // another thread won the race
751  // release public market reference
752  m.release( /*is_public=*/true, /*blocking_terminate=*/false );
753  new_arena->on_thread_leaving<arena::ref_external>(); // destroy unneeded arena
754 #if __TBB_TASK_GROUP_CONTEXT
755  spin_wait_while_eq(my_context, (task_group_context*)NULL);
756  } else {
757  new_arena->my_default_ctx->my_version_and_traits |= my_version_and_traits & exact_exception_flag;
758  as_atomic(my_context) = new_arena->my_default_ctx;
759 #endif
760  }
761  // TODO: should it trigger automatic initialization of this thread?
763 }
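// Illustrative usage sketch (not part of arena.cpp): internal_initialize() runs when a
// tbb::task_arena is initialized, either explicitly via initialize() or lazily on first use. The
// compare_and_swap on my_arena above is what resolves the race when several threads initialize the
// same task_arena object concurrently.
#include "tbb/task_arena.h"

void initialize_example() {
    tbb::task_arena arena(2, 1);   // concurrency limit 2, one slot reserved for the calling master
    arena.initialize();            // forces arena creation now instead of on first enqueue/execute
}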
764 
765 void task_arena_base::internal_terminate( ) {
766  if( my_arena ) {// task_arena was initialized
767  my_arena->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
768  my_arena->on_thread_leaving<arena::ref_external>();
769  my_arena = 0;
770 #if __TBB_TASK_GROUP_CONTEXT
771  my_context = 0;
772 #endif
773  }
774 }
775 
776 void task_arena_base::internal_attach( ) {
777  __TBB_ASSERT(!my_arena, NULL);
779  if( s && s->my_arena ) {
780  // There is an active arena to attach to.
781  // It's still used by s, so won't be destroyed right away.
782  my_arena = s->my_arena;
783  __TBB_ASSERT( my_arena->my_references > 0, NULL );
784  my_arena->my_references += arena::ref_external;
785 #if __TBB_TASK_GROUP_CONTEXT
786  my_context = my_arena->my_default_ctx;
787  my_version_and_traits |= my_context->my_version_and_traits & exact_exception_flag;
788 #endif
789  my_master_slots = my_arena->my_num_reserved_slots;
790  my_max_concurrency = my_master_slots + my_arena->my_max_num_workers;
792  // increases market's ref count for task_arena
793  market::global_market( /*is_public=*/true );
794  }
795 }
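// Illustrative usage sketch (not part of arena.cpp): internal_attach() backs the task_arena
// constructor that takes tbb::task_arena::attach (available in the TBB releases that provide this
// entry point). It binds the task_arena object to the arena the calling thread is already working
// in, instead of creating a new one.
#include "tbb/task_arena.h"

void attach_example() {
    // Inside another arena or a parallel algorithm this picks up the current arena;
    // on a thread with no arena the object simply stays non-active.
    tbb::task_arena current(tbb::task_arena::attach());
    if (current.is_active()) {
        // ... use 'current' like any other task_arena ...
    }
}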
796 
797 void task_arena_base::internal_enqueue( task& t, intptr_t prio ) const {
798  __TBB_ASSERT(my_arena, NULL);
800  __TBB_ASSERT(s, "Scheduler is not initialized"); // we allocated a task so can expect the scheduler
801 #if __TBB_TASK_GROUP_CONTEXT
802  // Is there a better place for checking the state of my_default_ctx?
803  __TBB_ASSERT(!(my_arena->my_default_ctx == t.prefix().context && my_arena->my_default_ctx->is_group_execution_cancelled()),
804  "The task will not be executed because default task_group_context of task_arena is cancelled. Has previously enqueued task thrown an exception?");
805 #endif
806  my_arena->enqueue_task( t, prio, s->my_random );
807 }
808 
809 class delegated_task : public task {
810  internal::delegate_base & my_delegate;
811  concurrent_monitor & my_monitor;
812  task * my_root;
813  task* execute() __TBB_override {
815  __TBB_ASSERT(s.outermost_level(), "expected to be enqueued and received on the outermost level");
816  struct outermost_context : internal::no_copy {
817  delegated_task * t;
818  generic_scheduler & s;
819  task * orig_dummy;
820  task_group_context * orig_ctx;
821  scheduler_properties orig_props;
822  outermost_context(delegated_task *_t, generic_scheduler &_s)
823  : t(_t), s(_s), orig_dummy(s.my_dummy_task), orig_props(s.my_properties) {
824  __TBB_ASSERT(s.my_innermost_running_task == t, NULL);
825 #if __TBB_TASK_GROUP_CONTEXT
826  orig_ctx = t->prefix().context;
827  t->prefix().context = s.my_arena->my_default_ctx;
828 #endif
829  // Mimics outermost master
830  s.my_dummy_task = t;
831  s.my_properties.type = scheduler_properties::master;
832  }
833  ~outermost_context() {
834 #if __TBB_TASK_GROUP_CONTEXT
835  // Restore context for sake of registering potential exception
836  t->prefix().context = orig_ctx;
837 #endif
838  s.my_properties = orig_props;
839  s.my_dummy_task = orig_dummy;
840  }
841  } scope(this, s);
842  my_delegate();
843  return NULL;
844  }
845  ~delegated_task() {
846  // potential exception was already registered. It must happen before the notification
847  __TBB_ASSERT(my_root->ref_count()==2, NULL);
848  __TBB_store_with_release(my_root->prefix().ref_count, 1); // must precede the wakeup
849  my_monitor.notify(*this); // do not relax, it needs a fence!
850  }
851 public:
852  delegated_task( internal::delegate_base & d, concurrent_monitor & s, task * t )
853  : my_delegate(d), my_monitor(s), my_root(t) {}
854  // predicate for concurrent_monitor notification
855  bool operator()(uintptr_t ctx) const { return (void*)ctx == (void*)&my_delegate; }
856 };
857 
858 void task_arena_base::internal_execute(internal::delegate_base& d) const {
859  __TBB_ASSERT(my_arena, NULL);
861  __TBB_ASSERT(s, "Scheduler is not initialized");
862 
863  bool same_arena = s->my_arena == my_arena;
864  size_t index1 = s->my_arena_index;
865  if (!same_arena) {
866  index1 = my_arena->occupy_free_slot</* as_worker*/false>(*s);
867  if (index1 == arena::out_of_arena) {
868 
869 #if __TBB_USE_OPTIONAL_RTTI
870  // Workaround for a bug inside the flow graph. If the thread cannot occupy an arena slot during task_arena::execute()
871  // and all aggregator operations depend on completion of this task (all other threads are already inside the arena),
872  // a deadlock appears because the enqueued task will never enter the arena.
873  // Workaround: check if the task came from graph via RTTI (casting to graph::spawn_functor)
874  // and enqueue this task with non-blocking internal_enqueue method.
875  // TODO: have to change behaviour later in next GOLD release (maybe to add new library entry point - try_execute)
877  internal::delegated_function< graph_funct, void >* deleg_funct =
878  dynamic_cast< internal::delegated_function< graph_funct, void>* >(&d);
879 
880  if (deleg_funct) {
882  internal::function_task< internal::strip< graph_funct >::type >
883  (internal::forward< graph_funct >(deleg_funct->my_func)), 0);
884  return;
885  } else {
886 #endif /* __TBB_USE_OPTIONAL_RTTI */
887  concurrent_monitor::thread_context waiter;
888 #if __TBB_TASK_GROUP_CONTEXT
889  task_group_context exec_context(task_group_context::isolated, my_version_and_traits & exact_exception_flag);
890 #if __TBB_FP_CONTEXT
891  exec_context.copy_fp_settings(*my_context);
892 #endif
893 #endif
894  auto_empty_task root(__TBB_CONTEXT_ARG(s, &exec_context));
895  root.prefix().ref_count = 2;
896  my_arena->enqueue_task(*new(task::allocate_root(__TBB_CONTEXT_ARG1(exec_context)))
897  delegated_task(d, my_arena->my_exit_monitors, &root),
898  0, s->my_random); // TODO: priority?
899  size_t index2 = arena::out_of_arena;
900  do {
901  my_arena->my_exit_monitors.prepare_wait(waiter, (uintptr_t)&d);
902  if (__TBB_load_with_acquire(root.prefix().ref_count) < 2) {
903  my_arena->my_exit_monitors.cancel_wait(waiter);
904  break;
905  }
906  index2 = my_arena->occupy_free_slot</*as_worker*/false>(*s);
907  if (index2 != arena::out_of_arena) {
908  my_arena->my_exit_monitors.cancel_wait(waiter);
910  s->local_wait_for_all(root, NULL);
911 #if TBB_USE_EXCEPTIONS
912  __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
913 #endif
914  __TBB_ASSERT(root.prefix().ref_count == 0, NULL);
915  break;
916  }
917  my_arena->my_exit_monitors.commit_wait(waiter);
918  } while (__TBB_load_with_acquire(root.prefix().ref_count) == 2);
919  if (index2 == arena::out_of_arena) {
920  // notify a waiting thread even if this thread did not enter arena,
921  // in case it was woken by a leaving thread but did not need to enter
922  my_arena->my_exit_monitors.notify_one(); // do not relax!
923  }
924 #if TBB_USE_EXCEPTIONS
925  // process possible exception
926  if (task_group_context::exception_container_type *pe = exec_context.my_exception)
927  TbbRethrowException(pe);
928 #endif
929  return;
930 #if __TBB_USE_OPTIONAL_RTTI
931  } // if task came from graph
932 #endif
933  } // if (index1 == arena::out_of_arena)
934  } // if (!same_arena)
935 
936  context_guard_helper</*report_tasks=*/false> context_guard;
937  context_guard.set_ctx(__TBB_CONTEXT_ARG1(my_context));
938 #if TBB_USE_EXCEPTIONS
939  try {
940 #endif
941  //TODO: replace dummy tasks for workers as well to avoid using of the_dummy_context
943  d();
944 #if TBB_USE_EXCEPTIONS
945  }
946  catch (...) {
947  context_guard.restore_default(); // TODO: is it needed on Windows?
948  if (my_version_and_traits & exact_exception_flag) throw;
949  else {
950  task_group_context exception_container(task_group_context::isolated,
951  task_group_context::default_traits & ~task_group_context::exact_exception);
952  exception_container.register_pending_exception();
953  __TBB_ASSERT(exception_container.my_exception, NULL);
954  TbbRethrowException(exception_container.my_exception);
955  }
956  }
957 #endif
958 }
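// Illustrative usage sketch (not part of arena.cpp): internal_execute() is the engine behind
// tbb::task_arena::execute(). The calling thread either occupies a master slot and runs the
// functor inside the arena, or, when no slot is free, enqueues a delegated_task and waits on
// my_exit_monitors as implemented above.
#include "tbb/task_arena.h"
#include "tbb/parallel_for.h"

void execute_example() {
    tbb::task_arena arena(4);
    arena.execute([] {
        // Work spawned here stays confined to 'arena' and its threads.
        tbb::parallel_for(0, 1000, [](int) { /* ... */ });
    });
}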
959 
960 // this wait task is a temporary approach to wait for arena emptiness for masters without slots
961 // TODO: it will likely be reworked to use a single source of notification from is_out_of_work
962 class wait_task : public task {
963  binary_semaphore & my_signal;
964  task* execute() __TBB_override {
965  generic_scheduler* s = governor::local_scheduler_if_initialized();
966  __TBB_ASSERT( s, NULL );
967  __TBB_ASSERT( s->outermost_level(), "The enqueued task can be processed only on outermost level" );
968  if ( s->is_worker() ) {
969  __TBB_ASSERT( s->my_innermost_running_task == this, NULL );
970  // Mimic worker on outermost level to run remaining tasks
971  s->my_innermost_running_task = s->my_dummy_task;
972  s->local_wait_for_all( *s->my_dummy_task, NULL );
973  s->my_innermost_running_task = this;
974  } else s->my_arena->is_out_of_work(); // avoids starvation of internal_wait: issuing this task makes arena full
975  my_signal.V();
976  return NULL;
977  }
978 public:
979  wait_task ( binary_semaphore & sema ) : my_signal(sema) {}
980 };
981 
982 void task_arena_base::internal_wait() const {
983  __TBB_ASSERT(my_arena, NULL);
985  __TBB_ASSERT(s, "Scheduler is not initialized");
986  __TBB_ASSERT(s->my_arena != my_arena || s->my_arena_index == 0, "task_arena::wait_until_empty() is not supported within a worker context" );
987  if( s->my_arena == my_arena ) {
988  // unsupported, but try to do something for the outermost master
989  __TBB_ASSERT(s->master_outermost_level(), "unsupported");
990  if( !s->my_arena_index )
991  while( my_arena->num_workers_active() )
992  s->wait_until_empty();
993  } else for(;;) {
994  while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY ) {
995  if( !__TBB_load_with_acquire(my_arena->my_slots[0].my_scheduler) // TODO TEMP: one master, make more masters
996  && as_atomic(my_arena->my_slots[0].my_scheduler).compare_and_swap(s, NULL) == NULL ) {
998  s->wait_until_empty();
999  } else {
1000  binary_semaphore waiter; // TODO: replace by a single event notification from is_out_of_work
1001  internal_enqueue( *new( task::allocate_root(__TBB_CONTEXT_ARG1(*my_context)) ) wait_task(waiter), 0 ); // TODO: priority?
1002  waiter.P(); // TODO: concurrent_monitor
1003  }
1004  }
1005  if( !my_arena->num_workers_active() && !my_arena->my_slots[0].my_scheduler) // no activity
1006  break; // spin until workers active but avoid spinning in a worker
1007  __TBB_Yield(); // wait until workers and master leave
1008  }
1009 }
1010 
1011 int task_arena_base::internal_current_slot() {
1012  generic_scheduler* s = governor::local_scheduler_if_initialized();
1013  return s? int(s->my_arena_index) : -1;
1014 }
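// Illustrative usage sketch (not part of arena.cpp): internal_current_slot() is what
// tbb::this_task_arena::current_thread_index() reports, i.e. the arena slot index of the calling
// thread, or a negative value when the thread is not in an arena yet.
#include "tbb/task_arena.h"
#include <cstdio>

void current_slot_example() {
    tbb::task_arena arena(2);
    arena.execute([] {
        std::printf("running in slot %d\n", tbb::this_task_arena::current_thread_index());
    });
}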
1015 
1016 #if __TBB_TASK_ISOLATION
1017 class isolation_guard : tbb::internal::no_copy {
1018  isolation_tag &guarded;
1019  isolation_tag previous_value;
1020 public:
1021  isolation_guard( isolation_tag &isolation ) : guarded( isolation ), previous_value( isolation ) {}
1022  ~isolation_guard() {
1023  guarded = previous_value;
1024  }
1025 };
1026 
1027 void isolate_within_arena( delegate_base& d, intptr_t reserved ) {
1028  __TBB_ASSERT_EX( reserved == 0, NULL );
1029  // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
1030  generic_scheduler* s = governor::local_scheduler_weak();
1031  __TBB_ASSERT( s, "this_task_arena::isolate() needs an initialized scheduler" );
1032  // Theoretically, we can keep the current isolation in the scheduler; however, it makes sense to store it in innermost
1033  // running task because it can in principle be queried via task::self().
1034  isolation_tag& current_isolation = s->my_innermost_running_task->prefix().isolation;
1035  // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
1036  isolation_guard guard( current_isolation );
1037  current_isolation = reinterpret_cast<isolation_tag>(&d);
1038  d();
1039 }
1040 #endif /* __TBB_TASK_ISOLATION */
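// Illustrative usage sketch (not part of arena.cpp): isolate_within_arena() is the implementation
// behind tbb::this_task_arena::isolate(). While the functor runs, the calling thread only takes
// tasks tagged with the same isolation value (the address of the delegate, as set above), so it
// cannot pick up unrelated outer-level work on this stack.
#include "tbb/task_arena.h"
#include "tbb/parallel_for.h"

void isolate_example() {
    tbb::parallel_for(0, 16, [](int) {
        tbb::this_task_arena::isolate([] {
            // While waiting for this nested loop, the thread will not steal
            // iterations of the outer loop.
            tbb::parallel_for(0, 16, [](int) { /* ... */ });
        });
    });
}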
1041 
1042 int task_arena_base::internal_max_concurrency(const task_arena *ta) {
1043  arena* a = NULL;
1044  if( ta ) // for special cases of ta->max_concurrency()
1045  a = ta->my_arena;
1046  else if( generic_scheduler* s = governor::local_scheduler_if_initialized() )
1047  a = s->my_arena; // the current arena if any
1048 
1049  if( a ) { // Get parameters from the arena
1050  __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
1051  return a->my_num_reserved_slots + a->my_max_num_workers;
1052  } else {
1053  __TBB_ASSERT( !ta || ta->my_max_concurrency==automatic, NULL );
1054  return int(governor::default_num_threads());
1055  }
1056 }
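// Illustrative usage sketch (not part of arena.cpp): internal_max_concurrency() serves
// task_arena::max_concurrency() and tbb::this_task_arena::max_concurrency(). For an existing arena
// it reports reserved master slots plus the maximum number of workers; otherwise it falls back to
// the default number of threads.
#include "tbb/task_arena.h"
#include <cstdio>

void max_concurrency_example() {
    tbb::task_arena arena(3, 1);
    std::printf("arena limit: %d\n", arena.max_concurrency());                    // 3
    std::printf("current limit: %d\n", tbb::this_task_arena::max_concurrency());  // platform default here
}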
1057 } // tbb::interfaceX::internal
1058 } // tbb::interfaceX
1059 } // tbb
void mimic_outermost_level(arena *a, bool type)
Definition: arena.cpp:660
#define __TBB_ISOLATION_ARG(arg1, isolation)
static void assume_scheduler(generic_scheduler *s)
Temporarily set TLS slot to the given scheduler.
Definition: governor.cpp:116
task_group_context * my_orig_ctx
Definition: arena.cpp:657
void free_task_pool()
Deallocate task pool that was allocated by means of allocate_task_pool.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id tail
intptr_t my_version_and_traits
Special settings.
Definition: task_arena.h:116
bool has_enqueued_tasks()
Check for the presence of enqueued tasks at all priority levels.
Definition: arena.cpp:375
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p sync_releasing
market * my_market
The market I am in.
Definition: scheduler.h:155
int my_max_concurrency
Concurrency level for deferred initialization.
Definition: task_arena.h:110
bool outermost
Indicates that a scheduler is on outermost level.
Definition: scheduler.h:53
static const intptr_t num_priority_levels
mail_outbox & mailbox(affinity_id id)
Get reference to mailbox corresponding to given affinity_id.
Definition: arena.h:204
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:167
void __TBB_EXPORTED_METHOD internal_execute(delegate_base &) const
Definition: arena.cpp:858
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void const char const char int ITT_FORMAT __itt_group_sync s
#define __TBB_CONTEXT_ARG1(context)
bool is_critical(task &t)
Definition: task.h:958
void __TBB_EXPORTED_METHOD internal_attach()
Definition: arena.cpp:776
concurrent_monitor my_exit_monitors
Waiting object for master threads that cannot join the arena.
Definition: arena.h:167
scheduler_properties my_properties
Definition: scheduler.h:91
static arena * create_arena(int num_slots, int num_reserved_slots, size_t stack_size)
Creates an arena object.
Definition: market.cpp:296
static void one_time_init()
Definition: governor.cpp:156
binary_semaphore & my_signal
Definition: arena.cpp:963
Used to form groups of tasks.
Definition: task.h:332
A fast random number generator.
Definition: tbb_misc.h:128
Bit-field representing properties of a sheduler.
Definition: scheduler.h:46
static arena & allocate_arena(market &, unsigned num_slots, unsigned num_reserved_slots)
Allocate an instance of arena.
Definition: arena.cpp:238
unsigned my_master_slots
Reserved master slots.
Definition: task_arena.h:113
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
void __TBB_EXPORTED_METHOD register_pending_exception()
Records the pending exception, and cancels the task group.
static bool occupy_slot(generic_scheduler *&slot, generic_scheduler &s)
Definition: arena.cpp:67
static generic_scheduler * local_scheduler_weak()
Definition: governor.h:127
T1 atomic_update(tbb::atomic< T1 > &dst, T2 newValue, Pred compare)
Atomically replaces value of dst with newValue if they satisfy condition of compare predicate.
Definition: tbb_misc.h:179
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:331
task * parent() const
task on whose behalf this task is working, or NULL if this is a root.
Definition: task.h:835
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
void __TBB_EXPORTED_METHOD internal_terminate()
Definition: arena.cpp:765
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
Definition: tbb_stddef.h:216
__TBB_atomic reference_count ref_count
Reference count used for synchronization.
Definition: task.h:248
#define __TBB_override
Definition: tbb_stddef.h:240
generic_scheduler & my_scheduler
Definition: arena.cpp:655
wait_task(binary_semaphore &sema)
Definition: arena.cpp:979
void __TBB_EXPORTED_METHOD internal_initialize()
Definition: arena.cpp:732
unsigned my_num_reserved_slots
The number of reserved slots (can be occupied only by masters).
Definition: arena.h:152
task is in ready pool, or is going to be put there, or was just taken off.
Definition: task.h:615
unsigned my_max_num_workers
The number of workers requested by the master thread owning the arena.
Definition: arena.h:89
Base class for user-defined tasks.
Definition: task.h:589
Work stealing task scheduler.
Definition: scheduler.h:120
intptr_t reference_count
A reference count.
Definition: task.h:117
void __TBB_EXPORTED_FUNC NFS_Free(void *)
Free memory allocated by NFS_Allocate.
The graph class.
void construct()
Construct *this as a mailbox from zeroed memory.
Definition: mailbox.h:158
intptr_t isolation_tag
A tag for task isolation.
Definition: task.h:124
unsigned my_num_slots
The number of slots in the arena.
Definition: arena.h:149
uintptr_t my_arenas_aba_epoch
ABA prevention marker to assign to newly created arenas.
Definition: market.h:143
cpu_ctl_env my_cpu_ctl_env
FPU control settings of arena's master thread captured at the moment of arena instantiation.
Definition: arena.h:138
uintptr_t pool_state_t
Definition: arena.h:214
void notify(const P &predicate)
Notify waiting threads of the event that satisfies the given predicate.
Set if ref_count might be changed by another thread. Used for debugging.
bool is_idle_state(bool value) const
Indicate whether thread that reads this mailbox is idle.
Definition: mailbox.h:218
Class representing where mail is put.
Definition: mailbox.h:96
static int unsigned num_arena_slots(unsigned num_slots)
Definition: arena.h:195
tbb::atomic< uintptr_t > my_pool_state
Current task pool state and estimate of available tasks amount.
Definition: arena.h:99
task_group_context * my_context
default context of the arena
Definition: task_arena.h:106
virtual void local_wait_for_all(task &parent, task *child)=0
#define ITT_NOTIFY(name, obj)
Definition: itt_notify.h:116
binary_semaphore for concurrent monitor
Definition: semaphore.h:222
void notify_one()
Notify one thread about the event.
static const pool_state_t SNAPSHOT_FULL
At least one task has been offered for stealing since the last snapshot started.
Definition: arena.h:220
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark d int
void on_thread_leaving()
Notification that worker or master leaves its arena.
Definition: arena.h:300
int my_num_workers_requested
The number of workers that are currently requested from the resource manager.
Definition: arena.h:92
unsigned num_workers_active()
The number of workers active in the arena.
Definition: arena.h:233
Smart holder for the empty task class with automatic destruction.
void push(task *source, int level, FastRandom &random)
Push a task into a lane.
Definition: task_stream.h:101
void set_is_idle(bool value)
Indicate whether thread that reads this mailbox is idle.
Definition: mailbox.h:211
#define GATHER_STATISTIC(x)
static int __TBB_EXPORTED_FUNC internal_max_concurrency(const task_arena *)
Definition: arena.cpp:1042
const isolation_tag no_isolation
Definition: task.h:125
void restore_priority_if_need()
If enqueued tasks found, restore arena priority and task presence status.
Definition: arena.cpp:383
internal::tbb_exception_ptr exception_container_type
Definition: task.h:341
static bool is_set(generic_scheduler *s)
Used to check validity of the local scheduler TLS contents.
Definition: governor.cpp:120
atomic< unsigned > my_references
Reference counter for the arena.
Definition: arena.h:57
void *__TBB_EXPORTED_FUNC NFS_Allocate(size_t n_element, size_t element_size, void *hint)
Allocate memory on cache/sector line boundary.
#define ITT_SYNC_CREATE(obj, type, name)
Definition: itt_notify.h:119
void nested_arena_entry(arena *, size_t)
Definition: arena.cpp:675
static const int priority_critical
Definition: task.h:287
arena * my_arena
The arena that I own (if master) or am servicing at the moment (if worker)
Definition: scheduler.h:74
exception_container_type * my_exception
Pointer to the container storing exception being propagated across this task group.
Definition: task.h:423
task * execute() __TBB_override
Should be overridden by derived classes.
Definition: arena.cpp:813
uintptr_t my_aba_epoch
ABA prevention marker.
Definition: arena.h:134
void copy_fp_settings(const task_group_context &src)
Copies FPU control setting from another context.
static unsigned default_num_threads()
Definition: governor.h:81
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id head
void __TBB_EXPORTED_FUNC isolate_within_arena(delegate_base &d, intptr_t reserved=0)
void process(generic_scheduler &)
Registers the worker with the arena and enters TBB scheduler dispatch loop.
Definition: arena.cpp:102
bool operator()(uintptr_t ctx) const
Definition: arena.cpp:855
generic_scheduler * my_scheduler
Scheduler of the thread attached to the slot.
bool is_worker() const
True if running on a worker thread, false otherwise.
Definition: scheduler.h:591
static market & global_market(bool is_public, unsigned max_num_workers=0, size_t stack_size=0)
Factory method creating new market object.
Definition: market.cpp:96
intptr_t drain()
Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
Definition: task_stream.h:145
void make_critical(task &t)
Definition: task.h:957
delegated_task(internal::delegate_base &d, concurrent_monitor &s, task *t)
Definition: arena.cpp:852
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
Definition: tbb_machine.h:391
void adjust_demand(arena &, int delta)
Requests that the arena's demand for workers be adjusted.
Definition: market.cpp:586
unsigned short affinity_id
An id used for specifying affinity.
Definition: task.h:120
T __TBB_load_relaxed(const volatile T &location)
Definition: tbb_machine.h:735
arena_slot * my_arena_slot
Pointer to the slot in the arena we own at the moment.
Definition: scheduler.h:71
void initialize(unsigned n_lanes)
Definition: task_stream.h:83
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
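NFS_GetLineSize() reports the line size used for false-sharing avoidance. A standalone sketch of the padding idea it serves; the 128-byte figure is an assumption (it matches TBB's NFS_MaxLineSize upper bound), not a value queried at run time:

#include <cstddef>

const std::size_t assumed_line_size = 128;       // assumption, not queried

struct padded_counters {
    long a;
    char pad[assumed_line_size - sizeof(long)];  // push 'b' onto the next line
    long b;                                      // no false sharing with 'a'
};

int main() {
    padded_counters c = padded_counters();
    c.a = 1;
    c.b = 2;
    return static_cast<int>(c.a + c.b) - 3;      // 0
}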
static const size_t out_of_arena
Definition: arena.h:288
size_t occupy_free_slot(generic_scheduler &s)
Tries to occupy a slot in the arena. On success, returns the slot index; if no slot is available, returns out_of_arena.
Definition: arena.cpp:86
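A standalone sketch of the slot-claiming pattern behind occupy_free_slot()/occupy_slot(): a slot is free while its scheduler pointer is null and is claimed with one compare-and-swap (the internal code does this via as_atomic(...).compare_and_swap()). All names below are illustrative stand-ins, not TBB internals:

#include <atomic>
#include <cstddef>

struct scheduler {};                                 // stand-in for generic_scheduler

const std::size_t num_slots    = 4;
const std::size_t out_of_slots = std::size_t(-1);    // analogous to out_of_arena
std::atomic<scheduler*> slots[num_slots];

std::size_t claim_free_slot(scheduler& s, std::size_t preferred) {
    for (std::size_t i = 0; i < num_slots; ++i) {
        std::size_t idx = (preferred + i) % num_slots;   // start at the hint
        scheduler* expected = nullptr;                   // free-slot marker
        if (slots[idx].compare_exchange_strong(expected, &s))
            return idx;                                  // claimed atomically
    }
    return out_of_slots;
}

int main() {
    for (std::size_t i = 0; i < num_slots; ++i)
        slots[i].store(nullptr, std::memory_order_relaxed);  // all slots free
    scheduler s;
    return claim_free_slot(s, 2) == out_of_slots ? 1 : 0;
}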
#define __TBB_CONTEXT_ARG(arg1, context)
static int allocation_size(unsigned num_slots)
Definition: arena.h:199
arena(market &, unsigned max_num_workers, unsigned num_reserved_slots)
Constructor.
Definition: arena.cpp:186
atomic< T > & as_atomic(T &t)
Definition: atomic.h:543
int ref_count() const
The internal reference count.
Definition: task.h:867
#define __TBB_Yield()
Definition: ibm_aix51.h:44
bool type
Indicates that a scheduler acts as a master or a worker.
Definition: scheduler.h:50
internal::delegate_base & my_delegate
Definition: arena.cpp:810
task * execute() __TBB_override
Should be overridden by derived classes.
Definition: arena.cpp:964
static const unsigned ref_external
Reference increment values for externals and workers.
Definition: arena.h:226
static const pool_state_t SNAPSHOT_EMPTY
No tasks to steal since last snapshot was taken.
Definition: arena.h:217
nested_arena_context(generic_scheduler *s, arena *a, size_t slot_index, bool type, bool same)
Definition: arena.cpp:625
internal::arena * my_arena
NULL if not currently initialized.
Definition: task_arena.h:102
unsigned hint_for_pop
Hint provided for operations with the container of starvation-resistant tasks.
task * my_dummy_task
Fake root task created by slave threads.
Definition: scheduler.h:169
uintptr_t my_version_and_traits
Version for run-time checks and behavioral traits of the context.
Definition: task.h:420
unsigned my_num_workers_allotted
The number of workers that have been marked out by the resource manager to service the arena.
Definition: arena.h:51
void free_arena()
Completes arena shutdown, destructs and deallocates it.
Definition: arena.cpp:249
void enqueue_task(task &, intptr_t, FastRandom &)
Enqueues a task into the starvation-resistant queue.
Definition: arena.cpp:554
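enqueue_task() is where FIFO work lands in my_task_stream. A hedged sketch of reaching it from user code, assuming the public tbb::task_arena::enqueue() overload taking a functor is available; completion of fire-and-forget work is tracked manually here:

#include "tbb/task_arena.h"
#include <atomic>

int main() {
    std::atomic<bool> done(false);
    tbb::task_arena arena;                  // lazily initialized on first use
    // The functor is wrapped into a task and placed into the arena's FIFO
    // task stream; a worker thread eventually runs it.
    arena.enqueue([&done] { done.store(true, std::memory_order_release); });
    // Fire-and-forget: the caller tracks completion itself.
    while (!done.load(std::memory_order_acquire)) { /* spin or yield */ }
    return 0;
}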
void __TBB_EXPORTED_METHOD internal_wait() const
Definition: arena.cpp:982
void attach_mailbox(affinity_id id)
Definition: scheduler.h:585
size_t occupy_free_slot_in_range(generic_scheduler &s, size_t lower, size_t upper)
Tries to occupy a slot in the specified range.
Definition: arena.cpp:71
priority_t
Definition: task.h:291
void set_ctx(__TBB_CONTEXT_ARG1(task_group_context *))
Definition: scheduler.h:807
void __TBB_EXPORTED_METHOD internal_enqueue(task &, intptr_t) const
Definition: arena.cpp:797
#define __TBB_ENQUEUE_ENFORCED_CONCURRENCY
Definition: tbb_config.h:577
bool empty(int level)
Checks existence of a task.
Definition: task_stream.h:138
internal::task_prefix & prefix(internal::version_tag *=NULL) const
Get reference to corresponding task_prefix.
Definition: task.h:946
arena_slot my_slots[1]
Definition: arena.h:296
static const int automatic
Special value requesting that the number of threads be chosen automatically.
Definition: task_arena.h:146
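A short usage sketch, assuming the public tbb::task_arena and tbb::this_task_arena::current_thread_index() APIs of this TBB generation; the printed index is assumed to correspond to the slot reported by internal_current_slot():

#include "tbb/task_arena.h"
#include <cstdio>

int main() {
    // 'automatic' asks the library to pick the concurrency level itself.
    tbb::task_arena arena(tbb::task_arena::automatic);
    arena.execute([] {
        std::printf("slot index: %d\n",
                    tbb::this_task_arena::current_thread_index());
    });
    return 0;
}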
market * my_market
The market that owns this arena.
Definition: arena.h:131
bool is_out_of_work()
Checks whether there is any job anywhere in the arena.
Definition: arena.cpp:403
task object is freshly allocated or recycled.
Definition: task.h:617
bool release(bool is_public, bool blocking_terminate)
Decrements the market's refcount and destroys the market once it is no longer referenced.
Definition: market.cpp:175
static int __TBB_EXPORTED_FUNC internal_current_slot()
Definition: arena.cpp:1011
T __TBB_load_with_acquire(const volatile T &location)
Definition: tbb_machine.h:709
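A standalone illustration of the acquire/release pairing that __TBB_load_with_acquire (together with its release-store counterpart) provides inside TBB, expressed with std::atomic:

#include <atomic>
#include <thread>

std::atomic<int>  payload(0);
std::atomic<bool> ready(false);

void producer() {
    payload.store(42, std::memory_order_relaxed);
    ready.store(true, std::memory_order_release);    // publish
}

void consumer() {
    while (!ready.load(std::memory_order_acquire))   // pairs with the release store
        std::this_thread::yield();
    int v = payload.load(std::memory_order_relaxed); // guaranteed to observe 42
    (void)v;
}

int main() {
    std::thread t1(producer), t2(consumer);
    t1.join();
    t2.join();
    return 0;
}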
task_stream< num_priority_levels > my_task_stream
Task pool for the tasks scheduled via task::enqueue() method.
Definition: arena.h:76
intptr_t drain()
Drains the mailbox.
Definition: mailbox.h:168
#define EmptyTaskPool
Definition: scheduler.h:42
void attach_arena(arena *, size_t index, bool is_master)
Definition: arena.cpp:36
value_type compare_and_swap(value_type value, value_type comparand)
Definition: atomic.h:285
atomic< unsigned > my_limit
The maximal number of currently busy slots.
Definition: arena.h:65
static generic_scheduler * local_scheduler_if_initialized()
Definition: governor.h:132
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:633
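A minimal sketch of the low-level root-task API around allocate_root(); the task type below is illustrative only, and production code would normally prefer the higher-level algorithms:

#include "tbb/task.h"
#include <cstdio>

class hello_root : public tbb::task {
public:
    tbb::task* execute() override {
        std::printf("running as a root task\n");
        return nullptr;                    // no bypass task
    }
};

int main() {
    // allocate_root() returns a proxy whose overloaded operator new places
    // the task under the scheduler's control.
    tbb::task& t = *new(tbb::task::allocate_root()) hello_root;
    tbb::task::spawn_root_and_wait(t);     // runs t and waits for completion
    return 0;
}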
state_type state() const
Current execution state.
Definition: task.h:864
size_t my_arena_index
Index of the arena slot the scheduler occupies now, or occupied last time.
Definition: scheduler.h:68
#define poison_value(g)
task * my_innermost_running_task
Innermost task whose task::execute() is running. A dummy task on the outermost level.
Definition: scheduler.h:77

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.