// WiscSort — ext_sort.cc
#include "ext_sort.h"
#include <chrono>
#include <sys/types.h>
#include <sys/stat.h>
#include <assert.h>
#include <algorithm>
#include <sys/mman.h>
#include "timer.h"
#include <future>

#include "ips4o/include/ips4o.hpp"
#ifdef pmdk
#include <libpmem.h>
#endif

#ifdef __cplusplus
#include <stdint.h>
extern "C"
{
    void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);
}
#endif

#define min(a, b) (((a) < (b)) ? (a) : (b))

Timer timer;
ExtSort::ExtSort(string mount_path)
{
    folder_path_ = mount_path + std::to_string(std::time(nullptr));
    mkdir(folder_path_.c_str(), 0777);
}

/**
 * @brief Get the index of the first buffer whose state matches `state`.
 *
 * @param ready_flag per-buffer state flags
 * @param state the buffer state to look for (READ, SORTED or WRITTEN)
 * @return int index of the matching buffer, or ready_flag.size() if none matches
 */
int get_buffer_index(const vector<BufferState> &ready_flag, BufferState state)
{
    return std::distance(ready_flag.begin(), std::find(ready_flag.begin(), ready_flag.end(), state));
}

/**
 * @brief Run generation: isolate COMPUTE from IO.
 * Reads and writes are overlapped with each other, but sorting is not overlapped with either.
 * @param files list of input file names (assuming 1 for now)
 * @return vector<string> names of the generated run files
 */
vector<string> ExtSort::RunGeneration(vector<string> files)
{
    vector<string> run_names;
    string file_name = folder_path_ + "/run_";
    size_t run_num = 0;
    /*
     1. Create two buffers.
     2. READ into the first buffer.
     3. SORT the first buffer.
     4. READ the second buffer and WRITE the first buffer at the same time.
     5. SORT the second buffer.
     6. READ the first buffer and WRITE the second buffer.
     */
    // In this approach we will have two buffers.
    vector<vector<in_record_t>> keys_idx(2);
    // vector<in_record_t> keys_idx_1, keys_idx_2;
    size_t rec_buff_size = conf.read_buffer_size / conf.idx_record_size;
    rec_buff_size = rec_buff_size / 2; // Two buffers
    keys_idx[0].resize(rec_buff_size);
    keys_idx[1].resize(rec_buff_size);
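    // Each buffer holds rec_buff_size (key, record-offset) index entries. For example,
    // with read_buffer_size = 2 GiB and idx_record_size = 16 B (illustrative values only),
    // each of the two buffers holds 64M entries.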

    /* One state flag per buffer:
     * READ    -> buffer has been filled from the input file
     * SORTED  -> buffer has been sorted and is ready to be written
     * WRITTEN -> buffer has been written out (or is empty) and can be refilled
     */
    vector<BufferState> ready_flag(2, WRITTEN);
    DataManager data_manager(files, &timer);
    size_t read_rec_off = 0;
    size_t recs_written = 0;

    while ((recs_written * conf.record_size) < data_manager.file_size_[0])
    {
        // Either completed sorting everything or starting now.
        if (ready_flag[0] == WRITTEN && ready_flag[1] == WRITTEN)
        {
            // Record offset will be 0 since this condition is for first read only.
            timer.start("RUN read");
            read_rec_off += data_manager.RunRead(0, rec_buff_size, keys_idx[0]);
            timer.end("RUN read");
            ready_flag[0] = READ;
        }

        // Check if at least one buffer is ready to be sorted
        if (std::count(ready_flag.begin(), ready_flag.end(), READ) > 0)
        {
            int buf_idx = get_buffer_index(ready_flag, READ);
            timer.start("SORT");
#ifdef bandwidth
            uint64_t sort_start_time = rdtsc();
#endif
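            // ips4o sorts the index records of this buffer in place across multiple threads;
            // the ordering presumably comes from in_record_t's comparison on the key.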
            ips4o::parallel::sort(keys_idx[buf_idx].begin(), keys_idx[buf_idx].end(), std::less<>{});
#ifdef bandwidth
            uint64_t sort_delta = rdtsc() - sort_start_time;
            timer.end("checkpoints");
            printf("%f,SORT,%f,%lu\n", timer.get_overall_time("checkpoints"), ((float)(sort_delta) / NANOSECONDS_IN_SECOND), keys_idx.size() * (KEY_SIZE + INDEX_SIZE));
            timer.start("checkpoints");
#endif
            timer.end("SORT");
            ready_flag[buf_idx] = SORTED;
        }
        // Need to parallelize reads and writes
        if (std::count(ready_flag.begin(), ready_flag.end(), SORTED) > 0)
        {
            // Identify which buffer must be read and which must be written.
            int wrt_buf_idx = get_buffer_index(ready_flag, SORTED);
            int rd_buf_idx = get_buffer_index(ready_flag, WRITTEN);

            // Create a run file sized to hold one buffer's worth of index records.
            data_manager.OpenAndMapOutputFile(file_name + std::to_string(run_num), rec_buff_size * conf.idx_record_size);
            run_names.push_back(file_name + std::to_string(run_num));
            run_num++;

            if ((read_rec_off * conf.record_size) < data_manager.file_size_[0])
            {
                // Write the sorted buffer on a background thread while this thread reads the next chunk.
                timer.start("READ-WRITE");
                std::future<size_t> interfere = std::async(
                    std::launch::async,
                    &DataManager::RunWrite, &data_manager, keys_idx[wrt_buf_idx], rec_buff_size);
                read_rec_off += data_manager.RunRead(read_rec_off, rec_buff_size, keys_idx[rd_buf_idx]);
                recs_written += interfere.get();
                timer.end("READ-WRITE");

                // Mark the written buffer WRITTEN and the freshly filled one READ.
                ready_flag[wrt_buf_idx] = WRITTEN;
                ready_flag[rd_buf_idx] = READ;
            }
            else
            {
                // This path is taken only for the last write
                timer.start("RUN write");
                recs_written += data_manager.RunWrite(keys_idx[wrt_buf_idx], rec_buff_size);
                timer.end("RUN write");
                ready_flag[wrt_buf_idx] = WRITTEN;
            }
        }
    }
    return run_names;
}

void ExtSort::MergeRuns(vector<string> runs, string input_file)
{
    // Game plan!
    // 1. Open all run files and create a new output file whose size is the sum of all runs.
    // 2. Create a read buffer that is evenly divisible by the number of run files.
    // 3. Create current and end pointers (or at least know the end of each file's buffer slice).
    // 4a. Open the original input_file and mmap it.
    // 4b. mmap the runs and fill the read buffer from their respective files.
    // 5. Find the min of the keys pointed to by the current pointers (ideally a min over a list of (key, file_num)).
    // 6. Memcpy the record at that offset from input_file directly into the output buffer;
    //    if the output buffer is full, flush it and reset its pointer.
    // 7. Increment the pointer of the respective run buffer.
    // 8. If the current pointer reaches the end pointer, load new data into that part of the buffer.
    // 9. If the last block of a file has been read and the current pointer for that file's buffer
    //    is at the end, stop doing reads for it.
    // 10. If only one file buffer hasn't reached its last block, just copy the rest of that
    //     file to the output buffer.

    // [1]
    DataManager data_manager(runs, &timer);
    string output_file_name = folder_path_ + "/sorted";
    size_t output_file_size = 0;
    for (auto i : data_manager.file_size_)
        output_file_size += ((i / conf.idx_record_size) * conf.record_size);
    data_manager.OpenAndMapOutputFile(output_file_name, output_file_size);
    // Here the write_buffer size should evenly divide the output_file_size
    record_t *write_buffer = (record_t *)malloc(conf.write_buffer_size);
    assert(write_buffer);

    // [2]
    // The read buffer size must be evenly divisible by runs.size() and conf.block_size
    size_t READ_BUFFER_SIZE_M = ((conf.merge_read_buffer_size / runs.size()) / conf.block_size) * conf.block_size * runs.size();
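    // For example, with merge_read_buffer_size = 256 MiB, 4 runs and block_size = 4 KiB
    // (illustrative values only), each run gets a 64 MiB slice of the read buffer,
    // rounded down to a whole number of blocks.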
    in_record_t *read_buffer = (in_record_t *)malloc(READ_BUFFER_SIZE_M);
    assert(read_buffer);

    // [3]
    vector<size_t> cur_rec_off(runs.size(), 0);   // Holds the rec offset of key that needs comparison per file
    vector<size_t> end_rec_off(runs.size(), 0);   // Holds the last rec offset of the space allocated per file
    vector<size_t> rfile_blk_off(runs.size(), 0); // Has the current blk offset of the file which is to be read
    vector<char *> mapped_run_files(runs.size()); // mmap all the run files

    size_t space_per_run = READ_BUFFER_SIZE_M / runs.size();

    // [4a] - REVIEW: Perhaps just pass run's data_manager object where file is already opened and mapped?
    // open file and mmap it
    int fd = open(input_file.c_str(), O_RDWR | O_DIRECT);
    if (fd < 0)
    {
        printf("Couldn't open input file %s: %s\n", input_file.c_str(), strerror(errno));
        exit(1);
    }
    struct stat st;
    if (stat(input_file.c_str(), &st) != 0)
    {
        printf("Couldn't stat input file %s: %s\n", input_file.c_str(), strerror(errno));
        exit(1);
    }
    // mmap file
    if (!data_manager.MMapFile(st.st_size, 0, fd, data_manager.input_mapped_buffer_))
        exit(1);

    // [4b]
    size_t read_records = 0;
    size_t read_size_blk = space_per_run / conf.block_size;
    for (uint32_t file_num = 0; file_num < runs.size(); ++file_num)
    {
        // Initialize pointers
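        // The read buffer is split into runs.size() equal slices; run file_num owns the
        // record range [cur_rec_off[file_num], end_rec_off[file_num]] within read_buffer.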
        cur_rec_off[file_num] = file_num * (space_per_run / IDX_RECORD_SIZE);
        end_rec_off[file_num] = (file_num + 1) * (space_per_run / IDX_RECORD_SIZE) - 1;

        // mmap files
        data_manager.MMapFile(data_manager.file_size_[file_num], 0,
                              data_manager.file_ptr_[file_num], mapped_run_files[file_num]);

        timer.start("MERGE read");
        read_records += data_manager.MergeRead(read_buffer, cur_rec_off[file_num],
                                               mapped_run_files[file_num], rfile_blk_off[file_num],
                                               read_size_blk);
        timer.end("MERGE read");
#ifdef checkpoints
        timer.end("checkpoints");
        printf("MERGE seq read: %f\n", timer.get_overall_time("checkpoints"));
        timer.start("checkpoints");
#endif
        rfile_blk_off[file_num] = read_size_blk;
    }

    vector<k_t> min_vec(runs.size());
    // Load the vec with first key from each run
    for (uint32_t file_num = 0; file_num < runs.size(); ++file_num)
    {
        min_vec[file_num] = read_buffer[cur_rec_off[file_num]].k;
    }

    size_t write_buff_rec_off = 0;
    uint32_t min_index = 0;
    size_t recs_written = 0;
    size_t run_file_sz = 0;
    size_t input_file_off = 0;
    k_t tmp_k;
    tmp_k = 0xff;
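    // Sentinel key: assigning 0xff presumably fills every byte of the key with 0xff,
    // i.e. the maximum possible key value, used below to mark exhausted runs.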
    bool random_reads_complete = false;
    vector<read_offs> off_vec;
    // Loop until all the recs are written
    while (recs_written < output_file_size / conf.record_size)
    {
        // [5]
        min_index = std::min_element(min_vec.begin(), min_vec.end()) - min_vec.begin();
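        // A linear scan over one candidate key per run picks the next record to output,
        // i.e. a k-way merge costing O(runs.size()) per record instead of using a heap.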

        // [6] Compute the input-file offset of the full record; the actual random read is batched below.
        input_file_off = data_manager.ConvertHexToRoff(read_buffer[cur_rec_off[min_index]].r_off.val);

        // check if all RUN files reached the end
        if (count(min_vec.begin(), min_vec.end(), tmp_k) == (uint32_t)runs.size())
        {
            // The write buffer may not be completely full even though all records have
            // been processed: min_vec being all sentinels means every run is exhausted,
            // so dump the remaining write_buffer and break.
            timer.start("MERGE write");
            recs_written += data_manager.MergeWrite(write_buffer, write_buff_rec_off);
            timer.end("MERGE write");
#ifdef checkpoints
            timer.end("checkpoints");
            printf("MERGE write: %f\n", timer.get_overall_time("checkpoints"));
            timer.start("checkpoints");
#endif
            break;
        }

        /**
         * @brief Collect all the offsets that random reads must be made to, then
         * submit them to the thread pool in one batch so individual random reads
         * do not block the merge loop.
         */

        if (write_buff_rec_off < conf.write_buffer_size / conf.record_size)
        {
            // submit job
            off_vec.push_back({input_file_off, write_buff_rec_off});
            write_buff_rec_off++;
        }

        if (write_buff_rec_off >= conf.write_buffer_size / conf.record_size)
        {
            // Submit task for random reads and wait until complete
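            // MergeRandomRead copies the full records at the collected input-file offsets
            // into the write_buffer slots recorded in off_vec (presumably via the thread pool).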
            timer.start("RECORD read");
            data_manager.MergeRandomRead(write_buffer, off_vec);
            timer.end("RECORD read");
            // clearing queue for next set of offsets.
            off_vec.clear();
            random_reads_complete = true;
        }

        // Write buffer is full so flush it.
        if (write_buff_rec_off >= (conf.write_buffer_size / conf.record_size) && random_reads_complete == true)
        {
            // printf("The write_buff_rec_off is %lu\n", write_buff_rec_off);
            timer.start("MERGE write");
            recs_written += data_manager.MergeWrite(write_buffer, write_buff_rec_off);
            timer.end("MERGE write");
#ifdef checkpoints
            timer.end("checkpoints");
            printf("MERGE write: %f\n", timer.get_overall_time("checkpoints"));
            timer.start("checkpoints");
#endif
            write_buff_rec_off = 0;
            random_reads_complete = false;
        }

        // [7]
        // Now replace min_vec with new value from respective buffer space.
        cur_rec_off[min_index]++;
        min_vec[min_index] = read_buffer[cur_rec_off[min_index]].k;

        // Now check if corresponding chunk has reached the end
        if (cur_rec_off[min_index] > end_rec_off[min_index])
        {
            // [9]
            // Also see if you have hit the end of file for that run
            run_file_sz = data_manager.file_size_[min_index] / conf.block_size;
            if (rfile_blk_off[min_index] >= run_file_sz)
            {
                // [10]
                // if (count(min_vec.begin(), min_vec.end(), tmp_k) == (uint32_t)runs.size() - 1)
                // {
                //     for (uint32_t file_id = 0; file_id < min_vec.size(); ++file_id)
                //     {
                //         if (!(min_vec[file_id] == tmp_k))
                //         {
                //             // First flush the write_buffer to file
                //             // Read data to read buffer and write it directly to outputfile
                //             recs_written += data_manager.MoveRemainingInputToOuputBuff();
                //             break;
                //         }
                //     }
                // }

                // We have reached the end of this run file, so place a sentinel max key in its min_vec slot.
                min_vec[min_index] = tmp_k; // 0xffffffffffffffffffff
            }
            else
            {
                // [8]
                // reset current rec offset for that RUN
                cur_rec_off[min_index] = min_index * (space_per_run / conf.idx_record_size);
                // Read the next set of blocks from the file
                size_t read_size = min(read_size_blk, run_file_sz - rfile_blk_off[min_index]);
                timer.start("MERGE read");
                data_manager.MergeRead(read_buffer, cur_rec_off[min_index],
                                       mapped_run_files[min_index], rfile_blk_off[min_index],
                                       read_size);
                timer.end("MERGE read");
#ifdef checkpoints
                timer.end("checkpoints");
                printf("MERGE read: %f\n", timer.get_overall_time("checkpoints"));
                timer.start("checkpoints");
#endif
                // Update the end record offset: the slice may not be completely full when
                // there were fewer blocks left to read from this RUN file.
                end_rec_off[min_index] = min_index * (space_per_run / conf.idx_record_size) +
                                         (read_size * conf.block_size) / conf.idx_record_size - 1;
                rfile_blk_off[min_index] += read_size;
            }
        }
    }
}

void ExtSort::Sort(vector<string> &files)
{
#ifdef bandwidth
    timer.start("checkpoints");
#endif
    timer.start("RUN");
    vector<string> runs = RunGeneration(files);
    timer.end("RUN");

    timer.start("MERGE");
    // MergeRuns(runs, files[0]);
    timer.end("MERGE");

#ifdef bandwidth
    timer.end("checkpoints");
#endif

    printf("====================================:\n");
    printf("\t RUN read: %f\n", timer.get_overall_time("RUN read"));
    printf("\t RUN sort: %f\n", timer.get_overall_time("SORT"));
    printf("\t RUN write: %f\n", timer.get_overall_time("RUN write"));
    printf("\t RUN read-write: %f\n", timer.get_overall_time("READ-WRITE"));
    printf("Total RUN: %f\n", timer.get_time("RUN"));
    printf("\t MERGE read: %f\n", timer.get_overall_time("MERGE read"));
    printf("\t MERGE write: %f\n", timer.get_overall_time("MERGE write"));
    printf("\t RECORD read: %f\n", timer.get_overall_time("RECORD read"));
    printf("Total MERGE: %f\n", timer.get_time("MERGE"));
    printf("Total: %f\n", timer.get_time("RUN") + timer.get_time("MERGE"));
}