snapshot是状态机某一时刻的副本,具体格式依赖存储引擎的实现,比如说:B+树、LSM、哈希表等,6.824是实现一个键值数据库,所以我们采用的是哈希表,在Lab 3可以看到实现。
raft通过日志来实现多副本的数据一致,但是日志会不断膨胀,带来两个缺点:数据量大、恢复时间长,因此需要定期压缩一下,生成snapshot。
快照由上层应用触发。当上层应用认为可以将一些已提交的 entry 压缩成 snapshot 时,其会调用节点的 Snapshot()
函数,将需要压缩的状态机的状态数据传递给节点,作为快照。
在正常情况下,仅由上层应用命令节点进行快照即可。但如果节点出现落后或者崩溃,情况则变得更加复杂。考虑一个日志非常落后的节点 i,当 Leader 向其发送 AppendEntries RPC 时,nextIndex[i] 对应的 entry 已被丢弃,压缩在快照中。这种情况下, Leader 就无法对其进行 AppendEntries。取而代之的是,这里我们应该实现一个新的 InstallSnapshot
RPC,将 Leader 当前的快照直接发送给非常落后的 Follower。
何时快照?
- 服务端触发的日志压缩:上层应用发送快照数据给Raft实例。
- leader 发送来的 InstallSnapshot:领导者发送快照RPC请求给追随者。当raft收到其他节点的压缩请求后,先把请求上报给上层应用,然后上层应用调用rf.CondInstallSnapshot()来决定是否安装快照
流程梳理#
快照是状态机中的概念,需要在状态机中加载快照,因此要通过applyCh将快照发送给状态机,但是发送后Raft并不立即保存快照,而是等待状态机调用 CondInstallSnapshot()
,如果从收到InstallSnapshot()
后到收到CondInstallSnapshot()
前,没有新的日志提交到状态机,则Raft返回True,Raft和状态机保存快照,否则Raft返回False,两者都不保存快照。
如此保证了Raft和状态机保存快照是一个原子操作(SaveStateAndSnapshot
)。当然在InstallSnapshot()
将快照发送给状态机后再将快照保存到Raft,令CondInstallSnap()永远返回True,也可以保证原子操作,但是这样做必须等待快照发送给状态机完成,但是rf.applyCh <- ApplyMsg是有可能阻塞的,由于InstallSnapshot()
需要持有全局的互斥锁,这可能导致整个节点无法工作。
服务端触发的日志压缩
: 上层应用发送快照数据给Raft实例。leader 发送来的 InstallSnapshot
: Leader发送快照RPC请求给Follower。当raft收到其他节点的压缩请求后,先把请求上报给上层应用,然后上层应用调用rf.CondInstallSnapshot()
来决定是否安装快照(SaveStateAndSnapshot
)
相关函数解析#
服务端触发的Log Compact#
func (rf *Raft) Snapshot(index int, snapshot []byte)
应用程序将index(包括)之前的所有日志都打包为了快照,即参数snapshot [] byte。那么对于Raft要做的就是,将打包为快照的日志直接删除,并且要将快照保存起来,因为将来可能会发现某些节点大幅度落后于leader的日志,那么leader就直接发送快照给它,让他的日志“跟上来”。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()
lastSnapshotIndex := rf.getFirstLog().Index
// 当前节点的firstLogIndex 比要添加的Snapshot LastIncludedIndex 大,说明已经存在了Snapshot 包含了更多的log
if index <= lastSnapshotIndex {
DPrintf("[Snapshot] - {Node %v} rejects replacing log with snapshotIndex %v as current lastSnapshotIndex %v is larger in term %v", rf.me, index, lastSnapshotIndex, rf.currentTerm)
return
}
// 新的日志索引包含了 LastIncludedIndex 这个位置,因为要把它作为dummpy index
rf.logs = shrinkEntriesArray(rf.logs[index-lastSnapshotIndex:])
rf.logs[0].Command = nil
rf.persister.SaveStateAndSnapshot(rf.encodeState(), snapshot)
DPrintf("[Snapshot] - {Node: %v}'s state is {state %v, term %v, commitIndex %v, lastApplied %v, firstLog %v, lastLogLog %v} after replacing log with snapshotIndex %v as lastSnapshotIndex %v is smaller",
rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), index, lastSnapshotIndex)
}
|
由 Leader 发送来的 InstallSnapshot#
func (rf *Raft) InstallSnapshot(req *InstallSnapshotReq, resp *InstallSnapshotResp)
对于 leader 发过来的 InstallSnapshot,只需要判断 term 是否正确,如果无误则 follower 只能无条件接受。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| func (rf *Raft) InstallSnapshot(req *InstallSnapshotReq, resp *InstallSnapshotResp) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer DPrintf("[InstallSnapshot] - {Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v,firstLog %v,lastLog %v} before processing InstallSnapshotRequest %v and reply InstallSnapshotResponse %v",
rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), req, resp)
resp.Term = rf.currentTerm
if req.Term < rf.currentTerm {
return
}
if req.Term > rf.currentTerm {
rf.currentTerm, rf.votedFor = req.Term, -1
rf.persist()
}
rf.ChangeState(StateFollower)
rf.electionTimer.Reset(RandomizedElectionTimeout())
// outdated snapshot
// snapshot 的 lastIncludedIndex 小于等于本地的 commitIndex,
// 那说明本地已经包含了该 snapshot 所有的数据信息,尽管可能状态机还没有这个 snapshot 新,
// 即 lastApplied 还没更新到 commitIndex,但是 applier 协程也一定尝试在 apply 了,
// 此时便没必要再去用 snapshot 更换状态机了。对于更新的 snapshot,这里通过异步的方式将其
// push 到 applyCh 中。
if req.LastIncludedIndex <= rf.commitIndex {
return
}
go func() {
rf.applyCh <- ApplyMsg{
SnapshotValid: true,
Snapshot: req.Data,
SnapshotTerm: req.LastIncludedTerm,
SnapshotIndex: req.LastIncludedIndex,
}
}()
}
|
Follower 收到 InstallSnapshot RPC 后#
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool
Follower接收到snapshot后不能够立刻应用并截断日志,raft和状态机都需要应用snapshot,这需要考虑原子性。如果raft应用成功但状态机应用snapshot失败,那么在接下来的时间里客户端读到的数据是不完整的。如果状态机应用snapshot成功但raft应用失败,那么raft会要求重传,状态机应用成功也没啥意义。因此CondInstallSnapshot
是异步于raft的,并由应用层调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("[CondInstallSnapshot] - {Node %v} service calls CondInstallSnapshot with lastIncludedTerm %v and lastIncludedIndex %v to check whether snapshot is still valid in term %v",
rf.me, lastIncludedTerm, lastIncludedIndex, rf.currentTerm)
// outdated snapshot
if lastIncludedIndex <= rf.commitIndex {
DPrintf("[CondInstallSnapshot] - {Node %v} rejects the snapshot which lastIncludedIndex is %v because commitIndex %v is larger",
rf.me, lastIncludedIndex, rf.commitIndex)
return false
}
if lastIncludedIndex > rf.getLastLog().Index {
rf.logs = make([]Entry, 1)
} else {
rf.logs = shrinkEntriesArray(rf.logs[lastIncludedIndex-rf.getFirstLog().Index:])
rf.logs[0].Command = nil
}
rf.logs[0].Term, rf.logs[0].Index = lastIncludedTerm, lastIncludedIndex
rf.lastApplied, rf.commitIndex = lastIncludedIndex, lastIncludedIndex
rf.persister.SaveStateAndSnapshot(rf.encodeState(), snapshot)
DPrintf("[CondInstallSnapshot] - {Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v,firstLog %v,lastLog %v} after accepting the snapshot which lastIncludedTerm is %v, lastIncludedIndex is %v",
rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), lastIncludedTerm, lastIncludedIndex)
return true
}
|
假设有一个节点一直是 crash 的,然后复活了,leader 发现其落后的太多,于是发送 InstallSnapshot() RPC 到落后的节点上面。落后节点收到 InstallSnapshot() 中的 snapshot 后,通过 rf.applyCh 发送给上层 service 。上层的 service 收到 snapshot 时,调用节点的 CondInstallSnapshot() 方法。节点如果在该 snapshot 之后有新的 commit,则拒绝安装此 snapshot CondInstallSnapshot 中的 lastIncludedIndex <= rf.commitIndex
,service 也会放弃本次安装。反之如果在该 snapshot 之后没有新的 commit,那么节点会安装此 snapshot 并返回 true,service 收到后也同步安装。
在实验大纲中指出不能直接通过rf.logs[idx:] 的方式去做日志的截取保存,防止GC 不能及时回收。
Raft must discard old log entries in a way that allows the Go garbage collector to free and re-use the memory; this requires that there be no reachable references (pointers) to the discarded log entries.