#include <unistd.h> #include <assert.h> #include "data_manager.h" #include "parameter.h" #include <fcntl.h> #include <sys/mman.h> #include <iostream> #define min(a, b) (((a) < (b)) ? (a) : (b)) DataManager::DataManager(vector<string> &file_list) { file_ptr.resize(file_list.size()); file_size.resize(file_list.size()); size_t rolling_sum = 0; for (size_t i=0; i < file_list.size(); i++) { int fd = open(file_list[i].c_str(), O_RDWR | O_DIRECT); if(fd < 0) { printf("Couldn't open %s: %s\n", file_list[i].c_str(), strerror(errno)); exit(1); } file_ptr[i] = fd; // Get the file size struct stat st; stat(file_list[i].c_str(), &st); off_t size = st.st_size; file_size[i] = (uint64_t)size; file_index.insert(std::make_pair(rolling_sum, i)); rolling_sum += size / BLOCK_SIZE; } file_index.insert(std::make_pair(rolling_sum, -1)); } void DataManager::init_output_buffer(size_t size) { //assert(posix_memalign(reinterpret_cast<void**>(&output_buf), BLOCK_SIZE, size * BLOCK_SIZE) == 0); output_buf = (record_t *)malloc(size * BLOCK_SIZE); assert(output_buf); } size_t DataManager::load(size_t block_num, size_t len, record_t *buffer) { // fetch the file the block belongs to. auto it = file_index.upper_bound(block_num); it--; if(it->second < 0) return 0; size_t file_num = it->second; int fd = file_ptr[file_num]; assert(fd>0); off_t offset = block_num - (size_t) it -> first; mmapped_buffer = (char*)mmap(NULL, file_size[file_num], PROT_READ, MAP_SHARED, fd, 0); if (mmapped_buffer == MAP_FAILED) { printf("Failed to mmap file of size %lu: %s\n", file_size[file_num], strerror(errno)); return -1; } size_t next_file_start = it -> first + (file_size[file_num] / BLOCK_SIZE); size_t i = 0; for (; i < len * NUM_PER_BLOCK;) { if (block_num == next_file_start) { //unmmap previous file munmap(mmapped_buffer, file_size[file_num]); file_num ++; if (file_num >= file_index.size() - 1) { break; } fd = file_ptr[file_num]; offset = 0; next_file_start = next_file_start + (file_size[file_num] / BLOCK_SIZE); // mmap the next file. mmapped_buffer = (char*)mmap(NULL, file_size[file_num], PROT_READ, MAP_SHARED, fd, 0); if (mmapped_buffer == MAP_FAILED) { printf("Failed to mmap file of size %lu: %s\n", file_size[file_num], strerror(errno)); return -1; } } // size_t req_size = min(next_file_start - block_num, input_req_size); //TODO: How does std::copy compare? memcpy((void *) (buffer + i), &mmapped_buffer[offset* BLOCK_SIZE], READ_SIZE * BLOCK_SIZE); i += READ_SIZE * NUM_PER_BLOCK; block_num += READ_SIZE; offset += READ_SIZE; } return i; } void DataManager::set_output_fd(int fd) { output_fd = fd; buf_ptr = 0; output_count = 0; } void DataManager::buffer_write(record_t *buffer, in_record_t *keys, int num_block, char *mmapped_buffer) { //REVIEW: Should this step also be parallelized later. for(size_t i=0; i < num_block * NUM_PER_OUT_BLOCK; i++){ output_buf[buf_ptr + i] = std::move(buffer[keys[i].v_index]); } output_count += num_block * NUM_PER_OUT_BLOCK; buf_ptr += num_block * NUM_PER_OUT_BLOCK; off_t off = output_count / NUM_PER_OUT_BLOCK - WRITE_SIZE; uint32_t records_boundary = buf_ptr - WRITE_SIZE * NUM_PER_OUT_BLOCK; /** * TODO: Supporting file sizes other than the multiple of MEM_SIZE. std::cout << "off: " << off << " buf_ptr: " << buf_ptr << " reord_bndr " << records_boundary << std::endl; std::cout << "----- " << off * OUTPUT_BUFFER_SIZE* BLOCK_SIZE << std::endl; int req_rec = min((WRITE_SIZE * NUM_PER_OUT_BLOCK), (MEM_SIZE * NUM_PER_BLOCK - output_count)); std::cout << "--- req_rec--- " << req_rec << std::endl; */ memcpy(&mmapped_buffer[off * OUTPUT_BUFFER_SIZE* BLOCK_SIZE], &output_buf[records_boundary], WRITE_SIZE * OUTPUT_BUFFER_SIZE * BLOCK_SIZE); if (buf_ptr == OUTPUT_BUFFER_SIZE * NUM_PER_BLOCK) { buf_ptr = 0; } //TODO: return req_rec. } uint64_t DataManager::get_overall_size() { uint64_t sum = 0; for (size_t i = 0; i < file_size.size(); i++) { sum += file_size[i]; } return sum; } DataManager::~DataManager() { for (int fd : file_ptr) close(fd); if (!output_buf) delete[] output_buf; // stop = true; } size_t DataManager::get_filesize(const char* filename){ int retval; struct stat st; retval = stat(filename, &st); if(retval) return -1; else return st.st_size; }