Encoder-nyuenc / src / main / threadpool.c
threadpool.c
Raw
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>

#include "utils.h"

void initThreadPool() {

    allowShutDown = 0;
    fileIndexer = (struct indexer *)malloc(sizeof(struct indexer));
    fileIndexer -> fileIndex = 0;
    fileIndexer -> fileCursor = 0;
    taskQueue = (struct task **)malloc(sizeof(struct task *) * MAXCHUNKCOUNT);
    resultQueue = (struct result **)malloc(sizeof(struct result *) * MAXCHUNKCOUNT);
    firstTask = firstResult = 0;
    lastTask = lastResult = -1;
    pthread_mutex_init(&taskQueueAccess, NULL);
    pthread_cond_init(&taskInQueue, NULL);
    pthread_cond_init(&taskCleared, NULL);
    threads = (struct thread **)malloc(sizeof(struct thread *) * jobs);
    for (int i = 0; i < jobs; i ++) {
        threads[i] = (struct thread *)malloc(sizeof(struct thread));
        threads[i] -> threadIndex = i;
        pthread_create(&(threads[i] -> threadId), NULL, startThread, (void *)threads[i]);
        if (DEBUG) fprintf(stderr, "thread %d created, thread id %lu\n", threads[i] -> threadIndex, threads[i] -> threadId);
    }

}

void *startThread(void *thread) {

    int threadIndex = ((struct thread *)thread) -> threadIndex;
    int curTask;
    while (1) {

        pthread_mutex_lock(&taskQueueAccess);
        while (firstTask > lastTask && allowShutDown == 0) {
            pthread_cond_wait(&taskInQueue, &taskQueueAccess);
        }
        if (firstTask > lastTask && allowShutDown == 1) {
            break;
        }
        curTask = firstTask;
        if (firstTask == lastTask) {
            pthread_cond_signal(&taskCleared);
        }
        if (DEBUG) fprintf(stderr, "thread %d handling task %d\n", threadIndex, firstTask);
        firstTask ++;
        pthread_mutex_unlock(&taskQueueAccess);
        encodeRLE(curTask);
        free(taskQueue[curTask]);

    }

    if (DEBUG) fprintf(stderr, "thread %d shutting down\n", threadIndex);
    pthread_mutex_unlock(&taskQueueAccess);
    pthread_exit(NULL);
    return NULL;

}

void maintainThreadPool() {

    pthread_mutex_lock(&taskQueueAccess);
    allowShutDown = 1;
    pthread_cond_broadcast(&taskInQueue);
    pthread_mutex_unlock(&taskQueueAccess);
    for (int i = 0; i < jobs; i ++) {
        pthread_join(threads[i] -> threadId, NULL);
    }
    extractFinalResult();
    for (int i = 0; i < jobs; i ++) {
        free(threads[i]);
    }
    free(threads);
    free(fileIndexer);

}