// WiscSort / wiscSort / Gamma / ext_sort.cc
#include "ext_sort.h"

#include <assert.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>

#include <algorithm>
#include <cerrno>
#include <chrono>
#include <cstdio>

#include "timer.h"

#include "ips4o/include/ips4o.hpp"

// File-scope timer used by all ExtSort phases in this translation unit; its
// address is also handed to DataManager in RunGeneration.
Timer timer;
// Creates the working directory used for this sort's output.
//
// The directory name is `mount_path` immediately followed by the current Unix
// timestamp; no path separator is inserted between the two parts.
//
// mount_path - prefix of the directory to create.
ExtSort::ExtSort(string mount_path)
{
    folder_path_ = mount_path + std::to_string(std::time(nullptr));
    // Report a failed mkdir instead of silently continuing: otherwise the
    // problem only shows up later when the output file cannot be created.
    // An already-existing directory is not treated as an error.
    if (mkdir(folder_path_.c_str(), 0777) != 0 && errno != EEXIST)
    {
        perror(("ExtSort: mkdir " + folder_path_).c_str());
    }
}

// return number of records processed
size_t ExtSort::InMemorySort(size_t rec_off, DataManager &data_manger, string output_file)
{
    size_t read_array_len = data_manger.file_size_[0] / conf.record_size;
    vector<in_record_t> keys_idx;
    // This assumes the key + index fits the memory limit, so just get size from input
    keys_idx.resize(read_array_len);

    timer.start("RUN read");
    size_t read_records = data_manger.RunRead(rec_off, read_array_len, keys_idx);
    // size_t read_records = data_manger.RunReadPMSort(rec_off, read_array_len, keys_idx);
    timer.end("RUN read");
    // printf("Finished reading %lu!\n", read_records);

    if (!read_records)
        return 0;

    timer.start("SORT");
#ifdef bandwidth
    uint64_t sort_start_time = rdtsc();
#endif
    ips4o::parallel::sort(keys_idx.begin(), keys_idx.end(), std::less<>{});
#ifdef bandwidth
    uint64_t sort_delta = rdtsc() - sort_start_time;
    timer.end("checkpoints");
    printf("%f,SORT,%f,%lu\n", timer.get_overall_time("checkpoints"), ((float)(sort_delta) / NANOSECONDS_IN_SECOND), keys_idx.size() * (KEY_SIZE + INDEX_SIZE));
    timer.start("checkpoints");
#endif
    timer.end("SORT");
    // printf("Finished sorting %lu!\n", read_records);

    // Create a file equal to the length of the final output.
    data_manger.OpenAndMapOutputFile(output_file, data_manger.file_size_[0]);

    size_t write_records = 0;
    // timer.start("RUN write");
    /**
     * Version 1
     * - Each thread reads offset from file and writes to the output file.
     * - This could be bad because of uncrontolled number of writes give poor bandwidth.
     */
    // write_records += data_manger.RunWrite(keys_idx, read_records);
    /**
     * Version 2
     * Each thread reads offset from file to a output buffer
     * The output buffer once full is then written to pmem in controlled fashion.
     */
    write_records += data_manger.RunWrite2(keys_idx, read_records);
    // timer.end("RUN write");
    // printf("Finished writing %lu!\n", write_records);

    return write_records;
}

// Runs the in-memory sort over the given input files.
//
// Builds a DataManager over `files` and calls InMemorySort repeatedly, advancing
// the record offset by the number of records each call reports, until a call
// reports zero records. All output goes to "<folder_path_>/sorted".
//
// files - input file names handed to DataManager.
//
// Returns the list of run names.
// NOTE(review): nothing is ever appended to that list in this function, so the
// result is always empty — confirm whether callers expect the output file name.
vector<string> ExtSort::RunGeneration(vector<string> files)
{
    DataManager data_manager(files, &timer);
    const string file_name = folder_path_ + "/sorted";
    vector<string> run_names;

    uint64_t rec_off = 0;
    // InMemorySort returning 0 is the only termination signal.
    while (true)
    {
        const uint64_t rec_processed = InMemorySort(rec_off, data_manager, file_name);
        if (rec_processed == 0)
            break;
        rec_off += rec_processed;
    }

    // Return the local by name: wrapping it in std::move would disable NRVO.
    return run_names;
}

// Entry point: sorts the given input files and prints a timing summary.
//
// Times the whole run-generation phase under the "RUN" timer and then prints
// the accumulated time of each phase. When built with `bandwidth` defined, a
// "checkpoints" timer is additionally kept running around the phase.
//
// files - input file names, forwarded to RunGeneration.
void ExtSort::Sort(vector<string> &files)
{
#ifdef bandwidth
    timer.start("checkpoints");
#endif

    timer.start("RUN");
    // The returned run list is not used here.
    RunGeneration(files);
    timer.end("RUN");

    // This guard was misspelled, so the checkpoint timer was never closed; it
    // now matches the `bandwidth` guard above and uses the same end() call as
    // the rest of the file.
#ifdef bandwidth
    timer.end("checkpoints");
#endif

    printf("====================================:\n");
    printf("\t RUN read: %f\n", timer.get_overall_time("RUN read"));
    printf("\t RUN sort: %f\n", timer.get_overall_time("SORT"));
    printf("\t RECORD read: %f\n", timer.get_overall_time("RecRead"));
    printf("\t RUN write: %f\n", timer.get_overall_time("RUN write"));
    printf("Total RUN: %f\n", timer.get_time("RUN"));
}