typeKVServerstruct{musync.RWMutexmeintrf*raft.RaftapplyChchanraft.ApplyMsgdeadint32// set by Kill()
maxraftstateint// snapshot if log grows this big
lastAppliedint//record the last applied index to avoid duplicate apply
stateMachineKVStateMachinelastOperationsmap[int64]OperationContextnotifyChsmap[int]chan*CommandReply}typeKVStateMachineinterface{Get(keystring)(string,Err)Put(key,valuestring)ErrAppend(key,valuestring)Err}typeOperationContextstruct{MaxAppliedCommandIdint64LastReply*CommandReply}
func(kv*KVServer)applier(){forkv.killed()==false{select{casemessage:=<-kv.applyCh:DPrintf("{Node %v} tries to apply message %v",kv.rf.GetId(),message)ifmessage.CommandValid{kv.mu.Lock()ifmessage.CommandIndex<=kv.lastApplied{DPrintf("{Node %v} discards outdated message %v because a newer snapshot which lastApplied is %v has been restored",kv.rf.GetId(),message,kv.lastApplied)kv.mu.Unlock()continue}kv.lastApplied=message.CommandIndexreply:=new(CommandReply)command:=message.Command.(Command)// type assertion
ifcommand.Op!=OpGet&&kv.isDuplicatedCommand(command.ClientId,command.CommandId){DPrintf("{Node %v} doesn't apply duplicated message %v to stateMachine because maxAppliedCommandId is %v for client %v",kv.rf.GetId(),message,kv.lastOperations[command.ClientId],command.ClientId)reply=kv.lastOperations[command.ClientId].LastReply}else{reply=kv.applyLogToStateMachine(command)ifcommand.Op!=OpGet{kv.lastOperations[command.ClientId]=OperationContext{MaxAppliedCommandId:command.CommandId,LastReply:reply,}}}// just notify related channel for currentTerm's log when node is leader
ifcurrentTerm,isLeader:=kv.rf.GetState();isLeader&&message.CommandTerm==currentTerm{ch:=kv.getNotifyCh(message.CommandIndex)ch<-reply}kv.mu.Unlock()}}}}
func(kv*KVServer)applier(){forkv.killed()==false{select{casemessage:=<-kv.applyCh:DPrintf("{Node %v} tries to apply message %v",kv.rf.GetId(),message)ifmessage.CommandValid{kv.mu.Lock()ifmessage.CommandIndex<=kv.lastApplied{DPrintf("{Node %v} discards outdated message %v because a newer snapshot which lastApplied is %v has been restored",kv.rf.GetId(),message,kv.lastApplied)kv.mu.Unlock()continue}kv.lastApplied=message.CommandIndexreply:=new(CommandReply)command:=message.Command.(Command)// type assertion
ifcommand.Op!=OpGet&&kv.isDuplicatedCommand(command.ClientId,command.CommandId){DPrintf("{Node %v} doesn't apply duplicated message %v to stateMachine because maxAppliedCommandId is %v for client %v",kv.rf.GetId(),message,kv.lastOperations[command.ClientId],command.ClientId)reply=kv.lastOperations[command.ClientId].LastReply}else{reply=kv.applyLogToStateMachine(command)ifcommand.Op!=OpGet{kv.lastOperations[command.ClientId]=OperationContext{MaxAppliedCommandId:command.CommandId,LastReply:reply,}}}// just notify related channel for currentTerm's log when node is leader
ifcurrentTerm,isLeader:=kv.rf.GetState();isLeader&&message.CommandTerm==currentTerm{ch:=kv.getNotifyCh(message.CommandIndex)ch<-reply}ifkv.needSnapshot(){kv.takeSnapshot(message.CommandIndex)}kv.mu.Unlock()}elseifmessage.SnapshotValid{kv.mu.Lock()ifkv.rf.CondInstallSnapshot(message.SnapshotTerm,message.SnapshotIndex,message.Snapshot){kv.restoreStateFromSnapshot(message.Snapshot)kv.lastApplied=message.SnapshotIndex}kv.mu.Unlock()}else{panic(fmt.Sprintf("Invalid ApplyMsg %v",message))}}}}