blob: 246aeddb182749c39c41dcf84975240ea572ff68 [file] [log] [blame]
/*
* Copyright (C) 2008 Princeton University
* All rights reserved.
* Authors: Jia Deng, Gilberto Contreras
*
* streamcluster - Online clustering algorithm
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#ifdef ENABLE_PARSEC_UPTCPIP
#include <uptcp_socket.h>
#endif
#define PORT 42284
/* REPLACE with your server machine name*/
#define SERVER_IP 0x0a0a0a0c //"10.10.10.12"
//#define SERVER_IP 0x7f000001 //"127.0.0.1"
using namespace std;
#define SEED 1
#define MAX_THREAD 16
typedef struct thread_arg{
int tid;
int fd;
long size;
} thread_arg;
static int nproc; //# of threads
static int dim;
static long chunksize;
static pthread_barrier_t thread_barrier;
/*****************************//**
*
* send to server
*
*******************************/
void* send_to_server(void* arg)
{
thread_arg* t_arg = (thread_arg*)arg;
int tid = t_arg->tid;
int sd = t_arg->fd;
struct sockaddr_in sin;
struct sockaddr_in pin;
struct hostent *hp;
int chunks;
float *send_buf;
if(tid != 0){
/* fill in the socket structure with host information */
memset(&pin, 0, sizeof(pin));
pin.sin_family = AF_INET;
pin.sin_addr.s_addr = htonl(SERVER_IP);
pin.sin_port = htons(PORT);
/* grab an Internet domain socket */
#ifdef ENABLE_PARSEC_UPTCPIP
if ((sd = uptcp_socket(AF_INET, SOCK_STREAM, 0)) == -1) {
#else
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
#endif
printf("Socket error: socket()\n");
goto out;
}
/* connect to PORT on SERVER_IP */
#ifdef ENABLE_PARSEC_UPTCPIP
if (uptcp_connect(sd,(struct sockaddr *) &pin, sizeof(pin)) == -1) {
#else
if (connect(sd,(struct sockaddr *) &pin, sizeof(pin)) == -1) {
#endif
printf("Socket error: connect()\n");
goto out;
}
}
/* allocate memory buffer */
chunks = t_arg->size / chunksize;
send_buf = (float*)malloc(chunksize*dim*sizeof(float));
if(send_buf == NULL){
printf("not enough ememory\n");
goto out;
}
/* Send data to server */
printf("[Client:%d]: Sending ...\n", tid);
for(int i = 0; i < chunks; i++){
/* generate data */
for( int j = 0; j < chunksize ; j++ ) {
for( int k = 0; k < dim; k++ ) {
send_buf[j*dim + k] = lrand48()/(float)(2147483647); //INT_MAX;
}
}
/* send data */
int bytes_left = chunksize * dim * sizeof(float);
int bytes_tosend = 0;
int ss;
char* send_ptr = (char*)send_buf;
while(bytes_left >0){
bytes_tosend = bytes_left;
#ifdef ENABLE_PARSEC_UPTCPIP
if ((ss = uptcp_send(sd, send_ptr, bytes_tosend, 0)) == -1) {
#else
if ((ss = send(sd, send_ptr, bytes_tosend, 0)) == -1) {
#endif
printf("Socket error: send input file data error\n");
}
printf("[Client:%d] send bytes = %d\n", tid, ss);
bytes_left -= ss;
send_ptr += ss;
}
}
printf("[Client:%d] Send data to server ok!\n", tid);
if(tid == 0){
#ifdef ENABLE_PARSEC_UPTCPIP
if (uptcp_recv(sd, &tid, sizeof(tid), 0) == -1) {
#else
if (recv(sd, &tid, sizeof(tid), 0) == -1) {
#endif
printf("Socket error: recv()\n");
}
printf("Receive Ack from Server\n");
}
pthread_barrier_wait(&thread_barrier);
out:
free(t_arg);
#ifdef ENABLE_PARSEC_UPTCPIP
uptcp_close(sd);
#else
close(sd);
#endif
pthread_exit(NULL);
}
/*****************************//**
*
* main()
*
*******************************/
int main(int argc, char **argv)
{
long n;
pthread_t thread_id[MAX_THREAD];
int thread_count = 0;
/* deal with input arguments */
if (argc != 5) {
fprintf(stderr,"usage: %s d n chunksize nproc\n",
argv[0]);
fprintf(stderr," d: Dimension of each data point\n");
fprintf(stderr," n: Number of data points\n");
fprintf(stderr," chunksize: Number of data points to handle per step\n");
fprintf(stderr," nproc: Number of threads to use\n");
fprintf(stderr,"\n");
fprintf(stderr, "if n > 0, points will be randomly generated instead of reading from infile.\n");
exit(1);
}
dim = atoi(argv[1]);
n = atoi(argv[2]);
chunksize = atoi(argv[3]);
nproc = atoi(argv[4]);
if(n % chunksize != 0){
printf("[Client] ensure that n \% chunksize == 0\n");
exit(1);
}
int chunks = n/chunksize;
if(chunks < nproc){
printf("[Client] %d threads is too many for %d chunks. Use %d threads instead", nproc, chunks, chunks);
nproc = chunks;
}
int chunks_per_thread = chunks / nproc;
int rest = chunks % nproc;
srand48(SEED);
/* initial connection */
int sd;
struct sockaddr_in sin;
struct sockaddr_in pin;
struct hostent *hp;
/* fill in the socket structure with host information */
memset(&pin, 0, sizeof(pin));
pin.sin_family = AF_INET;
pin.sin_addr.s_addr = htonl(SERVER_IP);
pin.sin_port = htons(PORT);
/* grab an Internet domain socket */
#ifdef ENABLE_PARSEC_UPTCPIP
if ((sd = uptcp_socket(AF_INET, SOCK_STREAM, 0)) == -1) {
#else
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
#endif
printf("Socket error: socket()\n");
exit(1);
}
/* connect to PORT on SERVER_IP */
#ifdef ENABLE_PARSEC_UPTCPIP
if (uptcp_connect(sd,(struct sockaddr *) &pin, sizeof(pin)) == -1) {
#else
if (connect(sd,(struct sockaddr *) &pin, sizeof(pin)) == -1) {
#endif
printf("Socket error: connect()\n");
exit(1);
}
/* send file size to server */
#ifdef ENABLE_PARSEC_UPTCPIP
if (uptcp_send(sd, &nproc, sizeof(nproc), 0) == -1) {
#else
if (send(sd, &nproc, sizeof(nproc), 0) == -1) {
#endif
printf("Socket error: cannot send input file size to server\n");
exit(1);
}
pthread_barrier_init(&thread_barrier, NULL, nproc);
/* create threads */
thread_arg* arg_ptr;
while(thread_count < nproc){
arg_ptr = (thread_arg*)malloc(sizeof(thread_arg));
arg_ptr->tid = thread_count;
if(thread_count == 0)
arg_ptr->fd = sd;
else arg_ptr->fd = -1;
if(thread_count < rest)
arg_ptr->size = (chunks_per_thread + 1) * chunksize;
else arg_ptr->size = chunks_per_thread * chunksize;
#ifdef ENABLE_PARSEC_UPTCPIP
if(uptcp_pthread_create(&thread_id[thread_count], NULL, send_to_server, (void*)arg_ptr) != 0){
#else
if(pthread_create(&thread_id[thread_count], NULL, send_to_server, arg_ptr) != 0){
#endif
printf("pthread_create() error\n");
}
thread_count ++;
}
while(thread_count > 0){
pthread_join(thread_id[thread_count-1], NULL);
thread_count --;
}
return 0;
}