#include "data_manager.h"

#include <assert.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>

using std::string;
using std::vector;

// FIXME: move this to util
#ifdef pmdk
#include <libpmem.h>
#endif

#ifdef __cplusplus
#include <stdint.h>
extern "C" {
// NOTE(review): glibc's __memmove_chk* entry points appear to take a fourth
// `destlen` argument that is compared against the copy length (calling
// __chk_fail when it is smaller). This 3-argument prototype leaves that
// argument unspecified -- confirm against the target glibc before relying
// on the avx512 build.
void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);
}
#endif

// Open every input file and build the block index.
//
// For each file i:
//   - file_ptr_[i]  holds its descriptor (opened O_RDWR | O_DIRECT),
//   - file_size_[i] holds its size in bytes,
//   - file_index_ maps the global block number of its first block to i.
// A final entry (total_blocks -> -1) marks the end of the input.
// Blocks are counted as size / BLOCK_SIZE, so a trailing partial block is not
// indexed. Any open/stat failure prints the reason and exits the process.
DataManager::DataManager(vector<string> &file_list) {
  file_ptr_.resize(file_list.size());
  file_size_.resize(file_list.size());

  // Global block number at which file i starts.
  size_t rolling_sum = 0;
  for (size_t i = 0; i < file_list.size(); i++) {
    const char *path = file_list[i].c_str();
    int fd = open(path, O_RDWR | O_DIRECT);
    if (fd < 0) {
      printf("Couldn't open file %s: %s\n", path, strerror(errno));
      exit(1);
    }
    file_ptr_[i] = fd;

    // fstat the descriptor we already hold and check the result: the
    // previous unchecked stat() left `st` uninitialized on failure.
    struct stat st;
    if (fstat(fd, &st) != 0) {
      printf("Couldn't stat file %s: %s\n", path, strerror(errno));
      exit(1);
    }
    off_t size = st.st_size;
    file_size_[i] = size;

    // A preceding file with zero whole blocks starts at the same block
    // number. insert() would keep that stale entry, so replace it: lookups
    // must resolve to the file that actually holds the block.
    file_index_.erase(rolling_sum);
    file_index_.insert(std::make_pair(rolling_sum, i));
    rolling_sum += size / BLOCK_SIZE;
  }
  // End-of-input sentinel; replaces an entry left by a trailing empty file.
  file_index_.erase(rolling_sum);
  file_index_.insert(std::make_pair(rolling_sum, -1));
}

// Map `file_size` bytes of `fd`, starting at offset 0, with MAP_SHARED.
//   type == 0 (READ)  -> PROT_READ mapping.
//   type != 0 (WRITE) -> PROT_READ | PROT_WRITE mapping.
// On success the mapping is stored in `mapped_buffer` and 1 is returned.
// On failure the errno reason is printed, `mapped_buffer` holds MAP_FAILED,
// and -1 is returned. Callers must test the result with `< 0`: -1 is truthy,
// so `!MMapFile(...)` never detects a failure.
int DataManager::MMapFile(size_t file_size, int type, int fd,
                          char *&mapped_buffer) {
  if (!type) // READ part
  {
    mapped_buffer = static_cast<char *>(
        mmap(nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0));
    if (mapped_buffer == MAP_FAILED) {
      printf("Failed to mmap read file of size %zu: %s\n", file_size,
             strerror(errno));
      return -1;
    }
    return 1;
  }

  // WRITE part. Also request PROT_READ: any file mapping already requires a
  // readable fd, and some architectures cannot provide write-only pages.
  mapped_buffer = static_cast<char *>(
      mmap(nullptr, file_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
  if (mapped_buffer == MAP_FAILED) {
    printf("Failed to mmap write file of size %zu: %s\n", file_size,
           strerror(errno));
    return -1;
  }
  return 1;
}

// Each block belongs to a particular file; read it from the right file.
// read_buff_count: number of blocks that fit in the read buffer.
size_t DataManager::RunRead(size_t block_num, size_t read_buff_count, record_t *read_buffer) { // Find the file the block number belongs to auto file_it = file_index_.upper_bound(block_num); file_it--; // block belongs to no file if (file_it->second < 0) return 0; size_t file_num = file_it->second; off_t file_off = block_num - file_it->first; // 0 because READ if (!MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_)) return -1; size_t next_file_start = file_it->first + (file_size_[file_num] / BLOCK_SIZE); size_t read_records = 0; // Read until number of records fill the read buffer. while (read_records < read_buff_count * REC_PER_BLOCK) { if (block_num == next_file_start) { // unmap previous file and map the next file. munmap(input_mapped_buffer_, file_size_[file_num]); file_num++; if (file_num >= file_index_.size() - 1) break; file_off = 0; next_file_start = file_it->first + (file_size_[file_num] / BLOCK_SIZE); if (!MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_)) return -1; } #ifdef avx512 __memmove_chk_avx512_no_vzeroupper((void *)(read_buffer + read_records), &input_mapped_buffer_[file_off * BLOCK_SIZE], READ_SIZE * BLOCK_SIZE); // NOTE: commenting this for write only nt // #elif pmdk // pmem_memcpy_nodrain((void *)(read_buffer + read_records), // &input_mapped_buffer_[file_off * BLOCK_SIZE], // READ_SIZE * BLOCK_SIZE); #else memcpy((void *)(read_buffer + read_records), &input_mapped_buffer_[file_off * BLOCK_SIZE], READ_SIZE * BLOCK_SIZE); #endif read_records += READ_SIZE * REC_PER_BLOCK; block_num += READ_SIZE; file_off += READ_SIZE; } return read_records; } void DataManager::OpenAndMapOutputFile(string output_file, size_t file_size) { int ofd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU); if (ofd < 0) { printf("Couldn't open file %s: %s\n", output_file.c_str(), strerror(errno)); exit(1); } assert(ftruncate(ofd, file_size) == 0); output_fd_ = ofd; if (!MMapFile(file_size, 1, ofd, 
output_mapped_buffer_)) exit(1); output_count_ = 0; } // The responsiblity of this function is to simply write to file from write buffer // Must write all read/sorted data // Assuming everythign is aligned size_t DataManager::RunWrite(vector<in_record_t> keys_idx, size_t read_recs) { size_t pending_recs = 0; size_t write_records = 0; size_t blk_idx = 0; // DO writes until the write buffer is empty. One blocks size at a time. while (write_records < read_recs) { pending_recs = read_recs - write_records; if (pending_recs < IDX_REC_PER_BLK) { // This is the last write which is smaller than WRITE_BLOCK_IDX memcpy(&output_mapped_buffer_[blk_idx], &keys_idx[write_records], pending_recs * IDX_RECORD_SIZE); write_records += pending_recs; blk_idx += pending_recs * IDX_RECORD_SIZE; break; } #ifdef avx512 __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[blk_idx], &keys_idx[write_records], IDX_BLOCK_SIZE); #elif pmdk pmem_memcpy_nodrain(&output_mapped_buffer_[blk_idx], &keys_idx[write_records], IDX_BLOCK_SIZE); #else memcpy(&output_mapped_buffer_[blk_idx], &keys_idx[write_records], IDX_BLOCK_SIZE); #endif blk_idx += IDX_BLOCK_SIZE; write_records += IDX_REC_PER_BLK; } return write_records; } // Read records from RUN file to the appropriate buffer offset size_t DataManager::MergeRead(in_record_t *read_buffer, size_t read_buf_rec_off, char *run_file_map, size_t file_off, size_t read_length) { size_t read_blks = 0; while (read_blks < read_length) { // FIXME: What if the file does not have IDX_BLOCK_SIZE of data anymore #ifdef avx512 __memmove_chk_avx512_no_vzeroupper((void *)(read_buffer + read_buf_rec_off), &run_file_map[file_off], IDX_BLOCK_SIZE); // #elif pmdk // pmem_memcpy_nodrain((void *)(read_buffer + read_buf_rec_off), // &run_file_map[file_off], // IDX_BLOCK_SIZE); #else memcpy((void *)(read_buffer + read_buf_rec_off), &run_file_map[file_off], IDX_BLOCK_SIZE); #endif file_off += IDX_BLOCK_SIZE; read_blks++; read_buf_rec_off += IDX_REC_PER_BLK; } return read_blks; 
} size_t DataManager::MergeWrite(record_t *write_buffer, size_t write_len_recs) { size_t wrt_recs = 0; size_t write_blks = 0; while (write_blks < (write_len_recs * RECORD_SIZE) / BLOCK_SIZE) { #ifdef avx512 __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[output_file_off_], &write_buffer[wrt_recs], BLOCK_SIZE); #elif pmdk pmem_memcpy_nodrain(&output_mapped_buffer_[output_file_off_], &write_buffer[wrt_recs], BLOCK_SIZE); #else memcpy(&output_mapped_buffer_[output_file_off_], &write_buffer[wrt_recs], BLOCK_SIZE); #endif output_file_off_ += BLOCK_SIZE; write_blks++; wrt_recs += REC_PER_BLOCK; } return wrt_recs; } // First write records in the buffer // This is not the same as EMS. Cause this should also do reads // from input_file and not just the RUN files, for the values. size_t DataManager::MoveRemainingInputToOuputBuff() { return 0; } void DataManager::ConvertRoffToHex(uint8_t *bytes, size_t n) { for (int i = 0; i < INDEX_SIZE; ++i) { bytes[i] = (n >> i * 8) & 0xFF; } } uint64_t DataManager::ConvertHexToRoff(uint8_t *bytes) { uint64_t tmp = 0; for (int i = INDEX_SIZE - 1; i >= 0; --i) { tmp |= (uint64_t(bytes[i]) << i * 8); } return tmp; }