流程梳理#
整体逻辑, 从 ticker goroutine 开始, 集群开始的时候,所有节点均为Follower, 它们依靠ticker()成为Candidate。ticker 协程会定期收到两个 timer 的到期事件,如果是 election timer 到期,则发起一轮选举;如果是 heartbeat timer 到期且节点是 leader,则发起一轮心跳。
ElectionTimer 和 HeartbeatTimer. 如果某个raft 节点election timeout,则会触发leader election, 调用StartElection 方法。 StartElection 中发送 RequestVote RPC, 根据ReqestVote Response 判断是否收到选票,决定是否成为Leader。
如果某个节点,收到大多数节点的选票,成为Leader 要通过发送Heartbeat 即空LogEntry 的AppendEntries RPC 来告诉其他节点自己的 Leader 地位。
所以Lab2A 中,主要实现 RequestVote, AppendEntries 的逻辑。
服务器状态#
服务器在任意时间只能处于以下三种状态之一:
Leader:处理所有客户端请求、日志同步、心跳维持领导权。同一时刻最多只能有一个可行的 LeaderFollower:所有服务器的初始状态,功能为:追随领导者,接收领导者日志并实时同步,特性:完全被动的(不发送 RPC,只响应收到的 RPC)Candidate:用来选举新的 Leader,处于 Leader 和 Follower 之间的暂时状态,如Follower 一定时间内未收到来自Leader的心跳包,Follower会自动切换为Candidate,并开始选举操作,向集群中的其它节点发送投票请求,待收到半数以上的选票时,协调者升级成为领导者。
系统正常运行时,只有一个 Leader,其余都是 Followers。Leader拥有绝对的领导力,不断向Followers同步日志且发送心跳状态。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  | type Raft struct {
	mu        sync.RWMutex        // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()
	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	// 2A
	state          NodeState
	currentTerm    int
	votedFor       int
	electionTimer  *time.Timer
	heartbeatTimer *time.Timer
	// 2B
	logs        []Entry // the first is dummy entry which contains LastSnapshotTerm, LastSnapshotIndex and nil Command
	commitIndex int
	lastApplied int
	nextIndex   []int
	matchIndex  []int
	applyCh        chan ApplyMsg
	applyCond      *sync.Cond   // used to wakeup applier goroutine after committing new entries
	replicatorCond []*sync.Cond // used to signal replicator goroutine to batch replicating entries
}
  | 
- 集群所有节点初始状态均为Follower
 - Follower 被动地接受 Leader 或 Candidate 的 RPC;
 - 所以,如果 Leader 想要保持权威,必须向集群中的其它节点发送心跳包(空的 
AppendEntries RPC) - 等待选举超时(electionTimeout,一般在 100~500ms)后,Follower 没有收到任何 RPC
 - Follower 认为集群中没有 Leader
 - 开始新的一轮选举
 
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  | func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{
		peers:          peers,
		persister:      persister,
		me:             me,
		dead:           0,
		applyCh:        applyCh,
		replicatorCond: make([]*sync.Cond, len(peers)),
		state:          StateFollower,
		currentTerm:    0,
		votedFor:       -1,
		logs:           make([]Entry, 1),
		nextIndex:      make([]int, len(peers)),
		matchIndex:     make([]int, len(peers)),
		heartbeatTimer: time.NewTimer(StableHeartbeatTimeout()),
		electionTimer:  time.NewTimer(RandomizedElectionTimeout()),
	}
	// Your initialization code here (2A, 2B, 2C).
	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())
	rf.applyCond = sync.NewCond(&rf.mu)
	lastLog := rf.getLastLog()
	for i := 0; i < len(peers); i++ {
		rf.matchIndex[i], rf.nextIndex[i] = 0, lastLog.Index+1
		if i != rf.me {
			rf.replicatorCond[i] = sync.NewCond(&sync.Mutex{})
			// start replicator goroutine to replicate entries in batch
			go rf.replicator(i)
		}
	}
	// start ticker goroutine to start elections
	go rf.ticker()
	// start applier goroutine to push committed logs into applyCh exactly once
	go rf.applier()
	return rf
}
  | 
集群开始的时候,所有节点均为Follower, 它们依靠ticker()成为Candidate。ticker 协程会定期收到两个 timer 的到期事件,如果是 election timer 到期,则发起一轮选举;如果是 heartbeat timer 到期且节点是 leader,则发起一轮心跳。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
  | // ticker The ticker go routine starts a new election if this peer hasn't received
// heartsbeats recently.
func (rf *Raft) ticker() {
	// for rf.killed() == false {
	for !rf.killed() {
		// Your code here to check if a leader election should
		// be started and to randomize sleeping time using
		// time.Sleep().
		select {
		case <-rf.electionTimer.C: // start election
			DPrintf("{Node: %v} election timeout", rf.me)
			rf.mu.Lock()
			rf.ChangeState(StateCandidate)
			rf.currentTerm += 1
			rf.StartElection()
			rf.electionTimer.Reset(RandomizedElectionTimeout())
			rf.mu.Unlock()
		case <-rf.heartbeatTimer.C: // 领导者发送心跳维持领导力, 2A 可以先不实现
			rf.mu.Lock()
			if rf.state == StateLeader {
				rf.BroadcastHeartbeat(true)
				rf.heartbeatTimer.Reset(StableHeartbeatTimeout())
			}
			rf.mu.Unlock()
		}
	}
}
  | 
选举与投票#
当一个节点开始竞选:
- 增加自己的 
currentTerm - 转为 
Candidate 状态,其目标是获取超过半数节点的选票,让自己成为 Leader - 先给自己投一票
 - 并行地向集群中其它节点发送
 RequestVote RPC 索要选票,如果没有收到指定节点的响应,它会反复尝试,直到发生以下三种情况之一:- 获得超过半数的选票:成为 
Leader,并向其它节点发送 AppendEntries 心跳; - 收到来自 
Leader 的 RPC:转为 Follower; - 其它两种情况都没发生,没人能够获胜(electionTimeout 已过):增加 
currentTerm,开始新一轮选举; 
 
Candidate 选举程序与投票统计
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  | unc (rf *Raft) StartElection() {
	req := rf.genRequestVoteReq()
	DPrintf("{Note: %v} starts election with RequestVoteReq: %v", rf.me, req)
	// Closure
	grantedVote := 1 // elect for itself
	rf.votedFor = rf.me
	rf.persist()
	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		go func(peer int) {
			resp := new(RequestVoteResponse)
			if rf.sendRequestVote(peer, req, resp) {
				rf.mu.Lock()
				defer rf.mu.Unlock()
				DPrintf("[RequestVoteResp]-{Node: %v} receives RequestVoteResponse %v from {Node: %v} after sending RequestVoteRequest %v in term %v",
					rf.me, resp, peer, req, rf.currentTerm)
				// rf.currentTerm == req.Term 为了抛弃过期的RequestVote RPC
				if rf.currentTerm == req.Term && rf.state == StateCandidate { // Candidate node
					if resp.VoteGranted {
						grantedVote += 1
						if grantedVote > len(rf.peers)/2 {
							DPrintf("{Node: %v} receives majority votes in term %v", rf.me, rf.currentTerm)
							rf.ChangeState(StateLeader)
							rf.BroadcastHeartbeat(true)
						}
					} else if resp.Term > rf.currentTerm {
						// candidate 发现有term 比自己大的,立刻转为follower
						DPrintf("{Node %v} finds a new leader {Node %v} with term %v and steps down in term %v",
							rf.me, peer, resp.Term, rf.currentTerm)
						rf.ChangeState(StateFollower)
						rf.currentTerm, rf.votedFor = resp.Term, -1
						rf.persist()
					}
				}
			}
		}(peer)
	}
}
  | 
- 发起投票需要异步进行,从而不阻塞ticker线程,这样candidate 再次 election timeout 之后才能自增 term 继续发起新一轮选举。
 - 投票统计:可以在函数内定义一个变量并利用 go 的闭包来实现,也可以在结构体中维护一个 votes 变量来实现。为了 raft 结构体更干净,我选择了前者。
 - 抛弃过期请求的回复:对于过期请求的回复,直接抛弃就行,不要做任何处理,这一点 guidance 里面也有介绍到
 
RequestVote#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  | func (rf *Raft) RequestVote(req *RequestVoteRequest, resp *RequestVoteResponse) {
	// Your code here (2A, 2B).
	// 2A
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()
	defer DPrintf("[RequestVote]-{Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v,firstLog %v,lastLog %v} before processing requestVoteRequest %v and reply requestVoteResponse %v",
		rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), req, resp)
	if req.Term < rf.currentTerm || (req.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != req.CandidateId) {
		resp.Term, resp.VoteGranted = rf.currentTerm, false
		return
	}
	if req.Term > rf.currentTerm {
		rf.ChangeState(StateFollower)
		rf.currentTerm, rf.votedFor = req.Term, -1
	}
	// 2A 可以先不实现
	if !rf.isLogUpToDate(req.LastLogTerm, req.LastLogIndex) {
		resp.Term, resp.VoteGranted = rf.currentTerm, false
		return
	}
	rf.votedFor = req.CandidateId
	rf.electionTimer.Reset(RandomizedElectionTimeout())
	resp.Term, resp.VoteGranted = rf.currentTerm, true
}
  | 
「任期」表示节点的逻辑时钟,任期高的节点拥有更高的话语权。在RequestVote这个函数中,如果请求者的任期小于当前节点任期,则拒绝投票;如果请求者任期大于当前节点人气,那么当前节点立马成为追随者。即任期大的节点对任期小的拥有绝对的话语权,一旦发现任期大的节点,立马成为其追随者。
注意,节点的选举随机时间和心跳时间的选择很重要
- 节点随机选择超时时间,通常在 [T, 2T] 之间(T = electionTimeout)
 - 这样,节点不太可能再同时开始竞选,先竞选的节点有足够的时间来索要其他节点的选票
 - T » broadcast time(T 远大于广播时间)时效果更佳
 
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
  | const (
	HeartbeatTimeout = 125
	ElectionTimeout  = 1000
)
func StableHeartbeatTimeout() time.Duration {
	// return time.Duration(HeartbeatTimeout) * time.Millisecond
	return HeartbeatTimeout * time.Millisecond
}
func RandomizedElectionTimeout() time.Duration {
	return time.Duration(ElectionTimeout+globalRand.Intn(ElectionTimeout)) * time.Millisecond
}
  | 
领导者选举主要工作可总结如下:
- 三个状态,三个状态之间的转换。
 - 1个loop——ticker。
 - 1个RPC请求和处理,用于投票。
 
另外,ticker会一直运行,直到节点被kill,因此集群领导者并非唯一,一旦领导者出现了宕机、网络故障等问题,其它节点都能第一时间感知,并迅速做出重新选举的反应,从而维持集群的正常运行,毕竟Raft集群一旦失去了领导者,就无法工作。