#include "MultiGPUProcessing.h" #include "CacheManager.h" #include "KernelLaunch.h" #include "QueryOptimizer.h" //naive approach //launch shuffling kernel in each gpu //for each gpu //check each receiving buffer if ready to launch kernel //if ready then transfer and launch kernel in the destination gpu //have a cpu thread for each gpu checking //if the input buffer is full //if there is available output buffer //if both are satisfied, launch a kernel for the particular input buffer //have a cpu thread for each gpu checking //if the output buffer is full //if there is available input buffer on destination gpu //if both are satisfied, memcpy the buffer //extension: //have separate buffer for each kernel type (easy) //have a condensed and global output buffer for all kernel (kernel has to write the output intelligently to the correct buffer) //have a single kernel reading from multiple buffer (if available) //multiple kernel from the same and different operator can run at the same time (each buffer has separate stream) //buffer full mechanism (tricks: if there are conservatively not enough output buffer, then don't launch the kernel) CPUGPUProcessing::CPUGPUProcessing(size_t _cache_size, size_t _broadcast_size, size_t _processing_size, size_t _pinned_memsize, ncclComm_t* _comms, bool _verbose, bool _skipping, bool _reorder) { skipping = _skipping; reorder = _reorder; custom = true; if (custom) qo = new QueryOptimizer(_cache_size, _broadcast_size, _processing_size, _pinned_memsize, this); else qo = new QueryOptimizer(_cache_size, 0, 0, 0, this); cm = qo->cm; begin_time = chrono::high_resolution_clock::now(); col_idx = new int**[NUM_GPU]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { col_idx[gpu] = new int*[cm->TOT_COLUMN](); } all_col_idx = new int***[NUM_GPU]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { all_col_idx[gpu] = new int**[NUM_GPU]; for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) { all_col_idx[gpu][gpu_iter] = new int*[NUM_COLUMN](); for (int column = 0; column < NUM_COLUMN; column++) { CubDebugExit(cudaSetDevice(gpu)); //cannot do custom malloc since custom malloc will be reset after every query and this metadata persists across queries CubDebugExit(cudaMalloc((void**) &all_col_idx[gpu][gpu_iter][column], cm->allColumn[column]->total_segment * sizeof(int))); CubDebugExit(cudaSetDevice(0)); } } } seg_row_to_gpu = new int**[NUM_GPU]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { seg_row_to_gpu[gpu] = new int*[NUM_TABLE](); for (int table = 0; table < NUM_TABLE; table++) { CubDebugExit(cudaSetDevice(gpu)); //cannot do custom malloc since custom malloc will be reset after every query and this metadata persists across queries CubDebugExit(cudaMalloc((void**) &seg_row_to_gpu[gpu][table], cm->allTable[table]->total_segment * sizeof(int))); CubDebugExit(cudaSetDevice(0)); } } verbose = _verbose; cpu_time = new double[MAX_GROUPS]; gpu_time = new double[MAX_GROUPS]; nvlink_bytes = new double[MAX_GROUPS]; shuffle_time = new double[MAX_GROUPS]; cpu_to_gpu = new unsigned long long[MAX_GROUPS]; gpu_to_cpu = new unsigned long long[MAX_GROUPS]; comms = _comms; //THIS IS USED FOR BROADCAST broadcast_idx = new int**[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { broadcast_idx[gpu] = new int*[cm->TOT_COLUMN](); for (int col = 0; col < NUM_COLUMN; col++) { broadcast_idx[gpu][col] = (int*) malloc (cm->allColumn[col]->total_segment * sizeof(int)); memset(broadcast_idx[gpu][col], -1, cm->allColumn[col]->total_segment * sizeof(int)); } } sg_broadcast_count = 0; resetTime(); } void 
CPUGPUProcessing::switchDeviceGPUtoCPU(int* h_total_all, int** h_total, int*** off_col, int** h_off_col, int sg, cudaStream_t* stream) { assert(h_total != NULL); assert(h_total_all != NULL); assert(off_col != NULL); assert(h_off_col != NULL); SETUP_TIMING(); float time; cudaEventRecord(start, 0); for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(off_col[gpu] != NULL); // assert(*(h_total[gpu]) > 0); *(h_total_all) += *(h_total[gpu]); } if (*(h_total_all) == 0) { return; } for (int table = 0; table < NUM_TABLE; table++) { if (off_col[0][table] != NULL) { h_off_col[table] = (int*) cm->customCudaHostAlloc<int>(*h_total_all); } } assert(off_col != NULL); for (int table = 0; table < NUM_TABLE; table++) { int temp = 0; for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(off_col[gpu] != NULL); if (off_col[gpu][table] != NULL) { assert(h_off_col[table] != NULL); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(h_off_col[table] + temp, off_col[gpu][table], *(h_total[gpu]) * sizeof(int), cudaMemcpyDeviceToHost, stream[gpu])); CubDebugExit(cudaSetDevice(0)); temp += *(h_total[gpu]); } } if (off_col[0][table] != NULL) assert(temp == *h_total_all); } cout << *h_total_all << endl; for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); CHECK_ERROR_STREAM(stream[gpu]); CubDebugExit(cudaSetDevice(0)); } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Transferred data from GPU to CPU " << time << endl; } void CPUGPUProcessing::switchDeviceCPUtoGPUScatter(int** h_total, int*** off_col, int*** h_off_col_part, int sg, cudaStream_t* stream) { assert(h_total != NULL); SETUP_TIMING(); float time; cudaEventRecord(start, 0); int h_total_all = 0; for (int gpu = 0; gpu < NUM_GPU; gpu++) { h_total_all += *(h_total[gpu]); } if (h_total_all == 0) return; assert(off_col != NULL); assert(h_off_col_part != NULL); for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(h_off_col_part[gpu] != NULL); assert(off_col[gpu] != NULL); for (int table = 0; table < NUM_TABLE; table++) { if (h_off_col_part[gpu][table] != NULL) { if (*(h_total[gpu]) > 0) off_col[gpu][table] = (int*) cm->customCudaMalloc<int>(*(h_total[gpu]), gpu); } } } for (int table = 0; table < NUM_TABLE; table++) { for (int gpu = 0; gpu < NUM_GPU; gpu++) { if (off_col[gpu][table] != NULL) { assert(h_off_col_part[gpu][table] != NULL); CubDebugExit(cudaSetDevice(gpu)); if (*(h_total[gpu]) > 0) CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], h_off_col_part[gpu][table], *(h_total[gpu]) * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); } } } for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); CHECK_ERROR_STREAM(stream[gpu]); CubDebugExit(cudaSetDevice(0)); } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Transferred data from CPU to GPU " << time << endl; } void CPUGPUProcessing::switchDeviceCPUtoGPUBroadcast(int* h_total_all, int** h_total, int*** off_col, int** h_off_col, int sg, cudaStream_t* stream) { assert(h_total_all != NULL); SETUP_TIMING(); float time; cudaEventRecord(start, 0); if (*(h_total_all) == 0) return; assert(off_col != NULL); assert(h_off_col != NULL); for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(off_col[gpu] != NULL); for (int table = 0; table < NUM_TABLE; table++) { if (h_off_col[table] != NULL) { off_col[gpu][table] = (int*) cm->customCudaMalloc<int>(*(h_total_all), gpu); } } } // for (int table = 0; table < NUM_TABLE; 
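// The commented-out loop that follows pushed each table's host offset array to every GPU with a
// separate host-to-device copy. The active code below instead stages the offsets on a single
// lead GPU (GPU 0) with one host-to-device copy per table and then fans them out to the remaining
// GPUs with device-to-device copies over NVLink, which is what nvlink_bytes[sg] accounts for.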
//   table++) {
//     for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//       if (off_col[gpu][table] != NULL) {
//         assert(h_off_col[table] != NULL);
//         CubDebugExit(cudaSetDevice(gpu));
//         CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], h_off_col[table], *(h_total_all) * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
//         CubDebugExit(cudaSetDevice(0));
//       }
//     }
//   }

  //copy each table's offsets to the lead GPU once, then fan them out device-to-device
  int lead_gpu = 0;
  for (int table = 0; table < NUM_TABLE; table++) {
    if (off_col[lead_gpu][table] != NULL) {
      assert(h_off_col[table] != NULL);
      CubDebugExit(cudaSetDevice(lead_gpu));
      CubDebugExit(cudaMemcpyAsync(off_col[lead_gpu][table], h_off_col[table], *(h_total_all) * sizeof(int), cudaMemcpyHostToDevice, stream[lead_gpu]));
      CubDebugExit(cudaSetDevice(0));
    }
  }
  CubDebugExit(cudaSetDevice(lead_gpu));
  CHECK_ERROR_STREAM(stream[lead_gpu]);
  CubDebugExit(cudaSetDevice(0));

  for (int table = 0; table < NUM_TABLE; table++) {
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      if (gpu != lead_gpu && off_col[gpu][table] != NULL) {
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], off_col[lead_gpu][table], *(h_total_all) * sizeof(int), cudaMemcpyDeviceToDevice, stream[gpu]));
        nvlink_bytes[sg] += *(h_total_all);
        CubDebugExit(cudaSetDevice(0));
      }
    }
  }

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(stream[gpu]);
    CubDebugExit(cudaSetDevice(0));
    *(h_total[gpu]) = *(h_total_all);
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Transferred data from CPU to GPU " << time << endl;
}

void CPUGPUProcessing::switchDeviceCPUtoGPUBroadcastDim(int* h_total_all, int** h_total, int*** off_col, int* h_off_col, int table, int sg, cudaStream_t* stream) {
  assert(h_total_all != NULL);
  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);
  if (*(h_total_all) == 0) return;
  assert(off_col != NULL);
  assert(h_off_col != NULL);

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    assert(off_col[gpu] != NULL);
    off_col[gpu][table] = (int*) cm->customCudaMalloc<int>(*(h_total_all), gpu);
  }

  // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
  //   if (off_col[gpu][table] != NULL) {
  //     CubDebugExit(cudaSetDevice(gpu));
  //     CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], h_off_col, *(h_total_all) * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
  //     CubDebugExit(cudaSetDevice(0));
  //   }
  // }

  int lead_gpu = 0;
  CubDebugExit(cudaSetDevice(lead_gpu));
  CubDebugExit(cudaMemcpyAsync(off_col[lead_gpu][table], h_off_col, *(h_total_all) * sizeof(int), cudaMemcpyHostToDevice, stream[lead_gpu]));
  CHECK_ERROR_STREAM(stream[lead_gpu]);
  CubDebugExit(cudaSetDevice(0));

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    if (gpu != lead_gpu && off_col[gpu][table] != NULL) {
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], off_col[lead_gpu][table], *(h_total_all) * sizeof(int), cudaMemcpyDeviceToDevice, stream[gpu]));
      nvlink_bytes[sg] += *(h_total_all);
      CubDebugExit(cudaSetDevice(0));
    }
  }

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(stream[gpu]);
    CubDebugExit(cudaSetDevice(0));
    *(h_total[gpu]) = *(h_total_all);
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Transferred data from CPU to GPU " << time << endl;
}

void CPUGPUProcessing::call_filter_partition_CPU(QueryParams* params, int*** h_off_col_part, int** h_total, int sg, int table) {
  ColumnInfo* temp;
  assert(h_total != NULL);
  for (int i = 0; i < qo->join.size(); i++) {
    if (qo->join[i].second->table_id == table) {
      temp = qo->join[i].second;
      break;
    }
  }
  if
(qo->select_build[temp].size() == 0) return; ColumnInfo* column = qo->select_build[temp][0]; int* filter_col = column->col_ptr; int* output_estimate = new int[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { output_estimate[gpu] = qo->segment_group_each_gpu_count[gpu][table][sg] * SEGMENT_SIZE * params->selectivity[column]; } SETUP_TIMING(); float time; for (int gpu = 0; gpu < NUM_GPU; gpu++) { if (output_estimate[gpu] > 0) { h_off_col_part[gpu][table] = (int*) cm->customCudaHostAlloc<int>(output_estimate[gpu]); } } cudaEventRecord(start, 0); int LEN; if (sg == qo->last_segment[table]) { LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE; } else { LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE; } struct filterArgsCPU fargs = { filter_col, NULL, params->compare1[column], params->compare2[column], 0, 0, params->mode[column], 0, params->map_filter_func_host[column], NULL }; short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment); for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(h_total[gpu] != NULL); } filter_partition_CPU(fargs, h_off_col_part, LEN, h_total, table, cm->seg_row_to_single_gpu, 0, segment_group_ptr); for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(*(h_total[gpu]) <= output_estimate[gpu]); assert(*(h_total[gpu]) > 0); } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Filter Partition Kernel time CPU: " << time << endl; cpu_time[sg] += time; }; void CPUGPUProcessing::shuffleDataOpt(int*** &off_col, int** h_total, struct shuffleHelper* shelper, bool first_shuffle, int sg, cudaStream_t* stream) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); assert(NUM_PARTITION == NUM_GPU); //send to the correct GPU //count total partition size and update h_total for calling subsequent operator int* total_partition_size = new int[NUM_PARTITION](); for (int partition = 0; partition < NUM_PARTITION; partition++) { for (int gpu = 0; gpu < NUM_GPU; gpu++) { total_partition_size[partition] += shelper->result_count[gpu][partition]; if (verbose) cout << "result count in gpu " << gpu << " for partition " << partition << " is " << shelper->result_count[gpu][partition] << endl; // assert(shelper->result_count[gpu][partition] > 0); } *(h_total[partition]) = total_partition_size[partition]; // cout << "total partition size for partition " << partition << " is " << total_partition_size[partition] << endl; } for (int gpu = 0; gpu < NUM_GPU; gpu++) { for (int col = 0; col < NUM_COLUMN; col++) { if (shelper->out_shuffle_col[gpu][col] != NULL && shelper->temp_col[gpu][col] == NULL) { // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle if (verbose) cout << "allocate temp col gpu " << gpu << " col " << col << " " << total_partition_size[gpu] * MULTIPLIER << endl; shelper->temp_col[gpu][col] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu); } } for (int table = 0; table < NUM_TABLE; table++) { // if (shelper->out_shuffle_off[gpu][table] == NULL) cout << "out_shuffle " << gpu << " " << table << endl; // if (off_col[gpu][table] != NULL) cout << "off col " << gpu << " " << table << endl; if (shelper->out_shuffle_off[gpu][table] != NULL && off_col[gpu][table] == NULL) { // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle if (verbose) cout << "allocate temp off gpu " << 
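// Note on sizing: the temp_col / off_col shuffle buffers are allocated as
// total_partition_size[gpu] * MULTIPLIER so that a later shuffle whose output grows (up to
// MULTIPLIER times the first shuffle) still fits without reallocation. For example, with
// total_partition_size[gpu] = 1,000,000 rows and MULTIPLIER = 2 (illustrative value; the actual
// constant is defined elsewhere), the buffer holds 2,000,000 ints, i.e. about 8 MB per column per GPU.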
gpu << " table " << table << " " << total_partition_size[gpu] * MULTIPLIER << endl; off_col[gpu][table] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu); } } } cudaStream_t** streamopt = new cudaStream_t*[NUM_GPU]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { streamopt[gpu] = new cudaStream_t[NUM_PARTITION]; CubDebugExit(cudaSetDevice(gpu)); for (int partition = 0; partition < NUM_PARTITION; partition++) { CubDebugExit(cudaStreamCreate(&streamopt[gpu][partition])); } } CubDebugExit(cudaSetDevice(0)); //cudamemcpy from one GPU to all the other GPU (all to all communication) //gpu is the destination gpu, partition is the source gpu for (int col = 0; col < NUM_COLUMN; col++) { if (shelper->out_shuffle_col[0][col] != NULL) { cout << "Transferring column " << col << endl; for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(shelper->temp_col[gpu][col] != NULL); int temp = 0; for (int partition = 0; partition < NUM_PARTITION; partition++) { if (shelper->out_shuffle_col[partition][col][gpu] != NULL){ CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(shelper->temp_col[gpu][col] + temp, shelper->out_shuffle_col[partition][col][gpu], shelper->result_count[partition][gpu] * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[partition][gpu])); if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu]; CubDebugExit(cudaSetDevice(0)); temp += shelper->result_count[partition][gpu]; // cout << "partition " << partition << " gpu " << gpu << " offset " << temp << endl; } } } } } for (int table = 0; table < NUM_TABLE; table++) { if (shelper->out_shuffle_off[0][table] != NULL) { cout << "Transferring offset " << table << endl; for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(off_col[gpu][table] != NULL); int temp = 0; for (int partition = 0; partition < NUM_PARTITION; partition++) { if (shelper->out_shuffle_off[partition][table][gpu] != NULL){ CubDebugExit(cudaSetDevice(gpu)); // cout << "From " << partition << " To " << gpu << " " << shelper->result_count[partition][gpu] << endl; CubDebugExit(cudaMemcpyAsync(off_col[gpu][table] + temp, shelper->out_shuffle_off[partition][table][gpu], shelper->result_count[partition][gpu] * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[partition][gpu])); if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu]; CubDebugExit(cudaSetDevice(0)); temp += shelper->result_count[partition][gpu]; } } } } } for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); for (int partition = 0; partition < NUM_PARTITION; partition++) { CHECK_ERROR_STREAM(streamopt[gpu][partition]); CubDebugExit(cudaStreamDestroy(streamopt[gpu][partition])); } } CubDebugExit(cudaSetDevice(0)); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Shuffle done" << endl; } void CPUGPUProcessing::shuffleDataNCCL(int*** &off_col, int** h_total, struct shuffleHelper* shelper, bool first_shuffle, int sg, cudaStream_t* stream) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); assert(NUM_PARTITION == NUM_GPU); //send to the correct GPU int** scan_result_count = new int*[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { scan_result_count[gpu] = new int[NUM_PARTITION](); for (int partition = 0; partition < NUM_PARTITION; partition++) { if (partition == 0) scan_result_count[gpu][partition] = 0; else scan_result_count[gpu][partition] = scan_result_count[gpu][partition - 1] + shelper->result_count[partition - 1][gpu]; } } //count total partition 
size and update h_total for calling subsequent operator int* total_partition_size = new int[NUM_PARTITION](); for (int partition = 0; partition < NUM_PARTITION; partition++) { for (int gpu = 0; gpu < NUM_GPU; gpu++) { total_partition_size[partition] += shelper->result_count[gpu][partition]; if (verbose) cout << "result count in gpu " << gpu << " for partition " << partition << " is " << shelper->result_count[gpu][partition] << endl; assert(shelper->result_count[gpu][partition] > 0); } *(h_total[partition]) = total_partition_size[partition]; // cout << "total partition size for partition " << partition << " is " << total_partition_size[partition] << endl; } for (int gpu = 0; gpu < NUM_GPU; gpu++) { for (int col = 0; col < NUM_COLUMN; col++) { if (shelper->out_shuffle_col[gpu][col] != NULL && shelper->temp_col[gpu][col] == NULL) { // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle shelper->temp_col[gpu][col] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu); } } for (int table = 0; table < NUM_TABLE; table++) { if (shelper->out_shuffle_off[gpu][table] != NULL && off_col[gpu][table] == NULL) { // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle off_col[gpu][table] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu); } } } ncclGroupStart(); for (int col = 0; col < NUM_COLUMN; col++) { if (shelper->out_shuffle_col[0][col] != NULL) { if (verbose) cout << "Transferring column " << col << endl; //asserting pointers // for(int gpu = 0; gpu < NUM_GPU; gpu++) { // assert(shelper->temp_col[gpu][col] != NULL); // for(int partition = 0; partition < NUM_PARTITION; partition++) { // assert(shelper->out_shuffle_col[gpu][col][partition] != NULL); // } // } //use nccl all2all communication // ncclGroupStart(); for(int gpu = 0; gpu < NUM_GPU; gpu++) { for(int partition = 0; partition < NUM_PARTITION; partition++) { ncclSend(shelper->out_shuffle_col[gpu][col][partition], shelper->result_count[gpu][partition], ncclInt, partition, comms[gpu], stream[gpu]); ncclRecv(shelper->temp_col[gpu][col] + scan_result_count[gpu][partition], shelper->result_count[partition][gpu], ncclInt, partition, comms[gpu], stream[gpu]); if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu]; } } // ncclGroupEnd(); } } ncclGroupEnd(); ncclGroupStart(); for (int table = 0; table < NUM_TABLE; table++) { if (shelper->out_shuffle_off[0][table] != NULL) { if (verbose) cout << "Transferring offset " << table << endl; //asserting pointers // for(int gpu = 0; gpu < NUM_GPU; gpu++) { // assert(off_col[gpu][table] != NULL); // for(int partition = 0; partition < NUM_PARTITION; partition++) { // assert(shelper->out_shuffle_off[gpu][table][partition] != NULL); // } // } //use nccl all2all communication // ncclGroupStart(); for(int gpu = 0; gpu < NUM_GPU; gpu++) { for(int partition = 0; partition < NUM_PARTITION; partition++) { ncclSend(shelper->out_shuffle_off[gpu][table][partition], shelper->result_count[gpu][partition], ncclInt, partition, comms[gpu], stream[gpu]); ncclRecv(off_col[gpu][table] + scan_result_count[gpu][partition], shelper->result_count[partition][gpu], ncclInt, partition, comms[gpu], stream[gpu]); if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu]; } } // ncclGroupEnd(); } } ncclGroupEnd(); for (int gpu = 0; gpu < NUM_GPU; gpu++) { 
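// Minimal sketch (not compiled) of the NCCL all-to-all pattern used above: every rank posts one
// ncclSend and one ncclRecv per peer inside a single NCCL group so the transfers are matched and
// progressed together. The real code above drives all communicators from one process, so the
// group wraps the loop over all GPUs as well. sendbuf/sendcount/recvbuf/recvcount are placeholder
// names, not members of this class.
#if 0
void allToAllOneRank(int gpu, int** sendbuf, size_t* sendcount,
                     int** recvbuf, size_t* recvcount,
                     ncclComm_t comm, cudaStream_t stream) {
  ncclGroupStart();
  for (int peer = 0; peer < NUM_GPU; peer++) {
    ncclSend(sendbuf[peer], sendcount[peer], ncclInt, peer, comm, stream);  // chunk destined for peer
    ncclRecv(recvbuf[peer], recvcount[peer], ncclInt, peer, comm, stream);  // chunk arriving from peer
  }
  ncclGroupEnd();
}
#endif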
CubDebugExit(cudaSetDevice(gpu)); CHECK_ERROR_STREAM(stream[gpu]); CubDebugExit(cudaSetDevice(0)); } // for (int gpu = 0; gpu < NUM_GPU; gpu++) { // CubDebugExit(cudaSetDevice(gpu)); // CubDebugExit(cudaDeviceSynchronize()); // } // CubDebugExit(cudaSetDevice(0)); // assert(0); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Shuffle done" << endl; } void CPUGPUProcessing::broadcastIdxTransfer(int*** d_broadcast_idx, int*** used_broadcast_idx, cudaStream_t* stream){ for (int gpu = 0; gpu < NUM_GPU; gpu++) { d_broadcast_idx[gpu] = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu); int** tmp = new int*[NUM_COLUMN]; for (int col = 0; col < NUM_COLUMN; col++) { if (used_broadcast_idx[gpu][col] != NULL && cm->allColumn[col]->table_id > 0) { //only do this for dimension table tmp[col] = (int*) cm->customCudaMalloc<int>(cm->allColumn[col]->total_segment, gpu); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(tmp[col], used_broadcast_idx[gpu][col], cm->allColumn[col]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); } } CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(d_broadcast_idx[gpu], tmp, NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); } } void CPUGPUProcessing::broadcastSegments(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); int* toBroadcastCount = new int[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { toBroadcastCount[gpu] = 0; } // cout << qo->segment_group_count[table][sg] << endl; for (int i = 0; i < qo->segment_group_count[table][sg]; i++) { int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i]; //check if the segment is replicated //if segment is not replicated (means that it's only in one GPU) if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) { assert(cm->segment_row_to_gpu[table][seg_id].size() == 1); int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { if (gpu != cur_gpu) { toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id; toBroadcastCount[gpu]++; if (seg_id == cm->allTable[table]->total_segment-1) { // cout << "here" << endl; if (cm->allTable[table]->LEN % SEGMENT_SIZE == 0) { broadcast_len[gpu] += SEGMENT_SIZE; } else { broadcast_len[gpu] += cm->allTable[table]->LEN % SEGMENT_SIZE; } // cout << gpu << " " << broadcast_len[gpu] << endl; } else { broadcast_len[gpu] += SEGMENT_SIZE; // cout << gpu << " " << broadcast_len[gpu] << endl; } } } } } assert(broadcast_col != NULL); for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(broadcast_col[gpu] != NULL); if (toBroadcastCount[gpu] > 0) { for (int col = 0; col < NUM_COLUMN; col++) { if (used_col_idx[gpu][col] != NULL) { int64_t size = toBroadcastCount[gpu] * SEGMENT_SIZE; int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED); // cout << gpu << " " << cm->broadcastPointer[gpu] << endl; assert((tmp + size) < cm->each_broadcast_size); broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp; // cout << cm->broadcastPointer[gpu] << endl; } } } } cudaStream_t** streamopt = new cudaStream_t*[NUM_GPU]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { streamopt[gpu] = new cudaStream_t[NUM_GPU]; CubDebugExit(cudaSetDevice(gpu)); for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { 
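// Space for the broadcast copies was reserved above with a simple bump allocator on each GPU's
// dedicated broadcast region: __atomic_fetch_add on cm->broadcastPointer[gpu] returns the old
// offset and advances it by toBroadcastCount[gpu] * SEGMENT_SIZE, so concurrent segment groups
// reserve disjoint ranges without locking. The NUM_GPU x NUM_GPU streamopt streams created here
// give every (destination, source) GPU pair its own copy stream.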
CubDebugExit(cudaStreamCreate(&streamopt[gpu][cur_gpu])); } } CubDebugExit(cudaSetDevice(0)); for (int gpu = 0; gpu < NUM_GPU; gpu++) { if (toBroadcastCount[gpu] > 0) { int* src = toBroadcast[gpu]; int* dst = d_toBroadcast[gpu]; CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, streamopt[gpu][0])); CubDebugExit(cudaSetDevice(0)); } } CubDebugExit(cudaSetDevice(0)); for (int gpu = 0; gpu < NUM_GPU; gpu++) { for (int col = 0; col < NUM_COLUMN; col++) { //check if the column is used in the query at all (assume that used_col_idx is segment group aware) if (used_col_idx[gpu][col] != NULL) { int temp = 0; for (int i = 0; i < toBroadcastCount[gpu]; i++) { int seg = toBroadcast[gpu][i]; int cur_gpu = cm->seg_row_to_single_gpu[table][seg]; //check if the column actually stored in GPU (for now we assume dimension table is always symmetrical) assert(cm->segment_list[cur_gpu][col][seg] != -1); int64_t idx = cm->segment_list[cur_gpu][col][seg]; int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]); int* dst = broadcast_col[gpu][col] + temp * SEGMENT_SIZE; CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[gpu][cur_gpu])); if (gpu != cur_gpu) nvlink_bytes[sg] += SEGMENT_SIZE; CubDebugExit(cudaSetDevice(0)); int64_t start_idx = (broadcast_col[gpu][col] - cm->gpuCache[gpu])/SEGMENT_SIZE; // cout << start_idx + temp << endl; broadcast_idx[gpu][col][seg] = start_idx + temp; // if (col == 11 || col == 15) cout << "col " << col << " " << broadcast_idx[gpu][col][seg] << endl; temp++; } } } } CubDebugExit(cudaSetDevice(0)); for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { CHECK_ERROR_STREAM(streamopt[gpu][cur_gpu]); CubDebugExit(cudaStreamDestroy(streamopt[gpu][cur_gpu])); } } CubDebugExit(cudaSetDevice(0)); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Broadcast done: " << time << endl; } void CPUGPUProcessing::broadcastSegments2(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); vector<vector<int>> shouldBroadcast; shouldBroadcast.resize(NUM_GPU); int* toBroadcastCount = new int[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { toBroadcastCount[gpu] = 0; } // cout << qo->segment_group_count[table][sg] << endl; for (int i = 0; i < qo->segment_group_count[table][sg]; i++) { int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i]; //check if the segment is replicated //if segment is not replicated (means that it's only in one GPU) if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) { assert(cm->segment_row_to_gpu[table][seg_id].size() == 1); int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { // if (gpu != cur_gpu) { toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id; toBroadcastCount[gpu]++; if (seg_id == cm->allTable[table]->total_segment-1) { if (cm->allTable[table]->LEN % SEGMENT_SIZE == 0) { broadcast_len[gpu] += SEGMENT_SIZE; } else { broadcast_len[gpu] += cm->allTable[table]->LEN % SEGMENT_SIZE; } } else { broadcast_len[gpu] += SEGMENT_SIZE; } // } } shouldBroadcast[cur_gpu].push_back(seg_id); } } assert(broadcast_col != NULL); for (int gpu = 0; gpu < NUM_GPU; gpu++) { 
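// broadcastSegments2 differs from broadcastSegments above in two ways: every GPU (including the
// segment's owner) receives a copy in its broadcast region, so broadcast_idx is consistent across
// GPUs, and the copies are batched -- each source GPU's non-replicated segments are first staged
// into one contiguous tmp_buffer and then shipped with a single device-to-device copy per
// destination GPU per column instead of one copy per segment.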
assert(broadcast_col[gpu] != NULL); if (toBroadcastCount[gpu] > 0) { for (int col = 0; col < NUM_COLUMN; col++) { if (used_col_idx[gpu][col] != NULL) { int64_t size = toBroadcastCount[gpu] * SEGMENT_SIZE; int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED); assert((tmp + size) < cm->each_broadcast_size); broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp; } } } } cudaStream_t** streamopt = new cudaStream_t*[NUM_GPU]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { streamopt[gpu] = new cudaStream_t[NUM_GPU]; CubDebugExit(cudaSetDevice(gpu)); for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { CubDebugExit(cudaStreamCreate(&streamopt[gpu][cur_gpu])); } } CubDebugExit(cudaSetDevice(0)); for (int gpu = 0; gpu < NUM_GPU; gpu++) { if (toBroadcastCount[gpu] > 0) { int* src = toBroadcast[gpu]; int* dst = d_toBroadcast[gpu]; CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, streamopt[gpu][0])); CubDebugExit(cudaSetDevice(0)); } } CubDebugExit(cudaSetDevice(0)); // int*** tmp_buffer = new int**[NUM_GPU]; // int scanShouldBroadcast = 0; // for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { // if (shouldBroadcast[cur_gpu].size() > 0) { // // cout << cur_gpu << endl; // tmp_buffer[cur_gpu] = new int*[NUM_COLUMN]; // for (int col = 0; col < NUM_COLUMN; col++) { // // cout << col << endl; // if (used_col_idx[cur_gpu][col] != NULL) { // tmp_buffer[cur_gpu][col] = (int*) cm->customCudaMalloc<int>(shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE, cur_gpu); // for (int i = 0; i < shouldBroadcast[cur_gpu].size(); i++) { // int seg_id = shouldBroadcast[cur_gpu][i]; // assert(cm->segment_list[cur_gpu][col][seg_id] != -1); // int64_t idx = cm->segment_list[cur_gpu][col][seg_id]; // int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]); // int* dst = tmp_buffer[cur_gpu][col] + i * SEGMENT_SIZE; // CubDebugExit(cudaSetDevice(cur_gpu)); // CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[cur_gpu][cur_gpu])); // for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) { // // if (cur_gpu != dst_gpu) { // int64_t start_idx = (broadcast_col[dst_gpu][col] - cm->gpuCache[dst_gpu])/SEGMENT_SIZE; // broadcast_idx[dst_gpu][col][seg_id] = start_idx + scanShouldBroadcast + i; // // if (col == 18) cout << "dst gpu " << dst_gpu << " " << seg_id << " " << broadcast_idx[dst_gpu][col][seg_id] << endl; // // } // } // } // } // } // scanShouldBroadcast += shouldBroadcast[cur_gpu].size(); // // cout << endl; // } // } int*** tmp_buffer = new int**[NUM_COLUMN]; for (int col = 0; col < NUM_COLUMN; col++) { if (used_col_idx[0][col] != NULL) { tmp_buffer[col] = new int*[NUM_GPU]; int scanShouldBroadcast = 0; for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { if (shouldBroadcast[cur_gpu].size() > 0) { tmp_buffer[col][cur_gpu] = (int*) cm->customCudaMalloc<int>(shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE, cur_gpu); for (int i = 0; i < shouldBroadcast[cur_gpu].size(); i++) { int seg_id = shouldBroadcast[cur_gpu][i]; assert(cm->segment_list[cur_gpu][col][seg_id] != -1); int64_t idx = cm->segment_list[cur_gpu][col][seg_id]; int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]); int* dst = tmp_buffer[col][cur_gpu] + i * SEGMENT_SIZE; CubDebugExit(cudaSetDevice(cur_gpu)); CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[cur_gpu][cur_gpu])); for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) { int64_t start_idx = 
(broadcast_col[dst_gpu][col] - cm->gpuCache[dst_gpu])/SEGMENT_SIZE; broadcast_idx[dst_gpu][col][seg_id] = start_idx + scanShouldBroadcast + i; } } CubDebugExit(cudaSetDevice(cur_gpu)); CHECK_ERROR_STREAM(streamopt[cur_gpu][cur_gpu]); CubDebugExit(cudaSetDevice(0)); for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) { int* dst = broadcast_col[dst_gpu][col] + scanShouldBroadcast * SEGMENT_SIZE; CubDebugExit(cudaSetDevice(cur_gpu)); CubDebugExit(cudaMemcpyAsync(dst, tmp_buffer[col][cur_gpu], shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[cur_gpu][dst_gpu])); if (dst_gpu != cur_gpu) nvlink_bytes[sg] += shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE; CubDebugExit(cudaSetDevice(0)); } scanShouldBroadcast += shouldBroadcast[cur_gpu].size(); } } } } // for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { // CubDebugExit(cudaSetDevice(cur_gpu)); // CHECK_ERROR_STREAM(streamopt[cur_gpu][cur_gpu]); // CubDebugExit(cudaSetDevice(0)); // } // scanShouldBroadcast = 0; // for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { // if (shouldBroadcast[cur_gpu].size() > 0) { // for (int col = 0; col < NUM_COLUMN; col++) { // //check if the column is used in the query at all (assume that used_col_idx is segment group aware) // if (used_col_idx[cur_gpu][col] != NULL) { // int* src = tmp_buffer[cur_gpu][col]; // for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) { // // if (cur_gpu != dst_gpu) { // // cout << cur_gpu << " " << dst_gpu << endl; // int* dst = broadcast_col[dst_gpu][col] + scanShouldBroadcast * SEGMENT_SIZE; // CubDebugExit(cudaSetDevice(cur_gpu)); // CubDebugExit(cudaMemcpyAsync(dst, src, shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[cur_gpu][dst_gpu])); // CubDebugExit(cudaSetDevice(0)); // // } // } // } // } // scanShouldBroadcast += shouldBroadcast[cur_gpu].size(); // // cout << scanShouldBroadcast << endl; // } // } // CubDebugExit(cudaSetDevice(0)); // for (int gpu = 0; gpu < NUM_GPU; gpu++) { // CubDebugExit(cudaSetDevice(gpu)); // for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { // CHECK_ERROR_STREAM(streamopt[gpu][cur_gpu]); // } // } // CubDebugExit(cudaSetDevice(0)); for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { CHECK_ERROR_STREAM(streamopt[gpu][cur_gpu]); CubDebugExit(cudaStreamDestroy(streamopt[gpu][cur_gpu])); } } CubDebugExit(cudaSetDevice(0)); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Broadcast done: " << time << endl; // assert(0); } void CPUGPUProcessing::broadcastSegmentsNCCL2(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res, cudaStream_t* stream) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); vector<vector<int>> shouldBroadcast; shouldBroadcast.resize(NUM_GPU); int* toBroadcastCount = new int[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { toBroadcastCount[gpu] = 0; } // cout << qo->segment_group_count[table][sg] << endl; for (int i = 0; i < qo->segment_group_count[table][sg]; i++) { int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i]; //check if the segment is replicated //if segment is not replicated (means that it's only in one GPU) if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) { assert(cm->segment_row_to_gpu[table][seg_id].size() == 1); int cur_gpu = 
cm->seg_row_to_single_gpu[table][seg_id]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { // if (gpu != cur_gpu) { toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id; toBroadcastCount[gpu]++; if (seg_id == cm->allTable[table]->total_segment-1) { if (cm->allTable[table]->LEN % SEGMENT_SIZE == 0) { broadcast_len[gpu] += SEGMENT_SIZE; } else { broadcast_len[gpu] += cm->allTable[table]->LEN % SEGMENT_SIZE; } } else { broadcast_len[gpu] += SEGMENT_SIZE; } // } } shouldBroadcast[cur_gpu].push_back(seg_id); } } assert(broadcast_col != NULL); for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(broadcast_col[gpu] != NULL); if (toBroadcastCount[gpu] > 0) { for (int col = 0; col < NUM_COLUMN; col++) { if (used_col_idx[gpu][col] != NULL) { int64_t size = toBroadcastCount[gpu] * SEGMENT_SIZE; int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED); assert((tmp + size) < cm->each_broadcast_size); broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp; } } } } for (int gpu = 0; gpu < NUM_GPU; gpu++) { if (toBroadcastCount[gpu] > 0) { int* src = toBroadcast[gpu]; int* dst = d_toBroadcast[gpu]; CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); } } CubDebugExit(cudaSetDevice(0)); int*** tmp_buffer = new int**[NUM_GPU]; int scanShouldBroadcast = 0; for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { if (shouldBroadcast[cur_gpu].size() > 0) { // cout << cur_gpu << endl; tmp_buffer[cur_gpu] = new int*[NUM_COLUMN]; for (int col = 0; col < NUM_COLUMN; col++) { // cout << col << endl; if (used_col_idx[cur_gpu][col] != NULL) { tmp_buffer[cur_gpu][col] = (int*) cm->customCudaMalloc<int>(shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE, cur_gpu); for (int i = 0; i < shouldBroadcast[cur_gpu].size(); i++) { int seg_id = shouldBroadcast[cur_gpu][i]; assert(cm->segment_list[cur_gpu][col][seg_id] != -1); int64_t idx = cm->segment_list[cur_gpu][col][seg_id]; int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]); int* dst = tmp_buffer[cur_gpu][col] + i * SEGMENT_SIZE; CubDebugExit(cudaSetDevice(cur_gpu)); CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, stream[cur_gpu])); for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) { // if (cur_gpu != dst_gpu) { int64_t start_idx = (broadcast_col[dst_gpu][col] - cm->gpuCache[dst_gpu])/SEGMENT_SIZE; broadcast_idx[dst_gpu][col][seg_id] = start_idx + scanShouldBroadcast + i; // if (col == 18) cout << "dst gpu " << dst_gpu << " " << seg_id << " " << broadcast_idx[dst_gpu][col][seg_id] << endl; // } } } } } scanShouldBroadcast += shouldBroadcast[cur_gpu].size(); // cout << endl; } } for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); CHECK_ERROR_STREAM(stream[gpu]); CubDebugExit(cudaSetDevice(0)); } // for (int col = 0; col < NUM_COLUMN; col++) { // ncclGroupStart(); // if (used_col_idx[0][col] != NULL) { // scanShouldBroadcast = 0; // for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { // if (shouldBroadcast[cur_gpu].size() > 0) { // for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) { // // cudaSetDevice(dst_gpu); // // int* abc = (int*) cm->customCudaMalloc<int>(2000000, dst_gpu); // int* ptr = tmp_buffer[cur_gpu][col]; // int* dst = broadcast_col[dst_gpu][col] + scanShouldBroadcast * SEGMENT_SIZE; // int count = shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE; // // cout << count << endl; // ncclBroadcast(ptr, dst, count, ncclInt32, cur_gpu, comms[dst_gpu], 
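// Minimal sketch (not compiled) of the grouped ncclBroadcast pattern used below: the root rank's
// buffer is broadcast into every rank's destination buffer, and since one process drives all
// communicators, the per-rank calls for a logical broadcast are issued inside a single NCCL group.
// dstbuf/srcbuf/count/root are placeholder names.
#if 0
void broadcastChunk(int* const* dstbuf, int* srcbuf, size_t count, int root,
                    ncclComm_t* comms, cudaStream_t* streams) {
  ncclGroupStart();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    // sendbuff is only read on the root rank; non-root ranks ignore it.
    ncclBroadcast(srcbuf, dstbuf[gpu], count, ncclInt, root, comms[gpu], streams[gpu]);
  }
  ncclGroupEnd();
}
#endif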
stream[dst_gpu]); // // CubDebugExit(cudaMemcpyAsync(dst, ptr, count * sizeof(int), cudaMemcpyDeviceToDevice, stream[cur_gpu])); // } // scanShouldBroadcast += shouldBroadcast[cur_gpu].size(); // } // } // } // ncclGroupEnd(); // } ncclGroupStart(); scanShouldBroadcast = 0; for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) { if (shouldBroadcast[cur_gpu].size() > 0) { for (int col = 0; col < NUM_COLUMN; col++) { if (used_col_idx[cur_gpu][col] != NULL) { nvlink_bytes[sg] += shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE * (NUM_GPU-1); for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) { // cudaSetDevice(dst_gpu); // int* abc = (int*) cm->customCudaMalloc<int>(2000000, dst_gpu); int* ptr = tmp_buffer[cur_gpu][col]; int* dst = broadcast_col[dst_gpu][col] + scanShouldBroadcast * SEGMENT_SIZE; int count = shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE; // cout << count << endl; ncclBroadcast(ptr, dst, count, ncclInt32, cur_gpu, comms[dst_gpu], stream[dst_gpu]); // CubDebugExit(cudaMemcpyAsync(dst, ptr, count * sizeof(int), cudaMemcpyDeviceToDevice, stream[cur_gpu])); } } } scanShouldBroadcast += shouldBroadcast[cur_gpu].size(); } } ncclGroupEnd(); for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); CHECK_ERROR_STREAM(stream[gpu]); CubDebugExit(cudaSetDevice(0)); } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Broadcast done: " << time << endl; // assert(0); } void CPUGPUProcessing::broadcastSegmentsNCCL(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res, cudaStream_t* stream) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); vector<int> shouldBroadcast; int* toBroadcastCount = new int[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { toBroadcastCount[gpu] = 0; } // cout << qo->segment_group_count[table][sg] << endl; for (int i = 0; i < qo->segment_group_count[table][sg]; i++) { int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i]; //check if the segment is replicated //if segment is not replicated (means that it's only in one GPU) if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) { assert(cm->segment_row_to_gpu[table][seg_id].size() == 1); // int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { // if (gpu != cur_gpu) { toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id; toBroadcastCount[gpu]++; if (seg_id == cm->allTable[table]->total_segment-1) { if (cm->allTable[table]->LEN % SEGMENT_SIZE == 0) { broadcast_len[gpu] += SEGMENT_SIZE; } else { broadcast_len[gpu] += cm->allTable[table]->LEN % SEGMENT_SIZE; } } else { broadcast_len[gpu] += SEGMENT_SIZE; } // } } shouldBroadcast.push_back(seg_id); } } assert(broadcast_col != NULL); for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(broadcast_col[gpu] != NULL); if (toBroadcastCount[gpu] > 0) { for (int col = 0; col < NUM_COLUMN; col++) { if (used_col_idx[gpu][col] != NULL) { int64_t size = toBroadcastCount[gpu] * SEGMENT_SIZE; int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED); assert((tmp + size) < cm->each_broadcast_size); broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp; } } } } for (int gpu = 0; gpu < NUM_GPU; gpu++) { if (toBroadcastCount[gpu] > 0) { int* src = toBroadcast[gpu]; int* dst = d_toBroadcast[gpu]; CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, 
stream[gpu])); CubDebugExit(cudaSetDevice(0)); } } CubDebugExit(cudaSetDevice(0)); int**** dest = new int***[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { dest[gpu] = new int**[NUM_COLUMN](); for (int col = 0; col < NUM_COLUMN; col++) { //check if the column is used in the query at all (assume that used_col_idx is segment group aware) if (used_col_idx[gpu][col] != NULL) { int temp = 0; dest[gpu][col] = new int*[cm->allColumn[col]->total_segment](); for (int i = 0; i < toBroadcastCount[gpu]; i++) { int seg_id = toBroadcast[gpu][i]; // int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id]; dest[gpu][col][seg_id] = broadcast_col[gpu][col] + temp * SEGMENT_SIZE; int64_t start_idx = (broadcast_col[gpu][col] - cm->gpuCache[gpu])/SEGMENT_SIZE; broadcast_idx[gpu][col][seg_id] = start_idx + temp; temp++; } } } } ncclGroupStart(); for (int col = 0; col < NUM_COLUMN; col++) { if (used_col_idx[0][col] != NULL) { for (int i = 0; i < shouldBroadcast.size(); i++) { int seg_id = shouldBroadcast[i]; int root = cm->seg_row_to_single_gpu[table][seg_id]; int64_t idx = cm->segment_list[root][col][seg_id]; assert(idx != -1); int* ptr = &(cm->gpuCache[root][idx * SEGMENT_SIZE]); nvlink_bytes[sg] += SEGMENT_SIZE * (NUM_GPU-1); for (int gpu = 0; gpu < NUM_GPU; gpu++) { if (gpu != root) assert(dest[gpu][col][seg_id] != NULL); ncclBroadcast(ptr, dest[gpu][col][seg_id], SEGMENT_SIZE, ncclInt, root, comms[gpu], stream[gpu]); } } } } ncclGroupEnd(); for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); CHECK_ERROR_STREAM(stream[gpu]); CubDebugExit(cudaSetDevice(0)); } // ncclGroupStart(); // if (rank == root) { // for (int r=0; r<nranks; r++) // ncclSend(sendbuff[r], size, type, r, comm, stream); // } // ncclRecv(recvbuff, size, type, root, comm, stream); // ncclGroupEnd(); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Broadcast done: " << time << endl; // assert(0); } void CPUGPUProcessing::call_operator_dim_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int table, bool filter_on_gpu, bool broadcast, bool already_partitioned, cudaStream_t* stream) { int* dimkey_idx[NUM_GPU] = {}, *group_idx[NUM_GPU] = {}, *filter_idx[NUM_GPU] = {}; ColumnInfo* key_column = NULL, *filter_col = NULL, *group_col = NULL; int*** used_col_idx = new int**[NUM_GPU]; float output_selectivity = 1.0; bool first_shuffle = true; bool has_shuffled = false; for (int gpu = 0; gpu < NUM_GPU; gpu++) { used_col_idx[gpu] = new int*[NUM_COLUMN](); } for (int i = 0; i < qo->join.size(); i++) { if (qo->join[i].second->table_id == table) { key_column = qo->join[i].second; break; } } for (int gpu = 0; gpu < NUM_GPU; gpu++) { if (qo->groupby_build.size() > 0 && qo->groupby_build[key_column].size() > 0) { if (qo->groupGPUcheck) { group_col = qo->groupby_build[key_column][0]; cm->indexTransfer(col_idx[gpu], group_col, stream[gpu], gpu, custom); assert(col_idx[gpu][group_col->column_id] != NULL); group_idx[gpu] = col_idx[gpu][group_col->column_id]; used_col_idx[gpu][group_col->column_id] = col_idx[gpu][group_col->column_id]; } } //WARNING: IT'S POSSIBLE THAT THE FILTER IS FROM CPU if (filter_on_gpu) { if (qo->select_build[key_column].size() > 0) { filter_col = qo->select_build[key_column][0]; cm->indexTransfer(col_idx[gpu], filter_col, stream[gpu], gpu, custom); assert(col_idx[gpu][filter_col->column_id] != NULL); filter_idx[gpu] = col_idx[gpu][filter_col->column_id]; used_col_idx[gpu][filter_col->column_id] = 
col_idx[gpu][filter_col->column_id]; output_selectivity = params->selectivity[filter_col]; } } // cout << "call index transfer for column " << key_column << " in gpu " << gpu << endl; cm->indexTransfer(col_idx[gpu], key_column, stream[gpu], gpu, custom); assert(col_idx[gpu][key_column->column_id] != NULL); cpu_to_gpu[sg] += (key_column->total_segment * sizeof(int)); dimkey_idx[gpu] = col_idx[gpu][key_column->column_id]; used_col_idx[gpu][key_column->column_id] = col_idx[gpu][key_column->column_id]; } //these four structs are helper for shuffling //output of shuffling (column) int*** temp_col = new int**[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { temp_col[gpu] = new int*[NUM_COLUMN](); } int** result_count = new int*[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { result_count[gpu] = new int[NUM_PARTITION](); } //input of partitioning (column) -> only for pipelined int**** in_shuffle_col = new int***[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { in_shuffle_col[gpu] = new int**[NUM_COLUMN](); } //input of partitioning (offset) -> can be global or local depending on needs -> only for pipelined int**** in_shuffle_off = new int***[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { in_shuffle_off[gpu] = new int**[NUM_TABLE](); } //output of partitioning (column) int**** out_shuffle_col = new int***[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { out_shuffle_col[gpu] = new int**[NUM_COLUMN](); } //output of partitioning (offset) -> can be global or local depending on needs int**** out_shuffle_off = new int***[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { out_shuffle_off[gpu] = new int**[NUM_TABLE](); } shuffleHelper* shelper = new shuffleHelper{ result_count, temp_col, NULL, out_shuffle_col, out_shuffle_off, in_shuffle_col, in_shuffle_off }; int** d_seg_is_replicated = new int*[NUM_GPU]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); d_seg_is_replicated[gpu] = (int*) cm->customCudaMalloc<int>(cm->allTable[table]->total_segment, gpu); CubDebugExit(cudaMemcpyAsync(d_seg_is_replicated[gpu], cm->seg_is_replicated[table], cm->allTable[table]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); } filterArgsGPU** fargs = new filterArgsGPU*[NUM_GPU](); buildArgsGPU** bargs = new buildArgsGPU*[NUM_GPU](); probeArgsGPU** pargs = new probeArgsGPU*[NUM_GPU](); groupbyArgsGPU** gargs = new groupbyArgsGPU*[NUM_GPU](); shuffleArgsGPU** sargs = new shuffleArgsGPU*[NUM_GPU](); KernelParams** kparams = new KernelParams*[NUM_GPU](); KernelLaunch** kernellaunch = new KernelLaunch*[NUM_GPU](); //THIS IS USED FOR BROADCAST //broadcast_idx universal to all segment group //broadcast_col local to segment group //broadcast_len local to segment group //toBroadcast local to segment group int** toBroadcast = new int*[NUM_GPU](); //list of segment row that are broadcasted to this GPU int** d_toBroadcast = new int*[NUM_GPU](); //list of segment row that are broadcasted to this GPU int*** broadcast_col = new int**[NUM_GPU](); //storing segments that are broadcasted to this GPU int* broadcast_len = new int[NUM_GPU](); //total segment row that are broadcasted to this GPU for (int gpu = 0; gpu < NUM_GPU; gpu++) { toBroadcast[gpu] = new int[cm->allTable[table]->total_segment](); d_toBroadcast[gpu] = (int*) cm->customCudaMalloc<int>(cm->allTable[table]->total_segment, gpu); broadcast_col[gpu] = new int*[NUM_COLUMN](); broadcast_len[gpu] = 0; } int** tmp_broadcast_idx_key = new int*[NUM_GPU](); int** 
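// call_operator_dim_GPU builds the dimension-table hash table on every GPU. Depending on data
// placement it either (a) broadcasts the non-replicated dimension segments to all GPUs first
// (broadcast == true), (b) range-partitions and shuffles the dimension table across GPUs when its
// hash table is not replicated and the input is not already partitioned, or (c) builds directly
// from local/replicated segments. The toBroadcast / broadcast_col / broadcast_len bookkeeping set
// up here is local to this segment group, while broadcast_idx persists across segment groups.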
tmp_broadcast_idx_val = new int*[NUM_GPU](); int** tmp_broadcast_idx_filter = new int*[NUM_GPU](); if (broadcast) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); // broadcastSegments2(sg, table, broadcast_col, used_col_idx, broadcast_len, toBroadcast, d_toBroadcast, params->res); if (NUM_GPU == 8) broadcastSegmentsNCCL(sg, table, broadcast_col, used_col_idx, broadcast_len, toBroadcast, d_toBroadcast, params->res, stream); else broadcastSegments2(sg, table, broadcast_col, used_col_idx, broadcast_len, toBroadcast, d_toBroadcast, params->res); // __atomic_fetch_add(&sg_broadcast_count, 1, __ATOMIC_RELAXED); // while (sg_broadcast_count < qo->par_segment_count[table]) {}; // assert(sg_broadcast_count == qo->par_segment_count[table]); // cout << sg_broadcast_count << endl; //HAVE TO TRANSFER THE IDX AFTER DOING BROADCAST, THE IDX ONLY VALID FOR THIS SEGMENT GROUP (DIFFERENT FROM THE ONE IN call_operator_fact) for (int gpu = 0; gpu < NUM_GPU; gpu++) { tmp_broadcast_idx_key[gpu] = (int*) cm->customCudaMalloc<int>(key_column->total_segment, gpu); if (group_col != NULL) tmp_broadcast_idx_val[gpu] = (int*) cm->customCudaMalloc<int>(group_col->total_segment, gpu); if (filter_col != NULL) tmp_broadcast_idx_filter[gpu] = (int*) cm->customCudaMalloc<int>(filter_col->total_segment, gpu); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(tmp_broadcast_idx_key[gpu], broadcast_idx[gpu][key_column->column_id], key_column->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); if (group_col != NULL) CubDebugExit(cudaMemcpyAsync(tmp_broadcast_idx_val[gpu], broadcast_idx[gpu][group_col->column_id], group_col->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); if (filter_col != NULL) CubDebugExit(cudaMemcpyAsync(tmp_broadcast_idx_filter[gpu], broadcast_idx[gpu][filter_col->column_id], filter_col->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Broadcast time " << time << endl; shuffle_time[sg] += time; } //WARNING set used_col_idx on filter to NULL so that it doesn't participate in partitioning for (int gpu = 0; gpu < NUM_GPU; gpu++) { if (filter_on_gpu && group_col != NULL) { if (filter_col->column_id != group_col->column_id) { if (qo->select_build[key_column].size() > 0) { used_col_idx[gpu][filter_col->column_id] = NULL; } } } } // if current join needs shuffling if (!broadcast && !params->ht_replicated[table] && !already_partitioned) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); assert(NUM_GPU == NUM_PARTITION); // if this is the first join then we will call rangepartitioningkeyvalue for (int gpu = 0; gpu < NUM_GPU; gpu++) { //do shuffling in each GPU //it's possible there is filter before shuffling if (filter_idx[gpu]) { fargs[gpu] = new filterArgsGPU{ filter_idx[gpu], NULL, params->compare1[filter_col], params->compare2[filter_col], 0, 0, params->mode[filter_col], 0, NULL, NULL, NULL, }; } else { fargs[gpu] = new filterArgsGPU{NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL, NULL}; } int** d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu); int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu); int** d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaMemcpyAsync(d_col_idx, 
used_col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); assert(off_col[gpu] != NULL); if (off_col[gpu][table] != NULL) { CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); } else { d_in_off=NULL; } CubDebugExit(cudaSetDevice(0)); if (verbose) cout << "Partitioning with column id " << key_column->column_id << endl; sargs[gpu] = new shuffleArgsGPU{ d_temp_col, NULL, d_col_idx, d_in_off, NULL, NULL, key_column->column_id, table, params->min_key[key_column], params->max_key[key_column], NULL, NULL, d_seg_is_replicated[gpu], NULL }; kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total); kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, Nope, table, output_selectivity, 0, 0, stream[gpu]); bool pipeline = 0; // kernellaunch[gpu]->preparePartitioningWithoutCount(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, first_shuffle, pipeline); kernellaunch[gpu]->countPartitioning(off_col, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, first_shuffle); kernellaunch[gpu]->preparePartitioningAfterCount(off_col, used_col_idx, qo->joinGPUcheck, first_shuffle); } for (int gpu = 0; gpu < NUM_GPU; gpu++) { kernellaunch[gpu]->launchPartitioning(); } for (int gpu = 0; gpu < NUM_GPU; gpu++) { kernellaunch[gpu]->clearPartitioning(); delete fargs[gpu]; delete sargs[gpu]; delete kparams[gpu]; delete kernellaunch[gpu]; } if (NUM_GPU == 8) shuffleDataNCCL(off_col, h_total, shelper, first_shuffle, sg, stream); else shuffleDataOpt(off_col, h_total, shelper, first_shuffle, sg, stream); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); first_shuffle = false; has_shuffled = true; if (verbose) cout << "Partitioning and Shuffle time " << time << endl; shuffle_time[sg] += time; } SETUP_TIMING(); float time; cudaEventRecord(start, 0); for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(params->ht_GPU[gpu][key_column] != NULL); if (filter_idx[gpu] && !has_shuffled) { fargs[gpu] = new filterArgsGPU{ filter_idx[gpu], NULL, params->compare1[filter_col], params->compare2[filter_col], 0, 0, params->mode[filter_col], 0, broadcast_col[gpu][filter_col->column_id], NULL, NULL, }; } else { fargs[gpu] = new filterArgsGPU{NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL, NULL}; } if (broadcast) { bargs[gpu] = new buildArgsGPU{ dimkey_idx[gpu], group_idx[gpu], params->dim_len_GPU[gpu][key_column], params->min_key_GPU[gpu][key_column], params->max_key[key_column], NULL, NULL, tmp_broadcast_idx_key[gpu], (group_col != NULL) ? tmp_broadcast_idx_val[gpu] : NULL, (filter_col != NULL) ? tmp_broadcast_idx_filter[gpu] : NULL }; } else { if (!has_shuffled) { // cout << " here " << endl; // cout << params->dim_len_GPU[gpu][key_column] << " " << params->min_key_GPU[gpu][key_column] << endl;; bargs[gpu] = new buildArgsGPU{ dimkey_idx[gpu], group_idx[gpu], params->dim_len_GPU[gpu][key_column], params->min_key_GPU[gpu][key_column], params->max_key[key_column], NULL, NULL, NULL, NULL, NULL }; } else { bargs[gpu] = new buildArgsGPU{ NULL, NULL, params->dim_len_GPU[gpu][key_column], params->min_key_GPU[gpu][key_column], params->max_key[key_column], temp_col[gpu][key_column->column_id], (group_col != NULL) ? 
temp_col[gpu][group_col->column_id] : NULL, NULL, NULL, NULL }; // assert(group_col != NULL); // assert(temp_col[gpu][group_col->column_id] != NULL); } } int latemat = 0; bool aggrGPUcheck = 0; kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, NULL, h_total); kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, JustBuild, table, 1, latemat, aggrGPUcheck, stream[gpu]); kernellaunch[gpu]->prepareKernelDim(off_col, key_column, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, table, has_shuffled); kernellaunch[gpu]->launchKernel(has_shuffled, broadcast, d_toBroadcast[gpu], broadcast_len[gpu]); } for (int gpu = 0; gpu < NUM_GPU; gpu++) { int*** temp; int will_shuffle = 0; kernellaunch[gpu]->clearKernel(temp, will_shuffle, has_shuffled); delete fargs[gpu]; delete bargs[gpu]; delete kparams[gpu]; delete kernellaunch[gpu]; } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Build time " << time << endl; gpu_time[sg] += time; delete[] fargs; delete[] bargs; delete[] pargs; delete[] sargs; delete[] gargs; delete[] kparams; delete[] kernellaunch; delete shelper; for (int gpu = 0; gpu < NUM_GPU; gpu++) { delete[] temp_col[gpu]; delete[] result_count[gpu]; delete[] in_shuffle_col[gpu]; delete[] out_shuffle_col[gpu]; delete[] in_shuffle_off[gpu]; delete[] out_shuffle_off[gpu]; delete[] used_col_idx[gpu]; } delete[] temp_col; delete[] result_count; delete[] in_shuffle_col; delete[] in_shuffle_off; delete[] out_shuffle_col; delete[] out_shuffle_off; delete[] d_seg_is_replicated; delete[] toBroadcast; delete[] d_toBroadcast; delete[] broadcast_col; delete[] broadcast_len; delete[] tmp_broadcast_idx_key; delete[] tmp_broadcast_idx_filter; delete[] tmp_broadcast_idx_val; delete[] used_col_idx; }; void CPUGPUProcessing::call_filter_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int table, int select_so_far, cudaStream_t* stream) { if(qo->selectGPUPipelineCol[sg].size() == 0) return; SETUP_TIMING(); float time; cudaEventRecord(start, 0); int _compare1[2] = {0}, _compare2[2] = {0}; int *filter_idx[NUM_GPU][2] = {}; float output_selectivity = 1.0; for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemsetAsync(d_total[gpu], 0, sizeof(int), stream[gpu])); } CubDebugExit(cudaSetDevice(0)); int num_select = qo->selectGPUPipelineCol[sg].size(); int total_select = qo->selectGPUPipelineCol[sg].size() + qo->selectCPUPipelineCol[sg].size(); for (int select = 0; select < num_select; select++) { if (select_so_far == total_select) break; ColumnInfo* column = qo->selectGPUPipelineCol[sg][select]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom); assert(col_idx[gpu][column->column_id] != NULL); filter_idx[gpu][select_so_far + select] = col_idx[gpu][column->column_id]; } _compare1[select_so_far + select] = params->compare1[column]; _compare2[select_so_far + select] = params->compare2[column]; output_selectivity *= params->selectivity[column]; } int has_shuffled = 0; int will_shuffle = 0; filterArgsGPU** fargs = new filterArgsGPU*[NUM_GPU](); buildArgsGPU** bargs = new buildArgsGPU*[NUM_GPU](); probeArgsGPU** pargs = new probeArgsGPU*[NUM_GPU](); groupbyArgsGPU** gargs = new groupbyArgsGPU*[NUM_GPU](); shuffleArgsGPU** sargs = new shuffleArgsGPU*[NUM_GPU](); shuffleHelper* shelper = new shuffleHelper[1](); KernelParams** 
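// call_filter_GPU evaluates up to two pipelined selection predicates per GPU: filter_idx[gpu][0..1]
// hold the segment indices of the filter columns, _compare1/_compare2 the per-predicate bounds, and
// output_selectivity (the product of the individual selectivities) is used to size the filter output.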
kparams = new KernelParams*[NUM_GPU](); KernelLaunch** kernellaunch = new KernelLaunch*[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { //do shuffling in each GPU //it's possible there is filter before shuffling fargs[gpu] = new filterArgsGPU{ filter_idx[gpu][0], filter_idx[gpu][1], _compare1[0], _compare2[0], _compare1[1], _compare2[1], 1, 1, NULL, NULL }; int latemat = 0; bool aggrGPUcheck = 0; kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total); kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, JustFilter, table, output_selectivity, latemat, aggrGPUcheck, stream[gpu]); kernellaunch[gpu]->prepareKernelFact(off_col, NULL, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, NULL, NULL, will_shuffle, has_shuffled); kernellaunch[gpu]->launchKernel(has_shuffled); } for (int gpu = 0; gpu < NUM_GPU; gpu++) { kernellaunch[gpu]->clearKernel(off_col, will_shuffle, has_shuffled); delete fargs[gpu]; delete kparams[gpu]; delete kernellaunch[gpu]; } delete[] fargs; delete[] bargs; delete[] pargs; delete[] sargs; delete[] gargs; delete[] kparams; delete[] kernellaunch; delete shelper; cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Filter GPU time " << time << endl; gpu_time[sg] += time; } //NOT SUPPORTED YET: //(1) pipeline filter before partitioning //(2) operator generate global offset (for example CPU operator or filter in GPU) which will be the input of partitioning void CPUGPUProcessing::call_operator_fact_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int select_so_far, int latemat, cudaStream_t* stream) { int _min_key[NUM_GPU][4] = {0}, _dim_len[NUM_GPU][4] = {0}; int _compare1[2] = {0}, _compare2[2] = {0}; int _min_val[4] = {0}, _unique_val[4] = {0}; int *aggr_idx[NUM_GPU][2] = {}, *group_idx[NUM_GPU][4] = {}, *filter_idx[NUM_GPU][2] = {}; float output_selectivity = 1.0; bool first_shuffle = true; bool has_shuffled = false; int group_col_id[MAX_GROUPBY] = {0}; int aggr_col_id[MAX_AGGR] = {0}; int fkey_col_id[MAX_JOIN] = {0}; for (int fkey_col = 0; fkey_col < MAX_JOIN; fkey_col++) { fkey_col_id[fkey_col] = -1; } //initialize group_col_id and aggr_col_id to -1 for (int group = 0; group < MAX_GROUPBY; group++) { group_col_id[group] = -1; } for (int aggr = 0; aggr < MAX_AGGR; aggr++) { aggr_col_id[aggr] = -1; } int*** used_col_idx = new int**[NUM_GPU]; int**** used_all_col_idx = new int***[NUM_GPU](); int*** used_broadcast_idx = new int**[NUM_GPU]; int*** d_broadcast_idx = new int**[NUM_GPU]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { used_col_idx[gpu] = new int*[NUM_COLUMN](); used_broadcast_idx[gpu] = new int*[NUM_COLUMN](); used_all_col_idx[gpu] = new int**[NUM_GPU](); for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) { used_all_col_idx[gpu][gpu_iter] = new int*[NUM_COLUMN](); } } assert(NUM_GPU == NUM_PARTITION); // ColumnInfo* filter_col[2] = {}; if(qo->joinGPUPipelineCol[sg].size() == 0) return; for (int gpu = 0; gpu < NUM_GPU; gpu++) { CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemsetAsync(d_total[gpu], 0, sizeof(int), stream[gpu])); } CubDebugExit(cudaSetDevice(0)); int num_join = qo->joinGPUPipelineCol[sg].size(); int num_select = qo->selectGPUPipelineCol[sg].size(); int total_select = qo->selectGPUPipelineCol[sg].size() + qo->selectCPUPipelineCol[sg].size(); bool aggrGPUcheck = qo->groupbyGPUPipelineCol[sg].size() == 
qo->queryAggrColumn.size(); if (verbose) cout << "aggr gpu check " << aggrGPUcheck << endl; for (int select = 0; select < num_select; select++) { if (select_so_far == total_select) break; ColumnInfo* column = qo->selectGPUPipelineCol[sg][select]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) { assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL); used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id]; } cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom); assert(col_idx[gpu][column->column_id] != NULL); cpu_to_gpu[sg] += (column->total_segment * sizeof(int)); filter_idx[gpu][select_so_far + select] = col_idx[gpu][column->column_id]; // if (latemat != 2) used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id]; } _compare1[select_so_far + select] = params->compare1[column]; _compare2[select_so_far + select] = params->compare2[column]; output_selectivity *= params->selectivity[column]; // filter_col[select_so_far + select] = column; } for (int join = 0; join < num_join; join++) { ColumnInfo* column = qo->joinGPUPipelineCol[sg][join]; int table_id = qo->fkey_pkey[column]->table_id; ColumnInfo* pkey = qo->fkey_pkey[column]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) { assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL); used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id]; } cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom); assert(col_idx[gpu][column->column_id] != NULL); cpu_to_gpu[sg] += (column->total_segment * sizeof(int)); used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id]; _min_key[gpu][table_id - 1] = params->min_key_GPU[gpu][pkey]; _dim_len[gpu][table_id - 1] = params->dim_len_GPU[gpu][pkey]; } } //have to fix cm->lo_orderdate in case we use synthetic bench for (int aggr = 0; aggr < qo->aggregation[cm->lo_orderdate].size(); aggr++) { if (qo->groupGPUcheck && aggrGPUcheck) { ColumnInfo* column = qo->aggregation[cm->lo_orderdate][aggr]; aggr_col_id[aggr] = column->column_id; for (int gpu = 0; gpu < NUM_GPU; gpu++) { for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) { assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL); used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id]; } cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom); assert(col_idx[gpu][column->column_id] != NULL); cpu_to_gpu[sg] += (column->total_segment * sizeof(int)); aggr_idx[gpu][aggr] = col_idx[gpu][column->column_id]; if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qo->groupGPUcheck && aggrGPUcheck)) used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id]; used_broadcast_idx[gpu][column->column_id] = broadcast_idx[gpu][column->column_id]; } } } unordered_map<ColumnInfo*, vector<ColumnInfo*>>::iterator it; for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) { if (qo->groupGPUcheck && aggrGPUcheck) { if (it->second.size() > 0) { ColumnInfo* column = it->second[0]; ColumnInfo* column_key = it->first; group_col_id[column_key->table_id - 1] = column->column_id; for (int gpu = 0; gpu < NUM_GPU; gpu++) { for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) { assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL); used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id]; } 
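//for each group-by build column that can stay on the GPU: transfer its segment index to every GPU,
//charge the transfer bytes to cpu_to_gpu[sg], and keep the index both for the probe/group-by kernels
//(group_idx) and for late materialization after shuffling (used_col_idx / used_broadcast_idx)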
cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom); assert(col_idx[gpu][column->column_id] != NULL); cpu_to_gpu[sg] += (column->total_segment * sizeof(int)); group_idx[gpu][column_key->table_id - 1] = col_idx[gpu][column->column_id]; if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qo->groupGPUcheck && aggrGPUcheck)) used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id]; used_broadcast_idx[gpu][column->column_id] = broadcast_idx[gpu][column->column_id]; } _min_val[column_key->table_id - 1] = params->min_val[column_key]; _unique_val[column_key->table_id - 1] = params->unique_val[column_key]; // cout << column->column_id << endl; } } else { if (it->second.size() > 0) { ColumnInfo* column = it->second[0]; ColumnInfo* column_key = it->first; group_col_id[column_key->table_id - 1] = column->column_id; } } } //WARNING: aggr_col_id and group_col_id represent slightly different information //aggr_col_id != -1 meaning that segment group will do agregation on that column on GPU //group_col_id != -1 meaning that there will be groupby on that column at some point (doesn't matter whether its in CPU or GPU) //WARNING: HOW DO WE KNOW THAT SEG IS REPLICATED FOR THE OTHER TABLES??? broadcastIdxTransfer(d_broadcast_idx, used_broadcast_idx, stream); int** d_seg_is_replicated = new int*[NUM_GPU]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { int table_id = 0; CubDebugExit(cudaSetDevice(gpu)); d_seg_is_replicated[gpu] = (int*) cm->customCudaMalloc<int>(cm->allTable[table_id]->total_segment, gpu); CubDebugExit(cudaMemcpyAsync(d_seg_is_replicated[gpu], cm->seg_is_replicated[table_id], cm->allTable[table_id]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); // cout << "seg is replicated " << cm->seg_is_replicated[4][0] << endl; // assert(cm->seg_is_replicated[4][0] == 1); } //these four structs are helper for shuffling //output of shuffling (column) int*** temp_col = new int**[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { temp_col[gpu] = new int*[NUM_COLUMN](); } //output of shuffling (offset) -> can be global or local depending on needs //NOW THIS IS REPLACED BY off_col // int*** temp_off = new int**[NUM_GPU](); // for (int gpu = 0; gpu < NUM_GPU; gpu++) { // temp_off[gpu] = new int*[NUM_TABLE](); // if (off_col != NULL) { // for (int table = 0; table < NUM_TABLE; table++) { // temp_off[gpu][table] = off_col[gpu][table]; // } // } // } int** result_count = new int*[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { result_count[gpu] = new int[NUM_PARTITION](); } //offset output of local join int*** join_result_off = new int**[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { join_result_off[gpu] = new int*[NUM_TABLE](); } //input of partitioning (column) -> only for pipelined int**** in_shuffle_col = new int***[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { in_shuffle_col[gpu] = new int**[NUM_COLUMN](); } //input of partitioning (offset) -> can be global or local depending on needs -> only for pipelined int**** in_shuffle_off = new int***[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { in_shuffle_off[gpu] = new int**[NUM_TABLE](); } //output of partitioning (column) int**** out_shuffle_col = new int***[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { out_shuffle_col[gpu] = new int**[NUM_COLUMN](); } //output of partitioning (offset) -> can be global or local depending on needs int**** out_shuffle_off = new int***[NUM_GPU](); for (int gpu = 0; gpu < NUM_GPU; gpu++) { out_shuffle_off[gpu] = new 
int**[NUM_TABLE](); } shuffleHelper* shelper = new shuffleHelper{ result_count, temp_col, join_result_off, out_shuffle_col, out_shuffle_off, in_shuffle_col, in_shuffle_off }; vector<tuple<ColumnInfo*, ColumnInfo*>> pipeline_column; vector<ColumnInfo*> temp_pipeline; vector<ColumnInfo*> temp_pipeline2; if (reorder) { for (int join = 0; join < num_join; join++) { ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join]; ColumnInfo* pkey = qo->fkey_pkey[fkey]; if (!params->ht_replicated[pkey->table_id]) { // cout << pkey->column_name << endl; temp_pipeline2.push_back(fkey); // qo->joinGPUPipelineCol[sg].erase( qo->joinGPUPipelineCol[sg].begin() + join ); } else { temp_pipeline.push_back(fkey); } } qo->joinGPUPipelineCol[sg].clear(); for (int i = 0; i < temp_pipeline.size(); i++) { qo->joinGPUPipelineCol[sg].push_back(temp_pipeline[i]); } for (int i = 0; i < temp_pipeline2.size(); i++) { qo->joinGPUPipelineCol[sg].push_back(temp_pipeline2[i]); } } else { for (int join = 0; join < num_join; join++) { ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join]; ColumnInfo* pkey = qo->fkey_pkey[fkey]; if (fkey == cm->lo_suppkey) { // cout << pkey->column_name << endl; temp_pipeline2.push_back(fkey); // qo->joinGPUPipelineCol[sg].erase( qo->joinGPUPipelineCol[sg].begin() + join ); } else { temp_pipeline.push_back(fkey); } } qo->joinGPUPipelineCol[sg].clear(); for (int i = 0; i < temp_pipeline.size(); i++) { qo->joinGPUPipelineCol[sg].push_back(temp_pipeline[i]); } for (int i = 0; i < temp_pipeline2.size(); i++) { qo->joinGPUPipelineCol[sg].push_back(temp_pipeline2[i]); } } for (int join = 0; join < qo->joinGPUPipelineCol[sg].size(); join++) { cout << qo->joinGPUPipelineCol[sg][join]->column_name << endl; } // assert(0); for (int join = 0; join < num_join; join++) { filterArgsGPU** fargs = new filterArgsGPU*[NUM_GPU](); buildArgsGPU** bargs = new buildArgsGPU*[NUM_GPU](); probeArgsGPU** pargs = new probeArgsGPU*[NUM_GPU](); groupbyArgsGPU** gargs = new groupbyArgsGPU*[NUM_GPU](); shuffleArgsGPU** sargs = new shuffleArgsGPU*[NUM_GPU](); KernelParams** kparams = new KernelParams*[NUM_GPU](); KernelLaunch** kernellaunch = new KernelLaunch*[NUM_GPU](); ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join]; int table_id = qo->fkey_pkey[fkey]->table_id; ColumnInfo* pkey = qo->fkey_pkey[fkey]; int next_table_id = -1; ColumnInfo* next_fkey = NULL, *next_pkey = NULL; if (join != num_join - 1) { next_fkey = qo->joinGPUPipelineCol[sg][join+1]; next_pkey = qo->fkey_pkey[next_fkey]; next_table_id = qo->fkey_pkey[next_fkey]->table_id; } // if current join needs shuffling if (join == 0 && !params->ht_replicated[table_id]) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); assert(NUM_GPU == NUM_PARTITION); // if this is the first join then we will call rangepartitioningkeyvalue for (int gpu = 0; gpu < NUM_GPU; gpu++) { //do shuffling in each GPU //it's possible there is filter before shuffling fargs[gpu] = new filterArgsGPU{ filter_idx[gpu][0], filter_idx[gpu][1], _compare1[0], _compare2[0], _compare1[1], _compare2[1], 1, 1, NULL, NULL }; int** d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu); int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu); int** d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu); int** d_local_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaMemcpyAsync(d_col_idx, 
used_col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); assert(off_col[gpu] != NULL); if (off_col[gpu][0] != NULL) { CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); } else { d_in_off=NULL; } CubDebugExit(cudaMemcpyAsync(d_local_off, join_result_off[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); if (verbose) cout << "Partitioning with column id " << fkey->column_id << endl; sargs[gpu] = new shuffleArgsGPU{ d_temp_col, NULL, d_col_idx, d_in_off, NULL, d_local_off, fkey->column_id, fkey->table->table_id, params->min_key[pkey], params->max_key[pkey], NULL, NULL, d_seg_is_replicated[gpu], d_broadcast_idx[gpu] }; kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total); // if (join == 0 && num_select > 0) { //there is filter before this join // assert(0); //not supported for now // } else { kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, Nope, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]); // } // kernellaunch[gpu]->countPartitioning(qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, first_shuffle); bool pipeline = 0; kernellaunch[gpu]->preparePartitioningWithoutCount(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, first_shuffle, pipeline); } // for (int gpu = 0; gpu < NUM_GPU; gpu++) { // kernellaunch[gpu]->synchronizePartitioning(); // } // for (int gpu = 0; gpu < NUM_GPU; gpu++) { // kernellaunch[gpu]->preparePartitioningAfterCount(off_col, used_col_idx, qo->joinGPUcheck, first_shuffle); // } for (int gpu = 0; gpu < NUM_GPU; gpu++) { kernellaunch[gpu]->launchPartitioning(); } for (int gpu = 0; gpu < NUM_GPU; gpu++) { kernellaunch[gpu]->clearPartitioning(); output_selectivity = 1.0; delete fargs[gpu]; delete sargs[gpu]; delete kparams[gpu]; delete kernellaunch[gpu]; } // cudaEventRecord(stop, 0); // cudaEventSynchronize(stop); // cudaEventElapsedTime(&time, start, stop); // if (verbose) cout << "Partitioning time " << time << endl; // cudaEventRecord(start, 0); if (NUM_GPU == 8) shuffleDataNCCL(off_col, h_total, shelper, first_shuffle, sg, stream); else shuffleDataOpt(off_col, h_total, shelper, first_shuffle, sg, stream); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); first_shuffle = false; has_shuffled = true; if (verbose) cout << "Shuffle time " << time << endl; shuffle_time[sg] += time; } //if this is the last join if (next_table_id == -1 || (join + 1) == num_join) { //this is the last join SETUP_TIMING(); float time; cudaEventRecord(start, 0); //do join+groupby (either pipelined or individual) pipeline_column.push_back(make_tuple(fkey, pkey)); int *ht[NUM_GPU][MAX_JOIN] = {}, *fkey_idx[NUM_GPU][MAX_JOIN] = {}; //initialize it to null int *key_column[NUM_GPU][MAX_JOIN] = {}, *group_column[NUM_GPU][MAX_GROUPBY] = {}, *aggr_column[NUM_GPU][MAX_AGGR] = {}; for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) { ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]); ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]); assert(fkey_col_id[pkey_pipeline->table_id - 1] == -1); fkey_col_id[pkey_pipeline->table_id - 1] = fkey_pipeline->column_id; for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(col_idx[gpu][fkey_pipeline->column_id] != NULL); 
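//bind the per-GPU state for this pipelined join: the foreign-key segment index and the GPU-resident
//hash table; once a shuffle has happened the key values are read from the shuffle output (temp_col)
//instead of the cached column segments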
assert(params->ht_GPU[gpu][pkey_pipeline] != NULL); fkey_idx[gpu][pkey_pipeline->table_id -1] = col_idx[gpu][fkey_pipeline->column_id]; ht[gpu][pkey_pipeline->table_id - 1] = params->ht_GPU[gpu][pkey_pipeline]; if (has_shuffled) { key_column[gpu][pkey_pipeline->table_id - 1] = temp_col[gpu][fkey_pipeline->column_id]; assert(temp_col[gpu][fkey_pipeline->column_id] != NULL); } } // cout << fkey_pipeline->column_name << " " << params->selectivity[fkey_pipeline]<< endl; output_selectivity *= params->selectivity[fkey_pipeline]; // cout << output_selectivity << endl; } if (has_shuffled) { if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qo->groupGPUcheck && aggrGPUcheck)) { //have to fix cm->lo_orderdate in case we use synthetic bench if (qo->groupGPUcheck && aggrGPUcheck) { for (int aggr = 0; aggr < qo->aggregation[cm->lo_orderdate].size(); aggr++) { ColumnInfo* column = qo->aggregation[cm->lo_orderdate][aggr]; for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(temp_col[gpu][column->column_id] != NULL); aggr_column[gpu][aggr] = temp_col[gpu][column->column_id]; } } for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) { if (it->second.size() > 0) { ColumnInfo* column = it->second[0]; ColumnInfo* column_key = it->first; for (int gpu = 0; gpu < NUM_GPU; gpu++) { group_column[gpu][column_key->table_id - 1] = temp_col[gpu][column->column_id]; } } } } } } for (int gpu = 0; gpu < NUM_GPU; gpu++) { int**d_in_off = NULL; int *d_group_col_id = NULL, *d_aggr_col_id = NULL, *d_fkey_col_id = NULL; int*** d_all_col_idx = NULL, **d_seg_row_to_gpu = NULL; if (has_shuffled) { d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu); CubDebugExit(cudaSetDevice(gpu)); assert(off_col[gpu] != NULL); if (off_col[gpu][0] != NULL) { CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); } else { d_in_off=NULL; } CubDebugExit(cudaSetDevice(0)); d_group_col_id = (int*) cm->customCudaMalloc<int>(MAX_GROUPBY, gpu); d_aggr_col_id = (int*) cm->customCudaMalloc<int>(MAX_AGGR, gpu); d_fkey_col_id = (int*) cm->customCudaMalloc<int>(MAX_JOIN, gpu); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(d_group_col_id, group_col_id, MAX_GROUPBY * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaMemcpyAsync(d_aggr_col_id, aggr_col_id, MAX_AGGR * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaMemcpyAsync(d_fkey_col_id, fkey_col_id, MAX_JOIN * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); d_all_col_idx = (int***) cm->customCudaMalloc<int**>(NUM_GPU, gpu); int*** tmp_all_col_idx = new int**[NUM_GPU]; for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) { tmp_all_col_idx[gpu_iter] = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(tmp_all_col_idx[gpu_iter], used_all_col_idx[gpu][gpu_iter], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); } CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(d_all_col_idx, tmp_all_col_idx, NUM_GPU * sizeof(int**), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); d_seg_row_to_gpu = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(d_seg_row_to_gpu, seg_row_to_gpu[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); } int** d_key_column = (int**) 
cm->customCudaMalloc<int*>(MAX_JOIN, gpu); int** d_group_column = (int**) cm->customCudaMalloc<int*>(MAX_GROUPBY, gpu); int** d_aggr_column = (int**) cm->customCudaMalloc<int*>(MAX_AGGR, gpu); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(d_key_column, key_column[gpu], MAX_JOIN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaMemcpyAsync(d_group_column, group_column[gpu], MAX_GROUPBY * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaMemcpyAsync(d_aggr_column, aggr_column[gpu], MAX_AGGR * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); pargs[gpu] = new probeArgsGPU{ //initialize pargs for this gpu fkey_idx[gpu][0], fkey_idx[gpu][1], fkey_idx[gpu][2], fkey_idx[gpu][3], ht[gpu][0], ht[gpu][1], ht[gpu][2], ht[gpu][3], _dim_len[gpu][0], _dim_len[gpu][1], _dim_len[gpu][2], _dim_len[gpu][3], _min_key[gpu][0], _min_key[gpu][1], _min_key[gpu][2], _min_key[gpu][3], d_key_column, d_fkey_col_id }; gargs[gpu] = new groupbyArgsGPU{ aggr_idx[gpu][0], aggr_idx[gpu][1], group_idx[gpu][0], group_idx[gpu][1], group_idx[gpu][2], group_idx[gpu][3], _min_val[0], _min_val[1], _min_val[2], _min_val[3], _unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3], params->total_val, params->mode_group, d_group_column, d_aggr_column, d_group_col_id, d_aggr_col_id, params->d_group_func }; fargs[gpu] = new filterArgsGPU{ filter_idx[gpu][0], filter_idx[gpu][1], _compare1[0], _compare2[0], _compare1[1], _compare2[1], 1, 1, NULL, NULL }; sargs[gpu] = new shuffleArgsGPU{ NULL, NULL, NULL, d_in_off, NULL, NULL, -1, -1, -1, -1, d_all_col_idx, d_seg_row_to_gpu, d_seg_is_replicated[gpu], d_broadcast_idx[gpu], }; kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total); // if (join == 0 && num_select > 0) { //there is filter before this join (meaning that the last join is the first join and it does not require shuffling) // assert(0); //not supported for now // } else { // cout << "groupgpucheck " << qo->groupGPUcheck << endl; if (qo->groupby_build.size() > 0 && qo->groupGPUcheck && aggrGPUcheck) kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, ProbeGroupby, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]); else if (qo->aggregation[cm->lo_orderdate].size() > 0 && qo->groupGPUcheck && aggrGPUcheck) kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, ProbeAggr, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]); else kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, JustProbe, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]); // } int will_shuffle = 0; // cout << " Here " << endl; kernellaunch[gpu]->prepareKernelFact(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, fkey_col_id, group_col_id, will_shuffle, has_shuffled); if (has_shuffled) { kernellaunch[gpu]->launchKernel(has_shuffled); } else { kernellaunch[gpu]->launchKernel(has_shuffled); } } for (int gpu = 0; gpu < NUM_GPU; gpu++) { int will_shuffle = 0; kernellaunch[gpu]->clearKernel(off_col, will_shuffle, has_shuffled); output_selectivity = 1.0; delete pargs[gpu]; delete fargs[gpu]; delete gargs[gpu]; delete sargs[gpu]; delete kparams[gpu]; delete kernellaunch[gpu]; } pipeline_column.clear(); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); 
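//"Last Probe time" below covers the ProbeGroupby / ProbeAggr / JustProbe launch chosen above,
//i.e. the final probe fused with group-by or aggregation whenever both can run on the GPU;
//the elapsed time is charged to this segment group's GPU time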
if (verbose) cout << "Last Probe time " << time << endl; gpu_time[sg] += time; // if this is not the last join but the next join need shuffling (current join cannot be pipelined with next join) } else if (!params->ht_replicated[next_table_id]) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); //do join (either pipelined or individual) pipeline_column.push_back(make_tuple(fkey, pkey)); int *ht[NUM_GPU][4] = {}, *fkey_idx[NUM_GPU][4] = {}; //initialize it to null int *key_column[NUM_GPU][MAX_JOIN] = {}; for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) { ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]); ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]); assert(fkey_col_id[pkey_pipeline->table_id - 1] == -1); fkey_col_id[pkey_pipeline->table_id - 1] = fkey_pipeline->column_id; for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(col_idx[gpu][fkey_pipeline->column_id] != NULL); assert(params->ht_GPU[gpu][pkey_pipeline] != NULL); fkey_idx[gpu][pkey_pipeline->table_id -1] = col_idx[gpu][fkey_pipeline->column_id]; ht[gpu][pkey_pipeline->table_id - 1] = params->ht_GPU[gpu][pkey_pipeline]; if (has_shuffled) { assert(temp_col[gpu][fkey_pipeline->column_id] != NULL); key_column[gpu][pkey_pipeline->table_id - 1] = temp_col[gpu][fkey_pipeline->column_id]; } } output_selectivity *= params->selectivity[fkey_pipeline]; } for (int gpu = 0; gpu < NUM_GPU; gpu++) { int** d_temp_col = NULL, **d_in_off = NULL; if (has_shuffled) { d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu); d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); assert(off_col[gpu] != NULL); if (off_col[gpu][0] != NULL) { CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); } else { d_in_off=NULL; } CubDebugExit(cudaSetDevice(0)); } int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu); int** d_key_column = (int**) cm->customCudaMalloc<int*>(MAX_JOIN, gpu); int* d_fkey_col_id = (int*) cm->customCudaMalloc<int>(MAX_JOIN, gpu); int* d_group_col_id = (int*) cm->customCudaMalloc<int>(MAX_GROUPBY, gpu); int* d_aggr_col_id = (int*) cm->customCudaMalloc<int>(MAX_AGGR, gpu); CubDebugExit(cudaSetDevice(gpu)); CubDebugExit(cudaMemcpyAsync(d_key_column, key_column[gpu], MAX_JOIN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaMemcpyAsync(d_col_idx, used_col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaMemcpyAsync(d_fkey_col_id, fkey_col_id, MAX_JOIN * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaMemcpyAsync(d_group_col_id, group_col_id, MAX_GROUPBY * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaMemcpyAsync(d_aggr_col_id, aggr_col_id, MAX_AGGR * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); CubDebugExit(cudaSetDevice(0)); fargs[gpu] = new filterArgsGPU{ filter_idx[gpu][0], filter_idx[gpu][1], _compare1[0], _compare2[0], _compare1[1], _compare2[1], 1, 1, NULL, NULL }; pargs[gpu] = new probeArgsGPU{ //initialize pargs for this gpu fkey_idx[gpu][0], fkey_idx[gpu][1], fkey_idx[gpu][2], fkey_idx[gpu][3], ht[gpu][0], ht[gpu][1], ht[gpu][2], ht[gpu][3], _dim_len[gpu][0], _dim_len[gpu][1], _dim_len[gpu][2], _dim_len[gpu][3], _min_key[gpu][0], _min_key[gpu][1], _min_key[gpu][2], _min_key[gpu][3], d_key_column, d_fkey_col_id }; 
gargs[gpu] = new groupbyArgsGPU{ NULL, NULL, NULL, NULL, NULL, NULL, _min_val[0], _min_val[1], _min_val[2], _min_val[3], _unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3], params->total_val, params->mode_group, NULL, NULL, d_group_col_id, d_aggr_col_id, NULL }; sargs[gpu] = new shuffleArgsGPU{ d_temp_col, NULL, d_col_idx, d_in_off, NULL, NULL, next_fkey->column_id, next_fkey->table->table_id, params->min_key[next_pkey], params->max_key[next_pkey], NULL, NULL, d_seg_is_replicated[gpu], d_broadcast_idx[gpu] }; kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total); kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, ProbePartition, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]); int will_shuffle = 1; // for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL); kernellaunch[gpu]->prepareKernelFact(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, fkey_col_id, group_col_id, will_shuffle, has_shuffled); // for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL); kernellaunch[gpu]->launchKernel(has_shuffled); } for (int gpu = 0; gpu < NUM_GPU; gpu++) { int will_shuffle = 1; // for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL); kernellaunch[gpu]->clearKernel(off_col, will_shuffle, has_shuffled); // for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL); output_selectivity = 1.0; delete fargs[gpu]; delete sargs[gpu]; delete pargs[gpu]; delete kparams[gpu]; delete kernellaunch[gpu]; } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Probe Partition time " << time << endl; gpu_time[sg] += time; // assert(0); cudaEventRecord(start, 0); //WARNING: WE NEED TO ELIMINATE ALL COLUMNS WHICH HAVE BEEN USED FOR THIS JOIN if (latemat == 2) { for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) { ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]); ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]); assert(fkey_col_id[pkey_pipeline->table_id - 1] > 0); for (int gpu = 0; gpu < NUM_GPU; gpu++) { assert(used_col_idx[gpu][fkey_pipeline->column_id] != NULL); assert(out_shuffle_col[gpu][fkey_pipeline->column_id] != NULL); used_col_idx[gpu][fkey_pipeline->column_id] = NULL; out_shuffle_col[gpu][fkey_pipeline->column_id] = NULL; if (has_shuffled) { assert(temp_col[gpu][fkey_pipeline->column_id] != NULL); temp_col[gpu][fkey_pipeline->column_id] = NULL; } } fkey_col_id[pkey_pipeline->table_id - 1] = fkey_col_id[pkey_pipeline->table_id - 1] * -1 * 2; } } // if this is not the first join then we will just shuffle right away (we already partition it after probe) if (NUM_GPU == 8) shuffleDataNCCL(off_col, h_total, shelper, first_shuffle, sg, stream); else shuffleDataOpt(off_col, h_total, shelper, first_shuffle, sg, stream); first_shuffle = false; has_shuffled = true; cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Shuffle time " << time << endl; shuffle_time[sg] += time; pipeline_column.clear(); // if next join does not need shuffling (current join can be pipelined with next join) } else { pipeline_column.push_back({fkey, pkey}); } delete[] fargs; delete[] bargs; delete[] pargs; delete[] gargs; delete[] sargs; delete[] kparams; 
delete[] kernellaunch; } for (int gpu = 0; gpu < NUM_GPU; gpu++) { delete[] temp_col[gpu]; delete[] result_count[gpu]; delete[] in_shuffle_col[gpu]; delete[] out_shuffle_col[gpu]; delete[] in_shuffle_off[gpu]; delete[] out_shuffle_off[gpu]; delete[] used_col_idx[gpu]; delete[] used_broadcast_idx[gpu]; for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) { delete[] used_all_col_idx[gpu][gpu_iter]; } delete[] used_all_col_idx[gpu]; } delete[] temp_col; delete[] result_count; delete[] in_shuffle_col; delete[] in_shuffle_off; delete[] out_shuffle_col; delete[] out_shuffle_off; delete[] d_seg_is_replicated; delete[] used_col_idx; delete[] used_broadcast_idx; delete[] used_all_col_idx; delete[] d_broadcast_idx; } void CPUGPUProcessing::resetTime() { for (int sg = 0 ; sg < MAX_GROUPS; sg++) { cpu_time[sg] = 0; gpu_time[sg] = 0; cpu_to_gpu[sg] = 0; gpu_to_cpu[sg] = 0; nvlink_bytes[sg] = 0; shuffle_time[sg] = 0; } gpu_total = 0; cpu_total = 0; cpu_to_gpu_total = 0; gpu_to_cpu_total = 0; execution_total = 0; optimization_total = 0; merging_total = 0; preparing_total = 0; nvlink_total = 0; shuffle_total = 0; } void CPUGPUProcessing::call_pfilter_probe_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) { int **off_col_out; int _min_key[4] = {0}, _dim_len[4] = {0}; int *ht[4] = {}, *fkey_col[4] = {}; int out_total = 0; ColumnInfo *filter_col[2] = {}; int _compare1[2] = {0}, _compare2[2] = {0}; float output_selectivity = 1.0; int output_estimate = 0; if(qo->joinCPUPipelineCol[sg].size() == 0) return; off_col_out = new int*[cm->TOT_TABLE] (); //initialize to null for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) { if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break; ColumnInfo* column = qo->selectCPUPipelineCol[sg][i]; filter_col[select_so_far + i] = column; _compare1[select_so_far + i] = params->compare1[column]; _compare2[select_so_far + i] = params->compare2[column]; output_selectivity *= params->selectivity[column]; } for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) { ColumnInfo* column = qo->joinCPUPipelineCol[sg][i]; int table_id = qo->fkey_pkey[column]->table_id; fkey_col[table_id - 1] = column->col_ptr; ColumnInfo* pkey = qo->fkey_pkey[column]; ht[table_id - 1] = params->ht_CPU[pkey]; _min_key[table_id - 1] = params->min_key[pkey]; _dim_len[table_id - 1] = params->dim_len[pkey]; output_selectivity *= params->selectivity[column]; } struct filterArgsCPU fargs = { (filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL), (filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL), _compare1[0], _compare2[0], _compare1[1], _compare2[1], 1, 1, (filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL), (filter_col[1] != NULL) ? 
(params->map_filter_func_host[filter_col[1]]) : (NULL) }; struct probeArgsCPU pargs = { fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3], ht[0], ht[1], ht[2], ht[3], _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3], _min_key[0], _min_key[1], _min_key[2], _min_key[3] }; SETUP_TIMING(); float time; cudaEventRecord(start, 0); if (h_off_col == NULL) { output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity; for (int i = 0; i < cm->TOT_TABLE; i++) { if (i == 0 || qo->joinCPUcheck[i]) { if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault)); if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate); } } } else { assert(*h_total > 0); output_estimate = *h_total * output_selectivity; for (int i = 0; i < cm->TOT_TABLE; i++) { if (h_off_col[i] != NULL || i == 0 || qo->joinCPUcheck[i]) { if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault)); if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate); } } } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); cudaEventRecord(start, 0); if (h_off_col == NULL) { struct offsetCPU out_off = { off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4] }; int LEN; if (sg == qo->last_segment[0]) { LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE; } else { LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE; } short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment); filter_probe_CPU( fargs, pargs, out_off, LEN, &out_total, 0, segment_group_ptr); } else { assert(*h_total > 0); struct offsetCPU in_off = { h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4] }; struct offsetCPU out_off = { off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4] }; filter_probe_CPU2( in_off, fargs, pargs, out_off, *h_total, &out_total, 0); if (!custom) { for (int i = 0; i < cm->TOT_TABLE; i++) { if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]); } } } h_off_col = off_col_out; for (int i = 0; i < cm->TOT_TABLE; i++) h_off_col[i] = off_col_out[i]; *h_total = out_total; if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg << endl; assert(*h_total <= output_estimate); // assert(*h_total > 0); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Filter Probe Kernel time CPU: " << time << endl; cpu_time[sg] += time; }; void CPUGPUProcessing::call_probe_group_by_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) { if (*h_total == 0) return; int _min_key[4] = {0}, _dim_len[4] = {0}; int *ht[4] = {}, *fkey_col[4] = {}; int _min_val[4] = {0}, _unique_val[4] = {0}; int *aggr_col[2] = {}, *group_col[4] = {}; for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) { ColumnInfo* column = qo->joinCPUPipelineCol[sg][i]; int table_id = qo->fkey_pkey[column]->table_id; fkey_col[table_id - 1] = column->col_ptr; ColumnInfo* pkey = qo->fkey_pkey[column]; ht[table_id - 1] = params->ht_CPU[pkey]; _min_key[table_id - 1] = params->min_key[pkey]; _dim_len[table_id - 1] = params->dim_len[pkey]; } for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) { ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i]; aggr_col[i] = column->col_ptr; } unordered_map<ColumnInfo*, 
vector<ColumnInfo*>>::iterator it; for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) { if (it->second.size() > 0) { ColumnInfo* column = it->second[0]; ColumnInfo* column_key = it->first; group_col[column_key->table_id - 1] = column->col_ptr; _min_val[column_key->table_id - 1] = params->min_val[column_key]; _unique_val[column_key->table_id - 1] = params->unique_val[column_key]; } } struct probeArgsCPU pargs = { fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3], ht[0], ht[1], ht[2], ht[3], _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3], _min_key[0], _min_key[1], _min_key[2], _min_key[3] }; struct groupbyArgsCPU gargs = { aggr_col[0], aggr_col[1], group_col[0], group_col[1], group_col[2], group_col[3], _min_val[0], _min_val[1], _min_val[2], _min_val[3], _unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3], params->total_val, params->mode_group, params->h_group_func }; float time; SETUP_TIMING(); cudaEventRecord(start, 0); if (h_off_col == NULL) { int LEN; if (sg == qo->last_segment[0]) { LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE; } else { LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE; } short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment); // ColumnInfo* key_column = cm->s_suppkey; // int count = 0; // for (int i = 0; i < cm->s_suppkey->LEN; i+=2) { // if (ht[0][i+1] != 0) { // // cout << params->ht_CPU[key_column][i] << " " << params->ht_CPU[key_column][i+1] << endl; // count++; // } // } // cout << cm->s_suppkey->LEN << " " << count << endl; // cout << endl; // key_column = cm->c_custkey; // count = 0; // for (int i = 0; i < cm->c_custkey->LEN; i+=2) { // if (ht[1][i+1] != 0) { // // cout << params->ht_CPU[key_column][i] << " " << params->ht_CPU[key_column][i+1] << endl; // count++; // } // } // cout << cm->c_custkey->LEN << " " << count << endl; // cout << endl; probe_group_by_CPU(pargs, gargs, LEN , params->res, 0, segment_group_ptr); } else { struct offsetCPU offset = { h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4] }; probe_group_by_CPU2(offset, pargs, gargs, *h_total, params->res, 0); if (!custom) { for (int i = 0; i < cm->TOT_TABLE; i++) { if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]); } } } cudaEventRecord(stop, 0); // Stop time measuring cudaEventSynchronize(stop); // Wait until the completion of all device // work preceding the most recent call to cudaEventRecord() cudaEventElapsedTime(&time, start, stop); // Saving the time measured if (verbose) cout << "Probe Group Kernel time CPU: " << time << endl; cpu_time[sg] += time; }; void CPUGPUProcessing::call_probe_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) { int **off_col_out; int _min_key[4] = {0}, _dim_len[4] = {0}; int *ht[4] = {}, *fkey_col[4] = {}; int out_total = 0; float output_selectivity = 1.0; int output_estimate = 0; if(qo->joinCPUPipelineCol[sg].size() == 0) return; off_col_out = new int*[cm->TOT_TABLE] (); //initialize to null for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) { ColumnInfo* column = qo->joinCPUPipelineCol[sg][i]; int table_id = qo->fkey_pkey[column]->table_id; fkey_col[table_id - 1] = column->col_ptr; ColumnInfo* pkey = qo->fkey_pkey[column]; ht[table_id - 1] = params->ht_CPU[pkey]; _min_key[table_id - 1] = params->min_key[pkey]; _dim_len[table_id - 1] = params->dim_len[pkey]; output_selectivity *= params->selectivity[column]; } struct probeArgsCPU pargs = { fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3], 
ht[0], ht[1], ht[2], ht[3], _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3], _min_key[0], _min_key[1], _min_key[2], _min_key[3] }; float time; SETUP_TIMING(); cudaEventRecord(start, 0); if (h_off_col == NULL) { output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity; for (int i = 0; i < cm->TOT_TABLE; i++) { if (i == 0 || qo->joinCPUcheck[i]) { if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault)); if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate); } } } else { assert(*h_total > 0); output_estimate = *h_total * output_selectivity; for (int i = 0; i < cm->TOT_TABLE; i++) { if (h_off_col[i] != NULL || i == 0 || qo->joinCPUcheck[i]) { if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault)); if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate); } } } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); cudaEventRecord(start, 0); if (h_off_col == NULL) { // output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity; // for (int i = 0; i < cm->TOT_TABLE; i++) { // if (i == 0 || qo->joinCPUcheck[i]) { // if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate); // } // } struct offsetCPU out_off = { off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4] }; int LEN; if (sg == qo->last_segment[0]) { LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE; } else { LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE; } short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment); probe_CPU(pargs, out_off, LEN, &out_total, 0, segment_group_ptr); } else { assert(*h_total > 0); // output_estimate = *h_total * output_selectivity; // for (int i = 0; i < cm->TOT_TABLE; i++) { // if (h_off_col[i] != NULL || i == 0 || qo->joinCPUcheck[i]) { // if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate); // } // } struct offsetCPU in_off = { h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4] }; struct offsetCPU out_off = { off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4] }; probe_CPU2(in_off, pargs, out_off, *h_total, &out_total, 0); if (!custom) { for (int i = 0; i < cm->TOT_TABLE; i++) { if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]); } } } h_off_col = off_col_out; for (int i = 0; i < cm->TOT_TABLE; i++) h_off_col[i] = off_col_out[i]; *h_total = out_total; cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg << endl; assert(*h_total <= output_estimate); // assert(*h_total > 0); if (verbose) cout << "Probe Kernel time CPU: " << time << endl; cpu_time[sg] += time; }; //WONT WORK IF JOIN HAPPEN BEFORE FILTER (ONLY WRITE OUTPUT AS A SINGLE COLUMN OFF_COL_OUT[0]) void CPUGPUProcessing::call_pfilter_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) { int **off_col_out; ColumnInfo *filter_col[2] = {}; int out_total = 0; int _compare1[2] = {0}, _compare2[2] = {0}, _mode[2] = {0}; float output_selectivity = 1.0; int output_estimate = 0; if (qo->selectCPUPipelineCol[sg].size() == 0) return; off_col_out = new int*[cm->TOT_TABLE](); //initialize to NULL for (int 
i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) { if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break; ColumnInfo* column = qo->selectCPUPipelineCol[sg][i]; filter_col[select_so_far + i] = column; _compare1[select_so_far + i] = params->compare1[column]; _compare2[select_so_far + i] = params->compare2[column]; _mode[select_so_far + i] = params->mode[column]; output_selectivity *= params->selectivity[column]; } struct filterArgsCPU fargs = { (filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL), (filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL), _compare1[0], _compare2[0], _compare1[1], _compare2[1], _mode[0], _mode[1], (filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL), (filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL) }; float time; SETUP_TIMING(); cudaEventRecord(start, 0); if (h_off_col == NULL) { output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity; if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[0], output_estimate * sizeof(int), cudaHostAllocDefault)); if (custom) off_col_out[0] = (int*) cm->customCudaHostAlloc<int>(output_estimate); } else { // assert(filter_col[0] == NULL); // assert(filter_col[1] != NULL); assert(*h_total > 0); output_estimate = *h_total * output_selectivity; for (int i = 0; i < cm->TOT_TABLE; i++) { if (h_off_col[i] != NULL || i == 0) { if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault)); if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate); } } } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); cudaEventRecord(start, 0); if (h_off_col == NULL) { // output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity; // if (custom) off_col_out[0] = (int*) cm->customCudaHostAlloc<int>(output_estimate); int LEN; if (sg == qo->last_segment[0]) { LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE; } else { LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE; } short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment); filter_CPU(fargs, off_col_out[0], LEN, &out_total, 0, segment_group_ptr); } else { // assert(filter_col[0] == NULL); // assert(filter_col[1] != NULL); assert(*h_total > 0); // output_estimate = *h_total * output_selectivity; // for (int i = 0; i < cm->TOT_TABLE; i++) { // if (h_off_col[i] != NULL || i == 0) { // if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate); // } // } filter_CPU2(h_off_col[0], fargs, off_col_out[0], *h_total, &out_total, 0); if (!custom) { for (int i = 0; i < cm->TOT_TABLE; i++) { if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]); } } } h_off_col = off_col_out; for (int i = 0; i < cm->TOT_TABLE; i++) h_off_col[i] = off_col_out[i]; *h_total = out_total; if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg << endl; assert(*h_total <= output_estimate); assert(*h_total > 0); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Filter Kernel time CPU: " << time << endl; cpu_time[sg] += time; } void CPUGPUProcessing::call_pfilter_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) { ColumnInfo *filter_col[2] = {}; int _compare1[2] = {0}, _compare2[2] = {0}, _mode[2] = {0}; int 
*aggr_col[2] = {}; if (qo->aggregation[cm->lo_orderdate].size() == 0) return; if (qo->selectCPUPipelineCol[sg].size() == 0) return; for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) { ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i]; aggr_col[i] = column->col_ptr; } for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) { if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break; ColumnInfo* column = qo->selectCPUPipelineCol[sg][i]; filter_col[select_so_far + i] = column; _compare1[select_so_far + i] = params->compare1[column]; _compare2[select_so_far + i] = params->compare2[column]; _mode[select_so_far + i] = params->mode[column]; } struct groupbyArgsCPU gargs = { aggr_col[0], aggr_col[1], NULL, NULL, NULL, NULL, 0, 0, 0, 0, 0, 0, 0, 0, 0, params->mode_group, params->h_group_func }; struct filterArgsCPU fargs = { (filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL), (filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL), _compare1[0], _compare2[0], _compare1[1], _compare2[1], _mode[0], _mode[1], (filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL), (filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL) }; float time; SETUP_TIMING(); cudaEventRecord(start, 0); if (h_off_col == NULL) { assert(0); } else { // assert(*h_total > 0); if (*h_total == 0) return; filter_aggr_CPU2(h_off_col[0], fargs, gargs, params->res, *h_total); } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Filter Aggregation Kernel time CPU: " << time << endl; cpu_time[sg] += time; }; void CPUGPUProcessing::call_bfilter_build_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table) { ColumnInfo* column, *filter_col; int* group_ptr = NULL, *filter_ptr = NULL; for (int i = 0; i < qo->join.size(); i++) { if (qo->join[i].second->table_id == table) { column = qo->join[i].second; break; } } if (qo->groupby_build.size() > 0 && qo->groupby_build[column].size() > 0) { group_ptr = qo->groupby_build[column][0]->col_ptr; } if (qo->select_build[column].size() > 0) { filter_col = qo->select_build[column][0]; filter_ptr = filter_col->col_ptr; } struct filterArgsCPU fargs = { filter_ptr, NULL, params->compare1[filter_col], params->compare2[filter_col], 0, 0, params->mode[filter_col], 0, (filter_col != NULL) ? 
(params->map_filter_func_host[filter_col]) : (NULL), NULL }; struct buildArgsCPU bargs = { column->col_ptr, group_ptr, params->dim_len[column], params->min_key[column] }; if (params->ht_CPU[column] != NULL) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); if (h_off_col == NULL) { int LEN; if (sg == qo->last_segment[table]) { LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE; } else { LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE; } short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment); build_CPU(fargs, bargs, LEN, params->ht_CPU[column], 0, segment_group_ptr); } else { build_CPU2(h_off_col, fargs, bargs, *h_total, params->ht_CPU[column], 0); if (!custom) cudaFreeHost(h_off_col); } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Filter Build Kernel time CPU: " << time << endl; cpu_time[sg] += time; } }; void CPUGPUProcessing::call_build_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table) { ColumnInfo* column; int* group_ptr = NULL; for (int i = 0; i < qo->join.size(); i++) { if (qo->join[i].second->table_id == table) { column = qo->join[i].second; break; } } if (qo->groupby_build.size() > 0 && qo->groupby_build[column].size() > 0) { group_ptr = qo->groupby_build[column][0]->col_ptr; } struct filterArgsCPU fargs = {NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL}; struct buildArgsCPU bargs = { column->col_ptr, group_ptr, params->dim_len[column], params->min_key[column] }; if (params->ht_CPU[column] != NULL) { SETUP_TIMING(); float time; cudaEventRecord(start, 0); if (h_off_col == NULL) { int LEN; if (sg == qo->last_segment[table]) { LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE; } else { LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE; } short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment); build_CPU(fargs, bargs, LEN, params->ht_CPU[column], 0, segment_group_ptr); } else { build_CPU2(h_off_col, fargs, bargs, *h_total, params->ht_CPU[column], 0); if (!custom) cudaFreeHost(h_off_col); } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Build Kernel time CPU: " << time << endl; cpu_time[sg] += time; } }; void CPUGPUProcessing::call_bfilter_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table) { ColumnInfo* temp; for (int i = 0; i < qo->join.size(); i++) { if (qo->join[i].second->table_id == table) { temp = qo->join[i].second; break; } } // assert(qo->select_build[temp].size() > 0); if (qo->select_build[temp].size() == 0) return; ColumnInfo* column = qo->select_build[temp][0]; int* filter_col = column->col_ptr; int output_estimate = qo->segment_group_count[table][sg] * SEGMENT_SIZE * params->selectivity[column]; SETUP_TIMING(); float time; cudaEventRecord(start, 0); if (custom) h_off_col = (int*) cm->customCudaHostAlloc<int>(output_estimate); else CubDebugExit(cudaHostAlloc((void**) &h_off_col, output_estimate * sizeof(int), cudaHostAllocDefault)); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); cudaEventRecord(start, 0); int LEN; if (sg == qo->last_segment[table]) { LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE; } else { LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE; } struct filterArgsCPU fargs = { filter_col, NULL, params->compare1[column], 
params->compare2[column], 0, 0, params->mode[column], 0, params->map_filter_func_host[column], NULL }; short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment); filter_CPU(fargs, h_off_col, LEN, h_total, 0, segment_group_ptr); if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg << endl; assert(*h_total <= output_estimate); assert(*h_total > 0); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Filter Kernel time CPU: " << time << endl; cpu_time[sg] += time; }; void CPUGPUProcessing::call_group_by_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) { int _min_val[4] = {0}, _unique_val[4] = {0}; int *aggr_col[2] = {}, *group_col[4] = {}; if (qo->aggregation[cm->lo_orderdate].size() == 0) return; for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) { ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i]; aggr_col[i] = column->col_ptr; } unordered_map<ColumnInfo*, vector<ColumnInfo*>>::iterator it; for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) { if (it->second.size() > 0) { ColumnInfo* column = it->second[0]; ColumnInfo* column_key = it->first; group_col[column_key->table_id - 1] = column->col_ptr; _min_val[column_key->table_id - 1] = params->min_val[column_key]; _unique_val[column_key->table_id - 1] = params->unique_val[column_key]; } } struct groupbyArgsCPU gargs = { aggr_col[0], aggr_col[1], group_col[0], group_col[1], group_col[2], group_col[3], _min_val[0], _min_val[1], _min_val[2], _min_val[3], _unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3], params->total_val, params->mode_group, params->h_group_func }; struct offsetCPU offset = { h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4], }; SETUP_TIMING(); float time; cudaEventRecord(start, 0); cout << *h_total << endl; if (*h_total > 0) groupByCPU(offset, gargs, *h_total, params->res); if (!custom) { for (int i = 0; i < cm->TOT_TABLE; i++) { if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]); } } cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Group Kernel time CPU: " << time << endl; cpu_time[sg] += time; }; void CPUGPUProcessing::call_aggregation_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg) { int *aggr_col[2] = {}; if (qo->aggregation[cm->lo_orderdate].size() == 0) return; for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) { ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i]; aggr_col[i] = column->col_ptr; } struct groupbyArgsCPU gargs = { aggr_col[0], aggr_col[1], NULL, NULL, NULL, NULL, 0, 0, 0, 0, 0, 0, 0, 0, 0, params->mode_group, params->h_group_func }; SETUP_TIMING(); float time; cudaEventRecord(start, 0); // assert(h_off_col != NULL); if (*h_total > 0) aggregationCPU(h_off_col, gargs, *h_total, params->res); if (!custom) cudaFree(h_off_col); cudaEventRecord(stop, 0); cudaEventSynchronize(stop); cudaEventElapsedTime(&time, start, stop); if (verbose) cout << "Aggr Kernel time CPU: " << time << endl; cpu_time[sg] += time; }; void CPUGPUProcessing::call_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) { int _min_key[4] = {0}, _dim_len[4] = {0}; int *ht[4] = {}, *fkey_col[4] = {}; int *aggr_col[2] = {}; for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) { ColumnInfo* column = qo->joinCPUPipelineCol[sg][i]; int table_id = qo->fkey_pkey[column]->table_id; 
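//each CPU-side join column is slotted by the dimension table it references (table_id - 1):
//its raw column pointer, the matching CPU hash table, and the min_key / dim_len metadata used for probing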
void CPUGPUProcessing::call_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) {
  int _min_key[4] = {0}, _dim_len[4] = {0};
  int *ht[4] = {}, *fkey_col[4] = {};
  int *aggr_col[2] = {};

  for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
    ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
    int table_id = qo->fkey_pkey[column]->table_id;
    fkey_col[table_id - 1] = column->col_ptr;
    ColumnInfo* pkey = qo->fkey_pkey[column];
    ht[table_id - 1] = params->ht_CPU[pkey];
    _min_key[table_id - 1] = params->min_key[pkey];
    _dim_len[table_id - 1] = params->dim_len[pkey];
  }

  for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
    ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
    aggr_col[i] = column->col_ptr;
  }

  struct probeArgsCPU pargs = {
    fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
    ht[0], ht[1], ht[2], ht[3],
    _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
    _min_key[0], _min_key[1], _min_key[2], _min_key[3]
  };

  struct groupbyArgsCPU gargs = {
    aggr_col[0], aggr_col[1],
    NULL, NULL, NULL, NULL,
    0, 0, 0, 0, 0, 0, 0, 0,
    0, params->mode_group, params->h_group_func
  };

  cudaEvent_t start, stop;
  float time;
  cudaEventCreate(&start);
  cudaEventCreate(&stop);
  cudaEventRecord(start, 0);

  if (h_off_col == NULL) {
    int LEN;
    if (sg == qo->last_segment[0]) {
      LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
    } else {
      LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
    }
    short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);
    probe_aggr_CPU(pargs, gargs, LEN, params->res, 0, segment_group_ptr);
  } else {
    struct offsetCPU offset = {
      h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
    };
    probe_aggr_CPU2(offset, pargs, gargs, *h_total, params->res, 0);
  }

  cudaEventRecord(stop, 0);                 // Stop time measuring
  cudaEventSynchronize(stop);               // Wait until the completion of all device
                                            // work preceding the most recent call to cudaEventRecord()
  cudaEventElapsedTime(&time, start, stop); // Saving the time measured
  if (verbose) cout << "Probe Aggr Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;
};
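// call_pfilter_probe_aggr_CPU: same as call_probe_aggr_CPU but with up to two probe-side
// filter predicates fused in front of the probe. select_so_far counts the predicates already
// applied by earlier pipeline stages, so the remaining ones are placed in slots
// select_so_far + i of the filter arguments. (Descriptive note inferred from the code below.)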
void CPUGPUProcessing::call_pfilter_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) {
  int _min_key[4] = {0}, _dim_len[4] = {0};
  int *ht[4] = {}, *fkey_col[4] = {};
  ColumnInfo* filter_col[2] = {};
  int _compare1[2] = {0}, _compare2[2] = {0};
  int *aggr_col[2] = {};

  for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) {
    if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break;
    ColumnInfo* column = qo->selectCPUPipelineCol[sg][i];
    filter_col[select_so_far + i] = column;
    _compare1[select_so_far + i] = params->compare1[column];
    _compare2[select_so_far + i] = params->compare2[column];
  }

  for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
    ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
    int table_id = qo->fkey_pkey[column]->table_id;
    fkey_col[table_id - 1] = column->col_ptr;
    ColumnInfo* pkey = qo->fkey_pkey[column];
    ht[table_id - 1] = params->ht_CPU[pkey];
    _min_key[table_id - 1] = params->min_key[pkey];
    _dim_len[table_id - 1] = params->dim_len[pkey];
  }

  for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
    ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
    aggr_col[i] = column->col_ptr;
  }

  struct filterArgsCPU fargs = {
    (filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL),
    (filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL),
    _compare1[0], _compare2[0], _compare1[1], _compare2[1],
    1, 1,
    (filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL),
    (filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL)
  };

  struct probeArgsCPU pargs = {
    fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
    ht[0], ht[1], ht[2], ht[3],
    _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
    _min_key[0], _min_key[1], _min_key[2], _min_key[3]
  };

  struct groupbyArgsCPU gargs = {
    aggr_col[0], aggr_col[1],
    NULL, NULL, NULL, NULL,
    0, 0, 0, 0, 0, 0, 0, 0,
    0, params->mode_group, params->h_group_func
  };

  cudaEvent_t start, stop;   // the two timing events
  float time;                // variable that will hold the elapsed time
  cudaEventCreate(&start);   // create event 1
  cudaEventCreate(&stop);    // create event 2
  cudaEventRecord(start, 0); // start measuring the time

  if (h_off_col == NULL) {
    int LEN;
    if (sg == qo->last_segment[0]) {
      LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
    } else {
      LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
    }
    short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);
    filter_probe_aggr_CPU(fargs, pargs, gargs, LEN, params->res, 0, segment_group_ptr);
  } else {
    struct offsetCPU offset = {
      h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
    };
    filter_probe_aggr_CPU2(offset, fargs, pargs, gargs, *h_total, params->res, 0);
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Filter Probe Aggr Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;
};
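// Note (inferred usage sketch, not part of the original control flow): for a CPU-resident
// segment group, the entry points above are typically chained as
//   call_bfilter_CPU(params, h_off_col, h_total, sg, table);  // filter the dimension table
//   call_build_CPU(params, h_off_col, h_total, sg, table);    // build its CPU hash table
// followed on the fact-table side by call_probe_aggr_CPU or, when probe-side filters remain,
// call_pfilter_probe_aggr_CPU, both of which accumulate directly into params->res.
// call_group_by_CPU / call_aggregation_CPU cover the non-fused group-by and aggregation paths.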