流程梳理#
整体逻辑, 从 ticker
goroutine 开始, 集群开始的时候,所有节点均为Follower, 它们依靠ticker()成为Candidate
。ticker
协程会定期收到两个 timer 的到期事件,如果是 election timer 到期,则发起一轮选举;如果是 heartbeat timer 到期且节点是 leader,则发起一轮心跳。
ElectionTimer
和 HeartbeatTimer
. 如果某个raft 节点election timeout,则会触发leader election, 调用StartElection
方法。 StartElection
中发送 RequestVote RPC
, 根据ReqestVote Response 判断是否收到选票,决定是否成为Leader
。
如果某个节点,收到大多数节点的选票,成为Leader
要通过发送Heartbeat
即空LogEntry 的AppendEntries RPC
来告诉其他节点自己的 Leader
地位。
所以Lab2A 中,主要实现 RequestVote
, AppendEntries
的逻辑。
服务器状态#
服务器在任意时间只能处于以下三种状态之一:
Leader
:处理所有客户端请求、日志同步、心跳维持领导权。同一时刻最多只能有一个可行的 LeaderFollower
:所有服务器的初始状态,功能为:追随领导者,接收领导者日志并实时同步,特性:完全被动的(不发送 RPC,只响应收到的 RPC)Candidate
:用来选举新的 Leader,处于 Leader 和 Follower 之间的暂时状态,如Follower 一定时间内未收到来自Leader的心跳包,Follower会自动切换为Candidate,并开始选举操作,向集群中的其它节点发送投票请求,待收到半数以上的选票时,协调者升级成为领导者。
系统正常运行时,只有一个 Leader,其余都是 Followers。Leader拥有绝对的领导力,不断向Followers同步日志且发送心跳状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| type Raft struct {
mu sync.RWMutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
// 2A
state NodeState
currentTerm int
votedFor int
electionTimer *time.Timer
heartbeatTimer *time.Timer
// 2B
logs []Entry // the first is dummy entry which contains LastSnapshotTerm, LastSnapshotIndex and nil Command
commitIndex int
lastApplied int
nextIndex []int
matchIndex []int
applyCh chan ApplyMsg
applyCond *sync.Cond // used to wakeup applier goroutine after committing new entries
replicatorCond []*sync.Cond // used to signal replicator goroutine to batch replicating entries
}
|
- 集群所有节点初始状态均为Follower
- Follower 被动地接受 Leader 或 Candidate 的 RPC;
- 所以,如果 Leader 想要保持权威,必须向集群中的其它节点发送心跳包(空的
AppendEntries
RPC) - 等待选举超时(electionTimeout,一般在 100~500ms)后,Follower 没有收到任何 RPC
- Follower 认为集群中没有 Leader
- 开始新的一轮选举
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{
peers: peers,
persister: persister,
me: me,
dead: 0,
applyCh: applyCh,
replicatorCond: make([]*sync.Cond, len(peers)),
state: StateFollower,
currentTerm: 0,
votedFor: -1,
logs: make([]Entry, 1),
nextIndex: make([]int, len(peers)),
matchIndex: make([]int, len(peers)),
heartbeatTimer: time.NewTimer(StableHeartbeatTimeout()),
electionTimer: time.NewTimer(RandomizedElectionTimeout()),
}
// Your initialization code here (2A, 2B, 2C).
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
rf.applyCond = sync.NewCond(&rf.mu)
lastLog := rf.getLastLog()
for i := 0; i < len(peers); i++ {
rf.matchIndex[i], rf.nextIndex[i] = 0, lastLog.Index+1
if i != rf.me {
rf.replicatorCond[i] = sync.NewCond(&sync.Mutex{})
// start replicator goroutine to replicate entries in batch
go rf.replicator(i)
}
}
// start ticker goroutine to start elections
go rf.ticker()
// start applier goroutine to push committed logs into applyCh exactly once
go rf.applier()
return rf
}
|
集群开始的时候,所有节点均为Follower, 它们依靠ticker()成为Candidate。ticker 协程会定期收到两个 timer 的到期事件,如果是 election timer 到期,则发起一轮选举;如果是 heartbeat timer 到期且节点是 leader,则发起一轮心跳。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| // ticker The ticker go routine starts a new election if this peer hasn't received
// heartsbeats recently.
func (rf *Raft) ticker() {
// for rf.killed() == false {
for !rf.killed() {
// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// time.Sleep().
select {
case <-rf.electionTimer.C: // start election
DPrintf("{Node: %v} election timeout", rf.me)
rf.mu.Lock()
rf.ChangeState(StateCandidate)
rf.currentTerm += 1
rf.StartElection()
rf.electionTimer.Reset(RandomizedElectionTimeout())
rf.mu.Unlock()
case <-rf.heartbeatTimer.C: // 领导者发送心跳维持领导力, 2A 可以先不实现
rf.mu.Lock()
if rf.state == StateLeader {
rf.BroadcastHeartbeat(true)
rf.heartbeatTimer.Reset(StableHeartbeatTimeout())
}
rf.mu.Unlock()
}
}
}
|
选举与投票#
当一个节点开始竞选:
- 增加自己的
currentTerm
- 转为
Candidate
状态,其目标是获取超过半数节点的选票,让自己成为 Leader - 先给自己投一票
- 并行地向集群中其它节点发送
RequestVote RPC
索要选票,如果没有收到指定节点的响应,它会反复尝试,直到发生以下三种情况之一:- 获得超过半数的选票:成为
Leader
,并向其它节点发送 AppendEntries
心跳; - 收到来自
Leader
的 RPC:转为 Follower;
- 其它两种情况都没发生,没人能够获胜(electionTimeout 已过):增加
currentTerm
,开始新一轮选举;
Candidate
选举程序与投票统计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| unc (rf *Raft) StartElection() {
req := rf.genRequestVoteReq()
DPrintf("{Note: %v} starts election with RequestVoteReq: %v", rf.me, req)
// Closure
grantedVote := 1 // elect for itself
rf.votedFor = rf.me
rf.persist()
for peer := range rf.peers {
if peer == rf.me {
continue
}
go func(peer int) {
resp := new(RequestVoteResponse)
if rf.sendRequestVote(peer, req, resp) {
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("[RequestVoteResp]-{Node: %v} receives RequestVoteResponse %v from {Node: %v} after sending RequestVoteRequest %v in term %v",
rf.me, resp, peer, req, rf.currentTerm)
// rf.currentTerm == req.Term 为了抛弃过期的RequestVote RPC
if rf.currentTerm == req.Term && rf.state == StateCandidate { // Candidate node
if resp.VoteGranted {
grantedVote += 1
if grantedVote > len(rf.peers)/2 {
DPrintf("{Node: %v} receives majority votes in term %v", rf.me, rf.currentTerm)
rf.ChangeState(StateLeader)
rf.BroadcastHeartbeat(true)
}
} else if resp.Term > rf.currentTerm {
// candidate 发现有term 比自己大的,立刻转为follower
DPrintf("{Node %v} finds a new leader {Node %v} with term %v and steps down in term %v",
rf.me, peer, resp.Term, rf.currentTerm)
rf.ChangeState(StateFollower)
rf.currentTerm, rf.votedFor = resp.Term, -1
rf.persist()
}
}
}
}(peer)
}
}
|
- 发起投票需要异步进行,从而不阻塞ticker线程,这样candidate 再次 election timeout 之后才能自增 term 继续发起新一轮选举。
- 投票统计:可以在函数内定义一个变量并利用 go 的闭包来实现,也可以在结构体中维护一个 votes 变量来实现。为了 raft 结构体更干净,我选择了前者。
- 抛弃过期请求的回复:对于过期请求的回复,直接抛弃就行,不要做任何处理,这一点 guidance 里面也有介绍到
RequestVote#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| func (rf *Raft) RequestVote(req *RequestVoteRequest, resp *RequestVoteResponse) {
// Your code here (2A, 2B).
// 2A
rf.mu.Lock()
defer rf.mu.Unlock()
defer rf.persist()
defer DPrintf("[RequestVote]-{Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v,firstLog %v,lastLog %v} before processing requestVoteRequest %v and reply requestVoteResponse %v",
rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), req, resp)
if req.Term < rf.currentTerm || (req.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != req.CandidateId) {
resp.Term, resp.VoteGranted = rf.currentTerm, false
return
}
if req.Term > rf.currentTerm {
rf.ChangeState(StateFollower)
rf.currentTerm, rf.votedFor = req.Term, -1
}
// 2A 可以先不实现
if !rf.isLogUpToDate(req.LastLogTerm, req.LastLogIndex) {
resp.Term, resp.VoteGranted = rf.currentTerm, false
return
}
rf.votedFor = req.CandidateId
rf.electionTimer.Reset(RandomizedElectionTimeout())
resp.Term, resp.VoteGranted = rf.currentTerm, true
}
|
「任期」表示节点的逻辑时钟,任期高的节点拥有更高的话语权。在RequestVote
这个函数中,如果请求者的任期小于当前节点任期,则拒绝投票;如果请求者任期大于当前节点人气,那么当前节点立马成为追随者。即任期大的节点对任期小的拥有绝对的话语权,一旦发现任期大的节点,立马成为其追随者。
注意,节点的选举随机时间和心跳时间的选择很重要
- 节点随机选择超时时间,通常在 [T, 2T] 之间(T = electionTimeout)
- 这样,节点不太可能再同时开始竞选,先竞选的节点有足够的时间来索要其他节点的选票
- T » broadcast time(T 远大于广播时间)时效果更佳
1
2
3
4
5
6
7
8
9
10
11
12
13
| const (
HeartbeatTimeout = 125
ElectionTimeout = 1000
)
func StableHeartbeatTimeout() time.Duration {
// return time.Duration(HeartbeatTimeout) * time.Millisecond
return HeartbeatTimeout * time.Millisecond
}
func RandomizedElectionTimeout() time.Duration {
return time.Duration(ElectionTimeout+globalRand.Intn(ElectionTimeout)) * time.Millisecond
}
|
领导者选举主要工作可总结如下:
- 三个状态,三个状态之间的转换。
- 1个loop——ticker。
- 1个RPC请求和处理,用于投票。
另外,ticker会一直运行,直到节点被kill,因此集群领导者并非唯一,一旦领导者出现了宕机、网络故障等问题,其它节点都能第一时间感知,并迅速做出重新选举的反应,从而维持集群的正常运行,毕竟Raft集群一旦失去了领导者,就无法工作。