介绍

流程梳理

整体逻辑, 从 ticker goroutine 开始, 集群开始的时候,所有节点均为Follower, 它们依靠ticker()成为Candidateticker 协程会定期收到两个 timer 的到期事件,如果是 election timer 到期,则发起一轮选举;如果是 heartbeat timer 到期且节点是 leader,则发起一轮心跳。

ElectionTimerHeartbeatTimer. 如果某个raft 节点election timeout,则会触发leader election, 调用StartElection 方法。 StartElection 中发送 RequestVote RPC, 根据ReqestVote Response 判断是否收到选票,决定是否成为Leader

如果某个节点,收到大多数节点的选票,成为Leader 要通过发送Heartbeat 即空LogEntry 的AppendEntries RPC 来告诉其他节点自己的 Leader 地位。

所以Lab2A 中,主要实现 RequestVote, AppendEntries 的逻辑。

服务器状态

服务器在任意时间只能处于以下三种状态之一:

  • Leader:处理所有客户端请求、日志同步、心跳维持领导权。同一时刻最多只能有一个可行的 Leader
  • Follower:所有服务器的初始状态,功能为:追随领导者,接收领导者日志并实时同步,特性:完全被动的(不发送 RPC,只响应收到的 RPC)
  • Candidate:用来选举新的 Leader,处于 Leader 和 Follower 之间的暂时状态,如Follower 一定时间内未收到来自Leader的心跳包,Follower会自动切换为Candidate,并开始选举操作,向集群中的其它节点发送投票请求,待收到半数以上的选票时,协调者升级成为领导者。

系统正常运行时,只有一个 Leader,其余都是 Followers。Leader拥有绝对的领导力,不断向Followers同步日志且发送心跳状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Raft struct {
	mu        sync.RWMutex        // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	// 2A
	state          NodeState
	currentTerm    int
	votedFor       int
	electionTimer  *time.Timer
	heartbeatTimer *time.Timer

	// 2B
	logs        []Entry // the first is dummy entry which contains LastSnapshotTerm, LastSnapshotIndex and nil Command
	commitIndex int
	lastApplied int
	nextIndex   []int
	matchIndex  []int

	applyCh        chan ApplyMsg
	applyCond      *sync.Cond   // used to wakeup applier goroutine after committing new entries
	replicatorCond []*sync.Cond // used to signal replicator goroutine to batch replicating entries
}

启动

  • 集群所有节点初始状态均为Follower
  • Follower 被动地接受 Leader 或 Candidate 的 RPC;
  • 所以,如果 Leader 想要保持权威,必须向集群中的其它节点发送心跳包(空的 AppendEntries RPC)
  • 等待选举超时(electionTimeout,一般在 100~500ms)后,Follower 没有收到任何 RPC
  • Follower 认为集群中没有 Leader
  • 开始新的一轮选举
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{
		peers:          peers,
		persister:      persister,
		me:             me,
		dead:           0,
		applyCh:        applyCh,
		replicatorCond: make([]*sync.Cond, len(peers)),
		state:          StateFollower,
		currentTerm:    0,
		votedFor:       -1,
		logs:           make([]Entry, 1),
		nextIndex:      make([]int, len(peers)),
		matchIndex:     make([]int, len(peers)),
		heartbeatTimer: time.NewTimer(StableHeartbeatTimeout()),
		electionTimer:  time.NewTimer(RandomizedElectionTimeout()),
	}

	// Your initialization code here (2A, 2B, 2C).

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	rf.applyCond = sync.NewCond(&rf.mu)
	lastLog := rf.getLastLog()
	for i := 0; i < len(peers); i++ {
		rf.matchIndex[i], rf.nextIndex[i] = 0, lastLog.Index+1
		if i != rf.me {
			rf.replicatorCond[i] = sync.NewCond(&sync.Mutex{})
			// start replicator goroutine to replicate entries in batch
			go rf.replicator(i)
		}
	}

	// start ticker goroutine to start elections
	go rf.ticker()

	// start applier goroutine to push committed logs into applyCh exactly once
	go rf.applier()
	return rf
}

集群开始的时候,所有节点均为Follower, 它们依靠ticker()成为Candidate。ticker 协程会定期收到两个 timer 的到期事件,如果是 election timer 到期,则发起一轮选举;如果是 heartbeat timer 到期且节点是 leader,则发起一轮心跳。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// ticker The ticker go routine starts a new election if this peer hasn't received
// heartsbeats recently.
func (rf *Raft) ticker() {
	// for rf.killed() == false {
	for !rf.killed() {

		// Your code here to check if a leader election should
		// be started and to randomize sleeping time using
		// time.Sleep().
		select {
		case <-rf.electionTimer.C: // start election
			DPrintf("{Node: %v} election timeout", rf.me)
			rf.mu.Lock()
			rf.ChangeState(StateCandidate)
			rf.currentTerm += 1
			rf.StartElection()
			rf.electionTimer.Reset(RandomizedElectionTimeout())
			rf.mu.Unlock()

		case <-rf.heartbeatTimer.C: // 领导者发送心跳维持领导力, 2A 可以先不实现
			rf.mu.Lock()
			if rf.state == StateLeader {
				rf.BroadcastHeartbeat(true)
				rf.heartbeatTimer.Reset(StableHeartbeatTimeout())
			}
			rf.mu.Unlock()
		}

	}
}

选举与投票

当一个节点开始竞选:

  1. 增加自己的 currentTerm
  2. 转为 Candidate 状态,其目标是获取超过半数节点的选票,让自己成为 Leader
  3. 先给自己投一票
  4. 并行地向集群中其它节点发送 RequestVote RPC 索要选票,如果没有收到指定节点的响应,它会反复尝试,直到发生以下三种情况之一:
    • 获得超过半数的选票:成为 Leader,并向其它节点发送 AppendEntries 心跳;
    • 收到来自 Leader 的 RPC:转为 Follower;
    • 其它两种情况都没发生,没人能够获胜(electionTimeout 已过):增加 currentTerm,开始新一轮选举;

Candidate 选举程序与投票统计

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
unc (rf *Raft) StartElection() {
	req := rf.genRequestVoteReq()
	DPrintf("{Note: %v} starts election with RequestVoteReq: %v", rf.me, req)

	// Closure
	grantedVote := 1 // elect for itself
	rf.votedFor = rf.me
	rf.persist()
	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}

		go func(peer int) {
			resp := new(RequestVoteResponse)
			if rf.sendRequestVote(peer, req, resp) {
				rf.mu.Lock()
				defer rf.mu.Unlock()
				DPrintf("[RequestVoteResp]-{Node: %v} receives RequestVoteResponse %v from {Node: %v} after sending RequestVoteRequest %v in term %v",
					rf.me, resp, peer, req, rf.currentTerm)

				// rf.currentTerm == req.Term 为了抛弃过期的RequestVote RPC
				if rf.currentTerm == req.Term && rf.state == StateCandidate { // Candidate node
					if resp.VoteGranted {
						grantedVote += 1
						if grantedVote > len(rf.peers)/2 {
							DPrintf("{Node: %v} receives majority votes in term %v", rf.me, rf.currentTerm)
							rf.ChangeState(StateLeader)
							rf.BroadcastHeartbeat(true)
						}
					} else if resp.Term > rf.currentTerm {
						// candidate 发现有term 比自己大的,立刻转为follower
						DPrintf("{Node %v} finds a new leader {Node %v} with term %v and steps down in term %v",
							rf.me, peer, resp.Term, rf.currentTerm)
						rf.ChangeState(StateFollower)
						rf.currentTerm, rf.votedFor = resp.Term, -1
						rf.persist()
					}
				}
			}
		}(peer)
	}
}
  • 发起投票需要异步进行,从而不阻塞ticker线程,这样candidate 再次 election timeout 之后才能自增 term 继续发起新一轮选举。
  • 投票统计:可以在函数内定义一个变量并利用 go 的闭包来实现,也可以在结构体中维护一个 votes 变量来实现。为了 raft 结构体更干净,我选择了前者。
  • 抛弃过期请求的回复:对于过期请求的回复,直接抛弃就行,不要做任何处理,这一点 guidance 里面也有介绍到

RequestVote

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (rf *Raft) RequestVote(req *RequestVoteRequest, resp *RequestVoteResponse) {
	// Your code here (2A, 2B).
	// 2A
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()
	defer DPrintf("[RequestVote]-{Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v,firstLog %v,lastLog %v} before processing requestVoteRequest %v and reply requestVoteResponse %v",
		rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), req, resp)

	if req.Term < rf.currentTerm || (req.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != req.CandidateId) {
		resp.Term, resp.VoteGranted = rf.currentTerm, false
		return
	}

	if req.Term > rf.currentTerm {
		rf.ChangeState(StateFollower)
		rf.currentTerm, rf.votedFor = req.Term, -1
	}

	// 2A 可以先不实现
	if !rf.isLogUpToDate(req.LastLogTerm, req.LastLogIndex) {
		resp.Term, resp.VoteGranted = rf.currentTerm, false
		return
	}

	rf.votedFor = req.CandidateId
	rf.electionTimer.Reset(RandomizedElectionTimeout())
	resp.Term, resp.VoteGranted = rf.currentTerm, true
}

「任期」表示节点的逻辑时钟,任期高的节点拥有更高的话语权。在RequestVote这个函数中,如果请求者的任期小于当前节点任期,则拒绝投票;如果请求者任期大于当前节点人气,那么当前节点立马成为追随者。即任期大的节点对任期小的拥有绝对的话语权,一旦发现任期大的节点,立马成为其追随者。

注意,节点的选举随机时间和心跳时间的选择很重要

  • 节点随机选择超时时间,通常在 [T, 2T] 之间(T = electionTimeout)
  • 这样,节点不太可能再同时开始竞选,先竞选的节点有足够的时间来索要其他节点的选票
  • T » broadcast time(T 远大于广播时间)时效果更佳
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
const (
	HeartbeatTimeout = 125
	ElectionTimeout  = 1000
)

func StableHeartbeatTimeout() time.Duration {
	// return time.Duration(HeartbeatTimeout) * time.Millisecond
	return HeartbeatTimeout * time.Millisecond
}

func RandomizedElectionTimeout() time.Duration {
	return time.Duration(ElectionTimeout+globalRand.Intn(ElectionTimeout)) * time.Millisecond
}

总结

领导者选举主要工作可总结如下:

  • 三个状态,三个状态之间的转换。
  • 1个loop——ticker。
  • 1个RPC请求和处理,用于投票。

另外,ticker会一直运行,直到节点被kill,因此集群领导者并非唯一,一旦领导者出现了宕机、网络故障等问题,其它节点都能第一时间感知,并迅速做出重新选举的反应,从而维持集群的正常运行,毕竟Raft集群一旦失去了领导者,就无法工作。