funcMap(filenamestring,contentsstring)[]mr.KeyValue{// function to detect word separators.
ff:=func(rrune)bool{return!unicode.IsLetter(r)}// split contents into an array of words.
words:=strings.FieldsFunc(contents,ff)kva:=[]mr.KeyValue{}for_,w:=rangewords{kv:=mr.KeyValue{w,"1"}kva=append(kva,kv)}returnkva}funcReduce(keystring,values[]string)string{// return the number of occurrences of this word.
returnstrconv.Itoa(len(values))}
typeMessageSendstruct{TaskIDint// task id
TaskCompletedStatusTaskCompletedStatus// task completed status
}
// The reply message structure is as follows:
typeMessageReplystruct{TaskIDint// task id
TaskTypeTaskType// task type, map or reduce or wait or exit
TaskFilestring// task file name
NReduceint// reduce number, indicate the number of reduce tasks
NMapint// map number, indicate the number of map tasks
}
typeTaskInfostruct{TaskStatusTaskStatus// task status
TaskFilestring// task file
TimeStamptime.Time// time stamp, indicating the running time of the task
}
typeCoordinatorstruct{NMapint// number of map tasks
NReduceint// number of reduce tasks
MapTasks[]TaskInfo// map task
ReduceTasks[]TaskInfo// reduce task
AllMapTaskCompletedbool// whether all map tasks have been completed
AllReduceTaskCompletedbool// whether all reduce tasks have been completed
Mutexsync.Mutex// mutex, used to protect the shared data
}
func(c*Coordinator)RequestTask(args*MessageSend,reply*MessageReply)error{// lock
c.Mutex.Lock()deferc.Mutex.Unlock()// assign map task
if!c.AllMapTaskCompleted{// count the number of completed map tasks
NMapTaskCompleted:=0foridx,taskInfo:=rangec.MapTasks{iftaskInfo.TaskStatus==Unassigned||taskInfo.TaskStatus==Failed||(taskInfo.TaskStatus==Assigned&&time.Since(taskInfo.TimeStamp)>10*time.Second){reply.TaskFile=taskInfo.TaskFilereply.TaskID=idxreply.TaskType=MapTaskreply.NReduce=c.NReducereply.NMap=c.NMapc.MapTasks[idx].TaskStatus=Assigned// mark the task as assigned
c.MapTasks[idx].TimeStamp=time.Now()// update the time stamp
returnnil}elseiftaskInfo.TaskStatus==Completed{NMapTaskCompleted++}}// check if all map tasks have been completed
ifNMapTaskCompleted==len(c.MapTasks){c.AllMapTaskCompleted=true}else{reply.TaskType=Waitreturnnil}}// assign reduce task
if!c.AllReduceTaskCompleted{// count the number of completed reduce tasks
NReduceTaskCompleted:=0foridx,taskInfo:=rangec.ReduceTasks{iftaskInfo.TaskStatus==Unassigned||taskInfo.TaskStatus==Failed||(taskInfo.TaskStatus==Assigned&&time.Since(taskInfo.TimeStamp)>10*time.Second){reply.TaskID=idxreply.TaskType=ReduceTaskreply.NReduce=c.NReducereply.NMap=c.NMapc.ReduceTasks[idx].TaskStatus=Assigned// mark the task as assigned
c.ReduceTasks[idx].TimeStamp=time.Now()// update the time stamp
returnnil}elseiftaskInfo.TaskStatus==Completed{NReduceTaskCompleted++}}// check if all reduce tasks have been completed
ifNReduceTaskCompleted==len(c.ReduceTasks){c.AllReduceTaskCompleted=true}else{reply.TaskType=Waitreturnnil}}// all tasks have been completed
reply.TaskType=Exitreturnnil}
funcHandleMapTask(reply*MessageReply,mapffunc(string,string)[]KeyValue){// open the file
file,err:=os.Open(reply.TaskFile)iferr!=nil{log.Fatalf("cannot open %v",reply.TaskFile)return}// read the file, get the content
content,err:=io.ReadAll(file)iferr!=nil{log.Fatalf("cannot read %v",reply.TaskFile)return}file.Close()// call the map function to get the key-value pairs
kva:=mapf(reply.TaskFile,string(content))// create intermediate files
intermediate:=make([][]KeyValue,reply.NReduce)for_,kv:=rangekva{r:=ihash(kv.Key)%reply.NReduceintermediate[r]=append(intermediate[r],kv)}// write the intermediate files
forr,kva:=rangeintermediate{oname:=fmt.Sprintf("mr-%v-%v",reply.TaskID,r)ofile,err:=os.CreateTemp("",oname)iferr!=nil{log.Fatalf("cannot create tempfile %v",oname)}enc:=json.NewEncoder(ofile)for_,kv:=rangekva{// write the key-value pairs to the intermediate file
enc.Encode(kv)}ofile.Close()// Atomic file renaming:rename the tempfile to the final intermediate file
os.Rename(ofile.Name(),oname)}// send the task completion message to the coordinator
args:=MessageSend{TaskID:reply.TaskID,TaskCompletedStatus:MapTaskCompleted,}call("Coordinator.ReportTask",&args,&MessageReply{})}
// generate the intermediate files for reduce tasks
funcgenerateFileName(rint,NMapint)[]string{varfileName[]stringforTaskID:=0;TaskID<NMap;TaskID++{fileName=append(fileName,fmt.Sprintf("mr-%d-%d",TaskID,r))}returnfileName}funcHandleReduceTask(reply*MessageReply,reduceffunc(string,[]string)string){// load the intermediate files
varintermediate[]KeyValue// get the intermediate file names
intermediateFiles:=generateFileName(reply.TaskID,reply.NMap)// fmt.Println(intermediateFiles)
for_,filename:=rangeintermediateFiles{file,err:=os.Open(filename)iferr!=nil{log.Fatalf("cannot open %v",filename)return}// decode the intermediate file
dec:=json.NewDecoder(file)for{kv:=KeyValue{}iferr:=dec.Decode(&kv);err==io.EOF{break}intermediate=append(intermediate,kv)}file.Close()}// sort the intermediate key-value pairs by key
sort.Slice(intermediate,func(i,jint)bool{returnintermediate[i].Key<intermediate[j].Key})// write the key-value pairs to the output file
oname:=fmt.Sprintf("mr-out-%v",reply.TaskID)ofile,err:=os.Create(oname)iferr!=nil{log.Fatalf("cannot create %v",oname)return}fori:=0;i<len(intermediate);{j:=i+1forj<len(intermediate)&&intermediate[j].Key==intermediate[i].Key{j++}varvalues[]stringfork:=i;k<j;k++{values=append(values,intermediate[k].Value)}// call the reduce function to get the output
output:=reducef(intermediate[i].Key,values)// write the key-value pairs to the output file
fmt.Fprintf(ofile,"%v %v\n",intermediate[i].Key,output)i=j}ofile.Close()// rename the output file to the final output file
os.Rename(ofile.Name(),oname)// send the task completion message to the coordinator
args:=MessageSend{TaskID:reply.TaskID,TaskCompletedStatus:ReduceTaskCompleted,}call("Coordinator.ReportTask",&args,&MessageReply{})}