#include "ext_sort.h" #include <chrono> #include <sys/types.h> #include <sys/stat.h> #include <assert.h> #include <algorithm> #include <sys/mman.h> #include "timer.h" #include "ips4o/include/ips4o.hpp" #ifdef pmdk #include <libpmem.h> #endif #ifdef __cplusplus #include <stdint.h> extern "C" { void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s); } #endif #define min(a, b) (((a) < (b)) ? (a) : (b)) Timer timer; ExtSort::ExtSort(string mount_path) { folder_path_ = mount_path + std::to_string(std::time(nullptr)); mkdir(folder_path_.c_str(), 0777); } // return number of records processed size_t ExtSort::InMemorySort(size_t rec_off, DataManager &data_manger, string output_file) { vector<in_record_t> keys_idx; keys_idx.resize(conf.read_buffer_size / conf.idx_record_size); timer.start("RUN read"); // size_t read_records = data_manger.RunRead(rec_off, conf.read_buffer_size / conf.idx_record_size, keys_idx); size_t read_records = data_manger.RunReadPMSort(rec_off, conf.read_buffer_size / conf.idx_record_size, keys_idx); timer.end("RUN read"); if (!read_records) return 0; // printf("Finished reading!\n"); timer.start("SORT"); #ifdef bandwidth uint64_t sort_start_time = rdtsc(); #endif ips4o::parallel::sort(keys_idx.begin(), keys_idx.end(), std::less<>{}); #ifdef bandwidth uint64_t sort_delta = rdtsc() - sort_start_time; timer.end("checkpoints"); printf("%f,SORT,%f,%lu\n", timer.get_overall_time("checkpoints"), ((float)(sort_delta) / NANOSECONDS_IN_SECOND), keys_idx.size() * (KEY_SIZE + INDEX_SIZE)); timer.start("checkpoints"); #endif timer.end("SORT"); // printf("Finished sorting %lu!\n", read_records); // create a file equal to the length of the index data_manger.OpenAndMapOutputFile(output_file, read_records * conf.idx_record_size); size_t write_records = 0; // Write all the read records to a file directly from the sorted array. timer.start("RUN write"); write_records += data_manger.RunWrite(keys_idx, read_records); timer.end("RUN write"); return write_records; } vector<string> ExtSort::RunGeneration(vector<string> files) { DataManager data_manager(files, &timer); uint64_t rec_off = 0; string file_name = folder_path_ + "/run_"; size_t run_num = 0; vector<string> run_names; // REVIEW: Loop until all data is processed? ex: index >= total number of records? while (1) { uint64_t rec_processed = InMemorySort(rec_off, data_manager, file_name + std::to_string(run_num)); if (!rec_processed) break; rec_off += rec_processed; run_names.push_back(file_name + std::to_string(run_num)); run_num++; } return std::move(run_names); } void ExtSort::MergeRuns(vector<string> runs, string input_file) { // Game plan! // 1. Open all run files and create a new file of size sum of all runs. // 2. Create a read buffer that is evenly divisible by the number of files // 3. Create current pointer and end pointers (or atleast know end for each file's buffer) // 4a. Open the orginal input_file and mmap it // 4b. First mmap runs and fill the read buffer from their respective files. // 5. find min of keys pointed by current pointers (ideally a min on list of (key,file_num) // 6. Memcpy the offset from input_file to output_buffer directly // 6. If output buffer is full flush it and reset pointer. // 7. increment the pointer of the respective run_buffer // 8. if current pointer reach the end pointer load the new data to the part of that buffer. // 9. if the last block of a file is read and current pointer for that file buffer is at the // end then stop doing reads. // 10. 
void ExtSort::MergeRuns(vector<string> runs, string input_file)
{
    // Game plan!
    // 1.  Open all run files and create a new file of size equal to the sum of all runs.
    // 2.  Create a read buffer that is evenly divisible by the number of files.
    // 3.  Create current and end pointers (or at least know the end of each file's buffer).
    // 4a. Open the original input_file and mmap it.
    // 4b. First mmap the runs and fill the read buffer from their respective files.
    // 5.  Find the min of the keys pointed to by the current pointers (ideally a min over a list of (key, file_num)).
    // 6a. Memcpy the offset from input_file to the output buffer directly.
    // 6b. If the output buffer is full, flush it and reset the pointer.
    // 7.  Increment the pointer of the respective run buffer.
    // 8.  If a current pointer reaches its end pointer, load new data into that part of the buffer.
    // 9.  If the last block of a file has been read and the current pointer for that file's
    //     buffer is at the end, stop doing reads.
    // 10. If only one file buffer hasn't reached its last block, just copy the rest of that
    //     file to the output buffer.

    // [1]
    DataManager data_manager(runs, &timer);
    string output_file_name = folder_path_ + "/sorted";
    size_t output_file_size = 0;
    for (auto i : data_manager.file_size_)
        output_file_size += ((i / conf.idx_record_size) * conf.record_size);
    data_manager.OpenAndMapOutputFile(output_file_name, output_file_size);

    // Here the write_buffer size should evenly divide the output_file_size.
    record_t *write_buffer = (record_t *)malloc(conf.write_buffer_size);
    assert(write_buffer);

    // [2]
    // The read buffer must be evenly divisible by runs.size() and the block size.
    size_t READ_BUFFER_SIZE_M =
        ((conf.merge_read_buffer_size / runs.size()) / conf.block_size) * conf.block_size * runs.size();
    in_record_t *read_buffer = (in_record_t *)malloc(READ_BUFFER_SIZE_M);
    assert(read_buffer);

    // [3]
    vector<size_t> cur_rec_off(runs.size(), 0);   // Holds the rec offset of the key that needs comparison, per file
    vector<size_t> end_rec_off(runs.size(), 0);   // Holds the last rec offset of the space allocated per file
    vector<size_t> rfile_blk_off(runs.size(), 0); // Holds the current blk offset of the file which is to be read
    vector<char *> mapped_run_files(runs.size()); // mmap all the run files
    size_t space_per_run = READ_BUFFER_SIZE_M / runs.size();

    // [4a] - REVIEW: Perhaps just pass the runs' data_manager object, where the file is already opened and mapped?
    // Open the input file and mmap it.
    int fd = open(input_file.c_str(), O_RDWR | O_DIRECT);
    if (fd < 0)
    {
        printf("Couldn't open input file %s: %s\n", input_file.c_str(), strerror(errno));
        exit(1);
    }
    struct stat st;
    stat(input_file.c_str(), &st);
    if (!data_manager.MMapFile(st.st_size, 0, fd, data_manager.input_mapped_buffer_))
        exit(1);

    // [4b]
    size_t read_records = 0;
    size_t read_size_blk = space_per_run / conf.block_size;
    for (uint32_t file_num = 0; file_num < runs.size(); ++file_num)
    {
        // Initialize pointers.
        cur_rec_off[file_num] = file_num * (space_per_run / IDX_RECORD_SIZE);
        end_rec_off[file_num] = (file_num + 1) * (space_per_run / IDX_RECORD_SIZE) - 1;
        // mmap the run files.
        data_manager.MMapFile(data_manager.file_size_[file_num], 0,
                              data_manager.file_ptr_[file_num], mapped_run_files[file_num]);
        timer.start("MERGE read");
        read_records += data_manager.MergeRead(read_buffer, cur_rec_off[file_num],
                                               mapped_run_files[file_num], rfile_blk_off[file_num],
                                               read_size_blk);
        timer.end("MERGE read");
#ifdef checkpoints
        timer.end("checkpoints");
        printf("MERGE seq read: %f\n", timer.get_overall_time("checkpoints"));
        timer.start("checkpoints");
#endif
        rfile_blk_off[file_num] = read_size_blk;
    }

    vector<k_t> min_vec(runs.size());
    // Load the vec with the first key from each run.
    for (uint32_t file_num = 0; file_num < runs.size(); ++file_num)
    {
        min_vec[file_num] = read_buffer[cur_rec_off[file_num]].k;
    }

    size_t write_buff_rec_off = 0;
    uint32_t min_index = 0;
    size_t recs_written = 0;
    size_t run_file_sz = 0;
    size_t input_file_off = 0;
    k_t tmp_k;
    tmp_k = 0xff;
    bool random_reads_complete = false;
    vector<read_offs> off_vec;

    // Loop until all the recs are written.
    while (recs_written < output_file_size / conf.record_size)
    {
        // [5]
        min_index = std::min_element(min_vec.begin(), min_vec.end()) - min_vec.begin();
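        // The runs hold only (key, offset) index records, so the full record
        // payload still lives in the original input file; each merged key
        // therefore costs one random read back into input_mapped_buffer_.
        // Those reads are queued in off_vec below and submitted as a batch
        // once a write buffer's worth of them has accumulated.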
        // Check if all RUN files have reached the end. (Checked before touching
        // read_buffer, since a drained run's cursor points past its chunk.)
        if (count(min_vec.begin(), min_vec.end(), tmp_k) == (uint32_t)runs.size())
        {
            // What if the write buffer is not completely full but all records are processed?
            // Here min_vec indicates that the records of all runs are processed, so
            // dump whatever is still pending and break.
            timer.start("MERGE write");
            if (!off_vec.empty())
            {
                recs_written += data_manager.MergeRandomReadWrite(off_vec);
                off_vec.clear();
            }
            // recs_written += data_manager.MergeWrite(write_buffer, write_buff_rec_off);
            timer.end("MERGE write");
#ifdef checkpoints
            timer.end("checkpoints");
            printf("MERGE write: %f\n", timer.get_overall_time("checkpoints"));
            timer.start("checkpoints");
#endif
            break;
        }

        // [6a]
        // This is the additional random read to fetch the value.
        input_file_off = data_manager.ConvertHexToRoff(read_buffer[cur_rec_off[min_index]].r_off.val);

        /**
         * @brief List all the offsets that reads must be made to, and then submit
         * them to the threadpool at once. This avoids making each random read blocking.
         */
        if (write_buff_rec_off < conf.write_buffer_size / conf.record_size)
        {
            // Queue the (input offset, output offset) pair for this record.
            off_vec.emplace_back((read_offs){input_file_off, data_manager.curr_output_off});
            data_manager.curr_output_off += conf.record_size;
            write_buff_rec_off++;
        }
        if (write_buff_rec_off >= conf.write_buffer_size / conf.record_size)
        {
            // Submit the task for the random reads and wait until complete.
            timer.start("RECORD read-write");
            // data_manager.MergeRandomRead(write_buffer, off_vec);
            recs_written += data_manager.MergeRandomReadWrite(off_vec);
            timer.end("RECORD read-write");
            // Clear the queue for the next set of offsets.
            off_vec.clear();
            random_reads_complete = true;
            // write_buff_rec_off = 0;
        }

        // [6b]
        // Write buffer is full, so flush it.
        if (write_buff_rec_off >= (conf.write_buffer_size / conf.record_size) && random_reads_complete == true)
        {
            // printf("The write_buff_rec_off is %lu\n", write_buff_rec_off);
            timer.start("MERGE write");
            // recs_written += data_manager.MergeWrite(write_buffer, write_buff_rec_off);
            timer.end("MERGE write");
#ifdef checkpoints
            timer.end("checkpoints");
            printf("MERGE write: %f\n", timer.get_overall_time("checkpoints"));
            timer.start("checkpoints");
#endif
            write_buff_rec_off = 0;
            random_reads_complete = false;
        }
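        // After consuming the minimum record, advance that run's cursor. The
        // comparison key in min_vec is refreshed only once we know whether the
        // cursor is still inside this run's chunk, the chunk was refilled from
        // the file, or the run is fully drained (sentinel case below).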
        // [7]
        // Advance the cursor of the run that supplied the minimum.
        cur_rec_off[min_index]++;
        // Now check if the corresponding chunk has reached its end.
        if (cur_rec_off[min_index] > end_rec_off[min_index])
        {
            // [9]
            // Also see if you have hit the end of file for that run.
            run_file_sz = data_manager.file_size_[min_index] / conf.block_size;
            if (rfile_blk_off[min_index] >= run_file_sz)
            {
                // [10]
                // if (count(min_vec.begin(), min_vec.end(), tmp_k) == (uint32_t)runs.size() - 1)
                // {
                //     for (uint32_t file_id = 0; file_id < min_vec.size(); ++file_id)
                //     {
                //         if (!(min_vec[file_id] == tmp_k))
                //         {
                //             // First flush the write_buffer to file.
                //             // Read data into the read buffer and write it directly to the output file.
                //             recs_written += data_manager.MoveRemainingInputToOuputBuff();
                //             break;
                //         }
                //     }
                // }
                // If you reached the end of a file, set a max sentinel for it in min_vec.
                min_vec[min_index] = tmp_k; // 0xffffffffffffffffffff
            }
            else
            {
                // [8]
                // Reset the current rec offset for that RUN.
                cur_rec_off[min_index] = min_index * (space_per_run / conf.idx_record_size);
                // Read the next set of blocks from the file.
                size_t read_size = min(read_size_blk, run_file_sz - rfile_blk_off[min_index]);
                timer.start("MERGE read");
                data_manager.MergeRead(read_buffer, cur_rec_off[min_index],
                                       mapped_run_files[min_index], rfile_blk_off[min_index],
                                       read_size);
                timer.end("MERGE read");
#ifdef checkpoints
                timer.end("checkpoints");
                printf("MERGE read: %f\n", timer.get_overall_time("checkpoints"));
                timer.start("checkpoints");
#endif
                // Update the end record for the case where the read buffer is not completely
                // full because there are no more records to read from that RUN file. The end
                // offset is relative to this run's chunk base, so a partial read shortens
                // only this chunk.
                end_rec_off[min_index] = min_index * (space_per_run / conf.idx_record_size) +
                                         ((read_size * conf.block_size) / conf.idx_record_size) - 1;
                rfile_blk_off[min_index] += read_size;
                // Refresh the comparison key from the freshly loaded chunk.
                min_vec[min_index] = read_buffer[cur_rec_off[min_index]].k;
            }
        }
        else
        {
            // Cursor is still inside the chunk; just refresh the comparison key.
            min_vec[min_index] = read_buffer[cur_rec_off[min_index]].k;
        }
    }

    free(write_buffer);
    free(read_buffer);
}

void ExtSort::Sort(vector<string> &files)
{
#ifdef bandwidth
    timer.start("checkpoints");
#endif
    timer.start("RUN");
    vector<string> runs = RunGeneration(files);
    timer.end("RUN");

    timer.start("MERGE");
    MergeRuns(runs, files[0]);
    timer.end("MERGE");
#ifdef bandwidth
    timer.end("checkpoints");
#endif

    printf("====================================:\n");
    printf("\t RUN read: %f\n", timer.get_overall_time("RUN read"));
    printf("\t RUN sort: %f\n", timer.get_overall_time("SORT"));
    printf("\t RUN write: %f\n", timer.get_overall_time("RUN write"));
    printf("Total RUN: %f\n", timer.get_time("RUN"));
    printf("\t MERGE read: %f\n", timer.get_overall_time("MERGE read"));
    printf("\t MERGE write: %f\n", timer.get_overall_time("MERGE write"));
    printf("\t RECORD read-write: %f\n", timer.get_overall_time("RECORD read-write"));
    printf("Total MERGE: %f\n", timer.get_time("MERGE"));
    printf("Total: %f\n", timer.get_time("RUN") + timer.get_time("MERGE"));
}
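// ---------------------------------------------------------------------------
// A minimal usage sketch (not part of the build). The mount path and input
// file name below are made-up placeholders, and `conf` is assumed to have
// been populated elsewhere (e.g. from the CLI) before Sort() is called.
//
//   int main()
//   {
//       vector<string> files = {"/mnt/pmem0/input.bin"}; // hypothetical input
//       ExtSort sorter("/mnt/pmem0/");  // runs + sorted output go under here
//       sorter.Sort(files);             // run generation, then merge
//       return 0;
//   }
// ---------------------------------------------------------------------------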