| /* |
| Copyright 2005-2010 Intel Corporation. All Rights Reserved. |
| |
| This file is part of Threading Building Blocks. |
| |
| Threading Building Blocks is free software; you can redistribute it |
| and/or modify it under the terms of the GNU General Public License |
| version 2 as published by the Free Software Foundation. |
| |
| Threading Building Blocks is distributed in the hope that it will be |
| useful, but WITHOUT ANY WARRANTY; without even the implied warranty |
| of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| GNU General Public License for more details. |
| |
| You should have received a copy of the GNU General Public License |
| along with Threading Building Blocks; if not, write to the Free Software |
| Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
| |
| As a special exception, you may use this file as part of a free software |
| library without restriction. Specifically, if other files instantiate |
| templates or use macros or inline functions from this file, or you compile |
| this file and link it with other files to produce an executable, this |
| file does not by itself cause the resulting executable to be covered by |
| the GNU General Public License. This exception does not however |
| invalidate any other reasons why the executable file might be covered by |
| the GNU General Public License. |
| */ |
| |
/* This header contains code shared by test_omp_server.cpp and test_tbb_server.cpp.
   There is no ifndef guard - a test is supposed to include this file exactly once.
   The test is also expected to #include rml_omp.h or rml_tbb.h before
   including this header.

   This header should not use any parts of TBB that require linking in the TBB run-time.
   It uses a few instances of tbb::atomic<T>, all of which are completely inlined. */
| |
| #include "tbb/atomic.h" |
| #include "tbb/tbb_thread.h" |
| #include "harness.h" |
| #include "harness_memory.h" |
| #include "harness_concurrency_tracker.h" |
| |
| |
//! Set TRIVIAL to 1 to test only a single client, with no nesting and no extra threads.
#define TRIVIAL 0

#if TRIVIAL
//! Maximum number of clients (trivial configuration).
const size_t MaxClient = 1;

//! Stack size requested for each client; indexed by the connection index.
const size_t ClientStackSize[MaxClient] = {
    1000000
};
#else
//! Maximum number of clients
const size_t MaxClient = 4;

//! Stack size requested for each client; indexed by the connection index.
const size_t ClientStackSize[MaxClient] = {
    1000000,
    2000000,
    1000000,
    4000000
};
#endif /* TRIVIAL */
| |
//! Part of each client's stack that do_process() leaves untouched when it exercises the stack.
const size_t OverheadStackSize = 500000;

//! Number of MyJob slots allocated for every client.
const size_t JobArraySize = 1000;

//! True while SimpleTest is running its single-connection phase.
static bool TestSingleConnection;

//! Number of connections exercised by the current test phase.
static size_t N_TestConnections;
| |
| #if _WIN32||_WIN64 |
| #include <Windows.h> /* Need Sleep */ |
| #else |
| #include <unistd.h> /* Need usleep */ |
| #endif |
| |
//! Suspend the calling thread for approximately the given number of milliseconds.
/** On Windows this forwards to Sleep().  Elsewhere the wait is split into whole
    seconds (sleep) and a sub-second remainder (usleep): POSIX permits usleep() to
    reject arguments of 1,000,000 microseconds or more, and computing
    milliseconds*1000 directly could wrap around for very large arguments. */
void MilliSleep( unsigned milliseconds ) {
#if _WIN32||_WIN64
    Sleep( milliseconds );
#else
    const unsigned whole_seconds = milliseconds/1000;
    const unsigned remainder_ms = milliseconds%1000;
    // A non-zero result from sleep() means the wait was interrupted; return early,
    // just as a single interrupted usleep() call would have.
    if( whole_seconds>0 && sleep( whole_seconds )!=0 )
        return;
    // remainder_ms < 1000, so the argument always stays below one million microseconds.
    usleep( remainder_ms*1000 );
#endif /* _WIN32||_WIN64 */
}
| |
| class MyJob: public ::rml::job { |
| public: |
| //! Enumeration for tracking states of a job. |
| enum state_t { |
| //! Job has not yet been allocated. |
| unallocated, |
| //! Is idle. |
| idle, |
| //! Has a thread working on it. |
| busy, |
| //! After call to client::cleanup |
| clean |
| }; |
| tbb::atomic<int> state; |
| tbb::atomic<int> processing_count; |
| void update( state_t new_state, state_t old_state ) { |
| int o = state.compare_and_swap(new_state,old_state); |
| ASSERT( o==old_state, "illegal transition" ); |
| } |
| void update_from_either( state_t new_state, state_t old_state1, state_t old_state2 ) { |
| int snapshot; |
| do { |
| snapshot = state; |
| ASSERT( snapshot==old_state1||snapshot==old_state2, "illegal transition" ); |
| } while( state.compare_and_swap(new_state,snapshot)!=snapshot ); |
| } |
| MyJob() { |
| state=unallocated; |
| processing_count=0; |
| } |
| ~MyJob() { |
| // Overwrite so that accidental use after destruction can be detected. |
| memset(this,-1,sizeof(*this)); |
| } |
| }; |
| |
| static tbb::atomic<int> ClientConstructions; |
| static tbb::atomic<int> ClientDestructions; |
| |
//! Pair of nesting parameters (current level and limit) carried by a client.
struct Nesting {
    int level;
    int limit;

    //! Construct with explicit level and limit.
    Nesting( int level_, int limit_ )
        : level(level_)
        , limit(limit_)
    {}

    //! Default: level 0, limit 0.
    Nesting()
        : level(0)
        , limit(0)
    {}
};
| |
| template<typename Client> |
| class ClientBase: public Client { |
| protected: |
| typedef typename Client::size_type size_type; |
| typedef typename Client::version_type version_type; |
| typedef typename Client::policy_type policy_type; |
| typedef typename Client::job job; |
| private: |
| size_type my_max_job_count; |
| size_t my_stack_size; |
| tbb::atomic<size_t> next_job_index; |
| int my_client_id; |
| rml::server* my_server; |
| |
| public: |
| enum state_t { |
| //! Treat *this as constructed. |
| live=0x1, |
| //! Treat *this as destroyed. |
| destroyed=0xDEAD |
| }; |
| |
| tbb::atomic<int> state; |
| void update( state_t new_state, state_t old_state ) { |
| int o = state.compare_and_swap(new_state,old_state); |
| ASSERT( o==old_state, NULL ); |
| } |
| |
| tbb::atomic<bool> expect_close_connection; |
| |
| MyJob *job_array; |
| |
| /*override*/version_type version() const { |
| ASSERT( state==live, NULL ); |
| return 1; |
| } |
| |
| /*override*/size_type max_job_count() const { |
| ASSERT( state==live, NULL ); |
| return my_max_job_count; |
| } |
| |
| /*override*/size_t min_stack_size() const { |
| ASSERT( state==live, NULL ); |
| return my_stack_size; |
| } |
| |
| /*override*/policy_type policy() const {return Client::throughput;} |
| |
| /*override*/void acknowledge_close_connection() { |
| ASSERT( expect_close_connection, NULL ); |
| for( size_t k=next_job_index; k>0; ) { |
| --k; |
| ASSERT( job_array[k].state==MyJob::clean, NULL ); |
| } |
| delete[] job_array; |
| job_array = NULL; |
| ASSERT( my_server, NULL ); |
| update( destroyed, live ); |
| delete this; |
| } |
| |
| /*override*/void cleanup( job& j_ ) { |
| REMARK("client %d: cleanup(%p) called\n",client_id(),&j_); |
| ASSERT( state==live, NULL ); |
| MyJob& j = static_cast<MyJob&>(j_); |
| while( j.state==MyJob::busy ) |
| my_server->yield(); |
| j.update(MyJob::clean,MyJob::idle); |
| REMARK("client %d: cleanup(%p) returns\n",client_id(),&j_); |
| } |
| |
| job* create_one_job(); |
| |
| protected: |
| void do_process( job& j_ ) { |
| ASSERT( state==live, NULL ); |
| MyJob& j = static_cast<MyJob&>(j_); |
| ASSERT( &j, NULL ); |
| j.update(MyJob::busy,MyJob::idle); |
| // use of the plain addition (not the atomic increment) is intentonial |
| j.processing_count = j.processing_count + 1; |
| ASSERT( my_stack_size>OverheadStackSize, NULL ); |
| #ifdef __ia64__ |
| // Half of the stack is reserved for RSE, so test only remaining half. |
| UseStackSpace( (my_stack_size-OverheadStackSize)/2 ); |
| #else |
| // XNMetaScheduler does not expose API for changing stack size. |
| UseStackSpace( my_stack_size-OverheadStackSize ); |
| #endif |
| j.update(MyJob::idle,MyJob::busy); |
| my_server->yield(); |
| } |
| public: |
| ClientBase() : my_server(NULL) { |
| my_client_id = ClientConstructions++; |
| next_job_index = 0; |
| } |
| int client_id() const {return my_client_id;} |
| |
| Nesting nesting; |
| |
| void initialize( size_type max_job_count, Nesting nesting_, size_t stack_size ) { |
| ASSERT( stack_size>0, NULL ); |
| my_max_job_count = max_job_count; |
| nesting = nesting_; |
| my_stack_size = stack_size; |
| job_array = new MyJob[JobArraySize]; |
| expect_close_connection = false; |
| state = live; |
| } |
| |
| void set_server( rml::server* s ) {my_server=s;} |
| |
| unsigned default_concurrency() const { ASSERT( my_server, NULL); return my_server->default_concurrency(); } |
| |
| virtual ~ClientBase() { |
| ASSERT( state==destroyed, NULL ); |
| ++ClientDestructions; |
| } |
| }; |
| |
| template<typename Client> |
| typename Client::job* ClientBase<Client>::create_one_job() { |
| REMARK("client %d: create_one_job() called\n",client_id()); |
| size_t k = next_job_index++; |
| ASSERT( state==live, NULL ); |
| // Following assertion depends on assumption that implementation does not destroy jobs until |
| // the connection is closed. If the implementation is changed to destroy jobs sooner, the |
| // test logic in this header will have to be reworked. |
| ASSERT( k<my_max_job_count, "RML allocated more than max_job_count jobs simultaneously" ); |
| ASSERT( k<JobArraySize, "JobArraySize not big enough (problem is in test, not RML)" ); |
| MyJob& j = job_array[k]; |
| j.update(MyJob::idle,MyJob::unallocated); |
| REMARK("client %d: create_one_job() for k=%d returns %p\n",client_id(),int(k),&j); |
| return &j; |
| } |
| |
| struct warning_tracker { |
| tbb::atomic<int> n_more_than_available; |
| tbb::atomic<int> n_too_many_threads; |
| tbb::atomic<int> n_system_overload; |
| warning_tracker() { |
| n_more_than_available = 0; |
| n_too_many_threads = 0; |
| n_system_overload = 0; |
| } |
| bool all_set() { return n_more_than_available>0 && n_too_many_threads>0 && n_system_overload>0; } |
| } tracker; |
| |
| class Checker { |
| public: |
| int default_concurrency; |
| void check_number_of_threads_delivered( int n_delivered, int n_requested, int n_extra ) const; |
| Checker( rml::server& server ) : default_concurrency(int(server.default_concurrency())) {} |
| }; |
| |
| void Checker::check_number_of_threads_delivered( int n_delivered, int n_requested, int n_extra ) const { |
| ASSERT( default_concurrency>=0, NULL ); |
| if( tracker.all_set() ) return; |
| // Check that number of threads delivered is reasonable. |
| int n_avail = default_concurrency; |
| if( n_extra>0 ) |
| n_avail-=n_extra; |
| if( n_avail<0 ) |
| n_avail=0; |
| if( n_requested>default_concurrency ) |
| n_avail += n_requested-default_concurrency; |
| int n_expected = n_requested; |
| if( n_expected>n_avail ) |
| n_expected=n_avail; |
| const char* msg = NULL; |
| if( n_delivered>n_avail ) { |
| if( ++tracker.n_more_than_available>1 ) |
| return; |
| msg = "server delivered more threads than were theoretically available"; |
| } else if( n_delivered>n_expected ) { |
| if( ++tracker.n_too_many_threads>1 ) |
| return; |
| msg = "server delivered more threads than expected"; |
| } else if( n_delivered<n_expected ) { |
| if( ++tracker.n_system_overload>1 ) |
| return; |
| msg = "server delivered fewer threads than ideal; or, the system is overloaded?"; |
| } |
| if( msg ) { |
| REPORT("Warning: %s (n_delivered=%d n_avail=%d n_requested=%d n_extra=%d default_concurrency=%d)\n", |
| msg, n_delivered, n_avail, n_requested, n_extra, default_concurrency ); |
| } |
| } |
| |
| template<typename Factory,typename Client> |
| class DoOneConnection: NoAssign { |
| //! Number of threads to request |
| const int n_thread; |
| //! Nesting |
| const Nesting nesting; |
| //! Number of extra threads to pretend having outside the RML |
| const int n_extra; |
| //! If true, check number of threads actually delivered. |
| const bool check_delivered; |
| public: |
| DoOneConnection( int n_thread_, Nesting nesting_, int n_extra_, bool check_delivered_ ) : |
| n_thread(n_thread_), |
| nesting(nesting_), |
| n_extra(n_extra_), |
| check_delivered(check_delivered_) |
| { |
| } |
| |
| //! Test ith connection |
| void operator()( size_t i ) const; |
| }; |
| |
| template<typename Factory,typename Client> |
| void DoOneConnection<Factory,Client>::operator()( size_t i ) const { |
| ASSERT( i<MaxClient, NULL ); |
| Client* client = new Client; |
| client->initialize( Client::is_omp ? JobArraySize : n_thread, nesting, ClientStackSize[i] ); |
| Factory factory; |
| memset( &factory, 0, sizeof(factory) ); |
| typename Factory::status_type status = factory.open(); |
| ASSERT( status==Factory::st_success, NULL ); |
| |
| typename Factory::server_type* server; |
| status = factory.make_server( server, *client ); |
| ASSERT( status==Factory::st_success, NULL ); |
| Harness::ConcurrencyTracker ct; |
| REMARK("client %d: opened server n_thread=%d nesting=(%d,%d)\n", |
| client->client_id(), n_thread, nesting.level, nesting.limit); |
| client->set_server( server ); |
| Checker checker( *server ); |
| |
| FireUpJobs( *server, *client, n_thread, n_extra, check_delivered && !client->is_strict() ? &checker : NULL ); |
| |
| // Close the connection |
| client->expect_close_connection = true; |
| REMARK("client %d: calling request_close_connection\n", client->client_id()); |
| #if !RML_USE_WCRM |
| int default_concurrency = server->default_concurrency(); |
| #endif |
| server->request_close_connection(); |
| // Client deletes itself when it sees call to acknowledge_close_connection from server. |
| factory.close(); |
| #if !RML_USE_WCRM |
| if( TestSingleConnection ) |
| __TBB_ASSERT_EX( uintptr_t(factory.scratch_ptr)==uintptr_t(default_concurrency), "under/over subscription?" ); |
| #endif |
| } |
| |
| //! Test with n_threads threads and n_client clients. |
| template<typename Factory, typename Client> |
| void SimpleTest() { |
| Harness::ConcurrencyTracker::Reset(); |
| TestSingleConnection = true; |
| N_TestConnections = 1; |
| for( int n_thread=MinThread; n_thread<=MaxThread; ++n_thread ) { |
| // Test a single connection, no nesting, no extra threads |
| DoOneConnection<Factory,Client> doc(n_thread,Nesting(0,0),0,false); |
| doc(0); |
| } |
| #if !TRIVIAL |
| TestSingleConnection = false; |
| for( int n_thread=MinThread; n_thread<=MaxThread; ++n_thread ) { |
| // Test parallel connections |
| for( int n_client=1; n_client<=int(MaxClient); ++n_client ) { |
| N_TestConnections = n_client; |
| REMARK("SimpleTest: n_thread=%d n_client=%d\n",n_thread,n_client); |
| NativeParallelFor( n_client, DoOneConnection<Factory,Client>(n_thread,Nesting(0,0),0,false) ); |
| } |
| // Test server::independent_thread_number_changed |
| N_TestConnections = 1; |
| for( int n_extra=-4; n_extra<=32; n_extra=n_extra+1+n_extra/5 ) { |
| DoOneConnection<Factory,Client> doc(n_thread,Nesting(0,0),n_extra,true); |
| doc(0); |
| } |
| #if !RML_USE_WCRM |
| // Test nested connections |
| DoOneConnection<Factory,Client> doc(n_thread,Nesting(0,2),0,false); |
| doc(0); |
| #endif |
| } |
| ASSERT( Harness::ConcurrencyTracker::PeakParallelism()>1, "No multiple connections exercised?" ); |
| #endif /* !TRIVIAL */ |
| // Let RML catch up. |
| while( ClientConstructions!=ClientDestructions ) |
| MilliSleep(1); |
| } |
| |
| static void check_server_info( void* arg, const char* server_info ) |
| { |
| ASSERT( strstr(server_info, (char*)arg), NULL ); |
| } |
| |
| template<typename Factory, typename Client> |
| void VerifyInitialization( int n_thread ) { |
| Client* client = new Client; |
| client->initialize( Client::is_omp ? JobArraySize : n_thread, Nesting(), ClientStackSize[0] ); |
| Factory factory; |
| memset( &factory, 0, sizeof(factory) ); |
| typename Factory::status_type status = factory.open(); |
| ASSERT( status!=Factory::st_not_found, "could not find RML library" ); |
| ASSERT( status!=Factory::st_incompatible, NULL ); |
| ASSERT( status==Factory::st_success, NULL ); |
| factory.call_with_server_info( check_server_info, (void*)"Intel(R) RML library" ); |
| typename Factory::server_type* server; |
| status = factory.make_server( server, *client ); |
| ASSERT( status!=Factory::st_incompatible, NULL ); |
| ASSERT( status!=Factory::st_not_found, NULL ); |
| ASSERT( status==Factory::st_success, NULL ); |
| REMARK("client %d: opened server n_thread=%d nesting=(%d,%d)\n", |
| client->client_id(), n_thread, 0, 0); |
| ASSERT( server, NULL ); |
| client->set_server( server ); |
| |
| DoClientSpecificVerification( *server, n_thread ); |
| |
| // Close the connection |
| client->expect_close_connection = true; |
| REMARK("client %d: calling request_close_connection\n", client->client_id()); |
| server->request_close_connection(); |
| // Client deletes itself when it sees call to acknowledge_close_connection from server. |
| factory.close(); |
| } |