Lancelot / src / cpu / select.cpp
select.cpp
Raw
#include <math.h>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <immintrin.h>

#include "tbb/tbb.h"
#include "tbb/parallel_for.h"

#include "utils/cpu_utils.h"

using namespace std;
using namespace tbb;

#define BATCH_SIZE 2048

float selectIfCPU(float* in, float* out, int num_items, float cutpoint,
    int& num_selected) {
  chrono::high_resolution_clock::time_point start, finish;
  const int batch_size = BATCH_SIZE;

  start = chrono::high_resolution_clock::now();
  tbb::atomic<int> counter;
  counter = 0;

  parallel_for(blocked_range<size_t>(0, num_items, batch_size), [&](auto range) {
    int count = 0;
    for (size_t i = range.begin(); i < range.end(); i++) {
      count += (in[i] < cutpoint);
    }
    int offset = counter.fetch_and_add(count);
    for (size_t i = range.begin(); i < range.end(); i++) {
      if (in[i] < cutpoint) {
        out[offset++] = in[i];
      }
    }
  });
  finish = chrono::high_resolution_clock::now();

  num_selected = counter.fetch_and_add(0);

  std::chrono::duration<double> diff = finish - start;
  return diff.count() * 1000;
}

float selectFlaggedCPU(float* in, float* val, float* out, char* flags,
    int num_items, float cutpoint, int& num_selected) {
  chrono::high_resolution_clock::time_point start, finish;
  const int batch_size = BATCH_SIZE;

  start = chrono::high_resolution_clock::now();
  tbb::atomic<uint> counter = 0;
  parallel_for(blocked_range<size_t>(0, num_items, batch_size), [&](auto range) {
    uint count = 0;
    for (size_t i = range.begin(); i < range.end(); i++) {
      flags[i] = (in[i] < cutpoint);
      count += flags[i];
    }
    uint offset = counter.fetch_and_add(count);
    for (size_t i = range.begin(); i < range.end(); i++) {
      if (flags[i]) {
        out[offset++] = val[i];
      }
    }
  });
  finish = chrono::high_resolution_clock::now();

  num_selected = counter.fetch_and_add(0);

  return (chrono::duration_cast<chrono::milliseconds>(finish-start)).count();
}

float selectIfPredCPU(float* in, float* out, int num_items, float cutpoint,
    int& num_selected) {
  chrono::high_resolution_clock::time_point start, finish;
  const int batch_size = BATCH_SIZE;

  start = chrono::high_resolution_clock::now();
  tbb::atomic<uint> counter = 0;
  parallel_for(blocked_range<size_t>(0, num_items, batch_size), [&](auto range) {
    uint count = 0;
    for (size_t i = range.begin(); i < range.end(); i++) {
      count += (in[i] < cutpoint);
    }
    uint offset = counter.fetch_and_add(count);
    for (size_t i = range.begin(); i < range.end(); i++) {
      out[offset] = in[i];
      offset += (in[i] < cutpoint);
    }
  });
  finish = chrono::high_resolution_clock::now();

  num_selected = counter.fetch_and_add(0);

  std::chrono::duration<double> diff = finish - start;
  return diff.count() * 1000;
}

float selectFlaggedPredCPU(float* in, float* val, float* out, char* flags,
    int num_items, float cutpoint, int& num_selected) {
  chrono::high_resolution_clock::time_point start, finish;
  const int batch_size = BATCH_SIZE;

  start = chrono::high_resolution_clock::now();
  tbb::atomic<uint> counter = 0;
  parallel_for(blocked_range<size_t>(0, num_items, batch_size), [&](auto range) {
    uint count = 0;
    for (size_t i = range.begin(); i < range.end(); i++) {
      flags[i] = (in[i] < cutpoint);
      count += flags[i];
    }
    uint offset = counter.fetch_and_add(count);
    for (size_t i = range.begin(); i < range.end(); i++) {
      out[offset] = val[i];
      offset += flags[i];
    }
  });
  finish = chrono::high_resolution_clock::now();

  num_selected = counter.fetch_and_add(0);

  std::chrono::duration<double> diff = finish - start;
  return diff.count() * 1000;
}


float selectSIMDCPU_nostream(float* in, float* out, int num_items, float cutpoint,
    int& num_selected) {
  chrono::high_resolution_clock::time_point start, finish;
  const int batch_size = BATCH_SIZE;

  start = chrono::high_resolution_clock::now();
  tbb::atomic<int> counter;
  counter = 0;

  parallel_for(blocked_range<size_t>(0, num_items, batch_size), [&](auto range) {
    int count = 0;
    #pragma simd
    for (size_t i = range.begin(); i < range.end(); i++) {
      count += (in[i] < cutpoint);
    }
    int offset = counter.fetch_and_add(count);
    const __m256 all_1 = _mm256_set1_ps(0xffff);
    const __m256 cps = _mm256_set1_ps(cutpoint);
    size_t o = range.begin();
    if (range.begin() % 8 != 0 || range.end() % 8 != 0)
        printf("TODO. input data does not align with AVX2\n");
    for (size_t i = range.begin(); i < range.end(); i+= 8) {
      __m256 invec = _mm256_load_ps(&in[i]);
      __m256 comp = _mm256_cmp_ps(invec, cps, _CMP_GT_OQ);
      if (!_mm256_testc_si256(comp, all_1)) {
        size_t m = _mm256_movemask_ps(comp);
        m ^= 255;
        do {
          size_t j = __tzcnt_u64(m);
          uint32_t k = in[i + j];
          out[offset++] = in[i];
          m &= m - 1;
        } while(m);
      }
    }
  });
  finish = chrono::high_resolution_clock::now();

  num_selected = counter.fetch_and_add(0);

  std::chrono::duration<double> diff = finish - start;
  return diff.count() * 1000;
}

float selectSIMDCPU(float* in, float* out, int num_items, float cutpoint,
    int& num_selected) {
  chrono::high_resolution_clock::time_point start, finish;
  const int batch_size = BATCH_SIZE;

  start = chrono::high_resolution_clock::now();
  tbb::atomic<int> counter;
  tbb::atomic<int> counter_tail;
  counter = 0;
  counter_tail = 0;
  float tail_buffer[32*64];
  __m256 mask = _mm256_set1_ps(cutpoint);
  parallel_for(blocked_range<size_t>(0, num_items, batch_size), [&](auto range) {
    int k = 0;
    int buf_size = 256 + 8;
    float buffer[buf_size] __attribute__ ((aligned (32)));
    for (size_t i = range.begin(); i < range.end(); i += 8) {
      __m256 key = _mm256_load_ps(&in[i]);
      // generates bitmask - 1111 for true 0000 for false
      __m256 cmp = _mm256_cmp_ps(key, mask, _CMP_LT_OS);
      /* load key columns and evaluate predicates */
      if (!_mm256_testz_ps(cmp, cmp)) {
        /* permute and store the input pointers */
        // The following algorithm is copied from https://stackoverflow.com/questions/36932240/avx2-what-is-the-most-efficient-way-to-pack-left-based-on-a-mask?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa
        // gets 8 sign bits
        int m = _mm256_movemask_ps(cmp); //_mm256_castsi256_ps(cmp));
        uint64_t expanded_mask = _pdep_u64(m, 0x0101010101010101);  // unpack each bit to a byte
        expanded_mask *= 0xFF;   // ABC... -> AAAAAAAABBBBBBBBCCCCCCCC...: replicate each bit to fill its byte
        const uint64_t identity_indices = 0x0706050403020100;    // the identity shuffle for vpermps, packed to one index per byte
        uint64_t wanted_indices = _pext_u64(identity_indices, expanded_mask);
        __m128i bytevec = _mm_cvtsi64_si128(wanted_indices);
        __m256i shufmask = _mm256_cvtepu8_epi32(bytevec);
        __m256 ptr = _mm256_permutevar8x32_ps(key, shufmask);
        __m256i cmp_i = _mm256_cvtps_epi32(cmp);
        _mm256_maskstore_ps(&buffer[k], cmp_i, ptr);
        k += _mm_popcnt_u64(m);
        if (k > buf_size - 8) {
          int offset = counter.fetch_and_add(buf_size - 8);
          /* flush the buffer */
          int b;
          for (b=0; b!=buf_size-8; b+=8) {
            /* dereference column values and store */
            __m256 ptr = _mm256_load_ps(&buffer[b]);
            _mm256_stream_ps(&out[offset + b], ptr);
          }
            /* move extra items to the start of the buffer */
          ptr = _mm256_load_ps(&buffer[b]);
          _mm256_store_ps(&buffer[0], ptr);
          k -= buf_size - 8;
        }
      }
    }
    // int offset = counter_tail.fetch_and_add(k);
    // TODO. copy of the tail of each local buffer
    //rid = _mm256_add_epi32(rid, mask_8);
    //for (int i=0; i<k; i++)
    int offset = counter.fetch_and_add((k >> 3) << 3);
    int b;
    for (b = 0; b < k; b += 8) {
      __m256 ptr = _mm256_load_ps(&buffer[b]);
      _mm256_stream_ps(&out[offset + b], ptr);
    }
    offset = counter_tail.fetch_and_add(k - ((k >> 3) << 3));
    //for (; b < k; b++) {
      //out[offset + b] = buffer[b];
    /*}*/
  });

  //memcpy(&out[counter.fetch_and_add(0)], tail_buffer, counter_tail.fetch_and_add(0));
  finish = chrono::high_resolution_clock::now();
  num_selected = counter.fetch_and_add(0) + counter_tail.fetch_and_add(0);
  std::chrono::duration<double> diff = finish - start;
  return diff.count() * 1000;
}


//---------------------------------------------------------------------
// Main
//---------------------------------------------------------------------

int main(int argc, char** argv) {
    int num_items           = 1<<28;
    int num_trials          = 3;

    // Initialize command line
    CommandLineArgs args(argc, argv);
    args.GetCmdLineArgument("n", num_items);
    args.GetCmdLineArgument("t", num_trials);

    // Print usage
    if (args.CheckCmdLineFlag("help")) {
        printf("%s "
            "[--n=<input items>] "
            "[--t=<num trials>] "
            "[--device=<device-id>] "
            "[--v] "
            "\n", argv[0]);
        exit(0);
    }

    float *h_in, *h_val, *h_out;
    char *h_flags;

    h_in = (float*) _mm_malloc(sizeof(float) * num_items, 256);
    h_val = (float*) malloc(sizeof(float) * num_items);
    h_out = (float*) _mm_malloc(sizeof(float) * num_items, 256);
    h_flags = (char*) malloc(sizeof(float) * num_items);

    parallel_for(blocked_range<size_t>(0, num_items, 32 * 1024), [&](auto range) {
      unsigned int seed = range.begin();
      for (size_t i = range.begin(); i < range.end(); i++) {
        h_in[i] = static_cast <float> (rand_r(&seed)) / static_cast <float> (RAND_MAX);;
        h_val[i] = h_in[i];
        h_out[i] = 0;
        h_flags[i] = 0;
      }
    });

    for (int t = 0; t < num_trials; t++) {
        for (int i = 0; i <= 10; i++) {
            float selectivity = i/10.0;

            float time_flagged_cpu, time_if_cpu;
            float time_simd_cpu_no_stream, time_simd_cpu;
            float time_flagged_pred_cpu, time_if_pred_cpu;
            int num_selected_flagged_cpu, num_selected_if_cpu;
            int num_selected_simd_cpu_no_stream, num_selected_simd_cpu;
            int num_selected_flagged_pred_cpu, num_selected_if_pred_cpu;

            time_if_cpu = selectIfCPU(h_in, h_out, num_items, selectivity,
                num_selected_if_cpu);

            time_simd_cpu_no_stream = selectSIMDCPU_nostream(h_in, h_out, num_items, selectivity,
                num_selected_simd_cpu_no_stream);

            time_simd_cpu = selectSIMDCPU(h_in, h_out, num_items, selectivity,
                num_selected_simd_cpu);

            time_flagged_cpu = selectFlaggedCPU(h_in, h_val, h_out, h_flags, num_items, selectivity,
                num_selected_flagged_cpu);

            time_if_pred_cpu = selectIfPredCPU(h_in, h_out, num_items, selectivity,
                num_selected_if_pred_cpu);

            time_flagged_pred_cpu = selectFlaggedPredCPU(h_in, h_val, h_out, h_flags, num_items, selectivity,
                num_selected_flagged_pred_cpu);

            int s = num_selected_flagged_cpu;
            if (!(s == num_selected_flagged_cpu &&
                s == num_selected_if_cpu &&
                s == num_selected_simd_cpu &&
                s == num_selected_simd_cpu_no_stream &&
                s == num_selected_flagged_pred_cpu && s == num_selected_if_pred_cpu)) {
                cout << "Answers don't match. "
                     << "\n\tif_cpu: " << num_selected_if_cpu
                     << "\n\tsimd_cpu: " <<  num_selected_simd_cpu
                     << endl;
            }

            cout<< "{"
                << "\"selectivity\":" << selectivity
                << ",\"time_if_cpu\":" << time_if_cpu
                << ",\"time_simd_cpu\":" << time_simd_cpu
                << ",\"time_simd_cpu_no_stream\":" << time_simd_cpu_no_stream
                << ",\"time_flagged_cpu\":" << time_flagged_cpu
                << ",\"time_if_pred_cpu\":" << time_if_pred_cpu
                << ",\"time_flagged_pred_cpu\":" << time_flagged_pred_cpu
                << ",\"num_selected\":" << num_selected_flagged_cpu
                << ",\"num_entries\":" << num_items
                << ",\"selectivity_real\":" << (((float) num_selected_flagged_cpu) / num_items)
                << "}" << endl;
        }
    }

    return 0;
}