// Source path: raft-key-value-store/labs-fall23-AnurimaVaishnavi-master/assignment1-2/src/mapreduce/schedule.go
// File: schedule.go

package mapreduce
import (
	"sync"
)

// schedule starts and waits for all tasks in the given phase (Map or Reduce).
//
// One goroutine is launched per task. Each goroutine takes a worker address
// from mr.registerChannel, issues the Worker.DoTask RPC, and retries the same
// task on another worker until the RPC reports success. schedule returns only
// after every task in the phase has completed successfully.
func (mr *Master) schedule(phase jobPhase) {
	var ntasks int
	var nios int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mr.files)
		nios = mr.nReduce
	case reducePhase:
		ntasks = mr.nReduce
		nios = len(mr.files)
	}

	debug("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

	// All ntasks tasks have to be scheduled on workers, and only once all of
	// them have been completed successfully should the function return.
	// Workers may fail, and any given worker may finish multiple tasks, so
	// each task is retried individually and idle workers are recycled.
	var wg sync.WaitGroup
	for taskNumber := 0; taskNumber < ntasks; taskNumber++ {
		wg.Add(1)
		go func(taskNumber int) {
			defer wg.Done()

			taskArgs := DoTaskArgs{
				JobName:       mr.jobName,
				Phase:         phase,
				TaskNumber:    taskNumber,
				NumOtherPhase: nios,
			}
			// Only map tasks have an input file. Reduce task numbers range
			// over mr.nReduce, which may exceed len(mr.files), so indexing
			// mr.files unconditionally could panic in the reduce phase.
			if phase == mapPhase {
				taskArgs.File = mr.files[taskNumber]
			}

			// Retry just this task until some worker completes it, rather
			// than restarting the whole phase on a single failure.
			for {
				worker := <-mr.registerChannel

				// Call the DoTask RPC on the worker.
				if call(worker, "Worker.DoTask", &taskArgs, new(struct{})) {
					// Hand the worker back without blocking this goroutine:
					// once the last tasks finish there may be no receiver
					// left, and a synchronous send here would keep wg.Wait
					// from ever returning.
					// NOTE(review): assumes registerChannel is unbuffered and
					// is drained again by a later phase or by shutdown code;
					// otherwise this helper goroutine stays parked — confirm.
					go func() { mr.registerChannel <- worker }()
					return
				}

				// The RPC failed; the worker is treated as unavailable and is
				// not returned to the pool, so other tasks do not keep
				// picking it up. The loop waits for another worker.
				// NOTE(review): presumably a recovered worker re-registers
				// through registerChannel — verify against the worker code.
			}
		}(taskNumber)
	}
	wg.Wait()

	debug("Schedule: %v phase done\n", phase)
}