package mapreduce // schedule starts and waits for all tasks in the given phase (Map or Reduce). func (mr *Master) schedule(phase jobPhase) { var ntasks int var nios int // number of inputs (for reduce) or outputs (for map) switch phase { case mapPhase: ntasks = len(mr.files) nios = mr.nReduce case reducePhase: ntasks = mr.nReduce nios = len(mr.files) } debug("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios) // All ntasks tasks have to be scheduled on workers, and only once all of // them have been completed successfully should the function return. // Remember that workers may fail, and that any given worker may finish // multiple tasks. // TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO // Create a single done channel outside the loop to synchronize task completion done := make(chan struct{}) unfinishedChannel := make(chan int, ntasks) for taskNumber := 0; taskNumber < ntasks; taskNumber++ { unfinishedChannel <- taskNumber } var i = 0 for { if i == ntasks { break } select { case value, ok := <-unfinishedChannel: if ok { go func(taskNumber int) { worker := <-mr.registerChannel file := "" if phase == mapPhase { file = mr.files[taskNumber] } taskArgs := DoTaskArgs{ JobName: mr.jobName, File: file, Phase: phase, TaskNumber: taskNumber, NumOtherPhase: nios, } // Call the DoTask RPC on the worker. ok := call(worker, "Worker.DoTask", &taskArgs, new(struct{})) if !ok { unfinishedChannel <- value } else { done <- struct{}{} mr.registerChannel <- worker } }(value) } case <-done: i++ } } }