blob: 379cbebc16ac9c858ae2933cec246271457ebdf4 [file] [log] [blame]
/*
Copyright 2005-2010 Intel Corporation. All Rights Reserved.
This file is part of Threading Building Blocks.
Threading Building Blocks is free software; you can redistribute it
and/or modify it under the terms of the GNU General Public License
version 2 as published by the Free Software Foundation.
Threading Building Blocks is distributed in the hope that it will be
useful, but WITHOUT ANY WARRANTY; without even the implied warranty
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Threading Building Blocks; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
As a special exception, you may use this file as part of a free software
library without restriction. Specifically, if other files instantiate
templates or use macros or inline functions from this file, or you compile
this file and link it with other files to produce an executable, this
file does not by itself cause the resulting executable to be covered by
the GNU General Public License. This exception does not however
invalidate any other reasons why the executable file might be covered by
the GNU General Public License.
*/
#ifndef __TBB_concurrent_monitor_H
#define __TBB_concurrent_monitor_H
#include "tbb/tbb_stddef.h"
#include "tbb/atomic.h"
#include "tbb/spin_mutex.h"
#if _WIN32||_WIN64
#if _XBOX
#define NONET
#define NOD3D
#include <xtl.h>
#else
#include <windows.h>
#endif
#elif __APPLE__
#include <pthread.h>
#include <mach/semaphore.h>
#include <mach/task.h>
#include <mach/mach_init.h>
#include <mach/error.h>
#else
#include <pthread.h>
#include <semaphore.h>
#ifdef TBB_USE_DEBUG
#include <errno.h>
#endif
#endif /*_WIN32||_WIN64*/
namespace tbb {
namespace internal {
#if _WIN32||_WIN64
typedef LONG sem_count_t;
//! semaphore for concurrent_monitor
class semaphore : no_copy {
public:
//! ctor
semaphore() {sem = CreateSemaphore( NULL, 0, 1, NULL );}
//! dtor
~semaphore() {CloseHandle( sem );}
//! wait/acquire
void P() {WaitForSingleObject( sem, INFINITE );}
//! post/release
void V() {ReleaseSemaphore( sem, 1, NULL );}
private:
HANDLE sem;
};
#elif __APPLE__
//! semaphore for concurrent monitor
class semaphore : no_copy {
public:
//! ctor
semaphore() : sem(0) {
kern_return_t ret = semaphore_create( mach_task_self(), &sem, SYNC_POLICY_FIFO, 0 );
__TBB_ASSERT_EX( ret==err_none, "failed to create a semaphore" );
}
//! dtor
~semaphore() {
kern_return_t ret = semaphore_destroy( mach_task_self(), sem );
__TBB_ASSERT_EX( ret==err_none, NULL );
}
//! wait/acquire
void P() {
int ret;
do {
ret = semaphore_wait( sem );
} while( ret==KERN_ABORTED );
__TBB_ASSERT( ret==KERN_SUCCESS, "semaphore_wait() failed" );
}
//! post/release
void V() { semaphore_signal( sem ); }
private:
semaphore_t sem;
};
#else /* Linux/Unix */
typedef uint32_t sem_count_t;
//! semaphore for concurrent monitor
class semaphore : no_copy {
public:
//! ctor
semaphore() {
int ret = sem_init( &sem, /*shared among threads*/ 0, 0 );
__TBB_ASSERT_EX( !ret, NULL );
}
//! dtor
~semaphore() {
int ret = sem_destroy( &sem );
__TBB_ASSERT_EX( !ret, NULL );
}
//! wait/acquire
void P() {
while( sem_wait( &sem )!=0 )
__TBB_ASSERT( errno==EINTR, NULL );
}
//! post/release
void V() { sem_post( &sem ); }
private:
sem_t sem;
};
#endif /* _WIN32||_WIN64 */
//! Circular doubly-linked list with sentinel
/** head.next points to the front and head.prev points to the back */
class circular_doubly_linked_list_with_sentinel : no_copy {
public:
struct node_t {
node_t* next;
node_t* prev;
node_t() : next(NULL), prev(NULL) {}
};
// ctor
circular_doubly_linked_list_with_sentinel() {clear();}
// dtor
~circular_doubly_linked_list_with_sentinel() {__TBB_ASSERT( head.next==&head && head.prev==&head, "the list is not empty" );}
inline size_t size() const {return count;}
inline bool empty() const {return size()==0;}
inline node_t* front() const {return head.next;}
inline node_t* last() const {return head.prev;}
inline node_t* begin() const {return front();}
inline const node_t* end() const {return &head;}
//! add to the back of the list
inline void add( node_t* n ) {
count = count + 1;
n->prev = head.prev;
n->next = &head;
head.prev->next = n;
head.prev = n;
}
//! remove node 'n' from the 'this' list
inline void remove( node_t& n ) {
count = count - 1;
n.prev->next = n.next;
n.next->prev = n.prev;
}
//! move all elements to 'lst' and initiallize the 'this' list
inline void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
if( count>0 ) {
lst.count = count;
lst.head.next = head.next;
lst.head.prev = head.prev;
head.next->prev = &lst.head;
head.prev->next = &lst.head;
clear();
}
}
#if !TBB_USE_DEBUG
private:
#endif
atomic<size_t> count;
node_t head;
void clear() {count = 0; head.next = &head; head.prev = &head;}
};
typedef circular_doubly_linked_list_with_sentinel waitset_t;
typedef circular_doubly_linked_list_with_sentinel dllist_t;
typedef circular_doubly_linked_list_with_sentinel::node_t waitset_node_t;
class concurrent_monitor;
//! concurrent_monitor
/** fine-grained concurrent_monitor implementation */
class concurrent_monitor : no_copy {
public:
/** per-thread descriptor for concurrent_monitor */
class thread_context : waitset_node_t, no_copy {
friend class concurrent_monitor;
public:
thread_context() : spurious(false), context(NULL) {epoch = 0; in_waitset = false;}
~thread_context() { if( spurious ) sema.P(); }
private:
semaphore sema;
tbb::atomic<unsigned> epoch;
tbb::atomic<bool> in_waitset;
bool spurious;
void* context;
};
//! ctor
concurrent_monitor() {epoch = 0;}
//! prepare wait by inserting 'thr' into the wailt queue
void prepare_wait( thread_context& thr, void* ctx = 0 );
//! Commit wait if even count has not changed; otherwise, cancel wait.
/** Returns true of commited; false if canceled. */
inline bool commit_wait( thread_context& thr ) {
bool do_it = thr.epoch==epoch;
// this check is just an optimization
if( do_it ) {
thr.sema.P();
__TBB_ASSERT( !thr.in_waitset, "still in the queue?" );
} else {
cancel_wait( thr );
}
return do_it;
}
//! Cancel the wait. Removes the thread from the wait queue if not removed yet.
void cancel_wait( thread_context& thr );
//! Notify one thread about the event
void notify_one() {__TBB_rel_acq_fence(); notify_one_relaxed();}
//! Notify one thread about the event. Relaxed version.
void notify_one_relaxed();
//! Notify all waiting threads of the event
void notify_all() {__TBB_rel_acq_fence(); notify_all_relaxed();}
//! Notify all waiting threads of the event; Relaxed version
void notify_all_relaxed();
//! Notify waiting threads of the event that satisfies the given predicate
template<typename P> void notify( const P& predicate ) {__TBB_rel_acq_fence();notify_relaxed( predicate );}
//! Notify waiting threads of the event that satisfies the given predicate; Relaxed version
template<typename P> void notify_relaxed( const P& predicate );
private:
tbb::spin_mutex mutex_ec;
waitset_t waitset_ec;
tbb::atomic<unsigned> epoch;
thread_context* to_thread_context( waitset_node_t* n ) { return static_cast<thread_context*>(n); }
};
template<typename P>
void concurrent_monitor::notify_relaxed( const P& predicate ) {
if( waitset_ec.size()==0 )
return;
dllist_t temp;
waitset_node_t* nxt;
const waitset_node_t* end = waitset_ec.end();
{
tbb::spin_mutex::scoped_lock l( mutex_ec );
epoch = epoch + 1;
for( waitset_node_t* n=waitset_ec.last(); n!=end; n=nxt ) {
nxt = n->prev;
thread_context* thr = to_thread_context( n );
if( predicate( thr->context ) ) {
waitset_ec.remove( *n );
thr->in_waitset = false;
temp.add( n );
}
}
}
end = temp.end();
for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
nxt = n->next;
to_thread_context(n)->sema.V();
}
#if TBB_USE_DEBUG
temp.clear();
#endif
}
} // namespace internal
} // namespace tbb
#endif /* __TBB_concurrent_monitor_H */