Lancelot / src / cpu / bandwidth.cpp
bandwidth.cpp
Raw
#include <cmath>
#include <chrono>
#include <cstdlib>
#include <iostream>

#include "tbb/tbb.h"
#include "tbb/parallel_for.h"

#include "utils/cpu_utils.h"

using namespace tbb;
using namespace std;

double agg(int* key_buf, int num_items) {
  chrono::high_resolution_clock::time_point start = chrono::high_resolution_clock::now();
  float result = parallel_reduce(
      blocked_range<int*>(key_buf, key_buf + num_items, 1 << 20),
      0,
      [](const blocked_range<int*>& r, int init)->float {
          for (int* a=r.begin(); a!=r.end(); ++a )
              init += *a;
          return init;
      },
      []( int x, int y )->float {
          return x+y;
      }
  );

  chrono::high_resolution_clock::time_point finish = chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = finish - start;
  return diff.count() * 1000000.0;
}

double write(int* key_buf, int num_items) {
  chrono::high_resolution_clock::time_point start = chrono::high_resolution_clock::now();
  int batch_size = 1<<20;
  parallel_for(blocked_range<size_t>(0, num_items, batch_size), [&](auto range) {
    int count = 0;
    for (size_t i = range.begin(); i < range.end(); i++) {
	  key_buf[i] = i;
    }
  });
  chrono::high_resolution_clock::time_point finish = chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = finish - start;
  return diff.count() * 1000000.0;
}

double streaming_write(int* key_buf, int num_items) {
  chrono::high_resolution_clock::time_point start = chrono::high_resolution_clock::now();
  int batch_size = 1<<20;
  parallel_for(blocked_range<size_t>(0, num_items, batch_size), [&](auto range) {
    int count = 0;
    for (size_t i = range.begin(); i < range.end(); i += 8) {
      //if (i%8 != 0) cout << i << endl;
      __m256i ones = _mm256_set1_epi32(1);
      _mm256_stream_si256((__m256i*)&key_buf[i], ones);
    }
  });
  chrono::high_resolution_clock::time_point finish = chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = finish - start;
  return diff.count() * 1000000.0;
}

double streaming_read(int* key_buf, int num_items) {
  chrono::high_resolution_clock::time_point start = chrono::high_resolution_clock::now();
  int batch_size = 1<<20;
  __m256i global_counter = _mm256_set1_epi32(0);
  parallel_for(blocked_range<size_t>(0, num_items, batch_size), [&](auto range) {
    int count = 0;
    __m256i counter = _mm256_set1_epi32(0);
    for (size_t i = range.begin(); i < range.end(); i += 8) {
      //if (i%8 != 0) cout << i << endl;
      __m256i load = _mm256_load_si256((__m256i*)&key_buf[i]);
      counter = _mm256_add_epi32(load, counter);
    }
    global_counter = _mm256_add_epi32(counter, global_counter);
  });
  chrono::high_resolution_clock::time_point finish = chrono::high_resolution_clock::now();
  std::chrono::duration<double> diff = finish - start;
  int t = _mm256_extract_epi32(global_counter, 0);
  cout << t << endl;
  return diff.count() * 1000000.0;
}

int main(int argc, char** argv)
{
    uint num_items          = 1 << 28;
    int num_trials          = 3;
    bool full_agg           = true;

    // Initialize command line
    CommandLineArgs args(argc, argv);
    args.GetCmdLineArgument("n", num_items);
    args.GetCmdLineArgument("t", num_trials);

    // Print usage
    if (args.CheckCmdLineFlag("help"))
    {
        printf("%s "
            "[--n=<input items>] "
            "[--t=<num trials>] "
            "[--v] "
            "\n", argv[0]);
        exit(0);
    }

    int *h_keys;
    h_keys = (int*) _mm_malloc(sizeof(int) * num_items, 256);

    parallel_for(blocked_range<size_t>(0, num_items, 32 * 1024), [&](auto range) {
      unsigned int seed = range.begin();
      for (size_t i = range.begin(); i < range.end(); i++) {
        h_keys[i] = rand_r(&seed) & 15;
      }
    });

    for (int i = 0; i < num_trials; i++) {
        // Full Aggregation.
        double time_taken;
        double bandwidth;

        time_taken = agg(h_keys, num_items);
        bandwidth = (num_items * 4) / time_taken;

        cout<< "{"
            << "\"time_taken\":" << time_taken
            << "\"read_bandwidth\":" << bandwidth
            << "}" << endl;
    }

    for (int i = 0; i < num_trials; i++) {
        // Full Aggregation.
        double time_taken;
        double bandwidth;

        time_taken = write(h_keys, num_items);
        bandwidth = (num_items * 4) / time_taken;

        cout<< "{"
            << "\"time_taken\":" << time_taken
            << "\"write_bandwidth\":" << bandwidth
            << "}" << endl;
    }

    for (int i = 0; i < num_trials; i++) {
        // Full Aggregation.
        double time_taken;
        double bandwidth;

        time_taken = streaming_write(h_keys, num_items);
        bandwidth = (num_items * 4) / time_taken;

        cout<< "{"
            << "\"time_taken\":" << time_taken
            << "\"streaming_write_bandwidth\":" << bandwidth
            << "}" << endl;
    }

    for (int i = 0; i < num_trials; i++) {
        // Full Aggregation.
        double time_taken;
        double bandwidth;

        time_taken = streaming_read(h_keys, num_items);
        bandwidth = (num_items * 4) / time_taken;

        cout<< "{"
            << "\"time_taken\":" << time_taken
            << "\"streaming_read_bandwidth\":" << bandwidth
            << "}" << endl;
    }

    _mm_free(h_keys);

    return 0;
}