/*
 * Final-Mapreduce / mapreduce.c
 */
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include "mapreduce.h"
#include "hashmap.h"


/* User Reducer passed to MR_Run; invoked by reducer_thread. */
Reducer reducer;

/* Input file names (argv + 1) and the shared "next file to map" index. */
char **files;
int fileNum;                 /* read and advanced under fileNumLock */
int totalFiles;              /* number of entries in files */
pthread_mutex_t fileNumLock;

/* One heap-allocated mutex per partition, taken by add_to_list. */
pthread_mutex_t **p_locks;

/* Partitioner consulted by MR_Emit to choose a partition for each key. */
Partitioner *part;

/* One emitted pair; MR_Emit stores strdup'd copies of both strings. */
struct kv {
    char *key;
    char *value;
};

/*
 * Growable array of emitted pairs belonging to one partition:
 * `num_elements` slots are in use out of `size` allocated slots.
 */
struct partition {
    struct kv **elements;
    int num_elements;
    int size;
};

/* All partitions; MR_Run creates one per reducer. */
struct kv_list {
    struct partition **partitions;
    int num_partitions;
};

/* Global partition table, filled in by init_kv_list. */
struct kv_list kvl;
/* kvl_counter[i] points at partition i's read cursor (used by get_func). */
int **kvl_counter;
/* partNumbers[i] == i; gives each worker thread a stable int* argument. */
int *partNumbers;


/*
 * Build the global partition table `kvl` with `sizeP` empty partitions and
 * create one mutex per partition in `p_locks`.
 *
 * Each partition starts with room for INITIAL_PARTITION_CAPACITY element
 * pointers; add_to_list() grows the array on demand.  Allocation or mutex
 * initialisation failure is fatal, matching MR_Emit's error policy.
 */
void init_kv_list(int sizeP) {
    enum { INITIAL_PARTITION_CAPACITY = 10 };

    if (sizeP < 0) {
        sizeP = 0;  /* a negative count would wrap to a huge allocation */
    }

    kvl.num_partitions = sizeP;
    kvl.partitions = malloc((size_t)sizeP * sizeof *kvl.partitions);
    p_locks = malloc((size_t)sizeP * sizeof *p_locks);
    /* malloc(0) may legitimately return NULL, so only check real requests. */
    if (sizeP > 0 && (kvl.partitions == NULL || p_locks == NULL)) {
        printf("Malloc error! %s\n", strerror(errno));
        exit(1);
    }

    for (int i = 0; i < sizeP; i++) {
        struct partition *p = malloc(sizeof *p);
        if (p == NULL) {
            printf("Malloc error! %s\n", strerror(errno));
            exit(1);
        }
        p->elements = malloc(INITIAL_PARTITION_CAPACITY * sizeof *p->elements);
        if (p->elements == NULL) {
            printf("Malloc error! %s\n", strerror(errno));
            exit(1);
        }
        p->size = INITIAL_PARTITION_CAPACITY;
        p->num_elements = 0;
        kvl.partitions[i] = p;

        p_locks[i] = malloc(sizeof *p_locks[i]);
        if (p_locks[i] == NULL) {
            printf("Malloc error! %s\n", strerror(errno));
            exit(1);
        }
        int rc = pthread_mutex_init(p_locks[i], NULL);
        if (rc != 0) {
            /* pthread functions return the error code instead of setting errno. */
            printf("Mutex init error! %s\n", strerror(rc));
            exit(1);
        }
    }
}

/*
 * Resize partition `p`'s element array to `newSize` pointer slots.
 *
 * realloc's result goes through a temporary so that a failure never
 * overwrites (and leaks) the existing array, and `size` is only updated
 * once the new array exists.  Allocation failure is fatal.
 * NOTE(review): callers must not shrink below p->num_elements.
 */
void setPartitionSize(struct partition* p, int newSize) {
    if (newSize < 0) {
        newSize = 0;
    }
    struct kv **resized = realloc(p->elements, (size_t)newSize * sizeof *resized);
    if (resized == NULL && newSize > 0) {
        printf("Malloc error! %s\n", strerror(errno));
        exit(1);
    }
    p->elements = resized;
    p->size = newSize;
}

/*
 * Append `elt` to partition `pIndex`, holding that partition's mutex so
 * that concurrent mapper threads can emit into the same partition.
 *
 * When the array is full its capacity is doubled (or set to 1 if it was
 * 0).  realloc goes through a temporary so that a failure does not leak
 * the array.  Allocation failure is fatal.
 */
void add_to_list(int pIndex, struct kv* elt) {
    struct partition *p = kvl.partitions[pIndex];

    pthread_mutex_lock(p_locks[pIndex]);
    if (p->num_elements == p->size) {
        int new_size = p->size > 0 ? p->size * 2 : 1;
        struct kv **grown = realloc(p->elements, (size_t)new_size * sizeof *grown);
        if (grown == NULL) {
            int err = errno;  /* save before unlock can disturb it */
            pthread_mutex_unlock(p_locks[pIndex]);
            printf("Malloc error! %s\n", strerror(err));
            exit(1);
        }
        p->elements = grown;
        p->size = new_size;
    }
    p->elements[p->num_elements++] = elt;
    pthread_mutex_unlock(p_locks[pIndex]);
}



/*
 * Record one intermediate key/value pair from a Mapper.
 *
 * Both strings are copied with strdup, so the caller keeps ownership of
 * its buffers.  The configured partitioner picks the target partition and
 * the pair is appended there.  Allocation failure is fatal.
 */
void MR_Emit(char *key, char *value){
    struct kv *elt = malloc(sizeof *elt);
    if (elt == NULL) {
        printf("Malloc error! %s\n", strerror(errno));
        exit(1);
    }
    elt->key = strdup(key);
    elt->value = strdup(value);
    if (elt->key == NULL || elt->value == NULL) {
        printf("Malloc error! %s\n", strerror(errno));
        exit(1);
    }

    unsigned long slot = (*part)(key, kvl.num_partitions);
    /* A custom partitioner might return an out-of-range index; wrap it so
     * the same key still always lands in the same, valid partition. */
    if (kvl.num_partitions > 0) {
        slot %= (unsigned long)kvl.num_partitions;
    }
    add_to_list((int)slot, elt);
}

/*
 * Mapper worker thread.  `mp` carries the user's Mapper through
 * pthread_create.  Under fileNumLock the thread claims the next unmapped
 * input file, then runs the Mapper on it without holding the lock.  It
 * repeats until every file has been claimed.
 */
void* mapper_thread(void *mp) {
    Mapper map_fn = (Mapper) mp;

    for (;;) {
        pthread_mutex_lock(&fileNumLock);
        if (fileNum >= totalFiles) {
            pthread_mutex_unlock(&fileNumLock);
            break;
        }
        int claimed = fileNum++;
        pthread_mutex_unlock(&fileNumLock);

        (*map_fn)(files[claimed]);
    }

    return NULL;
}

char* get_func(char* key, int partition_number) {
    if (*kvl_counter[partition_number] == kvl.partitions[partition_number]->num_elements) {//check num elements updating
	return NULL;
    }
    struct kv *curr_elt = kvl.partitions[partition_number]->elements[*kvl_counter[partition_number]];
	
    if (!strcmp(curr_elt->key, key)) {
	(*(kvl_counter[partition_number]))++;
	return curr_elt->value;
    }
	

    return NULL;
}

int cmp(const void* a, const void* b) {
    char* str1 = (*(struct kv **)a)->key;
    char* str2 = (*(struct kv **)b)->key;
    return strcmp(str1, str2);
}

/*
 * Default partitioner: the djb2 string hash (seed 5381, hash = hash*33 + c
 * per character) reduced modulo `num_partitions`.
 * `num_partitions` must be non-zero.
 */
unsigned long MR_DefaultHashPartition(char *key, int num_partitions) {
    unsigned long hash = 5381;
    for (const char *cur = key; *cur != '\0'; ++cur) {
        int ch = *cur;
        hash = (hash << 5) + hash + ch;  /* hash * 33 + ch */
    }
    return hash % num_partitions;
}

void* sort_thread(void* pIndex) {
		qsort(kvl.partitions[*((int *)pIndex)]->elements, kvl.partitions[*((int *)pIndex)]->num_elements, sizeof(struct kv*), cmp);
		return NULL;
}

void* reducer_thread(void* pIndex) {
	
	kvl_counter[*((int *)pIndex)]= calloc(1, sizeof(size_t));
	
	if((int)kvl.partitions[*((int *)pIndex)]->num_elements==0) {
		
		return NULL;
	}
	
	while (*kvl_counter[*((int *)pIndex)] < kvl.partitions[*((int *)pIndex)]->num_elements) {
	
	(*reducer)((kvl.partitions[*((int *)pIndex)]->elements[*kvl_counter[*((int *)pIndex)]])->key, get_func, *((int *)pIndex));
	
    }

	return NULL;
}

void MR_Run(int argc, char *argv[], 
	    Mapper map, int num_mappers, 
	    Reducer reduce, int num_reducers, 
	    Partitioner partition) {
			
			init_kv_list(num_reducers);
			part=&partition;
			files = (argv + 1);
            kvl_counter = malloc(sizeof(int*) * num_reducers);
			
			reducer=reduce;
			pthread_t p[num_mappers];
			
			partNumbers = malloc(sizeof(int)*num_reducers);
			for(int i=0;i<num_reducers;i++) {
				partNumbers[i]=i;
			}
			int fileCount=argc-1;
            fileNum = 0;
			int threadsToMake;
			totalFiles=fileCount;
			if(fileCount<num_mappers) {
				threadsToMake=fileCount;
			}
			else threadsToMake=num_mappers;
			for(int i=0;i<threadsToMake;i++) {
				pthread_create(&p[i], NULL, mapper_thread, (void*)map);
			}
			
			for(int i=0;i<threadsToMake;i++) {
				pthread_join(p[i], NULL);
			}
			
			pthread_t partReduce[num_reducers];
			
			pthread_t sorters[num_reducers];
            for(int i=0;i<num_reducers;i++) {
                void *n = &partNumbers[i];
                pthread_create(&sorters[i], NULL, sort_thread, n);
                //qsort(kvl.partitions[i]->elements, kvl.partitions[i]->num_elements, sizeof(struct kv*), cmp);
                
            }
            for(int i=0;i<num_reducers;i++) {
                pthread_join(sorters[i], NULL);
                
            }

			
			for(int i=0;i<num_reducers;i++) {
				//int newI = i;
				void *n = &partNumbers[i];
				pthread_create(&partReduce[i], NULL, reducer_thread, n);
				

			}
			
			for(int i=0;i<num_reducers;i++) {
				pthread_join(partReduce[i], NULL);
				
			}


			//all mapper threads should be done at this point.
			

			for(int i=0;i<num_reducers;i++) {
				for(int k=0;k<kvl.partitions[i]->num_elements;k++) {
					free(kvl.partitions[i]->elements[k]->key);
					free(kvl.partitions[i]->elements[k]->value);
					free(kvl.partitions[i]->elements[k]);

				}
				free(kvl.partitions[i]->elements);
				free(kvl.partitions[i]);
				free(p_locks[i]);
				free(kvl_counter[i]);
			}
			free(kvl.partitions);
			free(partNumbers);
			free(p_locks);
			free(kvl_counter);
			
        }