Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
market.cpp
/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "tbb/tbb_stddef.h"
#include "tbb/global_control.h" // global_control::active_value

#include "market.h"
#include "tbb_main.h"
#include "governor.h"
#include "scheduler.h"
#include "itt_notify.h"

namespace tbb {
namespace internal {

void market::insert_arena_into_list ( arena& a ) {
#if __TBB_TASK_PRIORITY
    arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
    arena *&next = my_priority_levels[a.my_top_priority].next_arena;
#else /* !__TBB_TASK_PRIORITY */
    arena_list_type &arenas = my_arenas;
    arena *&next = my_next_arena;
#endif /* !__TBB_TASK_PRIORITY */
    arenas.push_front( a );
    if ( arenas.size() == 1 )
        next = &*arenas.begin();
}

void market::remove_arena_from_list ( arena& a ) {
#if __TBB_TASK_PRIORITY
    arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
    arena *&next = my_priority_levels[a.my_top_priority].next_arena;
#else /* !__TBB_TASK_PRIORITY */
    arena_list_type &arenas = my_arenas;
    arena *&next = my_next_arena;
#endif /* !__TBB_TASK_PRIORITY */
    arena_list_type::iterator it = next;
    __TBB_ASSERT( it != arenas.end(), NULL );
    if ( next == &a ) {
        if ( ++it == arenas.end() && arenas.size() > 1 )
            it = arenas.begin();
        next = &*it;
    }
    arenas.remove( a );
}

//------------------------------------------------------------------------
// market
//------------------------------------------------------------------------

67 
68 market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size )
69  : my_num_workers_hard_limit(workers_hard_limit)
70  , my_num_workers_soft_limit(workers_soft_limit)
72  , my_global_top_priority(normalized_normal_priority)
73  , my_global_bottom_priority(normalized_normal_priority)
74 #endif /* __TBB_TASK_PRIORITY */
75  , my_ref_count(1)
76  , my_stack_size(stack_size)
77  , my_workers_soft_limit_to_report(workers_soft_limit)
78 {
79 #if __TBB_TASK_PRIORITY
80  __TBB_ASSERT( my_global_reload_epoch == 0, NULL );
81  my_priority_levels[normalized_normal_priority].workers_available = my_num_workers_soft_limit;
82 #endif /* __TBB_TASK_PRIORITY */
83 
84  // Once created RML server will start initializing workers that will need
85  // global market instance to get worker stack size
87  __TBB_ASSERT( my_server, "Failed to create RML server" );
88 }

static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
    if( int soft_limit = market::app_parallelism_limit() )
        workers_soft_limit = soft_limit-1;
    else // if the user has set no limits (yet), use the market's parameter
        workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
    if( workers_soft_limit >= workers_hard_limit )
        workers_soft_limit = workers_hard_limit-1;
    return workers_soft_limit;
}
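
/* Illustrative example (editorial, not part of the original source): on a
   machine where governor::default_num_threads() == 8 and no global_control
   limit is active, calc_workers_soft_limit(0, 256) returns max(8-1, 0) == 7,
   i.e. one worker per hardware thread minus the master. If the application
   holds a global_control(global_control::max_allowed_parallelism, 4) object,
   app_parallelism_limit() reports 4 and the soft limit becomes 4-1 == 3,
   regardless of the first argument. */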

market& market::global_market ( bool is_public, unsigned workers_requested, size_t stack_size ) {
    global_market_mutex_type::scoped_lock lock( theMarketMutex );
    market *m = theMarket;
    if( m ) {
        ++m->my_ref_count;
        const unsigned old_public_count = is_public? m->my_public_ref_count++ : /*any non-zero value*/1;
        lock.release();
        if( old_public_count==0 )
            set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );

        // do not warn if the default number of workers is requested
        if( workers_requested != governor::default_num_threads()-1 ) {
            __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
                          "skip_soft_limit_warning must be larger than any valid workers_requested" );
            unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report;
            if( soft_limit_to_report < workers_requested ) {
                runtime_warning( "The number of workers is currently limited to %u. "
                                 "The request for %u workers is ignored. Further requests for more workers "
                                 "will be silently ignored until the limit changes.\n",
                                 soft_limit_to_report, workers_requested );
                // The race is possible when multiple threads report warnings.
                // We are OK with that, as there are just multiple warnings.
                as_atomic(m->my_workers_soft_limit_to_report).
                    compare_and_swap(skip_soft_limit_warning, soft_limit_to_report);
            }
        }
        if( m->my_stack_size < stack_size )
            runtime_warning( "Thread stack size has been already set to %u. "
                             "The request for larger stack (%u) cannot be satisfied.\n",
                             m->my_stack_size, stack_size );
    }
    else {
        // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
        if( stack_size == 0 )
            stack_size = global_control::active_value(global_control::thread_stack_size);
        // Expecting that 4P is suitable for most applications.
        // Limit to 2P for large thread numbers.
        // TODO: ask RML for max concurrency and possibly correct hard_limit
        const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
        // The requested number of threads is intentionally not considered in
        // the computation of the hard limit, in order to separate responsibilities
        // and avoid complicated interactions between global_control and task_scheduler_init.
        // The market guarantees that at least 256 threads might be created.
        const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
        const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
        // Create the global market instance
        size_t size = sizeof(market);
#if __TBB_TASK_GROUP_CONTEXT
        __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(generic_scheduler*) == sizeof(market),
                      "my_workers must be the last data field of the market class");
        size += sizeof(generic_scheduler*) * (workers_hard_limit - 1);
#endif /* __TBB_TASK_GROUP_CONTEXT */
        __TBB_InitOnce::add_ref();
        void* storage = NFS_Allocate(1, size, NULL);
        memset( storage, 0, size );
        // Initialize and publish the global market
        m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
        if( is_public )
            m->my_public_ref_count = 1;
        theMarket = m;
        // This check relies on the fact that for shared RML default_concurrency==max_concurrency
        if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
            runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
                             , m->my_server->default_concurrency(), workers_soft_limit );
    }
    return *m;
}
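
/* Editorial sketch (not part of the original source): user-facing entry points
   funnel into global_market(). Constructing a tbb::task_scheduler_init or a
   tbb::task_arena ultimately acquires a public market reference, while a
   tbb::global_control object changes what app_parallelism_limit() reports:

       tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 4);
       tbb::task_scheduler_init init;  // reaches market::global_market()
       // the market's soft limit is now derived from the value 4 above

   The exact call chain is assumed here for illustration. */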

void market::destroy () {
#if __TBB_COUNT_TASK_NODES
    if ( my_task_node_count )
        runtime_warning( "Leaked %ld task objects\n", (long)my_task_node_count );
#endif /* __TBB_COUNT_TASK_NODES */
    this->market::~market(); // qualified to suppress warning
    NFS_Free( this );
    __TBB_InitOnce::remove_ref();
}

bool market::release ( bool is_public, bool blocking_terminate ) {
    __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
    bool do_release = false;
    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( blocking_terminate ) {
            __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
            while ( my_public_ref_count == 1 && my_ref_count > 1 ) {
                lock.release();
                // To guarantee that request_close_connection() is called by the last master, we need to wait till all
                // references are released. Re-read my_public_ref_count to limit waiting if new masters are created.
                // Theoretically, new private references to the market can be added during the wait, making it
                // potentially endless.
                // TODO: revise why the weak scheduler needs the market's pointer and try to remove this wait.
                // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
                // see e.g. task_group_context::cancel_group_execution()
                __TBB_Yield();
                lock.acquire( theMarketMutex );
            }
        }
        if ( is_public ) {
            __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
            __TBB_ASSERT( my_public_ref_count, NULL );
            --my_public_ref_count;
        }
        if ( --my_ref_count == 0 ) {
            __TBB_ASSERT( !my_public_ref_count, NULL );
            do_release = true;
            theMarket = NULL;
        }
    }
    if( do_release ) {
        __TBB_ASSERT( !__TBB_load_with_acquire(my_public_ref_count), "No public references remain if we remove the market." );
        // inform RML that blocking termination is required
        my_join_workers = blocking_terminate;
        my_server->request_close_connection();
        return blocking_terminate;
    }
    return false;
}
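
/* Note (editorial): my_public_ref_count counts attached masters, while
   my_ref_count additionally counts internal references, e.g. the one taken in
   set_active_num_workers(). Destruction is driven by my_ref_count reaching
   zero; a blocking terminate first spins until only the caller's public
   reference remains, so that request_close_connection() is issued exactly
   once, by the last master. */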

void market::set_active_num_workers ( unsigned soft_limit ) {
    int old_requested=0, requested=0;
    bool need_mandatory = false;
    market *m;

    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( !theMarket )
            return; // actual value will be used at market creation
        m = theMarket;
        ++m->my_ref_count;
    }
    // my_ref_count is held for the market, so it can be used safely
    {
        arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
        __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL);
        m->my_num_workers_soft_limit = soft_limit;
        // report only once after the new soft limit value is set
        m->my_workers_soft_limit_to_report = soft_limit;

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        // updates of soft_limit to zero must be postponed
        // while mandatory parallelism is enabled
        if( !(m->my_mandatory_num_requested && !soft_limit) )
#endif
        {
            const int demand =
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
                m->my_mandatory_num_requested? 0 :
#endif
                m->my_total_demand;
            requested = min(demand, (int)soft_limit);
            old_requested = m->my_num_workers_requested;
            m->my_num_workers_requested = requested;
#if __TBB_TASK_PRIORITY
            m->my_priority_levels[m->my_global_top_priority].workers_available = soft_limit;
            m->update_allotment( m->my_global_top_priority );
#else
            m->update_allotment();
#endif
        }
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if( !m->my_mandatory_num_requested && !soft_limit ) {
            // enable mandatory concurrency if enqueued tasks are found
            // and a zero soft_limit is requested
#if __TBB_TASK_PRIORITY
            for( int p = m->my_global_top_priority; p >= m->my_global_bottom_priority; --p ) {
                priority_level_info &pl = m->my_priority_levels[p];
                arena_list_type &arenas = pl.arenas;
#else
                const int p = 0;
                arena_list_type &arenas = m->my_arenas;
#endif /* __TBB_TASK_PRIORITY */
                for( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it ) {
                    if( !it->my_task_stream.empty(p) ) {
                        // switch local_mandatory to global_mandatory unconditionally
                        if( m->mandatory_concurrency_enable_impl( &*it ) )
                            need_mandatory = true;
                    }
                }
#if __TBB_TASK_PRIORITY
            }
#endif /* __TBB_TASK_PRIORITY */
        }
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
    }
    // adjust_job_count_estimate must be called outside of any locks
    int delta = requested - old_requested;
    if( need_mandatory ) ++delta;
    if( delta!=0 )
        m->my_server->adjust_job_count_estimate( delta );
    // release the internal market reference to match ++m->my_ref_count above
    m->release( /*is_public=*/false, /*blocking_terminate=*/false );
}

bool governor::does_client_join_workers (const tbb::internal::rml::tbb_client &client) {
    return ((const market&)client).must_join_workers();
}

arena* market::create_arena ( int num_slots, int num_reserved_slots, size_t stack_size ) {
    __TBB_ASSERT( num_slots > 0, NULL );
    __TBB_ASSERT( num_reserved_slots <= num_slots, NULL );
    // Add a public market reference for the master thread/task_arena (the market adds an internal reference in exchange).
    market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );

    arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots );
    // Add the newly created arena into the existing market's list.
    arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
    m.insert_arena_into_list(a);
    return &a;
}

void market::detach_arena ( arena& a ) {
    __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
    __TBB_ASSERT( !a.my_slots[0].my_scheduler, NULL );
    remove_arena_from_list(a);
    if ( a.my_aba_epoch == my_arenas_aba_epoch )
        ++my_arenas_aba_epoch;
}

void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch ) {
    bool locked = true;
    __TBB_ASSERT( a, NULL );
    // we hold a reference to the market, so it cannot be destroyed at any moment here
    __TBB_ASSERT( this == theMarket, NULL );
    __TBB_ASSERT( my_ref_count!=0, NULL );
    my_arenas_list_mutex.lock();
    assert_market_valid();
#if __TBB_TASK_PRIORITY
    // scan all priority levels, not only the [my_global_bottom_priority;my_global_top_priority]
    // range, because the arena to be destroyed may have no outstanding request for workers
    for ( int p = num_priority_levels-1; p >= 0; --p ) {
        priority_level_info &pl = my_priority_levels[p];
        arena_list_type &my_arenas = pl.arenas;
#endif /* __TBB_TASK_PRIORITY */
        arena_list_type::iterator it = my_arenas.begin();
        for ( ; it != my_arenas.end(); ++it ) {
            if ( a == &*it ) {
                if ( it->my_aba_epoch == aba_epoch ) {
                    // Arena is alive
                    if ( !a->my_num_workers_requested && !a->my_references ) {
                        __TBB_ASSERT( !a->my_num_workers_allotted && (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers), "Inconsistent arena state" );
                        // Arena is abandoned. Destroy it.
                        detach_arena( *a );
                        my_arenas_list_mutex.unlock();
                        locked = false;
                        a->free_arena();
                    }
                }
                if (locked)
                    my_arenas_list_mutex.unlock();
                return;
            }
        }
#if __TBB_TASK_PRIORITY
    }
#endif /* __TBB_TASK_PRIORITY */
    my_arenas_list_mutex.unlock();
}

arena* market::arena_in_need ( arena_list_type &arenas, arena *hint ) {
    if ( arenas.empty() )
        return NULL;
    arena_list_type::iterator it = hint;
    __TBB_ASSERT( it != arenas.end(), NULL );
    do {
        arena& a = *it;
        if ( ++it == arenas.end() )
            it = arenas.begin();
        if( a.num_workers_active() < a.my_num_workers_allotted
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            && !a.recall_by_mandatory_request()
#endif
            ) {
            a.my_references += arena::ref_worker;
            return &a;
        }
    } while ( it != hint );
    return NULL;
}

int market::update_allotment ( arena_list_type& arenas, int workers_demand, int max_workers ) {
    __TBB_ASSERT( workers_demand, NULL );
    max_workers = min(workers_demand, max_workers);
    int carry = 0;
    int assigned = 0;
    arena_list_type::iterator it = arenas.begin();
    for ( ; it != arenas.end(); ++it ) {
        arena& a = *it;
        if ( a.my_num_workers_requested <= 0 ) {
            __TBB_ASSERT( !a.my_num_workers_allotted, NULL );
            continue;
        }
        int tmp = a.my_num_workers_requested * max_workers + carry;
        int allotted = tmp / workers_demand;
        carry = tmp % workers_demand;
        // a.my_num_workers_requested may temporarily exceed a.my_max_num_workers
        allotted = min( allotted, (int)a.my_max_num_workers );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if ( !allotted && a.must_have_concurrency() )
            allotted = 1;
#endif
        a.my_num_workers_allotted = allotted;
        assigned += allotted;
    }
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( assigned <= workers_demand, NULL ); // weaker assertion due to enforced allotment
#else
    __TBB_ASSERT( assigned <= max_workers, NULL );
#endif
    return assigned;
}
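
/* Worked example (editorial, not part of the original source): suppose three
   arenas request 3, 2, and 1 workers (workers_demand == 6) while only
   max_workers == 4 may be distributed. The carry-based loop computes
   floor((request*4 + carry)/6) per arena:
       arena 1: (3*4 + 0) = 12 -> allotted 2, carry 0
       arena 2: (2*4 + 0) =  8 -> allotted 1, carry 2
       arena 3: (1*4 + 2) =  6 -> allotted 1, carry 0
   i.e. workers are split proportionally to demand, and the division remainders
   accumulate so that exactly min(demand, max_workers) slots are handed out. */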

bool market::is_arena_in_list ( arena_list_type &arenas, arena *a ) {
    if ( a ) {
        for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
            if ( a == &*it )
                return true;
    }
    return false;
}

#if __TBB_TASK_PRIORITY
inline void market::update_global_top_priority ( intptr_t newPriority ) {
    GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.market_prio_switches );
    my_global_top_priority = newPriority;
    my_priority_levels[newPriority].workers_available =
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        my_mandatory_num_requested && !my_num_workers_soft_limit ? 1 :
#endif
        my_num_workers_soft_limit;
    advance_global_reload_epoch();
}

inline void market::reset_global_priority () {
    my_global_bottom_priority = normalized_normal_priority;
    update_global_top_priority(normalized_normal_priority);
}

arena* market::arena_in_need ( arena* prev_arena ) {
    if( as_atomic(my_total_demand) <= 0 )
        return NULL;
    arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
    assert_market_valid();
    int p = my_global_top_priority;
    arena *a = NULL;

    // Check whether the hinted arena is still in the list (i.e. alive)
    if ( is_arena_in_list( my_priority_levels[p].arenas, prev_arena ) ) {
        a = arena_in_need( my_priority_levels[p].arenas, prev_arena );
    }

    while ( !a && p >= my_global_bottom_priority ) {
        priority_level_info &pl = my_priority_levels[p--];
        a = arena_in_need( pl.arenas, pl.next_arena );
        if ( a ) {
            as_atomic(pl.next_arena) = a; // subject to an innocent data race under the reader lock
            // TODO: rework the global round-robin policy to a local or random one to avoid this write
        }
        // TODO: When refactoring task priority code, take into consideration the
        // __TBB_TRACK_PRIORITY_LEVEL_SATURATION sections from earlier versions of TBB
    }
    return a;
}

void market::update_allotment ( intptr_t highest_affected_priority ) {
    intptr_t i = highest_affected_priority;
    int available = my_priority_levels[i].workers_available;
    for ( ; i >= my_global_bottom_priority; --i ) {
        priority_level_info &pl = my_priority_levels[i];
        pl.workers_available = available;
        if ( pl.workers_requested ) {
            available -= update_allotment( pl.arenas, pl.workers_requested, available );
            if ( available < 0 ) { // TODO: assertion?
                available = 0;
                break;
            }
        }
    }
    __TBB_ASSERT( i <= my_global_bottom_priority || !available, NULL );
    for ( --i; i >= my_global_bottom_priority; --i ) {
        priority_level_info &pl = my_priority_levels[i];
        pl.workers_available = 0;
        arena_list_type::iterator it = pl.arenas.begin();
        for ( ; it != pl.arenas.end(); ++it ) {
            __TBB_ASSERT( it->my_num_workers_requested >= 0 || !it->my_num_workers_allotted, NULL );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            it->my_num_workers_allotted = it->must_have_concurrency() ? 1 : 0;
#else
            it->my_num_workers_allotted = 0;
#endif
        }
    }
}
#endif /* __TBB_TASK_PRIORITY */
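
/* Note (editorial): worker allotment cascades down the priority levels: the
   highest level consumes what it needs from `available`, only the remainder
   flows to the next lower level, and every level strictly below the global
   bottom gets zero (except arenas kept alive by enforced concurrency). */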

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
bool market::mandatory_concurrency_enable_impl ( arena *a, bool *enabled ) {
    if( a->my_concurrency_mode==arena_base::cm_enforced_global ) {
        if( enabled )
            *enabled = false;
        return false;
    }
    if( enabled )
        *enabled = true;
    a->my_max_num_workers = 1;
    a->my_concurrency_mode = arena_base::cm_enforced_global;
#if __TBB_TASK_PRIORITY
    priority_level_info &pl = my_priority_levels[a->my_top_priority];
    pl.workers_requested++;
    if( my_global_top_priority < a->my_top_priority ) {
        my_global_top_priority = a->my_top_priority;
        advance_global_reload_epoch();
    }
#endif
    a->my_num_workers_requested++;
    a->my_num_workers_allotted++;
    if( 1 == ++my_mandatory_num_requested ) {
        my_total_demand++;
        return true;
    }
    return false;
}

bool market::mandatory_concurrency_enable ( arena *a ) {
    bool add_thread;
    bool enabled;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        add_thread = mandatory_concurrency_enable_impl(a, &enabled);
    }
    if( add_thread )
        my_server->adjust_job_count_estimate( 1 );
    return enabled;
}

void market::mandatory_concurrency_disable ( arena *a ) {
    bool remove_thread = false;
    int delta_adjust_demand = 0;

    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);

        if( a->my_concurrency_mode!=arena_base::cm_enforced_global )
            return;
        __TBB_ASSERT( a->my_max_num_workers==1, NULL );
        a->my_max_num_workers = 0;
#if __TBB_TASK_PRIORITY
        if ( a->my_top_priority != normalized_normal_priority ) {
            update_arena_top_priority( *a, normalized_normal_priority );
        }
        a->my_bottom_priority = normalized_normal_priority;
#endif

        int val = --my_mandatory_num_requested;
        __TBB_ASSERT_EX( val >= 0, NULL );
        if( val == 0 ) {
            my_total_demand--;
            remove_thread = true;
        }
        a->my_num_workers_requested--;
        if (a->my_num_workers_requested > 0)
            delta_adjust_demand = a->my_num_workers_requested;
        else
            a->my_num_workers_allotted = 0;

#if __TBB_TASK_PRIORITY
        priority_level_info &pl = my_priority_levels[a->my_top_priority];
        pl.workers_requested--;
        intptr_t p = my_global_top_priority;
        for (; !my_priority_levels[p].workers_requested && p>0; p--)
            ;
        if( !p )
            reset_global_priority();
        else if( p!= my_global_top_priority )
            update_global_top_priority(p);
#endif
        a->my_concurrency_mode = arena::cm_normal;
    }
    if( delta_adjust_demand )
        adjust_demand( *a, -delta_adjust_demand );
    if( remove_thread )
        my_server->adjust_job_count_estimate( -1 );
}
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
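
/* Note (editorial): the __TBB_ENQUEUE_ENFORCED_CONCURRENCY machinery above
   keeps fire-and-forget (enqueued) tasks making progress even when the soft
   limit is zero: an arena holding enqueued work is granted a single
   "mandatory" worker, and set_active_num_workers() postpones
   soft-limit-to-zero updates while such mandatory parallelism is active. */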

void market::adjust_demand ( arena& a, int delta ) {
    __TBB_ASSERT( theMarket, "market instance was destroyed prematurely?" );
    if ( !delta )
        return;
    my_arenas_list_mutex.lock();
    int prev_req = a.my_num_workers_requested;
    a.my_num_workers_requested += delta;
    if ( a.my_num_workers_requested <= 0 ) {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        // must not recall a worker from an arena with mandatory parallelism
        if ( a.my_market->my_mandatory_num_requested && a.my_concurrency_mode!=arena_base::cm_normal )
            a.my_num_workers_allotted = 1;
        else
#endif
            a.my_num_workers_allotted = 0;
        if ( prev_req <= 0 ) {
            my_arenas_list_mutex.unlock();
            return;
        }
        delta = -prev_req;
    }
    else if ( prev_req < 0 ) {
        delta = a.my_num_workers_requested;
    }
    my_total_demand += delta;
#if !__TBB_TASK_PRIORITY
    update_allotment();
#else /* !__TBB_TASK_PRIORITY */
    intptr_t p = a.my_top_priority;
    priority_level_info &pl = my_priority_levels[p];
    pl.workers_requested += delta;
    __TBB_ASSERT( pl.workers_requested >= 0, NULL );
    if ( a.my_num_workers_requested <= 0 ) {
        if ( a.my_top_priority != normalized_normal_priority ) {
            GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_resets );
            update_arena_top_priority( a, normalized_normal_priority );
        }
        a.my_bottom_priority = normalized_normal_priority;
    }
    if ( p == my_global_top_priority ) {
        if ( !pl.workers_requested ) {
            while ( --p >= my_global_bottom_priority && !my_priority_levels[p].workers_requested )
                continue;
            if ( p < my_global_bottom_priority )
                reset_global_priority();
            else
                update_global_top_priority(p);
        }
        update_allotment( my_global_top_priority );
    }
    else if ( p > my_global_top_priority ) {
        __TBB_ASSERT( pl.workers_requested > 0, NULL );
        // TODO: investigate if the following invariant is always valid
        __TBB_ASSERT( a.my_num_workers_requested >= 0, NULL );
        update_global_top_priority(p);
        a.my_num_workers_allotted = min( (int)my_num_workers_soft_limit, a.my_num_workers_requested );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        // must not recall a worker from an arena with mandatory parallelism
        if ( !a.my_num_workers_allotted && a.my_num_workers_requested
             && a.my_market->my_mandatory_num_requested && a.my_concurrency_mode!=arena_base::cm_normal )
            a.my_num_workers_allotted = 1;
#endif
        my_priority_levels[p - 1].workers_available = my_num_workers_soft_limit - a.my_num_workers_allotted;
        update_allotment( p - 1 );
    }
    else if ( p == my_global_bottom_priority ) {
        if ( !pl.workers_requested ) {
            while ( ++p <= my_global_top_priority && !my_priority_levels[p].workers_requested )
                continue;
            if ( p > my_global_top_priority )
                reset_global_priority();
            else
                my_global_bottom_priority = p;
        }
        else
            update_allotment( p );
    }
    else if ( p < my_global_bottom_priority ) {
        int prev_bottom = my_global_bottom_priority;
        my_global_bottom_priority = p;
        update_allotment( prev_bottom );
    }
    else {
        __TBB_ASSERT( my_global_bottom_priority < p && p < my_global_top_priority, NULL );
        update_allotment( p );
    }
    __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested<=0, NULL );
    assert_market_valid();
#endif /* !__TBB_TASK_PRIORITY */
    if ( delta > 0 ) {
        // can't overflow the soft limit, but remember the values requested by arenas
        // in my_total_demand so as not to prematurely release workers to RML
        if ( my_num_workers_requested+delta > (int)my_num_workers_soft_limit )
            delta = my_num_workers_soft_limit - my_num_workers_requested;
    } else {
        // the number of workers should not be decreased below my_total_demand
        if ( my_num_workers_requested+delta < my_total_demand )
            delta = min(my_total_demand, (int)my_num_workers_soft_limit) - my_num_workers_requested;
    }
    my_num_workers_requested += delta;
    __TBB_ASSERT( my_num_workers_requested <= (int)my_num_workers_soft_limit, NULL );

    my_arenas_list_mutex.unlock();
    // Must be called outside of any locks
    my_server->adjust_job_count_estimate( delta );
}
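
/* Worked example (editorial, not part of the original source): with a soft
   limit of 4, my_num_workers_requested == 4 and my_total_demand == 6, a new
   arena demand of +3 raises my_total_demand to 9, but delta is clamped to
   4-4 == 0, so RML is not asked for more threads. When demand later drops by
   5, my_total_demand falls to 4 and delta becomes min(4,4)-4 == 0 again:
   the excess demand was only ever bookkeeping. */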

void market::process( job& j ) {
    generic_scheduler& s = static_cast<generic_scheduler&>(j);
    // s.my_arena can be dead. Don't access it until arena_in_need is called
    arena *a = s.my_arena;
    __TBB_ASSERT( governor::is_set(&s), NULL );

    for (int i = 0; i < 2; ++i) {
        while ( (a = arena_in_need(a)) ) {
            a->process(s);
            a = NULL; // to avoid double checks in arena_in_need(arena*) for the same priority level
        }
        // Workers leave the market because there is no arena in need. This can happen before
        // adjust_job_count_estimate() decreases my_slack, i.e. before RML can put this thread
        // to sleep, which may result in a busy loop that checks for my_slack<0 and re-enters
        // this method immediately. The yield below mitigates that spinning.
        if ( !i )
            __TBB_Yield();
    }

    GATHER_STATISTIC( ++s.my_counters.market_roundtrips );
}

void market::cleanup( job& j ) {
    __TBB_ASSERT( theMarket != this, NULL );
    generic_scheduler& s = static_cast<generic_scheduler&>(j);
    generic_scheduler* mine = governor::local_scheduler_if_initialized();
    __TBB_ASSERT( !mine || mine->is_worker(), NULL );
    if( mine!=&s ) {
        governor::assume_scheduler( &s );
        generic_scheduler::cleanup_worker( &s, mine!=NULL );
        governor::assume_scheduler( mine );
    } else {
        generic_scheduler::cleanup_worker( &s, true );
    }
}

void market::acknowledge_close_connection() {
    destroy();
}

::rml::job* market::create_one_job() {
    unsigned index = ++my_first_unused_worker_idx;
    __TBB_ASSERT( index > 0, NULL );
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // index serves as a hint decreasing conflicts between workers when they migrate between arenas
    generic_scheduler* s = generic_scheduler::create_worker( *this, index );
#if __TBB_TASK_GROUP_CONTEXT
    __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL );
    __TBB_ASSERT( !my_workers[index - 1], NULL );
    my_workers[index - 1] = s;
#endif /* __TBB_TASK_GROUP_CONTEXT */
    return s;
}

#if __TBB_TASK_PRIORITY
void market::update_arena_top_priority ( arena& a, intptr_t new_priority ) {
    GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_switches );
    __TBB_ASSERT( a.my_top_priority != new_priority, NULL );
    priority_level_info &prev_level = my_priority_levels[a.my_top_priority],
                        &new_level = my_priority_levels[new_priority];
    remove_arena_from_list(a);
    a.my_top_priority = new_priority;
    insert_arena_into_list(a);
    as_atomic( a.my_reload_epoch ).fetch_and_increment<tbb::release>(); // TODO: synchronize with the global reload epoch in order to optimize usage of the local reload epoch
    prev_level.workers_requested -= a.my_num_workers_requested;
    new_level.workers_requested += a.my_num_workers_requested;
    __TBB_ASSERT( prev_level.workers_requested >= 0 && new_level.workers_requested >= 0, NULL );
}

bool market::lower_arena_priority ( arena& a, intptr_t new_priority, uintptr_t old_reload_epoch ) {
    // TODO: replace the lock with a try_lock loop which performs a double check of the epoch
    arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
    if ( a.my_reload_epoch != old_reload_epoch ) {
        assert_market_valid();
        return false;
    }
    __TBB_ASSERT( a.my_top_priority > new_priority, NULL );
    __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );

    intptr_t p = a.my_top_priority;
    update_arena_top_priority( a, new_priority );
    if ( a.my_num_workers_requested > 0 ) {
        if ( my_global_bottom_priority > new_priority ) {
            my_global_bottom_priority = new_priority;
        }
        if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
            // The global top level became empty
            for ( --p; p>my_global_bottom_priority && !my_priority_levels[p].workers_requested; --p ) continue;
            update_global_top_priority(p);
        }
        update_allotment( p );
    }

    __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
    assert_market_valid();
    return true;
}

bool market::update_arena_priority ( arena& a, intptr_t new_priority ) {
    // TODO: do not acquire this global lock while checking the arena's state.
    arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);

    tbb::internal::assert_priority_valid(new_priority);
    __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested <= 0, NULL );
    assert_market_valid();
    if ( a.my_top_priority == new_priority ) {
        return false;
    }
    else if ( a.my_top_priority > new_priority ) {
        if ( a.my_bottom_priority > new_priority )
            a.my_bottom_priority = new_priority;
        return false;
    }
    else if ( a.my_num_workers_requested <= 0 ) {
        return false;
    }

    __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );

    intptr_t p = a.my_top_priority;
    intptr_t highest_affected_level = max(p, new_priority);
    update_arena_top_priority( a, new_priority );

    if ( my_global_top_priority < new_priority ) {
        update_global_top_priority(new_priority);
    }
    else if ( my_global_top_priority == new_priority ) {
        advance_global_reload_epoch();
    }
    else {
        __TBB_ASSERT( new_priority < my_global_top_priority, NULL );
        __TBB_ASSERT( new_priority > my_global_bottom_priority, NULL );
        if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
            // The global top level became empty
            __TBB_ASSERT( my_global_bottom_priority < p, NULL );
            for ( --p; !my_priority_levels[p].workers_requested; --p ) continue;
            __TBB_ASSERT( p >= new_priority, NULL );
            update_global_top_priority(p);
            highest_affected_level = p;
        }
    }
    if ( p == my_global_bottom_priority ) {
        // The arena's priority was increased from the global bottom level.
        __TBB_ASSERT( p < new_priority, NULL );
        __TBB_ASSERT( new_priority <= my_global_top_priority, NULL );
        while ( my_global_bottom_priority < my_global_top_priority
                && !my_priority_levels[my_global_bottom_priority].workers_requested )
            ++my_global_bottom_priority;
        __TBB_ASSERT( my_global_bottom_priority <= new_priority, NULL );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        const bool enforced_concurrency = my_mandatory_num_requested && a.must_have_concurrency();
#else
        const bool enforced_concurrency = false;
#endif
        __TBB_ASSERT_EX( enforced_concurrency || my_priority_levels[my_global_bottom_priority].workers_requested > 0, NULL );
    }
    update_allotment( highest_affected_level );

    __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
    assert_market_valid();
    return true;
}
#endif /* __TBB_TASK_PRIORITY */

} // namespace internal
} // namespace tbb