// Lancelot / src / gpudb / QueryProcessing.cu
#include "QueryProcessing.h"
#include "CacheManager.h"
#include "QueryOptimizer.h"
#include "MultiGPUProcessing.h"
// #include "common.h"

// Query identifiers iterated by QueryProcessing::percentageData(), which hands
// each entry to qo->parseQuery().
// NOTE(review): presumably the 13 Star Schema Benchmark queries encoded as
// <flight><number> (e.g. 11 = Q1.1) -- confirm. percentageData() indexes this
// array with k < NUM_QUERIES, so NUM_QUERIES must not exceed 13.
int queries[13] = {11, 12, 13, 21, 22, 23, 31, 32, 33, 34, 41, 42, 43};

// Processes one segment group (`sg`) of the dimension table `table_id`.
//
// Where the work runs is decided by qo->joinCPUcheck[table_id] and
// qo->joinGPUcheck[table_id]:
//   * GPU side: cgp->call_operator_dim_GPU(); for sg == 1 it is preceded by a
//     CPU filter whose output is transferred to the GPUs, and the GPU operator
//     is then told not to filter again (filter_on_gpu == false).
//   * CPU side: cgp->call_bfilter_build_CPU().
// Only sg values 0..3 are accepted; anything else trips assert(0).
// NOTE(review): the exact meaning of each sg value is defined by the query
// optimizer and is not visible here.
//
// The host arrays allocated with new[] below are not released in this function.
void
QueryProcessing::executeTableDim(int table_id, int sg) {
    int* host_offsets = NULL;
    int*** host_offsets_part = new int**[NUM_GPU]();
    int** dev_count = new int*[NUM_GPU]();
    int** host_count = new int*[NUM_GPU]();
    int*** dev_offsets = new int**[NUM_GPU]();
    int* host_count_all = NULL;
    bool filter_on_gpu = true;

    // One offset table plus one zeroed host counter and one device counter per GPU.
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      host_offsets_part[gpu] = new int*[NUM_TABLE]();
      dev_offsets[gpu] = new int*[NUM_TABLE]();
      host_count[gpu] = (int*) cm->customCudaHostAlloc<int>(1);
      *host_count[gpu] = 0;
      dev_count[gpu] = (int*) cm->customCudaMalloc<int>(1, gpu);
    }

    host_count_all = (int*) cm->customCudaHostAlloc<int>(1);
    *host_count_all = 0;

    // Filter on the CPU, move the result to the GPUs (broadcast when the hash
    // table is replicated, scatter otherwise) and disable the GPU-side filter.
    auto filterOnCPUThenTransfer = [&]() {
      if (params->ht_replicated[table_id]) {
        cgp->call_bfilter_CPU(params, host_offsets, host_count_all, sg, table_id);
        cgp->switchDeviceCPUtoGPUBroadcastDim(host_count_all, host_count, dev_offsets, host_offsets, table_id, sg, streams[sg]);
      } else {
        cgp->call_filter_partition_CPU(params, host_offsets_part, host_count, sg, table_id);
        cgp->switchDeviceCPUtoGPUScatter(host_count, dev_offsets, host_offsets_part, sg, streams[sg]);
      }
      filter_on_gpu = false;
    };

    // GPU operator for this dimension table.
    auto runOnGPU = [&]() {
      cgp->call_operator_dim_GPU(params, dev_offsets, dev_count, host_count, sg, table_id, filter_on_gpu, broadcast[table_id], already_partitioned[table_id], streams[sg]);
    };

    // CPU filter + build for this dimension table.
    auto buildOnCPU = [&]() {
      cgp->call_bfilter_build_CPU(params, host_offsets, host_count_all, sg, table_id);
    };

    switch (sg) {
      case 0:
      case 1: {
        const bool on_cpu = qo->joinCPUcheck[table_id];
        const bool on_gpu = qo->joinGPUcheck[table_id];

        if (on_gpu) {
          // A GPU-only table must never be scheduled with sg == 0.
          if (!on_cpu) {
            assert(sg != 0);
          }
          if (sg == 1) filterOnCPUThenTransfer();
          runOnGPU();
        }
        if (on_cpu) buildOnCPU();
        break;
      }

      case 2:
      case 3: {
        if (qo->joinGPUcheck[table_id]) {
          assert(sg != 2);
          runOnGPU();
        }
        if (qo->joinCPUcheck[table_id]) buildOnCPU();
        break;
      }

      default:
        assert(0);
    }

}

// Processes one segment group (`sg`) of the fact table.
//
// The operator placement is read from the per-segment-group pipeline lists in
// qo (selectCPU/selectGPU/joinCPU/joinGPU/groupbyGPU PipelineCol[sg]):
//   * whatever is placed on the GPU runs through cgp->call_operator_fact_GPU()
//     (or cgp->call_filter_GPU() when only the selection is on the GPU);
//   * if work remains for the CPU, the intermediate result is brought back
//     with cgp->switchDeviceGPUtoCPU() and finished by the matching *_CPU call;
//   * placements with no implementation here end in assert(0).
//
// The host arrays allocated with new[] below are not released in this function.
void
QueryProcessing::executeTableFactSimplified(int sg) {
    int** dev_count = new int*[NUM_GPU]();
    int** host_count = new int*[NUM_GPU]();
    int*** dev_offsets = new int**[NUM_GPU]();
    int** host_offsets = NULL;
    int* host_count_all = new int[1]();

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      // Value-initialised: every per-table entry starts out as NULL.
      dev_offsets[gpu] = new int*[NUM_TABLE]();
      host_count[gpu] = (int*) cm->customCudaHostAlloc<int>(1);
      *host_count[gpu] = 0;
      dev_count[gpu] = (int*) cm->customCudaMalloc<int>(1, gpu);
    }
    *host_count_all = 0;

    if (verbose) printf("fact sg = %d\n", sg);

    const bool select_cpu  = qo->selectCPUPipelineCol[sg].size() > 0;
    const bool select_gpu  = qo->selectGPUPipelineCol[sg].size() > 0;
    const bool join_cpu    = qo->joinCPUPipelineCol[sg].size() > 0;
    const bool join_gpu    = qo->joinGPUPipelineCol[sg].size() > 0;
    const bool groupby_gpu = qo->groupbyGPUPipelineCol[sg].size() > 0;

    // GPU part of the fact pipeline.
    auto runFactOnGPU = [&]() {
      cgp->call_operator_fact_GPU(params, dev_offsets, dev_count, host_count, sg, 0, latemat, streams[sg]);
    };

    // Bring the GPU intermediate result back to the host.
    auto pullToHost = [&]() {
      host_offsets = new int*[NUM_TABLE]();
      cgp->switchDeviceGPUtoCPU(host_count_all, host_count, dev_offsets, host_offsets, sg, streams[sg]);
    };

    // Remaining joins + aggregation / group-by on the CPU.
    auto finishJoinOnCPU = [&]() {
      if (qo->groupby_build.size() == 0) cgp->call_probe_aggr_CPU(params, host_offsets, host_count_all, sg);
      else cgp->call_probe_group_by_CPU(params, host_offsets, host_count_all, sg);
    };

    // Aggregation / group-by only on the CPU (all joins already done).
    auto finishAggrOnCPU = [&]() {
      if (qo->groupby_build.size() == 0) cgp->call_aggregation_CPU(params, host_offsets[0], host_count_all, sg);
      else cgp->call_group_by_CPU(params, host_offsets, host_count_all, sg);
    };

    // --- Some selection is placed on the CPU --------------------------------
    if (select_cpu) {
      if (join_gpu) {
        runFactOnGPU();
        pullToHost();
        cgp->call_pfilter_aggr_CPU(params, host_offsets, host_count_all, sg, qo->selectGPUPipelineCol[sg].size());
      } else if (qo->groupby_build.size() == 0) {
        // everything on CPU
        cgp->call_pfilter_probe_aggr_CPU(params, host_offsets, host_count_all, sg, 0);
      } else {
        assert(0);
      }
      return;
    }

    // --- At least one join is placed on the GPU -----------------------------
    if (join_gpu) {
      if (!join_cpu && groupby_gpu) {
        // Whole remaining pipeline on the GPU.
        if (select_gpu) {
          if (qo->groupby_build.size() == 0) {
            runFactOnGPU();
          } else {
            assert(0);
          }
        } else {
          runFactOnGPU();
        }
      } else if (join_cpu) {
        // Part of the join on the GPU, the rest and the group-by on the CPU.
        if (select_gpu) {
          assert(0);
        } else {
          runFactOnGPU();
          pullToHost();
          finishJoinOnCPU();
        }
      } else {
        // (Filter and) join on the GPU, group-by on the CPU.
        runFactOnGPU();
        pullToHost();
        finishAggrOnCPU();
      }
      return;
    }

    // --- No join on the GPU --------------------------------------------------
    if (join_cpu) {
      if (select_gpu) {
        // Filter on the GPU, join and group-by on the CPU.
        cgp->call_filter_GPU(params, dev_offsets, dev_count, host_count, sg, 0, 0, streams[sg]);
        pullToHost();
      }
      // Without a GPU filter host_offsets is still NULL here.
      finishJoinOnCPU();
    } else {
      assert(0);
    }
}

// Executes the already prepared query plan in three timed phases:
//   1. for every entry of qo->join, run the dimension table's segment groups
//      through executeTableDim();
//   2. run the fact table's (table 0) segment groups through
//      executeTableFactSimplified();
//   3. collect every GPU's partial result buffer on `lead_gpu`, combine them
//      with the mergeGPU kernel, copy the result to the host and merge() it
//      into params->res.
//
// ctx: CUDA driver context pushed at the start of every parallel_for body and
//      popped at its end.
//      NOTE(review): presumably needed because parallel_for may execute the
//      body on another thread -- confirm.
//
// SETUP_TIMING() is defined elsewhere; it is expected to provide the `start`
// and `stop` events used below.
void
QueryProcessing::runQuery(CUcontext ctx) {

  SETUP_TIMING();
  float time;
  cudaEventRecord(start, 0);

  // ---------------------------------------------------------------- phase 1
  for (int i = 0; i < qo->join.size(); i++) {
    int table_id = qo->join[i].second->table_id;

    //WARNING: MIGHT HAVE TO DO BROADCAST HERE BEFORE GOING INTO PARALLEL FOR

    if (NUM_GPU > 1 && qo->joinGPUcheck[table_id]) {
      cout << table_id << " " << qo->par_segment_count[table_id] << endl;
      assert(qo->par_segment_count[table_id] == 1);
    }

    //NCCL DOES NOT SUPPORT PARALLEL FOR
    parallel_for(short(0), qo->par_segment_count[table_id], [=](short j){

      CUcontext poppedCtx;
      cuCtxPushCurrent(ctx);

      int sg = qo->par_segment[table_id][j];

      if (verbose) {
        cout << qo->join[i].second->column_name << endl;
        printf("sg = %d\n", sg);
      }

      // One stream per GPU for this segment group.
      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaStreamCreate(&streams[sg][gpu]));
      }
      CubDebugExit(cudaSetDevice(0));

      if (qo->segment_group_count[table_id][sg] > 0) {
        executeTableDim(table_id, sg);
      }

      // Wait for all work queued on the segment group's streams, then drop them.
      for (int gpu = 0; gpu < NUM_GPU; gpu++) {
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaStreamSynchronize(streams[sg][gpu]));
        CubDebugExit(cudaStreamDestroy(streams[sg][gpu]));
      }
      CubDebugExit(cudaSetDevice(0));

      cuCtxPopCurrent(&poppedCtx);

    });
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Total dim table time " << time << endl;

  // ---------------------------------------------------------------- phase 2
  cudaEventRecord(start, 0);

  cout << qo->par_segment_count[0] << endl;

  //NCCL DOES NOT SUPPORT PARALLEL FOR
  parallel_for(short(0), qo->par_segment_count[0], [=](short i){

    CUcontext poppedCtx;
    cuCtxPushCurrent(ctx);
    int sg = qo->par_segment[0][i];

    cout << endl;
    cout << "Currently sg = " << sg << endl;

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaStreamCreate(&streams[sg][gpu]));
    }
    CubDebugExit(cudaSetDevice(0));

    // Per-segment-group timing events; destroyed below once the elapsed time
    // has been read so they do not accumulate across segment groups/queries.
    float time_;
    cudaEvent_t start_, stop_;
    cudaEventCreate(&start_); cudaEventCreate(&stop_);
    cudaEventRecord(start_, 0);

    if (qo->segment_group_count[0][sg] > 0) {
      executeTableFactSimplified(sg);
    }

    cudaEventRecord(stop_, 0);
    cudaEventSynchronize(stop_);
    cudaEventElapsedTime(&time_, start_, stop_);
    CubDebugExit(cudaEventDestroy(start_));
    CubDebugExit(cudaEventDestroy(stop_));

    if (verbose) cout << "sg = " << sg << " non demand time = " << time_ << endl;

    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      CubDebugExit(cudaSetDevice(gpu));
      CubDebugExit(cudaStreamSynchronize(streams[sg][gpu]));
      CubDebugExit(cudaStreamDestroy(streams[sg][gpu]));
    }
    CubDebugExit(cudaSetDevice(0));

    cuCtxPopCurrent(&poppedCtx);

  });

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Total fact table time " << time << endl;

  // ---------------------------------------------------------------- phase 3
  cudaEventRecord(start, 0);

  // resGPU[g] is the buffer on lead_gpu that holds GPU g's partial result:
  // the lead GPU's own result buffer for g == lead_gpu, a fresh buffer on
  // lead_gpu for every other GPU.
  int** resGPU = new int*[NUM_GPU]();
  int lead_gpu = 0;
  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    if (gpu == lead_gpu) resGPU[gpu] = params->d_res[gpu];
    else resGPU[gpu] = (int*) cm->customCudaMalloc<int>(params->total_val * 6, lead_gpu);
  }

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CubDebugExit(cudaStreamCreate(&streams[0][gpu]));
    CubDebugExit(cudaSetDevice(0));
  }

  if (NUM_GPU == 8) {
    // Gather through NCCL point-to-point: every GPU sends its result to
    // lead_gpu, which posts one receive per peer (itself included).
    ncclGroupStart();
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      if (gpu == lead_gpu) {
        for (int r=0; r < NUM_GPU; r++) {
          ncclRecv(resGPU[r], params->total_val * 6, ncclInt, r, comms[gpu], streams[0][gpu]);
        }
      }
      ncclSend(params->d_res[gpu], params->total_val * 6, ncclInt, lead_gpu, comms[gpu], streams[0][gpu]);
      cgp->nvlink_total += params->total_val * 6;
    }
    ncclGroupEnd();
  } else {
    // Gather through device-to-device copies issued on each source GPU's stream.
    for (int gpu = 0; gpu < NUM_GPU; gpu++) {
      if (gpu != lead_gpu) {
        CubDebugExit(cudaSetDevice(gpu));
        CubDebugExit(cudaMemcpyAsync(resGPU[gpu], params->d_res[gpu], params->total_val * 6 * sizeof(int), cudaMemcpyDeviceToDevice, streams[0][gpu]));
        cgp->nvlink_total += params->total_val * 6;
        CubDebugExit(cudaSetDevice(0));
      }
    }
  }

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(streams[0][gpu]);
    CubDebugExit(cudaSetDevice(0));
  }

  // Merge all partial results on lead_gpu, then fold the merged buffer into
  // the host-side result.
  int* resCPU = (int*) cm->customCudaHostAlloc<int>(params->total_val * 6);
  int** d_resGPU = (int**) cm->customCudaMalloc<int*>(NUM_GPU, lead_gpu);
  CubDebugExit(cudaSetDevice(lead_gpu));
  CubDebugExit(cudaMemcpyAsync(d_resGPU, resGPU, NUM_GPU * sizeof(int*), cudaMemcpyHostToDevice, streams[0][lead_gpu]));
  // 128 threads per block, ceil-div grid over total_val.
  mergeGPU<NUM_GPU><<<(params->total_val + 128 - 1)/128, 128, 0, streams[0][lead_gpu]>>>(d_resGPU, params->total_val, lead_gpu);
  CubDebugExit(cudaMemcpyAsync(resCPU, params->d_res[lead_gpu], params->total_val * 6 * sizeof(int), cudaMemcpyDeviceToHost, streams[0][lead_gpu]));
  CHECK_ERROR_STREAM(streams[0][lead_gpu]);
  CubDebugExit(cudaSetDevice(0));
  merge(params->res, resCPU, params->total_val);

  for (int gpu = 0; gpu < NUM_GPU; gpu++) {
    CubDebugExit(cudaSetDevice(gpu));
    CHECK_ERROR_STREAM(streams[0][gpu]);
    CubDebugExit(cudaStreamDestroy(streams[0][gpu]));
    CubDebugExit(cudaSetDevice(0));
  }

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  if (verbose) cout << "Merge time " << time << endl;
  cgp->merging_total += time;

  // Only the host-side pointer table is released here; its contents were
  // already uploaded to d_resGPU and the buffers it points to are owned by
  // the cache manager / params.
  delete[] resGPU;
}

// Runs the member `query` end to end: parse and prepare it, decide operator
// placement, group the segments of every joined table and then of the fact
// table, execute the plan with runQuery(), accumulate the per-segment-group
// transfer/time counters into cgp's totals and finally clear the per-query
// state.
//
// ctx: CUDA driver context, forwarded to runQuery().
// Returns cgp->execution_total + cgp->preparing_total + cgp->optimization_total
// as they stand after this query; all three are only ever incremented here.
// NOTE(review): whether they are reset between queries depends on
// cgp->resetCGP(), which is not visible in this file.
double
QueryProcessing::processQuery(CUcontext ctx) {

  SETUP_TIMING();
  float time;

  cudaEventRecord(start, 0);

  qo->parseQuery(query);
  qo->prepareQuery(query, dist);
  params = qo->params;

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  cgp->preparing_total += time;

  if (verbose) {
    cout << "Query Prepare Time 1: " << time << endl;
    cout << endl;
  }

  cudaEventRecord(start, 0);

  qo->prepareOperatorPlacement(dist);
  for (int tbl = 0; tbl < qo->join.size(); tbl++) {
      qo->groupBitmapSegmentTable(qo->join[tbl].second->table_id, query, broadcast[qo->join[tbl].second->table_id]);
  }
  //WARNING: THE FACT TABLE HAS TO BE LAST (ALWAYS!!!!)
  qo->groupBitmapSegmentTable(0, query, broadcast[0]);

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);
  cgp->optimization_total += time;

  if (verbose) {
    cout << "Query Optimization Time: " << time << endl;
    cout << endl;    
  }

  cudaEventRecord(start, 0);

  qo->prepareQuery2(query, dist);

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  cgp->preparing_total += time;

  if (verbose) {
    cout << "Query Prepare Time 2: " << time << endl;
    cout << endl;
  }

  cudaEventRecord(start, 0);

  // Forward the caller's context: runQuery() pushes it inside its
  // parallel_for bodies. The call previously passed no argument, leaving
  // `ctx` unused in this function.
  runQuery(ctx);

  cudaEventRecord(stop, 0);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&time, start, stop);

  cgp->execution_total += time;

  // Fold the per-segment-group counters into the running totals.
  for (int sg = 0 ; sg < MAX_GROUPS; sg++) {
    cgp->cpu_to_gpu_total += cgp->cpu_to_gpu[sg];
    cgp->gpu_to_cpu_total += cgp->gpu_to_cpu[sg];
    cgp->nvlink_total += cgp->nvlink_bytes[sg];
    cgp->shuffle_total += cgp->shuffle_time[sg];
    cgp->gpu_total += cgp->gpu_time[sg];
    cgp->cpu_total += cgp->cpu_time[sg];
  }

  if (verbose) {
    // Each result row occupies 6 ints; a row is printed when the int at
    // offset 4 is non-zero, and the value starting at offset 4 is printed as
    // one unsigned long long.
    cout << "Result:" << endl;
    int res_count = 0;
    for (int i=0; i< params->total_val; i++) {
      if (params->res[6*i+4] != 0) {
        cout << params->res[6*i] << " " << params->res[6*i+1] << " " << params->res[6*i+2] << " " << params->res[6*i+3] << " " << reinterpret_cast<unsigned long long*>(&params->res[6*i+4])[0]  << endl;
        res_count++;
      }
    }
    cout << "Res count = " << res_count << endl;
    cout << "Query Execution Time: " << time << endl;
    cout << endl;
  }

  // updateStatsQuery();

  if (params->compare1[cm->lo_orderdate] >= 19960101) count_zipfian++;

  qo->clearPlacement();
  endQuery();
  qo->clearParsing();

  return cgp->execution_total + cgp->preparing_total + cgp->optimization_total;

}

// Fills broadcast[0..4] and already_partitioned[0..4] for the "shuffle aware"
// configuration, chosen from NUM_GPU, SF, dist and adaptive.
//
// With more than one GPU and SF not a multiple of 10, every table 1..NUM_TABLE-1
// that ends up neither broadcast nor already partitioned must have a segment
// count that divides evenly by NUM_GPU twice over (checked by assert).
void
QueryProcessing::ShuffleAwareExec() {

  // Copies one five-entry row into each of the two placement arrays.
  auto assign = [this](const bool (&bcast)[5], const bool (&part)[5]) {
    for (int t = 0; t < 5; t++) {
      broadcast[t] = bcast[t];
      already_partitioned[t] = part[t];
    }
  };

  // Rows are indexed by table id; the suffix lists the ids that are `true`.
  const bool kNone[5]         = {false, false, false, false, false};
  const bool kBcast_1_2_3_4[5] = {false, true,  true,  true,  true };
  const bool kBcast_1_2_4[5]  = {false, true,  true,  false, true };
  const bool kBcast_1_4[5]    = {false, true,  false, false, true };
  const bool kBcast_4[5]      = {false, false, false, false, true };
  const bool kPart_3[5]       = {false, false, false, true,  false};
  const bool kPart_2_3[5]     = {false, false, true,  true,  false};
  const bool kPart_1_2_3[5]   = {false, true,  true,  true,  false};

  // Single GPU: nothing to broadcast, nothing pre-partitioned.
  if (NUM_GPU == 1) {
    assign(kNone, kNone);
    return;
  }

  //original SSB
  if (SF % 10 == 0) {
    assert(dist != Zipf);
    assign(kBcast_1_2_3_4, kNone);
    return;
  }

  if (dist == Zipf) {
    if (SF == 322 && NUM_GPU == 8) {
      assign(kBcast_1_2_4, kPart_3);
    } else if ((SF == 162 && NUM_GPU >= 4) || (SF == 82 && NUM_GPU >=2)) {
      assign(kBcast_1_2_3_4, kNone);
    } else {
      assign(kBcast_1_4, kPart_2_3);
    }
  } else if (adaptive) {
    assign(kBcast_1_4, kPart_2_3);
  } else {
    assign(kBcast_4, kPart_1_2_3);
  }

  // Tables that still have to be shuffled need an evenly divisible segment count.
  for (int table = 1; table < NUM_TABLE; table++) {
    if (broadcast[table] || already_partitioned[table]) continue;
    assert(cm->allTable[table]->total_segment % NUM_GPU == 0 && cm->allTable[table]->total_segment / NUM_GPU >= NUM_GPU 
      && (cm->allTable[table]->total_segment / NUM_GPU) % NUM_GPU == 0);
  }
}

// Fills broadcast[0..4] and already_partitioned[0..4] for the
// "partitioning only" configuration. Not valid when SF is a multiple of 10.
//
// already_partitioned is always all-false. With more than one GPU, tables 1
// and 4 are always broadcast; whether tables 2 and 3 are broadcast depends on
// SF and on the member `query`.
void
QueryProcessing::PartitioningOnlyExec() {

  if (SF % 10 == 0) assert(0);

  // Start from "nothing broadcast, nothing pre-partitioned"; this is also the
  // final answer for a single GPU.
  for (int t = 0; t < 5; t++) {
    broadcast[t] = false;
    already_partitioned[t] = false;
  }
  if (NUM_GPU == 1) return;

  const bool q33_or_34 = (query == 33 || query == 34);
  const bool q42_or_43 = (query == 42 || query == 43);

  bool bcast_table2;
  bool bcast_table3;
  switch (SF) {
    case 322:
      bcast_table2 = false;
      bcast_table3 = false;
      break;
    case 162:
      bcast_table2 = !q33_or_34;
      bcast_table3 = false;
      break;
    case 82:
      bcast_table2 = !(query == 32 || q33_or_34);
      bcast_table3 = !q42_or_43;
      break;
    default:
      bcast_table2 = !q33_or_34;
      bcast_table3 = !q42_or_43;
      break;
  }

  broadcast[1] = true;
  broadcast[2] = bcast_table2;
  broadcast[3] = bcast_table3;
  broadcast[4] = true;

  // Every table that has to be partitioned needs a segment count that is a
  // multiple of NUM_GPU and gives each GPU at least NUM_GPU segments.
  for (int table = 1; table < NUM_TABLE; table++) {
    if (broadcast[table] || already_partitioned[table]) continue;
    assert(cm->allTable[table]->total_segment % NUM_GPU == 0 && cm->allTable[table]->total_segment / NUM_GPU >= NUM_GPU);
  }
}

// Fills broadcast[0..4] and already_partitioned[0..4] for the
// "replication only" configuration: every entry of both arrays is false.
// Not valid when SF is a multiple of 10.
void
QueryProcessing::ReplicationOnlyExec() {

  if (SF % 10 == 0) assert(0);

  for (int table = 0; table < 5; table++) {
    broadcast[table] = false;
    already_partitioned[table] = false;
  }
}


// For each of the first NUM_QUERIES entries of `queries`, parses the query and
// prints which fraction of the segments of the columns it touches (select,
// build, probe, group-by and aggregation columns) is counted in
// tot_seg_in_GPU. A column that appears in several of those lists is counted
// once per list.
void
QueryProcessing::percentageData() {

  double fraction[NUM_QUERIES] = {0};

  cout << endl;
  for (int k = 0; k < NUM_QUERIES; k++) {
    const int cur_query = queries[k];

    qo->parseQuery(cur_query);

    int total = 0;
    int cached = 0;

    // Adds one column's segment counts to the running sums.
    auto tally = [&total, &cached](ColumnInfo* column) {
      total += column->total_segment;
      cached += column->tot_seg_in_GPU;
    };

    for (ColumnInfo* column : qo->querySelectColumn)  tally(column);
    for (ColumnInfo* column : qo->queryBuildColumn)   tally(column);
    for (ColumnInfo* column : qo->queryProbeColumn)   tally(column);
    for (ColumnInfo* column : qo->queryGroupByColumn) tally(column);
    for (ColumnInfo* column : qo->queryAggrColumn)    tally(column);

    fraction[k] = cached*1.0/total;

    cout << "Query " << cur_query << " fraction: " << fraction[k] << " total: " << total << " cached: " << cached << endl;

    qo->clearParsing();

  }
  cout << endl;


}

// Per-query teardown: clears the optimizer's prepare state, resets the cache
// manager's pointer and resets the CPU/GPU processing object.
// qo->clearVector() and cgp->resetTime() are intentionally not called here.
void
QueryProcessing::endQuery() {
  qo->clearPrepare();
  cm->resetPointer();
  cgp->resetCGP();
}

void
QueryProcessing::updateStatsQuery() {
  chrono::high_resolution_clock::time_point cur_time = chrono::high_resolution_clock::now();
  chrono::duration<double> timestamp = cur_time - cgp->begin_time;

  double time_count = logical_time;
  logical_time += 20;

  for (int i = 0; i < qo->join.size(); i++) {
    for (int col = 0; col < qo->select_build[qo->join[i].second].size(); col++) {  
      cm->updateColumnTimestamp(qo->select_build[qo->join[i].second][col], time_count++);
      ColumnInfo* column = qo->select_build[qo->join[i].second][col];
      for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
        Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
        cm->updateSegmentTimeDirect(column, segment, time_count);
        cm->updateSegmentFreqDirect(column, segment);
      }
    }
    cm->updateColumnTimestamp(qo->join[i].second, time_count++);
    ColumnInfo* column = qo->join[i].second;
    for (int seg_id = 0; seg_id < column->total_segment; seg_id++) {
      Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
      cm->updateSegmentTimeDirect(column, segment, time_count);
      cm->updateSegmentFreqDirect(column, segment);
    }
  }

  parallel_for(short(0), qo->par_segment_count[0], [=](short i){

      double par_time_count = time_count;

      int sg = qo->par_segment[0][i];

      for (int col = 0; col < qo->selectCPUPipelineCol[sg].size(); col++) {
        ColumnInfo* column = qo->selectCPUPipelineCol[sg][col];
        cm->updateColumnTimestamp(column, par_time_count++);
        for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
          int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
          Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
          cm->updateSegmentTimeDirect(column, segment, par_time_count);
          cm->updateSegmentFreqDirect(column, segment);
        }
      }
      for (int col = 0; col < qo->selectGPUPipelineCol[sg].size(); col++) {
        ColumnInfo* column = qo->selectGPUPipelineCol[sg][col];
        cm->updateColumnTimestamp(column, par_time_count++);
        for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
          int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
          Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
          cm->updateSegmentTimeDirect(column, segment, par_time_count);
          cm->updateSegmentFreqDirect(column, segment);
        }
      }
      for (int col = 0; col < qo->joinGPUPipelineCol[sg].size(); col++) {
        ColumnInfo* column = qo->joinGPUPipelineCol[sg][col];
        cm->updateColumnTimestamp(column, par_time_count++);
        for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
          int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
          Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
          cm->updateSegmentTimeDirect(column, segment, par_time_count);
          cm->updateSegmentFreqDirect(column, segment);
        }
      }
      for (int col = 0; col < qo->joinCPUPipelineCol[sg].size(); col++) {
        ColumnInfo* column = qo->joinCPUPipelineCol[sg][col];
        cm->updateColumnTimestamp(column, par_time_count++);
        for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
          int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
          Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
          cm->updateSegmentTimeDirect(column, segment, par_time_count);
          cm->updateSegmentFreqDirect(column, segment);
        }
      }
      for (int col = 0; col < qo->queryGroupByColumn.size(); col++) {
        ColumnInfo* column = qo->queryGroupByColumn[col];
        cm->updateColumnTimestamp(column, par_time_count++);
        for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
          int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
          Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
          cm->updateSegmentTimeDirect(column, segment, par_time_count);
          cm->updateSegmentFreqDirect(column, segment);
        }
      }
      for (int col = 0; col < qo->queryAggrColumn.size(); col++) {
        ColumnInfo* column = qo->queryAggrColumn[col];
        cm->updateColumnTimestamp(column, par_time_count++);
        for (int seg = 0; seg < qo->segment_group_count[column->table_id][sg]; seg++) {
          int seg_id = qo->segment_group[column->table_id][sg * column->total_segment + seg];
          Segment* segment = qo->cm->index_to_segment[column->column_id][seg_id];
          cm->updateSegmentTimeDirect(column, segment, par_time_count);
          cm->updateSegmentFreqDirect(column, segment);
        }
      }

  });


  for (int i = 0; i < qo->querySelectColumn.size(); i++) {
    ColumnInfo* column = qo->querySelectColumn[i];
    cm->updateColumnFrequency(column);
    cm->updateColumnWeightDirect(column, qo->speedup[query][column]);
  }

  for (int i = 0; i < qo->queryBuildColumn.size(); i++) {
    ColumnInfo* column = qo->queryBuildColumn[i];
    cm->updateColumnFrequency(column);
    cm->updateColumnWeightDirect(column, qo->speedup[query][column]);
  }

  for (int i = 0; i < qo->queryProbeColumn.size(); i++) {
    ColumnInfo* column = qo->queryProbeColumn[i];
    cm->updateColumnFrequency(column);
    cm->updateColumnWeightDirect(column, qo->speedup[query][column]);
  }

  for (int i = 0; i < qo->queryGroupByColumn.size(); i++) {
    ColumnInfo* column = qo->queryGroupByColumn[i];
    cm->updateColumnFrequency(column); 
    cm->updateColumnWeightDirect(column, qo->speedup[query][column]);
  }

  for (int i = 0; i < qo->queryAggrColumn.size(); i++) {
    ColumnInfo* column = qo->queryAggrColumn[i];
    cm->updateColumnFrequency(column);
    cm->updateColumnWeightDirect(column, qo->speedup[query][column]);
  }
}

// Writes a plain-text cache report to `filename` (opened with mode "w").
//
// The report has two parts:
//   * a "CACHE INFO" header with SEGMENT_SIZE, cm->cache_total_seg, the
//     summed total_segment / tot_seg_in_GPU counts, and one line per column;
//   * one section per entry of the global `queries` array (NUM_QUERIES of
//     them). For each, the query is parsed and prepared through `qo`,
//     countTouchedSegment() is run for table 0 and for the table of every
//     join's second column, and the per-column touched / cached-and-touched
//     counts are printed. endQuery() and qo->clearParsing() are called before
//     moving on to the next query.
//
// If the file cannot be opened a message is printed to stdout, assert(0)
// fires in builds with assertions enabled, and the function returns without
// writing anything otherwise.
void
QueryProcessing::dumpTrace(string filename) {

    int data_size = 0;
    int cached_data = 0;

    // NOTE(review): this loop and the per-column listing below start at
    // column 1, while the per-query loops further down start at column 0 --
    // confirm whether column 0 is meant to be excluded here.
    for (int i = 1; i < cm->TOT_COLUMN; i++) {
      data_size += cm->allColumn[i]->total_segment;
      cached_data += cm->allColumn[i]->tot_seg_in_GPU;
    }

    FILE *fptr = fopen(filename.c_str(), "w");
    if (fptr == NULL)
    {
        printf("Could not open file\n");
        assert(0);
        // assert() expands to nothing when NDEBUG is defined; without this
        // return every fprintf below would be handed a null FILE*.
        return;
    }
   
    fprintf(fptr, "===========================\n");
    fprintf(fptr, "=======  CACHE INFO  ======\n");
    fprintf(fptr, "===========================\n");

    fprintf(fptr, "\n");
    fprintf(fptr, "Segment size: %d\n", SEGMENT_SIZE);
    fprintf(fptr, "Cache size: %d\n", cm->cache_total_seg);
    fprintf(fptr, "Data size: %d\n", data_size);
    fprintf(fptr, "Cached data: %d\n", cached_data);
    fprintf(fptr, "\n");

    for (int i = 1; i < cm->TOT_COLUMN; i++) {
        fprintf(fptr,"%s: %d/%d segments cached\n", cm->allColumn[i]->column_name.c_str(), cm->allColumn[i]->tot_seg_in_GPU, cm->allColumn[i]->total_segment);
    }

    fprintf(fptr, "\n");
    fprintf(fptr, "\n");
    fprintf(fptr, "\n");

    for (int i = 0; i < NUM_QUERIES; i++) {

        fprintf(fptr, "===========================\n");
        fprintf(fptr, "========  QUERY %d ========\n", queries[i]);
        fprintf(fptr, "===========================\n");

        // Build the plan for this query so that qo->join, qo->queryColumn
        // and qo->checkPredicate (used by countTouchedSegment) refer to it.
        qo->parseQuery(queries[i]);
        qo->prepareQuery(queries[i], dist);

        // Per-column counters, indexed by column_id, zero-initialised.
        int* t_segment = new int[cm->TOT_COLUMN]();
        int* t_c_segment = new int[cm->TOT_COLUMN]();
        int total_cached = 0, total_touched = 0, total_cached_touched = 0;

        // Table 0 first, then the table of each join's second column.
        countTouchedSegment(0, t_segment, t_c_segment);
        for (int tbl = 0; tbl < qo->join.size(); tbl++) {
          countTouchedSegment(qo->join[tbl].second->table_id, t_segment, t_c_segment);
        }

        for (int col = 0; col < cm->TOT_COLUMN; col++)
        {
          total_cached+=cm->allColumn[col]->tot_seg_in_GPU;
          total_touched+=t_segment[col];
          total_cached_touched+=t_c_segment[col];
        }

        fprintf(fptr, "\n");
        fprintf(fptr,"Segment cached: %d\n", total_cached);
        fprintf(fptr,"Segment touched: %d\n", total_touched);
        fprintf(fptr,"Segment cached and touched: %d\n", total_cached_touched);
        fprintf(fptr, "\n");

        // Detail is printed only for columns the query touched.
        for (int col = 0; col < cm->TOT_COLUMN; col++)
        {
          if (t_segment[col] > 0) {
            fprintf(fptr, "\n");
            fprintf(fptr,"%s\n", cm->allColumn[col]->column_name.c_str());
            fprintf(fptr,"Speedup: %.3f\n", qo->speedup[queries[i]][cm->allColumn[col]]);
            fprintf(fptr,"Segment cached: %d\n", cm->allColumn[col]->tot_seg_in_GPU);
            fprintf(fptr,"Segment touched: %d\n", t_segment[col]);
            fprintf(fptr,"Segment cached and touched: %d\n", t_c_segment[col]);
            fprintf(fptr, "\n");
          }
        }

        fprintf(fptr, "\n");
        fprintf(fptr, "\n");
        fprintf(fptr, "\n");

        delete[] t_segment;
        delete[] t_c_segment;

        // Drop this query's prepared and parsed state before the next one.
        endQuery();
        qo->clearParsing();
    }

    fclose(fptr);
}


void
QueryProcessing::countTouchedSegment(int table_id, int* t_segment, int* t_c_segment) {
  int total_segment = cm->allColumn[cm->columns_in_table[table_id][0]]->total_segment;
  for (int i = 0; i < total_segment; i++) {
    if (qo->checkPredicate(table_id, i)) {
      for (int j = 0; j < qo->queryColumn[table_id].size(); j++) {
        ColumnInfo* column = qo->queryColumn[table_id][j];
        t_segment[column->column_id]++;
        if (cm->segment_bitmap[column->column_id][i]) t_c_segment[column->column_id]++;
      }
    }
  }
}