// Join adds new groups to the configuration.
func (cf *MemoryConfigStateMachine) Join(groups map[int][]string) Err {
    lastConfig := cf.Configs[len(cf.Configs)-1]
    // create a new configuration based on the last configuration
    newConfig := Config{len(cf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
    for gid, servers := range groups {
        // if the group does not exist in the new configuration, add it
        if _, ok := newConfig.Groups[gid]; !ok {
            newServers := make([]string, len(servers))
            copy(newServers, servers)
            newConfig.Groups[gid] = newServers
        }
    }
    group2Shards := Group2Shards(newConfig)
    for {
        // load balance the shards among the groups
        source, target := GetGIDWithMaximumShards(group2Shards), GetGIDWithMinimumShards(group2Shards)
        if source != 0 && len(group2Shards[source])-len(group2Shards[target]) <= 1 {
            break
        }
        group2Shards[target] = append(group2Shards[target], group2Shards[source][0])
        group2Shards[source] = group2Shards[source][1:]
    }
    // update the shard assignment in the new configuration
    var newShards [NShards]int
    for gid, shards := range group2Shards {
        for _, shard := range shards {
            newShards[shard] = gid
        }
    }
    newConfig.Shards = newShards
    cf.Configs = append(cf.Configs, newConfig)
    return OK
}
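For example, with the default NShards = 10 and a starting configuration in which group 1 owns every shard, Join({2: servers}) repeatedly moves one shard from the group holding the most shards to the group holding the fewest; the loop stops once the difference drops to one or less, so the new configuration ends with a 5/5 split between groups 1 and 2.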
// Leave removes specified groups from the configuration.
func (cf *MemoryConfigStateMachine) Leave(gids []int) Err {
    lastConfig := cf.Configs[len(cf.Configs)-1]
    // create a new configuration based on the last configuration
    newConfig := Config{len(cf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
    group2Shards := Group2Shards(newConfig)
    // used to store the orphan shards (i.e., shards owned by the groups that are leaving)
    orphanShards := make([]int, 0)
    for _, gid := range gids {
        // if the group exists in the new configuration, remove it
        if _, ok := newConfig.Groups[gid]; ok {
            delete(newConfig.Groups, gid)
        }
        // if the group owns any shards, remove them and add them to the orphan shards
        if shards, ok := group2Shards[gid]; ok {
            delete(group2Shards, gid)
            orphanShards = append(orphanShards, shards...)
        }
    }
    var newShards [NShards]int
    if len(newConfig.Groups) > 0 {
        // re-allocate orphan shards to the remaining groups
        for _, shard := range orphanShards {
            gid := GetGIDWithMinimumShards(group2Shards)
            newShards[shard] = gid
            group2Shards[gid] = append(group2Shards[gid], shard)
        }
        // update the shard assignment in the new configuration
        for gid, shards := range group2Shards {
            for _, shard := range shards {
                newShards[shard] = gid
            }
        }
    }
    newConfig.Shards = newShards
    cf.Configs = append(cf.Configs, newConfig)
    return OK
}
Move: update the current configuration by reassigning the given shard from its old replica group to the specified new replica group.
// Move moves a specified shard to a specified group.
func (cf *MemoryConfigStateMachine) Move(shard, gid int) Err {
    lastConfig := cf.Configs[len(cf.Configs)-1]
    // create a new configuration based on the last configuration
    newConfig := Config{len(cf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
    // update the shard assignment in the new configuration
    newConfig.Shards[shard] = gid
    cf.Configs = append(cf.Configs, newConfig)
    return OK
}
// Query queries a specified configuration.
func (cf *MemoryConfigStateMachine) Query(num int) (Config, Err) {
    // if the configuration number is not valid, return the latest configuration
    if num < 0 || num >= len(cf.Configs) {
        return cf.Configs[len(cf.Configs)-1], OK
    }
    return cf.Configs[num], OK
}

// Group2Shards maps each group ID to the shards it currently owns.
func Group2Shards(config Config) map[int][]int {
    group2Shards := make(map[int][]int)
    for gid := range config.Groups {
        group2Shards[gid] = make([]int, 0)
    }
    for shard, gid := range config.Shards {
        group2Shards[gid] = append(group2Shards[gid], shard)
    }
    return group2Shards
}
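Join and Leave lean on three helpers that are not listed here: deepCopy, GetGIDWithMinimumShards, and GetGIDWithMaximumShards. A minimal sketch of what they might look like; the sort import and the sorted-gid iteration are assumptions, made so that every replica of the config state machine performs the same deterministic rebalancing (Go map iteration order is random), and gid 0 is treated as the "unassigned" group.

// deepCopy returns a copy of the gid -> servers map so that a new Config
// does not alias the previous configuration's group table.
func deepCopy(groups map[int][]string) map[int][]string {
    newGroups := make(map[int][]string, len(groups))
    for gid, servers := range groups {
        newServers := make([]string, len(servers))
        copy(newServers, servers)
        newGroups[gid] = newServers
    }
    return newGroups
}

// GetGIDWithMinimumShards returns the real group (gid != 0) holding the fewest shards.
// Iterating over sorted gids keeps the result deterministic across replicas.
func GetGIDWithMinimumShards(group2Shards map[int][]int) int {
    var gids []int
    for gid := range group2Shards {
        gids = append(gids, gid)
    }
    sort.Ints(gids)
    index, min := -1, NShards+1
    for _, gid := range gids {
        if gid != 0 && len(group2Shards[gid]) < min {
            index, min = gid, len(group2Shards[gid])
        }
    }
    return index
}

// GetGIDWithMaximumShards returns the group holding the most shards.
// Gid 0 (unassigned shards) always wins if it still owns any shard.
func GetGIDWithMaximumShards(group2Shards map[int][]int) int {
    if shards, ok := group2Shards[0]; ok && len(shards) > 0 {
        return 0
    }
    var gids []int
    for gid := range group2Shards {
        gids = append(gids, gid)
    }
    sort.Ints(gids)
    index, max := -1, -1
    for _, gid := range gids {
        if len(group2Shards[gid]) > max {
            index, max = gid, len(group2Shards[gid])
        }
    }
    return index
}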
type ShardKV struct {
    mu   sync.RWMutex // mutex for synchronizing access to shared resources
    dead int32        // set by Kill(), indicates if the server is killed

    rf      *raft.Raft         // raft instance for consensus
    applyCh chan raft.ApplyMsg // channel for applying raft messages

    makeEnd func(string) *labrpc.ClientEnd // function to create a client end to communicate with other groups
    gid     int                            // group id of the server
    sc      *shardctrler.Clerk             // client to communicate with the shardctrler

    maxRaftState int // snapshot if log grows this big
    lastApplied  int // index of the last applied log entry, to prevent the state machine from rolling back

    lastConfig    shardctrler.Config // the last configuration received from the shardctrler
    currentConfig shardctrler.Config // the current configuration of the cluster

    stateMachine   map[int]*Shard             // KV state machines, one per shard
    lastOperations map[int64]OperationContext // determine whether a log is duplicated by (clientId, commandId)
    notifyChans    map[int]chan *CommandReply // notify the client when the command is applied
}

func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, ctrlers []*labrpc.ClientEnd, makeEnd func(string) *labrpc.ClientEnd) *ShardKV {
    // call labgob.Register on structures you want
    // Go's RPC library to marshall/unmarshall.
    labgob.Register(Command{})
    labgob.Register(CommandArgs{})
    labgob.Register(shardctrler.Config{})
    labgob.Register(ShardOperationArgs{})
    labgob.Register(ShardOperationReply{})

    // create a channel to receive messages applied by Raft
    applyCh := make(chan raft.ApplyMsg)
    kv := &ShardKV{
        dead:           0,
        rf:             raft.Make(servers, me, persister, applyCh),
        applyCh:        applyCh,
        makeEnd:        makeEnd,
        gid:            gid,
        sc:             shardctrler.MakeClerk(ctrlers),
        maxRaftState:   maxraftstate,
        lastApplied:    0,
        lastConfig:     shardctrler.DefaultConfig(),
        currentConfig:  shardctrler.DefaultConfig(),
        stateMachine:   make(map[int]*Shard),
        lastOperations: make(map[int64]OperationContext),
        notifyChans:    make(map[int]chan *CommandReply),
    }
    // restore any snapshot data stored in persister to recover the state
    kv.restoreSnapshot(persister.ReadSnapshot())

    go kv.applier()

    // start several monitoring routines that periodically perform specific actions:
    go kv.Monitor(kv.configurationAction, ConfigurationMonitorTimeout)         // monitor configuration changes
    go kv.Monitor(kv.migrationAction, MigrationMonitorTimeout)                 // monitor shard migration
    go kv.Monitor(kv.gcAction, GCMonitorTimeout)                               // monitor garbage collection of old shard data
    go kv.Monitor(kv.checkEntryInCurrentTermAction, EmptyEntryDetectorTimeout) // monitor Raft log entries in the current term

    return kv
}
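StartServer launches the four background routines through a generic Monitor helper that is not listed here. A minimal sketch, assuming the *MonitorTimeout constants are time.Duration values and that only the Raft leader needs to trigger the actions (followers receive the resulting log entries through the apply channel anyway):

// Monitor periodically runs action while the server is alive.
// Only the leader performs the action, since each action ultimately
// goes through kv.Execute and the Raft log.
func (kv *ShardKV) Monitor(action func(), timeout time.Duration) {
    for !kv.killed() {
        if _, isLeader := kv.rf.GetState(); isLeader {
            action()
        }
        time.Sleep(timeout)
    }
}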
// Execute processes a command and returns the result via the reply parameter.
func (kv *ShardKV) Execute(command Command, reply *CommandReply) {
    // do not hold the lock here, to improve throughput:
    // while the KV server holds the lock to take a snapshot, the underlying raft can still commit logs
    index, _, isLeader := kv.rf.Start(command)
    if !isLeader {
        reply.Err = ErrWrongLeader
        return
    }
    kv.mu.Lock()
    notifyChan := kv.getNotifyChan(index)
    kv.mu.Unlock()

    // wait for the result or time out
    select {
    case result := <-notifyChan:
        reply.Value, reply.Err = result.Value, result.Err
    case <-time.After(ExecuteTimeout):
        reply.Err = ErrTimeout
    }

    // release notifyChan to reduce the memory footprint
    // why asynchronously? to improve throughput, there is no need to block the client request
    go func() {
        kv.mu.Lock()
        kv.removeOutdatedNotifyChan(index)
        kv.mu.Unlock()
    }()
}

// applier continuously applies commands from the Raft log to the state machine.
func (kv *ShardKV) applier() {
    for !kv.killed() {
        select {
        // wait for a new message on the apply channel
        case message := <-kv.applyCh:
            if message.CommandValid {
                kv.mu.Lock()
                // check if the command has already been applied
                if message.CommandIndex <= kv.lastApplied {
                    kv.mu.Unlock()
                    continue
                }
                // update the last applied index
                kv.lastApplied = message.CommandIndex

                reply := new(CommandReply)
                // type-assert the command from the message
                command := message.Command.(Command)
                switch command.CommandType {
                case Operation:
                    // extract the operation data and apply the operation to the state machine
                    operation := command.Data.(CommandArgs)
                    reply = kv.applyOperation(&operation)
                case Configuration:
                    // extract the configuration data and apply the configuration to the state machine
                    nextConfig := command.Data.(shardctrler.Config)
                    reply = kv.applyConfiguration(&nextConfig)
                case InsertShards:
                    // extract the shard insertion data and apply the insertion to the state machine
                    shardsInfo := command.Data.(ShardOperationReply)
                    reply = kv.applyInsertShards(&shardsInfo)
                case DeleteShards:
                    // extract the shard deletion data and apply the deletion to the state machine
                    shardsInfo := command.Data.(ShardOperationArgs)
                    reply = kv.applyDeleteShards(&shardsInfo)
                case EmptyShards:
                    // apply an empty entry, to prevent the state machine from rolling back
                    reply = kv.applyEmptyShards()
                }

                // only notify the related channel for the current term's log when the node is leader
                if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm {
                    notifyChan := kv.getNotifyChan(message.CommandIndex)
                    notifyChan <- reply
                }

                // take a snapshot if needed
                if kv.needSnapshot() {
                    kv.takeSnapshot(message.CommandIndex)
                }
                kv.mu.Unlock()
            } else if message.SnapshotValid {
                // restore the state machine from the snapshot
                kv.mu.Lock()
                if kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot) {
                    kv.restoreSnapshot(message.Snapshot)
                    kv.lastApplied = message.SnapshotIndex
                }
                kv.mu.Unlock()
            } else {
                panic(fmt.Sprintf("{Node %v}{Group %v} invalid apply message %v", kv.rf.GetId(), kv.gid, message))
            }
        }
    }
}
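Execute and applier reference a few small helpers that are not listed: getNotifyChan, removeOutdatedNotifyChan, and isDuplicateRequest (used by the read/write path below). Minimal sketches, assuming the channel helpers are always called with kv.mu held:

// getNotifyChan returns (creating it if necessary) the channel used to hand
// the apply result for a given log index back to the waiting RPC handler.
// Caller must hold kv.mu.
func (kv *ShardKV) getNotifyChan(index int) chan *CommandReply {
    if _, ok := kv.notifyChans[index]; !ok {
        kv.notifyChans[index] = make(chan *CommandReply, 1)
    }
    return kv.notifyChans[index]
}

// removeOutdatedNotifyChan drops the channel for an index that has already
// been answered or has timed out. Caller must hold kv.mu.
func (kv *ShardKV) removeOutdatedNotifyChan(index int) {
    delete(kv.notifyChans, index)
}

// isDuplicateRequest reports whether the command identified by
// (clientId, commandId) has already been applied. Caller must hold kv.mu.
func (kv *ShardKV) isDuplicateRequest(clientId int64, commandId int64) bool {
    operationContext, ok := kv.lastOperations[clientId]
    return ok && commandId <= operationContext.MaxAppliedCommandId
}

Making the notify channel buffered with capacity 1 means the applier never blocks on a reply whose RPC handler has already timed out and stopped listening.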
const (
    Operation CommandType = iota // generic client operation command
    Configuration                // configuration change command
    InsertShards                 // command to insert shards
    DeleteShards                 // command to delete shards
    EmptyShards                  // empty command, used to commit a log entry in the current term
)

type Command struct {
    CommandType CommandType
    Data        interface{}
}

type ShardOperationArgs struct {
    ConfigNum int
    ShardIDs  []int
}

type ShardOperationReply struct {
    Err            Err
    ConfigNum      int
    Shards         map[int]map[string]string
    LastOperations map[int64]OperationContext
}

// NewOperationCommand creates a new operation command from CommandArgs.
func NewOperationCommand(args *CommandArgs) Command {
    return Command{Operation, *args}
}

// NewConfigurationCommand creates a new configuration command.
func NewConfigurationCommand(config *shardctrler.Config) Command {
    return Command{Configuration, *config}
}

// NewInsertShardsCommand creates a new command to insert shards.
func NewInsertShardsCommand(reply *ShardOperationReply) Command {
    return Command{InsertShards, *reply}
}

// NewDeleteShardsCommand creates a new command to delete shards.
func NewDeleteShardsCommand(args *ShardOperationArgs) Command {
    return Command{DeleteShards, *args}
}

// NewEmptyShardsCommand creates a new empty command carrying no data.
func NewEmptyShardsCommand() Command {
    return Command{EmptyShards, nil}
}
2.3.3 Read/Write Service
This code implements the request-handling logic of the shardkv service. The canServe function decides whether this server may serve a given shard, based on whether the shard's owner in the current configuration matches the server's group ID and on the shard's status. The Command function first checks for duplicates: a repeated non-Get request returns the cached result directly; it then checks whether the server can serve the requested shard, returning ErrWrongGroup if not, and otherwise calls Execute. When the command is applied, applyOperation again checks whether the server can still serve the shard, returns the previous result for a repeated non-Get request, and otherwise applies the operation to the state machine, updating the client's operation context for non-Get operations. Performing these checks at several points preserves correctness and the linearizability of operations.
// canServe checks if the server can serve the shard.
func (kv *ShardKV) canServe(shardID int) bool {
    return kv.currentConfig.Shards[shardID] == kv.gid && (kv.stateMachine[shardID].Status == Serving || kv.stateMachine[shardID].Status == GCing)
}

func (kv *ShardKV) Command(args *CommandArgs, reply *CommandReply) {
    kv.mu.RLock()
    // if the command is duplicated, return the result directly without the raft layer's participation
    if args.Op != Get && kv.isDuplicateRequest(args.ClientId, args.CommandId) {
        lastReply := kv.lastOperations[args.ClientId].LastReply
        reply.Value, reply.Err = lastReply.Value, lastReply.Err
        kv.mu.RUnlock()
        return
    }
    // check if the server can serve the requested shard
    if !kv.canServe(key2shard(args.Key)) {
        reply.Err = ErrWrongGroup
        kv.mu.RUnlock()
        return
    }
    kv.mu.RUnlock()
    kv.Execute(NewOperationCommand(args), reply)
}

// applyOperation applies a given operation to the KV state machine.
func (kv *ShardKV) applyOperation(operation *CommandArgs) *CommandReply {
    reply := new(CommandReply)
    shardID := key2shard(operation.Key)
    // check if the server can serve the requested shard
    if !kv.canServe(shardID) {
        reply.Err = ErrWrongGroup
    } else {
        // check if the operation is duplicated (only for non-Get operations)
        if operation.Op != Get && kv.isDuplicateRequest(operation.ClientId, operation.CommandId) {
            lastReply := kv.lastOperations[operation.ClientId].LastReply
            reply.Value, reply.Err = lastReply.Value, lastReply.Err
        } else {
            // apply the operation to the state machine
            reply = kv.applyLogToStateMachine(operation, shardID)
            // update the last operation context for the client if the operation is not a Get
            if operation.Op != Get {
                kv.lastOperations[operation.ClientId] = OperationContext{operation.CommandId, reply}
            }
        }
    }
    return reply
}
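applyOperation delegates to the per-shard state machine (kv.stateMachine maps a shard ID to a *Shard) and to applyLogToStateMachine, neither of which is listed above. A minimal sketch: the Serving, Pulling, and GCing statuses appear in the code above, while the BePulling status, the Value field of CommandArgs, and the Put/Append op values are assumptions.

// ShardStatus describes where a shard is in the migration life cycle.
type ShardStatus uint8

const (
    Serving   ShardStatus = iota // owned by this group and ready to serve
    Pulling                      // owned in the new config, data still being pulled
    BePulling                    // owned in the old config, waiting to be pulled away
    GCing                        // data received, waiting to ask the old owner to delete it
)

// Shard is the state machine for a single shard.
type Shard struct {
    KV     map[string]string
    Status ShardStatus
}

func (shard *Shard) Get(key string) (string, Err) {
    if value, ok := shard.KV[key]; ok {
        return value, OK
    }
    return "", ErrNoKey
}

func (shard *Shard) Put(key, value string) Err {
    shard.KV[key] = value
    return OK
}

func (shard *Shard) Append(key, value string) Err {
    shard.KV[key] += value
    return OK
}

// applyLogToStateMachine routes an operation to the proper shard.
func (kv *ShardKV) applyLogToStateMachine(operation *CommandArgs, shardID int) *CommandReply {
    reply := new(CommandReply)
    switch operation.Op {
    case Get:
        reply.Value, reply.Err = kv.stateMachine[shardID].Get(operation.Key)
    case Put:
        reply.Err = kv.stateMachine[shardID].Put(operation.Key, operation.Value)
    case Append:
        reply.Err = kv.stateMachine[shardID].Append(operation.Key, operation.Value)
    }
    return reply
}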
// Checks if the next configuration can be performed.
// If all shards are in Serving status, it queries and applies the next configuration.
func (kv *ShardKV) configurationAction() {
    canPerformNextConfig := true
    kv.mu.RLock()
    // if any shard is not in the Serving status, the next configuration cannot be applied yet
    for _, shard := range kv.stateMachine {
        if shard.Status != Serving {
            canPerformNextConfig = false
            break
        }
    }
    currentConfigNum := kv.currentConfig.Num
    kv.mu.RUnlock()

    // query and apply the next configuration if allowed
    if canPerformNextConfig {
        nextConfig := kv.sc.Query(currentConfigNum + 1)
        // ensure the queried configuration is exactly the next one
        if nextConfig.Num == currentConfigNum+1 {
            kv.Execute(NewConfigurationCommand(&nextConfig), new(CommandReply))
        }
    }
}

// applyConfiguration applies a new configuration to the shard server.
func (kv *ShardKV) applyConfiguration(nextConfig *shardctrler.Config) *CommandReply {
    reply := new(CommandReply)
    // check if the new configuration is the next in line
    if nextConfig.Num == kv.currentConfig.Num+1 {
        // update the shard status based on the new configuration
        kv.updateShardStatus(nextConfig)
        // save the last configuration
        kv.lastConfig = kv.currentConfig
        // update the current configuration
        kv.currentConfig = *nextConfig
        reply.Err = OK
    } else {
        reply.Err = ErrOutDated
    }
    return reply
}
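applyConfiguration relies on updateShardStatus to flag which shards must be pulled in and which must be handed off; that function is not listed. A minimal sketch, assuming every shard already has an entry in kv.stateMachine (initialization is elided) and using the BePulling status sketched earlier; shards coming from or going to gid 0, the unassigned group, need no migration:

// updateShardStatus compares the current and next configurations and
// flags every shard whose owner changes.
func (kv *ShardKV) updateShardStatus(nextConfig *shardctrler.Config) {
    for shardID := 0; shardID < shardctrler.NShards; shardID++ {
        // shard moves to this group: pull it from its previous owner
        if kv.currentConfig.Shards[shardID] != kv.gid && nextConfig.Shards[shardID] == kv.gid {
            if kv.currentConfig.Shards[shardID] != 0 {
                kv.stateMachine[shardID].Status = Pulling
            }
        }
        // shard leaves this group: keep it around until the new owner has pulled it
        if kv.currentConfig.Shards[shardID] == kv.gid && nextConfig.Shards[shardID] != kv.gid {
            if nextConfig.Shards[shardID] != 0 {
                kv.stateMachine[shardID].Status = BePulling
            }
        }
    }
}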
// Executes the migration task to pull shard data from other groups.
func (kv *ShardKV) migrationAction() {
    kv.mu.RLock()
    gid2Shards := kv.getShardIDsByStatus(Pulling)
    var wg sync.WaitGroup
    // create a pull task for each group (GID)
    for gid, shardIDs := range gid2Shards {
        wg.Add(1)
        go func(servers []string, configNum int, shardIDs []int) {
            defer wg.Done()
            pullTaskArgs := ShardOperationArgs{configNum, shardIDs}
            // try to pull shard data from each server in the group
            for _, server := range servers {
                pullTaskReply := new(ShardOperationReply)
                srv := kv.makeEnd(server)
                if srv.Call("ShardKV.GetShardsData", &pullTaskArgs, pullTaskReply) && pullTaskReply.Err == OK {
                    // commit the pulled data through raft
                    kv.Execute(NewInsertShardsCommand(pullTaskReply), new(CommandReply))
                }
            }
        }(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIDs)
    }
    kv.mu.RUnlock()
    wg.Wait() // wait for all pull tasks to complete
}

// applyInsertShards applies the insertion of shard data.
func (kv *ShardKV) applyInsertShards(shardsInfo *ShardOperationReply) *CommandReply {
    reply := new(CommandReply)
    // check if the configuration number matches the current one
    if shardsInfo.ConfigNum == kv.currentConfig.Num {
        for shardID, shardData := range shardsInfo.Shards {
            shard := kv.stateMachine[shardID]
            // only insert if the shard is still in the Pulling state
            if shard.Status == Pulling {
                for key, value := range shardData {
                    shard.Put(key, value)
                }
                // update the shard status to Garbage Collecting
                shard.Status = GCing
            } else {
                break
            }
        }
        // update last operations with the provided contexts
        for clientId, operationContext := range shardsInfo.LastOperations {
            if lastOperation, ok := kv.lastOperations[clientId]; !ok || lastOperation.MaxAppliedCommandId < operationContext.MaxAppliedCommandId {
                kv.lastOperations[clientId] = operationContext
            }
        }
    } else {
        reply.Err = ErrOutDated
    }
    return reply
}
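migrationAction depends on getShardIDsByStatus and on a ShardKV.GetShardsData RPC handler served by the source group, neither of which is listed. A minimal sketch; the ErrNotReady value and the exact copying details are assumptions:

// getShardIDsByStatus groups the shards that are currently in the given
// status by the gid that owned them in the previous configuration.
// Caller must hold kv.mu.
func (kv *ShardKV) getShardIDsByStatus(status ShardStatus) map[int][]int {
    gid2Shards := make(map[int][]int)
    for shardID, shard := range kv.stateMachine {
        if shard.Status == status {
            if gid := kv.lastConfig.Shards[shardID]; gid != 0 {
                gid2Shards[gid] = append(gid2Shards[gid], shardID)
            }
        }
    }
    return gid2Shards
}

// GetShardsData is the RPC handler run by the source group: it returns a
// copy of the requested shards plus the deduplication table, so the new
// owner can keep rejecting duplicate client requests.
func (kv *ShardKV) GetShardsData(args *ShardOperationArgs, reply *ShardOperationReply) {
    // only the leader of the source group serves pull requests
    if _, isLeader := kv.rf.GetState(); !isLeader {
        reply.Err = ErrWrongLeader
        return
    }
    kv.mu.RLock()
    defer kv.mu.RUnlock()
    // the source group may not have caught up to this configuration yet
    if kv.currentConfig.Num < args.ConfigNum {
        reply.Err = ErrNotReady
        return
    }
    reply.Shards = make(map[int]map[string]string)
    for _, shardID := range args.ShardIDs {
        data := make(map[string]string)
        for k, v := range kv.stateMachine[shardID].KV {
            data[k] = v
        }
        reply.Shards[shardID] = data
    }
    reply.LastOperations = make(map[int64]OperationContext)
    for clientId, ctx := range kv.lastOperations {
        reply.LastOperations[clientId] = ctx
    }
    reply.ConfigNum, reply.Err = args.ConfigNum, OK
}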
// Executes garbage collection (GC) tasks to delete shard data from other groups.
func (kv *ShardKV) gcAction() {
    kv.mu.RLock()
    // find the groups that previously owned these shards, so the data we no longer own can be cleaned up
    gid2Shards := kv.getShardIDsByStatus(GCing)
    var wg sync.WaitGroup
    // create a GC task for each group (GID)
    for gid, shardIDs := range gid2Shards {
        wg.Add(1)
        go func(servers []string, configNum int, shardIDs []int) {
            defer wg.Done()
            gcTaskArgs := ShardOperationArgs{configNum, shardIDs}
            // ask each server in the group to delete the shard data
            for _, server := range servers {
                gcTaskReply := new(ShardOperationReply)
                srv := kv.makeEnd(server)
                if srv.Call("ShardKV.DeleteShardsData", &gcTaskArgs, gcTaskReply) && gcTaskReply.Err == OK {
                    kv.Execute(NewDeleteShardsCommand(&gcTaskArgs), new(CommandReply))
                }
            }
        }(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIDs)
    }
    kv.mu.RUnlock()
    wg.Wait() // wait for all GC tasks to complete
}
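The DeleteShards command committed here is handled on the apply path by applyDeleteShards (referenced in the applier's switch above but not listed). It plays a double role: on the group that pulled the data it flips the shards from GCing back to Serving, while on the old owner it actually drops the shard data. A minimal sketch, assuming the ShardStatus values sketched earlier and a hypothetical NewShard constructor that returns an empty Serving shard:

// applyDeleteShards finishes the hand-off of the given shards for the
// stated configuration number.
func (kv *ShardKV) applyDeleteShards(shardsInfo *ShardOperationArgs) *CommandReply {
    reply := new(CommandReply)
    if shardsInfo.ConfigNum == kv.currentConfig.Num {
        for _, shardID := range shardsInfo.ShardIDs {
            shard := kv.stateMachine[shardID]
            if shard.Status == GCing {
                // new owner: data already installed, the shard can serve again
                shard.Status = Serving
            } else if shard.Status == BePulling {
                // old owner: the new owner confirmed receipt, drop the data
                kv.stateMachine[shardID] = NewShard()
            } else {
                // already processed by an earlier (duplicate) command
                break
            }
        }
        reply.Err = OK
    } else {
        // an outdated delete is treated as already done
        reply.Err = OK
    }
    return reply
}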
2.3.7 Empty Log Detection
According to §5.4.2 of the Raft paper, it is not safe for a new leader to directly commit log entries from previous terms. Such entries can only be committed after the new leader has produced an entry in its own term: once that new entry is committed, the earlier entries are committed along with it. This means that if no entry is ever generated and committed in the current term, some entries from previous terms can remain uncommitted indefinitely, which may lead to a livelock where the log cannot make progress.
// Ensures that a log entry is present in the current term to keep the log active.
func (kv *ShardKV) checkEntryInCurrentTermAction() {
    // if no log entry exists in the current term, commit an empty command
    if !kv.rf.HasLogInCurrentTerm() {
        kv.Execute(NewEmptyShardsCommand(), new(CommandReply))
    }
}

// applyEmptyShards handles the empty command. It exists to prevent the state machine from rolling back.
func (kv *ShardKV) applyEmptyShards() *CommandReply {
    return &CommandReply{Err: OK}
}
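HasLogInCurrentTerm is an extra accessor added to the Raft layer rather than part of the stock lab interface. A minimal sketch of the idea, checking whether the last log entry was appended in the current term; the mu, logs, and currentTerm field names are illustrative assumptions:

// HasLogInCurrentTerm reports whether at least one entry has been appended
// to the log during the current term.
func (rf *Raft) HasLogInCurrentTerm() bool {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    return rf.logs[len(rf.logs)-1].Term == rf.currentTerm
}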
type Clerk struct {
    sm      *shardctrler.Clerk             // client that talks to the shardctrler to fetch the latest configuration (shard-to-group mapping)
    config  shardctrler.Config             // the current cluster configuration; the Clerk routes requests based on it
    makeEnd func(string) *labrpc.ClientEnd // creates an RPC end to a server identified by its unique address
    // You will have to modify this struct.
    leaderIds map[int]int // gid -> leaderId: the last known leader server for each group
    clientId  int64       // generated by nrand(); a distributed ID generator guaranteeing no collisions would be better
    commandId int64       // (clientId, commandId) uniquely identifies an operation
}

func (ck *Clerk) Command(args *CommandArgs) string {
    args.ClientId, args.CommandId = ck.clientId, ck.commandId
    for {
        shard := key2shard(args.Key)
        gid := ck.config.Shards[shard]
        if servers, ok := ck.config.Groups[gid]; ok {
            // if not set, default the leader id of this group to 0
            if _, ok = ck.leaderIds[gid]; !ok {
                ck.leaderIds[gid] = 0
            }
            oldLeaderId := ck.leaderIds[gid]
            newLeader := oldLeaderId
            for {
                reply := new(CommandReply)
                // send the request to the presumed leader server
                ok := ck.makeEnd(servers[newLeader]).Call("ShardKV.Command", args, reply)
                if ok && (reply.Err == OK || reply.Err == ErrNoKey) {
                    ck.commandId++
                    return reply.Value
                } else if ok && reply.Err == ErrWrongGroup {
                    break
                } else {
                    // try the next server in the group
                    newLeader = (newLeader + 1) % len(servers)
                    // stop once all servers have been tried
                    if newLeader == oldLeaderId {
                        break
                    }
                }
            }
        }
        time.Sleep(100 * time.Millisecond)
        // query the latest configuration from the shardctrler
        ck.config = ck.sm.Query(-1)
    }
}
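The Clerk's exported Get/Put/Append methods are thin wrappers around Command. A minimal sketch, assuming CommandArgs carries Key, Value, and Op fields and that Put and Append are values of the same Op enum as Get:

func (ck *Clerk) Get(key string) string {
    return ck.Command(&CommandArgs{Key: key, Op: Get})
}

func (ck *Clerk) Put(key string, value string) {
    ck.Command(&CommandArgs{Key: key, Value: value, Op: Put})
}

func (ck *Clerk) Append(key string, value string) {
    ck.Command(&CommandArgs{Key: key, Value: value, Op: Append})
}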