// Lancelot / src / gpudb / ShuffleGPU.h
#pragma once

#include <cub/cub.cuh>
#include <curand.h>

#include <cuda.h>

#include "crystal/crystal.cuh"
#include "BlockLibrary.cuh"
#include "ShuffleGPU.h"
#include "KernelArgs.h"
#include "gpu_utils.h"

using namespace cub;

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void shuffle_probe_partition_GPU(int* gpuCache, int* partition_count,
  struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, 
  struct shuffleOutGPU sout, int gpu, int write_val = 1, int write_offset = 0, int latemat = 1) {

  __const__ int column_map[NUM_COLUMN] = {
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    2, 2, 2, 2,
    1, 1, 1, 1,
    3, 3, 3, 3,
    4, 4, 4
  };
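  // The mapping above gives, for each column c, the id of the table that owns it:
  // columns 0-9 belong to the probe/fact table (table 0) and the remaining columns to
  // the four build/dimension tables (tables 1-4); the exact SSB column ordering behind
  // these ids is assumed here.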

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE evenly (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int* ptr;

  // Allocate shared memory for BlockScan (unused in this kernel)
  __shared__ union TempStorage
  {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __shared__ int block_scan_partition[NUM_GPU];
  __shared__ int smem_index[NUM_RANGE];
  __shared__ int my_partition;
  __shared__ int blockId; //blockId within a partition

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  if (threadIdx.x == 0) {
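    // Map this thread block to one of the NUM_GPU incoming partitions:
    // block_scan_partition[] becomes an inclusive prefix sum of the number of tiles in
    // each partition, my_partition is the partition whose tile range contains blockIdx.x,
    // and blockId is this block's tile index within that partition.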
    my_partition = -1; blockId = -1;
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (partition == 0) {
        block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size;
      } else {
        block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size;
      }
    }
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (blockIdx.x < block_scan_partition[partition]) {
        my_partition = partition;
        if (my_partition == 0) blockId = blockIdx.x;
        else blockId = blockIdx.x - block_scan_partition[my_partition-1];
        break;
      }
    }
  }

  __syncthreads();

  cudaAssert(my_partition != -1);
  cudaAssert(blockId != -1);
  int num_tile_items = tile_size;
  int tile_offset = blockId * tile_size;
  int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size;
  if (blockId == num_tiles_partition - 1) {
    num_tile_items = partition_count[my_partition] - tile_offset;
  }

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  cudaAssert(sargs.column_part != NULL);
  cudaAssert(sargs.column_part[my_partition] != NULL);
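  // latemat selects the load strategy for the probe-side columns: 0 loads every item with
  // BlockLoadCrystal, any other value loads only items whose selection flag is still set
  // with BlockLoadMasked; latemat == 2 additionally skips re-partitioning join-key columns
  // that this probe already consumed (see the write_val loop below).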

  if (pargs.fkey_col_id[0] == 1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column
    cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL);
    ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu];
    if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not doing a join on this column and there is no result from a previous join (this is the first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items);
  }

  if (pargs.fkey_col_id[1] == 2 && pargs.ht2 != NULL) {
    cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != NULL);
    ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu];
    if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items);
  }

  if (pargs.fkey_col_id[2] == 3 && pargs.ht3 != NULL) {
    cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL);
    ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu];
    if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items);
  }

  if (pargs.fkey_col_id[3] == 4 && pargs.ht4 != NULL) {
    cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL);
    ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu];
    if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items);
  }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if(selection_flags[ITEM]) {
        item_off[0][ITEM] = blockId * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; //local offset to each partition
      } 
    }
  }

  //DO PARTITIONING
  //this range_size computation would not work for a date column
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: compute the range size correctly when the key range does not divide evenly across partitions (e.g. 10 values over 3 partitions -> 3, 3, 4)
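  // Example with assumed values: min_key = 1, max_key = 20, NUM_RANGE = 4 gives
  // range_size = 20 / 4 = 5, so key 13 lands in partition 13 / 5 = 2; keys whose
  // quotient reaches NUM_RANGE are clamped to the last partition further below.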

  //right now we only support probe table as the table getting partitioned
  cudaAssert(sargs.table == 0);
  cudaAssert(column_map[sargs.key_column] == sargs.table);
  cudaAssert(sargs.column_part[my_partition][sargs.key_column] != NULL);
  cudaAssert(sargs.column_part[my_partition][sargs.key_column][gpu] != NULL);

  //loading the partitioned key column and count the partition
  // ptr = sargs.column_part[my_partition][sargs.key_column][gpu];
  // BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size);
  
  //loading the partitioned key column and count the partition
  ptr = sargs.column_part[my_partition][sargs.key_column][gpu];
  if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
  else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
  int is_replicated = 0;
  BlockCountIndexPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    // printf("%d\n", smem_index[threadIdx.x]);
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();
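  // At this point smem_index[p] holds this tile's base position inside partition p of the
  // global output (the atomicAdd above reserved a contiguous range per partition), and
  // index[ITEM], filled by BlockCountIndexPartition, holds each item's rank within the tile
  // for its destination partition; their sum below is the final scatter position.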

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  __syncthreads();

  //write the keys
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, 0, -1);

  __syncthreads();

  if (write_val) {
    cudaAssert(sout.column != NULL);
    cudaAssert(sargs.col_idx != NULL);
    //scatter the value
    //use the local offset to scatter the value
    //TODO: we have to support the case where the offset for dimension table is a global offset
    for (int col = 0; col < NUM_COLUMN; col++) {
      int table = column_map[col];
        //write the values from the shuffled input
      if (sargs.column_part[my_partition][col] != NULL && table == 0 && sargs.key_column != col) { //materializing the local offset
        //WARNING: IF JOIN KEY HAS BEEN USED, NO NEED TO PARTITION THAT JOIN KEY
        if (latemat != 2  || (latemat == 2 && col != pargs.fkey_col_id[0] && col != pargs.fkey_col_id[1] && col != pargs.fkey_col_id[2] && col != pargs.fkey_col_id[3])) {
          cudaAssert(sargs.column_part[my_partition][col][gpu] != NULL);
          cudaAssert(sout.column[col] != NULL);
          // BlockReadOffsetShuffle2<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[table], selection_flags, sargs.column_part[my_partition][col][gpu], item_val, num_tile_items);
          ptr = sargs.column_part[my_partition][col][gpu];
          BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
          BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1);
        }
        //for now sargs.col_idx[col] should be NULL except for the group-by columns of the dimension tables in SSB
        //not yet working because sargs.col_idx is NULL
      } else if (sargs.column_part[my_partition][col] != NULL && table > 0 && pargs.fkey_col_id[table-1] < -1 && sargs.key_column != col) {
        cudaAssert(sargs.column_part[my_partition][col][gpu] != NULL);
        cudaAssert(sout.column[col] != NULL);
        // BlockReadOffsetShuffle2<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[table], selection_flags, sargs.column_part[my_partition][col][gpu], item_val, num_tile_items);
        ptr = sargs.column_part[my_partition][col][gpu];
        BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1);      
      } else if (sargs.col_idx[col] != NULL && table > 0 && pargs.fkey_col_id[table-1] > 0 && sargs.key_column != col) { //materializing the global offset
        cudaAssert(sargs.col_idx[col] != NULL);
        cudaAssert(sargs.broadcast_idx[col] != NULL);
        cudaAssert(sout.column[col] != NULL);
        BlockReadOffsetGPU3<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[table], item_val, selection_flags, gpuCache, sargs.col_idx[col], sargs.broadcast_idx[col], num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1);
        // cudaAssert(0);
      }
    }
  }

  //TODO: NOT YET SUPPORTED
  if (write_offset) {
    // cudaAssert(0);
    //scatter the offset (global offset and not the local offset), local offset would be useless once the shuffle is done
    cudaAssert(sargs.in_off_part != NULL);
    cudaAssert(sargs.in_off_part[my_partition] != NULL);
    cudaAssert(sout.out_off != NULL);
    for (int table = 0; table < NUM_TABLE; table++) {
      //if the offset coming from previous join
      if (sargs.in_off_part[my_partition][table] != NULL && table == 0) {
        cudaAssert(sargs.in_off_part[my_partition][table][gpu] != NULL);
        cudaAssert(sout.out_off[table] != NULL);
        ptr = sargs.in_off_part[my_partition][table][gpu];
        BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1);
      } else if (sargs.in_off_part[my_partition][table] != NULL && table > 0 && pargs.fkey_col_id[table-1] < -1) {
        cudaAssert(sargs.in_off_part[my_partition][table][gpu] != NULL);
        cudaAssert(sout.out_off[table] != NULL);
        // BlockReadOffsetShuffle2<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[table], selection_flags, sargs.in_off_part[my_partition][table][gpu], item_val, num_tile_items);
        ptr = sargs.in_off_part[my_partition][table][gpu];
        BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1);
      //if the offset coming from join
      } else if (table > 0 && pargs.fkey_col_id[table-1] > 0) {
        cudaAssert(sout.out_off[table] != NULL);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[table], item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1);
      }
    }
  }
}
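
// Host-side launch sketch (an illustrative assumption, not part of this header): the grid
// must cover every tile of every incoming partition, mirroring the block-to-partition
// mapping done by thread 0 inside the kernel. h_partition_count / d_partition_count are
// hypothetical host/device copies of the per-partition row counts.
//
//   int tile_size = 128 * 4;
//   int total_blocks = 0;
//   for (int p = 0; p < NUM_GPU; p++)
//     total_blocks += (h_partition_count[p] + tile_size - 1) / tile_size;
//   shuffle_probe_partition_GPU<128, 4, NUM_GPU><<<total_blocks, 128>>>(
//       gpuCache, d_partition_count, pargs, sargs, sout, gpu);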

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void shuffle_probe_partition_global_GPU(int** gpuCache, int* partition_count,
  struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, 
  struct shuffleOutGPU sout, int gpu, int first_join_in_pipeline, int write_val = 1, int write_offset = 0) {

  __const__ int column_map[NUM_COLUMN] = {
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    2, 2, 2, 2,
    1, 1, 1, 1,
    3, 3, 3, 3,
    4, 4, 4
  };

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE evenly (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int* ptr;

  // Allocate shared memory for BlockScan (unused in this kernel)
  __shared__ union TempStorage
  {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];
  int table_id = 0;

  __shared__ int block_scan_partition[NUM_GPU];
  __shared__ int smem_index[NUM_RANGE];
  __shared__ int my_partition;
  __shared__ int blockId; //blockId within a partition

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  if (threadIdx.x == 0) {
    my_partition = -1; blockId = -1;
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (partition == 0) {
        block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size;
      } else {
        block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size;
      }
    }
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (blockIdx.x < block_scan_partition[partition]) {
        my_partition = partition;
        if (my_partition == 0) blockId = blockIdx.x;
        else blockId = blockIdx.x - block_scan_partition[my_partition-1];
        break;
      }
    }
  }

  __syncthreads();

  cudaAssert(my_partition != -1);
  cudaAssert(blockId != -1);
  int num_tile_items = tile_size;
  int tile_offset = blockId * tile_size;
  int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size;
  if (blockId == num_tiles_partition - 1) {
    num_tile_items = partition_count[my_partition] - tile_offset;
  }

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  cudaAssert(sargs.column_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
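  // For each join column below: when this join is the first one in the pipeline
  // (first_join_in_pipeline equals the dimension table id), the key column is read from
  // the shuffled partition buffers; otherwise the key is gathered from the global cache
  // through the fact-table offsets loaded into item_off[0] above.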

  if (pargs.fkey_col_id[0] == 1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column
    if (first_join_in_pipeline == 1) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      //partition size here means the original fact-table partition size from the initial partitioning; it is different from partition_count
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[0], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
    }
  } else { //we are not doing a join on this column; any offset from a previous join is handled in the write_offset section
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items);
  }

  if (pargs.fkey_col_id[1] == 2 && pargs.ht2 != NULL) {
    if (first_join_in_pipeline == 2) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[1], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
    }
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items);
  }

  if (pargs.fkey_col_id[2] == 3 && pargs.ht3 != NULL) {
    if (first_join_in_pipeline == 3) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[2], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
    }
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items);
  }

  if (pargs.fkey_col_id[3] == 4 && pargs.ht4 != NULL) {
    if (first_join_in_pipeline == 4) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[3], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
    }
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items);
  }

  //DO PARTITIONING
  //this range_size computation would not work for a date column
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: compute the range size correctly when the key range does not divide evenly across partitions (e.g. 10 values over 3 partitions -> 3, 3, 4)

  //right now we only support probe table as the table getting partitioned
  cudaAssert(sargs.table == 0);
  cudaAssert(column_map[sargs.key_column] == sargs.table);
  // cudaAssert(sargs.column[sargs.key_column] != NULL);

  //loading the partitioned key column and count the partition
  BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, sargs.key_column, table_id, gpu, num_tile_items);
  int is_replicated = 0;
  BlockCountIndexPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  // if (threadIdx.x == 0) printf("hey %d %d %d\n", smem_index[0], smem_index[1], sargs.key_column);

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  __syncthreads();

  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, 0, -1);
  // if (threadIdx.x == 0 && blockIdx.x == 0) printf("writing keys\n");
  BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[0], item_key, index, selection_flags, sout.out_off[0], num_tile_items, range_size, 0, 100);

  __syncthreads();

  // NOT SURE HOW TO DO THIS WITH GLOBAL LATE MATERIALIZATION
  // TODO: NOT YET SUPPORTED
  if (write_offset) {
    cudaAssert(sargs.in_off_part != NULL);
    cudaAssert(sargs.in_off_part[my_partition] != NULL);
    cudaAssert(sout.out_off != NULL);
    //only tables >= 1 need handling here because the table 0 offset was already written above
    for (int table = 1; table < NUM_TABLE; table++) {
      //offset coming from previous join
      if (sargs.in_off_part[my_partition][table] != NULL && pargs.fkey_col_id[table-1] < -1) {
        cudaAssert(sout.out_off[table] != NULL);
        int* ptr = sargs.in_off_part[my_partition][table][gpu];
        BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[table], selection_flags, num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[table], item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, 200);
      //offset coming from current join
      } else if (pargs.fkey_col_id[table-1] > 0) {
        cudaAssert(sout.out_off[table] != NULL);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[table], item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, 300);
      }
    }
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void shuffle_probe_aggr_global_GPU(int** gpuCache, int* partition_count,
  struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs, int* res,
  int gpu, int first_join_in_pipeline, int write_val = 1, int write_offset = 0) {

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE evenly (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int* ptr;

  // Allocate shared memory for BlockScan (unused in this kernel)
  __shared__ union TempStorage
  {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];
  int aggrval1[ITEMS_PER_THREAD];
  int aggrval2[ITEMS_PER_THREAD];
  int table_id = 0;

  __shared__ int block_scan_partition[NUM_GPU];
  __shared__ int smem_index[NUM_RANGE];
  __shared__ int my_partition;
  __shared__ int blockId; //blockId within a partition

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  if (threadIdx.x == 0) {
    my_partition = -1; blockId = -1;
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (partition == 0) {
        block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size;
      } else {
        block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size;
      }
    }
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (blockIdx.x < block_scan_partition[partition]) {
        my_partition = partition;
        if (my_partition == 0) blockId = blockIdx.x;
        else blockId = blockIdx.x - block_scan_partition[my_partition-1];
        break;
      }
    }
  }

  __syncthreads();

  cudaAssert(my_partition != -1);
  cudaAssert(blockId != -1);
  int num_tile_items = tile_size;
  int tile_offset = blockId * tile_size;
  int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size;
  if (blockId == num_tiles_partition - 1) {
    num_tile_items = partition_count[my_partition] - tile_offset;
  }

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  cudaAssert(sargs.column_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);

  if (pargs.fkey_col_id[0] == 1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column
    if (first_join_in_pipeline == 1) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      //partition size here means the original fact-table partition size from the initial partitioning; it is different from partition_count
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[0], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
    }
  }

  if (pargs.fkey_col_id[1] == 2 && pargs.ht2 != NULL) {
    if (first_join_in_pipeline == 2) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[1], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
    }
  }

  if (pargs.fkey_col_id[2] == 3 && pargs.ht3 != NULL) {
    if (first_join_in_pipeline == 3) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[2], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
    }
  }

  if (pargs.fkey_col_id[3] == 4 && pargs.ht4 != NULL) {
    if (first_join_in_pipeline == 4) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[3], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
    }
  }

  if (gargs.aggr_col_id[0] != -1) {
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, aggrval1, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[0], table_id, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items);
  }

  if (gargs.aggr_col_id[1] != -1) {
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, aggrval2, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[1], table_id, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items);
  }
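  // Reduce the per-thread partial sums of aggrval1 * aggrval2 over the selected items:
  // BlockSum combines them across the block in shared memory, and thread 0 adds the block
  // total into the 64-bit accumulator stored in res[4..5] with a single atomicAdd.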

  long long sum = 0;

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if (selection_flags[ITEM]) {
        sum += ((long long)aggrval1[ITEM]) * aggrval2[ITEM]; //widen to 64-bit before multiplying to avoid int overflow
      }
    }
  }

  __syncthreads();
  static __shared__ long long buffer[32];
  unsigned long long aggregate = BlockSum<long long, BLOCK_THREADS, ITEMS_PER_THREAD>(sum, (long long*)buffer);
  __syncthreads();

  if (threadIdx.x == 0) {
    atomicAdd(reinterpret_cast<unsigned long long*>(&res[4]), aggregate);   
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void shuffle_probe_group_by_global_GPU(int** gpuCache, int* partition_count,
  struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs, int* res,
  int gpu, int first_join_in_pipeline, int write_val = 1, int write_offset = 0) {

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE evenly (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int* ptr;

  // Allocate shared memory for BlockScan (unused in this kernel)
  __shared__ union TempStorage
  {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];
  int groupval1[ITEMS_PER_THREAD];
  int groupval2[ITEMS_PER_THREAD];
  int groupval3[ITEMS_PER_THREAD];
  int groupval4[ITEMS_PER_THREAD];
  int aggrval1[ITEMS_PER_THREAD];
  int aggrval2[ITEMS_PER_THREAD];
  int table_id = 0;

  __shared__ int block_scan_partition[NUM_GPU];
  __shared__ int smem_index[NUM_RANGE];
  __shared__ int my_partition;
  __shared__ int blockId; //blockId within a partition

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  if (threadIdx.x == 0) {
    my_partition = -1; blockId = -1;
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (partition == 0) {
        block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size;
      } else {
        block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size;
      }
    }
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (blockIdx.x < block_scan_partition[partition]) {
        my_partition = partition;
        if (my_partition == 0) blockId = blockIdx.x;
        else blockId = blockIdx.x - block_scan_partition[my_partition-1];
        break;
      }
    }
  }

  __syncthreads();

  cudaAssert(my_partition != -1);
  cudaAssert(blockId != -1);
  int num_tile_items = tile_size;
  int tile_offset = blockId * tile_size;
  int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size;
  if (blockId == num_tiles_partition - 1) {
    num_tile_items = partition_count[my_partition] - tile_offset;
  }

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  cudaAssert(sargs.column_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);

  //normal operation
  if (pargs.fkey_col_id[0] == 1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column
    if (first_join_in_pipeline == 1) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
      BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
    } else {
      //partition size here means the original fact-table partition size from the initial partitioning; it is different from partition_count
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[0], table_id, gpu, num_tile_items);
      BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval1, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
    }
  //there is offset from previous join
  } else if (gargs.group_col_id[0] != -1) {
    cudaAssert(sargs.in_off_part[my_partition][1] != NULL);
    cudaAssert(sargs.in_off_part[my_partition][1][gpu] != NULL);
    ptr = sargs.in_off_part[my_partition][1][gpu];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[1], selection_flags, num_tile_items);
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[1], selection_flags, groupval1, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.group_col_id[0], 1, gpu, num_tile_items);
  } else { //we are not doing a join on this column and there is no result from a previous join (this is the first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval1, 0, num_tile_items);
  }

  if (pargs.fkey_col_id[1] == 2 && pargs.ht2 != NULL) {
    if (first_join_in_pipeline == 2) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[1], table_id, gpu, num_tile_items);
      BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval2, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
    }
  } else if (gargs.group_col_id[1] != -1) {
    cudaAssert(sargs.in_off_part[my_partition][2] != NULL);
    cudaAssert(sargs.in_off_part[my_partition][2][gpu] != NULL);
    ptr = sargs.in_off_part[my_partition][2][gpu];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[2], selection_flags, num_tile_items);
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[2], selection_flags, groupval2, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.group_col_id[1], 2, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval2, 0, num_tile_items);
  }

  if (pargs.fkey_col_id[2] == 3 && pargs.ht3 != NULL) {
    if (first_join_in_pipeline == 3) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[2], 3, gpu, num_tile_items);
      BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval3, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
    }
  } else if (gargs.group_col_id[2] != -1) {
    cudaAssert(sargs.in_off_part[my_partition][3] != NULL);
    cudaAssert(sargs.in_off_part[my_partition][3][gpu] != NULL);
    ptr = sargs.in_off_part[my_partition][3][gpu];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[3], selection_flags, num_tile_items);
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[3], selection_flags, groupval3, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.group_col_id[2], 3, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval3, 0, num_tile_items);
  }

  if (pargs.fkey_col_id[3] == 4 && pargs.ht4 != NULL) {
    if (first_join_in_pipeline == 4) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[3], table_id, gpu, num_tile_items);
      BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval4, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
    }
  } else if (gargs.group_col_id[3] != -1) {
    cudaAssert(sargs.in_off_part[my_partition][4] != NULL);
    cudaAssert(sargs.in_off_part[my_partition][4][gpu] != NULL);
    ptr = sargs.in_off_part[my_partition][4][gpu];
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[4], selection_flags, num_tile_items);
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[4], selection_flags, groupval4, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.group_col_id[3], 4, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval4, 0, num_tile_items);
  }

  if (gargs.aggr_col_id[0] != -1) {
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, aggrval1, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[0], table_id, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 0, num_tile_items);
  }

  if (gargs.aggr_col_id[1] != -1) {
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, aggrval2, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[1], table_id, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 0, num_tile_items);
  }
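  // Scatter into the group-by result table: each group hashes to a row of six ints where
  // slots 0-3 hold the four group values and slots 4-5 hold a 64-bit aggregate accumulated
  // with atomicAdd; here the aggregate is the absolute difference of aggrval1 and aggrval2.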

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if (selection_flags[ITEM]) {
        int hash = ((groupval1[ITEM] - gargs.min_val1) * gargs.unique_val1 + (groupval2[ITEM] - gargs.min_val2) * gargs.unique_val2 +  (groupval3[ITEM] - gargs.min_val3) * gargs.unique_val3 + (groupval4[ITEM] - gargs.min_val4) * gargs.unique_val4) % gargs.total_val; //!
        res[hash * 6] = groupval1[ITEM];
        res[hash * 6 + 1] = groupval2[ITEM];
        res[hash * 6 + 2] = groupval3[ITEM];
        res[hash * 6 + 3] = groupval4[ITEM];

        int temp;
        if (aggrval1[ITEM] > aggrval2[ITEM]) temp = aggrval1[ITEM] - aggrval2[ITEM];
        else temp = aggrval2[ITEM] - aggrval1[ITEM];
        atomicAdd(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp));

      }
    }
  }

}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void shuffle_probe_global_GPU(int** gpuCache, int* partition_count,
  struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, struct offsetGPU out_off, int* total,
  int gpu, int first_join_in_pipeline, int write_val = 1, int write_offset = 0) {

  // __const__ int column_map[NUM_COLUMN] = {
  //   0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  //   2, 2, 2, 2,
  //   1, 1, 1, 1,
  //   3, 3, 3, 3,
  //   4, 4, 4
  // };

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE evenly (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int* ptr;

  // Allocate shared memory for BlockScan
  __shared__ union TempStorage
  {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];
  int t_count = 0; // Number of items selected per thread
  int c_t_count = 0; //Prefix sum of t_count
  __shared__ int block_off;
  int table_id = 0;

  __shared__ int block_scan_partition[NUM_GPU];
  __shared__ int smem_index[NUM_RANGE];
  __shared__ int my_partition;
  __shared__ int blockId; //blockId within a partition

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  if (threadIdx.x == 0) {
    my_partition = -1; blockId = -1;
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (partition == 0) {
        block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size;
      } else {
        block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size;
      }
    }
    for (int partition = 0; partition < NUM_GPU; partition++) {
      if (blockIdx.x < block_scan_partition[partition]) {
        my_partition = partition;
        if (my_partition == 0) blockId = blockIdx.x;
        else blockId = blockIdx.x - block_scan_partition[my_partition-1];
        break;
      }
    }
  }

  __syncthreads();

  cudaAssert(my_partition != -1);
  cudaAssert(blockId != -1);
  int num_tile_items = tile_size;
  int tile_offset = blockId * tile_size;
  int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size;
  if (blockId == num_tiles_partition - 1) {
    num_tile_items = partition_count[my_partition] - tile_offset;
  }

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  cudaAssert(sargs.column_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
  cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);

  if (pargs.fkey_col_id[0] == 1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column
    if (first_join_in_pipeline == 1) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      //partition size here means the original fact-table partition size from the initial partitioning; it is different from partition_count
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[0], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
    }
  } else { //we are not doing a join on this column and there is no result from a previous join (this is the first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items);
  }

  if (pargs.fkey_col_id[1] == 2 && pargs.ht2 != NULL) {
    if (first_join_in_pipeline == 2) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[1], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
    }
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items);
  }

  if (pargs.fkey_col_id[2] == 3 && pargs.ht3 != NULL) {
    if (first_join_in_pipeline == 3) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[2], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
    }
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items);
  }

  if (pargs.fkey_col_id[3] == 4 && pargs.ht4 != NULL) {
    if (first_join_in_pipeline == 4) {
      cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0] != NULL);
      cudaAssert(sargs.in_off_part[my_partition][0][gpu] != NULL);
      ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu];
      BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off_part[my_partition][0][gpu] + tile_offset, item_off[0], selection_flags, num_tile_items);
    } else {
      BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], selection_flags, item_key, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, pargs.fkey_col_id[3], table_id, gpu, num_tile_items);
      BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
    }
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items);
  }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if(selection_flags[ITEM]) t_count++;
    }
  }

  //Barrier
  __syncthreads();

  // TODO: need to check logic for offset
  BlockScanInt(temp_storage.scan).ExclusiveSum(t_count, c_t_count); //exclusive prefix sum of t_count over all previous threads in the block, stored in c_t_count
  if(threadIdx.x == blockDim.x - 1) { //the last thread in the block adds the prefix sum of all previous threads plus its own count to the global variable total
      block_off = atomicAdd(total, t_count+c_t_count); //the previous value of total is assigned to block_off
  } //block_off does not need to be global (it only needs to be shared) because it receives the previous value of total, which is global

  __syncthreads();

  cudaAssert(sargs.in_off_part != NULL);
  cudaAssert(sargs.in_off_part[my_partition] != NULL);
  #pragma unroll
  for (int table = 1; table < NUM_TABLE; table++) {
    //offset coming from previous join
    if (sargs.in_off_part[my_partition][table] != NULL && pargs.fkey_col_id[table-1] < -1) {
      int* ptr = sargs.in_off_part[my_partition][table][gpu];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[table], selection_flags, num_tile_items);
      // if (table == 1) printf("supplier off %d %d %d %d\n", item_off[1][0], item_off[1][1], item_off[1][2], item_off[1][3]);
    }
  }

  cudaAssert(out_off.join_result_off != NULL);
  cudaAssert(out_off.join_result_off[0] != NULL);
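  // Write the compacted join result: each selected item takes the next slot after the
  // block's base offset (block_off) and stores its materialized per-table offsets there.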
  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if(selection_flags[ITEM]) {
        int offset = block_off + c_t_count++;
        out_off.join_result_off[0][offset] = item_off[0][ITEM];
        if (out_off.join_result_off[1] != NULL) out_off.join_result_off[1][offset] = item_off[1][ITEM]; //global offset
        if (out_off.join_result_off[2] != NULL) out_off.join_result_off[2][offset] = item_off[2][ITEM]; //global offset
        if (out_off.join_result_off[3] != NULL) out_off.join_result_off[3][offset] = item_off[3][ITEM]; //global offset
        if (out_off.join_result_off[4] != NULL) out_off.join_result_off[4][offset] = item_off[4][ITEM]; //global offset
      }
    }
  }

}