流程梳理#
相关的RPC 在Raft0 中已经介绍, 这里不再赘述。
启动的Goroutine:
ticker
一个,用于监听 Election Timeout 或者Heartbeat Timeoutapplier
一个,监听 leader commit 之后,把log 发送到ApplyCh,然后从applyCh 中持久化到本地replicator
n-1 个,每一个对应一个 peer。监听心跳广播命令,仅在节点为 Leader 时工作, 唤醒条件变量。接收到命令后,向对应的 peer 发送 AppendEntries RPC。
日志结构#
每个节点存储自己的日志副本(log[]),每条日志记录包含:
- 索引:该记录在日志中的位置
- 任期号:该记录首次被创建时的任期号
- 命令
1
2
3
4
5
| type Entry struct {
Index int
Term int
Command interface{}
}
|
日志「已提交」与「已应用」概念:
- 已提交:committed, 数据在本地raft 日志中记录,没有应用到状态机
- 已应用:真正的数据变化。提交到大多数节点之后,应用到各自本地的状态机中。
已提交的日志被应用后才会生效
日志同步:
日志同步是Leader独有的权利,Leader向Follower发送日志,Follower同步日志。
日志同步要解决如下两个问题:
- Leader发送心跳宣示自己的主权,Follower不会发起选举。
- Leader将自己的日志数据同步到Follower,达到数据备份的效果。
运行流程
客户端向 Leader 发送命令,希望该命令被所有状态机执行;
- Leader 先将该命令追加到自己的日志中;
- Leader 并行地向其它节点发送 AppendEntries RPC,等待响应;
- 收到超过半数节点的响应,则认为新的日志记录是被提交的:
- Leader 将命令传给自己的状态机,然后向客户端返回响应
- 一旦 Leader 知道一条记录被提交了,将在后续的 AppendEntries RPC 中通知已经提交记录的 Followers
- Follower 将已提交的命令传给自己的状态机
- 如果 Follower 宕机/超时:Leader 将反复尝试发送 RPC;
性能优化:Leader 不必等待每个 Follower 做出响应,只需要超过半数的成功响应(确保日志记录已经存储在超过半数的节点上)——一个很慢的节点不会使系统变慢,因为 Leader 不必等他;
AppendEntries RPC 具体介绍参考此处文章
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| type AppendEntriesReq struct {
Term int
LeaderId int
PrevLogIndex int
PrevLogTerm int
LeaderComment int
Entries []Entry
}
func (req AppendEntriesReq) String() string {
return fmt.Sprintf("{Term: %d, LeaderId: %v, PreVoteLogIndex: %v, PreVoteLogTerm: %v, LeaderComment: %v, Entries: %v}",
req.Term, req.LeaderId, req.PrevLogIndex, req.PrevLogTerm, req.LeaderComment, req.Entries)
}
type AppendEntriesResp struct {
Term int
Success bool
// for fast backup https://mit-public-courses-cn-translatio.gitbook.io/mit6-824/lecture-07-raft2/7.3-hui-fu-jia-su-backup-acceleration
ConflictIndex int
ConflictTerm int
ConflictLen int
}
func (resp AppendEntriesResp) String() string {
return fmt.Sprintf("{Term:%v,Success:%v,ConflictIndex:%v,ConflictTerm:%v}",
resp.Term, resp.Success, resp.ConflictIndex, resp.ConflictTerm)
}
|
AppendEntries RPC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
func (rf *Raft) AppendEntries(req *AppendEntriesReq, resp *AppendEntriesResp) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer rf.persist()
defer DPrintf("[AppendEntries]- {Node: %v}'s state is {state %v, term %v, commitIndex %v, lastApplied %v, firstLog %v, lastLog %v} before processing AppendEntriesRequest %v and reply AppendEntries %v",
rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), req, resp)
// 如果发现来自leader的rpc中的term比当前peer要小,
// 说明是该RPC 来自旧的term(leader),|| 或者 当前leader 需要更新 不处理
if req.Term < rf.currentTerm {
resp.Term, resp.Success = rf.currentTerm, false
return
}
// 一般来讲,在vote的时候已经将currentTerm和leader同步
// 不过,有些peer暂时的掉线或者其他一些情况重连以后,会发现term和leader不一样
// 以收到大于自己的term的rpc也是第一时间同步.而且要将votefor重新设置为-1
// 等待将来选举 (说明这个peer 不是之前election 中投的的marjority)
if req.Term > rf.currentTerm {
rf.currentTerm, rf.votedFor = req.Term, -1
}
rf.ChangeState(StateFollower)
rf.electionTimer.Reset(RandomizedElectionTimeout())
// PrevLogIndex 比rf 当前的第一个Log index 还要小
if req.PrevLogIndex < rf.getFirstLog().Index {
resp.Term, resp.Success = 0, false
DPrintf("[AppendEntries] - {Node: %v} receives unexpected AppendEntriesRequest %v from {Node: %v} because prevLogIndex %v < firstLogIndex %v",
rf.me, req, req.LeaderId, req.PrevLogIndex, rf.getFirstLog().Index)
return
}
if !rf.matchLog(req.PrevLogTerm, req.PrevLogIndex) {
// 日志的一致性检查失败后,递归找到需要追加日志的位置
resp.Term, resp.Success = rf.currentTerm, false
lastIndex := rf.getLastLog().Index
if lastIndex < req.PrevLogIndex {
// lastIndex 和 nextIndex[peer] 之间有空洞 scenario3
// follower 在nextIndex[peer] 没有log
resp.ConflictTerm = -1
resp.ConflictIndex = lastIndex + 1
} else {
// scenario2, 1
// 以任期为单位进行回退
firstIndex := rf.getFirstLog().Index
resp.ConflictTerm = rf.logs[req.PrevLogIndex-firstIndex].Term
index := req.PrevLogIndex - 1
for index >= firstIndex && rf.logs[index-firstIndex].Term == resp.ConflictTerm {
index--
}
resp.ConflictIndex = index
}
return
}
firstIndex := rf.getFirstLog().Index
for i, entry := range req.Entries {
// mergeLog
// 添加的日志索引位置 比Follower 日志相同 直接添加 此处用大于等于,实际只有==
// || 要添加的日志索引位置在 Follower 中的任期和AE RPC 中的Term 冲突
if entry.Index-firstIndex >= len(rf.logs) || rf.logs[entry.Index-firstIndex].Term != entry.Term {
rf.logs = shrinkEntriesArray(append(rf.logs[:entry.Index-firstIndex], req.Entries[i:]...))
break
}
}
rf.advanceCommitIndexForFollower(req.LeaderComment)
resp.Term, resp.Success = rf.currentTerm, true
}
|
复制模型(log replication)#
对于复制模型,很直观的方式是:包装一个 BroadcastHeartbeat() 函数,其负责向所有 follower 发送一轮同步。不论是心跳超时还是上层服务传进来一个新 command,都去调一次这个函数来发起一轮同步。
以上方式是可以 work 的,我最开始的实现也是这样的,然而在测试过程中,我发现这种方式有很大的资源浪费。比如上层服务连续调用了几十次 Start() 函数,由于每一次调用 Start() 函数都会触发一轮日志同步,则最终导致发送了几十次日志同步。一方面,这些请求包含的 entries 基本都一样,甚至有 entry 连续出现在几十次 rpc 中,这样的实现多传输了一些数据,存在一定浪费;另一方面,每次发送 rpc 都不论是发送端还是接收端都需要若干次系统调用和内存拷贝,rpc 次数过多也会对 CPU 造成不必要的压力。总之,这种资源浪费的根本原因就在于:将日志同步的触发与上层服务提交新指令强绑定,从而导致发送了很多重复的 rpc。
为此,参考了 sofajraft 的日志复制实现 。每个 peer 在启动时会为除自己之外的每个 peer 都分配一个 replicator 协程。对于 follower 节点,该协程利用条件变量执行 wait 来避免耗费 cpu,并等待变成 leader 时再被唤醒;对于 leader 节点,该协程负责尽最大地努力去向对应 follower 发送日志使其同步,直到该节点不再是 leader 或者该 follower 节点的 matchIndex 大于等于本地的 lastIndex。
这样的实现方式能够将日志同步的触发和上层服务提交新指令解耦,能够大幅度减少传输的数据量,rpc 次数和系统调用次数。由于 6.824 的测试能够展示测试过程中的传输 rpc 次数和数据量,因此我进行了前后的对比测试,结果显示:这样的实现方式相比直观方式的实现,不同测试数据传输量的减少倍数在 1-20 倍之间。当然,这样的实现也只是实现了粗粒度的 batching,并没有流量控制,而且也没有实现 pipeline,有兴趣的同学可以去了解 sofajraft, etcd 或者 tikv 的实现,他们对于复制过程进行了更细粒度的控制。
此外,虽然 leader 对于每一个节点都有一个 replicator 协程去同步日志,但其目前同时最多只能发送一个 rpc,而这个 rpc 很可能超时或丢失从而触发集群换主。因此,对于 heartbeat timeout 触发的 BroadcastHeartbeat,我们需要立即发出日志同步请求而不是让 replicator 去发。这也就是我的 BroadcastHeartbeat 函数有两种行为的真正原因。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| // handleAppendEntriesResponse peer handle AppendEntries RPC
func (rf *Raft) handleAppendEntriesResponse(peer int, req *AppendEntriesReq, resp *AppendEntriesResp) {
defer DPrintf("[handleAppendEntriesResponse]-{Node %v}'s state is {state %v,term %v,commitIndex %v,lastApplied %v,firstLog %v,lastLog %v} after handling AppendEntriesResponse %v for AppendEntriesRequest %v",
rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), resp, req)
if rf.state == StateLeader && rf.currentTerm == req.Term {
if resp.Success {
// 更新matchIndex, nextIndex
rf.matchIndex[peer] = req.PrevLogIndex + len(req.Entries)
rf.nextIndex[peer] = rf.matchIndex[peer] + 1
rf.advanceCommitIndexForLeader()
} else {
// term 太小而失败
if resp.Term > rf.currentTerm {
rf.ChangeState(StateFollower)
rf.currentTerm, rf.votedFor = resp.Term, -1
rf.persist()
} else if resp.Term == rf.currentTerm { // 日志不匹配而失败
rf.nextIndex[peer] = resp.ConflictIndex
// 1. 如果在Leader 中能找到和Follower 有相同的ConflictTerm,
// 返回该Leader Term 的最后一个Log 作为nextIndex[peer]
// 2. 如果找不到相同的Term,返回Follower 中的ConflictTerm 的第一个日志,即ConflictIndex
if resp.ConflictTerm != -1 {
firstIndex := rf.getFirstLog().Index
for i := req.PrevLogIndex; i >= firstIndex; i-- {
if rf.logs[i-firstIndex].Term == resp.ConflictTerm {
rf.nextIndex[peer] = i
break
}
}
}
}
}
}
}
func (rf *Raft) replicator(peer int) {
rf.replicatorCond[peer].L.Lock()
defer rf.replicatorCond[peer].L.Unlock()
for !rf.killed() {
// if there is no need to replicate entries for this peer,
// just release CPU and wait other goroutine's signal if service adds new Command
// if this peer needs replicating entries, this goroutine will call
// replicateOneRound(peer) multiple times until this peer catches up, and then wait
// Only Leader 可以Invoke 这个方法,通过.Singal 唤醒各个peer, 不是Leader 不生效
for !rf.needReplicating(peer) {
rf.replicatorCond[peer].Wait()
}
// maybe a pipeline mechanism is better to trade-off the memory usage and catch up time
rf.replicateOneRound(peer)
}
}
|
日志应用 异步 applier 的 exactly once#
Raft论文的说话,一旦发现commitIndex大于lastApplied,应该立马将可应用的日志应用到状态机中。Raft节点本身是没有状态机实现的,状态机应该由Raft的上层应用来实现,因此我们不会谈论如何实现状态机,只需将日志发送给applyCh这个通道即可。
对于异步 apply,其触发方式无非两种,leader 提交了新的日志或者 follower 通过 leader 发来的 leaderCommit 来更新 commitIndex。很多人实现的时候可能顺手就在这两处异步启一个协程把 [lastApplied + 1, commitIndex] 的 entry push 到 applyCh 中,但其实这样子是可能重复发送 entry 的,原因是 push applyCh 的过程不能够持锁,那么这个 lastApplied 在没有 push 完之前就无法得到更新,从而可能被多次调用。虽然只要上层服务可以保证不重复 apply 相同 index 的日志到状态机就不会有问题,但我个人认为这样的做法是不优雅的。考虑到异步 apply 时最耗时的步骤是 apply channel 和 apply 日志到状态机,其他的都不怎么耗费时间。因此我们完全可以只用一个 applier 协程,让其不断的把 [lastApplied + 1, commitIndex] 区间的日志 push 到 applyCh 中去。这样既可保证每一条日志只会被 exactly once 地 push 到 applyCh 中,也可以使得日志 apply 到状态机和 raft 提交新日志可以真正的并行。我认为这是一个较为优雅的异步 apply 实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
// applier a dedicated applier goroutine to guarantee that each log will be push into
// applyCh exactly once, ensuring that service's applying entries and raft's
// committing entries can be parallel
func (rf *Raft) applier() {
for !rf.killed() {
rf.mu.Lock()
// if there is no need to apply entries,
// just release CPU and wait other goroutine's signal if they commit new entries
for rf.lastApplied >= rf.commitIndex {
rf.applyCond.Wait()
}
firstIndex, commitIndex, lastApplied := rf.getFirstLog().Index, rf.commitIndex, rf.lastApplied
entries := make([]Entry, commitIndex-lastApplied)
copy(entries, rf.logs[lastApplied+1-firstIndex:commitIndex+1-firstIndex])
rf.mu.Unlock()
for _, entry := range entries {
rf.applyCh <- ApplyMsg{
CommandValid: true,
Command: entry.Command,
CommandTerm: entry.Term,
CommandIndex: entry.Index,
}
}
rf.mu.Lock()
DPrintf("{Node %v} applies entries %v-%v in term %v",
rf.me, rf.lastApplied, commitIndex, rf.currentTerm)
rf.lastApplied = Max(rf.lastApplied, commitIndex)
rf.mu.Unlock()
}
}
|
需要注意以下两点:
- 引用之前的 commitIndex:push applyCh 结束之后更新 lastApplied 的时候一定得用之前的 commitIndex 而不是 rf.commitIndex,因为后者很可能在 push channel 期间发生了改变。
- 防止与 installSnapshot 并发导致 lastApplied 回退:需要注意到,applier 协程在 push channel 时,中间可能夹杂有 snapshot 也在 push channel。如果该 snapshot 有效,那么在 CondInstallSnapshot 函数里上层状态机和 raft 模块就会原子性的发生替换,即上层状态机更新为 snapshot 的状态,raft 模块更新 log, commitIndex, lastApplied 等等,此时如果这个 snapshot 之后还有一批旧的 entry 在 push channel,那上层服务需要能够知道这些 entry 已经过时,不能再 apply,同时 applier 这里也应该加一个 Max 自身的函数来防止 lastApplied 出现回退。
快速恢复(Fast Backup)#
在前面(7.1)介绍的日志恢复机制中,如果Log有冲突,Leader每次会回退一条Log条目。 这在许多场景下都没有问题。但是在某些现实的场景中,至少在Lab2的测试用例中,每次只回退一条Log条目会花费很长很长的时间。所以,现实的场景中,可能一个Follower关机了很长时间,错过了大量的AppendEntries消息。这时,Leader重启了。按照Raft论文中的图2,如果一个Leader重启了,它会将所有Follower的nextIndex设置为Leader本地Log记录的下一个槽位(7.1有说明)。所以,如果一个Follower关机并错过了1000条Log条目,Leader重启之后,需要每次通过一条RPC来回退一条Log条目来遍历1000条Follower错过的Log记录。这种情况在现实中并非不可能发生。在一些不正常的场景中,假设我们有5个服务器,有1个Leader,这个Leader和另一个Follower困在一个网络分区。但是这个Leader并不知道它已经不再是Leader了。它还是会向它唯一的Follower发送AppendEntries,因为这里没有过半服务器,所以没有一条Log会commit。在另一个有多数服务器的网络分区中,系统选出了新的Leader并继续运行。旧的Leader和它的Follower可能会记录无限多的旧的任期的未commit的Log。当旧的Leader和它的Follower重新加入到集群中时,这些Log需要被删除并覆盖。可能在现实中,这不是那么容易发生,但是你会在Lab2的测试用例中发现这个场景。
所以,为了更快的恢复日志,Raft论文在5.3结尾处,对这种方法有了一些模糊的描述。原文有些晦涩,在这里我会以一种更好的方式,尝试解释论文中有关快速恢复的方法。大致思想是,让Follower返回足够多的信息给Leader,这样Leader可以以任期(Term)为单位来回退
,而不用每次只回退一条Log条目
。所以现在,在恢复Follower的Log时,如果Leader和Follower的Log不匹配,Leader只需要对不同任期发生一条AEs,而不需要对每个不通Log条目发送一条AEs。这是一种加速策略,当然也可以有别的日志恢复的加速策略。
我将可能出现的场景分成3类,为了简化,这里只画出一个Leader(S2)和一个Follower(S1),S2将要发送一条任期号为6的AppendEntries消息给Follower。
- 场景1:S1(Follower)没有任期6的任何Log,因此我们需要回退一整个任期的Log。
- 场景2:S1收到了任期4的旧Leader的多条Log,但是作为新Leader,S2只收到了一条任期4的Log。所以这里,我们需要覆盖S1中有关旧Leader的一些Log。
- 场景3: S1与S2的Log不冲突,但是S1缺失了部分S2中的Log
可以让Follower在回复Leader的AppendEntries消息中,携带3个额外的信息,来加速日志的恢复。这里的回复是指,Follower因为Log信息不匹配,拒绝了Leader的AppendEntries之后的回复。这里的三个信息是指:
- XTerm: 这个是Follower中与Leader冲突的Log对应的任期号。在之前(7.1)有介绍Leader会在prevLogTerm中带上本地Log记录中,前一条Log的任期号。如果Follower在对应位置的任期号不匹配,它会拒绝Leader的AppendEntries消息,并将自己的任期号放在XTerm中。如果Follower在对应位置没有Log,那么这里会返回 -1。
- XIndex: 这个是Follower中,对应任期号为XTerm的第一条Log条目的槽位号。
- XLen: 如果Follower在对应位置没有Log,那么XTerm会返回-1,XLen表示空白的Log槽位数。
我们再来看这些信息是如何在上面3个场景中,帮助Leader快速回退到适当的Log条目位置。
- 场景1: Follower(S1)会返回XTerm=5,XIndex=2。Leader(S2)发现自己没有任期5的日志,它会将自己本地记录的,S1的nextIndex设置到XIndex,也就是S1中,任期5的第一条Log对应的槽位号。所以,如果Leader完全没有XTerm的任何Log,那么它应该回退到XIndex对应的位置(这样,Leader发出的下一条AppendEntries就可以一次覆盖S1中所有XTerm对应的Log)
- 场景2: Follower(S1)会返回XTerm=4,XIndex=1。Leader(S2)发现自己其实有任期4的日志,它会将自己本地记录的S1的nextIndex设置到本地在XTerm位置的Log条目后面,也就是槽位2。下一次Leader发出下一条AppendEntries时,就可以一次覆盖S1中槽位2和槽位3对应的Log。
- 场景3: Follower(S1)会返回XTerm=-1,XLen=2。这表示S1中日志太短了,以至于在冲突的位置没有Log条目,Leader应该回退到Follower最后一条Log条目的下一条,也就是槽位2,并从这开始发送AppendEntries消息。槽位2可以从XLen中的数值计算得到。
在本次的实现中以Term 为单位返回,不在一个一个Index 自减。这需要添加 ConflicTerm
, ConflictIndex
字段 去记录出现冲突的位置和任期。然后在 HanleAppendEntries RPC 中,在 Leader 的log
中检查 ConflictIndex
位置的日志一致性。
为什么Raft协议不能提交之前任期的日志?#
查看
函数解析#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
func (rf *Raft) AppendEntries(req *AppendEntriesReq, resp *AppendEntriesResp) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer rf.persist()
defer DPrintf("[AppendEntries]- {Node: %v}'s state is {state %v, term %v, commitIndex %v, lastApplied %v, firstLog %v, lastLog %v} before processing AppendEntriesRequest %v and reply AppendEntries %v",
rf.me, rf.state, rf.currentTerm, rf.commitIndex, rf.lastApplied, rf.getFirstLog(), rf.getLastLog(), req, resp)
// 如果发现来自leader的rpc中的term比当前peer要小,
// 说明是该RPC 来自旧的term(leader),|| 或者 当前leader 需要更新 不处理
if req.Term < rf.currentTerm {
resp.Term, resp.Success = rf.currentTerm, false
return
}
// 一般来讲,在vote的时候已经将currentTerm和leader同步
// 不过,有些peer暂时的掉线或者其他一些情况重连以后,会发现term和leader不一样
// 以收到大于自己的term的rpc也是第一时间同步.而且要将votefor重新设置为-1
// 等待将来选举 (说明这个peer 不是之前election 中投的的marjority)
if req.Term > rf.currentTerm {
rf.currentTerm, rf.votedFor = req.Term, -1
}
rf.ChangeState(StateFollower)
rf.electionTimer.Reset(RandomizedElectionTimeout())
// PrevLogIndex 比rf 当前的第一个Log index 还要小
if req.PrevLogIndex < rf.getFirstLog().Index {
resp.Term, resp.Success = 0, false
DPrintf("[AppendEntries] - {Node: %v} receives unexpected AppendEntriesRequest %v from {Node: %v} because prevLogIndex %v < firstLogIndex %v",
rf.me, req, req.LeaderId, req.PrevLogIndex, rf.getFirstLog().Index)
return
}
if !rf.matchLog(req.PrevLogTerm, req.PrevLogIndex) {
// 日志的一致性检查失败后,递归找到需要追加日志的位置
resp.Term, resp.Success = rf.currentTerm, false
lastIndex := rf.getLastLog().Index
if lastIndex < req.PrevLogIndex {
// lastIndex 和 nextIndex[peer] 之间有空洞 scenario3
// follower 在nextIndex[peer] 没有log
resp.ConflictTerm = -1
resp.ConflictIndex = lastIndex + 1
} else {
// scenario2, 1
// 以任期为单位进行回退
firstIndex := rf.getFirstLog().Index
resp.ConflictTerm = rf.logs[req.PrevLogIndex-firstIndex].Term
index := req.PrevLogIndex - 1
for index >= firstIndex && rf.logs[index-firstIndex].Term == resp.ConflictTerm {
index--
}
resp.ConflictIndex = index
}
return
}
firstIndex := rf.getFirstLog().Index
for i, entry := range req.Entries {
// mergeLog
// 添加的日志索引位置 比Follower 日志相同 直接添加 此处用大于等于,实际只有==
// || 要添加的日志索引位置在 Follower 中的任期和AE RPC 中的Term 冲突
if entry.Index-firstIndex >= len(rf.logs) || rf.logs[entry.Index-firstIndex].Term != entry.Term {
rf.logs = shrinkEntriesArray(append(rf.logs[:entry.Index-firstIndex], req.Entries[i:]...))
break
}
}
rf.advanceCommitIndexForFollower(req.LeaderComment)
resp.Term, resp.Success = rf.currentTerm, true
}
|