// Lancelot / src / gpudb / MultiGPUProcessing.h
#ifndef _MULTI_PROCESSING_H_
#define _MULTI_PROCESSING_H_

#pragma once

#include "QueryOptimizer.h"
#include "GPUProcessing.h"
#include "CPUProcessing.h"
#include "common.h"

class CPUGPUProcessing {
public:
  CacheManager* cm;
  QueryOptimizer* qo;
  ncclComm_t* comms;

  bool custom;
  bool skipping;
  bool reorder;

  int*** col_idx; //for each gpu, store the local index of each segment involved in this kernel
  int**** all_col_idx; //for each gpu, store the index of each segment for each column in each gpu
  int*** seg_row_to_gpu; //for each gpu, store the location of each segment row for each table
  int*** d_off_to_seg_id; //for each gpu, for each table, convert the offset to segment id of that table for broadcasted data
  int*** broadcast_idx; //same as col_idx but for broadcasted segment 
  int*** d_broadcast_idx; //gpu located version of broadcast_idx
  int** broadcast_count;//for each gpu, for each table, count broadcasted segment
  chrono::high_resolution_clock::time_point begin_time;
  bool verbose;

  double cpu_total;
  double gpu_total;

  double* cpu_time;
  double* gpu_time;
  double* nvlink_bytes;
  double* shuffle_time;

  unsigned long long* cpu_to_gpu;
  unsigned long long* gpu_to_cpu;

  unsigned long long cpu_to_gpu_total;
  unsigned long long gpu_to_cpu_total;

  double execution_total;
  double optimization_total;
  double merging_total;
	double preparing_total;
	double nvlink_total;
	double shuffle_total;

  int sg_broadcast_count;

  CPUGPUProcessing(size_t _cache_size, size_t _broadcast_size, size_t _processing_size, size_t _pinned_memsize, ncclComm_t* _comms, 
      bool _verbose, bool _skipping, bool _reorder);

  ~CPUGPUProcessing() {

   	for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
	      for (int col = 0; col < cm->TOT_COLUMN; col++) {
          delete[] all_col_idx[gpu][gpu_iter][col];
	      }
        delete[] all_col_idx[gpu][gpu_iter];
      } 		
      delete[] all_col_idx[gpu];
  	}
    delete[] all_col_idx;

   	for (int gpu = 0; gpu < NUM_GPU; gpu++) {
	    for (int table = 0; table < cm->TOT_TABLE; table++) {
	      	delete[] seg_row_to_gpu[gpu][table];
	    }
	    delete[] seg_row_to_gpu[gpu];
  	}
    delete[] seg_row_to_gpu;

   	for (int gpu = 0; gpu < NUM_GPU; gpu++) {
	    for (int i = 0; i < cm->TOT_COLUMN; i++) {
	      	delete[] col_idx[gpu][i];
	    } 		
	    delete[] col_idx[gpu];
  	}
    delete[] col_idx;
    delete[] cpu_time;
    delete[] gpu_time;
    delete[] shuffle_time;
    delete[] nvlink_bytes;
    delete[] cpu_to_gpu;
    delete[] gpu_to_cpu;
    delete qo;
  }

  void resetCGP() {
  	for (int gpu = 0; gpu < NUM_GPU; gpu++) {
	    for (int i = 0; i < cm->TOT_COLUMN; i++) {
	      	if (col_idx[gpu][i] != NULL && !custom) cudaFree(col_idx[gpu]);
          if (cm->allColumn[i]->table_id > 0) { //only do this for dimension table
            //we might want to remove this
            memset(broadcast_idx[gpu][i], -1, cm->allColumn[i]->total_segment * sizeof(int));
          }
          col_idx[gpu][i] = NULL;
	    } 		
  	}
  }

  void renewMetadata() {

    cudaStream_t stream[NUM_GPU];

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaStreamCreate(&stream[gpu]));
    }
    CubDebugExit(cudaSetDevice(0));

    //TODO: these two metadata transfers should have been done after every replication epoch
    //transferring global index to each GPU
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
          for (int column = 0; column < NUM_COLUMN; column++) {
            assert(all_col_idx[gpu][gpu_iter][column] != NULL);
            CubDebugExit(cudaSetDevice(gpu));
            CubDebugExit(cudaMemcpyAsync(all_col_idx[gpu][gpu_iter][column], cm->segment_list[gpu_iter][column], cm->allColumn[column]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
            CubDebugExit(cudaStreamSynchronize(stream[gpu]));
            CubDebugExit(cudaSetDevice(0));
          }
        }
    }

	  //transferring seg_row_to_GPU
	  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        for (int table = 0; table < NUM_TABLE; table++) {
          assert(seg_row_to_gpu[gpu][table] != NULL);
          CubDebugExit(cudaSetDevice(gpu));
          CubDebugExit(cudaMemcpyAsync(seg_row_to_gpu[gpu][table], cm->seg_row_to_single_gpu[table], cm->allTable[table]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
          CubDebugExit(cudaStreamSynchronize(stream[gpu]));
          CubDebugExit(cudaSetDevice(0));
        }
    }

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaStreamSynchronize(stream[gpu]));
      CubDebugExit(cudaStreamDestroy(stream[gpu]));
    }
    CubDebugExit(cudaSetDevice(0));

  }

  void resetTime();

  void switchDeviceGPUtoCPU(int* h_total_all, int** h_total, int*** off_col, int** h_off_col, int sg, cudaStream_t* stream);

  void switchDeviceCPUtoGPUScatter(int** h_total, int*** off_col, int*** h_off_col_part, int sg, cudaStream_t* stream);

  void switchDeviceCPUtoGPUBroadcast(int* h_total_all, int** h_total, int*** off_col, int** h_off_col, int sg, cudaStream_t* stream);

  void switchDeviceCPUtoGPUBroadcastDim(int* h_total_all, int** h_total, int*** off_col, int* h_off_col, int table, int sg, cudaStream_t* stream);

  void broadcastIdxTransfer(int*** d_broadcast_idx, int*** used_broadcast_idx, cudaStream_t* stream);

  void broadcastSegments(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res);

  void broadcastSegments2(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res);

  void broadcastSegmentsNCCL(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res, cudaStream_t* stream);

  void broadcastSegmentsNCCL2(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res, cudaStream_t* stream);

  // void switch_device_fact(int*** &off_col, int** &h_off_col_all, int** &d_total, int** h_total, int sg, int mode, int table, cudaStream_t* stream);

  // void switch_device_dim(int** &off_col, int* &h_off_col_all, int** &d_total, int** h_total, int sg, int mode, int table, cudaStream_t* stream);

  void shuffleData(int*** &off_col, int** h_total, struct shuffleHelper* shelper, bool first_shuffle, cudaStream_t* stream);

  void shuffleDataNCCL(int*** &off_col, int** h_total, struct shuffleHelper* shelper, bool first_shuffle, int sg, cudaStream_t* stream);

  void shuffleDataOpt(int*** &off_col, int** h_total, struct shuffleHelper* shelper, bool first_shuffle, int sg, cudaStream_t* stream);

  // void call_pfilter_probe_GPU(QueryParams* params, int** &off_col, int* &d_total, int* h_total, int sg, int select_so_far, cudaStream_t stream);

  // void call_probe_group_by_GPU(QueryParams* params, int** &off_col, int* h_total, int sg, cudaStream_t stream);

  void call_probe_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, cudaStream_t* stream);

  // void call_pfilter_GPU(QueryParams* params, int** &off_col, int* &d_total, int* h_total, int sg, int select_so_far, cudaStream_t stream);

  // void call_group_by_GPU(QueryParams* params, int** &off_col, int* h_total, int sg, cudaStream_t stream);

  // void call_aggregation_GPU(QueryParams* params, int* &off_col, int* h_total, int sg, cudaStream_t stream);

  // void call_probe_aggr_GPU(QueryParams* params, int** &off_col, int* h_total, int sg, cudaStream_t stream);

  // void call_pfilter_probe_aggr_GPU(QueryParams* params, int** &off_col, int* h_total, int sg, int select_so_far, cudaStream_t stream);


  // void call_bfilter_build_GPU(QueryParams* params, int* &d_off_col, int* h_total, int sg, int table, cudaStream_t stream);

  // void call_build_GPU(QueryParams* params, int* &d_off_col, int* h_total, int sg, int table, cudaStream_t stream);

  // void call_bfilter_GPU(QueryParams* params, int* &d_off_col, int* &d_total, int* h_total, int sg, int table, cudaStream_t stream);

  void call_operator_fact_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int select_so_far, int latemat, cudaStream_t* stream);

  void call_operator_fact_pipeline_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int select_so_far, int latemat, cudaStream_t* stream);

  void call_filter_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int table, int select_so_far, cudaStream_t* stream);

  void call_filter_partition_CPU(QueryParams* params, int*** h_off_col_part, int** h_total, int sg, int table);

  void call_operator_dim_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int table, bool filter_on_gpu, bool broadcast, bool already_partitioned, cudaStream_t* stream);

  void call_pfilter_probe_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far);

  void call_probe_group_by_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg);

  void call_probe_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg);

  void call_pfilter_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far);

  void call_group_by_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg);

  void call_aggregation_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg);

  void call_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg);

  void call_pfilter_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far);


  void call_bfilter_build_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table);

  void call_build_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table);

  void call_bfilter_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table);

  void call_pfilter_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far);

};

#endif





// void 
// CPUGPUProcessing::call_probe_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, cudaStream_t* stream) {

//   int _min_key[4] = {0}, _dim_len[4] = {0};
//   float output_selectivity = 1.0;
//   bool first_shuffle = true;

//   if(qo->joinGPUPipelineCol[sg].size() == 0) return;

//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     CubDebugExit(cudaSetDevice(gpu));
//     CubDebugExit(cudaMemsetAsync(d_total[gpu], 0, sizeof(int), stream[gpu]));
//   }
//   CubDebugExit(cudaSetDevice(0));
  
//   int num_join = qo->joinGPUPipelineCol[sg].size();

//   for (int join = 0; join < num_join; join++) {

//     ColumnInfo* column = qo->joinGPUPipelineCol[sg][join];
//     int table_id = qo->fkey_pkey[column]->table_id;
//     ColumnInfo* pkey = qo->fkey_pkey[column];

//     for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//       cm->indexTransfer(col_idx[gpu], column, stream[gpu], custom, gpu);
//       assert(col_idx[gpu][column->column_id] != NULL);
//       cpu_to_gpu[sg] += (column->total_segment * sizeof(int));
//     }

//     _min_key[table_id - 1] = params->min_key[pkey];
//     _dim_len[table_id - 1] = params->dim_len[pkey];
//     output_selectivity *= params->selectivity[column];
//   }

//   //these four structs are helper for shuffling
//   int*** temp_col = new int**[NUM_GPU]();
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     temp_col[gpu] = new int*[NUM_COLUMN]();
//   }

//   int** result_count = new int*[NUM_GPU]();
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     result_count[gpu] = new int[NUM_PARTITION]();
//   }

//   //offset output of local join
//   int*** join_result_off = new int**[NUM_GPU]();
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     join_result_off[gpu] = new int*[NUM_TABLE]();
//   }

//   //output of partitioning (column)
//   int**** out_shuffle_col = new int***[NUM_GPU]();
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     out_shuffle_col[gpu] = new int**[NUM_COLUMN]();
//   }

//   //output of partitioning (offset)
//   int**** out_shuffle_off = new int***[NUM_GPU]();
//   for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//     out_shuffle_off[gpu] = new int**[NUM_TABLE]();
//   }

//   shuffleHelper* shelper = new shuffleHelper{
//     result_count, temp_col, join_result_off, out_shuffle_col, out_shuffle_off
//   };
  
//   vector<tuple<ColumnInfo*, ColumnInfo*>> pipeline_column;

//   for (int join = 0; join < num_join; join++) {

//     ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join];
//     int table_id = qo->fkey_pkey[fkey]->table_id;
//     ColumnInfo* pkey = qo->fkey_pkey[fkey];

//     int next_table_id = -1; ColumnInfo* next_fkey = NULL;
//     if (join != num_join - 1) {
//       next_fkey = qo->joinGPUPipelineCol[sg][join+1];
//       next_table_id = qo->fkey_pkey[next_fkey]->table_id;
//     }

//     // if current join need shuffling
//     if (!params->ht_replicated[table_id]) {

//       assert(NUM_GPU == NUM_PARTITION);
//       for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//         //do shuffling in each GPU

//         filterArgsGPU* fargs = new filterArgsGPU();
//         probeArgsGPU* pargs = new probeArgsGPU();
//         groupbyArgsGPU* gargs = new groupbyArgsGPU();
//         buildArgsGPU* bargs = new buildArgsGPU();

//         int** d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
//         int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
//         int** d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
//         int** d_local_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);

//         CubDebugExit(cudaSetDevice(gpu));
//         CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
//         CubDebugExit(cudaMemcpyAsync(d_col_idx, col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); 
//         if (off_col[gpu] != NULL) {
//           CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
//         } else { 
//           d_in_off=NULL;
//         }
//         CubDebugExit(cudaMemcpyAsync(d_local_off, join_result_off[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
//         CubDebugExit(cudaSetDevice(0));

//         shuffleArgsGPU* sargs = new shuffleArgsGPU{
//             d_temp_col, d_col_idx, d_in_off, d_local_off, fkey->column_id, fkey->table->table_id
//         };
        
//         KernelParams* kparams = new KernelParams(fargs, pargs, bargs, gargs, sargs, shelper, d_total, h_total);
//         KernelLaunch* kernellaunch = new KernelLaunch(cm, kparams, sg, gpu, 0, table_id, output_selectivity, stream[gpu]);
//         kernellaunch->preparePartitioning(off_col, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, first_shuffle);
//         kernellaunch->launchPartitioning();
//         kernellaunch->clearPartitioning();
        
//       }

//       shuffleData(off_col, shelper, first_shuffle, stream);
//       first_shuffle = false;

//     }

//     // if next join need shuffling (current join cannot be pipelined with next join)
//     if (!params->ht_replicated[next_table_id] || (join + 1) == num_join) {
//       //do join (either pipelined or individual)
//       pipeline_column.push_back(make_tuple(fkey, pkey));

//       int *ht[NUM_GPU][4] = {}, *fkey_idx[NUM_GPU][4] = {}; //initialize it to null

//       for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) {
//             ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]);
//             ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]);

//             for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//                 fkey_idx[gpu][pkey_pipeline->table_id -1] = col_idx[gpu][fkey_pipeline->column_id];
//                 ht[gpu][pkey_pipeline->table_id - 1] = params->ht_GPU[gpu][pkey_pipeline];
//             }

//       }

//       //initialize kernel launch object
//       for (int gpu = 0; gpu < NUM_GPU; gpu++) {

//             probeArgsGPU* pargs = new probeArgsGPU{ //initialize pargs for this gpu
//                   fkey_idx[gpu][0], fkey_idx[gpu][1], fkey_idx[gpu][2], fkey_idx[gpu][3],
//                   ht[gpu][0], ht[gpu][1], ht[gpu][2], ht[gpu][3], 
//                   _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
//                   _min_key[0], _min_key[1], _min_key[2], _min_key[3]
//                 };

//             filterArgsGPU* fargs = new filterArgsGPU();
//             groupbyArgsGPU* gargs = new groupbyArgsGPU();
//             shuffleArgsGPU* sargs = new shuffleArgsGPU();
//             buildArgsGPU* bargs = new buildArgsGPU();

//             KernelParams* kparams = new KernelParams(fargs, pargs, bargs, gargs, sargs, shelper, d_total, h_total);
//             KernelLaunch* kernellaunch = new KernelLaunch(cm, kparams, sg, gpu, 0, table_id, output_selectivity, stream[gpu]);
//             kernellaunch->prepareKernelFact(off_col, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck);
//             kernellaunch->launchKernel();
//             kernellaunch->clearKernel(off_col);
//       }

//       pipeline_column.clear();
//     // if next join does not need shuffling (current join can be pipelined with next join)
//     } else {
//       pipeline_column.push_back({fkey, pkey});
//     }

//   }

// }

// void 
// CPUGPUProcessing::switch_device_fact(int*** &off_col, int** &h_off_col_all, int** &d_total, int** h_total, int sg, int mode, int table, cudaStream_t* stream) {
//   float time;
//   SETUP_TIMING();

//   if (mode == 0) {
//     // if (h_off_col == NULL) return;
//     // assert(h_off_col != NULL);
//     // // assert(*h_total > 0); // DONT BE SURPRISED IF WE REACHED THIS FOR 19980401-19980430 PREDICATES CAUSE THE RESULT IS 0
//     // assert(h_off_col[0] != NULL);
//     // off_col = new int*[cm->TOT_TABLE]();

//     // CubDebugExit(cudaMemcpyAsync(d_total, h_total, sizeof(int), cudaMemcpyHostToDevice, stream));
//     // CubDebugExit(cudaStreamSynchronize(stream));
//     // cpu_to_gpu[sg] += (1 * sizeof(int));

//     // cudaEventRecord(start, 0);

//     // for (int i = 0; i < cm->TOT_TABLE; i++) {
//     //   if (h_off_col[i] != NULL) {
//     //     if (!custom) CubDebugExit(cudaMalloc((void**) &off_col[i], *h_total * sizeof(int)));
//     //     if (custom) off_col[i] = (int*) cm->customCudaMalloc<int>(*h_total);
//     //   }
//     // }

//     assert(0);
//   } else {
//     if (off_col == NULL) return;
//     assert(off_col != NULL);
//     for (int gpu = 0; gpu < NUM_GPU; gpu++) assert(off_col[gpu][0] != NULL);

//     h_off_col_all = new int*[cm->TOT_TABLE]();
//     gpu_to_cpu[sg] += (1 * sizeof(int));

//     cudaEventRecord(start, 0);

//     //THIS WOULD NOT WORK BECAUSE *h_total_all HAS TO BE RESET FOR EACH TABLE
//     for (int table = 0; table < NUM_TABLE; table++) {
//       int h_total_all = 0;
//       for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//         if (off_col[gpu][table] != NULL) {
//           assert(*(h_total[gpu]) > 0);
//           h_total_all += *(h_total[gpu]);
//         }
//       }
//       if (h_total_all > 0) 
//         h_off_col_all[table] = (int*) cm->customCudaHostAlloc<int>(h_total_all);
//       else
//         h_off_col_all[table] = NULL;
//     }
//   }

//   cudaEventRecord(stop, 0);
//   cudaEventSynchronize(stop);
//   cudaEventElapsedTime(&time, start, stop);
//   malloc_time[sg] += time;
//   // cout << "sg: " << sg << " transfer malloc time: " << malloc_time[sg]<< endl;
  
//   if (verbose) cout << "Transfer size: " << *h_total << " sg: " << sg << endl;

//   cudaEventRecord(start, 0);

//   if (mode == 0) { //CPU to GPU
//     // for (int i = 0; i < cm->TOT_TABLE; i++) {
//     //   if (h_off_col[i] != NULL) {
//     //     // if (custom) off_col[i] = (int*) cm->customCudaMalloc<int>(*h_total);
//     //     CubDebugExit(cudaMemcpyAsync(off_col[i], h_off_col[i], *h_total * sizeof(int), cudaMemcpyHostToDevice, stream));
//     //     CubDebugExit(cudaStreamSynchronize(stream));
//     //     if (!custom) cudaFreeHost(h_off_col[i]);
//     //     cpu_to_gpu[sg] += (*h_total * sizeof(int));
//     //   }
//     // }
//     // CubDebugExit(cudaStreamSynchronize(stream));
//     assert(0);
//   } else { // GPU to CPU

//     for (int table = 0; table < NUM_TABLE; table++) {
//       int temp = 0;
//       for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//         if (off_col[gpu][table] != NULL) {
//           assert(h_off_col_all[table] != NULL);
//           assert(*(h_total[gpu]) > 0);
//           CubDebugExit(cudaSetDevice(gpu));
//           CubDebugExit(cudaMemcpyAsync(h_off_col_all[table] + temp, off_col[gpu][table], *(h_total[gpu]) * sizeof(int), cudaMemcpyDeviceToHost, stream[gpu]));
//           CubDebugExit(cudaStreamSynchronize(stream[gpu]));
//           gpu_to_cpu[sg] += (*(h_total[gpu]) * sizeof(int));
//           temp += *(h_total[gpu]);
//         }
//       }
//       CubDebugExit(cudaSetDevice(0));
//     }

//   }

//   cudaEventRecord(stop, 0);
//   cudaEventSynchronize(stop);
//   cudaEventElapsedTime(&time, start, stop);
//   if (verbose) cout << "Transfer Time: " << time << " sg: " << sg << endl;
//   transfer_time[sg] += time;
  
// }

// void 
// CPUGPUProcessing::switch_device_dim(int** &off_col, int* &h_off_col_all, int** &d_total, int** h_total, int sg, int mode, int table, cudaStream_t* stream) {

//   float time;
//   SETUP_TIMING();
//   cudaEventRecord(start, 0);

//   if (mode == 0) {
//     // if (h_off_col == NULL) return;
//     // assert(h_off_col != NULL);
//     // assert(*h_total > 0);

//     // CubDebugExit(cudaMemcpyAsync(d_total, h_total, sizeof(int), cudaMemcpyHostToDevice, stream));
//     // CubDebugExit(cudaStreamSynchronize(stream));
//     // cpu_to_gpu[sg] += (1 * sizeof(int));

//     // if (!custom) CubDebugExit(cudaMalloc((void**) &d_off_col, *h_total * sizeof(int)));
//     // if (custom) d_off_col = (int*) cm->customCudaMalloc<int>(*h_total);
//     assert(0);
//   } else {
//     if (off_col == NULL) return;
//     assert(off_col != NULL);
//     for (int gpu = 0; gpu < NUM_GPU; gpu++) assert(off_col[gpu] != NULL);

//     int h_total_all = 0;
//     for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//       if (off_col[gpu] != NULL) {
//         assert(*(h_total[gpu]) > 0);
//         h_total_all += *(h_total[gpu]);
//       }
//     }
//     if (h_total_all > 0)
//       h_off_col_all = (int*) cm->customCudaHostAlloc<int>(h_total_all);
//     else
//       h_off_col_all = NULL;
//   }

//   cudaEventRecord(stop, 0);
//   cudaEventSynchronize(stop);
//   cudaEventElapsedTime(&time, start, stop);
//   malloc_time[sg] += time;

//   if (verbose) cout << "Transfer size: " << *h_total << " sg: " << sg << endl;

//   cudaEventRecord(start, 0);

//   if (mode == 0) { //CPU to GPU
//     // if (custom) d_off_col = (int*) cm->customCudaMalloc<int>(*h_total);

//     // if (h_off_col != NULL) {
//     //   CubDebugExit(cudaMemcpyAsync(d_off_col, h_off_col, *h_total * sizeof(int), cudaMemcpyHostToDevice, stream));
//     //   CubDebugExit(cudaStreamSynchronize(stream));
//     //   cpu_to_gpu[sg] += (*h_total * sizeof(int));
//     //   // if (!custom) cudaFreeHost(h_off_col); //TODO: UNCOMMENTING THIS WILL CAUSE SEGFAULT BECAUSE THERE IS A POSSIBILITY OF
//     //   //FILTER CPU -> SWITCH -> BUILD GPU -> BUILD CPU (H_OFF_COL WILL BE USED AGAIN IN BUILD CPU)
//     // } else d_off_col = NULL;

//     // CubDebugExit(cudaStreamSynchronize(stream));

//     assert(0);

//   } else { // GPU to CPU
//     assert(h_off_col_all != NULL);

//     for (int gpu = 0; gpu < NUM_GPU; gpu++) {
//       int temp = 0;
//       if (off_col[gpu] != NULL) {
//         assert(*(h_total[gpu]) > 0);
//         CubDebugExit(cudaSetDevice(gpu));
//         CubDebugExit(cudaMemcpyAsync(h_off_col_all + temp, off_col[gpu], *(h_total[gpu]) * sizeof(int), cudaMemcpyDeviceToHost, stream[gpu]));
//         CubDebugExit(cudaStreamSynchronize(stream[gpu]));
//         gpu_to_cpu[sg] += (*(h_total[gpu]) * sizeof(int));
//         temp += *(h_total[gpu]);
//       }
//     }
//     CubDebugExit(cudaSetDevice(0));
//   }

//   cudaEventRecord(stop, 0);
//   cudaEventSynchronize(stop);
//   cudaEventElapsedTime(&time, start, stop);
//   if (verbose) cout << "Transfer Time: " << time << " sg: " << sg << endl;
//   transfer_time[sg] += time;
  
// }


  // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
  //   assert(params->ht_GPU[gpu][key_column] != NULL);
  //   if (params->ht_GPU[gpu][key_column] != NULL) {

  //     filterArgsGPU* fargs;

  //     if (filter_idx[gpu]) {
  //       fargs = new filterArgsGPU{
  //         filter_idx[gpu], NULL,
  //         params->compare1[filter_col], params->compare2[filter_col], 0, 0,
  //         params->mode[filter_col], 0,
  //         NULL, NULL
  //       };
  //     } else {
  //       fargs = new filterArgsGPU{
  //         NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL
  //       };        
  //     }
  
  //     // buildArgsGPU* bargs = new buildArgsGPU{
  //     //   dimkey_idx[gpu], group_idx[gpu],
  //     //   params->dim_len[key_column], params->min_key[key_column]
  //     // };

  //     //WE WILL ALWAYS SET GROUPBY IDX TO NULL FOR NOW
  //     buildArgsGPU* bargs = new buildArgsGPU{
  //       dimkey_idx[gpu], NULL,
  //       params->dim_len[key_column], params->min_key[key_column]
  //     };

  //     probeArgsGPU* pargs = new probeArgsGPU();
  //     groupbyArgsGPU* gargs = new groupbyArgsGPU();
  //     shuffleArgsGPU* sargs = new shuffleArgsGPU();
  //     shuffleHelper* shelper = new shuffleHelper();

  //     KernelParams* kparams = new KernelParams(fargs, pargs, bargs, gargs, sargs, shelper, NULL, h_total);
  //     KernelLaunch* kernellaunch = new KernelLaunch(cm, kparams, params, sg, gpu, 3, table, 1, stream[gpu]);
  //     kernellaunch->prepareKernelDim(d_off_col, key_column, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, false);
  //     kernellaunch->launchKernel(false);
  //     //clear kernel is not called for dim table
  //   }

  // }

  // //range partition with local offset (intermediate result of join after the first shuffle) but you have to materialize the global offset for the build table
// template<int BLOCK_THREADS, int ITEMS_PER_THREAD, int NUM_RANGE>
// __global__ void RangePartitioningKeyValue3(int* gpuCache,
//   struct shuffleArgsGPU sargs, struct shuffleOutGPU sout, int num_tuples,
//   int write_val = 1, int write_offset = 0) {

//   __const__ int column_map[NUM_COLUMN] = {
//     0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
//     2, 2, 2, 2,
//     1, 1, 1, 1,
//     3, 3, 3, 3,
//     4, 4, 4
//   };

//   int item_off[NUM_TABLE][ITEMS_PER_THREAD];
//   int item_key[ITEMS_PER_THREAD];
//   int item_val[ITEMS_PER_THREAD];
//   int selection_flags[ITEMS_PER_THREAD];
//   int index[ITEMS_PER_THREAD];

//   int tile_size = BLOCK_THREADS * ITEMS_PER_THREAD;
//   int tile_offset = blockIdx.x * tile_size;

//   int num_tiles = (num_tuples + tile_size - 1) / tile_size;
//   int num_tile_items = tile_size;
//   if (blockIdx.x == num_tiles - 1) {
//     num_tile_items = num_tuples - tile_offset;
//   }

//   //this range_size would not work for date column
//   int range_size = (sargs.max_key + 1 - sargs.min_key)/NUM_RANGE; //have to figure out how to make the range size correct depending on the num partition (e.g 10 data 3 part -> 3 3 4)

//   __shared__ int smem_index[NUM_RANGE];

//   __syncthreads();

//   if (threadIdx.x < NUM_RANGE) {
//     smem_index[threadIdx.x] = 0;
//   }

//   __syncthreads();

//   #pragma unroll
//   for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
//     index[ITEM] = 0;
//   }

//   InitFlags<BLOCK_THREADS, ITEMS_PER_THREAD>(selection_flags);

//   cudaAssert(sargs.column[sargs.key_column] != NULL);
//   //loading local offset
//   int* ptr = sargs.local_off[sargs.table];
//   BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[sargs.table], num_tile_items);
//   BlockReadOffsetAndCountIndexPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[sargs.table], sargs.column[sargs.key_column], item_key, index, smem_index, num_tile_items, range_size);

//   __syncthreads();

//   if (threadIdx.x < NUM_RANGE) {
//     smem_index[threadIdx.x] = atomicAdd(&(sout.pos[threadIdx.x]), smem_index[threadIdx.x]);
//   }

//   __syncthreads();

//   #pragma unroll
//   for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
//     if (threadIdx.x + (ITEM * BLOCK_THREADS) < num_tile_items) {
//       int partition = item_key[ITEM]/range_size; //if range size is wrong, there could be an extra partition
//       if (partition >= NUM_RANGE) partition = NUM_RANGE-1;
//       index[ITEM] += smem_index[partition];
//     }
//   }

//   __syncthreads();

//   BlockWritePartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_key, index, sout.column[sargs.key_column], num_tile_items, range_size, 0, 0);

//   //read the local offset (or global offset if the table is the build table)
//   for (int table = 0; table < NUM_TABLE; table++) {
//     if (sargs.table != table && sargs.local_off[table] != NULL) {
//       cudaAssert(sargs.local_off[table] != NULL);
//       ptr = sargs.local_off[table];
//       BlockLoadCrystal<int, BLOCK_THREADS, ITEMS_PER_THREAD>(ptr + tile_offset, item_off[table], num_tile_items);
//     }
//   }

//   if (write_val) {
//     //scatter the value
//     cudaAssert(sargs.col_idx != NULL);
//     //use the local offset to scatter the value
//     //TODO: we have to support the case where the offset for dimension table is a global offset
//     for (int col = 0; col < NUM_COLUMN; col++) {
//       int table = column_map[col];
//       // if (col == 11 || col == 15) printf("%d %d\n", sargs.local_off[table] != NULL, sargs.col_idx[col] != NULL);
//         //materializing column using local offset for table 0 from current join (we assume that all input table (joined) is now called table 0)
//       //TODO: THIS PART DOESN'T WORK BECAUSE THE CODE ALWAYS GO TO THE FIRST BRANCH (sargs.column[col] != NULL for every col involved in the query)
//       if (sargs.column[col] != NULL && sargs.key_column != col) { 
//         cudaAssert(sargs.column[col] != NULL);
//         cudaAssert(sout.column[col] != NULL);
//         // if (col == 11 || col == 15) printf("im here 1\n");
//         BlockReadOffsetShuffle<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], sargs.column[col], item_val, num_tile_items);
//         BlockWriteValPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, sout.column[col], num_tile_items, range_size);
//         //for now sargs.col_idx[col] should be NULL except for groupby column in SSB in dimension table
//         //sargs.local_off[table] here is global offset from dimension table
//         //materializing column using local offset from current join
//       } else if (sargs.local_off[table] != NULL && sargs.col_idx[col] != NULL && sargs.table != table && sargs.key_column != col) { //materializing the global offset
//         cudaAssert(table != 0);
//         cudaAssert(sargs.col_idx[col] != NULL);
//         cudaAssert(sargs.broadcast_idx[col] != NULL);
//         cudaAssert(sout.column[col] != NULL);
//         // if (col == 11 || col == 15) printf("im here 2\n");
//         BlockReadOffsetGPU3<BLOCK_THREADS, ITEMS_PER_THREAD>(threadIdx.x, item_off[table], item_val, selection_flags, gpuCache, sargs.col_idx[col], sargs.broadcast_idx[col], num_tile_items);
//         BlockWriteValPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, sout.column[col], num_tile_items, range_size); 
//       }
//     }
//   }

//   if (write_offset) {
//     //scatter the offset (global offset and not the local offset), local offset would be useless once the shuffle is done
//     for (int table = 0; table < NUM_TABLE; table++) {
//       cudaAssert(sargs.in_off != NULL);
//         //materializing global offset (fact) using local offset table 0 from current join
//       if (sargs.in_off[table] != NULL && sargs.table == 0) {
//         cudaAssert(sargs.in_off[table] != NULL);
//         cudaAssert(sout.out_off[table] != NULL);
//         cudaAssert(sargs.local_off[0] != NULL);
//         BlockReadOffsetShuffle<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], sargs.in_off[table], item_val, num_tile_items);
//         BlockWriteValPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, sout.out_off[table], num_tile_items, range_size);
//         //materializing global offset (dim) from previous join using local offset table 0 from current join
//       //TODO: THIS PART DOESN'T WORK BECAUSE sargs.local_off[table] != NULL for every table involved in the query (WE CANNOT DISTINGUISH BETWEEN CURRENT JOIN AND EVERY OTHER JOIN)
//       } else if (sargs.in_off[table] != NULL && sargs.local_off[table] == NULL && table > 0) {
//         cudaAssert(sargs.in_off[table] != NULL);
//         cudaAssert(sout.out_off[table] != NULL);
//         BlockReadOffsetShuffle<BLOCK_THREADS, ITEMS_PER_THREAD>(item_off[0], sargs.in_off[table], item_val, num_tile_items);
//         BlockWriteValPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_val, item_key, index, sout.out_off[table], num_tile_items, range_size);
//         //writing global offset (dim) from current join
//       } else if (sargs.in_off[table] == NULL && sargs.local_off[table] != NULL && table > 0) {
//         BlockWriteValPartition<BLOCK_THREADS, ITEMS_PER_THREAD, NUM_RANGE>(item_off[table], item_key, index, sout.out_off[table], num_tile_items, range_size);
//       }
//     }
//   }

// }