// WiscSort / wiscSort / Alpha / data_manager.cc
#include "data_manager.h"
#include "unistd.h"
#include "fcntl.h"
#include <sys/stat.h>
#include <assert.h>
#include <sys/mman.h>

using std::string;
using std::vector;

// FIXME: move this to util
#ifdef pmdk
#include <libpmem.h>
#endif

#ifdef __cplusplus
#include <stdint.h>
extern "C"
{
    void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);
}
#endif

// Opens every input file and builds the block-number -> file lookup table.
//
// file_list: paths of the input files, in the order their blocks are numbered.
//
// For each file, the descriptor is stored in file_ptr_, its size in bytes in
// file_size_, and the global number of its first block is recorded in
// file_index_ (key = first block, value = index into file_list).
// A final {total_blocks, -1} entry is inserted as an end-of-input sentinel.
// If a file cannot be opened or its size cannot be queried, a message is
// printed and the process exits with status 1.
DataManager::DataManager(vector<string> &file_list)
{
    file_ptr_.resize(file_list.size());
    file_size_.resize(file_list.size());
    size_t rolling_sum = 0;

    for (size_t i = 0; i < file_list.size(); i++)
    {
        const char *path = file_list[i].c_str();

        int fd = open(path, O_RDWR | O_DIRECT);
        if (fd < 0)
        {
            printf("Couldn't open file %s: %s\n", path, strerror(errno));
            exit(1);
        }
        file_ptr_[i] = fd;

        // Query the size through the descriptor that was just opened, and do
        // not trust `st` unless the call succeeded: a failed stat leaves it
        // uninitialized and would silently corrupt the block index.
        struct stat st;
        if (fstat(fd, &st) != 0)
        {
            printf("Couldn't stat file %s: %s\n", path, strerror(errno));
            exit(1);
        }
        off_t size = st.st_size;
        file_size_[i] = size;

        // This file starts at global block `rolling_sum`; only whole blocks
        // are counted towards the next file's starting block.
        file_index_.insert(std::make_pair(rolling_sum, i));
        rolling_sum += size / BLOCK_SIZE;
    }
    // Sentinel: any block number >= the total maps to "no file" (-1).
    file_index_.insert(std::make_pair(rolling_sum, -1));
}

// Maps the first `file_size` bytes of `fd` as a shared mapping.
//
// file_size:     length of the mapping in bytes.
// type:          0 maps for reading (PROT_READ); any non-zero value maps
//                for writing (PROT_WRITE).
// fd:            open descriptor of the file to map.
// mapped_buffer: receives the result of mmap(); on failure it holds
//                MAP_FAILED.
//
// Returns 1 on success and -1 on failure (after printing the reason).
// Note that -1 is non-zero, so callers must compare the result against 0
// rather than test it with `!`.
int DataManager::MMapFile(size_t file_size, int type, int fd, char *&mapped_buffer)
{
    const bool for_write = (type != 0);
    const int protection = for_write ? PROT_WRITE : PROT_READ;

    void *region = mmap(NULL, file_size, protection, MAP_SHARED, fd, 0);
    mapped_buffer = (char *)region;
    if (region != MAP_FAILED)
        return 1;

    // Report which kind of mapping failed; errno is still the one set by mmap.
    if (for_write)
    {
        printf("Failed to mmap write file of size %lu: %s\n",
               file_size, strerror(errno));
    }
    else
    {
        printf("Failed to mmap read file of size %lu: %s\n",
               file_size, strerror(errno));
    }
    return -1;
}

// Copies consecutive input blocks, starting at global block `block_num`,
// into `read_buffer`. Each block belongs to a particular input file; the
// function maps the file that owns `block_num` and moves on to the following
// files when a file boundary is reached.
//
// block_num:       global index of the first block to read.
// read_buff_count: number of blocks that fit in the read buffer.
// read_buffer:     destination for the records.
//
// Data is copied READ_SIZE blocks at a time, so the code assumes file sizes
// and read_buff_count are multiples of READ_SIZE blocks.
//
// Returns the number of records copied, 0 when `block_num` lies past the
// last input file, or (size_t)-1 when a file could not be mapped.
//
// NOTE(review): the mapping that is active when the loop ends is left in
// input_mapped_buffer_ and is not unmapped here; when the last file is
// exhausted, input_mapped_buffer_ still holds the already-unmapped address.
// Confirm that callers handle both cases.
size_t DataManager::RunRead(size_t block_num, size_t read_buff_count, record_t *read_buffer)
{
    // Find the file the block number belongs to: the entry with the greatest
    // starting block that is <= block_num.
    auto file_it = file_index_.upper_bound(block_num);
    file_it--;

    // The sentinel entry (-1) means the block belongs to no file.
    if (file_it->second < 0)
        return 0;
    size_t file_num = file_it->second;
    off_t file_off = block_num - file_it->first;

    // 0 because READ. MMapFile reports failure as -1 (a truthy value), so the
    // result must be compared against 0 instead of being negated.
    if (MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_) <= 0)
        return static_cast<size_t>(-1);

    size_t next_file_start = file_it->first + (file_size_[file_num] / BLOCK_SIZE);
    size_t read_records = 0;
    // Read until the number of records fills the read buffer.
    while (read_records < read_buff_count * REC_PER_BLOCK)
    {
        if (block_num == next_file_start)
        {
            // Unmap the exhausted file and map the next one.
            munmap(input_mapped_buffer_, file_size_[file_num]);
            file_num++;
            if (file_num >= file_index_.size() - 1)
                break;
            file_off = 0;
            // The new file begins exactly at the old boundary, so its end is
            // that boundary plus its own block count. (Recomputing from
            // file_it->first would measure from the first file's start.)
            next_file_start += file_size_[file_num] / BLOCK_SIZE;

            if (MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_) <= 0)
                return static_cast<size_t>(-1);
        }
        // The pmdk copy routine is intentionally not used on the read path
        // (it is kept for writes only).
#ifdef avx512
        __memmove_chk_avx512_no_vzeroupper((void *)(read_buffer + read_records),
                                           &input_mapped_buffer_[file_off * BLOCK_SIZE],
                                           READ_SIZE * BLOCK_SIZE);
#else
        memcpy((void *)(read_buffer + read_records),
               &input_mapped_buffer_[file_off * BLOCK_SIZE],
               READ_SIZE * BLOCK_SIZE);
#endif
        read_records += READ_SIZE * REC_PER_BLOCK;
        block_num += READ_SIZE;
        file_off += READ_SIZE;
    }
    return read_records;
}

// Creates (or opens) `output_file`, resizes it to `file_size` bytes and maps
// it for writing into output_mapped_buffer_.
//
// output_file: path of the file to create/open.
// file_size:   size in bytes the file is truncated/extended to and mapped at.
//
// On success output_fd_ holds the descriptor and output_count_ is reset to 0.
// Any failure (open, resize, mmap) prints a message and exits with status 1.
void DataManager::OpenAndMapOutputFile(string output_file, size_t file_size)
{
    int ofd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU);
    if (ofd < 0)
    {
        printf("Couldn't open file %s: %s\n", output_file.c_str(), strerror(errno));
        exit(1);
    }
    // The resize must not live inside assert(): with NDEBUG the whole
    // expression is compiled out, the file keeps its old size and stores
    // through the mapping beyond that size fault.
    if (ftruncate(ofd, file_size) != 0)
    {
        printf("Couldn't resize file %s to %zu bytes: %s\n",
               output_file.c_str(), file_size, strerror(errno));
        exit(1);
    }
    output_fd_ = ofd;
    // 1 because WRITE. MMapFile reports failure as -1 (truthy), so compare
    // against 0 instead of negating the result.
    if (MMapFile(file_size, 1, ofd, output_mapped_buffer_) <= 0)
        exit(1);
    output_count_ = 0;
}

// Writes the first `read_recs` index records of `keys_idx` to the start of
// the mapped output file.
//
// keys_idx:  sorted index records to write out.
// read_recs: number of records from keys_idx that must be written.
//
// Whole blocks of IDX_REC_PER_BLK records (IDX_BLOCK_SIZE bytes) are copied
// first, using the copy routine selected at compile time; a final partial
// block, if any, is copied with plain memcpy. Everything is assumed to be
// aligned.
//
// Returns the number of records written.
size_t DataManager::RunWrite(vector<in_record_t> keys_idx, size_t read_recs)
{
    const size_t recs_per_blk = IDX_REC_PER_BLK;
    const size_t whole_blocks = read_recs / recs_per_blk;
    const size_t tail_recs = read_recs % recs_per_blk;

    size_t recs_done = 0; // records copied so far
    size_t out_off = 0;   // byte offset into the output mapping

    // Full blocks, one block at a time.
    for (size_t blk = 0; blk < whole_blocks; ++blk)
    {
        char *dst = &output_mapped_buffer_[out_off];
        in_record_t *src = &keys_idx[recs_done];
#ifdef avx512
        __memmove_chk_avx512_no_vzeroupper(dst, src, IDX_BLOCK_SIZE);
#elif pmdk
        pmem_memcpy_nodrain(dst, src, IDX_BLOCK_SIZE);
#else
        memcpy(dst, src, IDX_BLOCK_SIZE);
#endif
        out_off += IDX_BLOCK_SIZE;
        recs_done += recs_per_blk;
    }

    // Last write, smaller than one index block.
    if (tail_recs > 0)
    {
        memcpy(&output_mapped_buffer_[out_off],
               &keys_idx[recs_done], tail_recs * IDX_RECORD_SIZE);
        recs_done += tail_recs;
    }
    return recs_done;
}

// Copies `read_length` index blocks from a mapped RUN file into the read
// buffer at the given record offset.
//
// read_buffer:      destination buffer of index records.
// read_buf_rec_off: record offset in read_buffer where copying starts.
// run_file_map:     base address of the mapped RUN file.
// file_off:         byte offset in run_file_map where copying starts.
// read_length:      number of IDX_BLOCK_SIZE-byte blocks to copy.
//
// Returns the number of blocks copied.
//
// FIXME: every iteration copies a full IDX_BLOCK_SIZE bytes; there is no
// handling for a file that has less than that left.
size_t DataManager::MergeRead(in_record_t *read_buffer, size_t read_buf_rec_off,
                              char *run_file_map, size_t file_off,
                              size_t read_length)
{
    size_t blocks_done = 0;
    for (; blocks_done < read_length; ++blocks_done)
    {
        in_record_t *dst = read_buffer + read_buf_rec_off;
        char *src = run_file_map + file_off;
        // The pmdk copy routine is not used on this read path.
#ifdef avx512
        __memmove_chk_avx512_no_vzeroupper((void *)dst, src, IDX_BLOCK_SIZE);
#else
        memcpy((void *)dst, src, IDX_BLOCK_SIZE);
#endif
        file_off += IDX_BLOCK_SIZE;
        read_buf_rec_off += IDX_REC_PER_BLK;
    }
    return blocks_done;
}

// Appends records from `write_buffer` to the mapped output file, one
// BLOCK_SIZE-byte block at a time, advancing output_file_off_ as it goes.
//
// write_buffer:   source records.
// write_len_recs: number of records available in write_buffer.
//
// Only whole blocks are written: (write_len_recs * RECORD_SIZE) / BLOCK_SIZE
// blocks, so records beyond the last full block are not copied.
//
// Returns the number of records written.
size_t DataManager::MergeWrite(record_t *write_buffer, size_t write_len_recs)
{
    const size_t total_blks = (write_len_recs * RECORD_SIZE) / BLOCK_SIZE;

    size_t recs_done = 0;
    for (size_t blk = 0; blk < total_blks; ++blk)
    {
        char *dst = &output_mapped_buffer_[output_file_off_];
        record_t *src = &write_buffer[recs_done];
#ifdef avx512
        __memmove_chk_avx512_no_vzeroupper(dst, src, BLOCK_SIZE);
#elif pmdk
        pmem_memcpy_nodrain(dst, src, BLOCK_SIZE);
#else
        memcpy(dst, src, BLOCK_SIZE);
#endif
        output_file_off_ += BLOCK_SIZE;
        recs_done += REC_PER_BLOCK;
    }
    return recs_done;
}

// Placeholder: not implemented yet, always returns 0.
//
// Design notes from the original author:
//  - First write the records that are already in the buffer.
//  - This is not the same as EMS, because it should also read from the
//    input file (for the values) and not just from the RUN files.
size_t DataManager::MoveRemainingInputToOuputBuff()
{
    return 0;
}

// Serializes the low INDEX_SIZE bytes of `n` into `bytes`,
// least-significant byte first.
//
// bytes: output array with room for at least INDEX_SIZE bytes.
// n:     value to encode.
void DataManager::ConvertRoffToHex(uint8_t *bytes, size_t n)
{
    size_t remaining = n;
    for (int idx = 0; idx < INDEX_SIZE; ++idx)
    {
        // Emit the current lowest byte, then shift the next one into place.
        bytes[idx] = remaining & 0xFF;
        remaining >>= 8;
    }
}

// Rebuilds a 64-bit value from INDEX_SIZE bytes stored least-significant
// byte first (the layout produced by ConvertRoffToHex).
//
// bytes: input array of at least INDEX_SIZE bytes.
//
// Returns the decoded value.
uint64_t DataManager::ConvertHexToRoff(uint8_t *bytes)
{
    uint64_t offset = 0;
    // Walk from the most-significant stored byte down to byte 0, shifting
    // the accumulated value up by one byte each step.
    int idx = INDEX_SIZE;
    while (idx-- > 0)
    {
        offset = (offset << 8) | bytes[idx];
    }
    return offset;
}