typeRaftstruct{musync.RWMutex// Lock to protect shared access to this peer's state
peers[]*labrpc.ClientEnd// RPC end points of all peers
persister*Persister// Object to hold this peer's persisted state
meint// this peer's index into peers[]
deadint32// set by Kill()
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
// 2A
stateNodeStatecurrentTermintvotedForintelectionTimer*time.TimerheartbeatTimer*time.Timer// 2B
logs[]Entry// the first is dummy entry which contains LastSnapshotTerm, LastSnapshotIndex and nil Command
commitIndexintlastAppliedintnextIndex[]intmatchIndex[]intapplyChchanApplyMsgapplyCond*sync.Cond// used to wakeup applier goroutine after committing new entries
replicatorCond[]*sync.Cond// used to signal replicator goroutine to batch replicating entries
}
funcMake(peers[]*labrpc.ClientEnd,meint,persister*Persister,applyChchanApplyMsg)*Raft{rf:=&Raft{peers:peers,persister:persister,me:me,dead:0,applyCh:applyCh,replicatorCond:make([]*sync.Cond,len(peers)),state:StateFollower,currentTerm:0,votedFor:-1,logs:make([]Entry,1),nextIndex:make([]int,len(peers)),matchIndex:make([]int,len(peers)),heartbeatTimer:time.NewTimer(StableHeartbeatTimeout()),electionTimer:time.NewTimer(RandomizedElectionTimeout()),}// Your initialization code here (2A, 2B, 2C).
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())rf.applyCond=sync.NewCond(&rf.mu)lastLog:=rf.getLastLog()fori:=0;i<len(peers);i++{rf.matchIndex[i],rf.nextIndex[i]=0,lastLog.Index+1ifi!=rf.me{rf.replicatorCond[i]=sync.NewCond(&sync.Mutex{})// start replicator goroutine to replicate entries in batch
gorf.replicator(i)}}// start ticker goroutine to start elections
gorf.ticker()// start applier goroutine to push committed logs into applyCh exactly once
gorf.applier()returnrf}
// ticker The ticker go routine starts a new election if this peer hasn't received
// heartsbeats recently.
func(rf*Raft)ticker(){// for rf.killed() == false {
for!rf.killed(){// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// time.Sleep().
select{case<-rf.electionTimer.C:// start election
DPrintf("{Node: %v} election timeout",rf.me)rf.mu.Lock()rf.ChangeState(StateCandidate)rf.currentTerm+=1rf.StartElection()rf.electionTimer.Reset(RandomizedElectionTimeout())rf.mu.Unlock()case<-rf.heartbeatTimer.C:// 领导者发送心跳维持领导力, 2A 可以先不实现
rf.mu.Lock()ifrf.state==StateLeader{rf.BroadcastHeartbeat(true)rf.heartbeatTimer.Reset(StableHeartbeatTimeout())}rf.mu.Unlock()}}}
unc(rf*Raft)StartElection(){req:=rf.genRequestVoteReq()DPrintf("{Note: %v} starts election with RequestVoteReq: %v",rf.me,req)// Closure
grantedVote:=1// elect for itself
rf.votedFor=rf.merf.persist()forpeer:=rangerf.peers{ifpeer==rf.me{continue}gofunc(peerint){resp:=new(RequestVoteResponse)ifrf.sendRequestVote(peer,req,resp){rf.mu.Lock()deferrf.mu.Unlock()DPrintf("[RequestVoteResp]-{Node: %v} receives RequestVoteResponse %v from {Node: %v} after sending RequestVoteRequest %v in term %v",rf.me,resp,peer,req,rf.currentTerm)// rf.currentTerm == req.Term 为了抛弃过期的RequestVote RPC
ifrf.currentTerm==req.Term&&rf.state==StateCandidate{// Candidate node
ifresp.VoteGranted{grantedVote+=1ifgrantedVote>len(rf.peers)/2{DPrintf("{Node: %v} receives majority votes in term %v",rf.me,rf.currentTerm)rf.ChangeState(StateLeader)rf.BroadcastHeartbeat(true)}}elseifresp.Term>rf.currentTerm{// candidate 发现有term 比自己大的,立刻转为follower
DPrintf("{Node %v} finds a new leader {Node %v} with term %v and steps down in term %v",rf.me,peer,resp.Term,rf.currentTerm)rf.ChangeState(StateFollower)rf.currentTerm,rf.votedFor=resp.Term,-1rf.persist()}}}}(peer)}}
发起投票需要异步进行，从而不阻塞 ticker 协程；这样 candidate 再次 election timeout 之后，才能自增 term 并继续发起新一轮选举。
投票统计：可以在函数内定义一个变量并利用 Go 的闭包来实现，也可以在结构体中维护一个 votes 变量来实现。为了让 Raft 结构体更干净，我选择了前者。
func(rf*Raft)RequestVote(req*RequestVoteRequest,resp*RequestVoteResponse){// Your code here (2A, 2B).
// 2A
rf.mu.Lock()deferrf.mu.Unlock()deferrf.persist()deferDPrintf("[RequestVote]-{Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v,firstLog %v,lastLog %v} before processing requestVoteRequest %v and reply requestVoteResponse %v",rf.me,rf.state,rf.currentTerm,rf.commitIndex,rf.lastApplied,rf.getFirstLog(),rf.getLastLog(),req,resp)ifreq.Term<rf.currentTerm||(req.Term==rf.currentTerm&&rf.votedFor!=-1&&rf.votedFor!=req.CandidateId){resp.Term,resp.VoteGranted=rf.currentTerm,falsereturn}ifreq.Term>rf.currentTerm{rf.ChangeState(StateFollower)rf.currentTerm,rf.votedFor=req.Term,-1}// 2A 可以先不实现
if!rf.isLogUpToDate(req.LastLogTerm,req.LastLogIndex){resp.Term,resp.VoteGranted=rf.currentTerm,falsereturn}rf.votedFor=req.CandidateIdrf.electionTimer.Reset(RandomizedElectionTimeout())resp.Term,resp.VoteGranted=rf.currentTerm,true}