#include "CacheManager.h"
#include "KernelLaunch.h"
#include "QueryOptimizer.h"
KernelParams::KernelParams(struct filterArgsGPU* _fargs, struct probeArgsGPU* _pargs, struct buildArgsGPU* _bargs,
struct groupbyArgsGPU* _gargs, struct shuffleArgsGPU* _sargs,
struct shuffleHelper* _shelper, int** _d_total, int** _h_total) {
fargs = _fargs;
pargs = _pargs;
gargs = _gargs;
bargs = _bargs;
shelper = _shelper;
sargs = _sargs;
// assert(_d_total != NULL);
d_total = _d_total;
assert(_h_total != NULL);
h_total = _h_total;
in_off = new offsetGPU(); //input offset
out_off = new offsetGPU(); //output offset
sout = new shuffleOutGPU(); //output of partitioning
assert(shelper != NULL);
}
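// KernelLaunch wraps a single kernel launch on one GPU for one segment group:
// it copies the shared pointers out of KernelParams and records the target
// GPU, kernel type, table id, CUDA stream, and the output selectivity used to
// size output buffers.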
KernelLaunch::KernelLaunch(CacheManager* _cm, KernelParams* _kparams, QueryParams* _qparams,
int _sg, int _gpu, KernelType _kernel_type, int _table_id,
float _output_selectivity, int _latemat, bool _aggrGPUcheck, cudaStream_t _stream) {
cm = _cm;
sg = _sg;
gpu = _gpu;
kernel_type = _kernel_type;
table_id = _table_id;
stream = _stream;
output_estimate = 0;
INPUT_LEN = 0;
d_segment_group_each_gpu = NULL;
output_selectivity = _output_selectivity;
kparams = _kparams;
qparams = _qparams;
fargs = _kparams->fargs;
pargs = _kparams->pargs;
gargs = _kparams->gargs;
bargs = _kparams->bargs;
in_off = _kparams->in_off;
out_off = _kparams->out_off;
sargs = _kparams->sargs;
sout = _kparams->sout;
shelper = _kparams->shelper;
d_total = _kparams->d_total;
h_total = _kparams->h_total;
count = NULL;
key_off_col = NULL;
latemat = _latemat;
aggrGPUcheck = _aggrGPUcheck;
assert(shelper != NULL);
// result_count = shelper->result_count; //partitioning result count
// temp_col = shelper->temp_col; //intermediate result of shuffling (column)
// temp_off = shelper->temp_off; //intermediate result of shuffling (offset)
// join_result_off = shelper->join_result_off; //intermediate result of join (can be global or local offset depending on needs)
// out_shuffle_col = shelper->out_shuffle_col; //output of partitioning (column)
// out_shuffle_off = shelper->out_shuffle_off; //output of partitioning (can be global or local offset depending on needs)
}
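// Counting pass of the range partitioning: launches RangePartitioningCount
// (or RangePartitioningCount2 when an offset column already exists) to count,
// per partition, how many tuples this GPU will emit, and copies the counts
// back into `count`. On the first shuffle the input length is derived from the
// segment groups assigned to this GPU; the non-first-shuffle path is currently
// disabled (assert(0)).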
void
KernelLaunch::countPartitioning(int*** &off_col, short*** segment_group_each_gpu_count, short*** segment_group_each_gpu,
int** last_segment_gpu, bool first_shuffle) {
assert(sargs != NULL);
assert(shelper != NULL);
assert(NUM_PARTITION == NUM_GPU);
int tile_items = 128*4;
count = new int[NUM_PARTITION]();
int* d_count = (int*) cm->customCudaMalloc<int>(NUM_PARTITION, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemsetAsync(d_count, 0, NUM_PARTITION * sizeof(int), stream));
CubDebugExit(cudaSetDevice(0));
if (!first_shuffle) {
assert(0);
assert(*(h_total[gpu]) > 0);
INPUT_LEN = *(h_total[gpu]);
// CubDebugExit(cudaSetDevice(gpu));
// RangePartitioningCount2<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
// *sargs, d_count, INPUT_LEN);
// CubDebugExit(cudaMemcpyAsync(count, d_count, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
// CubDebugExit(cudaSetDevice(0));
} else {
if (off_col[gpu][table_id] == NULL) {
assert(segment_group_each_gpu_count != NULL);
assert(segment_group_each_gpu_count[gpu] != NULL);
assert(segment_group_each_gpu_count[gpu][table_id] != NULL);
assert(segment_group_each_gpu_count[gpu][table_id][sg] > 0);
assert(segment_group_each_gpu != NULL);
assert(segment_group_each_gpu[gpu] != NULL);
assert(segment_group_each_gpu[gpu][table_id] != NULL);
if (last_segment_gpu[gpu][table_id] == sg) {
INPUT_LEN = (segment_group_each_gpu_count[gpu][table_id][sg] - 1) * SEGMENT_SIZE + cm->allTable[table_id]->LEN % SEGMENT_SIZE;
} else {
INPUT_LEN = segment_group_each_gpu_count[gpu][table_id][sg] * SEGMENT_SIZE;
}
d_segment_group_each_gpu = (short*) cm->customCudaMalloc<short>(cm->allTable[table_id]->total_segment, gpu);
CubDebugExit(cudaSetDevice(gpu));
short* segment_group_each_gpu_ptr = segment_group_each_gpu[gpu][table_id] + (sg * cm->allTable[table_id]->total_segment);
CubDebugExit(cudaMemcpyAsync(d_segment_group_each_gpu, segment_group_each_gpu_ptr, segment_group_each_gpu_count[gpu][table_id][sg] * sizeof(short), cudaMemcpyHostToDevice, stream));
RangePartitioningCount<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *fargs, *sargs, d_count, INPUT_LEN, gpu, d_segment_group_each_gpu);
CubDebugExit(cudaMemcpyAsync(count, d_count, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
CubDebugExit(cudaSetDevice(0));
//first partitioning but there is an offset from CPU (not yet supported)
} else {
assert(*(h_total[gpu]) > 0);
INPUT_LEN = *(h_total[gpu]);
CubDebugExit(cudaSetDevice(gpu));
RangePartitioningCount2<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *sargs, d_count, INPUT_LEN, gpu);
CubDebugExit(cudaMemcpyAsync(count, d_count, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
CubDebugExit(cudaSetDevice(0));
}
}
}
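// Waits for the partitioning stream on this GPU to finish and checks for
// errors before the host reads the counts.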
void
KernelLaunch::synchronizePartitioning() {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
}
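// Allocation pass that follows countPartitioning: builds an exclusive prefix
// sum over the per-partition counts, allocates one contiguous block per
// shuffled column (and one per offset array) on this GPU, and carves the block
// into NUM_PARTITION slices at the prefix-sum boundaries. Illustrative only,
// assuming NUM_PARTITION == 2:
//   count     = {3, 5}
//   sum_count = {0, 3, 8}   -> partition p writes starting at temp + sum_count[p]
// The host-side pointer tables are then uploaded to the device and the
// per-partition write cursors (d_pos) are zeroed before *sout is filled in.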
void
KernelLaunch::preparePartitioningAfterCount(int*** &off_col, int*** used_col_idx, bool* joinGPUcheck, bool first_shuffle) {
assert(table_id != 0);
assert(sargs != NULL);
assert(shelper != NULL);
assert(NUM_PARTITION == NUM_GPU);
int* sum_count = new int[NUM_PARTITION+1]();
sum_count[0] = 0;
for (int partition = 1; partition <= NUM_PARTITION; partition++) {
sum_count[partition] = sum_count[partition-1] + count[partition-1];
}
assert(shelper->out_shuffle_col != NULL);
assert(shelper->out_shuffle_off != NULL);
assert(shelper->out_shuffle_col[gpu] != NULL);
assert(shelper->out_shuffle_off[gpu] != NULL);
// if this is not the first shuffle the buffers already exist and are reused (this may break if a later partitioning produces a larger out_shuffle_col than the first one)
for (int col = 0; col < NUM_COLUMN; col++) {
int table = cm->allColumn[col]->table_id;
if (used_col_idx[gpu][col] != NULL || shelper->temp_col[gpu][col] != NULL) {
int* temp;
if (table == table_id && shelper->out_shuffle_col[gpu][col] == NULL) {
shelper->out_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
// allocate more than needed (pessimistic assumption) in case a later shuffle returns a larger result than the first one
cout << "allocate out_shuffle_col " << col << " " << (int) sum_count[NUM_PARTITION] * MULTIPLIER << endl;
temp = cm->customCudaMalloc<int>((int) sum_count[NUM_PARTITION] * MULTIPLIER, gpu);
} else {
temp = shelper->out_shuffle_col[gpu][col][0];
}
assert(shelper->out_shuffle_col[gpu][col] != NULL);
for (int partition = 0; partition < NUM_PARTITION; partition++) {
assert(temp + sum_count[partition] != NULL);
shelper->out_shuffle_col[gpu][col][partition] = temp + sum_count[partition];
}
}
}
int* temp;
if (shelper->out_shuffle_off[gpu][table_id] == NULL) {
shelper->out_shuffle_off[gpu][table_id] = new int*[NUM_PARTITION]();
// allocate more than needed (pessimistic assumption) in case a later shuffle returns a larger result than the first one
cout << "allocate out_shuffle_off " << table_id << " " << (int) sum_count[NUM_PARTITION] * MULTIPLIER << endl;
temp = cm->customCudaMalloc<int>((int) sum_count[NUM_PARTITION] * MULTIPLIER, gpu);
} else {
assert(shelper->out_shuffle_off[gpu][table_id] != NULL);
temp = shelper->out_shuffle_off[gpu][table_id][0];
}
assert(shelper->out_shuffle_off[gpu][table_id] != NULL);
for (int partition = 0; partition < NUM_PARTITION; partition++) {
assert(temp + sum_count[partition] != NULL);
shelper->out_shuffle_off[gpu][table_id][partition] = temp + sum_count[partition];
}
int*** helper_col = new int**[NUM_COLUMN]();
for (int col = 0; col < NUM_COLUMN; col++) {
if (shelper->out_shuffle_col[gpu][col] != NULL) {
assert(shelper->out_shuffle_col[gpu][col] != NULL);
helper_col[col] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(helper_col[col], shelper->out_shuffle_col[gpu][col], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
}
}
int*** d_out_col = (int***) cm->customCudaMalloc<int**>(NUM_COLUMN, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_out_col, helper_col, NUM_COLUMN * sizeof(int**), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
int*** helper_off = new int**[NUM_TABLE]();
for (int table = 0; table < NUM_TABLE; table++) {
if (shelper->out_shuffle_off[gpu][table] != NULL) {
assert(shelper->out_shuffle_off[gpu][table] != NULL);
helper_off[table] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(helper_off[table], shelper->out_shuffle_off[gpu][table], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
}
}
int*** d_out_off = (int***) cm->customCudaMalloc<int**>(NUM_TABLE, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_out_off, helper_off, NUM_TABLE * sizeof(int**), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
int* d_pos = (int*) cm->customCudaMalloc<int>(NUM_PARTITION, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemsetAsync(d_pos, 0, NUM_PARTITION * sizeof(int), stream));
CubDebugExit(cudaSetDevice(0));
*sout = {
d_pos, d_out_col, d_out_off
};
}
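// Variant of the allocation pass that skips the counting kernel: the input
// length is taken from the segment groups (or from h_total when an offset
// column already exists) and each partition gets an equal INPUT_LEN /
// NUM_PARTITION share, padded by MULTIPLIER when output_selectivity == 1.0 and
// scaled by the selectivity otherwise. The rest mirrors
// preparePartitioningAfterCount: pointer tables and zeroed write cursors are
// uploaded and *sout is populated.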
void
KernelLaunch::preparePartitioningWithoutCount(int*** &off_col, int*** used_col_idx, short*** segment_group_each_gpu_count,
short*** segment_group_each_gpu, int** last_segment_gpu, bool* joinGPUcheck, bool first_shuffle, bool pipeline) {
assert(sargs != NULL);
assert(shelper != NULL);
assert(NUM_PARTITION == NUM_GPU);
if (!first_shuffle) {
assert(0);
assert(*(h_total[gpu]) > 0);
INPUT_LEN = *(h_total[gpu]);
// cout << "input : " << INPUT_LEN << endl;
// cout << "count : " << count[0] << " " << count[1] << endl;
} else {
if (off_col[gpu][table_id] == NULL) {
assert(segment_group_each_gpu_count != NULL);
assert(segment_group_each_gpu_count[gpu] != NULL);
assert(segment_group_each_gpu_count[gpu][table_id] != NULL);
if (table_id == 0) assert(segment_group_each_gpu_count[gpu][table_id][sg] > 0);
assert(segment_group_each_gpu != NULL);
assert(segment_group_each_gpu[gpu] != NULL);
assert(segment_group_each_gpu[gpu][table_id] != NULL);
if (last_segment_gpu[gpu][table_id] == sg) {
INPUT_LEN = (segment_group_each_gpu_count[gpu][table_id][sg] - 1) * SEGMENT_SIZE + cm->allTable[table_id]->LEN % SEGMENT_SIZE;
} else {
INPUT_LEN = segment_group_each_gpu_count[gpu][table_id][sg] * SEGMENT_SIZE;
}
// for (int i = 0; i < segment_group_each_gpu_count[gpu][table_id][sg]; i++) {
// printf("gpu %d segment idx %d\n", gpu, segment_group_each_gpu[gpu][table_id][sg * cm->allTable[table_id]->total_segment + i]);
// }
d_segment_group_each_gpu = (short*) cm->customCudaMalloc<short>(cm->allTable[table_id]->total_segment, gpu);
CubDebugExit(cudaSetDevice(gpu));
short* segment_group_each_gpu_ptr = segment_group_each_gpu[gpu][table_id] + (sg * cm->allTable[table_id]->total_segment);
CubDebugExit(cudaMemcpyAsync(d_segment_group_each_gpu, segment_group_each_gpu_ptr, segment_group_each_gpu_count[gpu][table_id][sg] * sizeof(short), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
//first partitioning but there is an offset from CPU (not yet supported)
} else {
// assert(0);
if (table_id == 0) assert(*(h_total[gpu]) > 0);
INPUT_LEN = *(h_total[gpu]);
}
}
// if (first_shuffle) {
// if this is not the first shuffle the buffers already exist and are reused (this may break if a later partitioning produces a larger out_shuffle_col than the first one)
assert(shelper->in_shuffle_col != NULL);
assert(shelper->in_shuffle_off != NULL);
assert(shelper->out_shuffle_col != NULL);
assert(shelper->out_shuffle_off != NULL);
assert(shelper->in_shuffle_col[gpu] != NULL);
assert(shelper->in_shuffle_off[gpu] != NULL);
assert(shelper->out_shuffle_col[gpu] != NULL);
assert(shelper->out_shuffle_off[gpu] != NULL);
for (int col = 0; col < NUM_COLUMN; col++) {
int table = cm->allColumn[col]->table_id;
if (used_col_idx[gpu][col] != NULL || shelper->temp_col[gpu][col] != NULL) {
if (table == table_id && shelper->out_shuffle_col[gpu][col] == NULL) {
if (pipeline) shelper->in_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
shelper->out_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
// allocate more than needed (pessimistic assumption) in case a later shuffle returns a larger result than the first one
// cout << "allocate out_shuffle_col " << col << endl;
cout << "allocate out_shuffle_col " << col << " " << (int) INPUT_LEN * output_selectivity / NUM_PARTITION * MULTIPLIER << endl;
for (int partition = 0; partition < NUM_PARTITION; partition++) {
// cout << (int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER << endl;
// cout << INPUT_LEN << endl;
// cout << output_selectivity << endl;
// cout << INPUT_LEN / NUM_PARTITION << endl;
// cout << INPUT_LEN / NUM_PARTITION * output_selectivity << endl;
// shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN * output_selectivity / NUM_PARTITION * MULTIPLIER, gpu);
if (output_selectivity == 1.0) shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
else shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
// if (pipeline) shelper->in_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN * output_selectivity / NUM_PARTITION * MULTIPLIER, gpu);
}
}
}
}
// if (table_id == 0) {
// for (int table = 0; table < NUM_TABLE; table++) {
// // if (off_col[gpu][table] != NULL || latemat == 2) {
// if (table == 0 || joinGPUcheck[table] || off_col[gpu][table] != NULL) {
// if (pipeline) shelper->in_shuffle_off[gpu][table] = new int*[NUM_PARTITION]();
// shelper->out_shuffle_off[gpu][table] = new int*[NUM_PARTITION]();
// // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
// // cout << "allocate out_shuffle_off and in_shuffle_off " << table << endl;
// for (int partition = 0; partition < NUM_PARTITION; partition++) {
// shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
// if (pipeline) shelper->in_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
// }
// assert(shelper->out_shuffle_off[gpu][table] != NULL);
// if (pipeline) assert(shelper->in_shuffle_off[gpu][table] != NULL);
// }
// }
// } else {
if (shelper->out_shuffle_off[gpu][table_id] == NULL) {
// cout << INPUT_LEN << endl;
// cout << output_selectivity << endl;
// cout << INPUT_LEN / NUM_PARTITION << endl;
// cout << INPUT_LEN / NUM_PARTITION * output_selectivity << endl;
cout << "allocate out_shuffle_off " << table_id << " " << (int) INPUT_LEN * output_selectivity / NUM_PARTITION * MULTIPLIER << endl;
// if (pipeline) shelper->in_shuffle_off[gpu][table_id] = new int*[NUM_PARTITION]();
shelper->out_shuffle_off[gpu][table_id] = new int*[NUM_PARTITION]();
// allocate more than needed (pessimistic assumption) in case a later shuffle returns a larger result than the first one
for (int partition = 0; partition < NUM_PARTITION; partition++) {
// shelper->out_shuffle_off[gpu][table_id][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN * output_selectivity / NUM_PARTITION * MULTIPLIER, gpu);
if (output_selectivity == 1.0) shelper->out_shuffle_off[gpu][table_id][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
else shelper->out_shuffle_off[gpu][table_id][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
// if (pipeline) shelper->in_shuffle_off[gpu][table_id][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
}
}
// }
// }
int*** helper_col = new int**[NUM_COLUMN]();
for (int col = 0; col < NUM_COLUMN; col++) {
if (shelper->out_shuffle_col[gpu][col] != NULL) {
assert(shelper->out_shuffle_col[gpu][col] != NULL);
helper_col[col] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(helper_col[col], shelper->out_shuffle_col[gpu][col], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
}
}
int*** d_out_col = (int***) cm->customCudaMalloc<int**>(NUM_COLUMN, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_out_col, helper_col, NUM_COLUMN * sizeof(int**), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
int*** helper_off = new int**[NUM_TABLE]();
for (int table = 0; table < NUM_TABLE; table++) {
if (shelper->out_shuffle_off[gpu][table] != NULL) {
assert(shelper->out_shuffle_off[gpu][table] != NULL);
helper_off[table] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(helper_off[table], shelper->out_shuffle_off[gpu][table], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
}
}
int*** d_out_off = (int***) cm->customCudaMalloc<int**>(NUM_TABLE, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_out_off, helper_off, NUM_TABLE * sizeof(int**), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
int* d_pos = (int*) cm->customCudaMalloc<int>(NUM_PARTITION, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemsetAsync(d_pos, 0, NUM_PARTITION * sizeof(int), stream));
CubDebugExit(cudaSetDevice(0));
*sout = {
d_pos, d_out_col, d_out_off
};
}
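// Launches the partitioning kernel itself. If the input is read through the
// segment group list (d_segment_group_each_gpu != NULL) RangePartitioningKeyValue
// is launched, writing only offsets when latemat == 2 && pipeline == 1 and
// values plus offsets otherwise; if the input is an already-materialized
// column (no segment group) RangePartitioningKeyValue2 is launched.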
void
KernelLaunch::launchPartitioning(int latemat, int pipeline) {
if (table_id == 0) assert(INPUT_LEN > 0);
if (INPUT_LEN == 0) return;
int tile_items = 128*4;
assert(sargs != NULL);
assert(sout != NULL);
assert(sout->pos != NULL);
assert(sout->column != NULL);
assert(sout->out_off != NULL);
if (d_segment_group_each_gpu == NULL) {
// assert(0);
assert(sargs->column != NULL);
CubDebugExit(cudaSetDevice(gpu));
// CHECK_ERROR_STREAM(stream);
// if (latemat == 2) {
// assert(0);
//write offset and not value
// RangePartitioningKeyValue2<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
// *sargs, *sout, INPUT_LEN, 0, 1);
// RangePartitioningKeyValue3<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
// cm->gpuCache[gpu], *sargs, *sout, INPUT_LEN, 0, 1);
// } else {
// assert(0);
//write value and offset
// RangePartitioningKeyValue2<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
// *sargs, *sout, INPUT_LEN, 1, 0);
// cout << " lets go here " << endl;
// RangePartitioningKeyValue3<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
// cm->gpuCache[gpu], *sargs, *sout, INPUT_LEN, 1, 1);
// }
//WARNING: Have to handle latemat
RangePartitioningKeyValue2<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *sargs, *sout, INPUT_LEN, gpu, 1, 0);
// CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
} else {
assert(d_segment_group_each_gpu != NULL);
assert(sargs->col_idx != NULL);
CubDebugExit(cudaSetDevice(gpu));
// CHECK_ERROR_STREAM(stream);
if (latemat == 2 && pipeline == 1) {
//write offset and not value
RangePartitioningKeyValue<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *fargs, *sargs, *sout, INPUT_LEN, gpu, 0, 1, d_segment_group_each_gpu);
} else {
// write value and offset
RangePartitioningKeyValue<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *fargs, *sargs, *sout, INPUT_LEN, gpu, 1, 1, d_segment_group_each_gpu);
}
// CHECK_ERROR_STREAM(stream);
// // assert(0);
// // cout << gpu << endl;
// int* temp = new int[NUM_PARTITION];
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(temp, sout->pos, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
// CHECK_ERROR_STREAM(stream);
// CubDebugExit(cudaSetDevice(0));
// cout << temp[0] << " " << temp[1] << endl;
// assert(0);
// if (gpu == 1) assert(0);
CubDebugExit(cudaSetDevice(0));
}
}
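// Copies the final per-partition write cursors back into the host-side
// result_count array and synchronizes the stream so later passes can size
// their inputs.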
void
KernelLaunch::clearPartitioning() {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaMemcpyAsync(shelper->result_count[gpu], sout->pos, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
}
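// Prepares the input for a build (dimension-table) kernel: records the build
// key column and derives INPUT_LEN either from the segment groups assigned to
// this GPU (uploading the group list to the device) or from an existing offset
// column produced by earlier kernels/shuffles.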
void
KernelLaunch::prepareKernelDim(int*** &off_col, ColumnInfo* build_column, short*** segment_group_each_gpu_count,
short*** segment_group_each_gpu, int** last_segment_gpu, int table, bool has_shuffled) {
assert(build_column != NULL);
key_column = build_column;
assert(off_col != NULL);
assert(off_col[gpu] != NULL);
if (has_shuffled) {
// assert(0); //not implemented yet
assert(*(h_total[gpu]) > 0);
INPUT_LEN = *(h_total[gpu]);
key_off_col = off_col[gpu][table];
} else {
if (off_col[gpu][table] == NULL) {
assert(segment_group_each_gpu_count != NULL);
assert(segment_group_each_gpu_count[gpu] != NULL);
assert(segment_group_each_gpu_count[gpu][table_id] != NULL);
if (table == 0) assert(segment_group_each_gpu_count[gpu][table_id][sg] > 0);
assert(segment_group_each_gpu != NULL);
assert(segment_group_each_gpu[gpu] != NULL);
assert(segment_group_each_gpu[gpu][table_id] != NULL);
if (last_segment_gpu[gpu][table_id] == sg) {
INPUT_LEN = (segment_group_each_gpu_count[gpu][table_id][sg] - 1) * SEGMENT_SIZE + cm->allTable[table_id]->LEN % SEGMENT_SIZE;
} else {
INPUT_LEN = segment_group_each_gpu_count[gpu][table_id][sg] * SEGMENT_SIZE;
}
// cout << "gpu " << gpu << " " << INPUT_LEN << endl;
d_segment_group_each_gpu = (short*) cm->customCudaMalloc<short>(cm->allTable[table_id]->total_segment, gpu);
CubDebugExit(cudaSetDevice(gpu));
short* segment_group_each_gpu_ptr = segment_group_each_gpu[gpu][table_id] + (sg * cm->allTable[table_id]->total_segment);
CubDebugExit(cudaMemcpyAsync(d_segment_group_each_gpu, segment_group_each_gpu_ptr, segment_group_each_gpu_count[gpu][table_id][sg] * sizeof(short), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
} else {
if (table_id == 0) assert(*(h_total[gpu]) > 0);
assert(off_col[gpu][table] != NULL);
INPUT_LEN = *(h_total[gpu]);
key_off_col = off_col[gpu][table];
}
}
}
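// Prepares inputs and outputs for a fact-table kernel. The input is either the
// partitions produced by a previous shuffle (has_shuffled) or the segment
// groups assigned to this GPU; output offset buffers are only allocated for
// JustProbe / JustFilter kernels, sized from INPUT_LEN and output_selectivity
// (times MULTIPLIER when the selectivity is 1.0). When the kernel also
// partitions its output (will_shuffle with ProbePartition), the per-partition
// shuffle buffers (columns and offsets, depending on latemat and the group-by /
// foreign-key column ids) and the device pointer tables are set up much as in
// preparePartitioningWithoutCount.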
void
KernelLaunch::prepareKernelFact(int*** &off_col, int*** used_col_idx, short*** segment_group_each_gpu_count,
short*** segment_group_each_gpu, int** last_segment_gpu, bool* joinGPUcheck, int* fkey_col_id, int* group_col_id,
int will_shuffle, bool has_shuffled) {
off_col_out = new int*[NUM_TABLE]();
assert(off_col != NULL);
assert(off_col[gpu] != NULL);
// if (has_shuffled): the input comes from the partitions of a previous shuffle; otherwise the input comes from segments
// if there is a shuffle afterward, the output buffers are allocated here; if there is no shuffle afterward, they are not
if (has_shuffled) {
// cout << gpu << " " << *(h_total[gpu]) << endl;
// if (*(h_total[gpu]) == 0) return;
// assert(*(h_total[gpu]) > 0);
INPUT_LEN = *(h_total[gpu]);
// output_estimate = INPUT_LEN * output_selectivity * MULTIPLIER;
if (output_selectivity == 1.0) output_estimate = INPUT_LEN * MULTIPLIER;
else output_estimate = INPUT_LEN * output_selectivity;
if (kernel_type == JustProbe || kernel_type == JustFilter) {
for (int table = 0; table < NUM_TABLE; table++) {
if (table == 0 || (table > 0 && group_col_id[table-1] > 0)) {
cout << "allocating off col out " << table << " " << output_selectivity << " " << output_estimate << endl;
off_col_out[table] = (int*) cm->customCudaMalloc<int>(output_estimate, gpu);
}
}
// cout << "copying join result off" << endl;
int** d_join_result_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_join_result_off, off_col_out, NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
*out_off = {NULL, NULL, NULL, NULL, NULL, d_join_result_off};
}
*in_off = {off_col[gpu][0], off_col[gpu][1], off_col[gpu][2], off_col[gpu][3], off_col[gpu][4]};
// int* tmp = new int[INPUT_LEN];
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(tmp, off_col[gpu][1], INPUT_LEN * sizeof(int), cudaMemcpyDeviceToHost, stream));
// CubDebugExit(cudaSetDevice(0));
// for (int i = 0; i < INPUT_LEN; i++) {
// printf("hey ho gpu %d %d\n", gpu, tmp[i]);
// }
// cout << endl;
// CHECK_ERROR_STREAM(stream);
} else {
if (off_col[gpu][0] == NULL) { //initialize out off and in off for this gpu
assert(segment_group_each_gpu_count != NULL);
assert(segment_group_each_gpu_count[gpu] != NULL);
assert(segment_group_each_gpu_count[gpu][table_id] != NULL);
// assert(segment_group_each_gpu_count[gpu][table_id][sg] > 0);
assert(segment_group_each_gpu != NULL);
assert(segment_group_each_gpu[gpu] != NULL);
assert(segment_group_each_gpu[gpu][table_id] != NULL);
assert(off_col_out != NULL);
// output_estimate = SEGMENT_SIZE * segment_group_each_gpu_count[gpu][table_id][sg] * output_selectivity * MULTIPLIER;
if (output_selectivity == 1.0) output_estimate = SEGMENT_SIZE * segment_group_each_gpu_count[gpu][table_id][sg] * MULTIPLIER;
else output_estimate = SEGMENT_SIZE * segment_group_each_gpu_count[gpu][table_id][sg] * output_selectivity;
if (kernel_type == JustProbe || kernel_type == JustFilter) {
for (int table = 0; table < NUM_TABLE; table++) {
if (table == 0 || (table > 0 && group_col_id[table-1] > 0)) {
// cout << " i am here " << endl;
cout << "allocating off col out " << table << " " << output_selectivity << " " << output_estimate << endl;
off_col_out[table] = (int*) cm->customCudaMalloc<int>(output_estimate, gpu);
// cout << " i am not here " << endl;
}
}
*out_off = {off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4], NULL};
}
if (last_segment_gpu[gpu][table_id] == sg) {
INPUT_LEN = (segment_group_each_gpu_count[gpu][table_id][sg] - 1) * SEGMENT_SIZE + cm->allTable[table_id]->LEN % SEGMENT_SIZE;
} else {
INPUT_LEN = segment_group_each_gpu_count[gpu][table_id][sg] * SEGMENT_SIZE;
}
d_segment_group_each_gpu = (short*) cm->customCudaMalloc<short>(cm->allTable[table_id]->total_segment, gpu);
CubDebugExit(cudaSetDevice(gpu));
short* segment_group_each_gpu_ptr = segment_group_each_gpu[gpu][table_id] + (sg * cm->allTable[table_id]->total_segment);
CubDebugExit(cudaMemcpyAsync(d_segment_group_each_gpu, segment_group_each_gpu_ptr, segment_group_each_gpu_count[gpu][table_id][sg] * sizeof(short), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
// cpu_to_gpu[sg] += (qo->segment_group_each_gpu_count[0][sg] * sizeof(short));
} else {
// if (*(h_total[gpu]) == 0) return;
// assert(*(h_total[gpu]) > 0);
assert(off_col_out != NULL);
assert(off_col[gpu] != NULL);
assert(off_col[gpu][0] != NULL);
INPUT_LEN = *(h_total[gpu]);
// output_estimate = INPUT_LEN * output_selectivity * MULTIPLIER;
if (output_selectivity == 1.0) output_estimate = INPUT_LEN * MULTIPLIER;
else output_estimate = INPUT_LEN * output_selectivity;
if (kernel_type == JustProbe || kernel_type == JustFilter) {
for (int table = 0; table < cm->TOT_TABLE; table++) {
if (off_col[gpu][table] != NULL || table == 0 || (table > 0 && group_col_id[table-1] > 0)) {
// cout << " i am here " << endl;
cout << "allocating off col out " << table << " " << output_selectivity << " " << output_estimate << endl;
off_col_out[table] = (int*) cm->customCudaMalloc<int>(output_estimate, gpu);
// cout << " i am not here " << endl;
}
}
*out_off = {off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4], NULL};
}
*in_off = {off_col[gpu][0], off_col[gpu][1], off_col[gpu][2], off_col[gpu][3], off_col[gpu][4]};
}
}
//if there will be shuffle
if (will_shuffle) {
if (kernel_type == ProbePartition) {
// if this is not the first shuffle the buffers already exist and are reused (this may break if a later partitioning produces a larger out_shuffle_col than the first one)
assert(shelper->out_shuffle_col != NULL);
assert(shelper->out_shuffle_off != NULL);
assert(shelper->out_shuffle_col[gpu] != NULL);
assert(shelper->out_shuffle_off[gpu] != NULL);
if (latemat != 2) {
for (int col = 0; col < NUM_COLUMN; col++) {
if (used_col_idx[gpu][col] != NULL || shelper->temp_col[gpu][col] != NULL) {
if (shelper->out_shuffle_col[gpu][col] == NULL) {
// cout << "col " << col << endl;
shelper->out_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
// allocate more than needed (pessimistic assumption) in case a later shuffle returns a larger result than the first one
cout << "allocate out_shuffle_col " << col << " " << (int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER << endl;
for (int partition = 0; partition < NUM_PARTITION; partition++) {
// shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
if (output_selectivity == 1.0) shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
else shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
}
}
}
}
} else {
for (int col = 0; col < NUM_COLUMN; col++) {
int table = cm->allColumn[col]->table_id;
if (used_col_idx[gpu][col] != NULL || shelper->temp_col[gpu][col] != NULL) {
if (table == 0 || (table > 0 && fkey_col_id[table-1] > 0 && col == group_col_id[table-1])) {
if (shelper->out_shuffle_col[gpu][col] == NULL) {
shelper->out_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
// allocate more than needed (pessimistic assumption) in case a later shuffle returns a larger result than the first one
cout << "allocate out_shuffle_col " << col << " " << (int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER << endl;
for (int partition = 0; partition < NUM_PARTITION; partition++) {
// shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
if (output_selectivity == 1.0) shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
else shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
}
}
}
}
}
}
// printf("%d %d %d\n", latemat, qparams->groupGPUcheck, aggrGPUcheck);
if ((latemat != 2 && qparams->groupGPUcheck && aggrGPUcheck) || (latemat == 2 && NUM_GPU == 8 && qparams->groupGPUcheck && aggrGPUcheck)) {
int table = 0;
if (shelper->out_shuffle_off[gpu][table] == NULL) {
shelper->out_shuffle_off[gpu][table] = new int*[NUM_PARTITION]();
cout << "allocate out_shuffle_off " << table_id << " " << (int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER << endl;
// allocate more than needed (pessimistic assumption) in case a later shuffle returns a larger result than the first one
for (int partition = 0; partition < NUM_PARTITION; partition++) {
// if (output_selectivity == 1.0) shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
if (output_selectivity == 1.0) shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
else shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
}
}
} else {
for (int table = 0; table < NUM_TABLE; table++) {
if (table == 0 || (table > 0 && fkey_col_id[table-1] > 0 && group_col_id[table-1] > 0)) {
if (shelper->out_shuffle_off[gpu][table] == NULL) {
// cout << "table " << table << endl;
shelper->out_shuffle_off[gpu][table] = new int*[NUM_PARTITION]();
cout << "allocate out_shuffle_off " << table_id << " " << (int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER << endl;
// allocate more than needed (pessimistic assumption) in case a later shuffle returns a larger result than the first one
for (int partition = 0; partition < NUM_PARTITION; partition++) {
// if (output_selectivity == 1.0) shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
if (output_selectivity == 1.0) shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
else shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
}
}
}
}
}
}
int*** helper_col = new int**[NUM_COLUMN]();
for (int col = 0; col < NUM_COLUMN; col++) {
if (shelper->out_shuffle_col[gpu][col] != NULL) {
assert(shelper->out_shuffle_col[gpu][col] != NULL);
helper_col[col] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(helper_col[col], shelper->out_shuffle_col[gpu][col], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
}
}
int*** d_out_col = (int***) cm->customCudaMalloc<int**>(NUM_COLUMN, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_out_col, helper_col, NUM_COLUMN * sizeof(int**), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
int*** helper_off = new int**[NUM_TABLE]();
for (int table = 0; table < NUM_TABLE; table++) {
if (shelper->out_shuffle_off[gpu][table] != NULL) {
assert(shelper->out_shuffle_off[gpu][table] != NULL);
helper_off[table] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(helper_off[table], shelper->out_shuffle_off[gpu][table], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
}
}
int*** d_out_off = (int***) cm->customCudaMalloc<int**>(NUM_TABLE, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_out_off, helper_off, NUM_TABLE * sizeof(int**), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
int* d_pos = (int*) cm->customCudaMalloc<int>(NUM_PARTITION, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemsetAsync(d_pos, 0, NUM_PARTITION * sizeof(int), stream));
CubDebugExit(cudaSetDevice(0));
*sout = {
d_pos, d_out_col, d_out_off
};
}
}
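// Dispatches the actual query kernel for this segment group based on
// kernel_type: filter only, probe (with or without a fused filter), probe +
// aggregation, probe + partition for the next shuffle, probe + group-by, or
// hash-table build (optionally also inserting a broadcast buffer). The
// has_shuffled flag selects the variants that read partitioned offsets instead
// of raw segments, and latemat == 2 selects the late-materialization kernels
// that read values through the multi-GPU cache pointer table (d_gpuCache).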
void
KernelLaunch::launchKernel(bool has_shuffled, bool broadcast, int* toBroadcast, int broadcast_len) {
if (INPUT_LEN == 0 && broadcast_len == 0) return;
if (table_id == 0) assert(INPUT_LEN > 0);
int tile_items = 128*4;
int** d_gpuCache = NULL;
if (latemat != 0) {
d_gpuCache = (int**) cm->customCudaMalloc<int*>(NUM_GPU, gpu);
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(d_gpuCache, cm->gpuCache, NUM_GPU * sizeof(int*), cudaMemcpyHostToDevice, stream));
CubDebugExit(cudaSetDevice(0));
}
if (kernel_type == JustFilter) {
assert(d_total != NULL);
assert(d_total[gpu] != NULL);
if (has_shuffled) {
assert(0);
} else {
if (d_segment_group_each_gpu != NULL) {
assert(d_segment_group_each_gpu != NULL);
assert(fargs != NULL);
assert(out_off != NULL);
CubDebugExit(cudaSetDevice(gpu));
// CHECK_ERROR_STREAM(stream);
filter_GPU2<128,4><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *fargs, off_col_out[table_id], INPUT_LEN, d_total[gpu], 0, d_segment_group_each_gpu);
// CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
} else {
assert(0);
}
}
} else if (kernel_type == JustProbe) {
assert(d_total != NULL);
assert(d_total[gpu] != NULL);
if (has_shuffled) {
assert(pargs != NULL);
assert(out_off != NULL);
CubDebugExit(cudaSetDevice(gpu));
// CHECK_ERROR_STREAM(stream);
probe_GPU<128,4><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
*pargs, *in_off, *out_off, INPUT_LEN, d_total[gpu], 0);
// CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
} else {
if (d_segment_group_each_gpu != NULL) {
assert(d_segment_group_each_gpu != NULL);
assert(pargs != NULL);
assert(out_off != NULL);
CubDebugExit(cudaSetDevice(gpu));
// CHECK_ERROR_STREAM(stream);
if (fargs->filter_idx1 != NULL) {
filter_probe_GPU2<128, 4><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *fargs, *pargs, *out_off, INPUT_LEN, d_total[gpu], 0, d_segment_group_each_gpu);
} else {
probe_GPU2<128,4><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *pargs, *out_off, INPUT_LEN, d_total[gpu], 0, d_segment_group_each_gpu);
}
// CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
} else {
assert(0);
CubDebugExit(cudaSetDevice(gpu));
assert(pargs != NULL);
assert(out_off != NULL);
assert(in_off != NULL);
// CHECK_ERROR_STREAM(stream);
probe_GPU3<128,4><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *in_off, *pargs, *out_off, INPUT_LEN, d_total[gpu]);
// CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
}
}
} else if (kernel_type == ProbeAggr) {
assert(qparams->d_res[gpu] != NULL);
if (has_shuffled) {
assert(pargs != NULL);
assert(gargs != NULL);
CubDebugExit(cudaSetDevice(gpu));
// CHECK_ERROR_STREAM(stream);
if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qparams->groupGPUcheck && aggrGPUcheck)) {
probe_aggr_GPU<128, 4><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
*pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0);
} else {
probe_aggr_GPU_lm<128, 4><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
d_gpuCache, *pargs, *gargs, *sargs, INPUT_LEN, qparams->d_res[gpu], gpu);
}
// probe_group_by_GPU<128, 4><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
// *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0);
// int* result = (int*) cm->customCudaHostAlloc<int>(qparams->total_val * 6);
// CubDebugExit(cudaMemcpy(result, qparams->d_res[gpu], qparams->total_val * 6 * sizeof(int), cudaMemcpyDeviceToHost));
// CubDebugExit(cudaStreamSynchronize(stream));
// cout << "Result: " << endl;
// cout << result[0] << " " << result[1] << " " << result[2] << " " << result[3] << " " << reinterpret_cast<unsigned long long*>(&result[4])[0] << endl;
CubDebugExit(cudaSetDevice(0));
} else {
if (d_segment_group_each_gpu != NULL) {
assert(d_segment_group_each_gpu != NULL);
assert(pargs != NULL);
assert(gargs != NULL);
CubDebugExit(cudaSetDevice(gpu));
// CHECK_ERROR_STREAM(stream);
// WARNING: HAVE TO HANDLE A CASE WHERE THERE IS FILTER
if (fargs->filter_idx1 != NULL) {
filter_probe_aggr_GPU2<128, 4><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *fargs, *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0, d_segment_group_each_gpu);
} else {
probe_aggr_GPU2<128, 4><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0, d_segment_group_each_gpu);
}
// CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
} else {
assert(0);
assert(pargs != NULL);
assert(gargs != NULL);
assert(in_off != NULL);
CubDebugExit(cudaSetDevice(gpu));
// CHECK_ERROR_STREAM(stream);
probe_aggr_GPU3<128, 4><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *in_off, *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu]);
// CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
}
}
} else if (kernel_type == ProbePartition) {
if (has_shuffled) {
// SETUP_TIMING();
// float time;
// cudaEventRecord(start, 0);
CubDebugExit(cudaSetDevice(gpu));
if (latemat == 2) {
if (NUM_GPU == 8 && qparams->groupGPUcheck && aggrGPUcheck) {
probe_partition_GPU2_simple_groupby_8GPUs<128,4, NUM_GPU><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *pargs, *gargs, *sargs, *sout, INPUT_LEN, 1, 1, latemat);
} else {
probe_partition_GPU2_simple<128,4, NUM_GPU><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *pargs, *sargs, *sout, INPUT_LEN, 1, 1, latemat);
}
} else {
probe_partition_GPU2<128,4, NUM_GPU><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *pargs, *sargs, *sout, INPUT_LEN, 1, 1, latemat);
}
// CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
// CHECK_ERROR_STREAM(stream);
// cudaEventRecord(stop, 0);
// cudaEventSynchronize(stop);
// cudaEventElapsedTime(&time, start, stop);
// cout << "probe partition kernel time " << time << endl;
// assert(0);
} else {
if (d_segment_group_each_gpu != NULL) {
assert(d_segment_group_each_gpu != NULL);
CubDebugExit(cudaSetDevice(gpu));
if (latemat == 2) {
if (NUM_GPU == 8 && qparams->groupGPUcheck && aggrGPUcheck) {
probe_partition_GPU_simple_groupby_8GPUs<128,4, NUM_GPU><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *fargs, *pargs, *gargs, *sargs, *sout, INPUT_LEN, gpu, 0, d_segment_group_each_gpu, 1, 1, latemat);
// CHECK_ERROR_STREAM(stream);
} else {
probe_partition_GPU_simple<128,4, NUM_GPU><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *fargs, *pargs, *sargs, *sout, INPUT_LEN, gpu, 0, d_segment_group_each_gpu, 1, 1, latemat);
}
} else {
probe_partition_GPU<128,4, NUM_GPU><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *fargs, *pargs, *sargs, *sout, INPUT_LEN, gpu, 0, d_segment_group_each_gpu, 1, 1, latemat);
}
CubDebugExit(cudaSetDevice(0));
} else {
assert(0);
}
}
} else if (kernel_type == ProbeGroupby) {
assert(qparams->d_res[gpu] != NULL);
if (has_shuffled) {
assert(pargs != NULL);
assert(gargs != NULL);
CubDebugExit(cudaSetDevice(gpu));
if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qparams->groupGPUcheck && aggrGPUcheck)) {
probe_group_by_GPU<128, 4><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
*pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0);
} else {
probe_group_by_GPU_lm<128, 4><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
d_gpuCache, *pargs, *gargs, *sargs, INPUT_LEN, qparams->d_res[gpu], gpu);
}
// CHECK_ERROR_STREAM(stream);
// assert(0);
CubDebugExit(cudaSetDevice(0));
} else {
if (d_segment_group_each_gpu != NULL) {
assert(d_segment_group_each_gpu != NULL);
assert(pargs != NULL);
assert(gargs != NULL);
CubDebugExit(cudaSetDevice(gpu));
probe_group_by_GPU2<128, 4><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0, d_segment_group_each_gpu);
CubDebugExit(cudaSetDevice(0));
} else {
assert(0);
assert(pargs != NULL);
assert(gargs != NULL);
assert(in_off != NULL);
CubDebugExit(cudaSetDevice(gpu));
probe_group_by_GPU3<128, 4><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *in_off, *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu]);
CubDebugExit(cudaSetDevice(0));
}
}
} else if (kernel_type == JustBuild) {
assert(qparams->ht_GPU[gpu][key_column] != NULL);
// SET THIS TO VARY THE JOIN SELECTIVITY
float selectivity = 1.0;
if (has_shuffled) {
// assert(0); //not implemented yet
assert(bargs != NULL);
assert(key_off_col != NULL);
// cout << INPUT_LEN << endl;
// int* test2 = new int[2 * qparams->dim_len[key_column]];
// CubDebugExit(cudaMemcpyAsync(test, qparams->ht_GPU[gpu][key_column], 2 * qparams->dim_len[key_column] * sizeof(int), cudaMemcpyDeviceToHost, stream));
// CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(gpu));
build_GPU5<128,4><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
key_off_col, *bargs, INPUT_LEN, qparams->ht_GPU[gpu][key_column], selectivity);
CubDebugExit(cudaSetDevice(0));
// int* test = new int[2 * qparams->dim_len[key_column]];
// CubDebugExit(cudaMemcpyAsync(test, qparams->ht_GPU[gpu][key_column], 2 * qparams->dim_len[key_column] * sizeof(int), cudaMemcpyDeviceToHost, stream));
// CHECK_ERROR_STREAM(stream);
// long long sum = 0;
// for (int i = 0; i < 2 * qparams->dim_len[key_column]; i++) {
// if (test[i] > 0) {
// // cout << test[i] << endl;
// sum++;
// }
// }
// cout << " sum " << sum << endl;
} else {
if (d_segment_group_each_gpu != NULL) {
assert(bargs != NULL);
assert(fargs != NULL);
// SETUP_TIMING();
// float time;
// cudaEventRecord(start, 0);
// int* a = cm->customCudaMalloc<int>(1, gpu);
// int* temp = cm->customCudaMalloc<int>(104857600, gpu);
CubDebugExit(cudaSetDevice(gpu));
// CHECK_ERROR_STREAM(stream);
// cout << "oh no " << INPUT_LEN << endl;
build_GPU2<128,4><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], *fargs, *bargs, INPUT_LEN, qparams->ht_GPU[gpu][key_column], 0, d_segment_group_each_gpu, selectivity);
if (broadcast) {
// cout << gpu << " " << broadcast_len << endl;
build_GPU<128,4><<<(broadcast_len + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], toBroadcast, *fargs, *bargs, broadcast_len, qparams->ht_GPU[gpu][key_column], 0, selectivity);
}
// CHECK_ERROR_STREAM(stream);
// int* a = new int[4];
// int* b = new int[4096];
// cout << broadcast_len << endl;
// cout << bargs->num_slots << endl;
// cout << gpu << " " << INPUT_LEN << endl;
// if (key_column->column_id == 18) {
// int* test = new int[2 * qparams->dim_len[key_column]];
// CubDebugExit(cudaMemcpyAsync(test, qparams->ht_GPU[gpu][key_column], 2 * qparams->dim_len[key_column] * sizeof(int), cudaMemcpyDeviceToHost, stream));
// // // CubDebugExit(cudaMemcpyAsync(a, toBroadcast, 4 * sizeof(int), cudaMemcpyDeviceToHost, stream));
// // // CubDebugExit(cudaMemcpyAsync(b, bargs->key_col, 4096 * sizeof(int), cudaMemcpyDeviceToHost, stream));
// CHECK_ERROR_STREAM(stream);
// long long sum = 0;
// // for (int i = 0; i < 2 * qparams->dim_len[key_column]; i++) {
// for (int i = 0; i < 2 * qparams->dim_len[key_column]; i+=2) {
// // cout << test[i] << endl;
// if (test[i] > 0) {
// // cout << test[i] << endl;
// sum += test[i+1];
// // sum++;
// }
// }
// cout << "build sum " << sum << endl;
// }
// for (int i = 0; i < 1000; i++) {
// if (test[i] > 0) {
// cout << i << " " << test[i] << endl;
// }
// }
// for (int i = qparams->dim_len[key_column]; i < qparams->dim_len[key_column] + 1000; i++) {
// if (test[i] > 0) {
// cout << i << " " << test[i] << endl;
// }
// }
// cout << sum << endl;
// cout << endl;
// cout << endl;
// cout << endl;
// for (int i = 0; i < 4096; i++) {
// cout << b[i] << endl;
// }
// cout << endl;
// cout << endl;
// cout << endl;
// assert(0);
// test_kernel<128,4><<<(104857600 + 128 - 1)/128, 128, 0, stream>>>(temp, a, 104857600);
// CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
// cudaEventRecord(stop, 0);
// cudaEventSynchronize(stop);
// cudaEventElapsedTime(&time, start, stop);
// cout << "launching build kernel time " << time << endl;
} else {
assert(key_off_col != NULL);
assert(bargs != NULL);
assert(fargs != NULL);
CubDebugExit(cudaSetDevice(gpu));
// CHECK_ERROR_STREAM(stream);
// SETUP_TIMING();
// float time;
// cudaEventRecord(start, 0);
if (broadcast) {
build_GPU4<128,4><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], key_off_col, *fargs, *bargs, INPUT_LEN, qparams->ht_GPU[gpu][key_column], selectivity);
} else {
build_GPU3<128,4><<<(INPUT_LEN + tile_items - 1)/tile_items, 128, 0, stream>>>(
cm->gpuCache[gpu], key_off_col, *fargs, *bargs, INPUT_LEN, qparams->ht_GPU[gpu][key_column], selectivity);
}
// CHECK_ERROR_STREAM(stream);
// if (key_column->column_id == 18) {
// int* test = new int[2 * qparams->dim_len[key_column]];
// CubDebugExit(cudaMemcpyAsync(test, qparams->ht_GPU[gpu][key_column], 2 * qparams->dim_len[key_column] * sizeof(int), cudaMemcpyDeviceToHost, stream));
// // // CubDebugExit(cudaMemcpyAsync(a, toBroadcast, 4 * sizeof(int), cudaMemcpyDeviceToHost, stream));
// // // CubDebugExit(cudaMemcpyAsync(b, bargs->key_col, 4096 * sizeof(int), cudaMemcpyDeviceToHost, stream));
// CHECK_ERROR_STREAM(stream);
// long long sum = 0;
// // for (int i = 0; i < 2 * qparams->dim_len[key_column]; i++) {
// for (int i = 0; i < 2 * qparams->dim_len[key_column]; i+=2) {
// // cout << test[i] << endl;
// if (test[i] > 0) {
// // cout << test[i] << endl;
// sum += test[i+1];
// // sum++;
// }
// }
// cout << "build sum " << sum << endl;
// }
// cudaEventRecord(stop, 0);
// cudaEventSynchronize(stop);
// cudaEventElapsedTime(&time, start, stop);
// cout << "launching build kernel time " << time << endl;
// assert(0);
CubDebugExit(cudaSetDevice(0));
}
}
} else {
assert(0);
}
}
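// Post-launch bookkeeping: for JustProbe / JustFilter the matched row count is
// copied back to h_total and the freshly allocated offset columns replace
// off_col for this GPU; for partitioning kernels (will_shuffle) the
// per-partition counts are copied back to result_count; for builds nothing is
// needed beyond synchronizing the stream.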
void
KernelLaunch::clearKernel(int*** &off_col, int will_shuffle, bool has_shuffled) {
if (kernel_type == JustProbe || kernel_type == JustFilter) {
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(h_total[gpu], d_total[gpu], sizeof(int), cudaMemcpyDeviceToHost, stream));
CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
// cout << *(h_total[gpu]) << " " << output_estimate << endl;
assert(*(h_total[gpu]) <= output_estimate);
assert(off_col != NULL);
assert(off_col[gpu] != NULL);
off_col[gpu] = off_col_out;
for (int table = 0; table < NUM_TABLE; table++) {
off_col[gpu][table] = off_col_out[table];
}
return;
} else {
CubDebugExit(cudaSetDevice(gpu));
CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
}
if (kernel_type == JustBuild) return;
assert(off_col_out != NULL);
//probe partition
if (will_shuffle) {
CubDebugExit(cudaSetDevice(gpu));
CubDebugExit(cudaMemcpyAsync(shelper->result_count[gpu], sout->pos, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
CHECK_ERROR_STREAM(stream);
CubDebugExit(cudaSetDevice(0));
}
}
// //TODO: NOT YET SUPPORTED IF THE OUTPUT IS GOING TO CPU
// void
// KernelLaunch::prepareKernelPipelined(int*** &off_col, int*** used_col_idx, short*** segment_group_each_gpu_count,
// short*** segment_group_each_gpu, int** last_segment_gpu, bool* joinGPUcheck, int* fkey_col_id, int* group_col_id,
// bool has_shuffled, int latemat) {
// assert(off_col != NULL);
// assert(off_col[gpu] != NULL);
// assert(sargs != NULL);
// assert(shelper != NULL);
// assert(NUM_PARTITION == NUM_GPU);
// if (has_shuffled) {
// //calculate total input partition
// int sum = 0;
// for (int partition = 0; partition < NUM_GPU; partition++) {
// // cout << "partition " << partition << " gpu " << gpu << " result count " << shelper->result_count[partition][gpu] << endl;
// sum += shelper->result_count[partition][gpu];
// }
// INPUT_LEN = sum;
// assert(INPUT_LEN > 0);
// output_estimate = sum * output_selectivity * MULTIPLIER;
// if (kernel_type == ShuffleProbe) {
// off_col_out = new int*[NUM_TABLE];
// for (int table = 0; table < NUM_TABLE; table++) {
// if (table == 0 || joinGPUcheck[table]) {
// off_col_out[table] = (int*) cm->customCudaMalloc<int>(output_estimate, gpu);
// }
// }
// int** d_join_result_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(d_join_result_off, off_col_out, NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream));
// CubDebugExit(cudaSetDevice(0));
// *out_off = {NULL, NULL, NULL, NULL, NULL, d_join_result_off};
// }
// } else {
// if (off_col[gpu][0] == NULL) { //initialize out off and in off for this gpu
// assert(segment_group_each_gpu_count != NULL);
// assert(segment_group_each_gpu_count[gpu] != NULL);
// assert(segment_group_each_gpu_count[gpu][table_id] != NULL);
// assert(segment_group_each_gpu_count[gpu][table_id][sg] > 0);
// assert(segment_group_each_gpu != NULL);
// assert(segment_group_each_gpu[gpu] != NULL);
// assert(segment_group_each_gpu[gpu][table_id] != NULL);
// output_estimate = SEGMENT_SIZE * segment_group_each_gpu_count[gpu][table_id][sg] * output_selectivity * MULTIPLIER;
// if (last_segment_gpu[gpu][table_id] == sg) {
// INPUT_LEN = (segment_group_each_gpu_count[gpu][table_id][sg] - 1) * SEGMENT_SIZE + cm->allTable[table_id]->LEN % SEGMENT_SIZE;
// } else {
// INPUT_LEN = segment_group_each_gpu_count[gpu][table_id][sg] * SEGMENT_SIZE;
// }
// d_segment_group_each_gpu = (short*) cm->customCudaMalloc<short>(cm->allTable[table_id]->total_segment, gpu);
// CubDebugExit(cudaSetDevice(gpu));
// short* segment_group_each_gpu_ptr = segment_group_each_gpu[gpu][table_id] + (sg * cm->allTable[table_id]->total_segment);
// CubDebugExit(cudaMemcpyAsync(d_segment_group_each_gpu, segment_group_each_gpu_ptr, segment_group_each_gpu_count[gpu][table_id][sg] * sizeof(short), cudaMemcpyHostToDevice, stream));
// CubDebugExit(cudaSetDevice(0));
// // cpu_to_gpu[sg] += (qo->segment_group_each_gpu_count[0][sg] * sizeof(short));
// //first shuffle but got inputs from the CPU, this is not yet supported
// } else {
// assert(0);
// }
// }
// //if not first shuffle, no need to allocate (this might not work for the case when the second partitioning returned a larger out_shuffle_col than the first partitioning)
// if (kernel_type == ShuffleProbePartition) {
// for (int col = 0; col < NUM_COLUMN; col++) {
// if (used_col_idx[gpu][col] != NULL) {
// if (shelper->out_shuffle_col[gpu][col] == NULL && shelper->in_shuffle_col[gpu][col] == NULL) {
// shelper->out_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
// shelper->in_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
// // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
// // for the pipelined version, we will allocate separately for each partition
// for (int partition = 0; partition < NUM_PARTITION; partition++) {
// shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
// shelper->in_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
// }
// assert(shelper->out_shuffle_col[gpu][col] != NULL);
// assert(shelper->in_shuffle_col[gpu][col] != NULL);
// }
// }
// }
// //if there is global offset
// for (int table = 0; table < NUM_TABLE; table++) {
// if (table == 0 || (table > 0 && fkey_col_id[table-1] > 0 && group_col_id[table-1] > 0)) {
// if (shelper->out_shuffle_off[gpu][table] == NULL && shelper->in_shuffle_off[gpu][table] == NULL) {
// shelper->out_shuffle_off[gpu][table] = new int*[NUM_PARTITION]();
// shelper->in_shuffle_off[gpu][table] = new int*[NUM_PARTITION]();
// // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
// for (int partition = 0; partition < NUM_PARTITION; partition++) {
// shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
// shelper->in_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
// }
// assert(shelper->out_shuffle_off[gpu][table] != NULL);
// assert(shelper->in_shuffle_off[gpu][table] != NULL);
// }
// }
// }
// }
// int*** helper_col = new int**[NUM_COLUMN]();
// for (int col = 0; col < NUM_COLUMN; col++) {
// if (shelper->out_shuffle_col[gpu][col] != NULL) {
// assert(shelper->out_shuffle_col[gpu][col] != NULL);
// helper_col[col] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(helper_col[col], shelper->out_shuffle_col[gpu][col], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
// CubDebugExit(cudaSetDevice(0));
// }
// }
// int*** d_out_col = (int***) cm->customCudaMalloc<int**>(NUM_COLUMN, gpu);
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(d_out_col, helper_col, NUM_COLUMN * sizeof(int**), cudaMemcpyHostToDevice, stream));
// CubDebugExit(cudaSetDevice(0));
// int*** helper_off = new int**[NUM_TABLE]();
// for (int table = 0; table < NUM_TABLE; table++) {
// if (shelper->out_shuffle_off[gpu][table] != NULL) {
// assert(shelper->out_shuffle_off[gpu][table] != NULL);
// helper_off[table] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(helper_off[table], shelper->out_shuffle_off[gpu][table], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
// CubDebugExit(cudaSetDevice(0));
// }
// }
// int*** d_out_off = (int***) cm->customCudaMalloc<int**>(NUM_TABLE, gpu);
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(d_out_off, helper_off, NUM_TABLE * sizeof(int**), cudaMemcpyHostToDevice, stream));
// CubDebugExit(cudaSetDevice(0));
// int* d_pos = (int*) cm->customCudaMalloc<int>(NUM_PARTITION, gpu);
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemsetAsync(d_pos, 0, NUM_PARTITION * sizeof(int), stream));
// CubDebugExit(cudaSetDevice(0));
// *sout = {
// d_pos, d_out_col, d_out_off
// };
// }
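// The commented-out setup above stages pointers in two hops: a host array of per-partition
// device pointers (helper_col / helper_off) is copied into a device-resident pointer array
// (d_out_col / d_out_off) so kernels can reach each partition's buffer through double
// indirection. A minimal, self-contained sketch of that pattern with the plain CUDA runtime
// API (buffer names here are illustrative, not from this codebase):
//   int* h_part_ptrs[NUM_PARTITION];                        // host array of device pointers
//   for (int p = 0; p < NUM_PARTITION; p++)
//     cudaMalloc(&h_part_ptrs[p], capacity * sizeof(int));
//   int** d_part_ptrs;                                       // device array of device pointers
//   cudaMalloc(&d_part_ptrs, NUM_PARTITION * sizeof(int*));
//   cudaMemcpyAsync(d_part_ptrs, h_part_ptrs, NUM_PARTITION * sizeof(int*),
//                   cudaMemcpyHostToDevice, stream);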
// void
// KernelLaunch::launchKernelPipelined(int latemat, int first_join_in_pipeline) {
// assert(INPUT_LEN > 0);
// int tile_items = 128*4;
// assert(sargs != NULL);
// assert(sout != NULL);
// assert(sout->pos != NULL);
// assert(sout->column != NULL);
// int num_tiles = 0;
// int* d_partition_count = NULL;
// int** d_gpuCache = NULL;
// if (d_segment_group_each_gpu == NULL) {
// d_partition_count = (int*) cm->customCudaMalloc<int>(NUM_GPU, gpu);
// int* partition_count = (int*) cm->customCudaHostAlloc<int>(NUM_GPU);
// int* block_scan_partition = new int[NUM_GPU]();
// for (int partition = 0; partition < NUM_GPU; partition++) {
// num_tiles += (shelper->result_count[partition][gpu] + tile_items - 1)/tile_items;
// partition_count[partition] = shelper->result_count[partition][gpu];
// }
// for (int partition = 0; partition < NUM_GPU; partition++) {
// if (partition == 0) {
// block_scan_partition[partition] = (partition_count[partition] + tile_items - 1) / tile_items;
// } else {
// block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_items - 1) / tile_items;
// }
// }
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(d_partition_count, partition_count, NUM_GPU * sizeof(int), cudaMemcpyHostToDevice, stream));
// CubDebugExit(cudaSetDevice(0));
// if (latemat == 2) {
// d_gpuCache = (int**) cm->customCudaMalloc<int*>(NUM_GPU, gpu);
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(d_gpuCache, cm->gpuCache, NUM_GPU * sizeof(int*), cudaMemcpyHostToDevice, stream));
// CubDebugExit(cudaSetDevice(0));
// }
// } else {
// num_tiles = (INPUT_LEN+ tile_items - 1)/tile_items;
// }
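// In the branch above, the grid size is the sum of per-partition tile counts (ceil division
// by tile_items), and block_scan_partition holds the inclusive prefix sum of those counts,
// presumably so a kernel can map blockIdx.x back to its owning partition. Sketch:
//   int tiles_p = (partition_count[p] + tile_items - 1) / tile_items;  // ceil(count / tile)
//   scan[p]     = (p == 0) ? tiles_p : scan[p - 1] + tiles_p;          // inclusive prefix sum
//   // the first p with blockIdx.x < scan[p] is the block's partition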
// assert(num_tiles > 0);
// if (kernel_type == ShuffleProbe) {
// assert(d_total != NULL);
// assert(d_total[gpu] != NULL);
// if (d_segment_group_each_gpu == NULL) {
// assert(sargs->column_part != NULL);
// assert(d_partition_count != NULL);
// CubDebugExit(cudaSetDevice(gpu));
// if (latemat == 0) {
// assert(0);
// } else if (latemat == 1) {
// assert(0);
// } else if (latemat == 2) { //global
// shuffle_probe_global_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
// d_gpuCache, d_partition_count,
// *pargs, *sargs, *out_off, d_total[gpu], gpu, first_join_in_pipeline, 1, 1);
// }
// CubDebugExit(cudaSetDevice(0));
// } else {
// assert(0);
// }
// // CHECK_ERROR_STREAM(stream);
// // assert(0);
// } else if (kernel_type == ShuffleProbePartition) {
// if (d_segment_group_each_gpu == NULL) {
// assert(sargs->column_part != NULL);
// assert(d_partition_count != NULL);
// CubDebugExit(cudaSetDevice(gpu));
// if (latemat == 0) {
// assert(0);
// // shuffle_probe_partition_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
// // cm->gpuCache[gpu], d_partition_count, *pargs, *sargs, *sout, gpu, 1, 1, latemat);
// } else if (latemat == 1) {
// assert(0);
// // shuffle_probe_partition_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
// // cm->gpuCache[gpu], d_partition_count, *pargs, *sargs, *sout, gpu, 1, 1, latemat);
// } else if (latemat == 2) { //global
// // CHECK_ERROR_STREAM(stream);
// // test_out_off<128,4><<<1, 128, 0, stream>>>(sout->out_off, 128);
// // int* tmp = new int[100];
// // CubDebugExit(cudaSetDevice(0));
// // CubDebugExit(cudaMemcpyAsync(tmp, shelper->in_shuffle_off[0][0][0], 100 * sizeof(int), cudaMemcpyDeviceToHost, stream));
// // CubDebugExit(cudaSetDevice(0));
// // for (int i = 0; i < 100; i++) {
// // printf("tmp %d\n", tmp[i]);
// // }
// // CubDebugExit(cudaSetDevice(1));
// // CubDebugExit(cudaMemcpyAsync(tmp, shelper->in_shuffle_off[1][0][0], 100 * sizeof(int), cudaMemcpyDeviceToHost, stream));
// // CubDebugExit(cudaSetDevice(0));
// // for (int i = 0; i < 100; i++) {
// // printf("tmp %d\n", tmp[i]);
// // }
// // CHECK_ERROR_STREAM(stream);
// // test_out_off<128,4><<<num_tiles, 128, 0, stream>>>(*sout, 128);
// // cout << " look at me gpu " << gpu << " " << num_tiles << endl;
// // CHECK_ERROR_STREAM(stream);
// shuffle_probe_partition_global_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(d_gpuCache, d_partition_count,
// *pargs, *sargs, *sout, gpu, first_join_in_pipeline, 1, 1);
// // CHECK_ERROR_STREAM(stream);
// // assert(0);
// }
// CubDebugExit(cudaSetDevice(0));
// } else {
// assert(d_segment_group_each_gpu != NULL);
// assert(sargs->col_idx != NULL);
// CubDebugExit(cudaSetDevice(gpu));
// probe_partition_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
// cm->gpuCache[gpu], *fargs, *pargs, *sargs, *sout, INPUT_LEN, gpu, 0, d_segment_group_each_gpu, 0, 1);
// CubDebugExit(cudaSetDevice(0));
// }
// } else if (kernel_type == ShuffleProbeAggr) {
// if (d_segment_group_each_gpu == NULL) {
// assert(sargs->column_part != NULL);
// assert(d_partition_count != NULL);
// CubDebugExit(cudaSetDevice(gpu));
// if (latemat == 0) {
// assert(0);
// // shuffle_probe_aggr_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
// // d_partition_count, *pargs, *gargs, *sargs, qparams->d_res[gpu], gpu, latemat);
// } else if (latemat == 1) {
// assert(0);
// // shuffle_probe_aggr_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
// // d_partition_count, *pargs, *gargs, *sargs, qparams->d_res[gpu], gpu, latemat);
// } else if (latemat == 2) { //global
// shuffle_probe_aggr_global_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
// d_gpuCache, d_partition_count,
// *pargs, *gargs, *sargs, qparams->d_res[gpu], gpu, first_join_in_pipeline, 1, 0);
// }
// CubDebugExit(cudaSetDevice(0));
// } else {
// assert(0);
// }
// } else if (kernel_type == ShuffleProbeGroupby) {
// if (d_segment_group_each_gpu == NULL) {
// assert(sargs->column_part != NULL);
// assert(d_partition_count != NULL);
// CubDebugExit(cudaSetDevice(gpu));
// if (latemat == 0) {
// assert(0);
// } else if (latemat == 1) {
// assert(0);
// } else if (latemat == 2) { //global
// shuffle_probe_group_by_global_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
// d_gpuCache, d_partition_count,
// *pargs, *gargs, *sargs, qparams->d_res[gpu], gpu, first_join_in_pipeline, 1, 0);
// }
// CubDebugExit(cudaSetDevice(0));
// } else {
// assert(0);
// }
// } else {
// assert(0);
// }
// }
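// Every launch in the function above follows the same per-GPU pattern: switch to the owning
// device, launch the templated kernel (block size 128, presumably 4 items per thread) on
// that GPU's stream, then switch back to device 0. Generic sketch (kernel name and arguments
// are placeholders):
//   cudaSetDevice(gpu);
//   some_kernel<128, 4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(/* kernel arguments */);
//   cudaSetDevice(0);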
// void
// KernelLaunch::clearPipelined(int*** &off_col) {
// if (kernel_type == ShuffleProbe) {
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(h_total[gpu], d_total[gpu], sizeof(int), cudaMemcpyDeviceToHost, stream));
// CHECK_ERROR_STREAM(stream);
// CubDebugExit(cudaSetDevice(0));
// // cout << *(h_total[gpu]) << " " << output_estimate << endl;
// assert(*(h_total[gpu]) <= output_estimate);
// assert(off_col != NULL);
// assert(off_col[gpu] != NULL);
// off_col[gpu] = off_col_out;
// for (int table = 0; table < NUM_TABLE; table++) {
// off_col[gpu][table] = off_col_out[table];
// }
// return;
// }
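// Reading a device-side counter on the host, as in the ShuffleProbe branch above, requires
// the asynchronous copy to finish before *h_total is dereferenced; CHECK_ERROR_STREAM is
// assumed to synchronize the stream here. The bare CUDA-runtime equivalent would be:
//   cudaMemcpyAsync(h_count, d_count, sizeof(int), cudaMemcpyDeviceToHost, stream);
//   cudaStreamSynchronize(stream);   // ensure the value has landed before reading *h_count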
// // int* test = new int[NUM_PARTITION];
// // CubDebugExit(cudaSetDevice(gpu));
// // CubDebugExit(cudaMemcpyAsync(test, sout->pos, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
// // CHECK_ERROR_STREAM(stream);
// // CubDebugExit(cudaSetDevice(0));
// CubDebugExit(cudaSetDevice(gpu));
// CubDebugExit(cudaMemcpyAsync(shelper->result_count[gpu], sout->pos, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
// CHECK_ERROR_STREAM(stream);
// CubDebugExit(cudaSetDevice(0));
// int* temp;
// for (int col = 0; col < NUM_COLUMN; col++) {
// if (shelper->out_shuffle_col[gpu][col] != NULL) {
// assert(shelper->in_shuffle_col[gpu][col] != NULL);
// for (int partition = 0; partition < NUM_PARTITION; partition++) {
// temp = shelper->out_shuffle_col[gpu][col][partition];
// shelper->out_shuffle_col[gpu][col][partition] = shelper->in_shuffle_col[gpu][col][partition];
// shelper->in_shuffle_col[gpu][col][partition] = temp;
// }
// }
// }
// for (int table = 0; table < NUM_TABLE; table++) {
// if (shelper->out_shuffle_off[gpu][table] != NULL) {
// assert(shelper->in_shuffle_off[gpu][table] != NULL);
// for (int partition = 0; partition < NUM_PARTITION; partition++) {
// // cout << " im here for table " << table << " partition " << partition << endl;
// temp = shelper->out_shuffle_off[gpu][table][partition];
// shelper->out_shuffle_off[gpu][table][partition] = shelper->in_shuffle_off[gpu][table][partition];
// shelper->in_shuffle_off[gpu][table][partition] = temp;
// }
// }
// }
// }
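// clearPipelined ends by swapping the out/in shuffle buffers for every column, table, and
// partition: the output of this shuffle pass becomes the input of the next one (double
// buffering), so no payload is copied. The temp-pointer dance above is equivalent to:
//   std::swap(shelper->out_shuffle_col[gpu][col][partition],
//             shelper->in_shuffle_col[gpu][col][partition]);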