/*
 * WiscSort / pmem_benchmark / mmapThread.c
 * Multi-threaded bandwidth benchmark: mmap + memcpy vs. pread/pwrite.
 */
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <getopt.h>
#include <errno.h>
#include <stdbool.h>
#include <ctype.h>
#include <inttypes.h>
#include <pthread.h>
#include <assert.h>

#include "nano_time.h"

// avx512 test
#include <stdint.h>
void *__memmove_chk_avx512_no_vzeroupper(void *dest, void *src, size_t s);

/**
 * To create a 4 GiB file (this allocates space on disk):
 *   $ dd < /dev/zero bs=1048576 count=4096 > testfile
 *
 * To create a 100 GiB file:
 *   $ dd if=/dev/zero of=bigmmaptest bs=1M count=102400
 *
 * To clear the page cache:
 *   $ sync; echo 1 > /proc/sys/vm/drop_caches
 */

/* Set to 1 to compile in the latency-sampling code used by mmap_test(). */
#define SAMPLE_LATENCY 0
#define BYTES_IN_GB (1024*1024*1024)
// Block size is used both as the read()/write() size and as the stride
// when iterating over the file through mmap.
#define DEFAULT_BLOCK_SIZE 4096 //8192
#define NANOSECONDS_IN_SECOND 1000000000

/* File used when --file is not given. */
const char DEFAULT_NAME[] = "/mnt/pmem/mmaptest";

/*
 * Print a printf-style message to stdout and terminate immediately with
 * status -1 (255). _exit() does not flush stdio buffers, so stdout is
 * flushed explicitly; otherwise the message is lost whenever stdout is
 * redirected to a pipe or file (fully buffered).
 */
#define EXIT_MSG(...)           \
    do {                        \
        printf(__VA_ARGS__);    \
        fflush(stdout);         \
        _exit(-1);              \
    } while (0)


/*
 * Forward declarations.
 *
 * Each *_test function walks `filesize` bytes in `block_size` steps using
 * the file offsets listed in `offsets`. On success it stores its start/stop
 * timestamps in *begin / *end and returns an accumulated token. On failure
 * it returns (uint64_t)-1.
 */

/* Variants that copy blocks to/from the mapping passed in `buf`. */
uint64_t read_mmap_test(int fd, int tid, size_t block_size, size_t filesize,
                        char *buf, off_t *offsets,
                        uint64_t *begin, uint64_t *end);
uint64_t write_mmap_test(int fd, int tid, size_t block_size, size_t filesize,
                         char *buf, off_t *offsets,
                         uint64_t *begin, uint64_t *end);
uint64_t mmap_test(int fd, int tid, size_t block_size, size_t filesize,
                   char *buf, char optype, off_t *offsets,
                   uint64_t *begin, uint64_t *end);

/* Variants that go through pread()/pwrite(). */
uint64_t read_syscall_test(int fd, int tid, size_t block_size, size_t filesize,
                           off_t *offsets, uint64_t *begin, uint64_t *end);
uint64_t write_syscall_test(int fd, int tid, size_t block_size, size_t filesize,
                            off_t *offsets, uint64_t *begin, uint64_t *end);
uint64_t syscall_test(int fd, int tid, size_t block_size, size_t filesize,
                      char optype, off_t *offsets,
                      uint64_t *begin, uint64_t *end);

/* Helpers. */
size_t get_filesize(const char *filename);
void print_help_message(const char *progname);
char *map_buffer(int fd, size_t size);
void *run_tests(void *args);

/* Set by --silent: when non-zero, per-test progress output is suppressed. */
static int silent;

/*
 * Work description for one benchmark thread, handed to run_tests().
 * The worker writes start_time / end_time; main() aggregates them after
 * pthread_join().
 */
typedef struct {
    int tid;                            /* 0-based thread index */
    int fd;                             /* file under test */
    char *mapped_buffer;                /* mapping of the whole file, or NULL */
    int read_mmap, read_syscall;        /* test selectors; run_tests() runs */
    int write_mmap, write_syscall;      /*   the first non-zero one         */
    off_t *offsets;                     /* this thread's slice of offsets */
    size_t block_size;                  /* bytes per transfer */
    size_t chunk_size;                  /* bytes this thread processes */
    int retval;                         /* spare result slot */
    uint64_t start_time, end_time;      /* nano_time() stamps around the test */
} threadargs_t;

/* Size of the file under test, in bytes. */
size_t filesize;
int main(int argc, char **argv) {
    char *fname = (char*) DEFAULT_NAME;
    char *mapped_buffer = NULL;
    int c, fd, i, flags = O_RDWR, numthreads = 1, ret, option_index;
    static int randomaccess = 0,
        read_mmap = 0, read_syscall = 0,
        write_mmap = 0, write_syscall = 0,
        mixed_mmap = 0, write_tr = 0;
    off_t *offsets = 0;
    size_t block_size = DEFAULT_BLOCK_SIZE, numblocks,
     new_file_size = 0;
     uint64_t min_start_time, max_end_time = 0, retval;
     // permissions
    uint64_t mode = S_IRWXU | S_IRWXG;

    pthread_t *threads;
    threadargs_t *threadargs;

    static struct option long_options[] = 
    {
        // Options set a flag
        {"randomaccess", no_argument, &randomaccess, 1},
        {"readmmap", no_argument, &read_mmap, 1},
        {"readsyscall", no_argument, &read_syscall, 1},
        {"silent", no_argument, &silent, 1},
        {"writemmap", no_argument, &write_mmap, 1},
        {"writesyscall", no_argument, &write_syscall, 1},
        {"mixedmmap", no_argument, &mixed_mmap, 1},
        // Options take an argument
        {"block", required_argument, 0, 'b'},
        {"file", required_argument, 0, 'f'},
        {"help", no_argument, 0, 'h'},
        {"size", no_argument, 0, 's'},
        {"threads", required_argument, 0, 't'},
        {"writethreads", no_argument, 0, 'w'},
        {0, 0, 0, 0}
    };

    //read operations
    while(1) {
        c = getopt_long(argc, argv, "b:f:h:s:t:w:",
                long_options, &option_index);

        // is end of the option
        if (c == -1)
            break;
        
        switch(c)
        {
            case 0:
                break;
            case 'b':
                block_size = atoi(optarg);
                break;
            case 'f':
                fname = optarg;
                break;
            case 'h':
                print_help_message(argv[0]);
                _exit(0);
            case 's':
                new_file_size = (size_t)(atoi(optarg)) * BYTES_IN_GB;
                break;
            case 't':
                numthreads = (int) (atoi(optarg));
                break;
            case 'w':
                write_tr = atoi(optarg);
                break;
            default:
                break;
        }
    }

    if(!silent){
        printf("PID: %d\n", getpid());
        printf("Using file %s \n", fname);
    }
    if ((filesize = get_filesize(fname)) == -1) {
        if (read_mmap || read_syscall) {
            printf("Cannot obtain file size for %s: %s"
                   "File must exist prior to running read tests.\n",
                   fname, strerror(errno));
            _exit(-1);
        }
        else
            filesize = new_file_size;
    }

    fd = open((const char*)fname, flags, mode);
    if(fd <0) {
        printf("Clould not open/create file %s: %s\n",
            fname, strerror(errno));
            _exit(-1);
    }

    if(block_size < 0 || block_size > filesize){
        printf("Invalid block size: %zu for file of size "
        "%zu. Block size must be greater than 0 and no"
        "greater than the file size.\n",
        block_size, filesize);
        _exit(-1);
    }

    /* 
    * Generate random block number for random file access.
    * Sequential for sequential access
    */
   numblocks = filesize/block_size;
   if(filesize % block_size > 0)
        numblocks++;

    offsets = (off_t *) malloc(numblocks * sizeof(off_t));
    if(offsets == 0){
        printf("Failed to allocate memory: %s\n", strerror(errno));
        _exit(-1);
    }
    for (uint64_t i = 0; i < numblocks; i++)
        if(randomaccess)
            offsets[i] = ((int)random() % numblocks) * block_size;
        else
            offsets[i] = i*block_size;
    if (numblocks % numthreads != 0)
        EXIT_MSG("We have %" PRIu64 " blocks and %d threads. "
                "Threads must evenly divide blocks. "
                "Please fix the args.\n",
                (uint_least64_t)numblocks, numthreads);

    if( read_mmap || write_mmap || mixed_mmap)
        assert((mapped_buffer = map_buffer(fd, filesize)) != NULL);

    threads = (pthread_t*)malloc(numthreads * sizeof(pthread_t));
    threadargs = 
            (threadargs_t*)malloc(numthreads * sizeof(threadargs_t));
    
    if (threads == NULL || threadargs == NULL)
        EXIT_MSG("Could not allocate thread array for %d threads.\n", numthreads);
    
    for (i = 0; i < numthreads; i++) {
        if(mixed_mmap){
            if (i < write_tr) {
                write_mmap = 1;
            } else {
                read_mmap = 1;
            }
        }
        threadargs[i].fd = fd;
        threadargs[i].tid = i;
        threadargs[i].block_size = block_size;
        threadargs[i].chunk_size = filesize/numthreads;
        threadargs[i].mapped_buffer = mapped_buffer;
        threadargs[i].offsets = &offsets[numblocks/numthreads * i];
        threadargs[i].read_mmap = read_mmap;
        threadargs[i].read_syscall = read_syscall;
        threadargs[i].write_mmap = write_mmap;
        threadargs[i].write_syscall = write_syscall;
        int ret = pthread_create(&threads[i], NULL, run_tests, &threadargs[i]);
        if (ret!=0)
            EXIT_MSG("pthread_create for %dth thread failed: %s\n",
                i, strerror(errno));
    }

    for (i = 0; i< numthreads; i++){
        ret = pthread_join(threads[i], NULL);
        if (ret !=0)
            EXIT_MSG("Thread %d failed in join: %s\n", 
            i, strerror(errno));
    }


    // for mixed mode determine read and write aggregate b/w.
    if(mixed_mmap) {
        // Write b/w
        min_start_time = threadargs[0].start_time;
        max_end_time = 0;
        // Since tid 0 to write_tr-1 did writes, find it's min and max.
        for(i=0; i < write_tr; i++){
            min_start_time = (threadargs[i].start_time < min_start_time)?
                threadargs[i].start_time:min_start_time;
            max_end_time = (threadargs[i].end_time > max_end_time)?
                threadargs[i].end_time:max_end_time;
        }
        printf("Write: %.2f\n", 
            (double)write_tr*(filesize/numthreads)/(double)(max_end_time-min_start_time)
            * NANOSECONDS_IN_SECOND / BYTES_IN_GB);
        
        // Read b/w
        min_start_time = threadargs[write_tr].start_time;
        max_end_time = 0;
        for(i=write_tr; i < numthreads; i++){
            min_start_time = (threadargs[i].start_time < min_start_time)?
                threadargs[i].start_time:min_start_time;
            max_end_time = (threadargs[i].end_time > max_end_time)?
                threadargs[i].end_time:max_end_time;
       } 
        printf("Read: %.2f\n", 
            (double)(numthreads-write_tr)*(filesize/numthreads)/(double)(max_end_time-min_start_time)
            * NANOSECONDS_IN_SECOND / BYTES_IN_GB);
    }

    /**
     * For total run time. Find the smallest start time
     * and largest end time across all threads.
     */
    min_start_time = threadargs[0].start_time;
    max_end_time = 0;
    for (i=0; i< numthreads; i++){
        min_start_time = (threadargs[i].start_time < min_start_time)?
            threadargs[i].start_time:min_start_time;
        max_end_time = (threadargs[i].end_time > max_end_time)?
            threadargs[i].end_time:max_end_time;
    }

    printf("%.2f\n", 
            (double)filesize/(double)(max_end_time-min_start_time)
            * NANOSECONDS_IN_SECOND / BYTES_IN_GB);
    

    munmap(mapped_buffer, filesize);
    close(fd);

}

void * run_tests(void *args) {
    uint64_t retval;
    threadargs_t t = *(threadargs_t*)args;

    if(t.read_mmap) {
        if(!silent)
            printf("Running read mmap test:\n");
        retval = read_mmap_test(t.fd, t.tid, t.block_size, t.chunk_size, 
                    t.mapped_buffer, t.offsets,
                    &((threadargs_t*)args)->start_time,
                    &((threadargs_t*)args)->end_time);
    }
    else if(t.read_syscall) {
        if(!silent)
            printf("Running read syscall test:\n");
        retval = read_syscall_test(t.fd, t.tid, t.block_size, t.chunk_size, 
                    t.offsets,
                    &((threadargs_t*)args)->start_time,
                    &((threadargs_t*)args)->end_time);
    }
    else if(t.write_mmap) {
        if(!silent)
            printf("Running write mmap test:\n");
        retval = write_mmap_test(t.fd, t.tid, t.block_size, t.chunk_size, 
                    t.mapped_buffer, t.offsets,
                    &((threadargs_t*)args)->start_time,
                    &((threadargs_t*)args)->end_time);
    }
    else if(t.write_syscall) {
        if(!silent)
            printf("Running write syscall test:\n");
        retval = write_syscall_test(t.fd, t.tid, t.block_size, t.chunk_size, 
                    t.offsets,
                    &((threadargs_t*)args)->start_time,
                    &((threadargs_t*)args)->end_time);
    }
    return (void*) 0;
}

/* Operation selector passed as `optype` to syscall_test() and mmap_test(). */
enum { READ = 1, WRITE = 2 };

/**
 ********* SYSCALL section
 */
/* pread()-based variant of syscall_test(); see that function for details. */
uint64_t read_syscall_test(int fd, int tid, size_t block_size, size_t filesize,
                off_t *offsets, uint64_t *begin, uint64_t *end) {
    const char op = READ;
    uint64_t token = syscall_test(fd, tid, block_size, filesize, op,
                                  offsets, begin, end);
    return token;
}

/* pwrite()-based variant of syscall_test(); see that function for details. */
uint64_t write_syscall_test(int fd, int tid, size_t block_size, size_t filesize,
                off_t *offsets, uint64_t *begin, uint64_t *end) {
    const char op = WRITE;
    uint64_t token = syscall_test(fd, tid, block_size, filesize, op,
                                  offsets, begin, end);
    return token;
}

/*
 * Time pread() (optype READ) or pwrite() (optype WRITE) of block_size-byte
 * transfers at offsets[0], offsets[1], ... using a zero-filled private
 * buffer.
 *
 * The loop stops when any of the following holds:
 *   - i * block_size >= filesize;
 *   - a call transfers 0 bytes;
 *   - for WRITE, exactly filesize bytes have been written.
 *
 * On success, stores the nano_time() stamps in *begin / *end and returns the
 * sum of buffer[0] after each transfer. This token keeps the transfers
 * observable. Returns (uint64_t)-1 on allocation or I/O failure, leaving
 * *begin / *end untouched.
 */
uint64_t syscall_test(int fd, int tid, size_t block_size, size_t filesize,
        char optype, off_t *offsets, uint64_t *begin, uint64_t *end) {

    bool done = false;
    char *buffer = NULL;
    size_t i = 0;
    size_t total_bytes_transferred = 0;
    uint64_t begin_time, end_time, ret_token = 0;

    buffer = malloc(block_size);
    if (buffer == NULL) {
        printf("Failed to allocate memory: %s\n", strerror(errno));
        return -1;
    }

    memset(buffer, 0, block_size);

    begin_time = nano_time();

    while (!done) {
        // ssize_t: pread/pwrite signal failure with -1.
        ssize_t bytes_transferred = 0;

        if (optype == READ)
            bytes_transferred = pread(fd, buffer, block_size, offsets[i++]);
        else if (optype == WRITE)
            bytes_transferred = pwrite(fd, buffer, block_size, offsets[i++]);

        if (bytes_transferred == 0) {
            done = true;
        } else if (bytes_transferred < 0) {
            printf("Failed to IO: %s\n", strerror(errno));
            free(buffer);
            return -1;
        } else {
            total_bytes_transferred += (size_t)bytes_transferred;

            if (optype == WRITE && total_bytes_transferred == filesize)
                done = true;

            // Consume the data so the transfer has an observable result.
            ret_token += buffer[0];
        }
        if (i * block_size >= filesize)
            done = true;
    }

    end_time = nano_time();
    free(buffer);

    if (!silent) {
        printf("%s: %" PRIu64 " bytes transferred in %" PRIu64 ""
        " ns.\n", (optype == READ)?"read-syscall":"write-syscall",
        (uint_least64_t)total_bytes_transferred, (end_time-begin_time));
        // Throughput in GB/s
        printf("(tid %d) %.2f\n", tid,
            (double)filesize/(double)(end_time-begin_time)
            * NANOSECONDS_IN_SECOND / BYTES_IN_GB);
    }

    *begin = begin_time;
    *end = end_time;
    return ret_token;
}

/**
 * MMAP tests
 */

/* READ variant of mmap_test(): copies each block out of the mapping `buf`. */
uint64_t read_mmap_test(int fd, int tid, size_t block_size, size_t filesize, 
            char *buf, off_t *offsets, uint64_t *begin, uint64_t *end) {
    const char op = READ;
    uint64_t token = mmap_test(fd, tid, block_size, filesize, buf, op,
                               offsets, begin, end);
    return token;
}

/* WRITE variant of mmap_test(): copies a private buffer into the mapping `buf`. */
uint64_t write_mmap_test(int fd, int tid, size_t block_size, size_t filesize, 
            char *buf, off_t *offsets, uint64_t *begin, uint64_t *end){
    const char op = WRITE;
    uint64_t token = mmap_test(fd, tid, block_size, filesize, buf, op,
                               offsets, begin, end);
    return token;
}
/*
 * Optional per-block latency sampling for mmap_test().
 *
 * When SAMPLE_LATENCY is non-zero, the macros expect these names in scope:
 * i, block_size, num_samples, latency_samples, lat_begin_time and
 * lat_end_time. Each macro expands to a single statement, so it is safe
 * inside an unbraced if/else.
 *
 * Original note: "Add memory addr". This is presumably a TODO to also
 * record the sampled address; confirm before relying on it.
 */
#if SAMPLE_LATENCY

#define MAX_LAT_SAMPLES 5000
//#define LAT_SAMPL_INTERVAL (1000*1048576)
#define LAT_SAMPL_INTERVAL block_size

#define BEGIN_LAT_SAMPLE                                                    \
    do {                                                                    \
        if (num_samples < MAX_LAT_SAMPLES && i % LAT_SAMPL_INTERVAL == 0)   \
            lat_begin_time = nano_time();                                   \
    } while (0)

#define END_LAT_SAMPLE                                                      \
    do {                                                                    \
        if (num_samples < MAX_LAT_SAMPLES && i % LAT_SAMPL_INTERVAL == 0) { \
            lat_end_time = nano_time();                                     \
            latency_samples[i / LAT_SAMPL_INTERVAL % MAX_LAT_SAMPLES] =     \
                lat_end_time - lat_begin_time;                              \
            num_samples++;                                                  \
        }                                                                   \
    } while (0)

#else

#define BEGIN_LAT_SAMPLE do { } while (0)
#define END_LAT_SAMPLE do { } while (0)

#endif

/*
 * Time memcpy()-based access to `mapped_buffer`.
 *
 * For each block_size step through `filesize` bytes, copy block_size bytes
 * between the mapping at offsets[step] and a private buffer:
 *   - READ copies out of the mapping;
 *   - WRITE copies a buffer of 0x01 bytes into it.
 *
 * On success, stores the nano_time() stamps in *begin / *end and returns a
 * token that sums one byte of each touched block, so the copies have an
 * observable result. Returns (uint64_t)-1 if the private buffer cannot be
 * allocated. `fd` is unused; it keeps the signature parallel to
 * syscall_test().
 */
uint64_t mmap_test(int fd, int tid, size_t block_size, size_t filesize, char *mapped_buffer, 
                char optype, off_t *offsets, uint64_t *begin, uint64_t *end) {

    char *buffer = NULL;
    uint64_t i;
    uint64_t begin_time, end_time, ret_token = 0;

    (void)fd;

#if SAMPLE_LATENCY
    uint64_t lat_begin_time, lat_end_time;
    size_t latency_samples[MAX_LAT_SAMPLES];
    int num_samples = 0;

    memset((void*)latency_samples, 0, sizeof(latency_samples));
#endif

    buffer = malloc(block_size);
    if (buffer == NULL) {
        printf("Failed to allocate memory: %s\n", strerror(errno));
        return -1;
    }
    memset(buffer, 1, block_size);

    begin_time = nano_time();
    for (i = 0; i < filesize; i += block_size) {
        off_t offset = offsets[i / block_size];
        //BEGIN_LAT_SAMPLE;
        if (optype == READ) {
            //__memmove_chk_avx512_no_vzeroupper(buffer, &mapped_buffer[offset], block_size);
            memcpy(buffer, &mapped_buffer[offset], block_size);
            ret_token += buffer[0];
        }
        else if (optype == WRITE) {
            //__memmove_chk_avx512_no_vzeroupper(&mapped_buffer[offset], buffer, block_size);
            memcpy(&mapped_buffer[offset], buffer, block_size);
            //msync(mapped_buffer, block_size, MS_SYNC );
            // Touch the block just written. The loop counter i is not a file
            // offset: it stays in [0, filesize), which is another thread's
            // region whenever this thread's offsets start elsewhere.
            ret_token += mapped_buffer[offset];
        }
        //END_LAT_SAMPLE;
    }

    end_time = nano_time();
    free(buffer);

    if (!silent) {
        printf("%s: %" PRIu64 " bytes read in %" PRIu64 " ns.\n",
        (optype==READ)?"readmap":"writemap",
        (uint_least64_t)filesize, (end_time-begin_time));

        // print GB/s
        printf("(tid %d) %.2f\n", tid,
            (double)filesize/(double)(end_time-begin_time)
            * NANOSECONDS_IN_SECOND / BYTES_IN_GB);
    }

    *begin = begin_time;
    *end = end_time;

#if SAMPLE_LATENCY
    printf("\nSample latency for %zu byte block:\n", block_size);
    for (i = 0; i < MAX_LAT_SAMPLES; i++)
        printf("\t%" PRIu64 ": %zu\n", i, latency_samples[i]);
#endif
    return ret_token;
}

/*
 * Map the first `size` bytes of `fd` with PROT_READ | PROT_WRITE and
 * MAP_SHARED, so stores through the mapping are carried to the file.
 * Does not return on failure: EXIT_MSG terminates the process.
 *
 * Variants tried during experiments, kept for reference:
 *   - MAP_PRIVATE | MAP_POPULATE   (pre-faulted private mapping)
 *   - MAP_PRIVATE | MAP_ANONYMOUS  (anonymous memory, fd = -1)
 *   - madvise(addr, size, MADV_HUGEPAGE) after mapping. The original note
 *     says page alignment (e.g. posix_memalign()) might also be needed.
 */
char* map_buffer(int fd, size_t size) {
    char *region = (char *)mmap(NULL, size, PROT_READ | PROT_WRITE,
                                MAP_SHARED, fd, 0);

    if (region == MAP_FAILED)
        EXIT_MSG("Failed to mmap file of size %zu: %s\n",
            size, strerror(errno));

    return region;
}

/*
 * Size in bytes of `filename` as reported by stat(), or (size_t)-1 if
 * stat() fails. In that case errno is whatever stat() set.
 */
size_t get_filesize(const char* filename){
    struct stat info;

    if (stat(filename, &info) != 0)
        return (size_t)-1;
    return (size_t)info.st_size;
}

/*
 * Print command-line usage to stdout. Only the last path component of
 * `progname` is shown.
 */
void print_help_message(const char *progname) {

    /* take only the last portion of the path */
    const char *basename = strrchr(progname, '/');
    basename = basename ? basename + 1 : progname;

    printf("usage: %s [OPTION]\n", basename);
    printf("  -h, --help\n"
           "     Print this help and exit.\n");
    printf("  -b, --block=BLOCKSIZE\n"
           "     Block size used for read/write system calls.\n"
           "     For mmap tests, the size of the stride when iterating\n"
           "     over the file.\n"
           "     Defaults to %d.\n", DEFAULT_BLOCK_SIZE);
    printf("  -f, --file=FILENAME\n"
           "     Perform all tests on this file (defaults to %s).\n",
           DEFAULT_NAME);
    printf("  -s, --size=GIB\n"
           "     File size in GiB to use when FILENAME does not already exist.\n");
    printf("  -t, --threads=NUM\n"
           "     Number of threads to use. Defaults to one.\n"
           "     Must evenly divide the number of blocks in the file.\n");
    printf("  --readsyscall\n"
           "     Perform a read test using system calls.\n");
    printf("  --readmmap\n"
           "     Perform a read test using mmap.\n");
    printf("  --writesyscall\n"
           "     Perform a write test using system calls.\n");
    printf("  --writemmap\n"
           "     Perform a write test using mmap.\n");
    printf("  --randomaccess\n"
           "     Perform random access.\n");
    printf("  --mixedmmap\n"
           "     Perform read and write concurrently at different offsets.\n");
    printf("  -w, --writethreads=NUM\n"
           "     With --mixedmmap, number of threads that write (default 0);\n"
           "     the remaining threads read.\n");
    printf("  --silent\n"
           "     Suppress progress and per-thread output.\n");
}