#include "data_manager.h"
#include "unistd.h"
#include "fcntl.h"
// System headers for mmap/munmap, fstat, assert, errno/strerror, printf,
// exit and memcpy used throughout this file.
#include <sys/mman.h>
#include <sys/stat.h>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>
#include "utils.h"
#include "config.h"
#ifdef pmdk
#include <libpmem.h>
#endif
#ifdef clflush
#include <immintrin.h> // _mm_sfence
#endif

#ifdef __cplusplus
#include <cstddef>
extern "C"
{
    void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);
}
#endif

using std::string;
using std::vector;

// Open every input file and build the block index used by RunRead.
// file_index_ maps the global block number at which each file's data starts to
// that file's position in file_list. A final entry mapped to -1 marks the end
// of all input data. Exits the process if any file cannot be opened or stat'ed.
DataManager::DataManager(vector<string> &file_list)
{
    file_ptr_.resize(file_list.size());
    file_size_.resize(file_list.size());
    size_t rolling_sum = 0;
    for (size_t i = 0; i < file_list.size(); i++)
    {
        const char *path = file_list[i].c_str();
        int fd = open(path, O_RDWR | O_DIRECT);
        if (fd < 0)
        {
            printf("Couldn't open file %s: %s\n", path, strerror(errno));
            exit(1);
        }
        file_ptr_[i] = fd;

        // fstat the descriptor we already hold (no second path lookup), and never
        // read `st` when the call failed.
        struct stat st;
        if (fstat(fd, &st) != 0)
        {
            printf("Couldn't stat file %s: %s\n", path, strerror(errno));
            exit(1);
        }
        off_t size = st.st_size;
        file_size_[i] = size;

        size_t blocks = size / conf.block_size;
        // A file holding no complete block would share its start key with the next
        // file (or with the end sentinel). std::map::insert keeps the existing key,
        // so such an entry would shadow the real one. It has nothing to read anyway.
        if (blocks > 0)
            file_index_.insert(std::make_pair(rolling_sum, i));
        rolling_sum += blocks;
    }
    // End-of-data sentinel: RunRead treats a negative file id as "no file".
    file_index_.insert(std::make_pair(rolling_sum, -1));
}

// Map the whole of `fd` (file_size bytes) into memory as a shared mapping.
// type == 0 maps for reading (PROT_READ | PROT_WRITE); any other value maps for
// writing (PROT_WRITE). On success stores the address in mapped_buffer and
// returns 1. On failure prints a diagnostic and returns -1; mapped_buffer then
// holds MAP_FAILED.
int DataManager::MMapFile(size_t file_size, int type, int fd, char *&mapped_buffer)
{
    const bool is_read = (type == 0);
    const int prot = is_read ? (PROT_READ | PROT_WRITE) : PROT_WRITE;
    mapped_buffer = static_cast<char *>(mmap(nullptr, file_size, prot, MAP_SHARED, fd, 0));
    if (mapped_buffer == MAP_FAILED)
    {
        // %zu is the correct conversion for size_t.
        printf("Failed to mmap %s file of size %zu: %s\n",
               is_read ? "read" : "write", file_size, strerror(errno));
        return -1;
    }
    return 1;
}

// Each block belongs to a particular file; read it from the right file.
// read_buff_count: number of blocks that fit in the read buffer.
size_t DataManager::RunRead(size_t block_num, size_t read_buff_count, record_t *read_buffer) { // Find the file the block number belongs to auto file_it = file_index_.upper_bound(block_num); file_it--; // block belongs to no file if (file_it->second < 0) return 0; size_t file_num = file_it->second; off_t file_off = block_num - file_it->first; // 0 because READ if (!MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_)) return -1; size_t next_file_start = file_it->first + (file_size_[file_num] / conf.block_size); size_t read_records = 0; // Read until number of records fill the read buffer. while (read_records < read_buff_count * conf.recs_per_blk) { if (block_num == next_file_start) { // unmap previous file and map the next file. munmap(input_mapped_buffer_, file_size_[file_num]); file_num++; if (file_num >= file_index_.size() - 1) break; file_off = 0; next_file_start = file_it->first + (file_size_[file_num] / conf.block_size); if (!MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_)) return -1; } #ifdef avx512 __memmove_chk_avx512_no_vzeroupper((void *)(read_buffer + read_records), &input_mapped_buffer_[file_off * conf.block_size], READ_SIZE * conf.block_size); // #elif pmdk // pmem_memcpy_nodrain((void *)(read_buffer + read_records), // &input_mapped_buffer_[file_off * conf.block_size], // READ_SIZE * conf.block_size); #else memcpy((void *)(read_buffer + read_records), &input_mapped_buffer_[file_off * conf.block_size], conf.block_size); #endif read_records += conf.recs_per_blk; block_num++; file_off++; } return read_records; } void DataManager::OpenAndMapOutputFile(string output_file, size_t file_size) { int ofd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU); if (ofd < 0) { printf("Couldn't open file %s: %s\n", output_file.c_str(), strerror(errno)); exit(1); } assert(ftruncate(ofd, file_size) == 0); output_fd_ = ofd; if (!MMapFile(file_size, 1, ofd, output_mapped_buffer_)) exit(1); output_count_ = 
0; } void DataManager::LoadRecordToOutputBuffer(record_t *write_buffer, record_t *read_buffer, vector &keys_idx, size_t key_off) { for (size_t rec_idx = 0; rec_idx < conf.write_buff_rec_count; rec_idx++) write_buffer[rec_idx] = std::move(read_buffer[keys_idx[key_off + rec_idx].v_index]); } // The responsiblity of this function is to simply write to file from write buffer // Must write all read/sorted data // Assuming everythign is aligned size_t DataManager::RunWrite(record_t *write_buffer, off_t output_off) { size_t write_records = 0; size_t blk_idx = 0; // DO writes until the write buffer is empty. One blocks size at a time. while (write_records < conf.write_buff_rec_count) { #ifdef avx512 __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[output_off + blk_idx], &write_buffer[write_records], conf.block_size); #elif pmdk pmem_memcpy_nodrain(&output_mapped_buffer_[output_off + blk_idx], &write_buffer[write_records], conf.block_size); #else memcpy(&output_mapped_buffer_[output_off + blk_idx], &write_buffer[write_records], conf.block_size); #endif #ifdef clflush // flush and fence to ensure write is complete flush_clflushopt(&output_mapped_buffer_[output_off + blk_idx], conf.block_size); _mm_sfence(); #endif blk_idx += conf.block_size; write_records += conf.recs_per_blk; } return write_records; } // Read records from RUN file to the appropriate buffer offset size_t DataManager::MergeRead(record_t *read_buffer, size_t read_buf_rec_off, char *run_file_map, size_t file_off, size_t read_length) { size_t read_blks = 0; while (read_blks < read_length) { #ifdef avx512 __memmove_chk_avx512_no_vzeroupper((void *)(read_buffer + read_buf_rec_off), &run_file_map[file_off], conf.block_size); // #elif pmdk // pmem_memcpy_nodrain((void *)(read_buffer + read_buf_rec_off), // &run_file_map[file_off], // conf.block_size); #else memcpy((void *)(read_buffer + read_buf_rec_off), &run_file_map[file_off], conf.block_size); #endif file_off += conf.block_size; read_blks++; 
read_buf_rec_off += conf.recs_per_blk; } return read_blks; } size_t DataManager::MergeWrite(record_t *write_buffer, size_t write_len_recs) { size_t wrt_recs = 0; size_t write_blks = 0; while (write_blks < (write_len_recs * conf.record_size) / conf.block_size) { #ifdef avx512 __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[output_file_off_], &write_buffer[wrt_recs], conf.block_size); #elif pmdk pmem_memcpy_nodrain(&output_mapped_buffer_[output_file_off_], &write_buffer[wrt_recs], conf.block_size); #else memcpy(&output_mapped_buffer_[output_file_off_], &write_buffer[wrt_recs], conf.block_size); #endif #ifdef clflush // flush and fence to ensure write is complete flush_clflushopt(&output_mapped_buffer_[output_file_off_], conf.block_size); _mm_sfence(); #endif output_file_off_ += conf.block_size; write_blks++; wrt_recs += conf.recs_per_blk; } return wrt_recs; } // TODO: // First write records in write buffer // Next write the corresponding records in read buffer // Then read remaining keys to the buffer and write it to file. size_t DataManager::MoveRemainingInputToOuputBuff() { return 0; } // DataManager::~DataManager() // { // for (int fd : file_ptr_) // close(fd); // // unmap last input file and output file // // if(input_mapped_buffer_ != NULL) // // munmap(input_mapped_buffer_, file_size_[file_size_.size() - 1]); // // munmap(output_mapped_buffer_, file_size_[file_size_.size() - 1]); // }