#include "ext_sort.h"

#include <assert.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>

#include <algorithm>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <ctime>

#include "timer.h"
#include "ips4o/include/ips4o.hpp"

// File-wide timer shared by every phase below; it is also handed to the
// DataManager created in RunGeneration().
Timer timer;

/**
 * Creates the working directory used for this sort.
 *
 * The directory name is `mount_path` immediately followed by the current
 * Unix timestamp.
 * NOTE(review): no path separator is inserted between the two parts, so the
 * caller presumably passes a mount_path ending in '/' - confirm.
 *
 * @param mount_path prefix of the directory to create.
 */
ExtSort::ExtSort(string mount_path)
{
    folder_path_ = mount_path + std::to_string(std::time(nullptr));
    // Report a failed mkdir instead of silently ignoring it. An already
    // existing directory is tolerated. Construction still completes so that
    // existing callers keep their behaviour.
    if (mkdir(folder_path_.c_str(), 0777) != 0 && errno != EEXIST)
    {
        perror("ExtSort: mkdir");
    }
}

/**
 * Reads one batch of key/index entries, sorts it in memory and writes the
 * result to `output_file`.
 *
 * @param rec_off      record offset passed to DataManager::RunRead.
 * @param data_manger  data manager used for reading, mapping the output file
 *                     and writing.
 * @param output_file  name of the output file to open and map.
 * @return number of records written (as reported by RunWrite2), or 0 when
 *         RunRead returned no records.
 */
size_t ExtSort::InMemorySort(size_t rec_off, DataManager &data_manger, string output_file)
{
    // This assumes the key + index fits the memory limit, so the entry count
    // is simply derived from the size of the first input file.
    size_t read_array_len = data_manger.file_size_[0] / conf.record_size;
    vector<in_record_t> keys_idx;
    keys_idx.resize(read_array_len);

    timer.start("RUN read");
    size_t read_records = data_manger.RunRead(rec_off, read_array_len, keys_idx);
    // size_t read_records = data_manger.RunReadPMSort(rec_off, read_array_len, keys_idx);
    timer.end("RUN read");
    // printf("Finished reading %lu!\n", read_records);

    if (!read_records)
        return 0;

    // NOTE(review): the whole vector is sorted, not only the first
    // `read_records` entries; presumably RunRead fills/resizes keys_idx so
    // both are equal - confirm.
    timer.start("SORT");
#ifdef bandwidth
    uint64_t sort_start_time = rdtsc();
#endif
    ips4o::parallel::sort(keys_idx.begin(), keys_idx.end(), std::less<>{});
#ifdef bandwidth
    uint64_t sort_delta = rdtsc() - sort_start_time;
    // Pause the "checkpoints" timer while the bandwidth line is printed.
    timer.end("checkpoints");
    printf("%f,SORT,%f,%lu\n", timer.get_overall_time("checkpoints"),
           (static_cast<float>(sort_delta) / NANOSECONDS_IN_SECOND),
           keys_idx.size() * (KEY_SIZE + INDEX_SIZE));
    timer.start("checkpoints");
#endif
    timer.end("SORT");
    // printf("Finished sorting %lu!\n", read_records);

    // Create a file equal to the length of the final output.
    data_manger.OpenAndMapOutputFile(output_file, data_manger.file_size_[0]);

    size_t write_records = 0;
    // timer.start("RUN write");
    /**
     * Version 1
     * - Each thread reads offset from file and writes to the output file.
     * - This could be bad because an uncontrolled number of writes gives
     *   poor bandwidth.
     */
    // write_records += data_manger.RunWrite(keys_idx, read_records);

    /**
     * Version 2
     * Each thread reads offset from file to an output buffer.
     * The output buffer, once full, is then written to pmem in a controlled
     * fashion.
     */
    write_records += data_manger.RunWrite2(keys_idx, read_records);
    // timer.end("RUN write");
    // printf("Finished writing %lu!\n", write_records);

    return write_records;
}

/**
 * Repeatedly calls InMemorySort() on the input files until a call reports
 * that no records were processed.
 *
 * Every call uses the same output name, `<folder_path_>/sorted`.
 *
 * @param files input file names handed to the DataManager.
 * @return list of run names.
 *         NOTE(review): nothing is ever appended to this list, so it is
 *         always returned empty - confirm whether callers rely on it.
 */
vector<string> ExtSort::RunGeneration(vector<string> files)
{
    DataManager data_manager(files, &timer);
    uint64_t rec_off = 0;
    string file_name = folder_path_ + "/sorted";
    vector<string> run_names;

    // REVIEW: Loop until all data is processed? ex: index >= total number of records?
    while (true)
    {
        uint64_t rec_processed = InMemorySort(rec_off, data_manager, file_name);
        if (!rec_processed)
            break;
        rec_off += rec_processed;
    }

    // Return the local by name: wrapping it in std::move would only prevent
    // copy elision.
    return run_names;
}

/**
 * Runs the run-generation phase over `files` and prints the collected
 * timings to stdout.
 *
 * @param files input file names to sort.
 */
void ExtSort::Sort(vector<string> &files)
{
#ifdef bandwidth
    timer.start("checkpoints");
#endif
    timer.start("RUN");
    vector<string> runs = RunGeneration(files);
    timer.end("RUN");
#ifdef bandwidth
    // The guard used to be misspelled, so this timer was never stopped. The
    // call is end(), matching every other use of the timer in this file.
    timer.end("checkpoints");
#endif

    printf("====================================:\n");
    printf("\t RUN read: %f\n", timer.get_overall_time("RUN read"));
    printf("\t RUN sort: %f\n", timer.get_overall_time("SORT"));
    printf("\t RECORD read: %f\n", timer.get_overall_time("RecRead"));
    printf("\t RUN write: %f\n", timer.get_overall_time("RUN write"));
    printf("Total RUN: %f\n", timer.get_time("RUN"));
}