// Lancelot / src / gpudb / KernelLaunch.cu
#include "CacheManager.h"
#include "KernelLaunch.h"
#include "QueryOptimizer.h"

/**
 * Bundles the argument structs and per-GPU counters shared by the kernels of
 * one query.
 *
 * All pointers passed in are stored as-is; nothing is copied. In addition,
 * three helper objects are heap-allocated here: the input offsets, the output
 * offsets and the partitioning output descriptor.
 *
 * _h_total and _shelper must be non-NULL (asserted). _d_total is stored
 * without being checked.
 */
KernelParams::KernelParams(struct filterArgsGPU* _fargs, struct probeArgsGPU* _pargs, struct buildArgsGPU* _bargs,
  struct groupbyArgsGPU* _gargs, struct shuffleArgsGPU* _sargs,
  struct shuffleHelper* _shelper, int** _d_total, int** _h_total) {

  // Operator argument structs supplied by the caller.
  this->fargs = _fargs;
  this->pargs = _pargs;
  this->bargs = _bargs;
  this->gargs = _gargs;

  // Shuffle (partitioning) arguments and shared shuffle state.
  this->sargs = _sargs;
  this->shelper = _shelper;

  // Result counters: the device-side array is optional, the host-side one is required.
  this->d_total = _d_total;
  assert(_h_total != NULL);
  this->h_total = _h_total;

  // Objects owned by this KernelParams.
  this->in_off = new offsetGPU();     // input offsets
  this->out_off = new offsetGPU();    // output offsets
  this->sout = new shuffleOutGPU();   // output of partitioning

  assert(shelper != NULL);
}


/**
 * Describes one kernel invocation: which GPU and stream it runs on, which
 * table / segment group it reads, and which argument structs it uses.
 *
 * The argument structs are not copied; the pointers held by _kparams are
 * mirrored into members of this object. Input length, output estimate and the
 * device-side segment list start out empty and are filled in by the
 * prepare*() / countPartitioning() methods.
 *
 * _kparams is dereferenced unconditionally; its shelper must be non-NULL
 * (asserted).
 */
KernelLaunch::KernelLaunch(CacheManager* _cm, KernelParams* _kparams, QueryParams* _qparams,
    int _sg, int _gpu, KernelType _kernel_type, int _table_id,
    float _output_selectivity, int _latemat, bool _aggrGPUcheck, cudaStream_t _stream) {

  // Where and on what this launch operates.
  this->cm = _cm;
  this->gpu = _gpu;
  this->stream = _stream;
  this->sg = _sg;
  this->table_id = _table_id;
  this->kernel_type = _kernel_type;

  // Tuning / mode flags.
  this->output_selectivity = _output_selectivity;
  this->latemat = _latemat;
  this->aggrGPUcheck = _aggrGPUcheck;

  // State computed later by the prepare methods.
  this->INPUT_LEN = 0;
  this->output_estimate = 0;
  this->d_segment_group_each_gpu = NULL;
  this->count = NULL;
  this->key_off_col = NULL;

  // Parameter bundles.
  this->kparams = _kparams;
  this->qparams = _qparams;

  // Mirror the pointers held by the kernel parameter bundle.
  this->fargs = _kparams->fargs;
  this->pargs = _kparams->pargs;
  this->bargs = _kparams->bargs;
  this->gargs = _kparams->gargs;
  this->sargs = _kparams->sargs;
  this->in_off = _kparams->in_off;
  this->out_off = _kparams->out_off;
  this->sout = _kparams->sout;
  this->shelper = _kparams->shelper;
  this->d_total = _kparams->d_total;
  this->h_total = _kparams->h_total;

  assert(shelper != NULL);

  // The shuffle buffers held by shelper (result_count, temp_col, temp_off,
  // join_result_off, out_shuffle_col, out_shuffle_off) are not cached as
  // members here.
}

/**
 * Counting pass of partitioning on device `gpu`: launches a kernel that fills
 * a per-partition counter array and enqueues an asynchronous copy of those
 * counters into the host array `count`.
 *
 * `count` is (re)allocated here with NUM_PARTITION zero-initialised entries.
 * Nothing in this method waits for `stream`, so `count` holds valid data only
 * after the stream has been synchronised (see synchronizePartitioning()).
 *
 * Input selection:
 *  - first_shuffle == false: not supported (assert(0)).
 *  - off_col[gpu][table_id] == NULL: the input is segment group `sg` of table
 *    `table_id`; the segment ids of that group are copied to the device and
 *    kept in d_segment_group_each_gpu.
 *  - otherwise: the input length is *(h_total[gpu]) and the
 *    RangePartitioningCount2 kernel is used.
 *
 * @param off_col                       per-GPU, per-table offset columns
 * @param segment_group_each_gpu_count  [gpu][table][sg] number of segments in a group
 * @param segment_group_each_gpu        [gpu][table] segment ids, total_segment entries per group
 * @param last_segment_gpu              [gpu][table] segment group holding the table's partial last segment
 * @param first_shuffle                 must be true
 */
void
KernelLaunch::countPartitioning(int*** &off_col, short*** segment_group_each_gpu_count, short*** segment_group_each_gpu, 
  int** last_segment_gpu, bool first_shuffle) {
  assert(sargs != NULL);
  assert(shelper != NULL);
  assert(NUM_PARTITION == NUM_GPU);

  // Rows covered by one thread block: 128 threads (the launch block size)
  // times 4 -- presumably the items-per-thread of the <128,4,...> kernels.
  int tile_items = 128*4;

  count = new int[NUM_PARTITION]();
  int* d_count = (int*) cm->customCudaMalloc<int>(NUM_PARTITION, gpu);

  CubDebugExit(cudaSetDevice(gpu));
  CubDebugExit(cudaMemsetAsync(d_count, 0, NUM_PARTITION * sizeof(int), stream));
  CubDebugExit(cudaSetDevice(0));

  if (!first_shuffle) {
    // Counting for a second shuffle is not implemented.
    assert(0);
    assert(*(h_total[gpu]) > 0);

    INPUT_LEN = *(h_total[gpu]);
    return;
  }

  if (off_col[gpu][table_id] == NULL) {
    assert(segment_group_each_gpu_count != NULL);
    assert(segment_group_each_gpu_count[gpu] != NULL);
    assert(segment_group_each_gpu_count[gpu][table_id] != NULL);

    assert(segment_group_each_gpu_count[gpu][table_id][sg] > 0);
    assert(segment_group_each_gpu != NULL);
    assert(segment_group_each_gpu[gpu] != NULL);
    assert(segment_group_each_gpu[gpu][table_id] != NULL);

    // The group flagged in last_segment_gpu ends with the table's partial segment.
    if (last_segment_gpu[gpu][table_id] == sg) {
      INPUT_LEN = (segment_group_each_gpu_count[gpu][table_id][sg] - 1) * SEGMENT_SIZE + cm->allTable[table_id]->LEN % SEGMENT_SIZE;
    } else { 
      INPUT_LEN = segment_group_each_gpu_count[gpu][table_id][sg] * SEGMENT_SIZE;
    }

    d_segment_group_each_gpu = (short*) cm->customCudaMalloc<short>(cm->allTable[table_id]->total_segment, gpu);

    CubDebugExit(cudaSetDevice(gpu));
    // Each segment group occupies total_segment slots in the host-side list.
    short* segment_group_each_gpu_ptr = segment_group_each_gpu[gpu][table_id] + (sg * cm->allTable[table_id]->total_segment);
    CubDebugExit(cudaMemcpyAsync(d_segment_group_each_gpu, segment_group_each_gpu_ptr, segment_group_each_gpu_count[gpu][table_id][sg] * sizeof(short), cudaMemcpyHostToDevice, stream));
    // A zero-block grid is an invalid launch configuration; with no input the
    // counters simply stay at zero.
    if (INPUT_LEN > 0) {
      RangePartitioningCount<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
        cm->gpuCache[gpu], *fargs, *sargs, d_count, INPUT_LEN, gpu, d_segment_group_each_gpu);
      // Launch-configuration errors are only reported through the last-error state.
      CubDebugExit(cudaGetLastError());
    }
    CubDebugExit(cudaMemcpyAsync(count, d_count, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
    CubDebugExit(cudaSetDevice(0));

  } else {
    // First partitioning with an existing offset column for this table
    // (the original note marks this path as "not yet supported").
    assert(*(h_total[gpu]) > 0);
    INPUT_LEN = *(h_total[gpu]);
    CubDebugExit(cudaSetDevice(gpu));
    if (INPUT_LEN > 0) {
      RangePartitioningCount2<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
        cm->gpuCache[gpu], *sargs, d_count, INPUT_LEN, gpu);
      CubDebugExit(cudaGetLastError());
    }
    CubDebugExit(cudaMemcpyAsync(count, d_count, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
    CubDebugExit(cudaSetDevice(0)); 
  }
}

/**
 * Runs the CHECK_ERROR_STREAM macro on this launch's stream while device
 * `gpu` is current, then makes device 0 current again.
 *
 * NOTE(review): presumably CHECK_ERROR_STREAM waits for the stream and checks
 * for errors -- confirm against the macro definition.
 */
void KernelLaunch::synchronizePartitioning() {
  CubDebugExit(cudaSetDevice(gpu));   // make the target device current
  CHECK_ERROR_STREAM(stream);
  CubDebugExit(cudaSetDevice(0));     // restore device 0 as current
}

/**
 * Sets up the partitioning output buffers on device `gpu` using the
 * per-partition counts gathered by countPartitioning().
 *
 * Steps:
 *  1. Build an exclusive prefix sum of `count` (sum_count).
 *  2. For every column that is in use (used_col_idx or shelper->temp_col is
 *     set), make out_shuffle_col[gpu][col][p] point at offset sum_count[p]
 *     inside one contiguous device buffer. The buffer is allocated when the
 *     column belongs to `table_id` and has none yet; otherwise the existing
 *     buffer (its partition-0 pointer) is reused.
 *  3. Do the same for out_shuffle_off[gpu][table_id].
 *  4. Copy the host-side pointer tables to the device and publish them,
 *     together with a zeroed per-partition position array, through *sout.
 *
 * All device copies are enqueued asynchronously on `stream`.
 *
 * Requires table_id != 0 and a `count` array filled by countPartitioning().
 * off_col, joinGPUcheck and first_shuffle are not used by this method.
 */
void
KernelLaunch::preparePartitioningAfterCount(int*** &off_col, int*** used_col_idx, bool* joinGPUcheck, bool first_shuffle) {
  assert(table_id != 0);
  assert(sargs != NULL);
  assert(shelper != NULL);
  assert(NUM_PARTITION == NUM_GPU);
  // The prefix sum below reads count[0..NUM_PARTITION-1].
  assert(count != NULL);

  // sum_count[p] = number of rows in partitions 0..p-1; sum_count[NUM_PARTITION] = total.
  int* sum_count = new int[NUM_PARTITION+1]();
  sum_count[0] = 0;
  for (int partition = 1; partition <= NUM_PARTITION; partition++) {
    sum_count[partition] = sum_count[partition-1] + count[partition-1];
  }

  assert(shelper->out_shuffle_col != NULL);
  assert(shelper->out_shuffle_off != NULL);
  assert(shelper->out_shuffle_col[gpu] != NULL);
  assert(shelper->out_shuffle_off[gpu] != NULL);
  //if not first shuffle, no need to allocate (this might not work for the case when the second partitioning returned a larger out_shuffle_col than the first partitioning)
  for (int col = 0; col < NUM_COLUMN; col++) {
    int table = cm->allColumn[col]->table_id;
    if (used_col_idx[gpu][col] != NULL || shelper->temp_col[gpu][col] != NULL) {
      int* temp;
      if (table == table_id && shelper->out_shuffle_col[gpu][col] == NULL) {
        shelper->out_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
        // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
        cout << "allocate out_shuffle_col " << col << " " << (int) sum_count[NUM_PARTITION] * MULTIPLIER << endl;
        temp = cm->customCudaMalloc<int>((int) sum_count[NUM_PARTITION] * MULTIPLIER, gpu);
      } else {
        // Check before dereferencing: a used column without a buffer must
        // fail the assertion rather than read through a NULL pointer.
        assert(shelper->out_shuffle_col[gpu][col] != NULL);
        temp = shelper->out_shuffle_col[gpu][col][0];
      }
      assert(temp != NULL);
      for (int partition = 0; partition < NUM_PARTITION; partition++) {
        shelper->out_shuffle_col[gpu][col][partition] = temp + sum_count[partition];
      }
    }
  }

  int* temp;
  if (shelper->out_shuffle_off[gpu][table_id] == NULL) {
    shelper->out_shuffle_off[gpu][table_id] = new int*[NUM_PARTITION]();
    // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
    cout << "allocate out_shuffle_off " << table_id << " " << (int) sum_count[NUM_PARTITION] * MULTIPLIER << endl;
    temp = cm->customCudaMalloc<int>((int) sum_count[NUM_PARTITION] * MULTIPLIER, gpu);
  } else {
    temp = shelper->out_shuffle_off[gpu][table_id][0];
  }
  assert(temp != NULL);
  for (int partition = 0; partition < NUM_PARTITION; partition++) {
    shelper->out_shuffle_off[gpu][table_id][partition] = temp + sum_count[partition];
  }

  // The prefix sum is only needed for the host-side pointer arithmetic above.
  delete[] sum_count;

  // Host-side staging tables of device pointers. They are the source of
  // asynchronous copies enqueued on `stream`, so they are left allocated here.
  int*** helper_col = new int**[NUM_COLUMN]();
  for (int col = 0; col < NUM_COLUMN; col++) {
    if (shelper->out_shuffle_col[gpu][col] != NULL) {
      helper_col[col] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(helper_col[col], shelper->out_shuffle_col[gpu][col], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
      CubDebugExit(cudaSetDevice(0));
    }
  }
  int*** d_out_col = (int***) cm->customCudaMalloc<int**>(NUM_COLUMN, gpu);
  CubDebugExit(cudaSetDevice(gpu));
  CubDebugExit(cudaMemcpyAsync(d_out_col, helper_col, NUM_COLUMN * sizeof(int**), cudaMemcpyHostToDevice, stream));
  CubDebugExit(cudaSetDevice(0));

  int*** helper_off = new int**[NUM_TABLE]();
  for (int table = 0; table < NUM_TABLE; table++) {
    if (shelper->out_shuffle_off[gpu][table] != NULL) {
      helper_off[table] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(helper_off[table], shelper->out_shuffle_off[gpu][table], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
      CubDebugExit(cudaSetDevice(0));
    }
  }
  int*** d_out_off = (int***) cm->customCudaMalloc<int**>(NUM_TABLE, gpu);
  CubDebugExit(cudaSetDevice(gpu));
  CubDebugExit(cudaMemcpyAsync(d_out_off, helper_off, NUM_TABLE * sizeof(int**), cudaMemcpyHostToDevice, stream));
  CubDebugExit(cudaSetDevice(0));

  // Per-partition write cursor, starting at zero.
  int* d_pos = (int*) cm->customCudaMalloc<int>(NUM_PARTITION, gpu);
  CubDebugExit(cudaSetDevice(gpu));
  CubDebugExit(cudaMemsetAsync(d_pos, 0, NUM_PARTITION * sizeof(int), stream));
  CubDebugExit(cudaSetDevice(0));

  *sout = {
    d_pos, d_out_col, d_out_off
  };
}

/**
 * Prepares partitioning on device `gpu` without a preceding counting pass:
 * output buffers are sized from an estimate instead of exact counts.
 *
 * 1. Determines INPUT_LEN. When this is the first shuffle and the table has
 *    no offset column yet, the input is segment group `sg` of `table_id` and
 *    the group's segment ids are copied to the device; when an offset column
 *    exists, the length is *(h_total[gpu]). A non-first shuffle is not
 *    supported (assert(0)).
 * 2. For each in-use column of `table_id` that has no output buffer yet,
 *    allocates one device buffer per partition. The same is done for the
 *    table's offset output.
 * 3. Copies the pointer tables to the device and publishes them, together
 *    with a zeroed per-partition position array, through *sout.
 *
 * All device copies are enqueued asynchronously on `stream`.
 * joinGPUcheck is not used by this method.
 */
void
KernelLaunch::preparePartitioningWithoutCount(int*** &off_col, int*** used_col_idx, short*** segment_group_each_gpu_count, 
  short*** segment_group_each_gpu, int** last_segment_gpu, bool* joinGPUcheck, bool first_shuffle, bool pipeline) {

    assert(sargs != NULL);
    assert(shelper != NULL);

    assert(NUM_PARTITION == NUM_GPU);

    // ---- 1. input length ----
    if (first_shuffle && off_col[gpu][table_id] == NULL) {
      // Input comes straight from the cached segments of this table.
      assert(segment_group_each_gpu_count != NULL);
      assert(segment_group_each_gpu_count[gpu] != NULL);
      assert(segment_group_each_gpu_count[gpu][table_id] != NULL);

      if (table_id == 0) {
        assert(segment_group_each_gpu_count[gpu][table_id][sg] > 0);
      }
      assert(segment_group_each_gpu != NULL);
      assert(segment_group_each_gpu[gpu] != NULL);
      assert(segment_group_each_gpu[gpu][table_id] != NULL);

      const bool holds_last_segment = (last_segment_gpu[gpu][table_id] == sg);
      const short num_segments = segment_group_each_gpu_count[gpu][table_id][sg];
      if (holds_last_segment) {
        // The final segment of the table is only partially filled.
        INPUT_LEN = (num_segments - 1) * SEGMENT_SIZE + cm->allTable[table_id]->LEN % SEGMENT_SIZE;
      } else {
        INPUT_LEN = num_segments * SEGMENT_SIZE;
      }

      d_segment_group_each_gpu = (short*) cm->customCudaMalloc<short>(cm->allTable[table_id]->total_segment, gpu);

      CubDebugExit(cudaSetDevice(gpu));
      // Each segment group owns total_segment slots in the host-side list.
      short* group_begin = segment_group_each_gpu[gpu][table_id] + (sg * cm->allTable[table_id]->total_segment);
      CubDebugExit(cudaMemcpyAsync(d_segment_group_each_gpu, group_begin, num_segments * sizeof(short), cudaMemcpyHostToDevice, stream));
      CubDebugExit(cudaSetDevice(0));

    } else if (first_shuffle) {
      // First partitioning with an existing offset column
      // (the original note marks this path as "not yet supported").
      if (table_id == 0) {
        assert(*(h_total[gpu]) > 0);
      }
      INPUT_LEN = *(h_total[gpu]);

    } else {
      // Second and later shuffles are not implemented.
      assert(0);
      assert(*(h_total[gpu]) > 0);

      INPUT_LEN = *(h_total[gpu]);
    }

    // ---- 2. output buffers ----
    // Existing buffers are reused; this may be too small if a later
    // partitioning produces more rows than the one that allocated them.
    assert(shelper->in_shuffle_col != NULL);
    assert(shelper->in_shuffle_off != NULL);
    assert(shelper->out_shuffle_col != NULL);
    assert(shelper->out_shuffle_off != NULL);
    assert(shelper->in_shuffle_col[gpu] != NULL);
    assert(shelper->in_shuffle_off[gpu] != NULL);
    assert(shelper->out_shuffle_col[gpu] != NULL);
    assert(shelper->out_shuffle_off[gpu] != NULL);

    for (int c = 0; c < NUM_COLUMN; c++) {
      const int owner_table = cm->allColumn[c]->table_id;

      const bool in_use = used_col_idx[gpu][c] != NULL || shelper->temp_col[gpu][c] != NULL;
      if (!in_use) continue;
      if (owner_table != table_id) continue;
      if (shelper->out_shuffle_col[gpu][c] != NULL) continue;

      if (pipeline) {
        shelper->in_shuffle_col[gpu][c] = new int*[NUM_PARTITION]();
      }
      shelper->out_shuffle_col[gpu][c] = new int*[NUM_PARTITION]();
      cout << "allocate out_shuffle_col " << c << " " << (int) INPUT_LEN * output_selectivity / NUM_PARTITION * MULTIPLIER << endl;

      int** col_slots = shelper->out_shuffle_col[gpu][c];
      for (int p = 0; p < NUM_PARTITION; p++) {
        // With selectivity 1.0 the even share is padded by MULTIPLIER;
        // otherwise it is scaled by the selectivity itself.
        if (output_selectivity == 1.0) {
          col_slots[p] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
        } else {
          col_slots[p] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
        }
      }
    }

    if (shelper->out_shuffle_off[gpu][table_id] == NULL) {
      cout << "allocate out_shuffle_off " << table_id << " " << (int) INPUT_LEN * output_selectivity / NUM_PARTITION * MULTIPLIER << endl;
      shelper->out_shuffle_off[gpu][table_id] = new int*[NUM_PARTITION]();

      int** off_slots = shelper->out_shuffle_off[gpu][table_id];
      for (int p = 0; p < NUM_PARTITION; p++) {
        if (output_selectivity == 1.0) {
          off_slots[p] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
        } else {
          off_slots[p] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
        }
      }
    }

    // ---- 3. publish pointer tables to the device ----
    // Column table: one device array of NUM_PARTITION pointers per allocated column.
    int*** col_ptr_table = new int**[NUM_COLUMN]();
    for (int c = 0; c < NUM_COLUMN; c++) {
      if (shelper->out_shuffle_col[gpu][c] == NULL) continue;

      col_ptr_table[c] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(col_ptr_table[c], shelper->out_shuffle_col[gpu][c], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
      CubDebugExit(cudaSetDevice(0));
    }
    int*** d_out_col = (int***) cm->customCudaMalloc<int**>(NUM_COLUMN, gpu);
    CubDebugExit(cudaSetDevice(gpu));
    CubDebugExit(cudaMemcpyAsync(d_out_col, col_ptr_table, NUM_COLUMN * sizeof(int**), cudaMemcpyHostToDevice, stream));
    CubDebugExit(cudaSetDevice(0));

    // Offset table: same layout, indexed by table.
    int*** off_ptr_table = new int**[NUM_TABLE]();
    for (int t = 0; t < NUM_TABLE; t++) {
      if (shelper->out_shuffle_off[gpu][t] == NULL) continue;

      off_ptr_table[t] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(off_ptr_table[t], shelper->out_shuffle_off[gpu][t], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
      CubDebugExit(cudaSetDevice(0));
    }
    int*** d_out_off = (int***) cm->customCudaMalloc<int**>(NUM_TABLE, gpu);
    CubDebugExit(cudaSetDevice(gpu));
    CubDebugExit(cudaMemcpyAsync(d_out_off, off_ptr_table, NUM_TABLE * sizeof(int**), cudaMemcpyHostToDevice, stream));
    CubDebugExit(cudaSetDevice(0));

    // Per-partition write cursor, starting at zero.
    int* d_pos = (int*) cm->customCudaMalloc<int>(NUM_PARTITION, gpu);
    CubDebugExit(cudaSetDevice(gpu));
    CubDebugExit(cudaMemsetAsync(d_pos, 0, NUM_PARTITION * sizeof(int), stream));
    CubDebugExit(cudaSetDevice(0));

    *sout = {
      d_pos, d_out_col, d_out_off
    };

}

/**
 * Launches the partitioning kernel on device `gpu` / `stream`, writing into
 * the buffers published through *sout by one of the preparePartitioning*()
 * methods.
 *
 * Returns without launching when INPUT_LEN is 0 (asserted to be non-zero for
 * table 0). The launch is asynchronous; this method does not wait for it.
 *
 * Kernel selection:
 *  - d_segment_group_each_gpu == NULL: the input is addressed through
 *    sargs->column and RangePartitioningKeyValue2 is launched with the
 *    trailing flags (1, 0). The latemat parameter is NOT taken into account
 *    on this path.
 *  - otherwise: the input is read from cached segments and
 *    RangePartitioningKeyValue is launched. With latemat == 2 and
 *    pipeline == 1 only offsets are written; in every other case both values
 *    and offsets are written.
 *
 * @param latemat   late-materialisation mode; shadows the member of the same name
 * @param pipeline  pipelining flag; only consulted on the segment path
 */
void
KernelLaunch::launchPartitioning(int latemat, int pipeline) {

    if (table_id == 0) assert(INPUT_LEN > 0);
    if (INPUT_LEN == 0) return;
    int tile_items = 128*4;

    assert(sargs != NULL);
    assert(sout != NULL);
    assert(sout->pos != NULL);
    assert(sout->column != NULL);
    assert(sout->out_off != NULL);

    if (d_segment_group_each_gpu == NULL) {
      assert(sargs->column != NULL);

      CubDebugExit(cudaSetDevice(gpu));

      //WARNING: Have to handle latemat
      RangePartitioningKeyValue2<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
        cm->gpuCache[gpu], *sargs, *sout, INPUT_LEN, gpu, 1, 0);
      // Launch-configuration errors are only reported through the last-error state.
      CubDebugExit(cudaGetLastError());

      CubDebugExit(cudaSetDevice(0));

    } else {
      assert(sargs->col_idx != NULL);

      CubDebugExit(cudaSetDevice(gpu));

      // 0: write offsets only; 1: write values and offsets.
      const int write_val = (latemat == 2 && pipeline == 1) ? 0 : 1;
      RangePartitioningKeyValue<128,4, NUM_PARTITION><<<(INPUT_LEN+ tile_items - 1)/tile_items, 128, 0, stream>>>(
        cm->gpuCache[gpu], *fargs, *sargs, *sout, INPUT_LEN, gpu, write_val, 1, d_segment_group_each_gpu);
      CubDebugExit(cudaGetLastError());

      CubDebugExit(cudaSetDevice(0));
    }

}

/**
 * Copies the per-partition positions written by the partitioning kernel
 * (sout->pos, on device `gpu`) into shelper->result_count[gpu] on the host.
 *
 * CHECK_ERROR_STREAM is invoked on the stream both before the copy is
 * enqueued and after it.
 * NOTE(review): presumably that macro synchronises the stream, which would
 * make the host data valid on return -- confirm against its definition.
 */
void KernelLaunch::clearPartitioning() {
  const size_t result_bytes = NUM_PARTITION * sizeof(int);

  CubDebugExit(cudaSetDevice(gpu));

  CHECK_ERROR_STREAM(stream);
  CubDebugExit(cudaMemcpyAsync(shelper->result_count[gpu], sout->pos,
                               result_bytes, cudaMemcpyDeviceToHost, stream));
  CHECK_ERROR_STREAM(stream);

  CubDebugExit(cudaSetDevice(0));
}

/**
 * Prepares the input description of a kernel over a dimension (build-side)
 * table on device `gpu`.
 *
 * Records `build_column` as the key column and determines INPUT_LEN:
 *  - has_shuffled: the length is *(h_total[gpu]) (asserted > 0) and the key
 *    offsets are off_col[gpu][table].
 *  - not shuffled, offset column present: same, but the length is only
 *    asserted to be positive when table_id == 0.
 *  - not shuffled, no offset column: the input is segment group `sg` of
 *    table `table_id`; its segment ids are copied asynchronously to the
 *    device and kept in d_segment_group_each_gpu.
 *
 * Note that the offset column is selected with the `table` argument, while
 * the segment-group lookups use the member `table_id`.
 */
void
KernelLaunch::prepareKernelDim(int*** &off_col, ColumnInfo* build_column, short*** segment_group_each_gpu_count, 
  short*** segment_group_each_gpu, int** last_segment_gpu, int table, bool has_shuffled) {

    assert(build_column != NULL);
    key_column = build_column;
    assert(off_col != NULL);
    assert(off_col[gpu] != NULL);

    // Input is the result of an earlier shuffle.
    if (has_shuffled) {
      assert(*(h_total[gpu]) > 0);
      INPUT_LEN = *(h_total[gpu]);
      key_off_col = off_col[gpu][table];
      return;
    }

    // Input is an intermediate result addressed through an offset column.
    if (off_col[gpu][table] != NULL) {
      if (table_id == 0) {
        assert(*(h_total[gpu]) > 0);
      }
      INPUT_LEN = *(h_total[gpu]);
      key_off_col = off_col[gpu][table];
      return;
    }

    // Input is read directly from the cached segments of the table.
    assert(segment_group_each_gpu_count != NULL);
    assert(segment_group_each_gpu_count[gpu] != NULL);
    assert(segment_group_each_gpu_count[gpu][table_id] != NULL);

    if (table == 0) {
      assert(segment_group_each_gpu_count[gpu][table_id][sg] > 0);
    }
    assert(segment_group_each_gpu != NULL);
    assert(segment_group_each_gpu[gpu] != NULL);
    assert(segment_group_each_gpu[gpu][table_id] != NULL);

    const bool holds_last_segment = (last_segment_gpu[gpu][table_id] == sg);
    const short num_segments = segment_group_each_gpu_count[gpu][table_id][sg];
    if (holds_last_segment) {
      // The final segment of the table is only partially filled.
      INPUT_LEN = (num_segments - 1) * SEGMENT_SIZE + cm->allTable[table_id]->LEN % SEGMENT_SIZE;
    } else {
      INPUT_LEN = num_segments * SEGMENT_SIZE;
    }

    d_segment_group_each_gpu = (short*) cm->customCudaMalloc<short>(cm->allTable[table_id]->total_segment, gpu);

    CubDebugExit(cudaSetDevice(gpu));
    // Each segment group owns total_segment slots in the host-side list.
    short* group_begin = segment_group_each_gpu[gpu][table_id] + (sg * cm->allTable[table_id]->total_segment);
    CubDebugExit(cudaMemcpyAsync(d_segment_group_each_gpu, group_begin, num_segments * sizeof(short), cudaMemcpyHostToDevice, stream));
    CubDebugExit(cudaSetDevice(0));

}

void
KernelLaunch::prepareKernelFact(int*** &off_col, int*** used_col_idx, short*** segment_group_each_gpu_count, 
  short*** segment_group_each_gpu, int** last_segment_gpu, bool* joinGPUcheck, int* fkey_col_id, int* group_col_id,
  int will_shuffle, bool has_shuffled) {

  off_col_out = new int*[NUM_TABLE]();
  assert(off_col != NULL);
  assert(off_col[gpu] != NULL);

  //if (has_shuffled) input will be partition
  //if there is a shuffle afterward, output will be allocated
  //if there is no shuffle afteraward, output will not be allocated
  //else input will be segment

  if (has_shuffled) {
    // cout << gpu << " " << *(h_total[gpu]) << endl;
    // if (*(h_total[gpu]) == 0) return;
    // assert(*(h_total[gpu]) > 0);
    INPUT_LEN = *(h_total[gpu]);
    // output_estimate = INPUT_LEN * output_selectivity * MULTIPLIER;
    if (output_selectivity == 1.0) output_estimate = INPUT_LEN * MULTIPLIER;
    else output_estimate = INPUT_LEN * output_selectivity;
    if (kernel_type == JustProbe || kernel_type == JustFilter) {
      for (int table = 0; table < NUM_TABLE; table++) {
        if (table == 0 || (table > 0 && group_col_id[table-1] > 0)) {
          cout << "allocating off col out " << table << " " << output_selectivity << " " << output_estimate << endl;
          off_col_out[table] = (int*) cm->customCudaMalloc<int>(output_estimate, gpu);
        }
      }
      // cout << "copying join result off" << endl;
      int** d_join_result_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(d_join_result_off, off_col_out, NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream));
      CubDebugExit(cudaSetDevice(0));
      *out_off = {NULL, NULL, NULL, NULL, NULL, d_join_result_off};
    }
    *in_off = {off_col[gpu][0], off_col[gpu][1], off_col[gpu][2], off_col[gpu][3], off_col[gpu][4]};

    // int* tmp = new int[INPUT_LEN];
    // CubDebugExit(cudaSetDevice(gpu));
    // CubDebugExit(cudaMemcpyAsync(tmp, off_col[gpu][1], INPUT_LEN * sizeof(int), cudaMemcpyDeviceToHost, stream));
    // CubDebugExit(cudaSetDevice(0));
    // for (int i = 0; i < INPUT_LEN; i++) {
    //   printf("hey ho gpu %d %d\n", gpu, tmp[i]);
    // }
    // cout << endl;
    // CHECK_ERROR_STREAM(stream);

  } else {
    if (off_col[gpu][0] == NULL) { //initialize out off and in off for this gpu
      assert(segment_group_each_gpu_count != NULL);
      assert(segment_group_each_gpu_count[gpu] != NULL);
      assert(segment_group_each_gpu_count[gpu][table_id] != NULL);
      // assert(segment_group_each_gpu_count[gpu][table_id][sg] > 0);
      assert(segment_group_each_gpu != NULL);
      assert(segment_group_each_gpu[gpu] != NULL);
      assert(segment_group_each_gpu[gpu][table_id] != NULL);
      assert(off_col_out != NULL);
  
      // output_estimate = SEGMENT_SIZE * segment_group_each_gpu_count[gpu][table_id][sg] * output_selectivity * MULTIPLIER;
      if (output_selectivity == 1.0) output_estimate = SEGMENT_SIZE * segment_group_each_gpu_count[gpu][table_id][sg] * MULTIPLIER;
      else output_estimate = SEGMENT_SIZE * segment_group_each_gpu_count[gpu][table_id][sg] * output_selectivity;
      if (kernel_type == JustProbe || kernel_type == JustFilter) {
        for (int table = 0; table < NUM_TABLE; table++) {
          if (table == 0 || (table > 0 && group_col_id[table-1] > 0)) {
            // cout << " i am here " << endl;
            cout << "allocating off col out " << table << " " << output_selectivity << " " << output_estimate << endl;
            off_col_out[table] = (int*) cm->customCudaMalloc<int>(output_estimate, gpu);
            // cout << " i am not here " << endl;
          }
        }
        *out_off = {off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4], NULL};
      }
  
      if (last_segment_gpu[gpu][table_id] == sg) {
        INPUT_LEN = (segment_group_each_gpu_count[gpu][table_id][sg] - 1) * SEGMENT_SIZE + cm->allTable[table_id]->LEN % SEGMENT_SIZE;
      } else { 
        INPUT_LEN = segment_group_each_gpu_count[gpu][table_id][sg] * SEGMENT_SIZE;
      }
  
      d_segment_group_each_gpu = (short*) cm->customCudaMalloc<short>(cm->allTable[table_id]->total_segment, gpu);
      
      CubDebugExit(cudaSetDevice(gpu));
      short* segment_group_each_gpu_ptr = segment_group_each_gpu[gpu][table_id] + (sg * cm->allTable[table_id]->total_segment);
      CubDebugExit(cudaMemcpyAsync(d_segment_group_each_gpu, segment_group_each_gpu_ptr, segment_group_each_gpu_count[gpu][table_id][sg] * sizeof(short), cudaMemcpyHostToDevice, stream));
      CubDebugExit(cudaSetDevice(0));
      // cpu_to_gpu[sg] += (qo->segment_group_each_gpu_count[0][sg] * sizeof(short));
  
    } else {
      // if (*(h_total[gpu]) == 0) return;
      // assert(*(h_total[gpu]) > 0);
      assert(off_col_out != NULL);
      assert(off_col[gpu] != NULL);
      assert(off_col[gpu][0] != NULL);
      INPUT_LEN = *(h_total[gpu]);
  
      // output_estimate = INPUT_LEN * output_selectivity * MULTIPLIER;
      if (output_selectivity == 1.0) output_estimate = INPUT_LEN * MULTIPLIER;
      else output_estimate = INPUT_LEN * output_selectivity;
      if (kernel_type == JustProbe || kernel_type == JustFilter) {
        for (int table = 0; table < cm->TOT_TABLE; table++) {
          if (off_col[gpu][table] != NULL || table == 0 || (table > 0 && group_col_id[table-1] > 0)) {
            // cout << " i am here " << endl;
            cout << "allocating off col out " << table << " " << output_selectivity << " " << output_estimate << endl;
            off_col_out[table] = (int*) cm->customCudaMalloc<int>(output_estimate, gpu);
            // cout << " i am not here " << endl;
          }
        }
        *out_off = {off_col_out[0], off_col_out[1], off_col_out[2], off_col_out[3], off_col_out[4], NULL};
      }
      *in_off = {off_col[gpu][0], off_col[gpu][1], off_col[gpu][2], off_col[gpu][3], off_col[gpu][4]};
    }
  }

    //if there will be shuffle
  if (will_shuffle) {
    
    if (kernel_type == ProbePartition) {
        //if not first shuffle, no need to allocate (this might not work for the case when the second partitioning returned a larger out_shuffle_col than the first partitioning)
        assert(shelper->out_shuffle_col != NULL);
        assert(shelper->out_shuffle_off != NULL);
        assert(shelper->out_shuffle_col[gpu] != NULL);
        assert(shelper->out_shuffle_off[gpu] != NULL);

        if (latemat != 2) {
          for (int col = 0; col < NUM_COLUMN; col++) {
            if (used_col_idx[gpu][col] != NULL || shelper->temp_col[gpu][col] != NULL) {
                if (shelper->out_shuffle_col[gpu][col] == NULL) {
                    // cout << "col " << col << endl;
                    shelper->out_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
                    // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
                    cout << "allocate out_shuffle_col " << col << " " << (int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER << endl;
                    for (int partition = 0; partition < NUM_PARTITION;  partition++) {
                        // shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
                        if (output_selectivity == 1.0) shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
                        else shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
                    }
                }
            }
          }
        } else {
          for (int col = 0; col < NUM_COLUMN; col++) {
            int table = cm->allColumn[col]->table_id;
            if (used_col_idx[gpu][col] != NULL || shelper->temp_col[gpu][col] != NULL) {
              if (table == 0 || (table > 0 && fkey_col_id[table-1] > 0 && col == group_col_id[table-1])) {
                if (shelper->out_shuffle_col[gpu][col] == NULL) {
                    shelper->out_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
                    // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
                    cout << "allocate out_shuffle_col " << col << " " << (int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER << endl;
                    for (int partition = 0; partition < NUM_PARTITION;  partition++) {
                        // shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
                        if (output_selectivity == 1.0) shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
                        else shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
                    }
                }
              }
            }
          }
        }

        // printf("%d %d %d\n", latemat, qparams->groupGPUcheck, aggrGPUcheck);
        if ((latemat != 2 && qparams->groupGPUcheck && aggrGPUcheck) || (latemat == 2 && NUM_GPU == 8 && qparams->groupGPUcheck && aggrGPUcheck)) {
                int table = 0;
                if (shelper->out_shuffle_off[gpu][table] == NULL) {
                    shelper->out_shuffle_off[gpu][table] = new int*[NUM_PARTITION]();
                    cout << "allocate out_shuffle_off " << table_id << " " << (int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER << endl;
                    // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
                    for (int partition = 0; partition < NUM_PARTITION;  partition++) {
                      // if (output_selectivity == 1.0) shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
                      if (output_selectivity == 1.0) shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
                      else shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
                    }
                }
        } else {
                for (int table = 0; table < NUM_TABLE; table++) {
                  if (table == 0 || (table > 0 && fkey_col_id[table-1] > 0 && group_col_id[table-1] > 0)) {
                    if (shelper->out_shuffle_off[gpu][table] == NULL) {
                        // cout << "table " << table << endl;
                        shelper->out_shuffle_off[gpu][table] = new int*[NUM_PARTITION]();
                        cout << "allocate out_shuffle_off " << table_id << " " << (int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER << endl;
                        // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
                        for (int partition = 0; partition < NUM_PARTITION;  partition++) {
                          // if (output_selectivity == 1.0) shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
                          if (output_selectivity == 1.0) shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * MULTIPLIER, gpu);
                          else shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity, gpu);
                        }
                    }
                  }
                }
        }
    }

    int*** helper_col = new int**[NUM_COLUMN]();
    for (int col = 0; col < NUM_COLUMN; col++) {
        if (shelper->out_shuffle_col[gpu][col] != NULL) {
        assert(shelper->out_shuffle_col[gpu][col] != NULL);
        helper_col[col] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaMemcpyAsync(helper_col[col], shelper->out_shuffle_col[gpu][col], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
        CubDebugExit(cudaSetDevice(0));
        }
    }
    int*** d_out_col = (int***) cm->customCudaMalloc<int**>(NUM_COLUMN, gpu);
    CubDebugExit(cudaSetDevice(gpu));
    CubDebugExit(cudaMemcpyAsync(d_out_col, helper_col, NUM_COLUMN * sizeof(int**), cudaMemcpyHostToDevice, stream));
    CubDebugExit(cudaSetDevice(0));

    int*** helper_off = new int**[NUM_TABLE]();
    for (int table = 0; table < NUM_TABLE; table++) {
        if (shelper->out_shuffle_off[gpu][table] != NULL) {
        assert(shelper->out_shuffle_off[gpu][table] != NULL);
        helper_off[table] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaMemcpyAsync(helper_off[table], shelper->out_shuffle_off[gpu][table], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
        CubDebugExit(cudaSetDevice(0));
        }
    }
    int*** d_out_off = (int***) cm->customCudaMalloc<int**>(NUM_TABLE, gpu);
    CubDebugExit(cudaSetDevice(gpu));  
    CubDebugExit(cudaMemcpyAsync(d_out_off, helper_off, NUM_TABLE * sizeof(int**), cudaMemcpyHostToDevice, stream)); 
    CubDebugExit(cudaSetDevice(0));

    int* d_pos = (int*) cm->customCudaMalloc<int>(NUM_PARTITION, gpu);
    CubDebugExit(cudaSetDevice(gpu));
    CubDebugExit(cudaMemsetAsync(d_pos, 0, NUM_PARTITION * sizeof(int), stream));
    CubDebugExit(cudaSetDevice(0));

    *sout = {
        d_pos, d_out_col, d_out_off
    };
  }
}

/**
 * Enqueues the device kernel selected by `kernel_type` on `stream`, with the
 * current device switched to `gpu` for the launch and back to device 0
 * afterwards.
 *
 * Every launch uses 128 threads per block and ceil(len / tile_items) blocks,
 * where tile_items = 128 * 4 matches the <128, 4> template arguments.
 *
 * Nothing in this function synchronizes the stream or queries the launch
 * status; the CHECK_ERROR_STREAM calls in clearKernel() are presumably where
 * failures surface (the macro is defined outside this file section).
 *
 * @param has_shuffled   selects the kernel variants that consume shuffled
 *                       input instead of the segment-group list
 *                       (d_segment_group_each_gpu).
 * @param broadcast      JustBuild only: in the segment-group path an extra
 *                       build over `toBroadcast` is launched; in the offset
 *                       path it selects build_GPU4 instead of build_GPU3.
 * @param toBroadcast    JustBuild only: passed to build_GPU as its input
 *                       for the broadcast build.
 * @param broadcast_len  number of entries to process from `toBroadcast`.
 */
void
KernelLaunch::launchKernel(bool has_shuffled, bool broadcast, int* toBroadcast, int broadcast_len) {
  // No regular input and no broadcast input: nothing to launch.
  if (INPUT_LEN == 0 && broadcast_len == 0) return;
  if (table_id == 0) assert(INPUT_LEN > 0);
  int tile_items = 128*4;
  // Grid size shared by every kernel that walks the INPUT_LEN regular inputs.
  const auto input_blocks = (INPUT_LEN + tile_items - 1) / tile_items;

  // Device-side copy of the per-GPU cache base pointers; only the *_lm kernels
  // below take it as an argument.
  int** d_gpuCache = NULL;
  if (latemat != 0) {
    d_gpuCache = (int**) cm->customCudaMalloc<int*>(NUM_GPU, gpu);
    CubDebugExit(cudaSetDevice(gpu));
    CubDebugExit(cudaMemcpyAsync(d_gpuCache, cm->gpuCache, NUM_GPU * sizeof(int*), cudaMemcpyHostToDevice, stream));
    CubDebugExit(cudaSetDevice(0));
  }

  if (kernel_type == JustFilter) {
    assert(d_total != NULL);
    assert(d_total[gpu] != NULL);

    if (has_shuffled) {
      // Filtering shuffled input is not supported.
      assert(0);
    } else {
      if (d_segment_group_each_gpu != NULL) {
        assert(fargs != NULL);
        assert(out_off != NULL);

        CubDebugExit(cudaSetDevice(gpu));

        filter_GPU2<128,4><<<input_blocks, 128, 0, stream>>>(
          cm->gpuCache[gpu], *fargs, off_col_out[table_id], INPUT_LEN, d_total[gpu], 0, d_segment_group_each_gpu);

        CubDebugExit(cudaSetDevice(0));

      } else {
        // Filtering from an offset list is not supported.
        assert(0);
      }
    }

  } else if (kernel_type == JustProbe) {
    assert(d_total != NULL);
    assert(d_total[gpu] != NULL);

    if (has_shuffled) {

      assert(pargs != NULL);
      assert(out_off != NULL);
      CubDebugExit(cudaSetDevice(gpu));

      probe_GPU<128,4><<<input_blocks, 128, 0, stream>>>(
        *pargs, *in_off, *out_off, INPUT_LEN, d_total[gpu], 0);

      CubDebugExit(cudaSetDevice(0));

    } else {
      if (d_segment_group_each_gpu != NULL) {
        assert(pargs != NULL);
        assert(out_off != NULL);

        CubDebugExit(cudaSetDevice(gpu));

        // Fuse the filter into the probe when a filter column is configured.
        if (fargs->filter_idx1 != NULL) {
          filter_probe_GPU2<128, 4><<<input_blocks, 128, 0, stream>>>(
            cm->gpuCache[gpu], *fargs, *pargs, *out_off, INPUT_LEN, d_total[gpu], 0, d_segment_group_each_gpu);
        } else {
          probe_GPU2<128,4><<<input_blocks, 128, 0, stream>>>(
            cm->gpuCache[gpu], *pargs, *out_off, INPUT_LEN, d_total[gpu], 0, d_segment_group_each_gpu);
        }

        CubDebugExit(cudaSetDevice(0));

      } else {

        // Unsupported path; the statements below only run when asserts are
        // compiled out.
        assert(0);

        CubDebugExit(cudaSetDevice(gpu));
        assert(pargs != NULL);
        assert(out_off != NULL);
        assert(in_off != NULL);

        probe_GPU3<128,4><<<input_blocks, 128, 0, stream>>>(
          cm->gpuCache[gpu], *in_off, *pargs, *out_off, INPUT_LEN, d_total[gpu]);

        CubDebugExit(cudaSetDevice(0));
      }
    }

  } else if (kernel_type == ProbeAggr) {
    assert(qparams->d_res[gpu] != NULL);

    if (has_shuffled) {

      assert(pargs != NULL);
      assert(gargs != NULL);

      CubDebugExit(cudaSetDevice(gpu));

      // The *_lm variant (which reads through d_gpuCache) is used only for
      // latemat == 2 outside the 8-GPU group-by/aggregate configuration.
      if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qparams->groupGPUcheck && aggrGPUcheck)) {
        probe_aggr_GPU<128, 4><<<input_blocks, 128, 0, stream>>>(
          *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0);
      } else {
        probe_aggr_GPU_lm<128, 4><<<input_blocks, 128, 0, stream>>>(
          d_gpuCache, *pargs, *gargs, *sargs, INPUT_LEN, qparams->d_res[gpu], gpu);
      }

      CubDebugExit(cudaSetDevice(0));
    } else {
      if (d_segment_group_each_gpu != NULL) {
        assert(pargs != NULL);
        assert(gargs != NULL);

        CubDebugExit(cudaSetDevice(gpu));

        // WARNING: HAVE TO HANDLE A CASE WHERE THERE IS FILTER
        if (fargs->filter_idx1 != NULL) {
          filter_probe_aggr_GPU2<128, 4><<<input_blocks, 128, 0, stream>>>(
            cm->gpuCache[gpu], *fargs, *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0, d_segment_group_each_gpu);
        } else {
          probe_aggr_GPU2<128, 4><<<input_blocks, 128, 0, stream>>>(
            cm->gpuCache[gpu], *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0, d_segment_group_each_gpu);
        }

        CubDebugExit(cudaSetDevice(0));

      } else {

        // Unsupported path; the statements below only run when asserts are
        // compiled out.
        assert(0);

        assert(pargs != NULL);
        assert(gargs != NULL);
        assert(in_off != NULL);
        CubDebugExit(cudaSetDevice(gpu));

        probe_aggr_GPU3<128, 4><<<input_blocks, 128, 0, stream>>>(
          cm->gpuCache[gpu], *in_off, *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu]);

        CubDebugExit(cudaSetDevice(0));
      }
    }
  } else if (kernel_type == ProbePartition) {
      if (has_shuffled) {

        CubDebugExit(cudaSetDevice(gpu));
        if (latemat == 2) {
          if (NUM_GPU == 8 && qparams->groupGPUcheck && aggrGPUcheck) {
            probe_partition_GPU2_simple_groupby_8GPUs<128,4, NUM_GPU><<<input_blocks, 128, 0, stream>>>(
              cm->gpuCache[gpu], *pargs, *gargs, *sargs, *sout, INPUT_LEN, 1, 1, latemat);
          } else {
            probe_partition_GPU2_simple<128,4, NUM_GPU><<<input_blocks, 128, 0, stream>>>(
              cm->gpuCache[gpu], *pargs, *sargs, *sout, INPUT_LEN, 1, 1, latemat);
          }
        } else {
          probe_partition_GPU2<128,4, NUM_GPU><<<input_blocks, 128, 0, stream>>>(
              cm->gpuCache[gpu], *pargs, *sargs, *sout, INPUT_LEN, 1, 1, latemat);
        }
        CubDebugExit(cudaSetDevice(0));

      } else {
        if (d_segment_group_each_gpu != NULL) {

          CubDebugExit(cudaSetDevice(gpu));
          if (latemat == 2) {
            if (NUM_GPU == 8 && qparams->groupGPUcheck && aggrGPUcheck) {
              probe_partition_GPU_simple_groupby_8GPUs<128,4, NUM_GPU><<<input_blocks, 128, 0, stream>>>(
                cm->gpuCache[gpu], *fargs, *pargs, *gargs, *sargs, *sout, INPUT_LEN, gpu, 0, d_segment_group_each_gpu, 1, 1, latemat);
            } else {
              probe_partition_GPU_simple<128,4, NUM_GPU><<<input_blocks, 128, 0, stream>>>(
                cm->gpuCache[gpu], *fargs, *pargs, *sargs, *sout, INPUT_LEN, gpu, 0, d_segment_group_each_gpu, 1, 1, latemat);
            }
          } else {
            probe_partition_GPU<128,4, NUM_GPU><<<input_blocks, 128, 0, stream>>>(
                cm->gpuCache[gpu], *fargs, *pargs, *sargs, *sout, INPUT_LEN, gpu, 0, d_segment_group_each_gpu, 1, 1, latemat);
          }
          CubDebugExit(cudaSetDevice(0));
        } else {
          // Partitioning from an offset list is not supported.
          assert(0);
        }
      }

  } else if (kernel_type == ProbeGroupby) {
    assert(qparams->d_res[gpu] != NULL);

    if (has_shuffled) {

      assert(pargs != NULL);
      assert(gargs != NULL);

      CubDebugExit(cudaSetDevice(gpu));

      // Same variant selection as the ProbeAggr shuffled path.
      if (latemat != 2 || (latemat == 2 && NUM_GPU == 8 && qparams->groupGPUcheck && aggrGPUcheck)) {
        probe_group_by_GPU<128, 4><<<input_blocks, 128, 0, stream>>>(
          *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0);
      } else {
        probe_group_by_GPU_lm<128, 4><<<input_blocks, 128, 0, stream>>>(
          d_gpuCache, *pargs, *gargs, *sargs, INPUT_LEN, qparams->d_res[gpu], gpu);
      }

      CubDebugExit(cudaSetDevice(0));
    } else {
      if (d_segment_group_each_gpu != NULL) {
        assert(pargs != NULL);
        assert(gargs != NULL);

        CubDebugExit(cudaSetDevice(gpu));

        probe_group_by_GPU2<128, 4><<<input_blocks, 128, 0, stream>>>(
          cm->gpuCache[gpu], *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu], 0, d_segment_group_each_gpu);

        CubDebugExit(cudaSetDevice(0));

      } else {
        // Unsupported path; the statements below only run when asserts are
        // compiled out.
        assert(0);

        assert(pargs != NULL);
        assert(gargs != NULL);
        assert(in_off != NULL);
        CubDebugExit(cudaSetDevice(gpu));

        probe_group_by_GPU3<128, 4><<<input_blocks, 128, 0, stream>>>(
          cm->gpuCache[gpu], *in_off, *pargs, *gargs, INPUT_LEN, qparams->d_res[gpu]);

        CubDebugExit(cudaSetDevice(0));
      }
    }
  } else if (kernel_type == JustBuild) {

    assert(qparams->ht_GPU[gpu][key_column] != NULL);
    // Set this to vary the join selectivity.
    float selectivity = 1.0;

    // The early return above only fires when BOTH lengths are zero, so either
    // one may still be zero here. A grid dimension of zero is an invalid launch
    // configuration, so every build launch below is guarded by its own length.
    if (has_shuffled) {
        assert(bargs != NULL);
        assert(key_off_col != NULL);

        CubDebugExit(cudaSetDevice(gpu));
        if (INPUT_LEN > 0) {
          build_GPU5<128,4><<<input_blocks, 128, 0, stream>>>(
            key_off_col, *bargs, INPUT_LEN, qparams->ht_GPU[gpu][key_column], selectivity);
        }
        CubDebugExit(cudaSetDevice(0));

    } else {
      if (d_segment_group_each_gpu != NULL) {

        assert(bargs != NULL);
        assert(fargs != NULL);

        CubDebugExit(cudaSetDevice(gpu));

        // Build over this GPU's own segments.
        if (INPUT_LEN > 0) {
          build_GPU2<128,4><<<input_blocks, 128, 0, stream>>>(
            cm->gpuCache[gpu], *fargs, *bargs, INPUT_LEN, qparams->ht_GPU[gpu][key_column], 0, d_segment_group_each_gpu, selectivity);
        }
        // Additional build over the broadcast input into the same hash table.
        if (broadcast && broadcast_len > 0) {
          build_GPU<128,4><<<(broadcast_len + tile_items - 1)/tile_items, 128, 0, stream>>>(
            cm->gpuCache[gpu], toBroadcast, *fargs, *bargs, broadcast_len, qparams->ht_GPU[gpu][key_column], 0, selectivity);
        }

        CubDebugExit(cudaSetDevice(0));

      } else {
        assert(key_off_col != NULL);
        assert(bargs != NULL);
        assert(fargs != NULL);

        CubDebugExit(cudaSetDevice(gpu));

        if (INPUT_LEN > 0) {
          if (broadcast) {
            build_GPU4<128,4><<<input_blocks, 128, 0, stream>>>(
              cm->gpuCache[gpu], key_off_col, *fargs, *bargs, INPUT_LEN, qparams->ht_GPU[gpu][key_column], selectivity);
          } else {
            build_GPU3<128,4><<<input_blocks, 128, 0, stream>>>(
              cm->gpuCache[gpu], key_off_col, *fargs, *bargs, INPUT_LEN, qparams->ht_GPU[gpu][key_column], selectivity);
          }
        }

        CubDebugExit(cudaSetDevice(0));

      }
    }
  } else {
    // Unknown kernel type.
    assert(0);
  }
}

/**
 * Post-launch step for the kernel enqueued by launchKernel().
 *
 * - JustProbe / JustFilter: copies the device result counter d_total[gpu] into
 *   h_total[gpu], checks the stream, asserts the count fits in output_estimate,
 *   and installs off_col_out as this GPU's offset columns in `off_col`.
 * - JustBuild: only checks the stream.
 * - Any other kernel type: checks the stream and, when `will_shuffle` is set,
 *   copies the NUM_PARTITION per-partition counters from sout->pos into
 *   shelper->result_count[gpu].
 *
 * NOTE(review): h_total is read right after an async copy, so
 * CHECK_ERROR_STREAM presumably synchronizes `stream` -- confirm against the
 * macro definition.
 *
 * @param off_col       per-GPU offset columns; entry [gpu] is replaced for
 *                      probe/filter kernels.
 * @param will_shuffle  non-zero when the partition counters must be fetched.
 * @param has_shuffled  not used by this function.
 */
void
KernelLaunch::clearKernel(int*** &off_col, int will_shuffle, bool has_shuffled) {

  const bool produces_offsets = (kernel_type == JustFilter) || (kernel_type == JustProbe);

  if (!produces_offsets) {
    // Build / aggregate / group-by / partition kernels: just check the stream.
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(stream);
    CubDebugExit(cudaSetDevice(0));

    if (kernel_type == JustBuild) return;

    assert(off_col_out != NULL);

    // Probe-partition: fetch how many rows landed in each partition.
    if (will_shuffle) {
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaMemcpyAsync(shelper->result_count[gpu], sout->pos, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
      CHECK_ERROR_STREAM(stream);
      CubDebugExit(cudaSetDevice(0));
    }
    return;
  }

  // Probe / filter kernels: bring the number of produced rows back to the host.
  int* host_total = h_total[gpu];
  CubDebugExit(cudaSetDevice(gpu));
  CubDebugExit(cudaMemcpyAsync(host_total, d_total[gpu], sizeof(int), cudaMemcpyDeviceToHost, stream));
  CHECK_ERROR_STREAM(stream);
  CubDebugExit(cudaSetDevice(0));

  // The output buffers were sized from output_estimate; the kernel must not
  // have produced more rows than that.
  assert(*host_total <= output_estimate);

  assert(off_col != NULL);
  assert(off_col[gpu] != NULL);

  // The output offsets of this kernel become the input offsets for this GPU.
  off_col[gpu] = off_col_out;
  int** installed = off_col[gpu];
  // `installed` aliases off_col_out at this point, so each store writes an
  // entry back onto itself.
  for (int t = 0; t < NUM_TABLE; ++t) {
    installed[t] = off_col_out[t];
  }
}

// //TODO: NOT YET SUPPORTED IF THE OUTPUT IS GOING TO CPU
// void
// KernelLaunch::prepareKernelPipelined(int*** &off_col, int*** used_col_idx, short*** segment_group_each_gpu_count, 
//   short*** segment_group_each_gpu, int** last_segment_gpu, bool* joinGPUcheck, int* fkey_col_id, int* group_col_id,
//   bool has_shuffled, int latemat) {

//   assert(off_col != NULL);
//   assert(off_col[gpu] != NULL);

//   assert(sargs != NULL);
//   assert(shelper != NULL);

//   assert(NUM_PARTITION == NUM_GPU);

//   if (has_shuffled) {

//     //calculate total input partition
//     int sum = 0;
//     for (int partition = 0; partition < NUM_GPU; partition++) {
//       // cout << "partition " << partition << " gpu " << gpu << " result count " << shelper->result_count[partition][gpu] << endl;
//       sum += shelper->result_count[partition][gpu];
//     }

//     INPUT_LEN = sum;
//     assert(INPUT_LEN > 0);
//     output_estimate = sum * output_selectivity * MULTIPLIER;

//     if (kernel_type == ShuffleProbe) {
//       off_col_out = new int*[NUM_TABLE];
//       for (int table = 0; table < NUM_TABLE; table++) {
//         if (table == 0 || joinGPUcheck[table]) {
//           off_col_out[table] = (int*) cm->customCudaMalloc<int>(output_estimate, gpu);
//         }
//       }
//       int** d_join_result_off = (int**) cm->customCudaMalloc<int*>(NUM_TABLE, gpu);
//       CubDebugExit(cudaSetDevice(gpu));
//       CubDebugExit(cudaMemcpyAsync(d_join_result_off, off_col_out, NUM_TABLE * sizeof(int*), cudaMemcpyHostToDevice, stream));
//       CubDebugExit(cudaSetDevice(0));

//       *out_off = {NULL, NULL, NULL, NULL, NULL, d_join_result_off};
//     }

//   } else {

//     if (off_col[gpu][0] == NULL) { //initialize out off and in off for this gpu
//       assert(segment_group_each_gpu_count != NULL);
//       assert(segment_group_each_gpu_count[gpu] != NULL);
//       assert(segment_group_each_gpu_count[gpu][table_id] != NULL);
//       assert(segment_group_each_gpu_count[gpu][table_id][sg] > 0);
//       assert(segment_group_each_gpu != NULL);
//       assert(segment_group_each_gpu[gpu] != NULL);
//       assert(segment_group_each_gpu[gpu][table_id] != NULL);
  
//       output_estimate = SEGMENT_SIZE * segment_group_each_gpu_count[gpu][table_id][sg] * output_selectivity * MULTIPLIER;
  
//       if (last_segment_gpu[gpu][table_id] == sg) {
//         INPUT_LEN = (segment_group_each_gpu_count[gpu][table_id][sg] - 1) * SEGMENT_SIZE + cm->allTable[table_id]->LEN % SEGMENT_SIZE;
//       } else { 
//         INPUT_LEN = segment_group_each_gpu_count[gpu][table_id][sg] * SEGMENT_SIZE;
//       }
  
//       d_segment_group_each_gpu = (short*) cm->customCudaMalloc<short>(cm->allTable[table_id]->total_segment, gpu);
      
//       CubDebugExit(cudaSetDevice(gpu));
//       short* segment_group_each_gpu_ptr = segment_group_each_gpu[gpu][table_id] + (sg * cm->allTable[table_id]->total_segment);
//       CubDebugExit(cudaMemcpyAsync(d_segment_group_each_gpu, segment_group_each_gpu_ptr, segment_group_each_gpu_count[gpu][table_id][sg] * sizeof(short), cudaMemcpyHostToDevice, stream));
//       CubDebugExit(cudaSetDevice(0));
//       // cpu_to_gpu[sg] += (qo->segment_group_each_gpu_count[0][sg] * sizeof(short));
  
//     //first shuffle but got inputs from the CPU, this is not yet supported
//     } else {
//       assert(0);
//     }

//   }

//   //if not first shuffle, no need to allocate (this might not work for the case when the second partitioning returned a larger out_shuffle_col than the first partitioning)
//   if (kernel_type == ShuffleProbePartition) {
//     for (int col = 0; col < NUM_COLUMN; col++) {
//       if (used_col_idx[gpu][col] != NULL) {
//         if (shelper->out_shuffle_col[gpu][col] == NULL && shelper->in_shuffle_col[gpu][col] == NULL) {
//           shelper->out_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
//           shelper->in_shuffle_col[gpu][col] = new int*[NUM_PARTITION]();
//           // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
//           // for the pipelined version, we will allocate separately for each partition
//           for (int partition = 0; partition < NUM_PARTITION; partition++) {
//             shelper->out_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
//             shelper->in_shuffle_col[gpu][col][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
//           }
//           assert(shelper->out_shuffle_col[gpu][col] != NULL);
//           assert(shelper->in_shuffle_col[gpu][col] != NULL);
//         }
//       }
//     }

//     //if there is global offset
//       for (int table = 0; table < NUM_TABLE; table++) {
//         if (table == 0 || (table > 0 && fkey_col_id[table-1] > 0 && group_col_id[table-1] > 0)) {
//           if (shelper->out_shuffle_off[gpu][table] == NULL && shelper->in_shuffle_off[gpu][table] == NULL) {
//             shelper->out_shuffle_off[gpu][table] = new int*[NUM_PARTITION]();
//             shelper->in_shuffle_off[gpu][table] = new int*[NUM_PARTITION]();
//             // we will allocate more than need be just for pessimistic assumption in case second shuffle returns larger result than first shuffle
//             for (int partition = 0; partition < NUM_PARTITION; partition++) {
//               shelper->out_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
//               shelper->in_shuffle_off[gpu][table][partition] = cm->customCudaMalloc<int>((int) INPUT_LEN / NUM_PARTITION * output_selectivity * MULTIPLIER, gpu);
//             }
//             assert(shelper->out_shuffle_off[gpu][table] != NULL);
//             assert(shelper->in_shuffle_off[gpu][table] != NULL);
//           }
//         }
//       }
//   }

//   int*** helper_col = new int**[NUM_COLUMN]();
//   for (int col = 0; col < NUM_COLUMN; col++) {
//     if (shelper->out_shuffle_col[gpu][col] != NULL) {
//       assert(shelper->out_shuffle_col[gpu][col] != NULL);
//       helper_col[col] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
//       CubDebugExit(cudaSetDevice(gpu));
//       CubDebugExit(cudaMemcpyAsync(helper_col[col], shelper->out_shuffle_col[gpu][col], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
//       CubDebugExit(cudaSetDevice(0));
//     }
//   }
//   int*** d_out_col = (int***) cm->customCudaMalloc<int**>(NUM_COLUMN, gpu);
//   CubDebugExit(cudaSetDevice(gpu));
//   CubDebugExit(cudaMemcpyAsync(d_out_col, helper_col, NUM_COLUMN * sizeof(int**), cudaMemcpyHostToDevice, stream));
//   CubDebugExit(cudaSetDevice(0));

//   int*** helper_off = new int**[NUM_TABLE]();
//   for (int table = 0; table < NUM_TABLE; table++) {
//     if (shelper->out_shuffle_off[gpu][table] != NULL) {
//       assert(shelper->out_shuffle_off[gpu][table] != NULL);
//       helper_off[table] = (int**) cm->customCudaMalloc<int*>(NUM_PARTITION, gpu);
//       CubDebugExit(cudaSetDevice(gpu));
//       CubDebugExit(cudaMemcpyAsync(helper_off[table], shelper->out_shuffle_off[gpu][table], NUM_PARTITION * sizeof(int*), cudaMemcpyHostToDevice, stream));
//       CubDebugExit(cudaSetDevice(0));
//     }
//   }
//   int*** d_out_off = (int***) cm->customCudaMalloc<int**>(NUM_TABLE, gpu);
//   CubDebugExit(cudaSetDevice(gpu));  
//   CubDebugExit(cudaMemcpyAsync(d_out_off, helper_off, NUM_TABLE * sizeof(int**), cudaMemcpyHostToDevice, stream)); 
//   CubDebugExit(cudaSetDevice(0));

//   int* d_pos = (int*) cm->customCudaMalloc<int>(NUM_PARTITION, gpu);
//   CubDebugExit(cudaSetDevice(gpu));
//   CubDebugExit(cudaMemsetAsync(d_pos, 0, NUM_PARTITION * sizeof(int), stream));
//   CubDebugExit(cudaSetDevice(0));

//   // for (int table = 0; table < NUM_TABLE; table++) {
//   //   if (shelper->in_shuffle_off[gpu][table] != NULL) {
//   //     for (int partition = 0; partition < NUM_PARTITION; partition++) {
//   //         if (shelper->in_shuffle_off[gpu][table][partition] != NULL)
//   //           cout << " in im here for table " << table << " partition " << partition << endl;
//   //     }
//   //   }
//   //   if (shelper->out_shuffle_off[gpu][table] != NULL) {
//   //     for (int partition = 0; partition < NUM_PARTITION; partition++) {
//   //         if (shelper->out_shuffle_off[gpu][table][partition] != NULL)
//   //           cout << " out im here for table " << table << " partition " << partition << endl;
//   //     }
//   //   }
//   // }

//  *sout = {
//     d_pos, d_out_col, d_out_off
//   };

// }

// void
// KernelLaunch::launchKernelPipelined(int latemat, int first_join_in_pipeline) {

//     assert(INPUT_LEN > 0);
//     int tile_items = 128*4;

//     assert(sargs != NULL);
//     assert(sout != NULL);
//     assert(sout->pos != NULL);
//     assert(sout->column != NULL);

//     int num_tiles = 0;
//     int* d_partition_count = NULL;
//     int** d_gpuCache = NULL;
    
//     if (d_segment_group_each_gpu == NULL) {
//       d_partition_count = (int*) cm->customCudaMalloc<int>(NUM_GPU, gpu);
//       int* partition_count = (int*) cm->customCudaHostAlloc<int>(NUM_GPU);
//       int* block_scan_partition = new int[NUM_GPU]();

//       for (int partition = 0; partition < NUM_GPU; partition++) {
//         num_tiles += (shelper->result_count[partition][gpu] + tile_items - 1)/tile_items;
//         partition_count[partition] = shelper->result_count[partition][gpu];
//       }

//       for (int partition = 0; partition < NUM_GPU; partition++) {
//         if (partition == 0) {
//           block_scan_partition[partition] = (partition_count[partition] + tile_items - 1) / tile_items;
//         } else {
//           block_scan_partition[partition] = block_scan_partition[partition-1] + (partition_count[partition] + tile_items - 1) / tile_items;
//         }
//       }

//       CubDebugExit(cudaSetDevice(gpu));
//       CubDebugExit(cudaMemcpyAsync(d_partition_count, partition_count, NUM_GPU * sizeof(int), cudaMemcpyHostToDevice, stream)); 
//       CubDebugExit(cudaSetDevice(0));

//       if (latemat == 2) {
//         d_gpuCache = (int**) cm->customCudaMalloc<int*>(NUM_GPU, gpu);
//         CubDebugExit(cudaSetDevice(gpu));
//         CubDebugExit(cudaMemcpyAsync(d_gpuCache, cm->gpuCache, NUM_GPU * sizeof(int*), cudaMemcpyHostToDevice, stream)); 
//         CubDebugExit(cudaSetDevice(0));
//       }

//     } else {
//       num_tiles = (INPUT_LEN+ tile_items - 1)/tile_items;
//     }

//     assert(num_tiles > 0);

//     if (kernel_type == ShuffleProbe) {
//       assert(d_total != NULL);
//       assert(d_total[gpu] != NULL);
//       if (d_segment_group_each_gpu == NULL) {
//         assert(sargs->column_part != NULL);
//         assert(d_partition_count != NULL);

//         CubDebugExit(cudaSetDevice(gpu));
//         if (latemat == 0) {
//           assert(0);
//         } else if (latemat == 1) {
//           assert(0);
//         } else if (latemat == 2) { //global
//           shuffle_probe_global_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
//             d_gpuCache, d_partition_count,
//             *pargs, *sargs, *out_off, d_total[gpu], gpu, first_join_in_pipeline, 1, 1);
//         } 
//         CubDebugExit(cudaSetDevice(0));
  
//       } else {
//         assert(0);
//       }

//       // CHECK_ERROR_STREAM(stream);
//       // assert(0);
//     } else if (kernel_type == ShuffleProbePartition) {
//       if (d_segment_group_each_gpu == NULL) {
//         assert(sargs->column_part != NULL);
//         assert(d_partition_count != NULL);

//         CubDebugExit(cudaSetDevice(gpu));
//         if (latemat == 0) {
//           assert(0);
//           // shuffle_probe_partition_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
//           //   cm->gpuCache[gpu], d_partition_count, *pargs, *sargs, *sout, gpu, 1, 1, latemat);
//         } else if (latemat == 1) {
//           assert(0);
//           // shuffle_probe_partition_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
//           //   cm->gpuCache[gpu], d_partition_count, *pargs, *sargs, *sout, gpu, 1, 1, latemat);          
//         } else if (latemat == 2) { //global
//           // CHECK_ERROR_STREAM(stream);
//           // test_out_off<128,4><<<1, 128, 0, stream>>>(sout->out_off, 128);

//           // int* tmp = new int[100];
//           // CubDebugExit(cudaSetDevice(0));
//           // CubDebugExit(cudaMemcpyAsync(tmp, shelper->in_shuffle_off[0][0][0], 100 * sizeof(int), cudaMemcpyDeviceToHost, stream));
//           // CubDebugExit(cudaSetDevice(0));  

//           // for (int i = 0; i < 100; i++) {
//           //   printf("tmp %d\n", tmp[i]);
//           // }

//           // CubDebugExit(cudaSetDevice(1));
//           // CubDebugExit(cudaMemcpyAsync(tmp, shelper->in_shuffle_off[1][0][0], 100 * sizeof(int), cudaMemcpyDeviceToHost, stream));
//           // CubDebugExit(cudaSetDevice(0));  

//           // for (int i = 0; i < 100; i++) {
//           //   printf("tmp %d\n", tmp[i]);
//           // }

//           // CHECK_ERROR_STREAM(stream);
//           // test_out_off<128,4><<<num_tiles, 128, 0, stream>>>(*sout, 128);
//           // cout << " look at me gpu " << gpu << " " << num_tiles << endl;
//           // CHECK_ERROR_STREAM(stream);
//           shuffle_probe_partition_global_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(d_gpuCache, d_partition_count,
//             *pargs, *sargs, *sout, gpu, first_join_in_pipeline, 1, 1);
//           // CHECK_ERROR_STREAM(stream);

//           // assert(0);
//         }
//         CubDebugExit(cudaSetDevice(0));
  
//       } else {
//         assert(d_segment_group_each_gpu != NULL);
//         assert(sargs->col_idx != NULL);
  
//         CubDebugExit(cudaSetDevice(gpu));
//         probe_partition_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
//           cm->gpuCache[gpu], *fargs, *pargs, *sargs, *sout, INPUT_LEN, gpu, 0, d_segment_group_each_gpu, 0, 1);
//         CubDebugExit(cudaSetDevice(0));
//       }
//     } else if (kernel_type == ShuffleProbeAggr) {
//       if (d_segment_group_each_gpu == NULL) {
//         assert(sargs->column_part != NULL);
//         assert(d_partition_count != NULL);

//         CubDebugExit(cudaSetDevice(gpu));
//         if (latemat == 0) {
//           assert(0);
//           // shuffle_probe_aggr_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
//           //   d_partition_count, *pargs, *gargs, *sargs, qparams->d_res[gpu], gpu, latemat);
//         } else if (latemat == 1) {
//           assert(0);
//           // shuffle_probe_aggr_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
//           //   d_partition_count, *pargs, *gargs, *sargs, qparams->d_res[gpu], gpu, latemat);
//         } else if (latemat == 2) { //global
//           shuffle_probe_aggr_global_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
//             d_gpuCache, d_partition_count,
//             *pargs, *gargs, *sargs, qparams->d_res[gpu], gpu, first_join_in_pipeline, 1, 0);
//         } 
//         CubDebugExit(cudaSetDevice(0));
  
//       } else {
//         assert(0);
//       }     
//     } else if (kernel_type == ShuffleProbeGroupby) {
//       if (d_segment_group_each_gpu == NULL) {
//         assert(sargs->column_part != NULL);
//         assert(d_partition_count != NULL);

//         CubDebugExit(cudaSetDevice(gpu));
//         if (latemat == 0) {
//           assert(0);
//         } else if (latemat == 1) {
//           assert(0);
//         } else if (latemat == 2) { //global
//           shuffle_probe_group_by_global_GPU<128,4, NUM_GPU><<<num_tiles, 128, 0, stream>>>(
//             d_gpuCache, d_partition_count,
//             *pargs, *gargs, *sargs, qparams->d_res[gpu], gpu, first_join_in_pipeline, 1, 0);
//         } 
//         CubDebugExit(cudaSetDevice(0));
  
//       } else {
//         assert(0);
//       }  
//     } else {
//       assert(0);
//     }

// }

// void
// KernelLaunch::clearPipelined(int*** &off_col) {

//   if (kernel_type == ShuffleProbe) {
//     CubDebugExit(cudaSetDevice(gpu));
//     CubDebugExit(cudaMemcpyAsync(h_total[gpu], d_total[gpu], sizeof(int), cudaMemcpyDeviceToHost, stream));
//     CHECK_ERROR_STREAM(stream);
//     CubDebugExit(cudaSetDevice(0));
//     // cout << *(h_total[gpu]) << " " << output_estimate << endl;
//     assert(*(h_total[gpu]) <= output_estimate);

//     assert(off_col != NULL);
//     assert(off_col[gpu] != NULL);
//     off_col[gpu] = off_col_out;
//     for (int table = 0; table < NUM_TABLE; table++) {
//       off_col[gpu][table] = off_col_out[table];
//     }
//     return;
//   }

//   // int* test = new int[NUM_PARTITION];
//   // CubDebugExit(cudaSetDevice(gpu));
//   // CubDebugExit(cudaMemcpyAsync(test, sout->pos, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
//   // CHECK_ERROR_STREAM(stream);
//   // CubDebugExit(cudaSetDevice(0));
  
//   CubDebugExit(cudaSetDevice(gpu));
//   CubDebugExit(cudaMemcpyAsync(shelper->result_count[gpu], sout->pos, NUM_PARTITION * sizeof(int), cudaMemcpyDeviceToHost, stream));
//   CHECK_ERROR_STREAM(stream);
//   CubDebugExit(cudaSetDevice(0));

//   int* temp;
//   for (int col = 0; col < NUM_COLUMN; col++) {
//     if (shelper->out_shuffle_col[gpu][col] != NULL) {
//       assert(shelper->in_shuffle_col[gpu][col] != NULL);
//       for (int partition = 0; partition < NUM_PARTITION; partition++) {
//         temp = shelper->out_shuffle_col[gpu][col][partition];
//         shelper->out_shuffle_col[gpu][col][partition] = shelper->in_shuffle_col[gpu][col][partition];
//         shelper->in_shuffle_col[gpu][col][partition] = temp;
//       }
//     }
//   }

//   for (int table = 0; table < NUM_TABLE; table++) {
//     if (shelper->out_shuffle_off[gpu][table] != NULL) {
//       assert(shelper->in_shuffle_off[gpu][table] != NULL);
//       for (int partition = 0; partition < NUM_PARTITION; partition++) {
//         // cout << " im here for table " << table << " partition " << partition << endl;
//         temp = shelper->out_shuffle_off[gpu][table][partition];
//         shelper->out_shuffle_off[gpu][table][partition] = shelper->in_shuffle_off[gpu][table][partition];
//         shelper->in_shuffle_off[gpu][table][partition] = temp;
//       }
//     }
//   }

//   // int* tmp = new int[100];
//   // CubDebugExit(cudaSetDevice(0));
//   // CubDebugExit(cudaMemcpyAsync(tmp, shelper->out_shuffle_off[0][0][0], 100 * sizeof(int), cudaMemcpyDeviceToHost, stream));
//   // CubDebugExit(cudaSetDevice(0));

//   // for (int i = 0; i < 100; i++) {
//   //   printf("tmp %d\n", tmp[i]);
//   // }

//   // CubDebugExit(cudaSetDevice(1));
//   // CubDebugExit(cudaMemcpyAsync(tmp, shelper->out_shuffle_off[1][0][0], 100 * sizeof(int), cudaMemcpyDeviceToHost, stream));
//   // CubDebugExit(cudaSetDevice(0));  

//   // for (int i = 0; i < 100; i++) {
//   //   printf("tmp %d\n", tmp[i]);
//   // }

// }