// WiscSort / wiscSort / RSW / split_RSW.cpp
// Multi threaded - sort key and index.
#include <pthread.h>
#include "extsort_rsw.h"
#include <fcntl.h>
#include <chrono>
#include <unistd.h>
#include <algorithm>
#include "parameter.h"
#include "nano_time.h"
#include <iostream>
#include <unistd.h>
#include <vector>
#include <sys/mman.h>
#include <sys/sysinfo.h>

// Multi-threaded

//#include "ips4o/include/ips4o.hpp"
//#include <execinfo.h>
//#include <signal.h>

using std::cout;
using std::endl;
using std::vector;

// Number of nanoseconds in one second; the divisor to_seconds() uses to
// convert nanosecond durations into seconds.
#define NANOSECONDS_IN_SECOND 1000000000

// Print a printf-style message and terminate the process with _exit(-1).
// The do/while(0) wrapper makes the macro behave as a single statement, so
// it is safe as the body of an unbraced `if`.
// NOTE(review): _exit() skips normal exit processing, so output still sitting
// in the stdout buffer may not be flushed - confirm this is acceptable.
#define EXIT_MSG(...)        \
    do                       \
    {                        \
        printf(__VA_ARGS__); \
        _exit(-1);           \
    } while (0)

// Debug-only helper: when DEBUG is defined, D(x) expands to (x); otherwise it
// expands to an empty do/while(0) statement and x is never evaluated.
#ifdef DEBUG
#define D(x) (x)
#else
#define D(x) \
    do       \
    {        \
    } while (0)
#endif

// Function-like minimum macro. Whichever argument is selected is evaluated
// twice, so do not pass expressions with side effects.
#define min(a, b) (((a) < (b)) ? (a) : (b))

// Prefix for the per-thread run files. ExtSort::sort() sets it from the
// instance's `name`, and rsw_data() appends "<tid>_<run_num>" to it.
string file_name;

// Size in bytes of one input record (key followed by value); used by RunRead()
// to turn record indices into byte offsets.
int record_size = KEY_SIZE + VALUE_SIZE;

/**
 * @brief Sets up the scratch directory that will hold the run files.
 *
 * The directory is "/mnt/pmem/<current unix time>" and `name` becomes the
 * file prefix "<directory>/runs-".
 *
 * A failure to create the directory is reported but does not abort
 * construction; an already existing directory is treated as usable.
 */
ExtSort::ExtSort(void)
{
    ID = "/mnt/pmem/" + std::to_string(std::time(nullptr));
    // Previously the mkdir() result was ignored, so a missing or read-only
    // /mnt/pmem only surfaced later as an unexplained run-file failure.
    if (mkdir(ID.c_str(), 0777) != 0 && errno != EEXIST)
    {
        printf("Couldn't create run directory %s: %s\n", ID.c_str(), strerror(errno));
    }
    name = ID + "/runs-";
}

/**
 * @brief Returns the size in bytes of the file at `filename`.
 *
 * @param filename path handed to stat()
 * @return the file's st_size, or (size_t)-1 when stat() fails
 */
size_t get_filesize(const char *filename)
{
    struct stat info;
    if (stat(filename, &info) != 0)
    {
        // Failure sentinel: -1 converted to size_t (the largest size_t value).
        return static_cast<size_t>(-1);
    }
    return info.st_size;
}

/**
 * @brief
 *  Fill the in-memory key/offset entries from the mmapped input records.
 *  For every record, the first KEY_SIZE bytes are copied to the start of the
 *  matching entry and the record's byte offset in the input is stored in r_off.
 * @param input_rec_off index of the first input record this call handles
 * @param rec_len number of records to load
 * @param keys_idx destination entries; must hold at least rec_len elements
 * @param input_mapped_buffer start of the mmapped input file
 * @return size_t number of entries filled (rec_len)
 */
size_t RunRead(size_t input_rec_off, size_t rec_len, vector<in_record_t> &keys_idx, char *input_mapped_buffer)
{
    size_t idx = 0;
    for (; idx < rec_len; ++idx)
    {
        // Byte position of record (input_rec_off + idx) inside the input file.
        const size_t byte_off = (input_rec_off + idx) * record_size;
        // Only the key is copied; the value stays in the input and is reachable via r_off.
        memcpy((void *)(&keys_idx[idx]), &input_mapped_buffer[byte_off], KEY_SIZE);
        keys_idx[idx].r_off = byte_off;
    }
    return idx;
}

/**
 * @brief Copies entries back to back into the mmapped output buffer.
 *
 * The first KEY_SIZE + INDEX_SIZE bytes of each entry are written, so entry i
 * lands at byte offset i * (KEY_SIZE + INDEX_SIZE).
 *
 * @param keys_idx entries to write, in output order
 * @param rec_len number of entries to write
 * @param output_mapped_buffer start of the mmapped output file
 * @return size_t number of entries written (rec_len)
 */
size_t RunWrite(vector<in_record_t> &keys_idx, size_t rec_len, char *output_mapped_buffer)
{
    const size_t entry_bytes = KEY_SIZE + INDEX_SIZE;
    char *dst = output_mapped_buffer;
    size_t idx = 0;
    for (; idx < rec_len; ++idx, dst += entry_bytes)
    {
        memcpy(dst, &keys_idx[idx], entry_bytes); // TODO: Change this to 4095?
    }
    return idx;
}

// Arguments handed to each rsw_data() worker; the timing fields are also how
// the worker reports results back to ExtSort::sort().
struct threadargs_t
{
    int tid;                // worker index; selects the worker's slice of the input
    char *mmapped_buffer;   // start of the mmapped input file, shared by all workers
    uint64_t read_time;     // accumulated nanoseconds spent in RunRead
    uint64_t sort_time;     // accumulated nanoseconds spent sorting
    uint64_t write_time;    // accumulated nanoseconds spent in RunWrite
    size_t thr_rec_chunk;   // number of records this worker processes
    size_t total_file_size; // size of the input file in bytes
};

void *rsw_data(void *args)
{
    threadargs_t t = *(threadargs_t *)args;

    size_t rec_sorted = 0, rec_read = 0;
    size_t file_rec_off = t.tid * t.thr_rec_chunk;
    int run_num = 0;
    int fd;
    uint64_t read_time = 0;
    uint64_t sort_time = 0;
    uint64_t write_time = 0;
    // This decides how many read-sort-write steps happen in the for loop.
    uint64_t job_len_rec = t.thr_rec_chunk; // / 8;
    // Each thread generates IndexMap and sorts and writes it.
    while (rec_sorted < t.thr_rec_chunk)
    {
        vector<in_record_t> keys_idx;
        keys_idx.resize(job_len_rec); // 400M records = 40GB 100B records

        read_time = nano_time();
        rec_read = RunRead(file_rec_off, job_len_rec, keys_idx, t.mmapped_buffer);
        read_time = nano_time() - read_time;
        // read keys and fill IndexMap
        // Converting offset to hex addresss

        // Sort IndexMap by key
        sort_time = nano_time();
        std::sort(keys_idx.begin(), keys_idx.end());
        sort_time = nano_time() - sort_time;

        // write the IndexMap sequentially to disk and once sorted

        // Create a file and mmap it based on the the thread_id
        string output_file = file_name + std::to_string(t.tid) + "_" + std::to_string(run_num);
        run_num++;
        fd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU);
        assert(ftruncate(fd, job_len_rec * (KEY_SIZE + INDEX_SIZE)) == 0);
        char *mmaped_out;
        mmaped_out = (char *)mmap(NULL, job_len_rec * (KEY_SIZE + INDEX_SIZE), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        if (mmaped_out == MAP_FAILED)
        {
            printf("Failed to mmap write file of size %lu: %s\n",
                   job_len_rec * record_size, strerror(errno));
            _exit(0);
        }

        write_time = nano_time();
        rec_sorted += RunWrite(keys_idx, rec_read, mmaped_out);
        write_time = nano_time() - write_time;

        file_rec_off += rec_read;
        printf("fs: %lu rec_s:%lu t: %d \n", file_rec_off, rec_read, t.tid);

        close(fd);
        // keys.clear(); // REVIEW: could be expensive call.
        ((threadargs_t *)args)->read_time += read_time;
        ((threadargs_t *)args)->sort_time += sort_time;
        ((threadargs_t *)args)->write_time += write_time;
    }
    return (void *)0;
}

/**
 * @brief Converts a duration in nanoseconds to seconds.
 *
 * @param nano_sec duration in nanoseconds
 * @return the same duration in seconds, as a double
 */
double to_seconds(uint64_t nano_sec)
{
    const double nanos = static_cast<double>(nano_sec);
    const double nanos_per_second = static_cast<double>(NANOSECONDS_IN_SECOND);
    return nanos / nanos_per_second;
}

/**
 * @brief Run generation: splits the input across threads, each of which
 *        builds, sorts and writes key/offset runs (see rsw_data()).
 *
 * The input file is mmapped read-only and shared by all workers. The number
 * of records is derived from the file size, and every worker receives
 * total_records / num_threads records. After all workers are joined, the
 * per-thread and summed read/sort/write times and the wall-clock run
 * generation time are printed.
 *
 * NOTE: the total_records % num_threads trailing records are not assigned to
 * any worker.
 *
 * @param file path of the input file holding fixed-size records
 * @param num_threads number of worker threads; must be positive
 */
void ExtSort::sort(string file, int num_threads)
{
    // signal(SIGSEGV, handler);
    uint64_t run_gen_time = nano_time();
    int ret;
    file_name = name;

    // A non-positive thread count would divide by zero when chunking below.
    if (num_threads <= 0)
        EXIT_MSG("Invalid number of threads: %d\n", num_threads);

    // Get file size; get_filesize() reports failure as (size_t)-1.
    size_t total_file_size = get_filesize(file.c_str());
    if (total_file_size == (size_t)-1)
    {
        printf("Couldn't stat %s: %s\n", file.c_str(), strerror(errno));
        exit(1);
    }

    // Open and mmap the file.
    int fd = open(file.c_str(), O_RDWR | O_DIRECT);
    if (fd < 0)
    {
        printf("Couldn't open %s: %s\n", file.c_str(), strerror(errno));
        exit(1);
    }
    char *mmapped_buffer = (char *)mmap(NULL, total_file_size, PROT_READ, MAP_SHARED, fd, 0);
    if (mmapped_buffer == MAP_FAILED)
    {
        printf("Couldn't mmap %s: %s\n", file.c_str(), strerror(errno));
        exit(1);
    }

    // Derive the record count from the file instead of assuming a fixed
    // 400M-record input, so workers never read past the end of the mapping.
    size_t total_records = total_file_size / record_size;

    // Divide the IndexMap by num_threads.
    size_t thr_rec_chunk = total_records / num_threads;

    // Value-initialised storage: the workers accumulate into read_time,
    // sort_time and write_time with +=, so those fields must start at zero.
    vector<pthread_t> threads(num_threads);
    vector<threadargs_t> threadargs(num_threads);

    // Spawn threads now.
    for (int i = 0; i < num_threads; i++)
    {
        threadargs[i].tid = i;
        threadargs[i].mmapped_buffer = mmapped_buffer;
        threadargs[i].thr_rec_chunk = thr_rec_chunk;
        threadargs[i].total_file_size = total_file_size;
        // pthread functions return the error number directly; errno is not set.
        ret = pthread_create(&threads[i], NULL, rsw_data, &threadargs[i]);
        if (ret != 0)
            EXIT_MSG("pthread_create for %dth thread failed: %s\n",
                     i, strerror(ret));
    }

    for (int i = 0; i < num_threads; i++)
    {
        ret = pthread_join(threads[i], NULL);
        if (ret != 0)
            EXIT_MSG("Thread %d failed in join: %s\n",
                     i, strerror(ret));
    }

    // Totals are sums over all threads, not wall-clock times.
    double total_read_t = 0, total_write_t = 0, total_sort_t = 0;
    for (int i = 0; i < num_threads; ++i)
    {
        cout << "============== " << threadargs[i].tid << " ===============" << endl;
        cout << "read time: " << to_seconds(threadargs[i].read_time) << endl;
        total_read_t += to_seconds(threadargs[i].read_time);
        cout << "sort time: " << to_seconds(threadargs[i].sort_time) << endl;
        total_sort_t += to_seconds(threadargs[i].sort_time);
        cout << "write time: " << to_seconds(threadargs[i].write_time) << endl;
        total_write_t += to_seconds(threadargs[i].write_time);
    }

    run_gen_time = nano_time() - run_gen_time;
    cout << "========================================================" << endl;
    cout << "Run Generation time: " << to_seconds(run_gen_time) << endl;
    cout << "------ Total read time:  " << total_read_t << endl;
    cout << "------ Total sort time: " << total_sort_t << endl;
    cout << "------ Total write time: " << total_write_t << endl;

    // All workers are done with the input; release the mapping and descriptor.
    munmap(mmapped_buffer, total_file_size);
    close(fd);
}