数据处理在 Session-Based 场景中的一大特点是:同一会话产生的数据应投递到同一处理单元中。 不同于无状态服务,其难点在于:当集群发生扩缩容时,除了要保证负载均衡,更要保证数据到处理单元的稳定映射。 本文从经典案例入手,介绍 Sharding 技术是如何解决该问题的,并结合 kafka consumer 浅谈应用方法。
Session-Based 路由问题分析
什么是 Session
数据流处理面临四个基本问题1 :
- What
-
数据处理结果是什么?比如求和,统计,机器学习等由业务场景决定的计算逻辑。
- Where
-
数据何时产生(Event Time)?数据的生成、观测、传输、计算一般分布在不同的链路,存在时间差。Event Time 用于还原事件发生的原始时刻与顺序。
- When
-
数据何时处理(Processing Time)?原始数据不断涌入到处理节点,需按照一定策略加工才能得到结果,比如:来一个处理一个;来一批集中处理等,涉及到延迟和性能的平衡。Processing Time 用于描述原始数据转变为结果的方法。
- How
-
数据采用何种更新方式?因为数据像水流般逐渐到来,分批处理后需将阶段性结果再加工才能得到最终结果。汇总的方式可能有:总是用最新结果覆盖旧数据(Discarding),累积(Accumulating),累积并记录变化量(Retracting)等。
Session 是 Where 问题的一种表现形式:事件并非相互独立,而是存在层级关系,某些事件隶属同一上下文,需作为整体处理才能得到正确结果。比如,用户在线时的所有操作,属于同一 session;它以用户登录为起点,下线为终点。同一用户同一时刻只能保留唯一 session ,因为一些服务可能依赖于全局状态。下面结合游戏场景举个具体例子。
应用举例——消极比赛
MOBA 或其它竞技类型游戏,核心玩法以单局形式存在,每个单局可视为一个 session。单局除了基本游戏体验,还有一些旁路功能,比如判定作弊、消极比赛等。
Judge Cluster 是消极比赛裁决服务集群,包含多个进程实例(Instance)。每个 Instance 上运行多个裁决器,裁决器和具体单局 Game i 一一对应。 Game i, Event j 表示单局 i 中的第 j 条事件。消极判定依赖事件上下文,所以单局事件须 完整、有序 交给 同一对应裁决器 处理。因为判定逻辑依赖行为的因果关系,因果关系既和 Event Time 顺序有关,也和事件完整性有关。比如角色在战斗过程中死亡,是正常行为;但如果站着不动让别人攻击,那就有消极嫌疑。这种场景对架构设计提出以下要求:
-
必须满足
-
单局事件可被正确路由到裁决器中。
-
路由映射需保证稳定。
-
数据有序。
-
-
可选
-
数据丢失可恢复。不可恢复时,受影响的单局不判定消极行为,对正常用户无影响,体验可接受。
-
这几条要求代表了 Session 场景数据投递的技术点,下文对其抽象,尝试提炼通用解决方案。
问题抽象
-
Message
-
Message Key 消息的唯一标记
-
Session Key 消息隶属的 session 唯一标记
-
Sequence ID 消息序列号,用于排序,可以是时间戳。
-
-
Cluster
-
Node 进程实例
-
Cluster 一组 Nodes 的集合,作为整体对外提供服务。
-
Entity 消息处理实体,和 session 对应,集群内唯一。
-
-
Session
-
存在持续期:起始(Start)和终止(Stop)。start 对应 Entity 创建,stop 对应 Entity 销毁,procesing 即 Entity 处理消息。
-
同一 session 内的消息必须由同一 Entity 处理。
-
消息到 Entity 的映射关系是稳定的。
-
Entity 和 Node 的映射关系可能动态调整,比如 Cluster 扩缩容,Node 消息不可达或响应过慢导致的 Entity 在 Nodes 上分配的重新调整(Rebalance)。
-
Entity 是否需要根据 Event Time 的顺序处理消息和场景相关,如果关注则应关注 Sequence ID。
-
问题分析
核心问题只有一个: 确保 Session 中的消息被正确投递到对应的 Entity 中 。看似简单,但 Entity 的载体是 Nodes ,Nodes 受各种条件影响变化几乎是不可避免的。难点就转换为:怎么保证 Entity 的逻辑地址和物理地址(所属 Node 地址)解耦,不变的逻辑地址如何自动化更新对应的物理地址。
我们先来看下,哪些情况会引起 Nodes 和 Entity 映射关系的变化。
-
Entity 的创建和销毁,对应于 Session 生命周期的变化。
-
Cluster 变化。Nodes 增加或者减少,导致 Entity 迁移到新的 Node,原因可能有:
-
扩缩容。如果基于容器,使用编排系统自动调度,可能会更频繁一些。
-
容灾,Node 故障或者响应过慢。
-
更新。
-
Rebalance 时更新 Nodes 和 Entity 的映射关系,需要考虑的因素和应用场景相关,常见的有:
-
路由更新 。通过算法,存储等方式,确定消息要发给哪个 Node。
-
负载均衡 。提供持续服务基础要求。
-
数据本地性(Locality) ,分空间和时间两种。比如 Kafka Producer 根据 Session Key 向 Partition 推送数据,同一 session 中的数据会进入同一 Partition,作为 consumer 应尽可能保证拉取的 session 数据在本地处理,否则要多一次转发。
上图展示了 session 场景下数据路由的基本流程,总结起来 核心环节 有两个:
-
根据消息确定 Entity ID。
-
监听 Cluster 变化,根据 Entity ID 获取所属节点位置。
下面我们来看看解决方案。
已有方案
- (a) 直连
-
Service Client 作为请求端,配置后端集群各个 Node 地址,本地实现路由功能。优势是:和业务场景关联密切,灵活性高。劣势是:无法自动感知后端集群变化;对请求方不友好,需要侵入式集成 SDK;
- (b) 借助第三方
-
为了自动感知后端集群变化,可以让第三方 Service Registry 监管,请求端从 Service Registry 查询即可 。比如:常用的服务发现组件 zookeeper, etcd 等;目录服务,域名解析服务;数据库等等。
- (c) 网关中转
-
请求方不必集成复杂 SDK,将路由和服务发现集成到网关中。优势是:侵入性小;服务间解耦,不必关注集群变更细节,只需关注服务名。缺陷是:通用网关承载通用逻辑,个性化路由需要个性化网关增加复杂度;通信链路多了一次 hop 。
- (d) Cluster 内路由
-
将复杂的路由隐藏到 Cluster 内部,内部节点承担数据转发功能。相比网关中转优势在于:外部只需要考虑负载均衡,不必担心路由;最差情况才会多一次 hop 。
上述四种方法,各有适用场景,相互间也并非完全独立可以组合使用。为了简化问题,下文将针对 (d) Cluster 内路由 说明 Sharding 的一种实现方案。在这种场景下,外界对 Cluster 访问简化为:
-
服务发现。
-
负载均衡。Node 级别粗粒度,算法很多,比如:随机、Round Robin、最小负载等。
而 Session 消息到 Entity 的稳定路由则全部由 Cluster 内部的 Sharding 机制解决。
Cluster Sharding
Cluster Sharding 在集群内部实现消息路由,无论哪个节点接收到外部投递的消息,都应准确发送到目标 Entity 所属 Node。这需要提根据消息自身内容获取 Node 位置,并随着 Cluster 伸缩自适应更新。这种方式对外屏蔽了路由的复杂性,而且 Cluster 本来就要处理消息,可以做各种自定义逻辑,提高了灵活性。下面本文参考Akka Cluster 介绍 Sharding 的体系结构和术语,然后描述 routing, rebalance 过程。
体系简介
| 符号 | 全称 | 说明 |
|---|---|---|
| ST | Sharding Type | 一个独立的 Sharding 体系,由两个因素决定:处理的消息类型及名称。 |
| E | Entity | 消息处理实体,一个 session 的所有消息均投递给该 Entity 处理。 |
| S | Shard | 包含一组相同功能的 Entity,一个 Shard 只能位于同一 Node 上,负责内部 Entity 创建销毁及路由。 |
| SC | Shard Coordinator | 每个 ST 一个,负责当前体系内 Shard 与 Node 映射关系的维护:Allocate, Rebalance。 |
| SR | Shard Region | 每个 Node 一个,负责 Shard 创建和路由,本地 Shard 直发,远程 Shard 转发 |
一个 Sharding 体系包含:/Entity, Shard, Shard Regin, Shard Coordinator/ 四个模块。Sharding 体系的区分取决于逻辑功能。比如:单局战斗事件和聊天事件,处理逻辑显然不同,通过消息类型即可区分;但同样是战斗事件,可用于消极行为裁决,也可用于数据统计,这就需要用名称区分。所以独立的 Sharding 体系可以将 消息类型 和 名称 组合作为唯一标记。Sharding 体系确定后,就定义了一类具有相同功能的 Entity 集合,这里用 EntityTypeKey[MsgType](name) 来表示。MsgType 表示这类 Entity 可以处理的消息类型,name 是字符串标记。
Entity 数量可能很多,百万用户在线时一个场景通常会有数十万。直接对 Entity 管理代价很高,所以模拟现实世界中组织架构的方式 *分层*。将 Entity 分成若干组,以组为基本单位管理,这就是 Shard 。对 Entity 的访问退化为 Shard 访问,粒度从细变粗。考虑容灾、扩展等多方面因素,Shards 都不应存放于同一 Node,一定分散在不同 Nodes 上。这时就面临两个问题:
-
从 Shard 角度看:我应该在哪个 Node 上?这是 Allocate, Rebalance 无可回避的问题。
-
从 Node 角度看:我到底管理了哪些 Shard? 不在我管辖范围内 Shard 在哪儿?
这两个问题就分别需要 Shard Coordinator 和 Shard Region 解决。
- Shard Coordinator
-
当 Shard 创建及再平衡时,决定 Shard 和 Node 的映射关系。这种决策需要 Cluster 全局信息,且要做统一判断,所以 SC 是 Cluster 内全局唯一的处理逻辑。
- Shard Region
-
每个 Node 一个。作为当前 Node 所有 Shards 的管理器,扮演三种功能:
-
Cluster 内,和 SC 通信确定 Shard 位置,和其它 Node 上的 SR 通信路由消息。
-
对 Cluser 外,接收消息请求,转发到正确的 Shard 上。
-
Node 内,创建销毁 Shard,并转发消息。
-
Shard 数量通常是固定的,每个 Shard 中 Entity 的数量是动态变化的。这点 **非常关键**,这为 Entity 到 Shard 的稳定映射提供了可能。因为 Node 是物理存在,它的变化是不可避免的,Shard 是逻辑存在,可以稳定不变。这样可以保证 Session 和 Node 解耦,简化了 Session 到 Shard 映射复杂度。为了保证 Shard 能够较好均匀的分布在所有 Nodes 上,可以将 Shard 数量设置为 Nodes 总数的较大上限,比如 Nodes 数量的十倍。
该体系的具体运作方式,我们结合两个关键流程介绍:路由和再平衡。
路由
路由有两个过程:不存在时创建;存在时转发。根据目的地有两种场景:目标 shard 位于收到消息的 Node 本地;目标 shard 所在 Node 并非收到消息的 Node 。下面分别介绍这两种场景。
本地路由
-
Node A 收到消息 Msg1,转交给 SR 处理。Msg 的类型是 Type(GameID, SeqID, GameData) :GameID 表示单局 ID,也就是 Session Key(Session ID);SeqID 表示单局消息的顺序;GameData 表示消息的数据内容。
-
SR 从 Msg1 获取 Entity ID(E1)和 Shard ID(S1):
-
获取 Entity ID,一般情况下 Entity 和 Session 一一对应,所以 Entity ID 就是 Session ID。Session ID 包含在 Message 中,比如在消极比赛裁决中,就是 GameID。
-
根据 EntityID 获得 ShardID。这点非常关键,需要保证 Entity 到 Shard 稳定映射,由于 Shard 总数是不变的,一个简单有效的方式是对 Shard 总数取模,这个结果也总是稳定的。
-
-
SR 如果是第一次遇到 S1,是不知道位置的,需要向 SC 发起查询。如果已经查询过,且这段时间内没有发生变化,那么直接使用本地缓存结果即可。
-
SC 向 SR 返回查询结果:S1 在 Node A 上。
-
SR 发现 S1 在本地,那么创建 S1。
-
S1 继续创建 Entity: E1,并将消息路由给 E1。
上述过程描述了,Node 收到消息并在本地创建 Shard 和 Entity 的过程;如果已经创建好,那么本地路由就不必再和 SC 交互,直接转发给本地 Shard 即可,因为 SR 保留了本地路由信息。
远程路由
远程路由和本地路由的大概流程类似,区别点在于:当 SR_A 向 SC 查询后,发现目标 Shard(S2)在节点 B 上,那么将消息转发给 Node B。SR_B 在本地完成 S2 的创建和路由。在这种情况下,消息投递多了一次 Hop。如果要考虑优化,有两个方面:
-
SC 在做分配时除了负载均衡,还应考虑 Locality,减少 Node 间消息转发。如果某个 Node 收到消息那么尽可能将 Shard 分配到该 Node 上。
-
消息投递到 Cluster 时,虽然无论发送给哪个 Node 都可以保证消息投递,但如果稳定发送给固定 Node,则可以保证 Locality 更好的发挥作用。
再平衡
Cluster 内部 Nodes 状态变化,比如增加、移除、不可达,必然涉及 Shard 和 Node 映射关系的调整,这就是再平衡。再平衡具体包含两种情况:
-
增加 Node。Shard 需要从旧 Node 迁移到新 Node,或者新 Shard 直接在新 Node 创建。
-
移除 Node。这个过程多数是不可控的,相当于在剩余 Nodes 上重新创建 Shard,可以参考前文介绍的创建过程。
因此再平衡主要考虑 两个问题 :
-
增加 Node 时,Shard 和 Node 的映射关系如何更新。
-
Shard 状态如何迁移。
新增 Node
-
SC 感知到新增 Node,根据 Rebalance 策略,计算出需要将 Node B 上的 Shard(S1) 迁移到新 Node 上,并对新 Node 初始化——创建 Shard Region,S1'。然后开启整个迁移流程。
-
暂停所有 SR 关于 S1 位置的查询
-
通知所有 SR 停止向 S1 发送消息,将输入全部缓存在本地。目的在于让停止向 S1 发送新消息,让它处理完残留后再向新位置迁移。
-
S1 处理完所有消息后,告知 SC 自己已经没有遗留工作,可以关闭。对于有状态的 shard,还需要完成状态到 S1' 的同步。之后 SC 认为迁移流程结束。
-
SC 关闭 Node B 中的 S1.
-
SC 恢复 SR 关于 S1 位置的查询,并主动通知 S1 新地址——S1'。
-
缓存的消息或者新进消息,根据 S1' 新地址被路由到新 Node。
Shard 状态迁移
如果 Shard 中 Entity 是有状态的,且状态不可丢,那么需要将 Entity 的状态同步到新节点。不过同步方式和业务场景密切相关,不应该也没必要由 Shard 底层提供解决方案,只要做好流程控制即可。一般有两种方式:
- <Inputs, replay>
-
将导致状态变更的消息(Inputs)按序保存,并持久化到第三方存储,比如消息中间件 kafka。新 Entity 同步时将这些事件拉取到本地重播(replay)。这种方式要求内部逻辑能够依据输入完全恢复,在处理随机种子,绝对时间戳等情况时需要非常谨慎,如果消息量较大恢复时间较长。一个典型例子就是 Binlog。
- <Status, set>
-
将状态(Status)保存起来,类似一张快照(Snapshot),新 Entity 将快照数据设置到本地即可。这种方式保存的数据量可能较大,而且对更新频率要求较高否则会导致信息丢失。
实际应用时建议结合需求处理:
-
session 数据真的不可丢吗?有损服务是否可以接收?至少消极行为裁决,是不需要在迁移时状态同步的。最差情况就是迁移过程中一些消极行为没有检测到,但这比例很低。
-
session 数据不可丢。需结合业务特点判断 Binlog 和 Snapshot,哪种方案更适合,甚至两者结合。
顺序
细心的读者,可能会发现一个问题。如果任何一个 Session 的数据随意发送给任何一个 Node,虽然最终路由给正确的 Entity,但顺序是无法保证的。这实际上是网络通信的基本问题。从 A 到 B,如果有多条通路,那么无法保证 A 发送消息的顺序和 B 接收消息的顺序一致。这时有两种解决方案:
-
保证通信链路是单一的。对 Cluster Sharding 而言意味着,同一 Session 的消息应该发送给同一 Node,任意一个 Node 皆可,但应保持不变。
-
Entity 收到消息后重新排序。需要维持一定的缓存空间,可能会增加处理延迟。
下面结合消极行为裁决,我们来看看如何利用 sharding 解决各种问题。
Kafka 应用
我们再回顾下游戏单局数据处理的例子:应用举例——消极比赛。单局数据有多个应用场景,比如:单局结算、玩家生涯指标、大盘数据统计、消极行为裁决。这是典型的 Publish-Subscribe 场景,可以使用 Kafka 保存单局事件,做到不同服务间的解耦。由于消极行为裁决要求保留消息的原始顺序,所以在通信链路上应该保证唯一性。
Game Cluster 是单局服务集群,一个单局只会存在于一个 Node 上,比如 Game 3 在 Node 2 上。Game 3 作为 Session,将 Game ID 作为 Sessioin ID,并作为消息 Key 推送到 Kafka 中。因此可以保证,同一 Game 的所有消息会 按序 进入同一 Partition 。裁决服务集群作为 Consumer Group,从 Kafka 拉数据。每个 Partition 只会由一个固定的 Consumer 消费,在 Consumer Group 稳定的情况下,Partition 到 Consumer 的映射关系是稳定的,到 Entity 的链路也是稳定的。因此可以做到 Entity 3 稳定有序 的消费 Game 3 产生的数据。
Consumer Rebalance
这样似乎就够了,但实际生产环境很难保证 Consumer Group 维持不变,比如:
-
Consumer Group 扩容
-
Consumer 消费速度过慢,被 kafka 踢掉
-
Consumer 故障
-
如果 Consumer 基于 K8s 托管,可能根据资源使用情况调整容器位置
如图所示,Partition 3 本来由 Consumer 1 消费。Game 1 中的事件按先后顺序分别是 M1,M2,M3。当 M1 和 M2 被 C1 消费后,Consumer 3 加入成功,Partition 3 被分配给 C3。那么 Game 1 中的 M3 则转由 C3 处理。显然 G1 数据被截断分拆给不同的 Consumer 处理,结果是不正确的。
Sharding
我们期望的效果是 Partition 无论被哪个 Consumer 消费,同一 Session 的数据总能转发给同一 Entity 处理。经过前面的介绍,sharding 是非常适合的,应用流程如下所示:
-
单局的消息类型定义为 GameMsgType(GameID, SeqID, GameData) ,GameID 是单局的唯一标记,等价于 Session ID.
-
单局服务将 GameID 作为消息 Key 推送到 Kafka 中。保证了同一单局内的消息按序进入同一 Partition.
-
消极行为裁决服务作为消费端接入 Kafka。需要启用 Sharding 机制,先定义 Sharding 体系 EntityTypeKey ,消息类型是 GameMsgType ,名称是 Consumer Group ID。这样可以保证在 Kafka 的所有消费者中唯一。
-
定义 Extractor,从消息中获得 EntityID。需要能够体现 Session 和 Entity 的稳定映射关系,在当前场景下,可以视为 GameID。
-
根据 EntityID 获取 ShardID: ShardID=EntityID%ShardCount(>= PartitionCount),ShardCount 是一个逻辑概念,不应小于 Partition 数量。
-
再平衡策略。当 SC 接收 SR 请求为 Shard 分配 Node 时,优先保证分配给接收消息的 Node。好处在于 consumer 收到消息后收到消息后直接本地处理,减少一次网络转发。
总结
Sharding 技术,旨在实现逻辑处理单元和物理节点的解耦,将复杂的路由逻辑隐藏在集群内部实现,降低外部访问的复杂度。如果配合消息队列以及其它数据恢复技术,还可支持消息有序,节点状态迁移,比较适合做为分布式系统中细粒度有状态服务的路由解决方案。
Footnotes
Tyler Akidau, Slava Chernyak & Reuven lax. Streaming Systems: The What, Where, When and How of Large-Scale Data Processing[M].O'REILLY, 2018-07-12.