#pragma once

#include <cub/cub.cuh>
#include
#include
#include "crystal/crystal.cuh"
#include "BlockLibrary.cuh"
#include "ShuffleGPU.h"
#include "SimplePartitioning.h"
#include "KernelArgs.h"
#include "gpu_utils.h"

using namespace cub;

//count how many selected tuples fall into each range partition, reading the key column directly from cached segments
template <int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void RangePartitioningCount(
    int* gpuCache, struct filterArgsGPU fargs, struct shuffleArgsGPU sargs,
    int* count, int num_tuples, int gpu, short* segment_group = NULL) {

  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many block per segment
  int* ptr;

  int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];
  __shared__ int64_t key_segment;
  __shared__ int64_t filter_segment1;
  __shared__ int64_t filter_segment2;
  __shared__ int64_t segment_index;
  __shared__ int is_replicated;

  //SEGMENT INDEX IS THE SEGMENT INDEX OF THE ACTUAL TABLE (GLOBAL SEGMENT INDEX)
  if (threadIdx.x == 0) {
    if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE];
    else segment_index = ( tile_offset ) / SEGMENT_SIZE; // previously start_offset + tile_offset

    if (fargs.filter_idx1 != NULL) {
      filter_segment1 = fargs.filter_idx1[segment_index];
      cudaAssert(filter_segment1 != -1);
    }
    if (fargs.filter_idx2 != NULL) {
      filter_segment2 = fargs.filter_idx2[segment_index];
      cudaAssert(filter_segment2 != -1);
    }
    if (sargs.col_idx[sargs.key_column] != NULL) {
      key_segment = sargs.col_idx[sargs.key_column][segment_index];
      cudaAssert(key_segment != -1);
    }
    is_replicated = sargs.seg_is_replicated[segment_index];
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  InitFlags(selection_flags);

  //this range_size would not work for date column
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the num partition (e.g 10 data 3 part -> 3 3 4)

  if (fargs.filter_idx1 != NULL) {
    ptr = gpuCache + filter_segment1 * SEGMENT_SIZE;
    BlockLoadCrystal<BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
    // BlockPredGTE<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
    // BlockPredAndLTE<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);
    if (fargs.mode1 == 0) { //equal to
      BlockPredEQ<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
    } else if (fargs.mode1 == 1) { //between
      BlockPredGTE<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
      BlockPredAndLTE<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);
    } else if (fargs.mode1 == 2) { //equal or equal
      BlockPredEQ<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
      BlockPredOrEQ<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);
    }
  }

  if (fargs.filter_idx2 != NULL) {
    ptr = gpuCache + filter_segment2 * SEGMENT_SIZE;
    BlockLoadCrystal<BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
    // BlockPredAndGTE<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
    // BlockPredAndLTE<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);
    if (fargs.mode2 == 0) { //equal to
      BlockPredAndEQ<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
    } else if (fargs.mode2 == 1) { //between
      BlockPredAndGTE<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
      BlockPredAndLTE<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);
    } else if (fargs.mode2 == 2) { //equal or equal
      BlockPredAndEQ<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
      BlockPredOrEQ<BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);
    }
  }

  ptr = gpuCache + key_segment * SEGMENT_SIZE;
  BlockLoadCrystal<BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
  BlockCountPartition<BLOCK_THREADS, ITEMS_PER_THREAD>(item_key, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    atomicAdd(&(count[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();
}

//WARNING: ONLY WORKS FOR DIM TABLE
//same count as above, but the key column is gathered through a list of row offsets (in_off) instead of being read from a segment
template <int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void RangePartitioningCount2(
    int* gpuCache, struct shuffleArgsGPU sargs, int* count, int num_tuples, int gpu) {

  int item_off[ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  InitFlags(selection_flags);

  cudaAssert(sargs.column[sargs.key_column] != NULL);

  //this range_size would not work for date column
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the num partition (e.g 10 data 3 part -> 3 3 4)

  cudaAssert(sargs.col_idx[sargs.key_column] != NULL);
  cudaAssert(sargs.in_off[sargs.table] != NULL);
  int* ptr = sargs.in_off[sargs.table];
  BlockLoadCrystal<BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off, num_tile_items);
  BlockReadOffsetGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off, item_key, selection_flags, gpuCache, sargs.col_idx[sargs.key_column], num_tile_items);

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int segment_idx = item_off[ITEM]/SEGMENT_SIZE;
        // int is_replicated = sargs.seg_is_replicated[segment_idx];
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        cudaAssert(partition <= NUM_RANGE);
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        // if (is_replicated) {
        //   if (partition == gpu) atomicAdd(&(smem_index[partition]), 1);
        // } else
        atomicAdd(&(smem_index[partition]), 1);
      }
    }
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    atomicAdd(&(count[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();
}

//range partitioning reading from segment directly
template <int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void RangePartitioningKeyValue(
    int* gpuCache, struct filterArgsGPU fargs, struct shuffleArgsGPU sargs, struct shuffleOutGPU sout,
    int num_tuples, int gpu, int write_val = 1, int write_offset = 0, short* segment_group = NULL) {

  __const__ int column_map[NUM_COLUMN] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 1, 1, 1, 1, 3, 3, 3, 3, 4, 4, 4 };

  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many block per segment
  int* ptr;

  int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment

  int
num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __shared__ int smem_index[NUM_RANGE]; __shared__ int64_t key_segment; __shared__ int64_t filter_segment1; __shared__ int64_t filter_segment2; __shared__ int64_t segment_index; __shared__ int is_replicated; //SEGMENT INDEX IS THE SEGMENT INDEX OF THE ACTUAL TABLE (GLOBAL SEGMENT INDEX) if (threadIdx.x == 0) { if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE]; else segment_index = ( tile_offset ) / SEGMENT_SIZE; // previously start_offset + tile_offset if (fargs.filter_idx1 != NULL) { filter_segment1 = fargs.filter_idx1[segment_index]; cudaAssert(filter_segment1 != -1); } if (fargs.filter_idx2 != NULL) { filter_segment2 = fargs.filter_idx2[segment_index]; cudaAssert(filter_segment2 != -1); } if (sargs.col_idx[sargs.key_column] != NULL) { key_segment = sargs.col_idx[sargs.key_column][segment_index]; cudaAssert(key_segment != -1); } is_replicated = sargs.seg_is_replicated[segment_index]; } __syncthreads(); if (threadIdx.x < NUM_RANGE) { smem_index[threadIdx.x] = 0; } __syncthreads(); InitFlags(selection_flags); int start_offset = segment_index * SEGMENT_SIZE; //this range_size would not work for date table //THIS ONLY WORKS IF THE TOTAL DIM SEGMENT IS DIVISIBLE BY NUM GPU int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the num partition (e.g 10 data 3 part -> 3 3 4) #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { index[ITEM] = 0; } if (fargs.filter_idx1 != NULL) { ptr = gpuCache + filter_segment1 * SEGMENT_SIZE; BlockLoadCrystal(ptr + segment_tile_offset, item_val, num_tile_items); // BlockPredGTE(item_val, fargs.compare1, selection_flags, num_tile_items); // BlockPredAndLTE(item_val, fargs.compare2, selection_flags, num_tile_items); if (fargs.mode1 == 0) { //equal to BlockPredEQ(item_val, fargs.compare1, selection_flags, num_tile_items); } else if (fargs.mode1 == 1) { //between BlockPredGTE(item_val, fargs.compare1, selection_flags, num_tile_items); BlockPredAndLTE(item_val, fargs.compare2, selection_flags, num_tile_items); } else if (fargs.mode1 == 2) { //equal or equal BlockPredEQ(item_val, fargs.compare1, selection_flags, num_tile_items); BlockPredOrEQ(item_val, fargs.compare2, selection_flags, num_tile_items); } } if (fargs.filter_idx2 != NULL) { ptr = gpuCache + filter_segment2 * SEGMENT_SIZE; BlockLoadCrystal(ptr + segment_tile_offset, item_val, num_tile_items); // BlockPredAndGTE(item_val, fargs.compare3, selection_flags, num_tile_items); // BlockPredAndLTE(item_val, fargs.compare4, selection_flags, num_tile_items); if (fargs.mode2 == 0) { //equal to BlockPredAndEQ(item_val, fargs.compare3, selection_flags, num_tile_items); } else if (fargs.mode2 == 1) { //between BlockPredAndGTE(item_val, fargs.compare3, selection_flags, num_tile_items); BlockPredAndLTE(item_val, fargs.compare4, selection_flags, num_tile_items); } else if (fargs.mode2 == 2) { //equal or equal BlockPredAndEQ(item_val, fargs.compare3, selection_flags, num_tile_items); BlockPredOrEQ(item_val, fargs.compare4, selection_flags, num_tile_items); } } cudaAssert(sargs.col_idx[sargs.key_column] != NULL); ptr = gpuCache + key_segment * SEGMENT_SIZE; BlockLoadAndCountIndexPartition2(ptr + segment_tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu); __syncthreads(); if 
(threadIdx.x < NUM_RANGE) { smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]); } __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition if (partition >= NUM_RANGE) partition = NUM_RANGE-1; index[ITEM] += smem_index[partition]; } } } __syncthreads(); cudaAssert(sout.column[sargs.key_column] != NULL); BlockWritePartition2(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, is_replicated, gpu); if (write_val) { //scatter the value for (int col = 0; col < NUM_COLUMN; col++) { int table = column_map[col]; if (sargs.col_idx[col] != NULL && sargs.key_column != col && sargs.table == table) { cudaAssert(sargs.col_idx[col] != NULL); cudaAssert(sout.column[col] != NULL); int64_t val_segment = sargs.col_idx[col][segment_index]; cudaAssert(val_segment != -1); ptr = gpuCache + val_segment * SEGMENT_SIZE; BlockLoadAndWriteValPartition2(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[col], num_tile_items, range_size, is_replicated, gpu); } } } if (write_offset) { //scatter the offset (global offset and not the local offset) come from CPU // for (int table = 0; table < NUM_TABLE; table++) { //this will not happen for now (in_off is from off_col) // if (sargs.in_off != NULL && sargs.in_off[table] != NULL) { //writing the global offset from CPU // cudaAssert(sout.out_off[table] != NULL); // cudaAssert(0); // } else if (sargs.table == table) { //writing the global offset of the current table which get partitioned cudaAssert(sout.out_off[sargs.table] != NULL); BlockWriteOffSelf(item_key, index, selection_flags, sout.out_off[sargs.table], start_offset, num_tile_items, range_size, is_replicated, gpu); // } // } } } // range partition with offset coming from CPU //WARNING: ONLY WORKS FOR DIM TABLE template __global__ void RangePartitioningKeyValue2( int* gpuCache, struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int num_tuples, int gpu, int write_val = 1, int write_offset = 0) { __const__ int column_map[NUM_COLUMN] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 1, 1, 1, 1, 3, 3, 3, 3, 4, 4, 4 }; int item_off[NUM_TABLE][ITEMS_PER_THREAD]; int item_key[ITEMS_PER_THREAD]; int item_val[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int index[ITEMS_PER_THREAD]; // int is_replicated[ITEMS_PER_THREAD]; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __shared__ int smem_index[NUM_RANGE]; // //WARNING: For the case filter on CPU, partitioning join, and some of the dim table is replicated, we will just set is_replicated to 0 for simplicity // int is_replicated = 0; __syncthreads(); if (threadIdx.x < NUM_RANGE) { smem_index[threadIdx.x] = 0; } __syncthreads(); InitFlags(selection_flags); //this range_size would not work for date table int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the num partition (e.g 10 data 3 part -> 3 3 4) #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { index[ITEM] = 0; } cudaAssert(sargs.col_idx[sargs.key_column] != NULL); 
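  // Partition assignment used throughout this file (a sketch of the math, assuming integer keys in
  // [sargs.min_key, sargs.max_key]): with range_size = (max_key + 1 - min_key) / NUM_RANGE, a key k
  // goes to partition p = k / range_size, and any overflow caused by the truncating division is
  // clamped into the last bucket (p = NUM_RANGE - 1), e.g. 10 key values over 3 partitions give
  // range_size = 3 and buckets of size 3, 3 and 4.
  //
  // The rest of this kernel follows the count -> reserve -> scatter pattern: (1) count this block's
  // matches per partition in smem_index while remembering each tuple's local rank in index[ITEM],
  // (2) atomicAdd the per-block counts onto sout.pos to reserve a contiguous slice of every
  // partition's output buffer, and (3) write each tuple to its reserved slot.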
cudaAssert(sargs.in_off[sargs.table] != NULL); int* ptr = sargs.in_off[sargs.table]; BlockLoadCrystal(ptr + tile_offset, item_off[sargs.table], num_tile_items); BlockReadOffsetGPU2(threadIdx.x, item_off[sargs.table], item_key, selection_flags, gpuCache, sargs.col_idx[sargs.key_column], num_tile_items); // BlockCountIndexPartition(item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu); // #pragma unroll // for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { // int segment_idx = item_off[sargs.table][ITEM]/SEGMENT_SIZE; // is_replicated[ITEM] = sargs.seg_is_replicated[segment_idx]; // } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition cudaAssert(partition <= NUM_RANGE); if (partition >= NUM_RANGE) partition = NUM_RANGE-1; // if (is_replicated[ITEM]) { // if (partition == gpu) index[ITEM] = atomicAdd(&(smem_index[partition]), 1); // else index[ITEM] = 0; // } else index[ITEM] = atomicAdd(&(smem_index[partition]), 1); } } } __syncthreads(); if (threadIdx.x < NUM_RANGE) { smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]); } __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition if (partition >= NUM_RANGE) partition = NUM_RANGE-1; index[ITEM] += smem_index[partition]; } } } __syncthreads(); cudaAssert(sout.column[sargs.key_column] != NULL); // BlockWritePartition2(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, is_replicated, gpu); // BlockWriteValPartition2(item_off[sargs.table], item_key, index, selection_flags, sout.out_off[sargs.table], num_tile_items, range_size, is_replicated, gpu); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition cudaAssert(partition <= NUM_RANGE); if (partition >= NUM_RANGE) partition = NUM_RANGE-1; cudaAssert(sout.column[sargs.key_column][partition] != NULL); // if (is_replicated[ITEM]) { // if (partition == gpu) sout.column[sargs.key_column][partition][index[ITEM]] = item_key[ITEM]; // } else sout.column[sargs.key_column][partition][index[ITEM]] = item_key[ITEM]; } } } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition cudaAssert(partition <= NUM_RANGE); if (partition >= NUM_RANGE) partition = NUM_RANGE-1; cudaAssert(sout.out_off[sargs.table][partition] != NULL); // if (is_replicated[ITEM]) { // if (partition == gpu) sout.out_off[sargs.table][partition][index[ITEM]] = item_off[sargs.table][ITEM]; // } else sout.out_off[sargs.table][partition][index[ITEM]] = item_off[sargs.table][ITEM]; } } } __syncthreads(); if (write_val) { //scatter the value for (int col = 0; col < NUM_COLUMN; col++) { int table = column_map[col]; if (sargs.col_idx[col] != NULL && sargs.key_column != col && sargs.table == table) { 
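        // For every non-key column of the partitioned table: gather the value of each surviving row
        // through its global row offset (item_off -> segment via col_idx -> gpuCache), then scatter it
        // to the slot already reserved in index[ITEM], so all columns of a tuple land at the same
        // position of the same partition.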
cudaAssert(sargs.col_idx[col] != NULL); cudaAssert(sout.column[col] != NULL); BlockReadOffsetGPU2(threadIdx.x, item_off[sargs.table], item_val, selection_flags, gpuCache, sargs.col_idx[col], num_tile_items); // BlockWriteValPartition2(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, is_replicated, gpu); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition cudaAssert(partition <= NUM_RANGE); if (partition >= NUM_RANGE) partition = NUM_RANGE-1; cudaAssert(sout.column[col][partition] != NULL); // if (is_replicated[ITEM]) { // if (partition == gpu) sout.column[col][partition][index[ITEM]] = item_val[ITEM]; // } else sout.column[col][partition][index[ITEM]] = item_val[ITEM]; } } } } } } // if (write_offset) { // //scatter the offset (global offset and not the local offset) come from CPU // for (int table = 0; table < NUM_TABLE; table++) { // //this will not happen for now (in_off is from off_col) // if (sargs.in_off != NULL && sargs.in_off[table] != NULL && sargs.table != table) { //writing the global offset from CPU // cudaAssert(sout.out_off[table] != NULL); // cudaAssert(0); // //blockload // //blockwritepartition // } // } // } } template __global__ void test_kernel(int* input, int* output, int size) { int tid = threadIdx.x + blockIdx.x * blockDim.x; int sum = 0; while (tid < size) { sum += input[tid]; tid += blockDim.x * gridDim.x; } atomicAdd(output, sum); } template __global__ void test_kernel3(int** seg_row_to_gpu) { cudaAssert(seg_row_to_gpu[0][1] >= 0 && seg_row_to_gpu[0][1] < NUM_GPU); } template __global__ void test_out_off(struct shuffleOutGPU sout, int size) { if (threadIdx.x < size) { // if (sout.out_off[3][1] == NULL) printf("i failed assertion"); // printf("%d\n", sout.out_off[3][1][0]); for (int table = 1; table < 4; table++) { for (int partition = 0; partition < 2; partition++) { // if (in_off_part[partition][table] != NULL) { // if (sout.out_off[table][partition] == NULL) printf("table %d %d\n", table, partition); cudaAssert(sout.out_off[table] != NULL); cudaAssert(sout.out_off[table][partition] != NULL); // } } } } } template __global__ void test_kernel2(int**** d_column_part) { if (threadIdx.x == 0 && blockIdx.x == 0) { for (int i = 0; i < 10; i++) printf("keys %d %d %d %d\n", d_column_part[0][1][1][i], d_column_part[1][1][1][i], d_column_part[2][1][1][i], d_column_part[3][1][1][i]); } } template __global__ void probe_partition_GPU( int* gpuCache, struct filterArgsGPU fargs, struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int num_tuples, int gpu, int start_offset = 0, short* segment_group = NULL, int write_val = 1, int write_offset = 0, int latemat = 0) { __const__ int column_map[NUM_COLUMN] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 1, 1, 1, 1, 3, 3, 3, 3, 4, 4, 4 }; cudaAssert(NUM_GPU == NUM_RANGE); //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) // Specialize BlockLoad for a 1D block of 128 threads owning 4 integer items each typedef cub::BlockScan BlockScanInt; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many block per segment int* ptr; int 
segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment // Allocate shared memory for BlockLoad __shared__ union TempStorage { typename BlockScanInt::TempStorage scan; } temp_storage; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } // Load a segment of consecutive items that are blocked across threads int selection_flags[ITEMS_PER_THREAD]; int item_off[NUM_TABLE][ITEMS_PER_THREAD]; int item_key[ITEMS_PER_THREAD]; int item_val[ITEMS_PER_THREAD]; int index[ITEMS_PER_THREAD]; __shared__ int smem_index[NUM_RANGE]; __shared__ int check_join[NUM_TABLE]; __shared__ int64_t key_segment1; __shared__ int64_t key_segment2; __shared__ int64_t key_segment3; __shared__ int64_t key_segment4; __shared__ int64_t filter_segment1; __shared__ int64_t filter_segment2; __shared__ int64_t shuffle_key; __shared__ int64_t segment_index; __shared__ int is_replicated; if (threadIdx.x == 0) { if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE]; else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE; if (pargs.key_idx1 != NULL) { key_segment1 = pargs.key_idx1[segment_index]; cudaAssert(key_segment1 != -1); } if (pargs.key_idx2 != NULL) { key_segment2 = pargs.key_idx2[segment_index]; cudaAssert(key_segment2 != -1); } if (pargs.key_idx3 != NULL) { key_segment3 = pargs.key_idx3[segment_index]; cudaAssert(key_segment3 != -1); } if (pargs.key_idx4 != NULL) { key_segment4 = pargs.key_idx4[segment_index]; cudaAssert(key_segment4 != -1); } if (fargs.filter_idx1 != NULL) { filter_segment1 = fargs.filter_idx1[segment_index]; cudaAssert(filter_segment1 != -1); } if (fargs.filter_idx2 != NULL) { filter_segment2 = fargs.filter_idx2[segment_index]; cudaAssert(filter_segment2 != -1); } if (sargs.col_idx[sargs.key_column] != NULL) { shuffle_key = sargs.col_idx[sargs.key_column][segment_index]; cudaAssert(shuffle_key != -1); } for (int table = 0; table < NUM_TABLE; table++) { check_join[table] = -1; } is_replicated = sargs.seg_is_replicated[segment_index]; } __syncthreads(); if (threadIdx.x < NUM_RANGE) { smem_index[threadIdx.x] = 0; } __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { index[ITEM] = 0; } __syncthreads(); InitFlags(selection_flags); if (fargs.filter_idx1 != NULL) { ptr = gpuCache + filter_segment1 * SEGMENT_SIZE; BlockLoadCrystal(ptr + segment_tile_offset, item_val, num_tile_items); BlockPredGTE(item_val, fargs.compare1, selection_flags, num_tile_items); BlockPredAndLTE(item_val, fargs.compare2, selection_flags, num_tile_items); } if (fargs.filter_idx2 != NULL) { ptr = gpuCache + filter_segment2 * SEGMENT_SIZE; BlockLoadCrystal(ptr + segment_tile_offset, item_val, num_tile_items); BlockPredAndGTE(item_val, fargs.compare3, selection_flags, num_tile_items); BlockPredAndLTE(item_val, fargs.compare4, selection_flags, num_tile_items); } if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation) ptr = gpuCache + key_segment1 * SEGMENT_SIZE; check_join[0] = 1; BlockLoadCrystal(ptr + segment_tile_offset, item_key, num_tile_items); BlockProbeGPU(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } else { //we are not doing join for this column, there is no result from prev join (first join) BlockSetValue(threadIdx.x, item_off[1], 1, num_tile_items); } if (pargs.key_idx2 != NULL 
&& pargs.ht2 != NULL) { ptr = gpuCache + key_segment2 * SEGMENT_SIZE; check_join[1] = 1; BlockLoadCrystal(ptr + segment_tile_offset, item_key, num_tile_items); BlockProbeGPU(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } else { BlockSetValue(threadIdx.x, item_off[2], 1, num_tile_items); } if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) { ptr = gpuCache + key_segment3 * SEGMENT_SIZE; check_join[2] = 1; BlockLoadCrystal(ptr + segment_tile_offset, item_key, num_tile_items); BlockProbeGPU(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } else { BlockSetValue(threadIdx.x, item_off[3], 1, num_tile_items); } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { ptr = gpuCache + key_segment4 * SEGMENT_SIZE; check_join[3] = 1; BlockLoadCrystal(ptr + segment_tile_offset, item_key, num_tile_items); BlockProbeGPU(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else { BlockSetValue(threadIdx.x, item_off[4], 1, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) { item_off[0][ITEM] = blockIdx.x * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; //local offset to each partition } } } //DO PARTITIONING //this range_size would not work for date column start_offset = segment_index * SEGMENT_SIZE; int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the num partition (e.g 10 data 3 part -> 3 3 4) //right now we only support probe table as the table getting partitioned cudaAssert(sargs.table == 0); //loading the partitioned key column and count the partition cudaAssert(sargs.col_idx[sargs.key_column] != NULL); ptr = gpuCache + shuffle_key * SEGMENT_SIZE; BlockLoadAndCountIndexPartition2(ptr + segment_tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu); __syncthreads(); if (threadIdx.x < NUM_RANGE) { smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]); } __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition if (partition >= NUM_RANGE) partition = NUM_RANGE-1; index[ITEM] += smem_index[partition]; } } } //Barrier cudaAssert(sout.column[sargs.key_column] != NULL); BlockWritePartition2(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, is_replicated, gpu); if (write_val) { //scatter the value //todo: what if offset coming from join cudaAssert(sargs.col_idx != NULL); cudaAssert(sout.column != NULL); for (int col = 0; col < NUM_COLUMN; col++) { int table = column_map[col]; //write values from segment if (sargs.col_idx[col] != NULL && sargs.key_column != col && table == 0) { //WARNING: IF JOIN KEY HAS BEEN USED, NO NEED TO PARTITION THAT JOIN KEY // if (latemat != 2 || (latemat == 2 && col != pargs.fkey_col_id[0] && col != pargs.fkey_col_id[1] && col != pargs.fkey_col_id[2] && col != pargs.fkey_col_id[3])) { cudaAssert(sargs.col_idx[col] != NULL); cudaAssert(sout.column[col] != NULL); cudaAssert(table == 0); int64_t val_segment = sargs.col_idx[col][segment_index]; 
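        // col_idx[col][segment_index] maps this column's global segment index to the slot holding
        // that segment inside the GPU cache; the tile's values therefore live at
        //   gpuCache[val_segment * SEGMENT_SIZE + segment_tile_offset .. + num_tile_items - 1],
        // which is exactly the range the load-and-scatter call below reads before writing each value
        // to its partition.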
cudaAssert(val_segment != -1); ptr = gpuCache + val_segment * SEGMENT_SIZE; BlockLoadAndWriteValPartition2(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[col], num_tile_items, range_size, is_replicated, gpu); // } //write values from dimension table with offset coming from current join (materialize it and write the values) } else if (latemat == 0 && sargs.col_idx[col] != NULL && table > 0 && check_join[table-1] != -1 && sargs.key_column != col) { cudaAssert(sargs.col_idx[col] != NULL); cudaAssert(sargs.broadcast_idx[col] != NULL); cudaAssert(sout.column[col] != NULL); BlockReadOffsetGPU3(threadIdx.x, item_off[table], item_val, selection_flags, gpuCache, sargs.col_idx[col], sargs.broadcast_idx[col], num_tile_items); BlockWriteValPartition2(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, is_replicated, gpu); } } } //TODO: NOT YET SUPPORTED, handle the case where global offset is coming from join if (write_offset) { cudaAssert(sout.out_off != NULL); //scatter the offset (global offset and not the local offset) for (int table = 0; table < NUM_TABLE; table++) { //offset coming from CPU if (sargs.in_off != NULL && sargs.in_off[table] != NULL && table > 0) { //writing the global offset from previous join cudaAssert(check_join[table-1] == -1); cudaAssert(0); //this will never happen for now //offset coming from self } else if (sargs.table == table && table == 0) { //writing the global offset of the current table which get partitioned cudaAssert(sout.out_off[table] != NULL); BlockWriteOffSelf(item_key, index, selection_flags, sout.out_off[table], start_offset, num_tile_items, range_size, is_replicated, gpu); //offset coming from current join } else if (table > 0 && check_join[table-1] != -1 && sout.out_off[table] != NULL) { cudaAssert(table != 0); // cudaAssert(sout.out_off[table] != NULL); BlockWriteValPartition2(item_off[table], item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, is_replicated, gpu); } } } } template __global__ void probe_partition_GPU2( int* gpuCache, struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int num_tuples, int write_val = 1, int write_offset = 0, int latemat = 0) { __const__ int column_map[NUM_COLUMN] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 1, 1, 1, 1, 3, 3, 3, 3, 4, 4, 4 }; cudaAssert(NUM_GPU == NUM_RANGE); //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) // Specialize BlockLoad for a 1D block of 128 threads owning 4 integer items each typedef cub::BlockScan BlockScanInt; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; // int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many block per segment int* ptr; // Allocate shared memory for BlockLoad __shared__ union TempStorage { typename BlockScanInt::TempStorage scan; } temp_storage; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __shared__ int smem_index[NUM_RANGE]; // Load a segment of consecutive items that are blocked across threads int selection_flags[ITEMS_PER_THREAD]; int item_off[NUM_TABLE][ITEMS_PER_THREAD]; int item_key[ITEMS_PER_THREAD]; int item_val[ITEMS_PER_THREAD]; int index[ITEMS_PER_THREAD]; __syncthreads(); if (threadIdx.x < NUM_RANGE) { smem_index[threadIdx.x] = 
0; } __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { index[ITEM] = 0; } __syncthreads(); InitFlags(selection_flags); if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation) ptr = pargs.key_column[0]; BlockLoadCrystal(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } else { //we are not doing join for this column, there is no result from prev join (first join) BlockSetValue(threadIdx.x, item_off[1], 1, num_tile_items); } if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) { ptr = pargs.key_column[1]; BlockLoadCrystal(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } else { BlockSetValue(threadIdx.x, item_off[2], 1, num_tile_items); } if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) { ptr = pargs.key_column[2]; BlockLoadCrystal(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } else { BlockSetValue(threadIdx.x, item_off[3], 1, num_tile_items); } if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) { ptr = pargs.key_column[3]; BlockLoadCrystal(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else { BlockSetValue(threadIdx.x, item_off[4], 1, num_tile_items); } //DO PARTITIONING //this range_size would not work for date column int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the num partition (e.g 10 data 3 part -> 3 3 4) //right now we only support probe table as the table getting partitioned cudaAssert(sargs.table == 0); //loading the partitioned key column and count the partition cudaAssert(sargs.column[sargs.key_column] != NULL); ptr = sargs.column[sargs.key_column]; BlockLoadAndCountIndexPartition2(ptr + tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, 0, -1); __syncthreads(); if (threadIdx.x < NUM_RANGE) { smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]); } __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition if (partition >= NUM_RANGE) partition = NUM_RANGE-1; index[ITEM] += smem_index[partition]; } } } //Barrier cudaAssert(sout.column[sargs.key_column] != NULL); BlockWritePartition2(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, 0, -1); if (write_val) { //scatter the value cudaAssert(sargs.col_idx != NULL); //use the local offset to scatter the value //TODO: we have to support the case where the offset for dimension table is a global offset for (int col = 0; col < NUM_COLUMN; col++) { int table = column_map[col]; //forwarding column from previous join for table 0 if (sargs.column[col] != NULL && table == 0 && sargs.key_column != col) { //WARNING: IF JOIN KEY HAS BEEN USED, NO NEED TO PARTITION THAT JOIN KEY // if (latemat != 2 || (latemat == 2 && col != 
pargs.fkey_col_id[0] && col != pargs.fkey_col_id[1] && col != pargs.fkey_col_id[2] && col != pargs.fkey_col_id[3])) { cudaAssert(sargs.column[col] != NULL); cudaAssert(sout.column[col] != NULL); ptr = sargs.column[col]; BlockLoadMasked(ptr + tile_offset, item_val, selection_flags, num_tile_items); // BlockLoadCrystal(ptr + tile_offset, item_val, num_tile_items); BlockWriteValPartition2(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1); // } //forwarding column from previous join for dim table } else if (latemat == 0 && sargs.column[col] != NULL && table > 0 && pargs.key_column[table-1] == NULL && sargs.key_column != col) { cudaAssert(sargs.column[col] != NULL); cudaAssert(sout.column[col] != NULL); ptr = sargs.column[col]; BlockLoadMasked(ptr + tile_offset, item_val, selection_flags, num_tile_items); BlockWriteValPartition2(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1); } else if (latemat == 0 && sargs.col_idx[col] != NULL && table > 0 && pargs.key_column[table-1] != NULL && sargs.key_column != col) { //materializing the global offset cudaAssert(table != 0); cudaAssert(sargs.col_idx[col] != NULL); cudaAssert(sargs.broadcast_idx[col] != NULL); cudaAssert(sout.column[col] != NULL); BlockReadOffsetGPU3(threadIdx.x, item_off[table], item_val, selection_flags, gpuCache, sargs.col_idx[col], sargs.broadcast_idx[col], num_tile_items); BlockWriteValPartition2(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1); } } } if (write_offset) { //scatter the offset (global offset and not the local offset), local offset would be useless once the shuffle is done for (int table = 0; table < NUM_TABLE; table++) { cudaAssert(sargs.in_off != NULL); //forwarding offset from table = 0 if (sargs.in_off[table] != NULL && sargs.table == table && table == 0) { cudaAssert(sargs.in_off[table] != NULL); cudaAssert(sout.out_off[table] != NULL); ptr = sargs.in_off[table]; BlockLoadMasked(ptr + tile_offset, item_val, selection_flags, num_tile_items); // BlockLoadCrystal(ptr + tile_offset, item_val, num_tile_items); BlockWriteValPartition2(item_val, item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1); //forwarding offset from previous join } else if (sargs.in_off[table] != NULL && table > 0 && pargs.key_column[table-1] == NULL) { cudaAssert(sargs.in_off[table] != NULL); cudaAssert(sout.out_off[table] != NULL); ptr = sargs.in_off[table]; BlockLoadMasked(ptr + tile_offset, item_val, selection_flags, num_tile_items); BlockWriteValPartition2(item_val, item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1); //storing offset from current join } else if (table > 0 && pargs.key_column[table-1] != NULL && sout.out_off[table] != NULL) { cudaAssert(sargs.in_off[table] != NULL); cudaAssert(sout.out_off[table] != NULL); BlockWriteValPartition2(item_off[table], item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1); } } } } template __global__ void probe_group_by_GPU_lm( int** gpuCache, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs, int num_tuples, int* res, int gpu) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int* ptr; // Load a 
segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int groupval[4][ITEMS_PER_THREAD]; int aggrval1[ITEMS_PER_THREAD]; int aggrval2[ITEMS_PER_THREAD]; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __syncthreads(); InitFlags(selection_flags); if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //normal operation, here pargs.key_idx will be lo_partkey, lo_suppkey, etc (the join key column) -> no group by attributes ptr = pargs.key_column[0]; BlockLoadCrystal(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU(threadIdx.x, items, groupval[0], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } else { BlockSetValue(threadIdx.x, groupval[0], 0, num_tile_items); } if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) { ptr = pargs.key_column[1]; BlockLoadCrystal(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU(threadIdx.x, items, groupval[1], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } else { BlockSetValue(threadIdx.x, groupval[1], 0, num_tile_items); } if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) { ptr = pargs.key_column[2]; BlockLoadCrystal(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU(threadIdx.x, items, groupval[2], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); // if (blockIdx.x == 0) printf("%d %d %d %d\n", groupval[2][0], groupval[2][1], groupval[2][3], groupval[2][4]); } else { BlockSetValue(threadIdx.x, groupval[2], 0, num_tile_items); } if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) { ptr = pargs.key_column[3]; BlockLoadCrystal(ptr + tile_offset, items, num_tile_items); // if (blockIdx.x == 0) printf("%d %d %d %d %d %d %d %d\n", items[0], items[1], items[2], items[3], selection_flags[0], selection_flags[1], selection_flags[2], selection_flags[3]); BlockProbeGroupByGPU(threadIdx.x, items, groupval[3], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else { BlockSetValue(threadIdx.x, groupval[3], 0, num_tile_items); } for (int i = 0; i < NUM_TABLE-1; i++) { if (pargs.key_column[i] == NULL && gargs.group_col_id[i] != -1) { ptr = sargs.in_off[i+1]; BlockLoadMasked(ptr + tile_offset, items, selection_flags, num_tile_items); // if (i == 2) if (threadIdx.x == 0) printf("%d %d %d %d\n", items[0], items[1], items[2], items[3]); BlockReadOffsetGlobal2(items, selection_flags, groupval[i], gpuCache, sargs.all_col_idx, sargs.broadcast_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.group_col_id[i], i+1, gpu, num_tile_items); // if (blockIdx.x == 0 && i == 3) printf("%d %d %d %d %d %d %d %d\n", groupval[i][0], groupval[i][1], groupval[i][2], groupval[i][3]); } } ptr = sargs.in_off[0]; BlockLoadMasked(ptr + tile_offset, items, selection_flags, num_tile_items); if (gargs.aggr_col_id[0] != -1) { // BlockReadOffsetGlobal2(items, selection_flags, aggrval1, gpuCache, sargs.all_col_idx, sargs.broadcast_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[0], 0, gpu, num_tile_items); BlockReadOffsetGlobal(items, selection_flags, aggrval1, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[0], 0, gpu, num_tile_items); // BlockSetValue(threadIdx.x, aggrval1, 1, num_tile_items); } else { BlockSetValue(threadIdx.x, aggrval1, 0, num_tile_items); } if 
(gargs.aggr_col_id[1] != -1) { // BlockReadOffsetGlobal2(items, selection_flags, aggrval2, gpuCache, sargs.all_col_idx, sargs.broadcast_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[1], 0, gpu, num_tile_items); BlockReadOffsetGlobal(items, selection_flags, aggrval2, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[1], 0, gpu, num_tile_items); // BlockSetValue(threadIdx.x, aggrval2, 1, num_tile_items); } else { BlockSetValue(threadIdx.x, aggrval2, 0, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if (selection_flags[ITEM]) { int hash = ((groupval[0][ITEM] - gargs.min_val1) * gargs.unique_val1 + (groupval[1][ITEM] - gargs.min_val2) * gargs.unique_val2 + (groupval[2][ITEM] - gargs.min_val3) * gargs.unique_val3 + (groupval[3][ITEM] - gargs.min_val4) * gargs.unique_val4) % gargs.total_val; //! res[hash * 6] = groupval[0][ITEM]; res[hash * 6 + 1] = groupval[1][ITEM]; res[hash * 6 + 2] = groupval[2][ITEM]; res[hash * 6 + 3] = groupval[3][ITEM]; int temp; if (aggrval1[ITEM] > aggrval2[ITEM]) temp = aggrval1[ITEM] - aggrval2[ITEM]; else temp = aggrval2[ITEM] - aggrval1[ITEM]; atomicAdd(reinterpret_cast(&res[hash * 6 + 4]), (long long)(temp)); } } } } template __global__ void probe_aggr_GPU_lm( int** gpuCache, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs, int num_tuples, int* res, int gpu) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int* ptr; // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int aggrval1[ITEMS_PER_THREAD]; int aggrval2[ITEMS_PER_THREAD]; int temp[ITEMS_PER_THREAD]; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __syncthreads(); InitFlags(selection_flags); if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //normal operation, here pargs.key_idx will be lo_partkey, lo_suppkey, etc (the join key column) -> no group by attributes ptr = pargs.key_column[0]; BlockLoadCrystal(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU(threadIdx.x, items, temp, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) { ptr = pargs.key_column[1]; BlockLoadCrystal(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU(threadIdx.x, items, temp, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) { ptr = pargs.key_column[2]; BlockLoadCrystal(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU(threadIdx.x, items, temp, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) { ptr = pargs.key_column[3]; BlockLoadCrystal(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU(threadIdx.x, items, temp, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } ptr = sargs.in_off[0]; BlockLoadMasked(ptr + tile_offset, items, selection_flags, num_tile_items); if 
(gargs.aggr_col_id[0] != -1) { BlockReadOffsetGlobal(items, selection_flags, aggrval1, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[0], 0, gpu, num_tile_items); } else { BlockSetValue(threadIdx.x, aggrval1, 1, num_tile_items); } if (gargs.aggr_col_id[1] != -1) { BlockReadOffsetGlobal(items, selection_flags, aggrval2, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[1], 0, gpu, num_tile_items); } else { BlockSetValue(threadIdx.x, aggrval2, 1, num_tile_items); } long long sum = 0; #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if (selection_flags[ITEM]) { // printf("here %d %d\n", aggrval1[ITEM], aggrval2[ITEM]); sum+= (aggrval1[ITEM] * aggrval2[ITEM]); } } } __syncthreads(); static __shared__ long long buffer[32]; unsigned long long aggregate = BlockSum(sum, (long long*)buffer); __syncthreads(); if (threadIdx.x == 0) { atomicAdd(reinterpret_cast(&res[4]), aggregate); // printf("%lu\n", res[4]); } }
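
// Illustrative sketches only: nothing below is used by the kernels above, and all names here
// (RangePartitionOfSketch, RangePartitionCountSketch, RangePartitionScatterSketch,
// AtomicAdd64ToResSketch) are hypothetical. They restate the range-partitioning scheme of this file
// on plain arrays, without the segment cache, replication, or filter machinery, so the
// count -> reserve -> scatter flow can be read in isolation. They assume BLOCK_THREADS >= NUM_PART.

// Bucket of a key: truncating division by range_size, overflow clamped into the last bucket
// (e.g. 10 key values over 3 partitions -> buckets of size 3, 3 and 4, matching the comments above).
__host__ __device__ __forceinline__
int RangePartitionOfSketch(int key, int min_key, int max_key, int num_range) {
  int range_size = (max_key + 1 - min_key) / num_range;
  int partition = key / range_size;
  if (partition >= num_range) partition = num_range - 1;
  return partition;
}

// Phase 1: per-block histogram in shared memory, flushed to the global per-partition counters,
// mirroring RangePartitioningCount's use of smem_index and count.
template <int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_PART>
__global__ void RangePartitionCountSketch(const int* keys, int n, int min_key, int max_key, int* count) {
  __shared__ int smem_count[NUM_PART];
  if (threadIdx.x < NUM_PART) smem_count[threadIdx.x] = 0;
  __syncthreads();

  int tile_offset = blockIdx.x * BLOCK_THREADS * ITEMS_PER_THREAD;
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    int i = tile_offset + threadIdx.x + ITEM * BLOCK_THREADS;
    if (i < n) atomicAdd(&smem_count[RangePartitionOfSketch(keys[i], min_key, max_key, NUM_PART)], 1);
  }
  __syncthreads();

  if (threadIdx.x < NUM_PART) atomicAdd(&count[threadIdx.x], smem_count[threadIdx.x]);
}

// Phase 2: recount into shared memory, reserve a contiguous slice of every partition's output with
// one atomicAdd per partition on the global write cursors (pos), then scatter each key to its
// reserved slot -- the same pattern RangePartitioningKeyValue implements with smem_index, sout.pos
// and index[ITEM]. pos holds each partition's running write cursor (zero-initialized when out[p] is
// a separate buffer sized from the phase-1 counts).
template <int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_PART>
__global__ void RangePartitionScatterSketch(const int* keys, int n, int min_key, int max_key,
                                            int* pos, int** out) {
  __shared__ int smem_base[NUM_PART];
  __shared__ int smem_count[NUM_PART];
  if (threadIdx.x < NUM_PART) smem_count[threadIdx.x] = 0;
  __syncthreads();

  int tile_offset = blockIdx.x * BLOCK_THREADS * ITEMS_PER_THREAD;
  int part[ITEMS_PER_THREAD];
  int local_rank[ITEMS_PER_THREAD]; // rank of each item inside this block's slice of its partition
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    int i = tile_offset + threadIdx.x + ITEM * BLOCK_THREADS;
    if (i < n) {
      part[ITEM] = RangePartitionOfSketch(keys[i], min_key, max_key, NUM_PART);
      local_rank[ITEM] = atomicAdd(&smem_count[part[ITEM]], 1);
    }
  }
  __syncthreads();

  if (threadIdx.x < NUM_PART) smem_base[threadIdx.x] = atomicAdd(&pos[threadIdx.x], smem_count[threadIdx.x]);
  __syncthreads();

  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    int i = tile_offset + threadIdx.x + ITEM * BLOCK_THREADS;
    if (i < n) out[part[ITEM]][smem_base[part[ITEM]] + local_rank[ITEM]] = keys[i];
  }
}

// Hypothetical launch, assuming device buffers d_keys, d_pos and d_out already exist:
//   RangePartitionScatterSketch<128, 4, NUM_RANGE>
//       <<<(n + 511) / 512, 128>>>(d_keys, n, min_key, max_key, d_pos, d_out);

// 64-bit accumulation into the int result buffer, a sketch of what the group-by/aggregate kernels
// above do: two consecutive ints of res (e.g. res[hash * 6 + 4] and res[hash * 6 + 5]) hold one
// unsigned long long partial sum, and the slot is assumed to be 8-byte aligned.
__device__ __forceinline__
void AtomicAdd64ToResSketch(int* res, int slot, long long delta) {
  atomicAdd(reinterpret_cast<unsigned long long*>(&res[slot]), (unsigned long long) delta);
}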