#ifndef _GPU_PROCESSING_H_
#define _GPU_PROCESSING_H_

#pragma once

#include <cub/cub.cuh>
#include <curand.h>
#include <cuda.h>

#include "crystal/crystal.cuh"
#include "BlockLibrary.cuh"
#include "ShuffleGPU.h"
#include "PartitioningGPU.h"
#include "KernelArgs.h"
#include "gpu_utils.h"

using namespace cub;

//probe input from cache (done)
//probe input offset from CPU (done)
//probe input from shuffle, just offset and key (later) -> semi-join approach
//probe input from shuffle, everything (not yet) -> just probe directly
//probe can output either everything (later) or just the local offset (not yet)
//shuffle input from cache (done)
//shuffle input offset from CPU (later) -> materialize the offset using the cache and then shuffle
//shuffle input local offset from probe (done) -> materialize the offset column from the value column and then shuffle
//shuffle input everything from probe (later) -> shuffle directly
//pipelined probe and shuffle (not yet)
//shuffle can output either everything (done) or just offset and key (not yet)
//if the CPU runs before the GPU, materialize and shuffle on the CPU and send only the offsets to the GPU

template<int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void probe_group_by_GPU(
    struct probeArgsGPU pargs, struct groupbyArgsGPU gargs,
    int num_tuples, int* res, int start_offset = 0) {

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int* ptr;

  // Load a segment of consecutive items that are blocked across threads
  int items[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];
  int groupval1[ITEMS_PER_THREAD];
  int groupval2[ITEMS_PER_THREAD];
  int groupval3[ITEMS_PER_THREAD];
  int groupval4[ITEMS_PER_THREAD];
  int aggrval1[ITEMS_PER_THREAD];
  int aggrval2[ITEMS_PER_THREAD];

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //normal operation: pargs.key_column[0] is the join key column (lo_partkey, lo_suppkey, etc.) -> no group-by attributes
    ptr = pargs.key_column[0];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else if (gargs.group_column[0] != NULL) {
    ptr = gargs.group_column[0];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, groupval1, num_tile_items);
    // printf(" i reach here %d %d %d %d\n", groupval1[0], groupval1[1], groupval1[2], groupval1[3]);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval1, 0, num_tile_items);
  }

  if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) {
    ptr = pargs.key_column[1];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else if (gargs.group_column[1] != NULL) {
    ptr = gargs.group_column[1];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, groupval2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval2, 0, num_tile_items);
  }

  if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) {
    ptr = pargs.key_column[2];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else if (gargs.group_column[2] != NULL) {
    ptr = gargs.group_column[2];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, groupval3, num_tile_items);
    // if (blockIdx.x ==0) printf(" i reach here %d %d %d %d\n", groupval3[0], groupval3[1], groupval3[2], groupval3[3]);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval3, 0, num_tile_items);
  }

  if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) {
    ptr = pargs.key_column[3];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else if (gargs.group_column[3] != NULL) {
    ptr = gargs.group_column[3];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, groupval4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval4, 0, num_tile_items);
  }

  if (gargs.aggr_column[0] != NULL) {
    ptr = gargs.aggr_column[0];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, aggrval1, num_tile_items);
    // if (blockIdx.x ==0) printf(" i also reach here %d %d %d %d\n", aggrval1[0], aggrval1[1], aggrval1[2], aggrval1[3]);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 0, num_tile_items);
  }

  if (gargs.aggr_column[1] != NULL) {
    ptr = gargs.aggr_column[1];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, aggrval2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 0, num_tile_items);
  }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if (selection_flags[ITEM]) {
        int hash = ((groupval1[ITEM] - gargs.min_val1) * gargs.unique_val1 +
                    (groupval2[ITEM] - gargs.min_val2) * gargs.unique_val2 +
                    (groupval3[ITEM] - gargs.min_val3) * gargs.unique_val3 +
                    (groupval4[ITEM] - gargs.min_val4) * gargs.unique_val4) % gargs.total_val; //!
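        // Result layout note: res holds gargs.total_val group slots of 6 ints each.
        // Slots [0..3] store the four group-by values; slots [4..5] together hold one
        // 64-bit running sum, updated below through a 64-bit atomicAdd on &res[hash * 6 + 4].
        // This assumes the host zero-initialized res and allocated it 8-byte aligned
        // (cudaMalloc already guarantees that). A host-side readback could decode a slot as,
        // for illustration only:
        //   long long sum; memcpy(&sum, &res_host[hash * 6 + 4], sizeof(long long));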
        res[hash * 6] = groupval1[ITEM];
        res[hash * 6 + 1] = groupval2[ITEM];
        res[hash * 6 + 2] = groupval3[ITEM];
        res[hash * 6 + 3] = groupval4[ITEM];
        int temp;
        if (aggrval1[ITEM] > aggrval2[ITEM]) temp = aggrval1[ITEM] - aggrval2[ITEM];
        else temp = aggrval2[ITEM] - aggrval1[ITEM];
        atomicAdd(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp));
      }
    }
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void probe_group_by_GPU2(
    int* gpuCache, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs,
    int num_tuples, int* res, int start_offset = 0, short* segment_group = NULL) {

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int* ptr;

  int tiles_per_segment = SEGMENT_SIZE/tile_size;
  int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment

  // Load a segment of consecutive items that are blocked across threads
  int items[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];
  int groupval1[ITEMS_PER_THREAD];
  int groupval2[ITEMS_PER_THREAD];
  int groupval3[ITEMS_PER_THREAD];
  int groupval4[ITEMS_PER_THREAD];
  int aggrval1[ITEMS_PER_THREAD];
  int aggrval2[ITEMS_PER_THREAD];

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int64_t key_segment1;
  __shared__ int64_t key_segment2;
  __shared__ int64_t key_segment3;
  __shared__ int64_t key_segment4;
  __shared__ int64_t aggr_segment1;
  __shared__ int64_t aggr_segment2;
  __shared__ int64_t segment_index;

  if (threadIdx.x == 0) {
    if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE];
    else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE;
    if (pargs.key_idx1 != NULL) key_segment1 = pargs.key_idx1[segment_index];
    if (pargs.key_idx2 != NULL) key_segment2 = pargs.key_idx2[segment_index];
    if (pargs.key_idx3 != NULL) key_segment3 = pargs.key_idx3[segment_index];
    if (pargs.key_idx4 != NULL) key_segment4 = pargs.key_idx4[segment_index];
    if (gargs.aggr_idx1 != NULL) aggr_segment1 = gargs.aggr_idx1[segment_index];
    if (gargs.aggr_idx2 != NULL) aggr_segment2 = gargs.aggr_idx2[segment_index];
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //normal operation: pargs.key_idx1 is the join key column (lo_partkey, lo_suppkey, etc.) -> no group-by attributes
    ptr = gpuCache + key_segment1 * SEGMENT_SIZE;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval1, 0, num_tile_items);
  }

  if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) {
    ptr = gpuCache + key_segment2 * SEGMENT_SIZE;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval2, 0, num_tile_items);
  }

  if
(pargs.key_idx3 != NULL && pargs.ht3 != NULL) { ptr = gpuCache + key_segment3 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval3, 0, num_tile_items); } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { ptr = gpuCache + key_segment4 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval4, 0, num_tile_items); } if (gargs.aggr_idx1 != NULL) { ptr = gpuCache + aggr_segment1 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, aggrval1, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 0, num_tile_items); } if (gargs.aggr_idx2 != NULL) { ptr = gpuCache + aggr_segment2 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, aggrval2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 0, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if (selection_flags[ITEM]) { int hash = ((groupval1[ITEM] - gargs.min_val1) * gargs.unique_val1 + (groupval2[ITEM] - gargs.min_val2) * gargs.unique_val2 + (groupval3[ITEM] - gargs.min_val3) * gargs.unique_val3 + (groupval4[ITEM] - gargs.min_val4) * gargs.unique_val4) % gargs.total_val; //! 
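        // Group-hash note: (groupval_i - min_val_i) * unique_val_i is a mixed-radix encoding of
        // the group-by values, so hash is a direct slot index as long as the host chooses each
        // unique_val_i as the product of the value ranges of the preceding group columns and
        // total_val as the product of all active ranges. Hypothetical example: grouping on
        // d_year in [1992, 1998] and p_brand1 in [1, 1000] could use unique_val = 1000 for
        // d_year, unique_val = 1 for p_brand1, and total_val = 7000.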
        // printf("%d %d %d %d\n", groupval1[ITEM], groupval2[ITEM], groupval3[ITEM], groupval4[ITEM]);
        res[hash * 6] = groupval1[ITEM];
        res[hash * 6 + 1] = groupval2[ITEM];
        res[hash * 6 + 2] = groupval3[ITEM];
        res[hash * 6 + 3] = groupval4[ITEM];
        // int temp = (*(gargs.d_group_func))(aggrval1[ITEM], aggrval2[ITEM]);
        int temp;
        if (aggrval1[ITEM] > aggrval2[ITEM]) temp = aggrval1[ITEM] - aggrval2[ITEM];
        else temp = aggrval2[ITEM] - aggrval1[ITEM];
        atomicAdd(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp));
      }
    }
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void probe_group_by_GPU3(
    int* gpuCache, struct offsetGPU offset, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs,
    int num_tuples, int* res) {

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;

  // Load a segment of consecutive items that are blocked across threads
  int items[ITEMS_PER_THREAD];
  int items_lo[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];
  int groupval1[ITEMS_PER_THREAD];
  int groupval2[ITEMS_PER_THREAD];
  int groupval3[ITEMS_PER_THREAD];
  int groupval4[ITEMS_PER_THREAD];
  int aggrval1[ITEMS_PER_THREAD];
  int aggrval2[ITEMS_PER_THREAD];

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  cudaAssert(offset.lo_off != NULL);
  BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.lo_off + tile_offset, items_lo, num_tile_items);

  if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //normal operation: pargs.key_idx1 is the join key column (lo_partkey, lo_suppkey, etc.) -> no group-by attributes
    BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, groupval1, selection_flags, gpuCache, pargs.key_idx1, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else if (gargs.group_idx1 != NULL) { //take the result of the previous join from dim_off, but also read the group-by column; here gargs.group_idx1 is the group-by column (d_year, p_brand1, etc.)
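    // The same three-way pattern repeats for all four dimension slots below: probe through
    // pargs.key_idxN/htN, or gather an already-joined group-by value through offset.dim_offN
    // and gargs.group_idxN, or fall back to a constant 0 group value.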
cudaAssert(offset.dim_off1 != NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.dim_off1 + tile_offset, items, num_tile_items); BlockReadFilteredOffset<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval1, selection_flags, gpuCache, gargs.group_idx1, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval1, 0, num_tile_items); } if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) { BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, groupval2, selection_flags, gpuCache, pargs.key_idx2, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } else if (gargs.group_idx2 != NULL) { cudaAssert(offset.dim_off2 != NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.dim_off2 + tile_offset, items, num_tile_items); BlockReadFilteredOffset<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval2, selection_flags, gpuCache, gargs.group_idx2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval2, 0, num_tile_items); } if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) { BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, groupval3, selection_flags, gpuCache, pargs.key_idx3, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } else if (gargs.group_idx3 != NULL) { cudaAssert(offset.dim_off3 != NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.dim_off3 + tile_offset, items, num_tile_items); BlockReadFilteredOffset<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval3, selection_flags, gpuCache, gargs.group_idx3, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval3, 0, num_tile_items); } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, groupval4, selection_flags, gpuCache, pargs.key_idx4, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else if (gargs.group_idx4 != NULL) { cudaAssert(offset.dim_off4 != NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.dim_off4 + tile_offset, items, num_tile_items); BlockReadFilteredOffset<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval4, selection_flags, gpuCache, gargs.group_idx4, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval4, 0, num_tile_items); } if (gargs.aggr_idx1 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, aggrval1, gpuCache, gargs.aggr_idx1, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 0, num_tile_items); } if (gargs.aggr_idx2 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, aggrval2, gpuCache, gargs.aggr_idx2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 0, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if (selection_flags[ITEM]) { int hash = ((groupval1[ITEM] - gargs.min_val1) * gargs.unique_val1 + (groupval2[ITEM] - gargs.min_val2) * gargs.unique_val2 + (groupval3[ITEM] - gargs.min_val3) * gargs.unique_val3 + (groupval4[ITEM] - gargs.min_val4) * gargs.unique_val4) % gargs.total_val; //! 
res[hash * 6] = groupval1[ITEM]; res[hash * 6 + 1] = groupval2[ITEM]; res[hash * 6 + 2] = groupval3[ITEM]; res[hash * 6 + 3] = groupval4[ITEM]; // int temp = (*(gargs.d_group_func))(aggrval1[ITEM], aggrval2[ITEM]); int temp; if (aggrval1[ITEM] > aggrval2[ITEM]) temp = aggrval1[ITEM] - aggrval2[ITEM]; else temp = aggrval2[ITEM] - aggrval1[ITEM]; atomicAdd(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp)); // if (gargs.mode == 0) atomicAdd(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(aggrval1[ITEM])); // else if (gargs.mode == 1) atomicAdd(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(aggrval1[ITEM] - aggrval2[ITEM])); // else if (gargs.mode == 2) atomicAdd(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(aggrval1[ITEM] * aggrval2[ITEM])); } } } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void filter_probe_GPU2( int* gpuCache, struct filterArgsGPU fargs, struct probeArgsGPU pargs, struct offsetGPU out_off, int num_tuples, int *total, int start_offset = 0, short* segment_group = NULL) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) // Specialize BlockLoad for a 1D block of 128 threads owning 4 integer items each typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many block per segment int* ptr; int tile_idx = blockIdx.x % tiles_per_segment; // Current tile index int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment // Allocate shared memory for BlockLoad __shared__ union TempStorage { typename BlockScanInt::TempStorage scan; } temp_storage; // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; // int dim_offset1[ITEMS_PER_THREAD]; // int dim_offset2[ITEMS_PER_THREAD]; // int dim_offset3[ITEMS_PER_THREAD]; int dim_offset4[ITEMS_PER_THREAD]; int t_count = 0; // Number of items selected per thread int c_t_count = 0; //Prefix sum of t_count __shared__ int block_off; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } // __shared__ int64_t key_segment1; // __shared__ int64_t key_segment2; // __shared__ int64_t key_segment3; __shared__ int64_t key_segment4; __shared__ int64_t filter_segment1; __shared__ int64_t filter_segment2; __shared__ int64_t segment_index; if (threadIdx.x == 0) { if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE]; else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE; // if (pargs.key_idx1 != NULL) key_segment1 = pargs.key_idx1[segment_index]; // if (pargs.key_idx2 != NULL) key_segment2 = pargs.key_idx2[segment_index]; // if (pargs.key_idx3 != NULL) key_segment3 = pargs.key_idx3[segment_index]; if (pargs.key_idx4 != NULL) key_segment4 = pargs.key_idx4[segment_index]; if (fargs.filter_idx1 != NULL) filter_segment1 = fargs.filter_idx1[segment_index]; if (fargs.filter_idx2 != NULL) filter_segment2 = fargs.filter_idx2[segment_index]; } __syncthreads(); start_offset = segment_index * SEGMENT_SIZE; InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); if (fargs.filter_idx1 != NULL) { 
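    // Load the first filter column from its cached segment and keep only rows with
    // fargs.compare1 <= value <= fargs.compare2 in selection_flags; the second filter column
    // below is handled the same way with fargs.compare3 / fargs.compare4.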
ptr = gpuCache + filter_segment1 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); // (*(fargs.d_filter_func1))(items, selection_flags, fargs.compare1, fargs.compare2, num_tile_items); } if (fargs.filter_idx2 != NULL) { ptr = gpuCache + filter_segment2 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare3, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare4, selection_flags, num_tile_items); // (*(fargs.d_filter_func2))(items, selection_flags, fargs.compare3, fargs.compare4, num_tile_items); } // if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation) // ptr = gpuCache + key_segment1 * SEGMENT_SIZE; // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); // BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); // } else { //we are not doing join for this column, there is no result from prev join (first join) // BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset1, 1, num_tile_items); // } // if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) { // ptr = gpuCache + key_segment2 * SEGMENT_SIZE; // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); // BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); // } else { // BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset2, 1, num_tile_items); // } // if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) { // ptr = gpuCache + key_segment3 * SEGMENT_SIZE; // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); // BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); // } else { // BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset3, 1, num_tile_items); // } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { ptr = gpuCache + key_segment4 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset4, 1, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) t_count++; } } //Barrier __syncthreads(); // TODO: need to check logic for offset BlockScanInt(temp_storage.scan).ExclusiveSum(t_count, c_t_count); //doing a prefix sum of all the previous threads in the block and store it to c_t_count if(threadIdx.x == blockDim.x - 1) { //if the last thread in the block, add the prefix sum of all the prev 
threads + sum of my threads to global variable total block_off = atomicAdd(total, t_count+c_t_count); //the previous value of total is gonna be assigned to block_off } //block_off does not need to be global (it's just need to be shared), because it will get the previous value from total which is global __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) { int offset = block_off + c_t_count++; out_off.lo_off[offset] = start_offset + tile_idx * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; // if (out_off.dim_off1 != NULL) out_off.dim_off1[offset] = dim_offset1[ITEM]; // if (out_off.dim_off2 != NULL) out_off.dim_off2[offset] = dim_offset2[ITEM]; // if (out_off.dim_off3 != NULL) out_off.dim_off3[offset] = dim_offset3[ITEM]; if (out_off.dim_off4 != NULL) out_off.dim_off4[offset] = dim_offset4[ITEM]; //printf("%d %d %d %d\n", dim_offset1[ITEM], dim_offset2[ITEM], dim_offset3[ITEM], dim_offset4[ITEM]); } } } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void filter_probe_GPU3( int* gpuCache, struct offsetGPU in_off, struct filterArgsGPU fargs, struct probeArgsGPU pargs, struct offsetGPU out_off, int num_tuples, int *total) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) // Specialize BlockLoad for a 1D block of 128 threads owning 4 integer items each typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; // Allocate shared memory for BlockLoad __shared__ union TempStorage { typename BlockScanInt::TempStorage scan; } temp_storage; // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int items_lo[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; // int dim_offset1[ITEMS_PER_THREAD]; // int dim_offset2[ITEMS_PER_THREAD]; // int dim_offset3[ITEMS_PER_THREAD]; int dim_offset4[ITEMS_PER_THREAD]; int t_count = 0; // Number of items selected per thread int c_t_count = 0; //Prefix sum of t_count __shared__ int block_off; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); cudaAssert(in_off.lo_off != NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.lo_off + tile_offset, items_lo, num_tile_items); if (fargs.filter_idx1 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, items, gpuCache, fargs.filter_idx1, num_tile_items); BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); // (*(fargs.d_filter_func1))(items, selection_flags, fargs.compare1, fargs.compare2, num_tile_items); } if (fargs.filter_idx2 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, items, gpuCache, fargs.filter_idx2, num_tile_items); BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare3, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare4, selection_flags, num_tile_items); // (*(fargs.d_filter_func2))(items, selection_flags, 
fargs.compare3, fargs.compare4, num_tile_items); } // if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation) // BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, items, gpuCache, pargs.key_idx1, num_tile_items); // BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); // } else if (in_off.dim_off1 != NULL) { //load result from prev join, we are just passing it through (no probing) // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off1 + tile_offset, dim_offset1, num_tile_items); // } else { //we are not doing join for this column, there is no result from prev join (first join) // BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset1, 1, num_tile_items); // } // if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) { // BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, items, gpuCache, pargs.key_idx2, num_tile_items); // BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); // } else if (in_off.dim_off2 != NULL) { // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off2 + tile_offset, dim_offset2, num_tile_items); // } else { // BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset2, 1, num_tile_items); // } // if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) { // BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, items, gpuCache, pargs.key_idx3, num_tile_items); // BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); // } else if (in_off.dim_off3 != NULL) { // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off3 + tile_offset, dim_offset3, num_tile_items); // } else { // BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset3, 1, num_tile_items); // } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, items, gpuCache, pargs.key_idx4, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else if (in_off.dim_off4 != NULL) { BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off4 + tile_offset, dim_offset4, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset4, 1, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) t_count++; } } //Barrier __syncthreads(); // TODO: need to check logic for offset BlockScanInt(temp_storage.scan).ExclusiveSum(t_count, c_t_count); //doing a prefix sum of all the previous threads in the block and store it to c_t_count if(threadIdx.x == blockDim.x - 1) { //if the last thread in the block, add the prefix sum of all the prev threads + sum of my threads to global variable total block_off = atomicAdd(total, t_count+c_t_count); //the previous value of total is gonna be assigned to block_off } //block_off does not need to be global (it's just need to be shared), because it will get the previous value from total which is global __syncthreads(); #pragma unroll for (int ITEM = 
0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) { int offset = block_off + c_t_count++; out_off.lo_off[offset] = items_lo[ITEM]; // if (out_off.dim_off1 != NULL) out_off.dim_off1[offset] = dim_offset1[ITEM]; // if (out_off.dim_off2 != NULL) out_off.dim_off2[offset] = dim_offset2[ITEM]; // if (out_off.dim_off3 != NULL) out_off.dim_off3[offset] = dim_offset3[ITEM]; if (out_off.dim_off4 != NULL) out_off.dim_off4[offset] = dim_offset4[ITEM]; //printf("%d %d %d %d\n", dim_offset1[ITEM], dim_offset2[ITEM], dim_offset3[ITEM], dim_offset4[ITEM]); } } } } //the input is coming from shuffled column template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void probe_GPU( struct probeArgsGPU pargs, struct offsetGPU in_off, struct offsetGPU out_off, int num_tuples, int *total, int start_offset = 0) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) // Specialize BlockLoad for a 1D block of 128 threads owning 4 integer items each typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int* ptr; // Allocate shared memory for BlockLoad __shared__ union TempStorage { typename BlockScanInt::TempStorage scan; } temp_storage; // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int lo_offset[ITEMS_PER_THREAD]; int dim_offset1[ITEMS_PER_THREAD]; int dim_offset2[ITEMS_PER_THREAD]; int dim_offset3[ITEMS_PER_THREAD]; int dim_offset4[ITEMS_PER_THREAD]; int t_count = 0; // Number of items selected per thread int c_t_count = 0; //Prefix sum of t_count __shared__ int block_off; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __syncthreads(); InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation) ptr = pargs.key_column[0]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } else { //we are not doing join for this column, there is no result from prev join (first join) BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset1, 1, num_tile_items); } if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) { ptr = pargs.key_column[1]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset2, 1, num_tile_items); } if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) { ptr = pargs.key_column[2]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, 
dim_offset3, 1, num_tile_items); } if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) { ptr = pargs.key_column[3]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset4, 1, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) t_count++; } } //Barrier __syncthreads(); // TODO: need to check logic for offset BlockScanInt(temp_storage.scan).ExclusiveSum(t_count, c_t_count); //doing a prefix sum of all the previous threads in the block and store it to c_t_count if(threadIdx.x == blockDim.x - 1) { //if the last thread in the block, add the prefix sum of all the prev threads + sum of my threads to global variable total block_off = atomicAdd(total, t_count+c_t_count); //the previous value of total is gonna be assigned to block_off } //block_off does not need to be global (it's just need to be shared), because it will get the previous value from total which is global __syncthreads(); cudaAssert(in_off.lo_off != NULL); //offset coming from previous join BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.lo_off + tile_offset, lo_offset, selection_flags, num_tile_items); if (in_off.dim_off1 != NULL && pargs.key_column[0] == NULL) { BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off1 + tile_offset, dim_offset1, selection_flags, num_tile_items); // printf("supplier off %d %d %d %d %d %d %d %d\n", selection_flags[0], selection_flags[1], selection_flags[2], selection_flags[3], dim_offset1[0], dim_offset1[1], dim_offset1[2], dim_offset1[3]); } if (in_off.dim_off2 != NULL && pargs.key_column[1] == NULL) { BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off2 + tile_offset, dim_offset2, selection_flags, num_tile_items); } if (in_off.dim_off3 != NULL && pargs.key_column[2] == NULL) { BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off3 + tile_offset, dim_offset3, selection_flags, num_tile_items); } if (in_off.dim_off4 != NULL && pargs.key_column[3] == NULL) { BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off4 + tile_offset, dim_offset4, selection_flags, num_tile_items); } cudaAssert(out_off.join_result_off != NULL); cudaAssert(out_off.join_result_off[0] != NULL); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) { int offset = block_off + c_t_count++; out_off.join_result_off[0][offset] = lo_offset[ITEM]; if (out_off.join_result_off[1] != NULL) out_off.join_result_off[1][offset] = dim_offset1[ITEM]; //global offset if (out_off.join_result_off[2] != NULL) out_off.join_result_off[2][offset] = dim_offset2[ITEM]; //global offset if (out_off.join_result_off[3] != NULL) out_off.join_result_off[3][offset] = dim_offset3[ITEM]; //global offset if (out_off.join_result_off[4] != NULL) out_off.join_result_off[4][offset] = dim_offset4[ITEM]; //global offset } } } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void probe_GPU2( int* gpuCache, struct probeArgsGPU pargs, struct offsetGPU out_off, int num_tuples, int *total, int start_offset = 0, short* segment_group = NULL) { //assume start_offset always in the beginning of a 
segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) // Specialize BlockLoad for a 1D block of 128 threads owning 4 integer items each typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many block per segment int* ptr; int tile_idx = blockIdx.x % tiles_per_segment; // Current tile index int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment // Allocate shared memory for BlockLoad __shared__ union TempStorage { typename BlockScanInt::TempStorage scan; } temp_storage; // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int dim_offset1[ITEMS_PER_THREAD]; int dim_offset2[ITEMS_PER_THREAD]; int dim_offset3[ITEMS_PER_THREAD]; int dim_offset4[ITEMS_PER_THREAD]; int t_count = 0; // Number of items selected per thread int c_t_count = 0; //Prefix sum of t_count __shared__ int block_off; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __shared__ int64_t key_segment1; __shared__ int64_t key_segment2; __shared__ int64_t key_segment3; __shared__ int64_t key_segment4; __shared__ int64_t segment_index; if (threadIdx.x == 0) { if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE]; else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE; if (pargs.key_idx1 != NULL) key_segment1 = pargs.key_idx1[segment_index]; if (pargs.key_idx2 != NULL) key_segment2 = pargs.key_idx2[segment_index]; if (pargs.key_idx3 != NULL) key_segment3 = pargs.key_idx3[segment_index]; if (pargs.key_idx4 != NULL) key_segment4 = pargs.key_idx4[segment_index]; } __syncthreads(); start_offset = segment_index * SEGMENT_SIZE; InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation) ptr = gpuCache + key_segment1 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } else { //we are not doing join for this column, there is no result from prev join (first join) BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset1, 1, num_tile_items); } if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) { ptr = gpuCache + key_segment2 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset2, 1, num_tile_items); } if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) { ptr = gpuCache + key_segment3 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } else { 
BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset3, 1, num_tile_items); } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { ptr = gpuCache + key_segment4 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset4, 1, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) t_count++; } } //Barrier __syncthreads(); // TODO: need to check logic for offset BlockScanInt(temp_storage.scan).ExclusiveSum(t_count, c_t_count); //doing a prefix sum of all the previous threads in the block and store it to c_t_count if(threadIdx.x == blockDim.x - 1) { //if the last thread in the block, add the prefix sum of all the prev threads + sum of my threads to global variable total block_off = atomicAdd(total, t_count+c_t_count); //the previous value of total is gonna be assigned to block_off } //block_off does not need to be global (it's just need to be shared), because it will get the previous value from total which is global __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) { int offset = block_off + c_t_count++; out_off.lo_off[offset] = start_offset + tile_idx * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; if (out_off.dim_off1 != NULL) out_off.dim_off1[offset] = dim_offset1[ITEM]; if (out_off.dim_off2 != NULL) out_off.dim_off2[offset] = dim_offset2[ITEM]; if (out_off.dim_off3 != NULL) out_off.dim_off3[offset] = dim_offset3[ITEM]; if (out_off.dim_off4 != NULL) out_off.dim_off4[offset] = dim_offset4[ITEM]; //printf("%d %d %d %d\n", dim_offset1[ITEM], dim_offset2[ITEM], dim_offset3[ITEM], dim_offset4[ITEM]); } } } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void probe_GPU3( int* gpuCache, struct offsetGPU in_off, struct probeArgsGPU pargs, struct offsetGPU out_off, int num_tuples, int *total) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) // Specialize BlockLoad for a 1D block of 128 threads owning 4 integer items each typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; // Allocate shared memory for BlockLoad __shared__ union TempStorage { typename BlockScanInt::TempStorage scan; } temp_storage; // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int items_lo[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int dim_offset1[ITEMS_PER_THREAD]; int dim_offset2[ITEMS_PER_THREAD]; int dim_offset3[ITEMS_PER_THREAD]; int dim_offset4[ITEMS_PER_THREAD]; int t_count = 0; // Number of items selected per thread int c_t_count = 0; //Prefix sum of t_count __shared__ int block_off; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); cudaAssert(in_off.lo_off != NULL); BlockLoadCrystal<int, 
BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.lo_off + tile_offset, items_lo, num_tile_items); if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation) BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, items, gpuCache, pargs.key_idx1, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } else if (in_off.dim_off1 != NULL) { //load result from prev join, we are just passing it through (no probing) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off1 + tile_offset, dim_offset1, num_tile_items); } else { //we are not doing join for this column, there is no result from prev join (first join) BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset1, 1, num_tile_items); } if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, items, gpuCache, pargs.key_idx2, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } else if (in_off.dim_off2 != NULL) { BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off2 + tile_offset, dim_offset2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset2, 1, num_tile_items); } if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, items, gpuCache, pargs.key_idx3, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } else if (in_off.dim_off3 != NULL) { BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off3 + tile_offset, dim_offset3, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset3, 1, num_tile_items); } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, items, gpuCache, pargs.key_idx4, num_tile_items); BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, dim_offset4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } else if (in_off.dim_off4 != NULL) { BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(in_off.dim_off4 + tile_offset, dim_offset4, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, dim_offset4, 1, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) t_count++; } } //Barrier __syncthreads(); // TODO: need to check logic for offset BlockScanInt(temp_storage.scan).ExclusiveSum(t_count, c_t_count); //doing a prefix sum of all the previous threads in the block and store it to c_t_count if(threadIdx.x == blockDim.x - 1) { //if the last thread in the block, add the prefix sum of all the prev threads + sum of my threads to global variable total block_off = atomicAdd(total, t_count+c_t_count); //the previous value of total is gonna be assigned to block_off } //block_off does not need to be global (it's just need to be shared), because it will get the previous value from total which is global __syncthreads(); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; 
++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) { int offset = block_off + c_t_count++; // printf("%d\n", items_lo[ITEM]); out_off.lo_off[offset] = items_lo[ITEM]; // printf("%d\n", items_lo[ITEM]); if (out_off.dim_off1 != NULL) out_off.dim_off1[offset] = dim_offset1[ITEM]; if (out_off.dim_off2 != NULL) out_off.dim_off2[offset] = dim_offset2[ITEM]; if (out_off.dim_off3 != NULL) out_off.dim_off3[offset] = dim_offset3[ITEM]; if (out_off.dim_off4 != NULL) out_off.dim_off4[offset] = dim_offset4[ITEM]; // printf("%d %d %d %d\n", dim_offset1[ITEM], dim_offset2[ITEM], dim_offset3[ITEM], dim_offset4[ITEM]); } } } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void build_GPU( int* gpuCache, int* off_to_seg_id, struct filterArgsGPU fargs, struct buildArgsGPU bargs, int num_tuples, int* hash_table, int start_offset = 0, float selectivity = 1.0) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int items[ITEMS_PER_THREAD]; int vals[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int tiles_per_segment = SEGMENT_SIZE/tile_size; int tile_idx = blockIdx.x % tiles_per_segment; int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __shared__ int64_t segment_index; __shared__ int64_t val_segment; __shared__ int64_t key_segment; __shared__ int64_t filter_segment; if (threadIdx.x == 0) { cudaAssert(off_to_seg_id != NULL); segment_index = off_to_seg_id[(blockIdx.x / tiles_per_segment)]; if (bargs.broadcast_filter_idx != NULL) filter_segment = bargs.broadcast_filter_idx[segment_index]; if (bargs.broadcast_key_idx != NULL) key_segment = bargs.broadcast_key_idx[segment_index]; if (bargs.broadcast_val_idx != NULL) val_segment = bargs.broadcast_val_idx[segment_index]; cudaAssert(filter_segment != -1); cudaAssert(key_segment != -1); cudaAssert(val_segment != -1); } __syncthreads(); start_offset = segment_index * SEGMENT_SIZE; // if (threadIdx.x == 0) { // printf("%d %d %d\n", blockIdx.x, (blockIdx.x / tiles_per_segment), off_to_seg_id[(blockIdx.x / tiles_per_segment)]); // } InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); if (bargs.broadcast_filter_idx != NULL) { int* ptr = gpuCache + filter_segment * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); if (fargs.mode1 == 0) { //equal to BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); } else if (fargs.mode1 == 1) { //between BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); } else if (fargs.mode1 == 2) { //equal or equal BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); } // (*(fargs.d_filter_func1))(items, selection_flags, fargs.compare1, fargs.compare2, num_tile_items); } int* ptr_key = gpuCache 
+ key_segment * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr_key + segment_tile_offset, items, num_tile_items); // printf("thread %d key %d\n", threadIdx.x, items[0]); if (bargs.val_idx != NULL) { int* ptr = gpuCache + val_segment * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, vals, num_tile_items); BlockBuildValueGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, tile_idx, start_offset, items, vals, selection_flags, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); } else { BlockBuildOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, tile_idx, start_offset, items, selection_flags, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); } // if (fargs.filter_col != NULL) { // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(fargs.filter_col + tile_offset, items, num_tile_items); // if (fargs.mode1 == 0) { //equal to // BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); // } else if (fargs.mode1 == 1) { //between // BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); // BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); // } else if (fargs.mode1 == 2) { //equal or equal // BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); // BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); // } // } // cudaAssert(bargs.key_col != NULL); // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(bargs.key_col + tile_offset, items, num_tile_items); // // printf("thread %d key %d\n", threadIdx.x, items[0]); // if (bargs.val_col != NULL) { // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(bargs.val_col + tile_offset, vals, num_tile_items); // BlockBuildValueGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, tile_idx, start_offset, // items, vals, selection_flags, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); // } else { // BlockBuildOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, tile_idx, start_offset, // items, selection_flags, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); // } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void build_GPU2( int* gpuCache, struct filterArgsGPU fargs, struct buildArgsGPU bargs, int num_tuples, int* hash_table, int start_offset = 0, short* segment_group = NULL, float selectivity = 1.0) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int items[ITEMS_PER_THREAD]; int vals[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int tiles_per_segment = SEGMENT_SIZE/tile_size; int tile_idx = blockIdx.x % tiles_per_segment; int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __shared__ int64_t segment_index; __shared__ int64_t val_segment; __shared__ int64_t key_segment; __shared__ int64_t filter_segment; cudaAssert(bargs.key_idx != NULL); if (threadIdx.x == 0) { if (segment_group != NULL) 
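      // segment_group, when provided, remaps each tile's segment slot to a (possibly
      // non-contiguous) segment id; otherwise segments are assumed contiguous from start_offset.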
segment_index = segment_group[tile_offset / SEGMENT_SIZE]; else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE; if (bargs.val_idx != NULL) val_segment = bargs.val_idx[segment_index]; if (bargs.key_idx != NULL) key_segment = bargs.key_idx[segment_index]; if (fargs.filter_idx1 != NULL) filter_segment = fargs.filter_idx1[segment_index]; } __syncthreads(); start_offset = segment_index * SEGMENT_SIZE; InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); // int x = (int) 1/selectivity; // for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { // int row_id = start_offset + tile_idx * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; // if (row_id % x == 0) { // selection_flags[ITEM] = 1; // } else { // selection_flags[ITEM] = 0; // } // } if (fargs.filter_idx1 != NULL) { int* ptr = gpuCache + filter_segment * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); if (fargs.mode1 == 0) { //equal to BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); } else if (fargs.mode1 == 1) { //between BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); } else if (fargs.mode1 == 2) { //equal or equal BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); } // (*(fargs.d_filter_func1))(items, selection_flags, fargs.compare1, fargs.compare2, num_tile_items); } int* ptr_key = gpuCache + key_segment * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr_key + segment_tile_offset, items, num_tile_items); // printf("thread %d key %d\n", threadIdx.x, items[0]); // for (int i = 0; i < ITEMS_PER_THREAD; i++) { // if (items[i] < 0 || items[i] > 41943040) // printf("%d %d %lld %lld %d\n", blockIdx.x, threadIdx.x, segment_index, key_segment, items[i]); // cudaAssert(items[i] >= 0 && items[i] < 41943040); // } if (bargs.val_idx != NULL) { int* ptr = gpuCache + val_segment * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, vals, num_tile_items); BlockBuildValueGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, tile_idx, start_offset, items, vals, selection_flags, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); } else { // if (threadIdx.x == 0 && blockIdx.x == 0) printf("im here inside kernel\n"); BlockBuildOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, tile_idx, start_offset, items, selection_flags, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void build_GPU3( int* gpuCache, int* dim_off, struct filterArgsGPU fargs, struct buildArgsGPU bargs, int num_tuples, int* hash_table, float selectivity = 1.0) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; 
} InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); // int x = (int) 1/selectivity; // for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { // int row_id = blockIdx.x * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; // if (row_id % x == 0) { // selection_flags[ITEM] = 1; // } else { // selection_flags[ITEM] = 0; // } // } cudaAssert(fargs.filter_idx1 == NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(dim_off + tile_offset, items, num_tile_items); if (bargs.val_idx != NULL) { BlockBuildValueGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, selection_flags, gpuCache, bargs.key_idx, bargs.val_idx, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); } else { BlockBuildOffsetGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, selection_flags, gpuCache, bargs.key_idx, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void build_GPU4( int* gpuCache, int* dim_off, struct filterArgsGPU fargs, struct buildArgsGPU bargs, int num_tuples, int* hash_table, float selectivity = 1.0) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); cudaAssert(fargs.filter_idx1 == NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(dim_off + tile_offset, items, num_tile_items); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int64_t dimkey_seg = bargs.key_idx[items[ITEM] / SEGMENT_SIZE]; if (dimkey_seg == -1) { cudaAssert(bargs.broadcast_key_idx != NULL); dimkey_seg = bargs.broadcast_key_idx[items[ITEM] / SEGMENT_SIZE]; // dimkey_seg = 0; } cudaAssert(dimkey_seg != -1); int key = gpuCache[dimkey_seg * SEGMENT_SIZE + (items[ITEM] % SEGMENT_SIZE)]; int hash = HASH(key, bargs.num_slots, bargs.val_min); if (bargs.val_idx != NULL) { int64_t dimval_seg = bargs.val_idx[items[ITEM] / SEGMENT_SIZE]; if (dimval_seg == -1) { cudaAssert(bargs.broadcast_val_idx != NULL); dimval_seg = bargs.broadcast_val_idx[items[ITEM] / SEGMENT_SIZE]; } cudaAssert(dimval_seg != -1); int val = gpuCache[dimval_seg * SEGMENT_SIZE + (items[ITEM] % SEGMENT_SIZE)]; atomicCAS(&hash_table[hash << 1], 0, val); } hash_table[(hash << 1) + 1] = items[ITEM] + 1; } } } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void build_GPU5( int* dim_off, struct buildArgsGPU bargs, int num_tuples, int* hash_table, float selectivity = 1.0) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int items[ITEMS_PER_THREAD]; int vals[ITEMS_PER_THREAD]; int item_off[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } 
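  // build_GPU5 builds the hash table from fully materialized inputs: bargs.key_col holds the join keys,
  // dim_off holds the corresponding row offsets, and bargs.val_col (optional) holds the payload values,
  // so no segment lookup into gpuCache is needed here.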
__syncthreads(); InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); cudaAssert(bargs.key_col != NULL); cudaAssert(dim_off != NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(bargs.key_col + tile_offset, items, num_tile_items); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(dim_off + tile_offset, item_off, num_tile_items); if (bargs.val_col != NULL) { BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(bargs.val_col + tile_offset, vals, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { if (selection_flags[ITEM]) { int hash = HASH(items[ITEM], bargs.num_slots, bargs.val_min); if (bargs.val_col != NULL) { atomicCAS(&hash_table[hash << 1], 0, vals[ITEM]); } // if (blockIdx.x == 0) printf("%d %d %d %d\n", hash, bargs.num_slots, bargs.val_min, items[ITEM]); hash_table[(hash << 1) + 1] = item_off[ITEM] + 1; } } } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void build_GPU_minmax( int* gpuCache, struct filterArgsGPU fargs, struct buildArgsGPU bargs, int num_tuples, int* hash_table, int* min_global, int* max_global, int start_offset = 0, short* segment_group = NULL) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int items[ITEMS_PER_THREAD]; int vals[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int tiles_per_segment = SEGMENT_SIZE/tile_size; int tile_idx = blockIdx.x % tiles_per_segment; int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } int min = bargs.val_max, max = bargs.val_min; __shared__ int64_t segment_index; __shared__ int64_t val_segment; __shared__ int64_t key_segment; __shared__ int64_t filter_segment; __shared__ int min_shared; __shared__ int max_shared; if (threadIdx.x == 0) { if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE]; else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE; if (bargs.val_idx != NULL) val_segment = bargs.val_idx[segment_index]; if (bargs.key_idx != NULL) key_segment = bargs.key_idx[segment_index]; if (fargs.filter_idx1 != NULL) filter_segment = fargs.filter_idx1[segment_index]; min_shared = bargs.val_max; max_shared = bargs.val_min; } __syncthreads(); start_offset = segment_index * SEGMENT_SIZE; InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); if (fargs.filter_idx1 != NULL) { int* ptr = gpuCache + filter_segment * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); (*(fargs.d_filter_func1))(items, selection_flags, fargs.compare1, fargs.compare2, num_tile_items); } cudaAssert(bargs.key_idx != NULL); int* ptr_key = gpuCache + key_segment * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr_key + segment_tile_offset, items, num_tile_items); BlockMinMaxGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, selection_flags, min, max, num_tile_items); if (bargs.val_idx != NULL) { int* ptr = gpuCache + val_segment * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, vals, 
num_tile_items); BlockBuildValueGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, tile_idx, start_offset, items, vals, selection_flags, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); } else { BlockBuildOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, tile_idx, start_offset, items, selection_flags, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); } __syncthreads(); atomicMin(&min_shared, min); atomicMax(&max_shared, max); __syncthreads(); if (threadIdx.x == 0) { atomicMin(min_global, min_shared); atomicMax(max_global, max_shared); } __syncthreads(); } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void build_GPU_minmax2( int* gpuCache, int* dim_off, struct filterArgsGPU fargs, struct buildArgsGPU bargs, int num_tuples, int* hash_table, int* min_global, int* max_global) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } int min = bargs.val_max, max = bargs.val_min; __shared__ int min_shared; __shared__ int max_shared; if (threadIdx.x == 0) { min_shared = bargs.val_max; max_shared = bargs.val_min; } __syncthreads(); InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); cudaAssert(fargs.filter_idx1 == NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(dim_off + tile_offset, items, num_tile_items); BlockMinMaxGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, selection_flags, gpuCache, bargs.key_idx, min, max, num_tile_items); if (bargs.val_idx != NULL) { BlockBuildValueGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, selection_flags, gpuCache, bargs.key_idx, bargs.val_idx, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); } else { BlockBuildOffsetGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, selection_flags, gpuCache, bargs.key_idx, hash_table, bargs.num_slots, bargs.val_min, num_tile_items); } __syncthreads(); atomicMin(&min_shared, min); atomicMax(&max_shared, max); __syncthreads(); if (threadIdx.x == 0) { atomicMin(min_global, min_shared); atomicMax(max_global, max_shared); } __syncthreads(); } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void filter_GPU2( int* gpuCache, struct filterArgsGPU fargs, int* out_off, int num_tuples, int* total, int start_offset = 0, short* segment_group = NULL) { typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt; //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) // Allocate shared memory for BlockLoad __shared__ union TempStorage { typename BlockScanInt::TempStorage scan; } temp_storage; // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int t_count = 0; // Number of items selected per thread int c_t_count = 0; //Prefix sum of t_count __shared__ int block_off; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int tiles_per_segment = SEGMENT_SIZE/tile_size; int tile_idx = blockIdx.x % tiles_per_segment; // Current tile index int 
segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __shared__ int64_t filter_segment1; __shared__ int64_t filter_segment2; __shared__ int64_t segment_index; if (threadIdx.x == 0) { if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE]; else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE; if (fargs.filter_idx1 != NULL) filter_segment1 = fargs.filter_idx1[segment_index]; if (fargs.filter_idx2 != NULL) filter_segment2 = fargs.filter_idx2[segment_index]; } __syncthreads(); start_offset = segment_index * SEGMENT_SIZE; InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); if (fargs.filter_idx1 != NULL) { int* ptr = gpuCache + filter_segment1 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); if (fargs.mode1 == 0) { //equal to BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); } else if (fargs.mode1 == 1) { //between BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); } else if (fargs.mode1 == 2) { //equal or equal BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); } // (*(fargs.d_filter_func1))(items, selection_flags, fargs.compare1, fargs.compare2, num_tile_items); } if (fargs.filter_idx2 != NULL) { int* ptr = gpuCache + filter_segment2 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); if (fargs.mode2 == 0) { //equal to BlockPredAndEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare3, selection_flags, num_tile_items); } else if (fargs.mode2 == 1) { //between BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare3, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare4, selection_flags, num_tile_items); } else if (fargs.mode2 == 2) { //equal or equal BlockPredAndEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare3, selection_flags, num_tile_items); BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare4, selection_flags, num_tile_items); } // (*(fargs.d_filter_func2))(items, selection_flags, fargs.compare3, fargs.compare4, num_tile_items); } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) t_count++; } } //Barrier __syncthreads(); // TODO: need to check logic for offset BlockScanInt(temp_storage.scan).ExclusiveSum(t_count, c_t_count); //doing a prefix sum of all the previous threads in the block and store it to c_t_count if(threadIdx.x == blockDim.x - 1) { //if the last thread in the block, add the prefix sum of all the prev threads + sum of my threads to global variable total block_off = atomicAdd(total, t_count+c_t_count); //the previous value of total is gonna be assigned to block_off } //block_off does not need to be global (it's just need to be shared), because it will get the previous 
value from total which is global __syncthreads(); cudaAssert(out_off != NULL); #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if(selection_flags[ITEM]) { int offset = block_off + c_t_count++; out_off[offset] = start_offset + tile_idx * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; } } } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void filter_GPU3( int* gpuCache, int* off_col, struct filterArgsGPU fargs, int* out_off, int num_tuples, int* total) { typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt; //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) // Allocate shared memory for BlockLoad __shared__ union TempStorage { typename BlockScanInt::TempStorage scan; } temp_storage; // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int items_off[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int t_count = 0; // Number of items selected per thread int c_t_count = 0; //Prefix sum of t_count __shared__ int block_off; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } cudaAssert(off_col != NULL); InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); if (fargs.filter_idx1 != NULL) { BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(off_col + tile_offset, items_off, num_tile_items); BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_off, items, gpuCache, fargs.filter_idx1, num_tile_items); if (fargs.mode1 == 0) { //equal to BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); } else if (fargs.mode1 == 1) { //between BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); } else if (fargs.mode1 == 2) { //equal or equal BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); } // (*(fargs.d_filter_func1))(items, selection_flags, fargs.compare1, fargs.compare2, num_tile_items); } if (fargs.filter_idx2 != NULL) { BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(off_col + tile_offset, items_off, num_tile_items); BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_off, items, gpuCache, fargs.filter_idx2, num_tile_items); if (fargs.mode2 == 0) { //equal to BlockPredAndEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare3, selection_flags, num_tile_items); } else if (fargs.mode2 == 1) { //between BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare3, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare4, selection_flags, num_tile_items); } else if (fargs.mode2 == 2) { //equal or equal BlockPredAndEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare3, selection_flags, num_tile_items); BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare4, selection_flags, num_tile_items); } // 
// (*(fargs.d_filter_func2))(items, selection_flags, fargs.compare3, fargs.compare4, num_tile_items);
  }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if (selection_flags[ITEM]) t_count++;
    }
  }

  // Barrier
  __syncthreads();

  // TODO: need to check logic for offset
  // Exclusive prefix sum of the per-thread selected counts within the block; the result is stored in c_t_count.
  BlockScanInt(temp_storage.scan).ExclusiveSum(t_count, c_t_count);
  if (threadIdx.x == blockDim.x - 1) {
    // The last thread holds the block's total selected count (its prefix sum plus its own count);
    // add it to the global counter `total`. The previous value of `total` becomes this block's base offset.
    block_off = atomicAdd(total, t_count + c_t_count);
  }
  // block_off only needs to be shared (not global): it captures the previous value of the global counter `total`.
  __syncthreads();

  cudaAssert(out_off != NULL);

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if (selection_flags[ITEM]) {
        int offset = block_off + c_t_count++;
        out_off[offset] = items_off[ITEM];
      }
    }
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void groupByGPU(
    int* gpuCache, struct offsetGPU offset, struct groupbyArgsGPU gargs,
    int num_tuples, int* res) {

  int items_off[ITEMS_PER_THREAD];
  int aggrval1[ITEMS_PER_THREAD];
  int aggrval2[ITEMS_PER_THREAD];
  int groupval1[ITEMS_PER_THREAD];
  int groupval2[ITEMS_PER_THREAD];
  int groupval3[ITEMS_PER_THREAD];
  int groupval4[ITEMS_PER_THREAD];

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  cudaAssert(offset.lo_off != NULL);
  BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.lo_off + tile_offset, items_off, num_tile_items);

  if (gargs.aggr_idx1 != NULL) {
    BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_off, aggrval1, gpuCache, gargs.aggr_idx1, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 0, num_tile_items);
  }

  if (gargs.aggr_idx2 != NULL) {
    BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_off, aggrval2, gpuCache, gargs.aggr_idx2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 0, num_tile_items);
  }

  if (gargs.group_idx1 != NULL) {
    cudaAssert(offset.dim_off1 != NULL);
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.dim_off1 + tile_offset, items_off, num_tile_items);
    BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_off, groupval1, gpuCache, gargs.group_idx1, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval1, 0, num_tile_items);
  }

  if (gargs.group_idx2 != NULL) {
    cudaAssert(offset.dim_off2 != NULL);
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.dim_off2 + tile_offset, items_off, num_tile_items);
    BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_off, groupval2, gpuCache, gargs.group_idx2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval2, 0, num_tile_items);
  }

  if (gargs.group_idx3 != NULL) {
    cudaAssert(offset.dim_off3 != NULL);
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.dim_off3 + tile_offset, items_off, num_tile_items);
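    // items_off now holds row offsets into the third dimension table; gather the corresponding group-by values through gargs.group_idx3.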
BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_off, groupval3, gpuCache, gargs.group_idx3, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval3, 0, num_tile_items); } if (gargs.group_idx4 != NULL) { cudaAssert(offset.dim_off4 != NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.dim_off4 + tile_offset, items_off, num_tile_items); BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_off, groupval4, gpuCache, gargs.group_idx4, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval4, 0, num_tile_items); } // if (gargs.mode == 0) { // cudaAssert(gargs.aggr_idx1 != NULL); // } else if (gargs.mode == 1) { // cudaAssert(gargs.aggr_idx1 != NULL && gargs.aggr_idx2 != NULL); // } else if (gargs.mode == 2) { // cudaAssert(gargs.aggr_idx1 != NULL && gargs.aggr_idx2 != NULL); // } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { int hash = ((groupval1[ITEM] - gargs.min_val1) * gargs.unique_val1 + (groupval2[ITEM] - gargs.min_val2) * gargs.unique_val2 + (groupval3[ITEM] - gargs.min_val3) * gargs.unique_val3 + (groupval4[ITEM] - gargs.min_val4) * gargs.unique_val4) % gargs.total_val; //! //printf("%d %d %d %d\n", groupval1[ITEM], groupval2[ITEM], groupval3[ITEM], groupval4[ITEM]); res[hash * 6] = groupval1[ITEM]; res[hash * 6 + 1] = groupval2[ITEM]; res[hash * 6 + 2] = groupval3[ITEM]; res[hash * 6 + 3] = groupval4[ITEM]; // if (gargs.mode == 0) temp = aggrval1[ITEM]; // else if (gargs.mode == 1) temp = aggrval1[ITEM] - aggrval2[ITEM]; // else if (gargs.mode == 2) temp = aggrval1[ITEM] * aggrval2[ITEM]; // int temp = (*(gargs.d_group_func))(aggrval1[ITEM], aggrval2[ITEM]); int temp; if (aggrval1[ITEM] > aggrval2[ITEM]) temp = aggrval1[ITEM] - aggrval2[ITEM]; else temp = aggrval2[ITEM] - aggrval1[ITEM]; atomicAdd(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp)); } } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void aggregationGPU( int* gpuCache, int* lo_off, struct groupbyArgsGPU gargs, int num_tuples, int* res) { int items_off[ITEMS_PER_THREAD]; int aggrval1[ITEMS_PER_THREAD]; int aggrval2[ITEMS_PER_THREAD]; int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } cudaAssert(lo_off != NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(lo_off + tile_offset, items_off, num_tile_items); if (gargs.aggr_idx1 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_off, aggrval1, gpuCache, gargs.aggr_idx1, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items); } if (gargs.aggr_idx2 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_off, aggrval2, gpuCache, gargs.aggr_idx2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items); } // if (gargs.mode == 0) { // cudaAssert(gargs.aggr_idx1 != NULL); // } else if (gargs.mode == 1) { // cudaAssert(gargs.aggr_idx1 != NULL && gargs.aggr_idx2 != NULL); // } else if (gargs.mode == 2) { // cudaAssert(gargs.aggr_idx1 != NULL && gargs.aggr_idx2 != NULL); // } long long sum = 0; #pragma unroll for (int ITEM = 0; ITEM < 
ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { // if (gargs.mode == 0) sum += aggrval1[ITEM]; // else if (gargs.mode == 1) sum+= (aggrval1[ITEM] - aggrval2[ITEM]); // else if (gargs.mode == 2) sum+= (aggrval1[ITEM] * aggrval2[ITEM]); // sum += (*(gargs.d_group_func))(aggrval1[ITEM], aggrval2[ITEM]); sum+= (aggrval1[ITEM] * aggrval2[ITEM]); } } __syncthreads(); static __shared__ long long buffer[32]; unsigned long long aggregate = BlockSum<long long, BLOCK_THREADS, ITEMS_PER_THREAD>(sum, (long long*)buffer); __syncthreads(); if (threadIdx.x == 0) { atomicAdd(reinterpret_cast<unsigned long long*>(&res[4]), aggregate); } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void probe_aggr_GPU( struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, int num_tuples, int* res, int start_offset = 0) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int* ptr; // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int aggrval1[ITEMS_PER_THREAD]; int aggrval2[ITEMS_PER_THREAD]; int temp[ITEMS_PER_THREAD]; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __syncthreads(); InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //normal operation, here pargs.key_idx will be lo_partkey, lo_suppkey, etc (the join key column) -> no group by attributes ptr = pargs.key_column[0]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) { ptr = pargs.key_column[1]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) { ptr = pargs.key_column[2]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) { ptr = pargs.key_column[3]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } if (gargs.aggr_column[0] != NULL) { ptr = gargs.aggr_column[0]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, aggrval1, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items); } if (gargs.aggr_column[1] != NULL) { ptr = gargs.aggr_column[1]; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, aggrval2, num_tile_items); } else { 
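    // No second aggregate column was supplied: fill aggrval2 with 1 so the aggrval1 * aggrval2 product in the reduction below reduces to aggrval1.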
BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items); } long long sum = 0; #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if (selection_flags[ITEM]) { sum+= (aggrval1[ITEM] * aggrval2[ITEM]); } } } __syncthreads(); static __shared__ long long buffer[32]; unsigned long long aggregate = BlockSum<long long, BLOCK_THREADS, ITEMS_PER_THREAD>(sum, (long long*)buffer); __syncthreads(); if (threadIdx.x == 0) { atomicAdd(reinterpret_cast<unsigned long long*>(&res[4]), aggregate); } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void probe_aggr_GPU2( int* gpuCache, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, int num_tuples, int* res, int start_offset = 0, short* segment_group = NULL) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int* ptr; int tiles_per_segment = SEGMENT_SIZE/tile_size; int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int aggrval1[ITEMS_PER_THREAD]; int aggrval2[ITEMS_PER_THREAD]; int temp[ITEMS_PER_THREAD]; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __shared__ int64_t key_segment1; __shared__ int64_t key_segment2; __shared__ int64_t key_segment3; __shared__ int64_t key_segment4; __shared__ int64_t aggr_segment1; __shared__ int64_t aggr_segment2; __shared__ int64_t segment_index; if (threadIdx.x == 0) { if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE]; else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE; if (pargs.key_idx1 != NULL) key_segment1 = pargs.key_idx1[segment_index]; if (pargs.key_idx2 != NULL) key_segment2 = pargs.key_idx2[segment_index]; if (pargs.key_idx3 != NULL) key_segment3 = pargs.key_idx3[segment_index]; if (pargs.key_idx4 != NULL) key_segment4 = pargs.key_idx4[segment_index]; if (gargs.aggr_idx1 != NULL) aggr_segment1 = gargs.aggr_idx1[segment_index]; if (gargs.aggr_idx2 != NULL) aggr_segment2 = gargs.aggr_idx2[segment_index]; } __syncthreads(); InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //normal operation, here pargs.key_idx will be lo_partkey, lo_suppkey, etc (the join key column) -> no group by attributes ptr = gpuCache + key_segment1 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) { ptr = gpuCache + key_segment2 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) { ptr = gpuCache + key_segment3 * SEGMENT_SIZE; 
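    // key_segment3 was resolved from segment_index by thread 0 above; load this tile's slice of that segment and probe ht3 with it.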
BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { ptr = gpuCache + key_segment4 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } if (gargs.aggr_idx1 != NULL) { ptr = gpuCache + aggr_segment1 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, aggrval1, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items); } if (gargs.aggr_idx2 != NULL) { ptr = gpuCache + aggr_segment2 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, aggrval2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items); } // if (gargs.mode == 0) { // cudaAssert(gargs.aggr_idx1 != NULL); // } else if (gargs.mode == 1) { // cudaAssert(gargs.aggr_idx1 != NULL && gargs.aggr_idx2 != NULL); // } else if (gargs.mode == 2) { // cudaAssert(gargs.aggr_idx1 != NULL && gargs.aggr_idx2 != NULL); // } long long sum = 0; #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if (selection_flags[ITEM]) { // if (gargs.mode == 0) sum += aggrval1[ITEM]; // else if (gargs.mode == 1) sum+= (aggrval1[ITEM] - aggrval2[ITEM]); // else if (gargs.mode == 2) sum+= (aggrval1[ITEM] * aggrval2[ITEM]); // sum += (*(gargs.d_group_func))(aggrval1[ITEM], aggrval2[ITEM]); sum+= (aggrval1[ITEM] * aggrval2[ITEM]); } } } __syncthreads(); static __shared__ long long buffer[32]; unsigned long long aggregate = BlockSum<long long, BLOCK_THREADS, ITEMS_PER_THREAD>(sum, (long long*)buffer); __syncthreads(); if (threadIdx.x == 0) { atomicAdd(reinterpret_cast<unsigned long long*>(&res[4]), aggregate); } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void probe_aggr_GPU3( int* gpuCache, struct offsetGPU offset, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, int num_tuples, int* res) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; // Load a segment of consecutive items that are blocked across threads int items_lo[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int aggrval1[ITEMS_PER_THREAD]; int aggrval2[ITEMS_PER_THREAD]; int temp[ITEMS_PER_THREAD]; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); cudaAssert(offset.lo_off != NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.lo_off + tile_offset, items_lo, num_tile_items); if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //normal operation, here pargs.key_idx will be lo_partkey, lo_suppkey, etc (the join key column) -> no group by attributes BlockProbeGroupByGPU2<BLOCK_THREADS, 
ITEMS_PER_THREAD>(threadIdx.x, items_lo, temp, selection_flags, gpuCache, pargs.key_idx1, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) { BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, temp, selection_flags, gpuCache, pargs.key_idx2, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) { BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, temp, selection_flags, gpuCache, pargs.key_idx3, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, temp, selection_flags, gpuCache, pargs.key_idx4, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } if (gargs.aggr_idx1 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, aggrval1, gpuCache, gargs.aggr_idx1, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items); } if (gargs.aggr_idx2 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, aggrval2, gpuCache, gargs.aggr_idx2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items); } // if (gargs.mode == 0) { // cudaAssert(gargs.aggr_idx1 != NULL); // } else if (gargs.mode == 1) { // cudaAssert(gargs.aggr_idx1 != NULL && gargs.aggr_idx2 != NULL); // } else if (gargs.mode == 2) { // cudaAssert(gargs.aggr_idx1 != NULL && gargs.aggr_idx2 != NULL); // } long long sum = 0; #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if (selection_flags[ITEM]) { // if (gargs.mode == 0) sum += aggrval1[ITEM]; // else if (gargs.mode == 1) sum+= (aggrval1[ITEM] - aggrval2[ITEM]); // else if (gargs.mode == 2) sum+= (aggrval1[ITEM] * aggrval2[ITEM]); // sum += (*(gargs.d_group_func))(aggrval1[ITEM], aggrval2[ITEM]); sum+= (aggrval1[ITEM] * aggrval2[ITEM]); } } } __syncthreads(); static __shared__ long long buffer[32]; unsigned long long aggregate = BlockSum<long long, BLOCK_THREADS, ITEMS_PER_THREAD>(sum, (long long*)buffer); __syncthreads(); if (threadIdx.x == 0) { atomicAdd(reinterpret_cast<unsigned long long*>(&res[4]), aggregate); } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void filter_probe_aggr_GPU2( int* gpuCache, struct filterArgsGPU fargs, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, int num_tuples, int* res, int start_offset = 0, short* segment_group = NULL) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int* ptr; int tiles_per_segment = SEGMENT_SIZE/tile_size; int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment // Load a segment of consecutive items that are blocked across threads int items[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int aggrval1[ITEMS_PER_THREAD]; int aggrval2[ITEMS_PER_THREAD]; int temp[ITEMS_PER_THREAD]; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } __shared__ int64_t 
key_segment1; __shared__ int64_t key_segment2; __shared__ int64_t key_segment3; __shared__ int64_t key_segment4; __shared__ int64_t aggr_segment1; __shared__ int64_t aggr_segment2; __shared__ int64_t filter_segment1; __shared__ int64_t filter_segment2; __shared__ int64_t segment_index; if (threadIdx.x == 0) { if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE]; else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE; if (pargs.key_idx1 != NULL) key_segment1 = pargs.key_idx1[segment_index]; if (pargs.key_idx2 != NULL) key_segment2 = pargs.key_idx2[segment_index]; if (pargs.key_idx3 != NULL) key_segment3 = pargs.key_idx3[segment_index]; if (pargs.key_idx4 != NULL) key_segment4 = pargs.key_idx4[segment_index]; if (gargs.aggr_idx1 != NULL) aggr_segment1 = gargs.aggr_idx1[segment_index]; if (gargs.aggr_idx2 != NULL) aggr_segment2 = gargs.aggr_idx2[segment_index]; if (fargs.filter_idx1 != NULL) filter_segment1 = fargs.filter_idx1[segment_index]; if (fargs.filter_idx2 != NULL) filter_segment2 = fargs.filter_idx2[segment_index]; } __syncthreads(); InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); if (fargs.filter_idx1 != NULL) { ptr = gpuCache + filter_segment1 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare1, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare2, selection_flags, num_tile_items); // (*(fargs.d_filter_func1))(items, selection_flags, fargs.compare1, fargs.compare2, num_tile_items); } if (fargs.filter_idx2 != NULL) { ptr = gpuCache + filter_segment2 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare3, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(items, fargs.compare4, selection_flags, num_tile_items); // (*(fargs.d_filter_func2))(items, selection_flags, fargs.compare3, fargs.compare4, num_tile_items); } if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //normal operation, here pargs.key_idx will be lo_partkey, lo_suppkey, etc (the join key column) -> no group by attributes ptr = gpuCache + key_segment1 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); } if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) { ptr = gpuCache + key_segment2 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); } if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) { ptr = gpuCache + key_segment3 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { ptr = gpuCache + key_segment4 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + 
segment_tile_offset, items, num_tile_items); BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } if (gargs.aggr_idx1 != NULL) { ptr = gpuCache + aggr_segment1 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, aggrval1, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items); } if (gargs.aggr_idx2 != NULL) { ptr = gpuCache + aggr_segment2 * SEGMENT_SIZE; BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, aggrval2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items); } long long sum = 0; #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if (selection_flags[ITEM]) { // if (gargs.mode == 0) sum += aggrval1[ITEM]; // else if (gargs.mode == 1) sum+= (aggrval1[ITEM] - aggrval2[ITEM]); // else if (gargs.mode == 2) sum+= (aggrval1[ITEM] * aggrval2[ITEM]); // sum += (*(gargs.d_group_func))(aggrval1[ITEM], aggrval2[ITEM]); sum+= (aggrval1[ITEM] * aggrval2[ITEM]); } } } __syncthreads(); static __shared__ long long buffer[32]; unsigned long long aggregate = BlockSum<long long, BLOCK_THREADS, ITEMS_PER_THREAD>(sum, (long long*)buffer); __syncthreads(); if (threadIdx.x == 0) { atomicAdd(reinterpret_cast<unsigned long long*>(&res[4]), aggregate); } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void filter_probe_aggr_GPU3( int* gpuCache, struct offsetGPU offset, struct filterArgsGPU fargs, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, int num_tuples, int* res) { //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment) //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size) int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; // Load a segment of consecutive items that are blocked across threads int items_lo[ITEMS_PER_THREAD]; int selection_flags[ITEMS_PER_THREAD]; int aggrval1[ITEMS_PER_THREAD]; int aggrval2[ITEMS_PER_THREAD]; int temp[ITEMS_PER_THREAD]; int num_tiles = (num_tuples + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = num_tuples - tile_offset; } InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags); cudaAssert(offset.lo_off != NULL); BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(offset.lo_off + tile_offset, items_lo, num_tile_items); // if (fargs.filter_idx1 != NULL) { // BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, temp, gpuCache, fargs.filter_idx1, num_tile_items); // BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(temp, fargs.compare1, selection_flags, num_tile_items); // BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(temp, fargs.compare2, selection_flags, num_tile_items); // (*(fargs.d_filter_func1))(temp, selection_flags, fargs.compare1, fargs.compare2, num_tile_items); // } if (fargs.filter_idx2 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, temp, gpuCache, fargs.filter_idx2, num_tile_items); BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(temp, fargs.compare3, selection_flags, num_tile_items); BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(temp, fargs.compare4, selection_flags, num_tile_items); // 
(*(fargs.d_filter_func2))(temp, selection_flags, fargs.compare3, fargs.compare4, num_tile_items); } // if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //normal operation, here pargs.key_idx will be lo_partkey, lo_suppkey, etc (the join key column) -> no group by attributes // BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, temp, selection_flags, gpuCache, pargs.key_idx1, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items); // } // if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) { // BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, temp, selection_flags, gpuCache, pargs.key_idx2, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items); // } // if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) { // BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, temp, selection_flags, gpuCache, pargs.key_idx3, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items); // } if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) { BlockProbeGroupByGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, temp, selection_flags, gpuCache, pargs.key_idx4, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items); } if (gargs.aggr_idx1 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, aggrval1, gpuCache, gargs.aggr_idx1, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items); } if (gargs.aggr_idx2 != NULL) { BlockReadOffsetGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items_lo, aggrval2, gpuCache, gargs.aggr_idx2, num_tile_items); } else { BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items); } // if (gargs.mode == 0) { // cudaAssert(gargs.aggr_idx1 != NULL); // } else if (gargs.mode == 1) { // cudaAssert(gargs.aggr_idx1 != NULL && gargs.aggr_idx2 != NULL); // } else if (gargs.mode == 2) { // cudaAssert(gargs.aggr_idx1 != NULL && gargs.aggr_idx2 != NULL); // } long long sum = 0; #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) { if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) { if (selection_flags[ITEM]) { // if (gargs.mode == 0) sum += aggrval1[ITEM]; // else if (gargs.mode == 1) sum+= (aggrval1[ITEM] - aggrval2[ITEM]); // else if (gargs.mode == 2) sum+= (aggrval1[ITEM] * aggrval2[ITEM]); // sum += (*(gargs.d_group_func))(aggrval1[ITEM], aggrval2[ITEM]); sum+= (aggrval1[ITEM] * aggrval2[ITEM]); } } } __syncthreads(); static __shared__ long long buffer[32]; unsigned long long aggregate = BlockSum<long long, BLOCK_THREADS, ITEMS_PER_THREAD>(sum, (long long*)buffer); __syncthreads(); if (threadIdx.x == 0) { atomicAdd(reinterpret_cast<unsigned long long*>(&res[4]), aggregate); } } template<int BLOCK_THREADS, int ITEMS_PER_THREAD> __global__ void zeroMem( int* array, int N) { int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD; int tile_offset = blockIdx.x * tile_size; int num_tiles = (N + tile_size - 1) / tile_size; int num_tile_items = tile_size; if (blockIdx.x == num_tiles - 1) { num_tile_items = N - tile_offset; } #pragma unroll for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) { if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) { array[tile_offset + (ITEM * BLOCK_THREADS) + threadIdx.x] = 0; } } } template<int GPUs> __global__ void mergeGPU( int** resGPU, int num_tuples, int lead_gpu) { unsigned long long aggregate = 0; int tid = threadIdx.x + blockIdx.x * blockDim.x; if (tid < num_tuples) { for (int gpu = 0; gpu < GPUs; gpu++) { if (gpu != 
lead_gpu) {
        // Read the 64-bit aggregate stored in columns 4..5 of this GPU's result row.
        unsigned long long* tmp = reinterpret_cast<unsigned long long*>(&resGPU[gpu][tid * 6 + 4]);
        if (*tmp != 0) aggregate += *tmp;
        // Copy the group-by key columns over to the lead GPU wherever this GPU produced them.
        if (resGPU[gpu][tid * 6] != 0) resGPU[lead_gpu][tid * 6] = resGPU[gpu][tid * 6];
        if (resGPU[gpu][tid * 6 + 1] != 0) resGPU[lead_gpu][tid * 6 + 1] = resGPU[gpu][tid * 6 + 1];
        if (resGPU[gpu][tid * 6 + 2] != 0) resGPU[lead_gpu][tid * 6 + 2] = resGPU[gpu][tid * 6 + 2];
        if (resGPU[gpu][tid * 6 + 3] != 0) resGPU[lead_gpu][tid * 6 + 3] = resGPU[gpu][tid * 6 + 3];
      }
    }
    // Accumulate the other GPUs' aggregates into the lead GPU's 64-bit slot.
    unsigned long long* tmp = reinterpret_cast<unsigned long long*>(&resGPU[lead_gpu][tid * 6 + 4]);
    *tmp += aggregate;
  }
}

#endif
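// Usage sketch (illustrative only, not part of this header): one possible host-side sequence for
// zeroing a result buffer and merging per-GPU partial results, assuming 4 GPUs with GPU 0 as the
// lead, a tile of 128 threads x 4 items per thread, and hypothetical names d_res (a device-visible
// array of per-GPU result pointers) and num_groups (the number of group-by slots, 6 ints each):
//
//   zeroMem<128, 4><<<(num_groups * 6 + 128 * 4 - 1) / (128 * 4), 128>>>(d_res[0], num_groups * 6);
//   // ... run build/probe/group-by kernels on each GPU ...
//   mergeGPU<4><<<(num_groups + 127) / 128, 128>>>(d_res, num_groups, /*lead_gpu=*/0);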