blob: 3550537c11024d6b3c14c5278bc403fa37554ef8 [file] [log] [blame]
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "adc.h"
#include "macrodef.h"
#include "protots.h"
#ifdef UNIX
#include <errno.h>
#endif
extern int32 computeChecksum(ADC_VIEW_CNTL *avp,treeNode *t,uint64 *ordern);
extern int32 WriteViewToDiskCS(ADC_VIEW_CNTL *avp,treeNode *t,uint64 *ordern);
int32 ReadWholeInputData(ADC_VIEW_CNTL *avp, FILE *inpf){
uint32 iRec = 0;
uint32 inpBufferLineSize, inpBufferPace, inpRecSize, ib = 0;
FSEEK(inpf, 0L, SEEK_SET);
inpRecSize = 8*avp->nm+4*avp->nTopDims;
inpBufferLineSize = inpRecSize;
if (inpBufferLineSize%8) inpBufferLineSize += 4;
inpBufferPace = inpBufferLineSize/4;
while(fread(&avp->inpDataBuffer[ib], inpRecSize, 1, inpf)){
iRec++;
ib += inpBufferPace;
}
avp->nRowsToRead = iRec;
FSEEK(inpf, 0L, SEEK_SET);
if(avp->nInputRecs != iRec){
fprintf(stderr, " ReadWholeInputData(): wrong input data reading.\n");
return ADC_INTERNAL_ERROR;
}
return ADC_OK;
}
int32 ComputeMemoryFittedView (ADC_VIEW_CNTL *avp){
uint32 iRec = 0;
uint32 viewBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
uint32 inpBufferLineSize, inpBufferPace, inpRecSize,ib;
uint64 ordern=0;
#ifdef VIEW_FILE_OUTPUT
uint32 retCode;
#endif
FSEEK(avp->viewFile, 0L, SEEK_END);
inpRecSize = 8*avp->nm+4*avp->nTopDims;
inpBufferLineSize = inpRecSize;
if (inpBufferLineSize%8) inpBufferLineSize += 4;
inpBufferPace = inpBufferLineSize/4;
InitializeTree(avp->tree, avp->nv, avp->nm);
ib=0;
for ( iRec = 1; iRec <= avp->nRowsToRead; iRec++ ){
SelectToView( &avp->inpDataBuffer[ib], avp->selection, viewBuf,
avp->nd, avp->nm, avp->nv );
ib += inpBufferPace;
TreeInsert(avp->tree, viewBuf);
if(avp->tree->memoryIsFull){
fprintf(stderr, "ComputeMemoryFittedView(): Not enough memory.\n");
return 1;
}
}
#ifdef VIEW_FILE_OUTPUT
if( retCode = WriteViewToDiskCS(avp, avp->tree->root.left,&ordern) ){
fprintf(stderr, "ComputeMemoryFittedView() Write error is occured.\n");
return retCode;
}
#else
computeChecksum(avp,avp->tree->root.left,&ordern);
#endif
avp->nViewRows = avp->tree->count;
avp->totalOfViewRows += avp->nViewRows;
InitializeTree(avp->tree, avp->nv, avp->nm);
return ADC_OK;
}
int32 SharedSortAggregate(ADC_VIEW_CNTL *avp){
int32 retCode;
uint32 iRec = 0;
uint32 attrs[MAX_VIEW_ROW_SIZE_IN_INTS];
uint32 currBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
int64 chunkOffset = 0;
int64 inpfOffset;
uint32 nPart = 0;
uint32 prevV;
uint32 currV;
uint32 total = 0;
unsigned char *ib;
uint32 ibsize = SSA_BUFFER_SIZE;
uint32 nib;
uint32 iib;
uint32 nreg;
uint32 nlst;
uint32 nsgs;
uint32 ncur;
uint32 ibOffset = 0;
uint64 ordern=0;
ib = (unsigned char*) malloc(ibsize);
if (!ib){
fprintf(stderr,"SharedSortAggregate: memory allocation failed\n");
return ADC_MEMORY_ALLOCATION_FAILURE;
}
nib = ibsize/avp->inpRecSize;
nsgs = avp->nRowsToRead/nib;
if (nsgs == 0){
nreg = avp->nRowsToRead;
nlst = nreg;
nsgs = 1;
}else{
nreg = nib;
if (avp->nRowsToRead%nib) {
nsgs++;
nlst = avp->nRowsToRead%nib;
}else{
nlst = nreg;
}
}
avp->nViewRows = 0;
for( iib = 1; iib <= nsgs; iib++ ){
if(iib > 1) FSEEK(avp->viewFile, inpfOffset, SEEK_SET);
if( iib == nsgs ) ncur = nlst; else ncur = nreg;
fread(ib, ncur*avp->inpRecSize, 1, avp->viewFile);
inpfOffset = ftell(avp->viewFile);
for( ibOffset = 0, iRec = 1; iRec <= ncur; iRec++ ){
memcpy(attrs, &ib[ibOffset], avp->inpRecSize);
ibOffset += avp->inpRecSize;
SelectToView(attrs, avp->selection, currBuf, avp->nd, avp->nm, avp->nv);
currV = currBuf[2*avp->nm];
if(iib == 1 && iRec == 1){
prevV = currV;
nPart = 1;
InitializeTree(avp->tree, avp->nv, avp->nm);
TreeInsert(avp->tree, currBuf);
}else{
if (currV == prevV){
nPart++;
TreeInsert (avp->tree, currBuf);
if (avp->tree->memoryIsFull){
avp->chunksParams[avp->numberOfChunks].curChunkNum =
avp->tree->count;
avp->chunksParams[avp->numberOfChunks].chunkOffset = chunkOffset;
(avp->numberOfChunks)++;
if(avp->numberOfChunks >= MAX_NUM_OF_CHUNKS){
fprintf(stderr,"Too many chunks were created.\n");
exit(1);
}
chunkOffset += (uint64)(avp->tree->count*avp->outRecSize);
retCode=WriteChunkToDisk(avp->outRecSize, avp->fileOfChunks,
avp->tree->root.left, avp->logf);
if(retCode!=ADC_OK){
fprintf(stderr,"SharedSortAggregate: Write error occured.\n");
return retCode;
}
InitializeTree(avp->tree, avp->nv, avp->nm);
} /* memoryIsFull */
}else{
if(avp->numberOfChunks && avp->tree->count!=0){
avp->chunksParams[avp->numberOfChunks].curChunkNum =
avp->tree->count;
avp->chunksParams[avp->numberOfChunks].chunkOffset = chunkOffset;
(avp->numberOfChunks)++;
chunkOffset +=
(uint64)(avp->tree->count*(4*avp->nv + 8*avp->nm));
retCode=WriteChunkToDisk( avp->outRecSize, avp->fileOfChunks,
avp->tree->root.left, avp->logf);
if(retCode!=ADC_OK){
fprintf(stderr,"SharedSortAggregate: Write error occured.\n");
return retCode;
}
}
FSEEK(avp->viewFile, 0L, SEEK_END);
if(!avp->numberOfChunks){
avp->nViewRows += avp->tree->count;
retCode = WriteViewToDiskCS(avp, avp->tree->root.left,&ordern);
if(retCode!=ADC_OK){
fprintf(stderr,
"SharedSortAggregate: Write error occured.\n");
return retCode;
}
}else{
retCode=MultiWayMerge(avp);
if(retCode!=ADC_OK) {
fprintf(stderr,"SharedSortAggregate.MultiWayMerge: failed.\n");
return retCode;
}
}
InitializeTree(avp->tree, avp->nv, avp->nm);
TreeInsert(avp->tree, currBuf);
total += nPart;
nPart = 1;
}
}
prevV = currV;
} /* iRec */
} /* iib */
if(avp->numberOfChunks && avp->tree->count!=0) {
avp->chunksParams[avp->numberOfChunks].curChunkNum = avp->tree->count;
avp->chunksParams[avp->numberOfChunks].chunkOffset = chunkOffset;
(avp->numberOfChunks)++;
chunkOffset += (uint64)(avp->tree->count*(4*avp->nv + 8*avp->nm));
retCode=WriteChunkToDisk(avp->outRecSize, avp->fileOfChunks,
avp->tree->root.left, avp->logf);
if(retCode!=ADC_OK){
fprintf(stderr,"SharedSortAggregate: Write error occured.\n");
return retCode;
}
}
FSEEK(avp->viewFile, 0L, SEEK_END);
if(!avp->numberOfChunks){
avp->nViewRows += avp->tree->count;
if( retCode = WriteViewToDiskCS(avp, avp->tree->root.left,&ordern)){
fprintf(stderr, "SharedSortAggregate: Write error occured.\n");
return retCode;
}
}else{
retCode=MultiWayMerge(avp);
if(retCode!=ADC_OK) {
fprintf(stderr,"SharedSortAggregate.MultiWayMerge failed.\n");
return retCode;
}
}
FSEEK(avp->fileOfChunks, 0L, SEEK_SET);
total += nPart;
avp->totalOfViewRows += avp->nViewRows;
if(ib) free(ib);
return ADC_OK;
}
int32 PrefixedAggregate(ADC_VIEW_CNTL *avp, FILE *iof){
uint32 i;
uint32 iRec = 0;
uint32 attrs[MAX_VIEW_ROW_SIZE_IN_INTS];
uint32 aggrBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
uint32 currBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
uint32 prevBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
int64 *aggrmp;
int64 *currmp;
int32 compRes;
uint32 nOut = 0;
uint32 mpOffset = 0;
uint32 nOutBufRecs;
uint32 nViewRows = 0;
int64 inpfOffset;
aggrmp = (int64*) &aggrBuf[0];
currmp = (int64*) &currBuf[0];
for(i = 0; i < 2*avp->nm+avp->nv; i++){prevBuf[i] = 0; aggrBuf[i] = 0;}
nOutBufRecs = avp->memoryLimit/avp->outRecSize;
for(iRec = 1; iRec <= avp->nRowsToRead; iRec++ ){
fread(attrs, avp->inpRecSize, 1, iof);
SelectToView(attrs, avp->selection, currBuf, avp->nd, avp->nm, avp->nv);
if (iRec == 1) memcpy(aggrBuf, currBuf, avp->outRecSize);
else{
compRes = KeyComp( &currBuf[2*avp->nm], &prevBuf[2*avp->nm], avp->nv);
switch(compRes){
case 1:
memcpy(&avp->memPool[mpOffset], aggrBuf, avp->outRecSize);
mpOffset += avp->outRecSize;
nOut++;
for ( i = 0; i < avp->nm; i++ ){
avp->mSums[i] += aggrmp[i];
avp->checksums[i] += nOut*aggrmp[i]%measbound;
}
memcpy(aggrBuf, currBuf, avp->outRecSize);
break;
case 0:
for ( i = 0; i < avp->nm; i++ ) aggrmp[i] += currmp[i];
break;
case -1:
fprintf(stderr,"PrefixedAggregate: wrong parent view order.\n");
exit(1);
break;
default:
fprintf(stderr,"PrefixedAggregate: wrong KeyComp() result.\n");
exit(1);
break;
}
if (nOut == nOutBufRecs){
inpfOffset = ftell(iof);
FSEEK(iof, 0L, SEEK_END);
WriteToFile(avp->memPool, nOut*avp->outRecSize, 1, iof, stderr);
FSEEK(iof, inpfOffset, SEEK_SET);
mpOffset = 0;
nViewRows += nOut;
nOut = 0;
}
}
memcpy(prevBuf, currBuf, avp->outRecSize);
}
memcpy(&avp->memPool[mpOffset], aggrBuf, avp->outRecSize);
nOut++;
for ( i = 0; i < avp->nm; i++ ){
avp->mSums[i] += aggrmp[i];
avp->checksums[i] += nOut*aggrmp[i]%measbound;
}
FSEEK(iof, 0L, SEEK_END);
WriteToFile(avp->memPool, nOut*avp->outRecSize, 1, iof, stderr);
avp->nViewRows = nViewRows+nOut;
avp->totalOfViewRows += avp->nViewRows;
return ADC_OK;
}
int32 RunFormation (ADC_VIEW_CNTL *avp, FILE *inpf){
uint32 iRec = 0;
uint32 viewBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
uint32 attrs[MAX_VIEW_ROW_SIZE_IN_INTS];
int64 chunkOffset = 0;
InitializeTree(avp->tree, avp->nv, avp->nm);
for(iRec = 1; iRec <= avp->nRowsToRead; iRec++ ){
fread(attrs, avp->inpRecSize, 1, inpf);
SelectToView(attrs, avp->selection, viewBuf, avp->nd, avp->nm, avp->nv);
TreeInsert(avp->tree, viewBuf);
if(avp->tree->memoryIsFull) {
avp->chunksParams[avp->numberOfChunks].curChunkNum = avp->tree->count;
avp->chunksParams[avp->numberOfChunks].chunkOffset = chunkOffset;
(avp->numberOfChunks)++;
if (avp->numberOfChunks >= MAX_NUM_OF_CHUNKS) {
fprintf(stderr, "RunFormation: Too many chunks were created.\n");
return ADC_INTERNAL_ERROR;
}
chunkOffset += (uint64)(avp->tree->count*avp->outRecSize);
if(WriteChunkToDisk( avp->outRecSize, avp->fileOfChunks,
avp->tree->root.left, avp->logf )){
fprintf(stderr,
"RunFormation.WriteChunkToDisk: Write error is occured.\n");
return ADC_WRITE_FAILED;
}
InitializeTree(avp->tree, avp->nv, avp->nm);
}
} /* Insertion ... */
if(avp->numberOfChunks && avp->tree->count!=0) {
avp->chunksParams[avp->numberOfChunks].curChunkNum = avp->tree->count;
avp->chunksParams[avp->numberOfChunks].chunkOffset = chunkOffset;
(avp->numberOfChunks)++;
chunkOffset += (uint64)(avp->tree->count*(4*avp->nv + 8*avp->nm));
if(WriteChunkToDisk(avp->outRecSize, avp->fileOfChunks,
avp->tree->root.left, avp->logf)){
fprintf(stderr,
"RunFormation(.WriteChunkToDisk: Write error is occured.\n");
return ADC_WRITE_FAILED;
}
}
FSEEK(avp->viewFile, 0L, SEEK_END);
return ADC_OK;
}
void SeekAndReadNextSubChunk( uint32 multiChunkBuffer[],
uint32 k,
FILE *inFile,
uint32 chunkRecSize,
uint64 inFileOffs,
uint32 subChunkNum){
int64 ret;
ret = FSEEK(inFile, inFileOffs, SEEK_SET);
if (ret < 0){
fprintf(stderr,"SeekAndReadNextSubChunk.fseek() < 0 ");
exit(1);
}
fread(&multiChunkBuffer[k], chunkRecSize*subChunkNum, 1, inFile);
}
void ReadSubChunk(
uint32 chunkRecSize,
uint32 *multiChunkBuffer,
uint32 mwBufRecSizeInInt,
uint32 iChunk,
uint32 regSubChunkSize,
CHUNKS *chunks,
FILE *fileOfChunks
){
if (chunks[iChunk].curChunkNum > 0){
if(chunks[iChunk].curChunkNum < regSubChunkSize){
SeekAndReadNextSubChunk(multiChunkBuffer,
(iChunk*regSubChunkSize +
(regSubChunkSize-chunks[iChunk].curChunkNum))*
mwBufRecSizeInInt,
fileOfChunks,
chunkRecSize,
chunks[iChunk].chunkOffset,
chunks[iChunk].curChunkNum);
chunks[iChunk].posSubChunk=regSubChunkSize-chunks[iChunk].curChunkNum;
chunks[iChunk].curSubChunk=chunks[iChunk].curChunkNum;
chunks[iChunk].curChunkNum=0;
chunks[iChunk].chunkOffset=-1;
}else{
SeekAndReadNextSubChunk(multiChunkBuffer,
iChunk*regSubChunkSize*mwBufRecSizeInInt,
fileOfChunks,
chunkRecSize,
chunks[iChunk].chunkOffset,
regSubChunkSize);
chunks[iChunk].posSubChunk = 0;
chunks[iChunk].curSubChunk = regSubChunkSize;
chunks[iChunk].curChunkNum -= regSubChunkSize;
chunks[iChunk].chunkOffset += regSubChunkSize * chunkRecSize;
}
}
}
int32 MultiWayMerge(ADC_VIEW_CNTL *avp){
uint32 outputBuffer[OUTPUT_BUFFER_SIZE];
uint32 r_buf [OUTPUT_BUFFER_SIZE];
uint32 min_r_buf [OUTPUT_BUFFER_SIZE];
uint32 first_one;
uint32 i;
uint32 iChunk;
uint32 min_r_chunk;
uint32 sPos;
uint32 iPos;
uint32 numEmptyBufs;
uint32 numEmptyRuns;
uint32 mwBufRecSizeInInt;
uint32 chunkRecSize;
uint32 *multiChunkBuffer;
uint32 regSubChunkSize;
int32 compRes;
int64 *m_min_r_buf;
int64 *m_outputBuffer;
FSEEK(avp->fileOfChunks, 0L, SEEK_SET);
multiChunkBuffer = (uint32*) &avp->memPool[0];
first_one = 1;
avp->nViewRows = 0;
chunkRecSize = avp->outRecSize;
mwBufRecSizeInInt = chunkRecSize/4;
m_min_r_buf = (int64*)&min_r_buf[0];
m_outputBuffer = (int64*)&outputBuffer[0];
mwBufRecSizeInInt = chunkRecSize/4;
regSubChunkSize = (avp->memoryLimit/avp->numberOfChunks)/chunkRecSize;
if (regSubChunkSize==0) {
fprintf(stderr,
"MultiWayMerge: Not enough memory to run the external sort\n");
return ADC_INTERNAL_ERROR;
}
multiChunkBuffer = (uint32*) &avp->memPool[0];
for(i = 0; i < avp->numberOfChunks; i++ ){
ReadSubChunk(
chunkRecSize,
multiChunkBuffer,
mwBufRecSizeInInt,
i,
regSubChunkSize,
avp->chunksParams,
avp->fileOfChunks
);
}
while(1){
for(iChunk = 0;iChunk<avp->numberOfChunks;iChunk++){
if (avp->chunksParams[iChunk].curSubChunk > 0){
sPos = iChunk*regSubChunkSize*mwBufRecSizeInInt;
iPos = sPos+mwBufRecSizeInInt*avp->chunksParams[iChunk].posSubChunk;
memcpy(&min_r_buf[0], &multiChunkBuffer[iPos], avp->outRecSize);
min_r_chunk = iChunk;
break;
}
}
for ( iChunk = min_r_chunk; iChunk < avp->numberOfChunks; iChunk++ ){
uint32 iPos;
if (avp->chunksParams[iChunk].curSubChunk > 0){
iPos = mwBufRecSizeInInt*(iChunk*regSubChunkSize+
avp->chunksParams[iChunk].posSubChunk);
memcpy(&r_buf[0],&multiChunkBuffer[iPos],avp->outRecSize);
compRes=KeyComp(&r_buf[2*avp->nm],&min_r_buf[2*avp->nm],avp->nv);
if(compRes < 0) {
memcpy(&min_r_buf[0], &r_buf[0], avp->outRecSize);
min_r_chunk = iChunk;
}
}
}
/* Step forward */
if(avp->chunksParams[min_r_chunk].curSubChunk != 0){
avp->chunksParams[min_r_chunk].curSubChunk--;
avp->chunksParams[min_r_chunk].posSubChunk++;
}
/* Aggreagation if a duplicate is encountered */
if(first_one){
memcpy( &outputBuffer[0], &min_r_buf[0], avp->outRecSize);
first_one = 0;
}else{
compRes = KeyComp( &outputBuffer[2*avp->nm],
&min_r_buf[2*avp->nm], avp->nv );
if(!compRes){
for(i = 0; i < avp->nm; i++ ){
m_outputBuffer[i] += m_min_r_buf[i];
}
}else{
WriteToFile(outputBuffer,avp->outRecSize,1,avp->viewFile,stderr);
avp->nViewRows++;
for(i=0;i<avp->nm;i++){
avp->mSums[i]+=m_outputBuffer[i];
avp->checksums[i] += avp->nViewRows*m_outputBuffer[i]%measbound;
}
memcpy( &outputBuffer[0], &min_r_buf[0], avp->outRecSize );
}
}
for(numEmptyBufs = 0,
numEmptyRuns = 0, i = 0; i < avp->numberOfChunks; i++ ){
if (avp->chunksParams[i].curSubChunk == 0) numEmptyBufs++;
if (avp->chunksParams[i].curChunkNum == 0) numEmptyRuns++;
}
if( numEmptyBufs == avp->numberOfChunks
&&numEmptyRuns == avp->numberOfChunks) break;
if(avp->chunksParams[min_r_chunk].curSubChunk == 0) {
ReadSubChunk(
chunkRecSize,
multiChunkBuffer,
mwBufRecSizeInInt,
min_r_chunk,
regSubChunkSize,
avp->chunksParams,
avp->fileOfChunks);
}
} /* while(1) */
WriteToFile( outputBuffer, avp->outRecSize, 1, avp->viewFile, stderr);
avp->nViewRows++;
for(i = 0; i < avp->nm; i++ ){
avp->mSums[i] += m_outputBuffer[i];
avp->checksums[i] += avp->nViewRows*m_outputBuffer[i]%measbound;
}
avp->totalOfViewRows += avp->nViewRows;
return ADC_OK;
}
void SelectToView( uint32 * ib, uint32 *ix, uint32 *viewBuf,
uint32 nd, uint32 nm, uint32 nv ){
uint32 i, j;
for ( j = 0, i = 0; i < nv; i++ ) viewBuf[2*nm+j++] = ib[2*nm+ix[i]-1];
memcpy(&viewBuf[0], &ib[0], MSR_FSZ*nm);
}
FILE * AdcFileOpen(const char *fileName, const char *mode){
FILE *fr;
if ((fr = (FILE*) fopen(fileName, mode))==NULL)
fprintf(stderr, "AdcFileOpen: Cannot open the file %s errno = %d\n",
fileName, errno);
return fr;
}
void AdcFileName(char *adcFileName, const char *adcName,
const char *fileName, uint32 taskNumber){
sprintf(adcFileName, "%s.%s.%d",adcName,fileName,taskNumber);
}
ADC_VIEW_CNTL * NewAdcViewCntl(ADC_VIEW_PARS *adcpp, uint32 pnum){
ADC_VIEW_CNTL *adccntl;
uint32 i, j, k;
#ifdef IN_CORE
uint32 ux;
#endif
char id[8+1];
adccntl = (ADC_VIEW_CNTL *) malloc(sizeof(ADC_VIEW_CNTL));
if (adccntl==NULL) return NULL;
adccntl->ndid = adcpp->ndid;
adccntl->taskNumber = pnum;
adccntl->retCode = 0;
adccntl->swapIt = 0;
strcpy(adccntl->adcName, adcpp->adcName);
adccntl->nTopDims = adcpp->nd;
adccntl->nd = adcpp->nd;
adccntl->nm = adcpp->nm;
adccntl->nInputRecs = adcpp->nInputRecs;
adccntl->inpRecSize = GetRecSize(adccntl->nd,adccntl->nm);
adccntl->outRecSize = GetRecSize(adccntl->nv,adccntl->nm);
adccntl->accViewFileOffset = 0;
adccntl->totalViewFileSize = 0;
adccntl->numberOfMadeViews = 0;
adccntl->numberOfViewsMadeFromInput = 0;
adccntl->numberOfPrefixedGroupbys = 0;
adccntl->numberOfSharedSortGroupbys = 0;
adccntl->totalOfViewRows = 0;
adccntl->memoryLimit = adcpp->memoryLimit;
adccntl->nTasks = adcpp->nTasks;
strcpy(adccntl->inpFileName, adcpp->adcInpFileName);
sprintf(id, ".%d", adcpp->ndid);
AdcFileName(adccntl->adcLogFileName,
adccntl->adcName, "logf", adccntl->taskNumber);
strcat(adccntl->adcLogFileName, id);
adccntl->logf = AdcFileOpen(adccntl->adcLogFileName, "w");
AdcFileName(adccntl->inpFileName, adccntl->adcName, "dat", adcpp->ndid);
adccntl->inpf = AdcFileOpen(adccntl->inpFileName, "rb");
if(!adccntl->inpf){
adccntl->retCode = ADC_FILE_OPEN_FAILURE;
return(adccntl);
}
AdcFileName(adccntl->viewFileName, adccntl->adcName,
"view.dat", adccntl->taskNumber);
strcat(adccntl->viewFileName, id);
adccntl->viewFile = AdcFileOpen(adccntl->viewFileName, "wb+");
AdcFileName(adccntl->chunksFileName, adccntl->adcName,
"chunks.dat", adccntl->taskNumber);
strcat(adccntl->chunksFileName, id);
adccntl->fileOfChunks = AdcFileOpen(adccntl->chunksFileName,"wb+");
AdcFileName(adccntl->groupbyFileName, adccntl->adcName,
"groupby.dat", adccntl->taskNumber);
strcat(adccntl->groupbyFileName, id);
adccntl->groupbyFile = AdcFileOpen(adccntl->groupbyFileName,"wb+");
AdcFileName(adccntl->adcViewSizesFileName, adccntl->adcName,
"view.sz", adcpp->ndid);
adccntl->adcViewSizesFile = AdcFileOpen(adccntl->adcViewSizesFileName,"r");
if(!adccntl->adcViewSizesFile){
adccntl->retCode = ADC_FILE_OPEN_FAILURE;
return(adccntl);
}
AdcFileName(adccntl->viewSizesFileName, adccntl->adcName,
"viewsz.dat", adccntl->taskNumber);
strcat(adccntl->viewSizesFileName, id);
adccntl->viewSizesFile = AdcFileOpen(adccntl->viewSizesFileName, "wb+");
adccntl->chunksParams = (CHUNKS*) malloc(MAX_NUM_OF_CHUNKS*sizeof(CHUNKS));
if(adccntl->chunksParams==NULL){
fprintf(adccntl->logf,"NewAdcViewCntl: Cannot allocate 'chunksParsms'\n");
adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
return(adccntl);
}
adccntl->memPool = (unsigned char*) malloc(adccntl->memoryLimit);
if(adccntl->memPool == NULL ){
fprintf(adccntl->logf,
"NewAdcViewCntl: Cannot allocate 'main memory pool'\n");
adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
return(adccntl);
}
#ifdef IN_CORE
/* add a condition to allocate this memory buffer, THIS is IMPORTANT */
ux = 4*adccntl->nTopDims + 8*adccntl->nm;
if (adccntl->nTopDims%8) ux += 4;
adccntl->inpDataBuffer = (uint32*) malloc(adccntl->nInputRecs*ux);
if(adccntl->inpDataBuffer == NULL ){
fprintf(adccntl->logf,
"NewAdcViewCntl: Cannot allocate 'input data buffer'\n");
adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
return(adccntl);
}
#endif
adccntl->numberOfChunks = 0;
for ( i = 0; i < adccntl->nm; i++ ){
adccntl->mSums[i] = 0;
adccntl->checksums[i] = 0;
adccntl->totchs[i] = 0;
}
adccntl->tree = CreateEmptyTree(adccntl->nd, adccntl->nm,
adccntl->memoryLimit, adccntl->memPool);
if(!adccntl->tree){
fprintf(adccntl->logf,"\nNewAdcViewCntl.CreateEmptyTree failed.\n");
adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
return(adccntl);
}
adccntl->nv = adcpp->nd; /* default */
for ( i = 0; i < adccntl->nv; i++ ) adccntl->selection[i]=i+1;
adccntl->nViewLimit = (1<<adcpp->nd)-1;
adccntl->jpp=(JOB_POOL *) malloc((adccntl->nViewLimit+1)*sizeof(JOB_POOL));
if ( adccntl->jpp == NULL){
fprintf(adccntl->logf,
"\n Not enough space to allocate %ld byte for a job pool.",
(long)(adccntl->nViewLimit+1)*sizeof(JOB_POOL));
adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
return(adccntl);
}
adccntl->lpp = (LAYER * ) malloc( (adcpp->nd+1)*sizeof(LAYER));
if ( adccntl->lpp == NULL){
fprintf(adccntl->logf,
"\n Not enough space to allocate %ld byte for a layer reference array.",
(long)(adcpp->nd+1)*sizeof(LAYER));
adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
return(adccntl);
}
for ( j = 1, i = 1; i <= adcpp->nd; i++ ) {
k = NumOfCombsFromNbyK ( adcpp->nd, i );
adccntl->lpp[i].layerIndex = j;
j += k;
adccntl->lpp[i].layerQuantityLimit = k;
adccntl->lpp[i].layerCurrentPopulation = 0;
}
JobPoolInit ( adccntl->jpp, (adccntl->nViewLimit+1), adcpp->nd );
fprintf(adccntl->logf,"\nMeaning of the log file colums is as follows:\n");
fprintf(adccntl->logf,
"Row Number | Groupby | View Size | Measure Sums | Number of Chunks\n");
adccntl->verificationFailed = 1;
return adccntl;
}
void InitAdcViewCntl(ADC_VIEW_CNTL *adccntl,
uint32 nSelectedDims,
uint32 *selection,
uint32 fromParent ){
uint32 i;
adccntl->nv = nSelectedDims;
for (i = 0; i < adccntl->nm; i++ ) adccntl->mSums[i] = 0;
for (i = 0; i < adccntl->nv; i++ ) adccntl->selection[i] = selection[i];
adccntl->outRecSize = GetRecSize(adccntl->nv,adccntl->nm);
adccntl->numberOfChunks = 0;
adccntl->fromParent = fromParent;
adccntl->nViewRows = 0;
if(fromParent){
adccntl->nd = adccntl->smallestParentLevel;
FSEEK(adccntl->viewFile, adccntl->viewOffset, SEEK_SET);
adccntl->nRowsToRead = adccntl->nParentViewRows;
}else{
adccntl->nd = adccntl->nTopDims;
adccntl->nRowsToRead = adccntl->nInputRecs;
}
adccntl->inpRecSize = GetRecSize(adccntl->nd,adccntl->nm);
adccntl->outRecSize = GetRecSize(adccntl->nv,adccntl->nm);
}
int32 CloseAdcView(ADC_VIEW_CNTL *adccntl){
if (adccntl->inpf) fclose(adccntl->inpf);
if (adccntl->viewFile) fclose(adccntl->viewFile);
if (adccntl->fileOfChunks) fclose(adccntl->fileOfChunks);
if (adccntl->groupbyFile) fclose(adccntl->groupbyFile);
if (adccntl->adcViewSizesFile) fclose(adccntl->adcViewSizesFile);
if (adccntl->viewSizesFile) fclose(adccntl->viewSizesFile);
if (DeleteOneFile(adccntl->chunksFileName))
return ADC_FILE_DELETE_FAILURE;
if (DeleteOneFile(adccntl->viewSizesFileName))
return ADC_FILE_DELETE_FAILURE;
if (DeleteOneFile(adccntl->groupbyFileName))
return ADC_FILE_DELETE_FAILURE;
if (adccntl->chunksParams){
free(adccntl->chunksParams);
adccntl->chunksParams=NULL;
}
if (adccntl->memPool){ free(adccntl->memPool); adccntl->memPool=NULL;}
if (adccntl->jpp){ free(adccntl->jpp); adccntl->jpp=NULL; }
if (adccntl->lpp){ free(adccntl->lpp); adccntl->lpp=NULL; }
if (adccntl->logf) fclose(adccntl->logf);
free(adccntl);
return ADC_OK;
}
void AdcCntlLog(ADC_VIEW_CNTL *adccntlp){
fprintf(adccntlp->logf," memoryLimit = %20d\n",
adccntlp->memoryLimit);
fprintf(adccntlp->logf," treeNodeSize = %20d\n",
adccntlp->tree->treeNodeSize);
fprintf(adccntlp->logf," treeMemoryLimit = %20d\n",
adccntlp->tree->memoryLimit);
fprintf(adccntlp->logf," nNodesLimit = %20d\n",
adccntlp->tree->nNodesLimit);
fprintf(adccntlp->logf,"freeNodeCounter = %20d\n",
adccntlp->tree->freeNodeCounter);
fprintf(adccntlp->logf," nViewRows = %20d\n",
adccntlp->nViewRows);
}
int32 ViewSizesVerification(ADC_VIEW_CNTL *adccntlp){
char inps[MAX_PARAM_LINE_SIZE];
char msg[64];
uint32 *viewCounts;
uint32 selection_viewSize[2];
uint32 sz;
uint32 sel[64];
uint32 i;
uint32 k;
uint64 tx;
uint32 iTx;
viewCounts = (uint32 *) &adccntlp->memPool[0];
for ( i = 0; i <= adccntlp->nViewLimit; i++) viewCounts[i] = 0;
FSEEK(adccntlp->viewSizesFile, 0L, SEEK_SET);
FSEEK(adccntlp->adcViewSizesFile, 0L, SEEK_SET);
while(fread(selection_viewSize, 8, 1, adccntlp->viewSizesFile)){
viewCounts[selection_viewSize[0]] = selection_viewSize[1];
}
k = 0;
while ( fscanf(adccntlp->adcViewSizesFile, "%s", inps) != EOF ){
if ( strcmp(inps, "Selection:") == 0 ) {
while ( fscanf(adccntlp->adcViewSizesFile, "%s", inps)) {
if ( strcmp(inps, "View") == 0 ) break;
sel[k++] = atoi(inps);
}
}
if ( strcmp(inps, "Size:") == 0 ) {
fscanf(adccntlp->adcViewSizesFile, "%s", inps);
sz = atoi(inps);
CreateBinTuple(&tx, sel, k);
iTx = (int32)(tx>>(64-adccntlp->nTopDims));
adccntlp->verificationFailed = 0;
if (!adccntlp->numberOfMadeViews) adccntlp->verificationFailed = 1;
if ( viewCounts[iTx] != 0){
if (viewCounts[iTx] != sz) {
if (viewCounts[iTx] != adccntlp->nInputRecs){
fprintf(adccntlp->logf,
"A view size is wrong: genSz=%d calcSz=%d\n",
sz, viewCounts[iTx]);
adccntlp->verificationFailed = 1;
return ADC_VERIFICATION_FAILED;
}
}
}
k = 0;
}
} /* of while() */
fprintf(adccntlp->logf,
"\n\nMeaning of the log file colums is as follows:\n");
fprintf(adccntlp->logf,
"Row Number | Groupby | View Size | Measure Sums | Number of Chunks\n");
if (!adccntlp->verificationFailed)
strcpy(msg, "Verification=passed");
else strcpy(msg, "Verification=failed");
FSEEK(adccntlp->logf, 0L, SEEK_SET);
fprintf(adccntlp->logf, msg);
FSEEK(adccntlp->logf, 0L, SEEK_END);
FSEEK(adccntlp->viewSizesFile, 0L, SEEK_SET);
return ADC_OK;
}
int32 ComputeGivenGroupbys(ADC_VIEW_CNTL *adccntlp){
int32 retCode;
uint32 i;
uint64 binRepTuple;
uint32 ut32;
uint32 nViews = 0;
uint32 nSelectedDims;
uint32 smp;
#ifdef IN_CORE
uint32 firstView = 1;
#endif
uint32 selection_viewsize[2];
char ttout[16];
while (fread(&binRepTuple, 8, 1, adccntlp->groupbyFile )){
for(i = 0; i < adccntlp->nm; i++) adccntlp->checksums[i]=0;
nViews++;
swap8(&binRepTuple);
GetRegTupleFromBin64(binRepTuple, adccntlp->selection,
adccntlp->nTopDims, &nSelectedDims);
ut32 = (uint32)(binRepTuple>>(64-adccntlp->nTopDims));
selection_viewsize[0] = ut32;
ut32 <<= (32-adccntlp->nTopDims);
adccntlp->groupby = ut32;
#ifndef IN_CORE
smp = GetParent(adccntlp, ut32);
#endif
#ifdef IN_CORE
if (firstView) {
firstView = 0;
if(ReadWholeInputData(adccntlp, adccntlp->inpf)) {
fprintf(stderr, "ReadWholeInputData failed.\n");
return ADC_INTERNAL_ERROR;
}
}
smp = noneParent;
#endif
if (smp != noneParent)
GetRegTupleFromParent(binRepTuple,
adccntlp->parBinRepTuple,
adccntlp->selection,
adccntlp->nTopDims);
InitAdcViewCntl(adccntlp, nSelectedDims,
adccntlp->selection, (smp == noneParent)?0:1);
#ifdef IN_CORE
if(retCode = ComputeMemoryFittedView(adccntlp)) {
fprintf(stderr, "ComputeMemoryFittedView failed.\n");
return retCode;
}
#else
#ifdef OPTIMIZATION
if (smp == prefixedParent){
if (retCode = PrefixedAggregate(adccntlp, adccntlp->viewFile)) {
fprintf(stderr,
"ComputeGivenGroupbys.PrefixedAggregate failed.\n");
return retCode;
}
adccntlp->numberOfPrefixedGroupbys++;
}else if (smp == sharedSortParent) {
if (retCode = SharedSortAggregate(adccntlp)) {
fprintf(stderr,
"ComputeGivenGroupbys.SharedSortAggregate failed.\n");
return retCode;
}
adccntlp->numberOfSharedSortGroupbys++;
}else
#endif /* OPTIMIZATION */
{
if( smp != noneParent ) {
retCode = RunFormation(adccntlp, adccntlp->viewFile);
if(retCode!=ADC_OK){
fprintf(stderr,
"ComputrGivenGroupbys.RunFormation failed.\n");
return retCode;
}
}else{
if ((retCode=RunFormation (adccntlp, adccntlp->inpf)) != ADC_OK){
fprintf(stderr,
"ComputrGivenGroupbys.RunFormation failed.\n");
return retCode;
}
adccntlp->numberOfViewsMadeFromInput++;
}
if(!adccntlp->numberOfChunks){
uint64 ordern=0;
adccntlp->nViewRows = adccntlp->tree->count;
adccntlp->totalOfViewRows += adccntlp->nViewRows;
retCode=WriteViewToDiskCS(adccntlp,adccntlp->tree->root.left,&ordern);
if(retCode!=ADC_OK){
fprintf(stderr,
"ComputeGivenGroupbys.WriteViewToDisk: Write error.\n");
return ADC_WRITE_FAILED;
}
}else {
retCode=MultiWayMerge(adccntlp);
if(retCode!=ADC_OK) {
fprintf(stderr,"ComputeGivenGroupbys.MultiWayMerge failed.\n");
return retCode;
}
}
}
JobPoolUpdate(adccntlp);
adccntlp->accViewFileOffset +=
(int64)(adccntlp->nViewRows*adccntlp->outRecSize);
FSEEK(adccntlp->fileOfChunks, 0L, SEEK_SET);
FSEEK(adccntlp->inpf, 0L, SEEK_SET);
#endif /* IN_CORE */
for( i = 0; i < adccntlp->nm; i++)
adccntlp->totchs[i]+=adccntlp->checksums[i];
selection_viewsize[1] = adccntlp->nViewRows;
fwrite(selection_viewsize, 8, 1, adccntlp->viewSizesFile);
adccntlp->totalViewFileSize +=
adccntlp->outRecSize*adccntlp->nViewRows;
sprintf(ttout, "%7d ", nViews);
WriteOne32Tuple(ttout, adccntlp->groupby,
adccntlp->nTopDims, adccntlp->logf);
fprintf(adccntlp->logf, " | %15d | ", adccntlp->nViewRows);
for ( i = 0; i < adccntlp->nm; i++ ){
fprintf(adccntlp->logf, " %20lld", adccntlp->checksums[i]);
}
fprintf(adccntlp->logf, " | %5d", adccntlp->numberOfChunks);
}
adccntlp->numberOfMadeViews = nViews;
if(ViewSizesVerification(adccntlp)) return ADC_VERIFICATION_FAILED;
return ADC_OK;
}