#include "QueryProcessing.h"
#include "QueryOptimizer.h"
#include "MultiGPUProcessing.h"
#include "CacheManager.h"
#include "CPUProcessing.h"
#include "CostModel.h"

#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>
// Data-placement policy that the menu loop re-applies before every query.
// Values are spelled out explicitly; they equal the implicit numbering.
enum Policy {
  aware = 0,         // shuffle-aware placement (menu option 3)
  partitioning = 1,  // partitioning only (menu option 4)
  replication = 2,   // replication only (menu option 5)
  none = 3           // no policy; set initially and by "clear"
};
int main() {
bool verbose = true;
bool skipping = true;
bool adaptive = true;
int latemat = 2;
bool reorder = true;
Distribution dist = None;
double alpha = 2.5;
double alpha1 = 3.0;
//52428800 adalah 50 segment
// size_t cache_size = ((size_t) (52428800)) * 20 * NUM_GPU; //200 MB
// size_t broadcast_size = ((size_t) (52428800)) * 5 * NUM_GPU; //200 MB
// size_t processing_size = ((size_t) (52428800)) * 15 * NUM_GPU; //400 MB
// size_t pinned_memsize = ((size_t) (52428800)) * 20; //400 MB
size_t cache_size, broadcast_size, processing_size, pinned_memsize;
bool isopt = false;
if (!adaptive || !reorder || latemat == 0) {
cache_size = ((size_t) (1048576)) * 1000 * NUM_GPU; //1048576 is 1 segment
broadcast_size = ((size_t) (1048576)) * 500 * NUM_GPU; //1048576 is 1 segment
processing_size = ((size_t) (524288)) * 2220 * NUM_GPU; //524288 is 1 segment
pinned_memsize = ((size_t) (524288)) * 1000; //524288 is 1 segment
isopt = true;
} else if (SF == 162 && NUM_GPU == 8) {
cache_size = ((size_t) (1048576)) * 2000 * NUM_GPU; //1048576 is 1 segment
broadcast_size = ((size_t) (1048576)) * 700 * NUM_GPU; //1048576 is 1 segment
processing_size = ((size_t) (524288)) * 1020 * NUM_GPU; //524288 is 1 segment
pinned_memsize = ((size_t) (524288)) * 1000; //524288 is 1 segment
} else if (SF == 82 || SF == 42 || SF == 162) {
cache_size = ((size_t) (1048576)) * 2000 * NUM_GPU; //1048576 is 1 segment
broadcast_size = ((size_t) (1048576)) * 500 * NUM_GPU; //1048576 is 1 segment
processing_size = ((size_t) (524288)) * 1220 * NUM_GPU; //524288 is 1 segment
pinned_memsize = ((size_t) (524288)) * 1000; //524288 is 1 segment
} else if (SF == 322) {
//WARNING::IF THE TOTAL = 3800, IT WOULD NOT WORK FOR 8 GPUS, THIS IS BECAUSE NCCL NEEDS SOME GPU MEM TO OPERATE
cache_size = ((size_t) (1048576)) * 2000 * NUM_GPU; //1048576 is 1 segment
broadcast_size = ((size_t) (1048576)) * 200 * NUM_GPU; //1048576 is 1 segment
processing_size = ((size_t) (524288)) * 1520 * NUM_GPU; //524288 is 1 segment
pinned_memsize = ((size_t) (524288)) * 1000; //524288 is 1 segment
} else if (SF == 402) {
//WARNING::IF THE TOTAL = 3800, IT WOULD NOT WORK FOR 8 GPUS, THIS IS BECAUSE NCCL NEEDS SOME GPU MEM TO OPERATE
cache_size = ((size_t) (1048576)) * 1900 * NUM_GPU; //1048576 is 1 segment
broadcast_size = ((size_t) (1048576)) * 250 * NUM_GPU; //1048576 is 1 segment
processing_size = ((size_t) (524288)) * 1570 * NUM_GPU; //524288 is 1 segment
pinned_memsize = ((size_t) (524288)) * 2000; //524288 is 1 segment
} else if (SF % 10 == 0) {// original SSB
assert(dist == None);
cache_size = ((size_t) (1048576)) * 2000 * NUM_GPU; //1048576 is 1 segment
broadcast_size = ((size_t) (1048576)) * 50 * NUM_GPU; //1048576 is 1 segment
processing_size = ((size_t) (524288)) * 1670 * NUM_GPU; //524288 is 1 segment
pinned_memsize = ((size_t) (524288)) * 2000; //524288 is 1 segment
} else {
cache_size = ((size_t) (1048576)) * 2000 * NUM_GPU; //1048576 is 1 segment
broadcast_size = ((size_t) (1048576)) * 200 * NUM_GPU; //1048576 is 1 segment
processing_size = ((size_t) (524288)) * 1520 * NUM_GPU; //524288 is 1 segment
pinned_memsize = ((size_t) (524288)) * 1000; //524288 is 1 segment
}
for (int src = 0; src < NUM_GPU; src++) {
CubDebugExit(cudaSetDevice(src));
for (int dst = 0; dst < NUM_GPU; dst++) {
if (src != dst) CubDebugExit(cudaDeviceEnablePeerAccess(dst, 0));
}
}
CubDebugExit(cudaSetDevice(0));
/*List GPU Device*/
int *DeviceList = (int *) malloc (ALL_GPU * sizeof(int));
for(int gpu = 0; gpu < ALL_GPU; ++gpu)
DeviceList[gpu] = gpu;
/*NCCL Init*/
//NCCL still does not work!! But the performance is bad anyway!!
ncclComm_t* comms = (ncclComm_t*) malloc(sizeof(ncclComm_t) * ALL_GPU);
ncclCommInitAll(comms, ALL_GPU, DeviceList);
//NEED TO SYNCHRONIZE AFTER NCCL INIT OTHERWISE IT WILL CAUSE ERROR
for(int gpu = 0; gpu < ALL_GPU; ++gpu) {
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaDeviceSynchronize());
}
CubDebugExit(cudaSetDevice(0));
//2, 4, 8, 12, 16, 20, 24, 32, 40
//4, 10, 20, 30, 40
//TODO: make it support cache size > 8 GB (there are lots of integer overflow resulting in negative offset to the gpuCache) should have used unsigned int everywhere
CPUGPUProcessing* cgp = new CPUGPUProcessing(cache_size, broadcast_size, processing_size, pinned_memsize, comms,
verbose, skipping, reorder);
CacheManager* cm = cgp->cm;
string input, query, many;
int many_query;
Policy repl_policy = none;
double time = 0, execution_time = 0, optimization_time = 0, merging_time = 0, preparing_time = 0, nvlink_traffic = 0, shuffle_time = 0, gpu_total_time = 0;
// unsigned long long cpu_to_gpu = 0, gpu_to_cpu = 0;
// unsigned long long gpu_traffic = 0, cpu_traffic = 0, repl_traffic = 0;
// double repl_time = 0;
bool exit = false;
QueryProcessing* qp = new QueryProcessing(cgp, verbose, dist, comms, adaptive, latemat);
// cm->ShuffleAware();
// cm->PartitioningOnly();
// cm->ReplicationOnly();
// fact table
// for (int col = 1; col <= 9; col++) {
// ColumnInfo* column = cm->allColumn[col];
// // } else {
// // if (col != 5 && col != 6 && col != 7 && col != 8 && col != 9) {
// // if (col != 8 && col != 7) {
// // for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
// // Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// // int gpu = seg->segment_id % NUM_GPU;
// // cm->cacheSegmentInGPU(seg, gpu);
// // }
// // }
// // }
// if (col == 5) {
// for (int seg_id = 0; seg_id < 50; seg_id++) {
// Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// int gpu = seg->segment_id % NUM_GPU;
// cm->cacheSegmentInGPU(seg, gpu);
// }
// } else if (col == 4) {
// for (int seg_id = 0; seg_id < 100; seg_id++) {
// Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// int gpu = seg->segment_id % NUM_GPU;
// cm->cacheSegmentInGPU(seg, gpu);
// }
// } else {
// for (int seg_id = 0; seg_id < 150; seg_id++) {
// Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// int gpu = seg->segment_id % NUM_GPU;
// cm->cacheSegmentInGPU(seg, gpu);
// }
// }
// }
// //customer table
// for (int col = 10; col <= 13; col++) {
// ColumnInfo* column = cm->allColumn[col];
// // for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
// // Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// // cm->cacheSegmentInGPU(seg, gpu);
// // }
// // }
// if (col == 10) {
// for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
// Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// int gpu = seg->segment_id % NUM_GPU;
// cm->cacheSegmentInGPU(seg, gpu);
// }
// } else {
// for (int seg_id = 0; seg_id < column->total_segment/2; seg_id++) {
// Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// int gpu = seg->segment_id % NUM_GPU;
// // int gpu = seg->segment_id / NUM_GPU;
// cm->cacheSegmentInGPU(seg, gpu);
// }
// }
// }
// //supplier table
// for (int col = 14; col <= 17; col++) {
// ColumnInfo* column = cm->allColumn[col];
// // for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
// // Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// // cm->cacheSegmentInGPU(seg,gpu);
// // }
// // }
// // int step = column->total_segment / NUM_GPU;
// // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// // for (int seg_id = gpu * step; seg_id < gpu * step + step; seg_id++) {
// // Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// // // int gpu = seg->segment_id % NUM_GPU;
// // cm->cacheSegmentInGPU(seg, gpu);
// // }
// // }
// // if (col == 14) {
// for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
// Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// int gpu = seg->segment_id % NUM_GPU;
// // int gpu = seg->segment_id / NUM_GPU;
// cm->cacheSegmentInGPU(seg, gpu);
// }
// // } else {
// // for (int seg_id = 0; seg_id < column->total_segment/2; seg_id++) {
// // Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// // int gpu = seg->segment_id % NUM_GPU;
// // // int gpu = seg->segment_id / NUM_GPU;
// // cm->cacheSegmentInGPU(seg, gpu);
// // }
// // }
// }
// //part table
// for (int col = 18; col <= 21; col++) {
// ColumnInfo* column = cm->allColumn[col];
// // for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
// // Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// // cm->cacheSegmentInGPU(seg,gpu);
// // }
// // }
// // int step = column->total_segment / NUM_GPU;
// // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// // for (int seg_id = gpu * step; seg_id < gpu * step + step; seg_id++) {
// // Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// // // int gpu = seg->segment_id % NUM_GPU;
// // cm->cacheSegmentInGPU(seg, gpu);
// // }
// // }
// if (col == 18) {
// for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
// Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// int gpu = seg->segment_id % NUM_GPU;
// cm->cacheSegmentInGPU(seg, gpu);
// }
// } else {
// for (int seg_id = 0; seg_id < column->total_segment/2; seg_id++) {
// Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// int gpu = seg->segment_id % NUM_GPU;
// cm->cacheSegmentInGPU(seg, gpu);
// }
// }
// }
// //date table
// for (int col = 22; col <= 24; col++) {
// ColumnInfo* column = cm->allColumn[col];
// for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
// Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// cm->cacheSegmentInGPU(seg,gpu);
// }
// }
// // for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
// // Segment* seg = cm->index_to_segment[column->column_id][seg_id];
// // int gpu = seg->segment_id % NUM_GPU;
// // cm->cacheSegmentInGPU(seg, gpu);
// // }
// }
// cout << "Renewing metadata" << endl;
// cgp->renewMetadata();
// cout << "Metadata renewed" << endl;
if (dist == Zipf) {
qp->qo->setDistributionZipfian(alpha, alpha1);
}
while (!exit) {
cout << "Select Options:" << endl;
cout << "1. Run Specific Query" << endl;
cout << "2. Run Random Queries" << endl;
cout << "3. Replacement Shuffle-Aware" << endl;
cout << "4. Replacement Partitioning-Only" << endl;
cout << "5. Replacement Replication-Only" << endl;
cout << "exit. Exit" << endl;
cout << "clear. Delete Columns from GPU" << endl;
cout << "skipping. Toggle segment skipping" << endl;
cout << "latemat. Toggle late materialization" << endl;
cout << "reorder. Toggle join reordering" << endl;
cout << "adaptive. Toggle adaptive execution" << endl;
cout << "zipf. Enable zipfian distribution" << endl;
cout << "none. Enable no distribution" << endl;
cout << "Your Input: ";
cin >> input;
if (input.compare("1") == 0) {
// time = 0; cpu_traffic = 0; gpu_traffic = 0; malloc_time_total = 0; cpu_to_gpu = 0; gpu_to_cpu = 0; execution_time = 0; optimization_time = 0; merging_time = 0;
time = 0, execution_time = 0, optimization_time = 0, merging_time = 0, preparing_time = 0, nvlink_traffic = 0, shuffle_time = 0, gpu_total_time = 0;
cgp->resetTime();
cout << "Input Query: ";
cin >> query;
qp->setQuery(stoi(query));
if (repl_policy == aware) qp->ShuffleAwareExec();
else if (repl_policy == partitioning) qp->PartitioningOnlyExec();
else if (repl_policy == replication) qp->ReplicationOnlyExec();
time += qp->processQuery();
execution_time += cgp->execution_total;
optimization_time += cgp->optimization_total;
merging_time += cgp->merging_total;
preparing_time += cgp->preparing_total;
nvlink_traffic += cgp->nvlink_total;
shuffle_time += cgp->shuffle_total;
gpu_total_time += cgp->gpu_total;
cgp->resetTime();
cout << "Cumulated Time: " << time << endl;
cout << "Execution Time: " << execution_time << endl;
cout << "NVlink traffic: " << nvlink_traffic << endl;
cout << endl;
} else if (input.compare("2") == 0) {
// time = 0; cpu_traffic = 0; gpu_traffic = 0; malloc_time_total = 0; cpu_to_gpu = 0; gpu_to_cpu = 0; execution_time = 0; optimization_time = 0; merging_time = 0;
time = 0, execution_time = 0, optimization_time = 0, merging_time = 0, preparing_time = 0, nvlink_traffic = 0, shuffle_time = 0, gpu_total_time = 0;
if (dist == None) {
int queries[13] = {43, 42, 41, 34, 33, 32, 31, 23, 22, 21, 13, 12, 11};
double tot_time[13], exec_time[13], opt_time[13], merge_time[13], prep_time[13], nvtraf[13], shuf_time[13], gpu_time[13];
cgp->resetTime();
cout << "Executing 13 Queries" << endl;
for (int i = 0; i < 13; i++) {
qp->setQuery(queries[i]);
if (repl_policy == aware) qp->ShuffleAwareExec();
else if (repl_policy == partitioning) qp->PartitioningOnlyExec();
else if (repl_policy == replication) qp->ReplicationOnlyExec();
tot_time[i] = qp->processQuery();
exec_time[i] = cgp->execution_total;
opt_time[i] = cgp->optimization_total;
merge_time[i] = cgp->merging_total;
prep_time[i] = cgp->preparing_total;
nvtraf[i] = cgp->nvlink_total;
shuf_time[i] = cgp->shuffle_total;
gpu_time[i] = cgp->gpu_total;
cgp->resetTime();
}
cout << endl;
cout << "How many queries: ";
cin >> many;
many_query = stoi(many);
for (int i = 0; i < many_query; i++) {
time += tot_time[i % 13];
execution_time += exec_time[i % 13];
optimization_time += opt_time[i % 13];
merging_time += merge_time[i % 13];
preparing_time += prep_time[i % 13];
nvlink_traffic += nvtraf[i % 13];
shuffle_time += shuf_time[i % 13];
gpu_total_time += gpu_time[i % 13];
}
cout << endl;
} else if (dist == Zipf) {
srand(123);
cout << "How many queries: ";
cin >> many;
many_query = stoi(many);
cgp->resetTime();
cout << "Executing Random Query" << endl;
qp->count_zipfian = 0;
for (int i = 0; i < many_query; i++) {
qp->generate_rand_query();
if (repl_policy == aware) qp->ShuffleAwareExec();
else if (repl_policy == partitioning) qp->PartitioningOnlyExec();
else if (repl_policy == replication) qp->ReplicationOnlyExec();
time += qp->processQuery();
execution_time += cgp->execution_total;
optimization_time += cgp->optimization_total;
merging_time += cgp->merging_total;
preparing_time += cgp->preparing_total;
nvlink_traffic += cgp->nvlink_total;
shuffle_time += cgp->shuffle_total;
gpu_total_time += cgp->gpu_total;
cgp->resetTime();
}
cout << endl;
cout << "Percentage predicate later > 19960101: " << qp->count_zipfian*1.0/many_query << endl;
}
cout << "Cumulated Time: " << time << endl;
cout << "Execution Time: " << execution_time << endl;
cout << "NVlink traffic: " << nvlink_traffic << endl;
cout << endl;
string runs, policy, skip;
if (repl_policy == none) policy = "None";
else if (repl_policy == aware) policy = "ShuffleAware";
else if (repl_policy == partitioning) policy = "Partitioning";
else if (repl_policy == replication) policy = "Replication";
if (skipping) skip = "Skipping";
else skip = "NoSkip";
FILE *fptr;
if (adaptive && reorder && latemat == 2 && !isopt) {
if (dist == None) runs = string("logs/runs/None/") + policy + to_string(NUM_GPU) + "GPUsSF" + to_string(SF) + string(skip);
else if (dist == Zipf) runs = string("logs/runs/Zipf/") + policy + to_string(NUM_GPU) + "GPUsSF" + to_string(SF) + "alpha" + to_string( (int) (alpha * 10) ) + string(skip);
fptr = fopen(runs.c_str(), "w");
if (fptr == NULL)
{
printf("Could not open file\n");
cout << runs << endl;
assert(0);
}
fprintf(fptr, "{\"SF\":%u,\"cumulated_time\":%.2f,\"execution_time\":%.2f,\"merging_time\":%.2f,\"optimization_time\":%.2f,\"prepare time\":%.2f,\"nvlink traffic\":%.2f,\"shuffle time\":%.2f,\"gpu kernel time\":%.2f}\n", \
SF, time, execution_time, merging_time, optimization_time, preparing_time, nvlink_traffic, shuffle_time, gpu_total_time);
fclose(fptr);
}
if (dist == None && repl_policy == aware && isopt) {
string opt;
if (!latemat && !adaptive && !reorder)
opt = "logs/runs/opt/NoOpt" + to_string(NUM_GPU) + "GPUs" + to_string(SF)+ string(skip);
else if (!latemat && adaptive && !reorder)
opt = "logs/runs/opt/Adaptive" + to_string(NUM_GPU) + "GPUs"+ to_string(SF)+ string(skip);
else if (!latemat && adaptive && reorder)
opt = "logs/runs/opt/Reorder" + to_string(NUM_GPU) + "GPUs"+ to_string(SF)+ string(skip);
else if (latemat && adaptive && reorder)
opt = "logs/runs/opt/Latemat" + to_string(NUM_GPU) + "GPUs" + to_string(SF)+ string(skip);
else assert(0);
fptr = fopen(opt.c_str(), "w");
if (fptr == NULL)
{
printf("Could not open file\n");
assert(0);
}
fprintf(fptr, "{\"SF\":%u,\"cumulated_time\":%.2f,\"execution_time\":%.2f,\"merging_time\":%.2f,\"optimization_time\":%.2f,\"prepare time\":%.2f,\"nvlink traffic\":%.2f,\"shuffle time\":%.2f,\"gpu kernel time\":%.2f}\n", \
SF, time, execution_time, merging_time, optimization_time, preparing_time, nvlink_traffic, shuffle_time, gpu_total_time);
fclose(fptr);
}
} else if (input.compare("3") == 0) {
repl_policy = aware;
bool opt = !adaptive || !reorder || !latemat;
cm->ShuffleAware(dist, opt);
cout << "Renewing metadata" << endl;
cgp->renewMetadata();
cout << "Metadata renewed" << endl;
} else if (input.compare("4") == 0) {
repl_policy = partitioning;
cm->PartitioningOnly(dist);
cout << "Renewing metadata" << endl;
cgp->renewMetadata();
cout << "Metadata renewed" << endl;
} else if (input.compare("5") == 0) {
repl_policy = replication;
cm->ReplicationOnly(dist);
cout << "Renewing metadata" << endl;
cgp->renewMetadata();
cout << "Metadata renewed" << endl;
} else if (input.compare("clear") == 0) {
repl_policy = none;
cgp->cm->deleteAll();
cout << "Renewing metadata" << endl;
cgp->renewMetadata();
cout << "Metadata renewed" << endl;
} else if (input.compare("latemat") == 0) {
if (latemat == 0) latemat = 2;
else latemat = 0;
qp->latemat = latemat;
if (latemat != 0) cout << "Late materialization is enabled" << endl;
else cout << "Late materialization is disabled" << endl;
cout << endl;
} else if (input.compare("reorder") == 0) {
reorder = !reorder;
cgp->reorder = reorder;
if (reorder) cout << "Join reordering is enabled" << endl;
else cout << "Join reordering is disabled" << endl;
cout << endl;
} else if (input.compare("adaptive") == 0) {
adaptive = !adaptive;
qp->adaptive = adaptive;
if (adaptive) cout << "Adaptive execution is enabled" << endl;
else cout << "Adaptive execution is disabled" << endl;
cout << endl;
} else if (input.compare("skipping") == 0) {
skipping = !skipping;
cgp->skipping = skipping;
cgp->qo->skipping = skipping;
qp->skipping = skipping;
if (skipping) cout << "Segment skipping is enabled" << endl;
else cout << "Segment skipping is disabled" << endl;
cout << endl;
} else if (input.compare("zipf") == 0) {
dist = Zipf;
qp->qo->setDistributionZipfian(alpha, alpha1);
qp->dist = Zipf;
cout << "Zipf distribution is enabled" << endl;
cout << endl;
} else if (input.compare("none") == 0) {
dist = None;
qp->qo->setDistributionZipfian(alpha, alpha1);
qp->dist = None;
cout << "No distribution is enabled" << endl;
cout << endl;
} else {
exit = true;
}
}
for(int gpu = 0; gpu < ALL_GPU; gpu++) {
ncclCommDestroy(comms[gpu]);
}
}
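// Rebuild checklist (presumably to follow after changing the dataset / scale factor):
//   - change ssb utils
//   - change common
//   - change minmax.sh
//   - recompile minmax.cpp, minmax_newbench.cpp, minmaxsort.cpp
//   - recompile gen_synthetic_bench
//   - recompile main_multi_gpu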