#include "QueryProcessing.h" #include "CacheManager.h" #include "QueryOptimizer.h" #include "MultiGPUProcessing.h" // #include "common.h" int queries[13] = {11, 12, 13, 21, 22, 23, 31, 32, 33, 34, 41, 42, 43}; void QueryProcessing::executeTableDim(int table_id, int sg) { int* h_off_col = NULL; int*** h_off_col_part = new int**[NUM_GPU](); int** d_total = new int*[NUM_GPU](); int** h_total = new int*[NUM_GPU](); int*** off_col = new int**[NUM_GPU](); int* h_total_all = NULL; bool filter_on_gpu = 1; for (int gpu = 0; gpu < NUM_GPU; gpu++) { h_off_col_part[gpu] = new int*[NUM_TABLE](); off_col[gpu] = new int*[NUM_TABLE](); h_total[gpu] = (int*) cm->customCudaHostAlloc<int>(1); memset(h_total[gpu], 0, sizeof(int)); d_total[gpu] = (int*) cm->customCudaMalloc<int>(1, gpu); } h_total_all = (int*) cm->customCudaHostAlloc<int>(1); memset(h_total_all, 0, sizeof(int)); // cout << "dim " << sg << endl; if (sg == 0 || sg == 1) { if (qo->joinCPUcheck[table_id] && qo->joinGPUcheck[table_id]) { // assert(sg != 0); // cgp->call_bfilter_CPU(params, h_off_col, h_total_all, sg, table_id); // cgp->switch_device_dim(d_off_col, h_off_col, d_total, h_total_all, sg, 0, table_id, streams[sg]); // cgp->call_build_GPU(params, d_off_col, h_total_all, sg, table_id, streams[sg]); // cgp->call_build_CPU(params, h_off_col, h_total_all, sg, table_id); // assert(0); if (sg == 1) { if (params->ht_replicated[table_id]) { cgp->call_bfilter_CPU(params, h_off_col, h_total_all, sg, table_id); cgp->switchDeviceCPUtoGPUBroadcastDim(h_total_all, h_total, off_col, h_off_col, table_id, sg, streams[sg]); } else { cgp->call_filter_partition_CPU(params, h_off_col_part, h_total, sg, table_id); cgp->switchDeviceCPUtoGPUScatter(h_total, off_col, h_off_col_part, sg, streams[sg]); } filter_on_gpu = 0; } cgp->call_operator_dim_GPU(params, off_col, d_total, h_total, sg, table_id, filter_on_gpu, broadcast[table_id], already_partitioned[table_id], streams[sg]); cgp->call_bfilter_build_CPU(params, h_off_col, h_total_all, sg, table_id); } else if (qo->joinCPUcheck[table_id] && !(qo->joinGPUcheck[table_id])) { cgp->call_bfilter_build_CPU(params, h_off_col, h_total_all, sg, table_id); // assert(0); } else if (!(qo->joinCPUcheck[table_id]) && qo->joinGPUcheck[table_id]) { assert(sg != 0); // cgp->call_bfilter_CPU(params, h_off_col, h_total_all, sg, table_id); // cgp->switch_device_dim(d_off_col, h_off_col, d_total, h_total_all, sg, 0, table_id, streams[sg]); // cgp->call_build_GPU(params, d_off_col, h_total_all, sg, table_id, streams[sg]); if (sg == 1) { if (params->ht_replicated[table_id]) { cgp->call_bfilter_CPU(params, h_off_col, h_total_all, sg, table_id); cgp->switchDeviceCPUtoGPUBroadcastDim(h_total_all, h_total, off_col, h_off_col, table_id, sg, streams[sg]); } else { cgp->call_filter_partition_CPU(params, h_off_col_part, h_total, sg, table_id); cgp->switchDeviceCPUtoGPUScatter(h_total, off_col, h_off_col_part, sg, streams[sg]); } filter_on_gpu = 0; } cgp->call_operator_dim_GPU(params, off_col, d_total, h_total, sg, table_id, filter_on_gpu, broadcast[table_id], already_partitioned[table_id], streams[sg]); // assert(0); } } else if (sg == 2 || sg == 3) { if (qo->joinGPUcheck[table_id]) { assert(sg != 2); // cgp->call_bfilter_build_GPU(params, d_off_col, h_total_all, sg, table_id, streams[sg]); // assert(0); cgp->call_operator_dim_GPU(params, off_col, d_total, h_total, sg, table_id, filter_on_gpu, broadcast[table_id], already_partitioned[table_id], streams[sg]); } if (qo->joinCPUcheck[table_id]) { cgp->call_bfilter_build_CPU(params, h_off_col, 
void QueryProcessing::executeTableFactSimplified(int sg) {
  int** d_total = new int*[NUM_GPU]();
  int** h_total = new int*[NUM_GPU]();
  int*** off_col = new int**[NUM_GPU]();
  int** h_off_col = NULL;
  int* h_total_all = new int[1]();

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    off_col[gpu] = new int*[NUM_TABLE]();
    for (int table = 0; table < NUM_TABLE; table++) {
      off_col[gpu][table] = NULL;
    }
    h_total[gpu] = (int*) cm->customCudaHostAlloc<int>(1);
    memset(h_total[gpu], 0, sizeof(int));
    d_total[gpu] = (int*) cm->customCudaMalloc<int>(1, gpu);
  }
  memset(h_total_all, 0, sizeof(int));

  int pipeline = 0;

  if (verbose) printf("fact sg = %d\n", sg);

  if (qo->selectCPUPipelineCol[sg].size() > 0) {
    if (qo->joinGPUPipelineCol[sg].size() > 0) {
      cgp->call_operator_fact_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
      h_off_col = new int*[NUM_TABLE]();
      cgp->switchDeviceGPUtoCPU(h_total_all, h_total, off_col, h_off_col, sg, streams[sg]);
      // cgp->call_pfilter_CPU(params, h_off_col, h_total_all, sg, qo->selectGPUPipelineCol[sg].size());
      // cgp->call_aggregation_CPU(params, h_off_col[0], h_total_all, sg);
      cgp->call_pfilter_aggr_CPU(params, h_off_col, h_total_all, sg, qo->selectGPUPipelineCol[sg].size());
      // assert(0);
    } else {
      //everything on CPU
      if (qo->groupby_build.size() == 0)
        cgp->call_pfilter_probe_aggr_CPU(params, h_off_col, h_total_all, sg, 0);
      else
        assert(0);
    }
    // assert(0);
  } else {
    if (qo->selectGPUPipelineCol[sg].size() > 0 && qo->joinGPUPipelineCol[sg].size() > 0) {
      //filter, probe, aggr on GPU
      if (qo->joinCPUPipelineCol[sg].size() == 0 && qo->groupbyGPUPipelineCol[sg].size() > 0) {
        if (qo->groupby_build.size() == 0) {
          // if (pipeline) cgp->call_operator_fact_pipeline_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
          cgp->call_operator_fact_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
          // cgp->call_pfilter_probe_aggr_GPU(params, off_col, h_total_all, sg, 0, streams[sg]);
        } else assert(0);
        // filter on GPU, part of join and groupby on CPU
      } else if (qo->joinCPUPipelineCol[sg].size() > 0) {
        // cgp->call_operator_fact_pipeline_GPU(params, off_col, d_total, h_total, sg, 0, 2, streams[sg]);
        // cgp->call_pfilter_probe_GPU(params, off_col, d_total, h_total_all, sg, 0, streams[sg]);
        // cgp->switch_device_fact(off_col, h_off_col, d_total, h_total_all, sg, 1, 0, streams[sg]);
        // if (qo->groupby_build.size() == 0) cgp->call_probe_aggr_CPU(params, h_off_col, h_total_all, sg);
        // else cgp->call_probe_group_by_CPU(params, h_off_col, h_total_all, sg);
        assert(0);
        // filter and join on GPU, groupby on CPU
      } else if (qo->joinCPUPipelineCol[sg].size() == 0 && qo->groupbyGPUPipelineCol[sg].size() == 0) {
        // if (pipeline) cgp->call_operator_fact_pipeline_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
        cgp->call_operator_fact_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
        // cgp->call_pfilter_probe_GPU(params, off_col, d_total, h_total_all, sg, 0, streams[sg]);
        h_off_col = new int*[NUM_TABLE]();
        cgp->switchDeviceGPUtoCPU(h_total_all, h_total, off_col, h_off_col, sg, streams[sg]);
        if (qo->groupby_build.size() == 0)
          cgp->call_aggregation_CPU(params, h_off_col[0], h_total_all, sg);
        else
          cgp->call_group_by_CPU(params, h_off_col, h_total_all, sg);
        // assert(0);
      }
    } else if (qo->selectGPUPipelineCol[sg].size() == 0 && qo->joinGPUPipelineCol[sg].size() > 0) {
      //join and groupby on GPU
      if (qo->joinCPUPipelineCol[sg].size() == 0 && qo->groupbyGPUPipelineCol[sg].size() > 0) {
        // if (pipeline) cgp->call_operator_fact_pipeline_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
        cgp->call_operator_fact_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
        // if (qo->groupby_build.size() == 0) cgp->call_probe_aggr_GPU(params, off_col, h_total_all, sg, streams[sg]);
        // else cgp->call_probe_group_by_GPU(params, off_col, h_total_all, sg, streams[sg]);
        // assert(0);
        //part of join on GPU, part of join on CPU, groupby on CPU
      } else if (qo->joinCPUPipelineCol[sg].size() > 0) {
        // if (pipeline) cgp->call_operator_fact_pipeline_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
        cgp->call_operator_fact_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
        // cgp->call_probe_GPU(params, off_col, d_total, h_total_all, sg, streams[sg]);
        h_off_col = new int*[NUM_TABLE]();
        // cout << "here " << endl;
        cgp->switchDeviceGPUtoCPU(h_total_all, h_total, off_col, h_off_col, sg, streams[sg]);
        if (qo->groupby_build.size() == 0)
          cgp->call_probe_aggr_CPU(params, h_off_col, h_total_all, sg);
        else
          cgp->call_probe_group_by_CPU(params, h_off_col, h_total_all, sg);
        // assert(0);
        // join on GPU, groupby on CPU
      } else if (qo->joinCPUPipelineCol[sg].size() == 0 && qo->groupbyGPUPipelineCol[sg].size() == 0) {
        // if (pipeline) cgp->call_operator_fact_pipeline_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
        cgp->call_operator_fact_GPU(params, off_col, d_total, h_total, sg, 0, latemat, streams[sg]);
        // cgp->call_probe_GPU(params, off_col, d_total, h_total_all, sg, streams[sg]);
        h_off_col = new int*[NUM_TABLE]();
        cgp->switchDeviceGPUtoCPU(h_total_all, h_total, off_col, h_off_col, sg, streams[sg]);
        if (qo->groupby_build.size() == 0)
          cgp->call_aggregation_CPU(params, h_off_col[0], h_total_all, sg);
        else
          cgp->call_group_by_CPU(params, h_off_col, h_total_all, sg);
        // assert(0);
      }
    } else if (qo->selectGPUPipelineCol[sg].size() > 0 && qo->joinGPUPipelineCol[sg].size() == 0) {
      if (qo->joinCPUPipelineCol[sg].size() == 0 && qo->groupbyGPUPipelineCol[sg].size() > 0) {
        assert(0);
        //filter on GPU, join and groupby on CPU
      } else if (qo->joinCPUPipelineCol[sg].size() > 0) {
        cgp->call_filter_GPU(params, off_col, d_total, h_total, sg, 0, 0, streams[sg]);
        h_off_col = new int*[NUM_TABLE]();
        cgp->switchDeviceGPUtoCPU(h_total_all, h_total, off_col, h_off_col, sg, streams[sg]);
        if (qo->groupby_build.size() == 0)
          cgp->call_probe_aggr_CPU(params, h_off_col, h_total_all, sg);
        else
          cgp->call_probe_group_by_CPU(params, h_off_col, h_total_all, sg);
        // assert(0);
      } else if (qo->joinCPUPipelineCol[sg].size() == 0 && qo->groupbyGPUPipelineCol[sg].size() == 0) {
        assert(0);
      }
    } else if (qo->selectGPUPipelineCol[sg].size() == 0 && qo->joinGPUPipelineCol[sg].size() == 0) {
      if (qo->joinCPUPipelineCol[sg].size() == 0 && qo->groupbyGPUPipelineCol[sg].size() > 0) {
        assert(0);
        //join and groupby on CPU
      } else if (qo->joinCPUPipelineCol[sg].size() > 0) {
        if (qo->groupby_build.size() == 0)
          cgp->call_probe_aggr_CPU(params, h_off_col, h_total_all, sg);
        else
          cgp->call_probe_group_by_CPU(params, h_off_col, h_total_all, sg);
        // assert(0);
      } else if (qo->joinCPUPipelineCol[sg].size() == 0 && qo->groupbyGPUPipelineCol[sg].size() == 0) {
        assert(0);
      }
    }
  }
}
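// Runs the already-prepared query: first builds the dimension-table hash
// tables (executeTableDim) for every dimension-table segment group, then
// processes the fact-table segment groups (executeTableFactSimplified), and
// finally gathers and merges the per-GPU partial results into params->res.
// Segment groups are dispatched with parallel_for, each with its own
// per-GPU CUDA streams.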
void QueryProcessing::runQuery(CUcontext ctx) {
  SETUP_TIMING();
  float time;

  cudaEventRecord(start, 0);

  for (int i = 0; i < qo->join.size(); i++) {
    int table_id = qo->join[i].second->table_id;

    //WARNING: MIGHT HAVE TO DO BROADCAST HERE BEFORE GOING INTO PARALLEL FOR
    // cgp->sg_broadcast_count = 0;

    if (NUM_GPU > 1 && qo->joinGPUcheck[table_id]) {
      cout << table_id << " " << qo->par_segment_count[table_id] << endl;
      assert(qo->par_segment_count[table_id] == 1);
    }

    parallel_for(short(0), qo->par_segment_count[table_id], [=](short j){
      CUcontext poppedCtx;
      cuCtxPushCurrent(ctx);
      //NCCL DOES NOT SUPPORT PARALLEL FOR
      // for (short j = 0; j < qo->par_segment_count[table_id]; j++) {
      // for (short j = qo->par_segment_count[table_id]-1; j >= 0; j--) {
      int sg = qo->par_segment[table_id][j];

      if (verbose) {
        cout << qo->join[i].second->column_name << endl;
        printf("sg = %d\n", sg);
      }

      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaStreamCreate(&streams[sg][gpu]));
      }
      CubDebugExit(cudaSetDevice(0));

      if (qo->segment_group_count[table_id][sg] > 0) {
        executeTableDim(table_id, sg);
      }

      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaStreamSynchronize(streams[sg][gpu]));
        CubDebugExit(cudaStreamDestroy(streams[sg][gpu]));
      }
      CubDebugExit(cudaSetDevice(0));

      cuCtxPopCurrent(&poppedCtx);
    });
    // }

    // CubDebugExit(cudaDeviceSynchronize());
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Total dim table time " << time << endl;
  // cgp->execution_total += time;
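  // Fact-table phase: each fact-table segment group is processed
  // independently (one executeTableFactSimplified call per group), again with
  // per-group, per-GPU streams.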
  cudaEventRecord(start, 0);

  cout << qo->par_segment_count[0] << endl;

  parallel_for(short(0), qo->par_segment_count[0], [=](short i){
    //NCCL DOES NOT SUPPORT PARALLEL FOR
    // for (int i = 0; i < qo->par_segment_count[0]; i++) {
    CUcontext poppedCtx;
    cuCtxPushCurrent(ctx);
    int sg = qo->par_segment[0][i];

    // if (sg != 5) continue;

    cout << endl;
    cout << "Currently sg = " << sg << endl;

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaStreamCreate(&streams[sg][gpu]));
    }
    CubDebugExit(cudaSetDevice(0));

    float time_;
    cudaEvent_t start_, stop_;
    cudaEventCreate(&start_);
    cudaEventCreate(&stop_);
    cudaEventRecord(start_, 0);

    if (qo->segment_group_count[0][sg] > 0) {
      executeTableFactSimplified(sg);
    }

    cudaEventRecord(stop_, 0);
    cudaEventSynchronize(stop_);
    cudaEventElapsedTime(&time_, start_, stop_);
    if (verbose) cout << "sg = " << sg << " non demand time = " << time_ << endl;

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaStreamSynchronize(streams[sg][gpu]));
      CubDebugExit(cudaStreamDestroy(streams[sg][gpu]));
    }
    CubDebugExit(cudaSetDevice(0));

    cuCtxPopCurrent(&poppedCtx);
  });
  // }

  // CubDebugExit(cudaDeviceSynchronize());

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Total fact table time " << time << endl;
  // cgp->execution_total += time;

  // cout << "Result:" << endl;
  // int res_count = 0;
  // for (int i=0; i< params->total_val; i++) {
  //   if (params->res[6*i+4] != 0) {
  //     cout << params->res[6*i] << " " << params->res[6*i+1] << " " << params->res[6*i+2] << " " << params->res[6*i+3] << " " << reinterpret_cast<unsigned long long*>(&params->res[6*i+4])[0] << endl;
  //     res_count++;
  //   }
  // }
  // cout << "Res count = " << res_count << endl;

  cudaEventRecord(start, 0);

  // int* resGPU;
  // if (custom) resGPU = (int*) cm->customCudaHostAlloc<int>(params->total_val * 6);
  // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
  //   CubDebugExit(cudaSetDevice(gpu));
  //   // else CubDebugExit(cudaHostAlloc((void**) &resGPU, params->total_val * 6 * sizeof(int), cudaHostAllocDefault));
  //   CubDebugExit(cudaMemcpy(resGPU, params->d_res[gpu], params->total_val * 6 * sizeof(int), cudaMemcpyDeviceToHost));
  //   cgp->gpu_to_cpu_total += (params->total_val * 6 * sizeof(int));
  //   CubDebugExit(cudaSetDevice(0));
  //   //check this again if this is right or not
  //   merge(params->res, resGPU, params->total_val);
  // }
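  // Result merge: partial aggregates from every GPU are gathered on the lead
  // GPU (all-to-one NCCL send/recv when NUM_GPU == 8, cudaMemcpyAsync peer
  // copies otherwise), reduced by the mergeGPU kernel, copied back to the
  // host, and merged into params->res.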
  int** resGPU = new int*[NUM_GPU]();
  int lead_gpu = 0;
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    if (gpu == lead_gpu) resGPU[gpu] = params->d_res[gpu];
    else resGPU[gpu] = (int*) cm->customCudaMalloc<int>(params->total_val * 6, lead_gpu);
  }

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CubDebugExit(cudaStreamCreate(&streams[0][gpu]));
    CubDebugExit(cudaSetDevice(0));
  }

  if (NUM_GPU == 8) {
    ncclGroupStart();
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      if (gpu == lead_gpu) {
        for (int r = 0; r < NUM_GPU; r++) {
          ncclRecv(resGPU[r], params->total_val * 6, ncclInt, r, comms[gpu], streams[0][gpu]);
        }
      }
      ncclSend(params->d_res[gpu], params->total_val * 6, ncclInt, lead_gpu, comms[gpu], streams[0][gpu]);
      cgp->nvlink_total += params->total_val * 6;
    }
    ncclGroupEnd();
  } else {
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      if (gpu != lead_gpu) {
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaMemcpyAsync(resGPU[gpu], params->d_res[gpu], params->total_val * 6 * sizeof(int), cudaMemcpyDeviceToDevice, streams[0][gpu]));
        cgp->nvlink_total += params->total_val * 6;
        CubDebugExit(cudaSetDevice(0));
      }
    }
  }

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(streams[0][gpu]);
    CubDebugExit(cudaSetDevice(0));
  }

  // if (qo->par_segment_count[0] > 1) {
  int* resCPU = (int*) cm->customCudaHostAlloc<int>(params->total_val * 6);
  int** d_resGPU = (int**) cm->customCudaMalloc<int*>(NUM_GPU, lead_gpu);
  CubDebugExit(cudaSetDevice(lead_gpu));
  CubDebugExit(cudaMemcpyAsync(d_resGPU, resGPU, NUM_GPU * sizeof(int*), cudaMemcpyHostToDevice, streams[0][lead_gpu]));
  mergeGPU<NUM_GPU><<<(params->total_val + 128 - 1)/128, 128, 0, streams[0][lead_gpu]>>>(d_resGPU, params->total_val, lead_gpu);
  CubDebugExit(cudaMemcpyAsync(resCPU, params->d_res[lead_gpu], params->total_val * 6 * sizeof(int), cudaMemcpyDeviceToHost, streams[0][lead_gpu]));
  CHECK_ERROR_STREAM(streams[0][lead_gpu]);
  CubDebugExit(cudaSetDevice(0));
  merge(params->res, resCPU, params->total_val);
  // } else {
  //   //not all CPU
  //   if (qo->par_segment[0][0] != 0) {
  //     // cudaEventRecord(start, 0);
  //     // cout << " Here " << endl;
  //     int* resCPU = (int*) cm->customCudaHostAlloc<int>(params->total_val * 6);
  //     int** d_resGPU = (int**) cm->customCudaMalloc<int*>(NUM_GPU, lead_gpu);
  //     CubDebugExit(cudaSetDevice(lead_gpu));
  //     CubDebugExit(cudaMemcpyAsync(d_resGPU, resGPU, NUM_GPU * sizeof(int*), cudaMemcpyHostToDevice, streams[0][lead_gpu]));
  //     mergeGPU<NUM_GPU><<<(params->total_val + 128 - 1)/128, 128, 0, streams[0][lead_gpu]>>>(d_resGPU, params->total_val, lead_gpu);
  //     CubDebugExit(cudaMemcpyAsync(resCPU, params->d_res[lead_gpu], params->total_val * 6 * sizeof(int), cudaMemcpyDeviceToHost, streams[0][lead_gpu]));
  //     CHECK_ERROR_STREAM(streams[0][lead_gpu]);
  //     params->res = resCPU;
  //     CubDebugExit(cudaSetDevice(0));
  //     // cudaEventRecord(stop, 0);
  //     // cudaEventSynchronize(stop);
  //     // cudaEventElapsedTime(&time, start, stop);
  //     // if (verbose) cout << "Merge time " << time << endl;
  //   }
  // }

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(streams[0][gpu]);
    CubDebugExit(cudaStreamDestroy(streams[0][gpu]));
    CubDebugExit(cudaSetDevice(0));
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
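// Processes a single query end to end: parse/prepare it, decide operator
// placement and segment grouping per table (fact table last), execute it via
// runQuery, accumulate per-segment-group transfer and timing statistics, and
// optionally print the result. Returns the accumulated execution +
// preparation + optimization time (ms).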
if (verbose) cout << "Merge time " << time << endl; cgp->merging_total += time; } double QueryProcessing::processQuery(CUcontext ctx) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); qo->parseQuery(query); qo->prepareQuery(query, dist); params = qo->params; cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); cgp->preparing_total += time; if (verbose) { cout << "Query Prepare Time 1: " << time << endl; cout << endl; } cudaEventRecord(start, 0); qo->prepareOperatorPlacement(dist); for (int tbl = 0; tbl < qo->join.size(); tbl++) { qo->groupBitmapSegmentTable(qo->join[tbl].second->table_id, query, broadcast[qo->join[tbl].second->table_id]); } //WARNING: THE FACT TABLE HAS TO BE LAST (ALWAYS!!!!) qo->groupBitmapSegmentTable(0, query, broadcast[0]); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); cgp->optimization_total += time; if (verbose) { cout << "Query Optimization Time: " << time << endl; cout << endl; } cudaEventRecord(start, 0); qo->prepareQuery2(query, dist); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); cgp->preparing_total += time; if (verbose) { cout << "Query Prepare Time 2: " << time << endl; cout << endl; } cudaEventRecord(start, 0); runQuery(); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); // if (verbose) { // cout << "Total query running time: " << time << endl; // cout << endl; // } cgp->execution_total += time; for (int sg = 0 ; sg < MAX_GROUPS; sg++) { cgp->cpu_to_gpu_total += cgp->cpu_to_gpu[sg]; cgp->gpu_to_cpu_total += cgp->gpu_to_cpu[sg]; cgp->nvlink_total += cgp->nvlink_bytes[sg]; cgp->shuffle_total += cgp->shuffle_time[sg]; cgp->gpu_total += cgp->gpu_time[sg]; cgp->cpu_total += cgp->cpu_time[sg]; } if (verbose) { cout << "Result:" << endl; int res_count = 0; for (int i=0; i< params->total_val; i++) { // for (int i=0; i< 500; i++) { if (params->res[6*i+4] != 0) { cout << params->res[6*i] << " " << params->res[6*i+1] << " " << params->res[6*i+2] << " " << params->res[6*i+3] << " " << reinterpret_cast<unsigned long long*>(¶ms->res[6*i+4])[0] << endl; res_count++; } } cout << "Res count = " << res_count << endl; cout << "Query Execution Time: " << time << endl; cout << endl; } // updateStatsQuery(); if (params->compare1[cm->lo_orderdate] >= 19960101) count_zipfian++; qo->clearPlacement(); endQuery(); qo->clearParsing(); return cgp->execution_total + cgp->preparing_total + cgp->optimization_total; }; void QueryProcessing::ShuffleAwareExec() { if (NUM_GPU == 1) { broadcast[0] = false; broadcast[1] = false; broadcast[2] = false; broadcast[3] = false; broadcast[4] = false; already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false; return; } //original SSB if (SF % 10 == 0) { assert(dist != Zipf); broadcast[0] = false; broadcast[1] = true; broadcast[2] = true; broadcast[3] = true; broadcast[4] = true; already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false; return; } if (dist == Zipf) { if (SF == 322 && NUM_GPU == 8) { broadcast[0] = false; broadcast[1] = true; broadcast[2] = true; broadcast[3] = false; broadcast[4] = true; already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = true; already_partitioned[4] = false; } 
void QueryProcessing::ShuffleAwareExec() {
  if (NUM_GPU == 1) {
    broadcast[0] = false; broadcast[1] = false; broadcast[2] = false; broadcast[3] = false; broadcast[4] = false;
    already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false;
    return;
  }

  //original SSB
  if (SF % 10 == 0) {
    assert(dist != Zipf);
    broadcast[0] = false; broadcast[1] = true; broadcast[2] = true; broadcast[3] = true; broadcast[4] = true;
    already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false;
    return;
  }

  if (dist == Zipf) {
    if (SF == 322 && NUM_GPU == 8) {
      broadcast[0] = false; broadcast[1] = true; broadcast[2] = true; broadcast[3] = false; broadcast[4] = true;
      already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = true; already_partitioned[4] = false;
    } else if ((SF == 162 && NUM_GPU >= 4) || (SF == 82 && NUM_GPU >= 2)) {
      broadcast[0] = false; broadcast[1] = true; broadcast[2] = true; broadcast[3] = true; broadcast[4] = true;
      already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false;
    } else {
      broadcast[0] = false; broadcast[1] = true; broadcast[2] = false; broadcast[3] = false; broadcast[4] = true;
      already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = true; already_partitioned[3] = true; already_partitioned[4] = false;
    }
  } else {
    if (adaptive) {
      broadcast[0] = false; broadcast[1] = true; broadcast[2] = false; broadcast[3] = false; broadcast[4] = true;
      already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = true; already_partitioned[3] = true; already_partitioned[4] = false;
    } else {
      broadcast[0] = false; broadcast[1] = false; broadcast[2] = false; broadcast[3] = false; broadcast[4] = true;
      already_partitioned[0] = false; already_partitioned[1] = true; already_partitioned[2] = true; already_partitioned[3] = true; already_partitioned[4] = false;
    }
  }

  for (int table = 1; table < NUM_TABLE; table++) {
    if (broadcast[table] == false && already_partitioned[table] == false) {
      assert(cm->allTable[table]->total_segment % NUM_GPU == 0 && cm->allTable[table]->total_segment / NUM_GPU >= NUM_GPU
          && (cm->allTable[table]->total_segment / NUM_GPU) % NUM_GPU == 0);
    }
  }
}

void QueryProcessing::PartitioningOnlyExec() {
  if (SF % 10 == 0) assert(0);

  if (NUM_GPU == 1) {
    broadcast[0] = false; broadcast[1] = false; broadcast[2] = false; broadcast[3] = false; broadcast[4] = false;
    already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false;
    return;
  }

  if (SF == 322) {
    broadcast[0] = false; broadcast[1] = true; broadcast[2] = false; broadcast[3] = false; broadcast[4] = true;
    already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false;
  } else if (SF == 162) {
    broadcast[0] = false;
    broadcast[1] = true;
    if (query == 33 || query == 34) broadcast[2] = false;
    else broadcast[2] = true;
    // if (NUM_GPU < 4 || query == 42 || query == 43) broadcast[3] = false;
    // else broadcast[3] = true;
    broadcast[3] = false;
    broadcast[4] = true;
    already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false;
  } else if (SF == 82) {
    broadcast[0] = false;
    broadcast[1] = true;
    if (query == 32 || query == 33 || query == 34) broadcast[2] = false;
    else broadcast[2] = true;
    if (query == 42 || query == 43) broadcast[3] = false;
    else broadcast[3] = true;
    broadcast[4] = true;
    already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false;
  } else {
    // if (query == 22 || query == 23 || query == 31 || query == 32 || query == 33 || query == 34 || query == 42 || query == 43) broadcast[1] = false;
    // else broadcast[1] = true;
    broadcast[0] = false;
    broadcast[1] = true;
    if (query == 33 || query == 34) broadcast[2] = false;
    else broadcast[2] = true;
    if (query == 42 || query == 43) broadcast[3] = false;
    else broadcast[3] = true;
    broadcast[4] = true;
    already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false;
  }

  for (int table = 1; table < NUM_TABLE; table++) {
    if (broadcast[table] == false && already_partitioned[table] == false) {
      // assert(cm->allTable[table]->total_segment % NUM_GPU == 0 && cm->allTable[table]->total_segment / NUM_GPU >= NUM_GPU
      //     && (cm->allTable[table]->total_segment / NUM_GPU) % NUM_GPU == 0);
      assert(cm->allTable[table]->total_segment % NUM_GPU == 0 && cm->allTable[table]->total_segment / NUM_GPU >= NUM_GPU);
    }
  }
}
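// Baseline placement: no table is broadcast and none is treated as already
// partitioned. Not supported for the original (SF % 10 == 0) SSB datasets.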
void QueryProcessing::ReplicationOnlyExec() {
  if (SF % 10 == 0) assert(0);
  broadcast[0] = false; broadcast[1] = false; broadcast[2] = false; broadcast[3] = false; broadcast[4] = false;
  already_partitioned[0] = false; already_partitioned[1] = false; already_partitioned[2] = false; already_partitioned[3] = false; already_partitioned[4] = false;
}

void QueryProcessing::percentageData() {
  double fraction[NUM_QUERIES] = {0};

  cout << endl;
  for (int k = 0; k < NUM_QUERIES; k++) {
    int cur_query = queries[k];
    qo->parseQuery(cur_query);

    int total = 0;
    int cached = 0;

    for (int i = 0; i < qo->querySelectColumn.size(); i++) {
      ColumnInfo* column = qo->querySelectColumn[i];
      total += column->total_segment;
      cached += column->tot_seg_in_GPU;
    }
    for (int i = 0; i < qo->queryBuildColumn.size(); i++) {
      ColumnInfo* column = qo->queryBuildColumn[i];
      total += column->total_segment;
      cached += column->tot_seg_in_GPU;
    }
    for (int i = 0; i < qo->queryProbeColumn.size(); i++) {
      ColumnInfo* column = qo->queryProbeColumn[i];
      total += column->total_segment;
      cached += column->tot_seg_in_GPU;
    }
    for (int i = 0; i < qo->queryGroupByColumn.size(); i++) {
      ColumnInfo* column = qo->queryGroupByColumn[i];
      total += column->total_segment;
      cached += column->tot_seg_in_GPU;
    }
    for (int i = 0; i < qo->queryAggrColumn.size(); i++) {
      ColumnInfo* column = qo->queryAggrColumn[i];
      total += column->total_segment;
      cached += column->tot_seg_in_GPU;
    }

    fraction[k] = cached * 1.0 / total;

    cout << "Query " << cur_query << " fraction: " << fraction[k] << " total: " << total << " cached: " << cached << endl;

    qo->clearParsing();
  }
  cout << endl;
}

void QueryProcessing::endQuery() {
  qo->clearPrepare();
  // qo->clearVector();
  cm->resetPointer();
  cgp->resetCGP();
  // cgp->resetTime();
}

void QueryProcessing::updateStatsQuery() {
  chrono::high_resolution_clock::time_point cur_time = chrono::high_resolution_clock::now();
  chrono::duration<double> timestamp = cur_time - cgp->begin_time;
  double time_count = logical_time;
  logical_time += 20;

  for (int i = 0; i < qo->join.size(); i++) {
    for (int col = 0; col < qo->select_build[qo->join[i].second].size(); col++) {
      cm->updateColumnTimestamp(qo->select_build[qo->join[i].second][col], time_count++);
      ColumnInfo* column = qo->select_build[qo->join[i].second][col];
      for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
        Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
        cm->updateSegmentTimeDirect(column, segment, time_count);
        cm->updateSegmentFreqDirect(column, segment);
      }
    }
    cm->updateColumnTimestamp(qo->join[i].second, time_count++);
    ColumnInfo* column = qo->join[i].second;
    for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
      Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
      cm->updateSegmentTimeDirect(column, segment, time_count);
      cm->updateSegmentFreqDirect(column, segment);
    }
  }
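  // For each fact-table segment group, bump the timestamps and access
  // frequencies of the segments touched by that group's CPU/GPU select, join,
  // group-by, and aggregate columns.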
  parallel_for(short(0), qo->par_segment_count[0], [=](short i){
    double par_time_count = time_count;
    int sg = qo->par_segment[0][i];

    for (int col = 0; col < qo->selectCPUPipelineCol[sg].size(); col++) {
      ColumnInfo* column = qo->selectCPUPipelineCol[sg][col];
      cm->updateColumnTimestamp(column, par_time_count++);
      for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
        int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
        Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
        cm->updateSegmentTimeDirect(column, segment, par_time_count);
        cm->updateSegmentFreqDirect(column, segment);
      }
    }
    for (int col = 0; col < qo->selectGPUPipelineCol[sg].size(); col++) {
      ColumnInfo* column = qo->selectGPUPipelineCol[sg][col];
      cm->updateColumnTimestamp(column, par_time_count++);
      for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
        int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
        Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
        cm->updateSegmentTimeDirect(column, segment, par_time_count);
        cm->updateSegmentFreqDirect(column, segment);
      }
    }
    for (int col = 0; col < qo->joinGPUPipelineCol[sg].size(); col++) {
      ColumnInfo* column = qo->joinGPUPipelineCol[sg][col];
      cm->updateColumnTimestamp(column, par_time_count++);
      for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
        int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
        Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
        cm->updateSegmentTimeDirect(column, segment, par_time_count);
        cm->updateSegmentFreqDirect(column, segment);
      }
    }
    for (int col = 0; col < qo->joinCPUPipelineCol[sg].size(); col++) {
      ColumnInfo* column = qo->joinCPUPipelineCol[sg][col];
      cm->updateColumnTimestamp(column, par_time_count++);
      for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
        int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
        Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
        cm->updateSegmentTimeDirect(column, segment, par_time_count);
        cm->updateSegmentFreqDirect(column, segment);
      }
    }
    for (int col = 0; col < qo->queryGroupByColumn.size(); col++) {
      ColumnInfo* column = qo->queryGroupByColumn[col];
      cm->updateColumnTimestamp(column, par_time_count++);
      for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
        int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
        Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
        cm->updateSegmentTimeDirect(column, segment, par_time_count);
        cm->updateSegmentFreqDirect(column, segment);
      }
    }
    for (int col = 0; col < qo->queryAggrColumn.size(); col++) {
      ColumnInfo* column = qo->queryAggrColumn[col];
      cm->updateColumnTimestamp(column, par_time_count++);
      for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
        int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
        Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
        cm->updateSegmentTimeDirect(column, segment, par_time_count);
        cm->updateSegmentFreqDirect(column, segment);
      }
    }
  });

  for (int i = 0; i < qo->querySelectColumn.size(); i++) {
    ColumnInfo* column = qo->querySelectColumn[i];
    cm->updateColumnFrequency(column);
    cm->updateColumnWeightDirect(column, qo->speedup[query][column]);
  }
  for (int i = 0; i < qo->queryBuildColumn.size(); i++) {
    ColumnInfo* column = qo->queryBuildColumn[i];
    cm->updateColumnFrequency(column);
    cm->updateColumnWeightDirect(column, qo->speedup[query][column]);
  }
  for (int i = 0; i < qo->queryProbeColumn.size(); i++) {
    ColumnInfo* column = qo->queryProbeColumn[i];
    cm->updateColumnFrequency(column);
    cm->updateColumnWeightDirect(column, qo->speedup[query][column]);
  }
  for (int i = 0; i < qo->queryGroupByColumn.size(); i++) {
    ColumnInfo* column = qo->queryGroupByColumn[i];
    cm->updateColumnFrequency(column);
    cm->updateColumnWeightDirect(column, qo->speedup[query][column]);
  }
  for (int i = 0; i < qo->queryAggrColumn.size(); i++) {
    ColumnInfo* column = qo->queryAggrColumn[i];
    cm->updateColumnFrequency(column);
    cm->updateColumnWeightDirect(column, qo->speedup[query][column]);
  }
}
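// Writes a cache trace to `filename`: cache and data sizes, per-column cached
// segment counts, and, for every query in queries[], how many segments each
// touched column reads and how many of those are currently cached on the GPU.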
void QueryProcessing::dumpTrace(string filename) {
  int data_size = 0;
  int cached_data = 0;

  for (int i = 1; i < cm->TOT_COLUMN; i++) {
    data_size += cm->allColumn[i]->total_segment;
    cached_data += cm->allColumn[i]->tot_seg_in_GPU;
  }

  FILE* fptr = fopen(filename.c_str(), "w");
  if (fptr == NULL) {
    printf("Could not open file\n");
    assert(0);
  }

  fprintf(fptr, "===========================\n");
  fprintf(fptr, "======= CACHE INFO ======\n");
  fprintf(fptr, "===========================\n");
  fprintf(fptr, "\n");
  fprintf(fptr, "Segment size: %d\n", SEGMENT_SIZE);
  fprintf(fptr, "Cache size: %d\n", cm->cache_total_seg);
  fprintf(fptr, "Data size: %d\n", data_size);
  fprintf(fptr, "Cached data: %d\n", cached_data);
  fprintf(fptr, "\n");

  for (int i = 1; i < cm->TOT_COLUMN; i++) {
    fprintf(fptr, "%s: %d/%d segments cached\n", cm->allColumn[i]->column_name.c_str(), cm->allColumn[i]->tot_seg_in_GPU, cm->allColumn[i]->total_segment);
  }

  fprintf(fptr, "\n");
  fprintf(fptr, "\n");
  fprintf(fptr, "\n");

  for (int i = 0; i < NUM_QUERIES; i++) {
    fprintf(fptr, "===========================\n");
    fprintf(fptr, "======== QUERY %d ========\n", queries[i]);
    fprintf(fptr, "===========================\n");

    qo->parseQuery(queries[i]);
    qo->prepareQuery(queries[i], dist);

    int* t_segment = new int[cm->TOT_COLUMN]();
    int* t_c_segment = new int[cm->TOT_COLUMN]();
    int total_cached = 0, total_touched = 0, total_cached_touched = 0;

    countTouchedSegment(0, t_segment, t_c_segment);
    for (int tbl = 0; tbl < qo->join.size(); tbl++) {
      countTouchedSegment(qo->join[tbl].second->table_id, t_segment, t_c_segment);
    }

    for (int col = 0; col < cm->TOT_COLUMN; col++) {
      total_cached += cm->allColumn[col]->tot_seg_in_GPU;
      total_touched += t_segment[col];
      total_cached_touched += t_c_segment[col];
    }

    fprintf(fptr, "\n");
    fprintf(fptr, "Segment cached: %d\n", total_cached);
    fprintf(fptr, "Segment touched: %d\n", total_touched);
    fprintf(fptr, "Segment cached and touched: %d\n", total_cached_touched);
    fprintf(fptr, "\n");

    for (int col = 0; col < cm->TOT_COLUMN; col++) {
      if (t_segment[col] > 0) {
        fprintf(fptr, "\n");
        fprintf(fptr, "%s\n", cm->allColumn[col]->column_name.c_str());
        fprintf(fptr, "Speedup: %.3f\n", qo->speedup[queries[i]][cm->allColumn[col]]);
        fprintf(fptr, "Segment cached: %d\n", cm->allColumn[col]->tot_seg_in_GPU);
        fprintf(fptr, "Segment touched: %d\n", t_segment[col]);
        fprintf(fptr, "Segment cached and touched: %d\n", t_c_segment[col]);
        fprintf(fptr, "\n");
      }
    }

    fprintf(fptr, "\n");
    fprintf(fptr, "\n");
    fprintf(fptr, "\n");

    delete[] t_segment;
    delete[] t_c_segment;

    endQuery();
    qo->clearParsing();
  }

  fclose(fptr);
}
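// For each column of `table_id` used by the current query, counts the segments
// that pass the query predicate (t_segment) and, of those, the ones resident
// in GPU memory according to cm->segment_bitmap (t_c_segment).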
void QueryProcessing::countTouchedSegment(int table_id, int* t_segment, int* t_c_segment) {
  int total_segment = cm->allColumn[cm->columns_in_table[table_id][0]]->total_segment;
  for (int i = 0; i < total_segment; i++) {
    if (qo->checkPredicate(table_id, i)) {
      for (int j = 0; j < qo->queryColumn[table_id].size(); j++) {
        ColumnInfo* column = qo->queryColumn[table_id][j];
        t_segment[column->column_id]++;
        if (cm->segment_bitmap[column->column_id][i]) t_c_segment[column->column_id]++;
      }
    }
  }
}