// Source path: raft-key-value-store / labs-fall23-AnurimaVaishnavi-master / assignment1-3 / src / mapreduce / schedule.go

package mapreduce

// schedule starts and waits for all tasks in the given phase (Map or Reduce).
//
// Every task number in [0, ntasks) is placed on a buffered queue. The loop
// below takes numbers off that queue and launches one goroutine per attempt.
// Each goroutine receives a worker address from mr.registerChannel, issues the
// Worker.DoTask RPC, and either reports success or puts the task number back
// on the queue so that it is attempted again. schedule returns once ntasks
// successes have been reported.
func (mr *Master) schedule(phase jobPhase) {
	var ntasks int
	var nios int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mr.files)
		nios = mr.nReduce
	case reducePhase:
		ntasks = mr.nReduce
		nios = len(mr.files)
	}

	debug("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

	// All ntasks tasks have to be scheduled on workers, and only once all of
	// them have been completed successfully should the function return.
	// Remember that workers may fail, and that any given worker may finish
	// multiple tasks.

	// done carries exactly one signal per successfully completed task.
	done := make(chan struct{})

	// pending holds the task numbers that still need an attempt. A task number
	// is always either in this queue or owned by a single goroutine, so the
	// queue never holds more than ntasks values and a re-queue cannot block.
	pending := make(chan int, ntasks)
	for taskNumber := 0; taskNumber < ntasks; taskNumber++ {
		pending <- taskNumber
	}

	completed := 0
	for completed < ntasks {
		select {
		case next := <-pending:
			// pending is never closed, so a receive always yields a real task.
			go func(taskNumber int) {
				// Wait until some worker address is available.
				worker := <-mr.registerChannel

				// Only map tasks read an input file; reduce tasks get "".
				file := ""
				if phase == mapPhase {
					file = mr.files[taskNumber]
				}
				taskArgs := DoTaskArgs{
					JobName:       mr.jobName,
					File:          file,
					Phase:         phase,
					TaskNumber:    taskNumber,
					NumOtherPhase: nios,
				}

				// Call the DoTask RPC on the worker.
				succeeded := call(worker, "Worker.DoTask", &taskArgs, new(struct{}))
				if !succeeded {
					// Re-queue the task for another attempt. The worker is
					// deliberately not handed back after a failed call.
					pending <- taskNumber
					return
				}

				// Signal completion BEFORE handing the worker back. The
				// hand-back may block until someone receives from
				// registerChannel (NOTE(review): this assumes the channel is
				// unbuffered; confirm where it is created). Doing it first
				// could keep the final completions from ever being counted.
				done <- struct{}{}
				mr.registerChannel <- worker
			}(next)
		case <-done:
			completed++
		}
	}
}