blob: a9e410dc7fb98b0a4671da78f347107756a596b5 [file] [log] [blame]
#ifndef RTTL_THREAD_HXX
#define RTTL_THREAD_HXX
#include "RTInclude.hxx"
#include <pthread.h>
#if !defined(_WIN32)
typedef volatile int atomic_t;
#if defined(__sparc__) || defined(__sparc) || defined(sparc) || defined(__SPARC__)
#define ASI_P 0x80
#define casa(rs1, rs2, rd) ({ \
u_int __rd = (uint32_t)(rd); \
__asm __volatile("casa [%2] %3, %4, %0" \
: "+r" (__rd), "=m" (*rs1) \
: "r" (rs1), "n" (ASI_P), "r" (rs2), "m" (*rs1)); \
__rd; \
})
_INLINE int atomic_add(atomic_t *v, const int c) {
atomic_t e, r, s;
for (e = *v;; e = r) {
s = e + c;
r = casa(v, e, s);
if (r == e)
break;
}
return e;
}
#else
_INLINE int atomic_add(atomic_t *v, const int c) {
int i = c;
int __i = i;
__asm__ __volatile__(
"lock ; xaddl %0, %1;"
:"=r"(i)
:"m"(*v), "0"(i));
return i + __i;
}
#endif
_INLINE int atomic_inc(atomic_t *v) {
return atomic_add(v,1);
}
_INLINE int atomic_dec(atomic_t *v) {
return atomic_add(v,-1);
}
#else
typedef volatile LONG atomic_t;
_INLINE int atomic_add(atomic_t *v, const int c)
{
LONG value = InterlockedExchangeAdd(v,(LONG)c);
return (int)value;
}
_INLINE int atomic_inc(atomic_t *v)
{
LONG value = atomic_add(v,1);
return (int)value;
}
_INLINE int atomic_dec(atomic_t *v)
{
LONG value = atomic_add(v,-1);
return (int)value;
}
#endif
/* class should have the right alignment to prevent cache trashing */
class AtomicCounter
{
private:
atomic_t m_counter;
char dummy[64-sizeof(atomic_t)]; // (iw) to make sure it's the only
// counter sitting in its
// cacheline....
public:
AtomicCounter() {
reset();
}
AtomicCounter(const int v) {
m_counter = v;
}
_INLINE void reset() {
#if defined(_WIN32)
m_counter = 0;
#else
m_counter = -1;
#endif
}
_INLINE int inc() {
return atomic_inc(&m_counter);
}
_INLINE int dec() {
return atomic_dec(&m_counter);
}
_INLINE int add(const int i) {
return atomic_add(&m_counter,i);
}
};
#define DBG_THREAD(x)
#ifdef NEEDS_PTHREAD_BARRIER_T_WRAPPER
class Barrier
{
int total;
int waiting;
pthread_cond_t m_cond;
pthread_mutex_t m_mutex;
public:
void init(int count)
{
pthread_mutex_init(&m_mutex,NULL);
pthread_cond_init(&m_cond,NULL);
total = count;
waiting = 0;
}
void wait()
{
pthread_mutex_lock(&m_mutex);
++waiting;
if (waiting == total)
{
pthread_cond_broadcast(&m_cond);
waiting = 0;
}
else
{
pthread_cond_wait(&m_cond,&m_mutex);
}
pthread_mutex_unlock(&m_mutex);
}
};
#else
class Barrier
{
protected:
pthread_barrier_t m_barrier;
public:
void init(int count)
{
pthread_barrier_init(&m_barrier,NULL,count);
}
void wait()
{
pthread_barrier_wait(&m_barrier);
}
};
#endif
class MultiThreadedTaskQueueServer; /// Actually executes tasks
class MultiThreadedScheduler; /// Allows suspending/resuming threads
class MultiThreadedTaskQueue /// Externally visible
{
public:
friend class MultiThreadedTaskQueueServer;
enum {
THREAD_EXIT,
THREAD_RUNNING
};
/// Allows defining jobs in derived classes.
virtual int task(int jobID, int threadID) {
/* nothing to do */
//PING;
//cout << "THIS SHOULD NOT BE CALLED" << endl;
return THREAD_RUNNING;
}
MultiThreadedTaskQueue() : m_threads(0) {}
explicit MultiThreadedTaskQueue(int nt) : m_threads(nt) { if (nt) createThreads(nt); }
~MultiThreadedTaskQueue() {}
/// Client differentiator: by default it is mapped to this pointer.
/// Derived classes could use it to implement
/// multi-phase processing.
virtual long long int tag() const {
return (long long int)this;
}
/// Set max # of threads handled by m_server;
/// actual number executed inside this class (as defined by createThreads)
/// may be different.
static void setMaxNumberOfThreads(int threads);
static int maxNumberOfThreads();
/// Request nthreads from server and execute task() in each
/// (only after startThreads() is called).
/// startThreads/waitForAllThreads pairs could be executed multiple times.
void createThreads(int nthreads);
/// Go!
virtual void startThreads();
/// Wait...
virtual void waitForAllThreads();
/// Start & wait
void executeAllThreads() {
startThreads();
waitForAllThreads();
}
_INLINE int numberOfThreads() const { return m_threads; }
protected:
volatile int m_assigned_jobs; // # of assigned jobs for client, not used for server
volatile int m_finished_jobs; // # of completed jobs for client, marker for server
int m_threads; // # of jobs; fo reach job task() will be called with job id
static MultiThreadedTaskQueueServer m_server;
static MultiThreadedTaskQueue** m_client;
static MultiThreadedScheduler* m_scheduler;
};
/// This class just allows suspending/resuming threads;
/// it is implemented on top of pthreads.
class MultiThreadedSyncPrimitive
{
public:
MultiThreadedSyncPrimitive() {
pthread_mutex_init(&m_mutex, NULL);
pthread_cond_init(&m_cond, NULL);
}
~MultiThreadedSyncPrimitive() {
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
// http://node1.yo-linux.com/cgi-bin/man2html?cgi_command=pthread_mutex_lock
#if 1
_INLINE int lock() { return pthread_mutex_lock(&m_mutex); }
_INLINE int trylock() { return pthread_mutex_trylock(&m_mutex); }
_INLINE int unlock() { return pthread_mutex_unlock(&m_mutex); }
_INLINE int suspend() { return pthread_cond_wait(&m_cond, &m_mutex); }
_INLINE void resume() { pthread_cond_signal(&m_cond); }
_INLINE void resumeAll() { pthread_cond_broadcast(&m_cond); }
#else
_INLINE int lock() { cout << "L" << flush; return pthread_mutex_lock(&m_mutex); }
_INLINE int trylock() { cout << "T" << flush; return pthread_mutex_trylock(&m_mutex); }
_INLINE int unlock() { cout << "U" << flush; return pthread_mutex_unlock(&m_mutex); }
_INLINE int suspend() { cout << "S" << flush; return pthread_cond_wait(&m_cond, &m_mutex); }
_INLINE void resume() { cout << "R" << flush; pthread_cond_signal(&m_cond); }
_INLINE void resumeAll() { cout << "A" << flush; pthread_cond_broadcast(&m_cond); }
#endif
protected:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif