| /* |
| Copyright 2005-2010 Intel Corporation. All Rights Reserved. |
| |
| This file is part of Threading Building Blocks. |
| |
| Threading Building Blocks is free software; you can redistribute it |
| and/or modify it under the terms of the GNU General Public License |
| version 2 as published by the Free Software Foundation. |
| |
| Threading Building Blocks is distributed in the hope that it will be |
| useful, but WITHOUT ANY WARRANTY; without even the implied warranty |
| of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| GNU General Public License for more details. |
| |
| You should have received a copy of the GNU General Public License |
| along with Threading Building Blocks; if not, write to the Free Software |
| Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
| |
| As a special exception, you may use this file as part of a free software |
| library without restriction. Specifically, if other files instantiate |
| templates or use macros or inline functions from this file, or you compile |
| this file and link it with other files to produce an executable, this |
| file does not by itself cause the resulting executable to be covered by |
| the GNU General Public License. This exception does not however |
| invalidate any other reasons why the executable file might be covered by |
| the GNU General Public License. |
| */ |
| |
| #include "arena.h" |
| #include "governor.h" |
| #include "scheduler.h" |
| #include "itt_notify.h" |
| |
| #include <stdlib.h> // for rand() |
| |
| namespace tbb { |
| namespace internal { |
| |
| #if !__TBB_ARENA_PER_MASTER |
| //------------------------------------------------------------------------ |
| // UnpaddedArenaPrefix |
| //------------------------------------------------------------------------ |
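| // The arena object is laid out immediately after the (padded) ArenaPrefix within |
| // the same allocation (see allocate_arena below), so stepping one ArenaPrefix |
| // past this object yields the arena. |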
| inline arena& UnpaddedArenaPrefix::Arena() { |
| return *static_cast<tbb::internal::arena*>(static_cast<void*>( static_cast<ArenaPrefix*>(this)+1 )); |
| } |
| |
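| // Executed by an RML worker thread on behalf of this arena: steal and execute |
| // tasks until the arena runs out of work, then return the thread to RML. |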
| void UnpaddedArenaPrefix::process( job& j ) { |
| generic_scheduler& s = static_cast<generic_scheduler&>(j); |
| __TBB_ASSERT( governor::is_set(&s), NULL ); |
| __TBB_ASSERT( !s.innermost_running_task, NULL ); |
| // Try to steal a task. |
| // Passing the reference count is technically unnecessary in this context, |
| // but omitting it here would require extra checks inside the function. |
| task* t = s.receive_or_steal_task( s.dummy_task->prefix().ref_count, /*return_if_no_work=*/true ); |
| if (t) { |
| // A side effect of receive_or_steal_task is that innermost_running_task can be set. |
| // But for the outermost dispatch loop of a worker it has to be NULL. |
| s.innermost_running_task = NULL; |
| s.local_wait_for_all(*s.dummy_task,t); |
| } |
| __TBB_ASSERT( s.inbox.assert_is_idle(true), NULL ); |
| __TBB_ASSERT( !s.innermost_running_task, NULL ); |
| } |
| |
| void UnpaddedArenaPrefix::cleanup( job& j ) { |
| generic_scheduler& s = static_cast<generic_scheduler&>(j); |
| if( !governor::is_set( &s ) ) { |
| bool is_master = governor::is_set( NULL ); |
| governor::assume_scheduler( &s ); |
| generic_scheduler::cleanup_worker( &s, !is_master ); |
| governor::assume_scheduler( NULL ); |
| } else { |
| generic_scheduler::cleanup_worker( &s, true ); |
| } |
| } |
| |
| void UnpaddedArenaPrefix::acknowledge_close_connection() { |
| Arena().free_arena(); |
| } |
| |
| ::rml::job* UnpaddedArenaPrefix::create_one_job() { |
| generic_scheduler* s = generic_scheduler::create_worker( Arena(), next_job_index++ ); |
| governor::sign_on(s); |
| return s; |
| } |
| #endif /* !__TBB_ARENA_PER_MASTER */ |
| |
| //------------------------------------------------------------------------ |
| // arena |
| //------------------------------------------------------------------------ |
| |
| #if __TBB_ARENA_PER_MASTER |
| |
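| // Dispatch loop run by a worker thread on behalf of this arena: occupy a slot, |
| // execute tasks until the allotment drops or work runs out, then leave the arena. |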
| void arena::process( generic_scheduler& s ) { |
| __TBB_ASSERT( governor::is_set(&s), NULL ); |
| __TBB_ASSERT( !s.innermost_running_task, NULL ); |
| |
| __TBB_ASSERT( my_num_slots != 1, NULL ); |
| // Start the search for a vacant slot from the one this scheduler occupied last time |
| unsigned index = s.arena_index < my_num_slots ? s.arena_index : s.random.get() % (my_num_slots - 1) + 1, |
| end = index; |
| __TBB_ASSERT( index != 0, "A worker cannot occupy slot 0" ); |
| __TBB_ASSERT( index < my_num_slots, NULL ); |
| |
| // Find a vacant slot |
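| // The CAS below atomically claims the slot: it succeeds only if my_scheduler is |
| // still NULL, i.e. no other thread has occupied this slot in the meantime. |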
| for ( ;; ) { |
| if ( !slot[index].my_scheduler && __TBB_CompareAndSwapW( &slot[index].my_scheduler, (intptr_t)&s, 0 ) == 0 ) |
| break; |
| if ( ++index == my_num_slots ) |
| index = 1; |
| if ( index == end ) { |
| // Likely this arena is already saturated |
| if ( --my_num_threads_active == 0 ) |
| close_arena(); |
| return; |
| } |
| } |
| ITT_NOTIFY(sync_acquired, &slot[index]); |
| s.my_arena = this; |
| s.arena_index = index; |
| s.attach_mailbox( affinity_id(index+1) ); |
| |
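| // Both hints are mere starting points for later selection heuristics: |
| // hint_for_push gets a quasi-random seed derived from this scheduler's address, |
| // hint_for_pop starts a plain round-robin at the slot's own index. |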
| slot[index].hint_for_push = index ^ (unsigned(&s-(generic_scheduler*)NULL)>>16); // randomizer seed |
| slot[index].hint_for_pop = index; // initial value for round-robin |
| |
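| // Advance my_limit so that it covers the slot just occupied; my_limit bounds the |
| // range of slots scanned for work (see is_out_of_work), and the CAS loop below |
| // only ever moves it forward. |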
| unsigned new_limit = index + 1; |
| unsigned old_limit = my_limit; |
| while ( new_limit > old_limit ) { |
| if ( my_limit.compare_and_swap(new_limit, old_limit) == old_limit ) |
| break; |
| old_limit = my_limit; |
| } |
| |
| unsigned num_threads_left; |
| for ( ;; ) { |
| // Try to steal a task. |
| // Passing the reference count is technically unnecessary in this context, |
| // but omitting it here would require extra checks inside the function. |
| task* t = s.receive_or_steal_task( s.dummy_task->prefix().ref_count, /*return_if_no_work=*/true ); |
| if (t) { |
| // A side effect of receive_or_steal_task is that innermost_running_task can be set. |
| // But for the outermost dispatch loop of a worker it has to be NULL. |
| s.innermost_running_task = NULL; |
| s.local_wait_for_all(*s.dummy_task,t); |
| } |
| num_threads_left = --my_num_threads_active; |
| __TBB_ASSERT ( slot[index].head == slot[index].tail, "Worker cannot leave arena when the task pool is not empty" ); |
| __TBB_ASSERT( slot[index].task_pool == EmptyTaskPool, "Empty task pool is not marked appropriately" ); |
| // Revalidate the quitting condition. |
| // This check prevents relinquishing more workers than necessary because |
| // the decision making procedure is not atomic. |
| if ( num_workers_active() >= my_num_workers_allotted || !my_num_workers_requested ) |
| break; |
| // Restore ref count |
| __TBB_ASSERT( !slot[0].my_scheduler || my_num_threads_active > 0, "Who requested more workers after the last one left the dispatch loop and the master's gone?" ); |
| ++my_num_threads_active; |
| } |
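| // Free the slot for other workers; the release store guarantees that all prior |
| // updates to the slot are visible before it is observed as vacant. |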
| __TBB_store_with_release( slot[index].my_scheduler, (generic_scheduler*)NULL ); |
| s.inbox.detach(); |
| __TBB_ASSERT( s.inbox.assert_is_idle(true), NULL ); |
| __TBB_ASSERT( !s.innermost_running_task, NULL ); |
| if ( !num_threads_left ) |
| close_arena(); |
| } |
| |
| arena::arena ( market& m, unsigned max_num_workers ) { |
| __TBB_ASSERT( sizeof(slot[0]) % NFS_GetLineSize()==0, "arena::slot size not multiple of cache line size" ); |
| __TBB_ASSERT( (uintptr_t)this % NFS_GetLineSize()==0, "arena misaligned" ); |
| my_market = &m; |
| my_limit = 1; |
| // Two slots are mandatory: one for the master and one for a worker (required to support starvation-resistant tasks). |
| my_num_slots = max(2u, max_num_workers + 1); |
| my_max_num_workers = max_num_workers; |
| my_num_threads_active = 1; // accounts for the master |
| __TBB_ASSERT ( my_max_num_workers < my_num_slots, NULL ); |
| // Construct mailboxes. Mark internal synchronization elements for the tools. |
| for( unsigned i = 0; i < my_num_slots; ++i ) { |
| __TBB_ASSERT( !slot[i].my_scheduler && !slot[i].task_pool, NULL ); |
| ITT_SYNC_CREATE(slot + i, SyncType_Scheduler, SyncObj_WorkerTaskPool); |
| mailbox(i+1).construct(); |
| ITT_SYNC_CREATE(&mailbox(i+1), SyncType_Scheduler, SyncObj_Mailbox); |
| } |
| my_task_stream.initialize(my_num_slots); |
| ITT_SYNC_CREATE(&my_task_stream, SyncType_Scheduler, SyncObj_TaskStream); |
| my_mandatory_concurrency = false; |
| #if __TBB_TASK_GROUP_CONTEXT |
| my_master_default_ctx = NULL; |
| #endif |
| } |
| |
| arena& arena::allocate_arena( market& m, unsigned max_num_workers ) { |
| __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" ); |
| __TBB_ASSERT( sizeof(base_type) % NFS_GetLineSize() == 0, "arena slots area misaligned: wrong padding" ); |
| __TBB_ASSERT( sizeof(mail_outbox) == NFS_MaxLineSize, "Mailbox padding is wrong" ); |
| |
| unsigned num_slots = max(2u, max_num_workers + 1); |
| size_t n = sizeof(base_type) + num_slots * (sizeof(mail_outbox) + sizeof(arena_slot)); |
| |
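| // Memory layout of the allocation: num_slots mail_outbox objects come first |
| // (indexed backwards from the arena), followed by the arena itself, i.e. the |
| // arena_base fields plus the slot array. |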
| unsigned char* storage = (unsigned char*)NFS_Allocate( n, 1, NULL ); |
| // Zero all slots to indicate that they are empty |
| memset( storage, 0, n ); |
| return *new( storage + num_slots * sizeof(mail_outbox) ) arena(m, max_num_workers); |
| } |
| |
| void arena::free_arena () { |
| __TBB_ASSERT( !my_num_threads_active, "There are threads in the dying arena" ); |
| intptr_t drained = 0; |
| for ( unsigned i = 1; i <= my_num_slots; ++i ) |
| drained += mailbox(i).drain(); |
| __TBB_ASSERT(my_task_stream.empty() && my_task_stream.drain()==0, "Not all enqueued tasks were executed"); |
| #if __TBB_COUNT_TASK_NODES |
| my_market->update_task_node_count( -drained ); |
| #endif /* __TBB_COUNT_TASK_NODES */ |
| my_market->release(); |
| #if __TBB_TASK_GROUP_CONTEXT |
| __TBB_ASSERT( my_master_default_ctx, "Master thread never entered the arena?" ); |
| my_master_default_ctx->~task_group_context(); |
| NFS_Free(my_master_default_ctx); |
| #endif /* __TBB_TASK_GROUP_CONTEXT */ |
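| // The highest-indexed mailbox occupies the lowest address, i.e. the very start |
| // of the block obtained from NFS_Allocate, so it is what must be freed. |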
| void* storage = &mailbox(my_num_slots); |
| this->~arena(); |
| NFS_Free( storage ); |
| } |
| |
| #else /* !__TBB_ARENA_PER_MASTER */ |
| |
| arena* arena::allocate_arena( unsigned number_of_slots, unsigned number_of_workers, stack_size_type stack_size ) { |
| __TBB_ASSERT( sizeof(ArenaPrefix) % NFS_GetLineSize()==0, "ArenaPrefix not multiple of cache line size" ); |
| __TBB_ASSERT( sizeof(mail_outbox)==NFS_MaxLineSize, NULL ); |
| __TBB_ASSERT( stack_size>0, NULL ); |
| |
| size_t n = sizeof(ArenaPrefix) + number_of_slots*(sizeof(mail_outbox)+sizeof(arena_slot)); |
| |
| unsigned char* storage = (unsigned char*)NFS_Allocate( n, 1, NULL ); |
| // Zero all slots to indicate that they are empty |
| memset( storage, 0, n ); |
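| // Layout of the allocation: the slots' mailboxes come first (in reverse index |
| // order), then the ArenaPrefix, then the arena object itself with its slot array. |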
| arena* a = (arena*)(storage + sizeof(ArenaPrefix) + number_of_slots*sizeof(mail_outbox)); |
| __TBB_ASSERT( sizeof(a->slot[0]) % NFS_GetLineSize()==0, "arena::slot size not multiple of cache line size" ); |
| __TBB_ASSERT( (uintptr_t)a % NFS_GetLineSize()==0, NULL ); |
| new( &a->prefix() ) ArenaPrefix( number_of_slots, number_of_workers ); |
| |
| // Allocate the worker_list |
| WorkerDescriptor * w = new WorkerDescriptor[number_of_workers]; |
| memset( w, 0, sizeof(WorkerDescriptor)*(number_of_workers)); |
| a->prefix().worker_list = w; |
| |
| // Construct mailboxes. |
| for( unsigned j=1; j<=number_of_slots; ++j ) |
| a->mailbox(j).construct(); |
| |
| a->prefix().stack_size = stack_size; |
| size_t k; |
| // Mark each internal sync element for the tools |
| for( k=0; k<number_of_workers; ++k ) { |
| ITT_SYNC_CREATE(a->slot + k, SyncType_Scheduler, SyncObj_WorkerTaskPool); |
| ITT_SYNC_CREATE(&w[k].scheduler, SyncType_Scheduler, SyncObj_WorkerLifeCycleMgmt); |
| ITT_SYNC_CREATE(&a->mailbox(k+1), SyncType_Scheduler, SyncObj_Mailbox); |
| } |
| for( ; k<number_of_slots; ++k ) { |
| ITT_SYNC_CREATE(a->slot + k, SyncType_Scheduler, SyncObj_MasterTaskPool); |
| ITT_SYNC_CREATE(&a->mailbox(k+1), SyncType_Scheduler, SyncObj_Mailbox); |
| } |
| |
| return a; |
| } |
| |
| void arena::free_arena () { |
| // Drain mailboxes |
| // TODO: each scheduler should plug-and-drain its own mailbox when it terminates. |
| intptr_t drain_count = 0; |
| for( unsigned i=1; i<=prefix().number_of_slots; ++i ) |
| drain_count += mailbox(i).drain(); |
| #if __TBB_COUNT_TASK_NODES |
| prefix().task_node_count -= drain_count; |
| if( prefix().task_node_count ) { |
| runtime_warning( "Leaked %ld task objects\n", long(prefix().task_node_count) ); |
| } |
| #endif /* __TBB_COUNT_TASK_NODES */ |
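| // As in allocate_arena above, the highest-indexed mailbox marks the beginning of |
| // the allocated block, so its address is the one to pass to NFS_Free. |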
| void* storage = &mailbox(prefix().number_of_slots); |
| delete[] prefix().worker_list; |
| prefix().~ArenaPrefix(); |
| NFS_Free( storage ); |
| } |
| |
| #endif /* !__TBB_ARENA_PER_MASTER */ |
| |
| bool arena::is_out_of_work() { |
| // TODO: rework it to return at least a hint about where a task was found; better yet, the task itself. |
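| // pool_state implements a test-and-test-and-set protocol: a thread that sees |
| // SNAPSHOT_FULL claims the state with a unique "busy" value, scans the slots for |
| // work, and then either advances the state to SNAPSHOT_EMPTY (no work found, so |
| // workers can be released) or restores SNAPSHOT_FULL. |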
| for(;;) { |
| pool_state_t snapshot = prefix().pool_state; |
| switch( snapshot ) { |
| case SNAPSHOT_EMPTY: |
| #if !__TBB_ARENA_PER_MASTER |
| case SNAPSHOT_SERVER_GOING_AWAY: |
| #endif /* !__TBB_ARENA_PER_MASTER */ |
| return true; |
| case SNAPSHOT_FULL: { |
| // Use a unique id for "busy" in order to avoid ABA problems. |
| const pool_state_t busy = pool_state_t(this); |
| // Request permission to take snapshot |
| if( prefix().pool_state.compare_and_swap( busy, SNAPSHOT_FULL )==SNAPSHOT_FULL ) { |
| // Got permission. Take the snapshot. |
| #if __TBB_ARENA_PER_MASTER |
| size_t n = my_limit; |
| #else /* !__TBB_ARENA_PER_MASTER */ |
| size_t n = prefix().limit; |
| #endif /* !__TBB_ARENA_PER_MASTER */ |
| size_t k; |
| for( k=0; k<n; ++k ) |
| if( slot[k].task_pool != EmptyTaskPool && slot[k].head < slot[k].tail ) |
| break; |
| bool work_absent = k>=n; |
| #if __TBB_ARENA_PER_MASTER |
| work_absent = work_absent && my_task_stream.empty(); |
| #endif /* __TBB_ARENA_PER_MASTER */ |
| // Test and test-and-set. |
| if( prefix().pool_state==busy ) { |
| if( work_absent ) { |
| #if __TBB_ARENA_PER_MASTER |
| // Save the current demand value before setting SNAPSHOT_EMPTY |
| // to avoid a race with advertise_new_work. |
| int current_demand = (int)my_max_num_workers; |
| #endif |
| if( prefix().pool_state.compare_and_swap( SNAPSHOT_EMPTY, busy )==busy ) { |
| // This thread transitioned the pool to the empty state, and thus is responsible |
| // for telling the market (or RML server) that there is no other work to do. |
| #if __TBB_ARENA_PER_MASTER |
| my_market->adjust_demand( *this, -current_demand ); |
| #else /* !__TBB_ARENA_PER_MASTER */ |
| prefix().server->adjust_job_count_estimate( -int(prefix().number_of_workers) ); |
| #endif /* !__TBB_ARENA_PER_MASTER */ |
| return true; |
| } |
| } else { |
| // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it. |
| prefix().pool_state.compare_and_swap( SNAPSHOT_FULL, busy ); |
| } |
| } |
| } |
| return false; |
| } |
| default: |
| // Another thread is taking a snapshot. |
| return false; |
| } |
| } |
| } |
| |
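| // Shut the arena down: in per-master mode detach it from the market and free it |
| // directly; otherwise mark the pool state and ask the RML server to close the |
| // connection (free_arena then runs from acknowledge_close_connection). |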
| void arena::close_arena () { |
| #if __TBB_ARENA_PER_MASTER |
| my_market->detach_arena( *this ); |
| free_arena(); |
| #else /* !__TBB_ARENA_PER_MASTER */ |
| for(;;) { |
| pool_state_t snapshot = prefix().pool_state; |
| if( snapshot==SNAPSHOT_SERVER_GOING_AWAY ) |
| break; |
| if( prefix().pool_state.compare_and_swap( SNAPSHOT_SERVER_GOING_AWAY, snapshot )==snapshot ) { |
| if( snapshot!=SNAPSHOT_EMPTY ) |
| prefix().server->adjust_job_count_estimate( -int(prefix().number_of_workers) ); |
| break; |
| } |
| } |
| prefix().server->request_close_connection(); |
| #endif /* !__TBB_ARENA_PER_MASTER */ |
| } |
| |
| #if __TBB_COUNT_TASK_NODES |
| intptr_t arena::workers_task_node_count() { |
| intptr_t result = 0; |
| #if __TBB_ARENA_PER_MASTER |
| for( unsigned i = 1; i < my_num_slots; ++i ) { |
| generic_scheduler* s = slot[i].my_scheduler; |
| #else /* !__TBB_ARENA_PER_MASTER */ |
| for( unsigned i=0; i<prefix().number_of_workers; ++i ) { |
| generic_scheduler* s = prefix().worker_list[i].scheduler; |
| #endif /* !__TBB_ARENA_PER_MASTER */ |
| if( s ) |
| result += s->task_node_count; |
| } |
| return result; |
| } |
| #endif /* __TBB_COUNT_TASK_NODES */ |
| |
| } // namespace internal |
| } // namespace tbb |