// Multi threaded - sort key and index. #include<pthread.h> #include "extsort_rsw.h" #include <fcntl.h> #include <chrono> #include <unistd.h> #include <algorithm> #include "parameter.h" #include "nano_time.h" #include <iostream> #include <unistd.h> #include <vector> #include <sys/mman.h> #include <sys/sysinfo.h> // Mulit threaded //#include "ips4o/include/ips4o.hpp" //#include <execinfo.h> //#include <signal.h> using std::cout; using std::endl; using std::vector; #define NANOSECONDS_IN_SECOND 1000000000 #define EXIT_MSG(...) \ do { \ printf(__VA_ARGS__); \ _exit(-1); \ } while (0) #ifdef DEBUG #define D(x) (x) #else #define D(x) do{}while(0) #endif #define min(a, b) (((a) < (b)) ? (a) : (b)) string file_name; ExtSort::ExtSort(void){ ID = "/mnt/pmem/" + std::to_string(std::time(nullptr)); mkdir(ID.c_str(), 0777); name = ID + "/runs-"; } size_t get_filesize(const char* filename){ int retval; struct stat st; retval = stat(filename, &st); if(retval) return -1; else return st.st_size; } /** FIXME: remove this later void handler(int sig) { void *array[10]; size_t size; // get void*'s for all entries on the stack size = backtrace(array, 10); // print out all the frames to stderr fprintf(stderr, "Error: signal %d:\n", sig); backtrace_symbols_fd(array, size, STDERR_FILENO); exit(1); } */ typedef struct { int tid; char* mmapped_buffer; record_t* buffer; uint64_t read_time; uint64_t sort_time; uint64_t write_time; uint64_t keys_load_t; uint64_t out_buf_load_t; size_t file_thr_chunk; size_t buf_thr_chunk; size_t total_file_size; } threadargs_t; size_t read(size_t file_off, size_t buff_off, size_t len, record_t* buffer, char* mmapped_buffer, size_t total_file_size){ size_t rec_off = buff_off/RECORD_SIZE; size_t read_data = 0; size_t req_size = 0; while(read_data < len){ req_size = min(BLOCK_SIZE, len-read_data); req_size = min(req_size, total_file_size-file_off); //cout << req_size << " -- " << read_data << " -- " << len << " -- " << len-read_data << endl; // Got to also check file boundary memcpy((void *) (buffer+rec_off), &mmapped_buffer[file_off], req_size); read_data += req_size; rec_off += req_size/RECORD_SIZE; file_off += req_size; if(req_size == 0) break; } return read_data; } void write(char* mmaped_out, record_t* output_buf, size_t len){ size_t req_size = 0; size_t write_data = 0; while(write_data < len){ req_size = min(OUTPUT_BUFFER_SIZE * BLOCK_SIZE, len - write_data); memcpy(&mmaped_out[write_data], &output_buf[write_data/RECORD_SIZE], req_size); write_data += req_size; } } void * rsw_data(void *args){ threadargs_t t = *(threadargs_t*)args; size_t read_size = 0; size_t file_off = t.tid * t.file_thr_chunk; size_t buff_off = t.tid * t.buf_thr_chunk; int run_num = 1; record_t* output_buf = (record_t *)malloc(t.buf_thr_chunk); int fd; vector<in_record_t> keys; //((threadargs_t*)args)->read_time = 0; //((threadargs_t*)args)->sort_time = 0; //((threadargs_t*)args)->write_time = 0; uint64_t read_time = 0; uint64_t sort_time = 0; uint64_t write_time = 0; uint64_t keys_load_t=0; uint64_t out_buf_load_t=0; // Each thread memcpys to it's buf // for until entire file chunk is read. while(read_size < t.file_thr_chunk){ read_time = nano_time(); D(cout << "Starting read - " << t.tid << " read_size: " << read_size << endl); read_size += read(file_off, buff_off, t.buf_thr_chunk, t.buffer, t.mmapped_buffer, t.total_file_size); //file_off += read_size; // read_size expected to be t.buf_thr_chunk. read_time = nano_time() - read_time; // REVIEW: Next iteration initialize smaller buffer per thread // and pass them for sorting rather than populating an array. keys_load_t = nano_time(); keys.resize((read_size/run_num)/RECORD_SIZE); size_t buff_rec_off = buff_off/RECORD_SIZE; for(size_t i = 0; i < (read_size/run_num)/RECORD_SIZE; ++i) keys[i] = in_record_t(t.buffer[buff_rec_off + i].k, i, i); keys_load_t = nano_time() - keys_load_t; sort_time = nano_time(); // REVIEW: replace with qsort. D(cout << "Starting sort " << t.tid << endl); //ips4o::parallel::sort(keys.begin(), keys.end(), std::less<>{}); std::sort(keys.begin(), keys.end()); sort_time = nano_time() - sort_time; D( cout << "keys vector size: " << sizeof(std::vector<in_record_t>) + (sizeof(in_record_t) * keys.size()) << endl ); out_buf_load_t = nano_time(); // Create an output buffer to consolidate key and values. for(size_t i = 0; i < t.buf_thr_chunk/RECORD_SIZE; ++i) output_buf[i] = std::move(t.buffer[keys[i].v_index]); out_buf_load_t = nano_time() -out_buf_load_t; // Create a file and mmap it based on the the thread_id string output_file = file_name + std::to_string(t.tid) + "_" + std::to_string(run_num); run_num++; fd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU); assert(ftruncate(fd, (size_t) t.buf_thr_chunk) == 0); char *mmaped_out; mmaped_out = (char*)mmap(NULL, t.buf_thr_chunk, PROT_WRITE, MAP_SHARED, fd, 0); D(cout << "Starting write " << t.tid << endl); write_time = nano_time(); write(mmaped_out, output_buf, t.buf_thr_chunk); write_time = nano_time() - write_time; munmap(mmaped_out, t.buf_thr_chunk); close(fd); //keys.clear(); // REVIEW: could be expensive call. ((threadargs_t*)args)->read_time += read_time; ((threadargs_t*)args)->sort_time += sort_time; ((threadargs_t*)args)->write_time += write_time; ((threadargs_t*)args)->keys_load_t += keys_load_t; ((threadargs_t*)args)->out_buf_load_t += out_buf_load_t; } delete[] output_buf; return (void*) 0; } double to_seconds(uint64_t nano_sec){ return (double)nano_sec/(double)NANOSECONDS_IN_SECOND; } void ExtSort::sort(string file, int num_threads){ //signal(SIGSEGV, handler); uint64_t run_gen_time = nano_time(); int ret; file_name = name; // Get file size size_t total_file_size = get_filesize(file.c_str()); // Divide the file by num_threads //REVIEW: Make sure threads evenly divide the file. size_t file_thr_chunk = total_file_size/num_threads; // Open and Mmap the file int fd = open(file.c_str(), O_RDWR | O_DIRECT); if(fd < 0) { printf("Couldn't open %s: %s\n", file.c_str(), strerror(errno)); exit(1); } char* mmapped_buffer = (char*)mmap(NULL, total_file_size, PROT_READ, MAP_SHARED, fd, 0); //size_t total_mem_size = (uint64_t) MEM_SIZE * (uint64_t) BLOCK_SIZE; // Allocate buffer. Buffer/num_threads per thread buf size. // Total avaialbe memory size_t total_ram = sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE); // 10 gb for OS and other DS. size_t total_read_mem = (total_ram - TEN_GB_B)/2; // This should be evenly divisible by RECORD_SIZE total_read_mem = total_read_mem + RECORD_SIZE/2; total_read_mem = total_read_mem - (total_read_mem%RECORD_SIZE); //FIXME: Test for 100 GiB //total_read_mem = 10737408000; // ~10 GiB total_read_mem = 10000000000; //total_read_mem = 13421760000; // ~12.5 GiB record_t *buffer = (record_t *) malloc(total_read_mem); size_t buf_thr_chunk = total_read_mem/num_threads; // Spwan threads now. pthread_t *threads; threadargs_t *threadargs; threads = (pthread_t*)malloc(num_threads * sizeof(pthread_t)); threadargs = (threadargs_t*)malloc(num_threads * sizeof(threadargs_t)); for (int i = 0; i < num_threads; i++) { threadargs[i].tid = i; threadargs[i].mmapped_buffer =mmapped_buffer; threadargs[i].buffer = buffer; threadargs[i].file_thr_chunk = file_thr_chunk; threadargs[i].buf_thr_chunk = buf_thr_chunk; threadargs[i].total_file_size = total_file_size; if(i%2) sleep(2); ret = pthread_create(&threads[i], NULL, rsw_data, &threadargs[i]); if (ret!=0) EXIT_MSG("pthread_create for %dth thread failed: %s\n", i, strerror(errno)); } for (int i = 0; i< num_threads; i++){ ret = pthread_join(threads[i], NULL); if (ret !=0) EXIT_MSG("Thread %d failed in join: %s\n", i, strerror(errno)); } run_gen_time = nano_time() - run_gen_time; double total_read_t=0, total_keys_t=0, total_write_t=0, total_out_t=0, total_sort_t=0; for(int i = 0; i< num_threads; ++i){ cout << "============== "<< threadargs[i].tid <<" ===============" << endl; cout << "read time: " << to_seconds(threadargs[i].read_time) << endl; total_read_t += to_seconds(threadargs[i].read_time); cout << "Keys time: " << to_seconds(threadargs[i].keys_load_t) << endl; total_keys_t +=to_seconds(threadargs[i].keys_load_t); cout << "sort time: " << to_seconds(threadargs[i].sort_time) << endl; total_sort_t += to_seconds(threadargs[i].sort_time); cout << "out_buf time: " << to_seconds(threadargs[i].out_buf_load_t) << endl; total_out_t += to_seconds(threadargs[i].out_buf_load_t); cout << "write time: " << to_seconds(threadargs[i].write_time) << endl; total_write_t += to_seconds(threadargs[i].write_time); cout << "Thread cycle time: " << to_seconds(threadargs[i].write_time) + to_seconds(threadargs[i].sort_time) + to_seconds(threadargs[i].read_time) + to_seconds(threadargs[i].keys_load_t) + to_seconds(threadargs[i].out_buf_load_t) << endl; } cout << "========================================================" << endl; cout << "Run Generation time: " << to_seconds(run_gen_time) << endl; cout << "------ Total read time: " << total_read_t << endl; cout << "------ Total keys time: " << total_keys_t << endl; cout << "------ Total sort time: " << total_sort_t << endl; cout << "------ Total out_buf time: " << total_out_t << endl; cout << "------ Total write time: " << total_write_t << endl; free(threads); free(threadargs); //free threadargs }