blob: 936024af7dfeb75fddf69c894999d617ced78f58 [file] [log] [blame]
/*
* Implements dynamic task queues to provide load balancing
* Sanjeev Kumar --- December, 2004
*/
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "config.h"
// Thread bookkeeping for pthreads builds; all three objects are set up in taskQInit().
#ifdef ENABLE_PTHREADS
#include "alamere.h"
// Thread handles written by pthread_create() in taskQInit(), one slot per created thread.
pthread_t _M4_threadsTable[MAX_THREADS];
// Slot-in-use flags for _M4_threadsTable: zeroed at start-up, set to 1 once a slot is handed out.
int _M4_threadsTableAllocated[MAX_THREADS];
// Mutex attribute object that taskQInit() initialises and sets to PTHREAD_MUTEX_NORMAL.
pthread_mutexattr_t _M4_normalMutexAttr;
#endif //ENABLE_PTHREADS
#include "taskQInternal.h"
#include "taskQList.h"
// Configuration captured by taskQInit(): thread count, queue count, threads sharing one
// queue, and the caller-supplied task limit.
static volatile long numThreads, numTaskQs, threadsPerTaskQ, maxTasks;
// Round-robin cursor advanced by pickQueue() when no parallel region is active.
static volatile int nextQ = 0; // Just a hint. Not protected by locks.
// Set to 1 by taskQWait() for the duration of its work loop, 0 otherwise.
static volatile int parallelRegion = 0;
// Set to 1 by taskQEnd(); taskQIdleLoop() returns when it sees this after being woken.
static volatile int noMoreTasks = 0;
#if defined( TASKQ_DIST_GRID)
#include "taskQDistGrid.h"
#elif defined( TASKQ_DIST_LIST)
#include "taskQDistList.h"
#elif defined( TASKQ_DIST_FIXED)
#include "taskQDistFixed.h"
#else
#error "Missing Definition"
#endif
/*
 * One task queue together with its statistics counters.
 */
typedef struct {
#ifdef ENABLE_PTHREADS
// Taken by stealTasks() while it updates statStolen; other users, if any, live in the
// included taskQDist*.c implementation and are not visible here.
pthread_mutex_t lock;
#endif //ENABLE_PTHREADS
// statEnqueued and statLocal are zeroed by taskQResetStats() and reported by taskQPrnStats().
// statStolen points to numTaskQs counters allocated in taskQInit(); stealTasks() adds the
// return value of stealTasksSpecialized() to statStolen[i] for victim queue i.
long statEnqueued, statLocal, *statStolen;
// CACHE_LINE_SIZE bytes on each side of q -- presumably to keep q off cache lines shared
// with neighbouring data; TODO confirm.
char padding1[CACHE_LINE_SIZE];
// Queue storage; TaskQDetails is not defined in this file (presumably by the selected
// taskQDist*.h header).
TaskQDetails q;
char padding2[CACHE_LINE_SIZE];
} TaskQ;
/*
 * Rendezvous state used by waitForTasks(), signalTasks() and waitForEnd().
 */
typedef struct {
#ifdef ENABLE_PTHREADS
// Guards threadCount and is the mutex paired with both condition variables below.
pthread_mutex_t lock;
// Waited on in waitForTasks(); broadcast by signalTasks().
pthread_cond_t taskAvail;
// Waited on in waitForEnd(); broadcast by waitForTasks() when threadCount reaches
// numThreads, and by signalTasks().
pthread_cond_t tasksDone;
#endif //ENABLE_PTHREADS
// Incremented by each call to waitForTasks()/waitForEnd(); reset to 0 by signalTasks().
volatile long threadCount;
} Sync;
// Array of numTaskQs queues, allocated and initialised in taskQInit().
TaskQ *taskQs;
// The single rendezvous object shared by every thread in this file.
Sync sync;
#define MAX_STEAL 8
/*
 * Decides how many tasks to take from a queue that currently holds `available`.
 *
 * A queue with exactly one task gives up that task; otherwise half of the tasks
 * (integer division) are taken. The result never exceeds MAX_STEAL.
 */
static inline int calculateNumSteal( int available) {
    int share;

    if ( available == 1) {
        share = 1;
    } else {
        share = available / 2;
    }

    if ( share > MAX_STEAL) {
        return MAX_STEAL;
    }
    return share;
}
#if defined( TASKQ_DIST_GRID)
#include "taskQDistGrid.c"
#elif defined( TASKQ_DIST_LIST)
#include "taskQDistList.c"
#elif defined( TASKQ_DIST_FIXED)
#include "taskQDistFixed.c"
#else
#error "Missing Definition"
#endif
/*
 * Blocks the calling thread until signalTasks() broadcasts sync.taskAvail.
 *
 * Before sleeping, the caller adds itself to sync.threadCount. If that brings the
 * count to numThreads it broadcasts sync.tasksDone, the condition waitForEnd()
 * sleeps on. Without ENABLE_PTHREADS the function only increments the counter.
 *
 * NOTE(review): the pthread_cond_wait() call is not wrapped in a predicate loop,
 * so a spurious wakeup would make this return without a signalTasks() call --
 * confirm the callers tolerate that.
 */
static void waitForTasks( void) {
TRACE;
#ifdef ENABLE_PTHREADS
pthread_mutex_lock(&(sync.lock));;
sync.threadCount++;
// Count has reached numThreads: wake any thread blocked on tasksDone.
if ( sync.threadCount == numThreads)
pthread_cond_broadcast(&sync.tasksDone);;
// pthread_cond_wait() releases sync.lock while sleeping and re-acquires it before returning.
pthread_cond_wait(&sync.taskAvail,&sync.lock); pthread_mutex_unlock(&sync.lock);;
#else
sync.threadCount++;
#endif //ENABLE_PTHREADS
TRACE;
}
/*
 * Wakes every thread blocked in waitForTasks() or waitForEnd() and resets the
 * count of waiting threads.
 *
 * Returns immediately when sync.threadCount is 0. Otherwise, while holding
 * sync.lock, it zeroes threadCount and broadcasts both taskAvail and tasksDone.
 * Without ENABLE_PTHREADS it only zeroes the counter.
 *
 * The early-out reads threadCount without holding sync.lock (marked "Unsafe" by
 * the original author), and it skips the closing TRACE.
 * NOTE(review): a thread that increments the count just after that check is not
 * woken by this call -- confirm this is acceptable for every caller.
 */
static void signalTasks( void) {
TRACE;
if ( sync.threadCount == 0) return; // Unsafe
#ifdef ENABLE_PTHREADS
pthread_mutex_lock(&(sync.lock));;
sync.threadCount = 0;
pthread_cond_broadcast(&sync.taskAvail);;
pthread_cond_broadcast(&sync.tasksDone);;
pthread_mutex_unlock(&(sync.lock));;
#else
sync.threadCount = 0;
#endif //ENABLE_PTHREADS
TRACE;
}
/*
 * Counts the caller as waiting and reports whether sync.threadCount equals numThreads.
 *
 * Increments sync.threadCount; if the count is not yet numThreads the caller
 * sleeps on sync.tasksDone, which is broadcast by waitForTasks() (count reached
 * numThreads) and by signalTasks() (count reset to 0). After waking, the count
 * is compared with numThreads again.
 *
 * Returns non-zero when threadCount == numThreads at that point, 0 otherwise.
 * Without ENABLE_PTHREADS it increments and compares without blocking.
 *
 * NOTE(review): the wait is a single `if`, not a loop, and every call increments
 * threadCount again -- confirm behaviour under spurious wakeups.
 */
static int waitForEnd( void) {
int done;
TRACE;
#ifdef ENABLE_PTHREADS
pthread_mutex_lock(&(sync.lock));;
sync.threadCount++;
if ( sync.threadCount != numThreads)
pthread_cond_wait(&sync.tasksDone,&sync.lock);;
done = (sync.threadCount == numThreads);
pthread_mutex_unlock(&(sync.lock));;
#else
sync.threadCount++;
done = (sync.threadCount == numThreads);
#endif //ENABLE_PTHREADS
TRACE;
return done;
}
static int doOwnTasks( long myThreadId, long myQ) {
void *task[NUM_FIELDS];
int executed = 0;
TRACE;
while ( getATaskFromHead( &taskQs[myQ], task)) {
( ( TaskQTask3)task[0])( myThreadId, task[1], task[2], task[3]);
executed = 1;
}
TRACE;
return executed;
}
/*
 * Tries to move tasks from another queue into queue `myQ`.
 *
 * Victim queues are visited in circular order starting at myQ + 1 and the scan
 * stops at the first queue for which stealTasksSpecialized() returns non-zero,
 * or once it has wrapped around to `myQ` again.
 *
 * myThreadId  id of the calling thread (not used here).
 * myQ         index of the caller's own queue, the destination of the steal.
 *
 * Returns the value reported by stealTasksSpecialized() for the successful
 * steal, or 0 when nothing could be stolen from any queue.
 */
static int stealTasks( long myThreadId, long myQ) {
    int i, stolen = 0;
    TRACE;
    i = myQ + 1;
    while ( 1 ) {
        if ( i == numTaskQs) i = 0;     // wrap around the queue array
        if ( i == myQ) break;           // every other queue has been tried
        stolen = stealTasksSpecialized( &taskQs[myQ], &taskQs[i]);
        if ( stolen) {
            // The build-mode conditional is kept outside the IF_STATS() invocation:
            // preprocessing directives inside a macro argument list are undefined
            // behaviour in ISO C, even though some compilers accept them.
#ifdef ENABLE_PTHREADS
            IF_STATS( {
                pthread_mutex_lock(&(taskQs[myQ].lock));
                taskQs[myQ].statStolen[i] += stolen;
                pthread_mutex_unlock(&(taskQs[myQ].lock));
            });
#else
            IF_STATS( {
                taskQs[myQ].statStolen[i] += stolen;
            });
#endif //ENABLE_PTHREADS
            break;
        }
        i++;
    }
    TRACE;
    return stolen;
}
/*
 * Body of every worker thread created by taskQInit().
 *
 * arg  the worker's thread index, passed as an integer cast to a pointer.
 *
 * The worker sleeps in waitForTasks() until woken, then runs the tasks in its
 * own queue (index / threadsPerTaskQ) and keeps stealing from other queues
 * until a steal attempt finds nothing, at which point it goes back to sleep.
 * It returns a null pointer once it wakes up and finds noMoreTasks set.
 *
 * The original kept an iteration counter that was never read; it has been
 * removed because in this unbounded loop its only effect was an eventual
 * signed-integer overflow.
 */
static void *taskQIdleLoop( void *arg) {
    long index = ( long)arg;
    long myQ = index / threadsPerTaskQ;
    while ( 1 ) {
        waitForTasks();
        if ( noMoreTasks) return 0;
        doOwnTasks( index, myQ);
        // Each successful steal refills our queue; drain it before stealing again.
        while ( stealTasks( index, myQ)) {
            doOwnTasks( index, myQ);
        }
    }
}
/* Reports an unrecoverable start-up failure on stderr and terminates the process. */
static void taskQInitFatal( const char *what) {
    fprintf( stderr, "taskQInit: %s\n", what);
    abort();
}

/*
 * Initialises the task-queue runtime.
 *
 * numOfThreads   total number of threads, including the caller; must be >= 1.
 * maxNumOfTasks  task limit, stored in maxTasks.
 *
 * Records the configuration, allocates and initialises numTaskQs queues (one per
 * group of threadsPerTaskQ threads), sets up the shared synchronisation object
 * and, in pthreads builds, starts numOfThreads - 1 workers running
 * taskQIdleLoop(). It then calls waitForEnd(), which in pthreads builds blocks
 * until every worker has gone to sleep in waitForTasks().
 *
 * The process is aborted with a message on stderr when the configuration is
 * invalid, an allocation fails, the thread table has no free slot, or a worker
 * thread cannot be created. Previously these cases led to a division by zero, a
 * null-pointer dereference, an out-of-bounds write to _M4_threadsTable, or a
 * permanent hang in waitForEnd().
 */
void taskQInit( int numOfThreads, int maxNumOfTasks) {
    int i;
#ifdef ENABLE_PTHREADS
    ALAMERE_INIT(numOfThreads);
    ALAMERE_AFTER_CHECKPOINT();
    pthread_mutexattr_init( &_M4_normalMutexAttr);
    pthread_mutexattr_settype( &_M4_normalMutexAttr, PTHREAD_MUTEX_NORMAL);
    {
        int _M4_i;
        for ( _M4_i = 0; _M4_i < MAX_THREADS; _M4_i++) {
            _M4_threadsTableAllocated[_M4_i] = 0;
        }
    }
#endif //ENABLE_PTHREADS
    maxTasks = maxNumOfTasks;
    numThreads = numOfThreads;
    threadsPerTaskQ = taskQGetParam( TaskQThreadsPerQueue);
    DEBUG_ASSERT( ( numThreads >= 1) && ( threadsPerTaskQ >= 1));
    // Enforced even when DEBUG_ASSERT is compiled out: threadsPerTaskQ is used as a divisor.
    if ( ( numThreads < 1) || ( threadsPerTaskQ < 1)) {
        taskQInitFatal( "thread count and threads-per-queue must both be at least 1");
    }
    // Ceiling division: the last queue may serve fewer than threadsPerTaskQ threads.
    numTaskQs = (numOfThreads+threadsPerTaskQ-1)/threadsPerTaskQ;
    /*
    printf( "\n\n\t##### Running TaskQ version Distributed %-15s with %ld threads and %ld queues #####\n",
    VERSION, numThreads, numTaskQs);
    printf( "\t##### \t\t\t\t\t\t[ built on %s at %s ] #####\n\n", __DATE__, __TIME__);
    printf( "\t\t TaskQ mutex address : %ld\n", ( long)&sync.lock);
    printf( "\t\t TaskQ condition variable 1 address : %ld\n", ( long)&sync.taskAvail);
    printf( "\t\t TaskQ condition variable 2 address : %ld\n", ( long)&sync.tasksDone);
    printf( "\n\n");
    */
    DEBUG_ANNOUNCE;
    taskQs = ( TaskQ *)malloc( sizeof( TaskQ) * numTaskQs);
    if ( taskQs == NULL) {
        taskQInitFatal( "out of memory allocating the task queues");
    }
    for ( i = 0; i < numTaskQs; i++) {
#ifdef ENABLE_PTHREADS
        pthread_mutex_init(&(taskQs[i].lock), NULL);
#endif //ENABLE_PTHREADS
        taskQs[i].statStolen = ( long *)malloc( sizeof(long) * numTaskQs);
        if ( taskQs[i].statStolen == NULL) {
            taskQInitFatal( "out of memory allocating the steal statistics");
        }
        initTaskQDetails( &taskQs[i]);
    }
    taskQResetStats();
#ifdef ENABLE_PTHREADS
    pthread_mutex_init(&(sync.lock), NULL);
    pthread_cond_init(&sync.taskAvail,NULL);
    pthread_cond_init(&sync.tasksDone,NULL);
#endif //ENABLE_PTHREADS
    sync.threadCount = 0;
#ifdef ENABLE_PTHREADS
    // Thread 0 is the caller; workers are numbered 1 .. numThreads-1.
    for ( i = 1; i < numThreads; i++)
    {
        int _M4_i;
        // Find the first unused slot in the thread table.
        for ( _M4_i = 0; _M4_i < MAX_THREADS; _M4_i++) {
            if ( _M4_threadsTableAllocated[_M4_i] == 0) break;
        }
        if ( _M4_i == MAX_THREADS) {
            taskQInitFatal( "more worker threads requested than MAX_THREADS allows");
        }
        // A worker that never starts would never reach waitForTasks(), so the
        // waitForEnd() call below would block forever.
        if ( pthread_create(&_M4_threadsTable[_M4_i],NULL,(void *(*)(void *))taskQIdleLoop,(void *)( long)i) != 0) {
            taskQInitFatal( "pthread_create failed");
        }
        _M4_threadsTableAllocated[_M4_i] = 1;
    }
#endif //ENABLE_PTHREADS
    waitForEnd();
}
/*
 * Chooses the queue a new task should be placed in.
 *
 * Inside a parallel region the task goes to the enqueuing thread's own queue
 * (threadId / threadsPerTaskQ). Otherwise queues are handed out round-robin via
 * nextQ, which is read and updated without any locking.
 */
static inline int pickQueue( int threadId) { // Needs work
    int chosen;
    int following;

    if ( parallelRegion) {
        return threadId/threadsPerTaskQ;
    }

    chosen = nextQ;
    following = chosen + 1;
    if ( following >= numTaskQs) {
        nextQ = 0;
    } else {
        nextQ = following;
    }
    DEBUG_ASSERT( chosen < numTaskQs);
    return chosen;
}
/*
 * Enqueues a grid of tasks.
 *
 * The task function is cast to TaskQTask3 and handed first to
 * taskQEnqueueGridSpecialized() and then to enqueueGridHelper(), which is given
 * assignTasks, the queue count, and the dimension and tile sizes. When called
 * inside a parallel region, sleeping threads are woken via signalTasks().
 */
void taskQEnqueueGrid( TaskQTask taskFunction, TaskQThreadId threadId, long numOfDimensions,
                       long dimensionSize[MAX_DIMENSION], long tileSize[MAX_DIMENSION]) {
    TaskQTask3 gridTask = ( TaskQTask3)taskFunction;

    taskQEnqueueGridSpecialized( gridTask, threadId, numOfDimensions, tileSize);
    enqueueGridHelper( assignTasks, gridTask, numOfDimensions, numTaskQs, dimensionSize, tileSize);
    if ( parallelRegion) {
        signalTasks();
    }
}
static inline void taskQEnqueueTaskHelper( int threadId, void *task[NUM_FIELDS]) {
TRACE;
int queueNo = pickQueue( threadId);
// if ( !parallelRegion) printf( "%30ld %20ld\n", ( long)task[1], ( long)queueNo);
taskQEnqueueTaskSpecialized( &taskQs[queueNo], task);
if ( parallelRegion) signalTasks();
}
/*
 * Enqueues a task that takes one argument: packs the function and `arg1` into
 * a task record with copyArgs1() and hands it to taskQEnqueueTaskHelper().
 */
void taskQEnqueueTask1( TaskQTask1 taskFunction, TaskQThreadId threadId, void *arg1) {
    void *packed[NUM_FIELDS];
    TRACE;
    copyArgs1( packed, taskFunction, arg1);
    taskQEnqueueTaskHelper( threadId, packed);
}
/*
 * Enqueues a task that takes two arguments: packs the function, `arg1` and
 * `arg2` into a task record with copyArgs2() and hands it to
 * taskQEnqueueTaskHelper().
 */
void taskQEnqueueTask2( TaskQTask2 taskFunction, TaskQThreadId threadId, void *arg1, void *arg2) {
    void *packed[NUM_FIELDS];
    TRACE;
    copyArgs2( packed, taskFunction, arg1, arg2);
    taskQEnqueueTaskHelper( threadId, packed);
}
/*
 * Enqueues a task that takes three arguments: packs the function and
 * `arg1`..`arg3` into a task record with copyArgs3() and hands it to
 * taskQEnqueueTaskHelper().
 */
void taskQEnqueueTask3( TaskQTask3 taskFunction, TaskQThreadId threadId, void *arg1, void *arg2, void *arg3) {
    void *packed[NUM_FIELDS];
    TRACE;
    copyArgs3( packed, taskFunction, arg1, arg2, arg3);
    taskQEnqueueTaskHelper( threadId, packed);
}
/*
 * Runs the enqueued tasks to completion.
 *
 * Marks the start of a parallel region, wakes the sleeping threads, and then
 * takes part in the work itself as thread 0 on queue 0: it drains its own
 * queue, makes one steal attempt, drains again, and calls waitForEnd(). The
 * cycle repeats until waitForEnd() reports that sync.threadCount has reached
 * numThreads. parallelRegion is cleared before returning.
 *
 * The original incremented a function-static counter that was never read; it
 * has been removed (dead store, subject to signed overflow).
 */
void taskQWait( void) {
    int done;
    parallelRegion = 1;
    signalTasks();
    TRACE;
    do {
        doOwnTasks( 0, 0);
        stealTasks( 0, 0);
        doOwnTasks( 0, 0);
        done = waitForEnd();
    } while (!done);
    parallelRegion = 0;
    TRACE;
}
/*
 * Zeroes the statistics of every queue: statEnqueued, statLocal and each entry
 * of the per-victim statStolen array. The whole body is wrapped in IF_STATS().
 */
void taskQResetStats() {
    IF_STATS( {
        int qIdx;
        int victim;
        for ( qIdx = 0; qIdx < numTaskQs; qIdx++) {
            TaskQ *queue = &taskQs[qIdx];
            queue->statEnqueued = 0;
            queue->statLocal = 0;
            for ( victim = 0; victim < numTaskQs; victim++) {
                queue->statStolen[victim] = 0;
            }
        }
    });
}
/*
 * Prints the accumulated queue statistics to stdout.
 *
 * The first table has one row per queue (enqueued, local, stolen and executed
 * counts) followed by a totals row; "local" is statLocal minus the sum of that
 * queue's statStolen entries. The second table prints the statStolen matrix,
 * one row per queue and one column per victim queue. The whole body is wrapped
 * in IF_STATS().
 */
void taskQPrnStats() {
    IF_STATS( {
        long row;
        long col;
        long enqueuedTotal = 0;
        long executedTotal = 0;

        printf( "\n\n\t##### Cumulative statistics from Task Queues #####\n\n");
        printf( "\t\t%-10s %-10s %-10s %-10s %-10s\n\n", "Queue", "Enqueued", "Local", "Stolen", "Executed");
        for ( row = 0; row < numTaskQs; row++) {
            long stolenByRow = 0;
            for ( col = 0; col < numTaskQs; col++) {
                stolenByRow += taskQs[row].statStolen[col];
            }
            printf( "\t\t%5ld %10ld %10ld %10ld %10ld\n", row, taskQs[row].statEnqueued, taskQs[row].statLocal-stolenByRow, stolenByRow, taskQs[row].statLocal);
            enqueuedTotal += taskQs[row].statEnqueued;
            executedTotal += taskQs[row].statLocal;
        }
        printf( "\t\t%5s %10ld %10s %10s %10ld\n", "Total", enqueuedTotal, "", "", executedTotal);
        printf( "\n\n");

        printf( "\tBreakdown of Task Stealing\n\n\t%10s", "");
        for ( col = 0; col < numTaskQs; col++) {
            printf( "%8ld Q", col);
        }
        printf( "\n\n");
        for ( row = 0; row < numTaskQs; row++) {
            printf( "\t%8ld T", row);
            for ( col = 0; col < numTaskQs; col++) {
                printf( "%10ld", taskQs[row].statStolen[col]);
            }
            printf( "\n");
        }
        printf( "\n\n");
    });
}
/*
 * Shuts the task-queue runtime down: sets noMoreTasks, wakes the sleeping
 * threads through signalTasks() so that taskQIdleLoop() can see the flag and
 * return, and in pthreads builds runs ALAMERE_END().
 */
void taskQEnd( void) {
    /* The flag is raised before the wake-up so that woken workers observe it. */
    noMoreTasks = 1;
    signalTasks();
#ifdef ENABLE_PTHREADS
    ALAMERE_END();
#endif //ENABLE_PTHREADS
}