#include "data_manager.h"
#include "unistd.h"
#include "fcntl.h"
#include <sys/stat.h>
#include <assert.h>
#include <sys/mman.h>
#include <x86intrin.h>

#define min(a, b) (((a) < (b)) ? (a) : (b))

using std::string;
using std::vector;

#ifdef pmdk
#include <libpmem.h>
#endif

#ifdef __cplusplus
#include <stdint.h>
extern "C"
{
    void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);
}
#endif

/**
 * Opens every input file and builds the record-number -> file lookup table.
 *
 * Each file in file_list is opened with O_RDWR | O_DIRECT. Its descriptor is
 * stored in file_ptr_[i] and its size in bytes in file_size_[i]. file_index_
 * maps the running record count at which a file starts to that file's index;
 * a final entry mapping the total record count to -1 marks "past the last
 * file".
 *
 * The process exits with status 1 if a file cannot be opened or its size
 * cannot be queried.
 *
 * @param file_list  Paths of the input files, in the order they are indexed.
 * @param timer_cpy  Timer pointer stored in the timer member (not owned here).
 */
DataManager::DataManager(vector<string> &file_list, Timer *timer_cpy)
    : pool(std::thread::hardware_concurrency())
{
    timer = timer_cpy;
    file_ptr_.resize(file_list.size());
    file_size_.resize(file_list.size());

    size_t rolling_sum = 0;
    for (size_t i = 0; i < file_list.size(); i++)
    {
        int fd = open(file_list[i].c_str(), O_RDWR | O_DIRECT);
        if (fd < 0)
        {
            printf("Couldn't open file %s: %s\n", file_list[i].c_str(),
                   strerror(errno));
            exit(1);
        }
        file_ptr_[i] = fd;

        // Query the size through the descriptor we just opened. The result
        // must be checked: on failure `st` is left uninitialized and a
        // garbage size would silently corrupt the index.
        struct stat st;
        if (fstat(fd, &st) != 0)
        {
            printf("Couldn't stat file %s: %s\n", file_list[i].c_str(),
                   strerror(errno));
            exit(1);
        }
        off_t size = st.st_size;
        file_size_[i] = size;

        // Key: number of records preceding this file. Value: file number.
        file_index_.insert(std::make_pair(rolling_sum, i));
        rolling_sum += size / RECORD_SIZE;
    }
    // Sentinel: any record number >= the total belongs to no file.
    file_index_.insert(std::make_pair(rolling_sum, -1));
}

// MMapFile `type` argument: READ = 0, WRITE = 1.
int DataManager::MMapFile(size_t file_size, int type, int fd, char *&mapped_buffer) { if (!type) // READ part { mapped_buffer = (char *)mmap(NULL, file_size, PROT_READ, MAP_SHARED, fd, 0); if (mapped_buffer == MAP_FAILED) { printf("Failed to mmap read file of size %lu: %s\n", file_size, strerror(errno)); return -1; } return 1; } else // WRITE part { mapped_buffer = (char *)mmap(NULL, file_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (mapped_buffer == MAP_FAILED) { printf("Failed to mmap write file of size %lu: %s\n", file_size, strerror(errno)); return -1; } return 1; } } // bandwdith typedef struct { int tid; uint64_t start_time; uint64_t end_time; } threadargs_t; uint64_t nano_time(void) { struct timespec ts; if (clock_gettime(CLOCK_REALTIME, &ts) == 0) return ts.tv_sec * NANOSECONDS_IN_SECOND + ts.tv_nsec; } // Finds the min start time and max end time for given set of times. std::tuple<uint64_t, uint64_t> time_min_max(threadargs_t *threadargs, int threads) { uint64_t min_start_time = threadargs[0].start_time; uint64_t max_end_time = 0; for (int i = 0; i < threads; i++) { min_start_time = (threadargs[i].start_time < min_start_time) ? threadargs[i].start_time : min_start_time; max_end_time = (threadargs[i].end_time > max_end_time) ? threadargs[i].end_time : max_end_time; } return std::make_tuple(min_start_time, max_end_time); } // Each block belongs to a paricular file, read from the right file. // read_buff_B Number of blocks that fit the read buffer. 
size_t DataManager::RunRead(size_t rec_num, size_t read_arr_count, std::vector<in_record_t> &keys_idx) { // Find the file the block number belongs to auto file_it = file_index_.upper_bound(rec_num); file_it--; // rec belongs to no file if (file_it->second < 0) return 0; size_t file_num = file_it->second; off_t file_off = rec_num - file_it->first; // 0 because READ if (!MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_)) return -1; vector<std::future<size_t>> results; size_t thrd_chunk_rec = read_arr_count / conf.read_thrds; #ifdef bandwidth threadargs_t *threadargs; threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t)); #endif // clang-format off for (int thrd = 0; thrd < conf.read_thrds; thrd++) { results.emplace_back( pool.enqueue([=, &keys_idx]()->size_t { size_t reads = 0; size_t read_keys = thrd_chunk_rec * thrd; size_t v_off = 0; size_t thrd_file_off = file_off; #ifdef bandwidth threadargs[thrd].tid = thrd; threadargs[thrd].start_time = rdtsc(); #endif while (reads < thrd_chunk_rec) { memcpy((void *)(&keys_idx[read_keys]), &input_mapped_buffer_[thrd_file_off + (thrd_chunk_rec * conf.record_size * thrd)], KEY_SIZE); v_off = (read_keys * conf.record_size) + // For each record rec_num * conf.record_size; // For values beyond first read buffer keys_idx[read_keys].r_off = v_off; // Lets convert address to hex later for // now just append v_off to an array. 
// tmp_off.push_back(v_off); read_keys++; reads++; thrd_file_off += conf.record_size; } #ifdef bandwidth threadargs[thrd].end_time = rdtsc(); #endif /* This is for seperating reads and offset to hex convertion // reset read keys and reads read_keys = thrd_chunk_rec * thrd; reads = 0; // Now convert offset to hex while(reads < thrd_chunk_rec) { keys_idx[read_keys].r_off = tmp_off[reads]; read_keys++; reads++; } */ return reads; }) ); } // clang-format on size_t total_read = 0; for (auto &&result : results) total_read += result.get(); #ifdef bandwidth timer->end("checkpoints"); uint64_t min_start_time, max_end_time; std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds); printf("%f,READ,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), read_arr_count * KEY_SIZE); free(threadargs); timer->start("checkpoints"); #endif return total_read; } // PMSort loading step size_t DataManager::RunReadPMSort(size_t rec_num, size_t read_arr_count, std::vector<in_record_t> &keys_idx) { // Find the file the block number belongs to auto file_it = file_index_.upper_bound(rec_num); file_it--; // rec belongs to no file if (file_it->second < 0) return 0; size_t file_num = file_it->second; off_t file_off = rec_num - file_it->first; // 0 because READ if (!MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_)) return -1; vector<std::future<size_t>> results; size_t thrd_chunk_size = (read_arr_count * conf.record_size) / conf.read_thrds; #ifdef bandwidth threadargs_t *threadargs; threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t)); #endif // clang-format off for (int thrd = 0; thrd < conf.read_thrds; thrd++) { results.emplace_back( pool.enqueue([=, &keys_idx]()->size_t { size_t read_size = 0; size_t read_keys = (thrd_chunk_size/conf.record_size) * thrd; size_t v_off = 0; size_t thrd_file_off = file_off; #ifdef bandwidth threadargs[thrd].tid = thrd; 
threadargs[thrd].start_time = rdtsc(); #endif char *tmp_buf = (char *) malloc(4000); while (read_size < thrd_chunk_size) { // memcpy((void *)(&keys_idx[read_keys]), // &input_mapped_buffer_[thrd_file_off + (thrd_chunk_rec * conf.record_size * thrd)], // KEY_SIZE); memcpy(tmp_buf, &input_mapped_buffer_[thrd_file_off + (thrd_chunk_size * thrd)], 4000); for(int tmp_k = 0; tmp_k < 4000; tmp_k+=conf.record_size) { memcpy((void *) (&keys_idx[read_keys]), &tmp_buf[tmp_k], KEY_SIZE); v_off = (read_keys * conf.record_size) + // For each record rec_num * conf.record_size; // For values beyond first read buffer keys_idx[read_keys].r_off = v_off; // Lets convert address to hex later for // now just append v_off to an array. // tmp_off.push_back(v_off); read_keys++; } read_size += 4000; thrd_file_off += 4000; } #ifdef bandwidth threadargs[thrd].end_time = rdtsc(); #endif /* This is for seperating reads and offset to hex convertion // reset read keys and reads read_keys = thrd_chunk_rec * thrd; reads = 0; // Now convert offset to hex while(reads < thrd_chunk_rec) { keys_idx[read_keys].r_off = tmp_off[reads]; read_keys++; reads++; } */ return read_size/conf.record_size; }) ); } // clang-format on size_t total_read = 0; for (auto &&result : results) total_read += result.get(); #ifdef bandwidth timer->end("checkpoints"); uint64_t min_start_time, max_end_time; std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds); printf("%f,READ,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), read_arr_count * KEY_SIZE); free(threadargs); timer->start("checkpoints"); #endif return total_read; } void DataManager::OpenAndMapOutputFile(string output_file, size_t file_size) { int ofd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU); if (ofd < 0) { printf("Couldn't open file %s: %s\n", output_file.c_str(), strerror(errno)); exit(1); } assert(ftruncate(ofd, file_size) == 0); output_fd_ = ofd; 
if (!MMapFile(file_size, 1, ofd, output_mapped_buffer_)) exit(1); output_count_ = 0; } /** * Version 1 * - Each thread reads offset from file and writes to the output file. * - This could be bad because of uncrontolled number of writes give poor bandwidth. */ size_t DataManager::RunWrite(vector<in_record_t> &keys_idx, size_t read_recs) { vector<std::future<size_t>> results; size_t thrd_chunk_rec = read_recs / conf.read_thrds; #ifdef bandwidth threadargs_t *threadargs; threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t)); #endif // clang-format off for (int thrd = 0; thrd < conf.read_thrds; thrd++) { results.emplace_back( pool.enqueue([=, &keys_idx]()->size_t { size_t write_records = thrd_chunk_rec * thrd; size_t output_off = thrd_chunk_rec * thrd * conf.record_size; size_t input_off = 0; size_t writes = 0; #ifdef bandwidth threadargs[thrd].tid = thrd; threadargs[thrd].start_time = rdtsc(); #endif while (writes < thrd_chunk_rec) { input_off = ConvertHexToRoff(keys_idx[write_records].r_off.val); // input_off = keys_idx[write_records].v_index; #ifdef avx512 __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[output_off], &input_mapped_buffer_[input_off], conf.record_size); #elif pmdk pmem_memcpy_nodrain(&output_mapped_buffer_[output_off], &input_mapped_buffer_[input_off], conf.record_size); #else memcpy(&output_mapped_buffer_[output_off], &input_mapped_buffer_[input_off], conf.record_size); #endif output_off += conf.record_size; write_records++; writes++; } #ifdef bandwidth threadargs[thrd].end_time = rdtsc(); #endif return writes; }) ); } // clang-format on size_t total_write = 0; for (auto &&result : results) total_write += result.get(); #ifdef bandwidth timer->end("checkpoints"); uint64_t min_start_time, max_end_time; std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds); printf("%f,READ-WRITE,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), 
read_recs * conf.record_size); free(threadargs); timer->start("checkpoints"); #endif return total_write; } /** * Version 2 * - Each thread reads offset from file and writes to the output buffer. * - The output buffer once full is then written to pmem in controlled fashion. */ size_t DataManager::RunWrite2(vector<in_record_t> &keys_idx, size_t read_recs) { size_t total_writes = 0; char *output_buffer = (char *)malloc(conf.write_buffer_size); size_t cycle_off = 0; size_t out_file_off = 0; ////////////// DEBUG: // char *test = (char *)malloc(4000); while (total_writes < read_recs) { // first read to memory vector<std::future<void>> results; size_t thrd_chunk_rec = (conf.write_buffer_size / conf.record_size) / conf.read_thrds; #ifdef bandwidth threadargs_t *threadargs; threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t)); #endif timer->start("RecRead"); // clang-format off for (int thrd = 0; thrd < conf.read_thrds; thrd++) { results.emplace_back( pool.enqueue([=, &keys_idx] { size_t write_records = thrd_chunk_rec * thrd + cycle_off; size_t output_off = thrd_chunk_rec * thrd * conf.record_size; size_t input_off = 0; size_t writes = 0; #ifdef bandwidth threadargs[thrd].tid = thrd; threadargs[thrd].start_time = rdtsc(); #endif while (writes < thrd_chunk_rec) { input_off = ConvertHexToRoff(keys_idx[write_records].r_off.val); // input_off = keys_idx[write_records].v_index; #ifdef avx512 __memmove_chk_avx512_no_vzeroupper(&output_buffer[output_off], &input_mapped_buffer_[input_off], conf.record_size); // #elif pmdk // pmem_memcpy_nodrain(&output_buffer[output_off], // &input_mapped_buffer_[input_off], conf.record_size); #else memcpy(&output_buffer[output_off], &input_mapped_buffer_[input_off], conf.record_size); /////////////// DEBUG: Alligned random reads /* memcpy((void *) test, &input_mapped_buffer_[input_off], min(file_size_[0] - input_off, 4000)); memcpy(&output_buffer[output_off], (void *) test, conf.record_size); */ #endif output_off += 
conf.record_size; write_records++; writes++; } #ifdef bandwidth threadargs[thrd].end_time = rdtsc(); #endif }) ); } // clang-format on for (auto &&result : results) result.get(); #ifdef bandwidth timer->end("checkpoints"); uint64_t min_start_time, max_end_time; std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds); printf("%f,READ,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), conf.write_buffer_size); // free(threadargs); timer->start("checkpoints"); #endif timer->end("RecRead"); cycle_off += thrd_chunk_rec; // Now write the buffer to file vector<std::future<void>> results2; size_t thrd_wrt_chunk = conf.write_buffer_size / conf.write_thrds; timer->start("RUN write"); // clang-format off for (int thrd = 0; thrd < conf.write_thrds; thrd++) { results2.emplace_back( pool.enqueue([=, &keys_idx] { size_t write_off = thrd_wrt_chunk * thrd; size_t thrd_f_of = out_file_off + thrd_wrt_chunk * thrd; size_t writes = 0; #ifdef bandwidth threadargs[thrd].tid = thrd; threadargs[thrd].start_time = rdtsc(); #endif while(writes < thrd_wrt_chunk) { #ifdef avx512 __memmove_chk_avx512_no_vzeroupper( &output_mapped_buffer_[thrd_f_of], &output_buffer[write_off], conf.block_size); #elif pmdk pmem_memcpy_nodrain(&output_mapped_buffer_[thrd_f_of], &output_buffer[write_off], conf.block_size); #else memcpy(&output_mapped_buffer_[thrd_f_of], &output_buffer[write_off], conf.block_size); #endif write_off += conf.block_size; thrd_f_of += conf.block_size; writes += conf.block_size; } #ifdef bandwidth threadargs[thrd].end_time = rdtsc(); #endif }) ); } // clang-format on for (auto &&result : results2) result.get(); #ifdef bandwidth timer->end("checkpoints"); std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.write_thrds); printf("%f,WRITE,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), conf.write_buffer_size); // 
free(threadargs); timer->start("checkpoints"); #endif timer->end("RUN write"); out_file_off += conf.write_buffer_size; total_writes += conf.write_buffer_size / conf.record_size; } return total_writes; } void DataManager::ConvertRoffToHex(uint8_t *bytes, size_t n) { for (int i = 0; i < INDEX_SIZE; ++i) { bytes[i] = (n >> i * 8) & 0xFF; } } uint64_t DataManager::ConvertHexToRoff(uint8_t *bytes) { uint64_t tmp = 0; for (int i = INDEX_SIZE - 1; i >= 0; --i) { tmp |= (uint64_t(bytes[i]) << i * 8); } return tmp; }