#ifndef _TBB_custom_scheduler_H
#define _TBB_custom_scheduler_H
#include "scheduler.h"
#include "observer_proxy.h"
#include "itt_notify.h"
namespace tbb {
namespace internal {
//! Amount of time to pause between steals.
/** The default values below were found to be best empirically for K-Means
    on the 32-way Altix and 4-way (*2 for HT) fxqlin04.
    Exactly one definition is compiled: the first when __TBB_ipf is set,
    the second on every other target. */
#if __TBB_ipf
static const long PauseTime = 1500;
#else
static const long PauseTime = 80;
#endif /* __TBB_ipf */
//! Traits classes for scheduler
/** Compile-time switches consumed by custom_scheduler<SchedulerTraits>, so the
    scheduler can specialize its hot paths without run-time checks.
    DefaultSchedulerTraits keeps ITT instrumentation enabled and always uses the
    atomic decrement in tally_completion_of_predecessor. */
struct DefaultSchedulerTraits {
    //! When true, the scheduler issues its ITT_NOTIFY / ITT_STACK instrumentation calls.
    static const bool itt_possible = true;
    //! When true, tally_completion_of_predecessor checks for ref_count==1 first and,
    //! in that case, skips the atomic __TBB_FetchAndDecrementWrelease.
    static const bool has_slow_atomic = false;
};
//! Scheduler traits with ITT instrumentation compiled out.
/** On 32- and 64-bit x86 targets (__TBB_x86_32 / __TBB_x86_64), has_slow_atomic is
    enabled so that tally_completion_of_predecessor can avoid the atomic decrement when
    the predecessor's ref_count is already 1. Exactly one definition of has_slow_atomic
    is compiled. */
struct IntelSchedulerTraits {
    //! ITT_NOTIFY / ITT_STACK calls guarded by this flag are compiled out.
    static const bool itt_possible = false;
#if __TBB_x86_32||__TBB_x86_64
    static const bool has_slow_atomic = true;
#else
    static const bool has_slow_atomic = false;
#endif /* __TBB_x86_32||__TBB_x86_64 */
};
// custom_scheduler
//! A scheduler with a customized evaluation loop.
/** The customization can use SchedulerTraits to make decisions without needing a run-time check.
    SchedulerTraits must provide the compile-time constants itt_possible and has_slow_atomic
    (see DefaultSchedulerTraits and IntelSchedulerTraits).
    NOTE(review): this copy of the class is missing lines. The closing braces of the inline
    member functions, an `else` in tally_completion_of_predecessor, branch bodies, and the
    `#if` directives matching the visible `#endif`s are absent, so it will not compile as
    shown. Restore it from upstream TBB before relying on it. */
template<typename SchedulerTraits>
class custom_scheduler: private generic_scheduler {
typedef custom_scheduler<SchedulerTraits> scheduler_type;
//! Scheduler loop that dispatches tasks.
/** If child is non-NULL, it is dispatched first.
Then, until "parent" has a reference count of 1, other tasks are dispatched or stolen. */
void local_wait_for_all( task& parent, task* child );
//! Entry point from client code to the scheduler loop that dispatches tasks.
/** The method is virtual, but the *this object is used only for sake of dispatching on the correct vtable,
not necessarily the correct *this object. The correct *this object is looked up in TLS. */
void wait_for_all( task& parent, task* child ) {
// Forward to the scheduler returned by governor::local_scheduler(). The call is qualified
// (scheduler_type::), so no further virtual dispatch takes place.
static_cast<custom_scheduler*>(governor::local_scheduler())->scheduler_type::local_wait_for_all( parent, child );
// NOTE(review): the closing brace of wait_for_all is missing in this copy.
//! Construct a custom_scheduler
custom_scheduler( arena* a, size_t index ) : generic_scheduler(a, index) {}
//! Decrements ref_count of a predecessor.
/** If it achieves 0, the predecessor is scheduled for execution.
When changing, remember that this is a hot path function.
@param s            the predecessor, e.g. the parent of a task that just finished executing.
@param bypass_slot  in/out. If it is NULL when s becomes ready, s is stored here so the caller
                    can run it next. Otherwise s is presumably spawned via local_spawn (see
                    the note below). */
void tally_completion_of_predecessor( task& s, task*& bypass_slot ) {
task_prefix& p = s.prefix();
if( SchedulerTraits::itt_possible )
ITT_NOTIFY(sync_releasing, &p.ref_count);
// With has_slow_atomic, a ref_count of 1 is handled without the atomic decrement below.
// NOTE(review): the body of this branch (presumably `p.ref_count = 0;`) is missing in this copy.
if( SchedulerTraits::has_slow_atomic && p.ref_count==1 ) {
} else {
// NOTE(review): the statement controlled by the next `if` (presumably an early `return;`)
// and the closing brace of this `else` are missing. As written, the assertion below would
// become the body of that `if`.
if( __TBB_FetchAndDecrementWrelease(&p.ref_count) > 1 ) // more references exist
__TBB_ASSERT(p.ref_count==0, "completion of task caused predecessor's reference count to underflow");
if( SchedulerTraits::itt_possible )
ITT_NOTIFY(sync_acquired, &p.ref_count);
// NOTE(review): the `#if TBB_USE_ASSERT` matching the `#endif` below is missing.
p.extra_state &= ~es_ref_count_active;
#endif /* TBB_USE_ASSERT */
// NOTE(review): an `else` appears to be missing before local_spawn. As written, s would be
// both stored in bypass_slot and spawned. The function's closing brace is also missing.
if( bypass_slot==NULL )
bypass_slot = &s;
local_spawn( s, s.prefix().next );
//! Allocates and constructs a scheduler of this type.
/** Raw storage comes from NFS_Allocate, and the object is built in it with placement new.
    The result is returned through its generic_scheduler base.
    `index` is forwarded to the generic_scheduler constructor (presumably the arena slot index). */
static generic_scheduler* allocate_scheduler( arena* a, size_t index ) {
// NOTE(review): the `#if !__TBB_ARENA_PER_MASTER` matching the `#endif` below is missing.
__TBB_ASSERT( a, "missing arena" );
#endif /* !__TBB_ARENA_PER_MASTER */
scheduler_type* s = (scheduler_type*)NFS_Allocate(sizeof(scheduler_type),1,NULL);
new( s ) scheduler_type( a, index );
__TBB_ASSERT( s->assert_okay(), NULL );
ITT_SYNC_CREATE(s, SyncType_Scheduler, SyncObj_TaskPoolSpinning);
return s;
// NOTE(review): the closing brace of allocate_scheduler is missing in this copy.
//! Try getting a task from the mailbox or stealing from another scheduler.
/** Returns the stolen task or NULL if all attempts fail. */
/* override */ task* receive_or_steal_task( reference_count&, bool );
}; // class custom_scheduler<>
// custom_scheduler methods
//! Looks for work for a thread whose local task pool is exhausted.
/** Each iteration with an arena limit n > 1 tries these sources in order:
    - the mailbox, if this scheduler has an affinity id;
    - for callers passing return_if_no_work, the arena's starvation-resistant queue (dequeue_task);
    - a steal from a randomly chosen other arena slot.
    The inbox is marked idle while searching. It is cleared before returning a task, and
    before returning NULL because completion_ref_count reached 1.
    @param completion_ref_count  reference count being waited on. Once it equals 1 the search
                                 stops and NULL is returned.
    @param return_if_no_work     true for a worker with an empty stack. Enables dequeue_task,
                                 and lets the thread return NULL when the arena must relinquish
                                 workers or reports is_out_of_work().
    @return the task found, or NULL.
    Failed attempts increase failure_count. Past yield_threshold (2*n) the loop backs off, and
    failure_count is reset to yield_threshold once it reaches yield_threshold+100.
    NOTE(review): this copy is missing lines. Closing braces, the `fail:` label targeted by
    the gotos, the `#if __TBB_ARENA_PER_MASTER` matching the visible `#else`/`#endif`, and
    (presumably) the pause/yield calls the comments mention are absent. It will not compile
    as shown. */
template<typename SchedulerTraits>
task* custom_scheduler<SchedulerTraits>::receive_or_steal_task( reference_count& completion_ref_count,
bool return_if_no_work ) {
task* t = NULL;
inbox.set_is_idle( true );
// The state "failure_count==-1" is used only when itt_possible is true,
// and denotes that a sync_prepare has not yet been issued.
for( int failure_count = -static_cast<int>(SchedulerTraits::itt_possible);; ++failure_count) {
// The awaited work has completed: stop searching.
if( completion_ref_count==1 ) {
if( SchedulerTraits::itt_possible ) {
if( failure_count!=-1 ) {
ITT_NOTIFY(sync_prepare, &completion_ref_count);
// Notify Intel(R) Thread Profiler that thread has stopped spinning.
ITT_NOTIFY(sync_acquired, this);
ITT_NOTIFY(sync_acquired, &completion_ref_count);
inbox.set_is_idle( false );
return NULL;
size_t n = my_arena->my_limit;
__TBB_ASSERT( arena_index < n, NULL );
#else /* !__TBB_ARENA_PER_MASTER */
size_t n = my_arena->prefix().limit;
#endif /* !__TBB_ARENA_PER_MASTER */
// With n <= 1 there is nobody to receive from or steal from. This guard also keeps
// the `% (n-1)` below from dividing by zero.
if( n>1 ) {
if( my_affinity_id && (t=get_mailbox_task()) ) {
GATHER_STATISTIC( ++mail_received_count );
// Check if there are tasks in starvation-resistant stream.
// Only allowed for workers with empty stack, which is identified by return_if_no_work.
else if ( return_if_no_work && (t=dequeue_task()) ) {
// just proceed with the obtained task
// Check if the resource manager requires our arena to relinquish some threads
else if ( return_if_no_work && (my_arena->my_num_workers_allotted < my_arena->num_workers_active()) )
return NULL;
#endif /* __TBB_ARENA_PER_MASTER */
else {
// Try to steal a task from a random victim.
if ( !can_steal() )
goto fail;
// Pick one of the n-1 slots other than our own (arena_index).
size_t k = random.get() % (n-1);
arena_slot* victim = &my_arena->slot[k];
// The following condition excludes the master that might have
// already taken our previous place in the arena from the list
// of potential victims. But since such a situation can take
// place only in case of significant oversubscription, keeping
// the checks simple seems to be preferable to complicating the code.
if( k >= arena_index )
++victim; // Adjusts random distribution to exclude self
t = steal_task( *victim );
if( !t ) goto fail;
// A proxy stands in for a task that may be claimed elsewhere; strip_proxy can yield NULL.
if( is_proxy(*t) ) {
t = strip_proxy((task_proxy*)t);
if( !t ) goto fail;
GATHER_STATISTIC( ++proxy_steal_count );
t->prefix().extra_state |= es_task_is_stolen;
if( is_version_3_task(*t) ) {
innermost_running_task = t;
t->note_affinity( my_affinity_id );
GATHER_STATISTIC( ++steal_count );
// No memory fence required for read of global_last_observer_proxy, because prior fence on steal/mailbox suffices.
// NOTE(review): what this branch does when the observer lists differ is not visible in this copy.
if( local_last_observer_proxy!=global_last_observer_proxy ) {
if( SchedulerTraits::itt_possible ) {
if( failure_count!=-1 ) {
// FIXME - might be victim, or might be selected from a mailbox
// Notify Intel(R) Thread Profiler that thread has stopped spinning.
ITT_NOTIFY(sync_acquired, this);
inbox.set_is_idle( false );
break; // jumps to: return t;
if( SchedulerTraits::itt_possible && failure_count==-1 ) {
// The first attempt to steal work failed, so notify Intel(R) Thread Profiler that
// the thread has started spinning. Ideally, we would do this notification
// *before* the first failed attempt to steal, but at that point we do not
// know that the steal will fail.
ITT_NOTIFY(sync_prepare, this);
failure_count = 0;
// Pause, even if we are going to yield, because the yield might return immediately.
int yield_threshold = 2*int(n);
if( failure_count>=yield_threshold ) {
if( failure_count>=yield_threshold+100 ) {
// When a worker thread has nothing to do, return it to RML.
// For purposes of affinity support, the thread is considered idle while in RML.
if( return_if_no_work && my_arena->is_out_of_work() )
return NULL;
failure_count = yield_threshold;
return t;
//! Dispatch loop: executes tasks on this thread until `parent` has completed.
/** child, if non-NULL, is executed first. The loops are nested:
    - the inner loop runs tasks returned by task::execute() (scheduler bypass);
    - the middle loop takes tasks from the local pool via get_task();
    - the outer loop obtains tasks via receive_or_steal_task().
    The early-exit check compares parent's ref_count with quit_point. That is normally 1. For a
    master's outermost loop waiting on dummy_task it is all_local_work_done, a value ref_count
    never takes, so only the all_local_work_done branch after the middle loop ends it.
    Exceptions escaping task execution go to TbbCatchAll with the current task's context.
    On exit innermost_running_task is restored. For the master's outermost loop over the
    dummy task's context, that context's cancellation flag is cleared.
    NOTE(review): this copy is missing many lines. Closing braces, `break` statements in the
    switch, the `done:` / `exception_was_caught:` labels targeted by the gotos, and several
    `#if` directives matching visible `#else`/`#endif`s are absent. It will not compile as
    shown; restore it from upstream TBB before relying on it. */
template<typename SchedulerTraits>
void custom_scheduler<SchedulerTraits>::local_wait_for_all( task& parent, task* child ) {
__TBB_ASSERT( governor::is_set(this), NULL );
if( child ) {
child->prefix().owner = this;
__TBB_ASSERT( parent.ref_count() >= (child && child->parent() == &parent ? 2 : 1), "ref_count is too small" );
__TBB_ASSERT( assert_okay(), NULL );
// Using parent's refcount in sync_prepare (in the stealing loop below) is
// a workaround for TP. We need to name it here to display correctly in Ampl.
if( SchedulerTraits::itt_possible )
ITT_SYNC_CREATE(&parent.prefix().ref_count, SyncType_Scheduler, SyncObj_TaskStealingLoop);
__TBB_ASSERT( parent.prefix().context || (is_worker() && &parent == dummy_task), "parent task does not have context" );
task* t = child;
// Constant all_local_work_done is an unreachable refcount value that prevents
// early quitting the dispatch loop. It is defined to be in the middle of the range
// of negative values representable by the reference_count type.
static const reference_count
// For normal dispatch loops
parents_work_done = 1,
// For termination dispatch loops in masters
all_local_work_done = (reference_count)3 << (sizeof(reference_count) * 8 - 2);
reference_count quit_point;
if( innermost_running_task == dummy_task ) {
// We are in the outermost task dispatch loop of a master thread.
// Waiting on dummy_task itself must not quit early, hence all_local_work_done.
__TBB_ASSERT( !is_worker(), NULL );
quit_point = &parent == dummy_task ? all_local_work_done : parents_work_done;
} else {
quit_point = parents_work_done;
// Saved so it can be restored on every exit path. NULL means a worker's outermost loop
// (see the receive_or_steal_task call below).
task* old_innermost_running_task = innermost_running_task;
try {
// Outer loop steals tasks when necessary.
for(;;) {
// Middle loop evaluates tasks that are pulled off the local task pool ("array") via get_task().
do {
// Inner loop evaluates tasks that are handed directly to us by other tasks.
while(t) {
__TBB_ASSERT( inbox.assert_is_idle(false), NULL );
__TBB_ASSERT(!is_proxy(*t),"unexpected proxy");
__TBB_ASSERT( t->prefix().owner==this, NULL );
// NOTE(review): the `#if TBB_USE_ASSERT` matching the `#endif` below is missing.
if ( !t->prefix().context->my_cancellation_requested )
__TBB_ASSERT( 1L<<t->state() & (1L<<task::allocated|1L<<task::ready|1L<<task::reexecute), NULL );
#endif /* TBB_USE_ASSERT */
task* t_next = NULL;
innermost_running_task = t;
t->prefix().state = task::executing;
// NOTE(review): the braces of this `if` are missing. Presumably everything through
// ITT_STACK(callee_leave, ...) runs only when the task's context is not cancelled.
// As written, only GATHER_STATISTIC is conditional.
if ( !t->prefix().context->my_cancellation_requested )
GATHER_STATISTIC( ++execute_count );
if( SchedulerTraits::itt_possible )
ITT_STACK(callee_enter, t->prefix().context->itt_caller);
t_next = t->execute();
if( SchedulerTraits::itt_possible )
ITT_STACK(callee_leave, t->prefix().context->itt_caller);
// A task returned by execute() is run next on this thread (scheduler bypass).
if (t_next) {
__TBB_ASSERT( t_next->state()==task::allocated,
"if task::execute() returns task, it must be marked as allocated" );
t_next->prefix().extra_state &= ~es_task_is_stolen;
affinity_id next_affinity=t_next->prefix().affinity;
if (next_affinity != 0 && next_affinity != my_affinity_id)
GATHER_STATISTIC( ++proxy_bypass_count );
// Post-execution handling depends on the state execute() left the task in.
// NOTE(review): no `break` statements are visible between these cases. Each case
// presumably ends with `break`; otherwise `case task::executing` would fall through
// and use `t` after free_task().
switch( task::state_type(t->prefix().state) ) {
case task::executing: {
task* s = t->parent();
__TBB_ASSERT( innermost_running_task==t, NULL );
__TBB_ASSERT( t->prefix().ref_count==0, "Task still has children after it has been executed" );
if( s )
tally_completion_of_predecessor(*s, t_next);
free_task<no_hint>( *t );
case task::recycle: // set by recycle_as_safe_continuation()
t->prefix().state = task::allocated;
__TBB_ASSERT( t_next != t, "a task returned from method execute() can not be recycled in another way" );
t->prefix().extra_state &= ~es_task_is_stolen;
// for safe continuation, need atomically decrement ref_count;
tally_completion_of_predecessor(*t, t_next);
case task::reexecute: // set by recycle_to_reexecute()
__TBB_ASSERT( t_next, "reexecution requires that method execute() return another task" );
__TBB_ASSERT( t_next != t, "a task returned from method execute() can not be recycled in another way" );
t->prefix().state = task::allocated;
t->prefix().extra_state &= ~es_task_is_stolen;
local_spawn( *t, t->prefix().next );
case task::allocated:
t->prefix().extra_state &= ~es_task_is_stolen;
case task::ready:
__TBB_ASSERT( false, "task is in READY state upon return from method execute()" );
__TBB_ASSERT( false, "illegal state" );
default: // just to shut up some compilation warnings
#endif /* TBB_USE_ASSERT */
if( t_next ) {
// The store here has a subtle secondary effect - it fetches *t_next into cache.
t_next->prefix().owner = this;
t = t_next;
} // end of scheduler bypass loop
// NOTE(review): the statement controlled by this `if` (presumably `break;`) is missing.
// As written, get_task() would run only once the quit point has been reached.
if ( parent.prefix().ref_count == quit_point )
t = get_task();
__TBB_ASSERT(!t || !is_proxy(*t),"unexpected proxy");
if(t) {
__TBB_ASSERT( t->prefix().owner==this, "thread got task that it does not own" );
#endif /* TBB_USE_ASSERT */
} while( t ); // end of local task array processing loop
// Master's termination loop: the local pool must now be empty.
// NOTE(review): the end of this branch (presumably a return) is missing in this copy.
if ( quit_point == all_local_work_done ) {
__TBB_ASSERT( my_arena_slot == &dummy_slot && my_arena_slot->head == 0 && my_arena_slot->tail == 0, NULL );
innermost_running_task = old_innermost_running_task;
__TBB_ASSERT( my_arena->my_max_num_workers > 0 || parent.prefix().ref_count == 1, "deadlock detected" );
#else /* !__TBB_ARENA_PER_MASTER */
__TBB_ASSERT( my_arena->prefix().number_of_workers>0||parent.prefix().ref_count==1, "deadlock detected" );
#endif /* !__TBB_ARENA_PER_MASTER */
// old_innermost_running_task is NULL *iff* a worker thread is in its "inborn" dispatch loop
// (i.e. its execution stack is empty), and it should return from there if no work is available.
t = receive_or_steal_task( parent.prefix().ref_count, !old_innermost_running_task );
if (!t) {
if( parent.prefix().ref_count==1 ) goto done;
__TBB_ASSERT( is_worker() && !old_innermost_running_task, "a thread exits dispatch loop prematurely" );
innermost_running_task = NULL;
__TBB_ASSERT(!is_proxy(*t),"unexpected proxy");
t->prefix().owner = this;
} // end of stealing loop
} TbbCatchAll( t->prefix().context );
// Presumably reached only after TbbCatchAll has handled an exception. A task that was
// recycled as a safe continuation still owes the atomic ref_count decrement.
if( task::state_type(t->prefix().state) == task::recycle ) { // state set by recycle_as_safe_continuation()
t->prefix().state = task::allocated;
// for safe continuation, need to atomically decrement ref_count;
if( SchedulerTraits::itt_possible )
ITT_NOTIFY(sync_releasing, &t->prefix().ref_count);
if( __TBB_FetchAndDecrementWrelease(&t->prefix().ref_count)==1 ) {
if( SchedulerTraits::itt_possible )
ITT_NOTIFY(sync_acquired, &t->prefix().ref_count);
t = NULL;
goto exception_was_caught;
// Reset parent's ref_count unless concurrent waits are enabled for it.
if ( !ConcurrentWaitsEnabled(parent) )
parent.prefix().ref_count = 0;
parent.prefix().extra_state &= ~es_ref_count_active;
#endif /* TBB_USE_ASSERT */
innermost_running_task = old_innermost_running_task;
__TBB_ASSERT(parent.prefix().context && dummy_task->prefix().context, NULL);
task_group_context* parent_ctx = parent.prefix().context;
if ( parent_ctx->my_cancellation_requested ) {
task_group_context::exception_container_type *pe = parent_ctx->my_exception;
if ( innermost_running_task == dummy_task && parent_ctx == dummy_task->prefix().context ) {
// We are in the outermost task dispatch loop of a master thread, and
// the whole task tree has been collapsed. So we may clear cancellation data.
parent_ctx->my_cancellation_requested = 0;
__TBB_ASSERT(dummy_task->prefix().context == parent_ctx || !CancellationInfoPresent(*dummy_task),
"Unexpected exception or cancellation data in the dummy task");
// If possible, add assertion that master's dummy task context does not have children
if ( pe )
__TBB_ASSERT(!is_worker() || !CancellationInfoPresent(*dummy_task),
"Worker's dummy task context modified");
__TBB_ASSERT(innermost_running_task != dummy_task || !CancellationInfoPresent(*dummy_task),
"Unexpected exception or cancellation data in the master's dummy task");
__TBB_ASSERT( assert_okay(), NULL );
} // namespace internal
} // namespace tbb
#endif /* _TBB_custom_scheduler_H */