#include "QueryProcessing.h"
#include "QueryOptimizer.h"
#include "MultiGPUProcessing.h"
#include "CacheManager.h"
#include "CPUProcessing.h"
#include "CostModel.h"

#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <string>
#include <vector>

// Replacement policy currently applied to the GPU cache. `none` means no
// policy-specific pre-execution step is run before a query.
enum Policy {
    aware, partitioning, replication, none
};

namespace {

// Sizes (in bytes) of the memory pools handed to CPUGPUProcessing, plus the
// flag recording whether the "optimization ablation" configuration was picked.
struct MemoryBudget {
    size_t cache_size;
    size_t broadcast_size;
    size_t processing_size;
    size_t pinned_memsize;
    bool isopt;
};

// Timing / traffic counters collected from CPUGPUProcessing after a query.
struct RunMetrics {
    double time = 0;
    double execution_time = 0;
    double optimization_time = 0;
    double merging_time = 0;
    double preparing_time = 0;
    double nvlink_traffic = 0;
    double shuffle_time = 0;
    double gpu_total_time = 0;

    RunMetrics& operator+=(const RunMetrics& other) {
        time += other.time;
        execution_time += other.execution_time;
        optimization_time += other.optimization_time;
        merging_time += other.merging_time;
        preparing_time += other.preparing_time;
        nvlink_traffic += other.nvlink_traffic;
        shuffle_time += other.shuffle_time;
        gpu_total_time += other.gpu_total_time;
        return *this;
    }
};

// Builds a budget from per-pool multipliers. The cache and broadcast pools are
// counted in units of 1048576 bytes and the processing and pinned pools in
// units of 524288 bytes (the original author's notes call each unit
// "1 segment"). All pools except the pinned one are scaled by NUM_GPU.
MemoryBudget makeBudget(size_t cache_units, size_t broadcast_units,
                        size_t processing_units, size_t pinned_units, bool isopt) {
    MemoryBudget budget;
    budget.cache_size = ((size_t) (1048576)) * cache_units * NUM_GPU;
    budget.broadcast_size = ((size_t) (1048576)) * broadcast_units * NUM_GPU;
    budget.processing_size = ((size_t) (524288)) * processing_units * NUM_GPU;
    budget.pinned_memsize = ((size_t) (524288)) * pinned_units;
    budget.isopt = isopt;
    return budget;
}

// Picks the pool sizes for the compiled-in scale factor (SF), GPU count and
// optimization switches. Earlier revisions sized the pools in multiples of
// 52428800 bytes ("52428800 is 50 segments" in the author's notes).
MemoryBudget chooseMemoryBudget(bool adaptive, bool reorder, int latemat, Distribution dist) {
    (void) dist;  // only inspected by the assert below
    if (!adaptive || !reorder || latemat == 0) {
        return makeBudget(1000, 500, 2220, 1000, true);
    }
    if (SF == 162 && NUM_GPU == 8) {
        return makeBudget(2000, 700, 1020, 1000, false);
    }
    if (SF == 82 || SF == 42 || SF == 162) {
        return makeBudget(2000, 500, 1220, 1000, false);
    }
    // WARNING (SF 322 / 402): if the total = 3800 it would not work for 8 GPUs,
    // because NCCL needs some GPU memory to operate.
    if (SF == 322) {
        return makeBudget(2000, 200, 1520, 1000, false);
    }
    if (SF == 402) {
        return makeBudget(1900, 250, 1570, 2000, false);
    }
    if (SF % 10 == 0) {  // original SSB
        assert(dist == None);
        return makeBudget(2000, 50, 1670, 2000, false);
    }
    return makeBudget(2000, 200, 1520, 1000, false);
}

// Name used for the policy in log file paths; empty for an unknown value.
const char* policyName(Policy policy) {
    switch (policy) {
        case none:         return "None";
        case aware:        return "ShuffleAware";
        case partitioning: return "Partitioning";
        case replication:  return "Replication";
    }
    return "";
}

// Runs the policy-specific pre-execution step (nothing for `none`).
void applyPolicyExec(QueryProcessing* qp, Policy policy) {
    switch (policy) {
        case aware:        qp->ShuffleAwareExec(); break;
        case partitioning: qp->PartitioningOnlyExec(); break;
        case replication:  qp->ReplicationOnlyExec(); break;
        case none:         break;
    }
}

// Executes the query currently selected on `qp`, snapshots the counters that
// `cgp` accumulated for it, and resets those counters for the next query.
RunMetrics runCurrentQuery(QueryProcessing* qp, CPUGPUProcessing* cgp, Policy policy) {
    applyPolicyExec(qp, policy);

    RunMetrics m;
    m.time = qp->processQuery();
    m.execution_time = cgp->execution_total;
    m.optimization_time = cgp->optimization_total;
    m.merging_time = cgp->merging_total;
    m.preparing_time = cgp->preparing_total;
    m.nvlink_traffic = cgp->nvlink_total;
    m.shuffle_time = cgp->shuffle_total;
    m.gpu_total_time = cgp->gpu_total;
    cgp->resetTime();
    return m;
}

// Prints the three summary lines shown after a run.
void printSummary(const RunMetrics& m) {
    cout << "Cumulated Time: " << m.time << endl;
    cout << "Execution Time: " << m.execution_time << endl;
    cout << "NVlink traffic: " << m.nvlink_traffic << endl;
    cout << endl;
}

// Writes the metrics as a single JSON line to `path`, truncating the file.
// A failed open is reported and asserts; in NDEBUG builds it returns without
// writing rather than handing a null FILE* to fprintf.
void writeRunLog(const string& path, const RunMetrics& m) {
    FILE* fptr = fopen(path.c_str(), "w");
    if (fptr == NULL) {
        printf("Could not open file\n");
        cout << path << endl;
        assert(0);
        return;
    }
    fprintf(fptr,
            "{\"SF\":%u,\"cumulated_time\":%.2f,\"execution_time\":%.2f,\"merging_time\":%.2f,\"optimization_time\":%.2f,\"prepare time\":%.2f,\"nvlink traffic\":%.2f,\"shuffle time\":%.2f,\"gpu kernel time\":%.2f}\n",
            static_cast<unsigned>(SF), m.time, m.execution_time, m.merging_time, m.optimization_time,
            m.preparing_time, m.nvlink_traffic, m.shuffle_time, m.gpu_total_time);
    fclose(fptr);
}

// Prompts until an integer is entered. Returns false when stdin is exhausted
// or broken, so callers can leave the menu loop instead of reusing stale input.
bool promptInt(const char* prompt, int& value) {
    string token;
    while (true) {
        cout << prompt;
        if (!(cin >> token)) return false;
        try {
            value = stoi(token);
            return true;
        } catch (const std::exception&) {
            // stoi throws on non-numeric or out-of-range text; ask again.
            cout << "Invalid number: " << token << endl;
        }
    }
}

// Re-synchronises the processing layer's metadata after the cache changed.
void renewMetadataVerbose(CPUGPUProcessing* cgp) {
    cout << "Renewing metadata" << endl;
    cgp->renewMetadata();
    cout << "Metadata renewed" << endl;
}

// Prints the interactive menu and the input prompt.
void printMenu() {
    cout << "Select Options:" << endl;
    cout << "1. Run Specific Query" << endl;
    cout << "2. Run Random Queries" << endl;
    cout << "3. Replacement Shuffle-Aware" << endl;
    cout << "4. Replacement Partitioning-Only" << endl;
    cout << "5. Replacement Replication-Only" << endl;
    cout << "exit. Exit" << endl;
    cout << "clear. Delete Columns from GPU" << endl;
    cout << "skipping. Toggle segment skipping" << endl;
    cout << "latemat. Toggle late materialization" << endl;
    cout << "reorder. Toggle join reordering" << endl;
    cout << "adaptive. Toggle adaptive execution" << endl;
    cout << "zipf. Enable zipfian distribution" << endl;
    cout << "none. Enable no distribution" << endl;
    cout << "Your Input: ";
}

}  // namespace

// Interactive driver for the multi-GPU query engine.
//
// Start-up: picks the memory pool sizes, enables peer access between every
// pair of the NUM_GPU devices, initialises one NCCL communicator per device
// (ALL_GPU of them), and builds the CPUGPUProcessing / QueryProcessing
// objects. It then serves a text menu on stdin until an unrecognised command
// (e.g. "exit") or end of input is seen. Option 2 additionally writes a JSON
// summary under logs/runs/.
int main() {

    bool verbose = true;
    bool skipping = true;
    bool adaptive = true;
    int latemat = 2;
    bool reorder = true;
    Distribution dist = None;

    double alpha = 2.5;
    double alpha1 = 3.0;

    // NOTE: `isopt` reflects the switches as compiled in above; toggling them
    // from the menu later does not recompute the budget or this flag.
    const MemoryBudget budget = chooseMemoryBudget(adaptive, reorder, latemat, dist);
    const bool isopt = budget.isopt;

    // Enable peer access from every device to every other device.
    for (int src = 0; src < NUM_GPU; src++) {
        CubDebugExit(cudaSetDevice(src));
        for (int dst = 0; dst < NUM_GPU; dst++) {
            if (src != dst) CubDebugExit(cudaDeviceEnablePeerAccess(dst, 0));
        }
    }
    CubDebugExit(cudaSetDevice(0));

    /* List GPU devices */
    std::vector<int> device_list(ALL_GPU);
    for (int gpu = 0; gpu < ALL_GPU; ++gpu) device_list[gpu] = gpu;

    /* NCCL init */
    // Author's note: NCCL still does not work!! But the performance is bad anyway!!
    std::vector<ncclComm_t> comm_storage(ALL_GPU);
    ncclComm_t* comms = comm_storage.data();  // owned by comm_storage, which outlives every user below
    ncclCommInitAll(comms, ALL_GPU, device_list.data());

    // Need to synchronize after NCCL init, otherwise it will cause an error.
    for (int gpu = 0; gpu < ALL_GPU; ++gpu) {
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaDeviceSynchronize());
    }
    CubDebugExit(cudaSetDevice(0));

    //2, 4, 8, 12, 16, 20, 24, 32, 40
    //4, 10, 20, 30, 40
    //TODO: make it support cache size > 8 GB (there are lots of integer overflow resulting in
    //      negative offset to the gpuCache) should have used unsigned int everywhere
    CPUGPUProcessing* cgp = new CPUGPUProcessing(budget.cache_size, budget.broadcast_size,
                                                 budget.processing_size, budget.pinned_memsize,
                                                 comms, verbose, skipping, reorder);
    CacheManager* cm = cgp->cm;

    QueryProcessing* qp = new QueryProcessing(cgp, verbose, dist, comms, adaptive, latemat);

    // A hand-written cache placement experiment used to be kept here as
    // commented-out code (after disabled calls to cm->ShuffleAware(),
    // cm->PartitioningOnly() and cm->ReplicationOnly()). Its last active form:
    //   * fact table, columns 1-9: first 50 segments of column 5, first 100 of
    //     column 4, first 150 of the others, each placed on GPU
    //     (segment_id % NUM_GPU);
    //   * customer table, columns 10-13: all segments of column 10, the first
    //     half of the others, placed on GPU (segment_id % NUM_GPU);
    //   * supplier table, columns 14-17: all segments, GPU (segment_id % NUM_GPU);
    //   * part table, columns 18-21: all segments of column 18, the first half
    //     of the others, GPU (segment_id % NUM_GPU);
    //   * date table, columns 22-24: every segment cached on every GPU;
    //   followed by cgp->renewMetadata().
    // Menu options 3-5 below install a placement through the CacheManager.

    if (dist == Zipf) {
        qp->qo->setDistributionZipfian(alpha, alpha1);
    }

    Policy repl_policy = none;
    string input;
    bool done = false;

    while (!done) {
        printMenu();

        // On EOF / stream failure `input` would keep its previous value and the
        // loop would replay the last command forever, so stop instead.
        if (!(cin >> input)) break;

        if (input.compare("1") == 0) {
            cgp->resetTime();

            int query_id = 0;
            if (!promptInt("Input Query: ", query_id)) break;
            qp->setQuery(query_id);

            const RunMetrics total = runCurrentQuery(qp, cgp, repl_policy);
            printSummary(total);

        } else if (input.compare("2") == 0) {
            RunMetrics total;
            int many_query = 0;

            if (dist == None) {
                const int queries[13] = {43, 42, 41, 34, 33, 32, 31, 23, 22, 21, 13, 12, 11};
                RunMetrics per_query[13];

                cgp->resetTime();
                cout << "Executing 13 Queries" << endl;
                for (int i = 0; i < 13; i++) {
                    qp->setQuery(queries[i]);
                    per_query[i] = runCurrentQuery(qp, cgp, repl_policy);
                }
                cout << endl;

                if (!promptInt("How many queries: ", many_query)) break;

                // Each of the 13 queries ran once; the requested workload is
                // extrapolated by cycling through the measured results.
                for (int i = 0; i < many_query; i++) {
                    total += per_query[i % 13];
                }
                cout << endl;

            } else if (dist == Zipf) {
                srand(123);
                if (!promptInt("How many queries: ", many_query)) break;

                cgp->resetTime();
                cout << "Executing Random Query" << endl;
                qp->count_zipfian = 0;
                for (int i = 0; i < many_query; i++) {
                    qp->generate_rand_query();
                    total += runCurrentQuery(qp, cgp, repl_policy);
                }
                cout << endl;

                // Avoid dividing by zero when no queries were requested.
                const double later_fraction =
                    many_query > 0 ? qp->count_zipfian * 1.0 / many_query : 0.0;
                cout << "Percentage predicate later > 19960101: " << later_fraction << endl;
            }

            printSummary(total);

            const string policy = policyName(repl_policy);
            const string skip = skipping ? "Skipping" : "NoSkip";

            if (adaptive && reorder && latemat == 2 && !isopt) {
                string runs;
                if (dist == None) {
                    runs = string("logs/runs/None/") + policy + to_string(NUM_GPU) + "GPUsSF"
                         + to_string(SF) + skip;
                } else if (dist == Zipf) {
                    runs = string("logs/runs/Zipf/") + policy + to_string(NUM_GPU) + "GPUsSF"
                         + to_string(SF) + "alpha" + to_string((int) (alpha * 10)) + skip;
                }
                writeRunLog(runs, total);
            }

            if (dist == None && repl_policy == aware && isopt) {
                const string suffix = to_string(NUM_GPU) + "GPUs" + to_string(SF) + skip;
                string opt;
                if (!latemat && !adaptive && !reorder) opt = "logs/runs/opt/NoOpt" + suffix;
                else if (!latemat && adaptive && !reorder) opt = "logs/runs/opt/Adaptive" + suffix;
                else if (!latemat && adaptive && reorder) opt = "logs/runs/opt/Reorder" + suffix;
                else if (latemat && adaptive && reorder) opt = "logs/runs/opt/Latemat" + suffix;
                else assert(0);
                writeRunLog(opt, total);
            }

        } else if (input.compare("3") == 0) {
            repl_policy = aware;
            bool opt = !adaptive || !reorder || !latemat;
            cm->ShuffleAware(dist, opt);
            renewMetadataVerbose(cgp);
        } else if (input.compare("4") == 0) {
            repl_policy = partitioning;
            cm->PartitioningOnly(dist);
            renewMetadataVerbose(cgp);
        } else if (input.compare("5") == 0) {
            repl_policy = replication;
            cm->ReplicationOnly(dist);
            renewMetadataVerbose(cgp);
        } else if (input.compare("clear") == 0) {
            repl_policy = none;
            cgp->cm->deleteAll();
            renewMetadataVerbose(cgp);
        } else if (input.compare("latemat") == 0) {
            latemat = (latemat == 0) ? 2 : 0;
            qp->latemat = latemat;
            if (latemat != 0) cout << "Late materialization is enabled" << endl;
            else cout << "Late materialization is disabled" << endl;
            cout << endl;
        } else if (input.compare("reorder") == 0) {
            reorder = !reorder;
            cgp->reorder = reorder;
            if (reorder) cout << "Join reordering is enabled" << endl;
            else cout << "Join reordering is disabled" << endl;
            cout << endl;
        } else if (input.compare("adaptive") == 0) {
            adaptive = !adaptive;
            qp->adaptive = adaptive;
            if (adaptive) cout << "Adaptive execution is enabled" << endl;
            else cout << "Adaptive execution is disabled" << endl;
            cout << endl;
        } else if (input.compare("skipping") == 0) {
            skipping = !skipping;
            cgp->skipping = skipping;
            cgp->qo->skipping = skipping;
            qp->skipping = skipping;
            if (skipping) cout << "Segment skipping is enabled" << endl;
            else cout << "Segment skipping is disabled" << endl;
            cout << endl;
        } else if (input.compare("zipf") == 0) {
            dist = Zipf;
            qp->qo->setDistributionZipfian(alpha, alpha1);
            qp->dist = Zipf;
            cout << "Zipf distribution is enabled" << endl;
            cout << endl;
        } else if (input.compare("none") == 0) {
            dist = None;
            // NOTE(review): this still calls setDistributionZipfian although the
            // distribution is being switched off — looks like a copy-paste from
            // the "zipf" branch; confirm against QueryOptimizer before changing.
            qp->qo->setDistributionZipfian(alpha, alpha1);
            qp->dist = None;
            cout << "No distribution is enabled" << endl;
            cout << endl;
        } else {
            // Anything unrecognised (including "exit") leaves the menu.
            done = true;
        }
    }

    for (int gpu = 0; gpu < ALL_GPU; gpu++) {
        ncclCommDestroy(comms[gpu]);
    }

    // `qp` and `cgp` are left to process teardown, as before: their destructors
    // are not visible from this file. The device list and communicator array
    // are released by their vectors.
    return 0;
}

// Checklist when changing the benchmark configuration:
//   change ssb utils
//   change common
//   change minmax.sh
//   recompile minmax.cpp, minmax_newbench.cpp, minmaxsort.cpp
//   recompile gen_synthetic_bench
//   recompile main_multi_gpu