#ifndef _MULTI_PROCESSING_H_
#define _MULTI_PROCESSING_H_
#pragma once

#include "QueryOptimizer.h"
#include "GPUProcessing.h"
#include "CPUProcessing.h"
#include "common.h"

class CPUGPUProcessing {
public:

  CacheManager* cm;
  QueryOptimizer* qo;
  ncclComm_t* comms;

  bool custom;
  bool skipping;
  bool reorder;

  int*** col_idx;         //for each gpu, store the local index of each segment involved in this kernel
  int**** all_col_idx;    //for each gpu, store the index of each segment for each column in each gpu
  int*** seg_row_to_gpu;  //for each gpu, store the location of each segment row for each table
  int*** d_off_to_seg_id; //for each gpu, for each table, convert the offset to the segment id of that table for broadcasted data
  int*** broadcast_idx;   //same as col_idx but for broadcasted segments
  int*** d_broadcast_idx; //gpu-resident version of broadcast_idx
  int** broadcast_count;  //for each gpu, for each table, count of broadcasted segments

  chrono::high_resolution_clock::time_point begin_time;
  bool verbose;

  double cpu_total;
  double gpu_total;
  double* cpu_time;
  double* gpu_time;
  double* nvlink_bytes;
  double* shuffle_time;
  unsigned long long* cpu_to_gpu;
  unsigned long long* gpu_to_cpu;
  unsigned long long cpu_to_gpu_total;
  unsigned long long gpu_to_cpu_total;
  double execution_total;
  double optimization_total;
  double merging_total;
  double preparing_total;
  double nvlink_total;
  double shuffle_total;
  int sg_broadcast_count;

  CPUGPUProcessing(size_t _cache_size, size_t _broadcast_size, size_t _processing_size, size_t _pinned_memsize,
      ncclComm_t* _comms, bool _verbose, bool _skipping, bool _reorder);

  ~CPUGPUProcessing() {
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
        for (int col = 0; col < cm->TOT_COLUMN; col++) {
          delete[] all_col_idx[gpu][gpu_iter][col];
        }
        delete[] all_col_idx[gpu][gpu_iter];
      }
      delete[] all_col_idx[gpu];
    }
    delete[] all_col_idx;

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      for (int table = 0; table < cm->TOT_TABLE; table++) {
        delete[] seg_row_to_gpu[gpu][table];
      }
      delete[] seg_row_to_gpu[gpu];
    }
    delete[] seg_row_to_gpu;

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      for (int i = 0; i < cm->TOT_COLUMN; i++) {
        delete[] col_idx[gpu][i];
      }
      delete[] col_idx[gpu];
    }
    delete[] col_idx;

    delete[] cpu_time;
    delete[] gpu_time;
    delete[] shuffle_time;
    delete[] nvlink_bytes;
    delete[] cpu_to_gpu;
    delete[] gpu_to_cpu;
    delete qo;
  }

  void resetCGP() {
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      for (int i = 0; i < cm->TOT_COLUMN; i++) {
        //release the per-column device index before the next query
        if (col_idx[gpu][i] != NULL && !custom) cudaFree(col_idx[gpu][i]);
        if (cm->allColumn[i]->table_id > 0) { //only do this for dimension tables
          //we might want to remove this
          memset(broadcast_idx[gpu][i], -1, cm->allColumn[i]->total_segment * sizeof(int));
        }
        col_idx[gpu][i] = NULL;
      }
    }
  }
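
  // Quick reference for the metadata arrays above (descriptive only; allocation and population live in
  // the constructor and the CacheManager):
  //   col_idx[gpu][col]                 device array on `gpu` with the local segment index used by the current kernel
  //   all_col_idx[gpu][gpu_iter][col]   device array on `gpu` mirroring gpu_iter's segment list for `col`
  //   seg_row_to_gpu[gpu][table]        device array on `gpu` mapping each segment row of `table` to the GPU holding it
  //   broadcast_idx / d_broadcast_idx / broadcast_count   per-GPU bookkeeping for broadcasted segments
  // renewMetadata() below refreshes all_col_idx and seg_row_to_gpu from the CacheManager's current placement.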
  void renewMetadata() {
    cudaStream_t stream[NUM_GPU];
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaStreamCreate(&stream[gpu]));
    }
    CubDebugExit(cudaSetDevice(0));

    //TODO: these two metadata transfers should have been done after every replication epoch
    //transferring global index to each GPU
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
        for (int column = 0; column < NUM_COLUMN; column++) {
          assert(all_col_idx[gpu][gpu_iter][column] != NULL);
          CubDebugExit(cudaSetDevice(gpu));
          CubDebugExit(cudaMemcpyAsync(all_col_idx[gpu][gpu_iter][column], cm->segment_list[gpu_iter][column],
              cm->allColumn[column]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
          CubDebugExit(cudaStreamSynchronize(stream[gpu]));
          CubDebugExit(cudaSetDevice(0));
        }
      }
    }

    //transferring seg_row_to_GPU
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      for (int table = 0; table < NUM_TABLE; table++) {
        assert(seg_row_to_gpu[gpu][table] != NULL);
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaMemcpyAsync(seg_row_to_gpu[gpu][table], cm->seg_row_to_single_gpu[table],
            cm->allTable[table]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
        CubDebugExit(cudaStreamSynchronize(stream[gpu]));
        CubDebugExit(cudaSetDevice(0));
      }
    }

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaStreamSynchronize(stream[gpu]));
      CubDebugExit(cudaStreamDestroy(stream[gpu]));
    }
    CubDebugExit(cudaSetDevice(0));
  }

  void resetTime();

  void switchDeviceGPUtoCPU(int* h_total_all, int** h_total, int*** off_col, int** h_off_col, int sg, cudaStream_t* stream);
  void switchDeviceCPUtoGPUScatter(int** h_total, int*** off_col, int*** h_off_col_part, int sg, cudaStream_t* stream);
  void switchDeviceCPUtoGPUBroadcast(int* h_total_all, int** h_total, int*** off_col, int** h_off_col, int sg, cudaStream_t* stream);
  void switchDeviceCPUtoGPUBroadcastDim(int* h_total_all, int** h_total, int*** off_col, int* h_off_col, int table, int sg, cudaStream_t* stream);

  void broadcastIdxTransfer(int*** d_broadcast_idx, int*** used_broadcast_idx, cudaStream_t* stream);
  void broadcastSegments(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res);
  void broadcastSegments2(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res);
  void broadcastSegmentsNCCL(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res, cudaStream_t* stream);
  void broadcastSegmentsNCCL2(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res, cudaStream_t* stream);

  // void switch_device_fact(int*** &off_col, int** &h_off_col_all, int** &d_total, int** h_total, int sg, int mode, int table, cudaStream_t* stream);
  // void switch_device_dim(int** &off_col, int* &h_off_col_all, int** &d_total, int** h_total, int sg, int mode, int table, cudaStream_t* stream);

  void shuffleData(int*** &off_col, int** h_total, struct shuffleHelper* shelper, bool first_shuffle, cudaStream_t* stream);
  void shuffleDataNCCL(int*** &off_col, int** h_total, struct shuffleHelper* shelper, bool first_shuffle, int sg, cudaStream_t* stream);
  void shuffleDataOpt(int*** &off_col, int** h_total, struct shuffleHelper* shelper, bool first_shuffle, int sg, cudaStream_t* stream);

  // void call_pfilter_probe_GPU(QueryParams* params, int** &off_col, int* &d_total, int* h_total, int sg, int select_so_far, cudaStream_t stream);
  // void call_probe_group_by_GPU(QueryParams* params, int** &off_col, int* h_total, int sg, cudaStream_t stream);

  void call_probe_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, cudaStream_t* stream);
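
  // Usage sketch (illustrative only, not part of the class API): the GPU-side calls that take a
  // cudaStream_t* expect one stream per GPU, created the same way renewMetadata() creates its streams:
  //   cudaStream_t stream[NUM_GPU];
  //   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
  //     CubDebugExit(cudaSetDevice(gpu));
  //     CubDebugExit(cudaStreamCreate(&stream[gpu]));
  //   }
  //   cgp->call_probe_GPU(params, off_col, d_total, h_total, sg, stream);  // hypothetical call site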
  // void call_pfilter_GPU(QueryParams* params, int** &off_col, int* &d_total, int* h_total, int sg, int select_so_far, cudaStream_t stream);
  // void call_group_by_GPU(QueryParams* params, int** &off_col, int* h_total, int sg, cudaStream_t stream);
  // void call_aggregation_GPU(QueryParams* params, int* &off_col, int* h_total, int sg, cudaStream_t stream);
  // void call_probe_aggr_GPU(QueryParams* params, int** &off_col, int* h_total, int sg, cudaStream_t stream);
  // void call_pfilter_probe_aggr_GPU(QueryParams* params, int** &off_col, int* h_total, int sg, int select_so_far, cudaStream_t stream);
  // void call_bfilter_build_GPU(QueryParams* params, int* &d_off_col, int* h_total, int sg, int table, cudaStream_t stream);
  // void call_build_GPU(QueryParams* params, int* &d_off_col, int* h_total, int sg, int table, cudaStream_t stream);
  // void call_bfilter_GPU(QueryParams* params, int* &d_off_col, int* &d_total, int* h_total, int sg, int table, cudaStream_t stream);

  void call_operator_fact_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int select_so_far, int latemat, cudaStream_t* stream);
  void call_operator_fact_pipeline_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int select_so_far, int latemat, cudaStream_t* stream);
  void call_filter_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int table, int select_so_far, cudaStream_t* stream);
  void call_filter_partition_CPU(QueryParams* params, int*** h_off_col_part, int** h_total, int sg, int table);
  void call_operator_dim_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int table, bool filter_on_gpu, bool broadcast, bool already_partitioned, cudaStream_t* stream);

  void call_pfilter_probe_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far);
  void call_probe_group_by_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg);
  void call_probe_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg);
  void call_pfilter_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far);
  void call_group_by_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg);
  void call_aggregation_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg);
  void call_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg);
  void call_pfilter_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far);
  void call_bfilter_build_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table);
  void call_build_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table);
  void call_bfilter_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table);
  void call_pfilter_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far);
};

#endif
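
// Construction sketch (illustrative; the sizes and flags below are placeholders, not values taken from
// this project). The constructor expects one initialized NCCL communicator per GPU:
//   ncclComm_t comms[NUM_GPU];
//   int devs[NUM_GPU];
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) devs[gpu] = gpu;
//   ncclCommInitAll(comms, NUM_GPU, devs);
//   CPUGPUProcessing* cgp = new CPUGPUProcessing(cache_size, broadcast_size, processing_size, pinned_memsize,
//       comms, /*verbose*/ true, /*skipping*/ false, /*reorder*/ false);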
// void
// CPUGPUProcessing::call_probe_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, cudaStream_t* stream) {
//   int _min_key[4] = {0}, _dim_len[4] = {0};
//   float output_selectivity = 1.0;
//   bool first_shuffle = true;
//   if (qo->joinGPUPipelineCol[sg].size() == 0) return;
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     CubDebugExit(cudaSetDevice(gpu));
//     CubDebugExit(cudaMemsetAsync(d_total[gpu], 0, sizeof(int), stream[gpu]));
//   }
//   CubDebugExit(cudaSetDevice(0));
//   int num_join = qo->joinGPUPipelineCol[sg].size();
//   for (int join = 0; join < num_join; join++) {
//     ColumnInfo* column = qo->joinGPUPipelineCol[sg][join];
//     int table_id = qo->fkey_pkey[column]->table_id;
//     ColumnInfo* pkey = qo->fkey_pkey[column];
//     for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//       cm->indexTransfer(col_idx[gpu], column, stream[gpu], custom, gpu);
//       assert(col_idx[gpu][column->column_id] != NULL);
//       cpu_to_gpu[sg] += (column->total_segment * sizeof(int));
//     }
//     _min_key[table_id - 1] = params->min_key[pkey];
//     _dim_len[table_id - 1] = params->dim_len[pkey];
//     output_selectivity *= params->selectivity[column];
//   }
//   //these four structs are helpers for shuffling
//   int*** temp_col = new int**[NUM_GPU]();
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     temp_col[gpu] = new int*[NUM_COLUMN]();
//   }
//   int** result_count = new int*[NUM_GPU]();
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     result_count[gpu] = new int[NUM_PARTITION]();
//   }
//   //offset output of the local join
//   int*** join_result_off = new int**[NUM_GPU]();
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     join_result_off[gpu] = new int*[NUM_TABLE]();
//   }
//   //output of partitioning (column)
//   int**** out_shuffle_col = new int***[NUM_GPU]();
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     out_shuffle_col[gpu] = new int**[NUM_COLUMN]();
//   }
//   //output of partitioning (offset)
//   int**** out_shuffle_off = new int***[NUM_GPU]();
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     out_shuffle_off[gpu] = new int**[NUM_TABLE]();
//   }
//   shuffleHelper* shelper = new shuffleHelper{
//     result_count, temp_col, join_result_off, out_shuffle_col, out_shuffle_off
//   };
//   vector<tuple<ColumnInfo*, ColumnInfo*>> pipeline_column;
//   for (int join = 0; join < num_join; join++) {
//     ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join];
//     int table_id = qo->fkey_pkey[fkey]->table_id;
//     ColumnInfo* pkey = qo->fkey_pkey[fkey];
//     int next_table_id = -1; ColumnInfo* next_fkey = NULL;
//     if (join != num_join - 1) {
//       next_fkey = qo->joinGPUPipelineCol[sg][join+1];
//       next_table_id = qo->fkey_pkey[next_fkey]->table_id;
//     }
//     // if the current join needs shuffling
//     if (!params->ht_replicated[table_id]) {
//       assert(NUM_GPU == NUM_PARTITION);
//       for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//         //do shuffling in each GPU
//         filterArgsGPU* fargs = new filterArgsGPU();
//         probeArgsGPU* pargs = new probeArgsGPU();
//         groupbyArgsGPU* gargs = new groupbyArgsGPU();
//         buildArgsGPU* bargs = new buildArgsGPU();
//         int** d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
//         int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
//         int** d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
//         int** d_local_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
//         CubDebugExit(cudaSetDevice(gpu));
//         CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
//         CubDebugExit(cudaMemcpyAsync(d_col_idx, col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
//         if (off_col[gpu] != NULL) {
//           CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
//         } else {
//           d_in_off = NULL;
//         }
//         CubDebugExit(cudaMemcpyAsync(d_local_off, join_result_off[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
//         CubDebugExit(cudaSetDevice(0));
//         shuffleArgsGPU* sargs = new shuffleArgsGPU{
//           d_temp_col, d_col_idx, d_in_off, d_local_off, fkey->column_id, fkey->table->table_id
//         };
//         KernelParams* kparams = new KernelParams(fargs, pargs, bargs, gargs, sargs, shelper, d_total, h_total);
//         KernelLaunch* kernellaunch = new KernelLaunch(cm, kparams, sg, gpu, 0, table_id, output_selectivity, stream[gpu]);
//         kernellaunch->preparePartitioning(off_col, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, first_shuffle);
//         kernellaunch->launchPartitioning();
//         kernellaunch->clearPartitioning();
//       }
//       shuffleData(off_col, shelper, first_shuffle, stream);
//       first_shuffle = false;
//     }
//     // if the next join needs shuffling (the current join cannot be pipelined with the next join)
//     if (!params->ht_replicated[next_table_id] || (join + 1) == num_join) {
//       //do the join (either pipelined or individual)
//       pipeline_column.push_back(make_tuple(fkey, pkey));
//       int *ht[NUM_GPU][4] = {}, *fkey_idx[NUM_GPU][4] = {}; //initialize them to null
//       for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) {
//         ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]);
//         ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]);
//         for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//           fkey_idx[gpu][pkey_pipeline->table_id - 1] = col_idx[gpu][fkey_pipeline->column_id];
//           ht[gpu][pkey_pipeline->table_id - 1] = params->ht_GPU[gpu][pkey_pipeline];
//         }
//       }
//       //initialize the kernel launch object
//       for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//         probeArgsGPU* pargs = new probeArgsGPU{ //initialize pargs for this gpu
//           fkey_idx[gpu][0], fkey_idx[gpu][1], fkey_idx[gpu][2], fkey_idx[gpu][3],
//           ht[gpu][0], ht[gpu][1], ht[gpu][2], ht[gpu][3],
//           _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
//           _min_key[0], _min_key[1], _min_key[2], _min_key[3]
//         };
//         filterArgsGPU* fargs = new filterArgsGPU();
//         groupbyArgsGPU* gargs = new groupbyArgsGPU();
//         shuffleArgsGPU* sargs = new shuffleArgsGPU();
//         buildArgsGPU* bargs = new buildArgsGPU();
//         KernelParams* kparams = new KernelParams(fargs, pargs, bargs, gargs, sargs, shelper, d_total, h_total);
//         KernelLaunch* kernellaunch = new KernelLaunch(cm, kparams, sg, gpu, 0, table_id, output_selectivity, stream[gpu]);
//         kernellaunch->prepareKernelFact(off_col, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck);
//         kernellaunch->launchKernel();
//         kernellaunch->clearKernel(off_col);
//       }
//       pipeline_column.clear();
//     // if the next join does not need shuffling (the current join can be pipelined with the next join)
//     } else {
//       pipeline_column.push_back({fkey, pkey});
//     }
//   }
// }

// void
// CPUGPUProcessing::switch_device_fact(int*** &off_col, int** &h_off_col_all, int** &d_total, int** h_total, int sg, int mode, int table, cudaStream_t* stream) {
//   float time;
//   SETUP_TIMING();
//   if (mode == 0) {
//     // if (h_off_col == NULL) return;
//     // assert(h_off_col != NULL);
//     // // assert(*h_total > 0); // DON'T BE SURPRISED IF WE REACH THIS FOR 19980401-19980430 PREDICATES BECAUSE THE RESULT IS 0
//     // assert(h_off_col[0] != NULL);
//     // off_col = new int*[cm->TOT_TABLE]();
//     // CubDebugExit(cudaMemcpyAsync(d_total, h_total, sizeof(int), cudaMemcpyHostToDevice, stream));
//     // CubDebugExit(cudaStreamSynchronize(stream));
//     // cpu_to_gpu[sg] += (1 * sizeof(int));
//     // cudaEventRecord(start, 0);
//     // for (int i = 0; i < cm->TOT_TABLE; i++) {
//     //   if (h_off_col[i] != NULL) {
//     //     if (!custom) CubDebugExit(cudaMalloc((void**) &off_col[i], *h_total * sizeof(int)));
//     //     if (custom) off_col[i] = (int*) cm->customCudaMalloc<int>(*h_total);
//     //   }
//     // }
//     assert(0);
//   } else {
//     if (off_col == NULL) return;
//     assert(off_col != NULL);
//     for (int gpu = 0; gpu < NUM_GPU; gpu++) assert(off_col[gpu][0] != NULL);
//     h_off_col_all = new int*[cm->TOT_TABLE]();
//     gpu_to_cpu[sg] += (1 * sizeof(int));
//     cudaEventRecord(start, 0);
//     //THIS WOULD NOT WORK BECAUSE *h_total_all HAS TO BE RESET FOR EACH TABLE
//     for (int table = 0; table < NUM_TABLE; table++) {
//       int h_total_all = 0;
//       for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//         if (off_col[gpu][table] != NULL) {
//           assert(*(h_total[gpu]) > 0);
//           h_total_all += *(h_total[gpu]);
//         }
//       }
//       if (h_total_all > 0)
//         h_off_col_all[table] = (int*) cm->customCudaHostAlloc<int>(h_total_all);
//       else
//         h_off_col_all[table] = NULL;
//     }
//   }
//   cudaEventRecord(stop, 0);
//   cudaEventSynchronize(stop);
//   cudaEventElapsedTime(&time, start, stop);
//   malloc_time[sg] += time;
//   // cout << "sg: " << sg << " transfer malloc time: " << malloc_time[sg] << endl;
//   if (verbose) cout << "Transfer size: " << *h_total << " sg: " << sg << endl;
//   cudaEventRecord(start, 0);
//   if (mode == 0) { //CPU to GPU
//     // for (int i = 0; i < cm->TOT_TABLE; i++) {
//     //   if (h_off_col[i] != NULL) {
//     //     // if (custom) off_col[i] = (int*) cm->customCudaMalloc<int>(*h_total);
//     //     CubDebugExit(cudaMemcpyAsync(off_col[i], h_off_col[i], *h_total * sizeof(int), cudaMemcpyHostToDevice, stream));
//     //     CubDebugExit(cudaStreamSynchronize(stream));
//     //     if (!custom) cudaFreeHost(h_off_col[i]);
//     //     cpu_to_gpu[sg] += (*h_total * sizeof(int));
//     //   }
//     // }
//     // CubDebugExit(cudaStreamSynchronize(stream));
//     assert(0);
//   } else { // GPU to CPU
//     for (int table = 0; table < NUM_TABLE; table++) {
//       int temp = 0;
//       for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//         if (off_col[gpu][table] != NULL) {
//           assert(h_off_col_all[table] != NULL);
//           assert(*(h_total[gpu]) > 0);
//           CubDebugExit(cudaSetDevice(gpu));
//           CubDebugExit(cudaMemcpyAsync(h_off_col_all[table] + temp, off_col[gpu][table], *(h_total[gpu]) * sizeof(int), cudaMemcpyDeviceToHost, stream[gpu]));
//           CubDebugExit(cudaStreamSynchronize(stream[gpu]));
//           gpu_to_cpu[sg] += (*(h_total[gpu]) * sizeof(int));
//           temp += *(h_total[gpu]);
//         }
//       }
//       CubDebugExit(cudaSetDevice(0));
//     }
//   }
//   cudaEventRecord(stop, 0);
//   cudaEventSynchronize(stop);
//   cudaEventElapsedTime(&time, start, stop);
//   if (verbose) cout << "Transfer Time: " << time << " sg: " << sg << endl;
//   transfer_time[sg] += time;
// }

// void
// CPUGPUProcessing::switch_device_dim(int** &off_col, int* &h_off_col_all, int** &d_total, int** h_total, int sg, int mode, int table, cudaStream_t* stream) {
//   float time;
//   SETUP_TIMING();
//   cudaEventRecord(start, 0);
//   if (mode == 0) {
//     // if (h_off_col == NULL) return;
//     // assert(h_off_col != NULL);
//     // assert(*h_total > 0);
//     // CubDebugExit(cudaMemcpyAsync(d_total, h_total, sizeof(int), cudaMemcpyHostToDevice, stream));
//     // CubDebugExit(cudaStreamSynchronize(stream));
//     // cpu_to_gpu[sg] += (1 * sizeof(int));
//     // if (!custom) CubDebugExit(cudaMalloc((void**) &d_off_col, *h_total * sizeof(int)));
//     // if (custom) d_off_col = (int*) cm->customCudaMalloc<int>(*h_total);
//     assert(0);
//   } else {
//     if (off_col == NULL) return;
//     assert(off_col != NULL);
//     for (int gpu = 0; gpu < NUM_GPU; gpu++) assert(off_col[gpu] != NULL);
//     int h_total_all = 0;
//     for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//       if (off_col[gpu] != NULL) {
//         assert(*(h_total[gpu]) > 0);
//         h_total_all += *(h_total[gpu]);
//       }
//     }
//     if (h_total_all > 0)
//       h_off_col_all = (int*) cm->customCudaHostAlloc<int>(h_total_all);
//     else
//       h_off_col_all = NULL;
//   }
//   cudaEventRecord(stop, 0);
//   cudaEventSynchronize(stop);
//   cudaEventElapsedTime(&time, start, stop);
//   malloc_time[sg] += time;
//   if (verbose) cout << "Transfer size: " << *h_total << " sg: " << sg << endl;
//   cudaEventRecord(start, 0);
//   if (mode == 0) { //CPU to GPU
//     // if (custom) d_off_col = (int*) cm->customCudaMalloc<int>(*h_total);
//     // if (h_off_col != NULL) {
//     //   CubDebugExit(cudaMemcpyAsync(d_off_col, h_off_col, *h_total * sizeof(int), cudaMemcpyHostToDevice, stream));
//     //   CubDebugExit(cudaStreamSynchronize(stream));
//     //   cpu_to_gpu[sg] += (*h_total * sizeof(int));
//     //   // if (!custom) cudaFreeHost(h_off_col); //TODO: UNCOMMENTING THIS WILL CAUSE SEGFAULT BECAUSE THERE IS A POSSIBILITY OF
//     //   //FILTER CPU -> SWITCH -> BUILD GPU -> BUILD CPU (H_OFF_COL WILL BE USED AGAIN IN BUILD CPU)
//     // } else d_off_col = NULL;
//     // CubDebugExit(cudaStreamSynchronize(stream));
//     assert(0);
//   } else { // GPU to CPU
//     assert(h_off_col_all != NULL);
//     for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//       int temp = 0;
//       if (off_col[gpu] != NULL) {
//         assert(*(h_total[gpu]) > 0);
//         CubDebugExit(cudaSetDevice(gpu));
//         CubDebugExit(cudaMemcpyAsync(h_off_col_all + temp, off_col[gpu], *(h_total[gpu]) * sizeof(int), cudaMemcpyDeviceToHost, stream[gpu]));
//         CubDebugExit(cudaStreamSynchronize(stream[gpu]));
//         gpu_to_cpu[sg] += (*(h_total[gpu]) * sizeof(int));
//         temp += *(h_total[gpu]);
//       }
//     }
//     CubDebugExit(cudaSetDevice(0));
//   }
//   cudaEventRecord(stop, 0);
//   cudaEventSynchronize(stop);
//   cudaEventElapsedTime(&time, start, stop);
//   if (verbose) cout << "Transfer Time: " << time << " sg: " << sg << endl;
//   transfer_time[sg] += time;
// }

// for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//   assert(params->ht_GPU[gpu][key_column] != NULL);
//   if (params->ht_GPU[gpu][key_column] != NULL) {
//     filterArgsGPU* fargs;
//     if (filter_idx[gpu]) {
//       fargs = new filterArgsGPU{
//         filter_idx[gpu], NULL,
//         params->compare1[filter_col], params->compare2[filter_col], 0, 0,
//         params->mode[filter_col], 0,
//         NULL, NULL
//       };
//     } else {
//       fargs = new filterArgsGPU{
//         NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL
//       };
//     }
//     // buildArgsGPU* bargs = new buildArgsGPU{
//     //   dimkey_idx[gpu], group_idx[gpu],
//     //   params->dim_len[key_column], params->min_key[key_column]
//     // };
//     //WE WILL ALWAYS SET GROUPBY IDX TO NULL FOR NOW
//     buildArgsGPU* bargs = new buildArgsGPU{
//       dimkey_idx[gpu], NULL,
//       params->dim_len[key_column], params->min_key[key_column]
//     };
//     probeArgsGPU* pargs = new probeArgsGPU();
//     groupbyArgsGPU* gargs = new groupbyArgsGPU();
//     shuffleArgsGPU* sargs = new shuffleArgsGPU();
//     shuffleHelper* shelper = new shuffleHelper();
//     KernelParams* kparams = new KernelParams(fargs, pargs, bargs, gargs, sargs, shelper, NULL, h_total);
//     KernelLaunch* kernellaunch = new KernelLaunch(cm, kparams, params, sg, gpu, 3, table, 1, stream[gpu]);
//     kernellaunch->prepareKernelDim(d_off_col, key_column, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, false);
//     kernellaunch->launchKernel(false);
//     //clear kernel is not called for dim table
//   }
// }

// //range partition with local offset (intermediate result of join after the first shuffle) but you have to materialize the global offset for the build table
// template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
// __global__ void RangePartitioningKeyValue3(int* gpuCache,
//     struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int num_tuples,
//     int write_val = 1, int write_offset = 0) {
//   __const__ int column_map[NUM_COLUMN] = {
//     0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
//     2, 2, 2, 2,
//     1, 1, 1, 1,
//     3, 3, 3, 3,
//     4, 4, 4
//   };
//   int item_off[NUM_TABLE][ITEMS_PER_THREAD];
//   int item_key[ITEMS_PER_THREAD];
//   int item_val[ITEMS_PER_THREAD];
//   int selection_flags[ITEMS_PER_THREAD];
//   int index[ITEMS_PER_THREAD];
//   int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
//   int tile_offset = blockIdx.x * tile_size;
//   int num_tiles = (num_tuples + tile_size - 1) / tile_size;
//   int num_tile_items = tile_size;
//   if (blockIdx.x == num_tiles - 1) {
//     num_tile_items = num_tuples - tile_offset;
//   }
//   //this range_size would not work for a date column
//   int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the number of partitions (e.g. 10 keys over 3 partitions -> 3 3 4)
//   __shared__ int smem_index[NUM_RANGE];
//   __syncthreads();
//   if (threadIdx.x < NUM_RANGE) {
//     smem_index[threadIdx.x] = 0;
//   }
//   __syncthreads();
//   #pragma unroll
//   for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
//     index[ITEM] = 0;
//   }
//   InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);
//   cudaAssert(sargs.column[sargs.key_column] != NULL);
//   //loading the local offset
//   int* ptr = sargs.local_off[sargs.table];
//   BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[sargs.table], num_tile_items);
//   BlockReadOffsetAndCountIndexPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[sargs.table], sargs.column[sargs.key_column], item_key, index, smem_index, num_tile_items, range_size);
//   __syncthreads();
//   if (threadIdx.x < NUM_RANGE) {
//     smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
//   }
//   __syncthreads();
//   #pragma unroll
//   for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
//     if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
//       int partition = item_key[ITEM]/range_size; //if the range size is wrong, there could be an extra partition
//       if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
//       index[ITEM] += smem_index[partition];
//     }
//   }
//   __syncthreads();
//   BlockWritePartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, sout.column[sargs.key_column], num_tile_items, range_size, 0, 0);
//   //read the local offset (or the global offset if the table is the build table)
//   for (int table = 0; table < NUM_TABLE; table++) {
//     if (sargs.table != table && sargs.local_off[table] != NULL) {
//       cudaAssert(sargs.local_off[table] != NULL);
//       ptr = sargs.local_off[table];
//       BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[table], num_tile_items);
//     }
//   }
//   if (write_val) {
//     //scatter the values
//     cudaAssert(sargs.col_idx != NULL);
//     //use the local offset to scatter the values
//     //TODO: we have to support the case where the offset for the dimension table is a global offset
//     for (int col = 0; col < NUM_COLUMN; col++) {
//       int table = column_map[col];
//       // if (col == 11 || col == 15) printf("%d %d\n", sargs.local_off[table] != NULL, sargs.col_idx[col] != NULL);
//       //materializing a column using the local offset for table 0 from the current join (we assume that all joined input tables are now called table 0)
//       //TODO: THIS PART DOESN'T WORK BECAUSE THE CODE ALWAYS GOES TO THE FIRST BRANCH (sargs.column[col] != NULL for every col involved in the query)
//       if (sargs.column[col] != NULL && sargs.key_column != col) {
//         cudaAssert(sargs.column[col] != NULL);
//         cudaAssert(sout.column[col] != NULL);
//         // if (col == 11 || col == 15) printf("im here 1\n");
//         BlockReadOffsetShuffle<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], sargs.column[col], item_val, num_tile_items);
//         BlockWriteValPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, sout.column[col], num_tile_items, range_size);
//       //for now sargs.col_idx[col] should be NULL except for the group-by column of an SSB dimension table
//       //sargs.local_off[table] here is the global offset from the dimension table
//       //materializing a column using the local offset from the current join
//       } else if (sargs.local_off[table] != NULL && sargs.col_idx[col] != NULL && sargs.table != table && sargs.key_column != col) { //materializing the global offset
//         cudaAssert(table != 0);
//         cudaAssert(sargs.col_idx[col] != NULL);
//         cudaAssert(sargs.broadcast_idx[col] != NULL);
//         cudaAssert(sout.column[col] != NULL);
//         // if (col == 11 || col == 15) printf("im here 2\n");
//         BlockReadOffsetGPU3<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[table], item_val, selection_flags, gpuCache, sargs.col_idx[col], sargs.broadcast_idx[col], num_tile_items);
//         BlockWriteValPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, sout.column[col], num_tile_items, range_size);
//       }
//     }
//   }
//   if (write_offset) {
//     //scatter the offsets (the global offset and not the local offset); the local offset would be useless once the shuffle is done
//     for (int table = 0; table < NUM_TABLE; table++) {
//       cudaAssert(sargs.in_off != NULL);
//       //materializing the global offset (fact) using the local offset of table 0 from the current join
//       if (sargs.in_off[table] != NULL && sargs.table == 0) {
//         cudaAssert(sargs.in_off[table] != NULL);
//         cudaAssert(sout.out_off[table] != NULL);
//         cudaAssert(sargs.local_off[0] != NULL);
//         BlockReadOffsetShuffle<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], sargs.in_off[table], item_val, num_tile_items);
//         BlockWriteValPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, sout.out_off[table], num_tile_items, range_size);
//       //materializing the global offset (dim) from a previous join using the local offset of table 0 from the current join
//       //TODO: THIS PART DOESN'T WORK BECAUSE sargs.local_off[table] != NULL for every table involved in the query (WE CANNOT DISTINGUISH BETWEEN THE CURRENT JOIN AND EVERY OTHER JOIN)
//       } else if (sargs.in_off[table] != NULL && sargs.local_off[table] == NULL && table > 0) {
//         cudaAssert(sargs.in_off[table] != NULL);
//         cudaAssert(sout.out_off[table] != NULL);
//         BlockReadOffsetShuffle<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], sargs.in_off[table], item_val, num_tile_items);
//         BlockWriteValPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, sout.out_off[table], num_tile_items, range_size);
//       //writing the global offset (dim) from the current join
//       } else if (sargs.in_off[table] == NULL && sargs.local_off[table] != NULL && table > 0) {
//         BlockWriteValPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[table], item_key, index, sout.out_off[table], num_tile_items, range_size);
//       }
//     }
//   }
// }
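
// Sketch of a balanced alternative for the range_size TODO above (not wired into the kernel; NUM_RANGE,
// min_key and max_key are assumed to be available exactly as in RangePartitioningKeyValue3). Instead of a
// single truncated divisor, the first (span % NUM_RANGE) partitions take one extra key, so e.g. 10 keys
// over 3 partitions split as 4/3/3 and no key can spill past the last partition:
//   template<int NUM_RANGE>
//   __device__ __forceinline__ int BalancedPartitionOf(int key, int min_key, int max_key) {
//     int span = max_key + 1 - min_key;
//     int base = span / NUM_RANGE;        // minimum keys per partition
//     int extra = span % NUM_RANGE;       // this many partitions get (base + 1) keys
//     int k = key - min_key;
//     int boundary = extra * (base + 1);  // keys below this belong to the wider partitions
//     return (k < boundary) ? (k / (base + 1)) : (extra + (k - boundary) / base);
//   }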