介绍

在lab2的Raft函数库之上,搭建一个能够容错的key/value存储服务,需要提供强一致性保证。

  • 强一致性介绍 对于单个请求,整个服务需要表现得像个单机服务,并且对状态机的修改基于之前所有的请求。对于并发的请求,返回的值和最终的状态必须相同,就好像所有请求都是串行的一样。即使有些请求发生在了同一时间,那么也应当一个一个响应。此外,在一个请求被执行之前,这之前的请求都必须已经被完成(在技术上我们也叫着线性化(linearizability))。

kv服务支持三种操作:Put, Append, Get。通过在内存维护一个简单的键/值对数据库,键和值都是字符串;

整体架构 skeleton

简化来看 skeleton-simple

在正式开始前,要了解论文-extend-version中section 7和8的内容。

相关的RPC 在Raft 作者的博士论文中的6.3- Implementing linearizable semantics 小结有很详细的介绍,建议先阅读。

RPC

RPC

Lab3A - 不需要日志压缩的Key/Value服务

考虑这样一个场景,客户端向服务端提交了一条日志,服务端将其在 raft 组中进行了同步并成功 commit,接着在 apply 后返回给客户端执行结果。然而不幸的是,该 rpc 在传输中发生了丢失,客户端并没有收到写入成功的回复。因此,客户端只能进行重试直到明确地写入成功或失败为止,这就可能会导致相同地命令被执行多次,从而违背线性一致性。

有人可能认为,只要写请求是幂等的,那重复执行多次也是可以满足线性一致性的,实际上则不然。考虑这样一个例子:对于一个仅支持 put 和 get 接口的 raftKV 系统,其每个请求都具有幂等性。设 x 的初始值为 0,此时有两个并发客户端,客户端 1 执行 put(x,1),客户端 2 执行 get(x) 再执行 put(x,2),问(客户端 2 读到的值,x 的最终值)是多少。对于线性一致的系统,答案可以是 (0,1),(0,2) 或 (1,2)。然而,如果客户端 1 执行 put 请求时发生了上段描述的情况,然后客户端 2 读到 x 的值为 1 并将 x 置为了 2,最后客户端 1 超时重试且再次将 x 置为 1。对于这种场景,答案是 (1,1),这就违背了线性一致性。归根究底还是由于幂等的 put(x,1) 请求在状态机上执行了两次,有两个 LZ 点。因此,即使写请求的业务语义能够保证幂等,不进行额外的处理让其重复执行多次也会破坏线性一致性。当然,读请求由于不改变系统的状态,重复执行多次是没问题的。

对于这个问题,raft 作者介绍了想要实现线性化语义,就需要保证日志仅被执行一次,即它可以被 commit 多次,但一定只能 apply 一次。其解决方案原文如下:

The solution is for clients to assign unique serial numbers to every command. Then, the state machine tracks the latest serial number processed for each client, along with the associated response. If it receives a command whose serial number has already been executed, it responds immediately without re-executing the request.

思路可以是:

  • 每个 client 都需要一个唯一的标识符,它的每个不同命令需要有一个顺序递增的 commandId,clientId 和这个 commandId,clientId 可以唯一确定一个不同的命令,从而使得各个 raft 节点可以记录保存各命令是否已应用以及应用以后的结果。

也可以参考此处dragonboat 作者讨论

为什么要记录应用的结果?因为通过这种方式同一个命令的多次 apply 最终只会实际应用到状态机上一次,之后相同命令 apply 的时候实际上是不应用到状态机上的而是直接从保存的结果中返回的。

如果默认一个客户端只能串行执行请求的话,服务端这边只需要记录一个 map,其 key 是 clientId,其 value 是该 clientId 执行的最后一条日志的 commandId 和状态机的输出即可CommandResponse。

客户端

一个 client 可以通过为其处理的每条命令递增 commandId 的方式来确保不同的命令一定有不同的 commandId,当然,同一条命令的 commandId 在没有处理完毕之前,即明确收到服务端的写入成功或失败之前是不能改变的。

代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

package kvraft

import (
	"crypto/rand"
	"math/big"

	"6.824/labrpc"
)

type Clerk struct {
	servers []*labrpc.ClientEnd
	// You will have to modify this struct.
	leaderId int64
	// generated by nrand(), it would be better to use some distributed ID
	// generation algorithm that guarantees no conflicts
	clientId  int64
	commandId int64 // (clientId, commandId) defines a operation uniquely
}

func nrand() int64 {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	x := bigx.Int64()
	return x
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	return &Clerk{
		servers:   servers,
		leaderId:  0,
		clientId:  nrand(),
		commandId: 0,
	}
}

// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) Get(key string) string {

	// You will have to modify this function.
	return ck.Command(&CommandRequest{
		Key:       key,
		Op:        OpGet,
		ClientId:  ck.clientId,
		CommandId: ck.commandId,
	})
}

func (ck *Clerk) Put(key string, value string) {
	ck.Command(&CommandRequest{
		Key:       key,
		Value:     value,
		Op:        OpPut,
		ClientId:  ck.clientId,
		CommandId: ck.commandId,
	})
}
func (ck *Clerk) Append(key string, value string) {
	ck.Command(&CommandRequest{
		Key:       key,
		Value:     value,
		Op:        OpAppend,
		ClientId:  ck.clientId,
		CommandId: ck.commandId,
	})
}

func (ck *Clerk) Command(req *CommandRequest) string {
	// req.ClientId, req.CommandId = ck.clientId, ck.commandId
	for {
		var resp CommandResponse
		if !ck.servers[ck.leaderId].Call("KVServer.Command", req, &resp) ||
			resp.Err == ErrWrongLeader || resp.Err == ErrTimeout {

			// 不知leader 轮询所有的server 尝试发出请求
			ck.leaderId = (ck.leaderId + 1) % int64(len(ck.servers))
			continue
		}

		ck.commandId++
		return resp.Value
	}
}

服务端

整体请求逻辑如下: logic

Server结构体与初始化代码实现:

  1. 一个存储kv的map,即状态机,但这里实现一个基于内存版本KV即可的,但实际生产环境下必然不可能把数据全部存在内存当中,系统往往采用的是 LSM 的架构,例如 RocksDB 等,抽象成KVStateMachine 的接口。
  2. 一个能记录某一个客户端最后一次操作序号和应用结果的map lastOperations (类比Nebula 中的session 作用)
  3. 一个能记录每个raft同步操作结果的map notifyChans
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type KVServer struct {
	mu      sync.RWMutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	dead    int32 // set by Kill()

	maxRaftState int // snapshot if log grows this big

	// Your definitions here.
	lastApplied  int            // record the lastApplied index to prevent stateMachine from rollback
	stateMachine KVStateMachine // KV stateMachine

	// 客户端id最后的命令id和回复内容 (clientId,{最后的commdId,最后的LastReply})
	lastOperations map[int64]OperationContext
	// Leader回复给客户端的响应(LogIndex, CommandResponse
	notifyChans map[int]chan *CommandResponse
}

应用到状态机的流程

kv.applier协程:单独开一个goroutine来远程监听 Raft 的apply channel,一旦底层的Raft commit一个到apply channel,状态机就立马执行且通过 commandIndex(即raft 中的CommitIndex) 通知到该客户端的NotifyChan, Command函数取消阻塞返回给客户端。

要点:

  • raft同步完成后,也需要判断请求是否为重复请求。因为同一请求可能由于重试会被同步多次。
  • 对于客户端的请求,rpc 框架也会生成一个协程去处理逻辑。因此,需要考虑清楚这些协程之间的通信关系。为此,我的实现是客户端协程将日志放入 raft 层去同步后即注册一个 channel 去阻塞等待,接着 apply 协程监控 applyCh,在得到 raft 层已经 commit 的日志后,apply 协程首先将其 apply 到状态机中,接着根据 index 得到对应的 channel ,最后将状态机执行的结果 push 到 channel 中,这使得客户端协程能够解除阻塞并回复结果给客户端
  • 为了保证强一致性,仅对当前 term 日志的 notifyChan 进行通知,让之前 term 的客户端协程都超时重试。避免leader 降级为 follower 后又迅速重新当选了 leader,而此时依然有客户端协程未超时在阻塞等待,那么此时 apply 日志后,根据 index 获得 channel 并向其中 push 执行结果就可能出错,因为可能并不对应。
  • 在目前的实现中,读(Get)请求也会生成一条 raft 日志去同步,最简单粗暴的方式保证线性一致性,即LogRead方法。但是,这样子实现的读性能会相当的差,实际生产级别的 raft 读请求实现一般都采用了 Read Index 或者 Lease Read 的方式,具体原理可以参考此线性一致性博客,具体实现可以参照 SOFAJRaft 的实现博客
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
func (kv *KVServer) applier() {
	for !kv.killed() {
		for msg := range kv.applyCh {
			DPrintf("[applier] - {Node: %v} tries to apply message %v", kv.rf.Me(), msg)
			if msg.CommandValid {
				kv.mu.Lock()
				if msg.CommandIndex <= kv.lastApplied {
					DPrintf("[applier] - {Node: %v} discards outdated message %v since a newer snapshot which lastapplied is %v has been restored",
						kv.rf.Me(), msg, kv.lastApplied)

					kv.mu.Unlock()
					continue
				}

				kv.lastApplied = msg.CommandIndex

				var resp = new(CommandResponse)
				command := msg.Command.(Command)
				if command.Op != OpGet && kv.isDuplicatedReq(command.ClientId, command.CommandId) {
					DPrintf("[applier] - {Node: %v} doesn't apply duplicated message %v to state machine since maxAppliedCommandId is %v for client %v",
						kv.rf.Me(), msg, kv.lastOperations[command.ClientId], command.ClientId)

					resp = kv.lastOperations[command.ClientId].LastResponse
				} else {
					resp = kv.applyLogToStateMachine(command)
					if command.Op != OpGet {
						kv.lastOperations[command.ClientId] = OperationContext{
							MaxAppliedCommandId: command.CommandId,
							LastResponse:        resp,
						}
					}
				}

				// 记录每个idx apply 到state machine 的 CommandResponse
				// 为了保证强一致性,仅对当前 term 日志的 notifyChan 进行通知,
				// 让之前 term 的客户端协程都超时重试。避免leader 降级为 follower
				// 后又迅速重新当选了 leader,而此时依然有客户端协程未超时在阻塞等待,
				// 那么此时 apply 日志后,根据 index 获得 channel 并向其中 push 执行结果就可能出错,因为可能并不对应
				if currentTerm, isLeader := kv.rf.GetState(); isLeader && msg.CommandTerm == currentTerm {
					ch := kv.getNotifyChan(msg.CommandIndex)
					ch <- resp
				}

				// part 2
				needSnapshot := kv.needSnapshot()
				if needSnapshot {
					kv.takeSnapshot(msg.CommandIndex)
				}
				kv.mu.Unlock()
			} else if msg.SnapshotValid {
				kv.mu.Lock()
				if kv.rf.CondInstallSnapshot(msg.SnapshotTerm, msg.SnapshotIndex, msg.Snapshot) {
					kv.restoreSnapshot(msg.Snapshot)
					kv.lastApplied = msg.SnapshotIndex
				}
				kv.mu.Unlock()
			} else {
				panic(fmt.Sprintf("unexpected Message: %v", msg))
			}
		}
	}
}

leader 比 follower 多出一个 notifyChan 环节,是因为 leader 需要处理 rpc 请求响应,而 follower 不用,一个很简单的流程其实就是 client -> kvservice -> Start() -> applyCh -> kvservice -> client,但是applyCh是逐个 commit 一个一个返回,所以需要明确返回的 commit 对应的是哪一个请求,即通过 commitIndex唯一确定一个请求,然后通知该请求执行流程可以返回了。

对于读请求,由于其不影响系统状态,所以直接去状态机执行即可,当然,其结果也不需要再记录到去重的数据结构中。

CommandRPC 逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

// Command 客户端调用的RPC方法
func (kv *KVServer) Command(req *CommandRequest, resp *CommandResponse) {
	defer DPrintf("[Command]- {Node: %v} processes CommandReq %v with CommandResp %v",
		kv.rf.Me(), req, resp)

	// 如果请求是重复的,直接在 OperationContext 中拿到之前的结果返回
	kv.mu.RLock()
	if req.Op != OpGet && kv.isDuplicatedReq(req.ClientId, req.CommandId) {
		lastResp := kv.lastOperations[req.ClientId].LastResponse
		resp.Value, resp.Err = lastResp.Value, lastResp.Err
		kv.mu.RUnlock()
		return
	}
	kv.mu.RUnlock()

	idx, _, isLeader := kv.rf.Start(Command{req})
	if !isLeader {
		resp.Err = ErrWrongLeader
		return
	}

	kv.mu.Lock()
	ch := kv.getNotifyChan(idx)
	kv.mu.Unlock()

	select {
	case result := <-ch:
		resp.Value, resp.Err = result.Value, result.Err

	case <-time.After(ExecuteTimeOut):
		resp.Err = ErrTimeout
	}

	go func() {
		kv.mu.Lock()
		kv.removeOutdatedNotifyChan(idx)
		kv.mu.Unlock()
	}()

}

Lab3B - 日志压缩

首先,日志的 snapshot 不仅需要包含状态机的状态,还需要包含用来去重的 lastOperations 哈希表。

其次,apply 协程负责持锁阻塞式的去生成 snapshot,幸运的是,此时 raft 框架是不阻塞的,依然可以同步并提交日志,只是不 apply 而已。如果这里还想进一步优化的话,可以将状态机搞成 MVCC 等能够 COW 的机制,这样应该就可以不阻塞状态机的更新了

优化: 项目中 LastOperationsNotifyChan 都是使用map 不能并发安全,用了一张大锁保平安。 实际上可以使用Sync.Map 然后将锁的粒度细化来优化这块