#include <stdlib.h> #include <string.h> #include <stdio.h> #include <errno.h> #include <pthread.h> #include "mapreduce.h" #include "hashmap.h" Reducer reducer; char** files; int fileNum; int totalFiles; pthread_mutex_t fileNumLock; pthread_mutex_t **p_locks; Partitioner* part; struct kv { char* key; char* value; }; struct partition { struct kv** elements; int num_elements; int size; }; struct kv_list { struct partition** partitions; int num_partitions; }; struct kv_list kvl; int **kvl_counter; int *partNumbers; void init_kv_list(int sizeP) { kvl.partitions = (struct partition**) malloc(sizeP * sizeof(struct partition*)); kvl.num_partitions = sizeP; struct partition *p; p_locks = malloc(sizeof(pthread_mutex_t*) * (int)sizeP); for(int i=0;i<sizeP;i++) { kvl.partitions[i]=(struct partition*) malloc(sizeof(struct partition)); p = kvl.partitions[i]; p->elements = (struct kv**) malloc(10 * sizeof(struct kv*));//allocating the memory for each partition. p->size=10; p->num_elements=0; p_locks[i] = malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(p_locks[i], NULL); } } void setPartitionSize(struct partition* p, int newSize) { p->size=newSize; p->elements = realloc(p->elements, p->size * sizeof(struct kv*)); } void add_to_list(int pIndex, struct kv* elt) { struct partition *p = kvl.partitions[pIndex]; pthread_mutex_lock(p_locks[pIndex]); if (p->num_elements == p->size) { p->size *= 2; p->elements = realloc(p->elements, p->size * sizeof(struct kv*)); } p->elements[p->num_elements++] = elt; pthread_mutex_unlock(p_locks[pIndex]); } void MR_Emit(char *key, char *value){ struct kv *elt = (struct kv*) malloc(sizeof(struct kv)); if (elt == NULL) { printf("Malloc error! %s\n", strerror(errno)); exit(1); } elt->key = strdup(key); elt->value = strdup(value); add_to_list((int)(*part)(key, kvl.num_partitions), elt); return; } void* mapper_thread(void *mp) { pthread_mutex_lock(&fileNumLock); while(fileNum<totalFiles) {//needs lock int tempNum=fileNum; fileNum++; pthread_mutex_unlock(&fileNumLock); Mapper newMap = (Mapper) mp; (*newMap)(files[tempNum]);//needs lock pthread_mutex_lock(&fileNumLock); } pthread_mutex_unlock(&fileNumLock); return NULL; } char* get_func(char* key, int partition_number) { if (*kvl_counter[partition_number] == kvl.partitions[partition_number]->num_elements) {//check num elements updating return NULL; } struct kv *curr_elt = kvl.partitions[partition_number]->elements[*kvl_counter[partition_number]]; if (!strcmp(curr_elt->key, key)) { (*(kvl_counter[partition_number]))++; return curr_elt->value; } return NULL; } int cmp(const void* a, const void* b) { char* str1 = (*(struct kv **)a)->key; char* str2 = (*(struct kv **)b)->key; return strcmp(str1, str2); } unsigned long MR_DefaultHashPartition(char *key, int num_partitions) { unsigned long hash = 5381; int c; while ((c = *key++) != '\0') hash = hash * 33 + c; return hash % num_partitions; } void* sort_thread(void* pIndex) { qsort(kvl.partitions[*((int *)pIndex)]->elements, kvl.partitions[*((int *)pIndex)]->num_elements, sizeof(struct kv*), cmp); return NULL; } void* reducer_thread(void* pIndex) { kvl_counter[*((int *)pIndex)]= calloc(1, sizeof(size_t)); if((int)kvl.partitions[*((int *)pIndex)]->num_elements==0) { return NULL; } while (*kvl_counter[*((int *)pIndex)] < kvl.partitions[*((int *)pIndex)]->num_elements) { (*reducer)((kvl.partitions[*((int *)pIndex)]->elements[*kvl_counter[*((int *)pIndex)]])->key, get_func, *((int *)pIndex)); } return NULL; } void MR_Run(int argc, char *argv[], Mapper map, int num_mappers, Reducer reduce, int num_reducers, Partitioner partition) { init_kv_list(num_reducers); part=&partition; files = (argv + 1); kvl_counter = malloc(sizeof(int*) * num_reducers); reducer=reduce; pthread_t p[num_mappers]; partNumbers = malloc(sizeof(int)*num_reducers); for(int i=0;i<num_reducers;i++) { partNumbers[i]=i; } int fileCount=argc-1; fileNum = 0; int threadsToMake; totalFiles=fileCount; if(fileCount<num_mappers) { threadsToMake=fileCount; } else threadsToMake=num_mappers; for(int i=0;i<threadsToMake;i++) { pthread_create(&p[i], NULL, mapper_thread, (void*)map); } for(int i=0;i<threadsToMake;i++) { pthread_join(p[i], NULL); } pthread_t partReduce[num_reducers]; pthread_t sorters[num_reducers]; for(int i=0;i<num_reducers;i++) { void *n = &partNumbers[i]; pthread_create(&sorters[i], NULL, sort_thread, n); //qsort(kvl.partitions[i]->elements, kvl.partitions[i]->num_elements, sizeof(struct kv*), cmp); } for(int i=0;i<num_reducers;i++) { pthread_join(sorters[i], NULL); } for(int i=0;i<num_reducers;i++) { //int newI = i; void *n = &partNumbers[i]; pthread_create(&partReduce[i], NULL, reducer_thread, n); } for(int i=0;i<num_reducers;i++) { pthread_join(partReduce[i], NULL); } //all mapper threads should be done at this point. for(int i=0;i<num_reducers;i++) { for(int k=0;k<kvl.partitions[i]->num_elements;k++) { free(kvl.partitions[i]->elements[k]->key); free(kvl.partitions[i]->elements[k]->value); free(kvl.partitions[i]->elements[k]); } free(kvl.partitions[i]->elements); free(kvl.partitions[i]); free(p_locks[i]); free(kvl_counter[i]); } free(kvl.partitions); free(partNumbers); free(p_locks); free(kvl_counter); }