#include "data_manager.h"
#include "unistd.h"
#include "fcntl.h"
#include <sys/stat.h>
#include <assert.h>
#include <sys/mman.h>
#include "utils.h"
#include "config.h"

#ifdef pmdk
#include <libpmem.h>
#endif

#ifdef __cplusplus
#include <stdint.h>
extern "C"
{
    void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);
}
#endif

using std::string;
using std::vector;

// open and initialize files
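// file_index_ maps the cumulative block offset at which each file starts to that
// file's index in file_ptr_/file_size_, so RunRead can resolve a global block
// number to (file, offset) with a single upper_bound lookup.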
DataManager::DataManager(vector<string> &file_list)
{
    file_ptr_.resize(file_list.size());
    file_size_.resize(file_list.size());
    size_t rolling_sum = 0;

    for (size_t i = 0; i < file_list.size(); i++)
    {
        int fd = open(file_list[i].c_str(), O_RDWR | O_DIRECT);
        if (fd < 0)
        {
            printf("Couldn't open file %s: %s\n", file_list[i].c_str(), strerror(errno));
            exit(1);
        }
        file_ptr_[i] = fd;

        struct stat st;
        if (stat(file_list[i].c_str(), &st) != 0)
        {
            printf("Couldn't stat file %s: %s\n", file_list[i].c_str(), strerror(errno));
            exit(1);
        }
        off_t size = st.st_size;
        file_size_[i] = size;

        file_index_.insert(std::make_pair(rolling_sum, i));
        rolling_sum += size / conf.block_size;
    }
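    // Sentinel: block numbers past the end of the last file map to -1 (no file).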
    file_index_.insert(std::make_pair(rolling_sum, -1));
}

// type 0 = READ, type 1 = WRITE. Returns 1 on success, -1 on failure.
int DataManager::MMapFile(size_t file_size, int type, int fd, char *&mapped_buffer)
{
    if (!type) // READ part
    {
        mapped_buffer = (char *)mmap(NULL, file_size, PROT_READ | PROT_WRITE,
                                     MAP_SHARED, fd, 0);
        if (mapped_buffer == MAP_FAILED)
        {
            printf("Failed to mmap read file of size %lu: %s\n",
                   file_size, strerror(errno));
            return -1;
        }
        return 1;
    }
    else // WRITE part
    {
        mapped_buffer = (char *)mmap(NULL, file_size, PROT_WRITE,
                                     MAP_SHARED, fd, 0);
        if (mapped_buffer == MAP_FAILED)
        {
            printf("Failed to mmap write file of size %lu: %s\n",
                   file_size, strerror(errno));
            return -1;
        }
        return 1;
    }
}

// Each block belongs to a particular file; read it from the right file.
// read_buff_count: number of blocks that fit in the read buffer.
size_t DataManager::RunRead(size_t block_num, size_t read_buff_count, record_t *read_buffer)
{
    // Find the file the block number belongs to
    auto file_it = file_index_.upper_bound(block_num);
    file_it--;

    // block belongs to no file
    if (file_it->second < 0)
        return 0;
    size_t file_num = file_it->second;
    off_t file_off = block_num - file_it->first;

    // 0 because READ
    if (MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_) < 0)
        return -1;

    size_t next_file_start = file_it->first + (file_size_[file_num] / conf.block_size);
    size_t read_records = 0;
    // Read until enough records have been read to fill the read buffer.
    while (read_records < read_buff_count * conf.recs_per_blk)
    {
        if (block_num == next_file_start)
        {
            // unmap previous file and map the next file.
            munmap(input_mapped_buffer_, file_size_[file_num]);
            file_num++;
            if (file_num >= file_index_.size() - 1)
                break;
            file_off = 0;
            next_file_start += file_size_[file_num] / conf.block_size;

            if (MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_) < 0)
                return -1;
        }
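        // Copy from the mapped input file into the read buffer at the current record offset.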
#ifdef avx512
        __memmove_chk_avx512_no_vzeroupper((void *)(read_buffer + read_records),
                                           &input_mapped_buffer_[file_off * conf.block_size],
                                           READ_SIZE * conf.block_size);
// #elif pmdk
//         pmem_memcpy_nodrain((void *)(read_buffer + read_records),
//                             &input_mapped_buffer_[file_off * conf.block_size],
//                             READ_SIZE * conf.block_size);
#else
        memcpy((void *)(read_buffer + read_records),
               &input_mapped_buffer_[file_off * conf.block_size],
               conf.block_size);
#endif
        read_records += conf.recs_per_blk;
        block_num++;
        file_off++;
    }
    return read_records;
}

void DataManager::OpenAndMapOutputFile(string output_file, size_t file_size)
{
    int ofd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU);
    if (ofd < 0)
    {
        printf("Couldn't open file %s: %s\n", output_file.c_str(), strerror(errno));
        exit(1);
    }
    if (ftruncate(ofd, file_size) != 0)
    {
        printf("Couldn't resize file %s to %lu bytes: %s\n", output_file.c_str(), file_size, strerror(errno));
        exit(1);
    }
    output_fd_ = ofd;
    if (MMapFile(file_size, 1, ofd, output_mapped_buffer_) < 0)
        exit(1);
    output_count_ = 0;
}

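// Gather records from the read buffer into the write buffer in the order given by
// keys_idx, following the v_index stored with each key.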
void DataManager::LoadRecordToOutputBuffer(record_t *write_buffer,
                                           record_t *read_buffer,
                                           vector<in_record_t> &keys_idx,
                                           size_t key_off)
{
    for (size_t rec_idx = 0; rec_idx < conf.write_buff_rec_count; rec_idx++)
        write_buffer[rec_idx] = std::move(read_buffer[keys_idx[key_off + rec_idx].v_index]);
}

// The responsibility of this function is simply to write from the write buffer to the file.
// It must write all read/sorted data.
// Assumes everything is aligned.
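// output_off is a byte offset into the output file mapping.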
size_t DataManager::RunWrite(record_t *write_buffer, off_t output_off)
{
    size_t write_records = 0;
    size_t blk_idx = 0;
    // Write until the write buffer is drained, one block at a time.
    while (write_records < conf.write_buff_rec_count)
    {
#ifdef avx512
        __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[output_off + blk_idx],
                                           &write_buffer[write_records], conf.block_size);
#elif pmdk
        pmem_memcpy_nodrain(&output_mapped_buffer_[output_off + blk_idx],
                            &write_buffer[write_records], conf.block_size);
#else
        memcpy(&output_mapped_buffer_[output_off + blk_idx],
               &write_buffer[write_records], conf.block_size);
#endif

#ifdef clflush
        // flush and fence to ensure write is complete
        flush_clflushopt(&output_mapped_buffer_[output_off + blk_idx], conf.block_size);
        _mm_sfence();
#endif
        blk_idx += conf.block_size;
        write_records += conf.recs_per_blk;
    }
    return write_records;
}

// Read records from RUN file to the appropriate buffer offset
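// read_buf_rec_off is in records, file_off is in bytes, and read_length is in blocks.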
size_t DataManager::MergeRead(record_t *read_buffer, size_t read_buf_rec_off,
                              char *run_file_map, size_t file_off,
                              size_t read_length)
{
    size_t read_blks = 0;
    while (read_blks < read_length)
    {
#ifdef avx512
        __memmove_chk_avx512_no_vzeroupper((void *)(read_buffer + read_buf_rec_off),
                                           &run_file_map[file_off],
                                           conf.block_size);
// #elif pmdk
//         pmem_memcpy_nodrain((void *)(read_buffer + read_buf_rec_off),
//                             &run_file_map[file_off],
//                             conf.block_size);
#else
        memcpy((void *)(read_buffer + read_buf_rec_off),
               &run_file_map[file_off],
               conf.block_size);
#endif
        file_off += conf.block_size;
        read_blks++;
        read_buf_rec_off += conf.recs_per_blk;
    }
    return read_blks;
}

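// Write write_len_recs records from the write buffer to the output mapping at
// output_file_off_, one block at a time. Returns the number of records written.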
size_t DataManager::MergeWrite(record_t *write_buffer, size_t write_len_recs)
{
    size_t wrt_recs = 0;
    size_t write_blks = 0;
    while (write_blks < (write_len_recs * conf.record_size) / conf.block_size)
    {
#ifdef avx512
        __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[output_file_off_],
                                           &write_buffer[wrt_recs], conf.block_size);
#elif pmdk
        pmem_memcpy_nodrain(&output_mapped_buffer_[output_file_off_],
                            &write_buffer[wrt_recs], conf.block_size);
#else
        memcpy(&output_mapped_buffer_[output_file_off_],
               &write_buffer[wrt_recs], conf.block_size);
#endif

#ifdef clflush
        // flush and fence to ensure write is complete
        flush_clflushopt(&output_mapped_buffer_[output_file_off_], conf.block_size);
        _mm_sfence();
#endif
        output_file_off_ += conf.block_size;
        write_blks++;
        wrt_recs += conf.recs_per_blk;
    }
    return wrt_recs;
}

// TODO:
//  First write records in write buffer
//  Next write the corresponding records in read buffer
//  Then read remaining keys to the buffer and write it to file.
size_t DataManager::MoveRemainingInputToOuputBuff()
{
    return 0;
}

// DataManager::~DataManager()
// {
//     for (int fd : file_ptr_)
//         close(fd);

//     // unmap last input file and output file
//     // if(input_mapped_buffer_ != NULL)
//     //     munmap(input_mapped_buffer_, file_size_[file_size_.size() - 1]);
//     // munmap(output_mapped_buffer_, file_size_[file_size_.size() - 1]);
// }