#include "ext_sort.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>

#include <assert.h>
#include <errno.h>
#include <string.h>

#include <algorithm>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <ctime>

#include "timer.h"

#ifdef pmdk
#include <libpmem.h>
#endif

#ifdef __cplusplus
#include <stdint.h>
extern "C"
{
    void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);
}
#endif

// Global wall-clock instrumentation shared by all phases.
Timer timer;

// Create a unique working directory (named after the epoch timestamp) under
// the mount point; it holds the intermediate run files and the sorted output.
ExtSort::ExtSort(string mount_path)
{
    folder_path_ = mount_path + std::to_string(std::time(nullptr));
    if (mkdir(folder_path_.c_str(), 0777) != 0)
        printf("Couldn't create folder %s: %s\n", folder_path_.c_str(), strerror(errno));
    // DEBUG:
    // folder_path_ = "/mnt/pmem/1649252474";
}

// Phase-1 helper: read up to READ_BUFFER_COUNT blocks starting at blk_off,
// sort (key, record-offset) index entries in memory, and write them out as
// one sorted run file of index records.
//
// @param blk_off     block offset into the input at which this run starts
// @param data_manger data manager that owns the input files (sic: name kept)
// @param output_file path of the run file to create
// @return number of input BLOCK_SIZE blocks consumed (0 == no data left)
size_t ExtSort::InMemorySort(size_t blk_off, DataManager &data_manger, string output_file)
{
    size_t read_buff_sz = READ_BUFFER_COUNT * BLOCK_SIZE;
    record_t *read_buffer = (record_t *)malloc(read_buff_sz);
    assert(read_buffer);

    timer.start("RUN read");
    size_t read_records = data_manger.RunRead(blk_off, READ_BUFFER_COUNT, read_buffer);
    timer.end("RUN read");
#ifdef checkpoints
    timer.end("checkpoints");
    printf("RUN read: %f\n", timer.get_overall_time("checkpoints"));
    timer.start("checkpoints");
#endif
    if (!read_records)
    {
        free(read_buffer); // was leaked on this early-out path
        return 0;
    }

    // Build the in-memory index: one (key, record offset) entry per record.
    vector<in_record_t> keys_idx(read_records);
    size_t off = 0;
    roffset r_offset;
    for (size_t idx = 0; idx < read_records; ++idx)
    {
        // Store the offset of the whole record (not the value field) so the
        // merge phase can fetch the full record from the original input.
        off = (idx * RECORD_SIZE) +    // for each record in this buffer
              blk_off * BLOCK_SIZE;    // for records beyond earlier runs
        r_offset = off;
        keys_idx[idx] = in_record_t(read_buffer[idx].k, r_offset);
    }

    timer.start("SORT");
    std::sort(keys_idx.begin(), keys_idx.end());
    timer.end("SORT");
#ifdef checkpoints
    timer.end("checkpoints");
    printf("SORT: %f\n", timer.get_overall_time("checkpoints"));
    timer.start("checkpoints");
#endif

    // Create a file sized exactly for the index data and dump the sorted
    // index entries into it in one pass.
    data_manger.OpenAndMapOutputFile(output_file, read_records * IDX_RECORD_SIZE);

    size_t write_records = 0;
    timer.start("RUN write");
    write_records += data_manger.RunWrite(keys_idx, read_records);
    timer.end("RUN write");
#ifdef checkpoints
    timer.end("checkpoints");
    printf("RUN write: %f\n", timer.get_overall_time("checkpoints"));
    timer.start("checkpoints");
#endif

    free(read_buffer); // malloc'd above — delete[] here would be UB

    // Divide by BLOCK_SIZE because reads use BLOCK_SIZE units.
    return (write_records * RECORD_SIZE) / BLOCK_SIZE;
}

// Phase 1: repeatedly sort buffer-sized slices of the input into run files
// until the input is exhausted.
//
// @param files input file names
// @return paths of all run files produced, in creation order
vector<string> ExtSort::RunGeneration(vector<string> files)
{
    DataManager data_manager(files);

    uint64_t blk_off = 0;
    string file_name = folder_path_ + "/run_";
    size_t run_num = 0;
    vector<string> run_names;

    // Loop until InMemorySort reports no blocks processed (input exhausted).
    while (1)
    {
        uint64_t blk_processed =
            InMemorySort(blk_off, data_manager, file_name + std::to_string(run_num));
        if (!blk_processed)
            break;
        blk_off += blk_processed;
        run_names.push_back(file_name + std::to_string(run_num));
        run_num++;
    }
    return run_names; // NRVO; returning std::move(...) would inhibit copy elision
}

// Phase 2: k-way merge of the sorted run (index) files into the final
// sorted output, fetching full records from the original input by offset.
//
// Game plan:
// 1. Open all run files and create a new file of size sum of all runs.
// 2. Create a read buffer that is evenly divisible by the number of files.
// 3. Track a current pointer and a (one-past-the-)end pointer per file chunk.
// 4a. Open the original input_file and mmap it.
// 4b. mmap the runs and fill the read buffer from their respective files.
// 5. Find the min of the keys pointed to by the current pointers.
// 6. memcpy the record at that offset from input_file to the write buffer;
//    flush the write buffer whenever it fills.
// 7. Advance the pointer of the winning run's buffer.
// 8. When a chunk is used up, reload that run's slice of the read buffer.
// 9. When a run's file is fully read and its chunk consumed, mark it with a
//    sentinel key so it drops out of the min comparison.
void ExtSort::MergeRuns(vector<string> runs, string input_file)
{
    // The read buffer must be evenly divisible by runs.size() and IDX_BLOCK_SIZE.
    size_t READ_BUFFER_SIZE_M =
        ((M_READ_BUFFER_SIZE / runs.size()) / IDX_BLOCK_SIZE) * IDX_BLOCK_SIZE * runs.size();
    size_t READ_BUFFER_COUNT_M = READ_BUFFER_SIZE_M / IDX_BLOCK_SIZE;

    // [1]
    DataManager data_manager(runs);
    string output_file_name = folder_path_ + "/sorted";
    size_t output_file_size = 0;
    for (auto i : data_manager.file_size_)
        output_file_size += ((i / IDX_RECORD_SIZE) * RECORD_SIZE);
    data_manager.OpenAndMapOutputFile(output_file_name, output_file_size);

    // Write buffer; its size should evenly divide the output file size.
    record_t *write_buffer = (record_t *)malloc(WRITE_BUFFER_COUNT * BLOCK_SIZE);
    assert(write_buffer);

    // [2]
    size_t read_buff_sz = READ_BUFFER_COUNT_M * IDX_BLOCK_SIZE;
    in_record_t *read_buffer = (in_record_t *)malloc(read_buff_sz);
    assert(read_buffer);

    // [3] Per-run merge state.
    vector<size_t> cur_rec_off(runs.size(), 0);   // rec offset of next key to compare
    vector<size_t> end_rec_off(runs.size(), 0);   // ONE-PAST-the-last valid rec offset of the chunk
    vector<size_t> rfile_blk_off(runs.size(), 0); // next blk offset of the run file to read
    vector<char *> mapped_run_files(runs.size()); // mmap of each run file
    size_t space_per_run = read_buff_sz / runs.size();
    size_t recs_per_space = space_per_run / IDX_RECORD_SIZE;

    // [4a] - REVIEW: perhaps just pass the run-phase DataManager, where the
    // input file is already opened and mapped?
    int fd = open(input_file.c_str(), O_RDWR | O_DIRECT);
    if (fd < 0)
    {
        printf("Couldn't open input file %s: %s\n", input_file.c_str(), strerror(errno));
        exit(1);
    }
    struct stat st;
    if (stat(input_file.c_str(), &st) != 0)
    {
        printf("Couldn't stat input file %s: %s\n", input_file.c_str(), strerror(errno));
        exit(1);
    }
    if (!data_manager.MMapFile(st.st_size, 0, fd, data_manager.input_mapped_buffer_))
        exit(1);

    // [4b] Initial fill: one chunk per run.
    // NOTE(review): assumes run file sizes are multiples of IDX_BLOCK_SIZE —
    // a trailing partial block would be truncated by this division; confirm.
    size_t read_records = 0;
    size_t read_size_blk = space_per_run / IDX_BLOCK_SIZE;
    for (uint32_t file_num = 0; file_num < runs.size(); ++file_num)
    {
        cur_rec_off[file_num] = file_num * recs_per_space;

        // Clamp the first read to the file's size so a run smaller than its
        // buffer share never leaves garbage inside [cur, end).
        size_t file_blks = data_manager.file_size_[file_num] / IDX_BLOCK_SIZE;
        size_t init_blks = std::min(read_size_blk, file_blks);
        end_rec_off[file_num] =
            cur_rec_off[file_num] + (init_blks * IDX_BLOCK_SIZE) / IDX_RECORD_SIZE;

        data_manager.MMapFile(data_manager.file_size_[file_num], 0,
                              data_manager.file_ptr_[file_num], mapped_run_files[file_num]);

        timer.start("MERGE seq read");
        read_records += data_manager.MergeRead(read_buffer, cur_rec_off[file_num],
                                               mapped_run_files[file_num],
                                               rfile_blk_off[file_num], init_blks);
        timer.end("MERGE seq read");
#ifdef checkpoints
        timer.end("checkpoints");
        printf("MERGE seq read: %f\n", timer.get_overall_time("checkpoints"));
        timer.start("checkpoints");
#endif
        rfile_blk_off[file_num] = init_blks;
    }

    // Seed the tournament with the first key of each run.
    vector<k_t> min_vec(runs.size());
    for (uint32_t file_num = 0; file_num < runs.size(); ++file_num)
        min_vec[file_num] = read_buffer[cur_rec_off[file_num]].k;

    size_t write_buff_rec_off = 0;
    uint32_t min_index = 0;
    size_t recs_written = 0;
    size_t run_file_sz = 0;
    size_t input_file_off = 0;
    k_t tmp_k;
    tmp_k = 0xff; // sentinel (max key) marking an exhausted run

    // Loop until all the records are written.
    while (recs_written < output_file_size / RECORD_SIZE)
    {
        // [5] Pick the run holding the smallest current key.
        min_index = std::min_element(min_vec.begin(), min_vec.end()) - min_vec.begin();

        // All runs exhausted (even the minimum is the sentinel): flush the
        // remainder and stop. Checked BEFORE copying so no garbage record
        // keyed by the sentinel is ever appended to the output.
        if (min_vec[min_index] == tmp_k)
        {
            if (write_buff_rec_off)
            {
                timer.start("MERGE write");
                recs_written += data_manager.MergeWrite(write_buffer, write_buff_rec_off);
                timer.end("MERGE write");
#ifdef checkpoints
                timer.end("checkpoints");
                printf("MERGE write: %f\n", timer.get_overall_time("checkpoints"));
                timer.start("checkpoints");
#endif
            }
            break;
        }

        // [6] The additional random read: fetch the full record from the
        // original input at the offset stored in the winning index entry.
        input_file_off =
            data_manager.ConvertHexToRoff(read_buffer[cur_rec_off[min_index]].r_off.val);

        timer.start("MERGE rand read");
#ifdef avx512
        __memmove_chk_avx512_no_vzeroupper(&write_buffer[write_buff_rec_off],
                                           &data_manager.input_mapped_buffer_[input_file_off],
                                           RECORD_SIZE);
        // #elif pmdk
        //         pmem_memcpy_nodrain(&write_buffer[write_buff_rec_off],
        //                             &data_manager.input_mapped_buffer_[input_file_off],
        //                             RECORD_SIZE);
#else
        memcpy(&write_buffer[write_buff_rec_off],
               &data_manager.input_mapped_buffer_[input_file_off], RECORD_SIZE);
#endif
        timer.end("MERGE rand read");
        write_buff_rec_off++;

        // Write buffer is full, so flush it.
        if (write_buff_rec_off == REC_PER_WRITE_BUFFER)
        {
            timer.start("MERGE write");
            recs_written += data_manager.MergeWrite(write_buffer, write_buff_rec_off);
            timer.end("MERGE write");
#ifdef checkpoints
            timer.end("checkpoints");
            printf("MERGE write: %f\n", timer.get_overall_time("checkpoints"));
            timer.start("checkpoints");
#endif
            write_buff_rec_off = 0;
        }

        // [7] Advance the winning run past the record just consumed.
        cur_rec_off[min_index]++;
        if (cur_rec_off[min_index] >= end_rec_off[min_index])
        {
            // Chunk used up: refill from the file, or retire the run.
            // (end_rec_off is one-past-the-end, so — unlike the original
            // last-index convention — the final record of each chunk is
            // consumed before the refill overwrites it.)
            run_file_sz = data_manager.file_size_[min_index] / IDX_BLOCK_SIZE;
            if (rfile_blk_off[min_index] >= run_file_sz)
            {
                // [9] End of this run's file: sentinel removes it from the
                // min comparison.
                min_vec[min_index] = tmp_k;
            }
            else
            {
                // [8] Reset this run's chunk and read the next set of blocks.
                cur_rec_off[min_index] = min_index * recs_per_space;
                size_t read_size =
                    std::min(read_size_blk, run_file_sz - rfile_blk_off[min_index]);

                timer.start("MERGE seq read");
                data_manager.MergeRead(read_buffer, cur_rec_off[min_index],
                                       mapped_run_files[min_index],
                                       rfile_blk_off[min_index], read_size);
                timer.end("MERGE seq read");
#ifdef checkpoints
                timer.end("checkpoints");
                printf("MERGE seq read: %f\n", timer.get_overall_time("checkpoints"));
                timer.start("checkpoints");
#endif
                // End reflects how much was actually read (last read of a
                // file may not fill the whole chunk).
                end_rec_off[min_index] = min_index * recs_per_space +
                                         (read_size * IDX_BLOCK_SIZE) / IDX_RECORD_SIZE;
                rfile_blk_off[min_index] += read_size;

                // Refresh the key from the NEW data — the pre-refill key
                // would pair a stale key with the wrong record offset.
                min_vec[min_index] = read_buffer[cur_rec_off[min_index]].k;
            }
        }
        else
        {
            min_vec[min_index] = read_buffer[cur_rec_off[min_index]].k;
        }
    }

    // Release resources owned by this function (originally leaked).
    free(read_buffer);
    free(write_buffer);
    close(fd); // the established mmap stays valid after close
    // NOTE(review): mapped_run_files / input_mapped_buffer_ are presumably
    // unmapped by DataManager — confirm, otherwise munmap them here too.
}

// Entry point: run generation followed by the merge, with timing printed
// per phase at the end.
//
// @param files input file names; files[0] is re-read during the merge to
//              fetch full records by offset
void ExtSort::Sort(vector<string> &files)
{
#ifdef checkpoints
    timer.start("checkpoints");
#endif
    // DEBUG: Wait until all measuring scripts are ready.
    sleep(5);

    timer.start("RUN");
    vector<string> runs = RunGeneration(files);
    timer.end("RUN");

    // DEBUG:
    // vector<string> runs{"/mnt/pmem/1649252474/run_0", "/mnt/pmem/1649252474/run_1"};

    timer.start("MERGE");
    MergeRuns(runs, files[0]);
    timer.end("MERGE");
#ifdef checkpoints
    timer.end("checkpoints");
#endif

    printf("====================================\n");
    printf("Total RUN: %f\n", timer.get_time("RUN"));
    printf("\t RUN read: %f\n", timer.get_overall_time("RUN read"));
    printf("\t RUN sort: %f\n", timer.get_overall_time("SORT"));
    printf("\t RUN write: %f\n", timer.get_overall_time("RUN write"));
    printf("Total MERGE: %f\n", timer.get_time("MERGE"));
    printf("\t MERGE seq read: %f\n", timer.get_overall_time("MERGE seq read"));
    printf("\t MERGE rand read: %f\n", timer.get_overall_time("MERGE rand read"));
    printf("\t MERGE write: %f\n", timer.get_overall_time("MERGE write"));
    printf("Total: %f\n", timer.get_time("RUN") + timer.get_time("MERGE"));
}