package mapreduce

import (
	"sync"
)

// schedule starts and waits for all tasks in the given phase (Map or Reduce).
//
// One goroutine is launched per task. Each goroutine takes a worker address
// from mr.registerChannel, issues the Worker.DoTask RPC, and on failure
// retries the same task on the next available worker. schedule returns only
// after every task in the phase has been reported successful.
func (mr *Master) schedule(phase jobPhase) {
	var ntasks int
	var nios int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mr.files)
		nios = mr.nReduce
	case reducePhase:
		ntasks = mr.nReduce
		nios = len(mr.files)
	}

	debug("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

	// All ntasks tasks have to be scheduled on workers, and only once all of
	// them have been completed successfully should the function return.
	// Workers may fail, and any given worker may finish multiple tasks.
	var wg sync.WaitGroup
	for taskNumber := 0; taskNumber < ntasks; taskNumber++ {
		wg.Add(1)
		go func(taskNumber int) {
			defer wg.Done()

			taskArgs := DoTaskArgs{
				JobName:       mr.jobName,
				Phase:         phase,
				TaskNumber:    taskNumber,
				NumOtherPhase: nios,
			}
			// In the reduce phase task numbers range over mr.nReduce, which
			// may exceed len(mr.files); indexing unconditionally would panic.
			if taskNumber < len(mr.files) {
				taskArgs.File = mr.files[taskNumber]
			}

			// Retry only this task until some worker completes it. A worker
			// whose RPC failed is not handed back, so it receives no further
			// tasks from this scheduler.
			for {
				worker := <-mr.registerChannel
				if call(worker, "Worker.DoTask", &taskArgs, new(struct{})) {
					// Return the worker asynchronously: once the final tasks
					// finish nobody may be receiving on registerChannel, and
					// a synchronous send here would keep wg.Done from running
					// and block wg.Wait forever.
					// NOTE(review): assumes registerChannel may be unbuffered
					// — confirm against where the Master is constructed.
					go func() { mr.registerChannel <- worker }()
					return
				}
			}
		}(taskNumber)
	}
	wg.Wait()

	debug("Schedule: %v phase done\n", phase)
}