ShardedKV 介绍

有关 shardkv,其可以算是一个 multi-raft 的实现,只是缺少了物理节点的抽象概念。在实际的生产系统中,不同 raft 组的成员可能存在于一个物理节点上,而且一般情况下都是一个物理节点拥有一个状态机,不同 raft 组使用不同地命名空间或前缀来操作同一个状态机。基于此,下文所提到的的节点都代指 raft 组的某个成员,而不代指某个物理节点。比如节点宕机代指 raft 组的某个成员被 kill 掉,而不是指某个物理节点宕机,从而可能影响多个 raft 的成员。

在本实验中,我们将构建一个带分片的KV存储系统,即一组副本组上的键。每一个分片都是KV对的子集,例如,所有以“a”开头的键可能是一个分片,所有以“b”开头的键可能是另一个分片。 也可以用range 或者Hash 之后分区。 分片的原因是性能。每个replica group只处理几个分片的 put 和 get,并且这些组并行操作;因此,系统总吞吐量(每单位时间的投入和获取)与组数成比例增加。

我们的整个系统有两个基本组件:shard controller 和 shard group。整个系统有一个 controller 和多个 group,controller 单独一个 raft 集群,每一个 shard group 是由 kvraft 实例构成的集群。shard controller 负责调度,客户端向 shard controller 发送请求,controller 会根据配置(config)来告知客户端服务这个 key 的是哪个 group。 每个 group 负责部分 shard。

1
2
3
4
5
type Config struct {
	Num    int              // config number, version also
	Shards [NShards]int     // shard -> gid
	Groups map[int][]string // gid -> servers[]
}

三个参数分别对应的版本的配置号,分片所对应的组(Group)信息(实验中的分片为10个),每个组对应的服务器映射名称列表(也就是组信息)。

Group表示一个Leader-Followers集群,Gid为它的标识,Shard表示所有数据的一个子集,Config表示一个划分方案。此次实验中,所有数据分为NShards = 10份,Server给测试程序提供四个接口。 下图中每个Shard 都有其他对应的副本未画出。对Client 以Group 为单位进行服务。相当于一个物理节点上 有若干个Group 可以对外服务。

group

分片存储系统必须能够在replica group之间移动分片,因为某些组可能比其他组负载更多,因此需要移动分片以平衡负载;而且replica group可能会加入和离开系统,可能会添加新的副本组以增加容量,或者可能会使现有的副本组脱机以进行修复或报废。

Lab4A 实现

本实验的主要挑战是处理重新配置——移动分片所属。在单个副本组中,所有组成员必须就何时发生与客户端 Put/Append/Get 请求相关的重新配置达成一致。例如,Put 可能与重新配置大约同时到达,导致副本组停止对该Put包含的key的分片负责。组中的所有副本必须就 Put 发生在重新配置之前还是之后达成一致。如果之前,Put 应该生效,分片的新所有者将看到它的效果;如果之后,Put 将不会生效,客户端必须在新所有者处重新尝试。推荐的方法是让每个副本组使用 Raft 不仅记录 Puts、Appends 和 Gets 的顺序,还记录重新配置的顺序。您需要确保在任何时候最多有一个副本组为每个分片提供请求。

重新配置还需要副本组之间的交互。例如,在配置 10 中,组 G1 可能负责分片 S1。在配置 11 中,组 G2 可能负责分片 S1。在从 10 到 11 的重新配置过程中,G1 和 G2 必须使用 RPC 将分片 S1(键/值对)的内容从 G1 移动到 G2。

Lab4的内容就是将数据按照某种方式分开存储到不同的RAFT集群(Group)上的分片(shard)上。保证相应数据请求引流到对应的集群,降低单一集群的压力,提供更为高效、更为健壮的服务。 structure

  • 具体的lab4要实现一个支持 multi-raft分片 、分片数据动态迁移的线性一致性分布式 KV 存储服务。
  • shard表示互不相交并且组成完整数据库的每一个数据库子集。group表示shard的集合,包含一个或多个shard。一个shard只可属于一个group,一个group可包含(管理)多个shard。
  • lab4A实现ShardCtrler服务,作用:提供高可用的集群配置管理服务,实现分片的负载均衡,并尽可能少地移动分片。记录了每组(Group) ShardKVServer 的集群信息和每个分片(shard)服务于哪组(Group)ShardKVServer。 具体实现通过Raft维护 一个Configs数组,单个config具体内容如下:
    • Num:config number,Num=0表示configuration无效,边界条件, 即是version 的作用
    • Shards:shard -> gid,分片位置信息,Shards[3]=2,说明分片序号为3的分片负贵的集群是Group2(gid=2)
    • Groups:gid -> servers[], 集群成员信息,Group[3]=[‘server1’,‘server2’],说明gid = 3的集群Group3包含两台名称为server1 & server2的机器

RPC

  • Query RPC。查询配置,参数是一个配置号, shardctrler 回复具有该编号的配置。如果该数字为 -1 或大于已知的最大配置数字,则 shardctrler 应回复最新配置。 Query(-1) 的结果应该反映 shardctrler 在收到 Query(-1) RPC 之前完成处理的每个 Join、Leave 或 Move RPC;

  • Join RPC 。添加新的replica group,它的参数是一组从唯一的非零副本组标识符 (GID) 到服务器名称列表的映射。 shardctrler 应该通过创建一个包含新副本组的新配置来做出反应。新配置应在所有组中尽可能均匀地分配分片,并应移动尽可能少的分片以实现该目标。如果 GID 不是当前配置的一部分,则 shardctrler 应该允许重新使用它(即,应该允许 GID 加入,然后离开,然后再次加入)

新加入的Group信息,要求在每一个group平衡分布shard,即任意两个group之间的shard数目相差不能为1,具体实现每一次找出含有shard数目最多的和最少的,最多的给最少的一个,循环直到满足条件为止。坑为:GID = 0 是无效配置,一开始所有分片分配给GID=0,需要优先分配;map的迭代时无序的,不确定顺序的话,同一个命令在不同节点上计算出来的新配置不一致,按sort排序之后遍历即可。且 map 是引用对象,需要用深拷贝做复制。

对于 Join,可以通过多次平均地方式来达到这个目的:每次选择一个拥有 shard 数最多的 raft 组和一个拥有 shard 数最少的 raft,将前者管理的一个 shard 分给后者,周而复始,直到它们之前的差值小于等于 1 且 0 raft 组无 shard 为止。对于 Leave,如果 Leave 后集群中无 raft 组,则将分片所属 raft 组都置为无效的 0;否则将删除 raft 组的分片均匀地分配给仍然存在的 raft 组。通过这样的分配,可以将 shard 分配地十分均匀且产生了几乎最少的迁移任务。

  • Leave RPC。删除指定replica group, 参数是以前加入的组的 GID 列表。 shardctrler 应该创建一个不包括这些组的新配置,并将这些组的分片分配给剩余的组。新配置应在组之间尽可能均匀地划分分片,并应移动尽可能少的分片以实现该目标;

  • Move RPC。移动分片,的参数是一个分片号和一个 GID。 shardctrler 应该创建一个新配置,其中将分片分配给组。 Move 的目的是让我们能够测试您的软件。移动后的加入或离开可能会取消移动,因为加入和离开会重新平衡。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
// Join according to new Group(gid -> servers) to change the Config
func (cf *MemoryConfigStateMachine) Join(groups map[int][]string) Err {
	lastConfig := cf.Configs[len(cf.Configs)-1]
	newConfig := Config{
		Num:    len(cf.Configs),
		Shards: lastConfig.Shards,
		Groups: deepCopy(lastConfig.Groups),
	}

	for gid, servers := range groups {
		if _, ok := newConfig.Groups[gid]; !ok {
			newServers := make([]string, len(servers))
			copy(newServers, servers)
			newConfig.Groups[gid] = newServers
		}
	}
                                       
	// 找到group 中shard 最大和最小的组,将数据进行move => reblance
	g2s := Group2Shards(newConfig)
	for {
		src, dst := GetGIDWIthMaxShards(g2s), GetGIDWithMinShards(g2s)
		if src != 0 && len(g2s[src])-len(g2s[dst]) <= 1 {
			break
		}

		g2s[dst] = append(g2s[dst], g2s[src][0])
		g2s[src] = g2s[src][1:]
	}

	var newShards [NShards]int
	for gid, shards := range g2s {
		for _, shard := range shards {
			newShards[shard] = gid
		}
	}
	newConfig.Shards = newShards
	cf.Configs = append(cf.Configs, newConfig)
	return OK
}

// Leave some group leave the cluster
func (cf *MemoryConfigStateMachine) Leave(gids []int) Err {
	lastConfig := cf.Configs[len(cf.Configs)-1]
	newConfig := Config{
		Num:    len(cf.Configs),
		Shards: lastConfig.Shards,
		Groups: deepCopy(lastConfig.Groups),
	}

	g2s := Group2Shards(newConfig)
	orphanShards := make([]int, 0)

	for _, gid := range gids {
		delete(newConfig.Groups, gid)
		if shards, ok := g2s[gid]; ok {
			orphanShards = append(orphanShards, shards...)
			delete(g2s, gid)
		}
	}

	var newShards [NShards]int
	if len(newConfig.Groups) != 0 {
		// reblance
		for _, shard := range orphanShards {
			target := GetGIDWithMinShards(g2s)
			g2s[target] = append(g2s[target], shard)
		}

		// update Shards: share -> gid
		for gid, shards := range g2s {
			for _, shard := range shards {
				newShards[shard] = gid
			}
		}
	}

	newConfig.Shards = newShards
	cf.Configs = append(cf.Configs, newConfig)
	return OK
}

// Move move No.shard to No.gid
func (cf *MemoryConfigStateMachine) Move(shard, gid int) Err {
	lastConfig := cf.Configs[len(cf.Configs)-1]
	newConfig := Config{
		Num:    len(cf.Configs),
		Shards: lastConfig.Shards,
		Groups: lastConfig.Groups,
	}

	newConfig.Shards[shard] = gid
	cf.Configs = append(cf.Configs, newConfig)
	return OK
}

// Query return the version of num config
func (cf *MemoryConfigStateMachine) Query(num int) (Config, Err) {
	if num < 0 || num >= len(cf.Configs) {
		return cf.Configs[len(cf.Configs)-1], OK
	}
	return cf.Configs[num], OK
}

Lab4B ShardKV

实验提示:

  • 服务器不需要调用分片控制器的Join(),tester 才会去调用; 服务器将需要定期轮询 shardctrler 以监听新的配置。预期大约每100毫秒轮询一次;可以更频繁,但过少可能会导致 bug。
  • 服务器需要互相发送rpc,以便在配置更改期间传输分片。shardctrler的Config结构包含服务器名,一个 Server 需要一个labrpc.ClientEnd,以便发送RPC。使用make_end()函数传给StartServer()函数将服务器名转换为ClientEnd。shardkv /client.go需要实现这些逻辑。
  • 在server.go中添加代码去周期性从 shardctrler 拉取最新的配置,并且当请求分片不属于自身时,拒绝请求
  • 当被请求到错误分片时,需要返回ErrWrongGroup给客户端,并确保Get, Put, Append在面临并发重配置时能正确作出决定
  • 重配置需要按流程执行唯一一次
  • labgob 的提示错误不能忽视,它可能导致实验不过
  • 分片重分配的请求也需要做重复请求检测
  • 若客户端收到ErrWrongGroup,是否更改请求序列号? 若服务器执行请求时返回ErrWrongGroup,是否更新客户端信息?
  • 当服务器转移到新配置后,它可以继续存储它不再负责的分片(生产环境中这是不允许的),但这个可以简化实现
  • 当 G1 在配置变更时需要来自 G2 的分片数据,G2 处理日志条目的哪个时间点将分片发送给 G1 是最好的?
  • 你可以在整个 rpc 请求或回复中发送整个 map,这可以简化分片传输
  • map 是引用类型,所以在发送 map 的时候,建议先拷贝一次,避免 data race(在 labrpc 框架下,接收 map 时也需要拷贝)
  • 在配置更改期间,一对组可能需要互相传送分片,这可能会发生死锁
  • Challenge 如果想达到生产环境系统级别,如下两个挑战是需要实现的
  • Challenge1:Garbage collection of state 当一个副本组失去一个分片的所有权时,副本组需要删除该分片数据。但这给迁移带来一些问题,考虑两个组G1 和 G2,并且新配置C 将分片从 G1 移动到 G2,若 G1 在转换配置到C时删除了数据库中的分片,当G2 转换到C时,如何获取 G1 的数据
  • 实验要求 使每个副本组保留旧分片的时长不再是无限时长,即使副本组(如上面的G1)中的所有服务器崩溃并恢复正常,解决方案也必须工作。如果您通过TestChallenge1Delete,您就完成了这个挑战。
  • 解决方案 分片迁移成功之后,立马进行分片 GC 了,GC 完毕后再进入到配置更新阶段。
  • chanllenge2:Client requests during configuration changes 配置更改期间最简单的方式是禁止所有客户端操作直到转换完成,虽然简单但是不满足于生产环境要求,这将导致客户端长时间停滞,最好可以继续为不受当前配置更改的分片提供服务 上述优化还能更好,若 G3 在过渡到配置C时,需要来自G1 的分片S1 和 G2 的分片S2。希望 G3 能在收到其中一个分片后可以立即开始接收针对该分片的请求。如G1宕机了,G3在收到G2的分片数据后,可以立即为 S2 分片提供服务,而不需要等待 C 配置转换完全完成
  • 实验要求 修改您的解决方案,以便在配置更改期间继续执行不受影响的分片中的 key 的客户端操作。当您通过 TestChallenge2Unaffected 测试时,您已经完成了这个挑战。 修改您的解决方案,在配置转换进行中,副本组也可以立即开始提供分片服务。当您通过TestChallenge2Partial测试时,您已经完成了这个挑战。
  • 解决方案 分片迁移以 group 为单位,这样即使一个 group挂了,也不会影响到另一个 group中的分片迁移。

上面的实验ShardCtrler 集群组实现了配置更新,分片均匀分配等任务,ShardKVServer则需要承载所有分片的读写任务,相比于MIT 6.824 Lab3 RaftKV的提供基础的读写服务,还需要功能为配置更新,分片数据迁移,分片数据清理,空日志检测

实验逻辑

我们可以首先明确系统的运行方式:一开始系统会创建一个 shardctrler 组来负责配置更新,分片分配等任务,接着系统会创建多个 raft 组来承载所有分片的读写任务。此外,raft 组增删,节点宕机,节点重启,网络分区等各种情况都可能会出现。

对于集群内部,我们需要保证所有分片能够较为均匀的分配在所有 raft 组上,还需要能够支持动态迁移和容错。

对于集群外部,我们需要向用户保证整个集群表现的像一个永远不会挂的单节点 KV 服务一样,即具有线性一致性。

lab4b 的基本测试要求了上述属性,challenge1 要求及时清理不再属于本分片的数据,challenge2 不仅要求分片迁移时不影响未迁移分片的读写服务,还要求不同地分片数据能够独立迁移,即如果一个配置导致当前 raft 组需要向其他两个 raft 组拉取数据时,即使一个被拉取数据的 raft 组全挂了,也不能导致另一个未挂的被拉取数据的 raft 组分片始终不能在当前 raft 组提供服务。

  • StartServer: 启动Raft 节点和 Group
  • configureAction: 监听是否由配置变化, 配置符合要求后执行NewConfigurationCommand RPC Apply 操作记录到日志中
  • migrationAction: 监听configureAction 结束后 根据最新配置进行数据迁移。会发送GetShardsData RPC pull shard 数据到resp *ShardOperationResponse 中,相当于拉到本地节点的某个变量中,这个过程可能会有大量数据的传输。此RPC 结束之后,发送InsertShardsCommand RPC 进行真实的数据迁移, 同样经过Raft 层多数节点同意后,应用到本地的状态机上。并把 需要Shard 状态修改好。Pulling 改为GCing 为下部做准备
  • gcAction: 在上面一步的applyInsertShards 中会把已经Pulling 的远程的Shard 改为Gcing。 在这个Goroutine 中,调用DeleteShardsData RPC, 会把ShardOperationRequest 中的Shard 通过发送 NewDeleteShardsCommand RPC 把状态为GCing 的Shards 改为Server,状态为BePulling 的重置。与此同时,DeleteShardsData RPC 结束OK 后,本节点也需要发送一遍NewDeleteShardsCommand RPC Command,把GCing 的Shards 改为默认状态。

架构图

structure

1
2
3
4
5
6
7
8
9
2023/03/03 19:58:49 [StartServer]-{Node: 0}-{Group: 100} has started
2023/03/03 19:58:49 [StartServer]-{Node: 1}-{Group: 100} has started
2023/03/03 19:58:49 [StartServer]-{Node: 2}-{Group: 100} has started
2023/03/03 19:58:49 [StartServer]-{Node: 0}-{Group: 101} has started
2023/03/03 19:58:49 [StartServer]-{Node: 1}-{Group: 101} has started
2023/03/03 19:58:49 [StartServer]-{Node: 2}-{Group: 101} has started
2023/03/03 19:58:49 [StartServer]-{Node: 0}-{Group: 102} has started
2023/03/03 19:58:49 [StartServer]-{Node: 1}-{Group: 102} has started
2023/03/03 19:58:49 [StartServer]-{Node: 2}-{Group: 102} has started

根据Log 可以看出,集群以Group 为单位,初始化三个Group 管理十个Shard和三台节点。Shard 是真实存储数据的单位。 每个节点都有一个Raft 共识层,同一个Group 构成一个Raft Group, 整体形成Multi Raft Group, 以Group 为单位对应用层提供服务。Group 内部用Raft 保持数据的一致性。每个Group 有多少个Shard 由ShardCtrller 决定。Shard如果由副本也在各自Group 的各个节点上管理。

Nebula Graph 中 每个Shard及其副本构成一个Raft Group,Shard 的数量决定了Group 的数量。 本实验中,确定了只有最多三个Group

客户端Clerk

主要请求逻辑:

  • 使用key2shard()去找到一个 key 对应哪个ShardShard;
  • 根据Shard从当前配置config中获取的 gid;
  • 根据gid从当前配置config中获取 group 信息;
  • 在group循环查找leaderId,直到返回请求成功、ErrWrongGroup或整个 group 都遍历请求过;
  • Query 最新的配置,回到步骤1循环重复;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

type Clerk struct {
	sc      *shardctrler.Clerk
	config  shardctrler.Config
	makeEnd func(string) *labrpc.ClientEnd

	// You will have to modify this struct.
	leaderIds map[int]int // {groupid: leader if hardid of this groupid}
	clientId  int64
	commandId int64 //clientId + commandId define unique operation
}

// 省略一些方法

func (ck *Clerk) Command(req *CommandRequest) string {
	req.ClientId, req.CommandId = ck.clientId, ck.commandId

	for {
		shard := key2shard(req.Key)
		gid := ck.config.Shards[shard]

		if servers, ok := ck.config.Groups[gid]; ok {
			// 找到Group 对应的LeaderId, 如果没有从Id 0 开始轮询
			if _, ok := ck.leaderIds[gid]; !ok {
				ck.leaderIds[gid] = 0
			}

			oldLeaderId := ck.leaderIds[gid]
			newLeaderId := oldLeaderId

			for {
				var resp CommandResponse
				ok := ck.makeEnd(servers[newLeaderId]).Call("ShardKV.Command", req, &resp)

				if ok && (resp.Err == OK || resp.Err == ErrNoKey) {
					ck.commandId++
					return resp.Value
				} else if ok && resp.Err == ErrWrongGroup {
					break
				} else {
					// Err is 	ErrWrongLeader ErrOutDated ErrTimeout ErrNotReady
					newLeaderId = (newLeaderId + 1) % len(servers)
					if newLeaderId == oldLeaderId {
						// 所有server 轮询一遍之后退出,避免raft 集群处于无leader 状态中一直重试
						break
					}
					continue
				}
			}
		}

		time.Sleep(100 * time.Millisecond)

		ck.config = ck.sc.Query(-1)

	}
}

服务端Server

主要逻辑:

  • 客户端首先和ShardCtrler交互,获取最新的配置,根据最新配置找到对应key的shard,请求该shard的group。
  • 服务端ShardKVServer会创建多个 raft 组来承载所有分片的读写任务。
  • 服务端ShardKVServer需要定期和ShardCtrler交互,保证更新到最新配置(monitor)。
  • 服务端ShardKVServer需要根据最新配置完成配置更新,分片数据迁移,分片数据清理,空日志检测等功- 能。

结构体

首先ShardKVServer给出结构体,相比于MIT 6.824 Lab3 RaftKV的多了currentConfig和lastConfig数据,这样其他协程便能够通过其计算需要需要向谁拉取分片或者需要让谁去删分片。 同时底层的StateMachine 也由MemeoryKV 变为Shard 承接。并给Shard 添加了状态信息。

启动了五个协程:apply 协程,配置更新协程,数据迁移协程,数据清理协程,空日志检测协程来实现功能。四个协程都需要 leader 来执行,因此抽象出了一个简单地周期执行函数 Monitor。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
type Shard struct {
	KV     map[string]string
	Status ShardStatus
}

type ShardKV struct {
	mu   sync.RWMutex
	me   int
	rf   *raft.Raft
	dead int32

	applyCh chan raft.ApplyMsg

	makeEnd func(string) *labrpc.ClientEnd
	gid     int
	sc      *shardctrler.Clerk

	maxRaftState int // snapshot if log grows this big
	lastApplied  int // 记录applied Index 防止状态机apply 小的index

	lastConfig    shardctrler.Config
	currentConfig shardctrler.Config

	stateMachine   map[int]*Shard                // {shardId: shard of KV}
	lastOperations map[int64]OperationContext    // {clientId: ctx}
	notifyChans    map[int]chan *CommandResponse // {commitIndex: commandResp}
}

func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister,
	maxraftstate int, gid int, ctrlers []*labrpc.ClientEnd,
	make_end func(string) *labrpc.ClientEnd) *ShardKV {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
	labgob.Register(Command{})
	labgob.Register(CommandRequest{})
	labgob.Register(shardctrler.Config{})
	labgob.Register(ShardOperationRequest{})
	labgob.Register(ShardOperationResponse{})

	applyCh := make(chan raft.ApplyMsg)

	kv := &ShardKV{
		me:             me,
		rf:             raft.Make(servers, me, persister, applyCh),
		dead:           0,
		applyCh:        applyCh,
		makeEnd:        make_end,
		gid:            gid,
		sc:             shardctrler.MakeClerk(ctrlers),
		maxRaftState:   maxraftstate,
		lastApplied:    0,
		lastConfig:     shardctrler.DefaultConfig(),
		currentConfig:  shardctrler.DefaultConfig(),
		stateMachine:   make(map[int]*Shard),
		lastOperations: make(map[int64]OperationContext),
		notifyChans:    make(map[int]chan *CommandResponse),
	}

	kv.restoreSnapshot(persister.ReadSnapshot())
	// start applier goroutine to apply committed logs to stateMachine
	go kv.applier()

	// start configuration monitor goroutine to fetch latest configuration
	go kv.Monitor(kv.configureAction, ConfigureMonitorTimeout)
	// start migration monitor goroutine to pull related shards
	go kv.Monitor(kv.migrationAction, MigrationMonitorTimeout)
	// start gc monitor goroutine to delete useless shards in remote groups
	go kv.Monitor(kv.gcAction, GCMonitorTimeout)

	// start entry-in-currentTerm monitor goroutine to advance commitIndex by
	// appending empty entries in current term periodically to avoid live locks
	go kv.Monitor(kv.checkEntryIncurrentTermAction, EmptyEntryDetectorTimeout)

	DPrintf("[StartServer]-{Node: %v}-{Group: %v} has started", kv.me, kv.gid)
	return kv
}

分片状态

每个分片共有 4 种状态:

  • Serving:分片的默认状态,如果当前 raft 组在当前 config 下负责管理此分片,则该分片可以提供读写服务,否则该分片暂不可以提供读写服务,但不会阻塞配置更新协程拉取新配置。

  • Pulling:表示当前 raft 组在当前 config 下负责管理此分片,暂不可以提供读写服务,需要当前 raft 组从上一个配置该分片所属 raft 组拉数据过来之后才可以提供读写服务,系统会有一个分片迁移协程检测所有分片的 Pulling 状态,接着以 raft 组为单位去对应远端 raft 组拉取数据,接着尝试重放该分片的所有数据到本地并将分片状态置为 Serving,以继续提供服务。

  • BePulling:表示当前 raft 组在当前 config 下不负责管理此分片,不可以提供读写服务,但当前 raft 组在上一个 config 时负责管理此分片,因此当前 config 下负责管理此分片的 raft 组拉取完数据后会向本 raft 组发送分片清理的 rpc,接着本 raft 组将数据清空并重置为 serving 状态即可。

  • GCing:表示当前 raft 组在当前 config 下负责管理此分片,可以提供读写服务,但需要清理掉上一个配置该分片所属 raft 组的数据。系统会有一个分片清理协程检测所有分片的 GCing 状态,接着以 raft 组为单位去对应远端 raft 组删除数据,一旦远程 raft 组删除数据成功,则本地会尝试将相关分片的状态置为 Serving。

日志类型

在 lab3 中,客户端的请求会被包装成一个 Op 传给 Raft 层,则在 lab4 中,不难想到,Servers 之间的交互,也可以看做是包装成 Op 传给 Raft 层;定义了五种类型的日志:

  • Operation:客户端传来的读写操作日志,有 Put,Get,Append 等请求。

  • Configuration:配置更新日志,包含一个配置。

  • InsertShards:分片更新日志,包含至少一个分片的数据和配置版本。

  • DeleteShards:分片删除日志,包含至少一个分片的 id 和配置版本。

  • EmptyEntry:空日志,Data 为空,使得状态机达到最新。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
type Command struct {
	Op   CommandType
	Data interface{}
}

func (cmd Command) String() string {
	return fmt.Sprintf("{Op: %v, Data: %v}", cmd.Op, cmd.Data)
}

func NewOperationCommand(req *CommandRequest) Command {
	return Command{
		Op:   Operation,
		Data: *req,
	}
}

func NewConfigurationCommand(config *shardctrler.Config) Command {
	return Command{
		Op:   Configuration,
		Data: *config,
	}
}

func NewInsertShardsCommand(response *ShardOperationResponse) Command {
	return Command{InsertShards, *response}
}

func NewDeleteShardsCommand(request *ShardOperationRequest) Command {
	return Command{DeleteShards, *request}
}

func NewEmptyEntryCommand() Command {
	return Command{EmptyEntry, nil}
}

// -------------------------------------------------------------

type CommandType uint8

const (
	Operation CommandType = iota
	Configuration
	InsertShards
	DeleteShards
	EmptyEntry
)

var ctmap = [...]string{
	"Operation", "Configuration", "InsertShards", "DeleteShards", "EmptyEntry",
}

func (ct CommandType) String() string {
	return ctmap[ct]
}

读写服务

读写操作的基本逻辑相比于MIT 6.824 Lab3 RaftKV基本一致,需要增加分片状态判断。根据上述定义,分片的状态为 Serving 或 GCing,当前 raft 组在当前 config 下负责管理此分片,本 raft 组才可以为该分片提供读写服务,否则返回 ErrWrongGroup 让客户端重新拉取最新的 config 并重试即可

canServe 的判断需要在向 raft 提交前和 apply 时都检测一遍以保证正确性并尽可能提升性能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// canServe 判断shard 的状态是否可以对外服务
// Serving 默认初始状态, GCing 表示该shard 的数据刚刚拉取完毕,但是需要清除
// 远端 该shardId 数据
func (kv *ShardKV) canServe(ShardId int) bool {
	return kv.currentConfig.Shards[ShardId] == kv.gid &&
		(kv.stateMachine[ShardId].Status == Serving ||
			kv.stateMachine[ShardId].Status == GCing)
}


func (kv *ShardKV) Command(req *CommandRequest, resp *CommandResponse) {
	kv.mu.RLock()

	if req.Op != OpGet && kv.isDuplicateRequest(req.ClientId, req.CommandId) {
		lastResp := kv.lastOperations[req.ClientId].LastResponse
		resp.Err = lastResp.Err
		resp.Value = lastResp.Value
		kv.mu.RUnlock()
		return
	}

	// return ErrWrongGroup directly to let client fetch latest configuration
	// and perform a retry if this key can't be served by this shard at present
	if !kv.canServe(key2shard(req.Key)) {
		resp.Err = ErrWrongGroup
		resp.Value = ""
		kv.mu.RUnlock()
		return
	}

	kv.mu.RUnlock()
	kv.Execute(NewOperationCommand(req), resp)
}

// Execute shardKV 执行相关的RPC req
func (kv *ShardKV) Execute(command Command, resp *CommandResponse) {
	// do not hold lock to improve throughput
	// when KVServer holds the lock to take snapshot, underlying raft can still commit raft logs
	index, _, isLeader := kv.rf.Start(command)
	if !isLeader {
		resp.Err = ErrWrongLeader
		return
	}

	defer DPrintf("[Execute]-{Node: %v}-{Group: %v} process Command %v with CommandResponse %v",
		kv.me, kv.gid, command, resp)

	kv.mu.Lock()
	ch := kv.getNotifyChan(index)
	kv.mu.Unlock()

	select {
	case res := <-ch:
		resp.Value, resp.Err = res.Value, res.Err
	case <-time.After(ExecuteTimeout):
		resp.Err = ErrTimeout
	}

	go func() {
		kv.mu.Lock()
		kv.deleteOutdatedNotifyChan(index)
		kv.mu.Unlock()
	}()
}

// applyOperation 对状态机的操作, Get, Put, Append
func (kv *ShardKV) applyOperation(msg *raft.ApplyMsg, req *CommandRequest) *CommandResponse {
	var resp *CommandResponse
	shardId := key2shard(req.Key)

	if kv.canServe(shardId) {
		if req.Op != OpGet && kv.isDuplicateRequest(req.ClientId, req.CommandId) {
			DPrintf("[applyOperation]-{Node: %v}-{Group: %v} doesn't apply duplicated message %v to stateMachine because maxAppliedCommandId is %v for client %v",
				kv.me, kv.gid, kv.lastOperations[req.ClientId], kv.lastApplied, req.ClientId)

			lastResp := kv.lastOperations[req.ClientId].LastResponse
			return lastResp
		}
		resp = kv.applyLogToStateMachines(req, shardId)
		if req.Op != OpGet {
			// save max command resp
			kv.lastOperations[req.ClientId] = OperationContext{
				MaxAppliedCommandId: req.CommandId,
				LastResponse:        resp,
			}
		}
		return resp
	}
	return &CommandResponse{ErrWrongGroup, ""}
}

配置更新

配置更新协程负责定时检测所有分片的状态,一旦存在至少一个分片的状态不为默认状态,则预示其他协程仍然还没有完成任务,那么此时需要阻塞新配置的拉取和提交。

在 apply 配置更新日志时需要保证幂等性:

  • 不同版本的配置更新日志:apply 时仅可逐步递增的去更新配置,否则返回失败。
  • 相同版本的配置更新日志:由于配置更新日志仅由配置更新协程提交,而配置更新协程只有检测到比本地更大地配置时才会提交配置更新日志,所以该情形不会出现。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// configureAction kvctrller execute apply configuration
func (kv *ShardKV) configureAction() {
	canPerformNextConfig := true

	kv.mu.RLock()
	for _, shard := range kv.stateMachine {
		if shard.Status != Serving {
			canPerformNextConfig = false
			DPrintf("[configureAction]-{Node: %v}-{Group: %v} will not try to fetch latest configuration because shards status are %v when currentConfig is %v",
				kv.me, kv.gid, kv.getShardStatus(), kv.currentConfig)
			break
		}
	}

	currentConfigNum := kv.currentConfig.Num
	kv.mu.RUnlock()

	if canPerformNextConfig {
		nextConfig := kv.sc.Query(currentConfigNum + 1)
		if nextConfig.Num == currentConfigNum+1 {
			DPrintf("[configureAction]-{Node: %v}-{Group: %v} fetches latest configuration %v when currentConfigNum is %v",
				kv.me, kv.gid, nextConfig, currentConfigNum)

			kv.Execute(NewConfigurationCommand(&nextConfig), &CommandResponse{})
		}
	}
}

// applyConfiguration 对kv controller 的配置进行更新
func (kv *ShardKV) applyConfiguration(conf *shardctrler.Config) *CommandResponse {
	if conf.Num == kv.currentConfig.Num+1 {
		DPrintf("[applyConfiguration]-{Node: %v}-{Group: %v} updates currentConfig from %v to %v",
			kv.me, kv.gid, kv.currentConfig, conf)

		kv.updateShardStatus(conf)
		kv.lastConfig = kv.currentConfig
		kv.currentConfig = *conf
		return &CommandResponse{OK, ""}
	}

	DPrintf("[applyConfiguration]-{Node: %v}-{Group: %v} rejects outdated config %v when currentConfig is %v",
		kv.me, kv.gid, conf, kv.currentConfig)

	return &CommandResponse{ErrOutDated, ""}
}

分片迁移

分片迁移协程负责定时检测分片的 Pulling 状态,利用 lastConfig 计算出对应 raft 组的 gid 和要拉取的分片,然后并行地去拉取数据。

注意这里使用了 waitGroup 来保证所有独立地任务完成后才会进行下一次任务。此外 wg.Wait() 一定要在释放读锁之后,否则无法满足 challenge2 的要求。

在拉取分片的 handler 中,首先仅可由 leader 处理该请求,其次如果发现请求中的配置版本大于本地的版本,那说明请求拉取的是未来的数据,则返回 ErrNotReady 让其稍后重试,否则将分片数据和去重表都深度拷贝到 response 即可。

在 apply 分片更新日志时需要保证幂等性:

  • 不同版本的配置更新日志:仅可执行与当前配置版本相同地分片更新日志,否则返回 ErrOutDated。
  • 相同版本的配置更新日志:仅在对应分片状态为 Pulling 时为第一次应用,此时覆盖状态机即可并修改状态为 GCing,以让分片清理协程检测到 GCing 状态并尝试删除远端的分片。否则说明已经应用过,直接 break 即可。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

// migrationAction shard migration data when in Pulling status
func (kv *ShardKV) migrationAction() {
	kv.mu.RLock()
	gid2shardIds := kv.getShardIdsByStatus(Pulling)

	var wg sync.WaitGroup
	for gid, shardIds := range gid2shardIds {
		DPrintf("[migrationAction]-{Node: %v}-{Group: %v} starts a PullTask to get shards %v from group %v when config is %v",
			kv.me, kv.gid, shardIds, gid, kv.currentConfig)

		wg.Add(1)
		go func(servers []string, configNum int, shardIds []int) {
			defer wg.Done()
			PullTaskRequest := ShardOperationRequest{
				ConfigNum: configNum,
				ShardIDs:  shardIds,
			}

			// 本Node 向Pulling Status 的Shard 所在Group 的全部Server
			// 发送RPC 但是只有Leader 有响应,其他忽略
			for _, server := range servers {
				var pullTaskResp ShardOperationResponse
				srv := kv.makeEnd(server)
				DPrintf("[migrationAction]-{Node: %v}-{Group: %v} server call %v", kv.me, kv.gid, server)
				if srv.Call("ShardKV.GetShardsData", &PullTaskRequest, &pullTaskResp) && pullTaskResp.Err == OK {
					DPrintf("[migrationAction]-{Node: %v}-{Group: %v} gets a PullTaskResponse %v and tries to commit it when currentConfigNum is %v",
						kv.me, kv.gid, pullTaskResp, configNum)
					kv.Execute(NewInsertShardsCommand(&pullTaskResp), &CommandResponse{})
				}
			}
		}(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIds)
	}
	kv.mu.RUnlock()
	wg.Wait()
}


// RPC GetShardsData ConfigOperation 生效之后数据迁移, 将request 中shardId的数据,迁移到resp中 返回给调用方
func (kv *ShardKV) GetShardsData(req *ShardOperationRequest, resp *ShardOperationResponse) {
	// ...
}

func (kv *ShardKV) applyInsertShards(shardsInfo *ShardOperationResponse) *CommandResponse {
	// ...
}

分片清理

分片清理协程负责定时检测分片的 GCing 状态,利用 lastConfig 计算出对应 raft 组的 gid 和要拉取的分片,然后并行地去删除分片。

注意这里使用了 waitGroup 来保证所有独立地任务完成后才会进行下一次任务。此外 wg.Wait() 一定要在释放读锁之后,否则无法满足 challenge2 的要求。

在删除分片的 handler 中,首先仅可由 leader 处理该请求,其次如果发现请求中的配置版本小于本地的版本,那说明该请求已经执行过,否则本地的 config 也无法增大,此时直接返回 OK 即可,否则在本地提交一个删除分片的日志。

在 apply 分片删除日志时需要保证幂等性:

  • 不同版本的配置更新日志:仅可执行与当前配置版本相同地分片删除日志,否则已经删除过,直接返回 OK 即可。
  • 相同版本的配置更新日志:如果分片状态为 GCing,说明是本 raft 组已成功删除远端 raft 组的数据,现需要更新分片状态为默认状态以支持配置的进一步更新;否则如果分片状态为 BePulling,则说明本 raft 组第一次删除该分片的数据,此时直接重置分片即可。否则说明该请求已经应用过,直接 break 返回 OK 即可。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

// gcAction Pulling 数据之后把状态改为GCing,调用RPC 删除远端的该ShardId 的数据
func (kv *ShardKV) gcAction() {
	kv.mu.RLock()
	gid2shardIds := kv.getShardIdsByStatus(GCing)
	var wg sync.WaitGroup
	for gid, shardIds := range gid2shardIds {
		DPrintf("[gcAction]-{Node: %v}-{Group: %v} starts a GCTask to delete shards %v in group %v when config is %v",
			kv.me, kv.gid, shardIds, gid, kv.currentConfig)

		wg.Add(1)
		go func(servers []string, configNum int, shardIds []int) {
			defer wg.Done()
			gcTaskReq := ShardOperationRequest{ConfigNum: configNum, ShardIDs: shardIds}
			for _, server := range servers {
				var gcTaskResp ShardOperationResponse
				srv := kv.makeEnd(server)
				// 远端执行删除数据的逻辑
				if srv.Call("ShardKV.DeleteShardsData", &gcTaskReq, &gcTaskResp) && gcTaskResp.Err == OK {
					DPrintf("[gcAction]-{Node: %v}-{Group: %v} deletes shards %v in remote group successfully when currentConfigNum is %v",
						kv.me, kv.gid, shardIds, configNum)

					// 远端删除完数据之后,Raft 本节点同样需要 把GCing 状态恢复为Server 状态
					kv.Execute(NewDeleteShardsCommand(&gcTaskReq), &CommandResponse{})
				}
			}
		}(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIds)
	}

	kv.mu.RUnlock()
	wg.Wait()
}

// RPC DeleteShardsData 删除迁移之后的 shard 中的数据
func (kv *ShardKV) DeleteShardsData(req *ShardOperationRequest, resp *ShardOperationResponse) {
	if _, isLeader := kv.rf.GetState(); !isLeader {
		resp.Err = ErrWrongLeader
		return
	}

	defer DPrintf("[DeleteShardsData]-{Node: %v}-{Group: %v} processes GCTaskRequest %v with response %v",
		kv.me, kv.gid, req, resp)

	kv.mu.RLock()
	if kv.currentConfig.Num > req.ConfigNum {
		DPrintf("[DeleteShardsData]-{Node: %v}-{Group: %v} encounters duplicated shards deletions %v when currentConfig is %v",
			kv.me, kv.gid, req, kv.currentConfig)
		resp.Err = OK
		kv.mu.RUnlock()
		return
	}
	kv.mu.RUnlock()

	var commandResp CommandResponse
	kv.Execute(NewDeleteShardsCommand(req), &commandResp)
	resp.Err = commandResp.Err
}

空日志检测

分片清理协程负责定时检测 raft 层的 leader 是否拥有当前 term 的日志,如果没有则提交一条空日志,这使得新 leader 的状态机能够迅速达到最新状态,从而避免多 raft 组间的活锁状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10

func (kv *ShardKV) checkEntryIncurrentTermAction() {
	if !kv.rf.HasLogInCurrentTerm() {
		kv.Execute(NewEmptyEntryCommand(), &CommandResponse{})
	}
}

func (kv *ShardKV) applyEmptyEntry() *CommandResponse {
	return &CommandResponse{OK, ""}
}

问题

出现随机的get(x) != expect(x)这种错误,并且发生情况在raft stop后,applyInsertShards之后,看起来像apply一个duplicated的msg,从而产生了get(x)=“xabb” != expect(x)=“xab” 这个分析了很久,并且重构了日志,让每个gid在一个logfile里。最后发现原因:raft重启后,会重新apply所有logs,这时config change,开始pull shards,待insertShards完成后。raft收到落后的command,导致又apply了duplicate的数据。

解决方案,在applyInsertShards同时,将reply中的lastOperation来更新自己的session。因此,当下次applyMsg来的时候,就可以根据client的SequenceNum来判断 是否接受这个applyMsg,防止了duplicated的msg。