raft-key-value-store / labs-fall23-AnurimaVaishnavi-master / assignment1-3 / src / mapreduce / common_reduce.go
common_reduce.go
Raw
package mapreduce
import (
	"encoding/json"
	"os"
	"sort"
)
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	    keyValueMap := make(map[string][]string)
	    for i := 0; i < nMap; i++ {
		intermediateFileName := reduceName(jobName, i, reduceTaskNumber)
		file, err := os.Open(intermediateFileName)
		checkError(err)
		decoder := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := decoder.Decode(&kv); err != nil {
				break
			}
			keyValueMap[kv.Key] = append(keyValueMap[kv.Key], kv.Value)
		}
		var keys []string
		for key := range keyValueMap {
			keys = append(keys, key)
		}
		sort.Strings(keys)
		outputFileName := mergeName(jobName, reduceTaskNumber)
		outputFile, err := os.Create(outputFileName)
		checkError(err)
		encoder := json.NewEncoder(outputFile)
		for _, key := range keys {
			result := reduceF(key, keyValueMap[key])
			err := encoder.Encode(KeyValue{Key: key, Value: result})
			checkError(err)
		}
		outputFile.Close()
    }
}