// blob: 9b26ccd5072847ba8dced9856c225963d13cbf3f [file] [log] [blame]
#include <cstring>
#include <cerrno>
#include "RTTL/common/RTThread.hxx"
/// This class extends MultiThreadedSyncPrimitive by including m_suspended member.
///
/// m_suspended is a small state word rather than a plain flag:
///   - 0 (false): not suspended, or resume() has been requested;
///   - otherwise: the `value` handed to the last suspend() call. Within this
///     file that is 1 (the default, an ordinary suspension) or -1 (the final
///     suspension of a worker that has left its main loop).
class MultiThreadedScheduler: public MultiThreadedSyncPrimitive
{
public:
    // The base class is listed first so the initializer list reads in the
    // order the initialization actually happens (bases before members).
    MultiThreadedScheduler() : MultiThreadedSyncPrimitive(), m_suspended(false) {
    }

    ~MultiThreadedScheduler() {
    }

    /// Returns the current suspension state (see the class comment).
    /// No lock is taken here; callers that need a consistent view lock first.
    _INLINE int suspended() const {
        return m_suspended;
    }

    /// Marks this scheduler as suspended with state `value`, then waits in
    /// MultiThreadedSyncPrimitive::suspend().
    ///
    /// This function hides MultiThreadedSyncPrimitive::suspend().
    ///
    /// @param locked  true if the caller already holds this object's lock;
    ///                when false the lock is acquired here first.
    /// @param value   state stored in m_suspended while waiting.
    ///
    /// In both cases the lock is released before returning.
    ///
    /// NOTE(review): the base suspend() is called exactly once, with no
    /// re-check of m_suspended afterwards. If the base class waits on a
    /// condition variable (not visible from here), a spurious wake-up would
    /// return early - confirm against MultiThreadedSyncPrimitive.
    _INLINE void suspend(bool locked = false, int value = true) {
        if (!locked) lock();
        m_suspended = value;
        MultiThreadedSyncPrimitive::suspend();
        unlock();
    }

    /// Clears the suspension state and then calls the base class resume().
    /// This function does not take the lock itself.
    _INLINE void resume() {
        m_suspended = false;
        MultiThreadedSyncPrimitive::resume();
    }

private:
    int m_suspended;  ///< 0 = running / resume requested, otherwise the value passed to suspend().
};
/// This class is neither visible nor accessible by the application.
class MultiThreadedTaskQueueServer: public MultiThreadedTaskQueue, public MultiThreadedSyncPrimitive
{
protected:
MultiThreadedTaskQueueServer(int)
: MultiThreadedTaskQueue(),
MultiThreadedSyncPrimitive(),
m_thread(NULL)
{
PRINT(this);
m_finished_jobs = 0;
memset(m_waitingClients, 0, sizeof(m_waitingClients));
}
~MultiThreadedTaskQueueServer() {
clean(false);
}
_INLINE void clean(bool force_exit_from_thread_function = true) {
if (m_threads <= 0) {
return;
}
if (force_exit_from_thread_function) {
lock();
// Let all active threads know we're finished.
int nt = m_threads;
m_threads = -1;
// Gently push suspended threads allowing them to discover
// that they should return from threadFunc.
for (int i = 0; i < nt; i++) {
m_scheduler[i].resume();
}
unlock();
// Wait till all threads do just that.
int i = 0;
while (i < nt) {
for (; i < nt; i++) {
if (m_scheduler[i].suspended() != -1) {
//_mm_pause();
sched_yield();
break;
}
}
}
}
if (m_thread)
delete [] m_thread;
if (m_client)
delete [] m_client;
if (m_scheduler)
delete [] m_scheduler;
}
public:
friend class MultiThreadedTaskQueue;
/// Without arguments, suspend the server,
/// With integer argument, suspend i-th task.
using MultiThreadedSyncPrimitive::suspend;
_INLINE static void suspend(int i) {
assert(m_client[i] == NULL);
m_scheduler[i].suspend();
}
/// Without arguments, resume the server.
/// With integer argument, resume i-th task.
using MultiThreadedSyncPrimitive::resume;
using MultiThreadedSyncPrimitive::resumeAll;
_INLINE static void resume(int i) {
assert(m_client[i]);
m_scheduler[i].resume();
}
void createThreads(int threads) {
assert(threads >= 1);
// Free resources.
clean();
typedef MultiThreadedTaskQueue* pMultiThreadedTaskQueue;
m_threads = threads;
m_thread = new pthread_t[threads];
m_client = new pMultiThreadedTaskQueue[threads];
m_scheduler = new MultiThreadedScheduler[threads];
memset(m_client, 0, sizeof(pMultiThreadedTaskQueue) * threads);
int res;
//cout << "creating " << m_threads << " RENDERER THREADS" << endl;
for (int i = 0; i < m_threads; i++) {
if ( (res = pthread_create(&m_thread[i], NULL, &threadFunc, (void*)i)) != 0) {
cerr << "can't create thread " << i;
if (res == EAGAIN)
cerr << " (the system does not have enough resources)" << endl;
else
cerr << ". reason : " << res << endl;
exit(EXIT_FAILURE);
}
}
// Wait till all threads suspend themselves for the first time.
waitUntillAllThreadsAreSuspended();
}
_INLINE bool finished() const { return m_threads == -1; }
private:
// All other member functions are not accessible by derived classes.
_INLINE void waitUntillAllThreadsAreSuspended() {
int ns;
do {
//_mm_pause(); // sleep(0);
sched_yield();
ns = 0;
for (int i = 0; i < m_threads; i++)
ns += m_scheduler[i].suspended();
} while (ns != m_threads);
}
static void* threadFunc(void* _id) {
long long int id = (long long int)_id;
_mm_setcsr(_mm_getcsr() | /*FTZ:*/ (1<<15) | /*DAZ:*/ (1<<6));
// First time...
m_scheduler[id].suspend();
int seqn;
long long int tag = -1;
while (!m_server.finished()) {
MultiThreadedTaskQueue* client = MultiThreadedTaskQueue::m_client[id];
int action = 0;
assert(client);
if (tag != client->tag()) {
tag = client->tag();
m_server.lock();
if (client->m_assigned_jobs >= client->m_threads) {
m_server.unlock();
goto getnext;
}
seqn = client->m_assigned_jobs++;
m_server.unlock();
}
action = client->task(seqn, (int)id);
if (action == THREAD_EXIT)
break;
getnext:
// deactivateThread also suspends it if there are no more jobs pending.
seqn = deactivateThread((int)id);
}
m_scheduler[id].suspend(false, -1);
return NULL;
}
_INLINE static int deactivateThread(const int threadID) {
// Clear up threadID job.
MultiThreadedTaskQueue* client = m_client[threadID];
assert(m_client[threadID]);
m_client[threadID] = NULL;
// Check if this client is finished and
// schedule a new job for the current thread.
int code = m_server.lock();
int seqn = -1;
//printf("%i", (1 + client->m_finished_jobs));
// If done, server will be waked up (it was suspended in waitForAllThreads).
bool done = ++client->m_finished_jobs == client->m_threads;
// Is there any new job for this thread?
if (m_server.schedule(threadID)) {
// Yep, unlock and return to the threadFunc
// (after resuming the server if we done).
if (done)
m_server.resume();
else
seqn = client->m_assigned_jobs++;
code = m_server.unlock();
} else {
// Nope, wait here till addClient resumes this thread.
// (after resuming the server if we done).
if (done)
m_server.resume();
m_scheduler[threadID].lock();
code = m_server.unlock();
m_scheduler[threadID].suspend(true);
}
return seqn;
}
_INLINE int getNextClientIndex() {
// This function doesn't lock anything; caller should do it.
int j = m_finished_jobs;
for (int i = 0; i < queuesize; i++) {
if (m_waitingClients[j]) {
return j;
}
if (++j == queuesize) j = 0;
}
return -1;
}
void addClient(MultiThreadedTaskQueue* client) {
recheck:
m_server.lock();
int j = m_finished_jobs;
for (int i = 0; i < queuesize; i++) {
if (m_waitingClients[j] == NULL) goto found;
if (++j == queuesize) j = 0;
}
// Should be extremely rare occasion; worth signaling fatal error...
// But just for shear fun of it...
m_server.unlock();
//_mm_pause(); // sleep(0);
sched_yield();
goto recheck;
found:
m_waitingClients[j] = client;
m_scheduledJobs[j] = 0;
m_finished_jobs = j + 1;
if (m_finished_jobs == queuesize) m_finished_jobs = 0;
int nt = m_server.m_threads;
// For all free threads among nt server threads...
for (int i = 0; i < nt; i++) {
m_scheduler[i].lock();
if (m_scheduler[i].suspended()) {
if (!schedule(i)) {
m_scheduler[i].unlock();
break;
}
m_scheduler[i].resume();
}
m_scheduler[i].unlock();
}
m_server.unlock();
}
bool schedule(int i) {
// This function doesn't lock anything; caller should do it.
if (m_client[i] == NULL) {
int ji = getNextClientIndex();
if (ji == -1) {
// There are no more pending jobs.
return false;
}
m_client[i] = m_waitingClients[ji];
// Are we done assigning threads for this client?
if (++m_scheduledJobs[ji] == m_waitingClients[ji]->m_threads)
m_waitingClients[ji] = NULL; // yep, remove it from the queue
}
return true;
}
pthread_t* m_thread; // active server threads
static const int queuesize = 4; // queue size
MultiThreadedTaskQueue* m_waitingClients[queuesize]; // client queue
int m_scheduledJobs[queuesize]; // # of already scheduled jobs per client
};
/// This is a public function.
///
/// Passes `threads` to PRINT (presumably a debug-output macro defined
/// elsewhere) and then asks the shared server to rebuild its worker pool with
/// that many threads. Unlike MultiThreadedTaskQueue::createThreads(), the
/// call is unconditional and the server lock is not taken here.
void MultiThreadedTaskQueue::setMaxNumberOfThreads(int threads) {
PRINT(threads);
m_server.createThreads(threads);
}
/// This is a public function.
///
/// Returns whatever the shared server's numberOfThreads() reports. That
/// accessor is declared outside this file; presumably it is the server's
/// worker count - confirm against its declaration.
int MultiThreadedTaskQueue::maxNumberOfThreads() {
return m_server.numberOfThreads();
}
/// This is a public function.
///
/// Records how many threads this client wants (`threads`, which must be at
/// least 1). The first time any client gets here - i.e. while the shared
/// server still has a thread count of zero - it also makes the server create
/// its worker pool with the same size. The check and the creation are done
/// under the server lock.
void MultiThreadedTaskQueue::createThreads(int threads) {
    assert(threads >= 1);
    m_threads = threads;

    m_server.lock();
    // Initialize server (once).
    const bool server_has_no_threads = (m_server.m_threads == 0);
    if (server_has_no_threads)
        m_server.createThreads(threads);
    m_server.unlock();
}
/// This is a public function.
///
/// Resets this client's assigned/finished job counters and queues the client
/// with the shared server, which hands it to idle worker threads.
void MultiThreadedTaskQueue::startThreads() {
    m_assigned_jobs = 0;
    m_finished_jobs = 0;
    m_server.addClient(this);
}
/// This is a public function.
///
/// Blocks the caller until this client's finished-job count has reached its
/// thread count. If the counts already match, it returns at once without
/// touching the server lock.
void MultiThreadedTaskQueue::waitForAllThreads() {
    if (m_finished_jobs != m_threads) {
        m_server.lock();
        // The server is suspended while threads are still running; the
        // condition is re-checked after every wake-up.
        while (m_finished_jobs < m_threads) {
            m_server.suspend();
        }
        m_server.unlock();
    }
}
/// Initialize static members.
// The one server instance shared by all MultiThreadedTaskQueue objects. The
// server's constructor takes an unnamed int, so the value 1 itself is unused.
MultiThreadedTaskQueueServer MultiThreadedTaskQueue::m_server(1);
// Per-worker tables (assigned client and scheduler for each worker). Both stay
// NULL until MultiThreadedTaskQueueServer::createThreads() allocates them.
MultiThreadedTaskQueue** MultiThreadedTaskQueue::m_client = NULL;
MultiThreadedScheduler* MultiThreadedTaskQueue::m_scheduler = NULL;