Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
market.cpp
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #include "tbb/tbb_stddef.h"
18 #include "tbb/global_control.h" // global_control::active_value
19 
20 #include "market.h"
21 #include "tbb_main.h"
22 #include "governor.h"
23 #include "scheduler.h"
24 #include "itt_notify.h"
25 
26 namespace tbb {
27 namespace internal {
28 
30 #if __TBB_TASK_PRIORITY
31  arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
32  arena *&next = my_priority_levels[a.my_top_priority].next_arena;
33 #else /* !__TBB_TASK_PRIORITY */
34  arena_list_type &arenas = my_arenas;
35  arena *&next = my_next_arena;
36 #endif /* !__TBB_TASK_PRIORITY */
37  arenas.push_front( a );
38  if ( arenas.size() == 1 )
39  next = &*arenas.begin();
40 }
41 
42 void market::remove_arena_from_list ( arena& a ) {
43 #if __TBB_TASK_PRIORITY
44  arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
45  arena *&next = my_priority_levels[a.my_top_priority].next_arena;
46 #else /* !__TBB_TASK_PRIORITY */
47  arena_list_type &arenas = my_arenas;
48  arena *&next = my_next_arena;
49 #endif /* !__TBB_TASK_PRIORITY */
50  arena_list_type::iterator it = next;
51  __TBB_ASSERT( it != arenas.end(), NULL );
52  if ( next == &a ) {
53  if ( ++it == arenas.end() && arenas.size() > 1 )
54  it = arenas.begin();
55  next = &*it;
56  }
57  arenas.remove( a );
58 }
59 
60 //------------------------------------------------------------------------
61 // market
62 //------------------------------------------------------------------------
63 
64 market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size )
65  : my_num_workers_hard_limit(workers_hard_limit)
66  , my_num_workers_soft_limit(workers_soft_limit)
67 #if __TBB_TASK_PRIORITY
68  , my_global_top_priority(normalized_normal_priority)
69  , my_global_bottom_priority(normalized_normal_priority)
70 #endif /* __TBB_TASK_PRIORITY */
71  , my_ref_count(1)
72  , my_stack_size(stack_size)
73  , my_workers_soft_limit_to_report(workers_soft_limit)
74 {
75 #if __TBB_TASK_PRIORITY
76  __TBB_ASSERT( my_global_reload_epoch == 0, NULL );
77  my_priority_levels[normalized_normal_priority].workers_available = my_num_workers_soft_limit;
78 #endif /* __TBB_TASK_PRIORITY */
79 
80  // Once created RML server will start initializing workers that will need
81  // global market instance to get worker stack size
82  my_server = governor::create_rml_server( *this );
83  __TBB_ASSERT( my_server, "Failed to create RML server" );
84 }
85 
86 static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
87  if( int soft_limit = market::app_parallelism_limit() )
88  workers_soft_limit = soft_limit-1;
89  else // if user set no limits (yet), use market's parameter
90  workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
91  if( workers_soft_limit >= workers_hard_limit )
92  workers_soft_limit = workers_hard_limit-1;
93  return workers_soft_limit;
94 }
95 
96 market& market::global_market ( bool is_public, unsigned workers_requested, size_t stack_size ) {
97  global_market_mutex_type::scoped_lock lock( theMarketMutex );
98  market *m = theMarket;
99  if( m ) {
100  ++m->my_ref_count;
101  const unsigned old_public_count = is_public? m->my_public_ref_count++ : /*any non-zero value*/1;
102  lock.release();
103  if( old_public_count==0 )
104  set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );
105 
106  // do not warn if default number of workers is requested
107  if( workers_requested != governor::default_num_threads()-1 ) {
108  __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
109  "skip_soft_limit_warning must be larger than any valid workers_requested" );
110  unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report;
111  if( soft_limit_to_report < workers_requested ) {
112  runtime_warning( "The number of workers is currently limited to %u. "
113  "The request for %u workers is ignored. Further requests for more workers "
114  "will be silently ignored until the limit changes.\n",
115  soft_limit_to_report, workers_requested );
116  // The race is possible when multiple threads report warnings.
117  // We are OK with that, as there are just multiple warnings.
118  as_atomic(m->my_workers_soft_limit_to_report).
119  compare_and_swap(skip_soft_limit_warning, soft_limit_to_report);
120  }
121 
122  }
123  if( m->my_stack_size < stack_size )
124  runtime_warning( "Thread stack size has been already set to %u. "
125  "The request for larger stack (%u) cannot be satisfied.\n",
126  m->my_stack_size, stack_size );
127  }
128  else {
129  // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
130  if( stack_size == 0 )
131  stack_size = global_control::active_value(global_control::thread_stack_size);
132  // Expecting that 4P is suitable for most applications.
133  // Limit to 2P for large thread number.
134  // TODO: ask RML for max concurrency and possibly correct hard_limit
135  const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
136  // The requested number of threads is intentionally not considered in
137  // computation of the hard limit, in order to separate responsibilities
138  // and avoid complicated interactions between global_control and task_scheduler_init.
139  // The market guarantees that at least 256 threads might be created.
140  const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
141  const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
142  // Create the global market instance
143  size_t size = sizeof(market);
144 #if __TBB_TASK_GROUP_CONTEXT
145  __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(generic_scheduler*) == sizeof(market),
146  "my_workers must be the last data field of the market class");
147  size += sizeof(generic_scheduler*) * (workers_hard_limit - 1);
148 #endif /* __TBB_TASK_GROUP_CONTEXT */
149  __TBB_InitOnce::add_ref();
150  void* storage = NFS_Allocate(1, size, NULL);
151  memset( storage, 0, size );
152  // Initialize and publish global market
153  m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
154  if( is_public )
155  m->my_public_ref_count = 1;
156  theMarket = m;
157  // This check relies on the fact that for shared RML default_concurrency==max_concurrency
158  if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
159  runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
160  , m->my_server->default_concurrency(), workers_soft_limit );
161  }
162  return *m;
163 }
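As a rough standalone illustration of the limit arithmetic above (not part of market.cpp; P, the requested worker count, and the absence of a global_control limit are example assumptions):

    #include <algorithm>
    #include <cstdio>

    int main() {
        unsigned P = 16;                  // stand-in for governor::default_num_threads()
        unsigned requested = 64;          // stand-in for workers_requested
        const unsigned factor = P <= 128 ? 4 : 2;
        // Hard limit: at least 256, otherwise 4P (2P on very large machines).
        unsigned hard_limit = std::max(factor * P, 256u);
        // Soft limit when no global_control limit is set: at least P-1,
        // then clamped strictly below the hard limit.
        unsigned soft_limit = std::max(P - 1, requested);
        if (soft_limit >= hard_limit)
            soft_limit = hard_limit - 1;
        std::printf("P=%u: hard limit %u, soft limit %u\n", P, hard_limit, soft_limit);
        return 0;
    }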
164 
165 void market::destroy () {
166 #if __TBB_COUNT_TASK_NODES
167  if ( my_task_node_count )
168  runtime_warning( "Leaked %ld task objects\n", (long)my_task_node_count );
169 #endif /* __TBB_COUNT_TASK_NODES */
170  this->market::~market(); // qualified to suppress warning
171  NFS_Free( this );
172  __TBB_InitOnce::remove_ref();
173 }
174 
175 bool market::release ( bool is_public, bool blocking_terminate ) {
176  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
177  bool do_release = false;
178  {
179  global_market_mutex_type::scoped_lock lock( theMarketMutex );
180  if ( blocking_terminate ) {
181  __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
182  while ( my_public_ref_count == 1 && my_ref_count > 1 ) {
183  lock.release();
184  // To guarantee that request_close_connection() is called by the last master, we need to wait till all
185  // references are released. Re-read my_public_ref_count to limit waiting if new masters are created.
186  // Theoretically, new private references to the market can be added during waiting making it potentially
187  // endless.
188  // TODO: revise why the weak scheduler needs market's pointer and try to remove this wait.
189  // Note that the market should know about its schedulers for cancelation/exception/priority propagation,
190  // see e.g. task_group_context::cancel_group_execution()
191  while ( __TBB_load_with_acquire(my_public_ref_count) == 1 && __TBB_load_with_acquire(my_ref_count) > 1 )
192  __TBB_Yield();
193  lock.acquire( theMarketMutex );
194  }
195  }
196  if ( is_public ) {
197  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
198  __TBB_ASSERT( my_public_ref_count, NULL );
199  --my_public_ref_count;
200  }
201  if ( --my_ref_count == 0 ) {
202  __TBB_ASSERT( !my_public_ref_count, NULL );
203  do_release = true;
204  theMarket = NULL;
205  }
206  }
207  if( do_release ) {
208  __TBB_ASSERT( !__TBB_load_with_acquire(my_public_ref_count), "No public references remain if we remove the market." );
209  // inform RML that blocking termination is required
210  my_join_workers = blocking_terminate;
211  my_server->request_close_connection();
212  return blocking_terminate;
213  }
214  return false;
215 }
216 
217 void market::set_active_num_workers ( unsigned soft_limit ) {
218  int old_requested=0, requested=0;
219  bool need_mandatory = false;
220  market *m;
221 
222  {
223  global_market_mutex_type::scoped_lock lock( theMarketMutex );
224  if ( !theMarket )
225  return; // actual value will be used at market creation
226  m = theMarket;
227  ++m->my_ref_count;
228  }
229  // have my_ref_count for market, use it safely
230  {
231  arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
232  __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL);
233  m->my_num_workers_soft_limit = soft_limit;
234  // report only once after new soft limit value is set
235  m->my_workers_soft_limit_to_report = soft_limit;
236 
237 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
238  // updates soft_limit to zero must be postponed
239  // while mandatory parallelism is enabled
240  if( !(m->my_mandatory_num_requested && !soft_limit) )
241 #endif
242  {
243  const int demand =
244 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
245  m->my_mandatory_num_requested? 0 :
246 #endif
247  m->my_total_demand;
248  requested = min(demand, (int)soft_limit);
249  old_requested = m->my_num_workers_requested;
250  m->my_num_workers_requested = requested;
251 #if __TBB_TASK_PRIORITY
252  m->my_priority_levels[m->my_global_top_priority].workers_available = soft_limit;
253  m->update_allotment( m->my_global_top_priority );
254 #else
255  m->update_allotment();
256 #endif
257  }
258 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
259  if( !m->my_mandatory_num_requested && !soft_limit ) {
260  // enable mandatory concurrency, if enqueued tasks are found
261  // and zero soft_limit requested
262 #if __TBB_TASK_PRIORITY
263  for( int p = m->my_global_top_priority; p >= m->my_global_bottom_priority; --p ) {
264  priority_level_info &pl = m->my_priority_levels[p];
265  arena_list_type &arenas = pl.arenas;
266 #else
267  const int p = 0;
268  arena_list_type &arenas = m->my_arenas;
269 #endif /* __TBB_TASK_PRIORITY */
270  for( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it ) {
271  if( !it->my_task_stream.empty(p) ) {
272  // switch local_mandatory to global_mandatory unconditionally
273  if( m->mandatory_concurrency_enable_impl( &*it ) )
274  need_mandatory = true;
275  }
276  }
277 #if __TBB_TASK_PRIORITY
278  }
279 #endif /* __TBB_TASK_PRIORITY */
280  }
281 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
282  }
283  // adjust_job_count_estimate must be called outside of any locks
284  int delta = requested - old_requested;
285  if( need_mandatory ) ++delta;
286  if( delta!=0 )
287  m->my_server->adjust_job_count_estimate( delta );
288  // release internal market reference to match ++m->my_ref_count above
289  m->release( /*is_public=*/false, /*blocking_terminate=*/false );
290 }
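In user code this path is normally reached through tbb::global_control rather than by calling the market directly; a minimal usage sketch (the limit of 4 is an arbitrary example):

    #include "tbb/global_control.h"
    #include "tbb/parallel_for.h"

    int main() {
        // Restrict TBB to at most 4 threads overall; while this object is alive
        // the library lowers its worker soft limit accordingly.
        tbb::global_control limit(tbb::global_control::max_allowed_parallelism, 4);
        tbb::parallel_for(0, 1000, [](int) { /* work item */ });
        return 0;
    }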
291 
292 bool governor::does_client_join_workers (const tbb::internal::rml::tbb_client &client) {
293  return ((const market&)client).must_join_workers();
294 }
295 
296 arena* market::create_arena ( int num_slots, int num_reserved_slots, size_t stack_size ) {
297  __TBB_ASSERT( num_slots > 0, NULL );
298  __TBB_ASSERT( num_reserved_slots <= num_slots, NULL );
299  // Add public market reference for master thread/task_arena (that adds an internal reference in exchange).
300  market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
301 
302  arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots );
303  // Add newly created arena into the existing market's list.
304  arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
305  m.insert_arena_into_list(a);
306  return &a;
307 }
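Arenas are typically created on behalf of a user-visible tbb::task_arena; a minimal usage sketch (slot counts chosen arbitrarily):

    #include "tbb/task_arena.h"
    #include "tbb/parallel_for.h"

    int main() {
        // 4 slots total, 1 of them reserved for the calling master thread.
        tbb::task_arena arena(4, 1);
        arena.execute([] {
            tbb::parallel_for(0, 100, [](int) { /* work item */ });
        });
        return 0;
    }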
308 
310 void market::detach_arena ( arena& a ) {
311  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
312  __TBB_ASSERT( !a.my_slots[0].my_scheduler, NULL );
313  remove_arena_from_list( a );
314  if ( a.my_aba_epoch == my_arenas_aba_epoch )
315  ++my_arenas_aba_epoch;
316 }
317 
318 void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch ) {
319  bool locked = true;
320  __TBB_ASSERT( a, NULL );
321  // we hold reference to the market, so it cannot be destroyed at any moment here
322  __TBB_ASSERT( this == theMarket, NULL );
323  __TBB_ASSERT( my_ref_count!=0, NULL );
324  my_arenas_list_mutex.lock();
325  assert_market_valid();
326 #if __TBB_TASK_PRIORITY
327  // scan all priority levels, not only in [my_global_bottom_priority;my_global_top_priority]
328  // range, because arena to be destroyed can have no outstanding request for workers
329  for ( int p = num_priority_levels-1; p >= 0; --p ) {
330  priority_level_info &pl = my_priority_levels[p];
331  arena_list_type &my_arenas = pl.arenas;
332 #endif /* __TBB_TASK_PRIORITY */
333  arena_list_type::iterator it = my_arenas.begin();
334  for ( ; it != my_arenas.end(); ++it ) {
335  if ( a == &*it ) {
336  if ( it->my_aba_epoch == aba_epoch ) {
337  // Arena is alive
338  if ( !a->my_num_workers_requested && !a->my_references ) {
339  __TBB_ASSERT( !a->my_num_workers_allotted && (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers), "Inconsistent arena state" );
340  // Arena is abandoned. Destroy it.
341  detach_arena( *a );
342  my_arenas_list_mutex.unlock();
343  locked = false;
344  a->free_arena();
345  }
346  }
347  if (locked)
348  my_arenas_list_mutex.unlock();
349  return;
350  }
351  }
352 #if __TBB_TASK_PRIORITY
353  }
354 #endif /* __TBB_TASK_PRIORITY */
355  my_arenas_list_mutex.unlock();
356 }
357 
359 arena* market::arena_in_need ( arena_list_type &arenas, arena *hint ) {
360  if ( arenas.empty() )
361  return NULL;
362  arena_list_type::iterator it = hint;
363  __TBB_ASSERT( it != arenas.end(), NULL );
364  do {
365  arena& a = *it;
366  if ( ++it == arenas.end() )
367  it = arenas.begin();
368  if ( a.num_workers_active() < a.my_num_workers_allotted
369 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
370  && !a.recall_by_mandatory_request()
371 #endif
372  ) {
373  a.my_references += arena::ref_worker;
374  return &a;
375  }
376  } while ( it != hint );
377  return NULL;
378 }
379 
380 int market::update_allotment ( arena_list_type& arenas, int workers_demand, int max_workers ) {
381  __TBB_ASSERT( workers_demand, NULL );
382  max_workers = min(workers_demand, max_workers);
383  int carry = 0;
384  int assigned = 0;
385  arena_list_type::iterator it = arenas.begin();
386  for ( ; it != arenas.end(); ++it ) {
387  arena& a = *it;
388  if ( a.my_num_workers_requested <= 0 ) {
390  continue;
391  }
392  int tmp = a.my_num_workers_requested * max_workers + carry;
393  int allotted = tmp / workers_demand;
394  carry = tmp % workers_demand;
395  // a.my_num_workers_requested may temporarily exceed a.my_max_num_workers
396  allotted = min( allotted, (int)a.my_max_num_workers );
397 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
398  if ( !allotted && a.must_have_concurrency() )
399  allotted = 1;
400 #endif
401  a.my_num_workers_allotted = allotted;
402  assigned += allotted;
403  }
404 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
405  __TBB_ASSERT( assigned <= workers_demand, NULL ); // weaker assertion due to enforced allotment
406 #else
407  __TBB_ASSERT( assigned <= max_workers, NULL );
408 #endif
409  return assigned;
410 }
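The carry variable above implements a remainder-carrying proportional split: each arena receives roughly requested * max_workers / workers_demand workers, and the integer remainder is carried to the next arena so nothing is lost to rounding. A standalone model with example demands (names and numbers are illustrative only):

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    int main() {
        std::vector<int> requests = {5, 3, 1};          // example per-arena demand
        int workers_demand = 5 + 3 + 1;
        int max_workers = std::min(workers_demand, 4);  // e.g. 4 workers available
        int carry = 0, assigned = 0;
        for (std::size_t i = 0; i < requests.size(); ++i) {
            int tmp = requests[i] * max_workers + carry;
            int allotted = tmp / workers_demand;
            carry = tmp % workers_demand;
            assigned += allotted;
            std::printf("arena %zu: requested %d, allotted %d\n", i, requests[i], allotted);
        }
        std::printf("assigned %d of %d workers\n", assigned, max_workers);
        return 0;
    }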
411 
413 bool market::is_arena_in_list ( arena_list_type &arenas, arena *a ) {
414  if ( a ) {
415  for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
416  if ( a == &*it )
417  return true;
418  }
419  return false;
420 }
421 
422 #if __TBB_TASK_PRIORITY
423 inline void market::update_global_top_priority ( intptr_t newPriority ) {
424  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.market_prio_switches );
425  my_global_top_priority = newPriority;
426  my_priority_levels[newPriority].workers_available =
427 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
428  my_mandatory_num_requested && !my_num_workers_soft_limit ? 1 :
429 #endif
430  my_num_workers_soft_limit;
431  advance_global_reload_epoch();
432 }
433 
434 inline void market::reset_global_priority () {
435  my_global_bottom_priority = normalized_normal_priority;
436  update_global_top_priority(normalized_normal_priority);
437 }
438 
439 arena* market::arena_in_need ( arena* prev_arena ) {
440  if( as_atomic(my_total_demand) <= 0 )
441  return NULL;
442  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
443  assert_market_valid();
444  int p = my_global_top_priority;
445  arena *a = NULL;
446 
447  // Checks if arena is alive or not
448  if ( is_arena_in_list( my_priority_levels[p].arenas, prev_arena ) ) {
449  a = arena_in_need( my_priority_levels[p].arenas, prev_arena );
450  }
451 
452  while ( !a && p >= my_global_bottom_priority ) {
453  priority_level_info &pl = my_priority_levels[p--];
454  a = arena_in_need( pl.arenas, pl.next_arena );
455  if ( a ) {
456  as_atomic(pl.next_arena) = a; // a subject for innocent data race under the reader lock
457  // TODO: rework global round robin policy to local or random to avoid this write
458  }
459  // TODO: When refactoring task priority code, take into consideration the
460  // __TBB_TRACK_PRIORITY_LEVEL_SATURATION sections from earlier versions of TBB
461  }
462  return a;
463 }
464 
465 void market::update_allotment ( intptr_t highest_affected_priority ) {
466  intptr_t i = highest_affected_priority;
467  int available = my_priority_levels[i].workers_available;
468  for ( ; i >= my_global_bottom_priority; --i ) {
469  priority_level_info &pl = my_priority_levels[i];
470  pl.workers_available = available;
471  if ( pl.workers_requested ) {
472  available -= update_allotment( pl.arenas, pl.workers_requested, available );
473  if ( available < 0 ) { // TODO: assertion?
474  available = 0;
475  break;
476  }
477  }
478  }
479  __TBB_ASSERT( i <= my_global_bottom_priority || !available, NULL );
480  for ( --i; i >= my_global_bottom_priority; --i ) {
481  priority_level_info &pl = my_priority_levels[i];
482  pl.workers_available = 0;
483  arena_list_type::iterator it = pl.arenas.begin();
484  for ( ; it != pl.arenas.end(); ++it ) {
485  __TBB_ASSERT( it->my_num_workers_requested >= 0 || !it->my_num_workers_allotted, NULL );
486 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
487  it->my_num_workers_allotted = it->must_have_concurrency() ? 1 : 0;
488 #else
489  it->my_num_workers_allotted = 0;
490 #endif
491  }
492  }
493 }
494 #endif /* __TBB_TASK_PRIORITY */
495 
496 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
497 bool market::mandatory_concurrency_enable_impl ( arena *a, bool *enabled ) {
498  if( a->my_concurrency_mode==arena_base::cm_enforced_global ) {
499  if( enabled )
500  *enabled = false;
501  return false;
502  }
503  if( enabled )
504  *enabled = true;
505  a->my_max_num_workers = 1;
506  a->my_concurrency_mode = arena_base::cm_enforced_global;
507 #if __TBB_TASK_PRIORITY
508  priority_level_info &pl = my_priority_levels[a->my_top_priority];
509  pl.workers_requested++;
510  if( my_global_top_priority < a->my_top_priority ) {
511  my_global_top_priority = a->my_top_priority;
512  advance_global_reload_epoch();
513  }
514 #endif
515  a->my_num_workers_requested++;
516  a->my_num_workers_allotted++;
517  if( 1 == ++my_mandatory_num_requested ) {
518  my_total_demand++;
519  return true;
520  }
521  return false;
522 }
523 
524 bool market::mandatory_concurrency_enable ( arena *a ) {
525  bool add_thread;
526  bool enabled;
527  {
528  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
529  add_thread = mandatory_concurrency_enable_impl(a, &enabled);
530  }
531  if( add_thread )
532  my_server->adjust_job_count_estimate( 1 );
533  return enabled;
534 }
535 
536 void market::mandatory_concurrency_disable ( arena *a ) {
537  bool remove_thread = false;
538  int delta_adjust_demand = 0;
539 
540  {
541  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
542 
543  if( a->my_concurrency_mode!=arena_base::cm_enforced_global )
544  return;
545  __TBB_ASSERT( a->my_max_num_workers==1, NULL );
546  a->my_max_num_workers = 0;
547 #if __TBB_TASK_PRIORITY
548  if ( a->my_top_priority != normalized_normal_priority ) {
549  update_arena_top_priority( *a, normalized_normal_priority );
550  }
551  a->my_bottom_priority = normalized_normal_priority;
552 #endif
553 
554  int val = --my_mandatory_num_requested;
555  __TBB_ASSERT_EX( val >= 0, NULL );
556  if( val == 0 ) {
557  my_total_demand--;
558  remove_thread = true;
559  }
560  a->my_num_workers_requested--;
561  if (a->my_num_workers_requested > 0)
562  delta_adjust_demand = a->my_num_workers_requested;
563  else
564  a->my_num_workers_allotted = 0;
565 
566 #if __TBB_TASK_PRIORITY
567  priority_level_info &pl = my_priority_levels[a->my_top_priority];
568  pl.workers_requested--;
569  intptr_t p = my_global_top_priority;
570  for (; !my_priority_levels[p].workers_requested && p>0; p--)
571  ;
572  if( !p )
573  reset_global_priority();
574  else if( p!= my_global_top_priority )
575  update_global_top_priority(p);
576 #endif
577  a->my_concurrency_mode = arena::cm_normal;
578  }
579  if( delta_adjust_demand )
580  adjust_demand( *a, -delta_adjust_demand );
581  if( remove_thread )
582  my_server->adjust_job_count_estimate( -1 );
583 }
584 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
585 
586 void market::adjust_demand ( arena& a, int delta ) {
587  __TBB_ASSERT( theMarket, "market instance was destroyed prematurely?" );
588  if ( !delta )
589  return;
590  my_arenas_list_mutex.lock();
591  int prev_req = a.my_num_workers_requested;
592  a.my_num_workers_requested += delta;
593  if ( a.my_num_workers_requested <= 0 ) {
594 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
595  // must not recall worker from arena with mandatory parallelism
596  if ( a.my_market->my_mandatory_num_requested && a.my_concurrency_mode!=arena_base::cm_normal )
597  a.my_num_workers_requested = 1;
598  else
599 #endif
600  a.my_num_workers_allotted = 0;
601  if ( prev_req <= 0 ) {
602  my_arenas_list_mutex.unlock();
603  return;
604  }
605  delta = -prev_req;
606  }
607  else if ( prev_req < 0 ) {
608  delta = a.my_num_workers_requested;
609  }
610  my_total_demand += delta;
611 #if !__TBB_TASK_PRIORITY
612  update_allotment();
613 #else /* !__TBB_TASK_PRIORITY */
614  intptr_t p = a.my_top_priority;
615  priority_level_info &pl = my_priority_levels[p];
616  pl.workers_requested += delta;
617  __TBB_ASSERT( pl.workers_requested >= 0, NULL );
618  if ( a.my_num_workers_requested <= 0 ) {
619  if ( a.my_top_priority != normalized_normal_priority ) {
620  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_resets );
621  update_arena_top_priority( a, normalized_normal_priority );
622  }
623  a.my_bottom_priority = normalized_normal_priority;
624  }
625  if ( p == my_global_top_priority ) {
626  if ( !pl.workers_requested ) {
627  while ( --p >= my_global_bottom_priority && !my_priority_levels[p].workers_requested )
628  continue;
629  if ( p < my_global_bottom_priority )
630  reset_global_priority();
631  else
632  update_global_top_priority(p);
633  }
634  update_allotment( my_global_top_priority );
635  }
636  else if ( p > my_global_top_priority ) {
637  __TBB_ASSERT( pl.workers_requested > 0, NULL );
638  // TODO: investigate if the following invariant is always valid
639  __TBB_ASSERT( a.my_num_workers_requested >= 0, NULL );
640  update_global_top_priority(p);
641  a.my_num_workers_allotted = min( (int)my_num_workers_soft_limit, a.my_num_workers_requested );
642 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
643  // must not recall worker from arena with mandatory parallelism
644  if ( !a.my_num_workers_allotted
645  && a.my_market->my_mandatory_num_requested && a.my_concurrency_mode!=arena_base::cm_normal )
646  a.my_num_workers_allotted = 1;
647 #endif
648  my_priority_levels[p - 1].workers_available = my_num_workers_soft_limit - a.my_num_workers_allotted;
649  update_allotment( p - 1 );
650  }
651  else if ( p == my_global_bottom_priority ) {
652  if ( !pl.workers_requested ) {
653  while ( ++p <= my_global_top_priority && !my_priority_levels[p].workers_requested )
654  continue;
655  if ( p > my_global_top_priority )
656  reset_global_priority();
657  else
658  my_global_bottom_priority = p;
659  }
660  else
661  update_allotment( p );
662  }
663  else if ( p < my_global_bottom_priority ) {
664  int prev_bottom = my_global_bottom_priority;
665  my_global_bottom_priority = p;
666  update_allotment( prev_bottom );
667  }
668  else {
669  __TBB_ASSERT( my_global_bottom_priority < p && p < my_global_top_priority, NULL );
670  update_allotment( p );
671  }
672  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested<=0, NULL );
673  assert_market_valid();
674 #endif /* !__TBB_TASK_PRIORITY */
675  if ( delta > 0 ) {
676  // can't overflow soft_limit, but remember values request by arenas in
677  // my_total_demand to not prematurely release workers to RML
678  if ( my_num_workers_requested+delta > (int)my_num_workers_soft_limit )
679  delta = my_num_workers_soft_limit - my_num_workers_requested;
680  } else {
681  // the number of workers should not be decreased below my_total_demand
682  if ( my_num_workers_requested+delta < my_total_demand )
683  delta = min(my_total_demand, (int)my_num_workers_soft_limit) - my_num_workers_requested;
684  }
685  my_num_workers_requested += delta;
686  __TBB_ASSERT( my_num_workers_requested <= (int)my_num_workers_soft_limit, NULL );
687 
688  my_arenas_list_mutex.unlock();
689  // Must be called outside of any locks
690  my_server->adjust_job_count_estimate( delta );
692 }
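The clamping of delta at the start of adjust_demand is easy to miss; this small model (illustrative names and numbers, not library code) shows how a release request from an arena is trimmed to the workers it had actually asked for before being propagated to the global demand:

    #include <cstdio>

    int main() {
        int arena_requested = 2;   // workers the arena had requested so far
        int delta = -5;            // the arena tries to give back more than it holds
        int prev_req = arena_requested;
        arena_requested += delta;  // now -3
        if (arena_requested <= 0) {
            if (prev_req <= 0)
                return 0;          // nothing was requested before: no change to propagate
            delta = -prev_req;     // only the 2 previously requested workers are released
        } else if (prev_req < 0) {
            delta = arena_requested;
        }
        std::printf("delta forwarded to total demand: %d\n", delta);
        return 0;
    }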
693 
694 void market::process( job& j ) {
695  generic_scheduler& s = static_cast<generic_scheduler&>(j);
696  // s.my_arena can be dead. Don't access it until arena_in_need is called
697  arena *a = s.my_arena;
698  __TBB_ASSERT( governor::is_set(&s), NULL );
699 
700  for (int i = 0; i < 2; ++i) {
701  while ( (a = arena_in_need(a)) ) {
702  a->process(s);
703  a = NULL; // to avoid double checks in arena_in_need(arena*) for the same priority level
704  }
705  // Workers leave market because there is no arena in need. It can happen earlier than
706  // adjust_job_count_estimate() decreases my_slack and RML can put this thread to sleep.
707  // It might result in a busy-loop checking for my_slack<0 and calling this method instantly.
708  // the yield refines this spinning.
709  if ( !i )
710  __TBB_Yield();
711  }
712 
713  GATHER_STATISTIC( ++s.my_counters.market_roundtrips );
714 }
715 
716 void market::cleanup( job& j ) {
717  __TBB_ASSERT( theMarket != this, NULL );
718  generic_scheduler& s = static_cast<generic_scheduler&>(j);
719  generic_scheduler* mine = governor::local_scheduler_if_initialized();
720  __TBB_ASSERT( !mine || mine->is_worker(), NULL );
721  if( mine!=&s ) {
722  governor::assume_scheduler( &s );
723  generic_scheduler::cleanup_worker( &s, mine!=NULL );
724  governor::assume_scheduler( mine );
725  } else {
726  generic_scheduler::cleanup_worker( &s, true );
727  }
728 }
729 
730 void market::acknowledge_close_connection() {
731  destroy();
732 }
733 
734 ::rml::job* market::create_one_job() {
735  unsigned index = ++my_first_unused_worker_idx;
736  __TBB_ASSERT( index > 0, NULL );
737  ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
738  // index serves as a hint decreasing conflicts between workers when they migrate between arenas
739  generic_scheduler* s = generic_scheduler::create_worker( *this, index );
740 #if __TBB_TASK_GROUP_CONTEXT
741  __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL );
742  __TBB_ASSERT( !my_workers[index - 1], NULL );
743  my_workers[index - 1] = s;
744 #endif /* __TBB_TASK_GROUP_CONTEXT */
745  return s;
746 }
747 
748 #if __TBB_TASK_PRIORITY
749 void market::update_arena_top_priority ( arena& a, intptr_t new_priority ) {
750  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_switches );
751  __TBB_ASSERT( a.my_top_priority != new_priority, NULL );
752  priority_level_info &prev_level = my_priority_levels[a.my_top_priority],
753  &new_level = my_priority_levels[new_priority];
754  remove_arena_from_list(a);
755  a.my_top_priority = new_priority;
756  insert_arena_into_list(a);
757  as_atomic( a.my_reload_epoch ).fetch_and_increment<tbb::release>(); // TODO: synch with global reload epoch in order to optimize usage of local reload epoch
758  prev_level.workers_requested -= a.my_num_workers_requested;
759  new_level.workers_requested += a.my_num_workers_requested;
760  __TBB_ASSERT( prev_level.workers_requested >= 0 && new_level.workers_requested >= 0, NULL );
761 }
762 
763 bool market::lower_arena_priority ( arena& a, intptr_t new_priority, uintptr_t old_reload_epoch ) {
764  // TODO: replace the lock with a try_lock loop which performs a double check of the epoch
765  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
766  if ( a.my_reload_epoch != old_reload_epoch ) {
767  assert_market_valid();
768  return false;
769  }
770  __TBB_ASSERT( a.my_top_priority > new_priority, NULL );
771  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
772 
773  intptr_t p = a.my_top_priority;
774  update_arena_top_priority( a, new_priority );
775  if ( a.my_num_workers_requested > 0 ) {
776  if ( my_global_bottom_priority > new_priority ) {
777  my_global_bottom_priority = new_priority;
778  }
779  if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
780  // Global top level became empty
781  for ( --p; p>my_global_bottom_priority && !my_priority_levels[p].workers_requested; --p ) continue;
782  update_global_top_priority(p);
783  }
784  update_allotment( p );
785  }
786 
787  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
788  assert_market_valid();
789  return true;
790 }
791 
792 bool market::update_arena_priority ( arena& a, intptr_t new_priority ) {
793  // TODO: do not acquire this global lock while checking arena's state.
794  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
795 
796  tbb::internal::assert_priority_valid(new_priority);
797  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested <= 0, NULL );
798  assert_market_valid();
799  if ( a.my_top_priority == new_priority ) {
800  return false;
801  }
802  else if ( a.my_top_priority > new_priority ) {
803  if ( a.my_bottom_priority > new_priority )
804  a.my_bottom_priority = new_priority;
805  return false;
806  }
807  else if ( a.my_num_workers_requested <= 0 ) {
808  return false;
809  }
810 
811  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
812 
813  intptr_t p = a.my_top_priority;
814  intptr_t highest_affected_level = max(p, new_priority);
815  update_arena_top_priority( a, new_priority );
816 
817  if ( my_global_top_priority < new_priority ) {
818  update_global_top_priority(new_priority);
819  }
820  else if ( my_global_top_priority == new_priority ) {
821  advance_global_reload_epoch();
822  }
823  else {
824  __TBB_ASSERT( new_priority < my_global_top_priority, NULL );
825  __TBB_ASSERT( new_priority > my_global_bottom_priority, NULL );
826  if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
827  // Global top level became empty
828  __TBB_ASSERT( my_global_bottom_priority < p, NULL );
829  for ( --p; !my_priority_levels[p].workers_requested; --p ) continue;
830  __TBB_ASSERT( p >= new_priority, NULL );
831  update_global_top_priority(p);
832  highest_affected_level = p;
833  }
834  }
835  if ( p == my_global_bottom_priority ) {
836  // Arena priority was increased from the global bottom level.
837  __TBB_ASSERT( p < new_priority, NULL );
838  __TBB_ASSERT( new_priority <= my_global_top_priority, NULL );
839  while ( my_global_bottom_priority < my_global_top_priority
840  && !my_priority_levels[my_global_bottom_priority].workers_requested )
841  ++my_global_bottom_priority;
842  __TBB_ASSERT( my_global_bottom_priority <= new_priority, NULL );
843 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
844  const bool enforced_concurrency = my_mandatory_num_requested && a.must_have_concurrency();
845 #else
846  const bool enforced_concurrency = false;
847 #endif
848  __TBB_ASSERT_EX( enforced_concurrency || my_priority_levels[my_global_bottom_priority].workers_requested > 0, NULL );
849  }
850  update_allotment( highest_affected_level );
851 
852  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
853  assert_market_valid();
854  return true;
855 }
856 #endif /* __TBB_TASK_PRIORITY */
857 
858 } // namespace internal
859 } // namespace tbb