#include <iostream> #include <vector> #include <chrono> #include "timer.h" #include "parameter.h" #include "assert.h" //#include "in_place_merge/ParallelMergeSort.h" #include "ips4o/include/ips4o.hpp" //#include <algorithm> //#include "msort.h" #include "extsort_st.h" #include <sys/mman.h> using std::chrono::high_resolution_clock; using std::chrono::duration; using std::cout; using std::endl; Timer g_timer; ExtSort::ExtSort(void){ ID = "/mnt/pmem/" + std::to_string(std::time(nullptr)); mkdir(ID.c_str(), 0777); } size_t ExtSort::in_memory_sort(size_t start, DataManager &m, string output_file) { vector<in_record_t> keys; record_t *buffer; uint64_t mem_size = (uint64_t) MEM_SIZE * (uint64_t) BLOCK_SIZE; // REVIEW: Aligning to 4K //assert(posix_memalign(reinterpret_cast<void**>(&buffer), BLOCK_SIZE, mem_size) == 0); buffer = (record_t *) malloc(mem_size); assert(buffer); g_timer.start("run load"); size_t records_num = m.load(start, MEM_SIZE, buffer); g_timer.end("run load"); if(records_num==0) return 0; keys.resize(records_num); g_timer.start("load to v_keys"); //REVIEW: Don't forget to parallelize this. for (size_t i=0; i < records_num; i++) { keys[i] = in_record_t(buffer[i].k, i, i); } g_timer.end("load to v_keys"); D( cout << "keys vector size: " << sizeof(std::vector<in_record_t>) + (sizeof(in_record_t) * keys.size()) << endl ); g_timer.start("msort"); /** 1) This sorting takes O(2N) size typedef std::less<in_record_t> ComparatorT; ComparatorT comparator; ParallelMergeSort::sort(keys, comparator, 5); */ /** 2) * Currently uses openMP * msort(keys.data(), records_num, 1024); */ // 3) 32 Hardware concurrency /** * Requires <exection> from GCC 9+. ParallelAlgorithms::parallel_inplace_merge_sort_hybrid(keys, 0, records_num - 1, (records_num -1) / 32); */ // 4) //std::sort(keys.begin(), keys.end()); // 5) // msort(keys.data(), records_num, 1024); //TODO: searial ips40 ips4o::parallel::sort(keys.begin(), keys.end(), std::less<>{}); g_timer.end("msort"); //TODO: Write to file. int fd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU); assert(ftruncate(fd, (size_t) records_num * RECORD_SIZE) == 0); m.set_output_fd(fd); D( cout << "Start to output run files" << endl); char *mmapped_buffer = (char*)mmap(NULL, records_num * RECORD_SIZE, PROT_WRITE, MAP_SHARED, fd, 0); g_timer.start("run write"); size_t i = 0; while(i<records_num){ m.buffer_write( buffer, &keys[i], WRITE_SIZE, mmapped_buffer); i += WRITE_SIZE * NUM_PER_OUT_BLOCK; } g_timer.end("run write"); //unmap munmap(mmapped_buffer, records_num * RECORD_SIZE); delete[] buffer; close(fd); return records_num / NUM_PER_BLOCK; } vector<string> ExtSort::run_generation(vector<string> files){ //Initialize file meta. DataManager m(files); uint64_t index = 0; string name = ID + "/runs_"; size_t run_num = 0; vector<string> run_names; m.init_output_buffer(OUTPUT_BUFFER_SIZE); while (1) { g_timer.start("in_memory_sort"); uint64_t new_processed = in_memory_sort(index, m, name + std::to_string(run_num)); g_timer.end("in_memory_sort"); D( cout << "processed Blocks: " << new_processed << endl); D(cout << "In memory sorting " << new_processed << " blocks using " << g_timer.get_time("in_memory_sort") << endl); if (!new_processed) break; else index += new_processed; run_names.push_back(name + std::to_string(run_num)); run_num ++; } return std::move(run_names); } void ExtSort::sort(vector<string> &files) { g_timer.start("overall"); //g_timer.start("run_generation"); vector<string> runs = run_generation(files); //g_timer.end("run_generation"); g_timer.end("overall"); cout << "========================================================" << endl; cout << "Run Generation time: " << g_timer.get_time("overall") << endl; cout << "------ Total load time: " << g_timer.get_overall_time("run load") << endl; cout << "------ Total msort time: " << g_timer.get_overall_time("msort") << endl; cout << "------ Total write time: " << g_timer.get_overall_time("run write") << endl; }