| /* |
| Copyright 2005-2010 Intel Corporation. All Rights Reserved. |
| |
| This file is part of Threading Building Blocks. |
| |
| Threading Building Blocks is free software; you can redistribute it |
| and/or modify it under the terms of the GNU General Public License |
| version 2 as published by the Free Software Foundation. |
| |
| Threading Building Blocks is distributed in the hope that it will be |
| useful, but WITHOUT ANY WARRANTY; without even the implied warranty |
| of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| GNU General Public License for more details. |
| |
| You should have received a copy of the GNU General Public License |
| along with Threading Building Blocks; if not, write to the Free Software |
| Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
| |
| As a special exception, you may use this file as part of a free software |
| library without restriction. Specifically, if other files instantiate |
| templates or use macros or inline functions from this file, or you compile |
| this file and link it with other files to produce an executable, this |
| file does not by itself cause the resulting executable to be covered by |
| the GNU General Public License. This exception does not however |
| invalidate any other reasons why the executable file might be covered by |
| the GNU General Public License. |
| */ |
| |
| #include "rml_tbb.h" |
| #define private public /* Sleazy trick to avoid publishing internal names in public header. */ |
| #include "rml_omp.h" |
| #undef private |
| |
| #include "tbb/tbb_allocator.h" |
| #include "tbb/cache_aligned_allocator.h" |
| #include "tbb/aligned_space.h" |
| #include "tbb/atomic.h" |
| #include "tbb/spin_mutex.h" |
| #include "tbb/tbb_misc.h" // Get DetectNumberOfWorkers() from here. |
| #if _MSC_VER==1500 && !defined(__INTEL_COMPILER) |
// VS2008/VC9 emits spurious warning C4985 ("attributes not present on previous declaration") when compiling concurrent_vector.h; suppress it around the include.
| #pragma warning( push ) |
| #pragma warning( disable: 4985 ) |
| #endif |
| #include "tbb/concurrent_vector.h" |
| #if _MSC_VER==1500 && !defined(__INTEL_COMPILER) |
| #pragma warning( pop ) |
| #endif |
| #if _MSC_VER && defined(_Wp64) |
| // Workaround for overzealous compiler warnings |
| #pragma warning (push) |
| #pragma warning (disable: 4244) |
| #endif |
| |
| #include "job_automaton.h" |
| #include "wait_counter.h" |
| #include "thread_monitor.h" |
| |
| #if RML_USE_WCRM |
| #include <concrt.h> |
| #include <concrtrm.h> |
| using namespace Concurrency; |
| #include <vector> |
| #include <hash_map> |
| #define __RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED 0 |
| #endif /* RML_USE_WCRM */ |
| |
| #define STRINGIFY(x) #x |
| #define TOSTRING(x) STRINGIFY(x) |
| |
| namespace rml { |
| namespace internal { |
| |
| //! Number of hardware contexts |
| static inline unsigned hardware_concurrency() { |
| static unsigned DefaultNumberOfThreads = 0; |
| unsigned n = DefaultNumberOfThreads; |
| if( !n ) DefaultNumberOfThreads = n = tbb::internal::DetectNumberOfWorkers(); |
| return n; |
| } |
| |
// Client/server interface types for the two supported client kinds (TBB and OpenMP).
using tbb::internal::rml::tbb_client;
using tbb::internal::rml::tbb_server;

using __kmp::rml::omp_client;
using __kmp::rml::omp_server;

typedef versioned_object::version_type version_type;

//! Version of the RML server interface implemented by this file.
#define SERVER_VERSION 2
//! Oldest client version this server can interoperate with.
#define EARLIEST_COMPATIBLE_CLIENT_VERSION 2

static const size_t cache_line_size = tbb::internal::NFS_MaxLineSize;

// Forward declarations.
template<typename Server, typename Client> class generic_connection;
class tbb_connection_v2;
class omp_connection_v2;
| |
| #if RML_USE_WCRM |
//! State of a server_thread
/** Below are diagrams of legal state transitions.

               ts_busy
               ^     ^
              /       \
             /         V
    ts_done <----- ts_asleep <------> ts_idle
*/
enum thread_state_t {
    //! Thread is running but has no work; it is looking for some.
    ts_idle,
    //! Thread is asleep, waiting to be woken up.
    ts_asleep,
    //! Thread is executing work on behalf of a client.
    ts_busy,
    //! Terminal state: the thread is shutting down.
    ts_done
};
| |
//! Extra state of an omp server thread
enum thread_extra_state_t {
    //! No special condition.
    ts_none,
    //! Set via server_thread_rep::set_removed() when the execution resource is removed.
    ts_removed,
    //! Set via server_thread_rep::set_lent(); cleared by set_returned().
    ts_lent
};
| |
//! Results from try_grab_for()
enum thread_grab_t {
    //! The thread could not be grabbed.
    wk_failed,
    //! Grabbed a thread that was in ts_asleep.
    wk_from_asleep,
    //! Grabbed a thread that was in ts_idle.
    wk_from_idle
};
| |
| #else /* !RML_USE_WCRM */ |
| |
//! State of a server_thread
/** Below are diagrams of legal state transitions.

    OMP
                  ts_omp_busy
                  ^         ^
                 /           \
                /             V
    ts_asleep <-----------> ts_idle

               ts_deactivated
                ^          ^
               /            \
              V              \
    ts_none <--------------> ts_reactivated

    TBB
                  ts_tbb_busy
                  ^         ^
                 /           \
                /             V
    ts_asleep <-----------> ts_idle --> ts_done

    For TBB only. Extra state transition.

    ts_created -> ts_started -> ts_visited
*/
enum thread_state_t {
    //! Thread not doing anything useful, but running and looking for work.
    ts_idle,
    //! Thread not doing anything useful and is asleep.
    ts_asleep,
    //! Thread is enlisted into OpenMP team
    ts_omp_busy,
    //! Thread is busy doing TBB work.
    ts_tbb_busy,
    //! For tbb threads only
    ts_done,
    //! Extra lifecycle states of TBB threads (see diagram above): just constructed,
    ts_created,
    //! started running,
    ts_started,
    //! and visited at least once.
    ts_visited,
    //! For omp threads only
    ts_none,
    //! Temporarily sent to sleep by deactivate() (see diagram above).
    ts_deactivated,
    //! Woken back up by reactivate().
    ts_reactivated
};
| #endif /* RML_USE_WCRM */ |
| |
| #if TBB_USE_ASSERT |
| #define PRODUCE_ARG(x) ,x |
| #else |
| #define PRODUCE_ARG(x) |
| #endif /* TBB_USE_ASSERT */ |
| |
//! Synchronizes dispatch of OpenMP work.
/** produce() publishes a piece of work for a thread; the atomic "job" field is
    written last, so a consumer that observes a non-NULL job also observes the
    other fields set by the same produce() call. */
class omp_dispatch_type {
    typedef ::rml::job job_type;
    omp_client* client;          // client that supplied the work
    void* cookie;                // opaque per-dispatch value, passed back to the client
    omp_client::size_type index; // index of this thread within the OMP team
    tbb::atomic<job_type*> job;  // non-NULL <=> work has been produced but not yet consumed
#if TBB_USE_ASSERT
    omp_connection_v2* server;   // for assertion checking only
#endif /* TBB_USE_ASSERT */
public:
    omp_dispatch_type() {job=NULL;}
    //! Consume the previously produced work; defined out of line.
    void consume();
    //! Publish work for this slot. Must not be called while a job is already set.
    void produce( omp_client& c, job_type& j, void* cookie_, omp_client::size_type index_ PRODUCE_ARG( omp_connection_v2& s )) {
        __TBB_ASSERT( &j, NULL );
        __TBB_ASSERT( !job, "job already set" );
        client = &c;
#if TBB_USE_ASSERT
        server = &s;
#endif /* TBB_USE_ASSERT */
        cookie = cookie_;
        index = index_;
        // Must be last: writing "job" signals that the fields above are valid.
        job = &j;
    }
};
| |
| //! A reference count. |
| /** No default constructor, because users of ref_count must be very careful about whether the |
| initial reference count is 0 or 1. */ |
| class ref_count: no_copy { |
| friend class thread_map; |
| tbb::atomic<int> my_ref_count; |
| public: |
| ref_count(int k ) {my_ref_count=k;} |
| ~ref_count() {__TBB_ASSERT( !my_ref_count, "premature destruction of refcounted object" );} |
| //! Add one and return new value. |
| int add_ref() { |
| int k = ++my_ref_count; |
| __TBB_ASSERT(k>=1,"reference count underflowed before add_ref"); |
| return k; |
| } |
| //! Subtract one and return new value. |
| int remove_ref() { |
| int k = --my_ref_count; |
| __TBB_ASSERT(k>=0,"reference count underflow"); |
| return k; |
| } |
| }; |
| |
| #if RML_USE_WCRM |
| |
| #if USE_UMS_THREAD |
| #define RML_THREAD_KIND UmsThreadDefault |
| #define RML_THREAD_KIND_STRING "UmsThread" |
| #else |
| #define RML_THREAD_KIND ThreadScheduler |
| #define RML_THREAD_KIND_STRING "WinThread" |
| #endif |
| |
| // Forward declaration |
| class thread_map; |
| |
| static const IExecutionResource* c_remove_prepare = (IExecutionResource*)0; |
| static const IExecutionResource* c_remove_returned = (IExecutionResource*)1; |
| |
//! Server thread representation
/** Per-thread state shared by TBB and OMP server threads: the ConcRT
    scheduler/execution-resource handles, the owning thread_map, the client,
    the job, and the atomic state machine (thread_state_t + thread_extra_state_t). */
class server_thread_rep : no_copy {
    friend class thread_map;
    friend class omp_connection_v2;
    friend class server_thread;
    friend class tbb_server_thread;
    friend class omp_server_thread;
    template<typename Connection> friend void make_job( Connection& c, typename Connection::server_thread_type& t );
    //! Integral type that can hold a thread_state_t; stored atomically.
    typedef int thread_state_rep_t;
public:
    //! Ctor
    /** "assigned" selects the initial state: ts_busy if the thread already has
        work assigned, ts_idle otherwise. */
    server_thread_rep( bool assigned, IScheduler* s, IExecutionResource* r, thread_map& map, rml::client& cl ) :
        uid( GetExecutionContextId() ), my_scheduler(s), my_proxy(NULL),
        my_thread_map(map), my_client(cl), my_job(NULL)
    {
        my_state = assigned ? ts_busy : ts_idle;
        my_extra_state = ts_none;
        terminate = false;
        my_execution_resource = r;
    }
    //! Dtor
    ~server_thread_rep() {}

    //! Synchronization routine
    /** Blocks until my_job_automaton has a job; caches the result in my_job. */
    inline rml::job* wait_for_job() {
        if( !my_job ) my_job = &my_job_automaton.wait_for_job();
        return my_job;
    }

    // Getters and setters
    inline thread_state_t read_state() const { thread_state_rep_t s = my_state; return static_cast<thread_state_t>(s); }
    inline void set_state( thread_state_t to ) {my_state = to;}
    inline void set_removed() { __TBB_ASSERT( my_extra_state==ts_none, NULL ); my_extra_state = ts_removed; }
    inline bool is_removed() const { return my_extra_state==ts_removed; }
    inline bool is_lent() const {return my_extra_state==ts_lent;}
    inline void set_lent() { my_extra_state=ts_lent; }
    inline void set_returned() { my_extra_state=ts_none; }
    inline IExecutionResource* get_execution_resource() { return my_execution_resource; }
    // For non-masters my_execution_resource actually holds an IVirtualProcessorRoot (see field comment below).
    inline IVirtualProcessorRoot* get_virtual_processor() { return (IVirtualProcessorRoot*)get_execution_resource(); }

    //! Enlist the thread for work
    /** Atomically moves the thread out of ts_asleep; returns true iff this caller
        won the transition (another caller may race on the same CAS). */
    inline bool wakeup( thread_state_t to, thread_state_t from ) {
        __TBB_ASSERT( from==ts_asleep && (to==ts_idle||to==ts_busy||to==ts_done), NULL );
        return my_state.compare_and_swap( to, from )==from;
    }

    //! Try to grab the thread for work; result tells from which state it was grabbed.
    thread_grab_t try_grab_for();

    //! Destroy the client job associated with the thread
    template<typename Connection> bool destroy_job( Connection* c );

    //! Try to re-use the thread
    void revive( IScheduler* s, IExecutionResource* r, rml::client& c ) {
        // the variables may not have been set before a thread was told to quit
        __TBB_ASSERT( my_scheduler==s, "my_scheduler has been altered?\n" );
        my_scheduler = s;
        __TBB_ASSERT( &my_client==&c, "my_client has been altered?\n" );
        if( r ) my_execution_resource = r;
        // NOTE(review): my_client is a reference member, so this assigns the object,
        // not the binding; the assertion above guarantees it is a self-assignment --
        // confirm it is intentional (it looks like a no-op).
        my_client = c;
        my_state = ts_idle;
        __TBB_ASSERT( my_extra_state==ts_removed, NULL );
        my_extra_state = ts_none;
    }

protected:
    const int uid;                 // execution context id, fixed at construction
    IScheduler* my_scheduler;
    IThreadProxy* my_proxy;
    tbb::atomic<IExecutionResource*> my_execution_resource; /* for non-masters, it is IVirtualProcessorRoot */
    thread_map& my_thread_map;     // map that owns this representation
    rml::client& my_client;
    job* my_job;                   // cached copy of the job held by my_job_automaton
    job_automaton my_job_automaton;
    tbb::atomic<bool> terminate;   // set to request shutdown of this thread
    tbb::atomic<thread_state_rep_t> my_state;
    tbb::atomic<thread_extra_state_t> my_extra_state;
};
| |
//! Class that implements IExecutionContext
/** Couples the state in server_thread_rep with ConcRT's IExecutionContext
    interface so the resource manager can dispatch this thread. */
class server_thread : public IExecutionContext, public server_thread_rep {
    friend class tbb_connection_v2;
    friend class omp_connection_v2;
    friend class tbb_server_thread;
    friend class omp_server_thread;
    friend class thread_map;
    template<typename Connection> friend void make_job( Connection& c, typename Connection::server_thread_type& t );
protected:
    //! "is_tbb" records which kind of connection this thread serves.
    server_thread( bool is_tbb, bool assigned, IScheduler* s, IExecutionResource* r, thread_map& map, rml::client& cl ) : server_thread_rep(assigned,s,r,map,cl), tbb_thread(is_tbb) {}
    ~server_thread() {}
    /*override*/ unsigned int GetId() const { return uid; }
    /*override*/ IScheduler* GetScheduler() { return my_scheduler; }
    /*override*/ IThreadProxy* GetProxy() { return my_proxy; }
    /*override*/ void SetProxy( IThreadProxy* thr_proxy ) { my_proxy = thr_proxy; }

private:
    bool tbb_thread; // true for TBB server threads, false for OMP server threads
};
| |
| // Forward declaration |
| class tbb_connection_v2; |
| class omp_connection_v2; |
| |
//! TBB server thread
class tbb_server_thread : public server_thread {
    friend class tbb_connection_v2;
public:
    tbb_server_thread( bool assigned, IScheduler* s, IExecutionResource* r, tbb_connection_v2* con, thread_map& map, rml::client& cl ) : server_thread(true,assigned,s,r,map,cl), my_conn(con) {
        activation_count = 0;
    }
    ~tbb_server_thread() {}
    //! Thread main loop; defined out of line.
    /*override*/ void Dispatch( DispatchState* );
    //! Begin shutting the thread down; defined out of line.
    inline bool initiate_termination();
    //! Volunteer to sleep; defined out of line.
    bool sleep_perhaps();
    //! Switch out this thread
    bool switch_out();
private:
    tbb_connection_v2* my_conn; // connection this thread belongs to
public:
    //! Pending activations; ~thread_map spins until this drops to <=1 before destroying the thread.
    tbb::atomic<int> activation_count;
};
| |
| //! OMP server thread |
| class omp_server_thread : public server_thread { |
| friend class omp_connection_v2; |
| public: |
| omp_server_thread( bool assigned, IScheduler* s, IExecutionResource* r, omp_connection_v2* con, thread_map& map, rml::client& cl ) : |
| server_thread(false,assigned,s,r,map,cl), my_conn(con), my_cookie(NULL), my_index(UINT_MAX) {} |
| ~omp_server_thread() {} |
| /*override*/ void Dispatch( DispatchState* ); |
| inline void* get_cookie() {return my_cookie;} |
| inline ::__kmp::rml::omp_client::size_type get_index() {return my_index;} |
| |
| inline IExecutionResource* get_execution_resource() { return get_execution_resource(); } |
| inline bool initiate_termination() { return destroy_job( (omp_connection_v2*) my_conn ); } |
| void sleep_perhaps(); |
| private: |
| omp_connection_v2* my_conn; |
| void* my_cookie; |
| ::__kmp::rml::omp_client::size_type my_index; |
| omp_dispatch_type omp_data; |
| }; |
| |
//! Class that implements IScheduler
/** Thin adapter that forwards resource-manager callbacks to the owning Connection. */
template<typename Connection>
class scheduler : no_copy, public IScheduler {
public:
    /*override*/ unsigned int GetId() const {return uid;}
    /*override*/ void Statistics( unsigned int* /*pTaskCompletionRate*/, unsigned int* /*pTaskArrivalRate*/, unsigned int* /*pNumberOfTaskEnqueued*/) {}
    /*override*/ SchedulerPolicy GetPolicy() const { __TBB_ASSERT(my_policy,NULL); return *my_policy; }
    //! Forward newly granted virtual processors to the connection, unless it is closing.
    /*override*/ void AddVirtualProcessors( IVirtualProcessorRoot** vproots, unsigned int count ) { if( !my_conn.is_closing() ) my_conn.add_virtual_processors( vproots, count); }
    /*override*/ void RemoveVirtualProcessors( IVirtualProcessorRoot** vproots, unsigned int count );
    /*override*/ void NotifyResourcesExternallyIdle( IVirtualProcessorRoot** vproots, unsigned int count ) { __TBB_ASSERT( false, "This call is not allowed for TBB" ); }
    /*override*/ void NotifyResourcesExternallyBusy( IVirtualProcessorRoot** vproots, unsigned int count ) { __TBB_ASSERT( false, "This call is not allowed for TBB" ); }
protected:
    //! Defined out of line; presumably allocates my_policy (the dtor asserts and deletes it).
    scheduler( Connection& conn );
    virtual ~scheduler() { __TBB_ASSERT( my_policy, NULL ); delete my_policy; }

public:
    //! Factory method; instances are created on the heap only.
    static scheduler* create( Connection& conn ) {return new scheduler( conn );}

private:
    const int uid;              // scheduler id
    Connection& my_conn;        // connection that owns this scheduler
    SchedulerPolicy* my_policy; // owned; deleted in the dtor
};
| |
| |
| /* |
| * --> ts_busy --> ts_done |
| */ |
| class thread_scavenger_thread : public IExecutionContext, no_copy { |
| public: |
| thread_scavenger_thread( IScheduler* s, IVirtualProcessorRoot* r, thread_map& map ) : |
| uid( GetExecutionContextId() ), my_scheduler(s), my_virtual_processor_root(r), my_proxy(NULL), my_thread_map(map) |
| { |
| my_state = ts_busy; |
| #if TBB_USE_ASSERT |
| activation_count = 0; |
| #endif |
| } |
| ~thread_scavenger_thread() {} |
| /*override*/ unsigned int GetId() const { return uid; } |
| /*override*/ IScheduler* GetScheduler() { return my_scheduler; } |
| /*override*/ IThreadProxy* GetProxy() { return my_proxy; } |
| /*override*/ void SetProxy( IThreadProxy* thr_proxy ) { my_proxy = thr_proxy; } |
| /*override*/ void Dispatch( DispatchState* ); |
| inline thread_state_t read_state() { return my_state; } |
| inline void set_state( thread_state_t s ) { my_state = s; } |
| inline IVirtualProcessorRoot* get_virtual_processor() { return my_virtual_processor_root; } |
| private: |
| const int uid; |
| IScheduler* my_scheduler; |
| IVirtualProcessorRoot* my_virtual_processor_root; |
| IThreadProxy* my_proxy; |
| thread_map& my_thread_map; |
| tbb::atomic<thread_state_t> my_state; |
| #if TBB_USE_ASSERT |
| public: |
| tbb::atomic<int> activation_count; |
| #endif |
| }; |
| |
| static const thread_scavenger_thread* c_claimed = reinterpret_cast<thread_scavenger_thread*>(1); |
| |
//! List of closed connections awaiting cleanup by the connection scavenger.
/** head/tail hold connection pointers encoded as uintptr_t (see the bit-encoding
    comment in connection_scavenger_thread), or one of the sentinel values below. */
struct garbage_connection_queue {
    tbb::atomic<uintptr_t> head;
    tbb::atomic<uintptr_t> tail;
    static const uintptr_t empty = 0; // connection scavenger thread empty list
    static const uintptr_t plugged = 1; // end of use of the list
    static const uintptr_t plugged_acked = 2; // connection scavenger saw the plugged flag, and it freed all connections
};
| |
//! Connection scavenger
/** It collects closed connection objects, waits for worker threads belonging to the connection
 *  to return to the ConcRT RM, then returns the object to the memory manager.
 */
class connection_scavenger_thread {
    friend void assist_cleanup_connections();
    /*
     * connection_scavenger_thread's state
     * ts_busy <----> ts_asleep <--
     */
    tbb::atomic<thread_state_t> state;

    /* We steal two bits from a connection pointer to encode
     * whether the connection is for TBB or for OMP.
     *
     * ----------------------------------
     * |                          |  |  |
     * ----------------------------------
     *                               ^  ^
     *                              /   |
     *               1 : tbb, 0 : omp  |
     *                         if set, terminate
     */
    // FIXME: pad these?
    thread_monitor monitor;  // used to put this thread to sleep / wake it up
    int default_concurrency; // captured in launch()
    HANDLE thr_handle;       // OS handle of the scavenger thread
#if TBB_USE_ASSERT
    tbb::atomic<int> n_scavenger_threads;
#endif

public:
    connection_scavenger_thread() : thr_handle(NULL) {
        state = ts_asleep;
#if TBB_USE_ASSERT
        n_scavenger_threads = 0;
#endif
    }

    ~connection_scavenger_thread() {}

    //! Wake the scavenger if asleep; only the winner of the CAS notifies the monitor.
    void wakeup() {
        if( state.compare_and_swap( ts_busy, ts_asleep )==ts_asleep )
            monitor.notify();
    }

    //! Volunteer to sleep; defined out of line.
    void sleep_perhaps();

    //! Process a batch of closed connections (encoded pointer); defined out of line.
    void process_requests( uintptr_t conn_ex );

    //! OS thread entry point; defined out of line.
    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

    //! Start the scavenger thread, remembering the default concurrency.
    void launch( int dc ) {
        default_concurrency = dc;
        thread_monitor::launch( connection_scavenger_thread::thread_routine, this, NULL );
    }

    //! Queue a connection for closing; defined out of line.
    template<typename Server, typename Client>
    void add_request( generic_connection<Server,Client>* conn_to_close );

    //! Atomically take the pending list, prepending last_conn_to_close; defined out of line.
    template<typename Server, typename Client>
    uintptr_t grab_and_prepend( generic_connection<Server,Client>* last_conn_to_close );
};
| |
| void free_all_connections( uintptr_t ); |
| |
| #endif /* RML_USE_WCRM */ |
| |
| #if !RML_USE_WCRM |
| class server_thread; |
| |
//! thread_map_base; we need to make the iterator type available to server_thread
struct thread_map_base {
    //! A value in the map
    /** Pairs a server_thread with the client job it runs, plus the job_automaton
        that synchronizes job creation with job use. */
    class value_type {
    public:
        server_thread& thread() {
            __TBB_ASSERT( my_thread, "thread_map::value_type::thread() called when !my_thread" );
            return *my_thread;
        }
        rml::job& job() {
            __TBB_ASSERT( my_job, "thread_map::value_type::job() called when !my_job" );
            return *my_job;
        }
        value_type() : my_thread(NULL), my_job(NULL) {}
        //! Spin until my_thread has been published by the thread that created this entry.
        server_thread& wait_for_thread() const {
            for(;;) {
                // volatile read so the spin observes the concurrent write to my_thread
                server_thread* ptr=const_cast<server_thread*volatile&>(my_thread);
                if( ptr )
                    return *ptr;
                __TBB_Yield();
            }
        }
        /** Shortly after when a connection is established, it is possible for the server
            to grab a server_thread that has not yet created a job object for that server. */
        rml::job& wait_for_job() const {
            if( !my_job ) {
                my_job = &my_automaton.wait_for_job();
            }
            return *my_job;
        }
    private:
        server_thread* my_thread;
        /** Marked mutable because though it is physically modified, conceptually it is a duplicate of
            the job held by job_automaton. */
        mutable rml::job* my_job;
        job_automaton my_automaton;
        // FIXME - pad out to cache line, because my_automaton is hit hard by thread()
        friend class thread_map;
    };
    typedef tbb::concurrent_vector<value_type,tbb::zero_allocator<value_type,tbb::cache_aligned_allocator> > array_type;
};
| #endif /* !RML_USE_WCRM */ |
| |
| #if _MSC_VER && !defined(__INTEL_COMPILER) |
| // Suppress overzealous compiler warnings about uninstantiatble class |
| #pragma warning(push) |
| #pragma warning(disable:4510 4610) |
| #endif |
| |
//! Pads T out to a whole number of cache lines to avoid false sharing.
/** Note: if sizeof(T) is already a multiple of cache_line_size, a full extra
    line of padding is added (an array member cannot have zero size). */
template<typename T>
class padded: public T {
    char pad[cache_line_size - sizeof(T)%cache_line_size];
};
| |
| #if _MSC_VER && !defined(__INTEL_COMPILER) |
| #pragma warning(pop) |
| #endif |
| |
// FIXME - should we pad out memory to avoid false sharing of our global variables?
//! Global "deposit of coins": how many threads may still be put to work.
/** Decremented to claim a thread, incremented to return it; see wakeup_tbb_threads(). */
static tbb::atomic<int> the_balance;
//! Presumably a once-flag guarding initialization of the_balance -- not used in this chunk; confirm at its use sites.
static tbb::atomic<int> the_balance_inited;
| |
| #if !RML_USE_WCRM |
//! Per thread information
/** ref_count holds number of clients that are using this,
    plus 1 if a host thread owns this instance. */
class server_thread: public ref_count {
    friend class thread_map;
    template<typename Server, typename Client> friend class generic_connection;
    friend class tbb_connection_v2;
    friend class omp_connection_v2;
    //! Integral type that can hold a thread_state_t
    typedef int thread_state_rep_t;
    tbb::atomic<thread_state_rep_t> state;
public:
    //! Monitor used to put this thread to sleep and wake it up.
    thread_monitor monitor;
private:
    bool is_omp_thread;                     // true if this thread serves an OMP client
    tbb::atomic<thread_state_rep_t> my_extra_state;
    server_thread* link;
    thread_map_base::array_type::iterator my_map_pos; // position of this thread in thread_map's array
    rml::server *my_conn;                   // server (connection) set by thread_map::bind_one_thread
    rml::job* my_job;                       // client job; NULL until the thread creates it
    job_automaton* my_ja;                   // automaton guarding my_job's lifetime
    size_t my_index;                        // index assigned by thread_map::add_one_thread
    tbb::atomic<bool> terminate;            // set to request shutdown (see thread_map::unbind)
    omp_dispatch_type omp_dispatch;         // pending OMP work for this thread

#if TBB_USE_ASSERT
    //! Flag used to check if thread is still using *this.
    bool has_active_thread;
#endif /* TBB_USE_ASSERT */

    //! Volunteer to sleep.
    void sleep_perhaps( thread_state_t asleep );

    //! Destroy job corresponding to given client
    /** Return true if thread must quit. */
    template<typename Connection>
    bool destroy_job( Connection& c );

    //! Do terminate the thread
    /** Return true if thread must quit. */
    bool do_termination();

    //! Main loop of the thread; defined out of line.
    void loop();
    //! OS thread entry point; defined out of line.
    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

public:
    server_thread();

    ~server_thread();

    //! Read the thread state
    thread_state_t read_state() const {
        thread_state_rep_t s = state;
        __TBB_ASSERT( unsigned(s)<=unsigned(ts_done), "corrupted server thread?" );
        return thread_state_t(s);
    }

    //! Read the tbb-specific extra thread state
    thread_state_t read_extra_state() const {
        thread_state_rep_t s = my_extra_state;
        return thread_state_t(s);
    }

    //! Launch a thread that is bound to *this.
    void launch( size_t stack_size );

    //! Attempt to wakeup a thread
    /** The value "to" is the new state for the thread, if it was woken up.
        Returns true if thread was woken up, false otherwise. */
    bool wakeup( thread_state_t to, thread_state_t from );

    //! Attempt to enslave a thread for OpenMP/TBB.
    /** Returns true if state is successfully changed. 's' takes either ts_omp_busy or ts_tbb_busy */
    bool try_grab_for( thread_state_t s );

#if _WIN32||_WIN64
    //! Send the worker thread to sleep temporarily
    void deactivate();

    //! Wake the worker thread up
    void reactivate();
#endif /* _WIN32||_WIN64 */
};
| |
| //! Bag of threads that are private to a client. |
| class private_thread_bag { |
| struct list_thread: server_thread { |
| list_thread* next; |
| }; |
| //! Root of atomic linked list of list_thread |
| /** ABA problem is avoided because items are only atomically pushed, never popped. */ |
| tbb::atomic<list_thread*> my_root; |
| tbb::cache_aligned_allocator<padded<list_thread> > my_allocator; |
| public: |
| //! Construct empty bag |
| private_thread_bag() {my_root=NULL;} |
| |
| //! Create a fresh server_thread object. |
| server_thread& add_one_thread() { |
| list_thread* t = my_allocator.allocate(1); |
| new( t ) list_thread; |
| // Atomically add to list |
| list_thread* old_root; |
| do { |
| old_root = my_root; |
| t->next = old_root; |
| } while( my_root.compare_and_swap( t, old_root )!=old_root ); |
| return *t; |
| } |
| |
| //! Destroy the bag and threads in it. |
| ~private_thread_bag() { |
| while( my_root ) { |
| // Unlink thread from list. |
| list_thread* t = my_root; |
| my_root = t->next; |
| // Destroy and deallocate the thread. |
| t->~list_thread(); |
| my_allocator.deallocate(static_cast<padded<list_thread>*>(t),1); |
| } |
| } |
| }; |
| |
| |
| //! Forward declaration |
| void wakeup_some_tbb_threads(); |
| |
//! Type-independent part of class generic_connection.
/** One to one map from server threads to jobs, and associated reference counting. */
class thread_map : public thread_map_base {
public:
    typedef rml::client::size_type size_type;
    //! ctor
    thread_map( wait_counter& fc, ::rml::client& client ) :
        all_visited_at_least_once(false), my_min_stack_size(0), my_server_ref_count(1),
        my_client_ref_count(1), my_client(client), my_factory_counter(fc)
    { my_unrealized_threads = 0; }
    //! dtor
    ~thread_map() {}
    typedef array_type::iterator iterator;
    iterator begin() {return my_array.begin();}
    iterator end() {return my_array.end();}
    // The following three are defined out of line.
    void bind();
    void unbind();
    void assist_cleanup( bool assist_null_only );

    /** Returns number of unrealized threads to create. */
    size_type wakeup_tbb_threads( size_type n );
    //! Defined out of line.
    bool wakeup_next_thread( iterator i, tbb_connection_v2& conn );
    //! Defined out of line.
    void release_tbb_threads( server_thread* t );
    //! Defined out of line.
    void adjust_balance( int delta );

    //! Add a server_thread object to the map, but do not bind it.
    /** Return NULL if out of unrealized threads. */
    value_type* add_one_thread( bool is_omp_thread_ );

    void bind_one_thread( rml::server& server, value_type& x );

    void remove_client_ref();
    int add_server_ref() {return my_server_ref_count.add_ref();}
    int remove_server_ref() {return my_server_ref_count.remove_ref();}

    ::rml::client& client() const {return my_client;}

    size_type get_unrealized_threads() { return my_unrealized_threads; }

private:
    private_thread_bag my_private_threads;  // storage for the server_thread objects
    bool all_visited_at_least_once;
    array_type my_array;                    // one value_type entry per thread/job pair
    size_t my_min_stack_size;               // cached my_client.min_stack_size(); set in bind()
    tbb::atomic<size_type> my_unrealized_threads; // threads not yet created; set in bind()

    //! Number of threads referencing *this, plus one extra.
    /** When it becomes zero, the containing server object can be safely deleted. */
    ref_count my_server_ref_count;

    //! Number of jobs that need cleanup, plus one extra.
    /** When it becomes zero, acknowledge_close_connection is called. */
    ref_count my_client_ref_count;

    ::rml::client& my_client;
    //! Counter owned by factory that produced this thread_map.
    wait_counter& my_factory_counter;
};
| |
//! Bind one previously added thread to the given server and start it.
void thread_map::bind_one_thread( rml::server& server, value_type& x ) {
    // Add one to account for the thread referencing this map hereforth.
    server_thread& t = x.thread();
    my_server_ref_count.add_ref();
    my_client_ref_count.add_ref();
#if TBB_USE_ASSERT
    __TBB_ASSERT( t.add_ref()==1, NULL );
#else
    // Same side effect as the assert branch above, without the check.
    t.add_ref();
#endif
    // Have responsibility to start the thread.
    t.my_conn = &server;
    t.my_ja = &x.my_automaton;
    t.launch( my_min_stack_size );
    /* Must wake thread up so it can fill in its "my_job" field in *this.
       Otherwise deadlock can occur where wait_for_job spins on thread that is sleeping. */
    __TBB_ASSERT( t.state!=ts_tbb_busy, NULL );
    t.wakeup( ts_idle, ts_asleep );
}
| |
//! Add a server_thread object to the map, but do not bind it.
/** Returns NULL if out of unrealized threads. */
thread_map::value_type* thread_map::add_one_thread( bool is_omp_thread_ ) {
    size_type u;
    // Atomically claim one unrealized-thread slot; fail if none remain.
    do {
        u = my_unrealized_threads;
        if( !u ) return NULL;
    } while( my_unrealized_threads.compare_and_swap(u-1,u)!=u );
    server_thread& t = my_private_threads.add_one_thread();
    t.is_omp_thread = is_omp_thread_;
    __TBB_ASSERT( u>=1, NULL );
    t.my_index = u - 1;
    __TBB_ASSERT( t.state!=ts_tbb_busy, NULL );
    // TBB threads start in extra state ts_created (see state diagram); OMP threads have none.
    t.my_extra_state = t.is_omp_thread ? ts_none : ts_created;

    // Publish the thread in the concurrent array; value_type::wait_for_thread
    // covers the window before my_thread is set.
    iterator i = t.my_map_pos = my_array.grow_by(1);
    value_type& v = *i;
    v.my_thread = &t;
    return &v;
}
| |
| void thread_map::bind() { |
| ++my_factory_counter; |
| my_min_stack_size = my_client.min_stack_size(); |
| __TBB_ASSERT( my_unrealized_threads==0, "already called bind?" ); |
| my_unrealized_threads = my_client.max_job_count(); |
| } |
| |
//! Request shutdown of every thread in the map and drop the extra client reference.
void thread_map::unbind() {
    // Ask each server_thread to cleanup its job for this server.
    for( iterator i=begin(); i!=end(); ++i ) {
        server_thread& t = i->thread();
        t.terminate = true;             // request shutdown first...
        t.wakeup( ts_idle, ts_asleep ); // ...then wake the thread so it notices
    }
    // Remove extra ref to client.
    remove_client_ref();
}
| |
//! Help clean up jobs whose cleanup has not yet started.
void thread_map::assist_cleanup( bool assist_null_only ) {
    // To avoid deadlock, the current thread *must* help out with cleanups that have not started,
    // because the thread that created the job may be busy for a long time.
    for( iterator i = begin(); i!=end(); ++i ) {
        rml::job* j=0;
        job_automaton& ja = i->my_automaton;
        // try_plug/try_plug_null wins only if cleanup has not been claimed elsewhere.
        if( assist_null_only ? ja.try_plug_null() : ja.try_plug(j) ) {
            if( j ) {
                my_client.cleanup(*j);
            } else {
                // server thread did not get a chance to create a job.
            }
            // One fewer job needing cleanup.
            remove_client_ref();
        }
    }
}
| |
//! Try to wake up to n existing threads for TBB work.
/** Returns the number of unrealized threads to create: the requests that could
    not be satisfied from existing threads, capped by my_unrealized_threads. */
thread_map::size_type thread_map::wakeup_tbb_threads( size_type n ) {
    __TBB_ASSERT(n>0,"must specify positive number of threads to wake up");
    iterator e = end();
    for( iterator k=begin(); k!=e; ++k ) {
        // If another thread added *k, there is a tiny timing window where thread() is invalid.
        server_thread& t = k->wait_for_thread();
        thread_state_t thr_s = t.read_state();
        // Skip threads that are not yet started, already doing TBB work, or terminating.
        if( t.read_extra_state()==ts_created || thr_s==ts_tbb_busy || thr_s==ts_done )
            continue;
        if( --the_balance>=0 ) { // try to withdraw a coin from the deposit
            while( !t.try_grab_for( ts_tbb_busy ) ) {
                thr_s = t.read_state();
                if( thr_s==ts_tbb_busy || thr_s==ts_done ) {
                    // we lost; move on to the next.
                    ++the_balance; // return the coin
                    goto skip;
                }
            }
            if( --n==0 )
                return 0;
        } else {
            // overdraft.
            ++the_balance;
            break;
        }
skip:
        ;
    }
    return n<my_unrealized_threads ? n : my_unrealized_threads;
}
| #else /* RML_USE_WCRM */ |
| |
| class thread_map : no_copy { |
| friend class omp_connection_v2; |
| typedef ::std::hash_map<uintptr_t,server_thread*> hash_map_type; |
| size_t my_min_stack_size; |
| size_t my_unrealized_threads; |
| ::rml::client& my_client; |
| //! Counter owned by factory that produced this thread_map. |
| wait_counter& my_factory_counter; |
| //! Ref counters |
| ref_count my_server_ref_count; |
| ref_count my_client_ref_count; |
| // FIXME: pad this? |
| hash_map_type my_map; |
| bool shutdown_in_progress; |
| std::vector<IExecutionResource*> original_exec_resources; |
| tbb::cache_aligned_allocator<padded<tbb_server_thread> > my_tbb_allocator; |
| tbb::cache_aligned_allocator<padded<omp_server_thread> > my_omp_allocator; |
| tbb::cache_aligned_allocator<padded<thread_scavenger_thread> > my_scavenger_allocator; |
| IResourceManager* my_concrt_resource_manager; |
| IScheduler* my_scheduler; |
| ISchedulerProxy* my_scheduler_proxy; |
| tbb::atomic<thread_scavenger_thread*> my_thread_scavenger_thread; |
| #if TBB_USE_ASSERT |
| tbb::atomic<int> n_add_vp_requests; |
| tbb::atomic<int> n_thread_scavengers_created; |
| #endif |
| public: |
| thread_map( wait_counter& fc, ::rml::client& client ) : |
| my_min_stack_size(0), my_client(client), my_factory_counter(fc), |
| my_server_ref_count(1), my_client_ref_count(1), shutdown_in_progress(false), |
| my_concrt_resource_manager(NULL), my_scheduler(NULL), my_scheduler_proxy(NULL) |
| { |
| my_thread_scavenger_thread = NULL; |
| #if TBB_USE_ASSERT |
| n_add_vp_requests = 0; |
| n_thread_scavengers_created; |
| #endif |
| } |
| |
    //! Dtor: waits for the scavenger and all worker threads, then tears everything down.
    ~thread_map() {
        __TBB_ASSERT( n_thread_scavengers_created<=1, "too many scavenger thread created" );
        // if thread_scavenger_thread is launched, wait for it to complete
        if( my_thread_scavenger_thread ) {
            __TBB_ASSERT( my_thread_scavenger_thread!=c_claimed, NULL );
            while( my_thread_scavenger_thread->read_state()==ts_busy )
                __TBB_Yield();
            thread_scavenger_thread* tst = my_thread_scavenger_thread;
            // NOTE(review): ~thread_scavenger_thread() is not invoked before deallocation --
            // harmless only while that destructor stays trivial; confirm.
            my_scavenger_allocator.deallocate(static_cast<padded<thread_scavenger_thread>*>(tst),1);
        }
        // deallocate thread contexts
        for( hash_map_type::const_iterator hi=my_map.begin(); hi!=my_map.end(); ++hi ) {
            server_thread* thr = hi->second;
            if( thr->tbb_thread ) {
                // wait for pending activations to drain before destroying the thread
                while( ((tbb_server_thread*)thr)->activation_count>1 )
                    __TBB_Yield();
                ((tbb_server_thread*)thr)->~tbb_server_thread();
                my_tbb_allocator.deallocate(static_cast<padded<tbb_server_thread>*>(thr),1);
            } else {
                ((omp_server_thread*)thr)->~omp_server_thread();
                my_omp_allocator.deallocate(static_cast<padded<omp_server_thread>*>(thr),1);
            }
        }
        // Release ConcRT resources acquired in bind(), if any.
        if( my_scheduler_proxy ) {
            my_scheduler_proxy->Shutdown();
            my_concrt_resource_manager->Release();
            __TBB_ASSERT( my_scheduler, NULL );
            delete my_scheduler;
        } else {
            __TBB_ASSERT( !my_scheduler, NULL );
        }
    }
| typedef hash_map_type::key_type key_type; |
| typedef hash_map_type::value_type value_type; |
| typedef hash_map_type::iterator iterator; |
| iterator begin() {return my_map.begin();} |
| iterator end() {return my_map.end();} |
| iterator find( key_type k ) {return my_map.find( k );} |
| iterator insert( key_type k, server_thread* v ) { |
| std::pair<iterator,bool> res = my_map.insert( value_type(k,v) ); |
| return res.first; |
| } |
| void bind( IScheduler* s ) { |
| ++my_factory_counter; |
| if( s ) { |
| my_unrealized_threads = s->GetPolicy().GetPolicyValue( MaxConcurrency ); |
| __TBB_ASSERT( my_unrealized_threads>0, NULL ); |
| my_scheduler = s; |
| my_concrt_resource_manager = CreateResourceManager(); // reference count==3 when first created. |
| my_scheduler_proxy = my_concrt_resource_manager->RegisterScheduler( s, CONCRT_RM_VERSION_1 ); |
| my_scheduler_proxy->RequestInitialVirtualProcessors( false ); |
| } |
| } |
| bool is_closing() { return shutdown_in_progress; } |
| void unbind( rml::server& server, ::tbb::spin_mutex& mtx ); |
| void add_client_ref() { my_server_ref_count.add_ref(); } |
| void remove_client_ref(); |
| void add_server_ref() {my_server_ref_count.add_ref();} |
| int remove_server_ref() {return my_server_ref_count.remove_ref();} |
| int get_server_ref_count() { int k = my_server_ref_count.my_ref_count; return k; } |
| void assist_cleanup( bool assist_null_only ); |
| void adjust_balance( int delta ); |
| int current_balance() const {int k = the_balance; return k;} |
| ::rml::client& client() const {return my_client;} |
| void register_as_master( server::execution_resource_t& v ) const { (IExecutionResource*&)v = my_scheduler_proxy ? my_scheduler_proxy->SubscribeCurrentThread() : NULL; } |
| // Rremove() should be called from the same thread that subscribed the current h/w thread (i.e., the one that |
| // called register_as_master() ). |
| void unregister( server::execution_resource_t v ) const {if( v ) ((IExecutionResource*)v)->Remove( my_scheduler );} |
| void add_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count, tbb_connection_v2& conn, ::tbb::spin_mutex& mtx ); |
| void add_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count, omp_connection_v2& conn, ::tbb::spin_mutex& mtx ); |
| void remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx ); |
| void mark_virtual_processors_as_lent( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx ); |
| void create_oversubscribers( unsigned n, std::vector<server_thread*>& thr_vec, omp_connection_v2& conn, ::tbb::spin_mutex& mtx ); |
| void wakeup_tbb_threads( int c, ::tbb::spin_mutex& mtx ); |
| void mark_virtual_processors_as_returned( IVirtualProcessorRoot** vprocs, unsigned int count, tbb::spin_mutex& mtx ); |
| inline void addto_original_exec_resources( IExecutionResource* r, ::tbb::spin_mutex& mtx ) { |
| ::tbb::spin_mutex::scoped_lock lck(mtx); |
| __TBB_ASSERT( !is_closing(), "try to regster master while connection is being shutdown?" ); |
| original_exec_resources.push_back( r ); |
| } |
| #if !__RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED |
| void allocate_thread_scavenger( IExecutionResource* v ); |
| #endif |
| inline thread_scavenger_thread* get_thread_scavenger() { return my_thread_scavenger_thread; } |
| }; |
| |
// File-scope queue of closed connections awaiting reclamation, and the
// scavenger thread that drains it (WCRM build; see request_close_connection()).
garbage_connection_queue connections_to_reclaim;
connection_scavenger_thread connection_scavenger;
| |
| #endif /* !RML_USE_WCRM */ |
| |
| //------------------------------------------------------------------------ |
| // generic_connection |
| //------------------------------------------------------------------------ |
| |
//! Compile-time properties of a server/client pairing; specialized per flavor below.
template<typename Server, typename Client>
struct connection_traits {};
| |
// head of the active tbb connections; the low bit doubles as a lock flag
// (acquired via compare_and_swap of conn|1)
static tbb::atomic<uintptr_t> active_tbb_connections;
// number of threads currently inside wakeup_some_tbb_threads()
static tbb::atomic<int> current_tbb_conn_readers;
// epoch (close-event count) observed by the current reader pass
static size_t current_tbb_conn_reader_epoch;
// bumped each time a tbb connection is removed from the active list
static tbb::atomic<size_t> close_tbb_connection_event_count;
| |
#if RML_USE_WCRM
//! Forward declaration; defined after generic_connection (it uses set_scratch_ptr()).
template<typename Connection>
void make_job( Connection& c, server_thread& t );
#endif
| |
//! Common implementation of a server/client connection.
/** Server is the server interface exposed to the client; Client is the client
    interface. The thread_map member owns the worker threads and ref counts. */
template<typename Server, typename Client>
class generic_connection: public Server, no_copy {
    /*override*/ version_type version() const {return SERVER_VERSION;}
    /*override*/ void yield() {thread_monitor::yield();}
    // A master coming (delta>0) or going changes worker demand in the opposite direction.
    /*override*/ void independent_thread_number_changed( int delta ) { my_thread_map.adjust_balance( -delta ); }
    /*override*/ unsigned default_concurrency() const {return hardware_concurrency()-1;}
    friend void wakeup_some_tbb_threads();
    friend class connection_scavenger_thread;

protected:
    thread_map my_thread_map;
    //! Next connection in the active_tbb_connections list (used for TBB connections).
    generic_connection* next_conn;
    //! Close-event epoch recorded when this connection leaves the active list.
    size_t my_ec;
#if RML_USE_WCRM
    // FIXME: pad it?
    tbb::spin_mutex map_mtx;
    IScheduler* my_scheduler;
    void do_open( IScheduler* s ) {
        my_scheduler = s;
        my_thread_map.bind( s );
    }
    bool is_closing() { return my_thread_map.is_closing(); }
    // note: the parameter is spelled "exiting" in the definition below
    void request_close_connection( bool existing );
#else
    void do_open() {my_thread_map.bind();}
    void request_close_connection( bool );
#endif /* RML_USE_WCRM */
    //! Make destructor virtual
    virtual ~generic_connection() {}
#if !RML_USE_WCRM
    generic_connection( wait_counter& fc, Client& c ) : my_thread_map(fc,c), next_conn(NULL), my_ec(0) {}
#else
    generic_connection( wait_counter& fc, Client& c ) :
        my_thread_map(fc,c), next_conn(NULL), my_ec(0), map_mtx(), my_scheduler(NULL) {}
    void add_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count );
    void remove_virtual_processors( IVirtualProcessorRoot** vprocs, unsigned int count );
    // ConcRT RM notifications: forward to the thread map under map_mtx.
    void notify_resources_externally_busy( IVirtualProcessorRoot** vprocs, unsigned int count ) { my_thread_map.mark_virtual_processors_as_lent( vprocs, count, map_mtx ); }
    void notify_resources_externally_idle( IVirtualProcessorRoot** vprocs, unsigned int count ) {
        my_thread_map.mark_virtual_processors_as_returned( vprocs, count, map_mtx );
    }
#endif /* !RML_USE_WCRM */

public:
    typedef Server server_type;
    typedef Client client_type;
    Client& client() const {return static_cast<Client&>(my_thread_map.client());}
    void set_scratch_ptr( job& j, void* ptr ) { ::rml::server::scratch_ptr(j) = ptr; }
#if RML_USE_WCRM
    template<typename Connection>
    friend void make_job( Connection& c, server_thread& t );
    void add_server_ref () {my_thread_map.add_server_ref();}
    void remove_server_ref() {if( my_thread_map.remove_server_ref()==0 ) delete this;}
    void add_client_ref () {my_thread_map.add_client_ref();}
    void remove_client_ref() {my_thread_map.remove_client_ref();}
#else /* !RML_USE_WCRM */
    int add_server_ref () {return my_thread_map.add_server_ref();}
    void remove_server_ref() {if( my_thread_map.remove_server_ref()==0 ) delete this;}
    void remove_client_ref() {my_thread_map.remove_client_ref();}
    void make_job( server_thread& t, job_automaton& ja );
#endif /* RML_USE_WCRM */
    //! Strip the two low tag/lock bits from an encoded list-head value.
    static generic_connection* get_addr( uintptr_t addr_ex ) {
        return reinterpret_cast<generic_connection*>( addr_ex&~(uintptr_t)3 );
    }
};
| |
| //------------------------------------------------------------------------ |
| // TBB server |
| //------------------------------------------------------------------------ |
| |
template<>
struct connection_traits<tbb_server,tbb_client> {
    // TBB cleanup only assists threads with NULL jobs (see assist_cleanup()).
    static const bool assist_null_only = true;
    static const bool is_tbb = true;
};
| |
| //! Represents a server/client binding. |
| /** The internal representation uses inheritance for the server part and a pointer for the client part. */ |
class tbb_connection_v2: public generic_connection<tbb_server,tbb_client> {
    /*override*/ void adjust_job_count_estimate( int delta );
#if !RML_USE_WCRM
#if _WIN32||_WIN64
    /*override*/ void register_master ( rml::server::execution_resource_t& /*v*/ ) {}
    /*override*/ void unregister_master ( rml::server::execution_resource_t /*v*/ ) {}
#endif
#else
    /*override*/ void register_master ( rml::server::execution_resource_t& v ) {
        my_thread_map.register_as_master(v);
        // v is non-NULL only when a scheduler proxy subscribed this thread.
        if( v ) ++nesting;
    }
    /*override*/ void unregister_master ( rml::server::execution_resource_t v ) {
        if( v ) {
            __TBB_ASSERT( nesting>0, NULL );
            if( --nesting==0 ) {
#if !__RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
                // Last nested master left: hand its execution resource to the scavenger.
                my_thread_map.allocate_thread_scavenger( (IExecutionResource*)v );
#endif
            }
        }
        my_thread_map.unregister(v);
    }
    IScheduler* create_scheduler() {return( scheduler<tbb_connection_v2>::create( *this ) );}
    friend void free_all_connections( uintptr_t );
    friend class scheduler<tbb_connection_v2>;
    friend class execution_context;
    friend class connection_scavenger_thread;
#endif /* RML_USE_WCRM */
    friend void wakeup_some_tbb_threads();
    //! Estimate on number of jobs without threads working on them.
    tbb::atomic<int> my_slack;
    friend class dummy_class_to_shut_up_gratuitous_warning_from_gcc_3_2_3;
#if TBB_USE_ASSERT
    tbb::atomic<int> my_job_count_estimate;
#endif /* TBB_USE_ASSERT */

    //! Number of adjust_job_count_estimate() calls currently waking threads
    //! (polled by wakeup_some_tbb_threads()).
    tbb::atomic<int> n_adjust_job_count_requests;
#if RML_USE_WCRM
    //! Depth of nested register_master() calls on this connection.
    tbb::atomic<int> nesting;
#endif

    // dtor
    ~tbb_connection_v2();

public:
#if RML_USE_WCRM
    typedef tbb_server_thread server_thread_type;
#endif
    //! True if there is slack that try_process can use.
    bool has_slack() const {return my_slack>0;}

#if RML_USE_WCRM
    bool try_process( job& job )
#else
    bool try_process( server_thread& t, job& job )
#endif
    {
        bool visited = false;
        // No check for my_slack>0 here because caller is expected to do that check.
        int k = --my_slack;
        if( k>=0 ) {
#if !RML_USE_WCRM
            t.my_extra_state = ts_visited; // remember the thread paid a trip to process() at least once
#endif
            client().process(job);
            visited = true;
        }
        // Restore the slack unit consumed above, whether or not the job ran.
        ++my_slack;
        return visited;
    }

    tbb_connection_v2( wait_counter& fc, tbb_client& client ) : generic_connection<tbb_server,tbb_client>(fc,client)
    {
        my_slack = 0;
#if RML_USE_WCRM
        nesting = 0;
#endif
#if TBB_USE_ASSERT
        my_job_count_estimate = 0;
#endif /* TBB_USE_ASSERT */
        __TBB_ASSERT( !my_slack, NULL );

#if RML_USE_WCRM
        do_open( client.max_job_count()>0 ? create_scheduler() : NULL );
#else
        do_open();
#endif /* !RML_USE_WCRM */
        n_adjust_job_count_requests = 0;

        // Acquire head of active_tbb_connections & push the connection into the list
        // (the low bit of active_tbb_connections serves as the lock flag).
        uintptr_t conn;
        do {
            for( ; (conn=active_tbb_connections)&1; )
                __TBB_Yield();
        } while( active_tbb_connections.compare_and_swap( conn|1, conn )!=conn );

        this->next_conn = generic_connection<tbb_server,tbb_client>::get_addr(conn);
        // Update and release head of active_tbb_connections
        active_tbb_connections = (uintptr_t) this; // set and release
    }
    inline void wakeup_tbb_threads( unsigned n ) {
        my_thread_map.wakeup_tbb_threads( n
#if RML_USE_WCRM
            , map_mtx
#endif
        );
    }
#if RML_USE_WCRM
    inline int get_nesting_level() { return nesting; }
#else
    inline bool wakeup_next_thread( thread_map::iterator i ) {return my_thread_map.wakeup_next_thread( i, *this );}
    inline thread_map::size_type get_unrealized_threads () {return my_thread_map.get_unrealized_threads();}
#endif /* !RML_USE_WCRM */
};
| |
| //------------------------------------------------------------------------ |
| // OpenMP server |
| //------------------------------------------------------------------------ |
| |
template<>
struct connection_traits<omp_server,omp_client> {
    // OMP cleanup assists all threads, not just those with NULL jobs.
    static const bool assist_null_only = false;
    static const bool is_tbb = false;
};
| |
//! Server/client binding for an OpenMP client.
class omp_connection_v2: public generic_connection<omp_server,omp_client> {
#if !RML_USE_WCRM
    /*override*/ int current_balance() const {return the_balance;}
#else
    friend void free_all_connections( uintptr_t );
    friend class scheduler<omp_connection_v2>;
    /*override*/ int current_balance() const {return my_thread_map.current_balance();}
#endif /* !RML_USE_WCRM */
    /*override*/ int try_increase_load( size_type n, bool strict );
    /*override*/ void decrease_load( size_type n );
    /*override*/ void get_threads( size_type request_size, void* cookie, job* array[] );
#if !RML_USE_WCRM
#if _WIN32||_WIN64
    /*override*/ void register_master ( rml::server::execution_resource_t& /*v*/ ) {}
    /*override*/ void unregister_master ( rml::server::execution_resource_t /*v*/ ) {}
#endif
#else
    /*override*/ void register_master ( rml::server::execution_resource_t& v ) {
        my_thread_map.register_as_master( v );
        // Also record the subscribed resource; original_exec_resources is
        // consumed outside this chunk (presumably during unbind -- confirm).
        my_thread_map.addto_original_exec_resources( (IExecutionResource*)v, map_mtx );
    }
    /*override*/ void unregister_master ( rml::server::execution_resource_t v ) { my_thread_map.unregister(v); }
#endif /* !RML_USE_WCRM */
#if _WIN32||_WIN64
    /*override*/ void deactivate( rml::job* j );
    /*override*/ void reactivate( rml::job* j );
#endif /* _WIN32||_WIN64 */
#if RML_USE_WCRM
public:
    typedef omp_server_thread server_thread_type;
private:
    IScheduler* create_scheduler() {return( scheduler<omp_connection_v2>::create( *this ) );}
#endif /* RML_USE_WCRM */
public:
#if TBB_USE_ASSERT
    //! Net change in delta caused by this connection.
    /** Should be zero when connection is broken */
    tbb::atomic<int> net_delta;
#endif /* TBB_USE_ASSERT */

    omp_connection_v2( wait_counter& fc, omp_client& client ) : generic_connection<omp_server,omp_client>(fc,client) {
#if TBB_USE_ASSERT
        net_delta = 0;
#endif /* TBB_USE_ASSERT */
#if RML_USE_WCRM
        do_open( create_scheduler() );
#else
        do_open();
#endif /* RML_USE_WCRM */
    }
    ~omp_connection_v2() {__TBB_ASSERT( net_delta==0, "net increase/decrease of load is nonzero" );}
};
| |
| #if !RML_USE_WCRM |
| /* to deal with cases where the machine is oversubscribed; we want each thread to trip to try_process() at least once */ |
| /* this should not involve computing the_balance */ |
bool thread_map::wakeup_next_thread( thread_map::iterator this_thr, tbb_connection_v2& conn ) {
    // Once every thread has visited try_process() at least once, nothing to do.
    if( all_visited_at_least_once )
        return false;

    iterator e = end();
retry:
    bool exist = false;
    iterator k=this_thr;
    // Scan the threads after this one, then wrap around to the beginning below.
    for( ++k; k!=e; ++k ) {
        // If another thread added *k, there is a tiny timing window where thread() is invalid.
        server_thread& t = k->wait_for_thread();
        if( t.my_extra_state!=ts_visited )
            exist = true;
        if( t.read_state()!=ts_tbb_busy && t.my_extra_state==ts_started )
            if( t.try_grab_for( ts_tbb_busy ) )
                return true;
    }
    for( k=begin(); k!=this_thr; ++k ) {
        server_thread& t = k->wait_for_thread();
        if( t.my_extra_state!=ts_visited )
            exist = true;
        if( t.read_state()!=ts_tbb_busy && t.my_extra_state==ts_started )
            if( t.try_grab_for( ts_tbb_busy ) )
                return true;
    }

    // NOTE: the "else" below binds to the inner "if( conn.has_slack() )", as the
    // indentation suggests: we retry only while unvisited threads and slack remain.
    if( exist )
        if( conn.has_slack() )
            goto retry;
        else
            all_visited_at_least_once = true;
    return false;
}
| |
| void thread_map::release_tbb_threads( server_thread* t ) { |
| for( ; t; t = t->link ) { |
| while( t->read_state()!=ts_asleep ) |
| __TBB_Yield(); |
| t->my_extra_state = ts_started; |
| } |
| } |
| #endif /* !RML_USE_WCRM */ |
| |
| void thread_map::adjust_balance( int delta ) { |
| int new_balance = the_balance += delta; |
| if( new_balance>0 && 0>=new_balance-delta /*== old the_balance*/ ) |
| wakeup_some_tbb_threads(); |
| } |
| |
| void thread_map::remove_client_ref() { |
| int k = my_client_ref_count.remove_ref(); |
| if( k==0 ) { |
| // Notify factory that thread has crossed back into RML. |
| --my_factory_counter; |
| // Notify client that RML is done with the client object. |
| my_client.acknowledge_close_connection(); |
| } |
| } |
| |
| #if RML_USE_WCRM |
| /** Not a member of generic_connection because we need Connection to be the derived class. */ |
| template<typename Connection> |
| void make_job( Connection& c, typename Connection::server_thread_type& t ) { |
| if( t.my_job_automaton.try_acquire() ) { |
| rml::job& j = *t.my_client.create_one_job(); |
| __TBB_ASSERT( &j!=NULL, "client:::create_one_job returned NULL" ); |
| __TBB_ASSERT( (intptr_t(&j)&1)==0, "client::create_one_job returned misaligned job" ); |
| t.my_job_automaton.set_and_release( j ); |
| c.set_scratch_ptr( j, (void*) &t ); |
| } |
| } |
| #endif /* RML_USE_WCRM */ |
| |
| #if _MSC_VER && !defined(__INTEL_COMPILER) |
| // Suppress "conditional expression is constant" warning. |
| #pragma warning( push ) |
| #pragma warning( disable: 4127 ) |
| #endif |
#if RML_USE_WCRM
//! Close this connection. 'exiting' must be false for TBB connections (asserted below).
template<typename Server, typename Client>
void generic_connection<Server,Client>::request_close_connection( bool exiting ) {
    // for TBB connections, exiting should always be false
    if( connection_traits<Server,Client>::is_tbb )
        __TBB_ASSERT( !exiting, NULL);
#if TBB_USE_ASSERT
    else if( exiting )
        reinterpret_cast<omp_connection_v2*>(this)->net_delta = 0;
#endif
    if( exiting ) {
        // Plug the reclamation queue so no further requests can be queued.
        uintptr_t tail = connections_to_reclaim.tail;
        while( connections_to_reclaim.tail.compare_and_swap( garbage_connection_queue::plugged, tail )!=tail )
            __TBB_Yield();
        my_thread_map.unbind( *this, map_mtx );
        my_thread_map.assist_cleanup( connection_traits<Server,Client>::assist_null_only );
        // It is assumed that the client waits for all other threads to terminate before
        // calling request_close_connection with true. Thus, it is safe to return all
        // outstanding connection objects that are reachable. It is possible that there may
        // be some unreachable connection objects lying somewhere.
        free_all_connections( connection_scavenger.grab_and_prepend( this ) );
        return;
    }
#else /* !RML_USE_WCRM */
template<typename Server, typename Client>
void generic_connection<Server,Client>::request_close_connection( bool ) {
#endif /* RML_USE_WCRM */
    // NOTE: the body below is shared by both function headers above.
    if( connection_traits<Server,Client>::is_tbb ) {
        // acquire the head of active tbb connections (low bit is the lock flag)
        uintptr_t conn;
        do {
            for( ; (conn=active_tbb_connections)&1; )
                __TBB_Yield();
        } while( active_tbb_connections.compare_and_swap( conn|1, conn )!=conn );

        // Locate the current connection
        generic_connection* pred_conn = NULL;
        generic_connection* curr_conn = (generic_connection*) conn;
        for( ; curr_conn && curr_conn!=this; curr_conn=curr_conn->next_conn )
            pred_conn = curr_conn;
        __TBB_ASSERT( curr_conn==this, "the current connection is not in the list?" );

        // Remove this from the list
        if( pred_conn ) {
            pred_conn->next_conn = curr_conn->next_conn;
            active_tbb_connections = reinterpret_cast<uintptr_t>(generic_connection<tbb_server,tbb_client>::get_addr(active_tbb_connections)); // release it
        } else
            active_tbb_connections = (uintptr_t) curr_conn->next_conn; // update & release it
        curr_conn->next_conn = NULL;
        // Increment the tbb connection close event count
        my_ec = ++close_tbb_connection_event_count;
        // Wait happens in tbb_connection_v2::~tbb_connection_v2()
    }
#if RML_USE_WCRM
    my_thread_map.unbind( *this, map_mtx );
    my_thread_map.assist_cleanup( connection_traits<Server,Client>::assist_null_only );
    connection_scavenger.add_request( this );
#else
    my_thread_map.unbind();
    my_thread_map.assist_cleanup( connection_traits<Server,Client>::assist_null_only );
    // Remove extra reference
    remove_server_ref();
#endif
}
| #if _MSC_VER && !defined(__INTEL_COMPILER) |
| #pragma warning( pop ) |
| #endif |
| |
| #if RML_USE_WCRM |
| |
| template<typename Server, typename Client> |
| void generic_connection<Server,Client>::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count ) |
| {} |
| |
//! TBB flavor: forward to the thread map under map_mtx.
template<>
void generic_connection<tbb_server,tbb_client>::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
{
    my_thread_map.add_virtual_processors( vproots, count, (tbb_connection_v2&)*this, map_mtx );
}
template<>
void generic_connection<omp_server,omp_client>::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
{
    // For OMP, since it uses SchedulerPolicy of MinThreads==MaxThreads, this is called once when
    // RequestInitialVirtualProcessors() is called.
    my_thread_map.add_virtual_processors( vproots, count, (omp_connection_v2&)*this, map_mtx );
}
| |
| template<typename Server, typename Client> |
| void generic_connection<Server,Client>::remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count ) |
| { |
| __TBB_ASSERT( false, "should not be called" ); |
| } |
| /* For OMP, RemoveVirtualProcessors() will never be called. */ |
| |
//! TBB flavor: forward to the thread map under map_mtx.
template<>
void generic_connection<tbb_server,tbb_client>::remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count )
{
    my_thread_map.remove_virtual_processors( vproots, count, map_mtx );
}
| |
| void tbb_connection_v2::adjust_job_count_estimate( int delta ) { |
| #if TBB_USE_ASSERT |
| my_job_count_estimate += delta; |
| #endif /* TBB_USE_ASSERT */ |
| // Atomically update slack. |
| int c = my_slack+=delta; |
| if( c>0 ) { |
| ++n_adjust_job_count_requests; |
| my_thread_map.wakeup_tbb_threads( c, map_mtx ); |
| --n_adjust_job_count_requests; |
| } |
| } |
| #endif /* RML_USE_WCRM */ |
| |
tbb_connection_v2::~tbb_connection_v2() {
#if TBB_USE_ASSERT
    if( my_job_count_estimate!=0 ) {
        fprintf(stderr, "TBB client tried to disconnect with non-zero net job count estimate of %d\n", int(my_job_count_estimate ));
        abort();
    }
    __TBB_ASSERT( !my_slack, "attempt to destroy tbb_server with nonzero slack" );
    __TBB_ASSERT( this!=static_cast<tbb_connection_v2*>(generic_connection<tbb_server,tbb_client >::get_addr(active_tbb_connections)), "request_close_connection() must be called" );
#endif /* TBB_USE_ASSERT */
#if !RML_USE_WCRM
    // If there are other threads ready for work, give them coins
    if( the_balance>0 )
        wakeup_some_tbb_threads();
#endif
    // Someone might be accessing my data members: wait until every reader whose
    // pass started before my close event (recorded in my_ec) has finished.
    while( current_tbb_conn_readers>0 && (ptrdiff_t)(my_ec-current_tbb_conn_reader_epoch)>0 )
        __TBB_Yield();
}
| |
| #if !RML_USE_WCRM |
| template<typename Server, typename Client> |
| void generic_connection<Server,Client>::make_job( server_thread& t, job_automaton& ja ) { |
| if( ja.try_acquire() ) { |
| rml::job& j = *client().create_one_job(); |
| __TBB_ASSERT( &j!=NULL, "client:::create_one_job returned NULL" ); |
| __TBB_ASSERT( (intptr_t(&j)&1)==0, "client::create_one_job returned misaligned job" ); |
| ja.set_and_release( j ); |
| __TBB_ASSERT( t.my_conn && t.my_ja && t.my_job==NULL, NULL ); |
| t.my_job = &j; |
| set_scratch_ptr( j, (void*) &t ); |
| } |
| } |
| |
//! Client notifies the server of a change in its job-count estimate (non-WCRM build).
void tbb_connection_v2::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_job_count_estimate += delta;
#endif /* TBB_USE_ASSERT */
    // Atomically update slack.
    int c = my_slack+=delta;
    if( c>0 ) {
        ++n_adjust_job_count_requests;
        // The client has work to do and there are threads available
        thread_map::size_type n = my_thread_map.wakeup_tbb_threads(c);

        // Eagerly create up to n new threads, linking them into a local list.
        server_thread* new_threads_anchor = NULL;
        thread_map::size_type i;
        for( i=0; i<n; ++i ) {
            // Obtain unrealized threads
            thread_map::value_type* k = my_thread_map.add_one_thread( false );
            if( !k )
                // No unrealized threads left.
                break;
            // Eagerly start the thread off.
            my_thread_map.bind_one_thread( *this, *k );
            server_thread& t = k->thread();
            __TBB_ASSERT( !t.link, NULL );
            t.link = new_threads_anchor;
            new_threads_anchor = &t;
        }

        // Hand out coins from the bank to as many of the new threads as possible.
        thread_map::size_type j=0;
        for( ; the_balance>0 && j<i; ++j ) {
            if( --the_balance>=0 ) {
                // Withdraw a coin from the bank
                __TBB_ASSERT( new_threads_anchor, NULL );

                server_thread* t = new_threads_anchor;
                new_threads_anchor = t->link;
                while( !t->try_grab_for( ts_tbb_busy ) )
                    __TBB_Yield();
                t->my_extra_state = ts_started;
            } else {
                // Overdraft. return it to the bank
                ++the_balance;
                break;
            }
        }
        __TBB_ASSERT( i-j!=0||new_threads_anchor==NULL, NULL );
        // Mark the ones that did not get started as eligible for being snatched.
        if( new_threads_anchor )
            my_thread_map.release_tbb_threads( new_threads_anchor );

        --n_adjust_job_count_requests;
    }
}
| #endif /* RML_USE_WCRM */ |
| |
| #if RML_USE_WCRM |
//! Try to reserve n threads' worth of load; returns how many were actually reserved.
int omp_connection_v2::try_increase_load( size_type n, bool strict ) {
    __TBB_ASSERT(int(n)>=0,NULL);
    if( strict ) {
        // Strict request: take all n coins even if the balance goes negative.
        the_balance -= int(n);
    } else {
        // Non-strict: take up to n coins, but never drive the balance below zero.
        int avail, old;
        do {
            avail = the_balance;
            if( avail<=0 ) {
                // No atomic read-write-modify operation necessary.
                return avail;
            }
            // Don't re-read the_balance; if it changes, compare_and_swap will fail anyway.
            old = the_balance.compare_and_swap( int(n)<avail ? avail-n : 0, avail );
        } while( old!=avail );
        if( int(n)>avail )
            n=avail;
    }
#if TBB_USE_ASSERT
    net_delta += n;
#endif /* TBB_USE_ASSERT */
    return n;
}
| |
// Intentionally empty in the WCRM build; coins appear to be returned through
// deactivate()/adjust_balance() instead -- NOTE(review): confirm against unbind().
void omp_connection_v2::decrease_load( size_type /*n*/ ) {}
| |
//! Deliver request_size jobs to the OMP client, enlisting existing threads first
//! and creating oversubscribers for any shortfall.
void omp_connection_v2::get_threads( size_type request_size, void* cookie, job* array[] ) {
    unsigned index = 0;
    std::vector<omp_server_thread*> enlisted(request_size);
    std::vector<thread_grab_t> to_activate(request_size);

    if( request_size==0 ) return;

    {
        tbb::spin_mutex::scoped_lock lock(map_mtx);

        __TBB_ASSERT( !is_closing(), "try to get threads while connection is being shutdown?" );

        // Two passes: prefer virtual processors that are not lent out, then take any.
        for( int scan=0; scan<2; ++scan ) {
            for( thread_map::iterator i=my_thread_map.begin(); i!=my_thread_map.end(); ++i ) {
                omp_server_thread* thr = (omp_server_thread*) (*i).second;
                // in the first scan, skip VPs that are lent
                if( scan==0 && thr->is_lent() ) continue;
                thread_grab_t res = thr->try_grab_for();
                if( res!=wk_failed ) {// && if is not busy by some other scheduler
                    to_activate[index] = res;
                    enlisted[index] = thr;
                    if( ++index==request_size )
                        goto activate_threads;  // jumping out of the block releases the lock
                }
            }
        }
    }

activate_threads:

    for( unsigned i=0; i<index; ++i ) {
        omp_server_thread* thr = enlisted[i];
        // Only threads grabbed out of sleep need an explicit ConcRT activation.
        if( to_activate[i]==wk_from_asleep )
            thr->get_virtual_processor()->Activate( thr );
        job* j = thr->wait_for_job();
        array[i] = j;
        thr->omp_data.produce( client(), *j, cookie, i PRODUCE_ARG(*this) );
    }

    if( index==request_size )
        return;

    // If we come to this point, it must be because dynamic==false
    // Create Oversubscribers..

    // Note that our policy is such that MinConcurrency==MaxConcurrency.
    // RM will deliver MaxConcurrency of VirtualProcessors and no more.
    __TBB_ASSERT( request_size>index, NULL );
    unsigned n = request_size - index;
    std::vector<server_thread*> thr_vec(n);
    typedef std::vector<server_thread*>::iterator iterator_thr;
    my_thread_map.create_oversubscribers( n, thr_vec, *this, map_mtx );
    for( iterator_thr ti=thr_vec.begin(); ti!=thr_vec.end(); ++ti ) {
        omp_server_thread* thr = (omp_server_thread*) *ti;
        __TBB_ASSERT( thr, "thread not created?" );
        // Thread is already grabbed; since it is newly created, we need to activate it.
        thr->get_virtual_processor()->Activate( thr );
        job* j = thr->wait_for_job();
        array[index] = j;
        thr->omp_data.produce( client(), *j, cookie, index PRODUCE_ARG(*this) );
        ++index;
    }
}
| |
| #if _WIN32||_WIN64 |
//! Park the server thread owning job j; its slot is returned to the balance.
void omp_connection_v2::deactivate( rml::job* j )
{
    // Return one coin to the bank: the deactivated thread no longer consumes a slot.
    my_thread_map.adjust_balance(1);
#if TBB_USE_ASSERT
    net_delta -= 1;
#endif
    // scratch_ptr holds the owning omp_server_thread (set in make_job()).
    omp_server_thread* thr = (omp_server_thread*) scratch_ptr( *j );
    (thr->get_virtual_processor())->Deactivate( thr );
}
| |
//! Resume the server thread owning job j.
void omp_connection_v2::reactivate( rml::job* j )
{
    // Should not adjust the_balance because OMP client is supposed to
    // do try_increase_load() to reserve the threads to use.
    omp_server_thread* thr = (omp_server_thread*) scratch_ptr( *j );
    (thr->get_virtual_processor())->Activate( thr );
}
#endif /* _WIN32||_WIN64 */
| |
| #endif /* RML_USE_WCRM */ |
| |
| //! Wake up some available tbb threads |
void wakeup_some_tbb_threads()
{
    /* First, atomically grab the connection, then increase the server ref count to keep
       it from being released prematurely.  Second, check if the balance is available for TBB
       and the tbb connection has slack to exploit.  If the answer is true, go ahead and
       try to wake some up. */
    if( generic_connection<tbb_server,tbb_client >::get_addr(active_tbb_connections)==0 )
        // the next connection will see the change; return.
        return;

start_it_over:
    int n_curr_readers = ++current_tbb_conn_readers;
    if( n_curr_readers>1 ) // I lost
        return;
    // if n_curr_readers==1, I am the first one, so I will take responsibility for waking tbb threads up.

    // update the current epoch
    current_tbb_conn_reader_epoch = close_tbb_connection_event_count;

    // read and clear
    // Newly added connection will not invalidate the pointer, and it will
    // compete with the current one to claim coins.
    // One that is about to close the connection increments the event count
    // after it removes the connection from the list. But it will keep around
    // the connection until all readers including this one catch up. So, reading
    // the head and clearing the lock bit should be o.k.
    generic_connection<tbb_server,tbb_client>* next_conn_wake_up = generic_connection<tbb_server,tbb_client>::get_addr( active_tbb_connections );

    for( ; next_conn_wake_up; ) {
        /* some threads are creating tbb server threads; they may not see my changes made to the_balance */
        /* When a thread is in adjust_job_count_estimate() to increase the slack
           RML tries to activate worker threads on behalf of the requesting thread
           by repeatedly drawing a coin from the bank optimistically and grabbing a
           thread. If it finds the bank overdrafted, it returns the coin back to
           the bank and returns the control to the thread (return from the method).
           There lies a tiny timing hole.

           When the overdraft occurs (note that multiple masters may be in
           adjust_job_count_estimate() so the_balance can be any negative value) and
           a worker returns from the TBB work at that moment, its returning the coin
           does not bump up the_balance over 0, so it happily returns from
           wakeup_some_tbb_threads() without attempting to give coins to worker threads
           that are ready.
        */
        while( ((tbb_connection_v2*)next_conn_wake_up)->n_adjust_job_count_requests>0 )
            __TBB_Yield();

        int bal = the_balance;
        n_curr_readers = current_tbb_conn_readers; // get the snapshot
        if( bal<=0 ) break;  // bank is empty; nothing to hand out
        // if the connection is deleted, the following will immediately return because its slack would be 0 or less.

        tbb_connection_v2* tbb_conn = (tbb_connection_v2*)next_conn_wake_up;
        int my_slack = tbb_conn->my_slack;
        if( my_slack>0 ) tbb_conn->wakeup_tbb_threads( my_slack );
        next_conn_wake_up = next_conn_wake_up->next_conn;
    }

    // Subtract the snapshot; any remainder means new readers arrived meanwhile.
    int delta = current_tbb_conn_readers -= n_curr_readers;
    //if delta>0, more threads entered the routine since this one took the snapshot
    if( delta>0 ) {
        current_tbb_conn_readers = 0;
        if( the_balance>0 && generic_connection<tbb_server,tbb_client >::get_addr(active_tbb_connections)!=0 )
            goto start_it_over;
    }

    // Signal any connection that is waiting for me to complete my access that I am done.
    current_tbb_conn_reader_epoch = close_tbb_connection_event_count;
}
| |
| #if !RML_USE_WCRM |
//! Try to reserve up to n threads' worth of concurrency from the global deposit (the_balance).
//! In strict mode the full amount is debited unconditionally (the balance may go negative).
//! In non-strict mode at most min(n, available) coins are claimed and the balance never
//! drops below zero; returns the number actually reserved (or the non-positive balance
//! observed when nothing was available).
int omp_connection_v2::try_increase_load( size_type n, bool strict ) {
    __TBB_ASSERT(int(n)>=0,NULL);
    if( strict ) {
        // Unconditional debit; OMP strict requests are honored even on overdraft.
        the_balance -= int(n);
    } else {
        int avail, old;
        do {
            avail = the_balance;
            if( avail<=0 ) {
                // No atomic read-write-modify operation necessary.
                return avail;
            }
            // don't read the_balance; if it changes, compare_and_swap will fail anyway.
            old = the_balance.compare_and_swap( int(n)<avail ? avail-n : 0, avail );
        } while( old!=avail );
        if( int(n)>avail )
            n=avail;  // fewer coins than requested were available; report what we got
    }
#if TBB_USE_ASSERT
    net_delta += n;  // debug-only bookkeeping of coins this connection holds
#endif /* TBB_USE_ASSERT */
    return n;
}
| |
//! Return n threads' worth of concurrency to the deposit via the thread map.
void omp_connection_v2::decrease_load( size_type n ) {
    __TBB_ASSERT(int(n)>=0,NULL);
    my_thread_map.adjust_balance(int(n));
#if TBB_USE_ASSERT
    net_delta -= n;  // debug-only bookkeeping; mirrors try_increase_load
#endif /* TBB_USE_ASSERT */
}
| |
//! Grab request_size threads for an OpenMP team; does not return until all are obtained.
//! Each grabbed thread's job pointer is stored into array[], and the thread is handed
//! its work via omp_dispatch.produce() with the caller's cookie and its team index.
void omp_connection_v2::get_threads( size_type request_size, void* cookie, job* array[] ) {

    if( !request_size )
        return;

    unsigned index = 0;
    for(;;) { // don't return until all request_size threads are grabbed.
        // First pass: try to grab threads that already exist in the map.
        thread_map::iterator k_end=my_thread_map.end();
        for( thread_map::iterator k=my_thread_map.begin(); k!=k_end; ++k ) {
            // If another thread added *k, there is a tiny timing window where thread() is invalid.
            server_thread& t = k->wait_for_thread();
            if( t.try_grab_for( ts_omp_busy ) ) {
                // The preincrement instead of post-increment of index is deliberate.
                job& j = k->wait_for_job();
                array[index] = &j;
                t.omp_dispatch.produce( client(), j, cookie, index PRODUCE_ARG(*this) );
                if( ++index==request_size )
                    return;
            }
        }
        // Second pass: not enough existing threads -- allocate more.
        for( unsigned i=index; i<request_size; ++i ) {
            __TBB_ASSERT( index<request_size, NULL );
            thread_map::value_type* k = my_thread_map.add_one_thread( true );
#if TBB_USE_ASSERT
            if( !k ) {
                // Client erred
                __TBB_ASSERT(false, "server::get_threads: exceeded job_count\n");
            }
#endif
            // NOTE(review): in a non-assert build a NULL k would be dereferenced below;
            // the client is required never to exceed its declared job_count.
            my_thread_map.bind_one_thread( *this, *k );
            server_thread& t = k->thread();
            if( t.try_grab_for( ts_omp_busy ) ) {
                job& j = k->wait_for_job();
                array[index] = &j;
                // The preincrement instead of post-increment of index is deliberate.
                t.omp_dispatch.produce( client(), j, cookie, index PRODUCE_ARG(*this) );
                if( ++index==request_size )
                    return;
            } // else someone else snatched it.
        }
    }
}
| #endif /* !RML_USE_WCRM */ |
| |
| //------------------------------------------------------------------------ |
| // Methods of omp_dispatch_type |
| //------------------------------------------------------------------------ |
| void omp_dispatch_type::consume() { |
| job_type* j = job; |
| // Wait for short window between when master sets state of this thread to ts_omp_busy |
| // and master thread calls produce. |
| if( !j ) { |
| tbb::internal::atomic_backoff bo; |
| do { |
| bo.pause(); |
| j = job; |
| } while( !j ); |
| } |
| job = static_cast<job_type*>(NULL); |
| client->process(*j,cookie,index); |
| #if TBB_USE_ASSERT |
| // Return of method process implies "decrease_load" from client's viewpoint, even though |
| // the actual adjustment of the_balance only happens when this thread really goes to sleep. |
| --server->net_delta; |
| #endif /* TBB_USE_ASSERT */ |
| } |
| |
| #if !RML_USE_WCRM |
| #if _WIN32||_WIN64 |
//! Windows-only client entry: put the server thread running job j into its deactivated state.
void omp_connection_v2::deactivate( rml::job* j )
{
#if TBB_USE_ASSERT
    net_delta -= 1;
#endif
    __TBB_ASSERT( j, NULL );
    // The job's scratch_ptr holds the server_thread that runs it.
    server_thread* thr = (server_thread*) scratch_ptr( *j );
    thr->deactivate();
}
| |
//! Windows-only client entry: resume the server thread running job j.
void omp_connection_v2::reactivate( rml::job* j )
{
    // Should not adjust the_balance because OMP client is supposed to
    // do try_increase_load() to reserve the threads to use.
    __TBB_ASSERT( j, NULL );
    // The job's scratch_ptr holds the server_thread that runs it.
    server_thread* thr = (server_thread*) scratch_ptr( *j );
    thr->reactivate();
}
| #endif /* _WIN32||_WIN64 */ |
| |
| //------------------------------------------------------------------------ |
| // Methods of server_thread |
| //------------------------------------------------------------------------ |
| |
//! Construct a server thread in ts_idle state, not yet bound to a connection or job.
server_thread::server_thread() :
    ref_count(0),
    link(NULL),
    my_map_pos(),
    my_conn(NULL), my_job(NULL), my_ja(NULL)
{
    state = ts_idle;
    terminate = false;
#if TBB_USE_ASSERT
    // No OS thread has been launched on this object yet; see thread_routine().
    has_active_thread = false;
#endif /* TBB_USE_ASSERT */
}
| |
server_thread::~server_thread() {
    // The underlying OS thread must have finished (see destroy_job) before destruction.
    __TBB_ASSERT( !has_active_thread, NULL );
}
| |
| #if _MSC_VER && !defined(__INTEL_COMPILER) |
| // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced |
| #pragma warning(push) |
| #pragma warning(disable:4189) |
| #endif |
//! OS-level entry point for a server thread; runs the main loop() until termination.
__RML_DECL_THREAD_ROUTINE server_thread::thread_routine( void* arg ) {
    server_thread* self = static_cast<server_thread*>(arg);
    AVOID_64K_ALIASING( self->my_index );
#if TBB_USE_ASSERT
    __TBB_ASSERT( !self->has_active_thread, NULL );
    self->has_active_thread = true;
#endif /* TBB_USE_ASSERT */
    self->loop();
    return 0;
}
| #if _MSC_VER && !defined(__INTEL_COMPILER) |
| #pragma warning(pop) |
| #endif |
| |
//! Start the OS thread that will execute thread_routine(this) with the given stack size.
void server_thread::launch( size_t stack_size ) {
    thread_monitor::launch( thread_routine, this, stack_size );
}
| |
//! Put this thread to sleep if it is still idle, coping with races against wakeup()
//! and termination.  The two-phase monitor protocol (prepare_wait/commit_wait or
//! cancel_wait) closes the window between deciding to sleep and actually sleeping.
void server_thread::sleep_perhaps( thread_state_t asleep ) {
    if( terminate ) return;
    __TBB_ASSERT( asleep==ts_asleep, NULL );
    thread_monitor::cookie c;
    monitor.prepare_wait(c);
    if( state.compare_and_swap( asleep, ts_idle )==ts_idle ) {
        if( !terminate ) {
            monitor.commit_wait(c);
            // Someone else woke me up. The compare_and_swap further below deals with spurious wakeups.
        } else {
            monitor.cancel_wait();
        }
        thread_state_t s = read_state();
        if( s==ts_asleep ) {
            state.compare_and_swap( ts_idle, ts_asleep );
            // I woke myself up, either because I cancelled the wait or suffered a spurious wakeup.
        } else {
            // Someone else woke me up; there the_balance is decremented by 1. -- tbb only
            if( !is_omp_thread ) {
                __TBB_ASSERT( s==ts_tbb_busy||s==ts_idle, NULL );
            }
        }
    } else {
        // someone else made it busy ; see try_grab_for when state==ts_idle.
        __TBB_ASSERT( state==ts_omp_busy||state==ts_tbb_busy, NULL );
        monitor.cancel_wait();
    }
    __TBB_ASSERT( read_state()!=asleep, "a thread can only put itself to sleep" );
}
| |
//! Transition a sleeping thread from 'from' (must be ts_asleep) to 'to' and notify
//! its monitor.  Returns true iff this caller won the state transition.
bool server_thread::wakeup( thread_state_t to, thread_state_t from ) {
    bool success = false;
    __TBB_ASSERT( from==ts_asleep && (to==ts_idle||to==ts_omp_busy||to==ts_tbb_busy), NULL );
    if( state.compare_and_swap( to, from )==from ) {
        if( !is_omp_thread ) __TBB_ASSERT( to==ts_idle||to==ts_tbb_busy, NULL );
        // There is a small timing window that permits balance to become negative,
        // but such occurrences are probably rare enough to not worry about, since
        // at worst the result is slight temporary oversubscription.
        monitor.notify();
        success = true;
    }
    return success;
}
| |
//! Attempt to change a thread's state to target_state (ts_omp_busy or ts_tbb_busy),
//! waking it up if necessary.  Returns true iff the thread was successfully grabbed.
bool server_thread::try_grab_for( thread_state_t target_state ) {
    bool success = false;
    switch( read_state() ) {
    case ts_asleep:
        success = wakeup( target_state, ts_asleep );
        break;
    case ts_idle:
        success = state.compare_and_swap( target_state, ts_idle )==ts_idle;
        break;
    default:
        // Thread is busy with other work and not available to be grabbed.
        break;
    }
    return success;
}
| |
| #if _WIN32||_WIN64 |
| void server_thread::deactivate() { |
| thread_state_t es = (thread_state_t) my_extra_state.fetch_and_store( ts_deactivated ); |
| __TBB_ASSERT( my_extra_state==ts_deactivated, "someone else tampered with my_extra_state?" ); |
| if( es==ts_none ) |
| state = ts_idle; |
| else |
| __TBB_ASSERT( es==ts_reactivated, "Cannot call deactivate() while in ts_deactivated" ); |
| // only the thread can transition itself from ts_deactivted to ts_none |
| __TBB_ASSERT( my_extra_state==ts_deactivated, "someone else tampered with my_extra_state?" ); |
| my_extra_state = ts_none; // release the critical section |
| int bal = ++the_balance; |
| if( bal>0 ) |
| wakeup_some_tbb_threads(); |
| if( es==ts_none ) |
| sleep_perhaps( ts_asleep ); |
| } |
| |
//! Windows-only: counterpart of deactivate().  Waits until any in-flight
//! deactivation completes, then marks the thread reactivated and re-enlists it
//! into an OpenMP team if it is not already busy.
void server_thread::reactivate() {
    thread_state_t es;
    do {
        // Wait out a concurrent deactivate() that holds the extra-state "critical section".
        while( (es=read_extra_state())==ts_deactivated )
            __TBB_Yield();
        if( es==ts_reactivated ) {
            __TBB_ASSERT( false, "two Reactivate() calls in a row. Should not happen" );
            return;
        }
        __TBB_ASSERT( es==ts_none, NULL );
    } while( (thread_state_t)my_extra_state.compare_and_swap( ts_reactivated, ts_none )!=ts_none );
    if( state!=ts_omp_busy ) {
        my_extra_state = ts_none;
        // Spin until the thread can be grabbed back into the OpenMP team.
        while( !try_grab_for( ts_omp_busy ) )
            __TBB_Yield();
    }
}
| #endif /* _WIN32||_WIN64 */ |
| |
| |
//! Tear down this thread's job on termination: for tbb threads, first settle the
//! coin/state protocol; then plug the job automaton and clean up the client's job.
//! Returns true (the thread's loop exits after this).
template<typename Connection>
bool server_thread::destroy_job( Connection& c ) {
    __TBB_ASSERT( !is_omp_thread||(state==ts_idle||state==ts_omp_busy), NULL );
    __TBB_ASSERT( is_omp_thread||(state==ts_idle||state==ts_tbb_busy), NULL );
    if( !is_omp_thread ) {
        __TBB_ASSERT( state==ts_idle||state==ts_tbb_busy, NULL );
        if( state==ts_idle )
            state.compare_and_swap( ts_done, ts_idle );
        // 'state' may be set to ts_tbb_busy by another thread.

        if( state==ts_tbb_busy ) { // return the coin to the deposit
            // need to deposit first to let the next connection see the change
            ++the_balance;
            state = ts_done; // no other thread changes the state when it is ts_*_busy
        }
    }
    if( job_automaton* ja = my_ja ) {
        rml::job* j;
        if( ja->try_plug(j) ) {
            __TBB_ASSERT( j, NULL );
            c.client().cleanup(*j);
            c.remove_client_ref();
        } else {
            // Some other thread took responsibility for cleaning up the job.
        }
    }
    // Must do remove client reference first, because execution of
    // c.remove_ref() can cause *this to be destroyed.
    int k = remove_ref();
    __TBB_ASSERT_EX( k==0, "more than one references?" );
#if TBB_USE_ASSERT
    has_active_thread = false;
#endif /* TBB_USE_ASSERT */
    c.remove_server_ref();
    return true;
}
| |
| bool server_thread::do_termination() { |
| if( is_omp_thread ) |
| return destroy_job( *static_cast<omp_connection_v2*>(my_conn) ); |
| else |
| return destroy_job( *static_cast<tbb_connection_v2*>(my_conn) ); |
| } |
| |
//! Loop that each thread executes: create the job, then repeatedly sleep when idle,
//! terminate when asked, serve an OpenMP team when enslaved, or run TBB work while
//! holding a "coin" from the_balance.
void server_thread::loop() {
    if( is_omp_thread )
        static_cast<omp_connection_v2*>(my_conn)->make_job( *this, *my_ja );
    else
        static_cast<tbb_connection_v2*>(my_conn)->make_job( *this, *my_ja );
    for(;;) {
        __TBB_Yield();
        if( state==ts_idle )
            sleep_perhaps( ts_asleep );

        // Check whether I should quit.
        if( terminate )
            if( do_termination() )
                return;

        // read the state
        thread_state_t s = read_state();
        __TBB_ASSERT( s==ts_idle||s==ts_omp_busy||s==ts_tbb_busy, NULL );

        if( s==ts_omp_busy ) {
            // Enslaved by OpenMP team.
            omp_dispatch.consume();
            /* here wake tbb threads up if feasible */
            if( ++the_balance>0 )
                wakeup_some_tbb_threads();
            state = ts_idle;
        } else if( s==ts_tbb_busy ) {
            // do some TBB work.
            __TBB_ASSERT( my_conn && my_job, NULL );
            tbb_connection_v2& conn = *static_cast<tbb_connection_v2*>(my_conn);
            // give openmp higher priority
            bool has_coin = true;
            if( conn.has_slack() ) {
                // it has the coin, it should trip to the scheduler at least once as long as its slack is positive
                do {
                    if( conn.try_process( *this, *my_job ) )
                        if( conn.has_slack() && the_balance>=0 )
                            // Pass the coin on to the next sleeping thread if one takes it.
                            has_coin = !conn.wakeup_next_thread( my_map_pos );
                } while( has_coin && conn.has_slack() && the_balance>=0 );
            }
            state = ts_idle;
            if( has_coin ) {
                ++the_balance; // return the coin back to the deposit
                if( conn.has_slack() ) { // a new adjust_job_request_estimate() is in progress
                    // it may have missed my changes to state and/or the_balance
                    if( --the_balance>=0 ) { // try to grab the coin back
                        // I got the coin
                        if( state.compare_and_swap( ts_tbb_busy, ts_idle )!=ts_idle )
                            ++the_balance; // someone else enlisted me.
                    } else {
                        // overdraft. return the coin
                        ++the_balance;
                    }
                } // else the new request will see my changes to state & the_balance.
            }
            /* here wake tbb threads up if feasible */
            if( the_balance>0 )
                wakeup_some_tbb_threads();
        }
    }
}
| #endif /* !RML_USE_WCRM */ |
| |
| #if RML_USE_WCRM |
| |
| class tbb_connection_v2; |
| class omp_connection_v2; |
| |
// Build a ConcRT SchedulerPolicy with 7 key/value pairs.  The exception handlers
// only assert: the keys and values are fixed at compile time here, so a policy
// exception indicates a programming error rather than a recoverable runtime condition.
#define CREATE_SCHEDULER_POLICY(policy,min_thrs,max_thrs,stack_size) \
    try { \
        policy = new SchedulerPolicy (7, \
                               SchedulerKind, RML_THREAD_KIND, /*defined in _rml_serer_msrt.h*/ \
                               MinConcurrency, min_thrs, \
                               MaxConcurrency, max_thrs, \
                               TargetOversubscriptionFactor, 1, \
                               ContextStackSize, stack_size/1000, /*ConcRT:kB, iRML:bytes*/ \
                               ContextPriority, THREAD_PRIORITY_NORMAL, \
                               DynamicProgressFeedback, ProgressFeedbackDisabled ); \
    } catch ( invalid_scheduler_policy_key & ) { \
        __TBB_ASSERT( false, "invalid scheduler policy key exception caught" );\
    } catch ( invalid_scheduler_policy_value & ) { \
        __TBB_ASSERT( false, "invalid scheduler policy value exception caught" );\
    }
| |
// Lazily-initialized cached processor count.
// core_count_inited: 0 = uninitialized, 1 = initialization in progress, 2 = done.
static unsigned int core_count;
static tbb::atomic<int> core_count_inited;


//! Return the number of processors, computing and caching it exactly once.
static unsigned int get_processor_count()
{
    if( core_count_inited!=2 ) {
        if( core_count_inited.compare_and_swap( 1, 0 )==0 ) {
            // This thread won the race and performs the initialization.
            core_count = GetProcessorCount();
            core_count_inited = 2;
        } else {
            // Another thread is initializing; spin until it publishes the result.
            tbb::internal::spin_wait_until_eq( core_count_inited, 2 );
        }
    }
    return core_count;
}
| |
//! Generic scheduler constructor: obtain a unique scheduler id and bind the connection.
template<typename Connection>
scheduler<Connection>::scheduler( Connection& conn ) : uid(GetSchedulerId()), my_conn(conn) {}
| |
//! TBB specialization: cap the job count at (processor count - 1) and create a
//! policy with MinConcurrency 0, so ConcRT may take all virtual processors away.
template<>
scheduler<tbb_connection_v2>::scheduler( tbb_connection_v2& conn ) : uid(GetSchedulerId()), my_conn(conn)
{
    rml::client& cl = my_conn.client();
    unsigned max_job_count = cl.max_job_count();
    unsigned count = get_processor_count();
    __TBB_ASSERT( max_job_count>0, "max job count must be positive" );
    __TBB_ASSERT( count>1, "The processor count must be greater than 1" );
    if( max_job_count>count-1) max_job_count = count-1;
    CREATE_SCHEDULER_POLICY( my_policy, 0, max_job_count, cl.min_stack_size() );
}
| |
#if __RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
//! No-op variant: removal of virtual processors is compiled out.
template<>
void scheduler<tbb_connection_v2>::RemoveVirtualProcessors( IVirtualProcessorRoot**, unsigned int)
{
}
#else
//! ConcRT callback: forward removal of virtual processors to the connection,
//! unless the connection is already shutting down.
template<>
void scheduler<tbb_connection_v2>::RemoveVirtualProcessors( IVirtualProcessorRoot** vproots, unsigned int count )
{
    if( !my_conn.is_closing() )
        my_conn.remove_virtual_processors( vproots, count );
}
#endif
| |
//! ConcRT callback that must never fire for a TBB connection.
template<>
void scheduler<tbb_connection_v2>::NotifyResourcesExternallyIdle( IVirtualProcessorRoot** /*vproots*/, unsigned int /*count*/)
{
    __TBB_ASSERT( false, "NotifyResourcesExternallyIdle() is not allowed for TBB" );
}
| |
//! ConcRT callback that must never fire for a TBB connection.
template<>
void scheduler<tbb_connection_v2>::NotifyResourcesExternallyBusy( IVirtualProcessorRoot** /*vproots*/, unsigned int /*count*/ )
{
    __TBB_ASSERT( false, "NotifyResourcesExternallyBusy() is not allowed for TBB" );
}
| |
//! OMP specialization: fixed concurrency of (processor count - 1); ConcRT may not
//! remove virtual processors from an OMP scheduler (Min == Max).
template<>
scheduler<omp_connection_v2>::scheduler( omp_connection_v2& conn ) : uid(GetSchedulerId()), my_conn(conn)
{
    unsigned count = get_processor_count();
    rml::client& cl = my_conn.client();
    __TBB_ASSERT( count>1, "The processor count must be greater than 1" );
    CREATE_SCHEDULER_POLICY( my_policy, count-1, count-1, cl.min_stack_size() );
}
| |
//! ConcRT callback that must never fire for an OMP connection (Min == Max concurrency).
template<>
void scheduler<omp_connection_v2>::RemoveVirtualProcessors( IVirtualProcessorRoot** /*vproots*/, unsigned int /*count*/ ) {
    __TBB_ASSERT( false, "RemoveVirtualProcessors() is not allowed for OMP" );
}
| |
//! ConcRT callback: forward externally-idle notification unless the connection is closing.
template<>
void scheduler<omp_connection_v2>::NotifyResourcesExternallyIdle( IVirtualProcessorRoot** vproots, unsigned int count ){
    if( !my_conn.is_closing() )
        my_conn.notify_resources_externally_idle( vproots, count );
}
| |
//! ConcRT callback: forward externally-busy notification unless the connection is closing.
template<>
void scheduler<omp_connection_v2>::NotifyResourcesExternallyBusy( IVirtualProcessorRoot** vproots, unsigned int count ){
    if( !my_conn.is_closing() )
        my_conn.notify_resources_externally_busy( vproots, count );
}
| |
| /* ts_idle, ts_asleep, ts_busy */ |
| void tbb_server_thread::Dispatch( DispatchState* ) { |
| // Activate() will resume a thread right after Deactivate() as if it returns from the call |
| tbb_connection_v2* tbb_conn = static_cast<tbb_connection_v2*>(my_conn); |
| make_job( *tbb_conn, *this ); |
| |
| for( ;; ) { |
| // Try to wake some tbb threads if the balance is positive. |
| // When a thread is added by ConcRT and enter here for the first time, |
| // the thread may wake itself up (i.e., atomically change its state to ts_busy. |
| if( the_balance>0 ) |
| wakeup_some_tbb_threads(); |
| if( read_state()!=ts_busy ) |
| if( sleep_perhaps() ) |
| return; |
| if( terminate ) |
| if( initiate_termination() ) |
| return; |
| if( read_state()==ts_busy ) { |
| // this thread has a coin (i.e., state=ts_busy; it should trip to the scheduler at least once |
| if ( tbb_conn->has_slack() ) { |
| do { |
| tbb_conn->try_process( *wait_for_job() ); |
| } while( tbb_conn->has_slack() && the_balance>=0 && !is_removed() ); |
| } |
| __TBB_ASSERT( read_state()==ts_busy, "thread is not in busy state after returning from process()" ); |
| // see remove_virtual_processors() |
| if( my_state.compare_and_swap( ts_idle, ts_busy )==ts_busy ) { |
| int bal = ++the_balance; |
| if( tbb_conn->has_slack() ) { |
| // slack is positive, volunteer to help |
| bal = --the_balance; // try to grab the coin back |
| if( bal>=0 ) { // got the coin back |
| if( my_state.compare_and_swap( ts_busy, ts_idle )!=ts_idle ) |
| ++the_balance; // someone else enlisted me. |
| // else my_state is ts_busy, I will come back to tbb_conn->try_process(). |
| } else { |
| // overdraft. return the coin |
| ++the_balance; |
| } |
| } // else the new request will see my changes to state & the_balance. |
| } else { |
| __TBB_ASSERT( false, "someone tampered with my state" ); |
| } |
| } // someone else might set the state to somthing other than ts_idle |
| } |
| } |
| |
//! ConcRT dispatch loop for an OMP server thread: sleep when idle, terminate when
//! asked, and when busy consume the produced OpenMP job and return the coin.
void omp_server_thread::Dispatch( DispatchState* ) {
    // Activate() will resume a thread right after Deactivate() as if it returns from the call
    make_job( *static_cast<omp_connection_v2*>(my_conn), *this );

    for( ;; ) {
        if( read_state()!=ts_busy )
            sleep_perhaps();
        if( terminate ) {
            if( initiate_termination() )
                return;
        }
        if( read_state()==ts_busy ) {
            omp_data.consume();
            __TBB_ASSERT( read_state()==ts_busy, "thread is not in busy state after returning from process()" );
            // Return this thread's coin to the deposit.
            my_thread_map.adjust_balance( 1 );
            set_state( ts_idle );
        }
        // someone else might set the state to something other than ts_idle
    }
}
| |
//! Attempt to change a thread's state to ts_busy, waking it up if necessary.
//! Returns how the thread was grabbed (from sleep, from idle) or wk_failed.
thread_grab_t server_thread_rep::try_grab_for() {
    thread_grab_t res = wk_failed;
    thread_state_t s = read_state();
    switch( s ) {
    case ts_asleep:
        if( wakeup( ts_busy, ts_asleep ) )
            res = wk_from_asleep;
        __TBB_ASSERT( res==wk_failed||read_state()==ts_busy, NULL );
        break;
    case ts_idle:
        if( my_state.compare_and_swap( ts_busy, ts_idle )==ts_idle )
            res = wk_from_idle;
        // At this point a thread is grabbed (i.e., its state has changed to ts_busy.
        // It is possible that the thread 1) processes the job, returns from process() and
        // sets its state ts_idle again. In some cases, it even sets its state to ts_asleep.
        break;
    default:
        break;
    }
    return res;
}
| |
//! Relinquish this thread's virtual processor after ConcRT requested its removal.
//! Returns the coin if held, removes the vproc, and SwitchOut()s the proxy.
//! Returns true iff the thread terminated (job plugged, references dropped).
bool tbb_server_thread::switch_out() {
    thread_state_t s = read_state();
    __TBB_ASSERT( s==ts_asleep||s==ts_busy, NULL );
    // This thread comes back from the TBB scheduler, and changed its state to ts_asleep successfully.
    // The master enlisted it and woke it up by Activate()'ing it; now it is emerging from Deactivated().
    // ConcRT requested for removal of the vp associated with the thread, and RML marks it removed.
    // Now, it has ts_busy, and removed. -- we should remove it.
    IExecutionResource* old_vp = my_execution_resource;
    if( s==ts_busy ) {
        // The thread still holds a coin; return it before going away.
        ++the_balance;
        my_state = ts_asleep;
    }
    IThreadProxy* proxy = my_proxy;
    __TBB_ASSERT( proxy, NULL );
    // Publish sentinel values so observers can tell removal is in progress/complete.
    my_execution_resource = (IExecutionResource*) c_remove_prepare;
    old_vp->Remove( my_scheduler );
    my_execution_resource = (IExecutionResource*) c_remove_returned;
    int cnt = --activation_count;
    __TBB_ASSERT_EX( cnt==0||cnt==1, "too many activations?" );
    proxy->SwitchOut();
    if( terminate ) {
        bool activated = activation_count==1;
#if TBB_USE_ASSERT
        /* In a rare sequence of events, a thread comes out of SwitchOut with activation_count==1.
         * 1) The thread is SwitchOut'ed.
         * 2) AddVirtualProcessors() arrived and the thread is Activated.
         * 3) The thread is coming out of SwitchOut().
         * 4) request_close_connection arrives and inform the thread that it is time to terminate.
         * 5) The thread hits the check and falls into the path with 'activated==true'.
         * In that case, do the clean-up but do not switch to the thread scavenger; rather simply return to RM.
         */
        if( activated ) {
            // thread is 'revived' in add_virtual_processors after being Activated().
            // so, if the thread extra state is still marked 'removed', it will shortly change to 'none'
            // i.e., !is_remove(). The thread state is changed to ts_idle before the extra state, so
            // the thread's state should be either ts_idle or ts_done.
            while( is_removed() )
                __TBB_Yield();
            thread_state_t s = read_state();
            __TBB_ASSERT( s==ts_idle || s==ts_done, NULL );
        }
#endif
        __TBB_ASSERT( my_state==ts_asleep||my_state==ts_idle, NULL );
        // it is possible that in make_job() the thread may not have a chance to create a job.
        // my_job may not be set if the thread did not get a chance to process client's job (i.e., call try_process())
        rml::job* j;
        if( my_job_automaton.try_plug(j) ) {
            __TBB_ASSERT( j, NULL );
            my_client.cleanup(*j);
            my_conn->remove_client_ref();
        }
        // Must do remove client reference first, because execution of
        // c.remove_ref() can cause *this to be destroyed.
        if( !activated )
            proxy->SwitchTo( my_thread_map.get_thread_scavenger(), Idle );
        my_conn->remove_server_ref();
        return true;
    }
    // We revive a thread in add_virtual_processors() after we Activate the thread on a new virtual processor.
    // So briefly wait until the thread's my_execution_resource gets set.
    while( get_virtual_processor()==c_remove_returned )
        __TBB_Yield();
    return false;
}
| |
//! Deactivate this TBB thread's virtual processor if the thread is idle.
//! Returns true iff the thread switched itself out for good (see switch_out()).
bool tbb_server_thread::sleep_perhaps () {
    if( terminate ) return false;
    thread_state_t s = read_state();
    if( s==ts_idle ) {
        if( my_state.compare_and_swap( ts_asleep, ts_idle )==ts_idle ) {
            // If a thread is between read_state() and compare_and_swap(), and the master tries to terminate,
            // the master's compare_and_swap() will fail because the thread's state is ts_idle.
            // We need to check if terminate is true or not before letting the thread go to sleep otherwise
            // we will miss the terminate signal.
            if( !terminate ) {
                if( !is_removed() ) {
                    --activation_count;
                    get_virtual_processor()->Deactivate( this );
                }
                if( is_removed() ) {
                    if( switch_out() )
                        return true;
                    __TBB_ASSERT( my_execution_resource>c_remove_returned, NULL );
                }
                // in add_virtual_processors(), when we revive a thread, we change its state after Activate the thread
                // in that case the state may be ts_asleep for a short period
                while( read_state()==ts_asleep )
                    __TBB_Yield();
            } else {
                if( my_state.compare_and_swap( ts_done, ts_asleep )!=ts_asleep ) {
                    --activation_count;
                    // unbind() changed my state. It will call Activate(). So issue a matching Deactivate()
                    get_virtual_processor()->Deactivate( this );
                }
            }
        }
    } else {
        __TBB_ASSERT( s==ts_busy, NULL );
    }
    return false;
}
| |
//! Deactivate this OMP thread's virtual processor if the thread is idle.
//! Unlike the TBB variant, OMP threads never lose their virtual processor.
void omp_server_thread::sleep_perhaps () {
    if( terminate ) return;
    thread_state_t s = read_state();
    if( s==ts_idle ) {
        if( my_state.compare_and_swap( ts_asleep, ts_idle )==ts_idle ) {
            // If a thread is between read_state() and compare_and_swap(), and the master tries to terminate,
            // the master's compare_and_swap() will fail because the thread's state is ts_idle.
            // We need to check if terminate is true or not before letting the thread go to sleep otherwise
            // we will miss the terminate signal.
            if( !terminate ) {
                get_virtual_processor()->Deactivate( this );
                __TBB_ASSERT( !is_removed(), "OMP threads should not be deprived of a virtual processor" );
                __TBB_ASSERT( read_state()!=ts_asleep, NULL );
            } else {
                if( my_state.compare_and_swap( ts_done, ts_asleep )!=ts_asleep )
                    // unbind() changed my state. It will call Activate(). So issue a matching Deactivate()
                    get_virtual_processor()->Deactivate( this );
            }
        }
    } else {
        __TBB_ASSERT( s==ts_busy, NULL );
    }
}
| |
//! Begin termination of a TBB thread: return its coin if it holds one (possibly
//! waking other tbb threads with it), then destroy the job.
bool tbb_server_thread::initiate_termination() {
    if( read_state()==ts_busy ) {
        int bal = ++the_balance;
        if( bal>0 ) wakeup_some_tbb_threads();
    }
    return destroy_job( (tbb_connection_v2*) my_conn );
}
| |
//! Plug this thread's job automaton, clean up the client's job if this thread wins
//! the plug, and drop the connection references.  Always returns true.
template<typename Connection>
bool server_thread_rep::destroy_job( Connection* c ) {
    __TBB_ASSERT( my_state!=ts_asleep, NULL );
    rml::job* j;
    if( my_job_automaton.try_plug(j) ) {
        __TBB_ASSERT( j, NULL );
        my_client.cleanup(*j);
        c->remove_client_ref();
    }
    // Must do remove client reference first, because execution of
    // c.remove_ref() can cause *this to be destroyed.
    c->remove_server_ref();
    return true;
}
| |
//! Help tear down jobs across the whole thread map during connection shutdown.
//! If assist_null_only, only plug automata whose job was never created.
void thread_map::assist_cleanup( bool assist_null_only ) {
    // To avoid deadlock, the current thread *must* help out with cleanups that have not started,
    // because the thread that created the job may be busy for a long time.
    for( iterator i = begin(); i!=end(); ++i ) {
        rml::job* j=0;
        server_thread* thr = (*i).second;
        job_automaton& ja = thr->my_job_automaton;
        if( assist_null_only ? ja.try_plug_null() : ja.try_plug(j) ) {
            if( j ) {
                my_client.cleanup(*j);
            } else {
                // server thread did not get a chance to create a job.
            }
            remove_client_ref();
        }
    }
}
| |
//! Incorporate virtual processors newly granted by ConcRT into the thread map.
//! Three phases: (1) under the lock, match each vproc against existing (removed)
//! entries and budget decisions; (2) outside the lock, allocate server threads for
//! genuinely new vprocs; (3) under the lock again, insert/re-key entries; finally
//! Activate every accepted vproc and revive previously-removed threads.
void thread_map::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count, tbb_connection_v2& conn, ::tbb::spin_mutex& mtx )
{
#if TBB_USE_ASSERT
    // Requests must be serialized by the caller; at most one may be in flight.
    int req_cnt = ++n_add_vp_requests;
    __TBB_ASSERT( req_cnt==1, NULL );
#endif
    std::vector<thread_map::iterator> vec(count);
    std::vector<tbb_server_thread*> tvec(count);
    iterator end;

    {
        tbb::spin_mutex::scoped_lock lck( mtx );
        __TBB_ASSERT( my_map.size()==0||count==1, NULL );
        end = my_map.end(); //remember 'end' at the time of 'find'
        // find entries in the map for those VPs that were previously added and then removed.
        for( size_t i=0; i<count; ++i ) {
            vec[i] = my_map.find( (key_type) vproots[i] );
#if TBB_USE_DEBUG
            // NOTE(review): guarded by TBB_USE_DEBUG while the rest of the file uses
            // TBB_USE_ASSERT -- confirm this distinction is intentional.
            if( vec[i]!=end ) {
                tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
                IVirtualProcessorRoot* v = t->get_virtual_processor();
                __TBB_ASSERT( v==c_remove_prepare||v==c_remove_returned, NULL );
            }
#endif
        }

        iterator nxt = my_map.begin();
        for( size_t i=0; i<count; ++i ) {
            if( vec[i]!=end ) {
#if TBB_USE_ASSERT
                tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
                __TBB_ASSERT( t->read_state()==ts_asleep, NULL );
                IVirtualProcessorRoot* r = t->get_virtual_processor();
                __TBB_ASSERT( r==c_remove_prepare||r==c_remove_returned, NULL );
#endif
                continue;
            }

            if( my_unrealized_threads>0 ) {
                // We still have budget for brand-new threads; spend one.
                --my_unrealized_threads;
            } else {
                __TBB_ASSERT( nxt!=end, "nxt should not be thread_map::iterator::end" );
                // find a removed thread context for i
                for( ; nxt!=end; ++nxt ) {
                    tbb_server_thread* t = (tbb_server_thread*) (*nxt).second;
                    if( t->is_removed() && t->read_state()==ts_asleep && t->get_virtual_processor()==c_remove_returned ) {
                        vec[i] = nxt++;
                        break;
                    }
                }
                // break target
                if( vec[i]==end ) // ignore excessive VP.
                    vproots[i] = NULL;
            }
        }
    }

    // Phase 2 (lock released): allocate server threads for the genuinely new vprocs.
    for( size_t i=0; i<count; ++i ) {
        __TBB_ASSERT( !tvec[i], NULL );
        if( vec[i]==end ) {
            if( vproots[i] ) {
                tvec[i] = my_tbb_allocator.allocate(1);
                new ( tvec[i] ) tbb_server_thread( false, my_scheduler, (IExecutionResource*)vproots[i], &conn, *this, my_client );
            }
#if TBB_USE_ASSERT
        } else {
            tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
            __TBB_ASSERT( t->GetProxy(), "Proxy is cleared?" );
#endif
        }
    }

    {
        tbb::spin_mutex::scoped_lock lck( mtx );

        bool closing = is_closing();

        for( size_t i=0; i<count; ++i ) {
            if( vec[i]==end ) {
                if( vproots[i] ) {
                    // Brand-new thread: insert and take server/client references for it.
                    thread_map::key_type key = (thread_map::key_type) vproots[i];
                    vec[i] = insert( key, (server_thread*) tvec[i] );
                    my_client_ref_count.add_ref();
                    my_server_ref_count.add_ref();
                }
            } else if( !closing ) {
                tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;

                if( (*vec[i]).first!=(thread_map::key_type)vproots[i] ) {
                    // Reused thread context keyed under an old vproc: re-key it.
                    my_map.erase( vec[i] );
                    thread_map::key_type key = (thread_map::key_type) vproots[i];
                    __TBB_ASSERT( key, NULL );
                    vec[i] = insert( key, t );
                }
                __TBB_ASSERT( t->read_state()==ts_asleep, NULL );
                // We did not decrement server/client ref count when a thread is removed.
                // So, don't increment server/client ref count here.
            }
        }

        // we could check is_closing() earlier. That requires marking the newly allocated server_thread objects
        // that are not inserted into the thread_map, and deallocate them. Doing so seems more cumbersome
        // than simply adding these to the thread_map and let thread_map's destructor take care of reclamation.
        __TBB_ASSERT( closing==is_closing(), NULL );
        if( closing ) return;
    }

    // Phase 4: Activate every accepted vproc; revive threads that had been removed.
    for( size_t i=0; i<count; ++i ) {
        if( vproots[i] ) {
            tbb_server_thread* t = (tbb_server_thread*) (*vec[i]).second;
            __TBB_ASSERT( tvec[i]!=NULL||t->GetProxy(), "Proxy is cleared?" );
            if( t->is_removed() )
                __TBB_ASSERT( t->get_virtual_processor()==c_remove_returned, NULL );
            int cnt = ++t->activation_count;
            __TBB_ASSERT_EX( cnt==0||cnt==1, NULL );
            vproots[i]->Activate( t );
            if( t->is_removed() )
                t->revive( my_scheduler, vproots[i], my_client );
        }
    }
#if TBB_USE_ASSERT
    req_cnt = --n_add_vp_requests;
    __TBB_ASSERT( req_cnt==0, NULL );
#endif
}
| |
/** Handle ConcRT RM's RemoveVirtualProcessors() callback for a TBB connection.
    Marks each thread mapped to one of the given vproc roots as removed and, if the
    thread is asleep, prods it so it can switch itself out. Runs under 'mtx';
    ignored entirely once shutdown has started. */
void thread_map::remove_virtual_processors( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx ) {
    // Unsynchronized fast-path check: nothing has been added to the map yet.
    if( my_map.size()==0 )
        return;
    tbb::spin_mutex::scoped_lock lck( mtx );

    if( is_closing() ) return;

    for( unsigned int c=0; c<count; ++c ) {
        iterator i = my_map.find( (key_type) vproots[c] );
        if( i==my_map.end() ) {
            // Unmapped vproc: it may belong to the thread scavenger.
            thread_scavenger_thread* tst = my_thread_scavenger_thread;
            if( !tst ) {
                // Remove unknown vp from my scheduler;
                vproots[c]->Remove( my_scheduler );
            } else {
                // c_claimed means allocate_thread_scavenger() is mid-flight;
                // wait for the real pointer to be published.
                while( (tst=my_thread_scavenger_thread)==c_claimed )
                    __TBB_Yield();
                // Remove it unless it is the scavenger's own vproc.
                if( vproots[c]!=tst->get_virtual_processor() )
                    vproots[c]->Remove( my_scheduler );
            }
            continue;
        }
        tbb_server_thread* thr = (tbb_server_thread*) (*i).second;
        __TBB_ASSERT( thr->tbb_thread, "incorrect type of server_thread" );
        thr->set_removed();
        if( thr->read_state()==ts_asleep ) {
            // Wait for any pending Activate() to be consumed, unless the thread
            // has already given its vproc back (value <= c_remove_returned).
            while( thr->activation_count>0 ) {
                if( thr->get_virtual_processor()<=c_remove_returned )
                    break;
                __TBB_Yield();
            }
            if( thr->get_virtual_processor()>c_remove_returned ) {
                // the thread is in Deactivated state
                ++thr->activation_count;
                // wake the thread up so that it Switches Out itself.
                thr->get_virtual_processor()->Activate( thr );
            } // else, it is Switched Out
        } // else the thread will see that it is removed and proceed to switch itself out without Deactivation
    }
}
| |
/** Handle ConcRT RM's AddVirtualProcessors() callback for an OpenMP connection.
    Phase 1 (locked): look up which vprocs are already mapped.
    Phase 2 (unlocked): construct omp_server_thread objects for the new ones.
    Phase 3 (locked): insert them and take client/server references.
    Phase 4: Activate each thread, then record the execution resources. */
void thread_map::add_virtual_processors( IVirtualProcessorRoot** vproots, unsigned int count, omp_connection_v2& conn, ::tbb::spin_mutex& mtx )
{
    std::vector<thread_map::iterator> vec(count);   // map position of each vproc, or 'end' if new
    std::vector<server_thread*> tvec(count);        // threads constructed for the new vprocs
    iterator end;

    {
        tbb::spin_mutex::scoped_lock lck( mtx );
        // read the map
        end = my_map.end(); //remember 'end' at the time of 'find'
        for( size_t i=0; i<count; ++i )
            vec[i] = my_map.find( (key_type) vproots[i] );
    }

    // Allocate/construct outside the lock; only unknown vprocs get a new thread.
    for( size_t i=0; i<count; ++i ) {
        __TBB_ASSERT( !tvec[i], NULL );
        if( vec[i]==end ) {
            tvec[i] = my_omp_allocator.allocate(1);
            new ( tvec[i] ) omp_server_thread( false, my_scheduler, (IExecutionResource*)vproots[i], &conn, *this, my_client );
        }
    }

    {
        tbb::spin_mutex::scoped_lock lck( mtx );

        for( size_t i=0; i<count; ++i ) {
            if( vec[i]==my_map.end() ) {
                thread_map::key_type key = (thread_map::key_type) vproots[i];
                vec[i] = insert( key, tvec[i] );
                // Each mapped thread holds one client and one server reference.
                my_client_ref_count.add_ref();
                my_server_ref_count.add_ref();
            }
        }

        // we could check is_closing() earlier. That requires marking the newly allocated server_thread objects
        // that are not inserted into the thread_map, and deallocate them. Doing so seems more cumbersome
        // than simply adding these to the thread_map and let thread_map's destructor take care of reclamation.
        if( is_closing() ) return;
    }

    for( size_t i=0; i<count; ++i )
        vproots[i]->Activate( (*vec[i]).second );

    {
        tbb::spin_mutex::scoped_lock lck( mtx );
        // Remember the underlying execution resources; create_oversubscribers()
        // round-robins over this list.
        for( size_t i=0; i<count; ++i )
            original_exec_resources.push_back( vproots[i] );
    }
}
| |
| void thread_map::mark_virtual_processors_as_lent( IVirtualProcessorRoot** vproots, unsigned count, ::tbb::spin_mutex& mtx ) { |
| tbb::spin_mutex::scoped_lock lck( mtx ); |
| |
| if( is_closing() ) return; |
| |
| iterator end = my_map.end(); |
| for( unsigned int c=0; c<count; ++c ) { |
| iterator i = my_map.find( (key_type) vproots[c] ); |
| if( i==end ) { |
| // The vproc has not been added to the map in create_oversubscribers() |
| my_map.insert( hash_map_type::value_type( (key_type) vproots[c], (server_thread*)1 ) ); |
| } else { |
| server_thread* thr = (*i).second; |
| if( ((uintptr_t)thr)&~(uintptr_t)1 ) { |
| __TBB_ASSERT( !thr->is_removed(), "incorrectly removed" ); |
| ((omp_server_thread*)thr)->set_lent(); |
| } |
| } |
| } |
| } |
| |
| void thread_map::create_oversubscribers( unsigned n, std::vector<server_thread*>& thr_vec, omp_connection_v2& conn, ::tbb::spin_mutex& mtx ) { |
| std::vector<IExecutionResource*> curr_exec_rsc; |
| { |
| tbb::spin_mutex::scoped_lock lck( mtx ); |
| curr_exec_rsc = original_exec_resources; // copy construct |
| } |
| typedef std::vector<IExecutionResource*>::iterator iterator_er; |
| typedef ::std::vector<std::pair<hash_map_type::key_type, hash_map_type::mapped_type> > hash_val_vector_t; |
| hash_val_vector_t v_vec(n); |
| iterator_er begin = curr_exec_rsc.begin(); |
| iterator_er end = curr_exec_rsc.end(); |
| iterator_er i = begin; |
| for( unsigned c=0; c<n; ++c ) { |
| IVirtualProcessorRoot* vpr = my_scheduler_proxy->CreateOversubscriber( *i ); |
| omp_server_thread* t = new ( my_omp_allocator.allocate(1) ) omp_server_thread( true, my_scheduler, (IExecutionResource*)vpr, &conn, *this, my_client ); |
| thr_vec[c] = t; |
| v_vec[c] = hash_map_type::value_type( (key_type) vpr, t ); |
| if( ++i==end ) i = begin; |
| } |
| |
| { |
| tbb::spin_mutex::scoped_lock lck( mtx ); |
| |
| if( is_closing() ) return; |
| |
| iterator end = my_map.end(); |
| unsigned c = 0; |
| for( hash_val_vector_t::iterator vi=v_vec.begin(); vi!=v_vec.end(); ++vi, ++c ) { |
| iterator i = my_map.find( (key_type) (*vi).first ); |
| if( i==end ) { |
| my_map.insert( *vi ); |
| } else { |
| // the vproc has not been added to the map in mark_virtual_processors_as_returned(); |
| unsigned lent = (unsigned) (*i).second; |
| __TBB_ASSERT( lent<=1, "vproc map entry added incorrectly?"); |
| (*i).second = thr_vec[c]; |
| if( lent ) |
| ((omp_server_thread*)thr_vec[c])->set_lent(); |
| else |
| ((omp_server_thread*)thr_vec[c])->set_returned(); |
| } |
| my_client_ref_count.add_ref(); |
| my_server_ref_count.add_ref(); |
| } |
| } |
| } |
| |
/** Wake up to 'c' TBB worker threads, limited by the global balance counter
    'the_balance'. Threads grabbed while asleep are collected into 'vec' and
    Activated only after the lock is released. */
void thread_map::wakeup_tbb_threads( int c, ::tbb::spin_mutex& mtx ) {
    std::vector<tbb_server_thread*> vec(c);

    size_t idx = 0;   // number of asleep threads collected into vec
    {
        tbb::spin_mutex::scoped_lock lck( mtx );

        if( is_closing() ) return;
        // only one RML thread is in here to wake worker threads up.

        int bal = the_balance;
        int cnt = c<bal ? c : bal;   // never wake more threads than the balance allows

        if( cnt<=0 ) { return; }

        for( iterator i=begin(); i!=end(); ++i ) {
            tbb_server_thread* thr = (tbb_server_thread*) (*i).second;
            // ConcRT RM should take threads away from TBB scheduler instead of lending them to another scheduler
            if( thr->is_removed() )
                continue;

            // Draw one unit from the global balance for this thread.
            if( --the_balance>=0 ) {
                thread_grab_t res;
                while( (res=thr->try_grab_for())!=wk_from_idle ) {
                    if( res==wk_from_asleep ) {
                        // Asleep threads need Activate(); postpone until the lock is dropped.
                        vec[idx++] = thr;
                        break;
                    } else {
                        thread_state_t s = thr->read_state();
                        if( s==ts_busy ) {// failed because already assigned. move on.
                            ++the_balance;   // give back the unit we drew
                            goto skip;
                        }
                    }
                }
                thread_state_t s = thr->read_state();
                __TBB_ASSERT_EX( s==ts_busy, "should have set the state to ts_busy" );
                if( --cnt==0 )
                    break;
            } else {
                // overdraft: return the unit and stop
                ++the_balance;
                break;
            }
skip:
            ;
        }
    }

    // Activate the threads found asleep, outside the lock.
    for( size_t i=0; i<idx; ++i ) {
        tbb_server_thread* thr = vec[i];
        __TBB_ASSERT( thr, NULL );
        thread_state_t s = thr->read_state();
        __TBB_ASSERT_EX( s==ts_busy, "should have set the state to ts_busy" );
        ++thr->activation_count;
        thr->get_virtual_processor()->Activate( thr );
    }

}
| |
| void thread_map::mark_virtual_processors_as_returned( IVirtualProcessorRoot** vprocs, unsigned int count, tbb::spin_mutex& mtx ) { |
| { |
| tbb::spin_mutex::scoped_lock lck( mtx ); |
| |
| if( is_closing() ) return; |
| |
| iterator end = my_map.end(); |
| for(unsigned c=0; c<count; ++c ) { |
| iterator i = my_map.find( (key_type) vprocs[c] ); |
| if( i==end ) { |
| // the vproc has not been added to the map in create_oversubscribers() |
| my_map.insert( hash_map_type::value_type( (key_type) vprocs[c], static_cast<server_thread*>(0) ) ); |
| } else { |
| omp_server_thread* thr = (omp_server_thread*) (*i).second; |
| if( ((uintptr_t)thr)&~(uintptr_t)1 ) { |
| __TBB_ASSERT( !thr->is_removed(), "incorrectly removed" ); |
| // we shoud not make any assumption on the initial state of an added vproc. |
| thr->set_returned(); |
| } |
| } |
| } |
| } |
| } |
| |
| |
/** Initiate connection shutdown: mark the map so further RM callbacks are ignored,
    wake every server thread so it can terminate, drop the extra client reference,
    and finally wake the thread scavenger (if one was allocated). */
void thread_map::unbind( rml::server& /*server*/, tbb::spin_mutex& mtx ) {
    {
        tbb::spin_mutex::scoped_lock lck( mtx );
        shutdown_in_progress = true; // ignore any callbacks from ConcRT RM

        // Ask each server_thread to cleanup its job for this server.
        for( iterator i = begin(); i!=end(); ++i ) {
            server_thread* t = (*i).second;
            t->terminate = true;
            if( t->is_removed() ) {
                // This is for TBB only as ConcRT RM does not request OMP schedulers to remove virtual processors
                if( t->read_state()==ts_asleep ) {
                    __TBB_ASSERT( my_thread_scavenger_thread, "this is TBB connection; thread_scavenger_thread must be allocated" );
                    // thread is on its way to switch_out; see remove_virtual_processors() where
                    // the thread is Activated() to bring it back from 'Deactivated' in sleep_perhaps()
                    // now assume that the thread will go to SwitchOut()
#if TBB_USE_ASSERT
                    while( t->get_virtual_processor()>c_remove_returned )
                        __TBB_Yield();
#endif
                    // A removed thread is supposed to proceed to SwitchOut.
                    // There, we remove client&server references.
                }
            } else {
                if( t->wakeup( ts_done, ts_asleep ) ) {
                    if( t->tbb_thread )
                        ++((tbb_server_thread*)t)->activation_count;
                    t->get_virtual_processor()->Activate( t );
                    // We mark in the thread_map such that when termination sequence started, we ignore
                    // all notification from ConcRT RM.
                }
            }
        }
    }
    // Remove extra ref to client.
    remove_client_ref();

    if( my_thread_scavenger_thread ) {
        // c_claimed means the scavenger is still being constructed; wait for the
        // real pointer to be published, then wake it to run its dispatch loop.
        thread_scavenger_thread* tst;
        while( (tst=my_thread_scavenger_thread)==c_claimed )
            __TBB_Yield();
#if TBB_USE_ASSERT
        ++my_thread_scavenger_thread->activation_count;
#endif
        tst->get_virtual_processor()->Activate( tst );
    }
}
| |
#if !__RML_REMOVE_VIRTUAL_PROCESSORS_DISABLED
/** Lazily create the single thread-scavenger thread for this connection.
    Claims my_thread_scavenger_thread with a fetch_and_store of the c_claimed
    sentinel so at most one caller performs the allocation; the scavenger runs
    on an oversubscriber vproc created from execution resource 'v'. */
void thread_map::allocate_thread_scavenger( IExecutionResource* v )
{
    // Fast path: already allocated (a real pointer compares above c_claimed).
    if( my_thread_scavenger_thread>c_claimed ) return;
    thread_scavenger_thread* c = my_thread_scavenger_thread.fetch_and_store((thread_scavenger_thread*)c_claimed);
    if( c==NULL ) { // successfully claimed
        add_server_ref();
#if TBB_USE_ASSERT
        ++n_thread_scavengers_created;
#endif
        __TBB_ASSERT( v, NULL );
        IVirtualProcessorRoot* vpr = my_scheduler_proxy->CreateOversubscriber( v );
        // Publish the scavenger; this store also releases the c_claimed sentinel.
        my_thread_scavenger_thread = c = new ( my_scavenger_allocator.allocate(1) ) thread_scavenger_thread( my_scheduler, vpr, *this );
#if TBB_USE_ASSERT
        ++c->activation_count;
#endif
        vpr->Activate( c );
    } else if( c>c_claimed ) {
        // Lost the race to an earlier allocation: restore the pointer we displaced.
        my_thread_scavenger_thread = c;
    }
}
#endif
| |
/** Main routine of the thread scavenger. Deactivates itself until unbind() wakes
    it at shutdown, then switches to every removed, asleep TBB thread so each can
    complete its SwitchOut; finally returns its own vproc to the RM and drops the
    server reference it holds on the thread map. */
void thread_scavenger_thread::Dispatch( DispatchState* )
{
    __TBB_ASSERT( my_proxy, NULL );
#if TBB_USE_ASSERT
    --activation_count;
#endif
    // Sleep until unbind() Activates us at connection shutdown.
    get_virtual_processor()->Deactivate( this );
    for( thread_map::iterator i=my_thread_map.begin(); i!=my_thread_map.end(); ++i ) {
        tbb_server_thread* t = (tbb_server_thread*) (*i).second;
        if( t->read_state()==ts_asleep && t->is_removed() ) {
            // Wait until the thread has returned its execution resource...
            while( t->get_execution_resource()!=c_remove_returned )
                __TBB_Yield();
            // ...then switch to it so it can finish its own SwitchOut.
            my_proxy->SwitchTo( t, Blocking );
        }
    }
    get_virtual_processor()->Remove( my_scheduler );
    my_thread_map.remove_server_ref();
    // signal to the connection scavenger that i am done with the map.
    __TBB_ASSERT( activation_count==1, NULL );
    set_state( ts_done );
}
| |
| //! Windows "DllMain" that handles startup and shutdown of dynamic library. |
| extern "C" bool WINAPI DllMain( HINSTANCE /*hinstDLL*/, DWORD fwdReason, LPVOID lpvReserved ) { |
| void assist_cleanup_connections(); |
| if( fwdReason==DLL_PROCESS_DETACH ) { |
| // dll is being unloaded |
| if( !lpvReserved ) // if FreeLibrary has been called |
| assist_cleanup_connections(); |
| } |
| return true; |
| } |
| |
| void free_all_connections( uintptr_t conn_ex ) { |
| while( conn_ex ) { |
| bool is_tbb = (conn_ex&2)>0; |
| //clear extra bits |
| uintptr_t curr_conn = conn_ex & ~(uintptr_t)3; |
| __TBB_ASSERT( curr_conn, NULL ); |
| |
| // Wait for worker threads to return |
| if( is_tbb ) { |
| tbb_connection_v2* tbb_conn = reinterpret_cast<tbb_connection_v2*>(curr_conn); |
| conn_ex = reinterpret_cast<uintptr_t>(tbb_conn->next_conn); |
| while( tbb_conn->my_thread_map.remove_server_ref()>0 ) |
| __TBB_Yield(); |
| delete tbb_conn; |
| } else { |
| omp_connection_v2* omp_conn = reinterpret_cast<omp_connection_v2*>(curr_conn); |
| conn_ex = reinterpret_cast<uintptr_t>(omp_conn->next_conn); |
| while( omp_conn->my_thread_map.remove_server_ref()>0 ) |
| __TBB_Yield(); |
| delete omp_conn; |
| } |
| } |
| } |
| |
/** Called from DllMain on FreeLibrary: plug the garbage-connection queue so no
    new requests are accepted, make sure the connection scavenger thread is not
    left asleep, and — if it has already exited — free the remaining queued
    connections on this thread. */
void assist_cleanup_connections()
{
    //signal to connection_scavenger_thread to terminate
    uintptr_t tail = connections_to_reclaim.tail;
    while( connections_to_reclaim.tail.compare_and_swap( garbage_connection_queue::plugged, tail )!=tail ) {
        __TBB_Yield();
        tail = connections_to_reclaim.tail;
    }

    __TBB_ASSERT( connection_scavenger.state==ts_busy || connection_scavenger.state==ts_asleep, NULL );
    // Scavenger thread may be busy freeing connections
    DWORD thr_exit_code = STILL_ACTIVE;
    while( connection_scavenger.state==ts_busy ) {
        // The scavenger may exit without updating its state; poll the OS handle too.
        if( GetExitCodeThread( connection_scavenger.thr_handle, &thr_exit_code )>0 )
            if( thr_exit_code!=STILL_ACTIVE )
                break;
        __TBB_Yield();
        thr_exit_code = STILL_ACTIVE;
    }
    if( connection_scavenger.state==ts_asleep && thr_exit_code==STILL_ACTIVE )
        connection_scavenger.wakeup(); // wake the connection scavenger thread up

    // it is possible that the connection scavenger thread already exited. Take over its responsibility.
    if( tail && connections_to_reclaim.tail!=garbage_connection_queue::plugged_acked ) {
        // atomically claim the head of the list.
        uintptr_t head = connections_to_reclaim.head.fetch_and_store( garbage_connection_queue::empty );
        if( head==garbage_connection_queue::empty )
            head = tail;   // head not linked yet by the producer; start from the tail
        connection_scavenger.process_requests( head );
    }
    __TBB_ASSERT( connections_to_reclaim.tail==garbage_connection_queue::plugged||connections_to_reclaim.tail==garbage_connection_queue::plugged_acked, "someone else added a request after termination has initiated" );
    __TBB_ASSERT( the_balance==connection_scavenger.default_concurrency, NULL );
}
| |
/** Put the connection scavenger to sleep on its monitor unless there is work
    (tail holds a connection) or termination was requested (tail is plugged).
    On return 'state' is always ts_busy again. */
void connection_scavenger_thread::sleep_perhaps() {
    uintptr_t tail = connections_to_reclaim.tail;
    // connections_to_reclaim.tail==garbage_connection_queue::plugged --> terminate,
    // connections_to_reclaim.tail>garbage_connection_queue::plugged : we got work to do
    if( tail>=garbage_connection_queue::plugged ) return;
    __TBB_ASSERT( !tail, NULL );
    thread_monitor::cookie c;
    monitor.prepare_wait(c);
    if( state.compare_and_swap( ts_asleep, ts_busy )==ts_busy ) {
        // If termination was signaled between prepare_wait and here, don't sleep;
        // otherwise wait (the producer will wake us after enqueuing).
        if( connections_to_reclaim.tail!=garbage_connection_queue::plugged ) {
            monitor.commit_wait(c);
            // Someone else woke me up. The compare_and_swap further below deals with spurious wakeups.
        } else {
            monitor.cancel_wait();
        }
        thread_state_t s = state;
        if( s==ts_asleep ) // if spurious wakeup.
            state.compare_and_swap( ts_busy, ts_asleep );
        // I woke myself up, either because I cancelled the wait or suffered a spurious wakeup.
    } else {
        __TBB_ASSERT( false, "someone else tampered with my state" );
    }
    __TBB_ASSERT( state==ts_busy, "a thread can only put itself to sleep" );
}
| |
/** Drain the list of closed connections starting at conn_ex (tagged pointer:
    bit 1 marks a TBB connection). For each connection: wait for its worker
    threads to release their server references, read next_conn, and only then
    drop the final server reference (which triggers Shutdown/Release of the
    ConcRT RM). Stops when the queue is emptied or termination is observed. */
void connection_scavenger_thread::process_requests( uintptr_t conn_ex )
{
    __TBB_ASSERT( conn_ex>1, NULL );
    __TBB_ASSERT( n_scavenger_threads==1||connections_to_reclaim.tail==garbage_connection_queue::plugged, "more than one connection_scavenger_thread being active?" );

    bool done = false;
    while( !done ) {
        bool is_tbb = (conn_ex&2)>0;
        //clear extra bits
        uintptr_t curr_conn = conn_ex & ~(uintptr_t)3;

        // no contention. there is only one connection_scavenger_thread!!
        uintptr_t next_conn;
        tbb_connection_v2* tbb_conn = NULL;
        omp_connection_v2* omp_conn = NULL;
        // Wait for worker threads to return
        if( is_tbb ) {
            tbb_conn = reinterpret_cast<tbb_connection_v2*>(curr_conn);
            next_conn = reinterpret_cast<uintptr_t>(tbb_conn->next_conn);
            while( tbb_conn->my_thread_map.get_server_ref_count()>1 )
                __TBB_Yield();
        } else {
            omp_conn = reinterpret_cast<omp_connection_v2*>(curr_conn);
            next_conn = reinterpret_cast<uintptr_t>(omp_conn->next_conn);
            while( omp_conn->my_thread_map.get_server_ref_count()>1 )
                __TBB_Yield();
        }

        //someone else may try to write into this connection object.
        //So access next_conn field first before remove the extra server ref count.

        if( next_conn==0 ) {
            // Apparent end of list; decide whether the queue is really empty.
            uintptr_t tail = connections_to_reclaim.tail;
            if( tail==garbage_connection_queue::plugged ) {
                // NOTE(review): this assigns only the local 'tail' copy; if the intent
                // was to publish the acknowledgment, connections_to_reclaim.tail
                // should be written instead — confirm against the consumers of
                // plugged_acked in thread_routine()/assist_cleanup_connections().
                tail = garbage_connection_queue::plugged_acked; // connection scavenger saw the flag, and it freed all connections.
                done = true;
            } else if( tail==conn_ex ) {
                // We hold the queue's tail; try to atomically close the queue.
                if( connections_to_reclaim.tail.compare_and_swap( garbage_connection_queue::empty, tail )==tail ) {
                    __TBB_ASSERT( !connections_to_reclaim.head, NULL );
                    done = true;
                }
            }

            if( !done ) {
                // A new connection to close is added to connections_to_reclaim.tail;
                // Wait for curr_conn->next_conn to be set.
                if( is_tbb ) {
                    while( !tbb_conn->next_conn )
                        __TBB_Yield();
                    conn_ex = reinterpret_cast<uintptr_t>(tbb_conn->next_conn);
                } else {
                    while( !omp_conn->next_conn )
                        __TBB_Yield();
                    conn_ex = reinterpret_cast<uintptr_t>(omp_conn->next_conn);
                }
            }
        } else {
            conn_ex = next_conn;
        }
        __TBB_ASSERT( conn_ex, NULL );
        if( is_tbb )
            // remove extra server ref count; this will trigger Shutdown/Release of ConcRT RM
            tbb_conn->remove_server_ref();
        else
            // remove extra server ref count; this will trigger Shutdown/Release of ConcRT RM
            omp_conn->remove_server_ref();
    }
}
| |
/** OS entry point of the connection scavenger thread: loop forever, sleeping when
    the queue is empty, freeing batches of closed connections, and exiting once
    the queue has been plugged for termination. */
__RML_DECL_THREAD_ROUTINE connection_scavenger_thread::thread_routine( void* arg ) {
    connection_scavenger_thread* thr = (connection_scavenger_thread*) arg;
    thr->state = ts_busy;
    thr->thr_handle = GetCurrentThread();
#if TBB_USE_ASSERT
    ++thr->n_scavenger_threads;
#endif
    for(;;) {
        __TBB_Yield();
        thr->sleep_perhaps();
        // Termination was requested: report ts_asleep so assist_cleanup_connections()
        // can tell this thread is no longer processing, then exit.
        if( connections_to_reclaim.tail==garbage_connection_queue::plugged || connections_to_reclaim.tail==garbage_connection_queue::plugged_acked ) {
            thr->state = ts_asleep;
            return 0;
        }

        __TBB_ASSERT( connections_to_reclaim.tail!=garbage_connection_queue::plugged_acked, NULL );
        __TBB_ASSERT( connections_to_reclaim.tail>garbage_connection_queue::plugged && (connections_to_reclaim.tail&garbage_connection_queue::plugged)==0 , NULL );
        // add_request() publishes the tail before linking the head; wait for the head.
        while( connections_to_reclaim.head==garbage_connection_queue::empty )
            __TBB_Yield();
        uintptr_t head = connections_to_reclaim.head.fetch_and_store( garbage_connection_queue::empty );
        thr->process_requests( head );
        // NOTE(review): presumably freeing connections replenished the_balance;
        // wake TBB workers so they can use the reclaimed units — confirm.
        wakeup_some_tbb_threads();
    }
}
| |
/** Enqueue a closed connection for the scavenger to reclaim.
    The queue is a singly-linked list threaded through the connections' next_conn
    fields; 'tail' holds the most recently added connection with bit 1 tagging
    TBB vs OMP. Wakes the scavenger if it is asleep. */
template<typename Server, typename Client>
void connection_scavenger_thread::add_request( generic_connection<Server,Client>* conn_to_close )
{
    // Tag bit 1 with the connection flavor so the consumer can cast correctly.
    uintptr_t conn_ex = (uintptr_t)conn_to_close | (connection_traits<Server,Client>::is_tbb<<1);
    __TBB_ASSERT( !conn_to_close->next_conn, NULL );
    uintptr_t old_tail_ex = connections_to_reclaim.tail;
    __TBB_ASSERT( old_tail_ex==0||old_tail_ex>garbage_connection_queue::plugged_acked, "Unloading DLL called while this connection is being closed?" );
    tbb::internal::atomic_backoff backoff;
    // Atomically swing the tail to the new connection.
    while( connections_to_reclaim.tail.compare_and_swap( conn_ex, old_tail_ex )!=old_tail_ex ) {
        backoff.pause();
        old_tail_ex = connections_to_reclaim.tail;
    }

    if( old_tail_ex==garbage_connection_queue::empty )
        // First element in the queue: it is also the head.
        connections_to_reclaim.head = conn_ex;
    else {
        // Link the previous tail to the new element (untag it first).
        bool is_tbb = (old_tail_ex&2)>0;
        uintptr_t old_tail = old_tail_ex & ~(uintptr_t)3;
        if( is_tbb )
            reinterpret_cast<tbb_connection_v2*>(old_tail)->next_conn = reinterpret_cast<tbb_connection_v2*>(conn_ex);
        else
            reinterpret_cast<omp_connection_v2*>(old_tail)->next_conn = reinterpret_cast<omp_connection_v2*>(conn_ex);
    }

    if( state==ts_asleep )
        wakeup();
}
| |
// TBB specialization: returns 0 (no chain grabbed).
// NOTE(review): presumably TBB connections never need the pending list prepended
// at close time — confirm against the callers of grab_and_prepend.
template<>
uintptr_t connection_scavenger_thread::grab_and_prepend( generic_connection<tbb_server,tbb_client>* /*last_conn_to_close*/ ) { return 0;}
| |
// OMP specialization: atomically detach the current reclamation list and chain it
// behind 'last_conn_to_close'; returns the new (untagged) head of the combined list.
template<>
uintptr_t connection_scavenger_thread::grab_and_prepend( generic_connection<omp_server,omp_client>* last_conn_to_close )
{
    uintptr_t conn_ex = (uintptr_t)last_conn_to_close;
    uintptr_t head = connections_to_reclaim.head.fetch_and_store( garbage_connection_queue::empty );
    // Link the detached list after the given connection.
    reinterpret_cast<omp_connection_v2*>(last_conn_to_close)->next_conn = reinterpret_cast<omp_connection_v2*>(head);
    return conn_ex;
}
| |
| extern "C" ULONGLONG NTAPI VerSetConditionMask( ULONGLONG, DWORD, BYTE); |
| |
| bool is_windows7_or_later () |
| { |
| try { |
| return GetOSVersion()>=IResourceManager::Win7OrLater; |
| } catch( ... ) { |
| return false; |
| } |
| } |
| |
| #endif /* RML_USE_WCRM */ |
| |
//! Construct a connection of the given type for 'client' and report success.
// f.scratch_ptr holds the wait_counter shared by all connections of this factory
// (allocated in __RML_open_factory, awaited in __RML_close_factory).
template<typename Connection, typename Server, typename Client>
static factory::status_type connect( factory& f, Server*& server, Client& client ) {
    server = new Connection(*static_cast<wait_counter*>(f.scratch_ptr),client);
    return factory::st_success;
}
| |
/** Open a factory: pin the library in memory (see hack below), initialize the
    global balance once, negotiate versions, and allocate the shared wait_counter. */
extern "C" factory::status_type __RML_open_factory( factory& f, version_type& server_version, version_type client_version ) {
    // Hack to keep this library from being closed by causing the first client's dlopen to not have a corresponding dlclose.
    // This code will be removed once we figure out how to do shutdown of the RML perfectly.
    static tbb::atomic<bool> one_time_flag;
    if( one_time_flag.compare_and_swap(true,false)==false) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, NULL );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    // Initialize the_balance only once (0 = uninitialized, 1 = in progress, 2 = done).
    if( the_balance_inited!=2 ) {
        if( the_balance_inited.compare_and_swap( 1, 0 )==0 ) {
            // One less than the HW concurrency — presumably reserving a unit for
            // the master thread; confirm against DetectNumberOfWorkers().
            the_balance = hardware_concurrency()-1;
            the_balance_inited = 2;
#if RML_USE_WCRM
            connection_scavenger.launch( the_balance );
#endif
        } else {
            // Another thread is initializing; wait for it to finish.
            tbb::internal::spin_wait_until_eq( the_balance_inited, 2 );
        }
    }

    server_version = SERVER_VERSION;
    f.scratch_ptr = 0;
    if( client_version==0 ) {
        return factory::st_incompatible;
#if RML_USE_WCRM
    } else if ( !is_windows7_or_later() ) {
#if TBB_USE_DEBUG
        fprintf(stderr, "This version of the RML library requires Windows 7 to run on.\nConnection request denied.\n");
#endif
        return factory::st_incompatible;
#endif
    } else {
#if TBB_USE_DEBUG
        if( client_version<EARLIEST_COMPATIBLE_CLIENT_VERSION )
            fprintf(stderr, "This client library is too old for the current RML server.\nThe connection request is granted but oversubscription/undersubscription may occur.\n");
#endif
        f.scratch_ptr = new wait_counter;
        return factory::st_success;
    }
}
| |
/** Close a factory: wait until all connections created through it are gone,
    then free the shared wait_counter. */
extern "C" void __RML_close_factory( factory& f ) {
    if( wait_counter* fc = static_cast<wait_counter*>(f.scratch_ptr) ) {
        f.scratch_ptr = 0;
        fc->wait();
        // NOTE(review): stashing the final balance value in scratch_ptr looks like
        // a debugging aid — confirm before relying on it anywhere.
        size_t bal = the_balance;
        f.scratch_ptr = (void*)bal;
        delete fc;
    }
}
| |
| void call_with_build_date_str( ::rml::server_info_callback_t cb, void* arg ); |
| |
| }} // rml::internal |
| |
| namespace tbb { |
| namespace internal { |
| namespace rml { |
| |
//! Entry point used by the TBB scheduler to open an RML server connection.
extern "C" tbb_factory::status_type __TBB_make_rml_server( tbb_factory& f, tbb_server*& server, tbb_client& client ) {
    return ::rml::internal::connect< ::rml::internal::tbb_connection_v2>(f,server,client);
}
| |
//! Report this server's build/version strings to a TBB client via 'cb'.
extern "C" void __TBB_call_with_my_server_info( ::rml::server_info_callback_t cb, void* arg ) {
    return ::rml::internal::call_with_build_date_str( cb, arg );
}
| |
| }}} |
| |
| namespace __kmp { |
| namespace rml { |
| |
//! Entry point used by the OpenMP runtime to open an RML server connection.
extern "C" omp_factory::status_type __KMP_make_rml_server( omp_factory& f, omp_server*& server, omp_client& client ) {
    return ::rml::internal::connect< ::rml::internal::omp_connection_v2>(f,server,client);
}
| |
//! Report this server's build/version strings to an OpenMP client via 'cb'.
extern "C" void __KMP_call_with_my_server_info( ::rml::server_info_callback_t cb, void* arg ) {
    return ::rml::internal::call_with_build_date_str( cb, arg );
}
| |
| }} |
| |
| /* |
| * RML server info |
| */ |
| #include "version_string.tmp" |
| |
| #ifndef __TBB_VERSION_STRINGS |
| #pragma message("Warning: version_string.tmp isn't generated properly by version_info.sh script!") |
| #endif |
| |
| // We use the build time as the RML server info. TBB is required to build RML, so we make it the same as the TBB build time. |
| #ifndef __TBB_DATETIME |
| #define __TBB_DATETIME __DATE__ " " __TIME__ |
| #endif |
| |
| #if !RML_USE_WCRM |
| #define RML_SERVER_BUILD_TIME "Intel(R) RML library built: " __TBB_DATETIME |
| #define RML_SERVER_VERSION_ST "Intel(R) RML library version: v" TOSTRING(SERVER_VERSION) |
| #else |
| #define RML_SERVER_BUILD_TIME "Intel(R) RML library built: " __TBB_DATETIME |
| #define RML_SERVER_VERSION_ST "Intel(R) RML library version: v" TOSTRING(SERVER_VERSION) " on ConcRT RM with " RML_THREAD_KIND_STRING |
| #endif |
| |
| namespace rml { |
| namespace internal { |
| |
| void call_with_build_date_str( ::rml::server_info_callback_t cb, void* arg ) |
| { |
| (*cb)( arg, RML_SERVER_BUILD_TIME ); |
| (*cb)( arg, RML_SERVER_VERSION_ST ); |
| } |
| }} // rml::internal |