// WiscSort / wiscSort / Gamma / data_manager.cc
#include "data_manager.h"
#include "unistd.h"
#include "fcntl.h"
#include <sys/stat.h>
#include <assert.h>
#include <sys/mman.h>
#include <x86intrin.h>

#define min(a, b) (((a) < (b)) ? (a) : (b))

using std::string;
using std::vector;

#ifdef pmdk
#include <libpmem.h>
#endif

#ifdef __cplusplus
#include <stdint.h>
extern "C"
{
    void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);
}
#endif

// Open every input file and build the record-number -> file lookup.
//
// file_index_ maps the global number of the first record of each file to that
// file's position in file_list. A final sentinel entry (total records, -1)
// marks the end of the record space, so lookups past the last record resolve
// to -1. Exits the process if any file cannot be opened or stat'ed.
DataManager::DataManager(vector<string> &file_list, Timer *timer_cpy) : pool(std::thread::hardware_concurrency())
{
    timer = timer_cpy;
    file_ptr_.resize(file_list.size());
    file_size_.resize(file_list.size());
    size_t rolling_sum = 0;

    for (size_t i = 0; i < file_list.size(); i++)
    {
        int fd = open(file_list[i].c_str(), O_RDWR | O_DIRECT);
        if (fd < 0)
        {
            printf("Couldn't open file %s: %s\n", file_list[i].c_str(), strerror(errno));
            exit(1);
        }
        file_ptr_[i] = fd;

        // Query the descriptor we just opened rather than re-resolving the
        // path, and check the result: on failure `st` would be uninitialised.
        struct stat st;
        if (fstat(fd, &st) != 0)
        {
            printf("Couldn't stat file %s: %s\n", file_list[i].c_str(), strerror(errno));
            exit(1);
        }
        off_t size = st.st_size;
        file_size_[i] = size;

        size_t file_recs = size / RECORD_SIZE;
        // A file with no whole records owns an empty record range. Indexing it
        // would claim the same key as the next file (or the end sentinel); with
        // a unique-key map insert() would then keep the empty file and drop
        // the real entry, so such files are left out of the index.
        if (file_recs == 0)
            continue;

        // Key = global number of this file's first record, value = file number.
        file_index_.insert(std::make_pair(rolling_sum, i));
        rolling_sum += file_recs;
    }
    file_index_.insert(std::make_pair(rolling_sum, -1));
}

// Map the first file_size bytes of fd with MAP_SHARED.
//   type == 0 -> read-only mapping  (PROT_READ)
//   otherwise -> read/write mapping (PROT_READ | PROT_WRITE)
// On success the address is stored in mapped_buffer and 1 is returned.
// On failure an errno message is printed, mapped_buffer holds MAP_FAILED and
// -1 is returned (note that -1 is truthy: test the result with `< 0`).
int DataManager::MMapFile(size_t file_size, int type, int fd, char *&mapped_buffer)
{
    const bool writable = (type != 0);
    const int protection = writable ? (PROT_READ | PROT_WRITE) : PROT_READ;

    void *region = mmap(nullptr, file_size, protection, MAP_SHARED, fd, 0);
    mapped_buffer = static_cast<char *>(region);
    if (region != MAP_FAILED)
        return 1;

    if (writable)
        printf("Failed to mmap write file of size %lu: %s\n",
               file_size, strerror(errno));
    else
        printf("Failed to mmap read file of size %lu: %s\n",
               file_size, strerror(errno));
    return -1;
}

// bandwidth: per-thread timing record. Under the `bandwidth` build flag each
// worker stamps start_time/end_time around its copy loop, and time_min_max()
// reduces an array of these to the overall measurement window.
struct threadargs_t
{
    int tid;             // worker index within the current batch of tasks
    uint64_t start_time; // timestamp taken before the worker's copy loop
    uint64_t end_time;   // timestamp taken after the worker's copy loop
};

// Wall-clock timestamp in nanoseconds (CLOCK_REALTIME).
// Returns 0 if clock_gettime() fails, instead of falling off the end of a
// non-void function (undefined behaviour).
uint64_t nano_time(void)
{
    struct timespec ts;
    if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
        return 0;
    return static_cast<uint64_t>(ts.tv_sec) * NANOSECONDS_IN_SECOND +
           static_cast<uint64_t>(ts.tv_nsec);
}

// Finds the min start time and max end time for a given set of per-thread times.
// Returns (earliest start_time, latest end_time) over threadargs[0 .. threads).
// An empty set (threads <= 0 or a null array) yields (0, 0) rather than
// reading threadargs[0] out of bounds.
std::tuple<uint64_t, uint64_t> time_min_max(threadargs_t *threadargs, int threads)
{
    if (threadargs == NULL || threads <= 0)
        return std::make_tuple(uint64_t(0), uint64_t(0));

    uint64_t min_start_time = threadargs[0].start_time;
    uint64_t max_end_time = threadargs[0].end_time;
    for (int i = 1; i < threads; i++)
    {
        if (threadargs[i].start_time < min_start_time)
            min_start_time = threadargs[i].start_time;
        if (threadargs[i].end_time > max_end_time)
            max_end_time = threadargs[i].end_time;
    }
    return std::make_tuple(min_start_time, max_end_time);
}

// Each record range belongs to a particular file; read from the right file.
//
// Loads the keys of read_arr_count consecutive records, starting at global
// record number rec_num, into keys_idx[0 .. read_arr_count). The owning file
// is located through file_index_ and mapped read-only into
// input_mapped_buffer_. conf.read_thrds pool tasks each copy KEY_SIZE bytes
// per record and store the record's byte offset in r_off.
//
// Returns the number of keys loaded, 0 if rec_num lies past the last record,
// or (size_t)-1 if the file could not be mapped.
size_t DataManager::RunRead(size_t rec_num, size_t read_arr_count, std::vector<in_record_t> &keys_idx)
{
    // Find the file the record number belongs to
    auto file_it = file_index_.upper_bound(rec_num);
    file_it--;

    // rec belongs to no file
    if (file_it->second < 0)
        return 0;
    size_t file_num = file_it->second;
    // file_index_ is keyed by record number, so the distance from the file's
    // first record must be scaled to bytes before indexing the mapping.
    size_t file_byte_off = (rec_num - file_it->first) * conf.record_size;

    // 0 because READ. MMapFile reports failure as -1, which `!` would miss.
    if (MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_) < 0)
        return -1;

    vector<std::future<size_t>> results;
    size_t thrd_chunk_rec = read_arr_count / conf.read_thrds;
#ifdef bandwidth
    threadargs_t *threadargs;
    threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t));
#endif
    // clang-format off
    for (int thrd = 0; thrd < conf.read_thrds; thrd++)
    {
        // The last task also takes the remainder, so no record is skipped
        // when read_arr_count is not a multiple of conf.read_thrds.
        size_t thrd_recs = (thrd == conf.read_thrds - 1)
                               ? read_arr_count - thrd_chunk_rec * thrd
                               : thrd_chunk_rec;
        results.emplace_back(
            pool.enqueue([=, &keys_idx]()->size_t
            {
                size_t reads = 0;
                size_t read_keys = thrd_chunk_rec * thrd;
                size_t v_off = 0;
                // Byte position of this task's first record in the mapping.
                size_t src_off = file_byte_off + read_keys * conf.record_size;
#ifdef bandwidth
                threadargs[thrd].tid = thrd;
                threadargs[thrd].start_time = rdtsc();
#endif
                while (reads < thrd_recs)
                {
                    memcpy((void *)(&keys_idx[read_keys]),
                        &input_mapped_buffer_[src_off],
                        KEY_SIZE);
                    v_off = (read_keys * conf.record_size) + // For each record
                            rec_num * conf.record_size;      // For values beyond first read buffer
                    keys_idx[read_keys].r_off = v_off;
                    read_keys++;
                    reads++;
                    src_off += conf.record_size;
                }
#ifdef bandwidth
                threadargs[thrd].end_time = rdtsc();
#endif
                return reads;
            })
        );
    }
    // clang-format on
    size_t total_read = 0;
    for (auto &&result : results)
        total_read += result.get();

#ifdef bandwidth
    timer->end("checkpoints");
    uint64_t min_start_time, max_end_time;
    std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds);
    // NOTE(review): rdtsc() deltas are divided by NANOSECONDS_IN_SECOND here;
    // confirm rdtsc() reports nanoseconds rather than raw TSC ticks.
    printf("%f,READ,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), read_arr_count * KEY_SIZE);
    free(threadargs);
    timer->start("checkpoints");
#endif

    return total_read;
}

// PMSort loading step
size_t DataManager::RunReadPMSort(size_t rec_num, size_t read_arr_count, std::vector<in_record_t> &keys_idx)
{
    // Find the file the block number belongs to
    auto file_it = file_index_.upper_bound(rec_num);
    file_it--;

    // rec belongs to no file
    if (file_it->second < 0)
        return 0;
    size_t file_num = file_it->second;
    off_t file_off = rec_num - file_it->first;

    // 0 because READ
    if (!MMapFile(file_size_[file_num], 0, file_ptr_[file_num], input_mapped_buffer_))
        return -1;

    vector<std::future<size_t>> results;
    size_t thrd_chunk_size = (read_arr_count * conf.record_size) / conf.read_thrds;
#ifdef bandwidth
    threadargs_t *threadargs;
    threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t));
#endif
    // clang-format off
    for (int thrd = 0; thrd < conf.read_thrds; thrd++)
    {
        results.emplace_back(
            pool.enqueue([=, &keys_idx]()->size_t
            {
                size_t read_size = 0;
                size_t read_keys = (thrd_chunk_size/conf.record_size) * thrd;
                size_t v_off = 0;
                size_t thrd_file_off = file_off;
#ifdef bandwidth
                threadargs[thrd].tid = thrd;
                threadargs[thrd].start_time = rdtsc();
#endif
                char *tmp_buf = (char *) malloc(4000);
                while (read_size < thrd_chunk_size)
                {
                    // memcpy((void *)(&keys_idx[read_keys]),
                    //     &input_mapped_buffer_[thrd_file_off + (thrd_chunk_rec * conf.record_size * thrd)],
                    //     KEY_SIZE);
                    memcpy(tmp_buf, &input_mapped_buffer_[thrd_file_off + (thrd_chunk_size * thrd)],
                        4000);
                    for(int tmp_k = 0; tmp_k < 4000; tmp_k+=conf.record_size)
                    {
                        memcpy((void *) (&keys_idx[read_keys]), &tmp_buf[tmp_k], KEY_SIZE);
                        v_off = (read_keys * conf.record_size) + // For each record
                                rec_num * conf.record_size;      // For values beyond first read buffer
                        keys_idx[read_keys].r_off = v_off;
                        // Lets convert address to hex later for 
                        // now just append v_off to an array.
                        // tmp_off.push_back(v_off);
                        read_keys++;
                    }
                    read_size += 4000;
                    thrd_file_off += 4000;
                }
#ifdef bandwidth
                threadargs[thrd].end_time = rdtsc();
#endif
    /* This is for seperating reads and offset to hex convertion
                // reset read keys and reads
                read_keys = thrd_chunk_rec * thrd;
                reads = 0;
                // Now convert offset to hex
                while(reads < thrd_chunk_rec)
                {
                    keys_idx[read_keys].r_off = tmp_off[reads];
                    read_keys++;
                    reads++;
                }
        */
                return read_size/conf.record_size;         
            })
        );
    }
    // clang-format on
    size_t total_read = 0;
    for (auto &&result : results)
        total_read += result.get();

#ifdef bandwidth
    timer->end("checkpoints");
    uint64_t min_start_time, max_end_time;
    std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds);
    printf("%f,READ,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), read_arr_count * KEY_SIZE);
    free(threadargs);
    timer->start("checkpoints");
#endif

    return total_read;
}

// Create (or reuse) output_file, size it to exactly file_size bytes and map it
// read/write into output_mapped_buffer_. Resets output_count_ to 0.
// Exits the process if the file cannot be opened, resized or mapped.
void DataManager::OpenAndMapOutputFile(string output_file, size_t file_size)
{
    int ofd = open(output_file.c_str(), O_RDWR | O_CREAT | O_DIRECT, S_IRWXU);
    if (ofd < 0)
    {
        printf("Couldn't open file %s: %s\n", output_file.c_str(), strerror(errno));
        exit(1);
    }
    // Kept out of assert(): under NDEBUG the call itself would be compiled
    // away, leaving the file unsized and the mapping below without backing.
    if (ftruncate(ofd, file_size) != 0)
    {
        printf("Couldn't resize file %s to %lu bytes: %s\n",
               output_file.c_str(), file_size, strerror(errno));
        exit(1);
    }
    output_fd_ = ofd;
    // 1 because WRITE. MMapFile reports failure as -1, which `!` would miss.
    if (MMapFile(file_size, 1, ofd, output_mapped_buffer_) < 0)
        exit(1);
    output_count_ = 0;
}

/**
 * Version 1
 * - Each thread reads offset from file and writes to the output file.
 * - This could be bad because the uncontrolled number of writes gives poor bandwidth.
 *
 * Copies read_recs records, in keys_idx order, from input_mapped_buffer_ (at
 * each key's decoded r_off) into consecutive record slots of
 * output_mapped_buffer_ starting at offset 0. The work is split across
 * conf.read_thrds pool tasks; the last task also takes the remainder.
 * Returns the number of records copied.
 */
size_t DataManager::RunWrite(vector<in_record_t> &keys_idx, size_t read_recs)
{
    vector<std::future<size_t>> results;
    size_t thrd_chunk_rec = read_recs / conf.read_thrds;
#ifdef bandwidth
    threadargs_t *threadargs;
    threadargs = (threadargs_t *)malloc(conf.read_thrds * sizeof(threadargs_t));
#endif
    // clang-format off
    for (int thrd = 0; thrd < conf.read_thrds; thrd++)
    {
        // Without this, read_recs % conf.read_thrds records were never written.
        size_t thrd_recs = (thrd == conf.read_thrds - 1)
                               ? read_recs - thrd_chunk_rec * thrd
                               : thrd_chunk_rec;
        results.emplace_back(
            pool.enqueue([=, &keys_idx]()->size_t
            {
                size_t write_records = thrd_chunk_rec * thrd;
                size_t output_off = thrd_chunk_rec * thrd * conf.record_size;
                size_t input_off = 0;
                size_t writes = 0;

#ifdef bandwidth
                threadargs[thrd].tid = thrd;
                threadargs[thrd].start_time = rdtsc();
#endif
                while (writes < thrd_recs)
                {
                    input_off = ConvertHexToRoff(keys_idx[write_records].r_off.val);
#ifdef avx512
                    __memmove_chk_avx512_no_vzeroupper(&output_mapped_buffer_[output_off],
                        &input_mapped_buffer_[input_off], conf.record_size);
#elif pmdk
                    pmem_memcpy_nodrain(&output_mapped_buffer_[output_off],
                        &input_mapped_buffer_[input_off], conf.record_size);
#else
                    memcpy(&output_mapped_buffer_[output_off],
                        &input_mapped_buffer_[input_off], conf.record_size);
#endif
                    output_off += conf.record_size;
                    write_records++;
                    writes++;
                }
#if defined(avx512)
#elif pmdk
                // Only the pmem_memcpy_nodrain() path needs this: nodrain
                // copies are not guaranteed durable until pmem_drain().
                pmem_drain();
#endif
#ifdef bandwidth
                threadargs[thrd].end_time = rdtsc();
#endif
                return writes;
            })
        );
    }
    // clang-format on
    size_t total_write = 0;
    for (auto &&result : results)
        total_write += result.get();

#ifdef bandwidth
    timer->end("checkpoints");
    uint64_t min_start_time, max_end_time;
    std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds);
    printf("%f,READ-WRITE,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), read_recs * conf.record_size);
    free(threadargs);
    timer->start("checkpoints");
#endif
    return total_write;
}

/**
 * Version 2
 * - Each thread reads offset from file and writes to the output buffer.
 * - The output buffer once full is then written to pmem in controlled fashion.
 *
 * Records are processed in cycles of at most one staging buffer
 * (conf.write_buffer_size trimmed to a whole number of records per read task):
 *   1. gather: conf.read_thrds pool tasks copy the records named by the next
 *      slice of keys_idx from input_mapped_buffer_ into a heap buffer;
 *   2. flush:  conf.write_thrds pool tasks copy the filled part of that buffer
 *      to output_mapped_buffer_ in pieces of at most conf.block_size bytes.
 * The final cycle may be partial, so nothing beyond read_recs is read or
 * written. Returns the number of records written (read_recs), or 0 if the
 * staging buffer cannot hold one record per read task.
 */
size_t DataManager::RunWrite2(vector<in_record_t> &keys_idx, size_t read_recs)
{
    // Records each read task gathers in a full cycle, and the cycle capacity.
    const size_t thrd_chunk_rec = (conf.write_buffer_size / conf.record_size) / conf.read_thrds;
    const size_t cycle_cap_recs = thrd_chunk_rec * conf.read_thrds;
    if (cycle_cap_recs == 0)
        return 0; // buffer too small: the loop below could never make progress

    // Heap staging buffer; released automatically when the function returns.
    vector<char> staging(cycle_cap_recs * conf.record_size);
    char *output_buffer = staging.data();

#ifdef bandwidth
    // One slot per task of whichever phase uses more tasks: the flush phase
    // indexes it by write-task number. Allocated once and freed on return.
    const size_t timing_slots = (conf.read_thrds > conf.write_thrds) ? conf.read_thrds : conf.write_thrds;
    vector<threadargs_t> timing(timing_slots);
    threadargs_t *threadargs = timing.data();
#endif

    size_t total_writes = 0;
    size_t out_file_off = 0;

    while (total_writes < read_recs)
    {
        const size_t left = read_recs - total_writes;
        const size_t cycle_recs = (left < cycle_cap_recs) ? left : cycle_cap_recs;
        const size_t cycle_bytes = cycle_recs * conf.record_size;
        // Index in keys_idx of this cycle's first record; it advances by a
        // whole cycle each iteration, not by a single task's share.
        const size_t cycle_off = total_writes;

        // ---- gather: records -> staging buffer ----
        vector<std::future<void>> results;
        const size_t gather_chunk = cycle_recs / conf.read_thrds;
        timer->start("RecRead");
        // clang-format off
        for (int thrd = 0; thrd < conf.read_thrds; thrd++)
        {
            const size_t thrd_recs = (thrd == conf.read_thrds - 1)
                                         ? cycle_recs - gather_chunk * thrd
                                         : gather_chunk;
            results.emplace_back(
                pool.enqueue([=, &keys_idx]
                {
                    size_t write_records = gather_chunk * thrd + cycle_off;
                    size_t output_off = gather_chunk * thrd * conf.record_size;
                    size_t input_off = 0;
                    size_t writes = 0;

#ifdef bandwidth
                    threadargs[thrd].tid = thrd;
                    threadargs[thrd].start_time = rdtsc();
#endif
                    while (writes < thrd_recs)
                    {
                        input_off = ConvertHexToRoff(keys_idx[write_records].r_off.val);
                        // output_buffer is heap memory, so the PMDK copy
                        // routine is not used for this staging copy.
#ifdef avx512
                        __memmove_chk_avx512_no_vzeroupper(&output_buffer[output_off],
                                &input_mapped_buffer_[input_off], conf.record_size);
#else
                        memcpy(&output_buffer[output_off],
                           &input_mapped_buffer_[input_off], conf.record_size);
#endif
                        output_off += conf.record_size;
                        write_records++;
                        writes++;
                    }
#ifdef bandwidth
                    threadargs[thrd].end_time = rdtsc();
#endif
                })
            );
        }
        // clang-format on

        for (auto &&result : results)
            result.get();
#ifdef bandwidth
        timer->end("checkpoints");
        uint64_t min_start_time, max_end_time;
        std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.read_thrds);
        printf("%f,READ,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), cycle_bytes);
        timer->start("checkpoints");
#endif
        timer->end("RecRead");

        // ---- flush: staging buffer -> output file ----
        vector<std::future<void>> results2;
        const size_t thrd_wrt_chunk = cycle_bytes / conf.write_thrds;
        timer->start("RUN write");
        // clang-format off
        for (int thrd = 0; thrd < conf.write_thrds; thrd++)
        {
            const size_t thrd_bytes = (thrd == conf.write_thrds - 1)
                                          ? cycle_bytes - thrd_wrt_chunk * thrd
                                          : thrd_wrt_chunk;
            results2.emplace_back(
                pool.enqueue([=]
                {
                    size_t write_off = thrd_wrt_chunk * thrd;
                    size_t thrd_f_of = out_file_off + thrd_wrt_chunk * thrd;
                    size_t writes = 0;
#ifdef bandwidth
                    threadargs[thrd].tid = thrd;
                    threadargs[thrd].start_time = rdtsc();
#endif
                    while (writes < thrd_bytes)
                    {
                        // The last piece may be shorter than a block; never
                        // copy past this task's slice of the buffer.
                        size_t piece = thrd_bytes - writes;
                        if (piece > conf.block_size)
                            piece = conf.block_size;
#ifdef avx512
                        __memmove_chk_avx512_no_vzeroupper(
                                &output_mapped_buffer_[thrd_f_of],
                                &output_buffer[write_off],
                                piece);
#elif pmdk
                        pmem_memcpy_nodrain(&output_mapped_buffer_[thrd_f_of],
                               &output_buffer[write_off],
                               piece);
#else
                        memcpy(&output_mapped_buffer_[thrd_f_of],
                               &output_buffer[write_off],
                               piece);
#endif
                        write_off += piece;
                        thrd_f_of += piece;
                        writes += piece;
                    }
#if defined(avx512)
#elif pmdk
                    // Only the pmem_memcpy_nodrain() path needs this: nodrain
                    // copies are not guaranteed durable until pmem_drain().
                    pmem_drain();
#endif
#ifdef bandwidth
                    threadargs[thrd].end_time = rdtsc();
#endif
                })
            );
        }
        // clang-format on
        for (auto &&result : results2)
            result.get();

#ifdef bandwidth
        timer->end("checkpoints");
        std::tie(min_start_time, max_end_time) = time_min_max(threadargs, conf.write_thrds);
        printf("%f,WRITE,%f,%lu\n", timer->get_overall_time("checkpoints"), ((float)(max_end_time - min_start_time) / NANOSECONDS_IN_SECOND), cycle_bytes);
        timer->start("checkpoints");
#endif
        timer->end("RUN write");
        out_file_off += cycle_bytes;
        total_writes += cycle_recs;
    }
    return total_writes;
}

// Serialise record offset n into INDEX_SIZE raw bytes, least-significant byte
// first (bytes[0] holds the low byte). Bits above INDEX_SIZE bytes are
// dropped. Despite the name, the output is binary, not hexadecimal text.
void DataManager::ConvertRoffToHex(uint8_t *bytes, size_t n)
{
    size_t rest = n;
    for (int pos = 0; pos < INDEX_SIZE; pos++)
    {
        bytes[pos] = static_cast<uint8_t>(rest & 0xFF);
        rest >>= 8; // expose the next byte
    }
}

// Decode the INDEX_SIZE-byte little-endian value in bytes (bytes[0] is the
// least-significant byte) back into a record offset. This is the inverse of
// ConvertRoffToHex for offsets that fit in INDEX_SIZE bytes.
uint64_t DataManager::ConvertHexToRoff(uint8_t *bytes)
{
    uint64_t value = 0;
    // Horner-style: start at the most-significant byte and shift the running
    // value up one byte before OR-ing in each lower byte.
    for (int pos = INDEX_SIZE; pos-- > 0;)
        value = (value << 8) | bytes[pos];
    return value;
}