// Multi threaded - sort key and index. #include <pthread.h> #include "extsort_rsw.h" #include <fcntl.h> #include <chrono> #include <unistd.h> #include <algorithm> #include "parameter.h" #include "nano_time.h" #include <iostream> #include <unistd.h> #include <vector> #include <sys/mman.h> #include <sys/sysinfo.h> // Mulit threaded //#include "ips4o/include/ips4o.hpp" //#include <execinfo.h> //#include <signal.h> using std::cout; using std::endl; using std::vector; #define NANOSECONDS_IN_SECOND 1000000000 #define EXIT_MSG(...) \ do \ { \ printf(__VA_ARGS__); \ _exit(-1); \ } while (0) #ifdef DEBUG #define D(x) (x) #else #define D(x) \ do \ { \ } while (0) #endif #define min(a, b) (((a) < (b)) ? (a) : (b)) string file_name; int record_size = KEY_SIZE + VALUE_SIZE; ExtSort::ExtSort(void) { ID = "/mnt/pmem/" + std::to_string(std::time(nullptr)); mkdir(ID.c_str(), 0777); name = ID + "/runs-"; } size_t get_filesize(const char *filename) { int retval; struct stat st; retval = stat(filename, &st); if (retval) return -1; else return st.st_size; } /** * @brief * Load IndexMap file for sorting. * @param input_rec_off the input offset from which the thread is responsible * @param rec_len the number of records it must process * @param keys_idx * @param input_mapped_buffer * @return size_t */ size_t RunRead(size_t input_rec_off, size_t rec_len, vector<in_record_t> &keys_idx, char *input_mapped_buffer) { size_t reads = 0; size_t v_off = 0; while (reads < rec_len) { memcpy((void *)(&keys_idx[reads]), &input_mapped_buffer[(input_rec_off + reads) * record_size], // 100 because record size KEY_SIZE); // key size is 10 v_off = (input_rec_off * record_size) + // For each record reads * record_size; // For values beyond first read buffer keys_idx[reads].r_off = v_off; reads++; } return reads; } size_t RunWrite(vector<in_record_t> &keys_idx, size_t rec_len, char *output_mapped_buffer) { size_t writes = 0; while (writes < rec_len) { memcpy(&output_mapped_buffer[writes * (KEY_SIZE + INDEX_SIZE)], &keys_idx[writes], (KEY_SIZE + INDEX_SIZE)); // TODO: Change this to 4095? writes++; } return writes; } typedef struct { int tid; char *mmapped_buffer; uint64_t read_time; uint64_t sort_time; uint64_t write_time; size_t thr_rec_chunk; size_t total_file_size; } threadargs_t; void *rsw_data(void *args) { threadargs_t t = *(threadargs_t *)args; size_t rec_sorted = 0, rec_read = 0; size_t file_rec_off = t.tid * t.thr_rec_chunk; int run_num = 0; int fd; uint64_t read_time = 0; uint64_t sort_time = 0; uint64_t write_time = 0; // This decides how many read-sort-write steps happen in the for loop. uint64_t job_len_rec = t.thr_rec_chunk; // / 8; // Each thread generates IndexMap and sorts and writes it. while (rec_sorted < t.thr_rec_chunk) { vector<in_record_t> keys_idx; keys_idx.resize(job_len_rec); // 400M records = 40GB 100B records read_time = nano_time(); rec_read = RunRead(file_rec_off, job_len_rec, keys_idx, t.mmapped_buffer); read_time = nano_time() - read_time; // read keys and fill IndexMap // Converting offset to hex addresss // Sort IndexMap by key sort_time = nano_time(); std::sort(keys_idx.begin(), keys_idx.end()); sort_time = nano_time() - sort_time; // write the IndexMap sequentially to disk and once sorted // Create a file and mmap it based on the the thread_id string output_file = file_name + std::to_string(t.tid) + "_" + std::to_string(run_num); run_num++; fd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU); assert(ftruncate(fd, job_len_rec * (KEY_SIZE + INDEX_SIZE)) == 0); char *mmaped_out; mmaped_out = (char *)mmap(NULL, job_len_rec * (KEY_SIZE + INDEX_SIZE), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (mmaped_out == MAP_FAILED) { printf("Failed to mmap write file of size %lu: %s\n", job_len_rec * record_size, strerror(errno)); _exit(0); } write_time = nano_time(); rec_sorted += RunWrite(keys_idx, rec_read, mmaped_out); write_time = nano_time() - write_time; file_rec_off += rec_read; printf("fs: %lu rec_s:%lu t: %d \n", file_rec_off, rec_read, t.tid); close(fd); // keys.clear(); // REVIEW: could be expensive call. ((threadargs_t *)args)->read_time += read_time; ((threadargs_t *)args)->sort_time += sort_time; ((threadargs_t *)args)->write_time += write_time; } return (void *)0; } double to_seconds(uint64_t nano_sec) { return (double)nano_sec / (double)NANOSECONDS_IN_SECOND; } void ExtSort::sort(string file, int num_threads) { // signal(SIGSEGV, handler); uint64_t run_gen_time = nano_time(); int ret; file_name = name; // Get file size size_t total_file_size = get_filesize(file.c_str()); // Open and Mmap the file int fd = open(file.c_str(), O_RDWR | O_DIRECT); if (fd < 0) { printf("Couldn't open %s: %s\n", file.c_str(), strerror(errno)); exit(1); } char *mmapped_buffer = (char *)mmap(NULL, total_file_size, PROT_READ, MAP_SHARED, fd, 0); size_t total_records = 400000000; // Divide the IndexMap by num_threads size_t thr_rec_chunk = total_records / num_threads; // Spwan threads now. pthread_t *threads; threadargs_t *threadargs; threads = (pthread_t *)malloc(num_threads * sizeof(pthread_t)); threadargs = (threadargs_t *)malloc(num_threads * sizeof(threadargs_t)); for (int i = 0; i < num_threads; i++) { threadargs[i].tid = i; threadargs[i].mmapped_buffer = mmapped_buffer; threadargs[i].thr_rec_chunk = thr_rec_chunk; threadargs[i].total_file_size = total_file_size; ret = pthread_create(&threads[i], NULL, rsw_data, &threadargs[i]); if (ret != 0) EXIT_MSG("pthread_create for %dth thread failed: %s\n", i, strerror(errno)); } for (int i = 0; i < num_threads; i++) { ret = pthread_join(threads[i], NULL); if (ret != 0) EXIT_MSG("Thread %d failed in join: %s\n", i, strerror(errno)); } double total_read_t = 0, total_write_t = 0, total_sort_t = 0; for (int i = 0; i < num_threads; ++i) { cout << "============== " << threadargs[i].tid << " ===============" << endl; cout << "read time: " << to_seconds(threadargs[i].read_time) << endl; total_read_t += to_seconds(threadargs[i].read_time); cout << "sort time: " << to_seconds(threadargs[i].sort_time) << endl; total_sort_t += to_seconds(threadargs[i].sort_time); cout << "write time: " << to_seconds(threadargs[i].write_time) << endl; total_write_t += to_seconds(threadargs[i].write_time); } run_gen_time = nano_time() - run_gen_time; cout << "========================================================" << endl; cout << "Run Generation time: " << to_seconds(run_gen_time) << endl; cout << "------ Total read time: " << total_read_t << endl; cout << "------ Total sort time: " << total_sort_t << endl; cout << "------ Total write time: " << total_write_t << endl; free(threads); free(threadargs); // free threadargs }