WiscSort / wiscSort / RSW / extsort_rsw.cc
extsort_rsw.cc
Raw
// Multi threaded - sort key and index.
#include<pthread.h>
#include "extsort_rsw.h"
#include <fcntl.h>
#include <chrono>
#include <unistd.h>
#include <algorithm>
#include "parameter.h"
#include "nano_time.h"
#include <iostream>
#include <unistd.h>
#include <vector>
#include <sys/mman.h>
#include <sys/sysinfo.h>

// Mulit threaded

//#include "ips4o/include/ips4o.hpp"
//#include <execinfo.h>
//#include <signal.h>

using std::cout;
using std::endl;
using std::vector;

#define NANOSECONDS_IN_SECOND 1000000000

#define EXIT_MSG(...)           \
    do {                        \
        printf(__VA_ARGS__);    \
        _exit(-1);              \
    } while (0)

#ifdef DEBUG 
#define D(x) (x)
#else 
#define D(x) do{}while(0)
#endif

#define min(a, b) (((a) < (b)) ? (a) : (b))

string file_name;

ExtSort::ExtSort(void){
        ID = "/mnt/pmem/" + std::to_string(std::time(nullptr));
        mkdir(ID.c_str(), 0777);
        name = ID + "/runs-";
}

size_t get_filesize(const char* filename){
    int retval;

    struct stat st;
    retval = stat(filename, &st);
    if(retval)
        return -1;
    else 
        return st.st_size;
}

/** FIXME: remove this later

void handler(int sig) {
  void *array[10];
  size_t size;

  // get void*'s for all entries on the stack
  size = backtrace(array, 10);

  // print out all the frames to stderr
  fprintf(stderr, "Error: signal %d:\n", sig);
  backtrace_symbols_fd(array, size, STDERR_FILENO);
  exit(1);
}
*/

typedef struct {
    int tid;
    char* mmapped_buffer;
    record_t* buffer;
    uint64_t read_time;
    uint64_t sort_time;
    uint64_t write_time;
    uint64_t keys_load_t;
    uint64_t out_buf_load_t;
    size_t file_thr_chunk;
    size_t buf_thr_chunk;
    size_t total_file_size;
} threadargs_t;

size_t read(size_t file_off, size_t buff_off, size_t len, record_t* buffer, char* mmapped_buffer, size_t total_file_size){
    size_t rec_off = buff_off/RECORD_SIZE;
    size_t read_data = 0;
    size_t req_size = 0;
    while(read_data < len){
            req_size = min(BLOCK_SIZE, len-read_data);
            req_size = min(req_size, total_file_size-file_off);
            //cout << req_size << " -- " << read_data << " -- " << len << " -- " << len-read_data << endl;
            // Got to also check file boundary
            memcpy((void *) (buffer+rec_off), &mmapped_buffer[file_off], req_size);
            read_data += req_size;
            rec_off += req_size/RECORD_SIZE;
            file_off += req_size;
            if(req_size == 0) break;
    }
    return read_data;
}

void write(char* mmaped_out, record_t* output_buf, size_t len){
    size_t req_size = 0;
    size_t write_data = 0;
    while(write_data < len){
        req_size = min(OUTPUT_BUFFER_SIZE * BLOCK_SIZE, len - write_data);
        memcpy(&mmaped_out[write_data],
           &output_buf[write_data/RECORD_SIZE],
           req_size);
        write_data += req_size;
    }
}

void * rsw_data(void *args){
    threadargs_t t = *(threadargs_t*)args;

    size_t read_size = 0;
    size_t file_off = t.tid * t.file_thr_chunk;
    size_t buff_off = t.tid * t.buf_thr_chunk;
    int run_num = 1;
    record_t* output_buf = (record_t *)malloc(t.buf_thr_chunk);
    int fd;
    vector<in_record_t> keys;
    //((threadargs_t*)args)->read_time = 0;
    //((threadargs_t*)args)->sort_time = 0;
    //((threadargs_t*)args)->write_time = 0;
    uint64_t read_time = 0;
    uint64_t sort_time = 0;
    uint64_t write_time = 0;
    uint64_t keys_load_t=0;
    uint64_t out_buf_load_t=0;
    // Each thread memcpys to it's buf
    // for until entire file chunk is read.
    while(read_size < t.file_thr_chunk){
        read_time = nano_time();
        D(cout << "Starting read - " << t.tid  << " read_size: " << read_size << endl);
        read_size += read(file_off, buff_off, t.buf_thr_chunk, t.buffer, t.mmapped_buffer, t.total_file_size);
        //file_off += read_size; // read_size expected to be t.buf_thr_chunk.
        read_time = nano_time() - read_time;

        // REVIEW: Next iteration initialize smaller buffer per thread
        // and pass them for sorting rather than populating an array.
        keys_load_t = nano_time();
        keys.resize((read_size/run_num)/RECORD_SIZE);
        size_t buff_rec_off = buff_off/RECORD_SIZE;
        for(size_t i = 0; i < (read_size/run_num)/RECORD_SIZE; ++i)
            keys[i] = in_record_t(t.buffer[buff_rec_off + i].k, i, i);
        keys_load_t = nano_time() - keys_load_t;

        sort_time = nano_time(); 
        // REVIEW: replace with qsort.
        D(cout << "Starting sort " << t.tid << endl);
        //ips4o::parallel::sort(keys.begin(), keys.end(), std::less<>{});
        std::sort(keys.begin(), keys.end());
        sort_time = nano_time() - sort_time;
        D(
            cout << "keys vector size: " << sizeof(std::vector<in_record_t>) + (sizeof(in_record_t)  * keys.size()) << endl
        );

        out_buf_load_t = nano_time();
        // Create an output buffer to consolidate key and values.
        for(size_t i = 0; i < t.buf_thr_chunk/RECORD_SIZE; ++i)
            output_buf[i] = std::move(t.buffer[keys[i].v_index]);
        out_buf_load_t = nano_time() -out_buf_load_t; 

        // Create a file and mmap it based on the the thread_id
        string output_file = file_name + std::to_string(t.tid) + "_" + std::to_string(run_num);
        run_num++;
        fd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU);
        assert(ftruncate(fd, (size_t) t.buf_thr_chunk) == 0); 
        char *mmaped_out;
        mmaped_out = (char*)mmap(NULL, t.buf_thr_chunk, PROT_WRITE, MAP_SHARED, fd, 0);

        D(cout << "Starting write " << t.tid << endl);
        write_time = nano_time();
        write(mmaped_out, output_buf, t.buf_thr_chunk);
        write_time = nano_time() - write_time;

        munmap(mmaped_out, t.buf_thr_chunk);
        close(fd);
        //keys.clear(); // REVIEW: could be expensive call.
        ((threadargs_t*)args)->read_time += read_time;
        ((threadargs_t*)args)->sort_time += sort_time;
        ((threadargs_t*)args)->write_time += write_time;
        ((threadargs_t*)args)->keys_load_t += keys_load_t; 
        ((threadargs_t*)args)->out_buf_load_t += out_buf_load_t; 
    }
    delete[] output_buf;
    return (void*) 0;
}

double to_seconds(uint64_t nano_sec){
    return (double)nano_sec/(double)NANOSECONDS_IN_SECOND;
}

void ExtSort::sort(string file, int num_threads){
    //signal(SIGSEGV, handler);
    uint64_t run_gen_time = nano_time();
    int ret;
    file_name = name;
    // Get file size
    size_t total_file_size = get_filesize(file.c_str());

    // Divide the file by num_threads
    //REVIEW: Make sure threads evenly divide the file.
    size_t file_thr_chunk = total_file_size/num_threads;

    // Open and Mmap the file
    int fd = open(file.c_str(), O_RDWR | O_DIRECT);
    if(fd < 0) {
        printf("Couldn't open %s: %s\n", file.c_str(), strerror(errno));
        exit(1);
    }
    char* mmapped_buffer = (char*)mmap(NULL, total_file_size, PROT_READ, MAP_SHARED, fd, 0);

    //size_t total_mem_size = (uint64_t) MEM_SIZE * (uint64_t) BLOCK_SIZE;

    // Allocate buffer. Buffer/num_threads per thread buf size.
    // Total avaialbe memory
    size_t total_ram = sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE);
    // 10 gb for OS and other DS.
    size_t total_read_mem = (total_ram - TEN_GB_B)/2;
    // This should be evenly divisible by RECORD_SIZE
    total_read_mem = total_read_mem + RECORD_SIZE/2;
    total_read_mem = total_read_mem - (total_read_mem%RECORD_SIZE);
    //FIXME: Test for 100 GiB
    //total_read_mem = 10737408000; // ~10 GiB
    total_read_mem = 10000000000;
    //total_read_mem = 13421760000; // ~12.5 GiB
    record_t *buffer = (record_t *) malloc(total_read_mem);
    size_t buf_thr_chunk = total_read_mem/num_threads;
    // Spwan threads now.
    pthread_t *threads;
    threadargs_t *threadargs;
    threads = (pthread_t*)malloc(num_threads * sizeof(pthread_t));
    threadargs = (threadargs_t*)malloc(num_threads * sizeof(threadargs_t));
    for (int i = 0; i < num_threads; i++) {
        threadargs[i].tid = i;
        threadargs[i].mmapped_buffer =mmapped_buffer;
        threadargs[i].buffer = buffer;
        threadargs[i].file_thr_chunk = file_thr_chunk;
        threadargs[i].buf_thr_chunk = buf_thr_chunk;
        threadargs[i].total_file_size = total_file_size;
	if(i%2)
		sleep(2);
        ret = pthread_create(&threads[i], NULL, rsw_data, &threadargs[i]);
        if (ret!=0)
            EXIT_MSG("pthread_create for %dth thread failed: %s\n",
                i, strerror(errno));
    }

    for (int i = 0; i< num_threads; i++){
        ret = pthread_join(threads[i], NULL);
        if (ret !=0)
            EXIT_MSG("Thread %d failed in join: %s\n", 
            i, strerror(errno));
    }

    run_gen_time = nano_time() - run_gen_time;

    double total_read_t=0, total_keys_t=0, total_write_t=0, total_out_t=0, total_sort_t=0;
    for(int i = 0; i< num_threads; ++i){
        cout << "============== "<< threadargs[i].tid <<" ===============" << endl; 
        cout << "read time: " << to_seconds(threadargs[i].read_time) << endl;
        total_read_t += to_seconds(threadargs[i].read_time); 
        cout << "Keys time: " << to_seconds(threadargs[i].keys_load_t) << endl;
        total_keys_t +=to_seconds(threadargs[i].keys_load_t); 
        cout << "sort time: " << to_seconds(threadargs[i].sort_time) << endl;
        total_sort_t += to_seconds(threadargs[i].sort_time); 
        cout << "out_buf time: " << to_seconds(threadargs[i].out_buf_load_t) << endl;
        total_out_t += to_seconds(threadargs[i].out_buf_load_t);  
        cout << "write time: " << to_seconds(threadargs[i].write_time) << endl;
        total_write_t += to_seconds(threadargs[i].write_time); 
        cout << "Thread cycle time: " << to_seconds(threadargs[i].write_time) + 
                                         to_seconds(threadargs[i].sort_time) + 
                                         to_seconds(threadargs[i].read_time) + 
                                         to_seconds(threadargs[i].keys_load_t) +
                                         to_seconds(threadargs[i].out_buf_load_t) << endl;
    }
    cout << "========================================================" << endl;
    cout << "Run Generation time: " << to_seconds(run_gen_time) << endl;
    cout << "------ Total read time:  " << total_read_t << endl;
    cout << "------ Total keys time:  " << total_keys_t << endl;
    cout << "------ Total sort time: " << total_sort_t << endl;
    cout << "------ Total out_buf time: " << total_out_t << endl; 
    cout << "------ Total write time: " << total_write_t << endl;

    free(threads);
    free(threadargs);
    //free threadargs
}