| /* |
| Copyright 2005-2010 Intel Corporation. All Rights Reserved. |
| |
| This file is part of Threading Building Blocks. |
| |
| Threading Building Blocks is free software; you can redistribute it |
| and/or modify it under the terms of the GNU General Public License |
| version 2 as published by the Free Software Foundation. |
| |
| Threading Building Blocks is distributed in the hope that it will be |
| useful, but WITHOUT ANY WARRANTY; without even the implied warranty |
| of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| GNU General Public License for more details. |
| |
| You should have received a copy of the GNU General Public License |
| along with Threading Building Blocks; if not, write to the Free Software |
| Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
| |
| As a special exception, you may use this file as part of a free software |
| library without restriction. Specifically, if other files instantiate |
| templates or use macros or inline functions from this file, or you compile |
| this file and link it with other files to produce an executable, this |
| file does not by itself cause the resulting executable to be covered by |
| the GNU General Public License. This exception does not however |
| invalidate any other reasons why the executable file might be covered by |
| the GNU General Public License. |
| */ |
| |
| #ifndef _TBB_custom_scheduler_H |
| #define _TBB_custom_scheduler_H |
| |
| #include "scheduler.h" |
| #include "observer_proxy.h" |
| #include "itt_notify.h" |
| |
| namespace tbb { |
| namespace internal { |
| |
| //! Amount of time to pause between steals. |
/** The default values below were found empirically to be the best for K-Means
    on the 32-way Altix and the 4-way (*2 for HT) fxqlin04. */
| #if __TBB_ipf |
| static const long PauseTime = 1500; |
| #else |
| static const long PauseTime = 80; |
| #endif |
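
// PauseTime is the argument passed to __TBB_Pause in the stealing back-off
// loop of receive_or_steal_task below.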
| |
| //------------------------------------------------------------------------ |
| //! Traits classes for scheduler |
| //------------------------------------------------------------------------ |
| |
| struct DefaultSchedulerTraits { |
| static const bool itt_possible = true; |
| static const bool has_slow_atomic = false; |
| }; |
| |
| struct IntelSchedulerTraits { |
| static const bool itt_possible = false; |
| #if __TBB_x86_32||__TBB_x86_64 |
| static const bool has_slow_atomic = true; |
| #else |
| static const bool has_slow_atomic = false; |
| #endif /* __TBB_x86_32||__TBB_x86_64 */ |
| }; |
| |
| //------------------------------------------------------------------------ |
| // custom_scheduler |
| //------------------------------------------------------------------------ |
| |
| //! A scheduler with a customized evaluation loop. |
| /** The customization can use SchedulerTraits to make decisions without needing a run-time check. */ |
| template<typename SchedulerTraits> |
| class custom_scheduler: private generic_scheduler { |
| typedef custom_scheduler<SchedulerTraits> scheduler_type; |
| |
| //! Scheduler loop that dispatches tasks. |
    /** If child is non-NULL, it is dispatched first.
        Then, until "parent" has a reference count of 1, other tasks are dispatched or stolen. */
| /*override*/ |
| void local_wait_for_all( task& parent, task* child ); |
| |
| //! Entry point from client code to the scheduler loop that dispatches tasks. |
    /** The method is virtual, but the *this object is used only for the sake of dispatching
        on the correct vtable, not necessarily as the correct *this object. The correct *this
        object is looked up in TLS. */
| /*override*/ |
| void wait_for_all( task& parent, task* child ) { |
| static_cast<custom_scheduler*>(governor::local_scheduler())->scheduler_type::local_wait_for_all( parent, child ); |
| } |
| |
| //! Construct a custom_scheduler |
| custom_scheduler( arena* a, size_t index ) : generic_scheduler(a, index) {} |
| |
    //! Decrements the ref_count of a predecessor.
    /** If the count reaches 0, the predecessor is scheduled for execution.
        When changing this, remember that it is a hot-path function. */
| void tally_completion_of_predecessor( task& s, task*& bypass_slot ) { |
| task_prefix& p = s.prefix(); |
| if( SchedulerTraits::itt_possible ) |
| ITT_NOTIFY(sync_releasing, &p.ref_count); |
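        // On targets where atomic RMW operations are expensive, skip the atomic
        // decrement when this thread holds the only remaining reference; a plain
        // store suffices because no other thread can touch the count then.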
| if( SchedulerTraits::has_slow_atomic && p.ref_count==1 ) { |
| p.ref_count=0; |
| } else { |
| if( __TBB_FetchAndDecrementWrelease(&p.ref_count) > 1 ) // more references exist |
| return; |
| } |
| __TBB_ASSERT(p.ref_count==0, "completion of task caused predecessor's reference count to underflow"); |
| if( SchedulerTraits::itt_possible ) |
| ITT_NOTIFY(sync_acquired, &p.ref_count); |
| #if TBB_USE_ASSERT |
| p.extra_state &= ~es_ref_count_active; |
| #endif /* TBB_USE_ASSERT */ |
| |
        // Bypass optimization: if the caller's bypass slot is free, hand the
        // readied task straight back to the dispatch loop; otherwise spawn it.
        if( bypass_slot==NULL )
            bypass_slot = &s;
        else
            local_spawn( s, s.prefix().next );
| } |
| |
| public: |
| static generic_scheduler* allocate_scheduler( arena* a, size_t index ) { |
| #if !__TBB_ARENA_PER_MASTER |
| __TBB_ASSERT( a, "missing arena" ); |
| #endif /* !__TBB_ARENA_PER_MASTER */ |
| scheduler_type* s = (scheduler_type*)NFS_Allocate(sizeof(scheduler_type),1,NULL); |
| new( s ) scheduler_type( a, index ); |
| __TBB_ASSERT( s->assert_okay(), NULL ); |
| ITT_SYNC_CREATE(s, SyncType_Scheduler, SyncObj_TaskPoolSpinning); |
| return s; |
| } |
| |
| //! Try getting a task from the mailbox or stealing from another scheduler. |
| /** Returns the stolen task or NULL if all attempts fail. */ |
| /* override */ task* receive_or_steal_task( reference_count&, bool ); |
| |
| }; // class custom_scheduler<> |
| |
| //------------------------------------------------------------------------ |
| // custom_scheduler methods |
| //------------------------------------------------------------------------ |
| |
| template<typename SchedulerTraits> |
| task* custom_scheduler<SchedulerTraits>::receive_or_steal_task( reference_count& completion_ref_count, |
| bool return_if_no_work ) { |
| task* t = NULL; |
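    // Mark this thread's mailbox as idle while it hunts for work; the flag is
    // cleared on every exit path below.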
| inbox.set_is_idle( true ); |
| // The state "failure_count==-1" is used only when itt_possible is true, |
| // and denotes that a sync_prepare has not yet been issued. |
| for( int failure_count = -static_cast<int>(SchedulerTraits::itt_possible);; ++failure_count) { |
| if( completion_ref_count==1 ) { |
| if( SchedulerTraits::itt_possible ) { |
| if( failure_count!=-1 ) { |
| ITT_NOTIFY(sync_prepare, &completion_ref_count); |
                // Notify Intel(R) Thread Profiler that the thread has stopped spinning.
| ITT_NOTIFY(sync_acquired, this); |
| } |
| ITT_NOTIFY(sync_acquired, &completion_ref_count); |
| } |
| inbox.set_is_idle( false ); |
| return NULL; |
| } |
| #if __TBB_ARENA_PER_MASTER |
| size_t n = my_arena->my_limit; |
| __TBB_ASSERT( arena_index < n, NULL ); |
| #else /* !__TBB_ARENA_PER_MASTER */ |
| size_t n = my_arena->prefix().limit; |
| #endif /* !__TBB_ARENA_PER_MASTER */ |
| if( n>1 ) { |
| if( my_affinity_id && (t=get_mailbox_task()) ) { |
| GATHER_STATISTIC( ++mail_received_count ); |
| } |
| #if __TBB_ARENA_PER_MASTER |
            // Check if there are tasks in the starvation-resistant stream.
            // Only allowed for workers with an empty stack, which is indicated by return_if_no_work.
| else if ( return_if_no_work && (t=dequeue_task()) ) { |
| // just proceed with the obtained task |
| } |
| // Check if the resource manager requires our arena to relinquish some threads |
| else if ( return_if_no_work && (my_arena->my_num_workers_allotted < my_arena->num_workers_active()) ) |
| return NULL; |
| #endif /* __TBB_ARENA_PER_MASTER */ |
| else { |
| // Try to steal a task from a random victim. |
| if ( !can_steal() ) |
| goto fail; |
| size_t k = random.get() % (n-1); |
| arena_slot* victim = &my_arena->slot[k]; |
                // The following condition excludes the master that might have
                // already taken our previous place in the arena from the list
                // of potential victims. Since such a situation can arise only
                // under significant oversubscription, keeping the check simple
                // seems preferable to complicating the code.
| if( k >= arena_index ) |
| ++victim; // Adjusts random distribution to exclude self |
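                // For example, with n==4 and arena_index==2, k ranges over {0,1,2}
                // and selects victim slots {0,1,3}.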
| t = steal_task( *victim ); |
| if( !t ) goto fail; |
| if( is_proxy(*t) ) { |
| t = strip_proxy((task_proxy*)t); |
| if( !t ) goto fail; |
| GATHER_STATISTIC( ++proxy_steal_count ); |
| } |
| t->prefix().extra_state |= es_task_is_stolen; |
| if( is_version_3_task(*t) ) { |
| innermost_running_task = t; |
| t->note_affinity( my_affinity_id ); |
| } |
| GATHER_STATISTIC( ++steal_count ); |
| } |
| __TBB_ASSERT(t,NULL); |
| #if __TBB_SCHEDULER_OBSERVER |
| // No memory fence required for read of global_last_observer_proxy, because prior fence on steal/mailbox suffices. |
| if( local_last_observer_proxy!=global_last_observer_proxy ) { |
| notify_entry_observers(); |
| } |
| #endif /* __TBB_SCHEDULER_OBSERVER */ |
| if( SchedulerTraits::itt_possible ) { |
| if( failure_count!=-1 ) { |
                // FIXME: the task might have been stolen from a victim or taken from a mailbox.
                // Notify Intel(R) Thread Profiler that the thread has stopped spinning.
| ITT_NOTIFY(sync_acquired, this); |
| } |
| } |
| inbox.set_is_idle( false ); |
| break; // jumps to: return t; |
| } |
| fail: |
| if( SchedulerTraits::itt_possible && failure_count==-1 ) { |
| // The first attempt to steal work failed, so notify Intel(R) Thread Profiler that |
| // the thread has started spinning. Ideally, we would do this notification |
| // *before* the first failed attempt to steal, but at that point we do not |
| // know that the steal will fail. |
| ITT_NOTIFY(sync_prepare, this); |
| failure_count = 0; |
| } |
| // Pause, even if we are going to yield, because the yield might return immediately. |
| __TBB_Pause(PauseTime); |
| int yield_threshold = 2*int(n); |
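        // Back-off policy: pause on every failed attempt; after 2*n failures also
        // yield; after 100 further failures, a worker with an empty stack may leave
        // once the arena confirms it is out of work.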
| if( failure_count>=yield_threshold ) { |
| __TBB_Yield(); |
| if( failure_count>=yield_threshold+100 ) { |
| // When a worker thread has nothing to do, return it to RML. |
| // For purposes of affinity support, the thread is considered idle while in RML. |
| if( return_if_no_work && my_arena->is_out_of_work() ) |
| return NULL; |
| failure_count = yield_threshold; |
| } |
| } |
| } |
| return t; |
| } |
| |
| template<typename SchedulerTraits> |
| void custom_scheduler<SchedulerTraits>::local_wait_for_all( task& parent, task* child ) { |
| __TBB_ASSERT( governor::is_set(this), NULL ); |
| if( child ) { |
| child->prefix().owner = this; |
| } |
| __TBB_ASSERT( parent.ref_count() >= (child && child->parent() == &parent ? 2 : 1), "ref_count is too small" ); |
| __TBB_ASSERT( assert_okay(), NULL ); |
    // Using the parent's ref_count in sync_prepare (in the stealing loop below) is
    // a workaround for Intel(R) Thread Profiler. The object is named here so that
    // it is displayed correctly in the analysis tools.
| if( SchedulerTraits::itt_possible ) |
| ITT_SYNC_CREATE(&parent.prefix().ref_count, SyncType_Scheduler, SyncObj_TaskStealingLoop); |
| #if __TBB_TASK_GROUP_CONTEXT |
| __TBB_ASSERT( parent.prefix().context || (is_worker() && &parent == dummy_task), "parent task does not have context" ); |
| #endif /* __TBB_TASK_GROUP_CONTEXT */ |
| task* t = child; |
    // The constant all_local_work_done is an unreachable ref_count value that
    // prevents the dispatch loop from quitting early. It is defined to be in the
    // middle of the range of negative values representable by the reference_count type.
| static const reference_count |
| // For normal dispatch loops |
| parents_work_done = 1, |
| // For termination dispatch loops in masters |
| all_local_work_done = (reference_count)3 << (sizeof(reference_count) * 8 - 2); |
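    // For a 64-bit reference_count this equals 0xC000000000000000, a negative
    // value that no legitimate ref_count can reach.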
| reference_count quit_point; |
| if( innermost_running_task == dummy_task ) { |
        // We are in the outermost task dispatch loop of a master thread.
| __TBB_ASSERT( !is_worker(), NULL ); |
| quit_point = &parent == dummy_task ? all_local_work_done : parents_work_done; |
| } else { |
| quit_point = parents_work_done; |
| } |
| task* old_innermost_running_task = innermost_running_task; |
| #if __TBB_TASK_GROUP_CONTEXT && TBB_USE_EXCEPTIONS |
| exception_was_caught: |
| try { |
| #endif /* __TBB_TASK_GROUP_CONTEXT && TBB_USE_EXCEPTIONS */ |
| // Outer loop steals tasks when necessary. |
| for(;;) { |
            // Middle loop evaluates tasks that are pulled off the local task pool ("array").
| do { |
| // Inner loop evaluates tasks that are handed directly to us by other tasks. |
| while(t) { |
| __TBB_ASSERT( inbox.assert_is_idle(false), NULL ); |
| #if TBB_USE_ASSERT |
| __TBB_ASSERT(!is_proxy(*t),"unexpected proxy"); |
| __TBB_ASSERT( t->prefix().owner==this, NULL ); |
| #if __TBB_TASK_GROUP_CONTEXT |
| if ( !t->prefix().context->my_cancellation_requested ) |
| #endif |
| __TBB_ASSERT( 1L<<t->state() & (1L<<task::allocated|1L<<task::ready|1L<<task::reexecute), NULL ); |
| __TBB_ASSERT(assert_okay(),NULL); |
| #endif /* TBB_USE_ASSERT */ |
| task* t_next = NULL; |
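                // Record the task being executed so that nested dispatch loops
                // and note_affinity/cancellation logic can identify it.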
| innermost_running_task = t; |
| t->prefix().state = task::executing; |
| #if __TBB_TASK_GROUP_CONTEXT |
| if ( !t->prefix().context->my_cancellation_requested ) |
| #endif |
| { |
| GATHER_STATISTIC( ++execute_count ); |
| #if __TBB_TASK_GROUP_CONTEXT |
| if( SchedulerTraits::itt_possible ) |
| ITT_STACK(callee_enter, t->prefix().context->itt_caller); |
| #endif |
| t_next = t->execute(); |
| #if __TBB_TASK_GROUP_CONTEXT |
| if( SchedulerTraits::itt_possible ) |
| ITT_STACK(callee_leave, t->prefix().context->itt_caller); |
| #endif |
| if (t_next) { |
| __TBB_ASSERT( t_next->state()==task::allocated, |
| "if task::execute() returns task, it must be marked as allocated" ); |
| t_next->prefix().extra_state &= ~es_task_is_stolen; |
| #if STATISTICS |
| affinity_id next_affinity=t_next->prefix().affinity; |
| if (next_affinity != 0 && next_affinity != my_affinity_id) |
| GATHER_STATISTIC( ++proxy_bypass_count ); |
| #endif |
| } |
| } |
| __TBB_ASSERT(assert_okay(),NULL); |
| switch( task::state_type(t->prefix().state) ) { |
| case task::executing: { |
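                    // Normal completion: the task neither recycled itself nor was
                    // re-dispatched, so destroy it and notify its successor.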
| task* s = t->parent(); |
| __TBB_ASSERT( innermost_running_task==t, NULL ); |
| __TBB_ASSERT( t->prefix().ref_count==0, "Task still has children after it has been executed" ); |
| t->~task(); |
| if( s ) |
| tally_completion_of_predecessor(*s, t_next); |
| free_task<no_hint>( *t ); |
| __TBB_ASSERT(assert_okay(),NULL); |
| break; |
| } |
| |
| case task::recycle: // set by recycle_as_safe_continuation() |
| t->prefix().state = task::allocated; |
                    __TBB_ASSERT( t_next != t, "a task returned from method execute() cannot be recycled in another way" );
| t->prefix().extra_state &= ~es_task_is_stolen; |
                    // For safe continuation, the ref_count must be decremented atomically.
| tally_completion_of_predecessor(*t, t_next); |
| __TBB_ASSERT(assert_okay(),NULL); |
| break; |
| |
| case task::reexecute: // set by recycle_to_reexecute() |
| __TBB_ASSERT( t_next, "reexecution requires that method execute() return another task" ); |
                    __TBB_ASSERT( t_next != t, "a task returned from method execute() cannot be recycled in another way" );
| t->prefix().state = task::allocated; |
| t->prefix().extra_state &= ~es_task_is_stolen; |
| local_spawn( *t, t->prefix().next ); |
| __TBB_ASSERT(assert_okay(),NULL); |
| break; |
| case task::allocated: |
| t->prefix().extra_state &= ~es_task_is_stolen; |
| break; |
| #if TBB_USE_ASSERT |
| case task::ready: |
| __TBB_ASSERT( false, "task is in READY state upon return from method execute()" ); |
| break; |
| default: |
| __TBB_ASSERT( false, "illegal state" ); |
| #else |
| default: // just to shut up some compilation warnings |
| break; |
| #endif /* TBB_USE_ASSERT */ |
| } |
| |
| if( t_next ) { |
| // The store here has a subtle secondary effect - it fetches *t_next into cache. |
| t_next->prefix().owner = this; |
| } |
| t = t_next; |
| } // end of scheduler bypass loop |
| __TBB_ASSERT(assert_okay(),NULL); |
| |
| if ( parent.prefix().ref_count == quit_point ) |
| break; |
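            // Take the next task from this thread's local task pool.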
| t = get_task(); |
| __TBB_ASSERT(!t || !is_proxy(*t),"unexpected proxy"); |
| #if TBB_USE_ASSERT |
| __TBB_ASSERT(assert_okay(),NULL); |
| if(t) { |
| AssertOkay(*t); |
| __TBB_ASSERT( t->prefix().owner==this, "thread got task that it does not own" ); |
| } |
| #endif /* TBB_USE_ASSERT */ |
| } while( t ); // end of local task array processing loop |
| |
| if ( quit_point == all_local_work_done ) { |
| __TBB_ASSERT( my_arena_slot == &dummy_slot && my_arena_slot->head == 0 && my_arena_slot->tail == 0, NULL ); |
| innermost_running_task = old_innermost_running_task; |
| return; |
| } |
| #if __TBB_ARENA_PER_MASTER |
| __TBB_ASSERT( my_arena->my_max_num_workers > 0 || parent.prefix().ref_count == 1, "deadlock detected" ); |
| #else /* !__TBB_ARENA_PER_MASTER */ |
| __TBB_ASSERT( my_arena->prefix().number_of_workers>0||parent.prefix().ref_count==1, "deadlock detected" ); |
| #endif /* !__TBB_ARENA_PER_MASTER */ |
        // old_innermost_running_task is NULL *iff* a worker thread is in its "inborn" dispatch
        // loop (i.e. its execution stack is empty) and should return from there when no work is available.
| t = receive_or_steal_task( parent.prefix().ref_count, !old_innermost_running_task ); |
| if (!t) { |
| if( parent.prefix().ref_count==1 ) goto done; |
| __TBB_ASSERT( is_worker() && !old_innermost_running_task, "a thread exits dispatch loop prematurely" ); |
| innermost_running_task = NULL; |
| return; |
| } |
| __TBB_ASSERT(t,NULL); |
| __TBB_ASSERT(!is_proxy(*t),"unexpected proxy"); |
| t->prefix().owner = this; |
| } // end of stealing loop |
| #if __TBB_TASK_GROUP_CONTEXT && TBB_USE_EXCEPTIONS |
| } TbbCatchAll( t->prefix().context ); |
| |
| if( task::state_type(t->prefix().state) == task::recycle ) { // state set by recycle_as_safe_continuation() |
| t->prefix().state = task::allocated; |
        // For safe continuation, the ref_count must be decremented atomically.
| if( SchedulerTraits::itt_possible ) |
| ITT_NOTIFY(sync_releasing, &t->prefix().ref_count); |
| if( __TBB_FetchAndDecrementWrelease(&t->prefix().ref_count)==1 ) { |
| if( SchedulerTraits::itt_possible ) |
| ITT_NOTIFY(sync_acquired, &t->prefix().ref_count); |
        } else {
| t = NULL; |
| } |
| } |
| goto exception_was_caught; |
| #endif /* __TBB_TASK_GROUP_CONTEXT && TBB_USE_EXCEPTIONS */ |
| done: |
| if ( !ConcurrentWaitsEnabled(parent) ) |
| parent.prefix().ref_count = 0; |
| #if TBB_USE_ASSERT |
| parent.prefix().extra_state &= ~es_ref_count_active; |
| #endif /* TBB_USE_ASSERT */ |
| innermost_running_task = old_innermost_running_task; |
| #if __TBB_TASK_GROUP_CONTEXT |
| __TBB_ASSERT(parent.prefix().context && dummy_task->prefix().context, NULL); |
| task_group_context* parent_ctx = parent.prefix().context; |
| if ( parent_ctx->my_cancellation_requested ) { |
| task_group_context::exception_container_type *pe = parent_ctx->my_exception; |
| if ( innermost_running_task == dummy_task && parent_ctx == dummy_task->prefix().context ) { |
| // We are in the outermost task dispatch loop of a master thread, and |
| // the whole task tree has been collapsed. So we may clear cancellation data. |
| parent_ctx->my_cancellation_requested = 0; |
| __TBB_ASSERT(dummy_task->prefix().context == parent_ctx || !CancellationInfoPresent(*dummy_task), |
| "Unexpected exception or cancellation data in the dummy task"); |
| // If possible, add assertion that master's dummy task context does not have children |
| } |
| if ( pe ) |
| pe->throw_self(); |
| } |
| __TBB_ASSERT(!is_worker() || !CancellationInfoPresent(*dummy_task), |
| "Worker's dummy task context modified"); |
| __TBB_ASSERT(innermost_running_task != dummy_task || !CancellationInfoPresent(*dummy_task), |
| "Unexpected exception or cancellation data in the master's dummy task"); |
| #endif /* __TBB_TASK_GROUP_CONTEXT */ |
| __TBB_ASSERT( assert_okay(), NULL ); |
| } |
| |
| } // namespace internal |
| } // namespace tbb |
| |
| #endif /* _TBB_custom_scheduler_H */ |