blob: 4a24a8c4dea16eeb8cd6241d0e0e2a8ca62ab544 [file] [log] [blame]
/*
Copyright 2005-2010 Intel Corporation. All Rights Reserved.
This file is part of Threading Building Blocks.
Threading Building Blocks is free software; you can redistribute it
and/or modify it under the terms of the GNU General Public License
version 2 as published by the Free Software Foundation.
Threading Building Blocks is distributed in the hope that it will be
useful, but WITHOUT ANY WARRANTY; without even the implied warranty
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Threading Building Blocks; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
As a special exception, you may use this file as part of a free software
library without restriction. Specifically, if other files instantiate
templates or use macros or inline functions from this file, or you compile
this file and link it with other files to produce an executable, this
file does not by itself cause the resulting executable to be covered by
the GNU General Public License. This exception does not however
invalidate any other reasons why the executable file might be covered by
the GNU General Public License.
*/
#include "tbb/tbb_stddef.h"
#if __TBB_ARENA_PER_MASTER
#include "market.h"
#include "tbb_main.h"
#include "governor.h"
#include "scheduler.h"
namespace tbb {
namespace internal {
//------------------------------------------------------------------------
// market
//------------------------------------------------------------------------
market::market ( unsigned max_num_workers, size_t stack_size )
: my_ref_count(1)
, my_stack_size(stack_size)
, my_max_num_workers(max_num_workers)
{
my_next_arena = my_arenas.begin();
// Once created RML server will start initializing workers that will need
// global market instance to get worker stack size
my_server = governor::create_rml_server( *this );
__TBB_ASSERT( my_server, "Failed to create RML server" );
}
market& market::global_market ( unsigned max_num_workers, size_t stack_size ) {
global_market_mutex_type::scoped_lock lock( theMarketMutex );
market *m = theMarket;
if ( m ) {
++m->my_ref_count;
if ( m->my_stack_size < stack_size )
runtime_warning( "Newer master request for larger stack cannot be satisfied\n" );
}
else {
max_num_workers = max( governor::default_num_threads() - 1, max_num_workers );
// at least 1 worker is required to support starvation resistant tasks
if( max_num_workers==0 ) max_num_workers = 1;
// Create the global market instance
size_t size = sizeof(market);
#if __TBB_TASK_GROUP_CONTEXT
__TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(generic_scheduler*) == sizeof(market),
"my_workers must be the last data field of the market class");
size += sizeof(generic_scheduler*) * (max_num_workers - 1);
#endif /* __TBB_TASK_GROUP_CONTEXT */
__TBB_InitOnce::add_ref();
void* storage = NFS_Allocate(size, 1, NULL);
memset( storage, 0, size );
// Initialize and publish global market
m = new (storage) market( max_num_workers, stack_size );
theMarket = m;
}
return *m;
}
void market::destroy () {
#if __TBB_COUNT_TASK_NODES
if ( my_task_node_count )
runtime_warning( "Leaked %ld task objects\n", (intptr_t)my_task_node_count );
#endif /* __TBB_COUNT_TASK_NODES */
this->~market();
NFS_Free( this );
__TBB_InitOnce::remove_ref();
}
void market::release () {
__TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
bool do_release = false;
{
global_market_mutex_type::scoped_lock lock(theMarketMutex);
if ( --my_ref_count == 0 ) {
do_release = true;
theMarket = NULL;
}
}
if( do_release )
my_server->request_close_connection();
}
arena& market::create_arena ( unsigned max_num_workers, size_t stack_size ) {
market &m = global_market( max_num_workers, stack_size ); // increases market's ref count
arena& a = arena::allocate_arena( m, min(max_num_workers, m.my_max_num_workers) );
// Add newly created arena into the existing market's list.
spin_mutex::scoped_lock lock(m.my_arenas_list_mutex);
m.my_arenas.push_front( a );
if ( m.my_arenas.size() == 1 )
m.my_next_arena = m.my_arenas.begin();
return a;
}
void market::detach_arena ( arena& a ) {
__TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
spin_mutex::scoped_lock lock(my_arenas_list_mutex);
__TBB_ASSERT( my_next_arena != my_arenas.end(), NULL );
if ( &*my_next_arena == &a )
if ( ++my_next_arena == my_arenas.end() && my_arenas.size() > 1 )
my_next_arena = my_arenas.begin();
my_arenas.remove( a );
}
arena* market::arena_in_need () {
spin_mutex::scoped_lock lock(my_arenas_list_mutex);
if ( my_arenas.empty() )
return NULL;
__TBB_ASSERT( my_next_arena != my_arenas.end(), NULL );
arena_list_type::iterator it = my_next_arena;
do {
arena& a = *it;
if ( ++it == my_arenas.end() )
it = my_arenas.begin();
if ( a.num_workers_active() < a.my_num_workers_allotted ) {
++a.my_num_threads_active;
my_next_arena = it;
return &a;
}
} while ( it != my_next_arena );
return NULL;
}
void market::update_allotment ( int max_workers ) {
unsigned carry = 0;
spin_mutex::scoped_lock lock(my_arenas_list_mutex);
arena_list_type::iterator it = my_arenas.begin();
int total_demand = my_total_demand;
max_workers = min(max_workers, total_demand);
if ( total_demand > 0 ) {
for ( ; it != my_arenas.end(); ++it ) {
arena& a = *it;
int tmp = a.my_num_workers_requested * max_workers + carry;
int allotted = tmp / total_demand;
carry = tmp % total_demand;
a.my_num_workers_allotted = min( allotted, (int)a.my_max_num_workers );
}
}
else {
for ( ; it != my_arenas.end(); ++it ) {
it->my_num_workers_allotted = 0;
}
}
}
/** The balancing algorithm may be liable to data races. However the aberrations
caused by the races are not fatal and generally only temporarily affect fairness
of the workers distribution among arenas. **/
void market::adjust_demand ( arena& a, int delta ) {
__TBB_ASSERT( theMarket, "market instance was destroyed prematurely?" );
a.my_num_workers_requested += delta;
my_total_demand += delta;
update_allotment( my_max_num_workers );
// Must be called outside of any locks
my_server->adjust_job_count_estimate( delta );
}
void market::process( job& j ) {
generic_scheduler& s = static_cast<generic_scheduler&>(j);
while ( arena *a = arena_in_need() )
a->process(s);
}
void market::cleanup( job& j ) {
__TBB_ASSERT( theMarket != this, NULL );
generic_scheduler& s = static_cast<generic_scheduler&>(j);
generic_scheduler* mine = governor::local_scheduler_if_initialized();
__TBB_ASSERT( !mine || mine->arena_index!=0, NULL );
if( mine!=&s ) {
governor::assume_scheduler( &s );
generic_scheduler::cleanup_worker( &s, mine!=NULL );
governor::assume_scheduler( mine );
} else {
generic_scheduler::cleanup_worker( &s, true );
}
}
void market::acknowledge_close_connection() {
destroy();
}
::rml::job* market::create_one_job() {
unsigned index = ++my_num_workers;
__TBB_ASSERT( index > 0, NULL );
// index serves as a hint decreasing conflicts between workers when they migrate between arenas
generic_scheduler* s = generic_scheduler::create_worker( *this, index );
#if __TBB_TASK_GROUP_CONTEXT
__TBB_ASSERT( !my_workers[index - 1], NULL );
my_workers[index - 1] = s;
#endif /* __TBB_TASK_GROUP_CONTEXT */
governor::sign_on(s);
return s;
}
#if __TBB_TASK_GROUP_CONTEXT
/** Propagates cancellation down the tree of dependent contexts by walking each
thread's local list of contexts **/
void market::propagate_cancellation ( task_group_context& ctx ) {
__TBB_ASSERT ( ctx.my_cancellation_requested, "No cancellation request in the context" );
// The whole propagation algorithm is under the lock in order to ensure correctness
// in case of parallel cancellations at the different levels of the context tree.
// See the note 1 at the bottom of this file.
global_market_mutex_type::scoped_lock lock(theMarketMutex);
// Advance global cancellation epoch
uintptr_t global_epoch = __TBB_FetchAndAddWrelease(&global_cancel_count, 1);
// Propagate to all workers and masters and sync up their local epochs with the global one
unsigned num_workers = my_num_workers;
for ( unsigned i = 0; i < num_workers; ++i ) {
generic_scheduler *s = my_workers[i];
// If the worker is only about to be registered, skip it.
if ( s ) {
s->propagate_cancellation();
s->local_cancel_count = global_epoch;
}
}
arena_list_type::iterator it = my_arenas.begin();
for ( ; it != my_arenas.end(); ++it ) {
generic_scheduler *s = it->slot[0].my_scheduler;
// If the master is under construction, skip it.
if ( s ) {
s->propagate_cancellation();
s->local_cancel_count = global_epoch;
}
}
}
#endif /* __TBB_TASK_GROUP_CONTEXT */
#if __TBB_COUNT_TASK_NODES
intptr_t market::workers_task_node_count() {
intptr_t result = 0;
spin_mutex::scoped_lock lock(my_arenas_list_mutex);
for ( arena_list_type::iterator it = my_arenas.begin(); it != my_arenas.end(); ++it )
result += it->workers_task_node_count();
return result;
}
#endif /* __TBB_COUNT_TASK_NODES */
} // namespace internal
} // namespace tbb
#endif /* __TBB_ARENA_PER_MASTER */
/*
Notes:
1. Consider parallel cancellations at the different levels of the context tree:
Ctx1 <- Cancelled by Thread1 |- Thread2 started processing
| |
Ctx2 |- Thread1 started processing
| T1 |- Thread2 finishes and syncs up local counters
Ctx3 <- Cancelled by Thread2 |
| |- Ctx5 is bound to Ctx2
Ctx4 |
T2 |- Thread1 reaches Ctx2
Thread-propagator of each cancellation increments global counter. However the thread
propagating the cancellation from the outermost context (Thread1) may be the last
to finish. Which means that the local counters may be synchronized earlier (by Thread2,
at Time1) than it propagated cancellation into Ctx2 (at time Time2). If a new context
(Ctx5) is created and bound to Ctx2 between Time1 and Time2, checking its parent only
(Ctx2) may result in cancellation request being lost.
This issue is solved by doing the whole propagation under the lock (the_scheduler_list_mutex).
If we need more concurrency while processing parallel cancellations, we could try
the following modification of the propagation algorithm:
advance global counter and remember it
for each thread:
scan thread's list of contexts
for each thread:
sync up its local counter only if the global counter has not been changed
However this version of the algorithm requires more analysis and verification.
*/