// Lancelot: src/gpudb/PartitioningGPU.h
#pragma once

#include <cub/cub.cuh>
#include <curand.h>

#include <cuda.h>

#include "crystal/crystal.cuh"
#include "BlockLibrary.cuh"
#include "ShuffleGPU.h"
#include "SimplePartitioning.h"
#include "KernelArgs.h"
#include "gpu_utils.h"

using namespace cub;

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void RangePartitioningCount(
  int* gpuCache, struct filterArgsGPU fargs, struct shuffleArgsGPU sargs, int* count, int num_tuples, int gpu, short* segment_group = NULL) {

  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int tiles_per_segment = SEGMENT_SIZE/tile_size; //number of tiles per segment
  int* ptr;

  int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];
  __shared__ int64_t key_segment;
  __shared__ int64_t filter_segment1;
  __shared__ int64_t filter_segment2;
  __shared__ int64_t segment_index;
  __shared__ int is_replicated;

  //SEGMENT INDEX IS THE SEGMENT INDEX OF THE ACTUAL TABLE (GLOBAL SEGMENT INDEX)
  if (threadIdx.x == 0) {
    if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE];
    else segment_index = ( tile_offset ) / SEGMENT_SIZE; // previously start_offset + tile_offset
    if (fargs.filter_idx1 != NULL) {
      filter_segment1 = fargs.filter_idx1[segment_index];
      cudaAssert(filter_segment1 != -1);
    }
    if (fargs.filter_idx2 != NULL) {
      filter_segment2 = fargs.filter_idx2[segment_index];
      cudaAssert(filter_segment2 != -1);
    }
    if (sargs.col_idx[sargs.key_column] != NULL) {
      key_segment = sargs.col_idx[sargs.key_column][segment_index];
      cudaAssert(key_segment != -1);
    }
    is_replicated = sargs.seg_is_replicated[segment_index];
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  //NOTE: this range_size computation does not work for date columns
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: handle NUM_RANGE not dividing the key range evenly (e.g. 10 keys over 3 partitions -> 3/3/4)
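  //Example of the current behavior: with min_key = 0, max_key = 19 and NUM_RANGE = 3,
  //range_size = (19 + 1 - 0) / 3 = 6, so keys 0-5 map to partition 0, 6-11 to partition 1,
  //12-17 to partition 2, and keys 18-19 would compute partition 3; the partitioning code
  //clamps such overflow to NUM_RANGE-1, which is why the last partition can end up larger.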

  if (fargs.filter_idx1 != NULL) {
    ptr = gpuCache + filter_segment1 * SEGMENT_SIZE;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
    // BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
    // BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);

    if (fargs.mode1 == 0) { //equal to
      BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
    } else if (fargs.mode1 == 1) { //between
      BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
      BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);
    } else if (fargs.mode1 == 2) { //equal or equal
      BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
      BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);
    }
  }

  if (fargs.filter_idx2 != NULL) {
    ptr = gpuCache + filter_segment2 * SEGMENT_SIZE;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
    // BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
    // BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);

    if (fargs.mode2 == 0) { //equal to
      BlockPredAndEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
    } else if (fargs.mode2 == 1) { //between
      BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
      BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);
    } else if (fargs.mode2 == 2) { //equal or equal
      BlockPredAndEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
      BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);
    }
  }

  ptr = gpuCache + key_segment * SEGMENT_SIZE;
  BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
  BlockCountPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    atomicAdd(&(count[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

}
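
// Minimal host-side launch sketch for RangePartitioningCount (illustrative only; the helper
// name, the stream parameter, and the call order are assumptions, not taken from this repo).
// It assumes the caller has populated fargs/sargs, zeroed d_count (NUM_RANGE ints on the
// device), and that BLOCK_THREADS * ITEMS_PER_THREAD divides SEGMENT_SIZE as noted above.
// The resulting per-partition counts would typically be used to size the output buffers
// consumed by the RangePartitioningKeyValue kernel further below.
template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
void LaunchRangePartitioningCountSketch(
    int* gpuCache, struct filterArgsGPU fargs, struct shuffleArgsGPU sargs,
    int* d_count, int num_tuples, int gpu, cudaStream_t stream = 0) {
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int num_blocks = (num_tuples + tile_size - 1) / tile_size; //same tiling as inside the kernel
  RangePartitioningCount<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>
      <<<num_blocks, BLOCK_THREADS, 0, stream>>>(gpuCache, fargs, sargs, d_count, num_tuples, gpu);
}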

//WARNING: ONLY WORKS FOR DIM TABLE
template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void RangePartitioningCount2(
  int* gpuCache, struct shuffleArgsGPU sargs, int* count, int num_tuples, int gpu) {

  int item_off[ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  cudaAssert(sargs.column[sargs.key_column] != NULL);
  //NOTE: this range_size computation does not work for date columns
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: handle NUM_RANGE not dividing the key range evenly (e.g. 10 keys over 3 partitions -> 3/3/4)

  cudaAssert(sargs.col_idx[sargs.key_column] != NULL);
  cudaAssert(sargs.in_off[sargs.table] != NULL);
  int* ptr = sargs.in_off[sargs.table];
  BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off, num_tile_items);
  BlockReadOffsetGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off, item_key, selection_flags, gpuCache, sargs.col_idx[sargs.key_column], num_tile_items);

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int segment_idx = item_off[ITEM]/SEGMENT_SIZE;
        // int is_replicated = sargs.seg_is_replicated[segment_idx];
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        cudaAssert(partition <= NUM_RANGE);
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        // if (is_replicated) {
        //   if (partition == gpu) atomicAdd(&(smem_index[partition]), 1);
        // } else 
        atomicAdd(&(smem_index[partition]), 1);
      }
    }
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    atomicAdd(&(count[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

}

//range partitioning reading from segment directly
template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void RangePartitioningKeyValue(
  int* gpuCache, struct filterArgsGPU fargs, struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int num_tuples, int gpu,
  int write_val = 1, int write_offset = 0, short* segment_group = NULL) {

  __const__ int column_map[NUM_COLUMN] = {
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    2, 2, 2, 2,
    1, 1, 1, 1,
    3, 3, 3, 3,
    4, 4, 4
  };
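  //column_map[col] is the table id that owns column col (table 0 is the probe/fact table that
  //gets partitioned, tables 1-4 are dimension tables); it is compared against sargs.table below
  //to pick out the columns belonging to the table being partitioned.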

  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int tiles_per_segment = SEGMENT_SIZE/tile_size; //number of tiles per segment
  int* ptr;

  int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];
  __shared__ int64_t key_segment;
  __shared__ int64_t filter_segment1;
  __shared__ int64_t filter_segment2;
  __shared__ int64_t segment_index;
  __shared__ int is_replicated;

  //SEGMENT INDEX IS THE SEGMENT INDEX OF THE ACTUAL TABLE (GLOBAL SEGMENT INDEX)
  if (threadIdx.x == 0) {
    if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE];
    else segment_index = ( tile_offset ) / SEGMENT_SIZE; // previously start_offset + tile_offset
    if (fargs.filter_idx1 != NULL) {
      filter_segment1 = fargs.filter_idx1[segment_index];
      cudaAssert(filter_segment1 != -1);
    }
    if (fargs.filter_idx2 != NULL) {
      filter_segment2 = fargs.filter_idx2[segment_index];
      cudaAssert(filter_segment2 != -1);
    }
    if (sargs.col_idx[sargs.key_column] != NULL) {
      key_segment = sargs.col_idx[sargs.key_column][segment_index];
      cudaAssert(key_segment != -1);
    }
    is_replicated = sargs.seg_is_replicated[segment_index];
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  int start_offset = segment_index * SEGMENT_SIZE;
  //NOTE: this range_size computation does not work for date columns
  //THIS ONLY WORKS IF THE TOTAL NUMBER OF DIM SEGMENTS IS DIVISIBLE BY NUM_GPU
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: handle NUM_RANGE not dividing the key range evenly (e.g. 10 keys over 3 partitions -> 3/3/4)

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  if (fargs.filter_idx1 != NULL) {
    ptr = gpuCache + filter_segment1 * SEGMENT_SIZE;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
    // BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
    // BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);

    if (fargs.mode1 == 0) { //equal to
      BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
    } else if (fargs.mode1 == 1) { //between
      BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
      BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);
    } else if (fargs.mode1 == 2) { //equal or equal
      BlockPredEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
      BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);
    }
  }

  if (fargs.filter_idx2 != NULL) {
    ptr = gpuCache + filter_segment2 * SEGMENT_SIZE;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
    // BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
    // BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);

    if (fargs.mode2 == 0) { //equal to
      BlockPredAndEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
    } else if (fargs.mode2 == 1) { //between
      BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
      BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);
    } else if (fargs.mode2 == 2) { //equal or equal
      BlockPredAndEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
      BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);
    }
  }

  cudaAssert(sargs.col_idx[sargs.key_column] != NULL);
  ptr = gpuCache + key_segment * SEGMENT_SIZE;
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);
  __syncthreads();
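  //smem_index[p] now holds this block's count for partition p, and index[ITEM] holds each
  //item's rank within that block-local count; the atomicAdd below reserves a contiguous range
  //in sout.pos and turns smem_index[p] into the block's base offset, so an item's final write
  //position becomes index[ITEM] + smem_index[partition].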

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  __syncthreads();

  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, is_replicated, gpu);

  if (write_val) {
    //scatter the value
    for (int col = 0; col < NUM_COLUMN; col++) {
      int table = column_map[col];
      if (sargs.col_idx[col] != NULL && sargs.key_column != col && sargs.table == table) {
        cudaAssert(sargs.col_idx[col] != NULL);
        cudaAssert(sout.column[col] != NULL);
        int64_t val_segment = sargs.col_idx[col][segment_index];
        cudaAssert(val_segment != -1);
        ptr = gpuCache + val_segment * SEGMENT_SIZE;
        BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[col], num_tile_items, range_size, is_replicated, gpu);
      }
    }
  }

  if (write_offset) {
    //scatter the offset (the global offset, not the local offset) coming from the CPU
    // for (int table = 0; table < NUM_TABLE; table++) {
      //this will not happen for now (in_off is from off_col)
      // if (sargs.in_off != NULL && sargs.in_off[table] != NULL) { //writing the global offset from CPU
      //     cudaAssert(sout.out_off[table] != NULL);
      //     cudaAssert(0);
      // } else if (sargs.table == table) { //writing the global offset of the current table which get partitioned
          cudaAssert(sout.out_off[sargs.table] != NULL);
          BlockWriteOffSelf<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.out_off[sargs.table], start_offset, num_tile_items, range_size, is_replicated, gpu);
      // }
    // }
  }

}

// range partition with offset coming from CPU
//WARNING: ONLY WORKS FOR DIM TABLE
template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void RangePartitioningKeyValue2(
  int* gpuCache, struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int num_tuples, int gpu,
  int write_val = 1, int write_offset = 0) {

  __const__ int column_map[NUM_COLUMN] = {
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    2, 2, 2, 2,
    1, 1, 1, 1,
    3, 3, 3, 3,
    4, 4, 4
  };

  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];
  // int is_replicated[ITEMS_PER_THREAD];

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];

  // //WARNING: For the case filter on CPU, partitioning join, and some of the dim table is replicated, we will just set is_replicated to 0 for simplicity
  // int is_replicated = 0;

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  //NOTE: this range_size computation does not work for date columns
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: handle NUM_RANGE not dividing the key range evenly (e.g. 10 keys over 3 partitions -> 3/3/4)

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  cudaAssert(sargs.col_idx[sargs.key_column] != NULL);
  cudaAssert(sargs.in_off[sargs.table] != NULL);
  int* ptr = sargs.in_off[sargs.table];
  BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[sargs.table], num_tile_items);
  BlockReadOffsetGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[sargs.table], item_key, selection_flags, gpuCache, sargs.col_idx[sargs.key_column], num_tile_items);
  // BlockCountIndexPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);

  // #pragma unroll
  // for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
  //   int segment_idx = item_off[sargs.table][ITEM]/SEGMENT_SIZE;
  //   is_replicated[ITEM] = sargs.seg_is_replicated[segment_idx];
  // }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        cudaAssert(partition <= NUM_RANGE);
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        // if (is_replicated[ITEM]) {
        //   if (partition == gpu) index[ITEM] = atomicAdd(&(smem_index[partition]), 1);
        //   else index[ITEM] = 0;
        // } else 
        index[ITEM] = atomicAdd(&(smem_index[partition]), 1);
      }
    }
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  __syncthreads();

  cudaAssert(sout.column[sargs.key_column] != NULL);
  // BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, is_replicated, gpu);
  // BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[sargs.table], item_key, index, selection_flags, sout.out_off[sargs.table], num_tile_items, range_size, is_replicated, gpu);

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        cudaAssert(partition <= NUM_RANGE);
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        cudaAssert(sout.column[sargs.key_column][partition] != NULL);
        // if (is_replicated[ITEM]) {
        //   if (partition == gpu) sout.column[sargs.key_column][partition][index[ITEM]] = item_key[ITEM];
        // } else 
        sout.column[sargs.key_column][partition][index[ITEM]] = item_key[ITEM];
      }
    }
  }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        cudaAssert(partition <= NUM_RANGE);
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        cudaAssert(sout.out_off[sargs.table][partition] != NULL);
        // if (is_replicated[ITEM]) {
        //   if (partition == gpu) sout.out_off[sargs.table][partition][index[ITEM]] = item_off[sargs.table][ITEM];
        // } else 
        sout.out_off[sargs.table][partition][index[ITEM]] = item_off[sargs.table][ITEM];
      }
    }
  }

  __syncthreads();

  if (write_val) {
    //scatter the value
    for (int col = 0; col < NUM_COLUMN; col++) {
      int table = column_map[col];
      if (sargs.col_idx[col] != NULL && sargs.key_column != col && sargs.table == table) {
        cudaAssert(sargs.col_idx[col] != NULL);
        cudaAssert(sout.column[col] != NULL);
        BlockReadOffsetGPU2<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[sargs.table], item_val, selection_flags, gpuCache, sargs.col_idx[col], num_tile_items);
        // BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, is_replicated, gpu);
            #pragma unroll
            for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
              if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
                if (selection_flags[ITEM]) {
                  int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
                  cudaAssert(partition <= NUM_RANGE);
                  if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
                  cudaAssert(sout.column[col][partition] != NULL);
                  // if (is_replicated[ITEM]) {
                  //   if (partition == gpu) sout.column[col][partition][index[ITEM]] = item_val[ITEM];
                  // } else 
                  sout.column[col][partition][index[ITEM]] = item_val[ITEM];
                }
              }
            }
      }
    }
  }

  // if (write_offset) {
  //   //scatter the offset (global offset and not the local offset) come from CPU
  //   for (int table = 0; table < NUM_TABLE; table++) {
  //     //this will not happen for now (in_off is from off_col)
  //     if (sargs.in_off != NULL && sargs.in_off[table] != NULL && sargs.table != table) { //writing the global offset from CPU
  //         cudaAssert(sout.out_off[table] != NULL);
  //         cudaAssert(0);
  //         //blockload
  //         //blockwritepartition
  //     }
  //   }
  // }

}
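
// Illustrative two-phase sketch for the offset-based (dim table) variants above; the helper
// name and the assumption that the count pass is used to size sout before the scatter pass
// are mine, not taken from this repo. d_count and sout.pos are assumed to be zeroed by the
// caller before the respective launches.
template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
void RangePartitionDimSketch(
    int* gpuCache, struct shuffleArgsGPU sargs, struct shuffleOutGPU sout,
    int* d_count, int num_tuples, int gpu, cudaStream_t stream = 0) {
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int num_blocks = (num_tuples + tile_size - 1) / tile_size;
  RangePartitioningCount2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>
      <<<num_blocks, BLOCK_THREADS, 0, stream>>>(gpuCache, sargs, d_count, num_tuples, gpu);
  //...caller copies d_count back, allocates/points sout.column and sout.out_off, resets sout.pos...
  RangePartitioningKeyValue2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>
      <<<num_blocks, BLOCK_THREADS, 0, stream>>>(gpuCache, sargs, sout, num_tuples, gpu);
}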

template<int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void test_kernel(int* input, int* output, int size) {
    int tid = threadIdx.x + blockIdx.x * blockDim.x;
    int sum = 0;
    while (tid < size) {
        sum += input[tid];
        tid += blockDim.x * gridDim.x;
    }
    atomicAdd(output, sum);
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void test_kernel3(int** seg_row_to_gpu) {
    cudaAssert(seg_row_to_gpu[0][1] >= 0 && seg_row_to_gpu[0][1] < NUM_GPU);
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void test_out_off(struct shuffleOutGPU sout, int size) {
  if (threadIdx.x < size) {
    // if (sout.out_off[3][1] == NULL) printf("i failed assertion");
    // printf("%d\n", sout.out_off[3][1][0]);
    for (int table = 1; table < 4; table++) {
      for (int partition = 0; partition < 2; partition++) {
        // if (in_off_part[partition][table] != NULL) {
          // if (sout.out_off[table][partition] == NULL) printf("table %d %d\n", table, partition);
          cudaAssert(sout.out_off[table] != NULL);
          cudaAssert(sout.out_off[table][partition] != NULL);
        // }
      }
    }
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void test_kernel2(int**** d_column_part) {
    if (threadIdx.x == 0 && blockIdx.x == 0) {
      for (int i = 0; i < 10; i++)
        printf("keys %d %d %d %d\n", d_column_part[0][1][1][i], d_column_part[1][1][1][i], d_column_part[2][1][1][i], d_column_part[3][1][1][i]);
    }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void probe_partition_GPU(
  int* gpuCache, struct filterArgsGPU fargs, struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, 
  struct shuffleOutGPU sout, int num_tuples, int gpu, int start_offset = 0, short* segment_group = NULL,
  int write_val = 1, int write_offset = 0, int latemat = 0) {

  __const__ int column_map[NUM_COLUMN] = {
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    2, 2, 2, 2,
    1, 1, 1, 1,
    3, 3, 3, 3,
    4, 4, 4
  };

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int tiles_per_segment = SEGMENT_SIZE/tile_size; //number of tiles per segment
  int* ptr;

  int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment

  // Allocate shared memory for BlockScan
  __shared__ union TempStorage
  {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __shared__ int smem_index[NUM_RANGE];
  __shared__ int check_join[NUM_TABLE];
  __shared__ int64_t key_segment1; 
  __shared__ int64_t key_segment2;
  __shared__ int64_t key_segment3;
  __shared__ int64_t key_segment4;
  __shared__ int64_t filter_segment1;
  __shared__ int64_t filter_segment2;
  __shared__ int64_t shuffle_key;
  __shared__ int64_t segment_index;
  __shared__ int is_replicated;

  if (threadIdx.x == 0) {
    if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE];
    else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE;
    if (pargs.key_idx1 != NULL) {
      key_segment1 = pargs.key_idx1[segment_index];
      cudaAssert(key_segment1 != -1);
    }
    if (pargs.key_idx2 != NULL) {
      key_segment2 = pargs.key_idx2[segment_index];
      cudaAssert(key_segment2 != -1);
    }
    if (pargs.key_idx3 != NULL) {
      key_segment3 = pargs.key_idx3[segment_index];
      cudaAssert(key_segment3 != -1);
    }
    if (pargs.key_idx4 != NULL) {
      key_segment4 = pargs.key_idx4[segment_index];
      cudaAssert(key_segment4 != -1);
    }
    if (fargs.filter_idx1 != NULL) {
      filter_segment1 = fargs.filter_idx1[segment_index];
      cudaAssert(filter_segment1 != -1);
    }
    if (fargs.filter_idx2 != NULL) {
      filter_segment2 = fargs.filter_idx2[segment_index];
      cudaAssert(filter_segment2 != -1);
    }
    if (sargs.col_idx[sargs.key_column] != NULL) {
      shuffle_key = sargs.col_idx[sargs.key_column][segment_index];
      cudaAssert(shuffle_key != -1);
    }
    for (int table = 0; table < NUM_TABLE; table++) {
      check_join[table] = -1;
    }
    is_replicated = sargs.seg_is_replicated[segment_index];
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (fargs.filter_idx1 != NULL) {
    ptr = gpuCache + filter_segment1 * SEGMENT_SIZE;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
    BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
    BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);
  }

  if (fargs.filter_idx2 != NULL) {
    ptr = gpuCache + filter_segment2 * SEGMENT_SIZE;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
    BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
    BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);
  }

  if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation)
    ptr = gpuCache + key_segment1 * SEGMENT_SIZE;
    check_join[0] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not joining on this column and there is no result from a previous join (this is the first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items);
  }

  if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) {
    ptr = gpuCache + key_segment2 * SEGMENT_SIZE;
    check_join[1] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items);
  }

  if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) {
    ptr = gpuCache + key_segment3 * SEGMENT_SIZE;
    check_join[2] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items);
  }

  if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) {
    ptr = gpuCache + key_segment4 * SEGMENT_SIZE;
    check_join[3] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items);
  }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if(selection_flags[ITEM]) {
        item_off[0][ITEM] = blockIdx.x * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; //local offset to each partition
      }
    }
  }

  //DO PARTITIONING
  //NOTE: this range_size computation does not work for date columns
  start_offset = segment_index * SEGMENT_SIZE;
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: handle NUM_RANGE not dividing the key range evenly (e.g. 10 keys over 3 partitions -> 3/3/4)

  //right now we only support probe table as the table getting partitioned
  cudaAssert(sargs.table == 0);

  //loading the partitioned key column and count the partition
  cudaAssert(sargs.col_idx[sargs.key_column] != NULL);
  ptr = gpuCache + shuffle_key * SEGMENT_SIZE;
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, is_replicated, gpu);

  if (write_val) {
    //scatter the value
    //todo: what if offset coming from join
    cudaAssert(sargs.col_idx != NULL);
    cudaAssert(sout.column != NULL);
    for (int col = 0; col < NUM_COLUMN; col++) {
      int table = column_map[col];
      //write values from segment
      if (sargs.col_idx[col] != NULL && sargs.key_column != col && table == 0) {
        //WARNING: IF JOIN KEY HAS BEEN USED, NO NEED TO PARTITION THAT JOIN KEY
        // if (latemat != 2  || (latemat == 2 && col != pargs.fkey_col_id[0] && col != pargs.fkey_col_id[1] && col != pargs.fkey_col_id[2] && col != pargs.fkey_col_id[3])) {
          cudaAssert(sargs.col_idx[col] != NULL);
          cudaAssert(sout.column[col] != NULL);
          cudaAssert(table == 0);
          int64_t val_segment = sargs.col_idx[col][segment_index];
          cudaAssert(val_segment != -1);
          ptr = gpuCache + val_segment * SEGMENT_SIZE;
          BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[col], num_tile_items, range_size, is_replicated, gpu);
        // }
      //write values from dimension table with offset coming from current join (materialize it and write the values)
      } else if (latemat == 0 && sargs.col_idx[col] != NULL && table > 0 && check_join[table-1] != -1 && sargs.key_column != col) {
        cudaAssert(sargs.col_idx[col] != NULL);
        cudaAssert(sargs.broadcast_idx[col] != NULL);
        cudaAssert(sout.column[col] != NULL);
        BlockReadOffsetGPU3<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[table], item_val, selection_flags, gpuCache, sargs.col_idx[col], sargs.broadcast_idx[col], num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, is_replicated, gpu);
      }
    }
  }

  //TODO: NOT YET SUPPORTED, handle the case where global offset is coming from join
  if (write_offset) {
    cudaAssert(sout.out_off != NULL);
    //scatter the offset (global offset and not the local offset)
    for (int table = 0; table < NUM_TABLE; table++) {
      //offset coming from CPU
      if (sargs.in_off != NULL && sargs.in_off[table] != NULL && table > 0) { //writing the global offset from previous join
          cudaAssert(check_join[table-1] == -1);
          cudaAssert(0); //this will never happen for now
      //offset coming from self
      } else if (sargs.table == table && table == 0) { //writing the global offset of the current table which get partitioned
          cudaAssert(sout.out_off[table] != NULL);
          BlockWriteOffSelf<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.out_off[table], start_offset, num_tile_items, range_size, is_replicated, gpu);
      //offset coming from current join
      } else if (table > 0 && check_join[table-1] != -1 && sout.out_off[table] != NULL) {
          cudaAssert(table != 0);
          // cudaAssert(sout.out_off[table] != NULL);
          BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[table], item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, is_replicated, gpu);
      }
    }
  }
}


template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void probe_partition_GPU2(
  int* gpuCache, struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, 
  struct shuffleOutGPU sout, int num_tuples, int write_val = 1, int write_offset = 0, int latemat = 0) {

  __const__ int column_map[NUM_COLUMN] = {
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    2, 2, 2, 2,
    1, 1, 1, 1,
    3, 3, 3, 3,
    4, 4, 4
  };

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)

  // Specialize BlockScan for a 1D block of BLOCK_THREADS threads
  typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  // int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many block per segment
  int* ptr;

  // Allocate shared memory for BlockScan
  __shared__ union TempStorage
  {
    typename BlockScanInt::TempStorage scan;
  } temp_storage;

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation)
    ptr = pargs.key_column[0];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not joining on this column and there is no result from a previous join (this is the first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items);
  }

  if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) {
    ptr = pargs.key_column[1];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items);
  }

  if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) {
    ptr = pargs.key_column[2];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items);
  }

  if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) {
    ptr = pargs.key_column[3];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items);
  }

  //DO PARTITIONING
  //NOTE: this range_size computation does not work for date columns
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: handle NUM_RANGE not dividing the key range evenly (e.g. 10 keys over 3 partitions -> 3/3/4)

  //right now we only support probe table as the table getting partitioned
  cudaAssert(sargs.table == 0);

  //loading the partitioned key column and count the partition
  cudaAssert(sargs.column[sargs.key_column] != NULL);
  ptr = sargs.column[sargs.key_column];
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, 0, -1);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, 0, -1);

  if (write_val) {
    //scatter the value
    cudaAssert(sargs.col_idx != NULL);
    //use the local offset to scatter the value
    //TODO: we have to support the case where the offset for dimension table is a global offset
    for (int col = 0; col < NUM_COLUMN; col++) {
      int table = column_map[col];
      //forwarding column from previous join for table 0
      if (sargs.column[col] != NULL && table == 0 && sargs.key_column != col) {
        //WARNING: IF JOIN KEY HAS BEEN USED, NO NEED TO PARTITION THAT JOIN KEY
        // if (latemat != 2  || (latemat == 2 && col != pargs.fkey_col_id[0] && col != pargs.fkey_col_id[1] && col != pargs.fkey_col_id[2] && col != pargs.fkey_col_id[3])) {
          cudaAssert(sargs.column[col] != NULL);
          cudaAssert(sout.column[col] != NULL);
          ptr = sargs.column[col];
          BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
          // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, num_tile_items);
          BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1);
        // }
      //forwarding column from previous join for dim table
      } else if (latemat == 0 && sargs.column[col] != NULL && table > 0 && pargs.key_column[table-1] == NULL && sargs.key_column != col) {
        cudaAssert(sargs.column[col] != NULL);
        cudaAssert(sout.column[col] != NULL);
        ptr = sargs.column[col];
        BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1);      
      } else if (latemat == 0 && sargs.col_idx[col] != NULL && table > 0 && pargs.key_column[table-1] != NULL && sargs.key_column != col) { //materializing the global offset
        cudaAssert(table != 0);
        cudaAssert(sargs.col_idx[col] != NULL);
        cudaAssert(sargs.broadcast_idx[col] != NULL);
        cudaAssert(sout.column[col] != NULL);
        BlockReadOffsetGPU3<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[table], item_val, selection_flags, gpuCache, sargs.col_idx[col], sargs.broadcast_idx[col], num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[col], num_tile_items, range_size, 0, -1); 
      } 
    }
  }

  if (write_offset) {
    //scatter the offset (global offset and not the local offset), local offset would be useless once the shuffle is done
    for (int table = 0; table < NUM_TABLE; table++) {
      cudaAssert(sargs.in_off != NULL);
        //forwarding offset from table = 0
      if (sargs.in_off[table] != NULL && sargs.table == table && table == 0) {
        cudaAssert(sargs.in_off[table] != NULL);
        cudaAssert(sout.out_off[table] != NULL);     
        ptr = sargs.in_off[table];
        BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
        // BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1);
        //forwarding offset from previous join
      } else if (sargs.in_off[table] != NULL && table > 0 && pargs.key_column[table-1] == NULL) {
        cudaAssert(sargs.in_off[table] != NULL);
        cudaAssert(sout.out_off[table] != NULL);
        ptr = sargs.in_off[table];
        BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1);
        //storing offset from current join
      } else if (table > 0 && pargs.key_column[table-1] != NULL && sout.out_off[table] != NULL) {
        cudaAssert(sargs.in_off[table] != NULL);
        cudaAssert(sout.out_off[table] != NULL);        
        BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[table], item_key, index, selection_flags, sout.out_off[table], num_tile_items, range_size, 0, -1);
      }
    }
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void probe_group_by_GPU_lm(
  int** gpuCache, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs, int num_tuples,
  int* res, int gpu) {
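
  //Late-materialization probe + group-by: the join key columns arrive as device pointers in
  //pargs.key_column, while group-by and aggregate values are gathered afterwards through the
  //global row offsets in sargs.in_off, which may resolve to segments cached on other GPUs via
  //sargs.seg_row_to_gpu (hence gpuCache is an int** here).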

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)
  
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int* ptr;

  // Load a segment of consecutive items that are blocked across threads
  int items[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];
  int groupval[4][ITEMS_PER_THREAD];
  int aggrval1[ITEMS_PER_THREAD];
  int aggrval2[ITEMS_PER_THREAD];

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //normal operation: pargs.key_column[0] is the join key column (lo_partkey, lo_suppkey, etc.), not a group-by attribute
    ptr = pargs.key_column[0];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval[0], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[0], 0, num_tile_items);
  }

  if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) {
    ptr = pargs.key_column[1];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval[1], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[1], 0, num_tile_items);
  }

  if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) {
    ptr = pargs.key_column[2];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval[2], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
    // if (blockIdx.x == 0) printf("%d %d %d %d\n", groupval[2][0], groupval[2][1], groupval[2][3], groupval[2][4]);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[2], 0, num_tile_items);
  }

  if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) {
    ptr = pargs.key_column[3];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    // if (blockIdx.x == 0) printf("%d %d %d %d %d %d %d %d\n", items[0], items[1], items[2], items[3], selection_flags[0], selection_flags[1], selection_flags[2], selection_flags[3]);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, groupval[3], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[3], 0, num_tile_items);
  }

  for (int i = 0; i < NUM_TABLE-1; i++) {
    if (pargs.key_column[i] == NULL && gargs.group_col_id[i] != -1) {
        ptr = sargs.in_off[i+1];
        BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, selection_flags, num_tile_items);
        // if (i == 2) if (threadIdx.x == 0) printf("%d %d %d %d\n", items[0], items[1], items[2], items[3]);
        BlockReadOffsetGlobal2<BLOCK_THREADS, ITEMS_PER_THREAD>(items, selection_flags, groupval[i], gpuCache, sargs.all_col_idx, sargs.broadcast_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.group_col_id[i], i+1, gpu, num_tile_items);
        // if (blockIdx.x == 0 && i == 3) printf("%d %d %d %d %d %d %d %d\n", groupval[i][0], groupval[i][1], groupval[i][2], groupval[i][3]);
    }
  }

  ptr = sargs.in_off[0];
  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, selection_flags, num_tile_items);
  if (gargs.aggr_col_id[0] != -1) {
    // BlockReadOffsetGlobal2<BLOCK_THREADS, ITEMS_PER_THREAD>(items, selection_flags, aggrval1, gpuCache, sargs.all_col_idx, sargs.broadcast_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[0], 0, gpu, num_tile_items);
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(items, selection_flags, aggrval1, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[0], 0, gpu, num_tile_items);
    // BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 0, num_tile_items);
  }

  if (gargs.aggr_col_id[1] != -1) {
    // BlockReadOffsetGlobal2<BLOCK_THREADS, ITEMS_PER_THREAD>(items, selection_flags, aggrval2, gpuCache, sargs.all_col_idx, sargs.broadcast_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[1], 0, gpu, num_tile_items);
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(items, selection_flags, aggrval2, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[1], 0, gpu, num_tile_items);
    // BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 0, num_tile_items);
  }
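
  //Each group's result slot is 6 ints wide: res[hash*6 .. hash*6+3] hold the four group-by
  //keys, and res[hash*6+4 .. hash*6+5] hold a 64-bit running sum that is updated below with
  //a 64-bit atomicAdd on the reinterpreted pointer.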

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if (selection_flags[ITEM]) {
        int hash = ((groupval[0][ITEM] - gargs.min_val1) * gargs.unique_val1 + (groupval[1][ITEM] - gargs.min_val2) * gargs.unique_val2 +  (groupval[2][ITEM] - gargs.min_val3) * gargs.unique_val3 + (groupval[3][ITEM] - gargs.min_val4) * gargs.unique_val4) % gargs.total_val; //!
        res[hash * 6] = groupval[0][ITEM];
        res[hash * 6 + 1] = groupval[1][ITEM];
        res[hash * 6 + 2] = groupval[2][ITEM];
        res[hash * 6 + 3] = groupval[3][ITEM];

        int temp;
        if (aggrval1[ITEM] > aggrval2[ITEM]) temp = aggrval1[ITEM] - aggrval2[ITEM];
        else temp = aggrval2[ITEM] - aggrval1[ITEM];
        atomicAdd(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp));

      }
    }
  }
}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD>
__global__ void probe_aggr_GPU_lm(
  int** gpuCache, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs, int num_tuples,
  int* res, int gpu) {
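
  //Late-materialization probe + aggregation (no group-by): join keys come from
  //pargs.key_column, aggregate inputs are gathered via the global offsets in sargs.in_off[0],
  //and the final sum is accumulated into res[4..5] as a single 64-bit value.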

  //assume start_offset is always at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size divides SEGMENT_SIZE (SEGMENT_SIZE is a multiple of tile_size)
  
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int* ptr;

  // Load a segment of consecutive items that are blocked across threads
  int items[ITEMS_PER_THREAD];
  int selection_flags[ITEMS_PER_THREAD];
  int aggrval1[ITEMS_PER_THREAD];
  int aggrval2[ITEMS_PER_THREAD];
  int temp[ITEMS_PER_THREAD];

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //normal operation: pargs.key_column[0] is the join key column (lo_partkey, lo_suppkey, etc.), not a group-by attribute
    ptr = pargs.key_column[0];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  }

  if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) {
    ptr = pargs.key_column[1];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  }

  if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) {
    ptr = pargs.key_column[2];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  }

  if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) {
    ptr = pargs.key_column[3];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, items, temp, selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  }

  ptr = sargs.in_off[0];
  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, items, selection_flags, num_tile_items);
  if (gargs.aggr_col_id[0] != -1) {
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(items, selection_flags, aggrval1, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[0], 0, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items);
  }

  if (gargs.aggr_col_id[1] != -1) {
    BlockReadOffsetGlobal<BLOCK_THREADS, ITEMS_PER_THREAD>(items, selection_flags, aggrval2, gpuCache, sargs.all_col_idx, sargs.seg_row_to_gpu, sargs.seg_is_replicated, gargs.aggr_col_id[1], 0, gpu, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items);
  }

  long long sum = 0;

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if (selection_flags[ITEM]) {
        // printf("here %d %d\n", aggrval1[ITEM], aggrval2[ITEM]);
        sum+= (aggrval1[ITEM] * aggrval2[ITEM]);
      }
    }
  }
  
  __syncthreads();
  static __shared__ long long buffer[32];
  unsigned long long aggregate = BlockSum<long long, BLOCK_THREADS, ITEMS_PER_THREAD>(sum, (long long*)buffer);
  __syncthreads();

  if (threadIdx.x == 0) {
    atomicAdd(reinterpret_cast<unsigned long long*>(&res[4]), aggregate);   
    // printf("%lu\n", res[4]);
  }
}