blob: c57c4b6523744b9cca2d94d7b99514334d81bebb [file] [log] [blame]
/*
Copyright 2005-2010 Intel Corporation. All Rights Reserved.
This file is part of Threading Building Blocks.
Threading Building Blocks is free software; you can redistribute it
and/or modify it under the terms of the GNU General Public License
version 2 as published by the Free Software Foundation.
Threading Building Blocks is distributed in the hope that it will be
useful, but WITHOUT ANY WARRANTY; without even the implied warranty
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Threading Building Blocks; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
As a special exception, you may use this file as part of a free software
library without restriction. Specifically, if other files instantiate
templates or use macros or inline functions from this file, or you compile
this file and link it with other files to produce an executable, this
file does not by itself cause the resulting executable to be covered by
the GNU General Public License. This exception does not however
invalidate any other reasons why the executable file might be covered by
the GNU General Public License.
*/
#include "tbb/tbb_machine.h"
#include "custom_scheduler.h"
#include "scheduler_utility.h"
#include "governor.h"
#include "market.h"
#include "arena.h"
#include "mailbox.h"
#include "observer_proxy.h"
#include "itt_notify.h"
namespace tbb {
namespace internal {
/** Defined in tbb_main.cpp **/
extern generic_scheduler* (*AllocateSchedulerPtr)( arena*, size_t index );
inline generic_scheduler* allocate_scheduler ( arena* a, size_t index ) {
return AllocateSchedulerPtr(a, index);
}
#if __TBB_TASK_GROUP_CONTEXT
#if !__TBB_ARENA_PER_MASTER
//! Head of the list of master thread schedulers.
static scheduler_list_node_t the_scheduler_list_head;
//! Mutex protecting access to the list of schedulers.
static mutex the_scheduler_list_mutex;
#endif /* !__TBB_ARENA_PER_MASTER */
//! Counter that is incremented whenever new cancellation signal is sent to a task group.
/** Together with generic_scheduler::local_cancel_count forms cross-thread signaling
mechanism that allows to avoid locking at the hot path of normal execution flow.
When a descendant task group context is being registered or unregistered,
the global and local counters are compared. If they differ, it means that
a cancellation signal is being propagated, and registration/deregistration
routines take slower branch that may block (at most one thread of the pool
can be blocked at any moment). Otherwise the control path is lock-free and fast. **/
uintptr_t global_cancel_count = 0;
//! Context to be associated with dummy tasks of worker threads schedulers.
/** It is never used for its direct purpose, and is introduced solely for the sake
of avoiding one extra conditional branch in the end of wait_for_all method. **/
static task_group_context dummy_context(task_group_context::isolated);
#endif /* __TBB_TASK_GROUP_CONTEXT */
#if STATISTICS
static statistics the_statistics;
#endif /* STATISTICS */
void Scheduler_OneTimeInitialization ( bool itt_present ) {
AllocateSchedulerPtr = itt_present ? &custom_scheduler<DefaultSchedulerTraits>::allocate_scheduler :
&custom_scheduler<IntelSchedulerTraits>::allocate_scheduler;
#if __TBB_TASK_GROUP_CONTEXT && !__TBB_ARENA_PER_MASTER
ITT_SYNC_CREATE(&the_scheduler_list_mutex, SyncType_GlobalLock, SyncObj_SchedulersList);
the_scheduler_list_head.my_next = &the_scheduler_list_head;
the_scheduler_list_head.my_prev = &the_scheduler_list_head;
#endif /* __TBB_TASK_GROUP_CONTEXT && !__TBB_ARENA_PER_MASTER */
}
//------------------------------------------------------------------------
// scheduler interface
//------------------------------------------------------------------------
// A pure virtual destructor should still have a body
// so the one for tbb::internal::scheduler::~scheduler() is provided here
scheduler::~scheduler( ) {}
//------------------------------------------------------------------------
// generic_scheduler
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warning about using 'this' in base initializer list.
#pragma warning(push)
#pragma warning(disable:4355)
#endif
generic_scheduler::generic_scheduler( arena* a, size_t index ) :
my_stealing_threshold(0),
arena_index(index),
task_pool_size(0),
my_arena_slot(&dummy_slot),
#if __TBB_ARENA_PER_MASTER
my_market(NULL),
#endif /* __TBB_ARENA_PER_MASTER */
my_arena(a),
random( unsigned(this-(generic_scheduler*)NULL) ),
free_list(NULL),
innermost_running_task(NULL),
dummy_task(NULL),
ref_count(1),
my_affinity_id(0),
is_registered(false),
is_auto_initialized(false),
#if __TBB_SCHEDULER_OBSERVER
local_last_observer_proxy(NULL),
#endif /* __TBB_SCHEDULER_OBSERVER */
#if __TBB_COUNT_TASK_NODES
task_node_count(0),
#endif /* __TBB_COUNT_TASK_NODES */
#if STATISTICS
current_active(0),
current_length(0),
current_big_malloc(0),
execute_count(0),
steal_count(0),
mail_received_count(0),
proxy_execute_count(0),
proxy_steal_count(0),
proxy_bypass_count(0),
#endif /* STATISTICS */
small_task_count(1), // Extra 1 is a guard reference
return_list(NULL),
#if __TBB_TASK_GROUP_CONTEXT
local_ctx_list_update(0),
nonlocal_ctx_list_update(0)
#endif /* __TBB_TASK_GROUP_CONTEXT */
{
dummy_slot.task_pool = allocate_task_pool( min_task_pool_size );
dummy_slot.head = dummy_slot.tail = 0;
dummy_task = &allocate_task( sizeof(task), __TBB_CONTEXT_ARG(NULL, NULL) );
#if __TBB_TASK_GROUP_CONTEXT
context_list_head.my_prev = &context_list_head;
context_list_head.my_next = &context_list_head;
ITT_SYNC_CREATE(&context_list_mutex, SyncType_Scheduler, SyncObj_ContextsList);
#endif /* __TBB_TASK_GROUP_CONTEXT */
dummy_task->prefix().ref_count = 2;
ITT_SYNC_CREATE(&dummy_task->prefix().ref_count, SyncType_Scheduler, SyncObj_WorkerLifeCycleMgmt);
ITT_SYNC_CREATE(&return_list, SyncType_Scheduler, SyncObj_TaskReturnList);
__TBB_ASSERT( assert_okay(), "constructor error" );
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif // warning 4355 is back
#if TBB_USE_ASSERT
bool generic_scheduler::assert_okay() const {
#if TBB_USE_ASSERT >= 2
acquire_task_pool();
task** tp = dummy_slot.task_pool;
__TBB_ASSERT( task_pool_size >= min_task_pool_size, NULL );
__TBB_ASSERT( my_arena_slot->head <= my_arena_slot->tail, NULL );
for ( size_t i = my_arena_slot->head; i < my_arena_slot->tail; ++i ) {
__TBB_ASSERT( (uintptr_t)tp[i] + 1 > 1u, "nil or invalid task pointer in the deque" );
__TBB_ASSERT( tp[i]->prefix().state == task::ready ||
tp[i]->prefix().extra_state == es_task_proxy, "task in the deque has invalid state" );
}
release_task_pool();
#endif /* TBB_USE_ASSERT >=2 */
return true;
}
#endif /* TBB_USE_ASSERT */
#if __TBB_TASK_GROUP_CONTEXT
void generic_scheduler::propagate_cancellation () {
spin_mutex::scoped_lock lock(context_list_mutex);
// Acquire fence is necessary to ensure that the subsequent node->my_next load
// returned the correct value in case it was just inserted in another thread.
// The fence also ensures visibility of the correct my_parent value.
context_list_node_t *node = __TBB_load_with_acquire(context_list_head.my_next);
while ( node != &context_list_head ) {
task_group_context &ctx = __TBB_get_object_ref(task_group_context, my_node, node);
// The absence of acquire fence while reading my_cancellation_requested may result
// in repeated traversals of the same parents chain if another group (precedent or
// descendant) belonging to the tree being canceled sends cancellation request of
// its own around the same time.
if ( !ctx.my_cancellation_requested )
ctx.propagate_cancellation_from_ancestors();
node = node->my_next;
__TBB_ASSERT( ctx.is_alive(), "Walked into a destroyed context while propagating cancellation" );
}
}
#if !__TBB_ARENA_PER_MASTER
/** Propagates cancellation down the tree of dependent contexts by walking each
thread's local list of contexts **/
void generic_scheduler::propagate_cancellation ( task_group_context& ctx ) {
__TBB_ASSERT ( ctx.my_cancellation_requested, "No cancellation request in the context" );
// The whole propagation algorithm is under the lock in order to ensure correctness
// in case of parallel cancellations at the different levels of the context tree.
// See the note 2 at the bottom of the file.
mutex::scoped_lock lock(the_scheduler_list_mutex);
// Advance global cancellation state
__TBB_FetchAndAddWrelease(&global_cancel_count, 1);
// First propagate to workers using arena to access their context lists
size_t num_workers = my_arena->prefix().number_of_workers;
for ( size_t i = 0; i < num_workers; ++i ) {
// No fence is necessary here since the context list of worker's scheduler
// can contain anything of interest only after the first stealing was done
// by that worker. And doing it applies the necessary fence
generic_scheduler *s = my_arena->prefix().worker_list[i].scheduler;
// If the worker is in the middle of its startup sequence, skip it.
if ( s )
s->propagate_cancellation();
}
// Then propagate to masters using the global list of master's schedulers
scheduler_list_node_t *node = the_scheduler_list_head.my_next;
while ( node != &the_scheduler_list_head ) {
__TBB_get_object_ref(generic_scheduler, my_node, node).propagate_cancellation();
node = node->my_next;
}
// Now sync up the local counters
for ( size_t i = 0; i < num_workers; ++i ) {
generic_scheduler *s = my_arena->prefix().worker_list[i].scheduler;
// If the worker is in the middle of its startup sequence, skip it.
if ( s )
s->local_cancel_count = global_cancel_count;
}
node = the_scheduler_list_head.my_next;
while ( node != &the_scheduler_list_head ) {
__TBB_get_object_ref(generic_scheduler, my_node, node).local_cancel_count = global_cancel_count;
node = node->my_next;
}
}
#endif /* !__TBB_ARENA_PER_MASTER */
#endif /* __TBB_TASK_GROUP_CONTEXT */
void generic_scheduler::init_stack_info () {
// Stacks are growing top-down. Highest address is called "stack base",
// and the lowest is "stack limit".
#if __TBB_ARENA_PER_MASTER
__TBB_ASSERT( !my_stealing_threshold, "Stealing threshold has already been calculated" );
size_t stack_size = my_market->worker_stack_size();
#else /* !__TBB_ARENA_PER_MASTER */
size_t stack_size = my_arena->prefix().stack_size;
#endif /* !__TBB_ARENA_PER_MASTER */
#if USE_WINTHREAD
#if defined(_MSC_VER)&&_MSC_VER<1400 && !_WIN64
NT_TIB *pteb = (NT_TIB*)__TBB_machine_get_current_teb();
#else
NT_TIB *pteb = (NT_TIB*)NtCurrentTeb();
#endif
__TBB_ASSERT( &pteb < pteb->StackBase && &pteb > pteb->StackLimit, "invalid stack info in TEB" );
__TBB_ASSERT( stack_size >0, "stack_size not initialized?" );
// When a thread is created with the attribute STACK_SIZE_PARAM_IS_A_RESERVATION, stack limit
// in the TIB points to the committed part of the stack only. This renders the expression
// "(uintptr_t)pteb->StackBase / 2 + (uintptr_t)pteb->StackLimit / 2" virtually useless.
// Thus for worker threads we use the explicit stack size we used while creating them.
// And for master threads we rely on the following fact and assumption:
// - the default stack size of a master thread on Windows is 1M;
// - if it was explicitly set by the application it is at least as large as the size of a worker stack.
if ( is_worker() || stack_size < MByte )
my_stealing_threshold = (uintptr_t)pteb->StackBase - stack_size / 2;
else
my_stealing_threshold = (uintptr_t)pteb->StackBase - MByte / 2;
#else /* USE_PTHREAD */
// There is no portable way to get stack base address in Posix, so we use
// non-portable method (on all modern Linux) or the simplified approach
// based on the common sense assumptions. The most important assumption
// is that the main thread's stack size is not less than that of other threads.
void *stack_base = &stack_size;
#if __TBB_ipf
void *rsb_base = __TBB_get_bsp();
#endif
#if __linux__
size_t np_stack_size = 0;
void *stack_limit = NULL;
pthread_attr_t attr_stack, np_attr_stack;
if( 0 == pthread_getattr_np(pthread_self(), &np_attr_stack) ) {
if ( 0 == pthread_attr_getstack(&np_attr_stack, &stack_limit, &np_stack_size) ) {
if ( 0 == pthread_attr_init(&attr_stack) ) {
if ( 0 == pthread_attr_getstacksize(&attr_stack, &stack_size) )
{
stack_base = (char*)stack_limit + np_stack_size;
if ( np_stack_size < stack_size ) {
// We are in a secondary thread. Use reliable data.
#if __TBB_ipf
// IA64 stack is split into RSE backup and memory parts
rsb_base = stack_limit;
stack_size = np_stack_size/2;
#else
stack_size = np_stack_size;
#endif /* !__TBB_ipf */
}
// We are either in the main thread or this thread stack
// is bigger that that of the main one. As we cannot discern
// these cases we fall back to the default (heuristic) values.
}
pthread_attr_destroy(&attr_stack);
}
}
pthread_attr_destroy(&np_attr_stack);
}
#endif /* __linux__ */
__TBB_ASSERT( stack_size>0, "stack size must be positive" );
my_stealing_threshold = (uintptr_t)((char*)stack_base - stack_size/2);
#if __TBB_ipf
my_rsb_stealing_threshold = (uintptr_t)((char*)rsb_base + stack_size/2);
#endif
#endif /* USE_PTHREAD */
}
/** The function uses synchronization scheme similar to the one in the destructor
of task_group_context augmented with interlocked state change of each context
object. The purpose of this algo is to prevent threads doing nonlocal context
destruction from accessing destroyed owner-scheduler instance still pointed to
by the context object. **/
void generic_scheduler::cleanup_local_context_list () {
// Detach contexts remaining in the local list
bool wait_for_concurrent_destroyers_to_leave = false;
uintptr_t local_count_snapshot = local_cancel_count;
local_ctx_list_update = 1;
{
spin_mutex::scoped_lock lock; // The lock is necessary in acse of a conflict only
__TBB_rel_acq_fence();
// Check for the conflict with concurrent destroyer or cancelation propagator
if ( nonlocal_ctx_list_update || local_count_snapshot != global_cancel_count )
lock.acquire(context_list_mutex);
context_list_node_t *node = context_list_head.my_next;
while ( node != &context_list_head ) {
task_group_context &ctx = __TBB_get_object_ref(task_group_context, my_node, node);
__TBB_ASSERT( ctx.my_kind != task_group_context::binding_required, "Only a context bound to a root task can be detached" );
node = node->my_next;
__TBB_ASSERT( ctx.is_alive(), "Walked into a destroyed context while detaching contexts from the local list" );
// On 64-bit systems my_kind can be a 32-bit value padded with 32 uninitialized bits.
// So the cast below is necessary to throw off the higher bytes containing garbage
if ( (task_group_context::kind_type)(uintptr_t)__TBB_FetchAndStoreW(&ctx.my_kind, task_group_context::detached) == task_group_context::dying )
wait_for_concurrent_destroyers_to_leave = true;
}
}
__TBB_store_with_release( local_ctx_list_update, 0 );
// Wait until other threads referencing this scheduler object finish with it
if ( wait_for_concurrent_destroyers_to_leave )
spin_wait_until_eq( nonlocal_ctx_list_update, 0u );
}
void generic_scheduler::free_scheduler() {
if( in_arena() ) {
acquire_task_pool();
leave_arena();
}
#if __TBB_TASK_GROUP_CONTEXT
cleanup_local_context_list();
#if !__TBB_ARENA_PER_MASTER
task_group_context* default_context = dummy_task->prefix().context;
if ( default_context != &dummy_context) {
// Only master thread's dummy task has a dynamically allocated context
default_context->task_group_context::~task_group_context();
NFS_Free(default_context);
{
mutex::scoped_lock lock(the_scheduler_list_mutex);
my_node.my_next->my_prev = my_node.my_prev;
my_node.my_prev->my_next = my_node.my_next;
}
}
#endif /* !__TBB_ARENA_PER_MASTER */
#endif /* __TBB_TASK_GROUP_CONTEXT */
free_task<small_local_task>( *dummy_task );
// k accounts for a guard reference and each task that we deallocate.
intptr_t k = 1;
for(;;) {
while( task* t = free_list ) {
free_list = t->prefix().next;
deallocate_task(*t);
++k;
}
if( return_list==plugged_return_list() )
break;
free_list = (task*)__TBB_FetchAndStoreW( &return_list, (intptr_t)plugged_return_list() );
}
#if __TBB_COUNT_TASK_NODES
#if __TBB_ARENA_PER_MASTER
my_market->update_task_node_count( task_node_count );
#else /* !__TBB_ARENA_PER_MASTER */
my_arena->prefix().task_node_count += task_node_count;
#endif /* !__TBB_ARENA_PER_MASTER */
#endif /* __TBB_COUNT_TASK_NODES */
#if STATISTICS
the_statistics.record( execute_count, steal_count, mail_received_count,
proxy_execute_count, proxy_steal_count, proxy_bypass_count );
#endif /* STATISTICS */
free_task_pool( dummy_slot.task_pool );
dummy_slot.task_pool = NULL;
// Update small_task_count last. Doing so sooner might cause another thread to free *this.
__TBB_ASSERT( small_task_count>=k, "small_task_count corrupted" );
governor::sign_off(this);
if( __TBB_FetchAndAddW( &small_task_count, -k )==k )
NFS_Free( this );
}
task& generic_scheduler::allocate_task( size_t number_of_bytes,
__TBB_CONTEXT_ARG(task* parent, task_group_context* context) ) {
GATHER_STATISTIC(current_active+=1);
task* t = free_list;
if( number_of_bytes<=quick_task_size ) {
if( t ) {
GATHER_STATISTIC(current_length-=1);
__TBB_ASSERT( t->state()==task::freed, "free list of tasks is corrupted" );
free_list = t->prefix().next;
} else if( return_list ) {
// No fence required for read of return_list above, because __TBB_FetchAndStoreW has a fence.
t = (task*)__TBB_FetchAndStoreW( &return_list, 0 );
__TBB_ASSERT( t, "another thread emptied the return_list" );
__TBB_ASSERT( t->prefix().origin==this, "task returned to wrong return_list" );
ITT_NOTIFY( sync_acquired, &return_list );
free_list = t->prefix().next;
} else {
t = (task*)((char*)NFS_Allocate( task_prefix_reservation_size+quick_task_size, 1, NULL ) + task_prefix_reservation_size );
#if __TBB_COUNT_TASK_NODES
++task_node_count;
#endif /* __TBB_COUNT_TASK_NODES */
t->prefix().origin = this;
++small_task_count;
}
} else {
GATHER_STATISTIC(current_big_malloc+=1);
t = (task*)((char*)NFS_Allocate( task_prefix_reservation_size+number_of_bytes, 1, NULL ) + task_prefix_reservation_size );
#if __TBB_COUNT_TASK_NODES
++task_node_count;
#endif /* __TBB_COUNT_TASK_NODES */
t->prefix().origin = NULL;
}
task_prefix& p = t->prefix();
#if __TBB_TASK_GROUP_CONTEXT
p.context = context;
#endif /* __TBB_TASK_GROUP_CONTEXT */
p.owner = this;
p.ref_count = 0;
// Assign some not outrageously out-of-place value for a while
p.depth = 0;
p.parent = parent;
// In TBB 2.1 and later, the constructor for task sets extra_state to indicate the version of the tbb/task.h header.
// In TBB 2.0 and earlier, the constructor leaves extra_state as zero.
p.extra_state = 0;
p.affinity = 0;
p.state = task::allocated;
return *t;
}
void generic_scheduler::free_nonlocal_small_task( task& t ) {
__TBB_ASSERT( t.state()==task::freed, NULL );
generic_scheduler& s = *static_cast<generic_scheduler*>(t.prefix().origin);
__TBB_ASSERT( &s!=this, NULL );
for(;;) {
task* old = s.return_list;
if( old==plugged_return_list() )
break;
// Atomically insert t at head of s.return_list
t.prefix().next = old;
ITT_NOTIFY( sync_releasing, &s.return_list );
if( __TBB_CompareAndSwapW( &s.return_list, (intptr_t)&t, (intptr_t)old )==(intptr_t)old )
return;
}
deallocate_task(t);
if( __TBB_FetchAndDecrementWrelease( &s.small_task_count )==1 ) {
// We freed the last task allocated by scheduler s, so it's our responsibility
// to free the scheduler.
NFS_Free( &s );
}
}
task** generic_scheduler::allocate_task_pool( size_t n ) {
__TBB_ASSERT( n > task_pool_size, "Cannot shrink the task pool" );
size_t byte_size = ((n * sizeof(task*) + NFS_MaxLineSize - 1) / NFS_MaxLineSize) * NFS_MaxLineSize;
task_pool_size = byte_size / sizeof(task*);
task** new_pool = (task**)NFS_Allocate( byte_size, 1, NULL );
// No need to clear the fresh deque since valid items are designated by the head and tail members.
#if TBB_USE_ASSERT>=2
// But clear it in the high vigilance debug mode
memset( new_pool, -1, n );
#endif /* TBB_USE_ASSERT>=2 */
return new_pool;
}
void generic_scheduler::grow_task_pool( size_t new_size ) {
__TBB_ASSERT( assert_okay(), NULL );
if ( new_size < 2 * task_pool_size )
new_size = 2 * task_pool_size;
task** new_pool = allocate_task_pool( new_size ); // updates task_pool_size
task** old_pool = dummy_slot.task_pool;
acquire_task_pool(); // requires the old dummy_slot.task_pool value
// my_arena_slot->tail should not be updated before my_arena_slot->head because their
// values are used by other threads to check if this task pool is empty.
size_t new_tail = my_arena_slot->tail - my_arena_slot->head;
__TBB_ASSERT( new_tail <= task_pool_size, "new task pool is too short" );
memcpy( new_pool, old_pool + my_arena_slot->head, new_tail * sizeof(task*) );
my_arena_slot->head = 0;
my_arena_slot->tail = new_tail;
dummy_slot.task_pool = new_pool;
release_task_pool(); // updates the task pool pointer in our arena slot
free_task_pool( old_pool );
__TBB_ASSERT( assert_okay(), NULL );
}
/** ATTENTION:
This method is mostly the same as generic_scheduler::lock_task_pool(), with
a little different logic of slot state checks (slot is either locked or points
to our task pool).
Thus if either of them is changed, consider changing the counterpart as well. **/
inline void generic_scheduler::acquire_task_pool() const {
if ( !in_arena() )
return; // we are not in arena - nothing to lock
atomic_backoff backoff;
bool sync_prepare_done = false;
for(;;) {
#if TBB_USE_ASSERT
__TBB_ASSERT( my_arena_slot == my_arena->slot + arena_index, "invalid arena slot index" );
// Local copy of the arena slot task pool pointer is necessary for the next
// assertion to work correctly to exclude asynchronous state transition effect.
task** tp = my_arena_slot->task_pool;
__TBB_ASSERT( tp == LockedTaskPool || tp == dummy_slot.task_pool, "slot ownership corrupt?" );
#endif
if( my_arena_slot->task_pool != LockedTaskPool &&
__TBB_CompareAndSwapW( &my_arena_slot->task_pool, (intptr_t)LockedTaskPool,
(intptr_t)dummy_slot.task_pool ) == (intptr_t)dummy_slot.task_pool )
{
// We acquired our own slot
ITT_NOTIFY(sync_acquired, my_arena_slot);
break;
}
else if( !sync_prepare_done ) {
// Start waiting
ITT_NOTIFY(sync_prepare, my_arena_slot);
sync_prepare_done = true;
}
// Someone else acquired a lock, so pause and do exponential backoff.
backoff.pause();
}
__TBB_ASSERT( my_arena_slot->task_pool == LockedTaskPool, "not really acquired task pool" );
} // generic_scheduler::acquire_task_pool
inline void generic_scheduler::release_task_pool() const {
if ( !in_arena() )
return; // we are not in arena - nothing to unlock
__TBB_ASSERT( my_arena_slot, "we are not in arena" );
__TBB_ASSERT( my_arena_slot->task_pool == LockedTaskPool, "arena slot is not locked" );
ITT_NOTIFY(sync_releasing, my_arena_slot);
__TBB_store_with_release( my_arena_slot->task_pool, dummy_slot.task_pool );
}
/** ATTENTION:
This method is mostly the same as generic_scheduler::acquire_task_pool(),
with a little different logic of slot state checks (slot can be empty, locked
or point to any task pool other than ours, and asynchronous transitions between
all these states are possible).
Thus if any of them is changed, consider changing the counterpart as well **/
inline task** generic_scheduler::lock_task_pool( arena_slot* victim_arena_slot ) const {
task** victim_task_pool;
atomic_backoff backoff;
bool sync_prepare_done = false;
for(;;) {
victim_task_pool = victim_arena_slot->task_pool;
// TODO: Investigate the effect of bailing out on the locked pool without trying to lock it.
// When doing this update assertion in the end of the method.
if ( victim_task_pool == EmptyTaskPool ) {
// The victim thread emptied its task pool - nothing to lock
if( sync_prepare_done )
ITT_NOTIFY(sync_cancel, victim_arena_slot);
break;
}
if( victim_task_pool != LockedTaskPool &&
__TBB_CompareAndSwapW( &victim_arena_slot->task_pool,
(intptr_t)LockedTaskPool, (intptr_t)victim_task_pool ) == (intptr_t)victim_task_pool )
{
// We've locked victim's task pool
ITT_NOTIFY(sync_acquired, victim_arena_slot);
break;
}
else if( !sync_prepare_done ) {
// Start waiting
ITT_NOTIFY(sync_prepare, victim_arena_slot);
sync_prepare_done = true;
}
// Someone else acquired a lock, so pause and do exponential backoff.
backoff.pause();
}
__TBB_ASSERT( victim_task_pool == EmptyTaskPool ||
(victim_arena_slot->task_pool == LockedTaskPool && victim_task_pool != LockedTaskPool),
"not really locked victim's task pool?" );
return victim_task_pool;
} // generic_scheduler::lock_task_pool
inline void generic_scheduler::unlock_task_pool( arena_slot* victim_arena_slot,
task** victim_task_pool ) const {
__TBB_ASSERT( victim_arena_slot, "empty victim arena slot pointer" );
__TBB_ASSERT( victim_arena_slot->task_pool == LockedTaskPool, "victim arena slot is not locked" );
ITT_NOTIFY(sync_releasing, victim_arena_slot);
__TBB_store_with_release( victim_arena_slot->task_pool, victim_task_pool );
}
inline task* generic_scheduler::prepare_for_spawning( task* t ) {
__TBB_ASSERT( t->state()==task::allocated, "attempt to spawn task that is not in 'allocated' state" );
t->prefix().owner = this;
t->prefix().state = task::ready;
#if TBB_USE_ASSERT
if( task* parent = t->parent() ) {
internal::reference_count ref_count = parent->prefix().ref_count;
__TBB_ASSERT( ref_count>=0, "attempt to spawn task whose parent has a ref_count<0" );
__TBB_ASSERT( ref_count!=0, "attempt to spawn task whose parent has a ref_count==0 (forgot to set_ref_count?)" );
parent->prefix().extra_state |= es_ref_count_active;
}
#endif /* TBB_USE_ASSERT */
affinity_id dst_thread = t->prefix().affinity;
__TBB_ASSERT( dst_thread == 0 || is_version_3_task(*t), "backwards compatibility to TBB 2.0 tasks is broken" );
if( dst_thread != 0 && dst_thread != my_affinity_id ) {
task_proxy& proxy = (task_proxy&)allocate_task( sizeof(task_proxy),
__TBB_CONTEXT_ARG(NULL, NULL) );
// Mark as a proxy
proxy.prefix().extra_state = es_task_proxy;
proxy.outbox = &my_arena->mailbox(dst_thread);
proxy.task_and_tag = intptr_t(t)|3;
ITT_NOTIFY( sync_releasing, proxy.outbox );
// Mail the proxy - after this point t may be destroyed by another thread at any moment.
proxy.outbox->push(proxy);
return &proxy;
}
return t;
}
/** Conceptually, this method should be a member of class scheduler.
But doing so would force us to publish class scheduler in the headers. */
void generic_scheduler::local_spawn( task& first, task*& next ) {
__TBB_ASSERT( governor::is_set(this), NULL );
__TBB_ASSERT( assert_okay(), NULL );
if ( &first.prefix().next == &next ) {
// Single task is being spawned
if ( my_arena_slot->tail == task_pool_size ) {
// 1 compensates for head possibly temporarily incremented by a thief
if ( my_arena_slot->head > 1 ) {
// Move the busy part of the deque to the beginning of the allocated space
acquire_task_pool();
my_arena_slot->tail -= my_arena_slot->head;
memmove( dummy_slot.task_pool, dummy_slot.task_pool + my_arena_slot->head, my_arena_slot->tail * sizeof(task*) );
my_arena_slot->head = 0;
release_task_pool();
}
else {
grow_task_pool( task_pool_size + 1 );
}
}
dummy_slot.task_pool[my_arena_slot->tail] = prepare_for_spawning( &first );
ITT_NOTIFY(sync_releasing, my_arena_slot);
// The following store with release is required on ia64 only
size_t new_tail = my_arena_slot->tail + 1;
__TBB_store_with_release( my_arena_slot->tail, new_tail );
__TBB_ASSERT ( my_arena_slot->tail <= task_pool_size, "task deque end was overwritten" );
}
else {
// Task list is being spawned
const size_t initial_capacity = 64;
task *arr[initial_capacity];
fast_reverse_vector<task*> tasks(arr, initial_capacity);
task *t_next = NULL;
for( task* t = &first; ; t = t_next ) {
// After prepare_for_spawning returns t may already have been destroyed.
// So milk it while it is alive.
bool end = &t->prefix().next == &next;
t_next = t->prefix().next;
tasks.push_back( prepare_for_spawning(t) );
if( end )
break;
}
size_t num_tasks = tasks.size();
__TBB_ASSERT ( arena_index != null_arena_index, "invalid arena slot index" );
if ( my_arena_slot->tail + num_tasks > task_pool_size ) {
// 1 compensates for head possibly temporarily incremented by a thief
size_t new_size = my_arena_slot->tail - my_arena_slot->head + num_tasks + 1;
if ( new_size <= task_pool_size ) {
// Move the busy part of the deque to the beginning of the allocated space
acquire_task_pool();
my_arena_slot->tail -= my_arena_slot->head;
memmove( dummy_slot.task_pool, dummy_slot.task_pool + my_arena_slot->head, my_arena_slot->tail * sizeof(task*) );
my_arena_slot->head = 0;
release_task_pool();
}
else {
grow_task_pool( new_size );
}
}
#if DO_ITT_NOTIFY
else {
// The preceding if-branch issues the same ittnotify inside release_task_pool() or grow_task_pool() methods
ITT_NOTIFY(sync_releasing, my_arena_slot);
}
#endif /* DO_ITT_NOTIFY */
tasks.copy_memory( dummy_slot.task_pool + my_arena_slot->tail );
// The following store with release is required on ia64 only
size_t new_tail = my_arena_slot->tail + num_tasks;
__TBB_store_with_release( my_arena_slot->tail, new_tail );
__TBB_ASSERT ( my_arena_slot->tail <= task_pool_size, "task deque end was overwritten" );
}
#if __TBB_ARENA_PER_MASTER
if ( !in_arena() )
enter_arena();
my_arena->advertise_new_work</*Spawned=*/true>();
#else /* !__TBB_ARENA_PER_MASTER */
if ( !in_arena() ) {
if ( is_worker() )
enter_arena();
else
try_enter_arena();
}
my_arena->mark_pool_full();
#endif /* !__TBB_ARENA_PER_MASTER */
__TBB_ASSERT( assert_okay(), NULL );
}
void generic_scheduler::local_spawn_root_and_wait( task& first, task*& next ) {
__TBB_ASSERT( governor::is_set(this), NULL );
__TBB_ASSERT( &first, NULL );
auto_empty_task dummy( __TBB_CONTEXT_ARG(this, first.prefix().context) );
internal::reference_count n = 0;
for( task* t=&first; ; t=t->prefix().next ) {
++n;
__TBB_ASSERT( !t->prefix().parent, "not a root task, or already running" );
t->prefix().parent = &dummy;
if( &t->prefix().next==&next ) break;
#if __TBB_TASK_GROUP_CONTEXT
__TBB_ASSERT( t->prefix().context == t->prefix().next->prefix().context,
"all the root tasks in list must share the same context");
#endif /* __TBB_TASK_GROUP_CONTEXT */
}
dummy.prefix().ref_count = n+1;
if( n>1 )
local_spawn( *first.prefix().next, next );
local_wait_for_all( dummy, &first );
}
inline task* generic_scheduler::get_mailbox_task() {
__TBB_ASSERT( my_affinity_id>0, "not in arena" );
task* result = NULL;
while( task_proxy* t = inbox.pop() ) {
intptr_t tat = __TBB_load_with_acquire(t->task_and_tag);
__TBB_ASSERT( tat==task_proxy::mailbox_bit || (tat==(tat|3)&&tat!=3), NULL );
if( tat!=task_proxy::mailbox_bit && __TBB_CompareAndSwapW( &t->task_and_tag, task_proxy::pool_bit, tat )==tat ) {
// Successfully grabbed the task, and left pool seeker with job of freeing the proxy.
ITT_NOTIFY( sync_acquired, inbox.outbox() );
result = (task*)(tat & ~3);
result->prefix().owner = this;
break;
}
free_task_proxy( *t );
}
return result;
}
inline task* generic_scheduler::strip_proxy( task_proxy* tp ) {
__TBB_ASSERT( tp->prefix().extra_state==es_task_proxy, NULL );
intptr_t tat = __TBB_load_with_acquire(tp->task_and_tag);
if( (tat&3)==3 ) {
// proxy is shared by a pool and a mailbox.
// Attempt to transition it to "empty proxy in mailbox" state.
if( __TBB_CompareAndSwapW( &tp->task_and_tag, task_proxy::mailbox_bit, tat )==tat ) {
// Successfully grabbed the task, and left the mailbox with the job of freeing the proxy.
return (task*)(tat&~3);
}
__TBB_ASSERT( tp->task_and_tag==task_proxy::pool_bit, NULL );
} else {
// We have exclusive access to the proxy
__TBB_ASSERT( (tat&3)==task_proxy::pool_bit, "task did not come from pool?" );
__TBB_ASSERT ( !(tat&~3), "Empty proxy in the pool contains non-zero task pointer" );
}
#if TBB_USE_ASSERT
tp->prefix().state = task::allocated;
#endif
free_task_proxy( *tp );
// Another thread grabbed the underlying task via their mailbox
return NULL;
}
#if __TBB_ARENA_PER_MASTER
void generic_scheduler::local_enqueue( task& t ) {
__TBB_ASSERT( governor::is_set(this), NULL );
__TBB_ASSERT( t.state()==task::allocated, "attempt to enqueue task that is not in 'allocated' state" );
t.prefix().owner = this;
t.prefix().state = task::ready;
#if TBB_USE_ASSERT
if( task* parent = t.parent() ) {
internal::reference_count ref_count = parent->prefix().ref_count;
__TBB_ASSERT( ref_count>=0, "attempt to enqueue task whose parent has a ref_count<0" );
__TBB_ASSERT( ref_count!=0, "attempt to enqueue task whose parent has a ref_count==0 (forgot to set_ref_count?)" );
parent->prefix().extra_state |= es_ref_count_active;
}
__TBB_ASSERT(t.prefix().affinity==affinity_id(0), "affinity is ignored for enqueued tasks");
#endif /* TBB_USE_ASSERT */
__TBB_ASSERT( my_arena, "thread is not in any arena" );
ITT_NOTIFY(sync_releasing, &my_arena->my_task_stream);
my_arena->my_task_stream.push( &t, my_arena_slot->hint_for_push );
my_arena->advertise_new_work< /*Spawned=*/ false >();
__TBB_ASSERT( assert_okay(), NULL );
}
inline task* generic_scheduler::dequeue_task() {
task* result = NULL;
my_arena->my_task_stream.pop(result, my_arena_slot->hint_for_pop);
if (result) ITT_NOTIFY(sync_acquired, &my_arena->my_task_stream);
return result;
}
#endif /* __TBB_ARENA_PER_MASTER */
inline task* generic_scheduler::get_task() {
task* result = NULL;
retry:
--my_arena_slot->tail;
__TBB_rel_acq_fence();
if ( (intptr_t)my_arena_slot->head > (intptr_t)my_arena_slot->tail ) {
acquire_task_pool();
if ( (intptr_t)my_arena_slot->head <= (intptr_t)my_arena_slot->tail ) {
// The thief backed off - grab the task
__TBB_ASSERT_VALID_TASK_PTR( dummy_slot.task_pool[my_arena_slot->tail] );
result = dummy_slot.task_pool[my_arena_slot->tail];
__TBB_POISON_TASK_PTR( dummy_slot.task_pool[my_arena_slot->tail] );
}
else {
__TBB_ASSERT ( my_arena_slot->head == my_arena_slot->tail + 1, "victim/thief arbitration algorithm failure" );
}
if ( (intptr_t)my_arena_slot->head < (intptr_t)my_arena_slot->tail ) {
release_task_pool();
}
else {
// In any case the deque is empty now, so compact it
my_arena_slot->head = my_arena_slot->tail = 0;
if ( in_arena() )
leave_arena();
}
}
else {
__TBB_ASSERT_VALID_TASK_PTR( dummy_slot.task_pool[my_arena_slot->tail] );
result = dummy_slot.task_pool[my_arena_slot->tail];
__TBB_POISON_TASK_PTR( dummy_slot.task_pool[my_arena_slot->tail] );
}
if( result && is_proxy(*result) ) {
result = strip_proxy((task_proxy*)result);
if( !result ) {
goto retry;
}
GATHER_STATISTIC( ++proxy_execute_count );
// Following assertion should be true because TBB 2.0 tasks never specify affinity, and hence are not proxied.
__TBB_ASSERT( is_version_3_task(*result), "backwards compatibility with TBB 2.0 broken" );
// Task affinity has changed.
innermost_running_task = result;
result->note_affinity(my_affinity_id);
}
return result;
} // generic_scheduler::get_task
task* generic_scheduler::steal_task( arena_slot& victim_slot ) {
task** victim_pool = lock_task_pool( &victim_slot );
if ( !victim_pool )
return NULL;
const size_t none = ~size_t(0);
size_t first_skipped_proxy = none;
task* result = NULL;
retry:
++victim_slot.head;
__TBB_rel_acq_fence();
if ( (intptr_t)victim_slot.head > (intptr_t)victim_slot.tail ) {
--victim_slot.head;
}
else {
__TBB_ASSERT_VALID_TASK_PTR( victim_pool[victim_slot.head - 1]);
result = victim_pool[victim_slot.head - 1];
if( is_proxy(*result) ) {
task_proxy& tp = *static_cast<task_proxy*>(result);
// If task will likely be grabbed by whom it was mailed to, skip it.
if( (tp.task_and_tag & 3) == 3 && tp.outbox->recipient_is_idle() ) {
if ( first_skipped_proxy == none )
first_skipped_proxy = victim_slot.head - 1;
result = NULL;
goto retry;
}
}
__TBB_POISON_TASK_PTR(victim_pool[victim_slot.head - 1]);
}
if ( first_skipped_proxy != none ) {
if ( result ) {
victim_pool[victim_slot.head - 1] = victim_pool[first_skipped_proxy];
__TBB_POISON_TASK_PTR( victim_pool[first_skipped_proxy] );
__TBB_store_with_release( victim_slot.head, first_skipped_proxy + 1 );
}
else
__TBB_store_with_release( victim_slot.head, first_skipped_proxy );
}
unlock_task_pool( &victim_slot, victim_pool );
return result;
}
inline void generic_scheduler::do_enter_arena() {
my_arena_slot = &my_arena->slot[arena_index];
__TBB_ASSERT ( my_arena_slot->head == my_arena_slot->tail, "task deque of a free slot must be empty" );
__TBB_ASSERT ( dummy_slot.head < dummy_slot.tail, "entering arena without tasks to share" );
my_arena_slot->head = dummy_slot.head;
my_arena_slot->tail = dummy_slot.tail;
// Release signal on behalf of previously spawned tasks (when this thread was not in arena yet)
ITT_NOTIFY(sync_releasing, my_arena_slot);
__TBB_store_with_release( my_arena_slot->task_pool, dummy_slot.task_pool );
// We'll leave arena only when it's empty, so clean up local instances of indices.
dummy_slot.head = dummy_slot.tail = 0;
}
void generic_scheduler::enter_arena() {
__TBB_ASSERT ( my_arena, "no arena: initialization not completed?" );
#if __TBB_ARENA_PER_MASTER
__TBB_ASSERT ( !in_arena(), "thread is already in arena?" );
__TBB_ASSERT ( arena_index < my_arena->my_num_slots, "arena slot index is out-of-bound" );
#else /* !__TBB_ARENA_PER_MASTER */
__TBB_ASSERT ( is_worker(), "only workers should use enter_arena()" );
__TBB_ASSERT ( !in_arena(), "worker already in arena?" );
__TBB_ASSERT ( arena_index < my_arena->prefix().number_of_workers, "invalid worker arena slot index" );
#endif /* !__TBB_ARENA_PER_MASTER */
__TBB_ASSERT ( my_arena->slot[arena_index].task_pool == EmptyTaskPool, "someone else grabbed my arena slot?" );
do_enter_arena();
}
#if !__TBB_ARENA_PER_MASTER
void generic_scheduler::try_enter_arena() {
__TBB_ASSERT ( !is_worker(), "only masters should use try_enter_arena()" );
__TBB_ASSERT ( my_arena, "no arena: initialization not completed?" );
__TBB_ASSERT ( !in_arena(), "master already in arena?" );
__TBB_ASSERT ( arena_index >= my_arena->prefix().number_of_workers &&
arena_index < my_arena->prefix().number_of_slots, "invalid arena slot hint value" );
size_t h = arena_index;
// We do not lock task pool upon successful entering arena
if( my_arena->slot[h].task_pool != EmptyTaskPool ||
__TBB_CompareAndSwapW( &my_arena->slot[h].task_pool, (intptr_t)LockedTaskPool,
(intptr_t)EmptyTaskPool ) != (intptr_t)EmptyTaskPool )
{
// Hinted arena slot is already busy, try some of the others at random
unsigned first = my_arena->prefix().number_of_workers,
last = my_arena->prefix().number_of_slots;
unsigned n = last - first - 1;
/// \todo Is this limit reasonable?
size_t max_attempts = last - first;
for (;;) {
size_t k = first + random.get() % n;
if( k >= h )
++k; // Adjusts random distribution to exclude previously tried slot
h = k;
if( my_arena->slot[h].task_pool == EmptyTaskPool &&
__TBB_CompareAndSwapW( &my_arena->slot[h].task_pool, (intptr_t)LockedTaskPool,
(intptr_t)EmptyTaskPool ) == (intptr_t)EmptyTaskPool )
{
break;
}
if ( --max_attempts == 0 ) {
// After so many attempts we are still unable to find a vacant arena slot.
// Cease the vain effort and work outside of arena for a while.
return;
}
}
}
// Successfully claimed a slot in the arena.
ITT_NOTIFY(sync_acquired, &my_arena->slot[h]);
__TBB_ASSERT ( my_arena->slot[h].task_pool == LockedTaskPool, "arena slot is not actually acquired" );
arena_index = h;
do_enter_arena();
attach_mailbox( affinity_id(h+1) );
}
#endif /* !__TBB_ARENA_PER_MASTER */
void generic_scheduler::leave_arena() {
__TBB_ASSERT( in_arena(), "Not in arena" );
// Do not reset arena_index. It will be used to (attempt to) re-acquire the slot next time
__TBB_ASSERT( &my_arena->slot[arena_index] == my_arena_slot, "arena slot and slot index mismatch" );
__TBB_ASSERT ( my_arena_slot->task_pool == LockedTaskPool, "Task pool must be locked when leaving arena" );
__TBB_ASSERT ( my_arena_slot->head == my_arena_slot->tail, "Cannot leave arena when the task pool is not empty" );
#if !__TBB_ARENA_PER_MASTER
if ( !is_worker() ) {
my_affinity_id = 0;
inbox.detach();
}
#endif /* !__TBB_ARENA_PER_MASTER */
ITT_NOTIFY(sync_releasing, &my_arena->slot[arena_index]);
__TBB_store_with_release( my_arena_slot->task_pool, EmptyTaskPool );
my_arena_slot = &dummy_slot;
}
#if __TBB_ARENA_PER_MASTER
generic_scheduler* generic_scheduler::create_worker( market& m, size_t index ) {
generic_scheduler* s = allocate_scheduler( NULL, index );
#if __TBB_TASK_GROUP_CONTEXT
s->dummy_task->prefix().context = &dummy_context;
// Sync up the local cancellation state with the global one. No need for fence here.
s->local_cancel_count = global_cancel_count;
#endif /* __TBB_TASK_GROUP_CONTEXT */
s->my_market = &m;
s->init_stack_info();
return s;
}
#else /* !__TBB_ARENA_PER_MASTER */
generic_scheduler* generic_scheduler::create_worker( arena& a, size_t index ) {
generic_scheduler* s = allocate_scheduler( &a, index );
// Put myself into the arena
#if __TBB_TASK_GROUP_CONTEXT
s->dummy_task->prefix().context = &dummy_context;
// Sync up the local cancellation state with the global one. No need for fence here.
s->local_cancel_count = global_cancel_count;
#endif /* __TBB_TASK_GROUP_CONTEXT */
s->attach_mailbox( index+1 );
s->init_stack_info();
__TBB_store_with_release( a.prefix().worker_list[index].scheduler, s );
return s;
}
#endif /* !__TBB_ARENA_PER_MASTER */
generic_scheduler* generic_scheduler::create_master( arena& a ) {
generic_scheduler* s = allocate_scheduler( &a,
#if __TBB_ARENA_PER_MASTER
0 // Master thread always occupies the first slot
#else /* !__TBB_ARENA_PER_MASTER */
null_arena_index // Master thread will have to search for a vacant slot
#endif /* !__TBB_ARENA_PER_MASTER */
);
task& t = *s->dummy_task;
s->innermost_running_task = &t;
t.prefix().ref_count = 1;
governor::sign_on(s);
__TBB_ASSERT( &task::self()==&t, "governor::sign_on failed?" );
#if __TBB_ARENA_PER_MASTER
#if __TBB_TASK_GROUP_CONTEXT
// Context to be used by root tasks by default (if the user has not specified one).
// Allocation is done by NFS allocator because we cannot reuse memory allocated
// for task objects since the free list is empty at the moment.
t.prefix().context = a.my_master_default_ctx =
new ( NFS_Allocate(sizeof(task_group_context), 1, NULL) ) task_group_context(task_group_context::isolated);
#endif
s->my_market = a.my_market;
__TBB_ASSERT( s->arena_index == 0, "Master thread must occupy the first slot in its arena" );
s->attach_mailbox(1);
a.slot[0].my_scheduler = s;
#if _WIN32|_WIN64
__TBB_ASSERT( s->my_market, NULL );
s->my_market->register_master( s->master_exec_resource );
#endif /* _WIN32|_WIN64 */
#else /* !__TBB_ARENA_PER_MASTER */
#if _WIN32|_WIN64
s->register_master();
#endif
#if __TBB_TASK_GROUP_CONTEXT
// Context to be used by root tasks by default (if the user has not specified one).
// Allocation is done by NFS allocator because we cannot reuse memory allocated
// for task objects since the free list is empty at the moment.
t.prefix().context = new ( NFS_Allocate(sizeof(task_group_context), 1, NULL) ) task_group_context(task_group_context::isolated);
scheduler_list_node_t &node = s->my_node;
{
mutex::scoped_lock lock(the_scheduler_list_mutex);
node.my_next = the_scheduler_list_head.my_next;
node.my_prev = &the_scheduler_list_head;
the_scheduler_list_head.my_next->my_prev = &node;
the_scheduler_list_head.my_next = &node;
#endif /* __TBB_TASK_GROUP_CONTEXT */
unsigned last = a.prefix().number_of_slots,
cur_limit = a.prefix().limit;
// This slot index assignment is just a hint to ...
if ( cur_limit < last ) {
// ... to prevent competition between the first few masters.
s->arena_index = cur_limit++;
// In the absence of exception handling this code is a subject to data
// race in case of multiple masters concurrently entering empty arena.
// But it does not affect correctness, and can only result in a few
// masters competing for the same arena slot during the first acquisition.
// The cost of competition is low in comparison to that of oversubscription.
a.prefix().limit = cur_limit;
}
else {
// ... to minimize the probability of competition between multiple masters.
unsigned first = a.prefix().number_of_workers;
s->arena_index = first + s->random.get() % (last - first);
}
#if __TBB_TASK_GROUP_CONTEXT
}
#endif
#endif /* !__TBB_ARENA_PER_MASTER */
s->init_stack_info();
#if __TBB_TASK_GROUP_CONTEXT
// Sync up the local cancellation state with the global one. No need for fence here.
s->local_cancel_count = global_cancel_count;
#endif
#if __TBB_SCHEDULER_OBSERVER
// Process any existing observers.
s->notify_entry_observers();
#endif /* __TBB_SCHEDULER_OBSERVER */
return s;
}
void generic_scheduler::cleanup_worker( void* arg, bool is_worker ) {
generic_scheduler& s = *(generic_scheduler*)arg;
__TBB_ASSERT( s.dummy_slot.task_pool, "cleaning up worker with missing task pool" );
// APM TODO: Decide how observers should react to each entry/leave to/from arena
#if __TBB_SCHEDULER_OBSERVER
s.notify_exit_observers( is_worker );
#endif /* __TBB_SCHEDULER_OBSERVER */
// When comparing "head" and "tail" indices ">=" is used because this worker's
// task pool may still be published in the arena, and thieves can optimistically
// bump "head" (and then roll back).
__TBB_ASSERT( s.my_arena_slot->task_pool == EmptyTaskPool || s.my_arena_slot->head >= s.my_arena_slot->tail,
"worker has unfinished work at run down" );
s.free_scheduler();
}
void generic_scheduler::cleanup_master() {
generic_scheduler& s = *this; // for similarity with cleanup_worker
__TBB_ASSERT( s.dummy_slot.task_pool, "cleaning up master with missing task pool" );
#if __TBB_SCHEDULER_OBSERVER
s.notify_exit_observers(/*is_worker=*/false);
#endif /* __TBB_SCHEDULER_OBSERVER */
if ( !local_task_pool_empty() ) {
__TBB_ASSERT ( governor::is_set(this), "TLS slot is cleared before the task pool cleanup" );
s.local_wait_for_all( *s.dummy_task, NULL );
__TBB_ASSERT ( governor::is_set(this), "Other thread reused our TLS key during the task pool cleanup" );
}
#if __TBB_ARENA_PER_MASTER
#if _WIN32|_WIN64
__TBB_ASSERT( s.my_market, NULL );
s.my_market->unregister_master( s.master_exec_resource );
#endif /* _WIN32|_WIN64 */
arena* a = s.my_arena;
#else /* !__TBB_ARENA_PER_MASTER */
#if _WIN32|_WIN64
s.unregister_master();
#endif /* _WIN32|_WIN64 */
#endif /* __TBB_ARENA_PER_MASTER */
s.free_scheduler();
#if __TBB_ARENA_PER_MASTER
a->slot[0].my_scheduler = NULL;
// Do not close arena if some fire-and-forget tasks remain; workers should care of it.
if( a->my_task_stream.empty() && a->pool_state.fetch_and_store(arena::SNAPSHOT_EMPTY)!=arena::SNAPSHOT_EMPTY )
a->my_market->adjust_demand( *a, -(int)a->my_max_num_workers );
if ( --a->my_num_threads_active==0 && a->pool_state==arena::SNAPSHOT_EMPTY )
a->close_arena();
#else /* !__TBB_ARENA_PER_MASTER */
governor::finish_with_arena();
#endif /* !__TBB_ARENA_PER_MASTER */
}
#if __TBB_SCHEDULER_OBSERVER
void generic_scheduler::notify_entry_observers() {
local_last_observer_proxy = observer_proxy::process_list(local_last_observer_proxy,is_worker(),/*is_entry=*/true);
}
void generic_scheduler::notify_exit_observers( bool is_worker ) {
observer_proxy::process_list(local_last_observer_proxy,is_worker,/*is_entry=*/false);
}
#endif /* __TBB_SCHEDULER_OBSERVER */
} // namespace internal
} // namespace tbb