Lancelot / src / gpudb / KernelArgs.h
KernelArgs.h
Raw
#ifndef _KERNEL_ARGS_H_
#define _KERNEL_ARGS_H_

#include "common.h"

// #define BLOCK_T 128
// #define ITEMS_PER_T 4

#define MAX_JOIN 4
#define MAX_GROUPBY 4
#define MAX_AGGR 2

class ColumnInfo;

template<typename T>
using group_func_t = T (*) (T, T);

template<typename T, int BLOCK_THREADS, int ITEMS_PER_THREADS>
using filter_func_t_dev = void (*) (T(&) [ITEMS_PER_THREADS], int(&) [ITEMS_PER_THREADS], T, T, int);

template<typename T>
using filter_func_t_host = bool (*) (T, T, T);

template <typename T> 
__device__ T sub_func (T x, T y)
{
    return x - y;
}

template <typename T> 
__device__ T mul_func (T x, T y)
{
    return x * y;
}

template <typename T> 
T host_sub_func (T x, T y)
{
    return x - y;
}

template <typename T> 
T host_mul_func (T x, T y)
{
    return x * y;
}

template<typename T, int BLOCK_THREADS, int ITEMS_PER_THREADS>
__device__ void pred_eq (
	T  (&items)[ITEMS_PER_THREADS],
	int (&selection_flags)[ITEMS_PER_THREADS], 
	T compare1, T compare2, int num_tile_items)
{
    BlockPredAndEQ<int, BLOCK_THREADS, ITEMS_PER_THREADS>(items, compare1, selection_flags, num_tile_items);
}

template<typename T, int BLOCK_THREADS, int ITEMS_PER_THREADS>
__device__ void pred_eq_or_eq (
	T  (&items)[ITEMS_PER_THREADS],
	int (&selection_flags)[ITEMS_PER_THREADS], 
	T compare1, T compare2, int num_tile_items)
{
	BlockPredAndEQ<int, BLOCK_THREADS, ITEMS_PER_THREADS>(items, compare1, selection_flags, num_tile_items);
	BlockPredOrEQ<int, BLOCK_THREADS, ITEMS_PER_THREADS>(items, compare2, selection_flags, num_tile_items);
}

template<typename T, int BLOCK_THREADS, int ITEMS_PER_THREADS>
__device__ void pred_between (
	T  (&items)[ITEMS_PER_THREADS],
	int (&selection_flags)[ITEMS_PER_THREADS], 
	T compare1, T compare2, int num_tile_items)
{
    BlockPredAndGTE<int, BLOCK_THREADS, ITEMS_PER_THREADS>(items, compare1, selection_flags, num_tile_items);
    BlockPredAndLTE<int, BLOCK_THREADS, ITEMS_PER_THREADS>(items, compare2, selection_flags, num_tile_items);
}

template<typename T>
bool host_pred_eq (T x, T compare1, T compare2) {
	return (x == compare1);
}

template<typename T>
bool host_pred_eq_or_eq (T x, T compare1, T compare2) {
	return ((x == compare1) || (x == compare2));
}

template<typename T>
bool host_pred_between (T x, T compare1, T compare2) {
	return ((x >= compare1) && (x <= compare2));
}

template <typename T> 
__device__ group_func_t<T> p_sub_func = sub_func<T>;

template <typename T> 
__device__ group_func_t<T> p_mul_func = mul_func<T>;

template<typename T, int BLOCK_THREADS, int ITEMS_PER_THREADS>
__device__ filter_func_t_dev<T, BLOCK_THREADS, ITEMS_PER_THREADS> p_pred_eq = pred_eq<T, BLOCK_THREADS, ITEMS_PER_THREADS>;

template<typename T, int BLOCK_THREADS, int ITEMS_PER_THREADS>
__device__ filter_func_t_dev<T, BLOCK_THREADS, ITEMS_PER_THREADS> p_pred_eq_or_eq = pred_eq_or_eq<T, BLOCK_THREADS, ITEMS_PER_THREADS>;

template<typename T, int BLOCK_THREADS, int ITEMS_PER_THREADS>
__device__ filter_func_t_dev<T, BLOCK_THREADS, ITEMS_PER_THREADS> p_pred_between = pred_between<T, BLOCK_THREADS, ITEMS_PER_THREADS>;

class QueryParams{
public:

  int query;
  
  map<ColumnInfo*, int> min_key;
  map<ColumnInfo*, int> max_key;
  map<ColumnInfo*, int> min_val;
  map<ColumnInfo*, int> unique_val;
  map<ColumnInfo*, int> dim_len;

  vector<map<ColumnInfo*, int>> dim_len_GPU;
  vector<map<ColumnInfo*, int>> min_key_GPU;

  map<ColumnInfo*, int*> ht_CPU;
  vector<map<ColumnInfo*, int*>> ht_GPU;

  map<int, bool> ht_replicated;

  map<ColumnInfo*, int> compare1;
  map<ColumnInfo*, int> compare2;
  map<ColumnInfo*, int> mode;

  map<ColumnInfo*, float> selectivity; //estimated output
  map<ColumnInfo*, float> real_selectivity; //real_selectivity

  int total_val, mode_group;

  int* res;
  vector<int*> d_res;

  group_func_t<int> d_group_func;
  group_func_t<int> h_group_func;

  map<ColumnInfo*, filter_func_t_dev<int, 128, 4>> map_filter_func_dev;
  map<ColumnInfo*, filter_func_t_host<int>> map_filter_func_host;

  bool* joinGPUcheck;
  bool groupGPUcheck;

  QueryParams(int _query): query(_query) {
    assert(_query == 11 || _query == 12 || _query == 13 ||
          _query == 21 || _query == 22 || _query == 23 ||
          _query == 31 || _query == 32 || _query == 33 || _query == 34 ||
          _query == 41 || _query == 42 || _query == 43 || 
		  _query == 51 || _query == 52 || _query == 53 || _query == 54); 
	ht_GPU.resize(NUM_GPU);
	d_res.resize(NUM_GPU);
	dim_len_GPU.resize(NUM_GPU);
	min_key_GPU.resize(NUM_GPU);
  };
};

typedef struct probeArgsGPU {
	int* key_idx1;
	int* key_idx2;
	int* key_idx3;
	int* key_idx4;
	int* ht1;
	int* ht2;
	int* ht3;
	int* ht4;
	int dim_len1;
	int dim_len2;
	int dim_len3; 
	int dim_len4;
	int min_key1;
	int min_key2;
	int min_key3;
	int min_key4;
	int** key_column;
	int* fkey_col_id;

	// probeArgsGPU()
	// : key_idx1(NULL), key_idx2(NULL), key_idx3(NULL), key_idx4(NULL),
	// 	ht1(NULL), ht2(NULL), ht3(NULL), ht4(NULL),
	// 	dim_len1(0), dim_len2(0), dim_len3(0), dim_len4(0),
	// 	min_key1(0), min_key2(0), min_key3(0), min_key4(0) {}
} probeArgsGPU;

typedef struct probeArgsCPU {
	int* key_col1;
	int* key_col2;
	int* key_col3;
	int* key_col4;
	int* ht1;
	int* ht2;
	int* ht3;
	int* ht4;
	int dim_len1;
	int dim_len2;
	int dim_len3; 
	int dim_len4;
	int min_key1;
	int min_key2;
	int min_key3;
	int min_key4;

	// probeArgsCPU()
	// : key_col1(NULL), key_col2(NULL), key_col3(NULL), key_col4(NULL),
	// 	ht1(NULL), ht2(NULL), ht3(NULL), ht4(NULL),
	// 	dim_len1(0), dim_len2(0), dim_len3(0), dim_len4(0),
	// 	min_key1(0), min_key2(0), min_key3(0), min_key4(0) {}
} probeArgsCPU;

typedef struct buildArgsGPU {
	int *key_idx;
	int *val_idx;
	int num_slots;
	int val_min;
	int val_max;
	int* key_col;
	int* val_col;
	int* broadcast_key_idx;
	int* broadcast_val_idx;
	int* broadcast_filter_idx;

	// buildArgsGPU()
	// : key_idx(NULL), val_idx(NULL), num_slots(0), val_min(0) {}
} buildArgsGPU;

typedef struct buildArgsCPU {
	int *key_col;
	int *val_col;
	int num_slots;
	int val_min;
	int val_max;

	// buildArgsCPU()
	// : key_col(NULL), val_col(NULL), num_slots(0), val_min(0) {}
} buildArgsCPU;

typedef struct groupbyArgsGPU {
	int* aggr_idx1;
	int* aggr_idx2;
	int* group_idx1;
	int* group_idx2;
	int* group_idx3;
	int* group_idx4;
	int min_val1;
	int min_val2;
	int min_val3;
	int min_val4;
	int unique_val1;
	int unique_val2;
	int unique_val3;
	int unique_val4;
	int total_val;
	int mode;
	int** group_column;
	int** aggr_column;
	int* group_col_id;
	int* aggr_col_id;

	group_func_t<int> d_group_func;

	// groupbyArgsGPU()
	// : aggr_idx1(NULL), aggr_idx2(NULL), group_idx1(NULL), group_idx2(NULL), group_idx3(NULL), group_idx4(NULL),
	//   min_val1(0), min_val2(0), min_val3(0), min_val4(0), unique_val1(0), unique_val2(0), unique_val3(0), unique_val4(0),
	//   total_val(0), mode(0) {}
} groupbyArgsGPU;

typedef struct groupbyArgsCPU {
	int* aggr_col1;
	int* aggr_col2;
	int* group_col1;
	int* group_col2;
	int* group_col3;
	int* group_col4;
	int min_val1;
	int min_val2;
	int min_val3;
	int min_val4;
	int unique_val1;
	int unique_val2;
	int unique_val3;
	int unique_val4;
	int total_val;
	int mode;

	group_func_t<int> h_group_func;

	// groupbyArgsCPU()
	// : aggr_col1(NULL), aggr_col2(NULL), group_col1(NULL), group_col2(NULL), group_col3(NULL), group_col4(NULL),
	//   min_val1(0), min_val2(0), min_val3(0), min_val4(0), unique_val1(0), unique_val2(0), unique_val3(0), unique_val4(0),
	//   total_val(0), mode(0) {}
} groupbyArgsCPU;

typedef struct filterArgsGPU {
	int* filter_idx1;
	int* filter_idx2;
	int compare1;
	int compare2;
	int compare3;
	int compare4;
	int mode1;
	int mode2;
	int* filter_col;

	filter_func_t_dev<int, 128, 4> d_filter_func1;
	filter_func_t_dev<int, 128, 4> d_filter_func2;

	// filterArgsGPU()
	// : filter_idx1(NULL), filter_idx2(NULL), compare1(0), compare2(0), compare3(0), compare4(0),
	// mode1(0), mode2(0) {}
} filterArgsGPU;

typedef struct filterArgsCPU {
	int* filter_col1;
	int* filter_col2;
	int compare1;
	int compare2;
	int compare3;
	int compare4;
	int mode1;
	int mode2;

	filter_func_t_host<int> h_filter_func1;
	filter_func_t_host<int> h_filter_func2;

	// filterArgsCPU()
	// : filter_col1(NULL), filter_col2(NULL), compare1(0), compare2(0), compare3(0), compare4(0),
	// mode1(0), mode2(0) {}
} filterArgsCPU;

typedef struct offsetGPU {
	int* lo_off;
	int* dim_off1;
	int* dim_off2;
	int* dim_off3;
	int* dim_off4;
	int** join_result_off;

	// offsetGPU()
	// : lo_off(NULL), dim_off1(NULL), dim_off2(NULL), dim_off3(NULL), dim_off4(NULL) {}
} offsetGPU;

typedef struct offsetCPU {
	int* h_lo_off;
	int* h_dim_off1;
	int* h_dim_off2;
	int* h_dim_off3;
	int* h_dim_off4;

	// offsetCPU()
	// : h_lo_off(NULL), h_dim_off1(NULL), h_dim_off2(NULL), h_dim_off3(NULL), h_dim_off4(NULL) {}
} offsetCPU;


typedef struct shuffleArgsGPU {
	int** column;
	int**** column_part; //column but partitioned in multiple GPUs
	int** col_idx; //column index (only for current GPU)
	int** in_off; //column offset from CPU (global offset) (table)
	int**** in_off_part; //column offset from CPU but partitioned in multiple GPUs (global offset) used in pipelined shuffle (gpu, table, partition)
	int** local_off; //intermediate result of join (local offset for probe table but global offset for build table) (table)
	int key_column;
	int table;
	int min_key;
	int max_key;
	int*** all_col_idx; //column index (for all GPUs)
	int** seg_row_to_gpu; //segment row to single gpu
	int* seg_is_replicated; //check if that segment is replicated
	int** broadcast_idx; //just like col_idx but for broadcasted segment
} shuffleArgsGPU;

typedef struct shuffleOutGPU {
	int* pos;
	int*** column;
	int*** out_off; //output_offset (for global materialization, this is global offset) (table, partition)
} shuffleOutGPU;

//have to think if temp_off should exist as well (intermediate result of shuffling for global offset)
typedef struct shuffleHelper {
	int** result_count; //partitioning result count
	int*** temp_col; //intermediate result of shuffling/exchanging output of partitioning (column)
	// int*** temp_off; //intermediate result of shuffling/exchanging output of partitioning (global offset)
	//temp_off is now replaced by regular off_col which is passed as a function argument
	int*** join_result_off; //intermediate result of join (local offset to probe table and global offset to build table) (gpu, table)
	int**** out_shuffle_col; //output of partitioning (column)
    int**** out_shuffle_off; //output of partitioning (global offset)
	int**** in_shuffle_col; //input of partitioning (column) for pipelined shuffle (gpu, table, partition)
	int**** in_shuffle_off; //input of partitioning (global offset) for pipelined shuffle (gpu, table, partition)
} shuffleHelper;

//FOR NOW temp_off IS REPLACED BY REGULAR off_col (PASSED AS FUNCTION ARGUMENTS)

// offset note:
// 1. we haven't supported the case where we use the offset in the dimension table (this offset should be global offset when passed to CPU)
// 2. join_result_off currently is a local offset for the probe table and global offset for the dimension table
// 3. out_off in RangePartitionKeyValue and RangePartitionKeyValue2 and RangePartitionKeyValue2 is a global offset from CPU (offset coming from in_off) or from offset calculation but still global (from segment_id)
// 4. out_off in shuffle_probe_partition_global is a global offset coming from in_off_part (in_off_part is coming from out_shuffle_off)
// 5. in_off_part is a global offset and it's coming from out_shuffle_off which is coming from out_off
// 6. out_shuffle_off is the out_off in rangepartitioningkeyvalue (global)
// 7. in_shuffle_off is the global offset coming from CPU
// 8. off_col is in_shuffle_off and is also coming from CPU, off_col is also used as temp_off and coming from out_shuffle_off. off_col is not that much used for late materialization
// 9. segment_idx inside a kernel is actually segment_id (global)
// 10. join_result_off is allocated as off_col_out

//ASSUMPTION:
//1. replicated granularity is segment row
//2. all GPU are used during execution
//3. star schema workload
//4. replication on fact table only benefits the first join where the inputs are still segments
//5. broadcast segment from dimension table only
//6. latemat = 1 not supported
//7. GPU execute first, and then CPU, this means that for dim table, if the filter column not fully cached, the join will be done on CPU

//step by step (if CPU is included)
//1. partitioning input: segment & h_off_col output: out_shuffle_col & out_shuffle_off (global offset)
//2. shuffling input: out_shuffle_col & out_shuffle_off (global offset) output: temp_col & temp_off (global offset)
//3. join input: temp_col & temp_off (global offset) output: join_result_off (local offset to probe table and global offset to build table)
//4. partitioning input: join_result_off (local offset to probe table and global offset to build table) output: out_shuffle_col & out_shuffle_off (global offset)

//step by step (if CPU is not included)
//1. partitioning input: segment output: out_shuffle_col
//2. shuffling input: out_shuffle_col output:temp_col
//3. join input: temp_col output: join_result_off (local offset to probe table and global offset to build table)
//4. partitioning input: join_result_off (local offset to probe table and global offset to build table) output: out_shuffle_col

//step by step (if late materialization and CPU is included) -> THIS IS NOT USED ANYMORE
//first shuffle
//1. partitioning input: segment & h_off_col output: out_shuffle_off(global offset) & out_shuffle_col(just key)
//2. shuffling input: out_shuffle_off(global offset) & out_shuffle_col(just key) output:temp_col(just key) & temp_off(global offset)
//3. join input: temp_col(just key) & temp_off(global offset) output: join_result_off (global offset)
//4. send join_result_off (global offset) back (just send the probe offset is enough)
//5. materialize probe table input: join_result_off (global offset) output: out_shuffle_off(global offset) & out_shuffle_col
//6. send out_shuffle_off(global offset) & out_shuffle_col and it becomes temp_col & temp_off(global offset)
//7. materialize build table input: join_result_off (global offset) output: temp_col & temp_off(global offset)
//second shuffle
//8. partitioning input: temp_col & temp_off(global offset) output: out_shuffle_off(local offset) & out_shuffle_col(just key) 
//9. shuffling input: out_shuffle_off(local offset) & out_shuffle_col(just key) output:temp_col(just key) & temp_off(local to prev node)
//10. join input: temp_col(just key) & temp_off(local to prev node) output: join_result_off(local to prev_node for probe table and global to build table) and temp_col (build table columns only)
//11. send join_result_off(local to prev_node for probe table) back (just send the probe offset is enough)
//12. materialize probe table input: join_result_off(local to prev node for probe table) output: out_shuffle_off(global offset) & out_shuffle_col
//13. send out_shuffle_off(global offset) & out_shuffle_col and it becomes temp_col & temp_off(global offset)
//14. materialize build table input: join_result_off (global to build table) output: temp_col & temp_off (global offset)

//Three late materialization options (from the most eager to the laziest)
//1. described above
//2. when materialize, only materialize all the key columns for the next join instead of the whole table
//3. when partitioning, the output would be all the key columns for all the subsequent joins
//the problem of 2 and 3 is that when we need to materialize it at the very end, it is possible that each element in the row ID pair (i.e. 10, 23, 100) can reside in different GPUs.
//solution: 
//1. broadcast offset all to all, materialize, merge
//2. partition offset of one table, send to the right GPU partition, materialize, merge, and then repeat it for every table offset
//3. UVA where GPU in which the offset resides do random read to the data
//4. UVA where every GPU do a sequential read to offsets resides in every other GPU, materialize, merge
//5. send to CPU, materialize


// template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
// __global__ void shuffle_probe_aggr_GPU(
//   int* partition_count, struct probeArgsGPU pargs, struct groupbyArgsGPU gargs, 
//   struct shuffleArgsGPU sargs, int* res, int gpu, int latemat = 1) {

//   //assume start_offset always in the beginning of a segment (ga mungkin start di tengah2 segment)
//   //assume tile_size is a factor of SEGMENT_SIZE (SEGMENT SIZE kelipatan tile_size)

//   cudaAssert(NUM_GPU == NUM_RANGE);

//   // Specialize BlockLoad for a 1D block of 128 threads owning 4 integer items each
//   typedef cub::BlockScan<int, BLOCK_THREADS> BlockScanInt;
//   int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
//   int* ptr;

//   // Allocate shared memory for BlockLoad
//   __shared__ union TempStorage
//   {
//     typename BlockScanInt::TempStorage scan;
//   } temp_storage;

//   // Load a segment of consecutive items that are blocked across threads
//   int selection_flags[ITEMS_PER_THREAD];
//   int item_off[NUM_TABLE][ITEMS_PER_THREAD];
//   int item_key[ITEMS_PER_THREAD];
//   int index[ITEMS_PER_THREAD];
//   int aggrval1[ITEMS_PER_THREAD];
//   int aggrval2[ITEMS_PER_THREAD];

//   __shared__ int block_scan_partition[NUM_GPU];
//   __shared__ int smem_index[NUM_RANGE];
//   __shared__ int my_partition;
//   __shared__ int blockId; //blockId within a partition

//   __syncthreads();

//   if (threadIdx.x < NUM_RANGE) {
//     smem_index[threadIdx.x] = 0;
//   }

//   __syncthreads();

//   #pragma unroll
//   for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
//     index[ITEM] = 0;
//   }

//   if (threadIdx.x == 0) {
//     my_partition = -1; blockId = -1;
//     for (int partition = 0; partition < NUM_GPU; partition++) {
//       if (partition == 0) {
//         block_scan_partition[partition] = (partition_count[partition] + tile_size - 1) / tile_size;
//       } else {
//         block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_size - 1) / tile_size;
//       }
//     }
//     for (int partition = 0; partition < NUM_GPU; partition++) {
//       if (blockIdx.x < block_scan_partition[partition]) {
//         my_partition = partition;
//         if (my_partition == 0) blockId = blockIdx.x;
//         else blockId = blockIdx.x - block_scan_partition[my_partition-1];
//         break;
//       }
//     }
//   }

//   __syncthreads();

//   cudaAssert(my_partition != -1);
//   cudaAssert(blockId != -1);
//   int num_tile_items = tile_size;
//   int tile_offset = blockId * tile_size;
//   int num_tiles_partition = (partition_count[my_partition] + tile_size - 1) / tile_size;
//   if (blockId == num_tiles_partition - 1) {
//     num_tile_items = partition_count[my_partition] - tile_offset;
//   }

//   InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

//   cudaAssert(sargs.column_part[my_partition] != NULL);

//   if (pargs.fkey_col_id[0] != -1 && pargs.ht1 != NULL) { //we are reading a remote partition for this column
//     cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[0]] != NULL);
//     ptr = sargs.column_part[my_partition][pargs.fkey_col_id[0]][gpu];
//     if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
//     else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
//     BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[1], selection_flags, pargs.ht1, pargs.dim_len1, pargs.min_key1, num_tile_items);
//   } else { //we are not doing join for this column, there is no result from prev join (first join)
//     BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[1], 1, num_tile_items);
//   }

//   if (pargs.fkey_col_id[1] != -1 && pargs.ht2 != NULL) {
//     cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[1]] != NULL);
//     ptr = sargs.column_part[my_partition][pargs.fkey_col_id[1]][gpu];
//     if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
//     else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
//     BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[2], selection_flags, pargs.ht2, pargs.dim_len2, pargs.min_key2, num_tile_items);
//   } else {
//     BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[2], 1, num_tile_items);
//   }

//   if (pargs.fkey_col_id[2] != -1 && pargs.ht3 != NULL) {
//     cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[2]] != NULL);
//     ptr = sargs.column_part[my_partition][pargs.fkey_col_id[2]][gpu];
//     if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
//     else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
//     BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[3], selection_flags, pargs.ht3, pargs.dim_len3, pargs.min_key3, num_tile_items);
//   } else {
//     BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[3], 1, num_tile_items);
//   }

//   if (pargs.fkey_col_id[3] != -1 && pargs.ht4 != NULL) {
//     cudaAssert(sargs.column_part[my_partition][pargs.fkey_col_id[3]] != NULL);
//     ptr = sargs.column_part[my_partition][pargs.fkey_col_id[3]][gpu];
//     if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, num_tile_items);
//     else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_key, selection_flags, num_tile_items);
//     BlockProbeGPU<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_key, item_off[4], selection_flags, pargs.ht4, pargs.dim_len4, pargs.min_key4, num_tile_items);
//   } else {
//     BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[4], 1, num_tile_items);
//   }

//   if (gargs.aggr_col_id[0] != -1) {
//     cudaAssert(sargs.column_part[my_partition][gargs.aggr_col_id[0]] != NULL);
//     ptr = sargs.column_part[my_partition][gargs.aggr_col_id[0]][gpu];
//     if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, aggrval1, num_tile_items);
//     else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, aggrval1, selection_flags, num_tile_items);
//   } else {
//     BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval1, 1, num_tile_items);
//   }

//   if (gargs.aggr_col_id[1] != -1) {
//     cudaAssert(sargs.column_part[my_partition][gargs.aggr_col_id[1]] != NULL);
//     ptr = sargs.column_part[my_partition][gargs.aggr_col_id[1]][gpu];
//     if (latemat == 0) BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, aggrval2, num_tile_items);
//     else BlockLoadMasked<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, aggrval2, selection_flags, num_tile_items);
//   } else {
//     BlockSetValue<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, aggrval2, 1, num_tile_items);
//   }

//   long long sum = 0;

//   #pragma unroll
//   for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
//     if (threadIdx.x + ITEM * BLOCK_THREADS < num_tile_items) {
//       if (selection_flags[ITEM]) {
//         sum += (aggrval1[ITEM] * aggrval2[ITEM]);
//       }
//     }
//   }

//   __syncthreads();
//   static __shared__ long long buffer[32];
//   unsigned long long aggregate = BlockSum<long long, BLOCK_THREADS, ITEMS_PER_THREAD>(sum, (long long*)buffer);
//   __syncthreads();

//   if (threadIdx.x == 0) {
//     atomicAdd(reinterpret_cast<unsigned long long*>(&res[4]), aggregate);   
//   }

// }
#endif