#pragma once

#include <cub/cub.cuh>
#include <curand.h>
#include <cuda.h>

#include "crystal/crystal.cuh"
#include "BlockLibrary.cuh"
#include "ShuffleGPU.h"
#include "KernelArgs.h"
#include "gpu_utils.h"

using namespace cub;

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void shuffle_probe_partition_GPU(int* gpuCache, int* partition_count, struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int gpu,
    int write_val = 1, int write_offset = 0, int latemat = 1) {

  __const__ int column_map[NUM_COLUMN] = {
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    2, 2, 2, 2,
    1, 1, 1, 1,
    3, 3, 3, 3,
    4, 4, 4
  };

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset always falls at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int* ptr;

  // Allocate shared memory for BlockScan
  __shared__ union TempStorage {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  // Per-thread registers holding a tile of consecutive items, blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __shared__ int block_scan_partition[NUM_GPU];
  __shared__ int smem_index[NUM_RANGE];
  __shared__ int my_partition;
  __shared__ int blockId; //blockId within a partition

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  if (threadIdx.x == 0) {
    my_partition = -1;
    blockId = -1;
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (partition == 0) {
        block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size;
      } else {
        block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size;
      }
    }
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (blockIdx.x < block_scan_partition[partition]) {
        my_partition = partition;
        if (my_partition == 0) blockId = blockIdx.x;
        else blockId = blockIdx.x - block_scan_partition[my_partition-1];
        break;
      }
    }
  }

  __syncthreads();

  cudaAssert(my_partition != -1);
  cudaAssert(blockId != -1);

  int num_tile_items = tile_size;
  int tile_offset = blockId * tile_size;
  int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size;
  if (blockId == num_tiles_partition - 1) {
    num_tile_items = partition_count[my_partition] - tile_offset;
  }

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  cudaAssert(sargs.column_part != NULL);
  cudaAssert(sargs.column_part[my_partition] != NULL);

  if (pargs.fkey_col_id[0] == 1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column
    cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL);
    ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu];
    if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  }
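  // Added commentary: when a dimension join is skipped, the matching else branch below fills
  // item_off[t] with the constant 1 via BlockSetValue, so downstream consumers of these
  // per-table offsets always read an initialized value; the selection flags alone decide
  // whether a row actually survives the probe sequence.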
else { //we are not doing join for this column, there is no result from prev join (first join) BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items); } if (pargs.fkey_col_id[1] == 2 && pargs.ht2 != NULL) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu]; if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items); } if (pargs.fkey_col_id[2] == 3 && pargs.ht3 != NULL) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu]; if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items); } if (pargs.fkey_col_id[3] == 4 && pargs.ht4 != NULL) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu]; if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) { item_off[0][ITEM] = blockId * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; //local offset to each partition } } } //DO PARTITIONING //this range_size would not work for date column int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the num partition (e.g 10 data 3 part -> 3 3 4) //right now we only support probe table as the table getting partitioned cudaAssert(sargs.table == 0); cudaAssert(column_map[sargs.key_column] == sargs.table); cudaAssert(sargs.column_part[my_partition][sargs.key_column] != NULL); cudaAssert(sargs.column_part[my_partition][sargs.key_column][gpu] != NULL); //loading the partitioned key column and count the partition // ptr = sargs.column_part[my_partition][sargs.key_column][gpu]; // BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size); //loading the partitioned key column and count the partition ptr = sargs.column_part[my_partition][sargs.key_column][gpu]; if (latemat 
== 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items); int is_replicated = 0; BlockCountIndexPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu); __syncthreads(); if (threadIdx.x < NUM_RANGE) { // printf("%d\n", smem_index[threadIdx.x]); smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]); } __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition if (partition >= NUM_RANGE) partition = NUM_RANGE-1; index[ITEM] += smem_index[partition]; } } } //Barrier __syncthreads(); //write the keys cudaAssert(sout.column[sargs.key_column] != NULL); BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, 0, -1); __syncthreads(); if (write_val) { cudaAssert(sout.column != NULL); cudaAssert(sargs.col_idx != NULL); //scatter the value //use the local offset to scatter the value //TODO: we have to support the case where the offset for dimension table is a global offset for (int col = 0; col < NUM_COLUMN; col++) { int table = column_map[col]; //write the values from the shuffled input if (sargs.column_part[my_partition][col] != NULL && table == 0 && sargs.key_column != col) { //materializing the local offset //WARNING: IF JOIN KEY HAS BEEN USED, NO NEED TO PARTITION THAT JOIN KEY if (latemat != 2 || (latemat == 2 && col != pargs.fkey_col_id[0] && col != pargs.fkey_col_id[1] && col != pargs.fkey_col_id[2] && col != pargs.fkey_col_id[3])) { cudaAssert(sargs.column_part[my_partition][col][gpu] != NULL); cudaAssert(sout.column[col] != NULL); // BlockReadOffsetShuffle2<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[table], selection_flags, sargs.column_part[my_partition][col][gpu], item_val, num_tile_items); ptr = sargs.column_part[my_partition][col][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items); BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1); } //for now sargs.col_idx[col] should be NULL except for groupby column in dimension table in SSB //not yet working cause sargs.col_idx is NULL } else if (sargs.column_part[my_partition][col] != NULL && table > 0 && pargs.fkey_col_id[table-1] < -1 && sargs.key_column != col) { cudaAssert(sargs.column_part[my_partition][col][gpu] != NULL); cudaAssert(sout.column[col] != NULL); // BlockReadOffsetShuffle2<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[table], selection_flags, sargs.column_part[my_partition][col][gpu], item_val, num_tile_items); ptr = sargs.column_part[my_partition][col][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items); BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1); } else if (sargs.col_idx[col] != NULL && table > 0 && pargs.fkey_col_id[table-1] > 0 && sargs.key_column != 
col) { //materializing the global offset
        cudaAssert(sargs.col_idx[col] != NULL);
        cudaAssert(sargs.broadcast_idx[col] != NULL);
        cudaAssert(sout.column[col] != NULL);
        BlockReadOffsetGPU3<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[table], item_val, selection_flags, gpuCache, sargs.col_idx[col], sargs.broadcast_idx[col], num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1);
        // cudaAssert(0);
      }
    }
  }

  //TODO: NOT YET SUPPORTED
  if (write_offset) {
    // cudaAssert(0);
    //scatter the offset (the global offset, not the local offset); the local offset would be useless once the shuffle is done
    cudaAssert(sargs.in_off_part != NULL);
    cudaAssert(sargs.in_off_part[my_partition] != NULL);
    cudaAssert(sout.out_off != NULL);
    for (int table = 0; table < NUM_TABLE; table++) {
      //if the offset comes from a previous join
      if (sargs.in_off_part[my_partition][table] != NULL && table == 0) {
        cudaAssert(sargs.in_off_part[my_partition][table][gpu] != NULL);
        cudaAssert(sout.out_off[table] != NULL);
        ptr = sargs.in_off_part[my_partition][table][gpu];
        BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1);
      } else if (sargs.in_off_part[my_partition][table] != NULL && table > 0 && pargs.fkey_col_id[table-1] < -1) {
        cudaAssert(sargs.in_off_part[my_partition][table][gpu] != NULL);
        cudaAssert(sout.out_off[table] != NULL);
        // BlockReadOffsetShuffle2<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[table], selection_flags, sargs.in_off_part[my_partition][table][gpu], item_val, num_tile_items);
        ptr = sargs.in_off_part[my_partition][table][gpu];
        BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1);
      //if the offset comes from the current join
      } else if (table > 0 && pargs.fkey_col_id[table-1] > 0) {
        cudaAssert(sout.out_off[table] != NULL);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[table], item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1);
      }
    }
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void shuffle_probe_partition_global_GPU(int** gpuCache, int* partition_count, struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int gpu,
    int first_join_in_pipeline, int write_val = 1, int write_offset = 0) {

  __const__ int column_map[NUM_COLUMN] = {
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    2, 2, 2, 2,
    1, 1, 1, 1,
    3, 3, 3, 3,
    4, 4, 4
  };

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset always falls at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int* ptr;

  // Allocate shared memory for BlockScan
  __shared__ union TempStorage {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  // Per-thread registers holding a tile of consecutive items, blocked across threads
  int
selection_flags[ITEMS_PER_THREAD]; int item_off[NUM_TABLE][ITEMS_PER_THREAD]; int item_key[ITEMS_PER_THREAD]; int index[ITEMS_PER_THREAD]; int table_id = 0; __shared__ int block_scan_partition[NUM_GPU]; __shared__ int smem_index[NUM_RANGE]; __shared__ int my_partition; __shared__ int blockId; //blockId within a partition __syncthreads(); if (threadIdx.x < NUM_RANGE) { smem_index[threadIdx.x] = 0; } __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { index[ITEM] = 0; } if (threadIdx.x == 0) { my_partition = -1; blockId = -1; for (int partition = 0; partition < NUM_GPU; partition++) { if (partition == 0) { block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size; } else { block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size; } } for (int partition = 0; partition < NUM_GPU; partition++) { if (blockIdx.x < block_scan_partition[partition]) { my_partition = partition; if (my_partition == 0) blockId = blockIdx.x; else blockId = blockIdx.x - block_scan_partition[my_partition-1]; break; } } } __syncthreads(); cudaAssert(my_partition != -1); cudaAssert(blockId != -1); int num_tile_items = tile_size; int tile_offset = blockId * tile_size; int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size; if (blockId == num_tiles_partition - 1) { num_tile_items = partition_count[my_partition] - tile_offset; } InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); cudaAssert(sargs.column_part[my_partition] != NULL); cudaAssert(sargs.in_off_part[my_partition] != NULL); cudaAssert(sargs.in_off_part[my_partition] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); if (pargs.fkey_col_id[0] == 1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column if (first_join_in_pipeline == 1) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); } else { //partition size is the original partition size of the fact table during initial partition, it is different from partition count BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[0], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } } else { //we are not doing join for this column, there might be offset from previous join but will be handled in write offset BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items); } 
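  // Added commentary: every probe in this kernel follows the same pattern. When this join is
  // the first one in the pipeline (first_join_in_pipeline == N), the foreign-key column was
  // shipped together with the shuffled partition and is loaded directly from column_part;
  // otherwise the keys are gathered through the fact-table offsets in item_off[0] with
  // BlockReadOffsetGlobal, which appears to resolve each global offset against all_col_idx,
  // seg_row_to_gpu and seg_is_replicated to reach the GPU that caches the corresponding segment.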
if (pargs.fkey_col_id[1] == 2 && pargs.ht2 != NULL) { if (first_join_in_pipeline == 2) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[1], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items); } if (pargs.fkey_col_id[2] == 3 && pargs.ht3 != NULL) { if (first_join_in_pipeline == 3) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[2], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items); } if (pargs.fkey_col_id[3] == 4 && pargs.ht4 != NULL) { if (first_join_in_pipeline == 4) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[3], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, 
ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items); } //DO PARTITIONING //this range_size would not work for date column int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the num partition (e.g 10 data 3 part -> 3 3 4) //right now we only support probe table as the table getting partitioned cudaAssert(sargs.table == 0); cudaAssert(column_map[sargs.key_column] == sargs.table); // cudaAssert(sargs.column[sargs.key_column] != NULL); //loading the partitioned key column and count the partition BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, sargs.key_column, table_id, gpu, num_tile_items); int is_replicated = 0; BlockCountIndexPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu); __syncthreads(); if (threadIdx.x < NUM_RANGE) { smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]); } __syncthreads(); // if (threadIdx.x == 0) printf("hey %d %d %d\n", smem_index[0], smem_index[1], sargs.key_column); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition if (partition >= NUM_RANGE) partition = NUM_RANGE-1; index[ITEM] += smem_index[partition]; } } } //Barrier __syncthreads(); cudaAssert(sout.column[sargs.key_column] != NULL); BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, 0, -1); // if (threadIdx.x == 0 && blockIdx.x == 0) printf("writing keys\n"); BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[0], item_key, index, selection_flags, sout.out_off[0], num_tile_items, range_size, 0, 100); __syncthreads(); // NOT SURE HOW TO DO THIS WITH GLOBAL LATE MATERIALIZATION // TODO: NOT YET SUPPORTED if (write_offset) { cudaAssert(sargs.in_off_part != NULL); cudaAssert(sargs.in_off_part[my_partition] != NULL); cudaAssert(sout.out_off != NULL); //only need to address from table = 1 cause table = 0 offset already written above for (int table = 1; table < NUM_TABLE; table++) { //offset coming from previous join if (sargs.in_off_part[my_partition][table] != NULL && pargs.fkey_col_id[table-1] < -1) { cudaAssert(sout.out_off[table] != NULL); int* ptr = sargs.in_off_part[my_partition][table][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[table], selection_flags, num_tile_items); BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[table], item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, 200); //offset coming from current join } else if (pargs.fkey_col_id[table-1] > 0) { cudaAssert(sout.out_off[table] != NULL); BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[table], item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, 300); } } } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE> __global__ 
void shuffle_probe_aggr_global_GPU(int** gpuCache, int* partition_count, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs, int* res, int gpu,
    int first_join_in_pipeline, int write_val = 1, int write_offset = 0) {

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset always falls at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int* ptr;

  // Allocate shared memory for BlockScan
  __shared__ union TempStorage {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  // Per-thread registers holding a tile of consecutive items, blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];
  int aggrval1[ITEMS_PER_THREAD];
  int aggrval2[ITEMS_PER_THREAD];
  int table_id = 0;

  __shared__ int block_scan_partition[NUM_GPU];
  __shared__ int smem_index[NUM_RANGE];
  __shared__ int my_partition;
  __shared__ int blockId; //blockId within a partition

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  if (threadIdx.x == 0) {
    my_partition = -1;
    blockId = -1;
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (partition == 0) {
        block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size;
      } else {
        block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size;
      }
    }
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (blockIdx.x < block_scan_partition[partition]) {
        my_partition = partition;
        if (my_partition == 0) blockId = blockIdx.x;
        else blockId = blockIdx.x - block_scan_partition[my_partition-1];
        break;
      }
    }
  }

  __syncthreads();

  cudaAssert(my_partition != -1);
  cudaAssert(blockId != -1);

  int num_tile_items = tile_size;
  int tile_offset = blockId * tile_size;
  int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size;
  if (blockId == num_tiles_partition - 1) {
    num_tile_items = partition_count[my_partition] - tile_offset;
  }

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  cudaAssert(sargs.column_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);

  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);

  if (pargs.fkey_col_id[0] == 1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column
    if (first_join_in_pipeline == 1) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1,
pargs.min_key1, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); } else { //partition size is the original partition size of the fact table during initial partition, it is different from partition count BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[0], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } } if (pargs.fkey_col_id[1] == 2 && pargs.ht2 != NULL) { if (first_join_in_pipeline == 2) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[1], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } } if (pargs.fkey_col_id[2] == 3 && pargs.ht3 != NULL) { if (first_join_in_pipeline == 3) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[2], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } } if (pargs.fkey_col_id[3] == 4 && pargs.ht4 != NULL) { if (first_join_in_pipeline == 4) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, 
item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[3], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
    }
  }

  if (gargs.aggr_col_id[0] != -1) {
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, aggrval1, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[0], table_id, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items);
  }

  if (gargs.aggr_col_id[1] != -1) {
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, aggrval2, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[1], table_id, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items);
  }

  long long sum = 0;
  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if (selection_flags[ITEM]) {
        sum += (aggrval1[ITEM] * aggrval2[ITEM]);
      }
    }
  }

  __syncthreads();

  static __shared__ long long buffer[32];
  unsigned long long aggregate = BlockSum<long long, BLOCK_THREADS, ITEMS_PER_THREAD>(sum, (long long*)buffer);
  __syncthreads();

  if (threadIdx.x == 0) {
    atomicAdd(reinterpret_cast<unsigned long long*>(&res[4]), aggregate);
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void shuffle_probe_group_by_global_GPU(int** gpuCache, int* partition_count, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs, int* res, int gpu,
    int first_join_in_pipeline, int write_val = 1, int write_offset = 0) {

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset always falls at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int* ptr;

  // Allocate shared memory for BlockScan
  __shared__ union TempStorage {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  // Per-thread registers holding a tile of consecutive items, blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];
  int groupval1[ITEMS_PER_THREAD];
  int groupval2[ITEMS_PER_THREAD];
  int groupval3[ITEMS_PER_THREAD];
  int groupval4[ITEMS_PER_THREAD];
  int aggrval1[ITEMS_PER_THREAD];
  int aggrval2[ITEMS_PER_THREAD];
  int table_id = 0;

  __shared__ int block_scan_partition[NUM_GPU];
  __shared__ int smem_index[NUM_RANGE];
  __shared__ int my_partition;
  __shared__ int blockId; //blockId within a partition

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  if (threadIdx.x == 0) {
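    // Added worked example (illustrative numbers only): thread 0 builds the inclusive prefix
    // sum of per-partition tile counts. With tile_size = 512 and partition_count = {1000, 2048,
    // 700, 512}, the tile counts are {2, 4, 2, 1} and block_scan_partition = {2, 6, 8, 9};
    // blockIdx.x = 6 then maps to my_partition = 2 with blockId = 6 - 6 = 0, and the kernel
    // expects the launch to use exactly block_scan_partition[NUM_GPU-1] = 9 thread blocks.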
my_partition = -1; blockId = -1; for (int partition = 0; partition < NUM_GPU; partition++) { if (partition == 0) { block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size; } else { block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size; } } for (int partition = 0; partition < NUM_GPU; partition++) { if (blockIdx.x < block_scan_partition[partition]) { my_partition = partition; if (my_partition == 0) blockId = blockIdx.x; else blockId = blockIdx.x - block_scan_partition[my_partition-1]; break; } } } __syncthreads(); cudaAssert(my_partition != -1); cudaAssert(blockId != -1); int num_tile_items = tile_size; int tile_offset = blockId * tile_size; int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size; if (blockId == num_tiles_partition - 1) { num_tile_items = partition_count[my_partition] - tile_offset; } InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); cudaAssert(sargs.column_part[my_partition] != NULL); cudaAssert(sargs.in_off_part[my_partition] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); //normal operation if (pargs.fkey_col_id[0] == 1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column if (first_join_in_pipeline == 1) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } else { //partition size is the original partition size of the fact table during initial partition, it is different from partition count BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[0], table_id, gpu, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } //there is offset from previous join } else if (gargs.group_col_id[0] != -1) { cudaAssert(sargs.in_off_part[my_partition][1] != NULL); cudaAssert(sargs.in_off_part[my_partition][1][gpu] != NULL); ptr = sargs.in_off_part[my_partition][1][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[1], selection_flags, num_tile_items); BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[1], selection_flags, groupval1, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.group_col_id[0], 1, gpu, num_tile_items); } else { //we are not doing join for this column, there is no result from prev join (first join) BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval1, 0, num_tile_items); } if (pargs.fkey_col_id[1] == 2 && pargs.ht2 != NULL) { if (first_join_in_pipeline == 2) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != 
NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[1], table_id, gpu, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } } else if (gargs.group_col_id[1] != -1) { cudaAssert(sargs.in_off_part[my_partition][2] != NULL); cudaAssert(sargs.in_off_part[my_partition][2][gpu] != NULL); ptr = sargs.in_off_part[my_partition][2][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[2], selection_flags, num_tile_items); BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[2], selection_flags, groupval2, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.group_col_id[1], 2, gpu, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval2, 0, num_tile_items); } if (pargs.fkey_col_id[2] == 3 && pargs.ht3 != NULL) { if (first_join_in_pipeline == 3) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[2], 3, gpu, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } } else if (gargs.group_col_id[2] != -1) { cudaAssert(sargs.in_off_part[my_partition][3] != NULL); cudaAssert(sargs.in_off_part[my_partition][3][gpu] != NULL); ptr = sargs.in_off_part[my_partition][3][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[3], selection_flags, num_tile_items); BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[3], selection_flags, groupval3, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.group_col_id[2], 3, gpu, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval3, 0, num_tile_items); } if (pargs.fkey_col_id[3] == 4 && pargs.ht4 != NULL) { if (first_join_in_pipeline == 4) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, 
ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[3], table_id, gpu, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } } else if (gargs.group_col_id[3] != -1) { cudaAssert(sargs.in_off_part[my_partition][4] != NULL); cudaAssert(sargs.in_off_part[my_partition][4][gpu] != NULL); ptr = sargs.in_off_part[my_partition][4][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[4], selection_flags, num_tile_items); BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[4], selection_flags, groupval4, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.group_col_id[3], 4, gpu, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval4, 0, num_tile_items); } if (gargs.aggr_col_id[0] != -1) { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, aggrval1, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[0], table_id, gpu, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 0, num_tile_items); } if (gargs.aggr_col_id[1] != -1) { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, aggrval2, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[1], table_id, gpu, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 0, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if (selection_flags[ITEM]) { int hash = ((groupval1[ITEM] - gargs.min_val1) * gargs.unique_val1 + (groupval2[ITEM] - gargs.min_val2) * gargs.unique_val2 + (groupval3[ITEM] - gargs.min_val3) * gargs.unique_val3 + (groupval4[ITEM] - gargs.min_val4) * gargs.unique_val4) % gargs.total_val; //! 
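        // Added commentary: each group occupies 6 ints of res. Slots 0-3 hold the four
        // group-by values, and slots 4-5 are treated as a single 64-bit accumulator that the
        // atomicAdd below updates, so res is presumably allocated as total_val * 6 ints.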
        res[hash * 6] = groupval1[ITEM];
        res[hash * 6 + 1] = groupval2[ITEM];
        res[hash * 6 + 2] = groupval3[ITEM];
        res[hash * 6 + 3] = groupval4[ITEM];
        int temp;
        if (aggrval1[ITEM] > aggrval2[ITEM]) temp = aggrval1[ITEM] - aggrval2[ITEM];
        else temp = aggrval2[ITEM] - aggrval1[ITEM];
        atomicAdd(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp));
      }
    }
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void shuffle_probe_global_GPU(int** gpuCache, int* partition_count, struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, struct offsetGPU out_off, int* total, int gpu,
    int first_join_in_pipeline, int write_val = 1, int write_offset = 0) {

  // __const__ int column_map[NUM_COLUMN] = {
  //   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  //   2, 2, 2, 2,
  //   1, 1, 1, 1,
  //   3, 3, 3, 3,
  //   4, 4, 4
  // };

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset always falls at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int* ptr;

  // Allocate shared memory for BlockScan
  __shared__ union TempStorage {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  // Per-thread registers holding a tile of consecutive items, blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];
  int t_count = 0; // Number of items selected per thread
  int c_t_count = 0; //Prefix sum of t_count
  __shared__ int block_off;
  int table_id = 0;

  __shared__ int block_scan_partition[NUM_GPU];
  __shared__ int smem_index[NUM_RANGE];
  __shared__ int my_partition;
  __shared__ int blockId; //blockId within a partition

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  if (threadIdx.x == 0) {
    my_partition = -1;
    blockId = -1;
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (partition == 0) {
        block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size;
      } else {
        block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size;
      }
    }
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (blockIdx.x < block_scan_partition[partition]) {
        my_partition = partition;
        if (my_partition == 0) blockId = blockIdx.x;
        else blockId = blockIdx.x - block_scan_partition[my_partition-1];
        break;
      }
    }
  }

  __syncthreads();

  cudaAssert(my_partition != -1);
  cudaAssert(blockId != -1);

  int num_tile_items = tile_size;
  int tile_offset = blockId * tile_size;
  int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size;
  if (blockId == num_tiles_partition - 1) {
    num_tile_items = partition_count[my_partition] - tile_offset;
  }

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  cudaAssert(sargs.column_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);

  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);

  if
(pargs.fkey_col_id[0] == 1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column if (first_join_in_pipeline == 1) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); } else { //partition size is the original partition size of the fact table during initial partition, it is different from partition count BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[0], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } } else { //we are not doing join for this column, there is no result from prev join (first join) BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items); } if (pargs.fkey_col_id[1] == 2 && pargs.ht2 != NULL) { if (first_join_in_pipeline == 2) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[1], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items); } if (pargs.fkey_col_id[2] == 3 && pargs.ht3 != NULL) { if (first_join_in_pipeline == 3) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], 
selection_flags, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[2], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items); } if (pargs.fkey_col_id[3] == 4 && pargs.ht4 != NULL) { if (first_join_in_pipeline == 4) { cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL); cudaAssert(sargs.in_off_part[my_partition][0] != NULL); cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL); ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items); } else { BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[3], table_id, gpu, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) t_count++; } } //Barrier __syncthreads(); // TODO: need to check logic for offset BlockScanInt(temp_storage.scan).ExclusiveSum(t_count, c_t_count); //doing a prefix sum of all the previous threads in the block and store it to c_t_count if(threadIdx.x == blockDim.x - 1) { //if the last thread in the block, add the prefix sum of all the prev threads + sum of my threads to global variable total block_off = atomicAdd(total, t_count+c_t_count); //the previous value of total is gonna be assigned to block_off } //block_off does not need to be global (it's just need to be shared), because it will get the previous value from total which is global __syncthreads(); cudaAssert(sargs.in_off_part != NULL); cudaAssert(sargs.in_off_part[my_partition] != NULL); #pragma unroll for (int table = 1; table < NUM_TABLE; table++) { //offset coming from previous join if (sargs.in_off_part[my_partition][table] != NULL && pargs.fkey_col_id[table-1] < -1) { int* ptr = sargs.in_off_part[my_partition][table][gpu]; BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[table], selection_flags, num_tile_items); // if (table == 1) printf("supplier off %d %d %d %d\n", item_off[1][0], item_off[1][1], item_off[1][2], item_off[1][3]); } } cudaAssert(out_off.join_result_off != NULL); cudaAssert(out_off.join_result_off[0] != NULL); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) { int offset = block_off + c_t_count++; out_off.join_result_off[0][offset] = item_off[0][ITEM]; if 
(out_off.join_result_off[1] != NULL) out_off.join_result_off[1][offset] = item_off[1][ITEM]; //global offset if (out_off.join_result_off[2] != NULL) out_off.join_result_off[2][offset] = item_off[2][ITEM]; //global offset if (out_off.join_result_off[3] != NULL) out_off.join_result_off[3][offset] = item_off[3][ITEM]; //global offset if (out_off.join_result_off[4] != NULL) out_off.join_result_off[4][offset] = item_off[4][ITEM]; //global offset } } } }
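
// Illustrative host-side sketch (added; not part of the original interface): the kernels above
// assume gridDim.x equals the total number of tiles summed over all NUM_GPU incoming partitions,
// which is exactly what the per-block search over block_scan_partition reconstructs. A caller
// could size the launch as shown here; the helper name, the host-side copy of the partition
// counts, and the stream parameter are hypothetical.
template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
void launch_shuffle_probe_partition_sketch(int* gpuCache, int* d_partition_count, int* h_partition_count,
    probeArgsGPU pargs, shuffleArgsGPU sargs, shuffleOutGPU sout, int gpu, cudaStream_t stream) {
  const int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int total_blocks = 0;
  for (int p = 0; p < NUM_GPU; p++) {
    // one thread block per tile of each incoming partition, rounding the last tile up
    total_blocks += (h_partition_count[p] + tile_size - 1) / tile_size;
  }
  if (total_blocks == 0) return;
  shuffle_probe_partition_GPU<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>
      <<<total_blocks, BLOCK_THREADS, 0, stream>>>(gpuCache, d_partition_count, pargs, sargs, sout, gpu);
}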