Lancelot / src / gpudb / CPUProcessing.cu
CPUProcessing.cu
Raw
// #ifndef _CPU_PROCESSING_H_
// #define _CPU_PROCESSING_H_

// #include <chrono>
// #include <atomic>
// #include <unistd.h>
// #include <iostream>
// #include <stdio.h>
// #include "tbb/tbb.h"
// #include "KernelArgs.h"

// using namespace std;
// using namespace tbb;

// #define BATCH_SIZE 256
// #define NUM_THREADS 48
// #define TASK_SIZE 1024 //! TASK_SIZE must be a factor of SEGMENT_SIZE and must be less than 20000

#include "CPUProcessing.h"

void filter_probe_CPU(
  struct filterArgsCPU fargs, struct probeArgsCPU pargs, struct offsetCPU out_off, int num_tuples,
  int* total, int start_offset = 0, short* segment_group = NULL) {

  assert(segment_group != NULL);

  // Probe
  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          int segment_idx = segment_group[start / SEGMENT_SIZE];
          unsigned int count = 0;
          unsigned int temp[5][end-start];

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int hash;
              long long slot;
              // int slot1 = 1, slot2 = 1, slot3 = 1; 
              int slot4 = 1;
              int lo_offset;

              lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              // if (fargs.filter_col1 != NULL) {
                if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
                // if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
              // }

              // if (fargs.filter_col2 != NULL) {
                if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
                // if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
              // }

              // if (pargs.ht1 != NULL && pargs.key_col1 != NULL) {
              //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
              //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
              //   if (slot == 0) continue;
              //   slot1 = slot >> 32;
              // }

              // if (pargs.ht2 != NULL && pargs.key_col2 != NULL) {
              //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
              //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
              //   if (slot == 0) continue;
              //   slot2 = slot >> 32;
              // }

              // if (pargs.ht3 != NULL && pargs.key_col3 != NULL) {
              //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
              //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
              //   if (slot == 0) continue;
              //   slot3 = slot >> 32;
              // }

              // if (pargs.ht4 != NULL && pargs.key_col4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
                slot4 = slot >> 32;
              // }


              temp[0][count] = lo_offset;
              // temp[1][count] = slot1-1;
              // temp[2][count] = slot2-1;
              // temp[3][count] = slot3-1;
              temp[4][count] = slot4-1;
              count++;

            }
          }

          for (int i = end_batch ; i < end; i++) {
              int hash;
              long long slot;
              // int slot1 = 1, slot2 = 1, slot3 = 1; 
              int slot4 = 1;
              int lo_offset;

              lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              // if (fargs.filter_col1 != NULL) {
                if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
                // if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
              // }

              // if (fargs.filter_col2 != NULL) {
                if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
                // if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
              // }

              // if (pargs.ht1 != NULL && pargs.key_col1 != NULL) {
              //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
              //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
              //   if (slot == 0) continue;
              //   slot1 = slot >> 32;
              // }

              // if (pargs.ht2 != NULL && pargs.key_col2 != NULL) {
              //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
              //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
              //   if (slot == 0) continue;
              //   slot2 = slot >> 32;
              // }

              // if (pargs.ht3 != NULL && pargs.key_col3 != NULL) {
              //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
              //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
              //   if (slot == 0) continue;
              //   slot3 = slot >> 32;
              // }

              // if (pargs.ht4 != NULL && pargs.key_col4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
                slot4 = slot >> 32;
              // }


              temp[0][count] = lo_offset;
              // temp[1][count] = slot1-1;
              // temp[2][count] = slot2-1;
              // temp[3][count] = slot3-1;
              temp[4][count] = slot4-1;
              count++;
          }

          int thread_off = __atomic_fetch_add(total, count, __ATOMIC_RELAXED);

          for (int i = 0; i < count; i++) {
            assert(out_off.h_lo_off != NULL);
            if (out_off.h_lo_off != NULL) out_off.h_lo_off[thread_off+i] = temp[0][i];
            // if (out_off.h_dim_off1 != NULL) out_off.h_dim_off1[thread_off+i] = temp[1][i];
            // if (out_off.h_dim_off2 != NULL) out_off.h_dim_off2[thread_off+i] = temp[2][i];
            // if (out_off.h_dim_off3 != NULL) out_off.h_dim_off3[thread_off+i] = temp[3][i];
            if (out_off.h_dim_off4 != NULL) out_off.h_dim_off4[thread_off+i] = temp[4][i];
          }

    }

  }, simple_partitioner());
}

void filter_probe_CPU2(struct offsetCPU in_off, struct filterArgsCPU fargs, struct probeArgsCPU pargs,
  struct offsetCPU out_off, int num_tuples, int* total, int start_offset = 0) {

  assert(out_off.h_lo_off != NULL);
  assert(in_off.h_lo_off != NULL);


  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          unsigned int count = 0;
          unsigned int temp[5][end-start];
    
          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int hash;
              long long slot;
              // int slot1 = 1, slot2 = 1, slot3 = 1;
              int slot4 = 1;
              int lo_offset;

              lo_offset = in_off.h_lo_off[start_offset + i];

              // if (fargs.filter_col1 != NULL) {
                if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
                // if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
              // }

              // if (fargs.filter_col2 != NULL) {
                if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
                // if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
              // }

              // if (pargs.ht1 != NULL && pargs.key_col1 != NULL) {
              //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
              //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
              //   if (slot == 0) continue;
              //   slot1 = slot >> 32;
              // } else if (in_off.h_dim_off1 != NULL) slot1 = in_off.h_dim_off1[start_offset + i] + 1;


              // if (pargs.ht2 != NULL && pargs.key_col2 != NULL) {
              //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
              //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
              //   if (slot == 0) continue;
              //   slot2 = slot >> 32;
              // } else if (in_off.h_dim_off2 != NULL) slot2 = in_off.h_dim_off2[start_offset + i] + 1;


              // if (pargs.ht3 != NULL && pargs.key_col3 != NULL) {
              //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
              //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
              //   if (slot == 0) continue;
              //   slot3 = slot >> 32;
              // } else if (in_off.h_dim_off3 != NULL) slot3 = in_off.h_dim_off3[start_offset + i] + 1;


              // if (pargs.ht4 != NULL && pargs.key_col4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
                slot4 = slot >> 32;
              // } else if (in_off.h_dim_off4 != NULL) slot4 = in_off.h_dim_off4[start_offset + i] + 1;


              temp[0][count] = lo_offset;
              // temp[1][count] = slot1-1;
              // temp[2][count] = slot2-1;
              // temp[3][count] = slot3-1;
              temp[4][count] = slot4-1;
              count++;

            }
          }

          for (int i = end_batch ; i < end; i++) {
            int hash;
            long long slot;
            // int slot1 = 1, slot2 = 1, slot3 = 1;
            int slot4 = 1;
            int lo_offset;

            lo_offset = in_off.h_lo_off[start_offset + i];

            // if (fargs.filter_col1 != NULL) {
              if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
              // if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
            // }

            // if (fargs.filter_col2 != NULL) {
              if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
              // if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
            // }

            // if (pargs.ht1 != NULL && pargs.key_col1 != NULL) {
            //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
            //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
            //   if (slot == 0) continue;
            //   slot1 = slot >> 32;
            // } else if (in_off.h_dim_off1 != NULL) slot1 = in_off.h_dim_off1[start_offset + i] + 1;


            // if (pargs.ht2 != NULL && pargs.key_col2 != NULL) {
            //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
            //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
            //   if (slot == 0) continue;
            //   slot2 = slot >> 32;
            // } else if (in_off.h_dim_off2 != NULL) slot2 = in_off.h_dim_off2[start_offset + i] + 1;


            // if (pargs.ht3 != NULL && pargs.key_col3 != NULL) {
            //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
            //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
            //   if (slot == 0) continue;
            //   slot3 = slot >> 32;
            // } else if (in_off.h_dim_off3 != NULL) slot3 = in_off.h_dim_off3[start_offset + i] + 1;


            // if (pargs.ht4 != NULL && pargs.key_col4 != NULL) {
              hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
              slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
              if (slot == 0) continue;
              slot4 = slot >> 32;
            // } else if (in_off.h_dim_off4 != NULL) slot4 = in_off.h_dim_off4[start_offset + i] + 1;


            temp[0][count] = lo_offset;
            // temp[1][count] = slot1-1;
            // temp[2][count] = slot2-1;
            // temp[3][count] = slot3-1;
            temp[4][count] = slot4-1;
            count++;
          }

          int thread_off = __atomic_fetch_add(total, count, __ATOMIC_RELAXED);

          for (int i = 0; i < count; i++) {
            assert(out_off.h_lo_off != NULL);
            if (out_off.h_lo_off != NULL) out_off.h_lo_off[thread_off+i] = temp[0][i];
            // if (out_off.h_dim_off1 != NULL) out_off.h_dim_off1[thread_off+i] = temp[1][i];
            // if (out_off.h_dim_off2 != NULL) out_off.h_dim_off2[thread_off+i] = temp[2][i];
            // if (out_off.h_dim_off3 != NULL) out_off.h_dim_off3[thread_off+i] = temp[3][i];
            if (out_off.h_dim_off4 != NULL) out_off.h_dim_off4[thread_off+i] = temp[4][i];
          }
    }
  }, simple_partitioner());
}

//WARNING:: THIS FUNCTION IS MODIFIED SO THAT WE DON'T HAVE TO MEMSET CPU HT FOR SUPPLIER CUSTOMER AND PART
void probe_CPU(
  struct probeArgsCPU pargs, struct offsetCPU out_off, int num_tuples,
  int* total, int start_offset = 0, short* segment_group = NULL) {

  assert(segment_group != NULL);
  assert(out_off.h_lo_off != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          int segment_idx = segment_group[start / SEGMENT_SIZE];
          unsigned int count = 0;
          unsigned int temp[5][end-start];
    
          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
            int hash;
            // int key;
            long long slot;
            int slot1 = 1, slot2 = 1, slot3 = 1, slot4 = 1;
            int lo_offset;

            lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

            if (pargs.ht1 != NULL && pargs.key_col1 != NULL) {
              hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
              slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
              if (slot == 0) continue;
              slot1 = slot >> 32;

                // key = pargs.key_col1[lo_offset];
                // hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
                // slot1 = pargs.ht1[(hash << 1) + 1];
                // if (slot1 != key - 1) continue;
            }

            if (pargs.ht2 != NULL && pargs.key_col2 != NULL) {
              hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
              slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
              if (slot == 0) continue;
              slot2 = slot >> 32;

                // key = pargs.key_col2[lo_offset];
                // hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
                // slot2 = pargs.ht2[(hash << 1) + 1];
                // if (slot2 != key - 1) continue;              
            }

            if (pargs.ht3 != NULL && pargs.key_col3 != NULL) {
              hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
              slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
              if (slot == 0) continue;
              slot3 = slot >> 32;

                // key = pargs.key_col3[lo_offset];
                // hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
                // slot3 = pargs.ht3[(hash << 1) + 1];
                // if (slot3 != key - 1) continue;
            }

            if (pargs.ht4 != NULL && pargs.key_col4 != NULL) {
              hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
              slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
              if (slot == 0) continue;
              slot4 = slot >> 32;
            }

            temp[0][count] = lo_offset;
            temp[1][count] = slot1-1;
            temp[2][count] = slot2-1;
            temp[3][count] = slot3-1;
            temp[4][count] = slot4-1;
            count++;

            }
          }

          for (int i = end_batch ; i < end; i++) {
            int hash;
            // int key;
            long long slot;
            int slot1 = 1, slot2 = 1, slot3 = 1, slot4 = 1;
            int lo_offset;

            lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

            if (pargs.ht1 != NULL && pargs.key_col1 != NULL) {
              hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
              slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
              if (slot == 0) continue;
              slot1 = slot >> 32;

                // key = pargs.key_col1[lo_offset];
                // hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
                // slot1 = pargs.ht1[(hash << 1) + 1];
                // if (slot1 != key - 1) continue;
            }

            if (pargs.ht2 != NULL && pargs.key_col2 != NULL) {
              hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
              slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
              if (slot == 0) continue;
              slot2 = slot >> 32;

                // key = pargs.key_col2[lo_offset];
                // hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
                // slot2 = pargs.ht2[(hash << 1) + 1];
                // if (slot2 != key - 1) continue;              
            }

            if (pargs.ht3 != NULL && pargs.key_col3 != NULL) {
              hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
              slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
              if (slot == 0) continue;
              slot3 = slot >> 32;

                // key = pargs.key_col3[lo_offset];
                // hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
                // slot3 = pargs.ht3[(hash << 1) + 1];
                // if (slot3 != key - 1) continue;              
            }

            if (pargs.ht4 != NULL && pargs.key_col4 != NULL) {
              hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
              slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
              if (slot == 0) continue;
              slot4 = slot >> 32;
            }

            temp[0][count] = lo_offset;
            temp[1][count] = slot1-1;
            temp[2][count] = slot2-1;
            temp[3][count] = slot3-1;
            temp[4][count] = slot4-1;
            count++;
          }

          int thread_off = __atomic_fetch_add(total, count, __ATOMIC_RELAXED);

          for (int i = 0; i < count; i++) {
            assert(out_off.h_lo_off != NULL);
            if (out_off.h_lo_off != NULL) out_off.h_lo_off[thread_off+i] = temp[0][i];
            if (out_off.h_dim_off1 != NULL) out_off.h_dim_off1[thread_off+i] = temp[1][i];
            if (out_off.h_dim_off2 != NULL) out_off.h_dim_off2[thread_off+i] = temp[2][i];
            if (out_off.h_dim_off3 != NULL) out_off.h_dim_off3[thread_off+i] = temp[3][i];
            if (out_off.h_dim_off4 != NULL) out_off.h_dim_off4[thread_off+i] = temp[4][i];
          }

    }

  }, simple_partitioner());
}

void probe_CPU2(struct offsetCPU in_off, struct probeArgsCPU pargs, struct offsetCPU out_off, int num_tuples,
  int* total, int start_offset = 0) {

  assert(in_off.h_lo_off != NULL);
  assert(out_off.h_lo_off != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          unsigned int count = 0;
          unsigned int temp[5][end-start];
    
          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int hash;
              // int key;
              long long slot;
              int slot1 = 1, slot2 = 1, slot3 = 1, slot4 = 1;
              int lo_offset;

              lo_offset = in_off.h_lo_off[start_offset + i];

              if (pargs.ht1 != NULL && pargs.key_col1 != NULL) {
                hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
                slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
                if (slot == 0) continue;
                slot1 = slot >> 32;

                // key = pargs.key_col1[lo_offset];
                // hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
                // slot1 = pargs.ht1[(hash << 1) + 1];
                // if (slot1 != key - 1) continue;
              } else if (in_off.h_dim_off1 != NULL) slot1 = in_off.h_dim_off1[start_offset + i] + 1;


              if (pargs.ht2 != NULL && pargs.key_col2 != NULL) {
                hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
                slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
                if (slot == 0) continue;
                slot2 = slot >> 32;

                // key = pargs.key_col2[lo_offset];
                // hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
                // slot2 = pargs.ht2[(hash << 1) + 1];
                // if (slot2 != key - 1) continue;
              } else if (in_off.h_dim_off2 != NULL) slot2 = in_off.h_dim_off2[start_offset + i] + 1;


              if (pargs.ht3 != NULL && pargs.key_col3 != NULL) {
                hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
                slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
                if (slot == 0) continue;
                slot3 = slot >> 32;

                // key = pargs.key_col3[lo_offset];
                // hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
                // slot3 = pargs.ht3[(hash << 1) + 1];
                // if (slot3 != key - 1) continue;
              } else if (in_off.h_dim_off3 != NULL) slot3 = in_off.h_dim_off3[start_offset + i] + 1;


              if (pargs.ht4 != NULL && pargs.key_col4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
                slot4 = slot >> 32;
              } else if (in_off.h_dim_off4 != NULL) slot4 = in_off.h_dim_off4[start_offset + i] + 1;


              temp[0][count] = lo_offset;
              temp[1][count] = slot1-1;
              temp[2][count] = slot2-1;
              temp[3][count] = slot3-1;
              temp[4][count] = slot4-1;
              count++;


            }
          }

          for (int i = end_batch ; i < end; i++) {
              int hash;
              // int key;
              long long slot;
              int slot1 = 1, slot2 = 1, slot3 = 1, slot4 = 1;
              int lo_offset;

              lo_offset = in_off.h_lo_off[start_offset + i];

              if (pargs.ht1 != NULL && pargs.key_col1 != NULL) {
                hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
                slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
                if (slot == 0) continue;
                slot1 = slot >> 32;

                // key = pargs.key_col1[lo_offset];
                // hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
                // slot1 = pargs.ht1[(hash << 1) + 1];
                // if (slot1 != key - 1) continue;
              } else if (in_off.h_dim_off1 != NULL) slot1 = in_off.h_dim_off1[start_offset + i] + 1;


              if (pargs.ht2 != NULL && pargs.key_col2 != NULL) {
                hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
                slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
                if (slot == 0) continue;
                slot2 = slot >> 32;

                // key = pargs.key_col2[lo_offset];
                // hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
                // slot2 = pargs.ht2[(hash << 1) + 1];
                // if (slot2 != key - 1) continue;
              } else if (in_off.h_dim_off2 != NULL) slot2 = in_off.h_dim_off2[start_offset + i] + 1;


              if (pargs.ht3 != NULL && pargs.key_col3 != NULL) {
                hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
                slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
                if (slot == 0) continue;
                slot3 = slot >> 32;

                // key = pargs.key_col3[lo_offset];
                // hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
                // slot3 = pargs.ht3[(hash << 1) + 1];
                // if (slot3 != key - 1) continue;
              } else if (in_off.h_dim_off3 != NULL) slot3 = in_off.h_dim_off3[start_offset + i] + 1;


              if (pargs.ht4 != NULL && pargs.key_col4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
                slot4 = slot >> 32;
              } else if (in_off.h_dim_off4 != NULL) slot4 = in_off.h_dim_off4[start_offset + i] + 1;


              temp[0][count] = lo_offset;
              temp[1][count] = slot1-1;
              temp[2][count] = slot2-1;
              temp[3][count] = slot3-1;
              temp[4][count] = slot4-1;
              count++;
          }

          int thread_off = __atomic_fetch_add(total, count, __ATOMIC_RELAXED);

          for (int i = 0; i < count; i++) {
            assert(out_off.h_lo_off != NULL);
            if (out_off.h_lo_off != NULL) out_off.h_lo_off[thread_off+i] = temp[0][i];
            if (out_off.h_dim_off1 != NULL) out_off.h_dim_off1[thread_off+i] = temp[1][i];
            if (out_off.h_dim_off2 != NULL) out_off.h_dim_off2[thread_off+i] = temp[2][i];
            if (out_off.h_dim_off3 != NULL) out_off.h_dim_off3[thread_off+i] = temp[3][i];
            if (out_off.h_dim_off4 != NULL) out_off.h_dim_off4[thread_off+i] = temp[4][i];
          }
    }

  }, simple_partitioner());
}


void probe_group_by_CPU(
  struct probeArgsCPU pargs,  struct groupbyArgsCPU gargs, int num_tuples, 
  int* res, int start_offset = 0, short* segment_group = NULL) {

  assert(segment_group != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          int segment_idx = segment_group[start / SEGMENT_SIZE];

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int hash;
              long long slot;
              int dim_val1 = 0, dim_val2 = 0, dim_val3 = 0, dim_val4 = 0;
              int lo_offset;

              lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
                hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
                slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
                if (slot == 0) {
                  continue;
                }
                dim_val1 = slot;
              }

              if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
                hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
                slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
                if (slot == 0) {
                  continue;
                }
                dim_val2 = slot;
              }

              if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
                hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
                slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
                if (slot == 0) {
                  continue;
                }
                dim_val3 = slot;
              }

              if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) {
                  continue;
                }
                dim_val4 = slot;
              }

              hash = ((dim_val1 - gargs.min_val1) * gargs.unique_val1 + (dim_val2 - gargs.min_val2) * gargs.unique_val2 +  (dim_val3 - gargs.min_val3) * gargs.unique_val3 + (dim_val4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;
              if (dim_val1 != 0) res[hash * 6] = dim_val1;
              if (dim_val2 != 0) res[hash * 6 + 1] = dim_val2;
              if (dim_val3 != 0) res[hash * 6 + 2] = dim_val3;
              if (dim_val4 != 0) res[hash * 6 + 3] = dim_val4;

              int aggr1 = 0; int aggr2 = 0;
              if (gargs.aggr_col1 != NULL) aggr1 = gargs.aggr_col1[lo_offset];
              if (gargs.aggr_col2 != NULL) aggr2 = gargs.aggr_col2[lo_offset];
              // int temp = (*(gargs.h_group_func))(aggr1, aggr2);
              int temp;
              if (aggr1 > aggr2) temp = aggr1 - aggr2;
              else temp = aggr2 - aggr1;

              __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
            }
          }

          for (int i = end_batch ; i < end; i++) {

            int hash;
            long long slot;
            int dim_val1 = 0, dim_val2 = 0, dim_val3 = 0, dim_val4 = 0;
            int lo_offset;

            lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

            if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
              hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
              slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
              if (slot == 0) continue;
              dim_val1 = slot;
            }

            if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
              hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
              slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
              if (slot == 0) continue;
              dim_val2 = slot;
            }

            if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
              hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
              slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
              if (slot == 0) continue;
              dim_val3 = slot;
            }

            if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
              hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
              slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
              if (slot == 0) continue;
              dim_val4 = slot;
            }

            hash = ((dim_val1 - gargs.min_val1) * gargs.unique_val1 + (dim_val2 - gargs.min_val2) * gargs.unique_val2 +  (dim_val3 - gargs.min_val3) * gargs.unique_val3 + (dim_val4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;
            if (dim_val1 != 0) res[hash * 6] = dim_val1;
            if (dim_val2 != 0) res[hash * 6 + 1] = dim_val2;
            if (dim_val3 != 0) res[hash * 6 + 2] = dim_val3;
            if (dim_val4 != 0) res[hash * 6 + 3] = dim_val4;

            int aggr1 = 0; int aggr2 = 0;
            if (gargs.aggr_col1 != NULL) aggr1 = gargs.aggr_col1[lo_offset];
            if (gargs.aggr_col2 != NULL) aggr2 = gargs.aggr_col2[lo_offset];
            // int temp = (*(gargs.h_group_func))(aggr1, aggr2);
            int temp;
            if (aggr1 > aggr2) temp = aggr1 - aggr2;
            else temp = aggr2 - aggr1;

            __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
          }

    }
  }, simple_partitioner());

}

void probe_group_by_CPU2(struct offsetCPU offset,
  struct probeArgsCPU pargs,  struct groupbyArgsCPU gargs, int num_tuples,
  int* res, int start_offset = 0) {

  assert(offset.h_lo_off != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int hash;
              long long slot;
              int dim_val1 = 0, dim_val2 = 0, dim_val3 = 0, dim_val4 = 0;
              int lo_offset;

              lo_offset = offset.h_lo_off[start_offset + i];

              if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
                hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
                slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
                if (slot == 0) continue;
                dim_val1 = slot;
              } else if (gargs.group_col1 != NULL) {
                assert(offset.h_dim_off1 != NULL);
                dim_val1 = gargs.group_col1[offset.h_dim_off1[start_offset + i]];
              }

              if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
                hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
                slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
                if (slot == 0) continue;
                dim_val2 = slot;
              } else if (gargs.group_col2 != NULL) {
                assert(offset.h_dim_off2 != NULL);
                dim_val2 = gargs.group_col2[offset.h_dim_off2[start_offset + i]];
              }

              if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
                hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
                slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
                if (slot == 0) continue;
                dim_val3 = slot;
              } else if (gargs.group_col3 != NULL) {
                assert(offset.h_dim_off3 != NULL);
                dim_val3 = gargs.group_col3[offset.h_dim_off3[start_offset + i]];
              }

              if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
                dim_val4 = slot;
              } else if (gargs.group_col4 != NULL) {
                assert(offset.h_dim_off4 != NULL);
                dim_val4 = gargs.group_col4[offset.h_dim_off4[start_offset + i]];
              }

              hash = ((dim_val1 - gargs.min_val1) * gargs.unique_val1 + (dim_val2 - gargs.min_val2) * gargs.unique_val2 +  (dim_val3 - gargs.min_val3) * gargs.unique_val3 + (dim_val4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;
              if (dim_val1 != 0) res[hash * 6] = dim_val1;
              if (dim_val2 != 0) res[hash * 6 + 1] = dim_val2;
              if (dim_val3 != 0) res[hash * 6 + 2] = dim_val3;
              if (dim_val4 != 0) res[hash * 6 + 3] = dim_val4;

              int aggr1 = 0; int aggr2 = 0;
              if (gargs.aggr_col1 != NULL) aggr1 = gargs.aggr_col1[lo_offset];
              if (gargs.aggr_col2 != NULL) aggr2 = gargs.aggr_col2[lo_offset];
              // int temp = (*(gargs.h_group_func))(aggr1, aggr2);
              int temp;
              if (aggr1 > aggr2) temp = aggr1 - aggr2;
              else temp = aggr2 - aggr1;

              __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
            }
          }

          for (int i = end_batch ; i < end; i++) {

              int hash;
              long long slot;
              int dim_val1 = 0, dim_val2 = 0, dim_val3 = 0, dim_val4 = 0;
              int lo_offset;

              lo_offset = offset.h_lo_off[start_offset + i];

              if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
                hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
                slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
                if (slot == 0) continue;
                dim_val1 = slot;
              } else if (gargs.group_col1 != NULL) {
                assert(offset.h_dim_off1 != NULL);
                dim_val1 = gargs.group_col1[offset.h_dim_off1[start_offset + i]];
              }

              if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
                hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
                slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
                if (slot == 0) continue;
                dim_val2 = slot;
              } else if (gargs.group_col2 != NULL) {
                assert(offset.h_dim_off2 != NULL);
                dim_val2 = gargs.group_col2[offset.h_dim_off2[start_offset + i]];
              }

              if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
                hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
                slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
                if (slot == 0) continue;
                dim_val3 = slot;
              } else if (gargs.group_col3 != NULL) {
                assert(offset.h_dim_off3 != NULL);
                dim_val3 = gargs.group_col3[offset.h_dim_off3[start_offset + i]];
              }

              if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
                dim_val4 = slot;
              } else if (gargs.group_col4 != NULL) {
                assert(offset.h_dim_off4 != NULL);
                dim_val4 = gargs.group_col4[offset.h_dim_off4[start_offset + i]];
              }

              hash = ((dim_val1 - gargs.min_val1) * gargs.unique_val1 + (dim_val2 - gargs.min_val2) * gargs.unique_val2 +  (dim_val3 - gargs.min_val3) * gargs.unique_val3 + (dim_val4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;
              if (dim_val1 != 0) res[hash * 6] = dim_val1;
              if (dim_val2 != 0) res[hash * 6 + 1] = dim_val2;
              if (dim_val3 != 0) res[hash * 6 + 2] = dim_val3;
              if (dim_val4 != 0) res[hash * 6 + 3] = dim_val4;

              int aggr1 = 0; int aggr2 = 0;
              if (gargs.aggr_col1 != NULL) aggr1 = gargs.aggr_col1[lo_offset];
              if (gargs.aggr_col2 != NULL) aggr2 = gargs.aggr_col2[lo_offset];
              // int temp = (*(gargs.h_group_func))(aggr1, aggr2);
              int temp;
              if (aggr1 > aggr2) temp = aggr1 - aggr2;
              else temp = aggr2 - aggr1;

              __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
          }

    }
  }, simple_partitioner());

}

// void filter_probe_group_by_CPU(
//   struct filterArgsCPU fargs, struct probeArgsCPU pargs, struct groupbyArgsCPU gargs,
//   int num_tuples, int* res, int start_offset = 0, short* segment_group = NULL) {

//   assert(segment_group != NULL);

//   int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
//   int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

//   parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
//     unsigned int start_task = range.begin();
//     unsigned int end_task = range.end();

//     for (int task = start_task; task < end_task; task++) {
//           unsigned int start = task * TASK_SIZE;
//           unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
//           unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

//           int segment_idx = segment_group[start / SEGMENT_SIZE];

//           for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
//             #pragma simd
//             for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
//               int hash;
//               long long slot;
//               int dim_val1 = 0, dim_val2 = 0, dim_val3 = 0, dim_val4 = 0;
//               int lo_offset;

//               lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

//               if (fargs.filter_col1 != NULL) {
//                 if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
//               }

//               if (fargs.filter_col2 != NULL) {
//                 if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
//               }

//               // if (fargs.filter_col1 != NULL) {
//               //   if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
//               // }

//               // if (fargs.filter_col2 != NULL) {
//               //   if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
//               // }

//               if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
//                 hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
//                 slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
//                 if (slot == 0) continue;
//                 dim_val1 = slot;
//               }

//               if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
//                 hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
//                 slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
//                 if (slot == 0) continue;
//                 dim_val2 = slot;
//               }

//               if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
//                 hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
//                 slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
//                 if (slot == 0) continue;
//                 dim_val3 = slot;
//               }

//               if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
//                 hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
//                 slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
//                 if (slot == 0) continue;
//                 dim_val4 = slot;
//               }

//               // if (!(pargs.key_col4[lo_offset] > 19930000 && pargs.key_col4[lo_offset] < 19940000)) continue;

//               hash = ((dim_val1 - gargs.min_val1) * gargs.unique_val1 + (dim_val2 - gargs.min_val2) * gargs.unique_val2 +  (dim_val3 - gargs.min_val3) * gargs.unique_val3 + (dim_val4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;
//               if (dim_val1 != 0) res[hash * 6] = dim_val1;
//               if (dim_val2 != 0) res[hash * 6 + 1] = dim_val2;
//               if (dim_val3 != 0) res[hash * 6 + 2] = dim_val3;
//               if (dim_val4 != 0) res[hash * 6 + 3] = dim_val4;

//               int aggr1 = 0; int aggr2 = 0;
//               if (gargs.aggr_col1 != NULL) aggr1 = gargs.aggr_col1[lo_offset];
//               if (gargs.aggr_col2 != NULL) aggr2 = gargs.aggr_col2[lo_offset];
//               // int temp = (*(gargs.h_group_func))(aggr1, aggr2);
//               int temp;
//               if (aggr1 > aggr2) temp = aggr1 - aggr2;
//               else temp = aggr2 - aggr1;

//               __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
//             }
//           }

//           for (int i = end_batch ; i < end; i++) {

//             int hash;
//             long long slot;
//             int dim_val1 = 0, dim_val2 = 0, dim_val3 = 0, dim_val4 = 0;
//             int lo_offset;

//             lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

//             if (fargs.filter_col1 != NULL) {
//               if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
//             }

//             if (fargs.filter_col2 != NULL) {
//               if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
//             }

//             // if (fargs.filter_col1 != NULL) {
//             //   if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
//             // }

//             // if (fargs.filter_col2 != NULL) {
//             //   if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
//             // }

//             if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
//               hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
//               slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
//               if (slot == 0) continue;
//               dim_val1 = slot;
//             }

//             if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
//               hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
//               slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
//               if (slot == 0) continue;
//               dim_val2 = slot;
//             }

//             if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
//               hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
//               slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
//               if (slot == 0) continue;
//               dim_val3 = slot;
//             }

//             if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
//               hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
//               slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
//               if (slot == 0) continue;
//               dim_val4 = slot;
//             }

//             // if (!(pargs.key_col4[lo_offset] > 19930000 && pargs.key_col4[lo_offset] < 19940000)) continue;

//             hash = ((dim_val1 - gargs.min_val1) * gargs.unique_val1 + (dim_val2 - gargs.min_val2) * gargs.unique_val2 +  (dim_val3 - gargs.min_val3) * gargs.unique_val3 + (dim_val4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;
//             if (dim_val1 != 0) res[hash * 6] = dim_val1;
//             if (dim_val2 != 0) res[hash * 6 + 1] = dim_val2;
//             if (dim_val3 != 0) res[hash * 6 + 2] = dim_val3;
//             if (dim_val4 != 0) res[hash * 6 + 3] = dim_val4;

//             int aggr1 = 0; int aggr2 = 0;
//             if (gargs.aggr_col1 != NULL) aggr1 = gargs.aggr_col1[lo_offset];
//             if (gargs.aggr_col2 != NULL) aggr2 = gargs.aggr_col2[lo_offset];
//             // int temp = (*(gargs.h_group_func))(aggr1, aggr2);
//             int temp;
//             if (aggr1 > aggr2) temp = aggr1 - aggr2;
//             else temp = aggr2 - aggr1;

//             __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);

//           }
//     }
//   }, simple_partitioner());

// }

// void filter_probe_group_by_CPU2(struct offsetCPU offset,
//   struct filterArgsCPU fargs, struct probeArgsCPU pargs, struct groupbyArgsCPU gargs,
//   int num_tuples, int* res, int start_offset = 0) {

//   assert(offset.h_lo_off != NULL);

//   int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
//   int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

//   parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
//     unsigned int start_task = range.begin();
//     unsigned int end_task = range.end();

//     for (int task = start_task; task < end_task; task++) {
//           unsigned int start = task * TASK_SIZE;
//           unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
//           unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

//           for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
//             #pragma simd
//             for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
//               int hash;
//               long long slot;
//               int dim_val1 = 0, dim_val2 = 0, dim_val3 = 0, dim_val4 = 0;
//               int lo_offset;

//               lo_offset = offset.h_lo_off[start_offset + i];

//               if (fargs.filter_col1 != NULL) {
//                 if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
//               }

//               if (fargs.filter_col2 != NULL) {
//                 if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
//               }

//               // if (fargs.filter_col1 != NULL) {
//               //   if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
//               // }

//               // if (fargs.filter_col2 != NULL) {
//               //   if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
//               // }

//               if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
//                 hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
//                 slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
//                 if (slot == 0) continue;
//                 dim_val1 = slot;
//               } else if (gargs.group_col1 != NULL) {
//                 assert(offset.h_dim_off1 != NULL);
//                 dim_val1 = gargs.group_col1[offset.h_dim_off1[start_offset + i]];
//               }

//               if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
//                 hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
//                 slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
//                 if (slot == 0) continue;
//                 dim_val2 = slot;
//               } else if (gargs.group_col2 != NULL) {
//                 assert(offset.h_dim_off2 != NULL);
//                 dim_val2 = gargs.group_col2[offset.h_dim_off2[start_offset + i]];
//               }

//               if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
//                 hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
//                 slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
//                 if (slot == 0) continue;
//                 dim_val3 = slot;
//               } else if (gargs.group_col3 != NULL) {
//                 assert(offset.h_dim_off3 != NULL);
//                 dim_val3 = gargs.group_col3[offset.h_dim_off3[start_offset + i]];
//               }

//               if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
//                 hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
//                 slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
//                 if (slot == 0) continue;
//                 dim_val4 = slot;
//               } else if (gargs.group_col4 != NULL) {
//                 assert(offset.h_dim_off4 != NULL);
//                 dim_val4 = gargs.group_col4[offset.h_dim_off4[start_offset + i]];
//               }

//               hash = ((dim_val1 - gargs.min_val1) * gargs.unique_val1 + (dim_val2 - gargs.min_val2) * gargs.unique_val2 +  (dim_val3 - gargs.min_val3) * gargs.unique_val3 + (dim_val4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;
//               if (dim_val1 != 0) res[hash * 6] = dim_val1;
//               if (dim_val2 != 0) res[hash * 6 + 1] = dim_val2;
//               if (dim_val3 != 0) res[hash * 6 + 2] = dim_val3;
//               if (dim_val4 != 0) res[hash * 6 + 3] = dim_val4;

//               int aggr1 = 0; int aggr2 = 0;
//               if (gargs.aggr_col1 != NULL) aggr1 = gargs.aggr_col1[lo_offset];
//               if (gargs.aggr_col2 != NULL) aggr2 = gargs.aggr_col2[lo_offset];
//               // int temp = (*(gargs.h_group_func))(aggr1, aggr2);
//               int temp;
//               if (aggr1 > aggr2) temp = aggr1 - aggr2;
//               else temp = aggr2 - aggr1;

//               __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
//             }
//           }

//           for (int i = end_batch ; i < end; i++) {

//               int hash;
//               long long slot;
//               int dim_val1 = 0, dim_val2 = 0, dim_val3 = 0, dim_val4 = 0;
//               int lo_offset;

//               lo_offset = offset.h_lo_off[start_offset + i];

//               if (fargs.filter_col1 != NULL) {
//                 if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
//               }

//               if (fargs.filter_col2 != NULL) {
//                 if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
//               }

//               // if (fargs.filter_col1 != NULL) {
//               //   if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
//               // }

//               // if (fargs.filter_col2 != NULL) {
//               //   if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
//               // }

//               if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
//                 hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
//                 slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
//                 if (slot == 0) continue;
//                 dim_val1 = slot;
//               } else if (gargs.group_col1 != NULL) {
//                 assert(offset.h_dim_off1 != NULL);
//                 dim_val1 = gargs.group_col1[offset.h_dim_off1[start_offset + i]];
//               }

//               if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
//                 hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
//                 slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
//                 if (slot == 0) continue;
//                 dim_val2 = slot;
//               } else if (gargs.group_col2 != NULL) {
//                 assert(offset.h_dim_off2 != NULL);
//                 dim_val2 = gargs.group_col2[offset.h_dim_off2[start_offset + i]];
//               }

//               if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
//                 hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
//                 slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
//                 if (slot == 0) continue;
//                 dim_val3 = slot;
//               } else if (gargs.group_col3 != NULL) {
//                 assert(offset.h_dim_off3 != NULL);
//                 dim_val3 = gargs.group_col3[offset.h_dim_off3[start_offset + i]];
//               }

//               if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
//                 hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
//                 slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
//                 if (slot == 0) continue;
//                 dim_val4 = slot;
//               } else if (gargs.group_col4 != NULL) {
//                 assert(offset.h_dim_off4 != NULL);
//                 dim_val4 = gargs.group_col4[offset.h_dim_off4[start_offset + i]];
//               }

//               hash = ((dim_val1 - gargs.min_val1) * gargs.unique_val1 + (dim_val2 - gargs.min_val2) * gargs.unique_val2 +  (dim_val3 - gargs.min_val3) * gargs.unique_val3 + (dim_val4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;
//               if (dim_val1 != 0) res[hash * 6] = dim_val1;
//               if (dim_val2 != 0) res[hash * 6 + 1] = dim_val2;
//               if (dim_val3 != 0) res[hash * 6 + 2] = dim_val3;
//               if (dim_val4 != 0) res[hash * 6 + 3] = dim_val4;

//               int aggr1 = 0; int aggr2 = 0;
//               if (gargs.aggr_col1 != NULL) aggr1 = gargs.aggr_col1[lo_offset];
//               if (gargs.aggr_col2 != NULL) aggr2 = gargs.aggr_col2[lo_offset];
//               // int temp = (*(gargs.h_group_func))(aggr1, aggr2);
//               int temp;
//               if (aggr1 > aggr2) temp = aggr1 - aggr2;
//               else temp = aggr2 - aggr1;

//               __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
//           }
//     }
//   }, simple_partitioner());

// }

void build_CPU(struct filterArgsCPU fargs,
  struct buildArgsCPU bargs, int num_tuples, int* hash_table,
  int start_offset = 0, short* segment_group = NULL) {

  assert(bargs.key_col != NULL);
  assert(hash_table != NULL);
  assert(segment_group != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          int segment_idx = segment_group[start / SEGMENT_SIZE];

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int table_offset;
              int flag = 1;

              table_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              if (fargs.filter_col1 != NULL) {
                // flag = (*(fargs.h_filter_func1))(fargs.filter_col1[table_offset], fargs.compare1, fargs.compare2);
                if (fargs.mode1 == 1)
                  flag = (fargs.filter_col1[table_offset] >= fargs.compare1 && fargs.filter_col1[table_offset] <= fargs.compare2);
                else if (fargs.mode1 == 2)
                  flag = (fargs.filter_col1[table_offset] == fargs.compare1 || fargs.filter_col1[table_offset] == fargs.compare2);
              }

              if (flag) {
                int key = bargs.key_col[table_offset];
                int hash = HASH(key, bargs.num_slots, bargs.val_min);
                hash_table[(hash << 1) + 1] = table_offset + 1;
                if (bargs.val_col != NULL) hash_table[hash << 1] = bargs.val_col[table_offset];
              }

            }
          }

          for (int i = end_batch ; i < end; i++) {
            int table_offset;
            int flag = 1;

            table_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

            if (fargs.filter_col1 != NULL) {
              // flag = (*(fargs.h_filter_func1))(fargs.filter_col1[table_offset], fargs.compare1, fargs.compare2);
              if (fargs.mode1 == 1)
                flag = (fargs.filter_col1[table_offset] >= fargs.compare1 && fargs.filter_col1[table_offset] <= fargs.compare2);
              else if (fargs.mode1 == 2)
                flag = (fargs.filter_col1[table_offset] == fargs.compare1 || fargs.filter_col1[table_offset] == fargs.compare2);
            }

            if (flag) {
              int key = bargs.key_col[table_offset];
              int hash = HASH(key, bargs.num_slots, bargs.val_min);
              hash_table[(hash << 1) + 1] = table_offset + 1;
              if (bargs.val_col != NULL) hash_table[hash << 1] = bargs.val_col[table_offset];
            }
          }
    }

  });
}

void build_CPU2(int *dim_off, struct filterArgsCPU fargs,
  struct buildArgsCPU bargs, int num_tuples, int* hash_table,
  int start_offset = 0) {

  assert(bargs.key_col != NULL);
  assert(hash_table != NULL);
  assert(dim_off != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int table_offset;
              // int flag = 1;

              table_offset = dim_off[start_offset + i];

              // if (fargs.filter_col1 != NULL) {
                // flag = (*(fargs.h_filter_func1))(fargs.filter_col1[table_offset], fargs.compare1, fargs.compare2);
              // }

              // if (flag) {
                int key = bargs.key_col[table_offset];
                int hash = HASH(key, bargs.num_slots, bargs.val_min);
                hash_table[(hash << 1) + 1] = table_offset + 1;
                if (bargs.val_col != NULL) hash_table[hash << 1] = bargs.val_col[table_offset];
              // }

            }
          }

          for (int i = end_batch ; i < end; i++) {
            int table_offset;
            // int flag = 1;

            table_offset = dim_off[start_offset + i];

              // if (fargs.filter_col1 != NULL) {
                // flag = (*(fargs.h_filter_func1))(fargs.filter_col1[table_offset], fargs.compare1, fargs.compare2);
              // }

            // if (flag) {
              int key = bargs.key_col[table_offset];
              int hash = HASH(key, bargs.num_slots, bargs.val_min);
              hash_table[(hash << 1) + 1] = table_offset + 1;
              if (bargs.val_col != NULL) hash_table[hash << 1] = bargs.val_col[table_offset];
            // }
          }

    }

  }, simple_partitioner());
}


void filter_CPU(struct filterArgsCPU fargs,
  int* out_off, int num_tuples, int* total,
  int start_offset = 0, short* segment_group = NULL) {

  assert(segment_group != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          int segment_idx = segment_group[start / SEGMENT_SIZE];

          int count = 0;
          int temp[end-start];

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              bool selection_flag = 1;
              int col_offset; 

              col_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              if (fargs.filter_col1 != NULL) {
                // selection_flag = (*(fargs.h_filter_func1))(fargs.filter_col1[col_offset], fargs.compare1, fargs.compare2);
                if (fargs.mode1 == 1)
                  selection_flag = (fargs.filter_col1[col_offset] >= fargs.compare1 && fargs.filter_col1[col_offset] <= fargs.compare2);
                else if (fargs.mode1 == 2)
                  selection_flag = (fargs.filter_col1[col_offset] == fargs.compare1 || fargs.filter_col1[col_offset] == fargs.compare2);
              }

              if (fargs.filter_col2 != NULL) {
                // selection_flag = selection_flag && (*(fargs.h_filter_func2))(fargs.filter_col2[col_offset], fargs.compare3, fargs.compare4);
                if (fargs.mode2 == 1)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] >= fargs.compare3 && fargs.filter_col2[col_offset] <= fargs.compare4);
                else if (fargs.mode2 == 2)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] == fargs.compare3 || fargs.filter_col2[col_offset] == fargs.compare4);
              }

              if (selection_flag) {
                temp[count] = col_offset;
                count++;        
              }
            }
          }

          for (int i = end_batch ; i < end; i++) {
            bool selection_flag = 1;
            int col_offset; 

            col_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              if (fargs.filter_col1 != NULL) {
                // selection_flag = (*(fargs.h_filter_func1))(fargs.filter_col1[col_offset], fargs.compare1, fargs.compare2);
                if (fargs.mode1 == 1)
                  selection_flag = (fargs.filter_col1[col_offset] >= fargs.compare1 && fargs.filter_col1[col_offset] <= fargs.compare2);
                else if (fargs.mode1 == 2)
                  selection_flag = (fargs.filter_col1[col_offset] == fargs.compare1 || fargs.filter_col1[col_offset] == fargs.compare2);
              }

              if (fargs.filter_col2 != NULL) {
                // selection_flag = selection_flag && (*(fargs.h_filter_func2))(fargs.filter_col2[col_offset], fargs.compare3, fargs.compare4);
                if (fargs.mode2 == 1)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] >= fargs.compare3 && fargs.filter_col2[col_offset] <= fargs.compare4);
                else if (fargs.mode2 == 2)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] == fargs.compare3 || fargs.filter_col2[col_offset] == fargs.compare4);
              }

            if (selection_flag) {
              temp[count] = col_offset;
              count++;
            }
          }

          int thread_off = __atomic_fetch_add(total, count, __ATOMIC_RELAXED);

          assert(out_off != NULL);
          for (int i = 0; i < count; i++) {
            out_off[thread_off+i] = temp[i];
          }

    }

  }, simple_partitioner());
}

void filter_CPU2(int* off_col, struct filterArgsCPU fargs,
  int* out_off, int num_tuples, int* total,
  int start_offset = 0) {

  assert(off_col != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          int count = 0;
          int temp[end-start];

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              bool selection_flag = 1;
              int col_offset;

              col_offset = off_col[start_offset + i];

              if (fargs.filter_col1 != NULL) {
                  // selection_flag = (*(fargs.h_filter_func1))(fargs.filter_col1[col_offset], fargs.compare1, fargs.compare2);
                if (fargs.mode1 == 1)
                  selection_flag = (fargs.filter_col1[col_offset] >= fargs.compare1 && fargs.filter_col1[col_offset] <= fargs.compare2);
                else if (fargs.mode1 == 2)
                  selection_flag = (fargs.filter_col1[col_offset] == fargs.compare1 || fargs.filter_col1[col_offset] == fargs.compare2);
              }

              if (fargs.filter_col2 != NULL) {
                // selection_flag = selection_flag && (*(fargs.h_filter_func2))(fargs.filter_col2[col_offset], fargs.compare3, fargs.compare4);
                if (fargs.mode2 == 1)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] >= fargs.compare3 && fargs.filter_col2[col_offset] <= fargs.compare4);
                else if (fargs.mode2 == 2)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] == fargs.compare3 || fargs.filter_col2[col_offset] == fargs.compare4);
              }

              if (selection_flag) {
                temp[count] = col_offset;
                count++;        
              }
            }
          }

          for (int i = end_batch ; i < end; i++) {
            bool selection_flag = 1;
            int col_offset;

            col_offset = off_col[start_offset + i];

              if (fargs.filter_col1 != NULL) {
                  // selection_flag = (*(fargs.h_filter_func1))(fargs.filter_col1[col_offset], fargs.compare1, fargs.compare2);
                if (fargs.mode1 == 1)
                  selection_flag = (fargs.filter_col1[col_offset] >= fargs.compare1 && fargs.filter_col1[col_offset] <= fargs.compare2);
                else if (fargs.mode1 == 2)
                  selection_flag = (fargs.filter_col1[col_offset] == fargs.compare1 || fargs.filter_col1[col_offset] == fargs.compare2);
              }

              if (fargs.filter_col2 != NULL) {
                // selection_flag = selection_flag && (*(fargs.h_filter_func2))(fargs.filter_col2[col_offset], fargs.compare3, fargs.compare4);
                if (fargs.mode2 == 1)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] >= fargs.compare3 && fargs.filter_col2[col_offset] <= fargs.compare4);
                else if (fargs.mode2 == 2)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] == fargs.compare3 || fargs.filter_col2[col_offset] == fargs.compare4);
              }

            if (selection_flag) {
              temp[count] = col_offset;
              count++;
            }
          }

          int thread_off = __atomic_fetch_add(total, count, __ATOMIC_RELAXED);

          assert(out_off != NULL);
          for (int i = 0; i < count; i++) {
            out_off[thread_off+i] = temp[i];
          }
    }

  }, simple_partitioner());
}

void filter_aggr_CPU2(int* off_col, struct filterArgsCPU fargs, struct groupbyArgsCPU gargs,
  int* res, int num_tuples) {

  assert(off_col != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();
    long long local_sum = 0;

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              bool selection_flag = 1;
              int col_offset;

              col_offset = off_col[i];

              if (fargs.filter_col1 != NULL) {
                  // selection_flag = (*(fargs.h_filter_func1))(fargs.filter_col1[col_offset], fargs.compare1, fargs.compare2);
                if (fargs.mode1 == 1)
                  selection_flag = (fargs.filter_col1[col_offset] >= fargs.compare1 && fargs.filter_col1[col_offset] <= fargs.compare2);
                else if (fargs.mode1 == 2)
                  selection_flag = (fargs.filter_col1[col_offset] == fargs.compare1 || fargs.filter_col1[col_offset] == fargs.compare2);
              }

              if (fargs.filter_col2 != NULL) {
                // selection_flag = selection_flag && (*(fargs.h_filter_func2))(fargs.filter_col2[col_offset], fargs.compare3, fargs.compare4);
                if (fargs.mode2 == 1)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] >= fargs.compare3 && fargs.filter_col2[col_offset] <= fargs.compare4);
                else if (fargs.mode2 == 2)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] == fargs.compare3 || fargs.filter_col2[col_offset] == fargs.compare4);
              }

              int aggrval1 = 0, aggrval2 = 0;

              if (selection_flag) {
                if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[col_offset];
                if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[col_offset];
                local_sum += aggrval1 * aggrval2;
              }

            }
          }

          for (int i = end_batch ; i < end; i++) {
            bool selection_flag = 1;
            int col_offset;

            col_offset = off_col[i];

              if (fargs.filter_col1 != NULL) {
                  // selection_flag = (*(fargs.h_filter_func1))(fargs.filter_col1[col_offset], fargs.compare1, fargs.compare2);
                if (fargs.mode1 == 1)
                  selection_flag = (fargs.filter_col1[col_offset] >= fargs.compare1 && fargs.filter_col1[col_offset] <= fargs.compare2);
                else if (fargs.mode1 == 2)
                  selection_flag = (fargs.filter_col1[col_offset] == fargs.compare1 || fargs.filter_col1[col_offset] == fargs.compare2);
              }

              if (fargs.filter_col2 != NULL) {
                // selection_flag = selection_flag && (*(fargs.h_filter_func2))(fargs.filter_col2[col_offset], fargs.compare3, fargs.compare4);
                if (fargs.mode2 == 1)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] >= fargs.compare3 && fargs.filter_col2[col_offset] <= fargs.compare4);
                else if (fargs.mode2 == 2)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] == fargs.compare3 || fargs.filter_col2[col_offset] == fargs.compare4);
              }

              int aggrval1 = 0, aggrval2 = 0;

              if (selection_flag) {
                if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[col_offset];
                if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[col_offset];
                local_sum += aggrval1 * aggrval2;
              }

          }

    }

    __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[4]), (long long)(local_sum), __ATOMIC_RELAXED);

  }, simple_partitioner());
};

void groupByCPU(struct offsetCPU offset, 
  struct groupbyArgsCPU gargs, int num_tuples, int* res) {

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  // parallel_for(blocked_range<size_t>(0, num_tuples), [&](auto range) {
  //   // unsigned int start_task = range.begin();
  //   // unsigned int end_task = range.end();
  //   unsigned int start = range.begin();
  //   unsigned int end = range.end();

  //   // for (int task = start_task; task < end_task; task++) {
  //   //       unsigned int start = task * TASK_SIZE;
  //   //       unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
  //         unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

  //         for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
  //           #pragma simd
  //           for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
  //             int groupval1 = 0, groupval2 = 0, groupval3 = 0, groupval4 = 0;
  //             int aggrval1 = 0, aggrval2 = 0;

  //             if (gargs.group_col1 != NULL) {
  //               assert(offset.h_dim_off1 != NULL);
  //               groupval1 = gargs.group_col1[offset.h_dim_off1[i]];
  //             }

  //             if (gargs.group_col2 != NULL) {
  //               assert(offset.h_dim_off2 != NULL);
  //               groupval2 = gargs.group_col2[offset.h_dim_off2[i]];
  //             }

  //             if (gargs.group_col3 != NULL) {
  //               assert(offset.h_dim_off3 != NULL);
  //               groupval3 = gargs.group_col3[offset.h_dim_off3[i]];
  //             }

  //             if (gargs.group_col4 != NULL) {
  //               assert(offset.h_dim_off4 != NULL);
  //               groupval4 = gargs.group_col4[offset.h_dim_off4[i]];
  //             }

  //             assert(offset.h_lo_off != NULL);
  //             if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[offset.h_lo_off[i]];
  //             if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[offset.h_lo_off[i]];

  //             int hash = ((groupval1 - gargs.min_val1) * gargs.unique_val1 + (groupval2 - gargs.min_val2) * gargs.unique_val2 +  (groupval3 - gargs.min_val3) * gargs.unique_val3 + (groupval4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;

  //             if (groupval1 != 0) res[hash * 6] = groupval1;
  //             if (groupval2 != 0) res[hash * 6 + 1] = groupval2;
  //             if (groupval3 != 0) res[hash * 6 + 2] = groupval3;
  //             if (groupval4 != 0) res[hash * 6 + 3] = groupval4;

  //             if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[offset.h_lo_off[i]];
  //             if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[offset.h_lo_off[i]];
  //             // int temp = (*(gargs.h_group_func))(aggrval1, aggrval2);
  //             int temp;
  //             if (aggrval1 > aggrval2) temp = aggrval1 - aggrval2;
  //             else temp = aggrval2 - aggrval1;

  //             __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
  //           }
  //         }
  //         for (int i = end_batch ; i < end; i++) {
  //             int groupval1 = 0, groupval2 = 0, groupval3 = 0, groupval4 = 0;
  //             int aggrval1 = 0, aggrval2 = 0;

  //             if (gargs.group_col1 != NULL) {
  //               assert(offset.h_dim_off1 != NULL);
  //               groupval1 = gargs.group_col1[offset.h_dim_off1[i]];
  //             }

  //             if (gargs.group_col2 != NULL) {
  //               assert(offset.h_dim_off2 != NULL);
  //               groupval2 = gargs.group_col2[offset.h_dim_off2[i]];
  //             }

  //             if (gargs.group_col3 != NULL) {
  //               assert(offset.h_dim_off3 != NULL);
  //               groupval3 = gargs.group_col3[offset.h_dim_off3[i]];
  //             }

  //             if (gargs.group_col4 != NULL) {
  //               assert(offset.h_dim_off4 != NULL);
  //               groupval4 = gargs.group_col4[offset.h_dim_off4[i]];
  //             }

  //             assert(offset.h_lo_off != NULL);
  //             if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[offset.h_lo_off[i]];
  //             if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[offset.h_lo_off[i]];

  //             int hash = ((groupval1 - gargs.min_val1) * gargs.unique_val1 + (groupval2 - gargs.min_val2) * gargs.unique_val2 +  (groupval3 - gargs.min_val3) * gargs.unique_val3 + (groupval4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;

  //             if (groupval1 != 0) res[hash * 6] = groupval1;
  //             if (groupval2 != 0) res[hash * 6 + 1] = groupval2;
  //             if (groupval3 != 0) res[hash * 6 + 2] = groupval3;
  //             if (groupval4 != 0) res[hash * 6 + 3] = groupval4;

  //             if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[offset.h_lo_off[i]];
  //             if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[offset.h_lo_off[i]];
  //             // int temp = (*(gargs.h_group_func))(aggrval1, aggrval2);
  //             int temp;
  //             if (aggrval1 > aggrval2) temp = aggrval1 - aggrval2;
  //             else temp = aggrval2 - aggrval1;

  //             __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
  //         }

  //   // }
  // });

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int groupval1 = 0, groupval2 = 0, groupval3 = 0, groupval4 = 0;
              int aggrval1 = 0, aggrval2 = 0;

              if (gargs.group_col1 != NULL) {
                assert(offset.h_dim_off1 != NULL);
                groupval1 = gargs.group_col1[offset.h_dim_off1[i]];
              }

              if (gargs.group_col2 != NULL) {
                assert(offset.h_dim_off2 != NULL);
                groupval2 = gargs.group_col2[offset.h_dim_off2[i]];
              }

              if (gargs.group_col3 != NULL) {
                assert(offset.h_dim_off3 != NULL);
                groupval3 = gargs.group_col3[offset.h_dim_off3[i]];
              }

              if (gargs.group_col4 != NULL) {
                assert(offset.h_dim_off4 != NULL);
                groupval4 = gargs.group_col4[offset.h_dim_off4[i]];
              }

              assert(offset.h_lo_off != NULL);
              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[offset.h_lo_off[i]];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[offset.h_lo_off[i]];

              int hash = ((groupval1 - gargs.min_val1) * gargs.unique_val1 + (groupval2 - gargs.min_val2) * gargs.unique_val2 +  (groupval3 - gargs.min_val3) * gargs.unique_val3 + (groupval4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;

              if (groupval1 != 0) res[hash * 6] = groupval1;
              if (groupval2 != 0) res[hash * 6 + 1] = groupval2;
              if (groupval3 != 0) res[hash * 6 + 2] = groupval3;
              if (groupval4 != 0) res[hash * 6 + 3] = groupval4;

              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[offset.h_lo_off[i]];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[offset.h_lo_off[i]];
              // int temp = (*(gargs.h_group_func))(aggrval1, aggrval2);
              int temp;
              if (aggrval1 > aggrval2) temp = aggrval1 - aggrval2;
              else temp = aggrval2 - aggrval1;

              __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
            }
          }
          for (int i = end_batch ; i < end; i++) {
              int groupval1 = 0, groupval2 = 0, groupval3 = 0, groupval4 = 0;
              int aggrval1 = 0, aggrval2 = 0;

              if (gargs.group_col1 != NULL) {
                assert(offset.h_dim_off1 != NULL);
                groupval1 = gargs.group_col1[offset.h_dim_off1[i]];
              }

              if (gargs.group_col2 != NULL) {
                assert(offset.h_dim_off2 != NULL);
                groupval2 = gargs.group_col2[offset.h_dim_off2[i]];
              }

              if (gargs.group_col3 != NULL) {
                assert(offset.h_dim_off3 != NULL);
                groupval3 = gargs.group_col3[offset.h_dim_off3[i]];
              }

              if (gargs.group_col4 != NULL) {
                assert(offset.h_dim_off4 != NULL);
                groupval4 = gargs.group_col4[offset.h_dim_off4[i]];
              }

              assert(offset.h_lo_off != NULL);
              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[offset.h_lo_off[i]];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[offset.h_lo_off[i]];

              int hash = ((groupval1 - gargs.min_val1) * gargs.unique_val1 + (groupval2 - gargs.min_val2) * gargs.unique_val2 +  (groupval3 - gargs.min_val3) * gargs.unique_val3 + (groupval4 - gargs.min_val4) * gargs.unique_val4) % gargs.total_val;

              if (groupval1 != 0) res[hash * 6] = groupval1;
              if (groupval2 != 0) res[hash * 6 + 1] = groupval2;
              if (groupval3 != 0) res[hash * 6 + 2] = groupval3;
              if (groupval4 != 0) res[hash * 6 + 3] = groupval4;

              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[offset.h_lo_off[i]];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[offset.h_lo_off[i]];
              // int temp = (*(gargs.h_group_func))(aggrval1, aggrval2);
              int temp;
              if (aggrval1 > aggrval2) temp = aggrval1 - aggrval2;
              else temp = aggrval2 - aggrval1;

              __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[hash * 6 + 4]), (long long)(temp), __ATOMIC_RELAXED);
          }

    }
  });
}

void aggregationCPU(int* lo_off, 
  struct groupbyArgsCPU gargs, int num_tuples, int* res) {

  assert(lo_off != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();
    long long local_sum = 0;

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;
    
          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int aggrval1 = 0, aggrval2 = 0;

              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[lo_off[i]];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[lo_off[i]];

              // local_sum += (*(gargs.h_group_func))(aggrval1, aggrval2);
              local_sum += aggrval1 * aggrval2;

            }
          }
          for (int i = end_batch ; i < end; i++) {
            int aggrval1 = 0, aggrval2 = 0;

            assert(lo_off != NULL);
            if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[lo_off[i]];
            if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[lo_off[i]];

            // local_sum += (*(gargs.h_group_func))(aggrval1, aggrval2);
            local_sum += aggrval1 * aggrval2;

          }

    }

    __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[4]), (long long)(local_sum), __ATOMIC_RELAXED);

  });
}


void probe_aggr_CPU(
  struct probeArgsCPU pargs, struct groupbyArgsCPU gargs, int num_tuples,
  int* res, int start_offset = 0, short* segment_group = NULL) {

  assert(segment_group != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    long long local_sum = 0;

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          int segment_idx = segment_group[start / SEGMENT_SIZE];

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int hash;
              long long slot;
              int lo_offset;

              lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              // if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
              //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
              //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
              //   if (slot == 0) continue;
              // }

              // if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
              //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
              //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
              //   if (slot == 0) continue;
              // }

              // if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
              //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
              //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
              //   if (slot == 0) continue;
              // }

              if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
              }

              int aggrval1 = 1, aggrval2 = 1;
              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[lo_offset];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[lo_offset];
              // local_sum += (*(gargs.h_group_func))(aggrval1, aggrval2);
              local_sum += aggrval1 * aggrval2;
            }
          }

          for (int i = end_batch ; i < end; i++) {

            int hash;
            long long slot;
            int lo_offset;

            lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

            // if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
            //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
            //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
            //   if (slot == 0) continue;
            // }

            // if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
            //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
            //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
            //   if (slot == 0) continue;
            // }

            // if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
            //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
            //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
            //   if (slot == 0) continue;
            // }

            if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
              hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
              slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
              if (slot == 0) continue;
            }

            int aggrval1 = 1, aggrval2 = 1;
            if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[lo_offset];
            if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[lo_offset];
            // local_sum += (*(gargs.h_group_func))(aggrval1, aggrval2);
            local_sum += aggrval1 * aggrval2;
          }
    }

    __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[4]), (long long)(local_sum), __ATOMIC_RELAXED);

  }, simple_partitioner());

}

void probe_aggr_CPU2(struct offsetCPU offset,
  struct probeArgsCPU pargs, struct groupbyArgsCPU gargs, 
  int num_tuples, int* res, int start_offset = 0) {

  assert(offset.h_lo_off != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    long long local_sum = 0;

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int hash;
              long long slot;
              int lo_offset;

              lo_offset = offset.h_lo_off[start_offset + i];

              // if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
              //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
              //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
              //   if (slot == 0) continue;
              // }

              // if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
              //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
              //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
              //   if (slot == 0) continue;
              // }

              // if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
              //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
              //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
              //   if (slot == 0) continue;
              // }

              if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
              }

              int aggrval1 = 1, aggrval2 = 1;
              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[lo_offset];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[lo_offset];
              // local_sum += (*(gargs.h_group_func))(aggrval1, aggrval2);
              local_sum += aggrval1 * aggrval2;

            }
          }

          for (int i = end_batch ; i < end; i++) {

            int hash;
            long long slot;
            int lo_offset;

            lo_offset = offset.h_lo_off[start_offset + i];

            // if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
            //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
            //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
            //   if (slot == 0) continue;
            // }

            // if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
            //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
            //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
            //   if (slot == 0) continue;
            // }

            // if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
            //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
            //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
            //   if (slot == 0) continue;
            // }

            if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
              hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
              slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
              if (slot == 0) continue;
            }

              int aggrval1 = 1, aggrval2 = 1;
              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[lo_offset];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[lo_offset];
              // local_sum += (*(gargs.h_group_func))(aggrval1, aggrval2);
              local_sum += aggrval1 * aggrval2;
          }
    }

    __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[4]), (long long)(local_sum), __ATOMIC_RELAXED);

  }, simple_partitioner());

}

void filter_probe_aggr_CPU(
  struct filterArgsCPU fargs, struct probeArgsCPU pargs, struct groupbyArgsCPU gargs,
  int num_tuples, int* res, int start_offset = 0, short* segment_group = NULL) {

  assert(segment_group != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    long long local_sum = 0;

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          int segment_idx = segment_group[start / SEGMENT_SIZE];

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int hash;
              long long slot;
              int lo_offset;

              lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              // lo_offset = start_offset + i;

              // bool selection_flag;
              // selection_flag = (pargs.key_col4[i] > 19930000 && pargs.key_col4[i] < 19940000);
              // selection_flag = selection_flag && (fargs.filter_col1[i] >= fargs.compare1 && fargs.filter_col1[i] <= fargs.compare2);
              // selection_flag = selection_flag && (fargs.filter_col2[i] >= fargs.compare3 && fargs.filter_col2[i] <= fargs.compare4);

              // if (selection_flag) local_sum += (gargs.aggr_col1[i] * gargs.aggr_col2[i]);

              // if (!(pargs.key_col4[lo_offset] > 19930000 && pargs.key_col4[lo_offset] < 19940000)) continue;

              // if (fargs.filter_col1 != NULL) {
                if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
                // if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
              // }

              // if (fargs.filter_col2 != NULL) {
                if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
                // if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
              // }

              // if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
              //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
              //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
              //   if (slot == 0) continue;
              // }

              // if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
              //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
              //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
              //   if (slot == 0) continue;
              // }

              // if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
              //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
              //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
              //   if (slot == 0) continue;
              // }

              // if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
              // }

              // if (!(pargs.key_col4[lo_offset] > 19930000 && pargs.key_col4[lo_offset] < 19940000)) continue;

              int aggrval1 = 1, aggrval2 = 1;
              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[lo_offset];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[lo_offset];
              // local_sum += (*(gargs.h_group_func))(aggrval1, aggrval2);
              local_sum += aggrval1 * aggrval2;

            }
          }

          for (int i = end_batch ; i < end; i++) {

            int hash;
            long long slot;
            int lo_offset;

            lo_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

            // lo_offset = start_offset + i;

            // bool selection_flag;
            // selection_flag = (pargs.key_col4[i] > 19930000 && pargs.key_col4[i] < 19940000);
            // selection_flag = selection_flag && (fargs.filter_col1[i] >= fargs.compare1 && fargs.filter_col1[i] <= fargs.compare2);
            // selection_flag = selection_flag && (fargs.filter_col2[i] >= fargs.compare3 && fargs.filter_col2[i] <= fargs.compare4);

            // if (selection_flag) local_sum += (gargs.aggr_col1[i] * gargs.aggr_col2[i]);

            // if (!(pargs.key_col4[lo_offset] > 19930000 && pargs.key_col4[lo_offset] < 19940000)) continue;

            // if (fargs.filter_col1 != NULL) {
              if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
              // if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
            // }

            // if (fargs.filter_col2 != NULL) {
              if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
              // if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
            // }

            // if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
            //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
            //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
            //   if (slot == 0) continue;
            // }

            // if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
            //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
            //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
            //   if (slot == 0) continue;
            // }

            // if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
            //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
            //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
            //   if (slot == 0) continue;
            // }

            // if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
              hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
              slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
              if (slot == 0) continue;
            // }

            // if (!(pargs.key_col4[lo_offset] > 19930000 && pargs.key_col4[lo_offset] < 19940000)) continue;

              int aggrval1 = 1, aggrval2 = 1;
              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[lo_offset];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[lo_offset];
              // local_sum += (*(gargs.h_group_func))(aggrval1, aggrval2);
              local_sum += aggrval1 * aggrval2;

          }

    }

    __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[4]), (long long)(local_sum), __ATOMIC_RELAXED);

  }, simple_partitioner());

}

void filter_probe_aggr_CPU2(struct offsetCPU offset,
  struct filterArgsCPU fargs, struct probeArgsCPU pargs, struct groupbyArgsCPU gargs,
  int num_tuples, int* res, int start_offset = 0) {

  assert(offset.h_lo_off != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    long long local_sum = 0;

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int hash;
              long long slot;
              int lo_offset;

              lo_offset = offset.h_lo_off[start_offset + i];

              // if (fargs.filter_col1 != NULL) {
                // if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
                // if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
              // }

              // if (fargs.filter_col2 != NULL) {
                if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
                // if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
              // }

              // if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
              //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
              //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
              //   if (slot == 0) continue;
              // }

              // if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
              //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
              //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
              //   if (slot == 0) continue;
              // }

              // if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
              //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
              //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
              //   if (slot == 0) continue;
              // }

              // if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
                hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
                slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
                if (slot == 0) continue;
              // }

              int aggrval1 = 1, aggrval2 = 1;
              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[lo_offset];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[lo_offset];
              // local_sum += (*(gargs.h_group_func))(aggrval1, aggrval2);
              local_sum += aggrval1 * aggrval2;
            }
          }

          for (int i = end_batch ; i < end; i++) {

            int hash;
            long long slot;
            int lo_offset;

            lo_offset = offset.h_lo_off[start_offset + i];

            // if (fargs.filter_col1 != NULL) {
              // if (!(fargs.filter_col1[lo_offset] >= fargs.compare1 && fargs.filter_col1[lo_offset] <= fargs.compare2)) continue; //only for Q1.x
              // if (!(*(fargs.h_filter_func1))(fargs.filter_col1[lo_offset], fargs.compare1, fargs.compare2)) continue;
            // }

            // if (fargs.filter_col2 != NULL) {
              if (!(fargs.filter_col2[lo_offset] >= fargs.compare3 && fargs.filter_col2[lo_offset] <= fargs.compare4)) continue; //only for Q1.x
              // if (!(*(fargs.h_filter_func2))(fargs.filter_col2[lo_offset], fargs.compare3, fargs.compare4)) continue;
            // }

            // if (pargs.key_col1 != NULL && pargs.ht1 != NULL) {
            //   hash = HASH(pargs.key_col1[lo_offset], pargs.dim_len1, pargs.min_key1);
            //   slot = reinterpret_cast<long long*>(pargs.ht1)[hash];
            //   if (slot == 0) continue;
            // }

            // if (pargs.key_col2 != NULL && pargs.ht2 != NULL) {
            //   hash = HASH(pargs.key_col2[lo_offset], pargs.dim_len2, pargs.min_key2);
            //   slot = reinterpret_cast<long long*>(pargs.ht2)[hash];
            //   if (slot == 0) continue;
            // }

            // if (pargs.key_col3 != NULL && pargs.ht3 != NULL) {
            //   hash = HASH(pargs.key_col3[lo_offset], pargs.dim_len3, pargs.min_key3);
            //   slot = reinterpret_cast<long long*>(pargs.ht3)[hash];
            //   if (slot == 0) continue;
            // }

            // if (pargs.key_col4 != NULL && pargs.ht4 != NULL) {
              hash = HASH(pargs.key_col4[lo_offset], pargs.dim_len4, pargs.min_key4);
              slot = reinterpret_cast<long long*>(pargs.ht4)[hash];
              if (slot == 0) continue;
            // }

              int aggrval1 = 1, aggrval2 = 1;
              if (gargs.aggr_col1 != NULL) aggrval1 = gargs.aggr_col1[lo_offset];
              if (gargs.aggr_col2 != NULL) aggrval2 = gargs.aggr_col2[lo_offset];
              // local_sum += (*(gargs.h_group_func))(aggrval1, aggrval2);
              local_sum += aggrval1 * aggrval2;
          }
    }

     __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&res[4]), (long long)(local_sum), __ATOMIC_RELAXED);

  }, simple_partitioner());

}


void merge(int* resCPU, int* resGPU, int num_tuples) {

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              if (resCPU[i * 6] == 0) resCPU[i * 6] = resGPU[i * 6];
              if (resCPU[i * 6 + 1] == 0) resCPU[i * 6 + 1] = resGPU[i * 6 + 1];
              if (resCPU[i * 6 + 2] == 0) resCPU[i * 6 + 2] = resGPU[i * 6 + 2];
              if (resCPU[i * 6 + 3] == 0) resCPU[i * 6 + 3] = resGPU[i * 6 + 3];
              // if (resCPU[i*6+4] != 0) printf("%llu\n", reinterpret_cast<unsigned long long*>(&resCPU[6*i+4])[0]);
              unsigned long long* tmp = reinterpret_cast<unsigned long long*>(&resGPU[i * 6 + 4]);
              __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&resCPU[i * 6 + 4]), (long long)(*tmp), __ATOMIC_RELAXED);
              // if (resCPU[i*6+4] != 0) printf("%llu\n", reinterpret_cast<unsigned long long*>(&resCPU[6*i+4])[0]);
              // reinterpret_cast<unsigned long long*>(resCPU)[i * 3 + 2] += reinterpret_cast<unsigned long long*>(resGPU)[i * 3 + 2];
            }
          }
          for (int i = end_batch ; i < end; i++) {
            if (resCPU[i * 6] == 0) resCPU[i * 6] = resGPU[i * 6];
            if (resCPU[i * 6 + 1] == 0) resCPU[i * 6 + 1] = resGPU[i * 6 + 1];
            if (resCPU[i * 6 + 2] == 0) resCPU[i * 6 + 2] = resGPU[i * 6 + 2];
            if (resCPU[i * 6 + 3] == 0) resCPU[i * 6 + 3] = resGPU[i * 6 + 3];
            // if (resCPU[i*6+4] != 0) printf("%llu\n", reinterpret_cast<unsigned long long*>(&resCPU[6*i+4])[0]);
            unsigned long long* tmp = reinterpret_cast<unsigned long long*>(&resGPU[i * 6 + 4]);
            __atomic_fetch_add(reinterpret_cast<unsigned long long*>(&resCPU[i * 6 + 4]), (long long)(*tmp), __ATOMIC_RELAXED);
            // if (resCPU[i*6+4] != 0) printf("%llu\n", reinterpret_cast<unsigned long long*>(&resCPU[6*i+4])[0]);
            // reinterpret_cast<unsigned long long*>(resCPU)[i * 3 + 2] += reinterpret_cast<unsigned long long*>(resGPU)[i * 3 + 2];
          }
    }
  });
}

void build_CPU_minmax(struct filterArgsCPU fargs,
  struct buildArgsCPU bargs, int num_tuples, int* hash_table, int* min_global, int* max_global, 
  int start_offset = 0, short* segment_group = NULL) {

  assert(bargs.key_col != NULL);
  assert(hash_table != NULL);
  assert(segment_group != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    int min = bargs.val_max, max = bargs.val_min;

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          int segment_idx = segment_group[start / SEGMENT_SIZE];

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int table_offset;
              int flag = 1;

              table_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              if (fargs.filter_col1 != NULL) {
                flag = (*(fargs.h_filter_func1))(fargs.filter_col1[table_offset], fargs.compare1, fargs.compare2);

              }

              if (flag) {
                int key = bargs.key_col[table_offset];
                if (key < min) min = key;
                if (key > max) max = key;
                int hash = HASH(key, bargs.num_slots, bargs.val_min);
                hash_table[(hash << 1) + 1] = table_offset + 1;
                if (bargs.val_col != NULL) hash_table[hash << 1] = bargs.val_col[table_offset];
              }

            }
          }

          for (int i = end_batch ; i < end; i++) {
            int table_offset;
            int flag = 1;

            table_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

            if (fargs.filter_col1 != NULL) {
              flag = (*(fargs.h_filter_func1))(fargs.filter_col1[table_offset], fargs.compare1, fargs.compare2);
            }

            if (flag) {
              int key = bargs.key_col[table_offset];
              if (key < min) min = key;
              if (key > max) max = key;
              int hash = HASH(key, bargs.num_slots, bargs.val_min);
              hash_table[(hash << 1) + 1] = table_offset + 1;
              if (bargs.val_col != NULL) hash_table[hash << 1] = bargs.val_col[table_offset];
            }
          }
    }

    bool ret;
    int prev_max = *max_global;
    int prev_min = *min_global;

    do {
      ret = true;
      if (max > prev_max)
        ret = __atomic_compare_exchange_n(max_global, &prev_max, max, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
    } while (!ret);

    do {    
      ret = true;
      if (min < prev_min)
        ret = __atomic_compare_exchange_n(min_global, &prev_min, min, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
    } while (!ret);

  });
}

void build_CPU_minmax2(int *dim_off, struct filterArgsCPU fargs,
  struct buildArgsCPU bargs, int num_tuples, int* hash_table, int* min_global, int* max_global, 
  int start_offset = 0) {

  assert(bargs.key_col != NULL);
  assert(hash_table != NULL);
  assert(dim_off != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    int min = bargs.val_max, max = bargs.val_min;

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              int table_offset;
              // int flag = 1;

              table_offset = dim_off[start_offset + i];

              // if (flag) {
                int key = bargs.key_col[table_offset];
                if (key < min) min = key;
                if (key > max) max = key;
                int hash = HASH(key, bargs.num_slots, bargs.val_min);
                hash_table[(hash << 1) + 1] = table_offset + 1;
                if (bargs.val_col != NULL) hash_table[hash << 1] = bargs.val_col[table_offset];
              // }

            }
          }

          for (int i = end_batch ; i < end; i++) {
            int table_offset;
            // int flag = 1;

            table_offset = dim_off[start_offset + i];

            // if (flag) {
              int key = bargs.key_col[table_offset];
              if (key < min) min = key;
              if (key > max) max = key;
              int hash = HASH(key, bargs.num_slots, bargs.val_min);
              hash_table[(hash << 1) + 1] = table_offset + 1;
              if (bargs.val_col != NULL) hash_table[hash << 1] = bargs.val_col[table_offset];
            // }
          }

    }

    bool ret;
    int prev_max = *max_global;
    int prev_min = *min_global;

    do {
      ret = true;
      if (max > prev_max)
        ret = __atomic_compare_exchange_n(max_global, &prev_max, max, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
    } while (!ret);

    do {    
      ret = true;
      if (min < prev_min)
        ret = __atomic_compare_exchange_n(min_global, &prev_min, min, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
    } while (!ret);


  }, simple_partitioner());
}

void filter_partition_CPU(struct filterArgsCPU fargs,
  int*** out_off, int num_tuples, int** total, int table, int** seg_row_to_single_gpu,
  int start_offset = 0, short* segment_group = NULL) {

  assert(segment_group != NULL);

  int task_count = (num_tuples + TASK_SIZE - 1)/TASK_SIZE;
  int rem_task = (num_tuples % TASK_SIZE == 0) ? (TASK_SIZE):(num_tuples % TASK_SIZE);

  parallel_for(blocked_range<size_t>(0, task_count), [&](auto range) {
    unsigned int start_task = range.begin();
    unsigned int end_task = range.end();

    for (int task = start_task; task < end_task; task++) {
          unsigned int start = task * TASK_SIZE;
          unsigned int end = (task == task_count - 1) ? (task * TASK_SIZE + rem_task):(task * TASK_SIZE + TASK_SIZE);
          unsigned int end_batch = start + ((end - start)/BATCH_SIZE) * BATCH_SIZE;

          int segment_idx = segment_group[start / SEGMENT_SIZE];

          int count = 0;
          int temp[end-start];

          for (int batch_start = start; batch_start < end_batch; batch_start += BATCH_SIZE) {
            #pragma simd
            for (int i = batch_start; i < batch_start + BATCH_SIZE; i++) {
              bool selection_flag = 1;
              int col_offset; 

              col_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              if (fargs.filter_col1 != NULL) {
                if (fargs.mode1 == 1)
                  selection_flag = (fargs.filter_col1[col_offset] >= fargs.compare1 && fargs.filter_col1[col_offset] <= fargs.compare2);
                else if (fargs.mode1 == 2)
                  selection_flag = (fargs.filter_col1[col_offset] == fargs.compare1 || fargs.filter_col1[col_offset] == fargs.compare2);
              }

              if (fargs.filter_col2 != NULL) {
                if (fargs.mode2 == 1)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] >= fargs.compare3 && fargs.filter_col2[col_offset] <= fargs.compare4);
                else if (fargs.mode2 == 2)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] == fargs.compare3 || fargs.filter_col2[col_offset] == fargs.compare4);
              }

              if (selection_flag) {
                temp[count] = col_offset;
                count++;        
              }
            }
          }

          for (int i = end_batch ; i < end; i++) {
            bool selection_flag = 1;
            int col_offset; 

            col_offset = segment_idx * SEGMENT_SIZE + (i % SEGMENT_SIZE);

              if (fargs.filter_col1 != NULL) {
                // selection_flag = (*(fargs.h_filter_func1))(fargs.filter_col1[col_offset], fargs.compare1, fargs.compare2);
                if (fargs.mode1 == 1)
                  selection_flag = (fargs.filter_col1[col_offset] >= fargs.compare1 && fargs.filter_col1[col_offset] <= fargs.compare2);
                else if (fargs.mode1 == 2)
                  selection_flag = (fargs.filter_col1[col_offset] == fargs.compare1 || fargs.filter_col1[col_offset] == fargs.compare2);
              }

              if (fargs.filter_col2 != NULL) {
                // selection_flag = selection_flag && (*(fargs.h_filter_func2))(fargs.filter_col2[col_offset], fargs.compare3, fargs.compare4);
                if (fargs.mode2 == 1)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] >= fargs.compare3 && fargs.filter_col2[col_offset] <= fargs.compare4);
                else if (fargs.mode2 == 2)
                  selection_flag = selection_flag && (fargs.filter_col2[col_offset] == fargs.compare3 || fargs.filter_col2[col_offset] == fargs.compare4);
              }

            if (selection_flag) {
              temp[count] = col_offset;
              count++;
            }
          }

          int seg = start / SEGMENT_SIZE;
          int gpu = seg_row_to_single_gpu[table][seg];
          int thread_off_gpu = __atomic_fetch_add(total[gpu], count, __ATOMIC_RELAXED);
          assert(out_off[gpu] != NULL);
          assert(out_off[gpu][table] != NULL);
          for (int i = 0; i < count; i++) {
            out_off[gpu][table][thread_off_gpu+i] = temp[i];
          }

    }

  }, simple_partitioner());
}

// #endif