blob: 5bf1173f4bfdfbfe0086968aa1d4ede595df3142 [file] [log] [blame]
#include "queue.h"
#include <pthread.h>
#include <stdlib.h>
void queue_init(struct queue * que, int size, int prod_threads) {
pthread_mutex_init(&que->mutex, NULL);
pthread_cond_init(&que->empty, NULL);
pthread_cond_init(&que->full, NULL);
que->head = que->tail = 0;
que->data = (void **)malloc(sizeof(void*) * size);
que->size = size;
que->prod_threads = prod_threads;
que->end_count = 0;
}
void queue_destroy(struct queue* que)
{
pthread_mutex_destroy(&que->mutex);
pthread_cond_destroy(&que->empty);
pthread_cond_destroy(&que->full);
free(que->data);
que->data = NULL;
}
void queue_signal_terminate(struct queue * que) {
pthread_mutex_lock(&que->mutex);
que->end_count++;
pthread_cond_broadcast(&que->empty);
pthread_mutex_unlock(&que->mutex);
}
int dequeue(struct queue * que, void **to_buf) {
pthread_mutex_lock(&que->mutex);
// chceck if queue is empty?
while (que->tail == que->head && (que->end_count) < que->prod_threads) {
pthread_cond_wait(&que->empty, &que->mutex);
}
// check if queue has been terminated?
if (que->tail == que->head && (que->end_count) == que->prod_threads) {
pthread_cond_broadcast(&que->empty);
pthread_mutex_unlock(&que->mutex);
return -1;
}
*to_buf = que->data[que->tail];
que->tail ++;
if (que->tail == que->size)
que->tail = 0;
pthread_cond_signal(&que->full);
pthread_mutex_unlock(&que->mutex);
return 0;
}
void enqueue(struct queue * que, void *from_buf) {
pthread_mutex_lock(&que->mutex);
while (que->head == (que->tail-1+que->size)%que->size)
pthread_cond_wait(&que->full, &que->mutex);
que->data[que->head] = from_buf;
que->head ++;
if (que->head == que->size)
que->head = 0;
pthread_cond_signal(&que->empty);
pthread_mutex_unlock(&que->mutex);
}