WiscSort / wiscSort / RSW / data_manager.cc
data_manager.cc
Raw
#include <unistd.h>
#include <assert.h>
#include "data_manager.h"
#include "parameter.h"
#include <fcntl.h>
#include <sys/mman.h>
#include <iostream>
#define min(a, b) (((a) < (b)) ? (a) : (b))

/**
 * Open every input file and build the global block index.
 *
 * Each file is opened with O_RDWR | O_DIRECT and its descriptor and byte size
 * are stored in file_ptr / file_size (same order as file_list). file_index
 * maps the first global block number of each file to its position in
 * file_list; a final entry with value -1 marks one past the last block.
 * Only whole blocks are counted (size / BLOCK_SIZE), so a trailing partial
 * block of a file is not addressable through the index.
 *
 * Terminates the process (exit(1)) if a file cannot be opened or stat'ed.
 *
 * NOTE(review): if file_index is a std::map, a zero-block file shares its
 * start key with the following file and the second insert is ignored —
 * TODO confirm the container type in data_manager.h.
 */
DataManager::DataManager(vector<string> &file_list)
{
    // Allocated lazily by init_output_buffer(); null lets the destructor tell
    // whether there is anything to release.
    output_buf = nullptr;

    file_ptr.resize(file_list.size());
    file_size.resize(file_list.size());
    size_t rolling_sum = 0;
    for (size_t i = 0; i < file_list.size(); i++) {
        const char *path = file_list[i].c_str();
        int fd = open(path, O_RDWR | O_DIRECT);
        if (fd < 0) {
            printf("Couldn't open %s: %s\n", path, strerror(errno));
            exit(1);
        }
        file_ptr[i] = fd;

        // Query the size through the descriptor we just opened: it is the
        // same file (no path race), and a failure is detected instead of
        // reading an uninitialised st_size.
        struct stat st;
        if (fstat(fd, &st) != 0) {
            printf("Couldn't stat %s: %s\n", path, strerror(errno));
            exit(1);
        }
        off_t size = st.st_size;
        file_size[i] = (uint64_t)size;

        file_index.insert(std::make_pair(rolling_sum, i));
        rolling_sum += size / BLOCK_SIZE;
    }
    // Sentinel: block numbers >= rolling_sum are past the end of all files.
    file_index.insert(std::make_pair(rolling_sum, -1));
}

void DataManager::init_output_buffer(size_t size) {
    //assert(posix_memalign(reinterpret_cast<void**>(&output_buf), BLOCK_SIZE, size * BLOCK_SIZE) == 0); 
    output_buf = (record_t *)malloc(size * BLOCK_SIZE);
    assert(output_buf);
}

/**
 * Copy up to `len` blocks, starting at global block number `block_num`,
 * into `buffer`.
 *
 * Blocks are numbered contiguously across all input files in constructor
 * order (see file_index), so one request may span several files. Each file
 * touched is mmapped read-only, copied from, and unmapped again before the
 * next one. The mmapped_buffer member is not used or modified here.
 * NOTE(review): confirm no other code expected that member to keep pointing
 * at the last file loaded.
 *
 * @param block_num  first global block to read.
 * @param len        number of blocks requested.
 * @param buffer     destination; must hold at least len * NUM_PER_BLOCK records.
 * @return number of records copied. This is less than len * NUM_PER_BLOCK
 *         when the request runs past the last whole block of the last file,
 *         and 0 when block_num is already past the end. Returns (size_t)-1
 *         if mmap fails.
 */
size_t DataManager::load(size_t block_num, size_t len, record_t *buffer) {
    // The file holding block_num is the last index entry whose start <= block_num.
    auto it = file_index.upper_bound(block_num);
    if (it == file_index.begin()) return 0;
    --it;
    if (it->second < 0) return 0;   // sentinel entry: past the end of all files

    size_t file_num = it->second;
    // Block offset of block_num within file `file_num`.
    size_t offset = block_num - (size_t) it->first;
    size_t blocks_left = len;
    size_t copied = 0;              // records written into `buffer` so far

    while (blocks_left > 0 && file_num < file_ptr.size()) {
        size_t file_blocks = file_size[file_num] / BLOCK_SIZE;
        if (offset >= file_blocks) {
            // Nothing to read in this file (e.g. a zero-block file, which
            // mmap would reject); the offset carries into the next file.
            offset -= file_blocks;
            file_num++;
            continue;
        }

        int fd = file_ptr[file_num];
        assert(fd >= 0);
        char *map = (char *)mmap(NULL, file_size[file_num],
                                 PROT_READ, MAP_SHARED, fd, 0);
        if (map == MAP_FAILED) {
            printf("Failed to mmap file of size %lu: %s\n",
                    file_size[file_num], strerror(errno));
            return -1;
        }

        // Never read past the last whole block of this file, and never write
        // more than the `len` blocks the caller asked for.
        size_t n_blocks = file_blocks - offset;
        if (n_blocks > blocks_left) n_blocks = blocks_left;

        //TODO: How does std::copy compare?
        memcpy((void *)(buffer + copied),
               map + offset * BLOCK_SIZE,
               n_blocks * BLOCK_SIZE);
        munmap(map, file_size[file_num]);

        copied += n_blocks * NUM_PER_BLOCK;
        blocks_left -= n_blocks;
        offset = 0;                 // later files are read from their start
        file_num++;
    }

    return copied;
}

void DataManager::set_output_fd(int fd) {
    output_fd = fd;
    buf_ptr = 0;
    output_count = 0;
}

/**
 * Gather num_block * NUM_PER_OUT_BLOCK records into the staging buffer
 * output_buf, then copy WRITE_SIZE output blocks from output_buf into the
 * destination mapping.
 *
 * Records are taken from `buffer` in the order given by keys[i].v_index and
 * appended at output_buf[buf_ptr]. The records appended are
 * output_buf[buf_ptr .. buf_ptr + num_block * NUM_PER_OUT_BLOCK).
 *
 * @param buffer          source records, indexed by keys[i].v_index.
 * @param keys            sorted keys; only the v_index field is read here.
 * @param num_block       number of output blocks to gather in this call.
 * @param mmapped_buffer  destination mapping. This parameter shadows the
 *                        DataManager member of the same name.
 *
 * NOTE(review): the copy below always moves WRITE_SIZE blocks, and
 * records_boundary = buf_ptr - WRITE_SIZE * NUM_PER_OUT_BLOCK. This
 * presumably assumes every call passes num_block == WRITE_SIZE (TODO
 * confirm). With any other value, the copied range would not match the
 * records just gathered.
 */
void DataManager::buffer_write(record_t *buffer, in_record_t *keys, int num_block, char *mmapped_buffer) {
    //REVIEW: Should this step also be parallelized later.
    // NOTE(review): std::move is a plain copy if record_t is trivially copyable.
    for(size_t i=0; i < num_block * NUM_PER_OUT_BLOCK; i++){
        output_buf[buf_ptr + i] = std::move(buffer[keys[i].v_index]);
    }

    output_count += num_block * NUM_PER_OUT_BLOCK;
    buf_ptr += num_block * NUM_PER_OUT_BLOCK;
    // Index of the first block of the chunk being written. The destination
    // byte offset is off * OUTPUT_BUFFER_SIZE * BLOCK_SIZE.
    off_t off = output_count / NUM_PER_OUT_BLOCK - WRITE_SIZE;
    // Position in output_buf of the first record of that chunk.
    uint32_t records_boundary = buf_ptr - WRITE_SIZE * NUM_PER_OUT_BLOCK;
    /**
     * TODO: Supporting file sizes other than the multiple of MEM_SIZE.
    std::cout << "off: " << off << " buf_ptr: " << buf_ptr << " reord_bndr " << records_boundary << std::endl;
    std::cout << "----- " << off * OUTPUT_BUFFER_SIZE* BLOCK_SIZE << std::endl;
    int req_rec = min((WRITE_SIZE * NUM_PER_OUT_BLOCK), (MEM_SIZE * NUM_PER_BLOCK - output_count));
    std::cout << "--- req_rec--- " << req_rec << std::endl;
    */
     memcpy(&mmapped_buffer[off * OUTPUT_BUFFER_SIZE* BLOCK_SIZE],
            &output_buf[records_boundary],
            WRITE_SIZE * OUTPUT_BUFFER_SIZE * BLOCK_SIZE);
    // Wrap the staging position once it reaches the end of output_buf.
    // NOTE(review): this compares against NUM_PER_BLOCK (not NUM_PER_OUT_BLOCK)
    // and uses ==, so an overshoot would never reset. Confirm this is intended.
    if (buf_ptr == OUTPUT_BUFFER_SIZE * NUM_PER_BLOCK) {
	            buf_ptr = 0;
    }
    //TODO: return req_rec.
}

uint64_t DataManager::get_overall_size()  {
    // Total size in bytes of all input files, as recorded by the constructor.
    uint64_t total_bytes = 0;
    for (const auto &bytes : file_size) {
        total_bytes += bytes;
    }
    return total_bytes;
}


/**
 * Close every input file descriptor and release the output staging buffer.
 *
 * output_buf is allocated with malloc() in init_output_buffer(), so it is
 * released with free(), not delete[]. The null test guards against the buffer
 * never having been allocated.
 * NOTE(review): relies on output_buf being nullptr when init_output_buffer()
 * was never called (set in the constructor / header). TODO confirm.
 */
DataManager::~DataManager() {
    for (int fd : file_ptr) close(fd);
    if (output_buf) free(output_buf);
//    stop = true;
}

size_t DataManager::get_filesize(const char* filename){
    // Size of `filename` in bytes, or (size_t)-1 if stat() reports an error.
    struct stat info;
    if (stat(filename, &info) != 0) {
        return -1;
    }
    return info.st_size;
}