#include "ext_sort.h"
#include <chrono>
#include <sys/types.h>
#include <sys/stat.h>
#include <assert.h>
#include <algorithm>
#include <sys/mman.h>
#include "timer.h"

#ifdef pmdk
#include <libpmem.h>
#endif

#ifdef __cplusplus
#include <stdint.h>
extern "C"
{
    void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);
}
#endif

#define min(a, b) (((a) < (b)) ? (a) : (b)) // used to size the final, partial read of a run file

Timer timer;
ExtSort::ExtSort(string mount_path)
{
    // Create a unique, timestamped directory to hold the run files and the sorted output.
    folder_path_ = mount_path + std::to_string(std::time(nullptr));
    if (mkdir(folder_path_.c_str(), 0777) != 0)
    {
        printf("Couldn't create run directory %s: %s\n", folder_path_.c_str(), strerror(errno));
        exit(1);
    }
    //  DEBUG:
    // folder_path_ = "/mnt/pmem/1649252474";
}

// Reads one buffer's worth of records starting at blk_off, sorts their keys, writes
// the sorted (key, record-offset) index out as a run file, and returns the number of
// blocks processed (0 when there is nothing left to read).
size_t ExtSort::InMemorySort(size_t blk_off, DataManager &data_manager, string output_file)
{
    vector<in_record_t> keys_idx;
    size_t read_buff_sz = READ_BUFFER_COUNT * BLOCK_SIZE;
    record_t *read_buffer = (record_t *)malloc(read_buff_sz);
    assert(read_buffer);

    timer.start("RUN read");
    size_t read_records = data_manager.RunRead(blk_off, READ_BUFFER_COUNT, read_buffer);
    timer.end("RUN read");
#ifdef checkpoints
    timer.end("checkpoints");
    printf("RUN read: %f\n", timer.get_overall_time("checkpoints"));
    timer.start("checkpoints");
#endif
    if (!read_records)
        return 0;

    // printf("Finished reading!\n");
    keys_idx.resize(read_records);
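    // Build an index of (key, record-offset) pairs for the records just read; only this
    // index is sorted and written to the run file, the full records stay in the input file.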

    size_t off = 0;
    roffset r_offset;
    for (size_t idx = 0; idx < read_records; ++idx)
    {
        // Let's store the corresponding record offset - Not value offset
        off = (idx * RECORD_SIZE) + // For each record
              blk_off * BLOCK_SIZE; // For values beyond first read buffer
        r_offset = off;
        keys_idx[idx] = in_record_t(read_buffer[idx].k, r_offset);
    }

    timer.start("SORT");
    std::sort(keys_idx.begin(), keys_idx.end());
    timer.end("SORT");
#ifdef checkpoints
    timer.end("checkpoints");
    printf("SORT: %f\n", timer.get_overall_time("checkpoints"));
    timer.start("checkpoints");
#endif
    // printf("Finished sorting %lu!\n", read_records);
    // create a file equal to the length of the index
    data_manager.OpenAndMapOutputFile(output_file, read_records * IDX_RECORD_SIZE);

    size_t write_records = 0;
    // Write all the read records to a file directly from the sorted array.

    timer.start("RUN write");
    write_records += data_manager.RunWrite(keys_idx, read_records);
    timer.end("RUN write");
#ifdef checkpoints
    timer.end("checkpoints");
    printf("RUN write: %f\n", timer.get_overall_time("checkpoints"));
    timer.start("checkpoints");
#endif
    // free the read buffer (allocated with malloc, so use free rather than delete[])
    free(read_buffer);
    // Convert the written record count back to input blocks, since reads consume data in BLOCK_SIZE units.
    return (write_records * RECORD_SIZE) / BLOCK_SIZE;
}

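// Run-generation phase: repeatedly sort one read buffer's worth of the input and
// write each sorted (key, offset) index out as a run file; returns the run names.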
vector<string> ExtSort::RunGeneration(vector<string> files)
{
    DataManager data_manager(files);
    uint64_t blk_off = 0;
    string file_name = folder_path_ + "/run_";
    size_t run_num = 0;
    vector<string> run_names;
    // REVIEW: Loop until all data is processed? ex: index >= total number of records?
    while (1)
    {
        uint64_t blk_processed = InMemorySort(blk_off, data_manager,
                                              file_name + std::to_string(run_num));
        if (!blk_processed)
            break;
        blk_off += blk_processed;
        run_names.push_back(file_name + std::to_string(run_num));
        run_num++;
    }
    return run_names;
}

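// Merge phase: k-way merge of the sorted run files (key + record offset), fetching
// each full record from the original input file and streaming it to the final
// sorted output file.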
void ExtSort::MergeRuns(vector<string> runs, string input_file)
{
    // Game plan!
    // 1. Open all run files and create a new file of size sum of all runs.
    // 2. Create a read buffer that is evenly divisible by the number of files.
    // 3. Create current pointers and end pointers (or at least know the end of each file's buffer).
    // 4a. Open the original input_file and mmap it.
    // 4b. First mmap the runs and fill the read buffer from their respective files.
    // 5. Find the min of the keys pointed to by the current pointers (ideally a min over a list of (key, file_num)).
    // 6. Memcpy the record at that offset from input_file to the output buffer directly.
    // 6b. If the output buffer is full, flush it and reset its pointer.
    // 7. Increment the current pointer of the respective run buffer.
    // 8. If a current pointer reaches its end pointer, load new data into that part of the buffer.
    // 9. If the last block of a file has been read and the current pointer for that file's buffer
    //    is at the end, then stop doing reads for it.
    // 10. If only one file buffer hasn't reached its last block, just copy the rest of that
    //     file to the output buffer.

    // The read buffer must be evenly divisible by runs.size() and IDX_BLOCK_SIZE
    size_t READ_BUFFER_SIZE_M = ((M_READ_BUFFER_SIZE / runs.size()) / IDX_BLOCK_SIZE) * IDX_BLOCK_SIZE * runs.size();
    size_t READ_BUFFER_COUNT_M = READ_BUFFER_SIZE_M / IDX_BLOCK_SIZE;

    // [1]
    DataManager data_manager(runs);
    string output_file_name = folder_path_ + "/sorted";
    size_t output_file_size = 0;
    for (auto i : data_manager.file_size_)
        output_file_size += ((i / IDX_RECORD_SIZE) * RECORD_SIZE);
    data_manager.OpenAndMapOutputFile(output_file_name, output_file_size);
    // Here the write_buffer size should evenly divide the output_file_size
    record_t *write_buffer = (record_t *)malloc(WRITE_BUFFER_COUNT * BLOCK_SIZE);
    assert(write_buffer);

    // [2]
    size_t read_buff_sz = READ_BUFFER_COUNT_M * IDX_BLOCK_SIZE;
    in_record_t *read_buffer = (in_record_t *)malloc(read_buff_sz);
    assert(read_buffer);

    // [3]
    vector<size_t> cur_rec_off(runs.size(), 0);   // Holds the rec offset of key that needs comparison per file
    vector<size_t> end_rec_off(runs.size(), 0);   // Holds the last rec offset of the space allocated per file
    vector<size_t> rfile_blk_off(runs.size(), 0); // Has the current blk offset of the file which is to be read
    vector<char *> mapped_run_files(runs.size()); // mmap all the run files

    size_t space_per_run = read_buff_sz / runs.size();
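    // Each run file owns an equal, IDX_BLOCK_SIZE-aligned slice of the merge read buffer.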

    // [4a] - REVIEW: Perhaps just pass run's data_manager object where file is already opened and mapped?
    // open file and mmap it
    int fd = open(input_file.c_str(), O_RDWR | O_DIRECT);
    if (fd < 0)
    {
        printf("Couldn't open input file %s: %s\n", input_file.c_str(), strerror(errno));
        exit(1);
    }
    struct stat st;
    if (stat(input_file.c_str(), &st) != 0)
    {
        printf("Couldn't stat input file %s: %s\n", input_file.c_str(), strerror(errno));
        exit(1);
    }
    // mmap file
    if (!data_manager.MMapFile(st.st_size, 0, fd, data_manager.input_mapped_buffer_))
        exit(1);

    // [4b]
    size_t read_records = 0;
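    // Number of index blocks fetched per run each time its buffer slice is (re)filled.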
    size_t read_size_blk = space_per_run / IDX_BLOCK_SIZE;
    for (uint32_t file_num = 0; file_num < runs.size(); ++file_num)
    {
        // Initialize pointers
        cur_rec_off[file_num] = file_num * (space_per_run / IDX_RECORD_SIZE);
        end_rec_off[file_num] = (file_num + 1) * (space_per_run / IDX_RECORD_SIZE) - 1;
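        // cur/end offsets are in index-record units within read_buffer; each run gets
        // its own contiguous slice of the buffer, refilled independently.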

        // mmap files
        data_manager.MMapFile(data_manager.file_size_[file_num], 0,
                              data_manager.file_ptr_[file_num], mapped_run_files[file_num]);

        timer.start("MERGE seq read");
        read_records += data_manager.MergeRead(read_buffer, cur_rec_off[file_num],
                                               mapped_run_files[file_num], rfile_blk_off[file_num],
                                               read_size_blk);
        timer.end("MERGE seq read");
#ifdef checkpoints
        timer.end("checkpoints");
        printf("MERGE seq read: %f\n", timer.get_overall_time("checkpoints"));
        timer.start("checkpoints");
#endif
        rfile_blk_off[file_num] = read_size_blk;
    }

    vector<k_t> min_vec(runs.size());
    // Load the vec with first key from each run
    for (uint32_t file_num = 0; file_num < runs.size(); ++file_num)
    {
        min_vec[file_num] = read_buffer[cur_rec_off[file_num]].k;
    }

    ////////////// DEBUG:
    // char *test = (char *)malloc(512);

    size_t write_buff_rec_off = 0;
    uint32_t min_index = 0;
    size_t recs_written = 0;
    size_t run_file_sz = 0;
    size_t input_file_off = 0;
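    // tmp_k is a sentinel "max" key: a run whose min_vec entry equals tmp_k is fully consumed.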
    k_t tmp_k;
    tmp_k = 0xff;
    // Loop until all the recs are written
    while (recs_written < output_file_size / RECORD_SIZE)
    {
        // [5]
        min_index = std::min_element(min_vec.begin(), min_vec.end()) - min_vec.begin();
        // DEBUG:
        // for (int i; i < 10; i++)
        //     printf("%x ", read_buffer[cur_rec_off[min_index]].k.key[i]);
        // printf("\n");

        // [6] // This is the additional random read to fetch the value.
        input_file_off = data_manager.ConvertHexToRoff(read_buffer[cur_rec_off[min_index]].r_off.val);
        // DEBUG:
        // printf("write_buff_rec_off - %lu, cur_rec_off - %lu, input_file_off - %lu \n",
        //    write_buff_rec_off, cur_rec_off[min_index], input_file_off);
        timer.start("MERGE rand read");
#ifdef avx512
        __memmove_chk_avx512_no_vzeroupper(&write_buffer[write_buff_rec_off],
                                           &data_manager.input_mapped_buffer_[input_file_off],
                                           RECORD_SIZE);
// #elif pmdk
// pmem_memcpy_nodrain(&write_buffer[write_buff_rec_off],
//                     &data_manager.input_mapped_buffer_[input_file_off],
//                     RECORD_SIZE);
#else
        // memcpy((void *)(test),
        //        &data_manager.input_mapped_buffer_[input_file_off],
        //        min(output_file_size - input_file_off, 512));
        // memcpy(&write_buffer[write_buff_rec_off], (void *)test, RECORD_SIZE);
        memcpy(&write_buffer[write_buff_rec_off],
               &data_manager.input_mapped_buffer_[input_file_off],
               RECORD_SIZE);
#endif
        timer.end("MERGE rand read");
        write_buff_rec_off++;

        // check if all RUN files reached the end
        if (count(min_vec.begin(), min_vec.end(), tmp_k) == (uint32_t)runs.size())
        {
            // What if write buffer is not completely full but all records are processed.
            // Here min_vec indicates all the records are processed for all runs
            // Dump remaining write_buffer and break
            timer.start("MERGE write");
            recs_written += data_manager.MergeWrite(write_buffer, write_buff_rec_off);
            timer.end("MERGE write");
#ifdef checkpoints
            timer.end("checkpoints");
            printf("MERGE write: %f\n", timer.get_overall_time("checkpoints"));
            timer.start("checkpoints");
#endif
            break;
        }
        // Write buffer is full so flush it.
        if (write_buff_rec_off == REC_PER_WRITE_BUFFER)
        {
            timer.start("MERGE write");
            recs_written += data_manager.MergeWrite(write_buffer, write_buff_rec_off);
            timer.end("MERGE write");
#ifdef checkpoints
            timer.end("checkpoints");
            printf("MERGE write: %f\n", timer.get_overall_time("checkpoints"));
            timer.start("checkpoints");
#endif
            write_buff_rec_off = 0;
        }

        // [7]
        // Now replace min_vec with new value from respective buffer space.
        cur_rec_off[min_index]++;
        min_vec[min_index] = read_buffer[cur_rec_off[min_index]].k;

        // Now check if corresponding chunk has reached the end
        if (cur_rec_off[min_index] >= end_rec_off[min_index])
        {
            // [9]
            // Also see if you have hit the end of file for that run
            run_file_sz = data_manager.file_size_[min_index] / IDX_BLOCK_SIZE;
            if (rfile_blk_off[min_index] >= run_file_sz)
            {
                // [10]
                // if (count(min_vec.begin(), min_vec.end(), tmp_k) == (uint32_t)runs.size() - 1)
                // {
                //     for (uint32_t file_id = 0; file_id < min_vec.size(); ++file_id)
                //     {
                //         if (!(min_vec[file_id] == tmp_k))
                //         {
                //             // First flush the write_buffer to file
                //             // Read data to read buffer and write it directly to outputfile
                //             recs_written += data_manager.MoveRemainingInputToOuputBuff();
                //             break;
                //         }
                //     }
                // }

                // if you reached the end of a file then set a max value for it in min_vec index.
                min_vec[min_index] = tmp_k; // 0xffffffffffffffffffff
            }
            else
            {
                // [8]
                // reset current rec offset for that RUN
                cur_rec_off[min_index] = min_index * (space_per_run / IDX_RECORD_SIZE);
                // Read the next set of blocks from the file
                size_t read_size = min(read_size_blk, run_file_sz - rfile_blk_off[min_index]);
                timer.start("MERGE seq read");
                data_manager.MergeRead(read_buffer, cur_rec_off[min_index],
                                       mapped_run_files[min_index], rfile_blk_off[min_index],
                                       read_size);
                timer.end("MERGE seq read");
#ifdef checkpoints
                timer.end("checkpoints");
                printf("MERGE seq read: %f\n", timer.get_overall_time("checkpoints"));
                timer.start("checkpoints");
#endif
                // Update the end record in case this refill is not completely full because
                // there are no more blocks to read from that RUN file: the end is the slice
                // base (cur_rec_off was just reset to it) plus the records actually read.
                end_rec_off[min_index] = cur_rec_off[min_index] + (read_size * IDX_BLOCK_SIZE) / IDX_RECORD_SIZE - 1;
                rfile_blk_off[min_index] += read_size;
            }
        }
    }

    // Release the merge buffers (malloc'd above) and the input file descriptor.
    free(write_buffer);
    free(read_buffer);
    close(fd);
}

void ExtSort::Sort(vector<string> &files)
{
#ifdef checkpoints
    timer.start("checkpoints");
#endif
    // DEBUG: Wait until all measuring scripts are ready.
    sleep(5);
    timer.start("RUN");
    vector<string> runs = RunGeneration(files);
    timer.end("RUN");
    // DEBUG:
    // vector<string> runs{"/mnt/pmem/1649252474/run_0", "/mnt/pmem/1649252474/run_1"};
    timer.start("MERGE");
    MergeRuns(runs, files[0]);
    timer.end("MERGE");

#ifdef checkpoints
    timer.end("checkpoints");
#endif
    printf("====================================\n");
    printf("Total RUN: %f\n", timer.get_time("RUN"));
    printf("\t RUN read: %f\n", timer.get_overall_time("RUN read"));
    printf("\t RUN sort: %f\n", timer.get_overall_time("SORT"));
    printf("\t RUN write: %f\n", timer.get_overall_time("RUN write"));
    printf("Total MERGE: %f\n", timer.get_time("MERGE"));
    printf("\t MERGE seq read: %f\n", timer.get_overall_time("MERGE seq read"));
    printf("\t MERGE rand read: %f\n", timer.get_overall_time("MERGE rand read"));
    printf("\t MERGE write: %f\n", timer.get_overall_time("MERGE write"));
    printf("Total: %f\n", timer.get_time("RUN") + timer.get_time("MERGE"));
}