/*
* Encoder for dedup files
*
* Copyright 2010 Princeton University.
* All rights reserved.
*
* Originally written by Minlan Yu.
* Largely rewritten by Christian Bienia.
*/
/*
* The pipeline model for Encode is Fragment->FragmentRefine->Deduplicate->Compress->Reorder
* Each stage has basically three steps:
* 1. fetch a group of items from the queue
* 2. process the items
* 3. put them in the queue for the next stage
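* In this implementation the stages correspond to Fragment(), FragmentRefine(),
* Deduplicate(), Compress() and Reorder() below; the serial build combines them
* in SerialIntegratedPipeline().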
*/
#include <assert.h>
#include <strings.h>
#include <math.h>
#include <limits.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include "util.h"
#include "dedupdef.h"
#include "encoder.h"
#include "debug.h"
#include "hashtable.h"
#include "config.h"
#include "rabin.h"
#include "mbuffer.h"
#ifdef ENABLE_PTHREADS
#include "queue.h"
#include "binheap.h"
#include "tree.h"
#endif //ENABLE_PTHREADS
#ifdef ENABLE_GZIP_COMPRESSION
#include <zlib.h>
#endif //ENABLE_GZIP_COMPRESSION
#ifdef ENABLE_BZIP2_COMPRESSION
#include <bzlib.h>
#endif //ENABLE_BZIP2_COMPRESSION
#ifdef ENABLE_PTHREADS
#include <pthread.h>
#endif //ENABLE_PTHREADS
#ifdef ENABLE_PARSEC_HOOKS
#include <hooks.h>
#endif //ENABLE_PARSEC_HOOKS
/* for tcpip stack */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#ifdef ENABLE_PARSEC_UPTCPIP
#include <uptcp_socket.h>
#endif
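//TCP port (0x2233 = 8755) on which the encoding server listens for the client that streams the input data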
#define PORT 0x2233
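//Initial capacity of the per-anchor chunk queues used by the reorder stage (the name is misleading, see the FIXME in Reorder())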
#define INITIAL_SEARCH_TREE_SIZE 4096
//The configuration block defined in main
config_t * conf;
//Hash table data structure & utility functions
struct hashtable *cache;
static unsigned int hash_from_key_fn( void *k ) {
//NOTE: sha1 sum is integer-aligned
return ((unsigned int *)k)[0];
}
static int keys_equal_fn ( void *key1, void *key2 ) {
return (memcmp(key1, key2, SHA1_LEN) == 0);
}
//Arguments to pass to each thread
struct thread_args {
//thread id, unique within a thread pool (i.e. unique for a pipeline stage)
int tid;
//number of queues available, first and last pipeline stage only
int nqueues;
//file descriptor, first pipeline stage only
int fd;
//input file buffer, first pipeline stage & preloading only
struct {
void *buffer;
size_t size;
} input_file;
};
#ifdef ENABLE_STATISTICS
//Keep track of block granularity with 2^CHUNK_GRANULARITY_POW resolution (for statistics)
#define CHUNK_GRANULARITY_POW (7)
//Number of chunk-size classes to distinguish; CHUNK_MAX_NUM * 2^CHUNK_GRANULARITY_POW is the largest chunk size tracked individually (for statistics)
#define CHUNK_MAX_NUM (8*32)
//Map a chunk size to a statistics array slot
#define CHUNK_SIZE_TO_SLOT(s) ( ((s)>>(CHUNK_GRANULARITY_POW)) >= (CHUNK_MAX_NUM) ? (CHUNK_MAX_NUM)-1 : ((s)>>(CHUNK_GRANULARITY_POW)) )
//Get the average size of a chunk from a statistics array slot
#define SLOT_TO_CHUNK_SIZE(s) ( (s)*(1<<(CHUNK_GRANULARITY_POW)) + (1<<((CHUNK_GRANULARITY_POW)-1)) )
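//Example: a 1000-byte chunk maps to slot 1000>>7 = 7; SLOT_TO_CHUNK_SIZE(7) = 7*128+64 = 960, the midpoint of the 128-byte bucket [896,1024)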
//Deduplication statistics (only used if ENABLE_STATISTICS is defined)
typedef struct {
/* Cumulative sizes */
size_t total_input; //Total size of input in bytes
size_t total_dedup; //Total size of input without duplicate blocks (after global compression) in bytes
size_t total_compressed; //Total size of input stream after local compression in bytes
size_t total_output; //Total size of output (including overhead) in bytes
/* Size distribution & other properties */
unsigned int nChunks[CHUNK_MAX_NUM]; //Coarse-granular size distribution of data chunks
unsigned int nDuplicates; //Total number of duplicate blocks
} stats_t;
//Initialize a statistics record
static void init_stats(stats_t *s) {
int i;
assert(s!=NULL);
s->total_input = 0;
s->total_dedup = 0;
s->total_compressed = 0;
s->total_output = 0;
for(i=0; i<CHUNK_MAX_NUM; i++) {
s->nChunks[i] = 0;
}
s->nDuplicates = 0;
}
#ifdef ENABLE_PTHREADS
//The queues between the pipeline stages
queue_t *deduplicate_que, *refine_que, *reorder_que, *compress_que;
//Merge two statistics records: s1=s1+s2
static void merge_stats(stats_t *s1, stats_t *s2) {
int i;
assert(s1!=NULL);
assert(s2!=NULL);
s1->total_input += s2->total_input;
s1->total_dedup += s2->total_dedup;
s1->total_compressed += s2->total_compressed;
s1->total_output += s2->total_output;
for(i=0; i<CHUNK_MAX_NUM; i++) {
s1->nChunks[i] += s2->nChunks[i];
}
s1->nDuplicates += s2->nDuplicates;
}
#endif //ENABLE_PTHREADS
//Print statistics
static void print_stats(stats_t *s) {
const unsigned int unit_str_size = 7; //elements in unit_str array
const char *unit_str[] = {"Bytes", "KB", "MB", "GB", "TB", "PB", "EB"};
unsigned int unit_idx = 0;
size_t unit_div = 1;
assert(s!=NULL);
//determine most suitable unit to use
for(unit_idx=0; unit_idx<unit_str_size; unit_idx++) {
size_t unit_div_next = unit_div * 1024;
if(s->total_input / unit_div_next == 0) break;
if(s->total_dedup / unit_div_next == 0) break;
if(s->total_compressed / unit_div_next == 0) break;
if(s->total_output / unit_div_next == 0) break;
unit_div = unit_div_next;
}
printf("Total input size: %14.2f %s\n", (float)(s->total_input)/(float)(unit_div), unit_str[unit_idx]);
printf("Total output size: %14.2f %s\n", (float)(s->total_output)/(float)(unit_div), unit_str[unit_idx]);
printf("Effective compression factor: %14.2fx\n", (float)(s->total_input)/(float)(s->total_output));
printf("\n");
//Total number of chunks
unsigned int i;
unsigned int nTotalChunks=0;
for(i=0; i<CHUNK_MAX_NUM; i++) nTotalChunks+= s->nChunks[i];
//Average size of chunks
float mean_size = 0.0;
for(i=0; i<CHUNK_MAX_NUM; i++) mean_size += (float)(SLOT_TO_CHUNK_SIZE(i)) * (float)(s->nChunks[i]);
mean_size = mean_size / (float)nTotalChunks;
//Variance of chunk size
float var_size = 0.0;
for(i=0; i<CHUNK_MAX_NUM; i++) var_size += (mean_size - (float)(SLOT_TO_CHUNK_SIZE(i))) *
(mean_size - (float)(SLOT_TO_CHUNK_SIZE(i))) *
(float)(s->nChunks[i]);
var_size = var_size / (float)nTotalChunks;
printf("Mean data chunk size: %14.2f %s (stddev: %.2f %s)\n", mean_size / 1024.0, "KB", sqrtf(var_size) / 1024.0, "KB");
printf("Amount of duplicate chunks: %14.2f%%\n", 100.0*(float)(s->nDuplicates)/(float)(nTotalChunks));
printf("Data size after deduplication: %14.2f %s (compression factor: %.2fx)\n", (float)(s->total_dedup)/(float)(unit_div), unit_str[unit_idx], (float)(s->total_input)/(float)(s->total_dedup));
printf("Data size after compression: %14.2f %s (compression factor: %.2fx)\n", (float)(s->total_compressed)/(float)(unit_div), unit_str[unit_idx], (float)(s->total_dedup)/(float)(s->total_compressed));
printf("Output overhead: %14.2f%%\n", 100.0*(float)(s->total_output-s->total_compressed)/(float)(s->total_output));
}
//variable with global statistics
stats_t stats;
#endif //ENABLE_STATISTICS
int init_server_socket(int *accept_sd)
{
int sd, asd;
socklen_t addrlen;
struct sockaddr_in sin;
struct sockaddr_in pin;
/* get an internet domain socket */
#ifdef ENABLE_PARSEC_UPTCPIP
if ((sd = uptcp_socket(AF_INET, SOCK_STREAM, 0)) == -1) {
#else
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
#endif
EXIT_TRACE("Socket error: socket()\n");
}
/* complete the socket structure */
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons(PORT);
/* bind the socket to the port number */
#ifdef ENABLE_PARSEC_UPTCPIP
if (uptcp_bind(sd, (struct sockaddr *) &sin, sizeof(sin)) == -1) {
uptcp_close(sd);
#else
if (bind(sd, (struct sockaddr *) &sin, sizeof(sin)) == -1) {
close(sd);
#endif
EXIT_TRACE("Socket error: bind()\n");
}
/* show that we are willing to listen */
#ifdef ENABLE_PARSEC_UPTCPIP
if (uptcp_listen(sd, 5) == -1) {
uptcp_close(sd);
#else
if (listen(sd, 5) == -1) {
close(sd);
#endif
EXIT_TRACE("Socket error: listen()\n");
}
/* wait for a client to talk to us */
addrlen = sizeof(pin);
#ifdef ENABLE_PARSEC_UPTCPIP
if ((asd = uptcp_accept(sd, (struct sockaddr *) &pin, &addrlen)) == -1) {
uptcp_close(sd);
#else
if ((asd = accept(sd, (struct sockaddr *) &pin, &addrlen)) == -1) {
close(sd);
#endif
EXIT_TRACE("Socket error: accept()\n");
}
*accept_sd = asd;
return sd;
}
//Simple write utility function
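//Each record written consists of a one-byte type tag, followed by the payload length and then the payload itself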
static int write_file(int fd, u_char type, u_long len, u_char * content) {
if (xwrite(fd, &type, sizeof(type)) < 0){
perror("xwrite:");
EXIT_TRACE("xwrite type fails\n");
return -1;
}
if (xwrite(fd, &len, sizeof(len)) < 0){
EXIT_TRACE("xwrite content fails\n");
}
if (xwrite(fd, content, len) < 0){
EXIT_TRACE("xwrite content fails\n");
}
return 0;
}
/*
* Helper function that creates and initializes the output file
* Takes the file name to use as input and returns the file handle
* The output file can be used to write chunks without any further steps
*/
static int create_output_file(char *outfile) {
int fd;
//Create output file
fd = open(outfile, O_CREAT|O_TRUNC|O_WRONLY, S_IRGRP | S_IWUSR | S_IRUSR | S_IROTH);
if (fd < 0) {
EXIT_TRACE("Cannot open output file.");
}
//Write header
if (write_header(fd, conf->compress_type)) {
EXIT_TRACE("Cannot write output file header.\n");
}
return fd;
}
/*
* Helper function that writes a chunk to an output file depending on
* its state. The function will write the SHA1 sum if the chunk has
* already been written before, or it will write the compressed data
* of the chunk if it has not been written yet.
*
* This function will block if the compressed data is not available yet.
* This function might update the state of the chunk if there are any changes.
*/
#ifdef ENABLE_PTHREADS
//NOTE: The parallel version checks the state of each chunk to make sure the
// relevant data is available. If it is not then the function waits.
static void write_chunk_to_file(int fd, chunk_t *chunk) {
assert(chunk!=NULL);
//Find original chunk
if(chunk->header.isDuplicate) chunk = chunk->compressed_data_ref;
pthread_mutex_lock(&chunk->header.lock);
while(chunk->header.state == CHUNK_STATE_UNCOMPRESSED) {
pthread_cond_wait(&chunk->header.update, &chunk->header.lock);
}
//state is now guaranteed to be either COMPRESSED or FLUSHED
if(chunk->header.state == CHUNK_STATE_COMPRESSED) {
//Chunk data has not been written yet, do so now
write_file(fd, TYPE_COMPRESS, chunk->compressed_data.n, chunk->compressed_data.ptr);
mbuffer_free(&chunk->compressed_data);
chunk->header.state = CHUNK_STATE_FLUSHED;
} else {
//Chunk data has been written to file before, just write SHA1
write_file(fd, TYPE_FINGERPRINT, SHA1_LEN, (unsigned char *)(chunk->sha1));
}
pthread_mutex_unlock(&chunk->header.lock);
}
#else
//NOTE: The serial version relies on the fact that chunks are processed in-order,
// which means if it reaches the function it is guaranteed all data is ready.
static void write_chunk_to_file(int fd, chunk_t *chunk) {
assert(chunk!=NULL);
if(!chunk->header.isDuplicate) {
//Unique chunk, data has not been written yet, do so now
write_file(fd, TYPE_COMPRESS, chunk->compressed_data.n, chunk->compressed_data.ptr);
mbuffer_free(&chunk->compressed_data);
} else {
//Duplicate chunk, data has been written to file before, just write SHA1
write_file(fd, TYPE_FINGERPRINT, SHA1_LEN, (unsigned char *)(chunk->sha1));
}
}
#endif //ENABLE_PTHREADS
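//Rabin fingerprint window parameters: rf_win is used by the refinement stage,
//rf_win_dataprocess by the fragmentation stage and the serial pipeline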
int rf_win;
int rf_win_dataprocess;
/*
* Computational kernel of compression stage
*
* Actions performed:
* - Compress a data chunk
*/
void sub_Compress(chunk_t *chunk) {
size_t n;
int r;
assert(chunk!=NULL);
//compress the item and add it to the database
#ifdef ENABLE_PTHREADS
pthread_mutex_lock(&chunk->header.lock);
assert(chunk->header.state == CHUNK_STATE_UNCOMPRESSED);
#endif //ENABLE_PTHREADS
switch (conf->compress_type) {
case COMPRESS_NONE:
//Simply duplicate the data
n = chunk->uncompressed_data.n;
r = mbuffer_create(&chunk->compressed_data, n);
if(r != 0) {
EXIT_TRACE("Creation of compression buffer failed.\n");
}
//copy the block
memcpy(chunk->compressed_data.ptr, chunk->uncompressed_data.ptr, chunk->uncompressed_data.n);
break;
#ifdef ENABLE_GZIP_COMPRESSION
case COMPRESS_GZIP:
//Gzip compression buffer must be at least 0.1% larger than source buffer plus 12 bytes
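//(n>>9 adds roughly 0.2%, i.e. 1/512, of headroom on top of the source size, which satisfies this requirement)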
n = chunk->uncompressed_data.n + (chunk->uncompressed_data.n >> 9) + 12;
r = mbuffer_create(&chunk->compressed_data, n);
if(r != 0) {
EXIT_TRACE("Creation of compression buffer failed.\n");
}
//compress the block
r = compress(chunk->compressed_data.ptr, &n, chunk->uncompressed_data.ptr, chunk->uncompressed_data.n);
if (r != Z_OK) {
EXIT_TRACE("Compression failed\n");
}
//Shrink buffer to actual size
if(n < chunk->compressed_data.n) {
r = mbuffer_realloc(&chunk->compressed_data, n);
assert(r == 0);
}
break;
#endif //ENABLE_GZIP_COMPRESSION
#ifdef ENABLE_BZIP2_COMPRESSION
case COMPRESS_BZIP2:
//Bzip compression buffer must be at least 1% larger than source buffer plus 600 bytes
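//(n>>6 adds roughly 1.6%, i.e. 1/64, of headroom on top of the source size, which satisfies this requirement)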
n = chunk->uncompressed_data.n + (chunk->uncompressed_data.n >> 6) + 600;
r = mbuffer_create(&chunk->compressed_data, n);
if(r != 0) {
EXIT_TRACE("Creation of compression buffer failed.\n");
}
//compress the block
unsigned int int_n = n;
r = BZ2_bzBuffToBuffCompress(chunk->compressed_data.ptr, &int_n, chunk->uncompressed_data.ptr, chunk->uncompressed_data.n, 9, 0, 30);
n = int_n;
if (r != BZ_OK) {
EXIT_TRACE("Compression failed\n");
}
//Shrink buffer to actual size
if(n < chunk->compressed_data.n) {
r = mbuffer_realloc(&chunk->compressed_data, n);
assert(r == 0);
}
break;
#endif //ENABLE_BZIP2_COMPRESSION
default:
EXIT_TRACE("Compression type not implemented.\n");
break;
}
mbuffer_free(&chunk->uncompressed_data);
#ifdef ENABLE_PTHREADS
chunk->header.state = CHUNK_STATE_COMPRESSED;
pthread_cond_broadcast(&chunk->header.update);
pthread_mutex_unlock(&chunk->header.lock);
#endif //ENABLE_PTHREADS
return;
}
/*
* Pipeline stage function of compression stage
*
* Actions performed:
* - Dequeue items from compression queue
* - Execute compression kernel for each item
* - Enqueue each item into send queue
*/
#ifdef ENABLE_PTHREADS
void *Compress(void * targs) {
struct thread_args *args = (struct thread_args *)targs;
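//Threads are assigned to queues in groups of MAX_THREADS_PER_QUEUE; qid selects the queue this thread operates on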
const int qid = args->tid / MAX_THREADS_PER_QUEUE;
chunk_t * chunk;
int r;
ringbuffer_t recv_buf, send_buf;
#ifdef ENABLE_STATISTICS
stats_t *thread_stats = malloc(sizeof(stats_t));
if(thread_stats == NULL) EXIT_TRACE("Memory allocation failed.\n");
init_stats(thread_stats);
#endif //ENABLE_STATISTICS
r=0;
r += ringbuffer_init(&recv_buf, ITEM_PER_FETCH);
r += ringbuffer_init(&send_buf, ITEM_PER_INSERT);
assert(r==0);
while(1) {
//get items from the queue
if (ringbuffer_isEmpty(&recv_buf)) {
r = queue_dequeue(&compress_que[qid], &recv_buf, ITEM_PER_FETCH);
if (r < 0) break;
}
//fetch one item
chunk = (chunk_t *)ringbuffer_remove(&recv_buf);
assert(chunk!=NULL);
sub_Compress(chunk);
#ifdef ENABLE_STATISTICS
thread_stats->total_compressed += chunk->compressed_data.n;
#endif //ENABLE_STATISTICS
r = ringbuffer_insert(&send_buf, chunk);
assert(r==0);
//put the item in the next queue for the write thread
if (ringbuffer_isFull(&send_buf)) {
r = queue_enqueue(&reorder_que[qid], &send_buf, ITEM_PER_INSERT);
assert(r>=1);
}
}
//Enqueue left over items
while (!ringbuffer_isEmpty(&send_buf)) {
r = queue_enqueue(&reorder_que[qid], &send_buf, ITEM_PER_INSERT);
assert(r>=1);
}
ringbuffer_destroy(&recv_buf);
ringbuffer_destroy(&send_buf);
//shutdown
queue_terminate(&reorder_que[qid]);
#ifdef ENABLE_STATISTICS
return thread_stats;
#else
return NULL;
#endif //ENABLE_STATISTICS
}
#endif //ENABLE_PTHREADS
/*
* Computational kernel of deduplication stage
*
* Actions performed:
* - Calculate SHA1 signature for each incoming data chunk
* - Perform database lookup to determine chunk redundancy status
* - On miss add chunk to database
* - Returns chunk redundancy status
*/
int sub_Deduplicate(chunk_t *chunk) {
int isDuplicate;
chunk_t *entry;
assert(chunk!=NULL);
assert(chunk->uncompressed_data.ptr!=NULL);
SHA1_Digest(chunk->uncompressed_data.ptr, chunk->uncompressed_data.n, (unsigned char *)(chunk->sha1));
//Query database to determine whether we've seen the data chunk before
#ifdef ENABLE_PTHREADS
pthread_mutex_t *ht_lock = hashtable_getlock(cache, (void *)(chunk->sha1));
pthread_mutex_lock(ht_lock);
#endif
entry = (chunk_t *)hashtable_search(cache, (void *)(chunk->sha1));
isDuplicate = (entry != NULL);
chunk->header.isDuplicate = isDuplicate;
if (!isDuplicate) {
// Cache miss: Create entry in hash table and forward data to compression stage
#ifdef ENABLE_PTHREADS
pthread_mutex_init(&chunk->header.lock, NULL);
pthread_cond_init(&chunk->header.update, NULL);
#endif
//NOTE: chunk->compressed_data.buffer will be computed in compression stage
if (hashtable_insert(cache, (void *)(chunk->sha1), (void *)chunk) == 0) {
EXIT_TRACE("hashtable_insert failed");
}
} else {
// Cache hit: Skipping compression stage
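//Keep a reference to the original chunk so write_chunk_to_file() can wait for and reuse its compressed data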
chunk->compressed_data_ref = entry;
mbuffer_free(&chunk->uncompressed_data);
}
#ifdef ENABLE_PTHREADS
pthread_mutex_unlock(ht_lock);
#endif
return isDuplicate;
}
/*
* Pipeline stage function of deduplication stage
*
* Actions performed:
* - Take input data from fragmentation stages
* - Execute deduplication kernel for each data chunk
* - Route resulting package either to compression stage or to reorder stage, depending on deduplication status
*/
#ifdef ENABLE_PTHREADS
void * Deduplicate(void * targs) {
struct thread_args *args = (struct thread_args *)targs;
const int qid = args->tid / MAX_THREADS_PER_QUEUE;
chunk_t *chunk;
int r;
ringbuffer_t recv_buf, send_buf_reorder, send_buf_compress;
#ifdef ENABLE_STATISTICS
stats_t *thread_stats = malloc(sizeof(stats_t));
if(thread_stats == NULL) {
EXIT_TRACE("Memory allocation failed.\n");
}
init_stats(thread_stats);
#endif //ENABLE_STATISTICS
r=0;
r += ringbuffer_init(&recv_buf, CHUNK_ANCHOR_PER_FETCH);
r += ringbuffer_init(&send_buf_reorder, ITEM_PER_INSERT);
r += ringbuffer_init(&send_buf_compress, ITEM_PER_INSERT);
assert(r==0);
while (1) {
//if no items available, fetch a group of items from the queue
if (ringbuffer_isEmpty(&recv_buf)) {
r = queue_dequeue(&deduplicate_que[qid], &recv_buf, CHUNK_ANCHOR_PER_FETCH);
if (r < 0) break;
}
//get one chunk
chunk = (chunk_t *)ringbuffer_remove(&recv_buf);
assert(chunk!=NULL);
//Do the processing
int isDuplicate = sub_Deduplicate(chunk);
#ifdef ENABLE_STATISTICS
if(isDuplicate) {
thread_stats->nDuplicates++;
} else {
thread_stats->total_dedup += chunk->uncompressed_data.n;
}
#endif //ENABLE_STATISTICS
//Enqueue chunk either into compression queue or into send queue
if(!isDuplicate) {
r = ringbuffer_insert(&send_buf_compress, chunk);
assert(r==0);
if (ringbuffer_isFull(&send_buf_compress)) {
r = queue_enqueue(&compress_que[qid], &send_buf_compress, ITEM_PER_INSERT);
assert(r>=1);
}
} else {
r = ringbuffer_insert(&send_buf_reorder, chunk);
assert(r==0);
if (ringbuffer_isFull(&send_buf_reorder)) {
r = queue_enqueue(&reorder_que[qid], &send_buf_reorder, ITEM_PER_INSERT);
assert(r>=1);
}
}
}
//empty buffers
while(!ringbuffer_isEmpty(&send_buf_compress)) {
r = queue_enqueue(&compress_que[qid], &send_buf_compress, ITEM_PER_INSERT);
assert(r>=1);
}
while(!ringbuffer_isEmpty(&send_buf_reorder)) {
r = queue_enqueue(&reorder_que[qid], &send_buf_reorder, ITEM_PER_INSERT);
assert(r>=1);
}
ringbuffer_destroy(&recv_buf);
ringbuffer_destroy(&send_buf_compress);
ringbuffer_destroy(&send_buf_reorder);
//shutdown
queue_terminate(&compress_que[qid]);
#ifdef ENABLE_STATISTICS
return thread_stats;
#else
return NULL;
#endif //ENABLE_STATISTICS
}
#endif //ENABLE_PTHREADS
/*
* Pipeline stage function and computational kernel of refinement stage
*
* Actions performed:
* - Take coarse chunks from fragmentation stage
* - Partition data block into smaller chunks with Rabin rolling fingerprints
* - Send resulting data chunks to deduplication stage
*
* Notes:
* - Allocates mbuffers for fine-granular chunks
*/
#ifdef ENABLE_PTHREADS
void *FragmentRefine(void * targs) {
struct thread_args *args = (struct thread_args *)targs;
const int qid = args->tid / MAX_THREADS_PER_QUEUE;
ringbuffer_t recv_buf, send_buf;
int r;
chunk_t *temp;
chunk_t *chunk;
u32int * rabintab = malloc(256*sizeof rabintab[0]);
u32int * rabinwintab = malloc(256*sizeof rabinwintab[0]);
if(rabintab == NULL || rabinwintab == NULL) {
EXIT_TRACE("Memory allocation failed.\n");
}
r=0;
r += ringbuffer_init(&recv_buf, MAX_PER_FETCH);
r += ringbuffer_init(&send_buf, CHUNK_ANCHOR_PER_INSERT);
assert(r==0);
#ifdef ENABLE_STATISTICS
stats_t *thread_stats = malloc(sizeof(stats_t));
if(thread_stats == NULL) {
EXIT_TRACE("Memory allocation failed.\n");
}
init_stats(thread_stats);
#endif //ENABLE_STATISTICS
while (TRUE) {
//if no item for process, get a group of items from the pipeline
if (ringbuffer_isEmpty(&recv_buf)) {
r = queue_dequeue(&refine_que[qid], &recv_buf, MAX_PER_FETCH);
if (r < 0) {
break;
}
}
//get one item
chunk = (chunk_t *)ringbuffer_remove(&recv_buf);
assert(chunk!=NULL);
rabininit(rf_win, rabintab, rabinwintab);
int split;
sequence_number_t chcount = 0;
do {
//Find next anchor with Rabin fingerprint
int offset = rabinseg(chunk->uncompressed_data.ptr, chunk->uncompressed_data.n, rf_win, rabintab, rabinwintab);
//Can we split the buffer?
if(offset < chunk->uncompressed_data.n) {
//Allocate a new chunk and create a new memory buffer
temp = (chunk_t *)malloc(sizeof(chunk_t));
if(temp==NULL) EXIT_TRACE("Memory allocation failed.\n");
temp->header.state = chunk->header.state;
temp->sequence.l1num = chunk->sequence.l1num;
//split it into two pieces
r = mbuffer_split(&chunk->uncompressed_data, &temp->uncompressed_data, offset);
if(r!=0) EXIT_TRACE("Unable to split memory buffer.\n");
//Set correct state and sequence numbers
chunk->sequence.l2num = chcount;
chunk->isLastL2Chunk = FALSE;
chcount++;
#ifdef ENABLE_STATISTICS
//update statistics
thread_stats->nChunks[CHUNK_SIZE_TO_SLOT(chunk->uncompressed_data.n)]++;
#endif //ENABLE_STATISTICS
//put it into send buffer
r = ringbuffer_insert(&send_buf, chunk);
assert(r==0);
if (ringbuffer_isFull(&send_buf)) {
r = queue_enqueue(&deduplicate_que[qid], &send_buf, CHUNK_ANCHOR_PER_INSERT);
assert(r>=1);
}
//prepare for next iteration
chunk = temp;
split = 1;
} else {
//End of buffer reached, don't split but simply enqueue it
//Set correct state and sequence numbers
chunk->sequence.l2num = chcount;
chunk->isLastL2Chunk = TRUE;
chcount++;
#ifdef ENABLE_STATISTICS
//update statistics
thread_stats->nChunks[CHUNK_SIZE_TO_SLOT(chunk->uncompressed_data.n)]++;
#endif //ENABLE_STATISTICS
//put it into send buffer
r = ringbuffer_insert(&send_buf, chunk);
assert(r==0);
if (ringbuffer_isFull(&send_buf)) {
r = queue_enqueue(&deduplicate_que[qid], &send_buf, CHUNK_ANCHOR_PER_INSERT);
assert(r>=1);
}
//prepare for next iteration
chunk = NULL;
split = 0;
}
} while(split);
}
//drain buffer
while(!ringbuffer_isEmpty(&send_buf)) {
r = queue_enqueue(&deduplicate_que[qid], &send_buf, CHUNK_ANCHOR_PER_INSERT);
assert(r>=1);
}
free(rabintab);
free(rabinwintab);
ringbuffer_destroy(&recv_buf);
ringbuffer_destroy(&send_buf);
//shutdown
queue_terminate(&deduplicate_que[qid]);
#ifdef ENABLE_STATISTICS
return thread_stats;
#else
return NULL;
#endif //ENABLE_STATISTICS
}
#endif //ENABLE_PTHREADS
/*
* Integrate all computationally intensive pipeline
* stages to improve cache efficiency.
*/
void *SerialIntegratedPipeline(void * targs) {
struct thread_args *args = (struct thread_args *)targs;
int fd = args->fd;
int fd_out = create_output_file(conf->outfile);
int r;
chunk_t *temp = NULL;
chunk_t *chunk = NULL;
u32int * rabintab = malloc(256*sizeof rabintab[0]);
u32int * rabinwintab = malloc(256*sizeof rabinwintab[0]);
if(rabintab == NULL || rabinwintab == NULL) {
EXIT_TRACE("Memory allocation failed.\n");
}
rf_win_dataprocess = 0;
rabininit(rf_win_dataprocess, rabintab, rabinwintab);
//Sanity check
if(MAXBUF < 8 * ANCHOR_JUMP) {
printf("WARNING: I/O buffer size is very small. Performance degraded.\n");
fflush(NULL);
}
//read from input file / buffer
while (1) {
size_t bytes_left; //amount of data left over in the buffer from the previous iteration
//Check how much data is left over from the previous iteration (zero if this is the initial chunk)
if(temp != NULL) {
bytes_left = temp->uncompressed_data.n;
} else {
bytes_left = 0;
}
//Make sure that system supports new buffer size
if(MAXBUF+bytes_left > SSIZE_MAX) {
EXIT_TRACE("Input buffer size exceeds system maximum.\n");
}
//Allocate a new chunk and create a new memory buffer
chunk = (chunk_t *)malloc(sizeof(chunk_t));
if(chunk==NULL) EXIT_TRACE("Memory allocation failed.\n");
r = mbuffer_create(&chunk->uncompressed_data, MAXBUF+bytes_left);
if(r!=0) {
EXIT_TRACE("Unable to initialize memory buffer.\n");
}
chunk->header.state = CHUNK_STATE_UNCOMPRESSED;
if(bytes_left > 0) {
//FIXME: Short-circuit this if no more data available
//"Extension" of existing buffer, copy sequence number and left over data to beginning of new buffer
//NOTE: We cannot safely extend the current memory region because it has already been given to another thread
memcpy(chunk->uncompressed_data.ptr, temp->uncompressed_data.ptr, temp->uncompressed_data.n);
mbuffer_free(&temp->uncompressed_data);
free(temp);
temp = NULL;
}
//Read data until buffer full
size_t bytes_read=0;
while(bytes_read < MAXBUF) {
#ifdef ENABLE_PARSEC_UPTCPIP
r = uptcp_recv(fd, chunk->uncompressed_data.ptr+bytes_left+bytes_read, MAXBUF-bytes_read, 0);
#else
r = recv(fd, chunk->uncompressed_data.ptr+bytes_left+bytes_read, MAXBUF-bytes_read, 0);
#endif
if(r<0) switch(errno) {
case EAGAIN:
EXIT_TRACE("I/O error: No data available\n");break;
case EBADF:
EXIT_TRACE("I/O error: Invalid file descriptor\n");break;
case EFAULT:
EXIT_TRACE("I/O error: Buffer out of range\n");break;
case EINTR:
EXIT_TRACE("I/O error: Interruption\n");break;
case EINVAL:
EXIT_TRACE("I/O error: Unable to read from file descriptor\n");break;
case EIO:
EXIT_TRACE("I/O error: Generic I/O error\n");break;
case EISDIR:
EXIT_TRACE("I/O error: Cannot read from a directory\n");break;
default:
EXIT_TRACE("I/O error: Unrecognized error\n");break;
}
if(r==0) break;
bytes_read += r;
}
//No data left over from last iteration and also nothing new read in, simply clean up and quit
if(bytes_left + bytes_read == 0) {
mbuffer_free(&chunk->uncompressed_data);
free(chunk);
chunk = NULL;
break;
}
//Shrink buffer to actual size
if(bytes_left+bytes_read < chunk->uncompressed_data.n) {
r = mbuffer_realloc(&chunk->uncompressed_data, bytes_left+bytes_read);
assert(r == 0);
}
//Check whether any new data was read in, process last chunk if not
if(bytes_read == 0) {
#ifdef ENABLE_STATISTICS
//update statistics
stats.nChunks[CHUNK_SIZE_TO_SLOT(chunk->uncompressed_data.n)]++;
#endif //ENABLE_STATISTICS
//Deduplicate
int isDuplicate = sub_Deduplicate(chunk);
#ifdef ENABLE_STATISTICS
if(isDuplicate) {
stats.nDuplicates++;
} else {
stats.total_dedup += chunk->uncompressed_data.n;
}
#endif //ENABLE_STATISTICS
//If chunk is unique compress & archive it.
if(!isDuplicate) {
sub_Compress(chunk);
#ifdef ENABLE_STATISTICS
stats.total_compressed += chunk->compressed_data.n;
#endif //ENABLE_STATISTICS
}
write_chunk_to_file(fd_out, chunk);
if(chunk->header.isDuplicate) {
free(chunk);
chunk=NULL;
}
//stop fetching from input buffer, terminate processing
break;
}
//partition input block into fine-granular chunks
int split;
do {
split = 0;
//Try to split the buffer
int offset = rabinseg(chunk->uncompressed_data.ptr, chunk->uncompressed_data.n, rf_win_dataprocess, rabintab, rabinwintab);
//Did we find a split location?
if(offset == 0) {
//Split found at the very beginning of the buffer (should never happen due to technical limitations)
assert(0);
split = 0;
} else if(offset < chunk->uncompressed_data.n) {
//Split found somewhere in the middle of the buffer
//Allocate a new chunk and create a new memory buffer
temp = (chunk_t *)malloc(sizeof(chunk_t));
if(temp==NULL) EXIT_TRACE("Memory allocation failed.\n");
//split it into two pieces
r = mbuffer_split(&chunk->uncompressed_data, &temp->uncompressed_data, offset);
if(r!=0) EXIT_TRACE("Unable to split memory buffer.\n");
temp->header.state = CHUNK_STATE_UNCOMPRESSED;
#ifdef ENABLE_STATISTICS
//update statistics
stats.nChunks[CHUNK_SIZE_TO_SLOT(chunk->uncompressed_data.n)]++;
#endif //ENABLE_STATISTICS
//Deduplicate
int isDuplicate = sub_Deduplicate(chunk);
#ifdef ENABLE_STATISTICS
if(isDuplicate) {
stats.nDuplicates++;
} else {
stats.total_dedup += chunk->uncompressed_data.n;
}
#endif //ENABLE_STATISTICS
//If chunk is unique compress & archive it.
if(!isDuplicate) {
sub_Compress(chunk);
#ifdef ENABLE_STATISTICS
stats.total_compressed += chunk->compressed_data.n;
#endif //ENABLE_STATISTICS
}
write_chunk_to_file(fd_out, chunk);
if(chunk->header.isDuplicate){
free(chunk);
chunk=NULL;
}
//prepare for next iteration
chunk = temp;
temp = NULL;
split = 1;
} else {
//Due to technical limitations we can't distinguish the cases "no split" and "split at end of buffer"
//This will result in some unnecessary (and unlikely) work but yields the correct result eventually.
temp = chunk;
chunk = NULL;
split = 0;
}
} while(split);
}
free(rabintab);
free(rabinwintab);
close(fd_out);
return NULL;
}
/*
* Pipeline stage function of fragmentation stage
*
* Actions performed:
* - Read data from file (or preloading buffer)
* - Perform coarse-grained chunking
* - Send coarse chunks to refinement stages for further processing
*
* Notes:
* This pipeline stage is a bottleneck because it is inherently serial. We
* therefore perform only coarse chunking and pass on the data block as fast
* as possible so that there are no delays that might decrease scalability.
* With very large numbers of threads this stage will not be able to keep up,
* which will eventually limit scalability. A solution is to increase the size
* of the coarse-grained chunks, together with a comparable increase in total
* input size.
*/
#ifdef ENABLE_PTHREADS
void *Fragment(void * targs){
struct thread_args *args = (struct thread_args *)targs;
int qid = 0;
int fd = args->fd;
int i;
size_t bytes_input = args->input_file.size;
ringbuffer_t send_buf;
sequence_number_t anchorcount = 0;
int r;
chunk_t *temp = NULL;
chunk_t *chunk = NULL;
u32int * rabintab = malloc(256*sizeof rabintab[0]);
u32int * rabinwintab = malloc(256*sizeof rabinwintab[0]);
if(rabintab == NULL || rabinwintab == NULL) {
EXIT_TRACE("Memory allocation failed.\n");
}
r = ringbuffer_init(&send_buf, ANCHOR_DATA_PER_INSERT);
assert(r==0);
rf_win_dataprocess = 0;
rabininit(rf_win_dataprocess, rabintab, rabinwintab);
//Sanity check
if(MAXBUF < 8 * ANCHOR_JUMP) {
printf("WARNING: I/O buffer size is very small. Performance degraded.\n");
fflush(NULL);
}
//read from input file / buffer
size_t total_bytes_left = bytes_input;
while (1) {
size_t bytes_left; //amount of data left over in the buffer from the previous iteration
//Check how much data is left over from the previous iteration (zero if this is the initial chunk)
if(temp != NULL) {
bytes_left = temp->uncompressed_data.n;
} else {
bytes_left = 0;
}
//Make sure that system supports new buffer size
if(MAXBUF+bytes_left > SSIZE_MAX) {
EXIT_TRACE("Input buffer size exceeds system maximum.\n");
}
//Allocate a new chunk and create a new memory buffer
chunk = (chunk_t *)malloc(sizeof(chunk_t));
if(chunk==NULL) EXIT_TRACE("Memory allocation failed.\n");
r = mbuffer_create(&chunk->uncompressed_data, MAXBUF+bytes_left);
if(r!=0) {
EXIT_TRACE("Unable to initialize memory buffer.\n");
}
if(bytes_left > 0) {
//FIXME: Short-circuit this if no more data available
//"Extension" of existing buffer, copy sequence number and left over data to beginning of new buffer
chunk->header.state = CHUNK_STATE_UNCOMPRESSED;
chunk->sequence.l1num = temp->sequence.l1num;
//NOTE: We cannot safely extend the current memory region because it has already been given to another thread
memcpy(chunk->uncompressed_data.ptr, temp->uncompressed_data.ptr, temp->uncompressed_data.n);
mbuffer_free(&temp->uncompressed_data);
free(temp);
temp = NULL;
} else {
//brand new mbuffer, increment sequence number
chunk->header.state = CHUNK_STATE_UNCOMPRESSED;
chunk->sequence.l1num = anchorcount;
anchorcount++;
}
//Read data until buffer full
size_t bytes_read=0;
while(total_bytes_left > 0 && bytes_read < MAXBUF) {
#ifdef ENABLE_PARSEC_UPTCPIP
r = uptcp_recv(fd, chunk->uncompressed_data.ptr+bytes_left+bytes_read, MAXBUF-bytes_read, 0);
#else
r = recv(fd, chunk->uncompressed_data.ptr+bytes_left+bytes_read, MAXBUF-bytes_read, 0);
#endif
if(r<0) switch(errno) {
case EAGAIN:
EXIT_TRACE("I/O error: No data available\n");break;
case EBADF:
EXIT_TRACE("I/O error: Invalid file descriptor\n");break;
case EFAULT:
EXIT_TRACE("I/O error: Buffer out of range\n");break;
case EINTR:
EXIT_TRACE("I/O error: Interruption\n");break;
case EINVAL:
EXIT_TRACE("I/O error: Unable to read from file descriptor\n");break;
case EIO:
EXIT_TRACE("I/O error: Generic I/O error\n");break;
case EISDIR:
EXIT_TRACE("I/O error: Cannot read from a directory\n");break;
default:
EXIT_TRACE("I/O error: Unrecognized error\n");break;
}
if(r==0) break;
bytes_read += r;
total_bytes_left -= r;
}
//No data left over from last iteration and also nothing new read in, simply clean up and quit
if(bytes_left + bytes_read == 0) {
mbuffer_free(&chunk->uncompressed_data);
free(chunk);
chunk = NULL;
break;
}
//Shrink buffer to actual size
if(bytes_left+bytes_read < chunk->uncompressed_data.n) {
r = mbuffer_realloc(&chunk->uncompressed_data, bytes_left+bytes_read);
assert(r == 0);
}
//Check whether any new data was read in, enqueue last chunk if not
if(bytes_read == 0) {
//put it into send buffer
r = ringbuffer_insert(&send_buf, chunk);
assert(r==0);
//NOTE: No need to empty a full send_buf, we will break now and pass everything on to the queue
break;
}
//partition input block into large, coarse-granular chunks
int split;
do {
split = 0;
//Try to split the buffer at least ANCHOR_JUMP bytes away from its beginning
if(ANCHOR_JUMP < chunk->uncompressed_data.n) {
int offset = rabinseg(chunk->uncompressed_data.ptr + ANCHOR_JUMP, chunk->uncompressed_data.n - ANCHOR_JUMP, rf_win_dataprocess, rabintab, rabinwintab);
//Did we find a split location?
if(offset == 0) {
//Split found at the very beginning of the buffer (should never happen due to technical limitations)
assert(0);
split = 0;
} else if(offset + ANCHOR_JUMP < chunk->uncompressed_data.n) {
//Split found somewhere in the middle of the buffer
//Allocate a new chunk and create a new memory buffer
temp = (chunk_t *)malloc(sizeof(chunk_t));
if(temp==NULL) EXIT_TRACE("Memory allocation failed.\n");
//split it into two pieces
r = mbuffer_split(&chunk->uncompressed_data, &temp->uncompressed_data, offset + ANCHOR_JUMP);
if(r!=0) EXIT_TRACE("Unable to split memory buffer.\n");
temp->header.state = CHUNK_STATE_UNCOMPRESSED;
temp->sequence.l1num = anchorcount;
anchorcount++;
printf("anchorcount = %d, offset = %d\n", anchorcount, offset);
//put it into send buffer
r = ringbuffer_insert(&send_buf, chunk);
assert(r==0);
//send a group of items into the next queue in round-robin fashion
if(ringbuffer_isFull(&send_buf)) {
r = queue_enqueue(&refine_que[qid], &send_buf, ANCHOR_DATA_PER_INSERT);
assert(r>=1);
qid = (qid+1) % args->nqueues;
}
//prepare for next iteration
chunk = temp;
temp = NULL;
split = 1;
} else {
//Due to technical limitations we can't distinguish the cases "no split" and "split at end of buffer"
//This will result in some unnecessary (and unlikely) work but yields the correct result eventually.
temp = chunk;
chunk = NULL;
split = 0;
}
} else {
//NOTE: We don't process the stub, instead we try to read in more data so we might be able to find a proper split.
// Only once the end of the file is reached do we get a genuine stub which will be enqueued right after the read operation.
temp = chunk;
chunk = NULL;
split = 0;
}
printf("anchorcount = %d\n", anchorcount);
} while(split);
}
//drain buffer
while(!ringbuffer_isEmpty(&send_buf)) {
r = queue_enqueue(&refine_que[qid], &send_buf, ANCHOR_DATA_PER_INSERT);
assert(r>=1);
qid = (qid+1) % args->nqueues;
}
free(rabintab);
free(rabinwintab);
ringbuffer_destroy(&send_buf);
//shutdown
for(i=0; i<args->nqueues; i++) {
queue_terminate(&refine_que[i]);
}
return NULL;
}
#endif //ENABLE_PTHREADS
/*
* Pipeline stage function of reorder stage
*
* Actions performed:
* - Receive chunks from compression and deduplication stage
* - Check sequence number of each chunk to determine correct order
* - Cache chunks that arrive out-of-order until predecessors are available
* - Write chunks in-order to file (or preloading buffer)
*
* Notes:
* - This function blocks if the compression stage has not finished supplying
* the compressed data for a duplicate chunk.
*/
#ifdef ENABLE_PTHREADS
void *Reorder(void * targs) {
struct thread_args *args = (struct thread_args *)targs;
int qid = 0;
int fd = 0;
ringbuffer_t recv_buf;
chunk_t *chunk;
SearchTree T;
T = TreeMakeEmpty(NULL);
Position pos = NULL;
struct tree_element tele;
sequence_t next;
sequence_reset(&next);
//We perform global anchoring in the first stage and refine the anchoring
//in the second stage. This array keeps track of the number of chunks in
//a coarse chunk.
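//A value of zero means the last L2 chunk of that anchor has not been seen yet.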
sequence_number_t *chunks_per_anchor;
unsigned int chunks_per_anchor_max = 1024;
chunks_per_anchor = malloc(chunks_per_anchor_max * sizeof(sequence_number_t));
if(chunks_per_anchor == NULL) EXIT_TRACE("Error allocating memory\n");
memset(chunks_per_anchor, 0, chunks_per_anchor_max * sizeof(sequence_number_t));
int r;
int i;
r = ringbuffer_init(&recv_buf, ITEM_PER_FETCH);
assert(r==0);
fd = create_output_file(conf->outfile);
while(1) {
//get a group of items
if (ringbuffer_isEmpty(&recv_buf)) {
//process queues in round-robin fashion
for(i=0,r=0; r<=0 && i<args->nqueues; i++) {
r = queue_dequeue(&reorder_que[qid], &recv_buf, ITEM_PER_FETCH);
qid = (qid+1) % args->nqueues;
}
if(r<0) break;
}
chunk = (chunk_t *)ringbuffer_remove(&recv_buf);
if (chunk == NULL) break;
//Double size of sequence number array if necessary
if(chunk->sequence.l1num >= chunks_per_anchor_max) {
chunks_per_anchor = realloc(chunks_per_anchor, 2 * chunks_per_anchor_max * sizeof(sequence_number_t));
if(chunks_per_anchor == NULL) EXIT_TRACE("Error allocating memory\n");
memset(&chunks_per_anchor[chunks_per_anchor_max], 0, chunks_per_anchor_max * sizeof(sequence_number_t));
chunks_per_anchor_max *= 2;
}
//Update expected L2 sequence number
if(chunk->isLastL2Chunk) {
assert(chunks_per_anchor[chunk->sequence.l1num] == 0);
chunks_per_anchor[chunk->sequence.l1num] = chunk->sequence.l2num+1;
}
//Put chunk into local cache if it's not next in the sequence
if(!sequence_eq(chunk->sequence, next)) {
pos = TreeFind(chunk->sequence.l1num, T);
if (pos == NULL) {
//FIXME: Can we remove at least one of the two mallocs in this if-clause?
//FIXME: Rename "INITIAL_SEARCH_TREE_SIZE" to something more accurate
tele.l1num = chunk->sequence.l1num;
tele.queue = Initialize(INITIAL_SEARCH_TREE_SIZE);
Insert(chunk, tele.queue);
T = TreeInsert(tele, T);
} else {
Insert(chunk, pos->Element.queue);
}
continue;
}
//write as many chunks as possible, current chunk is next in sequence
pos = TreeFindMin(T);
do {
write_chunk_to_file(fd, chunk);
if(chunk->header.isDuplicate) {
free(chunk);
chunk=NULL;
}
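//Advance the expected sequence number; once all L2 chunks of the current coarse chunk have been written, move on to the next L1 number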
sequence_inc_l2(&next);
if(chunks_per_anchor[next.l1num]!=0 && next.l2num==chunks_per_anchor[next.l1num]) sequence_inc_l1(&next);
//Check whether we can write more chunks from cache
if(pos != NULL && (pos->Element.l1num == next.l1num)) {
chunk = FindMin(pos->Element.queue);
if(sequence_eq(chunk->sequence, next)) {
//Remove chunk from cache, update position for next iteration
DeleteMin(pos->Element.queue);
if(IsEmpty(pos->Element.queue)) {
Destroy(pos->Element.queue);
T = TreeDelete(pos->Element, T);
pos = TreeFindMin(T);
}
} else {
//level 2 sequence number does not match
chunk = NULL;
}
} else {
//level 1 sequence number does not match or no chunks left in cache
chunk = NULL;
}
} while(chunk != NULL);
}
//flush the blocks left in the cache to file
pos = TreeFindMin(T);
while(pos !=NULL) {
if(pos->Element.l1num == next.l1num) {
chunk = FindMin(pos->Element.queue);
if(sequence_eq(chunk->sequence, next)) {
//Remove chunk from cache, update position for next iteration
DeleteMin(pos->Element.queue);
if(IsEmpty(pos->Element.queue)) {
Destroy(pos->Element.queue);
T = TreeDelete(pos->Element, T);
pos = TreeFindMin(T);
}
} else {
//level 2 sequence number does not match
EXIT_TRACE("L2 sequence number mismatch.\n");
}
} else {
//level 1 sequence number does not match
EXIT_TRACE("L1 sequence number mismatch.\n");
}
write_chunk_to_file(fd, chunk);
if(chunk->header.isDuplicate) {
free(chunk);
chunk=NULL;
}
sequence_inc_l2(&next);
if(chunks_per_anchor[next.l1num]!=0 && next.l2num==chunks_per_anchor[next.l1num]) sequence_inc_l1(&next);
}
close(fd);
ringbuffer_destroy(&recv_buf);
free(chunks_per_anchor);
return NULL;
}
#endif //ENABLE_PTHREADS
/*--------------------------------------------------------------------------*/
/* Encode
* Compress an input stream
*
* Arguments:
* conf: Configuration parameters
*
*/
void Encode(config_t * _conf) {
struct stat filestat;
int listen_fd, fd;
size_t bytes_input;
conf = _conf;
#ifdef ENABLE_STATISTICS
init_stats(&stats);
#endif
//Create chunk cache
cache = hashtable_create(65536, hash_from_key_fn, keys_equal_fn, FALSE);
if(cache == NULL) {
printf("ERROR: Out of memory\n");
exit(1);
}
#ifdef ENABLE_PTHREADS
struct thread_args data_process_args;
int i;
//queue allocation & initialization
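//One queue is created per group of up to MAX_THREADS_PER_QUEUE worker threads (rounded up)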
const int nqueues = (conf->nthreads / MAX_THREADS_PER_QUEUE) +
((conf->nthreads % MAX_THREADS_PER_QUEUE != 0) ? 1 : 0);
deduplicate_que = malloc(sizeof(queue_t) * nqueues);
refine_que = malloc(sizeof(queue_t) * nqueues);
reorder_que = malloc(sizeof(queue_t) * nqueues);
compress_que = malloc(sizeof(queue_t) * nqueues);
if( (deduplicate_que == NULL) || (refine_que == NULL) || (reorder_que == NULL) || (compress_que == NULL)) {
printf("Out of memory\n");
exit(1);
}
int threads_per_queue;
for(i=0; i<nqueues; i++) {
if (i < nqueues -1 || conf->nthreads %MAX_THREADS_PER_QUEUE == 0) {
//all but last queue
threads_per_queue = MAX_THREADS_PER_QUEUE;
} else {
//remaining threads work on last queue
threads_per_queue = conf->nthreads %MAX_THREADS_PER_QUEUE;
}
//call queue_init with threads_per_queue
queue_init(&deduplicate_que[i], QUEUE_SIZE, threads_per_queue);
queue_init(&refine_que[i], QUEUE_SIZE, 1);
queue_init(&reorder_que[i], QUEUE_SIZE, threads_per_queue);
queue_init(&compress_que[i], QUEUE_SIZE, threads_per_queue);
}
#else
struct thread_args generic_args;
#endif //ENABLE_PTHREADS
assert(!mbuffer_system_init());
/* initialize socket */
listen_fd = init_server_socket(&fd);
printf("[netdedup.server]: accept client\n");
#ifdef ENABLE_PARSEC_UPTCPIP
if (uptcp_recv(fd, &bytes_input, sizeof(size_t), 0) < 0) {
#else
if (recv(fd, &bytes_input, sizeof(size_t), 0) < 0) {
#endif
EXIT_TRACE("recv() size failed: %s\n", strerror(errno));
}
#ifdef ENABLE_STATISTICS
stats.total_input = bytes_input;
printf("[netdedup.server]: data transfer size = %dB\n", (int)stats.total_input);
#endif //ENABLE_STATISTICS
#ifdef ENABLE_PTHREADS
/* Variables for 3 thread pools and 2 pipeline stage threads.
* The first and the last stage are serial (mostly I/O).
*/
pthread_t threads_anchor[MAX_THREADS],
threads_chunk[MAX_THREADS],
threads_compress[MAX_THREADS],
threads_send, threads_process;
data_process_args.tid = 0;
data_process_args.nqueues = nqueues;
data_process_args.fd = fd;
data_process_args.input_file.size = bytes_input;
#ifdef ENABLE_PARSEC_HOOKS
__parsec_roi_begin();
#endif
#ifdef ENABLE_PARSEC_UPTCPIP
parsec_enter_tcpip_roi();
#endif
//thread for first pipeline stage (input)
#ifdef ENABLE_PARSEC_UPTCPIP
uptcp_pthread_create(&threads_process, NULL, Fragment, &data_process_args);
#else
pthread_create(&threads_process, NULL, Fragment, &data_process_args);
#endif
//Create 3 thread pools for the intermediate pipeline stages
struct thread_args anchor_thread_args[conf->nthreads];
for (i = 0; i < conf->nthreads; i ++) {
anchor_thread_args[i].tid = i;
#ifdef ENABLE_PARSEC_UPTCPIP
uptcp_pthread_create(&threads_anchor[i], NULL, FragmentRefine, &anchor_thread_args[i]);
#else
pthread_create(&threads_anchor[i], NULL, FragmentRefine, &anchor_thread_args[i]);
#endif
}
struct thread_args chunk_thread_args[conf->nthreads];
for (i = 0; i < conf->nthreads; i ++) {
chunk_thread_args[i].tid = i;
#ifdef ENABLE_PARSEC_UPTCPIP
uptcp_pthread_create(&threads_chunk[i], NULL, Deduplicate, &chunk_thread_args[i]);
#else
pthread_create(&threads_chunk[i], NULL, Deduplicate, &chunk_thread_args[i]);
#endif
}
struct thread_args compress_thread_args[conf->nthreads];
for (i = 0; i < conf->nthreads; i ++) {
compress_thread_args[i].tid = i;
#ifdef ENABLE_PARSEC_UPTCPIP
uptcp_pthread_create(&threads_compress[i], NULL, Compress, &compress_thread_args[i]);
#else
pthread_create(&threads_compress[i], NULL, Compress, &compress_thread_args[i]);
#endif
}
//thread for last pipeline stage (output)
struct thread_args send_block_args;
send_block_args.tid = 0;
send_block_args.nqueues = nqueues;
#ifdef ENABLE_PARSEC_UPTCPIP
uptcp_pthread_create(&threads_send, NULL, Reorder, &send_block_args);
#else
pthread_create(&threads_send, NULL, Reorder, &send_block_args);
#endif
/*** parallel phase ***/
//Return values of threads
stats_t *threads_anchor_rv[conf->nthreads];
stats_t *threads_chunk_rv[conf->nthreads];
stats_t *threads_compress_rv[conf->nthreads];
//join all threads
pthread_join(threads_process, NULL);
#ifdef ENABLE_PARSEC_UPTCPIP
parsec_exit_tcpip_roi();
#endif
for (i = 0; i < conf->nthreads; i ++)
pthread_join(threads_anchor[i], (void **)&threads_anchor_rv[i]);
for (i = 0; i < conf->nthreads; i ++)
pthread_join(threads_chunk[i], (void **)&threads_chunk_rv[i]);
for (i = 0; i < conf->nthreads; i ++)
pthread_join(threads_compress[i], (void **)&threads_compress_rv[i]);
pthread_join(threads_send, NULL);
#ifdef ENABLE_PARSEC_HOOKS
__parsec_roi_end();
#endif
/* free queues */
for(i=0; i<nqueues; i++) {
queue_destroy(&deduplicate_que[i]);
queue_destroy(&refine_que[i]);
queue_destroy(&reorder_que[i]);
queue_destroy(&compress_que[i]);
}
free(deduplicate_que);
free(refine_que);
free(reorder_que);
free(compress_que);
#ifdef ENABLE_STATISTICS
//Merge everything into global `stats' structure
for(i=0; i<conf->nthreads; i++) {
merge_stats(&stats, threads_anchor_rv[i]);
free(threads_anchor_rv[i]);
}
for(i=0; i<conf->nthreads; i++) {
merge_stats(&stats, threads_chunk_rv[i]);
free(threads_chunk_rv[i]);
}
for(i=0; i<conf->nthreads; i++) {
merge_stats(&stats, threads_compress_rv[i]);
free(threads_compress_rv[i]);
}
#endif //ENABLE_STATISTICS
#else //serial version
generic_args.tid = 0;
generic_args.nqueues = -1;
generic_args.fd = fd;
#ifdef ENABLE_PARSEC_HOOKS
__parsec_roi_begin();
#endif
#ifdef ENABLE_PARSEC_UPTCPIP
parsec_enter_tcpip_roi();
#endif
//Do the processing
SerialIntegratedPipeline(&generic_args);
#ifdef ENABLE_PARSEC_UPTCPIP
parsec_exit_tcpip_roi();
#endif
#ifdef ENABLE_PARSEC_HOOKS
__parsec_roi_end();
#endif
#endif //ENABLE_PTHREADS
#ifdef ENABLE_PARSEC_UPTCPIP
uptcp_close(listen_fd);
uptcp_close(fd);
#else
close(listen_fd);
close(fd);
#endif
assert(!mbuffer_system_destroy());
hashtable_destroy(cache, TRUE);
#ifdef ENABLE_STATISTICS
/* dest file stat */
if (stat(conf->outfile, &filestat) < 0)
EXIT_TRACE("stat() %s failed: %s\n", conf->outfile, strerror(errno));
stats.total_output = filestat.st_size;
//Analyze and print statistics
if(conf->verbose) print_stats(&stats);
#endif //ENABLE_STATISTICS
}