#pragma once

#include <cub/cub.cuh>
#include <curand.h>
#include <cuda.h>

#include "crystal/crystal.cuh"
#include "BlockLibrary.cuh"
#include "KernelArgs.h"
#include "gpu_utils.h"

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void probe_partition_GPU_simple(
    int* gpuCache, struct filterArgsGPU fargs, struct probeArgsGPU pargs,
    struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int num_tuples,
    int gpu, int start_offset = 0, short* segment_group = NULL,
    int write_val = 1, int write_offset = 0, int latemat = 0) {

  cudaAssert(NUM_GPU == NUM_RANGE);
  //assume start_offset is always at the beginning of a segment (it cannot start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many tiles (thread blocks) per segment
  int* ptr;

  int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __shared__ int smem_index[NUM_RANGE];
  __shared__ int64_t key_segment1;
  __shared__ int64_t key_segment2;
  __shared__ int64_t key_segment3;
  __shared__ int64_t key_segment4;
  // __shared__ int64_t filter_segment1;
  // __shared__ int64_t filter_segment2;
  __shared__ int64_t shuffle_key;
  __shared__ int64_t segment_index;
  __shared__ int is_replicated;

  if (threadIdx.x == 0) {
    if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE];
    else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE;

    if (pargs.key_idx1 != NULL) {
      key_segment1 = pargs.key_idx1[segment_index];
      cudaAssert(key_segment1 != -1);
    }
    if (pargs.key_idx2 != NULL) {
      key_segment2 = pargs.key_idx2[segment_index];
      cudaAssert(key_segment2 != -1);
    }
    if (pargs.key_idx3 != NULL) {
      key_segment3 = pargs.key_idx3[segment_index];
      cudaAssert(key_segment3 != -1);
    }
    if (pargs.key_idx4 != NULL) {
      key_segment4 = pargs.key_idx4[segment_index];
      cudaAssert(key_segment4 != -1);
    }
    // if (fargs.filter_idx1 != NULL) {
    //   filter_segment1 = fargs.filter_idx1[segment_index];
    //   cudaAssert(filter_segment1 != -1);
    // }
    // if (fargs.filter_idx2 != NULL) {
    //   filter_segment2 = fargs.filter_idx2[segment_index];
    //   cudaAssert(filter_segment2 != -1);
    // }
    if (sargs.col_idx[sargs.key_column] != NULL) {
      shuffle_key = sargs.col_idx[sargs.key_column][segment_index];
      cudaAssert(shuffle_key != -1);
    }
    is_replicated = sargs.seg_is_replicated[segment_index];
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  // if (fargs.filter_idx1 != NULL) {
  //   ptr = gpuCache + filter_segment1 * SEGMENT_SIZE;
  //   BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
  //   BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
  //   BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);
  // }

  // if (fargs.filter_idx2 != NULL) {
  //   ptr = gpuCache + filter_segment2 * SEGMENT_SIZE;
  //   BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
  //   BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
  //   BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);
  // }

  if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //we are probing this column (normal operation)
    ptr = gpuCache + key_segment1 * SEGMENT_SIZE;
    // check_join[0] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not joining on this column and there is no result from a previous join (first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items);
  }

  if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) {
    ptr = gpuCache + key_segment2 * SEGMENT_SIZE;
    // check_join[1] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items);
  }

  if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) {
    ptr = gpuCache + key_segment3 * SEGMENT_SIZE;
    // check_join[2] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items);
  }

  if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) {
    ptr = gpuCache + key_segment4 * SEGMENT_SIZE;
    // check_join[3] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items);
  }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if (selection_flags[ITEM]) {
        item_off[0][ITEM] = blockIdx.x * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; //local offset into each partition
      }
    }
  }

  //DO PARTITIONING
  //this range_size would not work for a date column
  start_offset = segment_index * SEGMENT_SIZE;
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the number of partitions (e.g. 10 values over 3 partitions -> 3 3 4)

  //right now we only support the probe table as the table getting partitioned
  cudaAssert(sargs.table == 0);

  //load the partitioned key column and count the partition sizes
  cudaAssert(sargs.col_idx[sargs.key_column] != NULL);
  ptr = gpuCache + shuffle_key * SEGMENT_SIZE;
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range_size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, is_replicated, gpu);

  BlockWriteOffSelf<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.out_off[0], start_offset, num_tile_items, range_size, is_replicated, gpu);

  //current join (if used by groupby) -> if not used by groupby, sout.out_off will be NULL
  if (pargs.fkey_col_id[0] > 0 && sout.out_off[1] != NULL)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[1], item_key, index, selection_flags, sout.out_off[1], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[1] > 0 && sout.out_off[2] != NULL)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[2], item_key, index, selection_flags, sout.out_off[2], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[2] > 0 && sout.out_off[3] != NULL)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[3], item_key, index, selection_flags, sout.out_off[3], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[3] > 0 && sout.out_off[4] != NULL)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[4], item_key, index, selection_flags, sout.out_off[4], num_tile_items, range_size, is_replicated, gpu);

  //future join (a previous join would have sargs.col_idx == NULL)
  if (pargs.fkey_col_id[0] < 0 && sargs.col_idx[1] != NULL && sargs.key_column != 1) {
    int64_t val_segment = sargs.col_idx[1][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[1], num_tile_items, range_size, is_replicated, gpu);
  }
  if (pargs.fkey_col_id[1] < 0 && sargs.col_idx[2] != NULL && sargs.key_column != 2) {
    int64_t val_segment = sargs.col_idx[2][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[2], num_tile_items, range_size, is_replicated, gpu);
  }
  if (pargs.fkey_col_id[2] < 0 && sargs.col_idx[3] != NULL && sargs.key_column != 3) {
    int64_t val_segment = sargs.col_idx[3][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[3], num_tile_items, range_size, is_replicated, gpu);
  }
  // cudaAssert(sargs.col_idx[pargs.fkey_col_id[3]] != NULL);
  if (pargs.fkey_col_id[3] < 0 && sargs.col_idx[4] != NULL && sargs.key_column != 4) {
    int64_t val_segment = sargs.col_idx[4][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[4], num_tile_items, range_size, is_replicated, gpu);
  }
}
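// Illustrative sketch (not used by the kernels in this file): how the range
// partitioning above maps a key to a partition. Assuming hypothetical values
// min_key = 1, max_key = 6000000 and NUM_RANGE = 8, range_size is
// (6000000 + 1 - 1) / 8 = 750000, so key 1500000 falls into partition 2. Note
// that, like the kernels, this divides the raw key (without subtracting
// min_key) and clamps the overflow into the last partition when range_size
// does not divide the domain evenly.
__host__ __device__ __forceinline__
int example_partition_of(int key, int min_key, int max_key, int num_range) {
  int range_size = (max_key + 1 - min_key) / num_range;   // same formula as the kernels
  int partition = key / range_size;                       // same mapping as the kernels
  if (partition >= num_range) partition = num_range - 1;  // last partition absorbs the remainder
  return partition;
}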
template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void probe_partition_GPU_simple_groupby_8GPUs(
    int* gpuCache, struct filterArgsGPU fargs, struct probeArgsGPU pargs,
    struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs, struct shuffleOutGPU sout,
    int num_tuples, int gpu, int start_offset = 0, short* segment_group = NULL,
    int write_val = 1, int write_offset = 0, int latemat = 0) {

  cudaAssert(NUM_GPU == NUM_RANGE);
  //assume start_offset is always at the beginning of a segment (it cannot start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many tiles (thread blocks) per segment
  int* ptr;

  int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int groupval[4][ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __shared__ int smem_index[NUM_RANGE];
  __shared__ int64_t key_segment1;
  __shared__ int64_t key_segment2;
  __shared__ int64_t key_segment3;
  __shared__ int64_t key_segment4;
  __shared__ int64_t shuffle_key;
  __shared__ int64_t segment_index;
  __shared__ int is_replicated;

  if (threadIdx.x == 0) {
    if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE];
    else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE;

    if (pargs.key_idx1 != NULL) {
      key_segment1 = pargs.key_idx1[segment_index];
      cudaAssert(key_segment1 != -1);
    }
    if (pargs.key_idx2 != NULL) {
      key_segment2 = pargs.key_idx2[segment_index];
      cudaAssert(key_segment2 != -1);
    }
    if (pargs.key_idx3 != NULL) {
      key_segment3 = pargs.key_idx3[segment_index];
      cudaAssert(key_segment3 != -1);
    }
    if (pargs.key_idx4 != NULL) {
      key_segment4 = pargs.key_idx4[segment_index];
      cudaAssert(key_segment4 != -1);
    }
    if (sargs.col_idx[sargs.key_column] != NULL) {
      shuffle_key = sargs.col_idx[sargs.key_column][segment_index];
      cudaAssert(shuffle_key != -1);
    }
    is_replicated = sargs.seg_is_replicated[segment_index];
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //we are probing this column (normal operation)
    ptr = gpuCache + key_segment1 * SEGMENT_SIZE;
    // check_join[0] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[0], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not joining on this column and there is no result from a previous join (first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[0], 0, num_tile_items);
  }

  if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) {
    ptr = gpuCache + key_segment2 * SEGMENT_SIZE;
    // check_join[1] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[1], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[1], 0, num_tile_items);
  }

  if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) {
    ptr = gpuCache + key_segment3 * SEGMENT_SIZE;
    // check_join[2] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[2], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[2], 0, num_tile_items);
  }

  if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) {
    ptr = gpuCache + key_segment4 * SEGMENT_SIZE;
    // check_join[3] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[3], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[3], 0, num_tile_items);
  }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if (selection_flags[ITEM]) {
        item_off[ITEM] = blockIdx.x * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; //local offset into each partition
      }
    }
  }

  //DO PARTITIONING
  //this range_size would not work for a date column
  start_offset = segment_index * SEGMENT_SIZE;
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the number of partitions (e.g. 10 values over 3 partitions -> 3 3 4)

  //right now we only support the probe table as the table getting partitioned
  cudaAssert(sargs.table == 0);

  //load the partitioned key column and count the partition sizes
  cudaAssert(sargs.col_idx[sargs.key_column] != NULL);
  ptr = gpuCache + shuffle_key * SEGMENT_SIZE;
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range_size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, is_replicated, gpu);

  BlockWriteOffSelf<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.out_off[0], start_offset, num_tile_items, range_size, is_replicated, gpu);

  //current join (if used by groupby) -> write the groupby column value
  if (pargs.fkey_col_id[0] > 0 && gargs.group_col_id[0] != -1)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[0], item_key, index, selection_flags, sout.column[gargs.group_col_id[0]], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[1] > 0 && gargs.group_col_id[1] != -1)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[1], item_key, index, selection_flags, sout.column[gargs.group_col_id[1]], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[2] > 0 && gargs.group_col_id[2] != -1)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[2], item_key, index, selection_flags, sout.column[gargs.group_col_id[2]], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[3] > 0 && gargs.group_col_id[3] != -1) {
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[3], item_key, index, selection_flags, sout.column[gargs.group_col_id[3]], num_tile_items, range_size, is_replicated, gpu);
    // if (blockIdx.x == 0) printf("%d %d %d %d %d %d %d %d\n", selection_flags[0], selection_flags[1], selection_flags[2], selection_flags[3], groupval[3][0], groupval[3][1], groupval[3][2], groupval[3][3]);
  }

  //future join (a previous join would have sargs.col_idx == NULL)
  if (pargs.fkey_col_id[0] < 0 && sargs.col_idx[1] != NULL && sargs.key_column != 1) {
    int64_t val_segment = sargs.col_idx[1][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[1], num_tile_items, range_size, is_replicated, gpu);
  }
  if (pargs.fkey_col_id[1] < 0 && sargs.col_idx[2] != NULL && sargs.key_column != 2) {
    int64_t val_segment = sargs.col_idx[2][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[2], num_tile_items, range_size, is_replicated, gpu);
  }
  if (pargs.fkey_col_id[2] < 0 && sargs.col_idx[3] != NULL && sargs.key_column != 3) {
    int64_t val_segment = sargs.col_idx[3][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[3], num_tile_items, range_size, is_replicated, gpu);
  }
  // cudaAssert(sargs.col_idx[pargs.fkey_col_id[3]] != NULL);
  if (pargs.fkey_col_id[3] < 0 && sargs.col_idx[4] != NULL && sargs.key_column != 4) {
    int64_t val_segment = sargs.col_idx[4][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[4], num_tile_items, range_size, is_replicated, gpu);
  }

  if (gargs.aggr_col_id[0] != -1) {
    int64_t val_segment = sargs.col_idx[gargs.aggr_col_id[0]][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[gargs.aggr_col_id[0]], num_tile_items, range_size, is_replicated, gpu);
    // if (blockIdx.x == 0) printf("%d %d %d %d %d %d %d %d\n", selection_flags[0], selection_flags[1], selection_flags[2], selection_flags[3], item_val[0], item_val[1], item_val[2], item_val[3]);
  }

  if (gargs.aggr_col_id[1] != -1) {
    int64_t val_segment = sargs.col_idx[gargs.aggr_col_id[1]][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[gargs.aggr_col_id[1]], num_tile_items, range_size, is_replicated, gpu);
  }
}
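// Note on the index computation shared by the kernels above (descriptive only,
// with hypothetical numbers): BlockLoadAndCountIndexPartition2 appears to give
// each selected item a per-block rank within its target partition (index[ITEM])
// while accumulating per-partition counts in smem_index[] -- say a block counts
// {5, 3, 0, 4} items for NUM_RANGE = 4. One thread per partition then does
// atomicAdd(&sout.pos[p], count), reserving a contiguous slice of the global
// output and leaving the previous counter value (the block's base offset) in
// smem_index[p], e.g. {120, 87, 42, 200}. Finally, index[ITEM] += smem_index[partition]
// turns the per-block rank into a global write position, so the item ranked 2 in
// partition 0 of this block lands at slot 120 + 2 = 122 of that partition's output.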
template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void probe_partition_GPU2_simple(
    int* gpuCache, struct probeArgsGPU pargs, struct shuffleArgsGPU sargs,
    struct shuffleOutGPU sout, int num_tuples,
    int write_val = 1, int write_offset = 0, int latemat = 0) {

  cudaAssert(NUM_GPU == NUM_RANGE);
  //assume start_offset is always at the beginning of a segment (it cannot start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)

  // Tile shape: BLOCK_THREADS threads, each owning ITEMS_PER_THREAD integer items
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int* ptr;

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //we are probing this column (normal operation)
    ptr = pargs.key_column[0];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not joining on this column and there is no result from a previous join (first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items);
  }

  if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) {
    ptr = pargs.key_column[1];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items);
  }

  if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) {
    ptr = pargs.key_column[2];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items);
  }

  if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) {
    ptr = pargs.key_column[3];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items);
  }

  //DO PARTITIONING
  //this range_size would not work for a date column
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the number of partitions (e.g. 10 values over 3 partitions -> 3 3 4)

  //right now we only support the probe table as the table getting partitioned
  cudaAssert(sargs.table == 0);

  //load the partitioned key column and count the partition sizes
  cudaAssert(sargs.column[sargs.key_column] != NULL);
  ptr = sargs.column[sargs.key_column];
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, 0, -1);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range_size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, 0, -1);

  ptr = sargs.in_off[0];
  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
  BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.out_off[0], num_tile_items, range_size, 0, -1);

  if (pargs.fkey_col_id[0] > 0 && sout.out_off[1] != NULL)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[1], item_key, index, selection_flags, sout.out_off[1], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[0] < -1 && sargs.in_off[1] != NULL) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off[1] + tile_offset, item_off[1], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[1], item_key, index, selection_flags, sout.out_off[1], num_tile_items, range_size, 0, -1);
  }

  if (pargs.fkey_col_id[1] > 0 && sout.out_off[2] != NULL)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[2], item_key, index, selection_flags, sout.out_off[2], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[1] < -1 && sargs.in_off[2] != NULL) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off[2] + tile_offset, item_off[2], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[2], item_key, index, selection_flags, sout.out_off[2], num_tile_items, range_size, 0, -1);
  }

  if (pargs.fkey_col_id[2] > 0 && sout.out_off[3] != NULL)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[3], item_key, index, selection_flags, sout.out_off[3], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[2] < -1 && sargs.in_off[3] != NULL) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off[3] + tile_offset, item_off[3], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[3], item_key, index, selection_flags, sout.out_off[3], num_tile_items, range_size, 0, -1);
  }

  //current join (if used by groupby) -> if not used by groupby, sout.out_off will be NULL
  if (pargs.fkey_col_id[3] > 0 && sout.out_off[4] != NULL)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[4], item_key, index, selection_flags, sout.out_off[4], num_tile_items, range_size, 0, -1);
  //prev join (if used by groupby) -> if not used by groupby, sargs.in_off will be NULL
  else if (pargs.fkey_col_id[3] < -1 && sargs.in_off[4] != NULL) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off[4] + tile_offset, item_off[4], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[4], item_key, index, selection_flags, sout.out_off[4], num_tile_items, range_size, 0, -1);
  }

  //future join (a past join would have sargs.column == NULL)
  if (pargs.fkey_col_id[0] < 0 && sargs.column[1] != NULL && sargs.key_column != 1) {
    ptr = sargs.column[1];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[1], num_tile_items, range_size, 0, -1);
  }
  if (pargs.fkey_col_id[1] < 0 && sargs.column[2] != NULL && sargs.key_column != 2) {
    ptr = sargs.column[2];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[2], num_tile_items, range_size, 0, -1);
  }
  if (pargs.fkey_col_id[2] < 0 && sargs.column[3] != NULL && sargs.key_column != 3) {
    ptr = sargs.column[3];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[3], num_tile_items, range_size, 0, -1);
  }
  // cudaAssert(sargs.col_idx[pargs.fkey_col_id[3]] != NULL);
  if (pargs.fkey_col_id[3] < 0 && sargs.column[4] != NULL && sargs.key_column != 4) {
    ptr = sargs.column[4];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[4], num_tile_items, range_size, 0, -1);
  }
}
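// Note on the two kernel families in this header: probe_partition_GPU_simple and
// probe_partition_GPU_simple_groupby_8GPUs read their inputs from the segment
// cache (gpuCache plus per-segment indices such as pargs.key_idx1 and
// sargs.col_idx), so each tile first resolves its segment_index and the
// partition writes take the segment's is_replicated flag and the gpu argument.
// The GPU2 variants (probe_partition_GPU2_simple above and
// probe_partition_GPU2_simple_groupby_8GPUs below) instead take already
// materialized device arrays (pargs.key_column, sargs.column, sargs.in_off),
// address them with a plain tile_offset, and pass 0 and -1 in place of the
// replication flag and GPU id.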
template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void probe_partition_GPU2_simple_groupby_8GPUs(
    int* gpuCache, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs,
    struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int num_tuples,
    int write_val = 1, int write_offset = 0, int latemat = 0) {

  cudaAssert(NUM_GPU == NUM_RANGE);
  //assume start_offset is always at the beginning of a segment (it cannot start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)

  // Tile shape: BLOCK_THREADS threads, each owning ITEMS_PER_THREAD integer items
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int* ptr;

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int groupval[4][ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //we are probing this column (normal operation)
    ptr = pargs.key_column[0];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[0], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not joining on this column and there is no result from a previous join (first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[0], 0, num_tile_items);
  }

  if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) {
    ptr = pargs.key_column[1];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[1], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[1], 0, num_tile_items);
  }

  if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) {
    ptr = pargs.key_column[2];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[2], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[2], 0, num_tile_items);
  }

  if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) {
    ptr = pargs.key_column[3];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[3], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[3], 0, num_tile_items);
  }

  //DO PARTITIONING
  //this range_size would not work for a date column
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the number of partitions (e.g. 10 values over 3 partitions -> 3 3 4)

  //right now we only support the probe table as the table getting partitioned
  cudaAssert(sargs.table == 0);

  //load the partitioned key column and count the partition sizes
  cudaAssert(sargs.column[sargs.key_column] != NULL);
  ptr = sargs.column[sargs.key_column];
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, 0, -1);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range_size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, 0, -1);

  ptr = sargs.in_off[0];
  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off, selection_flags, num_tile_items);
  BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off, item_key, index, selection_flags, sout.out_off[0], num_tile_items, range_size, 0, -1);

  // if (threadIdx.x == 0 && blockIdx.x == 0) printf("%d %d\n", pargs.fkey_col_id[0], gargs.group_col_id[0]);

  if (pargs.fkey_col_id[0] > 0 && gargs.group_col_id[0] != -1)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[0], item_key, index, selection_flags, sout.column[gargs.group_col_id[0]], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[0] < -1 && gargs.group_col_id[0] != -1) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.column[gargs.group_col_id[0]] + tile_offset, groupval[0], selection_flags, num_tile_items);
    // groupval[0][0] = 1; groupval[0][1] = 1; groupval[0][2] = 1; groupval[0][3] = 1;
    // printf("here\n");
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[0], item_key, index, selection_flags, sout.column[gargs.group_col_id[0]], num_tile_items, range_size, 0, -1);
  }

  if (pargs.fkey_col_id[1] > 0 && gargs.group_col_id[1] != -1)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[1], item_key, index, selection_flags, sout.column[gargs.group_col_id[1]], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[1] < -1 && gargs.group_col_id[1] != -1) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.column[gargs.group_col_id[1]] + tile_offset, groupval[1], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[1], item_key, index, selection_flags, sout.column[gargs.group_col_id[1]], num_tile_items, range_size, 0, -1);
  }

  if (pargs.fkey_col_id[2] > 0 && gargs.group_col_id[2] != -1)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[2], item_key, index, selection_flags, sout.column[gargs.group_col_id[2]], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[2] < -1 && gargs.group_col_id[2] != -1) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.column[gargs.group_col_id[2]] + tile_offset, groupval[2], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[2], item_key, index, selection_flags, sout.column[gargs.group_col_id[2]], num_tile_items, range_size, 0, -1);
  }

  //current join (if used by groupby) -> write the groupby column
  if (pargs.fkey_col_id[3] > 0 && gargs.group_col_id[3] != -1)
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[3], item_key, index, selection_flags, sout.column[gargs.group_col_id[3]], num_tile_items, range_size, 0, -1);
  //prev join (if used by groupby) -> write the groupby column from the previous join
  else if (pargs.fkey_col_id[3] < -1 && gargs.group_col_id[3] != -1) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.column[gargs.group_col_id[3]] + tile_offset, groupval[3], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[3], item_key, index, selection_flags, sout.column[gargs.group_col_id[3]], num_tile_items, range_size, 0, -1);
  }

  //future join (a past join would have sargs.column == NULL)
  if (pargs.fkey_col_id[0] < 0 && sargs.column[1] != NULL && sargs.key_column != 1) {
    ptr = sargs.column[1];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[1], num_tile_items, range_size, 0, -1);
  }
  if (pargs.fkey_col_id[1] < 0 && sargs.column[2] != NULL && sargs.key_column != 2) {
    ptr = sargs.column[2];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[2], num_tile_items, range_size, 0, -1);
  }
  if (pargs.fkey_col_id[2] < 0 && sargs.column[3] != NULL && sargs.key_column != 3) {
    ptr = sargs.column[3];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[3], num_tile_items, range_size, 0, -1);
  }
  // cudaAssert(sargs.col_idx[pargs.fkey_col_id[3]] != NULL);
  if (pargs.fkey_col_id[3] < 0 && sargs.column[4] != NULL && sargs.key_column != 4) {
    ptr = sargs.column[4];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[4], num_tile_items, range_size, 0, -1);
  }

  if (gargs.aggr_col_id[0] != -1) {
    ptr = sargs.column[gargs.aggr_col_id[0]];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[gargs.aggr_col_id[0]], num_tile_items, range_size, 0, -1);
  }
  if (gargs.aggr_col_id[1] != -1) {
    ptr = sargs.column[gargs.aggr_col_id[1]];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[gargs.aggr_col_id[1]], num_tile_items, range_size, 0, -1);
  }

  //WARNING: aggr_col_id and group_col_id carry slightly different information:
  //aggr_col_id != -1 means this segment group will aggregate that column on the GPU;
  //group_col_id != -1 means there will be a groupby on that column at some point (regardless of whether it happens on the CPU or the GPU)
}
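// Minimal host-side launch sketch (illustrative only, not part of the original
// header): the argument structs are assumed to be populated elsewhere by the
// caller, the 128x4 tile shape is just an example, and NUM_GPU is reused as the
// NUM_RANGE template argument to satisfy cudaAssert(NUM_GPU == NUM_RANGE)
// inside the kernels.
inline void example_launch_probe_partition_GPU2_simple(
    int* gpuCache, probeArgsGPU pargs, shuffleArgsGPU sargs, shuffleOutGPU sout,
    int num_tuples, cudaStream_t stream) {
  const int block_threads = 128;                    // example BLOCK_THREADS
  const int items_per_thread = 4;                   // example ITEMS_PER_THREAD
  const int tile_size = block_threads * items_per_thread;
  const int num_blocks = (num_tuples + tile_size - 1) / tile_size; // one tile per block
  probe_partition_GPU2_simple<128, 4, NUM_GPU><<<num_blocks, block_threads, 0, stream>>>(
      gpuCache, pargs, sargs, sout, num_tuples);
}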