// WiscSort / wiscSort / Epsilon / data_manager.cc
#include "data_manager.h"
#include "unistd.h"
#include "fcntl.h"
#include <sys/stat.h>
#include <assert.h>
#include <sys/mman.h>
#include <x86intrin.h>

using std::string;
using std::vector;

#ifdef pmdk
#include <libpmem.h>
#endif

#ifdef __cplusplus
#include <stdint.h>
extern "C"
{
    void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);
}
#endif

// Opens every input file and builds the record-number -> file lookup table.
//
// file_list: paths of the input files; each is opened O_RDWR | O_DIRECT.
// timer_cpy: timer pointer stored in the `timer` member.
//
// For file i, file_ptr_[i] receives the descriptor and file_size_[i] the size
// in bytes. file_index_ maps the global number of a file's first record to the
// file's position in file_list; a final entry mapping the total record count
// to -1 marks "past the last file".
// Exits the process if a file cannot be opened or its size cannot be queried.
DataManager::DataManager(vector<string> &file_list, Timer *timer_cpy) : pool(std::thread::hardware_concurrency())
{
    timer = timer_cpy;
    file_ptr_.resize(file_list.size());
    file_size_.resize(file_list.size());
    size_t rolling_sum = 0;

    for (size_t i = 0; i < file_list.size(); i++)
    {
        int fd = open(file_list[i].c_str(), O_RDWR | O_DIRECT);
        if (fd < 0)
        {
            printf("Couldn't open file %s: %s\n", file_list[i].c_str(), strerror(errno));
            exit(1);
        }
        file_ptr_[i] = fd;

        // Query the size through the descriptor that was just opened and check
        // the result: an unchecked failure would leave `st` uninitialised and
        // feed garbage into file_size_ and file_index_.
        struct stat st;
        if (fstat(fd, &st) != 0)
        {
            printf("Couldn't stat file %s: %s\n", file_list[i].c_str(), strerror(errno));
            exit(1);
        }
        off_t size = st.st_size;
        file_size_[i] = size;
        // Key: global number of this file's first record. Value: file number.
        file_index_.insert(std::make_pair(rolling_sum, i));
        rolling_sum += size / RECORD_SIZE;
    }
    // Sentinel: any record number >= the total record count belongs to no file.
    file_index_.insert(std::make_pair(rolling_sum, -1));
}

// Creates a MAP_SHARED mapping of the first file_size bytes of fd and stores
// its address in mapped_buffer.
//
// type: 0 maps with PROT_READ (READ); any other value maps with PROT_WRITE (WRITE).
// Returns 1 on success. On failure prints the errno text and returns -1, with
// mapped_buffer left holding MAP_FAILED. Note that the failure value is
// non-zero, so callers have to test for a negative result.
int DataManager::MMapFile(size_t file_size, int type, int fd, char *&mapped_buffer)
{
    const bool for_write = (type != 0);
    const int protection = for_write ? PROT_WRITE : PROT_READ;

    mapped_buffer = (char *)mmap(NULL, file_size, protection, MAP_SHARED, fd, 0);
    if (mapped_buffer != MAP_FAILED)
        return 1;

    if (for_write)
    {
        printf("Failed to mmap write file of size %lu: %s\n",
               file_size, strerror(errno));
    }
    else
    {
        printf("Failed to mmap read file of size %lu: %s\n",
               file_size, strerror(errno));
    }
    return -1;
}

// Per-worker timing sample used by the bandwidth instrumentation.
struct threadargs_t
{
    int tid;             // index of the worker that filled in this sample
    uint64_t start_time; // timestamp taken before the worker's copy loop
    uint64_t end_time;   // timestamp taken after the worker's copy loop
};

// Returns the current CLOCK_REALTIME reading converted to nanoseconds.
// Returns 0 if the clock cannot be read; the function used to fall off its end
// in that case, which is undefined behaviour for a non-void function.
uint64_t nano_time(void)
{
    struct timespec ts;
    if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
        return 0;
    return ts.tv_sec * NANOSECONDS_IN_SECOND + ts.tv_nsec;
}

// Scans the first `threads` samples and returns (earliest start_time,
// latest end_time). The earliest start is seeded from threadargs[0] and the
// latest end from 0, so threadargs[0] is read even when `threads` is 0.
std::tuple<uint64_t, uint64_t> time_min_max(threadargs_t *threadargs, int threads)
{
    uint64_t earliest_start = threadargs[0].start_time;
    uint64_t latest_end = 0;

    int idx = 0;
    while (idx < threads)
    {
        const threadargs_t &sample = threadargs[idx];
        if (sample.start_time < earliest_start)
            earliest_start = sample.start_time;
        if (sample.end_time > latest_end)
            latest_end = sample.end_time;
        ++idx;
    }
    return std::tuple<uint64_t, uint64_t>(earliest_start, latest_end);
}

// Loads the keys of a run of records into keys_idx using conf.read_thrds pool
// workers.
//
// rec_num:        global number of the first record to load; used to find the
//                 owning input file through file_index_.
// read_arr_count: number of records to load. It is split evenly across the
//                 workers; a remainder of read_arr_count % conf.read_thrds
//                 records is not loaded.
// keys_idx:       destination. Entry k receives the first KEY_SIZE bytes of
//                 the k-th record and, in r_off, that record's byte offset
//                 ((k + rec_num) * conf.record_size).
//
// Returns the number of records loaded, 0 when rec_num lies past the last
// file, or (size_t)-1 when the file cannot be mapped.
//
// NOTE(review): every call maps the whole file into input_mapped_buffer_ and
// nothing here unmaps the previous mapping - confirm whether that is intended.
size_t DataManager::RunRead(size_t rec_num, size_t read_arr_count, std::vector<in_record_t> &keys_idx)
{
    // Find the file the record number belongs to
    auto file_it = file_index_.upper_bound(rec_num);
    file_it--;

    // rec belongs to no file
    if (file_it->second < 0)
        return 0;
    size_t file_num = file_it->second;
    // file_index_ is keyed by record number while the mapping is indexed in
    // bytes, so the in-file record offset has to be scaled by the record size.
    off_t file_off = (rec_num - file_it->first) * conf.record_size;

    // 0 because READ. MMapFile reports failure as -1, which `!` cannot detect.
    if (MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_) < 0)
        return -1;

    vector<std::future<size_t>> results;
    size_t thrd_chunk_rec = read_arr_count / conf.read_thrds;
#ifdef bandwidth
    threadargs_t *threadargs;
    threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t));
#endif
    // clang-format off
    for (int thrd = 0; thrd < conf.read_thrds; thrd++)
    {
        results.emplace_back(
            pool.enqueue([=, &keys_idx]()->size_t
            {
                size_t reads = 0;
                // First keys_idx slot owned by this worker.
                size_t read_keys = thrd_chunk_rec * thrd;
                size_t v_off = 0;
                // Byte position of this worker's first record in the mapping.
                size_t thrd_file_off = file_off + thrd_chunk_rec * conf.record_size * thrd;
#ifdef bandwidth
                threadargs[thrd].tid = thrd;
                threadargs[thrd].start_time = rdtsc();
#endif
                while (reads < thrd_chunk_rec)
                {
                    // Only the key is copied; the value stays in the file.
                    memcpy((void *)(&keys_idx[read_keys]),
                        &input_mapped_buffer_[thrd_file_off],
                        KEY_SIZE);
                    v_off = (read_keys * conf.record_size) + // For each record
                            rec_num * conf.record_size;      // For values beyond first read buffer
                    keys_idx[read_keys].r_off = v_off;
                    read_keys++;
                    reads++;
                    thrd_file_off += conf.record_size;
                }
#ifdef bandwidth
                threadargs[thrd].end_time = rdtsc();
#endif
                return reads;
            })
        );
    }
    // clang-format on
    // Waiting on every future also keeps keys_idx alive for the workers.
    size_t total_read = 0;
    for (auto &&result : results)
        total_read += result.get();

#ifdef bandwidth
    timer->end("checkpoints");
    uint64_t min_start_time, max_end_time;
    std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds);
    printf("%f,READ,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), read_arr_count * KEY_SIZE);
    fflush(stdout);
    free(threadargs);
    timer->start("checkpoints");
#endif
    return total_read;
}

// PMSort loading step: like RunRead, but each worker first stages 4000-byte
// pieces of the mapped file in a private buffer and extracts the keys from
// that copy.
//
// rec_num:        global number of the first record to load; used to find the
//                 owning input file through file_index_.
// read_arr_count: number of records to load; read_arr_count * conf.record_size
//                 bytes are split evenly across conf.read_thrds workers.
// keys_idx:       destination. Entry k receives the first KEY_SIZE bytes of
//                 the k-th record and, in r_off, that record's byte offset
//                 ((k + rec_num) * conf.record_size).
//
// Returns the number of records processed, 0 when rec_num lies past the last
// file, or (size_t)-1 when the file cannot be mapped.
//
// NOTE(review): the staging loop always consumes whole 4000-byte pieces, so it
// presumably expects each worker's byte share and 4000 itself to be multiples
// of conf.record_size - confirm against the callers.
size_t DataManager::RunReadPMSort(size_t rec_num, size_t read_arr_count, std::vector<in_record_t> &keys_idx)
{
    // Find the file the record number belongs to
    auto file_it = file_index_.upper_bound(rec_num);
    file_it--;

    // rec belongs to no file
    if (file_it->second < 0)
        return 0;
    size_t file_num = file_it->second;
    // file_index_ is keyed by record number while the mapping is indexed in
    // bytes, so the in-file record offset has to be scaled by the record size.
    off_t file_off = (rec_num - file_it->first) * conf.record_size;

    // 0 because READ. MMapFile reports failure as -1, which `!` cannot detect.
    if (MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_) < 0)
        return -1;

    vector<std::future<size_t>> results;
    size_t thrd_chunk_size = (read_arr_count * conf.record_size) / conf.read_thrds;
#ifdef bandwidth
    threadargs_t *threadargs;
    threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t));
#endif
    // clang-format off
    for (int thrd = 0; thrd < conf.read_thrds; thrd++)
    {
        results.emplace_back(
            pool.enqueue([=, &keys_idx]()->size_t
            {
                // Bytes staged per copy out of the mapping.
                constexpr size_t kStageBytes = 4000;

                size_t read_size = 0;
                // First keys_idx slot owned by this worker.
                size_t read_keys = (thrd_chunk_size/conf.record_size) * thrd;
                size_t v_off = 0;
                // Byte position of this worker's share in the mapping.
                size_t thrd_file_off = file_off + thrd_chunk_size * thrd;
#ifdef bandwidth
                threadargs[thrd].tid = thrd;
                threadargs[thrd].start_time = rdtsc();
#endif
                // Staging buffer on the worker's stack; the heap buffer used
                // here before was never freed.
                char tmp_buf[kStageBytes];
                while (read_size < thrd_chunk_size)
                {
                    memcpy(tmp_buf, &input_mapped_buffer_[thrd_file_off], kStageBytes);
                    for (size_t tmp_k = 0; tmp_k < kStageBytes; tmp_k += conf.record_size)
                    {
                        // Only the key is kept; the value stays in the file.
                        memcpy((void *) (&keys_idx[read_keys]), &tmp_buf[tmp_k], KEY_SIZE);
                        v_off = (read_keys * conf.record_size) + // For each record
                                rec_num * conf.record_size;      // For values beyond first read buffer
                        keys_idx[read_keys].r_off = v_off;
                        read_keys++;
                    }
                    read_size += kStageBytes;
                    thrd_file_off += kStageBytes;
                }
#ifdef bandwidth
                threadargs[thrd].end_time = rdtsc();
#endif
                return read_size/conf.record_size;
            })
        );
    }
    // clang-format on
    // Waiting on every future also keeps keys_idx alive for the workers.
    size_t total_read = 0;
    for (auto &&result : results)
        total_read += result.get();

#ifdef bandwidth
    timer->end("checkpoints");
    uint64_t min_start_time, max_end_time;
    std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds);
    printf("%f,READ,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), read_arr_count * KEY_SIZE);
    fflush(stdout);
    free(threadargs);
    timer->start("checkpoints");
#endif

    return total_read;
}

// Creates (or opens) output_file, resizes it to file_size bytes and maps it
// for writing into output_mapped_buffer_.
//
// On success output_fd_ holds the descriptor and output_count_ is reset to 0.
// Exits the process if the file cannot be opened, resized or mapped.
void DataManager::OpenAndMapOutputFile(string output_file, size_t file_size)
{
    int ofd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU);
    if (ofd < 0)
    {
        printf("Couldn't open file %s: %s\n", output_file.c_str(), strerror(errno));
        exit(1);
    }
    // The resize must not live inside assert(): with NDEBUG the whole
    // expression is compiled out and the file would never be sized.
    if (ftruncate(ofd, file_size) != 0)
    {
        printf("Couldn't resize file %s to %lu bytes: %s\n",
               output_file.c_str(), file_size, strerror(errno));
        exit(1);
    }
    output_fd_ = ofd;
    // 1 because WRITE. MMapFile reports failure as -1, which `!` cannot detect.
    if (MMapFile(file_size, 1, ofd, output_mapped_buffer_) < 0)
        exit(1);
    output_count_ = 0;
}

// Copies the first read_recs index records of keys_idx into
// output_mapped_buffer_, starting at byte 0, using conf.write_thrds pool
// workers.
//
// keys_idx:  index records to write.
//            NOTE(review): taken by value, so the whole vector is copied on
//            every call; the signature is kept because it is declared
//            elsewhere.
// read_recs: number of records to write. It is split evenly across the
//            workers; a remainder of read_recs % conf.write_thrds records is
//            not written.
//
// Each worker copies conf.block_size bytes at a time and finishes with one
// shorter copy when fewer than conf.idx_recs_per_blk records remain.
// Returns the number of records the workers report as written.
// Assumes everything is aligned.
size_t DataManager::RunWrite(vector<in_record_t> keys_idx, size_t read_recs)
{
    vector<std::future<size_t>> results;
    size_t thrd_chunk_rec = read_recs / conf.write_thrds;

#ifdef bandwidth
    // One sample per *write* worker: this used to be sized by conf.read_thrds,
    // which overflows when write_thrds > read_thrds.
    threadargs_t *threadargs;
    threadargs = (threadargs_t *)malloc(conf.write_thrds * sizeof(threadargs_t));
#endif
    // clang-format off
    for (int thrd = 0; thrd < conf.write_thrds; thrd++)
    {
        results.emplace_back(
            pool.enqueue([=, &keys_idx]
            {
                size_t pending_recs = 0;
                // First keys_idx slot owned by this worker.
                size_t write_records = thrd_chunk_rec * thrd;
                size_t writes = 0;
                // Byte position of this worker's share in the output mapping.
                size_t blk_idx = thrd_chunk_rec * conf.idx_record_size * thrd;
#ifdef bandwidth
                threadargs[thrd].tid = thrd;
                threadargs[thrd].start_time = rdtsc();
#endif
                while (writes < thrd_chunk_rec)
                {
                    pending_recs = thrd_chunk_rec - writes;
                    if (pending_recs < conf.idx_recs_per_blk)
                    {
                        // This is the last write which is smaller than idx_recs_per_blk
                        memcpy(&output_mapped_buffer_[blk_idx],
                            &keys_idx[write_records], pending_recs * conf.idx_record_size);
                        write_records += pending_recs;
                        blk_idx += pending_recs * conf.idx_record_size;
                        writes += pending_recs;
                        break;
                    }
#ifdef avx512
                    __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[blk_idx],
                        &keys_idx[write_records], conf.block_size);
#elif defined(pmdk)
                    pmem_memcpy_nodrain(&output_mapped_buffer_[blk_idx],
                        &keys_idx[write_records], conf.block_size);
#else
                    memcpy(&output_mapped_buffer_[blk_idx],
                        &keys_idx[write_records], conf.block_size);
#endif
                    blk_idx += conf.block_size;
                    write_records += conf.idx_recs_per_blk;
                    writes += conf.idx_recs_per_blk;
                }
#ifdef bandwidth
                threadargs[thrd].end_time = rdtsc();
#endif
                return writes;
            })
        );
    }
    // clang-format on
    // Waiting on every future also keeps keys_idx alive for the workers.
    size_t total_write = 0;
    for (auto &&result : results)
        total_write += result.get();
#ifdef bandwidth
    timer->end("checkpoints");
    uint64_t min_start_time, max_end_time;
    std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.write_thrds);
    printf("%f,WRITE,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), read_recs * conf.idx_record_size);
    fflush(stdout);
    free(threadargs);
    timer->start("checkpoints");
#endif
    return total_write;
}

// Copies read_length_blk blocks of a mapped RUN file into read_buffer using
// conf.read_thrds pool workers.
//
// read_buffer:      destination array of index records.
// read_buf_rec_off: record index in read_buffer where the copy starts.
// run_file_map:     mapping of the RUN file to read from.
// file_off:         byte offset in run_file_map where the copy starts.
// read_length_blk:  number of blocks to copy. They are split evenly across
//                   the workers.
//
// Returns the number of records the workers report as read.
// FIXME: potential bug since number of blocks don't evenly divide the number of threads.
size_t DataManager::MergeRead(in_record_t *read_buffer, size_t read_buf_rec_off,
                              char *run_file_map, size_t file_off,
                              size_t read_length_blk)
{
    size_t thrd_chunk_blk = read_length_blk / conf.read_thrds;
    vector<std::future<size_t>> results;
#ifdef bandwidth
    threadargs_t *threadargs;
    threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t));
#endif
    // clang-format off
    for (int thrd = 0; thrd < conf.read_thrds; thrd++)
    {
        results.emplace_back(
            pool.enqueue([=, &read_buffer, &run_file_map]()->size_t
            {
                // Source byte position and destination record slot for this worker.
                size_t src_off = file_off + thrd_chunk_blk * conf.block_size * thrd;
                size_t dst_rec = read_buf_rec_off + thrd_chunk_blk * conf.idx_recs_per_blk * thrd;
                size_t recs_done = 0;
#ifdef bandwidth
                threadargs[thrd].tid = thrd;
                threadargs[thrd].start_time = rdtsc();
#endif
                for (;;)
                {
                    if (!(recs_done < thrd_chunk_blk * conf.idx_recs_per_blk))
                        break;

                    // Bytes of this worker's share that have not been copied yet.
                    size_t bytes_left = (thrd_chunk_blk * conf.block_size - recs_done * conf.idx_record_size);
                    if (conf.block_size > bytes_left)
                    {
                        // Final, shorter-than-a-block copy.
                        memcpy((void *)(read_buffer + dst_rec),
                                &run_file_map[src_off],
                                bytes_left);
                        recs_done += bytes_left/conf.idx_record_size;
                        break;
                    }
                    // FIXME: What if the file does not have conf.block_size of data anymore
#ifdef avx512
                    __memmove_chk_avx512_no_vzeroupper((void *)(read_buffer + dst_rec),
                                                    &run_file_map[src_off],
                                                    conf.block_size);
#else
                    // A pmdk (pmem_memcpy_nodrain) variant of this copy is
                    // disabled in this build configuration.
                    memcpy((void *)(read_buffer + dst_rec),
                        &run_file_map[src_off],
                        conf.block_size);
#endif
                    src_off += conf.block_size;
                    recs_done += conf.idx_recs_per_blk;
                    dst_rec += conf.idx_recs_per_blk;
                }
#ifdef bandwidth
                threadargs[thrd].end_time = rdtsc();
#endif
                return recs_done;
            })
        );
    }
    // clang-format on
    // Every future is awaited before returning, so the by-reference captures
    // of the pointer parameters stay valid for the workers.
    size_t total_read = 0;
    for (auto &&pending : results)
        total_read += pending.get();

#ifdef bandwidth
    timer->end("checkpoints");
    uint64_t min_start_time, max_end_time;
    std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds);
    printf("%f,READ,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), read_length_blk * conf.block_size);
    fflush(stdout);
    free(threadargs);
    timer->start("checkpoints");
#endif
    return total_read;
}

// Copies write_len_recs records from write_buffer into output_mapped_buffer_
// at the current output_file_off_, using conf.write_thrds pool workers, then
// advances output_file_off_ by conf.write_buffer_size.
//
// write_buffer:   source records.
// write_len_recs: number of records to write. They are split evenly across
//                 the workers, and each worker copies BLOCK_SIZE bytes at a
//                 time.
//
// Returns the number of records the workers report as written.
// NOTE(review): each worker always copies whole BLOCK_SIZE pieces, so this
// presumably expects a worker's share to be a multiple of
// BLOCK_SIZE / conf.record_size records - confirm against the callers.
size_t DataManager::MergeWrite(record_t *write_buffer, size_t write_len_recs)
{
    vector<std::future<size_t>> results;
    size_t thrd_chunk_recs = write_len_recs / conf.write_thrds;
#ifdef bandwidth
    // One sample per *write* worker: this used to be sized by conf.read_thrds,
    // which overflows when write_thrds > read_thrds.
    threadargs_t *threadargs;
    threadargs = (threadargs_t *)malloc(conf.write_thrds * sizeof(threadargs_t));
#endif
    // clang-format off
    for (int thrd = 0; thrd < conf.write_thrds; thrd++)
    {
        results.emplace_back(
            pool.enqueue([=]
            {
                // First write_buffer record owned by this worker.
                size_t wrt_recs = thrd_chunk_recs * thrd;
                size_t writes = 0;
                // Byte position of this worker's share in the output mapping.
                size_t ofile_off = output_file_off_ + thrd_chunk_recs * conf.record_size * thrd;
#ifdef bandwidth
                threadargs[thrd].tid = thrd;
                threadargs[thrd].start_time = rdtsc();
#endif
                while (writes < thrd_chunk_recs)
                {
#ifdef avx512
                    __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[ofile_off],
                                                    &write_buffer[wrt_recs], BLOCK_SIZE);
#elif defined(pmdk)
                    pmem_memcpy_nodrain(&output_mapped_buffer_[ofile_off],
                                        &write_buffer[wrt_recs], BLOCK_SIZE);
#else
                    memcpy(&output_mapped_buffer_[ofile_off],
                        &write_buffer[wrt_recs], BLOCK_SIZE);
#endif
                    ofile_off += BLOCK_SIZE;
                    writes += BLOCK_SIZE/conf.record_size;
                    wrt_recs += BLOCK_SIZE/conf.record_size;
                }
#ifdef bandwidth
                threadargs[thrd].end_time = rdtsc();
#endif
                return writes;
            })
        );
    }
    // clang-format on
    size_t total_write = 0;
    for (auto &&result : results)
        total_write += result.get();

#ifdef bandwidth
    timer->end("checkpoints");
    uint64_t min_start_time, max_end_time;
    std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.write_thrds);
    printf("%f,WRITE,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), write_len_recs * conf.record_size);
    fflush(stdout);
    free(threadargs);
    timer->start("checkpoints");
#endif
    // Update output file offset
    output_file_off_ += conf.write_buffer_size;
    return total_write;
}

// Copies records straight from the input mapping to the output mapping, one
// record per offset_vec entry, using conf.read_thrds pool workers.
//
// offset_vec: for entry k, input_off is the byte position of the record in
//             input_mapped_buffer_ and output_off the byte position it is
//             copied to in output_mapped_buffer_.
//
// conf.write_buffer_size / conf.record_size entries are split evenly across
// the workers; each copy moves conf.record_size bytes.
// Returns the number of records the workers report as copied.
// NOTE(review): offset_vec is indexed without a bounds check, so it presumably
// always holds at least that many entries - confirm against the callers.
size_t DataManager::MergeRandomReadWrite(vector<read_offs> &offset_vec)
{
    vector<std::future<size_t>> results;
    size_t thread_chunk_rec = (conf.write_buffer_size / conf.record_size) / conf.read_thrds;
#ifdef bandwidth
    threadargs_t *threadargs;
    threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t));
#endif
    // clang-format off
    for(int thrd=0; thrd < conf.read_thrds; thrd++)
    {
        results.emplace_back(
            pool.enqueue([=, &offset_vec]()
            {
                size_t read_idx = 0;
                // First offset_vec entry owned by this worker.
                size_t reads = thread_chunk_rec * thrd;
#ifdef bandwidth
                threadargs[thrd].tid = thrd;
                threadargs[thrd].start_time = rdtsc();
#endif
                while(read_idx < thread_chunk_rec)
                {
                    // Both variants write to the output mapping. The avx512
                    // one used to target `write_buffer`, which is not a
                    // parameter of this function and disagreed with the
                    // default path.
#ifdef avx512
                    __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[offset_vec[reads].output_off],
                        &input_mapped_buffer_[offset_vec[reads].input_off],
                        conf.record_size);
#else
                    memcpy(&output_mapped_buffer_[offset_vec[reads].output_off],
                        &input_mapped_buffer_[offset_vec[reads].input_off],
                        conf.record_size);
#endif
                    read_idx++;
                    reads++;
                }
#ifdef bandwidth
                threadargs[thrd].end_time = rdtsc();
#endif
                return read_idx;
            })
        );
    }
    // clang-format on
    // Waiting on every future also keeps offset_vec alive for the workers.
    size_t total_writes = 0;
    for (auto &&result : results)
        total_writes += result.get();
#ifdef bandwidth
    timer->end("checkpoints");
    uint64_t min_start_time, max_end_time;
    std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds);
    printf("%f,READ-WRITE,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), conf.write_buffer_size);
    fflush(stdout);
    free(threadargs);
    timer->start("checkpoints");
#endif
    return total_writes;
}

// Runs conf.read_thrds pool workers that each step through their share of
// (conf.write_buffer_size / conf.record_size) offset_vec entries.
//
// write_buffer: intended destination of the gathered records; not written.
// offset_vec:   per-record offsets; not read.
//
// The per-record copy is disabled in every build variant, so the workers move
// no data; only the bandwidth timing (when enabled) is observable.
// The disabled copy was:
//   memcpy(&write_buffer[offset_vec[reads].output_buff_off],
//          &input_mapped_buffer_[offset_vec[reads].input_off],
//          conf.record_size);
void DataManager::MergeRandomRead(record_t *write_buffer, vector<read_offs> &offset_vec)
{
    size_t thread_chunk_rec = (conf.write_buffer_size / conf.record_size) / conf.read_thrds;
    vector<std::future<void>> results;
#ifdef bandwidth
    threadargs_t *threadargs;
    threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t));
#endif
    // clang-format off
    for(int thrd=0; thrd < conf.read_thrds; thrd++)
    {
        results.emplace_back(
            pool.enqueue([=, &offset_vec]()
            {
                // offset_vec position this worker would copy next.
                size_t cursor = thread_chunk_rec * thrd;
#ifdef bandwidth
                threadargs[thrd].tid = thrd;
                threadargs[thrd].start_time = rdtsc();
#endif
                for (size_t visited = 0; visited < thread_chunk_rec; ++visited)
                {
                    // Copy disabled; see the function comment.
                    ++cursor;
                }
#ifdef bandwidth
                threadargs[thrd].end_time = rdtsc();
#endif
            })
        );
    }
    // clang-format on
    for (auto &&pending : results)
        pending.get();
#ifdef bandwidth
    timer->end("checkpoints");
    uint64_t min_start_time, max_end_time;
    std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds);
    printf("%f,READ,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), conf.write_buffer_size);
    fflush(stdout);
    free(threadargs);
    timer->start("checkpoints");
#endif
}

// Stores the low INDEX_SIZE bytes of n into bytes, least-significant byte
// first: bytes[k] receives bits [8k, 8k + 8) of n.
void DataManager::ConvertRoffToHex(uint8_t *bytes, size_t n)
{
    int idx = 0;
    while (idx < INDEX_SIZE)
    {
        const int shift = idx * 8;
        bytes[idx] = (n >> shift) & 0xFF;
        ++idx;
    }
}

// Rebuilds an offset from INDEX_SIZE bytes stored least-significant byte
// first: bytes[k] supplies bits [8k, 8k + 8) of the result.
uint64_t DataManager::ConvertHexToRoff(uint8_t *bytes)
{
    uint64_t value = 0;
    // The bytes are OR-ed into disjoint bit ranges, so the order in which they
    // are visited does not affect the result.
    for (int idx = 0; idx < INDEX_SIZE; ++idx)
    {
        const uint64_t widened = uint64_t(bytes[idx]);
        value |= (widened << idx * 8);
    }
    return value;
}