#include "MultiGPUProcessing.h"
#include "CacheManager.h"
#include "KernelLaunch.h"
#include "QueryOptimizer.h"
//Naive approach:
//  launch the shuffling kernel on each GPU
//  for each GPU, check every receiving buffer; once a buffer is ready,
//  transfer it and launch the kernel on the destination GPU
//  have one CPU thread per GPU checking whether
//    the input buffer is full, and
//    an output buffer is available;
//  if both hold, launch a kernel for that particular input buffer
//  have another CPU thread per GPU checking whether
//    the output buffer is full, and
//    an input buffer is available on the destination GPU;
//  if both hold, memcpy the buffer (see the illustrative sketch below)
//Extensions:
//  separate buffers for each kernel type (easy)
//  a condensed, global output buffer shared by all kernels (each kernel has to write its output intelligently to the correct buffer)
//  a single kernel reading from multiple buffers (when available)
//  multiple kernels from the same or different operators running concurrently (each buffer gets its own stream)
//  buffer-full mechanism (trick: if there are conservatively not enough output buffers, do not launch the kernel)
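//A minimal sketch of the per-GPU worker loop described above. This is illustrative only:
//the buffer-pool type, its member functions, and launch_shuffle_kernel are hypothetical
//placeholders that do not exist in this codebase.
//
//  void gpuWorker(int gpu, BufferPool& in, BufferPool& out, cudaStream_t stream) {
//    while (!done) {
//      //kernel-launch thread: needs a full input buffer and a free output buffer
//      if (in.hasFull(gpu) && out.hasFree(gpu))
//        launch_shuffle_kernel(gpu, in.popFull(gpu), out.popFree(gpu), stream);
//      //transfer thread: needs a full output buffer and a free input buffer on the destination GPU
//      if (out.hasFull(gpu)) {
//        int dst = out.peekDestination(gpu);
//        if (in.hasFree(dst))
//          CubDebugExit(cudaMemcpyPeerAsync(in.popFree(dst), dst, out.popFull(gpu), gpu,
//                                           BUFFER_BYTES, stream));
//      }
//    }
//  }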
CPUGPUProcessing::CPUGPUProcessing(size_t _cache_size, size_t _broadcast_size, size_t _processing_size, size_t _pinned_memsize, ncclComm_t* _comms,
bool _verbose, bool _skipping, bool _reorder) {
skipping = _skipping;
reorder = _reorder;
custom = true;
if (custom) qo = new QueryOptimizer(_cache_size, _broadcast_size, _processing_size, _pinned_memsize, this);
else qo = new QueryOptimizer(_cache_size, 0, 0, 0, this);
cm = qo->cm;
begin_time = chrono::high_resolution_clock::now();
col_idx = new int**[NUM_GPU];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
col_idx[gpu] = new int*[cm->TOT_COLUMN]();
}
all_col_idx = new int***[NUM_GPU];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
all_col_idx[gpu] = new int**[NUM_GPU];
for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
all_col_idx[gpu][gpu_iter] = new int*[NUM_COLUMN]();
for (int column = 0; column < NUM_COLUMN; column++) {
CubDebugExit(cudaSetDevice(gpu));
//cannot use the custom allocator here because it is reset after every query, while this metadata persists across queries
CubDebugExit(cudaMalloc((void**) &all_col_idx[gpu][gpu_iter][column], cm->allColumn[column]->total_segment * sizeof(int)));
CubDebugExit(cudaSetDevice(0));
}
}
}
seg_row_to_gpu = new int**[NUM_GPU];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
seg_row_to_gpu[gpu] = new int*[NUM_TABLE]();
for (int table = 0; table < NUM_TABLE; table++) {
CubDebugExit(cudaSetDevice(gpu));
//cannot use the custom allocator here because it is reset after every query, while this metadata persists across queries
CubDebugExit(cudaMalloc((void**) &seg_row_to_gpu[gpu][table], cm->allTable[table]->total_segment * sizeof(int)));
CubDebugExit(cudaSetDevice(0));
}
}
verbose = _verbose;
cpu_time = new double[MAX_GROUPS];
gpu_time = new double[MAX_GROUPS];
nvlink_bytes = new double[MAX_GROUPS];
shuffle_time = new double[MAX_GROUPS];
cpu_to_gpu = new unsigned long long[MAX_GROUPS];
gpu_to_cpu = new unsigned long long[MAX_GROUPS];
comms = _comms;
//THIS IS USED FOR BROADCAST
broadcast_idx = new int**[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
broadcast_idx[gpu] = new int*[cm->TOT_COLUMN]();
for (int col = 0; col < NUM_COLUMN; col++) {
broadcast_idx[gpu][col] = (int*) malloc (cm->allColumn[col]->total_segment * sizeof(int));
memset(broadcast_idx[gpu][col], -1, cm->allColumn[col]->total_segment * sizeof(int));
}
}
sg_broadcast_count = 0;
resetTime();
}
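//Gather the per-GPU offset arrays (off_col) into pinned host buffers (h_off_col), accumulating
//the per-GPU counts from h_total into h_total_all. Offsets from each GPU are appended
//contiguously per table and copied asynchronously on that GPU's stream.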
void
CPUGPUProcessing::switchDeviceGPUtoCPU(int* h_total_all, int** h_total, int*** off_col, int** h_off_col, int sg, cudaStream_t* stream) {
assert(h_total != NULL);
assert(h_total_all != NULL);
assert(off_col != NULL);
assert(h_off_col != NULL);
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(off_col[gpu] != NULL);
// assert(*(h_total[gpu]) > 0);
*(h_total_all) += *(h_total[gpu]);
}
if (*(h_total_all) == 0) {
return;
}
for (int table = 0; table < NUM_TABLE; table++) {
if (off_col[0][table] != NULL) {
h_off_col[table] = (int*) cm->customCudaHostAlloc<int>(*h_total_all);
}
}
assert(off_col != NULL);
for (int table = 0; table < NUM_TABLE; table++) {
int temp = 0;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(off_col[gpu] != NULL);
if (off_col[gpu][table] != NULL) {
assert(h_off_col[table] != NULL);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(h_off_col[table] + temp, off_col[gpu][table], *(h_total[gpu]) * sizeof(int), cudaMemcpyDeviceToHost, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
temp += *(h_total[gpu]);
}
}
if (off_col[0][table] != NULL) assert(temp == *h_total_all);
}
if (verbose) cout << "Total offsets transferred to CPU: " << *h_total_all << endl;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream[gpu]);
CubDebugExit(cudaSetDevice(0));
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Transferred data from GPU to CPU " << time << endl;
}
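//Scatter pre-partitioned host offsets (h_off_col_part) to their owning GPUs: each GPU receives
//only its own partition of *(h_total[gpu]) offsets per table, allocated from that GPU's custom
//allocator and copied asynchronously on that GPU's stream.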
void
CPUGPUProcessing::switchDeviceCPUtoGPUScatter(int** h_total, int*** off_col, int*** h_off_col_part, int sg, cudaStream_t* stream) {
assert(h_total != NULL);
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
int h_total_all = 0;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
h_total_all += *(h_total[gpu]);
}
if (h_total_all == 0) return;
assert(off_col != NULL);
assert(h_off_col_part != NULL);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(h_off_col_part[gpu] != NULL);
assert(off_col[gpu] != NULL);
for (int table = 0; table < NUM_TABLE; table++) {
if (h_off_col_part[gpu][table] != NULL) {
if (*(h_total[gpu]) > 0) off_col[gpu][table] = (int*) cm->customCudaMalloc<int>(*(h_total[gpu]), gpu);
}
}
}
for (int table = 0; table < NUM_TABLE; table++) {
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (off_col[gpu][table] != NULL) {
assert(h_off_col_part[gpu][table] != NULL);
CubDebugExit(cudaSetDevice(gpu));
if (*(h_total[gpu]) > 0) CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], h_off_col_part[gpu][table], *(h_total[gpu]) * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
}
}
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream[gpu]);
CubDebugExit(cudaSetDevice(0));
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Transferred data from CPU to GPU " << time << endl;
}
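//Broadcast host offsets (h_off_col) to every GPU: copy them host-to-device once to a lead GPU,
//then replicate device-to-device from the lead GPU to all other GPUs, and set every
//*(h_total[gpu]) to *h_total_all.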
void
CPUGPUProcessing::switchDeviceCPUtoGPUBroadcast(int* h_total_all, int** h_total, int*** off_col, int** h_off_col, int sg, cudaStream_t* stream) {
assert(h_total_all != NULL);
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
if (*(h_total_all) == 0) return;
assert(off_col != NULL);
assert(h_off_col != NULL);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(off_col[gpu] != NULL);
for (int table = 0; table < NUM_TABLE; table++) {
if (h_off_col[table] != NULL) {
off_col[gpu][table] = (int*) cm->customCudaMalloc<int>(*(h_total_all), gpu);
}
}
}
// for (int table = 0; table < NUM_TABLE; table++) {
// for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// if (off_col[gpu][table] != NULL) {
// assert(h_off_col[table] != NULL);
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], h_off_col[table], *(h_total_all) * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
// CubDebugExit(cudaSetDevice(0));
// }
// }
// }
int lead_gpu = 0;
for (int table = 0; table < NUM_TABLE; table++) {
if (off_col[lead_gpu][table] != NULL) {
assert(h_off_col[table] != NULL);
CubDebugExit(cudaSetDevice(lead_gpu));
CubDebugExit(cudaMemcpyAsync(off_col[lead_gpu][table], h_off_col[table], *(h_total_all) * sizeof(int), cudaMemcpyHostToDevice, stream[lead_gpu]));
CubDebugExit(cudaSetDevice(0));
}
}
CubDebugExit(cudaSetDevice(lead_gpu));
CHECK_ERROR_STREAM(stream[lead_gpu]);
CubDebugExit(cudaSetDevice(0));
for (int table = 0; table < NUM_TABLE; table++) {
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (gpu != lead_gpu && off_col[gpu][table] != NULL) {
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], off_col[lead_gpu][table], *(h_total_all) * sizeof(int), cudaMemcpyDeviceToDevice, stream[gpu]));
nvlink_bytes[sg] += *(h_total_all);
CubDebugExit(cudaSetDevice(0));
}
}
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream[gpu]);
CubDebugExit(cudaSetDevice(0));
*(h_total[gpu]) = *(h_total_all);
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Transferred data from CPU to GPU " << time << endl;
}
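//Same as switchDeviceCPUtoGPUBroadcast but for a single table: stage the offsets on a lead GPU
//via a host-to-device copy, then replicate them device-to-device to the other GPUs.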
void
CPUGPUProcessing::switchDeviceCPUtoGPUBroadcastDim(int* h_total_all, int** h_total, int*** off_col, int* h_off_col, int table, int sg, cudaStream_t* stream) {
assert(h_total_all != NULL);
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
if (*(h_total_all) == 0) return;
assert(off_col != NULL);
assert(h_off_col != NULL);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(off_col[gpu] != NULL);
off_col[gpu][table] = (int*) cm->customCudaMalloc<int>(*(h_total_all), gpu);
}
// for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// if (off_col[gpu][table] != NULL) {
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], h_off_col, *(h_total_all) * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
// CubDebugExit(cudaSetDevice(0));
// }
// }
int lead_gpu = 0;
CubDebugExit(cudaSetDevice(lead_gpu));
CubDebugExit(cudaMemcpyAsync(off_col[lead_gpu][table], h_off_col, *(h_total_all) * sizeof(int), cudaMemcpyHostToDevice, stream[lead_gpu]));
CHECK_ERROR_STREAM(stream[lead_gpu]);
CubDebugExit(cudaSetDevice(0));
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (gpu != lead_gpu && off_col[gpu][table] != NULL) {
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], off_col[lead_gpu][table], *(h_total_all) * sizeof(int), cudaMemcpyDeviceToDevice, stream[gpu]));
nvlink_bytes[sg] += *(h_total_all);
CubDebugExit(cudaSetDevice(0));
}
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream[gpu]);
CubDebugExit(cudaSetDevice(0));
*(h_total[gpu]) = *(h_total_all);
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Transferred data from CPU to GPU " << time << endl;
}
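//Run the filter on the CPU for the build-side (dimension) table of segment group sg and
//partition the surviving offsets by destination GPU into h_off_col_part. Per-GPU output sizes
//are estimated from the segment counts and the filter selectivity before the kernel runs.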
void
CPUGPUProcessing::call_filter_partition_CPU(QueryParams* params, int*** h_off_col_part, int** h_total, int sg, int table) {
ColumnInfo* temp;
assert(h_total != NULL);
for (int i = 0; i < qo->join.size(); i++) {
if (qo->join[i].second->table_id == table) {
temp = qo->join[i].second; break;
}
}
if (qo->select_build[temp].size() == 0) return;
ColumnInfo* column = qo->select_build[temp][0];
int* filter_col = column->col_ptr;
int* output_estimate = new int[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
output_estimate[gpu] = qo->segment_group_each_gpu_count[gpu][table][sg] * SEGMENT_SIZE * params->selectivity[column];
}
SETUP_TIMING();
float time;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (output_estimate[gpu] > 0) {
h_off_col_part[gpu][table] = (int*) cm->customCudaHostAlloc<int>(output_estimate[gpu]);
}
}
cudaEventRecord(start, 0);
int LEN;
if (sg == qo->last_segment[table]) {
LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE;
} else {
LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE;
}
struct filterArgsCPU fargs = {
filter_col, NULL,
params->compare1[column], params->compare2[column], 0, 0,
params->mode[column], 0, params->map_filter_func_host[column], NULL
};
short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(h_total[gpu] != NULL);
}
filter_partition_CPU(fargs, h_off_col_part, LEN, h_total, table, cm->seg_row_to_single_gpu, 0, segment_group_ptr);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(*(h_total[gpu]) <= output_estimate[gpu]);
assert(*(h_total[gpu]) > 0);
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Filter Partition Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
}
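//All-to-all shuffle using plain cudaMemcpyAsync: for every (source partition, destination GPU)
//pair, copy the partition's column data and offsets device-to-device on a dedicated stream.
//Also updates h_total per destination GPU and allocates the receive buffers (temp_col/off_col)
//with MULTIPLIER head room in case a later shuffle produces a larger result.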
void
CPUGPUProcessing::shuffleDataOpt(int*** &off_col, int** h_total,
struct shuffleHelper* shelper, bool first_shuffle, int sg, cudaStream_t* stream) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
assert(NUM_PARTITION == NUM_GPU);
//send to the correct GPU
//count total partition size and update h_total for calling subsequent operator
int* total_partition_size = new int[NUM_PARTITION]();
for (int partition = 0; partition < NUM_PARTITION; partition++) {
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
total_partition_size[partition] += shelper->result_count[gpu][partition];
if (verbose) cout << "result count in gpu " << gpu << " for partition " << partition << " is " << shelper->result_count[gpu][partition] << endl;
// assert(shelper->result_count[gpu][partition] > 0);
}
*(h_total[partition]) = total_partition_size[partition];
// cout << "total partition size for partition " << partition << " is " << total_partition_size[partition] << endl;
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
for (int col = 0; col < NUM_COLUMN; col++) {
if (shelper->out_shuffle_col[gpu][col] != NULL && shelper->temp_col[gpu][col] == NULL) {
// allocate more than needed as a pessimistic assumption, in case a later shuffle returns a larger result than the first one
if (verbose) cout << "allocate temp col gpu " << gpu << " col " << col << " " << total_partition_size[gpu] * MULTIPLIER << endl;
shelper->temp_col[gpu][col] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu);
}
}
for (int table = 0; table < NUM_TABLE; table++) {
// if (shelper->out_shuffle_off[gpu][table] == NULL) cout << "out_shuffle " << gpu << " " << table << endl;
// if (off_col[gpu][table] != NULL) cout << "off col " << gpu << " " << table << endl;
if (shelper->out_shuffle_off[gpu][table] != NULL && off_col[gpu][table] == NULL) {
// allocate more than needed as a pessimistic assumption, in case a later shuffle returns a larger result than the first one
if (verbose) cout << "allocate temp off gpu " << gpu << " table " << table << " " << total_partition_size[gpu] * MULTIPLIER << endl;
off_col[gpu][table] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu);
}
}
}
cudaStream_t** streamopt = new cudaStream_t*[NUM_GPU];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
streamopt[gpu] = new cudaStream_t[NUM_PARTITION];
CubDebugExit(cudaSetDevice(gpu));
for (int partition = 0; partition < NUM_PARTITION; partition++) {
CubDebugExit(cudaStreamCreate(&streamopt[gpu][partition]));
}
}
CubDebugExit(cudaSetDevice(0));
//cudaMemcpy from every GPU to every other GPU (all-to-all communication)
//gpu is the destination GPU, partition is the source GPU
for (int col = 0; col < NUM_COLUMN; col++) {
if (shelper->out_shuffle_col[0][col] != NULL) {
cout << "Transferring column " << col << endl;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(shelper->temp_col[gpu][col] != NULL);
int temp = 0;
for (int partition = 0; partition < NUM_PARTITION; partition++) {
if (shelper->out_shuffle_col[partition][col][gpu] != NULL){
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(shelper->temp_col[gpu][col] + temp, shelper->out_shuffle_col[partition][col][gpu], shelper->result_count[partition][gpu] * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[partition][gpu]));
if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu];
CubDebugExit(cudaSetDevice(0));
temp += shelper->result_count[partition][gpu];
// cout << "partition " << partition << " gpu " << gpu << " offset " << temp << endl;
}
}
}
}
}
for (int table = 0; table < NUM_TABLE; table++) {
if (shelper->out_shuffle_off[0][table] != NULL) {
cout << "Transferring offset " << table << endl;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(off_col[gpu][table] != NULL);
int temp = 0;
for (int partition = 0; partition < NUM_PARTITION; partition++) {
if (shelper->out_shuffle_off[partition][table][gpu] != NULL){
CubDebugExit(cudaSetDevice(gpu));
// cout << "From " << partition << " To " << gpu << " " << shelper->result_count[partition][gpu] << endl;
CubDebugExit(cudaMemcpyAsync(off_col[gpu][table] + temp, shelper->out_shuffle_off[partition][table][gpu], shelper->result_count[partition][gpu] * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[partition][gpu]));
if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu];
CubDebugExit(cudaSetDevice(0));
temp += shelper->result_count[partition][gpu];
}
}
}
}
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
for (int partition = 0; partition < NUM_PARTITION; partition++) {
CHECK_ERROR_STREAM(streamopt[gpu][partition]);
CubDebugExit(cudaStreamDestroy(streamopt[gpu][partition]));
}
}
CubDebugExit(cudaSetDevice(0));
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Shuffle done" << endl;
}
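//All-to-all shuffle using NCCL: the same data movement as shuffleDataOpt, but expressed as
//paired ncclSend/ncclRecv calls inside ncclGroupStart/ncclGroupEnd, one group for column data
//and one for offsets. scan_result_count gives each source partition's receive offset on the
//destination GPU.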
void
CPUGPUProcessing::shuffleDataNCCL(int*** &off_col, int** h_total,
struct shuffleHelper* shelper, bool first_shuffle, int sg, cudaStream_t* stream) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
assert(NUM_PARTITION == NUM_GPU);
//send to the correct GPU
int** scan_result_count = new int*[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
scan_result_count[gpu] = new int[NUM_PARTITION]();
for (int partition = 0; partition < NUM_PARTITION; partition++) {
if (partition == 0) scan_result_count[gpu][partition] = 0;
else scan_result_count[gpu][partition] = scan_result_count[gpu][partition - 1] + shelper->result_count[partition - 1][gpu];
}
}
//count total partition size and update h_total for calling subsequent operator
int* total_partition_size = new int[NUM_PARTITION]();
for (int partition = 0; partition < NUM_PARTITION; partition++) {
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
total_partition_size[partition] += shelper->result_count[gpu][partition];
if (verbose) cout << "result count in gpu " << gpu << " for partition " << partition << " is " << shelper->result_count[gpu][partition] << endl;
assert(shelper->result_count[gpu][partition] > 0);
}
*(h_total[partition]) = total_partition_size[partition];
// cout << "total partition size for partition " << partition << " is " << total_partition_size[partition] << endl;
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
for (int col = 0; col < NUM_COLUMN; col++) {
if (shelper->out_shuffle_col[gpu][col] != NULL && shelper->temp_col[gpu][col] == NULL) {
// allocate more than needed as a pessimistic assumption, in case a later shuffle returns a larger result than the first one
shelper->temp_col[gpu][col] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu);
}
}
for (int table = 0; table < NUM_TABLE; table++) {
if (shelper->out_shuffle_off[gpu][table] != NULL && off_col[gpu][table] == NULL) {
// allocate more than needed as a pessimistic assumption, in case a later shuffle returns a larger result than the first one
off_col[gpu][table] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu);
}
}
}
ncclGroupStart();
for (int col = 0; col < NUM_COLUMN; col++) {
if (shelper->out_shuffle_col[0][col] != NULL) {
if (verbose) cout << "Transferring column " << col << endl;
//asserting pointers
// for(int gpu = 0; gpu < NUM_GPU; gpu++) {
// assert(shelper->temp_col[gpu][col] != NULL);
// for(int partition = 0; partition < NUM_PARTITION; partition++) {
// assert(shelper->out_shuffle_col[gpu][col][partition] != NULL);
// }
// }
//use nccl all2all communication
// ncclGroupStart();
for(int gpu = 0; gpu < NUM_GPU; gpu++) {
for(int partition = 0; partition < NUM_PARTITION; partition++) {
ncclSend(shelper->out_shuffle_col[gpu][col][partition], shelper->result_count[gpu][partition], ncclInt, partition, comms[gpu], stream[gpu]);
ncclRecv(shelper->temp_col[gpu][col] + scan_result_count[gpu][partition], shelper->result_count[partition][gpu], ncclInt, partition, comms[gpu], stream[gpu]);
if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu];
}
}
// ncclGroupEnd();
}
}
ncclGroupEnd();
ncclGroupStart();
for (int table = 0; table < NUM_TABLE; table++) {
if (shelper->out_shuffle_off[0][table] != NULL) {
if (verbose) cout << "Transferring offset " << table << endl;
//asserting pointers
// for(int gpu = 0; gpu < NUM_GPU; gpu++) {
// assert(off_col[gpu][table] != NULL);
// for(int partition = 0; partition < NUM_PARTITION; partition++) {
// assert(shelper->out_shuffle_off[gpu][table][partition] != NULL);
// }
// }
//use nccl all2all communication
// ncclGroupStart();
for(int gpu = 0; gpu < NUM_GPU; gpu++) {
for(int partition = 0; partition < NUM_PARTITION; partition++) {
ncclSend(shelper->out_shuffle_off[gpu][table][partition], shelper->result_count[gpu][partition], ncclInt, partition, comms[gpu], stream[gpu]);
ncclRecv(off_col[gpu][table] + scan_result_count[gpu][partition], shelper->result_count[partition][gpu], ncclInt, partition, comms[gpu], stream[gpu]);
if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu];
}
}
// ncclGroupEnd();
}
}
ncclGroupEnd();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream[gpu]);
CubDebugExit(cudaSetDevice(0));
}
// for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaDeviceSynchronize());
// }
// CubDebugExit(cudaSetDevice(0));
// assert(0);
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Shuffle done" << endl;
}
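//Copy the host-side broadcast index metadata (used_broadcast_idx) for dimension-table columns
//to each GPU and publish the resulting per-column device pointers through d_broadcast_idx.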
void
CPUGPUProcessing::broadcastIdxTransfer(int*** d_broadcast_idx, int*** used_broadcast_idx, cudaStream_t* stream){
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
d_broadcast_idx[gpu] = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
int** tmp = new int*[NUM_COLUMN];
for (int col = 0; col < NUM_COLUMN; col++) {
if (used_broadcast_idx[gpu][col] != NULL && cm->allColumn[col]->table_id > 0) { //only do this for dimension table
tmp[col] = (int*) cm->customCudaMalloc<int>(cm->allColumn[col]->total_segment, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(tmp[col], used_broadcast_idx[gpu][col], cm->allColumn[col]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
}
}
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_broadcast_idx[gpu], tmp, NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
}
}
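//Broadcast the non-replicated segments of the build table to every other GPU one segment at a
//time with cudaMemcpyAsync, reserving space from each GPU's broadcast region and recording
//every segment's location in broadcast_idx.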
void
CPUGPUProcessing::broadcastSegments(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
int* toBroadcastCount = new int[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
toBroadcastCount[gpu] = 0;
}
// cout << qo->segment_group_count[table][sg] << endl;
for (int i = 0; i < qo->segment_group_count[table][sg]; i++) {
int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i];
//check whether the segment is replicated
//if the segment is not replicated (i.e., it resides on only one GPU)
if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) {
assert(cm->segment_row_to_gpu[table][seg_id].size() == 1);
int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (gpu != cur_gpu) {
toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id;
toBroadcastCount[gpu]++;
if (seg_id == cm->allTable[table]->total_segment-1) {
// cout << "here" << endl;
if (cm->allTable[table]->LEN % SEGMENT_SIZE == 0) {
broadcast_len[gpu] += SEGMENT_SIZE;
} else {
broadcast_len[gpu] += cm->allTable[table]->LEN % SEGMENT_SIZE;
}
// cout << gpu << " " << broadcast_len[gpu] << endl;
} else {
broadcast_len[gpu] += SEGMENT_SIZE;
// cout << gpu << " " << broadcast_len[gpu] << endl;
}
}
}
}
}
assert(broadcast_col != NULL);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(broadcast_col[gpu] != NULL);
if (toBroadcastCount[gpu] > 0) {
for (int col = 0; col < NUM_COLUMN; col++) {
if (used_col_idx[gpu][col] != NULL) {
int64_t size = toBroadcastCount[gpu] * SEGMENT_SIZE;
int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED);
// cout << gpu << " " << cm->broadcastPointer[gpu] << endl;
assert((tmp + size) < cm->each_broadcast_size);
broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp;
// cout << cm->broadcastPointer[gpu] << endl;
}
}
}
}
cudaStream_t** streamopt = new cudaStream_t*[NUM_GPU];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
streamopt[gpu] = new cudaStream_t[NUM_GPU];
CubDebugExit(cudaSetDevice(gpu));
for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
CubDebugExit(cudaStreamCreate(&streamopt[gpu][cur_gpu]));
}
}
CubDebugExit(cudaSetDevice(0));
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (toBroadcastCount[gpu] > 0) {
int* src = toBroadcast[gpu];
int* dst = d_toBroadcast[gpu];
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, streamopt[gpu][0]));
CubDebugExit(cudaSetDevice(0));
}
}
CubDebugExit(cudaSetDevice(0));
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
for (int col = 0; col < NUM_COLUMN; col++) {
//check if the column is used in the query at all (assume that used_col_idx is segment group aware)
if (used_col_idx[gpu][col] != NULL) {
int temp = 0;
for (int i = 0; i < toBroadcastCount[gpu]; i++) {
int seg = toBroadcast[gpu][i];
int cur_gpu = cm->seg_row_to_single_gpu[table][seg];
//check if the column is actually stored on the GPU (for now we assume the dimension table is always symmetrical)
assert(cm->segment_list[cur_gpu][col][seg] != -1);
int64_t idx = cm->segment_list[cur_gpu][col][seg];
int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]);
int* dst = broadcast_col[gpu][col] + temp * SEGMENT_SIZE;
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[gpu][cur_gpu]));
if (gpu != cur_gpu) nvlink_bytes[sg] += SEGMENT_SIZE;
CubDebugExit(cudaSetDevice(0));
int64_t start_idx = (broadcast_col[gpu][col] - cm->gpuCache[gpu])/SEGMENT_SIZE;
// cout << start_idx + temp << endl;
broadcast_idx[gpu][col][seg] = start_idx + temp;
// if (col == 11 || col == 15) cout << "col " << col << " " << broadcast_idx[gpu][col][seg] << endl;
temp++;
}
}
}
}
CubDebugExit(cudaSetDevice(0));
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
CHECK_ERROR_STREAM(streamopt[gpu][cur_gpu]);
CubDebugExit(cudaStreamDestroy(streamopt[gpu][cur_gpu]));
}
}
CubDebugExit(cudaSetDevice(0));
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Broadcast done: " << time << endl;
}
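//Variant of broadcastSegments that first packs the segments owned by each source GPU into a
//contiguous staging buffer (tmp_buffer), then broadcasts the packed buffer to every GPU with
//one device-to-device copy per destination (fewer, larger transfers).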
void
CPUGPUProcessing::broadcastSegments2(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
vector<vector<int>> shouldBroadcast;
shouldBroadcast.resize(NUM_GPU);
int* toBroadcastCount = new int[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
toBroadcastCount[gpu] = 0;
}
// cout << qo->segment_group_count[table][sg] << endl;
for (int i = 0; i < qo->segment_group_count[table][sg]; i++) {
int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i];
//check whether the segment is replicated
//if the segment is not replicated (i.e., it resides on only one GPU)
if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) {
assert(cm->segment_row_to_gpu[table][seg_id].size() == 1);
int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// if (gpu != cur_gpu) {
toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id;
toBroadcastCount[gpu]++;
if (seg_id == cm->allTable[table]->total_segment-1) {
if (cm->allTable[table]->LEN % SEGMENT_SIZE == 0) {
broadcast_len[gpu] += SEGMENT_SIZE;
} else {
broadcast_len[gpu] += cm->allTable[table]->LEN % SEGMENT_SIZE;
}
} else {
broadcast_len[gpu] += SEGMENT_SIZE;
}
// }
}
shouldBroadcast[cur_gpu].push_back(seg_id);
}
}
assert(broadcast_col != NULL);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(broadcast_col[gpu] != NULL);
if (toBroadcastCount[gpu] > 0) {
for (int col = 0; col < NUM_COLUMN; col++) {
if (used_col_idx[gpu][col] != NULL) {
int64_t size = toBroadcastCount[gpu] * SEGMENT_SIZE;
int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED);
assert((tmp + size) < cm->each_broadcast_size);
broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp;
}
}
}
}
cudaStream_t** streamopt = new cudaStream_t*[NUM_GPU];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
streamopt[gpu] = new cudaStream_t[NUM_GPU];
CubDebugExit(cudaSetDevice(gpu));
for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
CubDebugExit(cudaStreamCreate(&streamopt[gpu][cur_gpu]));
}
}
CubDebugExit(cudaSetDevice(0));
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (toBroadcastCount[gpu] > 0) {
int* src = toBroadcast[gpu];
int* dst = d_toBroadcast[gpu];
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, streamopt[gpu][0]));
CubDebugExit(cudaSetDevice(0));
}
}
CubDebugExit(cudaSetDevice(0));
// int*** tmp_buffer = new int**[NUM_GPU];
// int scanShouldBroadcast = 0;
// for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
// if (shouldBroadcast[cur_gpu].size() > 0) {
// // cout << cur_gpu << endl;
// tmp_buffer[cur_gpu] = new int*[NUM_COLUMN];
// for (int col = 0; col < NUM_COLUMN; col++) {
// // cout << col << endl;
// if (used_col_idx[cur_gpu][col] != NULL) {
// tmp_buffer[cur_gpu][col] = (int*) cm->customCudaMalloc<int>(shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE, cur_gpu);
// for (int i = 0; i < shouldBroadcast[cur_gpu].size(); i++) {
// int seg_id = shouldBroadcast[cur_gpu][i];
// assert(cm->segment_list[cur_gpu][col][seg_id] != -1);
// int64_t idx = cm->segment_list[cur_gpu][col][seg_id];
// int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]);
// int* dst = tmp_buffer[cur_gpu][col] + i * SEGMENT_SIZE;
// CubDebugExit(cudaSetDevice(cur_gpu));
// CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[cur_gpu][cur_gpu]));
// for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
// // if (cur_gpu != dst_gpu) {
// int64_t start_idx = (broadcast_col[dst_gpu][col] - cm->gpuCache[dst_gpu])/SEGMENT_SIZE;
// broadcast_idx[dst_gpu][col][seg_id] = start_idx + scanShouldBroadcast + i;
// // if (col == 18) cout << "dst gpu " << dst_gpu << " " << seg_id << " " << broadcast_idx[dst_gpu][col][seg_id] << endl;
// // }
// }
// }
// }
// }
// scanShouldBroadcast += shouldBroadcast[cur_gpu].size();
// // cout << endl;
// }
// }
int*** tmp_buffer = new int**[NUM_COLUMN];
for (int col = 0; col < NUM_COLUMN; col++) {
if (used_col_idx[0][col] != NULL) {
tmp_buffer[col] = new int*[NUM_GPU];
int scanShouldBroadcast = 0;
for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
if (shouldBroadcast[cur_gpu].size() > 0) {
tmp_buffer[col][cur_gpu] = (int*) cm->customCudaMalloc<int>(shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE, cur_gpu);
for (int i = 0; i < shouldBroadcast[cur_gpu].size(); i++) {
int seg_id = shouldBroadcast[cur_gpu][i];
assert(cm->segment_list[cur_gpu][col][seg_id] != -1);
int64_t idx = cm->segment_list[cur_gpu][col][seg_id];
int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]);
int* dst = tmp_buffer[col][cur_gpu] + i * SEGMENT_SIZE;
CubDebugExit(cudaSetDevice(cur_gpu));
CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[cur_gpu][cur_gpu]));
for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
int64_t start_idx = (broadcast_col[dst_gpu][col] - cm->gpuCache[dst_gpu])/SEGMENT_SIZE;
broadcast_idx[dst_gpu][col][seg_id] = start_idx + scanShouldBroadcast + i;
}
}
CubDebugExit(cudaSetDevice(cur_gpu));
CHECK_ERROR_STREAM(streamopt[cur_gpu][cur_gpu]);
CubDebugExit(cudaSetDevice(0));
for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
int* dst = broadcast_col[dst_gpu][col] + scanShouldBroadcast * SEGMENT_SIZE;
CubDebugExit(cudaSetDevice(cur_gpu));
CubDebugExit(cudaMemcpyAsync(dst, tmp_buffer[col][cur_gpu], shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[cur_gpu][dst_gpu]));
if (dst_gpu != cur_gpu) nvlink_bytes[sg] += shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE;
CubDebugExit(cudaSetDevice(0));
}
scanShouldBroadcast += shouldBroadcast[cur_gpu].size();
}
}
}
}
// for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
// CubDebugExit(cudaSetDevice(cur_gpu));
// CHECK_ERROR_STREAM(streamopt[cur_gpu][cur_gpu]);
// CubDebugExit(cudaSetDevice(0));
// }
// scanShouldBroadcast = 0;
// for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
// if (shouldBroadcast[cur_gpu].size() > 0) {
// for (int col = 0; col < NUM_COLUMN; col++) {
// //check if the column is used in the query at all (assume that used_col_idx is segment group aware)
// if (used_col_idx[cur_gpu][col] != NULL) {
// int* src = tmp_buffer[cur_gpu][col];
// for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
// // if (cur_gpu != dst_gpu) {
// // cout << cur_gpu << " " << dst_gpu << endl;
// int* dst = broadcast_col[dst_gpu][col] + scanShouldBroadcast * SEGMENT_SIZE;
// CubDebugExit(cudaSetDevice(cur_gpu));
// CubDebugExit(cudaMemcpyAsync(dst, src, shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[cur_gpu][dst_gpu]));
// CubDebugExit(cudaSetDevice(0));
// // }
// }
// }
// }
// scanShouldBroadcast += shouldBroadcast[cur_gpu].size();
// // cout << scanShouldBroadcast << endl;
// }
// }
// CubDebugExit(cudaSetDevice(0));
// for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// CubDebugExit(cudaSetDevice(gpu));
// for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
// CHECK_ERROR_STREAM(streamopt[gpu][cur_gpu]);
// }
// }
// CubDebugExit(cudaSetDevice(0));
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
CHECK_ERROR_STREAM(streamopt[gpu][cur_gpu]);
CubDebugExit(cudaStreamDestroy(streamopt[gpu][cur_gpu]));
}
}
CubDebugExit(cudaSetDevice(0));
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Broadcast done: " << time << endl;
// assert(0);
}
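//Same packing strategy as broadcastSegments2, but the packed per-source buffers are replicated
//to all GPUs with ncclBroadcast inside a single NCCL group.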
void
CPUGPUProcessing::broadcastSegmentsNCCL2(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res, cudaStream_t* stream) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
vector<vector<int>> shouldBroadcast;
shouldBroadcast.resize(NUM_GPU);
int* toBroadcastCount = new int[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
toBroadcastCount[gpu] = 0;
}
// cout << qo->segment_group_count[table][sg] << endl;
for (int i = 0; i < qo->segment_group_count[table][sg]; i++) {
int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i];
//check whether the segment is replicated
//if the segment is not replicated (i.e., it resides on only one GPU)
if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) {
assert(cm->segment_row_to_gpu[table][seg_id].size() == 1);
int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// if (gpu != cur_gpu) {
toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id;
toBroadcastCount[gpu]++;
if (seg_id == cm->allTable[table]->total_segment-1) {
if (cm->allTable[table]->LEN % SEGMENT_SIZE == 0) {
broadcast_len[gpu] += SEGMENT_SIZE;
} else {
broadcast_len[gpu] += cm->allTable[table]->LEN % SEGMENT_SIZE;
}
} else {
broadcast_len[gpu] += SEGMENT_SIZE;
}
// }
}
shouldBroadcast[cur_gpu].push_back(seg_id);
}
}
assert(broadcast_col != NULL);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(broadcast_col[gpu] != NULL);
if (toBroadcastCount[gpu] > 0) {
for (int col = 0; col < NUM_COLUMN; col++) {
if (used_col_idx[gpu][col] != NULL) {
int64_t size = toBroadcastCount[gpu] * SEGMENT_SIZE;
int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED);
assert((tmp + size) < cm->each_broadcast_size);
broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp;
}
}
}
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (toBroadcastCount[gpu] > 0) {
int* src = toBroadcast[gpu];
int* dst = d_toBroadcast[gpu];
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
}
}
CubDebugExit(cudaSetDevice(0));
int*** tmp_buffer = new int**[NUM_GPU];
int scanShouldBroadcast = 0;
for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
if (shouldBroadcast[cur_gpu].size() > 0) {
// cout << cur_gpu << endl;
tmp_buffer[cur_gpu] = new int*[NUM_COLUMN];
for (int col = 0; col < NUM_COLUMN; col++) {
// cout << col << endl;
if (used_col_idx[cur_gpu][col] != NULL) {
tmp_buffer[cur_gpu][col] = (int*) cm->customCudaMalloc<int>(shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE, cur_gpu);
for (int i = 0; i < shouldBroadcast[cur_gpu].size(); i++) {
int seg_id = shouldBroadcast[cur_gpu][i];
assert(cm->segment_list[cur_gpu][col][seg_id] != -1);
int64_t idx = cm->segment_list[cur_gpu][col][seg_id];
int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]);
int* dst = tmp_buffer[cur_gpu][col] + i * SEGMENT_SIZE;
CubDebugExit(cudaSetDevice(cur_gpu));
CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, stream[cur_gpu]));
for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
// if (cur_gpu != dst_gpu) {
int64_t start_idx = (broadcast_col[dst_gpu][col] - cm->gpuCache[dst_gpu])/SEGMENT_SIZE;
broadcast_idx[dst_gpu][col][seg_id] = start_idx + scanShouldBroadcast + i;
// if (col == 18) cout << "dst gpu " << dst_gpu << " " << seg_id << " " << broadcast_idx[dst_gpu][col][seg_id] << endl;
// }
}
}
}
}
scanShouldBroadcast += shouldBroadcast[cur_gpu].size();
// cout << endl;
}
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream[gpu]);
CubDebugExit(cudaSetDevice(0));
}
// for (int col = 0; col < NUM_COLUMN; col++) {
// ncclGroupStart();
// if (used_col_idx[0][col] != NULL) {
// scanShouldBroadcast = 0;
// for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
// if (shouldBroadcast[cur_gpu].size() > 0) {
// for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
// // cudaSetDevice(dst_gpu);
// // int* abc = (int*) cm->customCudaMalloc<int>(2000000, dst_gpu);
// int* ptr = tmp_buffer[cur_gpu][col];
// int* dst = broadcast_col[dst_gpu][col] + scanShouldBroadcast * SEGMENT_SIZE;
// int count = shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE;
// // cout << count << endl;
// ncclBroadcast(ptr, dst, count, ncclInt32, cur_gpu, comms[dst_gpu], stream[dst_gpu]);
// // CubDebugExit(cudaMemcpyAsync(dst, ptr, count * sizeof(int), cudaMemcpyDeviceToDevice, stream[cur_gpu]));
// }
// scanShouldBroadcast += shouldBroadcast[cur_gpu].size();
// }
// }
// }
// ncclGroupEnd();
// }
ncclGroupStart();
scanShouldBroadcast = 0;
for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
if (shouldBroadcast[cur_gpu].size() > 0) {
for (int col = 0; col < NUM_COLUMN; col++) {
if (used_col_idx[cur_gpu][col] != NULL) {
nvlink_bytes[sg] += shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE * (NUM_GPU-1);
for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
// cudaSetDevice(dst_gpu);
// int* abc = (int*) cm->customCudaMalloc<int>(2000000, dst_gpu);
int* ptr = tmp_buffer[cur_gpu][col];
int* dst = broadcast_col[dst_gpu][col] + scanShouldBroadcast * SEGMENT_SIZE;
int count = shouldBroadcast[cur_gpu].size() * SEGMENT_SIZE;
// cout << count << endl;
ncclBroadcast(ptr, dst, count, ncclInt32, cur_gpu, comms[dst_gpu], stream[dst_gpu]);
// CubDebugExit(cudaMemcpyAsync(dst, ptr, count * sizeof(int), cudaMemcpyDeviceToDevice, stream[cur_gpu]));
}
}
}
scanShouldBroadcast += shouldBroadcast[cur_gpu].size();
}
}
ncclGroupEnd();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream[gpu]);
CubDebugExit(cudaSetDevice(0));
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Broadcast done: " << time << endl;
// assert(0);
}
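//NCCL-based broadcast without the packing step: every non-replicated segment is ncclBroadcast
//directly from its owning (root) GPU into the destination slot precomputed in dest[][][], all
//inside one NCCL group.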
void
CPUGPUProcessing::broadcastSegmentsNCCL(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res, cudaStream_t* stream) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
vector<int> shouldBroadcast;
int* toBroadcastCount = new int[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
toBroadcastCount[gpu] = 0;
}
// cout << qo->segment_group_count[table][sg] << endl;
for (int i = 0; i < qo->segment_group_count[table][sg]; i++) {
int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i];
//check whether the segment is replicated
//if the segment is not replicated (i.e., it resides on only one GPU)
if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) {
assert(cm->segment_row_to_gpu[table][seg_id].size() == 1);
// int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// if (gpu != cur_gpu) {
toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id;
toBroadcastCount[gpu]++;
if (seg_id == cm->allTable[table]->total_segment-1) {
if (cm->allTable[table]->LEN % SEGMENT_SIZE == 0) {
broadcast_len[gpu] += SEGMENT_SIZE;
} else {
broadcast_len[gpu] += cm->allTable[table]->LEN % SEGMENT_SIZE;
}
} else {
broadcast_len[gpu] += SEGMENT_SIZE;
}
// }
}
shouldBroadcast.push_back(seg_id);
}
}
assert(broadcast_col != NULL);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(broadcast_col[gpu] != NULL);
if (toBroadcastCount[gpu] > 0) {
for (int col = 0; col < NUM_COLUMN; col++) {
if (used_col_idx[gpu][col] != NULL) {
int64_t size = toBroadcastCount[gpu] * SEGMENT_SIZE;
int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED);
assert((tmp + size) < cm->each_broadcast_size);
broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp;
}
}
}
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (toBroadcastCount[gpu] > 0) {
int* src = toBroadcast[gpu];
int* dst = d_toBroadcast[gpu];
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
}
}
CubDebugExit(cudaSetDevice(0));
int**** dest = new int***[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
dest[gpu] = new int**[NUM_COLUMN]();
for (int col = 0; col < NUM_COLUMN; col++) {
//check if the column is used in the query at all (assume that used_col_idx is segment group aware)
if (used_col_idx[gpu][col] != NULL) {
int temp = 0;
dest[gpu][col] = new int*[cm->allColumn[col]->total_segment]();
for (int i = 0; i < toBroadcastCount[gpu]; i++) {
int seg_id = toBroadcast[gpu][i];
// int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id];
dest[gpu][col][seg_id] = broadcast_col[gpu][col] + temp * SEGMENT_SIZE;
int64_t start_idx = (broadcast_col[gpu][col] - cm->gpuCache[gpu])/SEGMENT_SIZE;
broadcast_idx[gpu][col][seg_id] = start_idx + temp;
temp++;
}
}
}
}
ncclGroupStart();
for (int col = 0; col < NUM_COLUMN; col++) {
if (used_col_idx[0][col] != NULL) {
for (int i = 0; i < shouldBroadcast.size(); i++) {
int seg_id = shouldBroadcast[i];
int root = cm->seg_row_to_single_gpu[table][seg_id];
int64_t idx = cm->segment_list[root][col][seg_id];
assert(idx != -1);
int* ptr = &(cm->gpuCache[root][idx * SEGMENT_SIZE]);
nvlink_bytes[sg] += SEGMENT_SIZE * (NUM_GPU-1);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (gpu != root) assert(dest[gpu][col][seg_id] != NULL);
ncclBroadcast(ptr, dest[gpu][col][seg_id], SEGMENT_SIZE, ncclInt, root, comms[gpu], stream[gpu]);
}
}
}
}
ncclGroupEnd();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream[gpu]);
CubDebugExit(cudaSetDevice(0));
}
// ncclGroupStart();
// if (rank == root) {
// for (int r=0; r<nranks; r++)
// ncclSend(sendbuff[r], size, type, r, comm, stream);
// }
// ncclRecv(recvbuff, size, type, root, comm, stream);
// ncclGroupEnd();
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Broadcast done: " << time << endl;
// assert(0);
}
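//Build phase for one dimension table of segment group sg: transfer the needed column indexes to
//each GPU, optionally broadcast non-replicated segments or range-partition and shuffle the table
//across GPUs, then launch the build (and optional filter) kernel on every GPU.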
void
CPUGPUProcessing::call_operator_dim_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int table, bool filter_on_gpu, bool broadcast, bool already_partitioned, cudaStream_t* stream) {
int* dimkey_idx[NUM_GPU] = {}, *group_idx[NUM_GPU] = {}, *filter_idx[NUM_GPU] = {};
ColumnInfo* key_column = NULL, *filter_col = NULL, *group_col = NULL;
int*** used_col_idx = new int**[NUM_GPU];
float output_selectivity = 1.0;
bool first_shuffle = true;
bool has_shuffled = false;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
used_col_idx[gpu] = new int*[NUM_COLUMN]();
}
for (int i = 0; i < qo->join.size(); i++) {
if (qo->join[i].second->table_id == table) {
key_column = qo->join[i].second; break;
}
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (qo->groupby_build.size() > 0 && qo->groupby_build[key_column].size() > 0) {
if (qo->groupGPUcheck) {
group_col = qo->groupby_build[key_column][0];
cm->indexTransfer(col_idx[gpu], group_col, stream[gpu], gpu, custom);
assert(col_idx[gpu][group_col->column_id] != NULL);
group_idx[gpu] = col_idx[gpu][group_col->column_id];
used_col_idx[gpu][group_col->column_id] = col_idx[gpu][group_col->column_id];
}
}
//WARNING: IT'S POSSIBLE THAT THE FILTER IS FROM CPU
if (filter_on_gpu) {
if (qo->select_build[key_column].size() > 0) {
filter_col = qo->select_build[key_column][0];
cm->indexTransfer(col_idx[gpu], filter_col, stream[gpu], gpu, custom);
assert(col_idx[gpu][filter_col->column_id] != NULL);
filter_idx[gpu] = col_idx[gpu][filter_col->column_id];
used_col_idx[gpu][filter_col->column_id] = col_idx[gpu][filter_col->column_id];
output_selectivity = params->selectivity[filter_col];
}
}
// cout << "call index transfer for column " << key_column << " in gpu " << gpu << endl;
cm->indexTransfer(col_idx[gpu], key_column, stream[gpu], gpu, custom);
assert(col_idx[gpu][key_column->column_id] != NULL);
cpu_to_gpu[sg] += (key_column->total_segment * sizeof(int));
dimkey_idx[gpu] = col_idx[gpu][key_column->column_id];
used_col_idx[gpu][key_column->column_id] = col_idx[gpu][key_column->column_id];
}
//the following arrays are helpers for shuffling
//output of shuffling (column)
int*** temp_col = new int**[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
temp_col[gpu] = new int*[NUM_COLUMN]();
}
int** result_count = new int*[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
result_count[gpu] = new int[NUM_PARTITION]();
}
//input of partitioning (column) -> only for pipelined
int**** in_shuffle_col = new int***[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
in_shuffle_col[gpu] = new int**[NUM_COLUMN]();
}
//input of partitioning (offset) -> can be global or local depending on needs -> only for pipelined
int**** in_shuffle_off = new int***[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
in_shuffle_off[gpu] = new int**[NUM_TABLE]();
}
//output of partitioning (column)
int**** out_shuffle_col = new int***[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
out_shuffle_col[gpu] = new int**[NUM_COLUMN]();
}
//output of partitioning (offset) -> can be global or local depending on needs
int**** out_shuffle_off = new int***[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
out_shuffle_off[gpu] = new int**[NUM_TABLE]();
}
shuffleHelper* shelper = new shuffleHelper{
result_count, temp_col, NULL, out_shuffle_col, out_shuffle_off, in_shuffle_col, in_shuffle_off
};
int** d_seg_is_replicated = new int*[NUM_GPU];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
d_seg_is_replicated[gpu] = (int*) cm->customCudaMalloc<int>(cm->allTable[table]->total_segment, gpu);
CubDebugExit(cudaMemcpyAsync(d_seg_is_replicated[gpu], cm->seg_is_replicated[table], cm->allTable[table]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
}
filterArgsGPU** fargs = new filterArgsGPU*[NUM_GPU]();
buildArgsGPU** bargs = new buildArgsGPU*[NUM_GPU]();
probeArgsGPU** pargs = new probeArgsGPU*[NUM_GPU]();
groupbyArgsGPU** gargs = new groupbyArgsGPU*[NUM_GPU]();
shuffleArgsGPU** sargs = new shuffleArgsGPU*[NUM_GPU]();
KernelParams** kparams = new KernelParams*[NUM_GPU]();
KernelLaunch** kernellaunch = new KernelLaunch*[NUM_GPU]();
//THIS IS USED FOR BROADCAST
//broadcast_idx universal to all segment group
//broadcast_col local to segment group
//broadcast_len local to segment group
//toBroadcast local to segment group
int** toBroadcast = new int*[NUM_GPU](); //list of segment row that are broadcasted to this GPU
int** d_toBroadcast = new int*[NUM_GPU](); //list of segment row that are broadcasted to this GPU
int*** broadcast_col = new int**[NUM_GPU](); //storing segments that are broadcasted to this GPU
int* broadcast_len = new int[NUM_GPU](); //total segment row that are broadcasted to this GPU
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
toBroadcast[gpu] = new int[cm->allTable[table]->total_segment]();
d_toBroadcast[gpu] = (int*) cm->customCudaMalloc<int>(cm->allTable[table]->total_segment, gpu);
broadcast_col[gpu] = new int*[NUM_COLUMN]();
broadcast_len[gpu] = 0;
}
int** tmp_broadcast_idx_key = new int*[NUM_GPU]();
int** tmp_broadcast_idx_val = new int*[NUM_GPU]();
int** tmp_broadcast_idx_filter = new int*[NUM_GPU]();
if (broadcast) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
// broadcastSegments2(sg, table, broadcast_col, used_col_idx, broadcast_len, toBroadcast, d_toBroadcast, params->res);
if (NUM_GPU == 8) broadcastSegmentsNCCL(sg, table, broadcast_col, used_col_idx, broadcast_len, toBroadcast, d_toBroadcast, params->res, stream);
else broadcastSegments2(sg, table, broadcast_col, used_col_idx, broadcast_len, toBroadcast, d_toBroadcast, params->res);
// __atomic_fetch_add(&sg_broadcast_count, 1, __ATOMIC_RELAXED);
// while (sg_broadcast_count < qo->par_segment_count[table]) {};
// assert(sg_broadcast_count == qo->par_segment_count[table]);
// cout << sg_broadcast_count << endl;
//HAVE TO TRANSFER THE IDX AFTER DOING THE BROADCAST; THE IDX IS ONLY VALID FOR THIS SEGMENT GROUP (DIFFERENT FROM THE ONE IN call_operator_fact)
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
tmp_broadcast_idx_key[gpu] = (int*) cm->customCudaMalloc<int>(key_column->total_segment, gpu);
if (group_col != NULL) tmp_broadcast_idx_val[gpu] = (int*) cm->customCudaMalloc<int>(group_col->total_segment, gpu);
if (filter_col != NULL) tmp_broadcast_idx_filter[gpu] = (int*) cm->customCudaMalloc<int>(filter_col->total_segment, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(tmp_broadcast_idx_key[gpu], broadcast_idx[gpu][key_column->column_id], key_column->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
if (group_col != NULL) CubDebugExit(cudaMemcpyAsync(tmp_broadcast_idx_val[gpu], broadcast_idx[gpu][group_col->column_id], group_col->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
if (filter_col != NULL) CubDebugExit(cudaMemcpyAsync(tmp_broadcast_idx_filter[gpu], broadcast_idx[gpu][filter_col->column_id], filter_col->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Broadcast time " << time << endl;
shuffle_time[sg] += time;
}
//WARNING: set used_col_idx of the filter column to NULL so that it does not participate in partitioning
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
if (filter_on_gpu && group_col != NULL) {
if (filter_col->column_id != group_col->column_id) {
if (qo->select_build[key_column].size() > 0) {
used_col_idx[gpu][filter_col->column_id] = NULL;
}
}
}
}
// if current join needs shuffling
if (!broadcast && !params->ht_replicated[table] && !already_partitioned) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
assert(NUM_GPU == NUM_PARTITION);
// if this is the first join then we will call rangepartitioningkeyvalue
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//do shuffling in each GPU
//it's possible there is filter before shuffling
if (filter_idx[gpu]) {
fargs[gpu] = new filterArgsGPU{
filter_idx[gpu], NULL,
params->compare1[filter_col], params->compare2[filter_col], 0, 0,
params->mode[filter_col], 0,
NULL, NULL, NULL,
};
} else {
fargs[gpu] = new filterArgsGPU{NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL, NULL};
}
int** d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
int** d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaMemcpyAsync(d_col_idx, used_col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
assert(off_col[gpu] != NULL);
if (off_col[gpu][table] != NULL) {
CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
} else {
d_in_off=NULL;
}
CubDebugExit(cudaSetDevice(0));
if (verbose) cout << "Partitioning with column id " << key_column->column_id << endl;
sargs[gpu] = new shuffleArgsGPU{
d_temp_col, NULL, d_col_idx, d_in_off, NULL, NULL,
key_column->column_id, table, params->min_key[key_column], params->max_key[key_column],
NULL, NULL,
d_seg_is_replicated[gpu], NULL
};
kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total);
kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, Nope, table, output_selectivity, 0, 0, stream[gpu]);
bool pipeline = 0;
// kernellaunch[gpu]->preparePartitioningWithoutCount(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, first_shuffle, pipeline);
kernellaunch[gpu]->countPartitioning(off_col, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, first_shuffle);
kernellaunch[gpu]->preparePartitioningAfterCount(off_col, used_col_idx, qo->joinGPUcheck, first_shuffle);
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
kernellaunch[gpu]->launchPartitioning();
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
kernellaunch[gpu]->clearPartitioning();
delete fargs[gpu];
delete sargs[gpu];
delete kparams[gpu];
delete kernellaunch[gpu];
}
if (NUM_GPU == 8) shuffleDataNCCL(off_col, h_total, shelper, first_shuffle, sg, stream);
else shuffleDataOpt(off_col, h_total, shelper, first_shuffle, sg, stream);
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
first_shuffle = false;
has_shuffled = true;
if (verbose) cout << "Partitioning and Shuffle time " << time << endl;
shuffle_time[sg] += time;
}
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(params->ht_GPU[gpu][key_column] != NULL);
if (filter_idx[gpu] && !has_shuffled) {
fargs[gpu] = new filterArgsGPU{
filter_idx[gpu], NULL,
params->compare1[filter_col], params->compare2[filter_col], 0, 0,
params->mode[filter_col], 0,
broadcast_col[gpu][filter_col->column_id],
NULL, NULL,
};
} else {
fargs[gpu] = new filterArgsGPU{NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL, NULL};
}
if (broadcast) {
bargs[gpu] = new buildArgsGPU{
dimkey_idx[gpu], group_idx[gpu],
params->dim_len_GPU[gpu][key_column], params->min_key_GPU[gpu][key_column], params->max_key[key_column],
NULL,
NULL,
tmp_broadcast_idx_key[gpu],
(group_col != NULL) ? tmp_broadcast_idx_val[gpu] : NULL,
(filter_col != NULL) ? tmp_broadcast_idx_filter[gpu] : NULL
};
} else {
if (!has_shuffled) {
// cout << " here " << endl;
// cout << params->dim_len_GPU[gpu][key_column] << " " << params->min_key_GPU[gpu][key_column] << endl;;
bargs[gpu] = new buildArgsGPU{
dimkey_idx[gpu], group_idx[gpu],
params->dim_len_GPU[gpu][key_column], params->min_key_GPU[gpu][key_column], params->max_key[key_column],
NULL, NULL, NULL, NULL, NULL
};
} else {
bargs[gpu] = new buildArgsGPU{
NULL, NULL,
params->dim_len_GPU[gpu][key_column], params->min_key_GPU[gpu][key_column], params->max_key[key_column],
temp_col[gpu][key_column->column_id],
(group_col != NULL) ? temp_col[gpu][group_col->column_id] : NULL,
NULL, NULL, NULL
};
// assert(group_col != NULL);
// assert(temp_col[gpu][group_col->column_id] != NULL);
}
}
int latemat = 0;
bool aggrGPUcheck = 0;
kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, NULL, h_total);
kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, JustBuild, table, 1, latemat, aggrGPUcheck, stream[gpu]);
kernellaunch[gpu]->prepareKernelDim(off_col, key_column, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, table, has_shuffled);
kernellaunch[gpu]->launchKernel(has_shuffled, broadcast, d_toBroadcast[gpu], broadcast_len[gpu]);
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
int*** temp;
int will_shuffle = 0;
kernellaunch[gpu]->clearKernel(temp, will_shuffle, has_shuffled);
delete fargs[gpu];
delete bargs[gpu];
delete kparams[gpu];
delete kernellaunch[gpu];
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Build time " << time << endl;
gpu_time[sg] += time;
delete[] fargs;
delete[] bargs;
delete[] pargs;
delete[] sargs;
delete[] gargs;
delete[] kparams;
delete[] kernellaunch;
delete shelper;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
delete[] temp_col[gpu];
delete[] result_count[gpu];
delete[] in_shuffle_col[gpu];
delete[] out_shuffle_col[gpu];
delete[] in_shuffle_off[gpu];
delete[] out_shuffle_off[gpu];
delete[] used_col_idx[gpu];
}
delete[] temp_col;
delete[] result_count;
delete[] in_shuffle_col;
delete[] in_shuffle_off;
delete[] out_shuffle_col;
delete[] out_shuffle_off;
delete[] d_seg_is_replicated;
delete[] toBroadcast;
delete[] d_toBroadcast;
delete[] broadcast_col;
delete[] broadcast_len;
delete[] tmp_broadcast_idx_key;
delete[] tmp_broadcast_idx_filter;
delete[] tmp_broadcast_idx_val;
delete[] used_col_idx;
};
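//call_filter_GPU: applies the GPU-side filter predicates of segment group sg to the fact table.
//Up to two predicates from selectGPUPipelineCol[sg] are packed into one filterArgsGPU per GPU and
//evaluated by a single JustFilter kernel; the surviving row offsets are handed back through off_col
//and the per-GPU counts through d_total/h_total.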
void
CPUGPUProcessing::call_filter_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int table, int select_so_far, cudaStream_t* stream) {
if(qo->selectGPUPipelineCol[sg].size() == 0) return;
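//Timing idiom used throughout this file: SETUP_TIMING() presumably declares and creates the
//cudaEvent_t pair (start, stop) referenced below, and the elapsed time between the two
//cudaEventRecord calls is accumulated into gpu_time[sg] / shuffle_time[sg] / cpu_time[sg].
//A minimal sketch of such a macro (an assumption, not the project's actual definition):
//  #define SETUP_TIMING() cudaEvent_t start, stop; cudaEventCreate(&start); cudaEventCreate(&stop)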
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
int _compare1[2] = {0}, _compare2[2] = {0};
int *filter_idx[NUM_GPU][2] = {};
float output_selectivity = 1.0;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemsetAsync(d_total[gpu], 0, sizeof(int), stream[gpu]));
}
CubDebugExit(cudaSetDevice(0));
int num_select = qo->selectGPUPipelineCol[sg].size();
int total_select = qo->selectGPUPipelineCol[sg].size() + qo->selectCPUPipelineCol[sg].size();
for (int select = 0; select < num_select; select++) {
if (select_so_far == total_select) break;
ColumnInfo* column = qo->selectGPUPipelineCol[sg][select];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom);
assert(col_idx[gpu][column->column_id] != NULL);
filter_idx[gpu][select_so_far + select] = col_idx[gpu][column->column_id];
}
_compare1[select_so_far + select] = params->compare1[column];
_compare2[select_so_far + select] = params->compare2[column];
output_selectivity *= params->selectivity[column];
}
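//One argument struct per GPU for every kernel stage (filter/probe/build/group-by/shuffle) plus a
//single shared shuffleHelper; in this function only fargs is filled in, the other stages stay NULL.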
int has_shuffled = 0;
int will_shuffle = 0;
filterArgsGPU** fargs = new filterArgsGPU*[NUM_GPU]();
buildArgsGPU** bargs = new buildArgsGPU*[NUM_GPU]();
probeArgsGPU** pargs = new probeArgsGPU*[NUM_GPU]();
groupbyArgsGPU** gargs = new groupbyArgsGPU*[NUM_GPU]();
shuffleArgsGPU** sargs = new shuffleArgsGPU*[NUM_GPU]();
shuffleHelper* shelper = new shuffleHelper[1]();
KernelParams** kparams = new KernelParams*[NUM_GPU]();
KernelLaunch** kernellaunch = new KernelLaunch*[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//build the filter arguments for this GPU (up to two predicates evaluated in a single pass)
fargs[gpu] = new filterArgsGPU{
filter_idx[gpu][0], filter_idx[gpu][1],
_compare1[0], _compare2[0], _compare1[1], _compare2[1],
1, 1,
NULL, NULL
};
int latemat = 0;
bool aggrGPUcheck = false;
kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total);
kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, JustFilter, table, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
kernellaunch[gpu]->prepareKernelFact(off_col, NULL, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck,
NULL, NULL, will_shuffle, has_shuffled);
kernellaunch[gpu]->launchKernel(has_shuffled);
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
kernellaunch[gpu]->clearKernel(off_col, will_shuffle, has_shuffled);
delete fargs[gpu];
delete kparams[gpu];
delete kernellaunch[gpu];
}
delete[] fargs;
delete[] bargs;
delete[] pargs;
delete[] sargs;
delete[] gargs;
delete[] kparams;
delete[] kernellaunch;
delete shelper;
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Filter GPU time " << time << endl;
gpu_time[sg] += time;
}
//NOT SUPPORTED YET:
//(1) pipelining a filter before partitioning
//(2) an operator that generates a global offset (for example a CPU operator or a GPU filter) whose output would be the input of partitioning
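//call_operator_fact_GPU: runs the GPU part of the fact-table pipeline for segment group sg.
//Roughly: (1) transfer the segment indexes of every column the pipeline touches to each GPU,
//(2) partition + shuffle the fact table whenever the next join's hash table is not replicated,
//and (3) probe the joins, fused with group-by/aggregation on the GPU when the optimizer allows it.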
void
CPUGPUProcessing::call_operator_fact_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int select_so_far, int latemat, cudaStream_t* stream) {
int _min_key[NUM_GPU][4] = {0}, _dim_len[NUM_GPU][4] = {0};
int _compare1[2] = {0}, _compare2[2] = {0};
int _min_val[4] = {0}, _unique_val[4] = {0};
int *aggr_idx[NUM_GPU][2] = {}, *group_idx[NUM_GPU][4] = {}, *filter_idx[NUM_GPU][2] = {};
float output_selectivity = 1.0;
bool first_shuffle = true;
bool has_shuffled = false;
int group_col_id[MAX_GROUPBY] = {0};
int aggr_col_id[MAX_AGGR] = {0};
int fkey_col_id[MAX_JOIN] = {0};
for (int fkey_col = 0; fkey_col < MAX_JOIN; fkey_col++) {
fkey_col_id[fkey_col] = -1;
}
//initialize group_col_id and aggr_col_id to -1
for (int group = 0; group < MAX_GROUPBY; group++) {
group_col_id[group] = -1;
}
for (int aggr = 0; aggr < MAX_AGGR; aggr++) {
aggr_col_id[aggr] = -1;
}
int*** used_col_idx = new int**[NUM_GPU];
int**** used_all_col_idx = new int***[NUM_GPU]();
int*** used_broadcast_idx = new int**[NUM_GPU];
int*** d_broadcast_idx = new int**[NUM_GPU];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
used_col_idx[gpu] = new int*[NUM_COLUMN]();
used_broadcast_idx[gpu] = new int*[NUM_COLUMN]();
used_all_col_idx[gpu] = new int**[NUM_GPU]();
for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
used_all_col_idx[gpu][gpu_iter] = new int*[NUM_COLUMN]();
}
}
assert(NUM_GPU == NUM_PARTITION);
// ColumnInfo* filter_col[2] = {};
if(qo->joinGPUPipelineCol[sg].size() == 0) return;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemsetAsync(d_total[gpu], 0, sizeof(int), stream[gpu]));
}
CubDebugExit(cudaSetDevice(0));
int num_join = qo->joinGPUPipelineCol[sg].size();
int num_select = qo->selectGPUPipelineCol[sg].size();
int total_select = qo->selectGPUPipelineCol[sg].size() + qo->selectCPUPipelineCol[sg].size();
bool aggrGPUcheck = qo->groupbyGPUPipelineCol[sg].size() == qo->queryAggrColumn.size();
if (verbose) cout << "aggr gpu check " << aggrGPUcheck << endl;
for (int select = 0; select < num_select; select++) {
if (select_so_far == total_select) break;
ColumnInfo* column = qo->selectGPUPipelineCol[sg][select];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL);
used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id];
}
cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom);
assert(col_idx[gpu][column->column_id] != NULL);
cpu_to_gpu[sg] += (column->total_segment * sizeof(int));
filter_idx[gpu][select_so_far + select] = col_idx[gpu][column->column_id];
// if (latemat != 2) used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id];
}
_compare1[select_so_far + select] = params->compare1[column];
_compare2[select_so_far + select] = params->compare2[column];
output_selectivity *= params->selectivity[column];
// filter_col[select_so_far + select] = column;
}
for (int join = 0; join < num_join; join++) {
ColumnInfo* column = qo->joinGPUPipelineCol[sg][join];
int table_id = qo->fkey_pkey[column]->table_id;
ColumnInfo* pkey = qo->fkey_pkey[column];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL);
used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id];
}
cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom);
assert(col_idx[gpu][column->column_id] != NULL);
cpu_to_gpu[sg] += (column->total_segment * sizeof(int));
used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id];
_min_key[gpu][table_id - 1] = params->min_key_GPU[gpu][pkey];
_dim_len[gpu][table_id - 1] = params->dim_len_GPU[gpu][pkey];
}
}
//have to fix cm->lo_orderdate in case we use the synthetic benchmark
for (int aggr = 0; aggr < qo->aggregation[cm->lo_orderdate].size(); aggr++) {
if (qo->groupGPUcheck && aggrGPUcheck) {
ColumnInfo* column = qo->aggregation[cm->lo_orderdate][aggr];
aggr_col_id[aggr] = column->column_id;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL);
used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id];
}
cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom);
assert(col_idx[gpu][column->column_id] != NULL);
cpu_to_gpu[sg] += (column->total_segment * sizeof(int));
aggr_idx[gpu][aggr] = col_idx[gpu][column->column_id];
if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qo->groupGPUcheck && aggrGPUcheck)) used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id];
used_broadcast_idx[gpu][column->column_id] = broadcast_idx[gpu][column->column_id];
}
}
}
unordered_map<ColumnInfo*, vector<ColumnInfo*>>::iterator it;
for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) {
if (qo->groupGPUcheck && aggrGPUcheck) {
if (it->second.size() > 0) {
ColumnInfo* column = it->second[0];
ColumnInfo* column_key = it->first;
group_col_id[column_key->table_id - 1] = column->column_id;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL);
used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id];
}
cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom);
assert(col_idx[gpu][column->column_id] != NULL);
cpu_to_gpu[sg] += (column->total_segment * sizeof(int));
group_idx[gpu][column_key->table_id - 1] = col_idx[gpu][column->column_id];
if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qo->groupGPUcheck && aggrGPUcheck)) used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id];
used_broadcast_idx[gpu][column->column_id] = broadcast_idx[gpu][column->column_id];
}
_min_val[column_key->table_id - 1] = params->min_val[column_key];
_unique_val[column_key->table_id - 1] = params->unique_val[column_key];
// cout << column->column_id << endl;
}
} else {
if (it->second.size() > 0) {
ColumnInfo* column = it->second[0];
ColumnInfo* column_key = it->first;
group_col_id[column_key->table_id - 1] = column->column_id;
}
}
}
//WARNING: aggr_col_id and group_col_id carry slightly different information
//aggr_col_id != -1 means this segment group will aggregate that column on the GPU
//group_col_id != -1 means there will be a group-by on that column at some point (regardless of whether it runs on the CPU or the GPU)
//WARNING: HOW DO WE KNOW THAT THE SEGMENT IS REPLICATED FOR THE OTHER TABLES???
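//Copy the broadcast segment indexes to each GPU and, for table 0 (presumably the fact table), a
//"segment is replicated" flag array; d_seg_is_replicated[gpu] is later handed to the shuffle kernels
//so they can treat already-replicated segments specially.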
broadcastIdxTransfer(d_broadcast_idx, used_broadcast_idx, stream);
int** d_seg_is_replicated = new int*[NUM_GPU];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
int table_id = 0;
CubDebugExit(cudaSetDevice(gpu));
d_seg_is_replicated[gpu] = (int*) cm->customCudaMalloc<int>(cm->allTable[table_id]->total_segment, gpu);
CubDebugExit(cudaMemcpyAsync(d_seg_is_replicated[gpu], cm->seg_is_replicated[table_id], cm->allTable[table_id]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
// cout << "seg is replicated " << cm->seg_is_replicated[4][0] << endl;
// assert(cm->seg_is_replicated[4][0] == 1);
}
//the buffers below are helpers for shuffling; they are bundled into a shuffleHelper further down
//output of shuffling (column)
int*** temp_col = new int**[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
temp_col[gpu] = new int*[NUM_COLUMN]();
}
//output of shuffling (offset) -> can be global or local depending on needs
//NOW THIS IS REPLACED BY off_col
// int*** temp_off = new int**[NUM_GPU]();
// for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// temp_off[gpu] = new int*[NUM_TABLE]();
// if (off_col != NULL) {
// for (int table = 0; table < NUM_TABLE; table++) {
// temp_off[gpu][table] = off_col[gpu][table];
// }
// }
// }
int** result_count = new int*[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
result_count[gpu] = new int[NUM_PARTITION]();
}
//offset output of local join
int*** join_result_off = new int**[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
join_result_off[gpu] = new int*[NUM_TABLE]();
}
//input of partitioning (column) -> only for pipelined
int**** in_shuffle_col = new int***[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
in_shuffle_col[gpu] = new int**[NUM_COLUMN]();
}
//input of partitioning (offset) -> can be global or local depending on needs -> only for pipelined
int**** in_shuffle_off = new int***[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
in_shuffle_off[gpu] = new int**[NUM_TABLE]();
}
//output of partitioning (column)
int**** out_shuffle_col = new int***[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
out_shuffle_col[gpu] = new int**[NUM_COLUMN]();
}
//output of partitioning (offset) -> can be global or local depending on needs
int**** out_shuffle_off = new int***[NUM_GPU]();
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
out_shuffle_off[gpu] = new int**[NUM_TABLE]();
}
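//Bundle all shuffle buffers into one shuffleHelper shared by every kernel of this segment group:
//partition result counts, shuffled columns, local join offsets, and the in/out partition buffers.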
shuffleHelper* shelper = new shuffleHelper{
result_count, temp_col, join_result_off, out_shuffle_col, out_shuffle_off, in_shuffle_col, in_shuffle_off
};
vector<tuple<ColumnInfo*, ColumnInfo*>> pipeline_column;
vector<ColumnInfo*> temp_pipeline;
vector<ColumnInfo*> temp_pipeline2;
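//Reorder the GPU join pipeline: joins whose hash table is replicated on every GPU are probed first
//(they never force a shuffle), while joins on partitioned hash tables are pushed to the back.
//When reorder is disabled the same split is done statically, treating lo_suppkey as the partitioned join.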
if (reorder) {
for (int join = 0; join < num_join; join++) {
ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join];
ColumnInfo* pkey = qo->fkey_pkey[fkey];
if (!params->ht_replicated[pkey->table_id]) {
// cout << pkey->column_name << endl;
temp_pipeline2.push_back(fkey);
// qo->joinGPUPipelineCol[sg].erase( qo->joinGPUPipelineCol[sg].begin() + join );
} else {
temp_pipeline.push_back(fkey);
}
}
qo->joinGPUPipelineCol[sg].clear();
for (int i = 0; i < temp_pipeline.size(); i++) {
qo->joinGPUPipelineCol[sg].push_back(temp_pipeline[i]);
}
for (int i = 0; i < temp_pipeline2.size(); i++) {
qo->joinGPUPipelineCol[sg].push_back(temp_pipeline2[i]);
}
} else {
for (int join = 0; join < num_join; join++) {
ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join];
ColumnInfo* pkey = qo->fkey_pkey[fkey];
if (fkey == cm->lo_suppkey) {
// cout << pkey->column_name << endl;
temp_pipeline2.push_back(fkey);
// qo->joinGPUPipelineCol[sg].erase( qo->joinGPUPipelineCol[sg].begin() + join );
} else {
temp_pipeline.push_back(fkey);
}
}
qo->joinGPUPipelineCol[sg].clear();
for (int i = 0; i < temp_pipeline.size(); i++) {
qo->joinGPUPipelineCol[sg].push_back(temp_pipeline[i]);
}
for (int i = 0; i < temp_pipeline2.size(); i++) {
qo->joinGPUPipelineCol[sg].push_back(temp_pipeline2[i]);
}
}
if (verbose) {
for (int join = 0; join < qo->joinGPUPipelineCol[sg].size(); join++) {
cout << qo->joinGPUPipelineCol[sg][join]->column_name << endl;
}
}
// assert(0);
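//Main join loop. For each foreign key in the (reordered) pipeline there are three special cases:
//(1) the first join on a non-replicated hash table partitions the fact table and shuffles it first,
//(2) the last join probes everything accumulated in pipeline_column, fused with group-by/aggregation
//    when possible, and (3) a join whose successor needs shuffling probes what has been pipelined so
//    far and partitions the output by the next key; any other join is simply appended to pipeline_column.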
for (int join = 0; join < num_join; join++) {
filterArgsGPU** fargs = new filterArgsGPU*[NUM_GPU]();
buildArgsGPU** bargs = new buildArgsGPU*[NUM_GPU]();
probeArgsGPU** pargs = new probeArgsGPU*[NUM_GPU]();
groupbyArgsGPU** gargs = new groupbyArgsGPU*[NUM_GPU]();
shuffleArgsGPU** sargs = new shuffleArgsGPU*[NUM_GPU]();
KernelParams** kparams = new KernelParams*[NUM_GPU]();
KernelLaunch** kernellaunch = new KernelLaunch*[NUM_GPU]();
ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join];
int table_id = qo->fkey_pkey[fkey]->table_id;
ColumnInfo* pkey = qo->fkey_pkey[fkey];
int next_table_id = -1; ColumnInfo* next_fkey = NULL, *next_pkey = NULL;
if (join != num_join - 1) {
next_fkey = qo->joinGPUPipelineCol[sg][join+1];
next_pkey = qo->fkey_pkey[next_fkey];
next_table_id = qo->fkey_pkey[next_fkey]->table_id;
}
// if current join needs shuffling
if (join == 0 && !params->ht_replicated[table_id]) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
assert(NUM_GPU == NUM_PARTITION);
// if this is the first join then we will call rangepartitioningkeyvalue
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//partition the data on this GPU (the cross-GPU shuffle itself happens after all partitioning kernels are launched)
//it's possible there is a filter before shuffling
fargs[gpu] = new filterArgsGPU{
filter_idx[gpu][0], filter_idx[gpu][1],
_compare1[0], _compare2[0], _compare1[1], _compare2[1],
1, 1,
NULL, NULL
};
int** d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
int** d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
int** d_local_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaMemcpyAsync(d_col_idx, used_col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
assert(off_col[gpu] != NULL);
if (off_col[gpu][0] != NULL) {
CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
} else {
d_in_off = NULL;
}
CubDebugExit(cudaMemcpyAsync(d_local_off, join_result_off[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
if (verbose) cout << "Partitioning with column id " << fkey->column_id << endl;
sargs[gpu] = new shuffleArgsGPU{
d_temp_col, NULL, d_col_idx, d_in_off, NULL, d_local_off,
fkey->column_id, fkey->table->table_id, params->min_key[pkey], params->max_key[pkey],
NULL, NULL,
d_seg_is_replicated[gpu], d_broadcast_idx[gpu]
};
kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total);
// if (join == 0 && num_select > 0) { //there is filter before this join
// assert(0); //not supported for now
// } else {
kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, Nope, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
// }
// kernellaunch[gpu]->countPartitioning(qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, first_shuffle);
bool pipeline = false;
kernellaunch[gpu]->preparePartitioningWithoutCount(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, first_shuffle, pipeline);
}
// for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// kernellaunch[gpu]->synchronizePartitioning();
// }
// for (int gpu = 0; gpu < NUM_GPU; gpu++) {
// kernellaunch[gpu]->preparePartitioningAfterCount(off_col, used_col_idx, qo->joinGPUcheck, first_shuffle);
// }
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
kernellaunch[gpu]->launchPartitioning();
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
kernellaunch[gpu]->clearPartitioning();
output_selectivity = 1.0;
delete fargs[gpu];
delete sargs[gpu];
delete kparams[gpu];
delete kernellaunch[gpu];
}
// cudaEventRecord(stop, 0);
// cudaEventSynchronize(stop);
// cudaEventElapsedTime(&time, start, stop);
// if (verbose) cout << "Partitioning time " << time << endl;
// cudaEventRecord(start, 0);
if (NUM_GPU == 8) shuffleDataNCCL(off_col, h_total, shelper, first_shuffle, sg, stream);
else shuffleDataOpt(off_col, h_total, shelper, first_shuffle, sg, stream);
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
first_shuffle = false;
has_shuffled = true;
if (verbose) cout << "Shuffle time " << time << endl;
shuffle_time[sg] += time;
}
//if this is the last join
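//Every (fkey, pkey) pair accumulated in pipeline_column is probed in one fused kernel per GPU;
//the launch mode is ProbeGroupby, ProbeAggr, or JustProbe depending on what the query still needs.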
if (next_table_id == -1 || (join + 1) == num_join) { //this is the last join
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
//do join+groupby (either pipelined or individual)
pipeline_column.push_back(make_tuple(fkey, pkey));
int *ht[NUM_GPU][MAX_JOIN] = {}, *fkey_idx[NUM_GPU][MAX_JOIN] = {}; //initialize it to null
int *key_column[NUM_GPU][MAX_JOIN] = {}, *group_column[NUM_GPU][MAX_GROUPBY] = {}, *aggr_column[NUM_GPU][MAX_AGGR] = {};
for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) {
ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]);
ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]);
assert(fkey_col_id[pkey_pipeline->table_id - 1] == -1);
fkey_col_id[pkey_pipeline->table_id - 1] = fkey_pipeline->column_id;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(col_idx[gpu][fkey_pipeline->column_id] != NULL);
assert(params->ht_GPU[gpu][pkey_pipeline] != NULL);
fkey_idx[gpu][pkey_pipeline->table_id -1] = col_idx[gpu][fkey_pipeline->column_id];
ht[gpu][pkey_pipeline->table_id - 1] = params->ht_GPU[gpu][pkey_pipeline];
if (has_shuffled) {
key_column[gpu][pkey_pipeline->table_id - 1] = temp_col[gpu][fkey_pipeline->column_id];
assert(temp_col[gpu][fkey_pipeline->column_id] != NULL);
}
}
// cout << fkey_pipeline->column_name << " " << params->selectivity[fkey_pipeline]<< endl;
output_selectivity *= params->selectivity[fkey_pipeline];
// cout << output_selectivity << endl;
}
if (has_shuffled) {
if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qo->groupGPUcheck && aggrGPUcheck)) {
//have to fix cm->lo_orderdate in case we use the synthetic benchmark
if (qo->groupGPUcheck && aggrGPUcheck) {
for (int aggr = 0; aggr < qo->aggregation[cm->lo_orderdate].size(); aggr++) {
ColumnInfo* column = qo->aggregation[cm->lo_orderdate][aggr];
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(temp_col[gpu][column->column_id] != NULL);
aggr_column[gpu][aggr] = temp_col[gpu][column->column_id];
}
}
for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) {
if (it->second.size() > 0) {
ColumnInfo* column = it->second[0];
ColumnInfo* column_key = it->first;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
group_column[gpu][column_key->table_id - 1] = temp_col[gpu][column->column_id];
}
}
}
}
}
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
int** d_in_off = NULL;
int *d_group_col_id = NULL, *d_aggr_col_id = NULL, *d_fkey_col_id = NULL;
int*** d_all_col_idx = NULL, **d_seg_row_to_gpu = NULL;
if (has_shuffled) {
d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
CubDebugExit(cudaSetDevice(gpu));
assert(off_col[gpu] != NULL);
if (off_col[gpu][0] != NULL) {
CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
} else {
d_in_off = NULL;
}
CubDebugExit(cudaSetDevice(0));
d_group_col_id = (int*) cm->customCudaMalloc<int>(MAX_GROUPBY, gpu);
d_aggr_col_id = (int*) cm->customCudaMalloc<int>(MAX_AGGR, gpu);
d_fkey_col_id = (int*) cm->customCudaMalloc<int>(MAX_JOIN, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_group_col_id, group_col_id, MAX_GROUPBY * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaMemcpyAsync(d_aggr_col_id, aggr_col_id, MAX_AGGR * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaMemcpyAsync(d_fkey_col_id, fkey_col_id, MAX_JOIN * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
d_all_col_idx = (int***) cm->customCudaMalloc<int**>(NUM_GPU, gpu);
int*** tmp_all_col_idx = new int**[NUM_GPU];
for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
tmp_all_col_idx[gpu_iter] = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(tmp_all_col_idx[gpu_iter], used_all_col_idx[gpu][gpu_iter], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
}
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_all_col_idx, tmp_all_col_idx, NUM_GPU * sizeof(int**), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
d_seg_row_to_gpu = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_seg_row_to_gpu, seg_row_to_gpu[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
}
int** d_key_column = (int**) cm->customCudaMalloc<int*>(MAX_JOIN, gpu);
int** d_group_column = (int**) cm->customCudaMalloc<int*>(MAX_GROUPBY, gpu);
int** d_aggr_column = (int**) cm->customCudaMalloc<int*>(MAX_AGGR, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_key_column, key_column[gpu], MAX_JOIN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaMemcpyAsync(d_group_column, group_column[gpu], MAX_GROUPBY * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaMemcpyAsync(d_aggr_column, aggr_column[gpu], MAX_AGGR * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
pargs[gpu] = new probeArgsGPU{ //initialize pargs for this gpu
fkey_idx[gpu][0], fkey_idx[gpu][1], fkey_idx[gpu][2], fkey_idx[gpu][3],
ht[gpu][0], ht[gpu][1], ht[gpu][2], ht[gpu][3],
_dim_len[gpu][0], _dim_len[gpu][1], _dim_len[gpu][2], _dim_len[gpu][3],
_min_key[gpu][0], _min_key[gpu][1], _min_key[gpu][2], _min_key[gpu][3],
d_key_column, d_fkey_col_id
};
gargs[gpu] = new groupbyArgsGPU{
aggr_idx[gpu][0], aggr_idx[gpu][1],
group_idx[gpu][0], group_idx[gpu][1], group_idx[gpu][2], group_idx[gpu][3],
_min_val[0], _min_val[1], _min_val[2], _min_val[3],
_unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3],
params->total_val, params->mode_group,
d_group_column, d_aggr_column, d_group_col_id, d_aggr_col_id,
params->d_group_func
};
fargs[gpu] = new filterArgsGPU{
filter_idx[gpu][0], filter_idx[gpu][1],
_compare1[0], _compare2[0], _compare1[1], _compare2[1],
1, 1,
NULL, NULL
};
sargs[gpu] = new shuffleArgsGPU{
NULL, NULL, NULL, d_in_off, NULL, NULL,
-1, -1, -1, -1,
d_all_col_idx, d_seg_row_to_gpu,
d_seg_is_replicated[gpu], d_broadcast_idx[gpu],
};
kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total);
// if (join == 0 && num_select > 0) { //there is filter before this join (meaning that the last join is the first join and it does not require shuffling)
// assert(0); //not supported for now
// } else {
// cout << "groupgpucheck " << qo->groupGPUcheck << endl;
if (qo->groupby_build.size() > 0 && qo->groupGPUcheck && aggrGPUcheck) kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, ProbeGroupby, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
else if (qo->aggregation[cm->lo_orderdate].size() > 0 && qo->groupGPUcheck && aggrGPUcheck) kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, ProbeAggr, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
else kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, JustProbe, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
// }
int will_shuffle = 0;
// cout << " Here " << endl;
kernellaunch[gpu]->prepareKernelFact(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck,
fkey_col_id, group_col_id, will_shuffle, has_shuffled);
kernellaunch[gpu]->launchKernel(has_shuffled);
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
int will_shuffle = 0;
kernellaunch[gpu]->clearKernel(off_col, will_shuffle, has_shuffled);
output_selectivity = 1.0;
delete pargs[gpu];
delete fargs[gpu];
delete gargs[gpu];
delete sargs[gpu];
delete kparams[gpu];
delete kernellaunch[gpu];
}
pipeline_column.clear();
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Last Probe time " << time << endl;
gpu_time[sg] += time;
// if this is not the last join but the next join needs shuffling (the current join cannot be pipelined with the next join)
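//Probe-then-partition path: the pipelined joins are probed with the ProbePartition launch mode, their
//output is partitioned by the next join's key, and the partitions are then shuffled across GPUs.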
} else if (!params->ht_replicated[next_table_id]) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
//do join (either pipelined or individual)
pipeline_column.push_back(make_tuple(fkey, pkey));
int *ht[NUM_GPU][4] = {}, *fkey_idx[NUM_GPU][4] = {}; //initialize it to null
int *key_column[NUM_GPU][MAX_JOIN] = {};
for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) {
ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]);
ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]);
assert(fkey_col_id[pkey_pipeline->table_id - 1] == -1);
fkey_col_id[pkey_pipeline->table_id - 1] = fkey_pipeline->column_id;
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(col_idx[gpu][fkey_pipeline->column_id] != NULL);
assert(params->ht_GPU[gpu][pkey_pipeline] != NULL);
fkey_idx[gpu][pkey_pipeline->table_id -1] = col_idx[gpu][fkey_pipeline->column_id];
ht[gpu][pkey_pipeline->table_id - 1] = params->ht_GPU[gpu][pkey_pipeline];
if (has_shuffled) {
assert(temp_col[gpu][fkey_pipeline->column_id] != NULL);
key_column[gpu][pkey_pipeline->table_id - 1] = temp_col[gpu][fkey_pipeline->column_id];
}
}
output_selectivity *= params->selectivity[fkey_pipeline];
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
int** d_temp_col = NULL, **d_in_off = NULL;
if (has_shuffled) {
d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
assert(off_col[gpu] != NULL);
if (off_col[gpu][0] != NULL) {
CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
} else {
d_in_off = NULL;
}
CubDebugExit(cudaSetDevice(0));
}
int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
int** d_key_column = (int**) cm->customCudaMalloc<int*>(MAX_JOIN, gpu);
int* d_fkey_col_id = (int*) cm->customCudaMalloc<int>(MAX_JOIN, gpu);
int* d_group_col_id = (int*) cm->customCudaMalloc<int>(MAX_GROUPBY, gpu);
int* d_aggr_col_id = (int*) cm->customCudaMalloc<int>(MAX_AGGR, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_key_column, key_column[gpu], MAX_JOIN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaMemcpyAsync(d_col_idx, used_col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaMemcpyAsync(d_fkey_col_id, fkey_col_id, MAX_JOIN * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaMemcpyAsync(d_group_col_id, group_col_id, MAX_GROUPBY * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaMemcpyAsync(d_aggr_col_id, aggr_col_id, MAX_AGGR * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
CubDebugExit(cudaSetDevice(0));
fargs[gpu] = new filterArgsGPU{
filter_idx[gpu][0], filter_idx[gpu][1],
_compare1[0], _compare2[0], _compare1[1], _compare2[1],
1, 1,
NULL, NULL
};
pargs[gpu] = new probeArgsGPU{ //initialize pargs for this gpu
fkey_idx[gpu][0], fkey_idx[gpu][1], fkey_idx[gpu][2], fkey_idx[gpu][3],
ht[gpu][0], ht[gpu][1], ht[gpu][2], ht[gpu][3],
_dim_len[gpu][0], _dim_len[gpu][1], _dim_len[gpu][2], _dim_len[gpu][3],
_min_key[gpu][0], _min_key[gpu][1], _min_key[gpu][2], _min_key[gpu][3],
d_key_column, d_fkey_col_id
};
gargs[gpu] = new groupbyArgsGPU{
NULL, NULL,
NULL, NULL, NULL, NULL,
_min_val[0], _min_val[1], _min_val[2], _min_val[3],
_unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3],
params->total_val, params->mode_group,
NULL, NULL, d_group_col_id, d_aggr_col_id,
NULL
};
sargs[gpu] = new shuffleArgsGPU{
d_temp_col, NULL, d_col_idx, d_in_off, NULL, NULL,
next_fkey->column_id, next_fkey->table->table_id, params->min_key[next_pkey], params->max_key[next_pkey],
NULL, NULL,
d_seg_is_replicated[gpu], d_broadcast_idx[gpu]
};
kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total);
kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, ProbePartition, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
int will_shuffle = 1;
// for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL);
kernellaunch[gpu]->prepareKernelFact(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck,
fkey_col_id, group_col_id, will_shuffle, has_shuffled);
// for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL);
kernellaunch[gpu]->launchKernel(has_shuffled);
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
int will_shuffle = 1;
// for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL);
kernellaunch[gpu]->clearKernel(off_col, will_shuffle, has_shuffled);
// for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL);
output_selectivity = 1.0;
delete fargs[gpu];
delete sargs[gpu];
delete pargs[gpu];
delete kparams[gpu];
delete kernellaunch[gpu];
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Probe Partition time " << time << endl;
gpu_time[sg] += time;
// assert(0);
cudaEventRecord(start, 0);
//WARNING: WE NEED TO ELIMINATE ALL COLUMNS WHICH HAVE BEEN USED FOR THIS JOIN
if (latemat == 2) {
for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) {
ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]);
ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]);
assert(fkey_col_id[pkey_pipeline->table_id - 1] > 0);
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
assert(used_col_idx[gpu][fkey_pipeline->column_id] != NULL);
assert(out_shuffle_col[gpu][fkey_pipeline->column_id] != NULL);
used_col_idx[gpu][fkey_pipeline->column_id] = NULL;
out_shuffle_col[gpu][fkey_pipeline->column_id] = NULL;
if (has_shuffled) {
assert(temp_col[gpu][fkey_pipeline->column_id] != NULL);
temp_col[gpu][fkey_pipeline->column_id] = NULL;
}
}
fkey_col_id[pkey_pipeline->table_id - 1] *= -2;
}
}
// if this is not the first join then we just shuffle right away (the data was already partitioned after the probe)
if (NUM_GPU == 8) shuffleDataNCCL(off_col, h_total, shelper, first_shuffle, sg, stream);
else shuffleDataOpt(off_col, h_total, shelper, first_shuffle, sg, stream);
first_shuffle = false;
has_shuffled = true;
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Shuffle time " << time << endl;
shuffle_time[sg] += time;
pipeline_column.clear();
// if the next join does not need shuffling (the current join can be pipelined with the next join)
} else {
pipeline_column.push_back({fkey, pkey});
}
delete[] fargs;
delete[] bargs;
delete[] pargs;
delete[] gargs;
delete[] sargs;
delete[] kparams;
delete[] kernellaunch;
}
for (int gpu = 0; gpu < NUM_GPU; gpu++) {
delete[] temp_col[gpu];
delete[] result_count[gpu];
delete[] in_shuffle_col[gpu];
delete[] out_shuffle_col[gpu];
delete[] in_shuffle_off[gpu];
delete[] out_shuffle_off[gpu];
delete[] used_col_idx[gpu];
delete[] used_broadcast_idx[gpu];
for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
delete[] used_all_col_idx[gpu][gpu_iter];
}
delete[] used_all_col_idx[gpu];
}
delete[] temp_col;
delete[] result_count;
delete[] in_shuffle_col;
delete[] in_shuffle_off;
delete[] out_shuffle_col;
delete[] out_shuffle_off;
delete[] d_seg_is_replicated;
delete[] used_col_idx;
delete[] used_broadcast_idx;
delete[] used_all_col_idx;
delete[] d_broadcast_idx;
}
void
CPUGPUProcessing::resetTime() {
for (int sg = 0 ; sg < MAX_GROUPS; sg++) {
cpu_time[sg] = 0;
gpu_time[sg] = 0;
cpu_to_gpu[sg] = 0;
gpu_to_cpu[sg] = 0;
nvlink_bytes[sg] = 0;
shuffle_time[sg] = 0;
}
gpu_total = 0;
cpu_total = 0;
cpu_to_gpu_total = 0;
gpu_to_cpu_total = 0;
execution_total = 0;
optimization_total = 0;
merging_total = 0;
preparing_total = 0;
nvlink_total = 0;
shuffle_total = 0;
}
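//call_pfilter_probe_CPU: fused filter + probe on the CPU for segment group sg. It evaluates the
//remaining CPU filter predicates, probes the CPU hash tables, and materializes the surviving row
//offsets per table into h_off_col with their count in h_total.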
void
CPUGPUProcessing::call_pfilter_probe_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) {
int **off_col_out;
int _min_key[4] = {0}, _dim_len[4] = {0};
int *ht[4] = {}, *fkey_col[4] = {};
int out_total = 0;
ColumnInfo *filter_col[2] = {};
int _compare1[2] = {0}, _compare2[2] = {0};
float output_selectivity = 1.0;
int output_estimate = 0;
if(qo->joinCPUPipelineCol[sg].size() == 0) return;
off_col_out = new int*[cm->TOT_TABLE] (); //initialize to null
for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) {
if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break;
ColumnInfo* column = qo->selectCPUPipelineCol[sg][i];
filter_col[select_so_far + i] = column;
_compare1[select_so_far + i] = params->compare1[column];
_compare2[select_so_far + i] = params->compare2[column];
output_selectivity *= params->selectivity[column];
}
for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
int table_id = qo->fkey_pkey[column]->table_id;
fkey_col[table_id - 1] = column->col_ptr;
ColumnInfo* pkey = qo->fkey_pkey[column];
ht[table_id - 1] = params->ht_CPU[pkey];
_min_key[table_id - 1] = params->min_key[pkey];
_dim_len[table_id - 1] = params->dim_len[pkey];
output_selectivity *= params->selectivity[column];
}
struct filterArgsCPU fargs = {
(filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL),
(filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL),
_compare1[0], _compare2[0], _compare1[1], _compare2[1],
1, 1,
(filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL),
(filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL)
};
struct probeArgsCPU pargs = {
fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
ht[0], ht[1], ht[2], ht[3],
_dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
_min_key[0], _min_key[1], _min_key[2], _min_key[3]
};
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity;
for (int i = 0; i < cm->TOT_TABLE; i++) {
if (i == 0 || qo->joinCPUcheck[i]) {
if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault));
if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
}
}
} else {
assert(*h_total > 0);
output_estimate = *h_total * output_selectivity;
for (int i = 0; i < cm->TOT_TABLE; i++) {
if (h_off_col[i] != NULL || i == 0 || qo->joinCPUcheck[i]) {
if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault));
if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
}
}
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
struct offsetCPU out_off = {
off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4]
};
int LEN;
if (sg == qo->last_segment[0]) {
LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
} else {
LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
}
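//Illustration (hypothetical numbers, not taken from the code): with SEGMENT_SIZE = 1 << 20 and a
//segment group of 3 segments, a non-last group covers 3 * SEGMENT_SIZE rows, while the last group
//covers 2 * SEGMENT_SIZE + (lo_orderdate->LEN % SEGMENT_SIZE) rows because its tail segment is only
//partially filled (assuming LEN is not an exact multiple of SEGMENT_SIZE).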
short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);
filter_probe_CPU(
fargs, pargs, out_off, LEN, &out_total, 0, segment_group_ptr);
} else {
assert(*h_total > 0);
struct offsetCPU in_off = {
h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
};
struct offsetCPU out_off = {
off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4]
};
filter_probe_CPU2(
in_off, fargs, pargs, out_off, *h_total, &out_total, 0);
if (!custom) {
for (int i = 0; i < cm->TOT_TABLE; i++) {
if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]);
}
}
}
h_off_col = off_col_out;
*h_total = out_total;
if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg << endl;
assert(*h_total <= output_estimate);
// assert(*h_total > 0);
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Filter Probe Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
};
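//call_probe_group_by_CPU: probes the CPU hash tables and aggregates directly into params->res,
//either over whole segment groups (h_off_col == NULL) or over previously materialized offsets.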
void
CPUGPUProcessing::call_probe_group_by_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) {
if (*h_total == 0) return;
int _min_key[4] = {0}, _dim_len[4] = {0};
int *ht[4] = {}, *fkey_col[4] = {};
int _min_val[4] = {0}, _unique_val[4] = {0};
int *aggr_col[2] = {}, *group_col[4] = {};
for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
int table_id = qo->fkey_pkey[column]->table_id;
fkey_col[table_id - 1] = column->col_ptr;
ColumnInfo* pkey = qo->fkey_pkey[column];
ht[table_id - 1] = params->ht_CPU[pkey];
_min_key[table_id - 1] = params->min_key[pkey];
_dim_len[table_id - 1] = params->dim_len[pkey];
}
for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
aggr_col[i] = column->col_ptr;
}
unordered_map<ColumnInfo*, vector<ColumnInfo*>>::iterator it;
for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) {
if (it->second.size() > 0) {
ColumnInfo* column = it->second[0];
ColumnInfo* column_key = it->first;
group_col[column_key->table_id - 1] = column->col_ptr;
_min_val[column_key->table_id - 1] = params->min_val[column_key];
_unique_val[column_key->table_id - 1] = params->unique_val[column_key];
}
}
struct probeArgsCPU pargs = {
fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
ht[0], ht[1], ht[2], ht[3],
_dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
_min_key[0], _min_key[1], _min_key[2], _min_key[3]
};
struct groupbyArgsCPU gargs = {
aggr_col[0], aggr_col[1], group_col[0], group_col[1], group_col[2], group_col[3],
_min_val[0], _min_val[1], _min_val[2], _min_val[3],
_unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3],
params->total_val, params->mode_group, params->h_group_func
};
float time;
SETUP_TIMING();
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
int LEN;
if (sg == qo->last_segment[0]) {
LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
} else {
LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
}
short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);
// ColumnInfo* key_column = cm->s_suppkey;
// int count = 0;
// for (int i = 0; i < cm->s_suppkey->LEN; i+=2) {
// if (ht[0][i+1] != 0) {
// // cout << params->ht_CPU[key_column][i] << " " << params->ht_CPU[key_column][i+1] << endl;
// count++;
// }
// }
// cout << cm->s_suppkey->LEN << " " << count << endl;
// cout << endl;
// key_column = cm->c_custkey;
// count = 0;
// for (int i = 0; i < cm->c_custkey->LEN; i+=2) {
// if (ht[1][i+1] != 0) {
// // cout << params->ht_CPU[key_column][i] << " " << params->ht_CPU[key_column][i+1] << endl;
// count++;
// }
// }
// cout << cm->c_custkey->LEN << " " << count << endl;
// cout << endl;
probe_group_by_CPU(pargs, gargs, LEN , params->res, 0, segment_group_ptr);
} else {
struct offsetCPU offset = {
h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
};
probe_group_by_CPU2(offset, pargs, gargs, *h_total, params->res, 0);
if (!custom) {
for (int i = 0; i < cm->TOT_TABLE; i++) {
if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]);
}
}
}
cudaEventRecord(stop, 0); // Stop time measuring
cudaEventSynchronize(stop); // Wait until the completion of all device
// work preceding the most recent call to cudaEventRecord()
cudaEventElapsedTime(&time, start, stop); // Saving the time measured
if (verbose) cout << "Probe Group Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
};
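//call_probe_CPU: probe-only CPU path. Output buffers are sized from the estimated selectivity and the
//joined row offsets are materialized into h_off_col, replacing any previous offset arrays.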
void
CPUGPUProcessing::call_probe_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) {
int **off_col_out;
int _min_key[4] = {0}, _dim_len[4] = {0};
int *ht[4] = {}, *fkey_col[4] = {};
int out_total = 0;
float output_selectivity = 1.0;
int output_estimate = 0;
if(qo->joinCPUPipelineCol[sg].size() == 0) return;
off_col_out = new int*[cm->TOT_TABLE] (); //initialize to null
for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
int table_id = qo->fkey_pkey[column]->table_id;
fkey_col[table_id - 1] = column->col_ptr;
ColumnInfo* pkey = qo->fkey_pkey[column];
ht[table_id - 1] = params->ht_CPU[pkey];
_min_key[table_id - 1] = params->min_key[pkey];
_dim_len[table_id - 1] = params->dim_len[pkey];
output_selectivity *= params->selectivity[column];
}
struct probeArgsCPU pargs = {
fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
ht[0], ht[1], ht[2], ht[3],
_dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
_min_key[0], _min_key[1], _min_key[2], _min_key[3]
};
float time;
SETUP_TIMING();
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity;
for (int i = 0; i < cm->TOT_TABLE; i++) {
if (i == 0 || qo->joinCPUcheck[i]) {
if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault));
if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
}
}
} else {
assert(*h_total > 0);
output_estimate = *h_total * output_selectivity;
for (int i = 0; i < cm->TOT_TABLE; i++) {
if (h_off_col[i] != NULL || i == 0 || qo->joinCPUcheck[i]) {
if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault));
if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
}
}
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
// output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity;
// for (int i = 0; i < cm->TOT_TABLE; i++) {
// if (i == 0 || qo->joinCPUcheck[i]) {
// if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
// }
// }
struct offsetCPU out_off = {
off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4]
};
int LEN;
if (sg == qo->last_segment[0]) {
LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
} else {
LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
}
short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);
probe_CPU(pargs, out_off, LEN, &out_total, 0, segment_group_ptr);
} else {
assert(*h_total > 0);
// output_estimate = *h_total * output_selectivity;
// for (int i = 0; i < cm->TOT_TABLE; i++) {
// if (h_off_col[i] != NULL || i == 0 || qo->joinCPUcheck[i]) {
// if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
// }
// }
struct offsetCPU in_off = {
h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
};
struct offsetCPU out_off = {
off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4]
};
probe_CPU2(in_off, pargs, out_off, *h_total, &out_total, 0);
if (!custom) {
for (int i = 0; i < cm->TOT_TABLE; i++) {
if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]);
}
}
}
h_off_col = off_col_out;
*h_total = out_total;
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg << endl;
assert(*h_total <= output_estimate);
// assert(*h_total > 0);
if (verbose) cout << "Probe Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
};
//WON'T WORK IF A JOIN HAPPENS BEFORE THE FILTER (THE OUTPUT IS ONLY WRITTEN AS A SINGLE COLUMN, OFF_COL_OUT[0])
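//call_pfilter_CPU: applies the remaining CPU filter predicates for segment group sg and writes the
//qualifying fact-table row offsets into off_col_out[0] only, hence the restriction above.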
void
CPUGPUProcessing::call_pfilter_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) {
int **off_col_out;
ColumnInfo *filter_col[2] = {};
int out_total = 0;
int _compare1[2] = {0}, _compare2[2] = {0}, _mode[2] = {0};
float output_selectivity = 1.0;
int output_estimate = 0;
if (qo->selectCPUPipelineCol[sg].size() == 0) return;
off_col_out = new int*[cm->TOT_TABLE](); //initialize to NULL
for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) {
if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break;
ColumnInfo* column = qo->selectCPUPipelineCol[sg][i];
filter_col[select_so_far + i] = column;
_compare1[select_so_far + i] = params->compare1[column];
_compare2[select_so_far + i] = params->compare2[column];
_mode[select_so_far + i] = params->mode[column];
output_selectivity *= params->selectivity[column];
}
struct filterArgsCPU fargs = {
(filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL),
(filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL),
_compare1[0], _compare2[0], _compare1[1], _compare2[1],
_mode[0], _mode[1],
(filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL),
(filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL)
};
float time;
SETUP_TIMING();
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity;
if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[0], output_estimate * sizeof(int), cudaHostAllocDefault));
if (custom) off_col_out[0] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
} else {
// assert(filter_col[0] == NULL);
// assert(filter_col[1] != NULL);
assert(*h_total > 0);
output_estimate = *h_total * output_selectivity;
for (int i = 0; i < cm->TOT_TABLE; i++) {
if (h_off_col[i] != NULL || i == 0) {
if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault));
if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
}
}
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
// output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity;
// if (custom) off_col_out[0] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
int LEN;
if (sg == qo->last_segment[0]) {
LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
} else {
LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
}
short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);
filter_CPU(fargs, off_col_out[0], LEN, &out_total, 0, segment_group_ptr);
} else {
// assert(filter_col[0] == NULL);
// assert(filter_col[1] != NULL);
assert(*h_total > 0);
// output_estimate = *h_total * output_selectivity;
// for (int i = 0; i < cm->TOT_TABLE; i++) {
// if (h_off_col[i] != NULL || i == 0) {
// if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
// }
// }
filter_CPU2(h_off_col[0], fargs, off_col_out[0], *h_total, &out_total, 0);
if (!custom) {
for (int i = 0; i < cm->TOT_TABLE; i++) {
if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]);
}
}
}
h_off_col = off_col_out;
*h_total = out_total;
if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg << endl;
assert(*h_total <= output_estimate);
assert(*h_total > 0);
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Filter Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
}
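//call_pfilter_aggr_CPU: fused filter + aggregation on the CPU for queries without group-by keys;
//it only operates on previously materialized offsets (the h_off_col == NULL case is unsupported).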
void
CPUGPUProcessing::call_pfilter_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) {
ColumnInfo *filter_col[2] = {};
int _compare1[2] = {0}, _compare2[2] = {0}, _mode[2] = {0};
int *aggr_col[2] = {};
if (qo->aggregation[cm->lo_orderdate].size() == 0) return;
if (qo->selectCPUPipelineCol[sg].size() == 0) return;
for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
aggr_col[i] = column->col_ptr;
}
for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) {
if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break;
ColumnInfo* column = qo->selectCPUPipelineCol[sg][i];
filter_col[select_so_far + i] = column;
_compare1[select_so_far + i] = params->compare1[column];
_compare2[select_so_far + i] = params->compare2[column];
_mode[select_so_far + i] = params->mode[column];
}
struct groupbyArgsCPU gargs = {
aggr_col[0], aggr_col[1], NULL, NULL, NULL, NULL,
0, 0, 0, 0,
0, 0, 0, 0,
0, params->mode_group, params->h_group_func
};
struct filterArgsCPU fargs = {
(filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL),
(filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL),
_compare1[0], _compare2[0], _compare1[1], _compare2[1],
_mode[0], _mode[1],
(filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL),
(filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL)
};
float time;
SETUP_TIMING();
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
assert(0);
} else {
// assert(*h_total > 0);
if (*h_total == 0) return;
filter_aggr_CPU2(h_off_col[0], fargs, gargs, params->res, *h_total);
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Filter Aggregation Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
};
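//call_bfilter_build_CPU: builds the CPU hash table for a dimension table, applying that table's filter
//predicate (if any) while building; the group-by value is packed into the hash table when present.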
void
CPUGPUProcessing::call_bfilter_build_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table) {
ColumnInfo* column = NULL, *filter_col = NULL;
int* group_ptr = NULL, *filter_ptr = NULL;
for (int i = 0; i < qo->join.size(); i++) {
if (qo->join[i].second->table_id == table) {
column = qo->join[i].second; break;
}
}
if (qo->groupby_build.size() > 0 && qo->groupby_build[column].size() > 0) {
group_ptr = qo->groupby_build[column][0]->col_ptr;
}
if (qo->select_build[column].size() > 0) {
filter_col = qo->select_build[column][0];
filter_ptr = filter_col->col_ptr;
}
//filter_col may be absent for this dimension table, so guard every access to it
struct filterArgsCPU fargs = {
filter_ptr, NULL,
(filter_col != NULL) ? (params->compare1[filter_col]) : (0),
(filter_col != NULL) ? (params->compare2[filter_col]) : (0), 0, 0,
(filter_col != NULL) ? (params->mode[filter_col]) : (0), 0,
(filter_col != NULL) ? (params->map_filter_func_host[filter_col]) : (NULL), NULL
};
struct buildArgsCPU bargs = {
column->col_ptr, group_ptr,
params->dim_len[column], params->min_key[column]
};
if (params->ht_CPU[column] != NULL) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
int LEN;
if (sg == qo->last_segment[table]) {
LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE;
} else {
LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE;
}
short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment);
build_CPU(fargs, bargs, LEN, params->ht_CPU[column], 0, segment_group_ptr);
} else {
build_CPU2(h_off_col, fargs, bargs, *h_total, params->ht_CPU[column], 0);
if (!custom) cudaFreeHost(h_off_col);
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Filter Build Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
}
};
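//call_build_CPU: builds the CPU hash table for a dimension table without any filter predicate.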
void
CPUGPUProcessing::call_build_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table) {
ColumnInfo* column = NULL;
int* group_ptr = NULL;
for (int i = 0; i < qo->join.size(); i++) {
if (qo->join[i].second->table_id == table) {
column = qo->join[i].second; break;
}
}
if (qo->groupby_build.size() > 0 && qo->groupby_build[column].size() > 0) {
group_ptr = qo->groupby_build[column][0]->col_ptr;
}
struct filterArgsCPU fargs = {NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL};
struct buildArgsCPU bargs = {
column->col_ptr, group_ptr,
params->dim_len[column], params->min_key[column]
};
if (params->ht_CPU[column] != NULL) {
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
int LEN;
if (sg == qo->last_segment[table]) {
LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE;
} else {
LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE;
}
short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment);
build_CPU(fargs, bargs, LEN, params->ht_CPU[column], 0, segment_group_ptr);
} else {
build_CPU2(h_off_col, fargs, bargs, *h_total, params->ht_CPU[column], 0);
if (!custom) cudaFreeHost(h_off_col);
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Build Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
}
};
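//call_bfilter_CPU: standalone dimension-table filter on the CPU; it materializes the qualifying row
//offsets into h_off_col so that a subsequent build can consume them.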
void
CPUGPUProcessing::call_bfilter_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table) {
ColumnInfo* temp = NULL;
for (int i = 0; i < qo->join.size(); i++) {
if (qo->join[i].second->table_id == table) {
temp = qo->join[i].second; break;
}
}
// assert(qo->select_build[temp].size() > 0);
if (qo->select_build[temp].size() == 0) return;
ColumnInfo* column = qo->select_build[temp][0];
int* filter_col = column->col_ptr;
int output_estimate = qo->segment_group_count[table][sg] * SEGMENT_SIZE * params->selectivity[column];
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
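//allocate pinned host memory for the filter output offsets (custom allocator or cudaHostAlloc)
//the allocation is timed separately and not added to cpu_time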
if (custom) h_off_col = (int*) cm->customCudaHostAlloc<int>(output_estimate);
else CubDebugExit(cudaHostAlloc((void**) &h_off_col, output_estimate * sizeof(int), cudaHostAllocDefault));
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
cudaEventRecord(start, 0);
int LEN;
if (sg == qo->last_segment[table]) {
LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE;
} else {
LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE;
}
struct filterArgsCPU fargs = {
filter_col, NULL,
params->compare1[column], params->compare2[column], 0, 0,
params->mode[column], 0, params->map_filter_func_host[column], NULL
};
short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment);
filter_CPU(fargs, h_off_col, LEN, h_total, 0, segment_group_ptr);
if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg << endl;
assert(*h_total <= output_estimate);
assert(*h_total > 0);
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Filter Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
};
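//final group-by aggregation on the CPU
//gathers aggregate and group columns through the materialized offsets in h_off_col and accumulates into params->res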
void
CPUGPUProcessing::call_group_by_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) {
int _min_val[4] = {0}, _unique_val[4] = {0};
int *aggr_col[2] = {}, *group_col[4] = {};
if (qo->aggregation[cm->lo_orderdate].size() == 0) return;
for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
aggr_col[i] = column->col_ptr;
}
unordered_map<ColumnInfo*, vector<ColumnInfo*>>::iterator it;
for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) {
if (it->second.size() > 0) {
ColumnInfo* column = it->second[0];
ColumnInfo* column_key = it->first;
group_col[column_key->table_id - 1] = column->col_ptr;
_min_val[column_key->table_id - 1] = params->min_val[column_key];
_unique_val[column_key->table_id - 1] = params->unique_val[column_key];
}
}
struct groupbyArgsCPU gargs = {
aggr_col[0], aggr_col[1], group_col[0], group_col[1], group_col[2], group_col[3],
_min_val[0], _min_val[1], _min_val[2], _min_val[3],
_unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3],
params->total_val, params->mode_group, params->h_group_func
};
struct offsetCPU offset = {
h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4],
};
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
if (verbose) cout << "h_total: " << *h_total << endl;
if (*h_total > 0) groupByCPU(offset, gargs, *h_total, params->res);
if (!custom) {
for (int i = 0; i < cm->TOT_TABLE; i++) {
if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]);
}
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Group Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
};
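//final aggregation on the CPU when there is no group-by
//reduces the aggregate columns through the offsets in h_off_col into params->res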
void
CPUGPUProcessing::call_aggregation_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg) {
int *aggr_col[2] = {};
if (qo->aggregation[cm->lo_orderdate].size() == 0) return;
for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
aggr_col[i] = column->col_ptr;
}
struct groupbyArgsCPU gargs = {
aggr_col[0], aggr_col[1], NULL, NULL, NULL, NULL,
0, 0, 0, 0,
0, 0, 0, 0,
0, params->mode_group, params->h_group_func
};
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
// assert(h_off_col != NULL);
if (*h_total > 0) aggregationCPU(h_off_col, gargs, *h_total, params->res);
if (!custom) cudaFreeHost(h_off_col); //h_off_col was allocated with cudaHostAlloc, so release it with cudaFreeHost
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Aggr Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
};
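//fused probe + aggregate on the CPU
//probes the CPU hash tables for this segment group's join pipeline and aggregates in the same pass
//either scans the segment group directly or consumes previously materialized offsets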
void
CPUGPUProcessing::call_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) {
int _min_key[4] = {0}, _dim_len[4] = {0};
int *ht[4] = {}, *fkey_col[4] = {};
int *aggr_col[2] = {};
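//map each probe column in this segment group's CPU join pipeline to its dimension table slot (table_id 1..4 -> index 0..3)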
for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
int table_id = qo->fkey_pkey[column]->table_id;
fkey_col[table_id - 1] = column->col_ptr;
ColumnInfo* pkey = qo->fkey_pkey[column];
ht[table_id - 1] = params->ht_CPU[pkey];
_min_key[table_id - 1] = params->min_key[pkey];
_dim_len[table_id - 1] = params->dim_len[pkey];
}
for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
aggr_col[i] = column->col_ptr;
}
struct probeArgsCPU pargs = {
fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
ht[0], ht[1], ht[2], ht[3],
_dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
_min_key[0], _min_key[1], _min_key[2], _min_key[3]
};
struct groupbyArgsCPU gargs = {
aggr_col[0], aggr_col[1], NULL, NULL, NULL, NULL,
0, 0, 0, 0,
0, 0, 0, 0,
0, params->mode_group, params->h_group_func
};
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
int LEN;
if (sg == qo->last_segment[0]) {
LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
} else {
LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
}
short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);
probe_aggr_CPU(pargs, gargs, LEN, params->res, 0, segment_group_ptr);
} else {
struct offsetCPU offset = {
h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
};
probe_aggr_CPU2(offset, pargs, gargs, *h_total, params->res, 0);
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop); //wait for all preceding device work to complete before reading the elapsed time
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Probe Aggr Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
};
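//fused filter + probe + aggregate on the CPU
//applies the remaining probe-side filters, probes the CPU hash tables, and aggregates in a single pass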
void
CPUGPUProcessing::call_pfilter_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) {
int _min_key[4] = {0}, _dim_len[4] = {0};
int *ht[4] = {}, *fkey_col[4] = {};
ColumnInfo* filter_col[2] = {};
int _compare1[2] = {0}, _compare2[2] = {0};
int *aggr_col[2] = {};
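//filters already applied upstream occupy the first select_so_far slots
//the remaining slots are filled from this segment group's CPU filter pipeline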
for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) {
if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break;
ColumnInfo* column = qo->selectCPUPipelineCol[sg][i];
filter_col[select_so_far + i] = column;
_compare1[select_so_far + i] = params->compare1[column];
_compare2[select_so_far + i] = params->compare2[column];
}
for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
int table_id = qo->fkey_pkey[column]->table_id;
fkey_col[table_id - 1] = column->col_ptr;
ColumnInfo* pkey = qo->fkey_pkey[column];
ht[table_id - 1] = params->ht_CPU[pkey];
_min_key[table_id - 1] = params->min_key[pkey];
_dim_len[table_id - 1] = params->dim_len[pkey];
}
for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
aggr_col[i] = column->col_ptr;
}
struct filterArgsCPU fargs = {
(filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL),
(filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL),
_compare1[0], _compare2[0], _compare1[1], _compare2[1],
1, 1,
(filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL),
(filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL)
};
struct probeArgsCPU pargs = {
fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
ht[0], ht[1], ht[2], ht[3],
_dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
_min_key[0], _min_key[1], _min_key[2], _min_key[3]
};
struct groupbyArgsCPU gargs = {
aggr_col[0], aggr_col[1], NULL, NULL, NULL, NULL,
0, 0, 0, 0,
0, 0, 0, 0,
0, params->mode_group, params->h_group_func
};
SETUP_TIMING();
float time;
cudaEventRecord(start, 0);
if (h_off_col == NULL) {
int LEN;
if (sg == qo->last_segment[0]) {
LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
} else {
LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
}
short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);
filter_probe_aggr_CPU(fargs, pargs, gargs, LEN, params->res, 0, segment_group_ptr);
} else {
struct offsetCPU offset = {
h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
};
filter_probe_aggr_CPU2(offset, fargs, pargs, gargs, *h_total, params->res, 0);
}
cudaEventRecord(stop, 0);
cudaEventSynchronize(stop);
cudaEventElapsedTime(&time, start, stop);
if (verbose) cout << "Filter Probe Aggr Kernel time CPU: " << time << endl;
cpu_time[sg] += time;
};