#pragma once

#include <cub/cub.cuh>
#include <curand.h>

#include <cuda.h>

#include "crystal/crystal.cuh"
#include "BlockLibrary.cuh"
#include "KernelArgs.h"
#include "gpu_utils.h"

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void probe_partition_GPU_simple(
  int* gpuCache, struct filterArgsGPU fargs, struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, 
  struct shuffleOutGPU sout, int num_tuples, int gpu, int start_offset = 0, short* segment_group = NULL,
  int write_val = 1, int write_offset = 0, int latemat = 0) {

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset always falls at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (i.e. SEGMENT_SIZE is a multiple of tile_size)

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many tiles (thread blocks) per segment
  int* ptr;

  int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __shared__ int smem_index[NUM_RANGE];
  __shared__ int64_t key_segment1; 
  __shared__ int64_t key_segment2;
  __shared__ int64_t key_segment3;
  __shared__ int64_t key_segment4;
  // __shared__ int64_t filter_segment1;
  // __shared__ int64_t filter_segment2;
  __shared__ int64_t shuffle_key;
  __shared__ int64_t segment_index;
  __shared__ int is_replicated;

  if (threadIdx.x == 0) {
    if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE];
    else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE;
    if (pargs.key_idx1 != NULL) {
      key_segment1 = pargs.key_idx1[segment_index];
      cudaAssert(key_segment1 != -1);
    }
    if (pargs.key_idx2 != NULL) {
      key_segment2 = pargs.key_idx2[segment_index];
      cudaAssert(key_segment2 != -1);
    }
    if (pargs.key_idx3 != NULL) {
      key_segment3 = pargs.key_idx3[segment_index];
      cudaAssert(key_segment3 != -1);
    }
    if (pargs.key_idx4 != NULL) {
      key_segment4 = pargs.key_idx4[segment_index];
      cudaAssert(key_segment4 != -1);
    }
    // if (fargs.filter_idx1 != NULL) {
    //   filter_segment1 = fargs.filter_idx1[segment_index];
    //   cudaAssert(filter_segment1 != -1);
    // }
    // if (fargs.filter_idx2 != NULL) {
    //   filter_segment2 = fargs.filter_idx2[segment_index];
    //   cudaAssert(filter_segment2 != -1);
    // }
    if (sargs.col_idx[sargs.key_column] != NULL) {
      shuffle_key = sargs.col_idx[sargs.key_column][segment_index];
      cudaAssert(shuffle_key != -1);
    }
    is_replicated = sargs.seg_is_replicated[segment_index];
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  // if (fargs.filter_idx1 != NULL) {
  //   ptr = gpuCache + filter_segment1 * SEGMENT_SIZE;
  //   BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
  //   BlockPredGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare1, selection_flags, num_tile_items);
  //   BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare2, selection_flags, num_tile_items);
  // }

  // if (fargs.filter_idx2 != NULL) {
  //   ptr = gpuCache + filter_segment2 * SEGMENT_SIZE;
  //   BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_val, num_tile_items);
  //   BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare3, selection_flags, num_tile_items);
  //   BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREAD>(item_val, fargs.compare4, selection_flags, num_tile_items);
  // }

  if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation)
    ptr = gpuCache + key_segment1 * SEGMENT_SIZE;
    // check_join[0] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not probing on this column; there is no result from a previous join (this is the first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items);
  }

  if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) {
    ptr = gpuCache + key_segment2 * SEGMENT_SIZE;
    // check_join[1] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items);
  }

  if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) {
    ptr = gpuCache + key_segment3 * SEGMENT_SIZE;
    // check_join[2] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items);
  }

  if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) {
    ptr = gpuCache + key_segment4 * SEGMENT_SIZE;
    // check_join[3] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items);
  }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if(selection_flags[ITEM]) {
        item_off[0][ITEM] = blockIdx.x * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; //local offset of this item within the batch
      }
    }
  }

  //DO PARTITIONING
  //this range_size would not work for a date column
  start_offset = segment_index * SEGMENT_SIZE;
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: make the range size correct for any partition count (e.g. 10 values over 3 partitions -> sizes 3, 3, 4)
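  // Illustrative numbers (assumed values, for exposition only): with min_key = 0,
  // max_key = 19 and NUM_RANGE = 4, range_size = (19 + 1 - 0) / 4 = 5, so key 13 maps
  // to partition 13 / 5 = 2; keys past the last boundary are clamped to NUM_RANGE - 1 below.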

  //right now only the probe table can be the partitioned table
  cudaAssert(sargs.table == 0);

  //load the partitioning key column and count items per partition
  cudaAssert(sargs.col_idx[sargs.key_column] != NULL);
  ptr = gpuCache + shuffle_key * SEGMENT_SIZE;
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);
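  // smem_index[p] now holds this thread block's item count for partition p. The
  // atomicAdd below converts those per-block counts into global base offsets reserved
  // in sout.pos, which are then added to each item's block-local index before writing.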

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, is_replicated, gpu);
  BlockWriteOffSelf<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.out_off[0], start_offset, num_tile_items, range_size, is_replicated, gpu);
  
  //current join (if used by group-by) -> if not used by group-by, sout.out_off will be NULL
  if (pargs.fkey_col_id[0] > 0 && sout.out_off[1] != NULL) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[1], item_key, index, selection_flags, sout.out_off[1], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[1] > 0 && sout.out_off[2] != NULL) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[2], item_key, index, selection_flags, sout.out_off[2], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[2] > 0 && sout.out_off[3] != NULL) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[3], item_key, index, selection_flags, sout.out_off[3], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[3] > 0 && sout.out_off[4] != NULL) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[4], item_key, index, selection_flags, sout.out_off[4], num_tile_items, range_size, is_replicated, gpu);

  //future join (prev join would have sargs.col_idx == NULL)
  if (pargs.fkey_col_id[0] < 0 && sargs.col_idx[1] != NULL && sargs.key_column != 1) {
    int64_t val_segment = sargs.col_idx[1][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[1], num_tile_items, range_size, is_replicated, gpu);
  }
  if (pargs.fkey_col_id[1] < 0 && sargs.col_idx[2] != NULL && sargs.key_column != 2) {
    int64_t val_segment = sargs.col_idx[2][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[2], num_tile_items, range_size, is_replicated, gpu);
  }
  if (pargs.fkey_col_id[2] < 0 && sargs.col_idx[3] != NULL && sargs.key_column != 3) {
    int64_t val_segment = sargs.col_idx[3][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[3], num_tile_items, range_size, is_replicated, gpu);
  }
  // cudaAssert(sargs.col_idx[pargs.fkey_col_id[3]] != NULL);
  if (pargs.fkey_col_id[3] < 0 && sargs.col_idx[4] != NULL && sargs.key_column != 4) {
    int64_t val_segment = sargs.col_idx[4][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[4], num_tile_items, range_size, is_replicated, gpu);
  }
}


template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void probe_partition_GPU_simple_groupby_8GPUs(
  int* gpuCache, struct filterArgsGPU fargs, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs, 
  struct shuffleOutGPU sout, int num_tuples, int gpu, int start_offset = 0, short* segment_group = NULL,
  int write_val = 1, int write_offset = 0, int latemat = 0) {
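  // Group-by variant of probe_partition_GPU_simple: probes fill groupval[][] with the
  // per-dimension group-by values (via BlockProbeGroupByGPU) instead of join offsets,
  // and the group-by columns (gargs.group_col_id) and aggregate columns
  // (gargs.aggr_col_id) are partitioned along with the key column.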

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset always falls at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (i.e. SEGMENT_SIZE is a multiple of tile_size)

  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int tiles_per_segment = SEGMENT_SIZE/tile_size; //how many tiles (thread blocks) per segment
  int* ptr;

  int segment_tile_offset = (blockIdx.x % tiles_per_segment) * tile_size; //tile offset inside a segment

  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int groupval[4][ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __shared__ int smem_index[NUM_RANGE];
  __shared__ int64_t key_segment1; 
  __shared__ int64_t key_segment2;
  __shared__ int64_t key_segment3;
  __shared__ int64_t key_segment4;
  __shared__ int64_t shuffle_key;
  __shared__ int64_t segment_index;
  __shared__ int is_replicated;

  if (threadIdx.x == 0) {
    if (segment_group != NULL) segment_index = segment_group[tile_offset / SEGMENT_SIZE];
    else segment_index = ( start_offset + tile_offset ) / SEGMENT_SIZE;
    if (pargs.key_idx1 != NULL) {
      key_segment1 = pargs.key_idx1[segment_index];
      cudaAssert(key_segment1 != -1);
    }
    if (pargs.key_idx2 != NULL) {
      key_segment2 = pargs.key_idx2[segment_index];
      cudaAssert(key_segment2 != -1);
    }
    if (pargs.key_idx3 != NULL) {
      key_segment3 = pargs.key_idx3[segment_index];
      cudaAssert(key_segment3 != -1);
    }
    if (pargs.key_idx4 != NULL) {
      key_segment4 = pargs.key_idx4[segment_index];
      cudaAssert(key_segment4 != -1);
    }
    if (sargs.col_idx[sargs.key_column] != NULL) {
      shuffle_key = sargs.col_idx[sargs.key_column][segment_index];
      cudaAssert(shuffle_key != -1);
    }
    is_replicated = sargs.seg_is_replicated[segment_index];
  }

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_idx1 != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation)
    ptr = gpuCache + key_segment1 * SEGMENT_SIZE;
    // check_join[0] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[0], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not probing on this column; there is no result from a previous join (this is the first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[0], 0, num_tile_items);
  }

  if (pargs.key_idx2 != NULL && pargs.ht2 != NULL) {
    ptr = gpuCache + key_segment2 * SEGMENT_SIZE;
    // check_join[1] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[1], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[1], 0, num_tile_items);
  }

  if (pargs.key_idx3 != NULL && pargs.ht3 != NULL) {
    ptr = gpuCache + key_segment3 * SEGMENT_SIZE;
    // check_join[2] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[2], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[2], 0, num_tile_items);
  }

  if (pargs.key_idx4 != NULL && pargs.ht4 != NULL) {
    ptr = gpuCache + key_segment4 * SEGMENT_SIZE;
    // check_join[3] = 1;
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + segment_tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[3], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[3], 0, num_tile_items);
  }

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
      if(selection_flags[ITEM]) {
        item_off[ITEM] = blockIdx.x * tile_size + threadIdx.x + ITEM * BLOCK_THREADS; //local offset of this item within the batch
      }
    }
  }

  //DO PARTITIONING
  //this range_size would not work for a date column
  start_offset = segment_index * SEGMENT_SIZE;
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: make the range size correct for any partition count (e.g. 10 values over 3 partitions -> sizes 3, 3, 4)

  //right now only the probe table can be the partitioned table
  cudaAssert(sargs.table == 0);

  //load the partitioning key column and count items per partition
  cudaAssert(sargs.col_idx[sargs.key_column] != NULL);
  ptr = gpuCache + shuffle_key * SEGMENT_SIZE;
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, is_replicated, gpu);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, is_replicated, gpu);
  BlockWriteOffSelf<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.out_off[0], start_offset, num_tile_items, range_size, is_replicated, gpu);
  
  //current join (if used by groupby) -> writing the groupby column value
  if (pargs.fkey_col_id[0] > 0 && gargs.group_col_id[0] != -1) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[0], item_key, index, selection_flags, sout.column[gargs.group_col_id[0]], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[1] > 0 && gargs.group_col_id[1] != -1) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[1], item_key, index, selection_flags, sout.column[gargs.group_col_id[1]], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[2] > 0 && gargs.group_col_id[2] != -1) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[2], item_key, index, selection_flags, sout.column[gargs.group_col_id[2]], num_tile_items, range_size, is_replicated, gpu);
  if (pargs.fkey_col_id[3] > 0 && gargs.group_col_id[3] != -1) {
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[3], item_key, index, selection_flags, sout.column[gargs.group_col_id[3]], num_tile_items, range_size, is_replicated, gpu);
    //  if (blockIdx.x == 0) printf("%d %d %d %d %d %d %d %d\n", selection_flags[0], selection_flags[1], selection_flags[2], selection_flags[3], groupval[3][0], groupval[3][1], groupval[3][2], groupval[3][3]);
  }

  //future join (prev join would have sargs.col_idx == NULL)
  if (pargs.fkey_col_id[0] < 0 && sargs.col_idx[1] != NULL && sargs.key_column != 1) {
    int64_t val_segment = sargs.col_idx[1][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[1], num_tile_items, range_size, is_replicated, gpu);
  }
  if (pargs.fkey_col_id[1] < 0 && sargs.col_idx[2] != NULL && sargs.key_column != 2) {
    int64_t val_segment = sargs.col_idx[2][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[2], num_tile_items, range_size, is_replicated, gpu);
  }
  if (pargs.fkey_col_id[2] < 0 && sargs.col_idx[3] != NULL && sargs.key_column != 3) {
    int64_t val_segment = sargs.col_idx[3][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[3], num_tile_items, range_size, is_replicated, gpu);
  }
  // cudaAssert(sargs.col_idx[pargs.fkey_col_id[3]] != NULL);
  if (pargs.fkey_col_id[3] < 0 && sargs.col_idx[4] != NULL && sargs.key_column != 4) {
    int64_t val_segment = sargs.col_idx[4][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[4], num_tile_items, range_size, is_replicated, gpu);
  }
  if (gargs.aggr_col_id[0] != -1) {
    int64_t val_segment = sargs.col_idx[gargs.aggr_col_id[0]][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[gargs.aggr_col_id[0]], num_tile_items, range_size, is_replicated, gpu);
    // if (blockIdx.x == 0) printf("%d %d %d %d %d %d %d %d\n", selection_flags[0], selection_flags[1], selection_flags[2], selection_flags[3], item_val[0], item_val[1], item_val[2], item_val[3]);
  }
  if (gargs.aggr_col_id[1] != -1) {
    int64_t val_segment = sargs.col_idx[gargs.aggr_col_id[1]][segment_index];
    ptr = gpuCache + val_segment * SEGMENT_SIZE;
    BlockLoadAndWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + segment_tile_offset, item_key, item_val, index, selection_flags, sout.column[gargs.aggr_col_id[1]], num_tile_items, range_size, is_replicated, gpu);
  }
}


template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void probe_partition_GPU2_simple(
  int* gpuCache, struct probeArgsGPU pargs, struct shuffleArgsGPU sargs, 
  struct shuffleOutGPU sout, int num_tuples, int write_val = 1, int write_offset = 0, int latemat = 0) {
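  // Unlike probe_partition_GPU_simple, this variant reads pre-materialized device
  // arrays directly (pargs.key_column, sargs.column, sargs.in_off) rather than
  // resolving per-segment offsets into gpuCache.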

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset always falls at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (i.e. SEGMENT_SIZE is a multiple of tile_size)

  // Tile geometry: a 1D thread block of BLOCK_THREADS threads, each owning ITEMS_PER_THREAD integer items
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int* ptr;


  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[NUM_TABLE][ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation)
    ptr = pargs.key_column[0];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not probing on this column; there is no result from a previous join (this is the first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items);
  }

  if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) {
    ptr = pargs.key_column[1];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items);
  }

  if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) {
    ptr = pargs.key_column[2];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items);
  }

  if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) {
    ptr = pargs.key_column[3];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items);
  }

  //DO PARTITIONING
  //this range_size would not work for a date column
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: make the range size correct for any partition count (e.g. 10 values over 3 partitions -> sizes 3, 3, 4)

  //right now only the probe table can be the partitioned table
  cudaAssert(sargs.table == 0);

  //load the partitioning key column and count items per partition
  cudaAssert(sargs.column[sargs.key_column] != NULL);
  ptr = sargs.column[sargs.key_column];
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, 0, -1);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, 0, -1);
  ptr = sargs.in_off[0];
  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
  BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.out_off[0], num_tile_items, range_size, 0, -1);
  if (pargs.fkey_col_id[0] > 0 && sout.out_off[1] != NULL) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[1], item_key, index, selection_flags, sout.out_off[1], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[0] < -1 && sargs.in_off[1] != NULL) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off[1] + tile_offset, item_off[1], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[1], item_key, index, selection_flags, sout.out_off[1], num_tile_items, range_size, 0, -1);
  }

  if (pargs.fkey_col_id[1] > 0 && sout.out_off[2] != NULL) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[2], item_key, index, selection_flags, sout.out_off[2], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[1] < -1 && sargs.in_off[2] != NULL) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off[2] + tile_offset, item_off[2], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[2], item_key, index, selection_flags, sout.out_off[2], num_tile_items, range_size, 0, -1);
  }

  if (pargs.fkey_col_id[2] > 0 && sout.out_off[3] != NULL) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[3], item_key, index, selection_flags, sout.out_off[3], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[2] < -1 && sargs.in_off[3] != NULL) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off[3] + tile_offset, item_off[3], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[3], item_key, index, selection_flags, sout.out_off[3], num_tile_items, range_size, 0, -1);
  }

  //current join (if used by group-by) -> if not used by group-by, sout.out_off would be NULL
  if (pargs.fkey_col_id[3] > 0 && sout.out_off[4] != NULL) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[4], item_key, index, selection_flags, sout.out_off[4], num_tile_items, range_size, 0, -1);
  //prev join (if used by group-by) -> if not used by group-by, sargs.in_off would be NULL
  else if (pargs.fkey_col_id[3] < -1 && sargs.in_off[4] != NULL) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.in_off[4] + tile_offset, item_off[4], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[4], item_key, index, selection_flags, sout.out_off[4], num_tile_items, range_size, 0, -1);
  }
  
  //future join (past join would have sargs.column == NULL)
  if (pargs.fkey_col_id[0] < 0 && sargs.column[1] != NULL && sargs.key_column != 1) {
      ptr = sargs.column[1];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
      BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[1], num_tile_items, range_size, 0, -1);
  }
  if (pargs.fkey_col_id[1] < 0 && sargs.column[2] != NULL && sargs.key_column != 2) {
      ptr = sargs.column[2];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
      BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[2], num_tile_items, range_size, 0, -1);
  }
  if (pargs.fkey_col_id[2] < 0 && sargs.column[3] != NULL && sargs.key_column != 3) {
      ptr = sargs.column[3];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
      BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[3], num_tile_items, range_size, 0, -1);
  }
  // cudaAssert(sargs.col_idx[pargs.fkey_col_id[3]] != NULL);
  if (pargs.fkey_col_id[3] < 0 && sargs.column[4] != NULL && sargs.key_column != 4) {
      ptr = sargs.column[4];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
      BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[4], num_tile_items, range_size, 0, -1);
  }

}

template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
__global__ void probe_partition_GPU2_simple_groupby_8GPUs(
  int* gpuCache, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, struct shuffleArgsGPU sargs,
  struct shuffleOutGPU sout, int num_tuples, int write_val = 1, int write_offset = 0, int latemat = 0) {
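  // Group-by variant of probe_partition_GPU2_simple: probes fill groupval[][] with
  // group-by values, and the group-by columns (gargs.group_col_id) and aggregate
  // columns (gargs.aggr_col_id) are partitioned along with the key column.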

  cudaAssert(NUM_GPU == NUM_RANGE);

  //assume start_offset always falls at the beginning of a segment (it can never start in the middle of a segment)
  //assume tile_size is a factor of SEGMENT_SIZE (i.e. SEGMENT_SIZE is a multiple of tile_size)

  // Tile geometry: a 1D thread block of BLOCK_THREADS threads, each owning ITEMS_PER_THREAD integer items
  int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
  int tile_offset = blockIdx.x * tile_size;
  int* ptr;


  int num_tiles = (num_tuples + tile_size - 1) / tile_size;
  int num_tile_items = tile_size;
  if (blockIdx.x == num_tiles - 1) {
    num_tile_items = num_tuples - tile_offset;
  }

  __shared__ int smem_index[NUM_RANGE];

  // Load a segment of consecutive items that are blocked across threads
  int selection_flags[ITEMS_PER_THREAD];
  int item_off[ITEMS_PER_THREAD];
  int item_key[ITEMS_PER_THREAD];
  int item_val[ITEMS_PER_THREAD];
  int groupval[4][ITEMS_PER_THREAD];
  int index[ITEMS_PER_THREAD];

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = 0;
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    index[ITEM] = 0;
  }

  __syncthreads();

  InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

  if (pargs.key_column[0] != NULL && pargs.ht1 != NULL) { //we are doing probing for this column (normal operation)
    ptr = pargs.key_column[0];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[0], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
  } else { //we are not probing on this column; there is no result from a previous join (this is the first join)
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[0], 0, num_tile_items);
  }

  if (pargs.key_column[1] != NULL && pargs.ht2 != NULL) {
    ptr = pargs.key_column[1];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[1], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[1], 0, num_tile_items);
  }

  if (pargs.key_column[2] != NULL && pargs.ht3 != NULL) {
    ptr = pargs.key_column[2];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[2], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[2], 0, num_tile_items);
  }

  if (pargs.key_column[3] != NULL && pargs.ht4 != NULL) {
    ptr = pargs.key_column[3];
    BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
    BlockProbeGroupByGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, groupval[3], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
  } else {
    BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, groupval[3], 0, num_tile_items);
  }

  //DO PARTITIONING
  //this range_size would not work for a date column
  int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //TODO: make the range size correct for any partition count (e.g. 10 values over 3 partitions -> sizes 3, 3, 4)

  //right now only the probe table can be the partitioned table
  cudaAssert(sargs.table == 0);

  //load the partitioning key column and count items per partition
  cudaAssert(sargs.column[sargs.key_column] != NULL);
  ptr = sargs.column[sargs.key_column];
  BlockLoadAndCountIndexPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(ptr + tile_offset, item_key, index, selection_flags, smem_index, num_tile_items, range_size, 0, -1);

  __syncthreads();

  if (threadIdx.x < NUM_RANGE) {
    smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
  }

  __syncthreads();

  #pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
    if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
      if (selection_flags[ITEM]) {
        int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
        if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
        index[ITEM] += smem_index[partition];
      }
    }
  }

  //Barrier
  cudaAssert(sout.column[sargs.key_column] != NULL);
  BlockWritePartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, selection_flags, sout.column[sargs.key_column], num_tile_items, range_size, 0, -1);
  ptr = sargs.in_off[0];
  BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off, selection_flags, num_tile_items);
  BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off, item_key, index, selection_flags, sout.out_off[0], num_tile_items, range_size, 0, -1);

  // if (threadIdx.x == 0 && blockIdx.x == 0) printf("%d %d\n", pargs.fkey_col_id[0], gargs.group_col_id[0]);
  if (pargs.fkey_col_id[0] > 0 && gargs.group_col_id[0] != -1) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[0], item_key, index, selection_flags, sout.column[gargs.group_col_id[0]], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[0] < -1 && gargs.group_col_id[0] != -1) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.column[gargs.group_col_id[0]] + tile_offset, groupval[0], selection_flags, num_tile_items);
    // groupval[0][0] = 1; groupval[0][1] = 1; groupval[0][2] = 1; groupval[0][3] = 1;
    // printf("here\n");
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[0], item_key, index, selection_flags, sout.column[gargs.group_col_id[0]], num_tile_items, range_size, 0, -1);
  }

  if (pargs.fkey_col_id[1] > 0 && gargs.group_col_id[1] != -1) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[1], item_key, index, selection_flags, sout.column[gargs.group_col_id[1]], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[1] < -1 && gargs.group_col_id[1] != -1) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.column[gargs.group_col_id[1]] + tile_offset, groupval[1], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[1], item_key, index, selection_flags, sout.column[gargs.group_col_id[1]], num_tile_items, range_size, 0, -1);
  }

  if (pargs.fkey_col_id[2] > 0 && gargs.group_col_id[2] != -1) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[2], item_key, index, selection_flags, sout.column[gargs.group_col_id[2]], num_tile_items, range_size, 0, -1);
  else if (pargs.fkey_col_id[2] < -1 && gargs.group_col_id[2] != -1) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.column[gargs.group_col_id[2]] + tile_offset, groupval[2], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[2], item_key, index, selection_flags, sout.column[gargs.group_col_id[2]], num_tile_items, range_size, 0, -1);
  }

  //current join (if used by groupby) -> writing the groupby column
  if (pargs.fkey_col_id[3] > 0 && gargs.group_col_id[3] != -1) BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[3], item_key, index, selection_flags, sout.column[gargs.group_col_id[3]], num_tile_items, range_size, 0, -1);
  //prev join (if used by groupby) -> writing the groupby column from prev join
  else if (pargs.fkey_col_id[3] < -1 && gargs.group_col_id[3] != -1) {
    BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(sargs.column[gargs.group_col_id[3]] + tile_offset, groupval[3], selection_flags, num_tile_items);
    BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(groupval[3], item_key, index, selection_flags, sout.column[gargs.group_col_id[3]], num_tile_items, range_size, 0, -1);
  }
  
  //future join (past join would have sargs.column == NULL)
  if (pargs.fkey_col_id[0] < 0 && sargs.column[1] != NULL && sargs.key_column != 1) {
      ptr = sargs.column[1];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
      BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[1], num_tile_items, range_size, 0, -1);
  }
  if (pargs.fkey_col_id[1] < 0 && sargs.column[2] != NULL && sargs.key_column != 2) {
      ptr = sargs.column[2];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
      BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[2], num_tile_items, range_size, 0, -1);
  }
  if (pargs.fkey_col_id[2] < 0 && sargs.column[3] != NULL && sargs.key_column != 3) {
      ptr = sargs.column[3];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
      BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[3], num_tile_items, range_size, 0, -1);
  }
  // cudaAssert(sargs.col_idx[pargs.fkey_col_id[3]] != NULL);
  if (pargs.fkey_col_id[3] < 0 && sargs.column[4] != NULL && sargs.key_column != 4) {
      ptr = sargs.column[4];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
      BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[4], num_tile_items, range_size, 0, -1);
  }

  if (gargs.aggr_col_id[0] != -1) {
      ptr = sargs.column[gargs.aggr_col_id[0]];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
      BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[gargs.aggr_col_id[0]], num_tile_items, range_size, 0, -1);
  }
  if (gargs.aggr_col_id[1] != -1) {
      ptr = sargs.column[gargs.aggr_col_id[1]];
      BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_val, selection_flags, num_tile_items);
      BlockWriteValPartition2<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, selection_flags, sout.column[gargs.aggr_col_id[1]], num_tile_items, range_size, 0, -1);
  }

  //WARNING: aggr_col_id and group_col_id represent slightly different information
  //aggr_col_id != -1 means that this segment group will do aggregation on that column on the GPU
  //group_col_id != -1 means that there will be a group-by on that column at some point (whether on the CPU or the GPU does not matter)

}