blob: 9da66b8e6afce0b956e767b3d5227cb005efc057 [file] [log] [blame]
/* AUTORIGHTS
Copyright (C) 2007 Princeton University
This file is part of Ferret Toolkit.
Ferret Toolkit is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software Foundation,
Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <math.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <dirent.h>
#include <unistd.h>
#include <pthread.h>
#include <cass.h>
#include <cass_timer.h>
#include <../image/image.h>
#include "tpool.h"
#include "queue.h"
#ifdef ENABLE_PARSEC_HOOKS
#include <hooks.h>
#endif
/* for tcpip stack */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#ifdef ENABLE_PARSEC_UPTCPIP
#include <uptcp_socket.h>
#endif
#define PORT 42285
#define MAX_TCP_CONNECTION 10000
#define MAX_SOCKET_BUF 0x4000
#define DEFAULT_DEPTH 25
#define MAXR 100
#define IMAGE_DIM 14
const char *db_dir = NULL;
const char *table_name = NULL;
const char *query_dir = NULL;
const char *output_path = NULL;
int NTHREAD_ACCEPTOR = 1;
int NTHREAD_LOAD = 1;
int NTHREAD_SEG = 1;
int NTHREAD_EXTRACT = 1;
int NTHREAD_VEC = 1;
int NTHREAD_RANK = 1;
int NTHREAD_OUT = 1;
int DEPTH = DEFAULT_DEPTH;
int top_K = 10;
char *extra_params = "-L 8 - T 20";
cass_env_t *env;
cass_table_t *table;
cass_table_t *query_table;
int vec_dist_id = 0;
int vecset_dist_id = 0;
struct packet{
int len;
char buf[1];
};
/* tcp requests */
struct request_data
{
int fd; /* socket */
};
struct queue q_req_load; /* accepted connection queue */
struct load_data
{
int width, height;
int fd;
unsigned char *HSV, *RGB;
};
struct queue q_load_seg;
struct seg_data
{
int width, height, nrgn;
int fd;
unsigned char *mask;
unsigned char *HSV;
};
struct queue q_seg_extract;
struct extract_data
{
cass_dataset_t ds;
int fd;
};
struct queue q_extract_vec;
struct vec_query_data
{
int fd;
cass_dataset_t *ds;
cass_result_t result;
};
struct queue q_vec_rank;
struct rank_data
{
int fd;
cass_dataset_t *ds;
cass_result_t result;
};
struct queue q_rank_out;
/* ------- The Helper Functions ------- */
int cnt_enqueue;
int cnt_dequeue;
/* ------ The Stages ------ */
void *t_acceptor (void *dummy)
{
int fd0, fd1;
struct sockaddr_in sin;
struct sockaddr_in pin;
struct hostent *hp;
socklen_t addrlen;
struct request_data *req;
/* 1. create a socket */
#ifdef ENABLE_PARSEC_UPTCPIP
if ((fd0 = uptcp_socket(AF_INET, SOCK_STREAM, 0)) == -1) {
#else
if ((fd0 = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
#endif
fprintf(stderr, "Socket error: socket()\n");
return (void*)-1;
}
/* 2. bind to a port */
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons(PORT);
#ifdef ENABLE_PARSEC_UPTCPIP
if (uptcp_bind(fd0, (struct sockaddr *) &sin, sizeof(sin)) == -1) {
uptcp_close(fd0);
#else
if (bind(fd0, (struct sockaddr *) &sin, sizeof(sin)) == -1) {
close(fd0);
#endif
fprintf(stderr, "Socket error: bind()\n");
return (void*)-1;
}
/* 3. listen */
#ifdef ENABLE_PARSEC_UPTCPIP
if (uptcp_listen(fd0, MAX_TCP_CONNECTION) == -1) {
uptcp_close(fd0);
#else
if (listen(fd0, MAX_TCP_CONNECTION) == -1) {
close(fd0);
#endif
fprintf(stderr, "Socket error: listen()\n");
return (void*)-1;
}
/* 4. accept and enqueue */
while(1){
#ifdef ENABLE_PARSEC_UPTCPIP
if ((fd1 = uptcp_accept(fd0, (struct sockaddr *) &pin, &addrlen)) == -1) {
#else
if ((fd1 = accept(fd0, (struct sockaddr *) &pin, &addrlen)) == -1) {
#endif
fprintf(stderr, "Socket error: accept()\n");
return (void*)-1;
}
/* enqueue */
req = (struct request_data *)calloc(1, sizeof(struct request_data));
req->fd = fd1;
cnt_enqueue++;
#ifdef ENABLE_PARSEC_UPTCPIP
if(cnt_enqueue == 1) /* first connection */
parsec_enter_tcpip_roi();
#endif
enqueue(&q_req_load, req);
}
/* 5. exit */
return NULL;
}
void *t_load (void *dummy)
{
struct request_data *req;
struct load_data *data;
char *buf;
int r;
int bytes_recv;
int bytes_input;
struct packet *pkt_ptr;
int fd;
pkt_ptr = (struct packet*)malloc(MAX_SOCKET_BUF);
while(1){
/* 1. dequeue one request */
if(dequeue(&q_req_load, &req) < 0)
break;
assert(req != NULL);
fd = req->fd;
/* 2. receive data from clients */
r = 0;
bytes_recv = 0;
bytes_input = MAX_SOCKET_BUF;
pkt_ptr->len = 0;
buf = (char*)pkt_ptr;
while(bytes_recv < bytes_input) {
#ifdef ENABLE_PARSEC_UPTCPIP
if ((r = uptcp_recv(fd, buf+bytes_recv, (bytes_input-bytes_recv), 0)) == -1) {
#else
if ((r = recv(fd, buf+bytes_recv, (bytes_input-bytes_recv), 0)) == -1) {
#endif
printf("I/O error\n");
}
/*
* Extract bytes_input from recv_buf
*
* struct send_buf {
* int send_bytes;
* char *buf;
* };
*/
if(bytes_input == MAX_SOCKET_BUF)
bytes_input = pkt_ptr->len + sizeof(int);
if(pkt_ptr->len == -1){ /* last connection */
#ifdef ENABLE_PARSEC_UPTCPIP
parsec_exit_tcpip_roi();
#endif
goto exit;
}
bytes_recv += r;
}
/* 3. enqueue */
data = (struct load_data *)malloc(sizeof(struct load_data));
assert(data != NULL);
data->fd = req->fd;
r = image_read_rgb_hsv_buf(pkt_ptr->buf, pkt_ptr->len, &data->width, &data->height, &data->RGB, &data->HSV);
assert(r == 0);
enqueue(&q_load_seg, data);
}
exit:
queue_signal_terminate(&q_load_seg);
return NULL;
}
void *t_seg (void *dummy)
{
struct seg_data *seg;
struct load_data *load;
while(1)
{
if(dequeue(&q_load_seg, &load) < 0)
break;
assert(load != NULL);
seg = (struct seg_data *)calloc(1, sizeof(struct seg_data));
seg->fd = load->fd;
seg->width = load->width;
seg->height = load->height;
seg->HSV = load->HSV;
image_segment(&seg->mask, &seg->nrgn, load->RGB, load->width, load->height);
free(load->RGB);
free(load);
enqueue(&q_seg_extract, seg);
}
queue_signal_terminate(&q_seg_extract);
return NULL;
}
void *t_extract (void *dummy)
{
struct seg_data *seg;
struct extract_data *extract;
while (1)
{
if(dequeue(&q_seg_extract, &seg) < 0)
break;
assert(seg != NULL);
extract = (struct extract_data *)calloc(1, sizeof(struct extract_data));
extract->fd = seg->fd;
image_extract_helper(seg->HSV, seg->mask, seg->width, seg->height, seg->nrgn, &extract->ds);
free(seg->mask);
free(seg->HSV);
free(seg);
enqueue(&q_extract_vec, extract);
}
queue_signal_terminate(&q_extract_vec);
return NULL;
}
void *t_vec (void *dummy)
{
struct extract_data *extract;
struct vec_query_data *vec;
cass_query_t query;
while(1)
{
if(dequeue(&q_extract_vec, &extract) < 0)
break;
assert(extract != NULL);
vec = (struct vec_query_data *)calloc(1, sizeof(struct vec_query_data));
vec->fd = extract->fd;
memset(&query, 0, sizeof query);
query.flags = CASS_RESULT_LISTS | CASS_RESULT_USERMEM;
vec->ds = query.dataset = &extract->ds;
query.vecset_id = 0;
query.vec_dist_id = vec_dist_id;
query.vecset_dist_id = vecset_dist_id;
query.topk = 2*top_K;
query.extra_params = extra_params;
cass_result_alloc_list(&vec->result, vec->ds->vecset[0].num_regions, query.topk);
// cass_result_alloc_list(&result_m, 0, T1);
// cass_table_query(table, &query, &vec->result);
cass_table_query(table, &query, &vec->result);
enqueue(&q_vec_rank, vec);
}
queue_signal_terminate(&q_vec_rank);
return NULL;
}
void *t_rank (void *dummy)
{
struct vec_query_data *vec;
struct rank_data *rank;
cass_result_t *candidate;
cass_query_t query;
while (1)
{
if(dequeue(&q_vec_rank, &vec) < 0)
break;
assert(vec != NULL);
rank = (struct rank_data *)calloc(1, sizeof(struct rank_data));
rank->fd = vec->fd;
query.flags = CASS_RESULT_LIST | CASS_RESULT_USERMEM | CASS_RESULT_SORT;
query.dataset = vec->ds;
query.vecset_id = 0;
query.vec_dist_id = vec_dist_id;
query.vecset_dist_id = vecset_dist_id;
query.topk = top_K;
query.extra_params = NULL;
candidate = cass_result_merge_lists(&vec->result, (cass_dataset_t *)query_table->__private, 0);
query.candidate = candidate;
cass_result_alloc_list(&rank->result, 0, top_K);
cass_table_query(query_table, &query, &rank->result);
cass_result_free(&vec->result);
cass_result_free(candidate);
free(candidate);
cass_dataset_release(vec->ds);
free(vec->ds);
free(vec);
enqueue(&q_rank_out, rank);
}
queue_signal_terminate(&q_rank_out);
return NULL;
}
void *t_out (void *dummy)
{
struct rank_data *rank;
char *sendbuf, *str_ptr;
int bufsize, str_len;
struct packet *pkt_ptr;
if((pkt_ptr = (struct packet*)malloc(MAX_SOCKET_BUF)) == NULL){
fprintf(stderr, "Not enough memory for sendbuf\n");
return (void*)-1;
}
while (1)
{
if(dequeue(&q_rank_out, &rank) < 0)
break;
assert(rank != NULL);
bufsize = 0;
str_ptr = pkt_ptr->buf;
ARRAY_BEGIN_FOREACH(rank->result.u.list, cass_list_entry_t p)
{
char *obj = NULL;
if (p.dist == HUGE_VAL) continue;
cass_map_id_to_dataobj(query_table->map, p.id, &obj);
assert(obj != NULL);
sprintf(str_ptr, "\t%s:%g", obj, p.dist);
str_len = strlen(str_ptr);
str_ptr += str_len;
bufsize += str_len;
} ARRAY_END_FOREACH;
/* send results to client */
sendbuf = (char*)pkt_ptr;
pkt_ptr->len = bufsize;
bufsize += sizeof(int);
#ifdef ENABLE_PARSEC_UPTCPIP
if (uptcp_send(rank->fd, sendbuf, bufsize, 0) == -1) {
#else
if (send(rank->fd, sendbuf, bufsize, 0) == -1) {
#endif
fprintf(stderr, "Socket error: cannot send ack to client\n");
}
cass_result_free(&rank->result);
free(rank);
cnt_dequeue++;
fprintf(stderr, "(%d,%d)\n", cnt_enqueue, cnt_dequeue);
}
free(pkt_ptr);
assert(cnt_enqueue == cnt_dequeue+1);
return NULL;
}
int main (int argc, char *argv[])
{
stimer_t tmr;
tdesc_t *t_acceptor_desc;
tdesc_t *t_load_desc;
tdesc_t *t_seg_desc;
tdesc_t *t_extract_desc;
tdesc_t *t_vec_desc;
tdesc_t *t_rank_desc;
tdesc_t *t_out_desc;
tpool_t *p_acceptor; /* accept tcp connections */
tpool_t *p_load; /* handle tcp connection*/
tpool_t *p_seg;
tpool_t *p_extract;
tpool_t *p_vec;
tpool_t *p_rank;
tpool_t *p_out;
int ret, i;
#ifdef PARSEC_VERSION
#define __PARSEC_STRING(x) #x
#define __PARSEC_XSTRING(x) __PARSEC_STRING(x)
printf("PARSEC Benchmark Suite Version "__PARSEC_XSTRING(PARSEC_VERSION)"\n");
fflush(NULL);
#else
printf("PARSEC Benchmark Suite\n");
fflush(NULL);
#endif //PARSEC_VERSION
#ifdef ENABLE_PARSEC_HOOKS
__parsec_bench_begin(__parsec_ferret);
#endif
if (argc < 6)
{
printf("%s <database> <table> <top K> <depth> <n>\n", argv[0]);
return 0;
}
db_dir = argv[1];
table_name = argv[2];
top_K = atoi(argv[3]);
DEPTH = atoi(argv[4]);
NTHREAD_SEG = atoi(argv[5]);
NTHREAD_EXTRACT = atoi(argv[5]);
NTHREAD_VEC = atoi(argv[5]);
NTHREAD_RANK = atoi(argv[5]);
cass_init();
ret = cass_env_open(&env, db_dir, 0);
if (ret != 0) { printf("ERROR: %s\n", cass_strerror(ret)); return 0; }
vec_dist_id = cass_reg_lookup(&env->vec_dist, "L2_float");
assert(vec_dist_id >= 0);
vecset_dist_id = cass_reg_lookup(&env->vecset_dist, "emd");
assert(vecset_dist_id >= 0);
i = cass_reg_lookup(&env->table, table_name);
table = query_table = cass_reg_get(&env->table, i);
i = table->parent_id;
if (i >= 0)
{
query_table = cass_reg_get(&env->table, i);
}
if (query_table != table) cass_table_load(query_table);
cass_map_load(query_table->map);
cass_table_load(table);
image_init(argv[0]);
stimer_tick(&tmr);
queue_init(&q_req_load, DEPTH, NTHREAD_ACCEPTOR);
queue_init(&q_load_seg, DEPTH, NTHREAD_LOAD);
queue_init(&q_seg_extract, DEPTH, NTHREAD_SEG);
queue_init(&q_extract_vec, DEPTH, NTHREAD_EXTRACT);
queue_init(&q_vec_rank, DEPTH, NTHREAD_VEC);
queue_init(&q_rank_out, DEPTH, NTHREAD_RANK);
t_acceptor_desc = (tdesc_t *)calloc(NTHREAD_ACCEPTOR, sizeof(tdesc_t));
t_load_desc = (tdesc_t *)calloc(NTHREAD_LOAD, sizeof(tdesc_t));
t_seg_desc = (tdesc_t *)calloc(NTHREAD_SEG, sizeof(tdesc_t));
t_extract_desc = (tdesc_t *)calloc(NTHREAD_EXTRACT, sizeof(tdesc_t));
t_vec_desc = (tdesc_t *)calloc(NTHREAD_VEC, sizeof(tdesc_t));
t_rank_desc = (tdesc_t *)calloc(NTHREAD_RANK, sizeof(tdesc_t));
t_out_desc = (tdesc_t *)calloc(NTHREAD_OUT, sizeof(tdesc_t));
t_acceptor_desc[0].attr = NULL;
t_acceptor_desc[0].start_routine = t_acceptor;
t_acceptor_desc[0].arg = NULL;
for (i = 1; i < NTHREAD_LOAD; i++) t_acceptor_desc[i] = t_acceptor_desc[0];
t_load_desc[0].attr = NULL;
t_load_desc[0].start_routine = t_load;
t_load_desc[0].arg = NULL;
for (i = 1; i < NTHREAD_LOAD; i++) t_load_desc[i] = t_load_desc[0];
t_seg_desc[0].attr = NULL;
t_seg_desc[0].start_routine = t_seg;
t_seg_desc[0].arg = NULL;
for (i = 1; i < NTHREAD_SEG; i++) t_seg_desc[i] = t_seg_desc[0];
t_extract_desc[0].attr = NULL;
t_extract_desc[0].start_routine = t_extract;
t_extract_desc[0].arg = NULL;
for (i = 1; i < NTHREAD_EXTRACT; i++) t_extract_desc[i] = t_extract_desc[0];
t_vec_desc[0].attr = NULL;
t_vec_desc[0].start_routine = t_vec;
t_vec_desc[0].arg = NULL;
for (i = 1; i < NTHREAD_VEC; i++) t_vec_desc[i] = t_vec_desc[0];
t_rank_desc[0].attr = NULL;
t_rank_desc[0].start_routine = t_rank;
t_rank_desc[0].arg = NULL;
for (i = 1; i < NTHREAD_RANK; i++) t_rank_desc[i] = t_rank_desc[0];
t_out_desc[0].attr = NULL;
t_out_desc[0].start_routine = t_out;
t_out_desc[0].arg = NULL;
for (i = 1; i < NTHREAD_OUT; i++) t_out_desc[i] = t_out_desc[0];
cnt_enqueue = cnt_dequeue = 0;
#ifdef ENABLE_PARSEC_HOOKS
__parsec_roi_begin();
#endif
p_acceptor = tpool_create(t_acceptor_desc, NTHREAD_ACCEPTOR);
p_load = tpool_create(t_load_desc, NTHREAD_LOAD);
p_seg = tpool_create(t_seg_desc, NTHREAD_SEG);
p_extract = tpool_create(t_extract_desc, NTHREAD_EXTRACT);
p_vec = tpool_create(t_vec_desc, NTHREAD_VEC);
p_rank = tpool_create(t_rank_desc, NTHREAD_RANK);
p_out = tpool_create(t_out_desc, NTHREAD_OUT);
tpool_join(p_out, NULL);
tpool_join(p_rank, NULL);
tpool_join(p_vec, NULL);
tpool_join(p_extract, NULL);
tpool_join(p_seg, NULL);
tpool_join(p_load, NULL);
#ifdef ENABLE_PARSEC_HOOKS
__parsec_roi_end();
#endif
tpool_cancel(p_acceptor);
tpool_destroy(p_acceptor);
tpool_destroy(p_load);
tpool_destroy(p_seg);
tpool_destroy(p_extract);
tpool_destroy(p_vec);
tpool_destroy(p_rank);
tpool_destroy(p_out);
free(t_acceptor_desc);
free(t_load_desc);
free(t_seg_desc);
free(t_extract_desc);
free(t_vec_desc);
free(t_rank_desc);
free(t_out_desc);
queue_destroy(&q_req_load);
queue_destroy(&q_load_seg);
queue_destroy(&q_seg_extract);
queue_destroy(&q_extract_vec);
queue_destroy(&q_vec_rank);
queue_destroy(&q_rank_out);
stimer_tuck(&tmr, "QUERY TIME");
ret = cass_env_close(env, 0);
if (ret != 0) { printf("ERROR: %s\n", cass_strerror(ret)); return 0; }
cass_cleanup();
image_cleanup();
#ifdef ENABLE_PARSEC_HOOKS
__parsec_bench_end();
#endif
return 0;
}