package mapreduce

import (
	"encoding/json"
	"io"
	"os"
	"sort"
)

// doReduce runs a single reduce task. It reads the intermediate file that
// each of the nMap map tasks produced for this reduce task, groups the decoded
// values by key, calls reduceF once per distinct key (in sorted key order),
// and writes the results as a stream of JSON-encoded KeyValue records to the
// file named by mergeName(jobName, reduceTaskNumber).
//
// Every intermediate file is read in full before any key is reduced, so
// reduceF always receives the complete list of values for its key. When nMap
// is zero the output file is still created, but left empty.
//
// Errors from opening, decoding, encoding, creating and closing files are
// passed to checkError.
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// Phase 1: gather the values for every key across all intermediate files.
	valuesByKey := make(map[string][]string)
	for i := 0; i < nMap; i++ {
		file, err := os.Open(reduceName(jobName, i, reduceTaskNumber))
		checkError(err)

		decoder := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := decoder.Decode(&kv); err != nil {
				// io.EOF is the normal end of the stream; anything else means
				// the intermediate data is corrupt or truncated and must not
				// be dropped silently. Break in both cases so the loop ends
				// even if checkError returns.
				if err != io.EOF {
					checkError(err)
				}
				break
			}
			valuesByKey[kv.Key] = append(valuesByKey[kv.Key], kv.Value)
		}

		// Close eagerly rather than deferring: a defer inside this loop would
		// keep every input file open until doReduce returns. The file is
		// read-only, so the Close error carries no useful information.
		file.Close()
	}

	// Phase 2: reduce each key once, in sorted order so the output is
	// deterministic despite Go's randomized map iteration.
	keys := make([]string, 0, len(valuesByKey))
	for key := range valuesByKey {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	outputFile, err := os.Create(mergeName(jobName, reduceTaskNumber))
	checkError(err)

	encoder := json.NewEncoder(outputFile)
	for _, key := range keys {
		result := reduceF(key, valuesByKey[key])
		checkError(encoder.Encode(KeyValue{Key: key, Value: result}))
	}

	// A failed Close on a written file can mean lost data, so report it.
	checkError(outputFile.Close())
}