// Source: raft-key-value-store/labs-fall23-AnurimaVaishnavi-master/assignment1-2/src/mapreduce/common_map.go

package mapreduce

import (
	"hash/fnv"
	"encoding/json"
	"os"
)
// doMap does the job of a map worker: it reads one of the input files
// (inFile), calls the user-defined map function (mapF) for that file's
// contents, and partitions the output into nReduce intermediate files.
//
// The intermediate file for reduce task r is named
// reduceName(jobName, mapTaskNumber, r). All nReduce files are created up
// front and truncated, so that:
//   - a partition that receives no keys still has an (empty) file on disk,
//   - re-running the same map task replaces its earlier output rather than
//     appending duplicate records to it.
//
// Each key/value pair is written as one JSON value followed by a newline
// (the format json.Encoder.Encode produces). Every error is passed to
// checkError, which is defined elsewhere in this package.
func doMap(
	jobName string, // the name of the MapReduce job
	mapTaskNumber int, // which map task this is
	inFile string, // path of the input file to read
	nReduce int, // the number of reduce task that will be run ("R" in the paper)
	mapF func(file string, contents string) []KeyValue,
) {
	data, err := os.ReadFile(inFile)
	checkError(err)
	keyValues := mapF(inFile, string(data))

	// Open each partition file exactly once instead of once per record.
	files := make([]*os.File, nReduce)
	encoders := make([]*json.Encoder, nReduce)
	for r := 0; r < nReduce; r++ {
		name := reduceName(jobName, mapTaskNumber, r)
		f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
		checkError(err)
		files[r] = f
		encoders[r] = json.NewEncoder(f)
	}

	for i := range keyValues {
		kv := &keyValues[i]
		// Reduce in unsigned arithmetic: converting the uint32 hash to int
		// first could yield a negative index where int is 32 bits wide.
		r := int(ihash(kv.Key) % uint32(nReduce))
		checkError(encoders[r].Encode(kv))
	}

	// Close reports deferred write errors, so its result must be checked.
	for _, f := range files {
		checkError(f.Close())
	}
}

// ihash returns the 32-bit FNV-1a hash of s.
func ihash(s string) uint32 {
	hasher := fnv.New32a()
	// The result is discarded on purpose: hash.Hash documents that Write
	// never returns an error.
	_, _ = hasher.Write([]byte(s))
	sum := hasher.Sum32()
	return sum
}