Lancelot / src / gpudb / MultiGPUProcessing.cu
MultiGPUProcessing.cu
Raw
#include "MultiGPUProcessing.h"
#include "CacheManager.h"
#include "KernelLaunch.h"
#include "QueryOptimizer.h"

  // Naive approach:
  //   launch the shuffling kernel on each GPU;
  //   for each GPU, check whether each receiving buffer is ready for a kernel launch;
  //   if it is ready, transfer the data and launch the kernel on the destination GPU.

  // Have one CPU thread per GPU checking:
  //   whether the input buffer is full;
  //   whether an output buffer is available;
  //   if both conditions hold, launch a kernel for that particular input buffer.
  // Have one CPU thread per GPU checking:
  //   whether the output buffer is full;
  //   whether an input buffer is available on the destination GPU;
  //   if both conditions hold, memcpy the buffer.
  // Extensions:
  //   a separate buffer for each kernel type (easy);
  //   a condensed, global output buffer for all kernels (each kernel has to write its output intelligently to the correct buffer);
  //   a single kernel reading from multiple buffers (if available);
  //   multiple kernels from the same or different operators running at the same time (each buffer has a separate stream);
  //   a buffer-full mechanism (trick: if, conservatively, there are not enough output buffers, do not launch the kernel).

// Constructs the processing context.
//
// Creates the QueryOptimizer (and takes its cache manager), allocates the
// host-side and device-side index tables that persist across queries, the
// per-segment-group statistics arrays, and the host broadcast index tables
// (every entry initialised to -1), then calls resetTime().
//
//   _cache_size, _broadcast_size, _processing_size, _pinned_memsize
//       forwarded to QueryOptimizer (the last three only when `custom` is set)
//   _comms      stored in `comms`
//   _verbose, _skipping, _reorder
//       copied into the members of the same name
CPUGPUProcessing::CPUGPUProcessing(size_t _cache_size, size_t _broadcast_size, size_t _processing_size, size_t _pinned_memsize, ncclComm_t* _comms, 
      bool _verbose, bool _skipping, bool _reorder) {

  skipping = _skipping;
  reorder = _reorder;
  custom = true;

  // With `custom` enabled the optimizer gets every region size, otherwise only the cache size.
  if (!custom) {
    qo = new QueryOptimizer(_cache_size, 0, 0, 0, this);
  } else {
    qo = new QueryOptimizer(_cache_size, _broadcast_size, _processing_size, _pinned_memsize, this);
  }
  cm = qo->cm;
  begin_time = chrono::high_resolution_clock::now();

  // Host table of per-GPU, per-column pointers; entries start out NULL.
  col_idx = new int**[NUM_GPU];
  for (int dev = 0; dev < NUM_GPU; dev++) {
    col_idx[dev] = new int*[cm->TOT_COLUMN]();
  }

  // For every (GPU, GPU, column) triple: a device array on the first GPU with one int per segment.
  all_col_idx = new int***[NUM_GPU];
  for (int dev = 0; dev < NUM_GPU; dev++) {
    all_col_idx[dev] = new int**[NUM_GPU];
    for (int peer = 0; peer < NUM_GPU; peer++) {
      int** per_column = new int*[NUM_COLUMN]();
      all_col_idx[dev][peer] = per_column;
      for (int c = 0; c < NUM_COLUMN; c++) {
        const size_t bytes = cm->allColumn[c]->total_segment * sizeof(int);
        CubDebugExit(cudaSetDevice(dev));
        //cannot do custom malloc since custom malloc will be reset after every query and this metadata persists across queries
        CubDebugExit(cudaMalloc((void**) &per_column[c], bytes));
        CubDebugExit(cudaSetDevice(0));
      }
    }
  }

  // For every (GPU, table) pair: a device array with one int per table segment.
  seg_row_to_gpu = new int**[NUM_GPU];
  for (int dev = 0; dev < NUM_GPU; dev++) {
    int** per_table = new int*[NUM_TABLE]();
    seg_row_to_gpu[dev] = per_table;
    for (int t = 0; t < NUM_TABLE; t++) {
      const size_t bytes = cm->allTable[t]->total_segment * sizeof(int);
      CubDebugExit(cudaSetDevice(dev));
      //cannot do custom malloc since custom malloc will be reset after every query and this metadata persists across queries
      CubDebugExit(cudaMalloc((void**) &per_table[t], bytes));
      CubDebugExit(cudaSetDevice(0));
    }
  }

  verbose = _verbose;

  // Statistics arrays, one slot per segment group.
  cpu_time = new double[MAX_GROUPS];
  gpu_time = new double[MAX_GROUPS];

  nvlink_bytes = new double[MAX_GROUPS];
  shuffle_time = new double[MAX_GROUPS];

  cpu_to_gpu = new unsigned long long[MAX_GROUPS];
  gpu_to_cpu = new unsigned long long[MAX_GROUPS];

  comms = _comms;

  //THIS IS USED FOR BROADCAST
  // Host table: one int per segment for every (GPU, column); memset with -1 sets every int to -1.
  broadcast_idx = new int**[NUM_GPU]();
  for (int dev = 0; dev < NUM_GPU; dev++) {
    broadcast_idx[dev] = new int*[cm->TOT_COLUMN]();
    for (int c = 0; c < NUM_COLUMN; c++) {
      const size_t bytes = cm->allColumn[c]->total_segment * sizeof(int);
      broadcast_idx[dev][c] = (int*) malloc (bytes);
      memset(broadcast_idx[dev][c], -1, bytes);
    }
  }

  sg_broadcast_count = 0;

  resetTime();
}

// Gathers the offset columns held on every GPU into one host buffer per table.
//
//   h_total_all  in/out: the per-GPU counts are ADDED to the value already
//                stored here (presumably zero on entry -- confirm with callers)
//   h_total      per-GPU row counts (read only)
//   off_col      off_col[gpu][table]: device offset column, or NULL
//   h_off_col    out: h_off_col[table] receives a buffer from
//                cm->customCudaHostAlloc for each table that GPU 0 holds
//   sg           not used in this function
//   stream       one stream per GPU; copies are issued on stream[gpu]
//
// Returns early, without copying, when the accumulated total is zero.
void
CPUGPUProcessing::switchDeviceGPUtoCPU(int* h_total_all, int** h_total, int*** off_col, int** h_off_col, int sg, cudaStream_t* stream) {
  assert(h_total != NULL);
  assert(h_total_all != NULL);
  assert(off_col != NULL);
  assert(h_off_col != NULL);
  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  // Accumulate the number of rows held across all GPUs.
  for (int dev = 0; dev < NUM_GPU; dev++) {
    assert(off_col[dev] != NULL);
    // assert(*(h_total[dev]) > 0);
    *h_total_all += *(h_total[dev]);
  }

  if (*h_total_all == 0) return;

  // GPU 0 decides which tables get a host buffer.
  for (int t = 0; t < NUM_TABLE; t++) {
    if (off_col[0][t] == NULL) continue;
    h_off_col[t] = (int*) cm->customCudaHostAlloc<int>(*h_total_all);
  }

  // Append each GPU's slice to the host buffer, in GPU order.
  for (int t = 0; t < NUM_TABLE; t++) {
    int copied = 0;
    for (int dev = 0; dev < NUM_GPU; dev++) {
      assert(off_col[dev] != NULL);
      int* d_src = off_col[dev][t];
      if (d_src == NULL) continue;

      assert(h_off_col[t] != NULL);
      const int rows = *(h_total[dev]);
      CubDebugExit(cudaSetDevice(dev));
      CubDebugExit(cudaMemcpyAsync(h_off_col[t] + copied, d_src, rows * sizeof(int), cudaMemcpyDeviceToHost, stream[dev]));
      CubDebugExit(cudaSetDevice(0));
      copied += rows;
    }
    // Tables present on GPU 0 must have been filled completely.
    assert(off_col[0][t] == NULL || copied == *h_total_all);
  }

  cout << *h_total_all << endl;

  for (int dev = 0; dev < NUM_GPU; dev++) {
    CubDebugExit(cudaSetDevice(dev));
    CHECK_ERROR_STREAM(stream[dev]);
    CubDebugExit(cudaSetDevice(0));
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Transferred data from GPU to CPU " << time << endl;
}

// Copies each GPU's host-side offset partition to that GPU.
//
//   h_total         per-GPU row counts (read only)
//   off_col         out: off_col[gpu][table] is allocated with
//                   cm->customCudaMalloc when a host partition exists and the
//                   GPU's count is positive
//   h_off_col_part  h_off_col_part[gpu][table]: host partition, or NULL
//   sg              not used in this function
//   stream          one stream per GPU; copies are issued on stream[gpu]
//
// Returns early, without allocating or copying, when the counts sum to zero.
void
CPUGPUProcessing::switchDeviceCPUtoGPUScatter(int** h_total, int*** off_col, int*** h_off_col_part, int sg, cudaStream_t* stream) {

  assert(h_total != NULL);
  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  int total_rows = 0;
  for (int dev = 0; dev < NUM_GPU; dev++) total_rows += *(h_total[dev]);
  if (total_rows == 0) return;

  assert(off_col != NULL);
  assert(h_off_col_part != NULL);

  // Allocate the device destination for every non-empty host partition.
  for (int dev = 0; dev < NUM_GPU; dev++) {
    assert(h_off_col_part[dev] != NULL);
    assert(off_col[dev] != NULL);
    for (int t = 0; t < NUM_TABLE; t++) {
      if (h_off_col_part[dev][t] != NULL && *(h_total[dev]) > 0) {
        off_col[dev][t] = (int*) cm->customCudaMalloc<int>(*(h_total[dev]), dev);
      }
    }
  }

  // Issue one host-to-device copy per (table, GPU) pair that has a destination.
  for (int t = 0; t < NUM_TABLE; t++) {
    for (int dev = 0; dev < NUM_GPU; dev++) {
      if (off_col[dev][t] == NULL) continue;

      assert(h_off_col_part[dev][t] != NULL);
      const int rows = *(h_total[dev]);
      CubDebugExit(cudaSetDevice(dev));
      if (rows > 0) {
        CubDebugExit(cudaMemcpyAsync(off_col[dev][t], h_off_col_part[dev][t], rows * sizeof(int), cudaMemcpyHostToDevice, stream[dev]));
      }
      CubDebugExit(cudaSetDevice(0));
    }
  }

  for (int dev = 0; dev < NUM_GPU; dev++) {
    CubDebugExit(cudaSetDevice(dev));
    CHECK_ERROR_STREAM(stream[dev]);
    CubDebugExit(cudaSetDevice(0));
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Transferred data from CPU to GPU " << time << endl;
}

// Broadcasts the host offset columns to every GPU.
//
// Each table's host column is first copied to the lead GPU (GPU 0) and then
// replicated device-to-device to the remaining GPUs.
//
//   h_total_all  number of rows in every host offset column (read only)
//   h_total      out: every *(h_total[gpu]) is set to *h_total_all
//   off_col      out: off_col[gpu][table] is allocated with
//                cm->customCudaMalloc for each table that has a host column
//   h_off_col    h_off_col[table]: host offset column, or NULL
//   sg           segment group index used for the nvlink_bytes accounting
//   stream       one stream per GPU
//
// Returns early, without allocating or copying, when *h_total_all is zero.
void
CPUGPUProcessing::switchDeviceCPUtoGPUBroadcast(int* h_total_all, int** h_total, int*** off_col, int** h_off_col, int sg, cudaStream_t* stream) {

  assert(h_total_all != NULL);
  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  if (*(h_total_all) == 0) return;

  assert(h_total != NULL);
  assert(off_col != NULL);
  assert(h_off_col != NULL);

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    assert(off_col[gpu] != NULL);
    for (int table = 0; table < NUM_TABLE; table++) {
      if (h_off_col[table] != NULL) {
        off_col[gpu][table] = (int*) cm->customCudaMalloc<int>(*(h_total_all), gpu);
      }
    }
  }

  // (An alternative is to copy h_off_col[table] from the host to every GPU
  // directly; here the host data is uploaded once and fanned out between GPUs.)

  const int lead_gpu = 0;

  // Stage 1: host -> lead GPU, one copy per table that has a host column.
  // The source must be the table's own column, h_off_col[table]; passing
  // h_off_col itself would upload the array of host pointers instead of data.
  // Tables without a host column were not allocated above and are skipped.
  for (int table = 0; table < NUM_TABLE; table++) {
    if (h_off_col[table] == NULL) continue;
    assert(off_col[lead_gpu][table] != NULL);
    CubDebugExit(cudaSetDevice(lead_gpu));
    CubDebugExit(cudaMemcpyAsync(off_col[lead_gpu][table], h_off_col[table], *(h_total_all) * sizeof(int), cudaMemcpyHostToDevice, stream[lead_gpu]));
    CubDebugExit(cudaSetDevice(0));
  }

  // The lead GPU's data must be checked on its stream before it is used as a source.
  CubDebugExit(cudaSetDevice(lead_gpu));
  CHECK_ERROR_STREAM(stream[lead_gpu]);
  CubDebugExit(cudaSetDevice(0));

  // Stage 2: lead GPU -> every other GPU.
  for (int table = 0; table < NUM_TABLE; table++) {
    if (h_off_col[table] == NULL) continue;
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      if (gpu != lead_gpu && off_col[gpu][table] != NULL) {
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaMemcpyAsync(off_col[gpu][table], off_col[lead_gpu][table], *(h_total_all) * sizeof(int), cudaMemcpyDeviceToDevice, stream[gpu]));
        nvlink_bytes[sg] += *(h_total_all);
        CubDebugExit(cudaSetDevice(0));
      }
    }
  }

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(stream[gpu]);
    CubDebugExit(cudaSetDevice(0));
    // Every GPU now holds the full column.
    *(h_total[gpu]) = *(h_total_all);
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Transferred data from CPU to GPU " << time << endl;
}

// Broadcasts a single table's host offset column to every GPU.
//
// The column is uploaded to the lead GPU (GPU 0) and then replicated
// device-to-device to the other GPUs.
//
//   h_total_all  number of rows in h_off_col (read only)
//   h_total      out: every *(h_total[gpu]) is set to *h_total_all
//   off_col      out: off_col[gpu][table] is allocated with cm->customCudaMalloc
//   h_off_col    host offset column of `table`
//   table        index of the table being broadcast
//   sg           segment group index used for the nvlink_bytes accounting
//   stream       one stream per GPU
//
// Returns early, without allocating or copying, when *h_total_all is zero.
void
CPUGPUProcessing::switchDeviceCPUtoGPUBroadcastDim(int* h_total_all, int** h_total, int*** off_col, int* h_off_col, int table, int sg, cudaStream_t* stream) {
  assert(h_total_all != NULL);
  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  if (*h_total_all == 0) return;

  assert(off_col != NULL);
  assert(h_off_col != NULL);

  // One destination buffer per GPU.
  for (int dev = 0; dev < NUM_GPU; dev++) {
    assert(off_col[dev] != NULL);
    off_col[dev][table] = (int*) cm->customCudaMalloc<int>(*h_total_all, dev);
  }

  // (An alternative is a direct host-to-device copy to every GPU; here the
  // column is uploaded once and fanned out between GPUs.)

  const int lead_gpu = 0;

  // Host -> lead GPU, checked before the lead buffer is used as a source.
  CubDebugExit(cudaSetDevice(lead_gpu));
  CubDebugExit(cudaMemcpyAsync(off_col[lead_gpu][table], h_off_col, (*h_total_all) * sizeof(int), cudaMemcpyHostToDevice, stream[lead_gpu]));
  CHECK_ERROR_STREAM(stream[lead_gpu]);
  CubDebugExit(cudaSetDevice(0));

  // Lead GPU -> every other GPU.
  for (int dev = 0; dev < NUM_GPU; dev++) {
    if (dev == lead_gpu) continue;
    if (off_col[dev][table] == NULL) continue;

    CubDebugExit(cudaSetDevice(dev));
    CubDebugExit(cudaMemcpyAsync(off_col[dev][table], off_col[lead_gpu][table], (*h_total_all) * sizeof(int), cudaMemcpyDeviceToDevice, stream[dev]));
    nvlink_bytes[sg] += *h_total_all;
    CubDebugExit(cudaSetDevice(0));
  }

  for (int dev = 0; dev < NUM_GPU; dev++) {
    CubDebugExit(cudaSetDevice(dev));
    CHECK_ERROR_STREAM(stream[dev]);
    CubDebugExit(cudaSetDevice(0));
    *(h_total[dev]) = *h_total_all;
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Transferred data from CPU to GPU " << time << endl;
}

// Runs the CPU filter for one dimension table and partitions the qualifying
// row offsets by destination GPU.
//
//   params          query parameters (selectivity, compare values, mode and
//                   host filter function are looked up per column)
//   h_off_col_part  out: h_off_col_part[gpu][table] receives a buffer from
//                   cm->customCudaHostAlloc sized by the selectivity estimate
//   h_total         out: per-GPU result counts, written by filter_partition_CPU
//   sg              segment group being processed
//   table           table whose build-side filter is evaluated
//
// Does nothing when no join refers to `table` or when the join column has no
// build-side filter.
void
CPUGPUProcessing::call_filter_partition_CPU(QueryParams* params, int*** h_off_col_part, int** h_total, int sg, int table) {

  assert(h_total != NULL);

  // Locate the join column that belongs to `table`. The pointer starts out
  // NULL so that "no matching join" is detectable instead of reading an
  // uninitialised pointer.
  ColumnInfo* temp = NULL;
  for (size_t i = 0; i < qo->join.size(); i++) {
    if (qo->join[i].second->table_id == table) {
      temp = qo->join[i].second; break;
    }
  }
  if (temp == NULL) return;

  if (qo->select_build[temp].size() == 0) return;

  ColumnInfo* column = qo->select_build[temp][0];
  int* filter_col = column->col_ptr;

  // Upper bound on the number of qualifying rows per GPU.
  int* output_estimate = new int[NUM_GPU]();

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    output_estimate[gpu] = qo->segment_group_each_gpu_count[gpu][table][sg] * SEGMENT_SIZE * params->selectivity[column];
  }

  SETUP_TIMING();
  float time;

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    if (output_estimate[gpu] > 0) {
      h_off_col_part[gpu][table] = (int*) cm->customCudaHostAlloc<int>(output_estimate[gpu]);
    }
  }

  cudaEventRecord(start, 0);

  // Number of rows covered by this segment group. The group holding the last
  // segment has a shorter tail, unless the column length is an exact multiple
  // of SEGMENT_SIZE, in which case the last segment is full.
  int LEN;
  if (sg == qo->last_segment[table]) {
    int tail = column->LEN % SEGMENT_SIZE;
    if (tail == 0 && column->LEN > 0) tail = SEGMENT_SIZE;
    LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + tail;
  } else { 
    LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE;
  }

  struct filterArgsCPU fargs = {
    filter_col, NULL,
    params->compare1[column], params->compare2[column], 0, 0,
    params->mode[column], 0, params->map_filter_func_host[column], NULL
  };

  // Start of this segment group's list of segment ids.
  short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment);

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    assert(h_total[gpu] != NULL);
  }
  
  filter_partition_CPU(fargs, h_off_col_part, LEN, h_total, table, cm->seg_row_to_single_gpu, 0, segment_group_ptr);

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    assert(*(h_total[gpu]) <= output_estimate[gpu]);
    assert(*(h_total[gpu]) > 0);
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Filter Partition Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;

  delete[] output_estimate;
}

// All-to-all shuffle of the partitioned columns and offsets using
// cudaMemcpyAsync on temporary per-(GPU, partition) streams.
//
//   off_col        in/out: off_col[gpu][table] is allocated here when the
//                  shuffle produced offsets for that table and no buffer
//                  exists yet; receives the shuffled offsets
//   h_total        out: *(h_total[p]) is set to the total size of partition p
//   shelper        shuffle state: result_count[src][dst], out_shuffle_col,
//                  out_shuffle_off (sources) and temp_col (destinations,
//                  allocated here when missing)
//   first_shuffle  not used in this function
//   sg             segment group index used for the nvlink_bytes accounting
//   stream         not used in this function (temporary streams are created)
//
// Requires NUM_PARTITION == NUM_GPU.
void
CPUGPUProcessing::shuffleDataOpt(int*** &off_col, int** h_total,
  struct shuffleHelper* shelper, bool first_shuffle, int sg, cudaStream_t* stream) {

  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);
  assert(NUM_PARTITION == NUM_GPU);

  //send to the correct GPU

  //count total partition size and update h_total for calling subsequent operator
  int* total_partition_size = new int[NUM_PARTITION]();
  for (int partition = 0; partition < NUM_PARTITION; partition++) {
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      total_partition_size[partition] += shelper->result_count[gpu][partition];
      if (verbose) cout << "result count in gpu " << gpu << " for partition " << partition << " is " << shelper->result_count[gpu][partition] << endl;
      // assert(shelper->result_count[gpu][partition] > 0);
    }
    *(h_total[partition]) = total_partition_size[partition];
    // cout << "total partition size for partition " << partition << " is " << total_partition_size[partition] << endl;
  }

  // Allocate the destination buffers that do not exist yet.
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    for (int col = 0; col < NUM_COLUMN; col++) {
      if (shelper->out_shuffle_col[gpu][col] != NULL && shelper->temp_col[gpu][col] == NULL) {
        // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
        if (verbose) cout << "allocate temp col gpu " << gpu << " col " << col << " " << total_partition_size[gpu] * MULTIPLIER << endl;
        shelper->temp_col[gpu][col] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu);
      }
    }
    for (int table = 0; table < NUM_TABLE; table++) {
      // if (shelper->out_shuffle_off[gpu][table] == NULL) cout << "out_shuffle " << gpu << " " << table << endl;
      // if (off_col[gpu][table] != NULL) cout << "off col " << gpu << " " << table << endl;
      if (shelper->out_shuffle_off[gpu][table] != NULL && off_col[gpu][table] == NULL) {
        // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
        if (verbose) cout << "allocate temp off gpu " << gpu << " table " << table << " " << total_partition_size[gpu] * MULTIPLIER << endl;
        off_col[gpu][table] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu);
      }
    }
  }

  // Temporary streams: streamopt[g][p] is created while device g is current.
  cudaStream_t** streamopt = new cudaStream_t*[NUM_GPU];
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    streamopt[gpu] = new cudaStream_t[NUM_PARTITION];
    CubDebugExit(cudaSetDevice(gpu));
    for (int partition = 0; partition < NUM_PARTITION; partition++) {
      CubDebugExit(cudaStreamCreate(&streamopt[gpu][partition]));
    }
  }
  CubDebugExit(cudaSetDevice(0));

  //cudamemcpy from one GPU to all the other GPU (all to all communication)
  //gpu is the destination gpu, partition is the source gpu
  for (int col = 0; col < NUM_COLUMN; col++) {
    if (shelper->out_shuffle_col[0][col] != NULL) {
      cout << "Transferring column " << col << endl;
      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        assert(shelper->temp_col[gpu][col] != NULL);
        // Running write position inside the destination buffer.
        int temp = 0;
        for (int partition = 0; partition < NUM_PARTITION; partition++) {
          if (shelper->out_shuffle_col[partition][col][gpu] != NULL){
            CubDebugExit(cudaSetDevice(gpu));
            CubDebugExit(cudaMemcpyAsync(shelper->temp_col[gpu][col] + temp, shelper->out_shuffle_col[partition][col][gpu], shelper->result_count[partition][gpu] * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[partition][gpu]));
            if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu];
            CubDebugExit(cudaSetDevice(0));
            temp += shelper->result_count[partition][gpu];
            // cout << "partition " << partition << " gpu " << gpu << " offset " << temp << endl;
          }
        }
      }
    }
  }

  for (int table = 0; table < NUM_TABLE; table++) {
    if (shelper->out_shuffle_off[0][table] != NULL) {
      cout << "Transferring offset " << table << endl;
      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        assert(off_col[gpu][table] != NULL);
        // Running write position inside the destination buffer.
        int temp = 0;
        for (int partition = 0; partition < NUM_PARTITION; partition++) {
          if (shelper->out_shuffle_off[partition][table][gpu] != NULL){
            CubDebugExit(cudaSetDevice(gpu));
            // cout << "From " << partition << " To " << gpu << " " << shelper->result_count[partition][gpu] << endl;
            CubDebugExit(cudaMemcpyAsync(off_col[gpu][table] + temp, shelper->out_shuffle_off[partition][table][gpu], shelper->result_count[partition][gpu] * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[partition][gpu]));
            if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu];
            CubDebugExit(cudaSetDevice(0));

            temp += shelper->result_count[partition][gpu];

          }
        }
      }
    }
  }

  // Check and destroy every temporary stream, then release the host-side
  // bookkeeping so that repeated shuffles do not leak memory.
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    for (int partition = 0; partition < NUM_PARTITION; partition++) {
      CHECK_ERROR_STREAM(streamopt[gpu][partition]);
      CubDebugExit(cudaStreamDestroy(streamopt[gpu][partition]));
    }
    delete[] streamopt[gpu];
  }
  delete[] streamopt;
  delete[] total_partition_size;
  CubDebugExit(cudaSetDevice(0));

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Shuffle done" << endl;
}

void
CPUGPUProcessing::shuffleDataNCCL(int*** &off_col, int** h_total,
  struct shuffleHelper* shelper, bool first_shuffle, int sg, cudaStream_t* stream) {

  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  assert(NUM_PARTITION == NUM_GPU);

  //send to the correct GPU
  
  int** scan_result_count = new int*[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    scan_result_count[gpu] = new int[NUM_PARTITION]();
    for (int partition = 0; partition < NUM_PARTITION; partition++) {
      if (partition == 0) scan_result_count[gpu][partition] = 0;
      else scan_result_count[gpu][partition] = scan_result_count[gpu][partition - 1] + shelper->result_count[partition - 1][gpu];
    }
  }

  //count total partition size and update h_total for calling subsequent operator
  int* total_partition_size = new int[NUM_PARTITION]();
  for (int partition = 0; partition < NUM_PARTITION; partition++) {
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      total_partition_size[partition] += shelper->result_count[gpu][partition];
      if (verbose) cout << "result count in gpu " << gpu << " for partition " << partition << " is " << shelper->result_count[gpu][partition] << endl;
      assert(shelper->result_count[gpu][partition] > 0);
    }
    *(h_total[partition]) = total_partition_size[partition];
    // cout << "total partition size for partition " << partition << " is " << total_partition_size[partition] << endl;
  }

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      for (int col = 0; col < NUM_COLUMN; col++) {
        if (shelper->out_shuffle_col[gpu][col] != NULL && shelper->temp_col[gpu][col] == NULL) {
          // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
          shelper->temp_col[gpu][col] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu);
        }
      }
      for (int table = 0; table < NUM_TABLE; table++) {
        if (shelper->out_shuffle_off[gpu][table] != NULL && off_col[gpu][table] == NULL) {
          // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
          off_col[gpu][table] = cm->customCudaMalloc<int>((int) (total_partition_size[gpu] * MULTIPLIER), gpu);
        }
      }
    }

  ncclGroupStart();
  for (int col = 0; col < NUM_COLUMN; col++) {
    if (shelper->out_shuffle_col[0][col] != NULL) {
      if (verbose) cout << "Transferring column " << col << endl;
      //asserting pointers
      // for(int gpu = 0; gpu < NUM_GPU; gpu++) {
      //   assert(shelper->temp_col[gpu][col] != NULL);
      //   for(int partition = 0; partition < NUM_PARTITION; partition++) {
      //     assert(shelper->out_shuffle_col[gpu][col][partition] != NULL);
      //   }
      // }
      //use nccl all2all communication
      // ncclGroupStart();        
      for(int gpu = 0; gpu < NUM_GPU; gpu++) {
        for(int partition = 0; partition < NUM_PARTITION; partition++) {
            ncclSend(shelper->out_shuffle_col[gpu][col][partition], shelper->result_count[gpu][partition], ncclInt, partition, comms[gpu], stream[gpu]);
            ncclRecv(shelper->temp_col[gpu][col] + scan_result_count[gpu][partition], shelper->result_count[partition][gpu], ncclInt, partition, comms[gpu], stream[gpu]);
            if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu];
        }
      }
      // ncclGroupEnd();
    }
  }
  ncclGroupEnd();

  ncclGroupStart();
  for (int table = 0; table < NUM_TABLE; table++) {
    if (shelper->out_shuffle_off[0][table] != NULL) {
      if (verbose) cout << "Transferring offset " << table << endl;
      //asserting pointers
      // for(int gpu = 0; gpu < NUM_GPU; gpu++) {
      //   assert(off_col[gpu][table] != NULL);
      //   for(int partition = 0; partition < NUM_PARTITION; partition++) {
      //     assert(shelper->out_shuffle_off[gpu][table][partition] != NULL);
      //   }
      // }
      //use nccl all2all communication
      // ncclGroupStart();        
      for(int gpu = 0; gpu < NUM_GPU; gpu++) {
        for(int partition = 0; partition < NUM_PARTITION; partition++) {
            ncclSend(shelper->out_shuffle_off[gpu][table][partition], shelper->result_count[gpu][partition], ncclInt, partition, comms[gpu], stream[gpu]);
            ncclRecv(off_col[gpu][table] + scan_result_count[gpu][partition], shelper->result_count[partition][gpu], ncclInt, partition, comms[gpu], stream[gpu]);
            if (partition != gpu) nvlink_bytes[sg] += shelper->result_count[partition][gpu];
        }
      }
      // ncclGroupEnd();
    }
  }
  ncclGroupEnd();

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(stream[gpu]);
    CubDebugExit(cudaSetDevice(0));
  }

  // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
  //   CubDebugExit(cudaSetDevice(gpu));
  //   CubDebugExit(cudaDeviceSynchronize());
    
  // }
  // CubDebugExit(cudaSetDevice(0));
  
  // assert(0);
  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Shuffle done" << endl;
}

// Uploads the broadcast index tables to every GPU.
//
// For each GPU a device array of NUM_COLUMN pointers is allocated; every
// column of a dimension table (table_id > 0) that has a host index gets its
// own device array, and all remaining entries of the pointer array are NULL.
//
//   d_broadcast_idx     out: d_broadcast_idx[gpu] is the device pointer array
//   used_broadcast_idx  used_broadcast_idx[gpu][col]: host index with one int
//                       per segment of the column, or NULL
//   stream              one stream per GPU; copies are issued on stream[gpu]
//
// The copies are asynchronous; this function does not synchronise the streams.
void
CPUGPUProcessing::broadcastIdxTransfer(int*** d_broadcast_idx, int*** used_broadcast_idx, cudaStream_t* stream){

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      d_broadcast_idx[gpu] = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
      // Host staging copy of the pointer array. It is value-initialised so
      // that columns skipped below are uploaded as NULL rather than as
      // indeterminate pointers.
      // NOTE(review): this buffer is not freed here because the asynchronous
      // copy below may still read it after this function returns; consider
      // allocating it from a per-query pool instead.
      int** tmp = new int*[NUM_COLUMN]();
      for (int col = 0; col < NUM_COLUMN; col++) {
        if (used_broadcast_idx[gpu][col] != NULL && cm->allColumn[col]->table_id > 0) { //only do this for dimension table
          tmp[col] = (int*) cm->customCudaMalloc<int>(cm->allColumn[col]->total_segment, gpu);
          CubDebugExit(cudaSetDevice(gpu));
          CubDebugExit(cudaMemcpyAsync(tmp[col], used_broadcast_idx[gpu][col], cm->allColumn[col]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu])); 
          CubDebugExit(cudaSetDevice(0));
        }
      }
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(d_broadcast_idx[gpu], tmp, NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); 
      CubDebugExit(cudaSetDevice(0));
  }

}

void
CPUGPUProcessing::broadcastSegments(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res) {

  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  int* toBroadcastCount = new int[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    toBroadcastCount[gpu] = 0;
  }

  // cout << qo->segment_group_count[table][sg] << endl;

  for (int i = 0; i < qo->segment_group_count[table][sg]; i++) {
    int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i];
    //check if the segment is replicated
    //if segment is not replicated (means that it's only in one GPU)
    if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) {
      assert(cm->segment_row_to_gpu[table][seg_id].size() == 1);
      int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id];
      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        if (gpu != cur_gpu) {
          toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id;
          toBroadcastCount[gpu]++;
          if (seg_id == cm->allTable[table]->total_segment-1) {
              // cout << "here" << endl;
              if (cm->allTable[table]->LEN % SEGMENT_SIZE == 0) {
                broadcast_len[gpu] += SEGMENT_SIZE;
              } else {
                broadcast_len[gpu] += cm->allTable[table]->LEN % SEGMENT_SIZE;
              }
              // cout << gpu << " " << broadcast_len[gpu] << endl;
          } else {
              broadcast_len[gpu] += SEGMENT_SIZE;
              // cout << gpu << " " << broadcast_len[gpu] << endl;
          }
        }
      }
    }
  }

  assert(broadcast_col != NULL);
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    assert(broadcast_col[gpu] != NULL);
    if (toBroadcastCount[gpu] > 0) {
      for (int col = 0; col < NUM_COLUMN; col++) {
        if (used_col_idx[gpu][col] != NULL) {
          int64_t size = toBroadcastCount[gpu] * SEGMENT_SIZE;
          int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED);
          // cout << gpu << " " << cm->broadcastPointer[gpu] << endl;
          assert((tmp + size) < cm->each_broadcast_size);
          broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp;
          // cout << cm->broadcastPointer[gpu] << endl;
        }
      }
    }
  }

  cudaStream_t** streamopt = new cudaStream_t*[NUM_GPU];
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    streamopt[gpu] = new cudaStream_t[NUM_GPU];
    CubDebugExit(cudaSetDevice(gpu));
    for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
      CubDebugExit(cudaStreamCreate(&streamopt[gpu][cur_gpu]));
    }
  }
  CubDebugExit(cudaSetDevice(0));

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    if (toBroadcastCount[gpu] > 0) {
      int* src = toBroadcast[gpu];
      int* dst = d_toBroadcast[gpu];
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, streamopt[gpu][0]));
      CubDebugExit(cudaSetDevice(0));
    }
  }
  CubDebugExit(cudaSetDevice(0));

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    for (int col = 0; col < NUM_COLUMN; col++) {
      //check if the column is used in the query at all (assume that used_col_idx is segment group aware)
      if (used_col_idx[gpu][col] != NULL) {  
        int temp = 0;
        for (int i = 0; i < toBroadcastCount[gpu]; i++) {
          int seg = toBroadcast[gpu][i];
          int cur_gpu = cm->seg_row_to_single_gpu[table][seg];
          //check if the column actually stored in GPU (for now we assume dimension table is always symmetrical)
          assert(cm->segment_list[cur_gpu][col][seg] != -1);
          int64_t idx = cm->segment_list[cur_gpu][col][seg];
          int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]);
          int* dst = broadcast_col[gpu][col] + temp * SEGMENT_SIZE;
          CubDebugExit(cudaSetDevice(gpu));
          CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[gpu][cur_gpu]));
          if (gpu != cur_gpu) nvlink_bytes[sg] += SEGMENT_SIZE;
          CubDebugExit(cudaSetDevice(0));
          int64_t start_idx = (broadcast_col[gpu][col] - cm->gpuCache[gpu])/SEGMENT_SIZE;
          // cout << start_idx + temp << endl;
          broadcast_idx[gpu][col][seg] = start_idx + temp;
          // if (col == 11 || col == 15) cout << "col " << col << " " << broadcast_idx[gpu][col][seg] << endl;
          temp++;
        }
      }
    }
  }
  CubDebugExit(cudaSetDevice(0));

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
      CHECK_ERROR_STREAM(streamopt[gpu][cur_gpu]);
      CubDebugExit(cudaStreamDestroy(streamopt[gpu][cur_gpu]));
    }
  }
  CubDebugExit(cudaSetDevice(0));

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Broadcast done: " << time << endl;

}


// Broadcasts every non-replicated segment of segment group `sg` of `table`
// into the broadcast buffer of every GPU using plain cudaMemcpyAsync copies
// on per-(source GPU, destination GPU) streams.
//
// Parameters:
//   sg             segment group index (indexes qo->segment_group[_count] and nvlink_bytes)
//   table          table id
//   broadcast_col  out: broadcast_col[gpu][col] is set to the start of the region reserved
//                  in cm->gpuBroadcast[gpu] for every column with used_col_idx[gpu][col] != NULL
//   used_col_idx   used_col_idx[gpu][col] != NULL marks a column that takes part in the broadcast;
//                  the copy phase consults GPU 0's row only
//   broadcast_len  in/out: broadcast_len[gpu] is incremented by the number of rows broadcast
//   toBroadcast    out (host): toBroadcast[gpu][k] is the k-th broadcast segment id
//   d_toBroadcast  destination of the host-to-device copy of toBroadcast[gpu]
//   res            unused
//
// Side effects: advances cm->broadcastPointer[gpu], fills broadcast_idx[gpu][col][seg_id],
// accumulates nvlink_bytes[sg] (in int elements), allocates staging buffers through
// cm->customCudaMalloc and leaves device 0 as the current device.
//
// NOTE(review): the events declared by SETUP_TIMING() are never destroyed here; whether
// that leaks depends on the macro's definition, which is not visible in this function.
void
CPUGPUProcessing::broadcastSegments2(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res) {

  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  // shouldBroadcast[g]: segments of this group that live only on GPU g
  vector<vector<int>> shouldBroadcast(NUM_GPU);

  // toBroadcastCount[g]: number of entries written to toBroadcast[g] (value-initialised to 0)
  vector<int> toBroadcastCount(NUM_GPU);

  for (int i = 0; i < qo->segment_group_count[table][sg]; i++) {
    int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i];
    //a segment present on fewer than NUM_GPU GPUs is not replicated and must be on exactly one GPU
    if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) {
      assert(cm->segment_row_to_gpu[table][seg_id].size() == 1);
      int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id];

      //the last segment of the table may be only partially filled
      int seg_len = SEGMENT_SIZE;
      if (seg_id == cm->allTable[table]->total_segment-1 && cm->allTable[table]->LEN % SEGMENT_SIZE != 0) {
        seg_len = cm->allTable[table]->LEN % SEGMENT_SIZE;
      }

      //every GPU (the owner included) receives the segment
      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id;
        toBroadcastCount[gpu]++;
        broadcast_len[gpu] += seg_len;
      }
      shouldBroadcast[cur_gpu].push_back(seg_id);
    }
  }

  //reserve one contiguous region per used column in each GPU's broadcast buffer
  assert(broadcast_col != NULL);
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    assert(broadcast_col[gpu] != NULL);
    if (toBroadcastCount[gpu] == 0) continue;
    for (int col = 0; col < NUM_COLUMN; col++) {
      if (used_col_idx[gpu][col] == NULL) continue;
      //widen before multiplying so the element count cannot overflow int
      int64_t size = static_cast<int64_t>(toBroadcastCount[gpu]) * SEGMENT_SIZE;
      int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED);
      assert((tmp + size) < cm->each_broadcast_size);
      broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp;
    }
  }

  //streamopt[src][dst]: stream created on GPU src, used for copies from src to dst
  vector<vector<cudaStream_t>> streamopt(NUM_GPU, vector<cudaStream_t>(NUM_GPU));
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
      CubDebugExit(cudaStreamCreate(&streamopt[gpu][cur_gpu]));
    }
  }
  CubDebugExit(cudaSetDevice(0));

  //upload the list of broadcast segment ids to each GPU
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    if (toBroadcastCount[gpu] > 0) {
      int* src = toBroadcast[gpu];
      int* dst = d_toBroadcast[gpu];
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, streamopt[gpu][0]));
      CubDebugExit(cudaSetDevice(0));
    }
  }
  CubDebugExit(cudaSetDevice(0));

  for (int col = 0; col < NUM_COLUMN; col++) {
    if (used_col_idx[0][col] == NULL) continue;

    //number of segments already placed in the destination region by lower-numbered GPUs
    int scanShouldBroadcast = 0;
    for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
      const size_t n_seg = shouldBroadcast[cur_gpu].size();
      if (n_seg == 0) continue;

      //gather this GPU's segments from its cache into one contiguous staging buffer
      int* staging = (int*) cm->customCudaMalloc<int>(n_seg * SEGMENT_SIZE, cur_gpu);
      for (int i = 0; i < (int) n_seg; i++) {
        int seg_id = shouldBroadcast[cur_gpu][i];
        assert(cm->segment_list[cur_gpu][col][seg_id] != -1);
        int64_t idx = cm->segment_list[cur_gpu][col][seg_id];
        int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]);
        int* dst = staging + i * SEGMENT_SIZE;
        CubDebugExit(cudaSetDevice(cur_gpu));
        CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[cur_gpu][cur_gpu]));
        //record the slot of this segment on every GPU, expressed in segments relative to that GPU's gpuCache base
        for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
          int64_t start_idx = (broadcast_col[dst_gpu][col] - cm->gpuCache[dst_gpu])/SEGMENT_SIZE;
          broadcast_idx[dst_gpu][col][seg_id] = start_idx + scanShouldBroadcast + i;
        }
      }

      //the fan-out below runs on other streams, so the gather has to be finished first
      //(presumably CHECK_ERROR_STREAM synchronizes the stream -- confirm against its definition)
      CubDebugExit(cudaSetDevice(cur_gpu));
      CHECK_ERROR_STREAM(streamopt[cur_gpu][cur_gpu]);
      CubDebugExit(cudaSetDevice(0));

      //fan the staging buffer out to every GPU, the owner included
      for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
        int* dst = broadcast_col[dst_gpu][col] + scanShouldBroadcast * SEGMENT_SIZE;
        CubDebugExit(cudaSetDevice(cur_gpu));
        CubDebugExit(cudaMemcpyAsync(dst, staging, n_seg * SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, streamopt[cur_gpu][dst_gpu]));
        if (dst_gpu != cur_gpu) nvlink_bytes[sg] += n_seg * SEGMENT_SIZE;
        CubDebugExit(cudaSetDevice(0));
      }
      scanShouldBroadcast += n_seg;
    }
  }

  //wait for all outstanding copies, then release the streams
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
      CHECK_ERROR_STREAM(streamopt[gpu][cur_gpu]);
      CubDebugExit(cudaStreamDestroy(streamopt[gpu][cur_gpu]));
    }
  }
  CubDebugExit(cudaSetDevice(0));

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Broadcast done: " << time << endl;

}

// Broadcasts every non-replicated segment of segment group `sg` of `table`
// into the broadcast buffer of every GPU. Segments are first gathered into a
// contiguous staging buffer on the owning GPU, then each staging buffer is
// sent with one ncclBroadcast per (owner, column) pair inside a single NCCL group.
//
// Parameters:
//   sg             segment group index (indexes qo->segment_group[_count] and nvlink_bytes)
//   table          table id
//   broadcast_col  out: broadcast_col[gpu][col] is set to the start of the region reserved
//                  in cm->gpuBroadcast[gpu] for every column with used_col_idx[gpu][col] != NULL
//   used_col_idx   used_col_idx[gpu][col] != NULL marks a column that takes part in the broadcast
//   broadcast_len  in/out: broadcast_len[gpu] is incremented by the number of rows broadcast
//   toBroadcast    out (host): toBroadcast[gpu][k] is the k-th broadcast segment id
//   d_toBroadcast  destination of the host-to-device copy of toBroadcast[gpu]
//   res            unused
//   stream         stream[gpu] is used for all copies and NCCL calls issued for that GPU
//
// Side effects: advances cm->broadcastPointer[gpu], fills broadcast_idx[gpu][col][seg_id],
// accumulates nvlink_bytes[sg] (in int elements), allocates staging buffers through
// cm->customCudaMalloc and leaves device 0 as the current device. A failing NCCL call
// is reported and terminates the process, mirroring CubDebugExit for CUDA calls.
//
// NOTE(review): the events declared by SETUP_TIMING() are never destroyed here; whether
// that leaks depends on the macro's definition, which is not visible in this function.
void
CPUGPUProcessing::broadcastSegmentsNCCL2(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res, cudaStream_t* stream) {

  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  //NCCL return codes were previously dropped; fail loudly instead of continuing with garbage data
  auto checkNccl = [](ncclResult_t rc, const char* what) {
    if (rc != ncclSuccess) {
      cout << "NCCL failure in " << what << ": " << ncclGetErrorString(rc) << endl;
      exit(1);
    }
  };

  // shouldBroadcast[g]: segments of this group that live only on GPU g
  vector<vector<int>> shouldBroadcast(NUM_GPU);

  // toBroadcastCount[g]: number of entries written to toBroadcast[g] (value-initialised to 0)
  vector<int> toBroadcastCount(NUM_GPU);

  for (int i = 0; i < qo->segment_group_count[table][sg]; i++) {
    int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i];
    //a segment present on fewer than NUM_GPU GPUs is not replicated and must be on exactly one GPU
    if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) {
      assert(cm->segment_row_to_gpu[table][seg_id].size() == 1);
      int cur_gpu = cm->seg_row_to_single_gpu[table][seg_id];

      //the last segment of the table may be only partially filled
      int seg_len = SEGMENT_SIZE;
      if (seg_id == cm->allTable[table]->total_segment-1 && cm->allTable[table]->LEN % SEGMENT_SIZE != 0) {
        seg_len = cm->allTable[table]->LEN % SEGMENT_SIZE;
      }

      //every GPU (the owner included) receives the segment
      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id;
        toBroadcastCount[gpu]++;
        broadcast_len[gpu] += seg_len;
      }
      shouldBroadcast[cur_gpu].push_back(seg_id);
    }
  }

  //reserve one contiguous region per used column in each GPU's broadcast buffer
  assert(broadcast_col != NULL);
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    assert(broadcast_col[gpu] != NULL);
    if (toBroadcastCount[gpu] == 0) continue;
    for (int col = 0; col < NUM_COLUMN; col++) {
      if (used_col_idx[gpu][col] == NULL) continue;
      //widen before multiplying so the element count cannot overflow int
      int64_t size = static_cast<int64_t>(toBroadcastCount[gpu]) * SEGMENT_SIZE;
      int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED);
      assert((tmp + size) < cm->each_broadcast_size);
      broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp;
    }
  }

  //upload the list of broadcast segment ids to each GPU
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    if (toBroadcastCount[gpu] > 0) {
      int* src = toBroadcast[gpu];
      int* dst = d_toBroadcast[gpu];
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
      CubDebugExit(cudaSetDevice(0));
    }
  }
  CubDebugExit(cudaSetDevice(0));

  //tmp_buffer[gpu][col]: staging buffer on `gpu` holding its own segments of `col` back to back
  vector<vector<int*>> tmp_buffer(NUM_GPU, vector<int*>(NUM_COLUMN));

  //number of segments already assigned a slot by lower-numbered GPUs
  int scanShouldBroadcast = 0;
  for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
    const size_t n_seg = shouldBroadcast[cur_gpu].size();
    if (n_seg == 0) continue;

    for (int col = 0; col < NUM_COLUMN; col++) {
      if (used_col_idx[cur_gpu][col] == NULL) continue;

      tmp_buffer[cur_gpu][col] = (int*) cm->customCudaMalloc<int>(n_seg * SEGMENT_SIZE, cur_gpu);
      for (int i = 0; i < (int) n_seg; i++) {
        int seg_id = shouldBroadcast[cur_gpu][i];
        assert(cm->segment_list[cur_gpu][col][seg_id] != -1);
        int64_t idx = cm->segment_list[cur_gpu][col][seg_id];
        int* src = &(cm->gpuCache[cur_gpu][idx * SEGMENT_SIZE]);
        int* dst = tmp_buffer[cur_gpu][col] + i * SEGMENT_SIZE;
        CubDebugExit(cudaSetDevice(cur_gpu));
        CubDebugExit(cudaMemcpyAsync(dst, src, SEGMENT_SIZE * sizeof(int), cudaMemcpyDeviceToDevice, stream[cur_gpu]));
        //record the slot of this segment on every GPU, expressed in segments relative to that GPU's gpuCache base
        for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
          int64_t start_idx = (broadcast_col[dst_gpu][col] - cm->gpuCache[dst_gpu])/SEGMENT_SIZE;
          broadcast_idx[dst_gpu][col][seg_id] = start_idx + scanShouldBroadcast + i;
        }
      }
    }
    scanShouldBroadcast += n_seg;
  }

  //make sure the staging copies are done before NCCL reads the staging buffers
  //(presumably CHECK_ERROR_STREAM synchronizes the stream -- confirm against its definition)
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(stream[gpu]);
    CubDebugExit(cudaSetDevice(0));
  }

  //one broadcast per (owner GPU, column); every rank joins with its own communicator and stream
  checkNccl(ncclGroupStart(), "ncclGroupStart");
  scanShouldBroadcast = 0;
  for (int cur_gpu = 0; cur_gpu < NUM_GPU; cur_gpu++) {
    const size_t n_seg = shouldBroadcast[cur_gpu].size();
    if (n_seg == 0) continue;

    for (int col = 0; col < NUM_COLUMN; col++) {
      if (used_col_idx[cur_gpu][col] == NULL) continue;

      nvlink_bytes[sg] += n_seg * SEGMENT_SIZE * (NUM_GPU-1);
      int* ptr = tmp_buffer[cur_gpu][col];
      int count = n_seg * SEGMENT_SIZE;
      for (int dst_gpu = 0; dst_gpu < NUM_GPU; dst_gpu++) {
        int* dst = broadcast_col[dst_gpu][col] + scanShouldBroadcast * SEGMENT_SIZE;
        checkNccl(ncclBroadcast(ptr, dst, count, ncclInt32, cur_gpu, comms[dst_gpu], stream[dst_gpu]), "ncclBroadcast");
      }
    }
    scanShouldBroadcast += n_seg;
  }
  checkNccl(ncclGroupEnd(), "ncclGroupEnd");

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(stream[gpu]);
    CubDebugExit(cudaSetDevice(0));
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Broadcast done: " << time << endl;

}

// Broadcasts every non-replicated segment of segment group `sg` of `table`
// into the broadcast buffer of every GPU, issuing one ncclBroadcast per
// (column, segment) straight from the owning GPU's cache (no staging buffer),
// all inside a single NCCL group.
//
// Parameters:
//   sg             segment group index (indexes qo->segment_group[_count] and nvlink_bytes)
//   table          table id
//   broadcast_col  out: broadcast_col[gpu][col] is set to the start of the region reserved
//                  in cm->gpuBroadcast[gpu] for every column with used_col_idx[gpu][col] != NULL
//   used_col_idx   used_col_idx[gpu][col] != NULL marks a column that takes part in the broadcast;
//                  the broadcast loop consults GPU 0's row only, so the rows are expected to agree
//   broadcast_len  in/out: broadcast_len[gpu] is incremented by the number of rows broadcast
//   toBroadcast    out (host): toBroadcast[gpu][k] is the k-th broadcast segment id
//   d_toBroadcast  destination of the host-to-device copy of toBroadcast[gpu]
//   res            unused
//   stream         stream[gpu] is used for all copies and NCCL calls issued for that GPU
//
// Side effects: advances cm->broadcastPointer[gpu], fills broadcast_idx[gpu][col][seg_id],
// accumulates nvlink_bytes[sg] (in int elements) and leaves device 0 as the current device.
// A failing NCCL call is reported and terminates the process, mirroring CubDebugExit for
// CUDA calls.
//
// NOTE(review): the events declared by SETUP_TIMING() are never destroyed here; whether
// that leaks depends on the macro's definition, which is not visible in this function.
void
CPUGPUProcessing::broadcastSegmentsNCCL(int sg, int table, int*** broadcast_col, int*** used_col_idx, int* broadcast_len, int** toBroadcast, int** d_toBroadcast, int* res, cudaStream_t* stream) {

  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  //NCCL return codes were previously dropped; fail loudly instead of continuing with garbage data
  auto checkNccl = [](ncclResult_t rc, const char* what) {
    if (rc != ncclSuccess) {
      cout << "NCCL failure in " << what << ": " << ncclGetErrorString(rc) << endl;
      exit(1);
    }
  };

  // segments of this group that live on exactly one GPU, in segment-group order
  vector<int> shouldBroadcast;

  // toBroadcastCount[g]: number of entries written to toBroadcast[g] (value-initialised to 0)
  vector<int> toBroadcastCount(NUM_GPU);

  for (int i = 0; i < qo->segment_group_count[table][sg]; i++) {
    int seg_id = qo->segment_group[table][sg * cm->allTable[table]->total_segment + i];
    //a segment present on fewer than NUM_GPU GPUs is not replicated and must be on exactly one GPU
    if (cm->segment_row_to_gpu[table][seg_id].size() < NUM_GPU) {
      assert(cm->segment_row_to_gpu[table][seg_id].size() == 1);

      //the last segment of the table may be only partially filled
      int seg_len = SEGMENT_SIZE;
      if (seg_id == cm->allTable[table]->total_segment-1 && cm->allTable[table]->LEN % SEGMENT_SIZE != 0) {
        seg_len = cm->allTable[table]->LEN % SEGMENT_SIZE;
      }

      //every GPU (the owner included) receives the segment
      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        toBroadcast[gpu][toBroadcastCount[gpu]] = seg_id;
        toBroadcastCount[gpu]++;
        broadcast_len[gpu] += seg_len;
      }
      shouldBroadcast.push_back(seg_id);
    }
  }

  //reserve one contiguous region per used column in each GPU's broadcast buffer
  assert(broadcast_col != NULL);
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    assert(broadcast_col[gpu] != NULL);
    if (toBroadcastCount[gpu] == 0) continue;
    for (int col = 0; col < NUM_COLUMN; col++) {
      if (used_col_idx[gpu][col] == NULL) continue;
      //widen before multiplying so the element count cannot overflow int
      int64_t size = static_cast<int64_t>(toBroadcastCount[gpu]) * SEGMENT_SIZE;
      int tmp = __atomic_fetch_add(&cm->broadcastPointer[gpu], size, __ATOMIC_RELAXED);
      assert((tmp + size) < cm->each_broadcast_size);
      broadcast_col[gpu][col] = cm->gpuBroadcast[gpu] + tmp;
    }
  }

  //upload the list of broadcast segment ids to each GPU
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    if (toBroadcastCount[gpu] > 0) {
      int* src = toBroadcast[gpu];
      int* dst = d_toBroadcast[gpu];
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(dst, src, toBroadcastCount[gpu] * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
      CubDebugExit(cudaSetDevice(0));
    }
  }
  CubDebugExit(cudaSetDevice(0));

  //dest[gpu][col][seg_id]: receive address of segment seg_id of column col on gpu
  //(NULL for segments that are not broadcast; left empty for columns unused on that gpu)
  vector<vector<vector<int*>>> dest(NUM_GPU, vector<vector<int*>>(NUM_COLUMN));
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    for (int col = 0; col < NUM_COLUMN; col++) {
      //check if the column is used in the query at all (assume that used_col_idx is segment group aware)
      if (used_col_idx[gpu][col] == NULL) continue;

      dest[gpu][col].resize(cm->allColumn[col]->total_segment);
      //the k-th broadcast segment occupies the k-th SEGMENT_SIZE slot of the reserved region
      for (int i = 0; i < toBroadcastCount[gpu]; i++) {
        int seg_id = toBroadcast[gpu][i];
        dest[gpu][col][seg_id] = broadcast_col[gpu][col] + i * SEGMENT_SIZE;
        //slot index expressed in segments relative to this GPU's gpuCache base
        int64_t start_idx = (broadcast_col[gpu][col] - cm->gpuCache[gpu])/SEGMENT_SIZE;
        broadcast_idx[gpu][col][seg_id] = start_idx + i;
      }
    }
  }

  //an alternative to per-segment ncclBroadcast would be grouped ncclSend/ncclRecv from the root
  checkNccl(ncclGroupStart(), "ncclGroupStart");
  for (int col = 0; col < NUM_COLUMN; col++) {
    if (used_col_idx[0][col] == NULL) continue;

    for (int i = 0; i < (int) shouldBroadcast.size(); i++) {
      int seg_id = shouldBroadcast[i];
      int root = cm->seg_row_to_single_gpu[table][seg_id];
      int64_t idx = cm->segment_list[root][col][seg_id];
      assert(idx != -1);
      int* ptr = &(cm->gpuCache[root][idx * SEGMENT_SIZE]);
      nvlink_bytes[sg] += SEGMENT_SIZE * (NUM_GPU-1);
      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        //the column must have been marked used on this GPU as well, otherwise there is no receive slot
        assert(!dest[gpu][col].empty());
        if (gpu != root) assert(dest[gpu][col][seg_id] != NULL);
        checkNccl(ncclBroadcast(ptr, dest[gpu][col][seg_id], SEGMENT_SIZE, ncclInt, root, comms[gpu], stream[gpu]), "ncclBroadcast");
      }
    }
  }
  checkNccl(ncclGroupEnd(), "ncclGroupEnd");

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(stream[gpu]);
    CubDebugExit(cudaSetDevice(0));
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Broadcast done: " << time << endl;

}

void 
CPUGPUProcessing::call_operator_dim_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int table, bool filter_on_gpu, bool broadcast, bool already_partitioned, cudaStream_t* stream) {
  int* dimkey_idx[NUM_GPU] = {}, *group_idx[NUM_GPU] = {}, *filter_idx[NUM_GPU] = {};
  ColumnInfo* key_column = NULL, *filter_col = NULL, *group_col = NULL;
  int*** used_col_idx = new int**[NUM_GPU];
  float output_selectivity = 1.0;
  bool first_shuffle = true;
  bool has_shuffled = false;
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    used_col_idx[gpu] = new int*[NUM_COLUMN]();
  }

  for (int i = 0; i < qo->join.size(); i++) {
    if (qo->join[i].second->table_id == table) {
      key_column = qo->join[i].second; break;
    }
  }

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    if (qo->groupby_build.size() > 0 && qo->groupby_build[key_column].size() > 0) {
      if (qo->groupGPUcheck) {
        group_col = qo->groupby_build[key_column][0];
        cm->indexTransfer(col_idx[gpu], group_col, stream[gpu], gpu, custom);
        assert(col_idx[gpu][group_col->column_id] != NULL);
        group_idx[gpu] = col_idx[gpu][group_col->column_id];
        used_col_idx[gpu][group_col->column_id] = col_idx[gpu][group_col->column_id];
      }
    }
  
    //WARNING: IT'S POSSIBLE THAT THE FILTER IS FROM CPU
    if (filter_on_gpu) {
      if (qo->select_build[key_column].size() > 0) {
        filter_col = qo->select_build[key_column][0];
        cm->indexTransfer(col_idx[gpu], filter_col, stream[gpu], gpu, custom);
        assert(col_idx[gpu][filter_col->column_id] != NULL);
        filter_idx[gpu] = col_idx[gpu][filter_col->column_id];
        used_col_idx[gpu][filter_col->column_id] = col_idx[gpu][filter_col->column_id];
        output_selectivity = params->selectivity[filter_col];
      }
    }

    // cout << "call index transfer for column " << key_column << " in gpu " << gpu << endl;
    cm->indexTransfer(col_idx[gpu], key_column, stream[gpu], gpu, custom);
    assert(col_idx[gpu][key_column->column_id] != NULL);
    cpu_to_gpu[sg] += (key_column->total_segment * sizeof(int));
    dimkey_idx[gpu] = col_idx[gpu][key_column->column_id];
    used_col_idx[gpu][key_column->column_id] = col_idx[gpu][key_column->column_id];
  }

  //these four structs are helper for shuffling
  //output of shuffling (column)
  int*** temp_col = new int**[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    temp_col[gpu] = new int*[NUM_COLUMN]();
  }

  int** result_count = new int*[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    result_count[gpu] = new int[NUM_PARTITION]();
  }

  //input of partitioning (column) -> only for pipelined
  int**** in_shuffle_col = new int***[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    in_shuffle_col[gpu] = new int**[NUM_COLUMN]();
  }

  //input of partitioning (offset) -> can be global or local depending on needs -> only for pipelined
  int**** in_shuffle_off = new int***[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    in_shuffle_off[gpu] = new int**[NUM_TABLE]();
  }

  //output of partitioning (column)
  int**** out_shuffle_col = new int***[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    out_shuffle_col[gpu] = new int**[NUM_COLUMN]();
  }

  //output of partitioning (offset) -> can be global or local depending on needs
  int**** out_shuffle_off = new int***[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    out_shuffle_off[gpu] = new int**[NUM_TABLE]();
  }

  shuffleHelper* shelper = new shuffleHelper{
    result_count, temp_col, NULL, out_shuffle_col, out_shuffle_off, in_shuffle_col, in_shuffle_off
  };

  int** d_seg_is_replicated = new int*[NUM_GPU];
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    d_seg_is_replicated[gpu] = (int*) cm->customCudaMalloc<int>(cm->allTable[table]->total_segment, gpu);
    CubDebugExit(cudaMemcpyAsync(d_seg_is_replicated[gpu], cm->seg_is_replicated[table], cm->allTable[table]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
    CubDebugExit(cudaSetDevice(0));
  }

  filterArgsGPU** fargs = new filterArgsGPU*[NUM_GPU]();
  buildArgsGPU** bargs = new buildArgsGPU*[NUM_GPU]();
  probeArgsGPU** pargs = new probeArgsGPU*[NUM_GPU]();
  groupbyArgsGPU** gargs = new groupbyArgsGPU*[NUM_GPU]();
  shuffleArgsGPU** sargs = new shuffleArgsGPU*[NUM_GPU]();

  KernelParams** kparams = new KernelParams*[NUM_GPU]();
  KernelLaunch** kernellaunch = new KernelLaunch*[NUM_GPU]();

  //THIS IS USED FOR BROADCAST
  //broadcast_idx universal to all segment group
  //broadcast_col local to segment group
  //broadcast_len local to segment group
  //toBroadcast local to segment group
  int** toBroadcast = new int*[NUM_GPU](); //list of segment row that are broadcasted to this GPU
  int** d_toBroadcast = new int*[NUM_GPU](); //list of segment row that are broadcasted to this GPU
  int*** broadcast_col = new int**[NUM_GPU](); //storing segments that are broadcasted to this GPU
  int* broadcast_len = new int[NUM_GPU](); //total segment row that are broadcasted to this GPU
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    toBroadcast[gpu] = new int[cm->allTable[table]->total_segment]();
    d_toBroadcast[gpu] = (int*) cm->customCudaMalloc<int>(cm->allTable[table]->total_segment, gpu);
    broadcast_col[gpu] = new int*[NUM_COLUMN]();
    broadcast_len[gpu] = 0;
  }

  int** tmp_broadcast_idx_key = new int*[NUM_GPU]();
  int** tmp_broadcast_idx_val = new int*[NUM_GPU]();
  int** tmp_broadcast_idx_filter = new int*[NUM_GPU]();
  if (broadcast) {
    SETUP_TIMING();
    float time;
    cudaEventRecord(start, 0);
    // broadcastSegments2(sg, table, broadcast_col, used_col_idx, broadcast_len, toBroadcast, d_toBroadcast, params->res);
    if (NUM_GPU == 8) broadcastSegmentsNCCL(sg, table, broadcast_col, used_col_idx, broadcast_len, toBroadcast, d_toBroadcast, params->res, stream);
    else broadcastSegments2(sg, table, broadcast_col, used_col_idx, broadcast_len, toBroadcast, d_toBroadcast, params->res);
    // __atomic_fetch_add(&sg_broadcast_count, 1, __ATOMIC_RELAXED);
    // while (sg_broadcast_count < qo->par_segment_count[table]) {};
    // assert(sg_broadcast_count == qo->par_segment_count[table]);
    // cout << sg_broadcast_count << endl;

    //HAVE TO TRANSFER THE IDX AFTER DOING BROADCAST, THE IDX ONLY VALID FOR THIS SEGMENT GROUP (DIFFERENT FROM THE ONE IN call_operator_fact)
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        tmp_broadcast_idx_key[gpu] = (int*) cm->customCudaMalloc<int>(key_column->total_segment, gpu);
        if (group_col != NULL) tmp_broadcast_idx_val[gpu] = (int*) cm->customCudaMalloc<int>(group_col->total_segment, gpu);
        if (filter_col != NULL) tmp_broadcast_idx_filter[gpu] = (int*) cm->customCudaMalloc<int>(filter_col->total_segment, gpu);
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaMemcpyAsync(tmp_broadcast_idx_key[gpu], broadcast_idx[gpu][key_column->column_id], key_column->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
        if (group_col != NULL) CubDebugExit(cudaMemcpyAsync(tmp_broadcast_idx_val[gpu], broadcast_idx[gpu][group_col->column_id], group_col->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
        if (filter_col != NULL) CubDebugExit(cudaMemcpyAsync(tmp_broadcast_idx_filter[gpu], broadcast_idx[gpu][filter_col->column_id], filter_col->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
        CubDebugExit(cudaSetDevice(0));
    }

    cudaEventRecord(stop, 0);
    cudaEventSynchronize(stop);
    cudaEventElapsedTime(&time, start, stop);
    if (verbose) cout << "Broadcast time " << time << endl;
    shuffle_time[sg] += time;
  }

  //WARNING set used_col_idx on filter to NULL so that it doesn't participate in partitioning
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    if (filter_on_gpu && group_col != NULL) {
      if (filter_col->column_id != group_col->column_id) {
        if (qo->select_build[key_column].size() > 0) {
          used_col_idx[gpu][filter_col->column_id] = NULL;
        }
      }
    }
  }

  // if current join needs shuffling
  if (!broadcast && !params->ht_replicated[table] && !already_partitioned) {

    SETUP_TIMING();
    float time;
    cudaEventRecord(start, 0);

    assert(NUM_GPU == NUM_PARTITION);
    
    // if this is the first join then we will call rangepartitioningkeyvalue
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      //do shuffling in each GPU

      //it's possible there is filter before shuffling
      if (filter_idx[gpu]) {
        fargs[gpu] = new filterArgsGPU{
              filter_idx[gpu], NULL,
              params->compare1[filter_col], params->compare2[filter_col], 0, 0,
              params->mode[filter_col], 0,
              NULL, NULL, NULL,
            };
      } else {
        fargs[gpu] = new filterArgsGPU{NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL, NULL};        
      }

      int** d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
      int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
      int** d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);

      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
      CubDebugExit(cudaMemcpyAsync(d_col_idx, used_col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); 
      assert(off_col[gpu] != NULL);
      if (off_col[gpu][table] != NULL) {
        CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
      } else {
        d_in_off=NULL;
      }
      CubDebugExit(cudaSetDevice(0));

      if (verbose) cout << "Partitioning with column id " << key_column->column_id << endl;
      sargs[gpu] = new shuffleArgsGPU{
          d_temp_col, NULL, d_col_idx, d_in_off, NULL, NULL, 
          key_column->column_id, table, params->min_key[key_column], params->max_key[key_column],
          NULL, NULL,
          d_seg_is_replicated[gpu], NULL
      };
      
      kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total);
      kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, Nope, table, output_selectivity, 0, 0, stream[gpu]);
      bool pipeline = 0;
      // kernellaunch[gpu]->preparePartitioningWithoutCount(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, first_shuffle, pipeline);
      kernellaunch[gpu]->countPartitioning(off_col, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, first_shuffle);
      kernellaunch[gpu]->preparePartitioningAfterCount(off_col, used_col_idx, qo->joinGPUcheck, first_shuffle);
    }

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      kernellaunch[gpu]->launchPartitioning();
    }

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      kernellaunch[gpu]->clearPartitioning();
      delete fargs[gpu];
      delete sargs[gpu];
      delete kparams[gpu];
      delete kernellaunch[gpu];
    }

    if (NUM_GPU == 8) shuffleDataNCCL(off_col, h_total, shelper, first_shuffle, sg, stream);
    else shuffleDataOpt(off_col, h_total, shelper, first_shuffle, sg, stream);
    
    cudaEventRecord(stop, 0);
    cudaEventSynchronize(stop);
    cudaEventElapsedTime(&time, start, stop);

    first_shuffle = false;
    has_shuffled = true;
    if (verbose) cout << "Partitioning and Shuffle time " << time << endl;
    shuffle_time[sg] += time;

  }

  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {

    assert(params->ht_GPU[gpu][key_column] != NULL);

    if (filter_idx[gpu] && !has_shuffled) {
      fargs[gpu] = new filterArgsGPU{
            filter_idx[gpu], NULL,
            params->compare1[filter_col], params->compare2[filter_col], 0, 0,
            params->mode[filter_col], 0,
            broadcast_col[gpu][filter_col->column_id],
            NULL, NULL,
          };
    } else {
      fargs[gpu] = new filterArgsGPU{NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL, NULL};        
    }
  
    if (broadcast) {
      bargs[gpu] = new buildArgsGPU{
        dimkey_idx[gpu], group_idx[gpu],
        params->dim_len_GPU[gpu][key_column], params->min_key_GPU[gpu][key_column], params->max_key[key_column],
        NULL,
        NULL,
        tmp_broadcast_idx_key[gpu],
        (group_col != NULL) ? tmp_broadcast_idx_val[gpu] : NULL,
        (filter_col != NULL) ? tmp_broadcast_idx_filter[gpu] : NULL
      };
    } else {
      if (!has_shuffled) {
        // cout << " here " << endl;
        // cout << params->dim_len_GPU[gpu][key_column] << " " << params->min_key_GPU[gpu][key_column] << endl;;
        bargs[gpu] = new buildArgsGPU{
          dimkey_idx[gpu], group_idx[gpu],
          params->dim_len_GPU[gpu][key_column], params->min_key_GPU[gpu][key_column], params->max_key[key_column],
          NULL, NULL, NULL, NULL, NULL
        };
      } else {
        bargs[gpu] = new buildArgsGPU{
          NULL, NULL,
          params->dim_len_GPU[gpu][key_column], params->min_key_GPU[gpu][key_column], params->max_key[key_column],
          temp_col[gpu][key_column->column_id],
          (group_col != NULL) ? temp_col[gpu][group_col->column_id] : NULL, 
          NULL, NULL, NULL
        };
        // assert(group_col != NULL);
        // assert(temp_col[gpu][group_col->column_id] != NULL);
      }
    }

    int latemat = 0;
    bool aggrGPUcheck = 0;
    kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, NULL, h_total);
    kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, JustBuild, table, 1, latemat, aggrGPUcheck, stream[gpu]);
    kernellaunch[gpu]->prepareKernelDim(off_col, key_column, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, table, has_shuffled);
    kernellaunch[gpu]->launchKernel(has_shuffled, broadcast, d_toBroadcast[gpu], broadcast_len[gpu]);
  }

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    int*** temp;
    int will_shuffle = 0;
    kernellaunch[gpu]->clearKernel(temp, will_shuffle, has_shuffled);
    delete fargs[gpu];
    delete bargs[gpu];
    delete kparams[gpu];
    delete kernellaunch[gpu];
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Build time " << time << endl;
  gpu_time[sg] += time;

  delete[] fargs;
  delete[] bargs;
  delete[] pargs;
  delete[] sargs;
  delete[] gargs;
  delete[] kparams;
  delete[] kernellaunch;
  delete shelper;

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    delete[] temp_col[gpu];
    delete[] result_count[gpu];
    delete[] in_shuffle_col[gpu];
    delete[] out_shuffle_col[gpu];
    delete[] in_shuffle_off[gpu];
    delete[] out_shuffle_off[gpu];
    delete[] used_col_idx[gpu];
  }
  delete[] temp_col;
  delete[] result_count;
  delete[] in_shuffle_col;
  delete[] in_shuffle_off;
  delete[] out_shuffle_col;
  delete[] out_shuffle_off;
  delete[] d_seg_is_replicated;
  delete[] toBroadcast;
  delete[] d_toBroadcast;
  delete[] broadcast_col;
  delete[] broadcast_len;
  delete[] tmp_broadcast_idx_key;
  delete[] tmp_broadcast_idx_filter;
  delete[] tmp_broadcast_idx_val;
  delete[] used_col_idx;

};

// Runs the GPU selection (filter) pipeline of segment group `sg` on every GPU.
//
// For each column in qo->selectGPUPipelineCol[sg] the column's segment index is
// transferred to each GPU and stored, together with its compare bounds, in one of
// the two filter slots. One JustFilter kernel is then prepared and launched per
// GPU. The elapsed time measured between the start/stop events is added to
// gpu_time[sg]. Returns immediately when the segment group has no GPU selections.
//
//   params        : query parameters; compare1 / compare2 / selectivity are read
//                   for every filter column
//   off_col       : offset columns, handed to prepareKernelFact() and clearKernel()
//   d_total       : per-GPU device counters; d_total[gpu] is zeroed before launch
//   h_total       : host counters, forwarded to KernelParams
//   sg            : segment group id
//   table         : table id forwarded to KernelLaunch
//   select_so_far : number of selections already evaluated before this call; it is
//                   the first filter slot this call writes to
//   stream        : one CUDA stream per GPU
void
CPUGPUProcessing::call_filter_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int table, int select_so_far, cudaStream_t* stream) {
  if(qo->selectGPUPipelineCol[sg].size() == 0) return;

  // The filterArgsGPU initializer below consumes exactly two filter slots.
  constexpr int kMaxFilter = 2;

  // NOTE(review): SETUP_TIMING() is defined outside this file section; if it
  // creates the `start` / `stop` events they are never destroyed here -- confirm.
  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  int _compare1[kMaxFilter] = {0}, _compare2[kMaxFilter] = {0};
  int *filter_idx[NUM_GPU][kMaxFilter] = {};
  float output_selectivity = 1.0f;

  // Reset the per-GPU result counter on each device's own stream.
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CubDebugExit(cudaMemsetAsync(d_total[gpu], 0, sizeof(int), stream[gpu]));
  }
  CubDebugExit(cudaSetDevice(0));

  int num_select = qo->selectGPUPipelineCol[sg].size();
  int total_select = qo->selectGPUPipelineCol[sg].size() + qo->selectCPUPipelineCol[sg].size();

  for (int select = 0; select < num_select; select++) {
    // Every selection of this segment group has already been evaluated.
    if (select_so_far == total_select) break;
    ColumnInfo* column = qo->selectGPUPipelineCol[sg][select];

    // Guard the fixed-size slot arrays: writing past them would corrupt the stack.
    const int slot = select_so_far + select;
    assert(slot >= 0 && slot < kMaxFilter);

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom);
      assert(col_idx[gpu][column->column_id] != NULL);
      filter_idx[gpu][slot] = col_idx[gpu][column->column_id];
    }

    _compare1[slot] = params->compare1[column];
    _compare2[slot] = params->compare2[column];

    output_selectivity *= params->selectivity[column];
  }

  int has_shuffled = 0;
  int will_shuffle = 0;

  // Only fargs is populated for a pure filter; the other argument tables stay
  // zero-initialized (all NULL) and are passed through to KernelParams as such.
  filterArgsGPU** fargs = new filterArgsGPU*[NUM_GPU]();
  buildArgsGPU** bargs = new buildArgsGPU*[NUM_GPU]();
  probeArgsGPU** pargs = new probeArgsGPU*[NUM_GPU]();
  groupbyArgsGPU** gargs = new groupbyArgsGPU*[NUM_GPU]();
  shuffleArgsGPU** sargs = new shuffleArgsGPU*[NUM_GPU]();
  // Single value-initialized helper; allocated with scalar new so that it pairs
  // with the scalar delete below (the previous new[] / delete mix was undefined).
  shuffleHelper* shelper = new shuffleHelper();
  KernelParams** kparams = new KernelParams*[NUM_GPU]();
  KernelLaunch** kernellaunch = new KernelLaunch*[NUM_GPU]();

  // Prepare and launch the filter kernel on every GPU.
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    fargs[gpu] = new filterArgsGPU{
      filter_idx[gpu][0], filter_idx[gpu][1],
      _compare1[0], _compare2[0], _compare1[1], _compare2[1],
      1, 1,
      NULL, NULL
    };

    int latemat = 0;
    bool aggrGPUcheck = 0;
    kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total);
    kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, JustFilter, table, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);

    kernellaunch[gpu]->prepareKernelFact(off_col, NULL, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck,
      NULL, NULL, will_shuffle, has_shuffled);
    kernellaunch[gpu]->launchKernel(has_shuffled);
  }

  // Finish each launch and release the per-GPU argument objects.
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    kernellaunch[gpu]->clearKernel(off_col, will_shuffle, has_shuffled);
    delete fargs[gpu];
    delete kparams[gpu];
    delete kernellaunch[gpu];
  }

  delete[] fargs;
  delete[] bargs;
  delete[] pargs;
  delete[] sargs;
  delete[] gargs;
  delete[] kparams;
  delete[] kernellaunch;
  delete shelper;

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Filter GPU time " << time << endl;
  gpu_time[sg] += time;

}

//NOT SUPPORTED YET:
//(1) pipelining a filter before partitioning
//(2) an operator that generates global offsets (for example a CPU operator or a GPU filter) whose output is the input of partitioning
void 
CPUGPUProcessing::call_operator_fact_GPU(QueryParams* params, int*** &off_col, int** &d_total, int** h_total, int sg, int select_so_far, int latemat, cudaStream_t* stream) {

  int _min_key[NUM_GPU][4] = {0}, _dim_len[NUM_GPU][4] = {0};
  int _compare1[2] = {0}, _compare2[2] = {0};
  int _min_val[4] = {0}, _unique_val[4] = {0};
  int *aggr_idx[NUM_GPU][2] = {}, *group_idx[NUM_GPU][4] = {}, *filter_idx[NUM_GPU][2] = {};
  float output_selectivity = 1.0;
  bool first_shuffle = true;
  bool has_shuffled = false;
  int group_col_id[MAX_GROUPBY] = {0};
  int aggr_col_id[MAX_AGGR] = {0};
  int fkey_col_id[MAX_JOIN] = {0};
  for (int fkey_col = 0; fkey_col < MAX_JOIN; fkey_col++) {
    fkey_col_id[fkey_col] = -1;
  }
  //initialize group_col_id and aggr_col_id to -1
  for (int group = 0; group < MAX_GROUPBY; group++) {
    group_col_id[group] = -1;
  }
  for (int aggr = 0; aggr < MAX_AGGR; aggr++) {
    aggr_col_id[aggr] = -1;
  }

  int*** used_col_idx = new int**[NUM_GPU];
  int**** used_all_col_idx = new int***[NUM_GPU]();
  int*** used_broadcast_idx = new int**[NUM_GPU];
  int*** d_broadcast_idx = new int**[NUM_GPU];
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    used_col_idx[gpu] = new int*[NUM_COLUMN]();
    used_broadcast_idx[gpu] = new int*[NUM_COLUMN]();
    used_all_col_idx[gpu] = new int**[NUM_GPU]();
    for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
      used_all_col_idx[gpu][gpu_iter] = new int*[NUM_COLUMN]();
    }
  }

  assert(NUM_GPU == NUM_PARTITION);

  // ColumnInfo* filter_col[2] = {};

  if(qo->joinGPUPipelineCol[sg].size() == 0) return;

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CubDebugExit(cudaMemsetAsync(d_total[gpu], 0, sizeof(int), stream[gpu]));
  }
  CubDebugExit(cudaSetDevice(0));
  
  int num_join = qo->joinGPUPipelineCol[sg].size();
  int num_select = qo->selectGPUPipelineCol[sg].size();
  int total_select = qo->selectGPUPipelineCol[sg].size() + qo->selectCPUPipelineCol[sg].size();
  bool aggrGPUcheck = qo->groupbyGPUPipelineCol[sg].size() == qo->queryAggrColumn.size();
  if (verbose) cout << "aggr gpu check " << aggrGPUcheck << endl;

  for (int select = 0; select < num_select; select++) {
    if (select_so_far == total_select) break;
    ColumnInfo* column = qo->selectGPUPipelineCol[sg][select];

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
        assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL);
        used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id];
      }
      cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom);
      assert(col_idx[gpu][column->column_id] != NULL);
      cpu_to_gpu[sg] += (column->total_segment * sizeof(int));
      filter_idx[gpu][select_so_far + select] = col_idx[gpu][column->column_id];
      // if (latemat != 2) used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id];
    }

    _compare1[select_so_far + select] = params->compare1[column];
    _compare2[select_so_far + select] = params->compare2[column];
    output_selectivity *= params->selectivity[column];
    // filter_col[select_so_far + select] = column;
  }

  for (int join = 0; join < num_join; join++) {

    ColumnInfo* column = qo->joinGPUPipelineCol[sg][join];
    int table_id = qo->fkey_pkey[column]->table_id;
    ColumnInfo* pkey = qo->fkey_pkey[column];

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
        assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL);
        used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id];
      }
      cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom);
      assert(col_idx[gpu][column->column_id] != NULL);
      cpu_to_gpu[sg] += (column->total_segment * sizeof(int));
      used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id];

      _min_key[gpu][table_id - 1] = params->min_key_GPU[gpu][pkey];
      _dim_len[gpu][table_id - 1] = params->dim_len_GPU[gpu][pkey];
    }
  }

  //have to fix cm->lo_orderdate in case we use synthetic bench
  for (int aggr = 0; aggr < qo->aggregation[cm->lo_orderdate].size(); aggr++) {
    if (qo->groupGPUcheck && aggrGPUcheck) {
      ColumnInfo* column = qo->aggregation[cm->lo_orderdate][aggr];
      aggr_col_id[aggr] = column->column_id;

      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
          assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL);
          used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id];
        }
        cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom);
        assert(col_idx[gpu][column->column_id] != NULL);
        cpu_to_gpu[sg] += (column->total_segment * sizeof(int));
        aggr_idx[gpu][aggr] = col_idx[gpu][column->column_id];
        if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qo->groupGPUcheck && aggrGPUcheck)) used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id];
        used_broadcast_idx[gpu][column->column_id] = broadcast_idx[gpu][column->column_id];
      }
    }
  }

  unordered_map<ColumnInfo*, vector<ColumnInfo*>>::iterator it;
  for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) {
    if (qo->groupGPUcheck && aggrGPUcheck) {
      if (it->second.size() > 0) {
        ColumnInfo* column = it->second[0];
        ColumnInfo* column_key = it->first;
        group_col_id[column_key->table_id - 1] = column->column_id;

        for (int gpu = 0; gpu < NUM_GPU; gpu++) {
          for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
            assert(all_col_idx[gpu][gpu_iter][column->column_id] != NULL);
            used_all_col_idx[gpu][gpu_iter][column->column_id] = all_col_idx[gpu][gpu_iter][column->column_id];
          }
          cm->indexTransfer(col_idx[gpu], column, stream[gpu], gpu, custom);
          assert(col_idx[gpu][column->column_id] != NULL);
          cpu_to_gpu[sg] += (column->total_segment * sizeof(int));
          group_idx[gpu][column_key->table_id - 1] = col_idx[gpu][column->column_id];
          if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qo->groupGPUcheck && aggrGPUcheck)) used_col_idx[gpu][column->column_id] = col_idx[gpu][column->column_id];
          used_broadcast_idx[gpu][column->column_id] = broadcast_idx[gpu][column->column_id];
        }

        _min_val[column_key->table_id - 1] = params->min_val[column_key];
        _unique_val[column_key->table_id - 1] = params->unique_val[column_key];
        // cout << column->column_id << endl;
      }
    } else {
      if (it->second.size() > 0) {
        ColumnInfo* column = it->second[0];
        ColumnInfo* column_key = it->first;
        group_col_id[column_key->table_id - 1] = column->column_id;
      }
    }
  }

  //WARNING: aggr_col_id and group_col_id represent slightly different information
  //aggr_col_id != -1 meaning that segment group will do agregation on that column on GPU
  //group_col_id != -1 meaning that there will be groupby on that column at some point (doesn't matter whether its in CPU or GPU)
  //WARNING: HOW DO WE KNOW THAT SEG IS REPLICATED FOR THE OTHER TABLES???
  broadcastIdxTransfer(d_broadcast_idx, used_broadcast_idx, stream);
  int** d_seg_is_replicated = new int*[NUM_GPU];
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    int table_id = 0;
    CubDebugExit(cudaSetDevice(gpu));
    d_seg_is_replicated[gpu] = (int*) cm->customCudaMalloc<int>(cm->allTable[table_id]->total_segment, gpu);
    CubDebugExit(cudaMemcpyAsync(d_seg_is_replicated[gpu], cm->seg_is_replicated[table_id], cm->allTable[table_id]->total_segment * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
    CubDebugExit(cudaSetDevice(0));

    // cout << "seg is replicated " << cm->seg_is_replicated[4][0] << endl;
    // assert(cm->seg_is_replicated[4][0] == 1);
  }

  //these four structs are helper for shuffling
  //output of shuffling (column)
  int*** temp_col = new int**[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    temp_col[gpu] = new int*[NUM_COLUMN]();
  }

  //output of shuffling (offset) -> can be global or local depending on needs
  //NOW THIS IS REPLACED BY off_col
  // int*** temp_off = new int**[NUM_GPU]();
  // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
  //   temp_off[gpu] = new int*[NUM_TABLE]();
  //   if (off_col != NULL) {
  //     for (int table = 0; table < NUM_TABLE; table++) {
  //       temp_off[gpu][table] = off_col[gpu][table];
  //     }
  //   }
  // }

  int** result_count = new int*[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    result_count[gpu] = new int[NUM_PARTITION]();
  }

  //offset output of local join
  int*** join_result_off = new int**[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    join_result_off[gpu] = new int*[NUM_TABLE]();
  }

  //input of partitioning (column) -> only for pipelined
  int**** in_shuffle_col = new int***[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    in_shuffle_col[gpu] = new int**[NUM_COLUMN]();
  }

  //input of partitioning (offset) -> can be global or local depending on needs -> only for pipelined
  int**** in_shuffle_off = new int***[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    in_shuffle_off[gpu] = new int**[NUM_TABLE]();
  }

  //output of partitioning (column)
  int**** out_shuffle_col = new int***[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    out_shuffle_col[gpu] = new int**[NUM_COLUMN]();
  }

  //output of partitioning (offset) -> can be global or local depending on needs
  int**** out_shuffle_off = new int***[NUM_GPU]();
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    out_shuffle_off[gpu] = new int**[NUM_TABLE]();
  }

  shuffleHelper* shelper = new shuffleHelper{
    result_count, temp_col, join_result_off, out_shuffle_col, out_shuffle_off, in_shuffle_col, in_shuffle_off
  };
  
  vector<tuple<ColumnInfo*, ColumnInfo*>> pipeline_column;

  vector<ColumnInfo*> temp_pipeline;
  vector<ColumnInfo*> temp_pipeline2;

  if (reorder) {
    for (int join = 0; join < num_join; join++) {
      ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join];
      ColumnInfo* pkey = qo->fkey_pkey[fkey];
      if (!params->ht_replicated[pkey->table_id]) {
        // cout << pkey->column_name << endl;
        temp_pipeline2.push_back(fkey);
        // qo->joinGPUPipelineCol[sg].erase( qo->joinGPUPipelineCol[sg].begin() + join );
      } else {
        temp_pipeline.push_back(fkey);
      }
    }

    qo->joinGPUPipelineCol[sg].clear();

    for (int i = 0; i < temp_pipeline.size(); i++) {
      qo->joinGPUPipelineCol[sg].push_back(temp_pipeline[i]);
    }

    for (int i = 0; i < temp_pipeline2.size(); i++) {
      qo->joinGPUPipelineCol[sg].push_back(temp_pipeline2[i]);
    }
  } else {
    for (int join = 0; join < num_join; join++) {
      ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join];
      ColumnInfo* pkey = qo->fkey_pkey[fkey];
      if (fkey == cm->lo_suppkey) {
        // cout << pkey->column_name << endl;
        temp_pipeline2.push_back(fkey);
        // qo->joinGPUPipelineCol[sg].erase( qo->joinGPUPipelineCol[sg].begin() + join );
      } else {
        temp_pipeline.push_back(fkey);
      }
    }

    qo->joinGPUPipelineCol[sg].clear();

    for (int i = 0; i < temp_pipeline.size(); i++) {
      qo->joinGPUPipelineCol[sg].push_back(temp_pipeline[i]);
    }

    for (int i = 0; i < temp_pipeline2.size(); i++) {
      qo->joinGPUPipelineCol[sg].push_back(temp_pipeline2[i]);
    }
  }

  for (int join = 0; join < qo->joinGPUPipelineCol[sg].size(); join++) {
    cout << qo->joinGPUPipelineCol[sg][join]->column_name << endl;
  }
  // assert(0);

  for (int join = 0; join < num_join; join++) {

    filterArgsGPU** fargs = new filterArgsGPU*[NUM_GPU]();
    buildArgsGPU** bargs = new buildArgsGPU*[NUM_GPU]();
    probeArgsGPU** pargs = new probeArgsGPU*[NUM_GPU]();
    groupbyArgsGPU** gargs = new groupbyArgsGPU*[NUM_GPU]();
    shuffleArgsGPU** sargs = new shuffleArgsGPU*[NUM_GPU]();
  
    KernelParams** kparams = new KernelParams*[NUM_GPU]();
    KernelLaunch** kernellaunch = new KernelLaunch*[NUM_GPU]();

    ColumnInfo* fkey = qo->joinGPUPipelineCol[sg][join];
    int table_id = qo->fkey_pkey[fkey]->table_id;
    ColumnInfo* pkey = qo->fkey_pkey[fkey];

    int next_table_id = -1; ColumnInfo* next_fkey = NULL, *next_pkey = NULL;
    if (join != num_join - 1) {
      next_fkey = qo->joinGPUPipelineCol[sg][join+1];
      next_pkey = qo->fkey_pkey[next_fkey];
      next_table_id = qo->fkey_pkey[next_fkey]->table_id;
    }

    // if current join needs shuffling
    if (join == 0 && !params->ht_replicated[table_id]) {

      SETUP_TIMING();
      float time;
      cudaEventRecord(start, 0);

      assert(NUM_GPU == NUM_PARTITION);
      
      // if this is the first join then we will call rangepartitioningkeyvalue
      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        //do shuffling in each GPU

        //it's possible there is filter before shuffling
        fargs[gpu] = new filterArgsGPU{
          filter_idx[gpu][0], filter_idx[gpu][1], 
          _compare1[0], _compare2[0], _compare1[1], _compare2[1], 
          1, 1,
          NULL, NULL
        };

        int** d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
        int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
        int** d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
        int** d_local_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);

        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
        CubDebugExit(cudaMemcpyAsync(d_col_idx, used_col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); 
        assert(off_col[gpu] != NULL);
        if (off_col[gpu][0] != NULL) {
          CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
        } else {
          d_in_off=NULL;
        }
        CubDebugExit(cudaMemcpyAsync(d_local_off, join_result_off[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
        CubDebugExit(cudaSetDevice(0));

        if (verbose) cout << "Partitioning with column id " << fkey->column_id << endl;
        sargs[gpu] = new shuffleArgsGPU{
            d_temp_col, NULL, d_col_idx, d_in_off, NULL, d_local_off, 
            fkey->column_id, fkey->table->table_id, params->min_key[pkey], params->max_key[pkey],
            NULL, NULL,
            d_seg_is_replicated[gpu], d_broadcast_idx[gpu]
        };
        
        kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total);
        // if (join == 0 && num_select > 0) { //there is filter before this join
        //   assert(0); //not supported for now
        // } else {
          kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, Nope, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
        // }
        
        // kernellaunch[gpu]->countPartitioning(qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, first_shuffle);
        bool pipeline = 0;
        kernellaunch[gpu]->preparePartitioningWithoutCount(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, first_shuffle, pipeline);
      }

      // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      //   kernellaunch[gpu]->synchronizePartitioning();
      // }

      // for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      //   kernellaunch[gpu]->preparePartitioningAfterCount(off_col, used_col_idx, qo->joinGPUcheck, first_shuffle);
      // }

      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        kernellaunch[gpu]->launchPartitioning();
      }

      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        kernellaunch[gpu]->clearPartitioning();
        output_selectivity = 1.0;
        delete fargs[gpu];
        delete sargs[gpu];
        delete kparams[gpu];
        delete kernellaunch[gpu];
      }

      // cudaEventRecord(stop, 0);
      // cudaEventSynchronize(stop);
      // cudaEventElapsedTime(&time, start, stop);

      // if (verbose) cout << "Partitioning time " << time << endl;

      // cudaEventRecord(start, 0);

      if (NUM_GPU == 8) shuffleDataNCCL(off_col, h_total, shelper, first_shuffle, sg, stream);
      else shuffleDataOpt(off_col, h_total, shelper, first_shuffle, sg, stream);
      
      cudaEventRecord(stop, 0);
      cudaEventSynchronize(stop);
      cudaEventElapsedTime(&time, start, stop);
      first_shuffle = false;
      has_shuffled = true;
      if (verbose) cout << "Shuffle time " << time << endl;
      shuffle_time[sg] += time;

    }

    //if this is the last join
    if (next_table_id == -1 || (join + 1) == num_join) { //this is the last join

      SETUP_TIMING();
      float time;
      cudaEventRecord(start, 0);

      //do join+groupby (either pipelined or individual)
      pipeline_column.push_back(make_tuple(fkey, pkey));

      int *ht[NUM_GPU][MAX_JOIN] = {}, *fkey_idx[NUM_GPU][MAX_JOIN] = {}; //initialize it to null
      int *key_column[NUM_GPU][MAX_JOIN] = {}, *group_column[NUM_GPU][MAX_GROUPBY] = {}, *aggr_column[NUM_GPU][MAX_AGGR] = {};

      for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) {
            ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]);
            ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]);
            assert(fkey_col_id[pkey_pipeline->table_id - 1] == -1);
            fkey_col_id[pkey_pipeline->table_id - 1] = fkey_pipeline->column_id;

            for (int gpu = 0; gpu < NUM_GPU; gpu++) {
                assert(col_idx[gpu][fkey_pipeline->column_id] != NULL);
                assert(params->ht_GPU[gpu][pkey_pipeline] != NULL);
                fkey_idx[gpu][pkey_pipeline->table_id -1] = col_idx[gpu][fkey_pipeline->column_id];
                ht[gpu][pkey_pipeline->table_id - 1] = params->ht_GPU[gpu][pkey_pipeline];
                if (has_shuffled) {
                  key_column[gpu][pkey_pipeline->table_id - 1] = temp_col[gpu][fkey_pipeline->column_id];
                  assert(temp_col[gpu][fkey_pipeline->column_id] != NULL);
                }
            }
            // cout << fkey_pipeline->column_name << " " << params->selectivity[fkey_pipeline]<< endl;
            output_selectivity *= params->selectivity[fkey_pipeline];
            // cout << output_selectivity << endl;
      }

      if (has_shuffled) {
        if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qo->groupGPUcheck && aggrGPUcheck)) {
        //have to fix cm->lo_orderdate in case we use synthetic bench
          if (qo->groupGPUcheck && aggrGPUcheck) {
              for (int aggr = 0; aggr < qo->aggregation[cm->lo_orderdate].size(); aggr++) {
                ColumnInfo* column = qo->aggregation[cm->lo_orderdate][aggr];
                for (int gpu = 0; gpu < NUM_GPU; gpu++) {
                  assert(temp_col[gpu][column->column_id] != NULL);
                  aggr_column[gpu][aggr] = temp_col[gpu][column->column_id];
                }
              }

              for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) {
                if (it->second.size() > 0) {
                  ColumnInfo* column = it->second[0];
                  ColumnInfo* column_key = it->first;
                  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
                    group_column[gpu][column_key->table_id - 1] = temp_col[gpu][column->column_id];
                  }
                }
              }
          }
        }
      }

      for (int gpu = 0; gpu < NUM_GPU; gpu++) {

            int**d_in_off = NULL;
            int *d_group_col_id = NULL, *d_aggr_col_id = NULL, *d_fkey_col_id = NULL;
            int*** d_all_col_idx = NULL, **d_seg_row_to_gpu = NULL;

            if (has_shuffled) {

              d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
              CubDebugExit(cudaSetDevice(gpu));
              assert(off_col[gpu] != NULL);
              if (off_col[gpu][0] != NULL) {
                CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
              } else {
                d_in_off=NULL;
              }
              CubDebugExit(cudaSetDevice(0));
              
              d_group_col_id = (int*) cm->customCudaMalloc<int>(MAX_GROUPBY, gpu);
              d_aggr_col_id = (int*) cm->customCudaMalloc<int>(MAX_AGGR, gpu);
              d_fkey_col_id = (int*) cm->customCudaMalloc<int>(MAX_JOIN, gpu);
              CubDebugExit(cudaSetDevice(gpu));
              CubDebugExit(cudaMemcpyAsync(d_group_col_id, group_col_id, MAX_GROUPBY * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
              CubDebugExit(cudaMemcpyAsync(d_aggr_col_id, aggr_col_id, MAX_AGGR * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
              CubDebugExit(cudaMemcpyAsync(d_fkey_col_id, fkey_col_id, MAX_JOIN * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
              CubDebugExit(cudaSetDevice(0));

              d_all_col_idx = (int***) cm->customCudaMalloc<int**>(NUM_GPU, gpu);
              int*** tmp_all_col_idx = new int**[NUM_GPU];
              for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
                tmp_all_col_idx[gpu_iter] = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
                CubDebugExit(cudaSetDevice(gpu));
                CubDebugExit(cudaMemcpyAsync(tmp_all_col_idx[gpu_iter], used_all_col_idx[gpu][gpu_iter], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); 
                CubDebugExit(cudaSetDevice(0));
              }
              CubDebugExit(cudaSetDevice(gpu));
              CubDebugExit(cudaMemcpyAsync(d_all_col_idx, tmp_all_col_idx, NUM_GPU * sizeof(int**), cudaMemcpyHostToDevice, stream[gpu])); 
              CubDebugExit(cudaSetDevice(0));

              d_seg_row_to_gpu = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
              CubDebugExit(cudaSetDevice(gpu));
              CubDebugExit(cudaMemcpyAsync(d_seg_row_to_gpu, seg_row_to_gpu[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); 
              CubDebugExit(cudaSetDevice(0));
            }

            int** d_key_column = (int**) cm->customCudaMalloc<int*>(MAX_JOIN, gpu);
            int** d_group_column = (int**) cm->customCudaMalloc<int*>(MAX_GROUPBY, gpu);
            int** d_aggr_column = (int**) cm->customCudaMalloc<int*>(MAX_AGGR, gpu);

            CubDebugExit(cudaSetDevice(gpu));
            CubDebugExit(cudaMemcpyAsync(d_key_column, key_column[gpu], MAX_JOIN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
            CubDebugExit(cudaMemcpyAsync(d_group_column, group_column[gpu], MAX_GROUPBY * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); 
            CubDebugExit(cudaMemcpyAsync(d_aggr_column, aggr_column[gpu], MAX_AGGR * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
            CubDebugExit(cudaSetDevice(0));

            pargs[gpu] = new probeArgsGPU{ //initialize pargs for this gpu
                  fkey_idx[gpu][0], fkey_idx[gpu][1], fkey_idx[gpu][2], fkey_idx[gpu][3],
                  ht[gpu][0], ht[gpu][1], ht[gpu][2], ht[gpu][3], 
                  _dim_len[gpu][0], _dim_len[gpu][1], _dim_len[gpu][2], _dim_len[gpu][3],
                  _min_key[gpu][0], _min_key[gpu][1], _min_key[gpu][2], _min_key[gpu][3],
                  d_key_column, d_fkey_col_id
                };

            gargs[gpu] = new groupbyArgsGPU{
                  aggr_idx[gpu][0], aggr_idx[gpu][1], 
                  group_idx[gpu][0], group_idx[gpu][1], group_idx[gpu][2], group_idx[gpu][3],
                  _min_val[0], _min_val[1], _min_val[2], _min_val[3],
                  _unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3],
                  params->total_val, params->mode_group, 
                  d_group_column, d_aggr_column, d_group_col_id, d_aggr_col_id,
                  params->d_group_func
                };

            fargs[gpu] = new filterArgsGPU{
                  filter_idx[gpu][0], filter_idx[gpu][1], 
                  _compare1[0], _compare2[0], _compare1[1], _compare2[1], 
                  1, 1,
                  NULL, NULL
                };

            sargs[gpu] = new shuffleArgsGPU{
              NULL, NULL, NULL, d_in_off, NULL, NULL, 
              -1, -1, -1, -1, 
              d_all_col_idx, d_seg_row_to_gpu,
              d_seg_is_replicated[gpu], d_broadcast_idx[gpu],
            };

            kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total);
            // if (join == 0 && num_select > 0) { //there is filter before this join (meaning that the last join is the first join and it does not require shuffling)
            //   assert(0); //not supported for now
            // } else {
              // cout << "groupgpucheck " << qo->groupGPUcheck << endl;
              if (qo->groupby_build.size() > 0 && qo->groupGPUcheck && aggrGPUcheck) kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, ProbeGroupby, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
              else if (qo->aggregation[cm->lo_orderdate].size() > 0 && qo->groupGPUcheck && aggrGPUcheck) kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, ProbeAggr, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
              else kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, JustProbe, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
            // }
            int will_shuffle = 0;
            // cout << " Here " << endl;
            kernellaunch[gpu]->prepareKernelFact(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, 
              fkey_col_id, group_col_id, will_shuffle, has_shuffled);
            if (has_shuffled) {
              kernellaunch[gpu]->launchKernel(has_shuffled);
            } else {
              kernellaunch[gpu]->launchKernel(has_shuffled);
            }
      }

      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        int will_shuffle = 0;
        kernellaunch[gpu]->clearKernel(off_col, will_shuffle, has_shuffled);
        output_selectivity = 1.0;
        delete pargs[gpu];
        delete fargs[gpu];
        delete gargs[gpu];
        delete sargs[gpu];
        delete kparams[gpu];
        delete kernellaunch[gpu];
      }

      pipeline_column.clear();

      cudaEventRecord(stop, 0);
      cudaEventSynchronize(stop);
      cudaEventElapsedTime(&time, start, stop);
      if (verbose) cout << "Last Probe time " << time << endl;
      gpu_time[sg] += time;

    // if this is not the last join but the next join need shuffling (current join cannot be pipelined with next join)
    } else if (!params->ht_replicated[next_table_id]) {

      SETUP_TIMING();
      float time;
      cudaEventRecord(start, 0);

      //do join (either pipelined or individual)
      pipeline_column.push_back(make_tuple(fkey, pkey));

      int *ht[NUM_GPU][4] = {}, *fkey_idx[NUM_GPU][4] = {}; //initialize it to null
      int *key_column[NUM_GPU][MAX_JOIN] = {};

      for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) {
            ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]);
            ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]);
            assert(fkey_col_id[pkey_pipeline->table_id - 1] == -1);
            fkey_col_id[pkey_pipeline->table_id - 1] = fkey_pipeline->column_id;

            for (int gpu = 0; gpu < NUM_GPU; gpu++) {
                assert(col_idx[gpu][fkey_pipeline->column_id] != NULL);
                assert(params->ht_GPU[gpu][pkey_pipeline] != NULL);
                fkey_idx[gpu][pkey_pipeline->table_id -1] = col_idx[gpu][fkey_pipeline->column_id];
                ht[gpu][pkey_pipeline->table_id - 1] = params->ht_GPU[gpu][pkey_pipeline];
                if (has_shuffled) {
                  assert(temp_col[gpu][fkey_pipeline->column_id] != NULL);
                  key_column[gpu][pkey_pipeline->table_id - 1] = temp_col[gpu][fkey_pipeline->column_id];
                }
            }
            output_selectivity *= params->selectivity[fkey_pipeline];

      }

      for (int gpu = 0; gpu < NUM_GPU; gpu++) {

            int** d_temp_col = NULL, **d_in_off = NULL;

            if (has_shuffled) {
              d_temp_col = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
              
              d_in_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);

              CubDebugExit(cudaSetDevice(gpu));
              CubDebugExit(cudaMemcpyAsync(d_temp_col, temp_col[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
              assert(off_col[gpu] != NULL);
              if (off_col[gpu][0] != NULL) {
                CubDebugExit(cudaMemcpyAsync(d_in_off, off_col[gpu], NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
              } else {
                d_in_off=NULL;
              }
              CubDebugExit(cudaSetDevice(0));
            }

            int** d_col_idx = (int**) cm->customCudaMalloc<int*>(NUM_COLUMN, gpu);
            int** d_key_column = (int**) cm->customCudaMalloc<int*>(MAX_JOIN, gpu);
            int* d_fkey_col_id = (int*) cm->customCudaMalloc<int>(MAX_JOIN, gpu);
            int* d_group_col_id = (int*) cm->customCudaMalloc<int>(MAX_GROUPBY, gpu);
            int* d_aggr_col_id = (int*) cm->customCudaMalloc<int>(MAX_AGGR, gpu);
            CubDebugExit(cudaSetDevice(gpu));
            CubDebugExit(cudaMemcpyAsync(d_key_column, key_column[gpu], MAX_JOIN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu]));
            CubDebugExit(cudaMemcpyAsync(d_col_idx, used_col_idx[gpu], NUM_COLUMN * sizeof(int*), cudaMemcpyHostToDevice, stream[gpu])); 
            CubDebugExit(cudaMemcpyAsync(d_fkey_col_id, fkey_col_id, MAX_JOIN * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
            CubDebugExit(cudaMemcpyAsync(d_group_col_id, group_col_id, MAX_GROUPBY * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
            CubDebugExit(cudaMemcpyAsync(d_aggr_col_id, aggr_col_id, MAX_AGGR * sizeof(int), cudaMemcpyHostToDevice, stream[gpu]));
            CubDebugExit(cudaSetDevice(0));

            fargs[gpu] = new filterArgsGPU{
                  filter_idx[gpu][0], filter_idx[gpu][1], 
                  _compare1[0], _compare2[0], _compare1[1], _compare2[1], 
                  1, 1,
                  NULL, NULL
                };

            pargs[gpu] = new probeArgsGPU{ //initialize pargs for this gpu
                  fkey_idx[gpu][0], fkey_idx[gpu][1], fkey_idx[gpu][2], fkey_idx[gpu][3],
                  ht[gpu][0], ht[gpu][1], ht[gpu][2], ht[gpu][3], 
                  _dim_len[gpu][0], _dim_len[gpu][1], _dim_len[gpu][2], _dim_len[gpu][3],
                  _min_key[gpu][0], _min_key[gpu][1], _min_key[gpu][2], _min_key[gpu][3],
                  d_key_column, d_fkey_col_id
                };

            gargs[gpu] = new groupbyArgsGPU{
                  NULL, NULL, 
                  NULL, NULL, NULL, NULL,
                  _min_val[0], _min_val[1], _min_val[2], _min_val[3],
                  _unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3],
                  params->total_val, params->mode_group, 
                  NULL, NULL, d_group_col_id, d_aggr_col_id,
                  NULL
                };
                
            sargs[gpu] = new shuffleArgsGPU{
                d_temp_col, NULL, d_col_idx, d_in_off, NULL, NULL, 
                next_fkey->column_id, next_fkey->table->table_id, params->min_key[next_pkey], params->max_key[next_pkey],
                NULL, NULL,
                d_seg_is_replicated[gpu], d_broadcast_idx[gpu]
            };

            kparams[gpu] = new KernelParams(fargs[gpu], pargs[gpu], bargs[gpu], gargs[gpu], sargs[gpu], shelper, d_total, h_total);
            kernellaunch[gpu] = new KernelLaunch(cm, kparams[gpu], params, sg, gpu, ProbePartition, fkey->table->table_id, output_selectivity, latemat, aggrGPUcheck, stream[gpu]);
            int will_shuffle = 1;
            // for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL);
            kernellaunch[gpu]->prepareKernelFact(off_col, used_col_idx, qo->segment_group_each_gpu_count, qo->segment_group_each_gpu, qo->last_segment_gpu, qo->joinGPUcheck, 
              fkey_col_id, group_col_id, will_shuffle, has_shuffled);
            // for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL);
            kernellaunch[gpu]->launchKernel(has_shuffled);
      }

      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        int will_shuffle = 1;
        // for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL);
        kernellaunch[gpu]->clearKernel(off_col, will_shuffle, has_shuffled);
        // for (int table = 0; table < NUM_TABLE; table++) assert(off_col[gpu][table] == NULL);
        output_selectivity = 1.0;
        delete fargs[gpu];
        delete sargs[gpu];
        delete pargs[gpu];
        delete kparams[gpu];
        delete kernellaunch[gpu];
      }

      cudaEventRecord(stop, 0);
      cudaEventSynchronize(stop);
      cudaEventElapsedTime(&time, start, stop);
      if (verbose) cout << "Probe Partition time " << time << endl;
      gpu_time[sg] += time;

      // assert(0);

      cudaEventRecord(start, 0);

      //WARNING: WE NEED TO ELIMINATE ALL COLUMNS WHICH HAVE BEEN USED FOR THIS JOIN
      if (latemat == 2) {
        for (int pipeline = 0; pipeline < pipeline_column.size(); pipeline++) {
              ColumnInfo* fkey_pipeline = get<0>(pipeline_column[pipeline]);
              ColumnInfo* pkey_pipeline = get<1>(pipeline_column[pipeline]);
              assert(fkey_col_id[pkey_pipeline->table_id - 1] > 0);
              for (int gpu = 0; gpu < NUM_GPU; gpu++) {
                  assert(used_col_idx[gpu][fkey_pipeline->column_id] != NULL);
                  assert(out_shuffle_col[gpu][fkey_pipeline->column_id] != NULL);
                  used_col_idx[gpu][fkey_pipeline->column_id] = NULL;
                  out_shuffle_col[gpu][fkey_pipeline->column_id] = NULL;
                  if (has_shuffled) {
                    assert(temp_col[gpu][fkey_pipeline->column_id] != NULL);
                    temp_col[gpu][fkey_pipeline->column_id] = NULL;
                  }
              }
              fkey_col_id[pkey_pipeline->table_id - 1] = fkey_col_id[pkey_pipeline->table_id - 1] * -1 * 2;
        }
      }

      // if this is not the first join then we will just shuffle right away (we already partition it after probe)
      if (NUM_GPU == 8) shuffleDataNCCL(off_col, h_total, shelper, first_shuffle, sg, stream);
      else shuffleDataOpt(off_col, h_total, shelper, first_shuffle, sg, stream);
      
      first_shuffle = false;
      has_shuffled = true;

      cudaEventRecord(stop, 0);
      cudaEventSynchronize(stop);
      cudaEventElapsedTime(&time, start, stop);
      if (verbose) cout << "Shuffle time " << time << endl;
      shuffle_time[sg] += time;

      pipeline_column.clear();
      
    // if next join does not need shuffling (current join can be pipelined with next join)
    } else {
      pipeline_column.push_back({fkey, pkey});
    }

    delete[] fargs;
    delete[] bargs;
    delete[] pargs;
    delete[] gargs;
    delete[] sargs;
    delete[] kparams;
    delete[] kernellaunch;
  }

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      delete[] temp_col[gpu];
      delete[] result_count[gpu];
      delete[] in_shuffle_col[gpu];
      delete[] out_shuffle_col[gpu];
      delete[] in_shuffle_off[gpu];
      delete[] out_shuffle_off[gpu];
      delete[] used_col_idx[gpu];
      delete[] used_broadcast_idx[gpu];
      for (int gpu_iter = 0; gpu_iter < NUM_GPU; gpu_iter++) {
        delete[] used_all_col_idx[gpu][gpu_iter];
      }
      delete[] used_all_col_idx[gpu];
    }
    delete[] temp_col;
    delete[] result_count;
    delete[] in_shuffle_col;
    delete[] in_shuffle_off;
    delete[] out_shuffle_col;
    delete[] out_shuffle_off;
    delete[] d_seg_is_replicated;
    delete[] used_col_idx;
    delete[] used_broadcast_idx;
    delete[] used_all_col_idx;
    delete[] d_broadcast_idx;
}

/**
 * Zero every timing / traffic counter kept by this object: the aggregate
 * totals as well as the per-segment-group arrays (MAX_GROUPS entries each).
 */
void
CPUGPUProcessing::resetTime() {
  // Aggregate totals.
  execution_total = 0;
  optimization_total = 0;
  merging_total = 0;
  preparing_total = 0;

  cpu_total = 0;
  gpu_total = 0;

  cpu_to_gpu_total = 0;
  gpu_to_cpu_total = 0;

  nvlink_total = 0;
  shuffle_total = 0;

  // Per-segment-group counters.
  for (int group = MAX_GROUPS - 1; group >= 0; --group) {
    cpu_time[group] = 0;
    gpu_time[group] = 0;
    shuffle_time[group] = 0;

    cpu_to_gpu[group] = 0;
    gpu_to_cpu[group] = 0;
    nvlink_bytes[group] = 0;
  }
}


/**
 * Run the fused filter + hash-probe stage of segment group `sg` on the CPU.
 *
 * @param params        Query parameters (filter constants, selectivities, CPU hash tables,
 *                      min keys and dimension lengths are read from it).
 * @param h_off_col     In/out table of per-table offset arrays (cm->TOT_TABLE entries).
 *                      NULL on entry means there is no earlier offset list and the
 *                      segment group itself is scanned (filter_probe_CPU); otherwise the
 *                      existing offsets are consumed (filter_probe_CPU2). On return it
 *                      points to a freshly allocated table holding the output offsets.
 * @param h_total       In/out row count: number of input offsets on entry (only read when
 *                      h_off_col != NULL), number of output rows on return.
 * @param sg            Segment group index.
 * @param select_so_far Number of filters already applied; used as the base index into the
 *                      two filter slots.
 *
 * Returns immediately, leaving h_off_col and *h_total untouched, when no join column is
 * assigned to the CPU pipeline of this segment group.
 */
void 
CPUGPUProcessing::call_pfilter_probe_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) {
  int **off_col_out;
  int _min_key[4] = {0}, _dim_len[4] = {0};
  int *ht[4] = {}, *fkey_col[4] = {};
  int out_total = 0;
  ColumnInfo *filter_col[2] = {};
  int _compare1[2] = {0}, _compare2[2] = {0};
  float output_selectivity = 1.0;
  int output_estimate = 0;

  if(qo->joinCPUPipelineCol[sg].size() == 0) return;

  off_col_out = new int*[cm->TOT_TABLE] (); //initialize to null

  // Collect the filters of this pipeline into the slots following the ones already applied.
  for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) {
    if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break;
    // Only two filter slots exist; writing past them would corrupt the stack.
    assert(select_so_far + i < 2);
    ColumnInfo* column = qo->selectCPUPipelineCol[sg][i];
    filter_col[select_so_far + i] = column;
    _compare1[select_so_far + i] = params->compare1[column];
    _compare2[select_so_far + i] = params->compare2[column];
    output_selectivity *= params->selectivity[column];
  }

  // Collect the probe inputs; slot = dimension table id - 1.
  for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
    ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
    int table_id = qo->fkey_pkey[column]->table_id;
    fkey_col[table_id - 1] = column->col_ptr;
    ColumnInfo* pkey = qo->fkey_pkey[column];
    ht[table_id - 1] = params->ht_CPU[pkey];
    _min_key[table_id - 1] = params->min_key[pkey];
    _dim_len[table_id - 1] = params->dim_len[pkey];
    output_selectivity *= params->selectivity[column];
  }

  struct filterArgsCPU fargs = {
    (filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL), 
    (filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL),
    _compare1[0], _compare2[0], _compare1[1], _compare2[1],
    1, 1, 
    (filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL), 
    (filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL)
  };

  struct probeArgsCPU pargs = {
    fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
    ht[0], ht[1], ht[2], ht[3], 
    _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
    _min_key[0], _min_key[1], _min_key[2], _min_key[3]
  };

  // NOTE(review): SETUP_TIMING() is assumed to declare and create the `start`/`stop`
  // CUDA events used below; they are destroyed at the end of this function.
  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  // Allocate the output offset arrays, sized from the estimated selectivity.
  if (h_off_col == NULL) {
    output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity;
    for (int i = 0; i < cm->TOT_TABLE; i++) {
      if (i == 0 || qo->joinCPUcheck[i]) {
        if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault));
        if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
      }
    }
  } else {
    assert(*h_total > 0);
    output_estimate = *h_total * output_selectivity;
    for (int i = 0; i < cm->TOT_TABLE; i++) {
      if (h_off_col[i] != NULL || i == 0 || qo->joinCPUcheck[i]) {
        if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault));
        if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
      }
    }
  }

  // The allocation interval is measured but not added to any counter.
  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  cudaEventRecord(start, 0);

  if (h_off_col == NULL) {

    struct offsetCPU out_off = {
      off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4]
    };

    // The group flagged as last_segment ends with a partially filled segment.
    int LEN;
    if (sg == qo->last_segment[0]) {
      LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
    } else { 
      LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
    }

    short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);

    filter_probe_CPU(
      fargs, pargs, out_off, LEN, &out_total, 0, segment_group_ptr);

  } else {

    assert(*h_total > 0);

    struct offsetCPU in_off = {
      h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
    };

    struct offsetCPU out_off = {
      off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4]
    };

    filter_probe_CPU2(
      in_off, fargs, pargs, out_off, *h_total, &out_total, 0);

    // Input offsets are only freed here when they came from cudaHostAlloc.
    if (!custom) {
      for (int i = 0; i < cm->TOT_TABLE; i++) {
        if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]);
      }
    }

  }

  // Hand the output table to the caller.
  // NOTE(review): the previous pointer table (if any) is not released here — confirm who owns it.
  h_off_col = off_col_out;

  *h_total = out_total;

  if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg << endl;
  assert(*h_total <= output_estimate);
  // assert(*h_total > 0);

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Filter Probe Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;

  // Release the timing events so repeated calls do not leak them.
  cudaEventDestroy(start);
  cudaEventDestroy(stop);

}

/**
 * Run the fused hash-probe + group-by/aggregate stage of segment group `sg` on the CPU,
 * accumulating into params->res.
 *
 * @param params    Query parameters (CPU hash tables, key ranges, group-by metadata,
 *                  result buffer).
 * @param h_off_col Table of per-table offset arrays. NULL means the segment group itself
 *                  is scanned (probe_group_by_CPU); otherwise the offsets are consumed
 *                  (probe_group_by_CPU2). The reference itself is not reassigned here.
 * @param h_total   Number of input offsets; the function returns immediately when it is 0.
 * @param sg        Segment group index.
 */
void
CPUGPUProcessing::call_probe_group_by_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) {

  if (*h_total == 0) return;

  int _min_key[4] = {0}, _dim_len[4] = {0};
  int *ht[4] = {}, *fkey_col[4] = {};
  int _min_val[4] = {0}, _unique_val[4] = {0};
  int *aggr_col[2] = {}, *group_col[4] = {};

  // Probe inputs; slot = dimension table id - 1.
  for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
    ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
    int table_id = qo->fkey_pkey[column]->table_id;
    fkey_col[table_id - 1] = column->col_ptr;
    ColumnInfo* pkey = qo->fkey_pkey[column];
    ht[table_id - 1] = params->ht_CPU[pkey];
    _min_key[table_id - 1] = params->min_key[pkey];
    _dim_len[table_id - 1] = params->dim_len[pkey];
  }

  // Aggregated fact columns (only two slots are available).
  for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
    assert(i < 2);
    ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
    aggr_col[i] = column->col_ptr;
  }

  // Group-by columns: the first group-by column of each dimension key, slot = table id - 1.
  unordered_map<ColumnInfo*, vector<ColumnInfo*>>::iterator it;
  for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) {
    if (it->second.size() > 0) {
      ColumnInfo* column = it->second[0];
      ColumnInfo* column_key = it->first;
      group_col[column_key->table_id - 1] = column->col_ptr;
      _min_val[column_key->table_id - 1] = params->min_val[column_key];
      _unique_val[column_key->table_id - 1] = params->unique_val[column_key];
    }
  }

  struct probeArgsCPU pargs = {
    fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
    ht[0], ht[1], ht[2], ht[3], 
    _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
    _min_key[0], _min_key[1], _min_key[2], _min_key[3]
  };

  struct groupbyArgsCPU gargs = {
    aggr_col[0], aggr_col[1], group_col[0], group_col[1], group_col[2], group_col[3],
    _min_val[0], _min_val[1], _min_val[2], _min_val[3],
    _unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3],
    params->total_val, params->mode_group, params->h_group_func
  };

  // NOTE(review): SETUP_TIMING() is assumed to declare and create the `start`/`stop`
  // CUDA events used below; they are destroyed at the end of this function.
  float time;
  SETUP_TIMING();
  cudaEventRecord(start, 0);

  if (h_off_col == NULL) {

    // The group flagged as last_segment ends with a partially filled segment.
    int LEN;
    if (sg == qo->last_segment[0]) {
      LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
    } else { 
      LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
    }

    short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);

    probe_group_by_CPU(pargs, gargs, LEN , params->res, 0, segment_group_ptr);
  } else {

    struct offsetCPU offset = {
      h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
    };

    probe_group_by_CPU2(offset, pargs, gargs, *h_total, params->res, 0);

    // Input offsets are only freed here when they came from cudaHostAlloc.
    if (!custom) {
      for (int i = 0; i < cm->TOT_TABLE; i++) {
        if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]);
      }
    }
  }

  cudaEventRecord(stop, 0);                  // Stop time measuring
  cudaEventSynchronize(stop);               // Wait until the completion of all device 
                                            // work preceding the most recent call to cudaEventRecord()
  cudaEventElapsedTime(&time, start, stop); // Saving the time measured

  if (verbose) cout << "Probe Group Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;

  // Release the timing events so repeated calls do not leak them.
  cudaEventDestroy(start);
  cudaEventDestroy(stop);

}

/**
 * Run the hash-probe stage of segment group `sg` on the CPU.
 *
 * @param params    Query parameters (CPU hash tables, min keys, dimension lengths,
 *                  selectivities).
 * @param h_off_col In/out table of per-table offset arrays (cm->TOT_TABLE entries).
 *                  NULL on entry means the segment group itself is scanned (probe_CPU);
 *                  otherwise the existing offsets are consumed (probe_CPU2). On return it
 *                  points to a freshly allocated table holding the output offsets.
 * @param h_total   In/out row count: input offsets on entry (only read when
 *                  h_off_col != NULL), output rows on return.
 * @param sg        Segment group index.
 *
 * Returns immediately, leaving h_off_col and *h_total untouched, when no join column is
 * assigned to the CPU pipeline of this segment group.
 */
void 
CPUGPUProcessing::call_probe_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) {
  int **off_col_out;
  int _min_key[4] = {0}, _dim_len[4] = {0};
  int *ht[4] = {}, *fkey_col[4] = {};
  int out_total = 0;
  float output_selectivity = 1.0;
  int output_estimate = 0;

  if(qo->joinCPUPipelineCol[sg].size() == 0) return;

  off_col_out = new int*[cm->TOT_TABLE] (); //initialize to null

  // Probe inputs; slot = dimension table id - 1.
  for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
    ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
    int table_id = qo->fkey_pkey[column]->table_id;
    fkey_col[table_id - 1] = column->col_ptr;
    ColumnInfo* pkey = qo->fkey_pkey[column];
    ht[table_id - 1] = params->ht_CPU[pkey];
    _min_key[table_id - 1] = params->min_key[pkey];
    _dim_len[table_id - 1] = params->dim_len[pkey];
    output_selectivity *= params->selectivity[column];
  }

  struct probeArgsCPU pargs = {
    fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
    ht[0], ht[1], ht[2], ht[3], 
    _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
    _min_key[0], _min_key[1], _min_key[2], _min_key[3]
  };

  // NOTE(review): SETUP_TIMING() is assumed to declare and create the `start`/`stop`
  // CUDA events used below; they are destroyed at the end of this function.
  float time;
  SETUP_TIMING();
  cudaEventRecord(start, 0);

  // Allocate the output offset arrays, sized from the estimated selectivity.
  if (h_off_col == NULL) {
    output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity;
    for (int i = 0; i < cm->TOT_TABLE; i++) {
      if (i == 0 || qo->joinCPUcheck[i]) {
        if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault));
        if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
      }
    }
  } else {
    assert(*h_total > 0);
    output_estimate = *h_total * output_selectivity;
    for (int i = 0; i < cm->TOT_TABLE; i++) {
      if (h_off_col[i] != NULL || i == 0 || qo->joinCPUcheck[i]) {
        if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault));
        if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
      }
    }
  }

  // The allocation interval is measured but not added to any counter.
  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  cudaEventRecord(start, 0);

  if (h_off_col == NULL) {

    struct offsetCPU out_off = {
      off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4]
    };

    // The group flagged as last_segment ends with a partially filled segment.
    int LEN;
    if (sg == qo->last_segment[0]) {
      LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
    } else { 
      LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
    }

    short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);

    probe_CPU(pargs, out_off, LEN, &out_total, 0, segment_group_ptr);

  } else {

    assert(*h_total > 0);

    struct offsetCPU in_off = {
      h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
    };

    struct offsetCPU out_off = {
      off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4]
    };

    probe_CPU2(in_off, pargs, out_off, *h_total, &out_total, 0);

    // Input offsets are only freed here when they came from cudaHostAlloc.
    if (!custom) {
      for (int i = 0; i < cm->TOT_TABLE; i++) {
        if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]);
      }
    }

  }

  // Hand the output table to the caller.
  // NOTE(review): the previous pointer table (if any) is not released here — confirm who owns it.
  h_off_col = off_col_out;

  *h_total = out_total;

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg  << endl;
  assert(*h_total <= output_estimate);
  // assert(*h_total > 0);

  if (verbose) cout << "Probe Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;

  // Release the timing events so repeated calls do not leak them.
  cudaEventDestroy(start);
  cudaEventDestroy(stop);
}


/**
 * Run the filter-only stage of segment group `sg` on the CPU.
 *
 * WARNING: will not work if a join happened before the filter — the output is only
 * written as a single column, off_col_out[0].
 *
 * @param params        Query parameters (filter constants, modes, selectivities, host
 *                      filter functions).
 * @param h_off_col     In/out table of per-table offset arrays (cm->TOT_TABLE entries).
 *                      NULL on entry means the segment group itself is scanned
 *                      (filter_CPU); otherwise h_off_col[0] is consumed (filter_CPU2).
 *                      On return it points to a freshly allocated table.
 * @param h_total       In/out row count: input offsets on entry (only read when
 *                      h_off_col != NULL), output rows on return. Asserted to be > 0
 *                      on exit.
 * @param sg            Segment group index.
 * @param select_so_far Number of filters already applied; used as the base index into the
 *                      two filter slots.
 *
 * Returns immediately, leaving h_off_col and *h_total untouched, when no filter column is
 * assigned to the CPU pipeline of this segment group.
 */
void
CPUGPUProcessing::call_pfilter_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) {
  int **off_col_out;
  ColumnInfo *filter_col[2] = {};
  int out_total = 0;
  int _compare1[2] = {0}, _compare2[2] = {0}, _mode[2] = {0};
  float output_selectivity = 1.0;
  int output_estimate = 0;

  if (qo->selectCPUPipelineCol[sg].size() == 0) return;

  off_col_out = new int*[cm->TOT_TABLE](); //initialize to NULL

  // Collect the filters of this pipeline into the slots following the ones already applied.
  for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) {
    if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break;
    // Only two filter slots exist; writing past them would corrupt the stack.
    assert(select_so_far + i < 2);
    ColumnInfo* column = qo->selectCPUPipelineCol[sg][i];
    filter_col[select_so_far + i] = column;
    _compare1[select_so_far + i] = params->compare1[column];
    _compare2[select_so_far + i] = params->compare2[column];
    _mode[select_so_far + i] = params->mode[column];
    output_selectivity *= params->selectivity[column];
  }

  struct filterArgsCPU fargs = {
    (filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL), 
    (filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL),
    _compare1[0], _compare2[0], _compare1[1], _compare2[1],
    _mode[0], _mode[1], 
    (filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL), 
    (filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL)
  };

  // NOTE(review): SETUP_TIMING() is assumed to declare and create the `start`/`stop`
  // CUDA events used below; they are destroyed at the end of this function.
  float time;
  SETUP_TIMING();
  cudaEventRecord(start, 0);

  // Allocate the output offset arrays, sized from the estimated selectivity.
  if (h_off_col == NULL) {
    output_estimate = SEGMENT_SIZE * qo->segment_group_count[0][sg] * output_selectivity;
    if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[0], output_estimate * sizeof(int), cudaHostAllocDefault));
    if (custom) off_col_out[0] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
  } else {
    // assert(filter_col[0] == NULL);
    // assert(filter_col[1] != NULL);
    assert(*h_total > 0);
    output_estimate = *h_total * output_selectivity;
    for (int i = 0; i < cm->TOT_TABLE; i++) {
      if (h_off_col[i] != NULL || i == 0) {
        if (!custom) CubDebugExit(cudaHostAlloc((void**) &off_col_out[i], output_estimate * sizeof(int), cudaHostAllocDefault));
        if (custom) off_col_out[i] = (int*) cm->customCudaHostAlloc<int>(output_estimate);
      }
    }
  }

  // The allocation interval is measured but not added to any counter.
  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  cudaEventRecord(start, 0);

  if (h_off_col == NULL) {

    // The group flagged as last_segment ends with a partially filled segment.
    int LEN;
    if (sg == qo->last_segment[0]) {
      LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
    } else { 
      LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
    }

    short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);

    filter_CPU(fargs, off_col_out[0], LEN, &out_total, 0, segment_group_ptr);

  } else {
    // assert(filter_col[0] == NULL);
    // assert(filter_col[1] != NULL);
    assert(*h_total > 0);

    // Only the fact-table offsets (slot 0) are read and written.
    filter_CPU2(h_off_col[0], fargs, off_col_out[0], *h_total, &out_total, 0);

    // Input offsets are only freed here when they came from cudaHostAlloc.
    if (!custom) {
      for (int i = 0; i < cm->TOT_TABLE; i++) {
        if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]);
      }
    }

  }

  // Hand the output table to the caller.
  // NOTE(review): the previous pointer table (if any) is not released here — confirm who owns it.
  h_off_col = off_col_out;

  *h_total = out_total;

  if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg  << endl;
  assert(*h_total <= output_estimate);
  assert(*h_total > 0);

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Filter Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;

  // Release the timing events so repeated calls do not leak them.
  cudaEventDestroy(start);
  cudaEventDestroy(stop);

}

/**
 * Run the fused filter + aggregate stage of segment group `sg` on the CPU over an
 * existing offset list, accumulating into params->res.
 *
 * @param params        Query parameters (filter constants, modes, host filter functions,
 *                      group function, result buffer).
 * @param h_off_col     Table of per-table offset arrays; must not be NULL (asserted).
 *                      Only h_off_col[0] is read. The reference is not reassigned.
 * @param h_total       Number of input offsets; nothing is done when it is 0.
 * @param sg            Segment group index.
 * @param select_so_far Number of filters already applied; used as the base index into the
 *                      two filter slots.
 *
 * Returns immediately when there is no aggregation on the fact table or no filter column
 * assigned to the CPU pipeline of this segment group.
 */
void
CPUGPUProcessing::call_pfilter_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) {
  ColumnInfo *filter_col[2] = {};
  int _compare1[2] = {0}, _compare2[2] = {0}, _mode[2] = {0};
  int *aggr_col[2] = {};

  if (qo->aggregation[cm->lo_orderdate].size() == 0) return;

  if (qo->selectCPUPipelineCol[sg].size() == 0) return;

  // Aggregated fact columns (only two slots are available).
  for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
    assert(i < 2);
    ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
    aggr_col[i] = column->col_ptr;
  }

  // Collect the filters of this pipeline into the slots following the ones already applied.
  for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) {
    if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break;
    // Only two filter slots exist; writing past them would corrupt the stack.
    assert(select_so_far + i < 2);
    ColumnInfo* column = qo->selectCPUPipelineCol[sg][i];
    filter_col[select_so_far + i] = column;
    _compare1[select_so_far + i] = params->compare1[column];
    _compare2[select_so_far + i] = params->compare2[column];
    _mode[select_so_far + i] = params->mode[column];
  }

  struct groupbyArgsCPU gargs = {
    aggr_col[0], aggr_col[1], NULL, NULL, NULL, NULL,
    0, 0, 0, 0,
    0, 0, 0, 0,
    0, params->mode_group, params->h_group_func
  };

  struct filterArgsCPU fargs = {
    (filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL), 
    (filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL),
    _compare1[0], _compare2[0], _compare1[1], _compare2[1],
    _mode[0], _mode[1], 
    (filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL), 
    (filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL)
  };

  // This stage only works on an existing offset list.
  const bool has_input = (h_off_col != NULL);
  assert(has_input);

  // Empty input: bail out before any timing event is created, so nothing is leaked.
  if (has_input && *h_total == 0) return;

  // NOTE(review): SETUP_TIMING() is assumed to declare and create the `start`/`stop`
  // CUDA events used below; they are destroyed at the end of this function.
  float time;
  SETUP_TIMING();
  cudaEventRecord(start, 0);

  if (has_input) {
    filter_aggr_CPU2(h_off_col[0], fargs, gargs, params->res, *h_total);
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Filter Aggregation Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;

  // Release the timing events so repeated calls do not leak them.
  cudaEventDestroy(start);
  cudaEventDestroy(stop);
}

// Builds the CPU hash table for dimension table `table`, applying the first
// build-side selection registered for its join key (when there is one).
//
//   params     query parameters: predicate constants/modes, host filter
//              functions, dim_len, min_key and the ht_CPU hash tables
//   h_off_col  when NULL the rows of segment group `sg` are read directly
//              (build_CPU); otherwise it is handed to build_CPU2 together with
//              *h_total and released with cudaFreeHost afterwards if !custom
//   h_total    count forwarded to build_CPU2; only read when h_off_col != NULL
//   sg         segment group index; the elapsed time is added to cpu_time[sg]
//   table      table id used to pick the join key column out of qo->join
//
// Returns without doing anything when no join key belongs to `table`, and
// skips the build when params->ht_CPU holds no hash table for the join key.
void 
CPUGPUProcessing::call_bfilter_build_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table) {

  // Start from NULL so that "no join key found" and "no filter on this table"
  // are well-defined states instead of reads of indeterminate pointers.
  ColumnInfo* column = NULL, *filter_col = NULL;
  int* group_ptr = NULL, *filter_ptr = NULL;

  // The join key of `table` is the build-side column of the matching join.
  for (int i = 0; i < qo->join.size(); i++) {
    if (qo->join[i].second->table_id == table) {
      column = qo->join[i].second; break;
    }
  }

  // No join references this table: there is nothing to build.
  if (column == NULL) return;

  if (qo->groupby_build.size() > 0 && qo->groupby_build[column].size() > 0) {
    group_ptr = qo->groupby_build[column][0]->col_ptr;
  }

  if (qo->select_build[column].size() > 0) {
    filter_col = qo->select_build[column][0];
    filter_ptr = filter_col->col_ptr;
  }

  // Only look up predicate parameters when a filter column exists; otherwise
  // the filter arguments stay zeroed, as in call_build_CPU.
  int compare1 = 0, compare2 = 0, mode = 0;
  if (filter_col != NULL) {
    compare1 = params->compare1[filter_col];
    compare2 = params->compare2[filter_col];
    mode = params->mode[filter_col];
  }

  struct filterArgsCPU fargs = {
    filter_ptr, NULL,
    compare1, compare2, 0, 0,
    mode, 0, 
    (filter_col != NULL) ? (params->map_filter_func_host[filter_col]) : (NULL), NULL
  };

  struct buildArgsCPU bargs = {
    column->col_ptr, group_ptr,
    params->dim_len[column], params->min_key[column]
  };

  if (params->ht_CPU[column] != NULL) {

    // NOTE(review): if SETUP_TIMING() creates the start/stop events they are
    // never destroyed in this function - confirm against the macro definition.
    SETUP_TIMING();
    float time;
    cudaEventRecord(start, 0);

    if (h_off_col == NULL) {

      // Every segment of the group is full except in the last segment group,
      // whose final segment holds column->LEN % SEGMENT_SIZE rows.
      int LEN;
      if (sg == qo->last_segment[table]) {
        LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE;
      } else { 
        LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE;
      }

      // Slice of qo->segment_group[table] that belongs to segment group sg.
      short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment);

      build_CPU(fargs, bargs, LEN, params->ht_CPU[column], 0, segment_group_ptr);

    } else {

      build_CPU2(h_off_col, fargs, bargs, *h_total, params->ht_CPU[column], 0);

      if (!custom) cudaFreeHost(h_off_col);

    }

    cudaEventRecord(stop, 0);
    cudaEventSynchronize(stop);
    cudaEventElapsedTime(&time, start, stop);

    if (verbose) cout << "Filter Build Kernel time CPU: " << time << endl;
    cpu_time[sg] += time;

  }
};

// Builds the CPU hash table for dimension table `table` without any selection
// predicate (all filter arguments are zero/NULL).
//
//   params     query parameters: dim_len, min_key and the ht_CPU hash tables
//   h_off_col  when NULL the rows of segment group `sg` are read directly
//              (build_CPU); otherwise it is handed to build_CPU2 together with
//              *h_total and released with cudaFreeHost afterwards if !custom
//   h_total    count forwarded to build_CPU2; only read when h_off_col != NULL
//   sg         segment group index; the elapsed time is added to cpu_time[sg]
//   table      table id used to pick the join key column out of qo->join
//
// Returns without doing anything when no join key belongs to `table`, and
// skips the build when params->ht_CPU holds no hash table for the join key.
void 
CPUGPUProcessing::call_build_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table) {

  // NULL until a join on `table` is found, so a miss is detectable below
  // instead of dereferencing an indeterminate pointer.
  ColumnInfo* column = NULL;
  int* group_ptr = NULL;

  // The join key of `table` is the build-side column of the matching join.
  for (int i = 0; i < qo->join.size(); i++) {
    if (qo->join[i].second->table_id == table) {
      column = qo->join[i].second; break;
    }
  }

  // No join references this table: there is nothing to build.
  if (column == NULL) return;

  if (qo->groupby_build.size() > 0 && qo->groupby_build[column].size() > 0) {
    group_ptr = qo->groupby_build[column][0]->col_ptr;
  }

  // No filter is applied on this path.
  struct filterArgsCPU fargs = {NULL, NULL, 0, 0, 0, 0, 0, 0, NULL, NULL};

  struct buildArgsCPU bargs = {
    column->col_ptr, group_ptr,
    params->dim_len[column], params->min_key[column]
  };

  if (params->ht_CPU[column] != NULL) {

    // NOTE(review): if SETUP_TIMING() creates the start/stop events they are
    // never destroyed in this function - confirm against the macro definition.
    SETUP_TIMING();
    float time;
    cudaEventRecord(start, 0);

    if (h_off_col == NULL) {

      // Every segment of the group is full except in the last segment group,
      // whose final segment holds column->LEN % SEGMENT_SIZE rows.
      int LEN;
      if (sg == qo->last_segment[table]) {
        LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE;
      } else { 
        LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE;
      }

      // Slice of qo->segment_group[table] that belongs to segment group sg.
      short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment);

      build_CPU(fargs, bargs, LEN, params->ht_CPU[column], 0, segment_group_ptr);

    } else {

      build_CPU2(h_off_col, fargs, bargs, *h_total, params->ht_CPU[column], 0);

      if (!custom) cudaFreeHost(h_off_col);

    }

    cudaEventRecord(stop, 0);
    cudaEventSynchronize(stop);
    cudaEventElapsedTime(&time, start, stop);

    if (verbose) cout << "Build Kernel time CPU: " << time << endl;
    cpu_time[sg] += time;
  }
};


// Evaluates the first build-side selection of dimension table `table` on the
// CPU over segment group `sg`, writing the result into a newly allocated
// offset buffer.
//
//   params     query parameters: selectivity, predicate constants/mode and
//              the host filter function of the filtered column
//   h_off_col  output: receives the buffer allocated here (sized by the
//              selectivity estimate) that filter_CPU fills
//   h_total    output counter passed to filter_CPU
//   sg         segment group index; the filter time is added to cpu_time[sg]
//   table      table id used to pick the join key column out of qo->join
//
// Returns without touching h_off_col when no join key belongs to `table` or
// when that key has no build-side selection.
void
CPUGPUProcessing::call_bfilter_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg, int table) {

  // NULL until a join on `table` is found, so a miss is detectable below
  // instead of using an indeterminate pointer as a map key.
  ColumnInfo* temp = NULL;

  for (int i = 0; i < qo->join.size(); i++) {
    if (qo->join[i].second->table_id == table) {
      temp = qo->join[i].second; break;
    }
  }

  // No join references this table: there is no selection to evaluate.
  if (temp == NULL) return;

  // assert(qo->select_build[temp].size() > 0);
  if (qo->select_build[temp].size() == 0) return;

  ColumnInfo* column = qo->select_build[temp][0];
  int* filter_col = column->col_ptr;

  // Buffer size: rows in the segment group scaled by the column's selectivity.
  int output_estimate = qo->segment_group_count[table][sg] * SEGMENT_SIZE * params->selectivity[column];

  // NOTE(review): if SETUP_TIMING() creates the start/stop events they are
  // never destroyed in this function - confirm against the macro definition.
  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  if (custom) h_off_col = (int*) cm->customCudaHostAlloc<int>(output_estimate);
  else CubDebugExit(cudaHostAlloc((void**) &h_off_col, output_estimate * sizeof(int), cudaHostAllocDefault));

  // The allocation time measured here is overwritten by the second
  // measurement below and is not added to cpu_time.
  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  cudaEventRecord(start, 0);

  // Every segment of the group is full except in the last segment group,
  // whose final segment holds column->LEN % SEGMENT_SIZE rows.
  int LEN;
  if (sg == qo->last_segment[table]) {
    LEN = (qo->segment_group_count[table][sg] - 1) * SEGMENT_SIZE + column->LEN % SEGMENT_SIZE;
  } else { 
    LEN = qo->segment_group_count[table][sg] * SEGMENT_SIZE;
  }

  struct filterArgsCPU fargs = {
    filter_col, NULL,
    params->compare1[column], params->compare2[column], 0, 0,
    params->mode[column], 0, params->map_filter_func_host[column], NULL
  };

  // Slice of qo->segment_group[table] that belongs to segment group sg.
  short* segment_group_ptr = qo->segment_group[table] + (sg * column->total_segment);

  filter_CPU(fargs, h_off_col, LEN, h_total, 0, segment_group_ptr);

  if (verbose) cout << "h_total: " << *h_total << " output_estimate: " << output_estimate << " sg: " << sg  << endl;
  // The result must fit in the buffer sized from the selectivity estimate.
  assert(*h_total <= output_estimate);
  assert(*h_total > 0);

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Filter Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;

};

// Runs the CPU group-by/aggregation over the rows referenced by the offset
// columns in h_off_col and accumulates into params->res.
//
//   params     query parameters: min_val/unique_val per group-by key,
//              total_val, mode_group, h_group_func and the result buffer res
//   h_off_col  offset columns; entries 0..4 are forwarded to groupByCPU and,
//              when !custom, every non-NULL entry below cm->TOT_TABLE is
//              released with cudaFreeHost
//   h_total    number of entries; groupByCPU is skipped when it is 0
//   sg         segment group index; the elapsed time is added to cpu_time[sg]
//
// Returns immediately when no aggregation is registered on cm->lo_orderdate.
void
CPUGPUProcessing::call_group_by_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) {
  int _min_val[4] = {0}, _unique_val[4] = {0};
  int *aggr_col[2] = {}, *group_col[4] = {};

  if (qo->aggregation[cm->lo_orderdate].size() == 0) return;

  for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
    ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
    aggr_col[i] = column->col_ptr;
  }

  // One group-by slot per key table: slot index is the key's table_id - 1.
  unordered_map<ColumnInfo*, vector<ColumnInfo*>>::iterator it;
  for (it = qo->groupby_build.begin(); it != qo->groupby_build.end(); it++) {
    if (it->second.size() > 0) {
      ColumnInfo* column = it->second[0];
      ColumnInfo* column_key = it->first;
      group_col[column_key->table_id - 1] = column->col_ptr;
      _min_val[column_key->table_id - 1] = params->min_val[column_key];
      _unique_val[column_key->table_id - 1] = params->unique_val[column_key];
    }
  }

  struct groupbyArgsCPU gargs = {
    aggr_col[0], aggr_col[1], group_col[0], group_col[1], group_col[2], group_col[3],
    _min_val[0], _min_val[1], _min_val[2], _min_val[3],
    _unique_val[0], _unique_val[1], _unique_val[2], _unique_val[3],
    params->total_val, params->mode_group, params->h_group_func
  };

  struct offsetCPU offset = {
    h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4], 
  };

  // NOTE(review): if SETUP_TIMING() creates the start/stop events they are
  // never destroyed in this function - confirm against the macro definition.
  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  // Diagnostic output is gated on `verbose`, like the timing message below.
  if (verbose) cout << *h_total << endl;

  if (*h_total > 0) groupByCPU(offset, gargs, *h_total, params->res);

  if (!custom) {
    for (int i = 0; i < cm->TOT_TABLE; i++) {
      if (h_off_col[i] != NULL) cudaFreeHost(h_off_col[i]);
    }
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Group Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;

};

// Runs the CPU aggregation (no group-by keys) over the rows referenced by
// h_off_col and accumulates into params->res.
//
//   params     query parameters: mode_group, h_group_func and the result
//              buffer res
//   h_off_col  host offset buffer forwarded to aggregationCPU; released
//              afterwards when !custom
//   h_total    number of entries; aggregationCPU is skipped when it is 0
//   sg         segment group index; the elapsed time is added to cpu_time[sg]
//
// Returns immediately when no aggregation is registered on cm->lo_orderdate.
void 
CPUGPUProcessing::call_aggregation_CPU(QueryParams* params, int* &h_off_col, int* h_total, int sg) {
  int *aggr_col[2] = {};

  if (qo->aggregation[cm->lo_orderdate].size() == 0) return;

  for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
    ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
    aggr_col[i] = column->col_ptr;
  }

  // Group-by slots are left empty: only the aggregate columns are used.
  struct groupbyArgsCPU gargs = {
    aggr_col[0], aggr_col[1], NULL, NULL, NULL, NULL,
    0, 0, 0, 0,
    0, 0, 0, 0,
    0, params->mode_group, params->h_group_func
  };

  // NOTE(review): if SETUP_TIMING() creates the start/stop events they are
  // never destroyed in this function - confirm against the macro definition.
  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  // assert(h_off_col != NULL);

  if (*h_total > 0) aggregationCPU(h_off_col, gargs, *h_total, params->res);

  // h_off_col is a host offset buffer (the sibling CPU operators allocate
  // theirs with cudaHostAlloc and release them with cudaFreeHost), so it must
  // be released with cudaFreeHost rather than cudaFree.
  if (!custom && h_off_col != NULL) cudaFreeHost(h_off_col);

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  if (verbose) cout << "Aggr Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;
};

// Probes the CPU hash tables for the join columns scheduled on segment group
// `sg` and aggregates the matching rows into params->res.
//
//   params     query parameters: ht_CPU, min_key, dim_len per primary key,
//              mode_group, h_group_func and the result buffer res
//   h_off_col  when NULL the rows of segment group `sg` are read directly
//              (probe_aggr_CPU); otherwise entries 0..4 are forwarded as
//              offset columns to probe_aggr_CPU2 together with *h_total
//   h_total    count forwarded to probe_aggr_CPU2; only read when
//              h_off_col != NULL
//   sg         segment group index; the elapsed time is added to cpu_time[sg]
void 
CPUGPUProcessing::call_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg) {

  int _min_key[4] = {0}, _dim_len[4] = {0};
  int *ht[4] = {}, *fkey_col[4] = {};
  int *aggr_col[2] = {};

  // One probe slot per joined table: slot index is the primary key's
  // table_id - 1.
  for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
    ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
    int table_id = qo->fkey_pkey[column]->table_id;
    fkey_col[table_id - 1] = column->col_ptr;
    ColumnInfo* pkey = qo->fkey_pkey[column];
    ht[table_id - 1] = params->ht_CPU[pkey];
    _min_key[table_id - 1] = params->min_key[pkey];
    _dim_len[table_id - 1] = params->dim_len[pkey];
  }

  for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
    ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
    aggr_col[i] = column->col_ptr;
  }

  struct probeArgsCPU pargs = {
    fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
    ht[0], ht[1], ht[2], ht[3], 
    _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
    _min_key[0], _min_key[1], _min_key[2], _min_key[3]
  };

  // Group-by slots are left empty: only the aggregate columns are used.
  struct groupbyArgsCPU gargs = {
    aggr_col[0], aggr_col[1], NULL, NULL, NULL, NULL,
    0, 0, 0, 0,
    0, 0, 0, 0,
    0, params->mode_group, params->h_group_func
  };

  cudaEvent_t start, stop;
  float time;
  cudaEventCreate(&start);
  cudaEventCreate(&stop);
  cudaEventRecord(start, 0);

  if (h_off_col == NULL) {

    // Every segment of the group is full except in the last segment group,
    // whose final segment holds lo_orderdate->LEN % SEGMENT_SIZE rows.
    int LEN;
    if (sg == qo->last_segment[0]) {
      LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
    } else { 
      LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
    }

    // Slice of qo->segment_group[0] that belongs to segment group sg.
    short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);

    probe_aggr_CPU(pargs, gargs, LEN, params->res, 0, segment_group_ptr);
  } else {

    struct offsetCPU offset = {
      h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
    };

    probe_aggr_CPU2(offset, pargs, gargs, *h_total, params->res, 0);
  }

  cudaEventRecord(stop, 0);                  // Stop time measuring
  cudaEventSynchronize(stop);               // Wait until the stop event has completed
  cudaEventElapsedTime(&time, start, stop); // Saving the time measured

  // The events were created above for this call only; release them so that
  // repeated calls do not leak event objects.
  cudaEventDestroy(start);
  cudaEventDestroy(stop);

  if (verbose) cout << "Probe Aggr Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;

};

// Applies the probe-side selections scheduled on segment group `sg`, probes
// the CPU hash tables for the scheduled join columns and aggregates the
// surviving rows into params->res.
//
//   params         query parameters: compare1/compare2 and host filter
//                  functions per filter column, ht_CPU/min_key/dim_len per
//                  primary key, mode_group, h_group_func and the result
//                  buffer res
//   h_off_col      when NULL the rows of segment group `sg` are read directly
//                  (filter_probe_aggr_CPU); otherwise entries 0..4 are
//                  forwarded as offset columns to filter_probe_aggr_CPU2
//                  together with *h_total
//   h_total        count forwarded to filter_probe_aggr_CPU2; only read when
//                  h_off_col != NULL
//   sg             segment group index; the elapsed time is added to
//                  cpu_time[sg]
//   select_so_far  number of probe-side selections already applied; it
//                  offsets the filter slot each scheduled column is put in
void 
CPUGPUProcessing::call_pfilter_probe_aggr_CPU(QueryParams* params, int** &h_off_col, int* h_total, int sg, int select_so_far) {
  int _min_key[4] = {0}, _dim_len[4] = {0};
  int *ht[4] = {}, *fkey_col[4] = {};
  ColumnInfo* filter_col[2] = {};
  int _compare1[2] = {0}, _compare2[2] = {0};
  int *aggr_col[2] = {};

  // Stops immediately once every probe-side selection on lo_orderdate's table
  // has already been applied (select_so_far equals their count).
  for (int i = 0; i < qo->selectCPUPipelineCol[sg].size(); i++) {
    if (select_so_far == qo->select_probe[cm->lo_orderdate].size()) break;
    ColumnInfo* column = qo->selectCPUPipelineCol[sg][i];
    filter_col[select_so_far + i] = column;
    _compare1[select_so_far + i] = params->compare1[column];
    _compare2[select_so_far + i] = params->compare2[column];
  }

  // One probe slot per joined table: slot index is the primary key's
  // table_id - 1.
  for (int i = 0; i < qo->joinCPUPipelineCol[sg].size(); i++) {
    ColumnInfo* column = qo->joinCPUPipelineCol[sg][i];
    int table_id = qo->fkey_pkey[column]->table_id;
    fkey_col[table_id - 1] = column->col_ptr;
    ColumnInfo* pkey = qo->fkey_pkey[column];
    ht[table_id - 1] = params->ht_CPU[pkey];
    _min_key[table_id - 1] = params->min_key[pkey];
    _dim_len[table_id - 1] = params->dim_len[pkey];
  }

  for (int i = 0; i < qo->aggregation[cm->lo_orderdate].size(); i++) {
    ColumnInfo* column = qo->aggregation[cm->lo_orderdate][i];
    aggr_col[i] = column->col_ptr;
  }

  // NOTE(review): both filter modes are fixed to 1 here, whereas
  // call_pfilter_aggr_CPU forwards params->mode[column] - confirm this is
  // intended.
  struct filterArgsCPU fargs = {
    (filter_col[0] != NULL) ? (filter_col[0]->col_ptr) : (NULL), 
    (filter_col[1] != NULL) ? (filter_col[1]->col_ptr) : (NULL),
    _compare1[0], _compare2[0], _compare1[1], _compare2[1],
    1, 1,
    (filter_col[0] != NULL) ? (params->map_filter_func_host[filter_col[0]]) : (NULL), 
    (filter_col[1] != NULL) ? (params->map_filter_func_host[filter_col[1]]) : (NULL)
  };

  struct probeArgsCPU pargs = {
    fkey_col[0], fkey_col[1], fkey_col[2], fkey_col[3],
    ht[0], ht[1], ht[2], ht[3], 
    _dim_len[0], _dim_len[1], _dim_len[2], _dim_len[3],
    _min_key[0], _min_key[1], _min_key[2], _min_key[3]
  };

  // Group-by slots are left empty: only the aggregate columns are used.
  struct groupbyArgsCPU gargs = {
    aggr_col[0], aggr_col[1], NULL, NULL, NULL, NULL,
    0, 0, 0, 0,
    0, 0, 0, 0,
    0, params->mode_group, params->h_group_func
  };

  cudaEvent_t start, stop;   // variables that holds 2 events 
  float time;                // Variable that will hold the time
  cudaEventCreate(&start);   // creating the event 1
  cudaEventCreate(&stop);    // creating the event 2
  cudaEventRecord(start, 0); // start measuring  the time

  if (h_off_col == NULL) {

    // Every segment of the group is full except in the last segment group,
    // whose final segment holds lo_orderdate->LEN % SEGMENT_SIZE rows.
    int LEN;
    if (sg == qo->last_segment[0]) {
      LEN = (qo->segment_group_count[0][sg] - 1) * SEGMENT_SIZE + cm->lo_orderdate->LEN % SEGMENT_SIZE;
    } else { 
      LEN = qo->segment_group_count[0][sg] * SEGMENT_SIZE;
    }

    // Slice of qo->segment_group[0] that belongs to segment group sg.
    short* segment_group_ptr = qo->segment_group[0] + (sg * cm->lo_orderdate->total_segment);

    filter_probe_aggr_CPU(fargs, pargs, gargs, LEN, params->res, 0, segment_group_ptr);
  } else {

    struct offsetCPU offset = {
      h_off_col[0], h_off_col[1], h_off_col[2], h_off_col[3], h_off_col[4]
    };

    filter_probe_aggr_CPU2(offset, fargs, pargs, gargs, *h_total, params->res, 0);
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  // The events were created above for this call only; release them so that
  // repeated calls do not leak event objects.
  cudaEventDestroy(start);
  cudaEventDestroy(stop);

  if (verbose) cout << "Filter Probe Aggr Kernel time CPU: " << time << endl;
  cpu_time[sg] += time;
};