#include "tbb/compat/condition_variable"
#include "tbb/mutex.h"
#include "tbb/recursive_mutex.h"
#include "tbb/tick_count.h"
#include "tbb/atomic.h"
#include "harness.h"
// This test deliberately avoids a "using tbb" statement,
// so that the error of putting types in the wrong namespace will be caught.
using namespace std;
template<typename M>
struct Counter {
typedef M mutex_type;
M mutex;
volatile long value;
void flog_once_lock_guard( size_t mode );
void flog_once_unique_lock( size_t mode );
template<typename M>
void Counter<M>::flog_once_lock_guard(size_t mode)
/** Increments counter once for each iteration in the iteration space. */
if( mode&1 ) {
// Try acquire and release with implicit lock_guard
// precondition: if mutex_type is not a recursive mutex, the calling thread does not own the mutex m.
// if the prcondition is not met, either dead-lock incorrect 'value' would result in.
lock_guard<M> lg(mutex);
value = value+1;
} else {
// Try acquire and release with adopt lock_quard
// precodition: the calling thread owns the mutex m.
// if the prcondition is not met, incorrect 'value' would result in because the thread unlocks
// mutex that it does not own.
lock_guard<M> lg( mutex, adopt_lock );
value = value+1;
template<typename M>
void Counter<M>::flog_once_unique_lock(size_t mode)
/** Increments counter once for each iteration in the iteration space. */
switch( mode&7 ) {
case 0:
{// implicitly acquire and release mutex with unique_lock
unique_lock<M> ul( mutex );
value = value+1;
ASSERT( ul==true, NULL );
case 1:
{// unique_lock with defer_lock
unique_lock<M> ul( mutex, defer_lock );
ASSERT( ul.owns_lock()==false, NULL );
value = value+1;
ASSERT( ul.owns_lock()==true, NULL );
case 2:
{// unique_lock::try_lock() with try_to_lock
unique_lock<M> ul( mutex, try_to_lock );
if( !ul )
while( !ul.try_lock() )
value = value+1;
case 3:
{// unique_lock::try_lock_for() with try_to_lock
unique_lock<M> ul( mutex, defer_lock );
tbb::tick_count::interval_t i(1.0);
while( !ul.try_lock_for( i ) )
value = value+1;
ASSERT( ul.owns_lock()==true, NULL );
case 4:
unique_lock<M> ul_o4;
{// unique_lock with adopt_lock
unique_lock<M> ul( mutex, adopt_lock );
value = value+1;
ASSERT( ul.owns_lock()==true, NULL );
ASSERT( ul.mutex()==&mutex, NULL );
ASSERT( ul_o4.owns_lock()==false, NULL );
ASSERT( ul_o4.mutex()==NULL, NULL );
swap( ul, ul_o4 );
ASSERT( ul.owns_lock()==false, NULL );
ASSERT( ul.mutex()==NULL, NULL );
ASSERT( ul_o4.owns_lock()==true, NULL );
ASSERT( ul_o4.mutex()==&mutex, NULL );
ASSERT( ul_o4.owns_lock()==false, NULL );
case 5:
unique_lock<M> ul_o5;
{// unique_lock with adopt_lock
unique_lock<M> ul( mutex, adopt_lock );
value = value+1;
ASSERT( ul.owns_lock()==true, NULL );
ASSERT( ul.mutex()==&mutex, NULL );
ASSERT( ul_o5.owns_lock()==false, NULL );
ASSERT( ul_o5.mutex()==NULL, NULL );
ul_o5.swap( ul );
ASSERT( ul.owns_lock()==false, NULL );
ASSERT( ul.mutex()==NULL, NULL );
ASSERT( ul_o5.owns_lock()==true, NULL );
ASSERT( ul_o5.mutex()==&mutex, NULL );
ASSERT( ul_o5.owns_lock()==false, NULL );
{// unique_lock with adopt_lock, and release()
unique_lock<M> ul( mutex, adopt_lock );
ASSERT( ul==true, NULL );
value = value+1;
M* old_m = ul.release();
ASSERT( ul.owns_lock()==false, NULL );
static tbb::atomic<size_t> Order;
template<typename State, long TestSize>
struct WorkForLocks: NoAssign {
static const size_t chunk = 100;
State& state;
WorkForLocks( State& state_ ) : state(state_) {}
void operator()( int ) const {
size_t step;
while( (step=Order.fetch_and_add<tbb::acquire>(chunk))<TestSize ) {
for( size_t i=0; i<chunk && step<TestSize; ++i, ++step ) {
template<typename M>
void TestLocks( const char* name, int nthread ) {
REMARK("testing %s in TestLocks\n",name);
Counter<M> counter;
counter.value = 0;
Order = 0;
const long test_size = 100000;
NativeParallelFor( nthread, WorkForLocks<Counter<M>, test_size>(counter) );
if( counter.value!=2*test_size )
REPORT("ERROR for %s in TestLocks: counter.value=%ld != 2 * %ld=test_size\n",name,counter.value,test_size);
static tbb::atomic<int> barrier;
// Test if the constructor works and if native_handle() works
template<typename M>
struct WorkForCondVarCtor: NoAssign {
condition_variable& my_cv;
M& my_mtx;
WorkForCondVarCtor( condition_variable& cv_, M& mtx_ ) : my_cv(cv_), my_mtx(mtx_) {}
void operator()( int tid ) const {
ASSERT( tid<=1, NULL ); // test with 2 threads.
condition_variable::native_handle_type handle = my_cv.native_handle();
if( tid&1 ) {
#if _WIN32||_WIN64
if( !tbb::interface5::internal::internal_condition_variable_wait( *handle, &my_mtx ) ) {
int ec = GetLastError();
throw_exception( tbb::internal::eid_condvar_wait_failed );
if( pthread_cond_wait( handle, my_mtx.native_handle() ) )
throw_exception( tbb::internal::eid_condvar_wait_failed );
} else {
bool res;
while( (res=my_mtx.try_lock())==true && barrier==0 ) {
if( res ) my_mtx.unlock();
do {
#if _WIN32||_WIN64
tbb::interface5::internal::internal_condition_variable_notify_one( *handle );
pthread_cond_signal( handle );
} while ( barrier<2 );
static condition_variable* test_cv;
static tbb::atomic<int> n_waiters;
// Test if the destructor works
template<typename M>
struct WorkForCondVarDtor: NoAssign {
int nthread;
M& my_mtx;
WorkForCondVarDtor( int n, M& mtx_ ) : nthread(n), my_mtx(mtx_) {}
void operator()( int tid ) const {
if( tid==0 ) {
unique_lock<M> ul( my_mtx, defer_lock );
test_cv = new condition_variable;
while( n_waiters<nthread-1 )
while( n_waiters>0 )
delete test_cv;
} else {
while( test_cv==NULL )
unique_lock<M> ul(my_mtx);
test_cv->wait( ul );
static const int max_ticket = 100;
static const int short_delay = 10;
static const int long_delay = 100;
tbb::atomic<int> n_signaled;
tbb::atomic<int> n_done, n_done_1, n_done_2;
tbb::atomic<int> n_timed_out;
static bool false_to_true;
struct TestPredicateFalseToTrue {
TestPredicateFalseToTrue() {}
bool operator()() { return false_to_true; }
struct TestPredicateFalse {
TestPredicateFalse() {}
bool operator()() { return false; }
struct TestPredicateTrue {
TestPredicateTrue() {}
bool operator()() { return true; }
// Test timed wait and timed wait with pred
template<typename M>
struct WorkForCondVarTimedWait: NoAssign {
int nthread;
condition_variable& test_cv;
M& my_mtx;
WorkForCondVarTimedWait( int n_, condition_variable& cv_, M& mtx_ ) : nthread(n_), test_cv(cv_), my_mtx(mtx_) {}
void operator()( int tid ) const {
tbb::tick_count t1, t2;
unique_lock<M> ul( my_mtx, defer_lock );
ASSERT( n_timed_out==0, NULL );
while( barrier<nthread ) __TBB_Yield();
// test if a thread times out with wait_for()
for( int i=1; i<10; ++i ) {
tbb::tick_count::interval_t intv((double)i*0.0001 /*seconds*/);
cv_status st = no_timeout;
/** Some version of glibc return EINVAL instead 0 when spurious wakeup occurs on pthread_cond_timedwait() **/
st = test_cv.wait_for( ul, intv );
} __TBB_CATCH( std::runtime_error& ) {}
ASSERT( ul, "mutex should have been reacquired" );
if( st==timeout )
ASSERT( n_timed_out>0, "should have been timed-out at least once\n" );
while( n_done_1<nthread ) __TBB_Yield();
for( int i=1; i<10; ++i ) {
tbb::tick_count::interval_t intv((double)i*0.0001 /*seconds*/);
/** Some version of glibc return EINVAL instead 0 when spurious wakeup occurs on pthread_cond_timedwait() **/
ASSERT( false==test_cv.wait_for( ul, intv, TestPredicateFalse()), "incorrect return value" );
} __TBB_CATCH( std::runtime_error& ) {}
ASSERT( ul, "mutex should have been reacquired" );
if( tid==0 )
n_waiters = 0;
// barrier
while( n_done_2<nthread ) __TBB_Yield();
// at this point, we know wait_for() successfully times out.
// so test if a thread blocked on wait_for() could receive a signal before its waiting time elapses.
if( tid==0 ) {
// signaler
n_signaled = 0;
ASSERT( n_waiters==0, NULL );
++n_done_2; // open gate 1
while( n_waiters<(nthread-1) ) __TBB_Yield(); // wait until all other threads block on cv. flag_1
n_waiters = 0;
while( n_done_2<2*nthread ) __TBB_Yield();
ASSERT( n_signaled>0, "too small an interval?" );
n_signaled = 0;
} else {
while( n_done_2<nthread+1 ) __TBB_Yield(); // gate 1
// sleeper
tbb::tick_count::interval_t intv((double)2.0 /*seconds*/);
++n_waiters; // raise flag 1/(nthread-1)
t1 = tbb::tick_count::now();
cv_status st = test_cv.wait_for( ul, intv ); // gate 2
t2 = tbb::tick_count::now();
if( st==no_timeout ) {
ASSERT( (t2-t1).seconds()<intv.seconds(), "got a signal after timed-out?" );
ASSERT( n_done==0, NULL );
if( tid==0 ) {
ASSERT( n_waiters==0, NULL );
++n_done; // open gate 3
while( n_waiters<(nthread-1) ) __TBB_Yield(); // wait until all other threads block on cv.
for( int i=0; i<2*short_delay; ++i ) __TBB_Yield(); // give some time to waiters so that all of them in the waitq
false_to_true = true;
test_cv.notify_all(); // open gate 4
while( n_done<nthread ) __TBB_Yield(); // wait until all other threads wake up.
ASSERT( n_signaled>0, "too small an interval?" );
} else {
while( n_done<1 ) __TBB_Yield(); // gate 3
tbb::tick_count::interval_t intv((double)2.0 /*seconds*/);
// wait_for w/ predciate
t1 = tbb::tick_count::now();
ASSERT( test_cv.wait_for( ul, intv, TestPredicateFalseToTrue())==true, NULL ); // gate 4
t2 = tbb::tick_count::now();
if( (t2-t1).seconds()<intv.seconds() )
tbb::atomic<int> ticket_for_sleep, ticket_for_wakeup, signaled_ticket, wokeup_ticket;
tbb::atomic<unsigned> n_visit_to_waitq;
unsigned max_waitq_length;
template<typename M>
struct WorkForCondVarWaitAndNotifyOne: NoAssign {
int nthread;
condition_variable& test_cv;
M& my_mtx;
WorkForCondVarWaitAndNotifyOne( int n_, condition_variable& cv_, M& mtx_ ) : nthread(n_), test_cv(cv_), my_mtx(mtx_) {}
void operator()( int tid ) const {
if( tid&1 ) {
// exercise signal part
while( ticket_for_wakeup<max_ticket ) {
int my_ticket = ++ticket_for_wakeup; // atomically grab the next ticket
if( my_ticket>max_ticket )
for( ;; ) {
unique_lock<M> ul( my_mtx, defer_lock );
if( n_waiters>0 && my_ticket<=ticket_for_sleep && my_ticket==(wokeup_ticket+1) ) {
signaled_ticket = my_ticket;
// give waiters time to go to sleep.
for( int m=0; m<short_delay; ++m )
} else {
while( ticket_for_sleep<max_ticket ) {
unique_lock<M> ul( my_mtx, defer_lock );
// exercise wait part
int my_ticket = ++ticket_for_sleep; // grab my ticket
if( my_ticket>max_ticket ) break;
// each waiter should go to sleep at least once
unsigned nw = ++n_waiters;
for( ;; ) {
// update to max_waitq_length
if( nw>max_waitq_length ) max_waitq_length = nw;
test_cv.wait( ul );
// if( ret==false ) ++n_timedout;
ASSERT( ul, "mutex should have been locked" );
if( signaled_ticket==my_ticket ) {
wokeup_ticket = my_ticket;
if( n_waiters>0 )
nw = ++n_waiters; // update to max_waitq_length occurs above
__TBB_Yield(); // give other threads chance to run.
spin_wait_until_eq( n_done, nthread );
ASSERT( n_signaled==max_ticket, "incorrect number of notifications sent" );
struct TestPredicate1 {
int target;
TestPredicate1( int i_ ) : target(i_) {}
bool operator()( ) { return signaled_ticket==target; }
template<typename M>
struct WorkForCondVarWaitPredAndNotifyAll: NoAssign {
int nthread;
condition_variable& test_cv;
M& my_mtx;
int multiple;
WorkForCondVarWaitPredAndNotifyAll( int n_, condition_variable& cv_, M& mtx_, int m_ ) :
nthread(n_), test_cv(cv_), my_mtx(mtx_), multiple(m_) {}
void operator()( int tid ) const {
if( tid&1 ) {
while( ticket_for_sleep<max_ticket ) {
unique_lock<M> ul( my_mtx, defer_lock );
// exercise wait part
int my_ticket = ++ticket_for_sleep; // grab my ticket
if( my_ticket>max_ticket )
unsigned nw = ++n_waiters;
if( nw>max_waitq_length ) max_waitq_length = nw;
test_cv.wait( ul, TestPredicate1( my_ticket ) );
wokeup_ticket = my_ticket;
ASSERT( ul, "mutex should have been locked" );
__TBB_Yield(); // give other threads chance to run.
} else {
// exercise signal part
while( ticket_for_wakeup<max_ticket ) {
int my_ticket = ++ticket_for_wakeup; // atomically grab the next ticket
if( my_ticket>max_ticket )
for( ;; ) {
unique_lock<M> ul( my_mtx );
if( n_waiters>0 && my_ticket<=ticket_for_sleep && my_ticket==(wokeup_ticket+1) ) {
signaled_ticket = my_ticket;
// give waiters time to go to sleep.
for( int m=0; m<long_delay*multiple; ++m )
spin_wait_until_eq( n_done, nthread );
ASSERT( n_signaled==max_ticket, "incorrect number of notifications sent" );
void InitGlobalCounters()
ticket_for_sleep = ticket_for_wakeup = signaled_ticket = wokeup_ticket = 0;
n_waiters = 0;
n_signaled = 0;
n_done = n_done_1 = n_done_2 = 0;
n_visit_to_waitq = 0;
n_timed_out = 0;
template<typename M>
void TestConditionVariable( const char* name, int nthread )
REMARK("testing %s in TestConditionVariable\n",name);
Counter<M> counter;
M mtx;
ASSERT( nthread>1, "at least two threads are needed for testing condition_variable" );
REMARK(" - constructor\n" );
// Test constructor.
condition_variable cv1;
#if _WIN32||_WIN64
condition_variable::native_handle_type handle = cv1.native_handle();
ASSERT( uintptr_t(&handle->cv_event)==uintptr_t(&handle->cv_native), NULL );
M mtx1;
barrier = 0;
NativeParallelFor( 2, WorkForCondVarCtor<M>( cv1, mtx1 ) );
REMARK(" - destructor\n" );
// Test destructor.
M mtx2;
test_cv = NULL;
n_waiters = 0;
NativeParallelFor( nthread, WorkForCondVarDtor<M>( nthread, mtx2 ) );
REMARK(" - timed_wait (i.e., wait_for)\n");
// Test timed wait.
condition_variable cv_tw;
M mtx_tw;
barrier = 0;
int nthr = nthread>4?4:nthread;
NativeParallelFor( nthr, WorkForCondVarTimedWait<M>( nthr, cv_tw, mtx_tw ) );
REMARK(" - wait with notify_one\n");
// Test wait and notify_one
do {
condition_variable cv3;
M mtx3;
NativeParallelFor( nthread, WorkForCondVarWaitAndNotifyOne<M>( nthread, cv3, mtx3 ) );
} while( n_visit_to_waitq==0 || max_waitq_length==0 );
REMARK(" - predicated wait with notify_all\n");
// Test wait_pred and notify_all
int delay_multiple = 1;
do {
condition_variable cv4;
M mtx4;
NativeParallelFor( nthread, WorkForCondVarWaitPredAndNotifyAll<M>( nthread, cv4, mtx4, delay_multiple ) );
if( max_waitq_length<unsigned(nthread/2) )
} while( n_visit_to_waitq<=0 || max_waitq_length<unsigned(nthread/2) );
static tbb::atomic<int> err_count;
try { \
op; \
++err_count; \
} catch( std::runtime_error& e ) {ASSERT( strstr(e.what(), msg) , NULL );} catch(...) {++err_count;}
template<typename M>
void TestUniqueLockException( const char * name ) {
REMARK("testing %s TestUniqueLockException\n",name);
M mtx;
unique_lock<M> ul_0;
err_count = 0;
TRY_AND_CATCH_RUNTIME_ERROR( ul_0.lock(), "Operation not permitted" );
TRY_AND_CATCH_RUNTIME_ERROR( ul_0.try_lock(), "Operation not permitted" );
unique_lock<M> ul_1( mtx );
TRY_AND_CATCH_RUNTIME_ERROR( ul_1.lock(), "Resource deadlock" );
TRY_AND_CATCH_RUNTIME_ERROR( ul_1.try_lock(), "Resource deadlock" );
TRY_AND_CATCH_RUNTIME_ERROR( ul_1.unlock(), "Operation not permitted" );
ASSERT( !err_count, "Some exceptions are not thrown or incorrect ones are thrown" );
template<typename M>
void TestConditionVariableException( const char * name ) {
REMARK("testing %s in TestConditionVariableException; yet to be implemented\n",name);
template<typename Mutex, typename RecursiveMutex>
void DoCondVarTest()
for( int p=MinThread; p<=MaxThread; ++p ) {
REMARK( "testing with %d threads\n", p );
TestLocks<Mutex>( "mutex", p );
TestLocks<RecursiveMutex>( "recursive_mutex", p );
if( p<=1 ) continue;
// for testing condition_variable, at least one sleeper and one notifier are needed
TestConditionVariable<Mutex>( "mutex", p );
REPORT("Known issue: exception handling tests are skipped.\n");
TestUniqueLockException<Mutex>( "mutex" );
TestUniqueLockException<RecursiveMutex>( "recursive_mutex" );
TestConditionVariableException<Mutex>( "mutex" );