// WiscSort / wiscSort / RSW / extsort_st.cc
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <memory>
#include <new>
#include <string>
#include <system_error>
#include <vector>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include "timer.h"
#include "parameter.h"
#include "assert.h"
//#include "in_place_merge/ParallelMergeSort.h"
#include "ips4o/include/ips4o.hpp"
//#include <algorithm>
//#include "msort.h"
#include "extsort_st.h"

using std::chrono::high_resolution_clock;
using std::chrono::duration;
using std::cout;
using std::endl;

// Timer shared by every phase in this file (run load, msort, run write, ...).
Timer g_timer;

/**
 * Creates this sorter's working directory "/mnt/pmem/<seconds since epoch>"
 * and stores its path in ID; run files are later created inside it.
 *
 * @throws std::system_error if the directory cannot be created for any reason
 *         other than it already existing.
 */
ExtSort::ExtSort(void){
    ID = "/mnt/pmem/" + std::to_string(std::time(nullptr));
    // An existing directory is tolerated (two instances created within the
    // same second share the name). Any other failure would otherwise only show
    // up later as failing opens of the run files, so surface it here.
    if (mkdir(ID.c_str(), 0777) != 0 && errno != EEXIST) {
        const int err = errno;
        throw std::system_error(err, std::generic_category(), "mkdir " + ID);
    }
}

size_t ExtSort::in_memory_sort(size_t start, DataManager &m, string output_file) {
    vector<in_record_t> keys;
    record_t *buffer;
    uint64_t mem_size = (uint64_t) MEM_SIZE * (uint64_t) BLOCK_SIZE;
    // REVIEW: Aligning to 4K
    //assert(posix_memalign(reinterpret_cast<void**>(&buffer), BLOCK_SIZE, mem_size) == 0);
    buffer = (record_t *) malloc(mem_size);
    assert(buffer);

    g_timer.start("run load");
    size_t records_num = m.load(start, MEM_SIZE, buffer);
    g_timer.end("run load");

    if(records_num==0) return 0;

        keys.resize(records_num);
        g_timer.start("load to v_keys");
        //REVIEW: Don't forget to parallelize this.
        for (size_t i=0; i < records_num; i++) {
            keys[i] = in_record_t(buffer[i].k, i, i);
        }
        g_timer.end("load to v_keys");
        D(
            cout << "keys vector size: " << sizeof(std::vector<in_record_t>) + (sizeof(in_record_t)  * keys.size()) << endl
        );

	g_timer.start("msort");

	/** 1) This sorting takes O(2N) size
	typedef std::less<in_record_t> ComparatorT;
        ComparatorT comparator;

        ParallelMergeSort::sort(keys, comparator, 5);
	*/

	/** 2)
	 * Currently uses openMP 
	 * msort(keys.data(), records_num, 1024);	    
	 */

	// 3) 32 Hardware concurrency
	/**
	 * Requires <exection> from GCC 9+.
	ParallelAlgorithms::parallel_inplace_merge_sort_hybrid(keys, 0, records_num - 1, (records_num -1) / 32);	
	*/

	// 4)
	//std::sort(keys.begin(), keys.end());

	// 5)
	// msort(keys.data(), records_num, 1024);


	//TODO: searial ips40
	ips4o::parallel::sort(keys.begin(), keys.end(), std::less<>{});
        g_timer.end("msort");

        //TODO: Write to file.
        int fd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU);
        assert(ftruncate(fd, (size_t) records_num * RECORD_SIZE) == 0);
        m.set_output_fd(fd);
        D( cout << "Start to output run files" << endl);

        char *mmapped_buffer = (char*)mmap(NULL, records_num * RECORD_SIZE,
                    PROT_WRITE, MAP_SHARED, fd, 0);
        
        g_timer.start("run write");
        size_t i = 0;
        while(i<records_num){
            m.buffer_write( buffer, 
                            &keys[i], 
                            WRITE_SIZE, 
                            mmapped_buffer);
            i += WRITE_SIZE * NUM_PER_OUT_BLOCK;
        }
        g_timer.end("run write");
        //unmap
        munmap(mmapped_buffer, records_num * RECORD_SIZE);

        delete[] buffer;
        close(fd);
        return records_num / NUM_PER_BLOCK;
}

/**
 * Splits the input into sorted run files inside the ID directory.
 *
 * Calls in_memory_sort on consecutive chunks, advancing the block index by the
 * count each call returns, until a call returns 0. Runs are named
 * ID + "/runs_<n>" with n counting from 0.
 *
 * @param files input files handed to DataManager.
 * @return names of the run files, in generation order.
 */
vector<string> ExtSort::run_generation(vector<string> files){
    DataManager m(files);  // initialize file metadata
    m.init_output_buffer(OUTPUT_BUFFER_SIZE);

    const string prefix = ID + "/runs_";
    vector<string> run_names;
    uint64_t index = 0;
    for (size_t run_num = 0; ; ++run_num) {
        string run_name = prefix + std::to_string(run_num);

        g_timer.start("in_memory_sort");
        const uint64_t new_processed = in_memory_sort(index, m, run_name);
        g_timer.end("in_memory_sort");
        D( cout << "processed Blocks: " << new_processed << endl);
        D(cout << "In memory sorting " << new_processed << " blocks using " << g_timer.get_time("in_memory_sort") << endl);

        // NOTE(review): a chunk holding fewer than NUM_PER_BLOCK records makes
        // in_memory_sort return 0 after it has already written its run file,
        // so that run is not added below.
        if (new_processed == 0)
            break;
        index += new_processed;
        run_names.push_back(std::move(run_name));
    }
    // Plain return: `return std::move(local)` would suppress NRVO.
    return run_names;
}

/**
 * Runs the run-generation phase on `files` and prints a timing summary.
 *
 * The "overall" timer brackets only run generation. The load, sort and
 * write totals are accumulated by in_memory_sort across all runs.
 * The resulting run names are held in `runs` but not consumed further here.
 */
void ExtSort::sort(vector<string> &files) {
    g_timer.start("overall");
    const vector<string> runs = run_generation(files);
    g_timer.end("overall");

    const char *const separator = "========================================================";
    cout << separator << endl;
    cout << "Run Generation time: " << g_timer.get_time("overall") << endl;
    cout << "------ Total load time:  " << g_timer.get_overall_time("run load") << endl;
    cout << "------ Total msort time: " << g_timer.get_overall_time("msort") << endl;
    cout << "------ Total write time: " << g_timer.get_overall_time("run write") << endl;
}