数据处理在 Session-Based 场景中的一大特点是:同一会话产生的数据应投递到同一处理单元中。 不同于无状态服务,其难点在于:当集群发生扩缩容时,除了要保证负载均衡,更要保证数据到处理单元的稳定映射。 本文从经典案例入手,介绍 Sharding 技术是如何解决该问题的,并结合 kafka consumer 浅谈应用方法。
Session-Based 路由问题分析
什么是 Session
数据流处理面临四个基本问题1 :
- What
-
数据处理结果是什么?比如求和,统计,机器学习等由业务场景决定的计算逻辑。
- Where
-
数据何时产生(Event Time)?数据的生成、观测、传输、计算一般分布在不同的链路,存在时间差。Event Time 用于还原事件发生的原始时刻与顺序。
- When
-
数据何时处理(Processing Time)?原始数据不断涌入到处理节点,需按照一定策略加工才能得到结果,比如:来一个处理一个;来一批集中处理等,涉及到延迟和性能的平衡。Processing Time 用于描述原始数据转变为结果的方法。
- How
-
数据采用何种更新方式?因为数据像水流般逐渐到来,分批处理后需将阶段性结果再加工才能得到最终结果。汇总的方式可能有:总是用最新结果覆盖旧数据(Discarding),累积(Accumulating),累积并记录变化量(Retracting)等。
Session 是 Where 问题的一种表现形式:事件并非相互独立,而是存在层级关系,某些事件隶属同一上下文,需作为整体处理才能得到正确结果。比如,用户在线时的所有操作,属于同一 session;它以用户登录为起点,下线为终点。同一用户同一时刻只能保留唯一 session ,因为一些服务可能依赖于全局状态。下面结合游戏场景举个具体例子。
应用举例——消极比赛
MOBA 或其它竞技类型游戏,核心玩法以单局形式存在,每个单局可视为一个 session。单局除了基本游戏体验,还有一些旁路功能,比如判定作弊、消极比赛等。
Judge Cluster 是消极比赛裁决服务集群,包含多个进程实例(Instance)。每个 Instance 上运行多个裁决器,裁决器和具体单局 Game i 一一对应。 Game i, Event j 表示单局 i 中的第 j 条事件。消极判定依赖事件上下文,所以单局事件须 完整、有序 交给 同一对应裁决器 处理。因为判定逻辑依赖行为的因果关系,因果关系既和 Event Time 顺序有关,也和事件完整性有关。比如角色在战斗过程中死亡,是正常行为;但如果站着不动让别人攻击,那就有消极嫌疑。这种场景对架构设计提出以下要求:
-
必须满足
-
单局事件可被正确路由到裁决器中。
-
路由映射需保证稳定。
-
数据有序。
-
-
可选
-
数据丢失可恢复。不可恢复时,受影响的单局不判定消极行为,对正常用户无影响,体验可接受。
-
这几条要求代表了 Session 场景数据投递的技术点,下文对其抽象,尝试提炼通用解决方案。
问题抽象
-
Message
-
Message Key 消息的唯一标记
-
Session Key 消息隶属的 session 唯一标记
-
Sequence ID 消息序列号,用于排序,可以是时间戳。
-
-
Cluster
-
Node 进程实例
-
Cluster 一组 Nodes 的集合,作为整体对外提供服务。
-
Entity 消息处理实体,和 session 对应,集群内唯一。
-
-
Session
-
存在持续期:起始(Start)和终止(Stop)。start 对应 Entity 创建,stop 对应 Entity 销毁,procesing 即 Entity 处理消息。
-
同一 session 内的消息必须由同一 Entity 处理。
-
消息到 Entity 的映射关系是稳定的。
-
Entity 和 Node 的映射关系可能动态调整,比如 Cluster 扩缩容,Node 消息不可达或响应过慢导致的 Entity 在 Nodes 上分配的重新调整(Rebalance)。
-
Entity 是否需要根据 Event Time 的顺序处理消息和场景相关,如果关注则应关注 Sequence ID。
-
问题分析
核心问题只有一个: 确保 Session 中的消息被正确投递到对应的 Entity 中 。看似简单,但 Entity 的载体是 Nodes ,Nodes 受各种条件影响变化几乎是不可避免的。难点就转换为:怎么保证 Entity 的逻辑地址和物理地址(所属 Node 地址)解耦,不变的逻辑地址如何自动化更新对应的物理地址。
我们先来看下,哪些情况会引起 Nodes 和 Entity 映射关系的变化。
-
Entity 的创建和销毁,对应于 Session 生命周期的变化。
-
Cluster 变化。Nodes 增加或者减少,导致 Entity 迁移到新的 Node,原因可能有:
-
扩缩容。如果基于容器,使用编排系统自动调度,可能会更频繁一些。
-
容灾,Node 故障或者响应过慢。
-
更新。
-
Rebalance 时更新 Nodes 和 Entity 的映射关系,需要考虑的因素和应用场景相关,常见的有:
-
路由更新 。通过算法,存储等方式,确定消息要发给哪个 Node。
-
负载均衡 。提供持续服务基础要求。
-
数据本地性(Locality) ,分空间和时间两种。比如 Kafka Producer 根据 Session Key 向 Partition 推送数据,同一 session 中的数据会进入同一 Partition,作为 consumer 应尽可能保证拉取的 session 数据在本地处理,否则要多一次转发。
上图展示了 session 场景下数据路由的基本流程,总结起来 核心环节 有两个:
-
根据消息确定 Entity ID。
-
监听 Cluster 变化,根据 Entity ID 获取所属节点位置。
下面我们来看看解决方案。
已有方案
- (a) 直连
-
Service Client 作为请求端,配置后端集群各个 Node 地址,本地实现路由功能。优势是:和业务场景关联密切,灵活性高。劣势是:无法自动感知后端集群变化;对请求方不友好,需要侵入式集成 SDK;
- (b) 借助第三方
-
为了自动感知后端集群变化,可以让第三方 Service Registry 监管,请求端从 Service Registry 查询即可 。比如:常用的服务发现组件 zookeeper, etcd 等;目录服务,域名解析服务;数据库等等。
- (c) 网关中转
-
请求方不必集成复杂 SDK,将路由和服务发现集成到网关中。优势是:侵入性小;服务间解耦,不必关注集群变更细节,只需关注服务名。缺陷是:通用网关承载通用逻辑,个性化路由需要个性化网关增加复杂度;通信链路多了一次 hop 。
- (d) Cluster 内路由
-
将复杂的路由隐藏到 Cluster 内部,内部节点承担数据转发功能。相比网关中转优势在于:外部只需要考虑负载均衡,不必担心路由;最差情况才会多一次 hop 。
上述四种方法,各有适用场景,相互间也并非完全独立可以组合使用。为了简化问题,下文将针对 (d) Cluster 内路由 说明 Sharding 的一种实现方案。在这种场景下,外界对 Cluster 访问简化为:
-
服务发现。
-
负载均衡。Node 级别粗粒度,算法很多,比如:随机、Round Robin、最小负载等。
而 Session 消息到 Entity 的稳定路由则全部由 Cluster 内部的 Sharding 机制解决。
Cluster Sharding
Cluster Sharding 在集群内部实现消息路由,无论哪个节点接收到外部投递的消息,都应准确发送到目标 Entity 所属 Node。这需要提根据消息自身内容获取 Node 位置,并随着 Cluster 伸缩自适应更新。这种方式对外屏蔽了路由的复杂性,而且 Cluster 本来就要处理消息,可以做各种自定义逻辑,提高了灵活性。下面本文参考Akka Cluster 介绍 Sharding 的体系结构和术语,然后描述 routing, rebalance 过程。
体系简介
符号 | 全称 | 说明 |
---|---|---|
ST | Sharding Type | 一个独立的 Sharding 体系,由两个因素决定:处理的消息类型及名称。 |
E | Entity | 消息处理实体,一个 session 的所有消息均投递给该 Entity 处理。 |
S | Shard | 包含一组相同功能的 Entity,一个 Shard 只能位于同一 Node 上,负责内部 Entity 创建销毁及路由。 |
SC | Shard Coordinator | 每个 ST 一个,负责当前体系内 Shard 与 Node 映射关系的维护:Allocate, Rebalance。 |
SR | Shard Region | 每个 Node 一个,负责 Shard 创建和路由,本地 Shard 直发,远程 Shard 转发 |
一个 Sharding 体系包含:/Entity, Shard, Shard Regin, Shard Coordinator/ 四个模块。Sharding 体系的区分取决于逻辑功能。比如:单局战斗事件和聊天事件,处理逻辑显然不同,通过消息类型即可区分;但同样是战斗事件,可用于消极行为裁决,也可用于数据统计,这就需要用名称区分。所以独立的 Sharding 体系可以将 消息类型 和 名称 组合作为唯一标记。Sharding 体系确定后,就定义了一类具有相同功能的 Entity 集合,这里用 EntityTypeKey[MsgType](name) 来表示。MsgType 表示这类 Entity 可以处理的消息类型,name 是字符串标记。
Entity 数量可能很多,百万用户在线时一个场景通常会有数十万。直接对 Entity 管理代价很高,所以模拟现实世界中组织架构的方式 *分层*。将 Entity 分成若干组,以组为基本单位管理,这就是 Shard 。对 Entity 的访问退化为 Shard 访问,粒度从细变粗。考虑容灾、扩展等多方面因素,Shards 都不应存放于同一 Node,一定分散在不同 Nodes 上。这时就面临两个问题:
-
从 Shard 角度看:我应该在哪个 Node 上?这是 Allocate, Rebalance 无可回避的问题。
-
从 Node 角度看:我到底管理了哪些 Shard? 不在我管辖范围内 Shard 在哪儿?
这两个问题就分别需要 Shard Coordinator 和 Shard Region 解决。
- Shard Coordinator
-
当 Shard 创建及再平衡时,决定 Shard 和 Node 的映射关系。这种决策需要 Cluster 全局信息,且要做统一判断,所以 SC 是 Cluster 内全局唯一的处理逻辑。
- Shard Region
-
每个 Node 一个。作为当前 Node 所有 Shards 的管理器,扮演三种功能:
-
Cluster 内,和 SC 通信确定 Shard 位置,和其它 Node 上的 SR 通信路由消息。
-
对 Cluser 外,接收消息请求,转发到正确的 Shard 上。
-
Node 内,创建销毁 Shard,并转发消息。
-
Shard 数量通常是固定的,每个 Shard 中 Entity 的数量是动态变化的。这点 **非常关键**,这为 Entity 到 Shard 的稳定映射提供了可能。因为 Node 是物理存在,它的变化是不可避免的,Shard 是逻辑存在,可以稳定不变。这样可以保证 Session 和 Node 解耦,简化了 Session 到 Shard 映射复杂度。为了保证 Shard 能够较好均匀的分布在所有 Nodes 上,可以将 Shard 数量设置为 Nodes 总数的较大上限,比如 Nodes 数量的十倍。
该体系的具体运作方式,我们结合两个关键流程介绍:路由和再平衡。
路由
路由有两个过程:不存在时创建;存在时转发。根据目的地有两种场景:目标 shard 位于收到消息的 Node 本地;目标 shard 所在 Node 并非收到消息的 Node 。下面分别介绍这两种场景。
本地路由
-
Node A 收到消息 Msg1,转交给 SR 处理。Msg 的类型是 Type(GameID, SeqID, GameData) :GameID 表示单局 ID,也就是 Session Key(Session ID);SeqID 表示单局消息的顺序;GameData 表示消息的数据内容。
-
SR 从 Msg1 获取 Entity ID(E1)和 Shard ID(S1):
-
获取 Entity ID,一般情况下 Entity 和 Session 一一对应,所以 Entity ID 就是 Session ID。Session ID 包含在 Message 中,比如在消极比赛裁决中,就是 GameID。
-
根据 EntityID 获得 ShardID。这点非常关键,需要保证 Entity 到 Shard 稳定映射,由于 Shard 总数是不变的,一个简单有效的方式是对 Shard 总数取模,这个结果也总是稳定的。
-
-
SR 如果是第一次遇到 S1,是不知道位置的,需要向 SC 发起查询。如果已经查询过,且这段时间内没有发生变化,那么直接使用本地缓存结果即可。
-
SC 向 SR 返回查询结果:S1 在 Node A 上。
-
SR 发现 S1 在本地,那么创建 S1。
-
S1 继续创建 Entity: E1,并将消息路由给 E1。
上述过程描述了,Node 收到消息并在本地创建 Shard 和 Entity 的过程;如果已经创建好,那么本地路由就不必再和 SC 交互,直接转发给本地 Shard 即可,因为 SR 保留了本地路由信息。
远程路由
远程路由和本地路由的大概流程类似,区别点在于:当 SR_A 向 SC 查询后,发现目标 Shard(S2)在节点 B 上,那么将消息转发给 Node B。SR_B 在本地完成 S2 的创建和路由。在这种情况下,消息投递多了一次 Hop。如果要考虑优化,有两个方面:
-
SC 在做分配时除了负载均衡,还应考虑 Locality,减少 Node 间消息转发。如果某个 Node 收到消息那么尽可能将 Shard 分配到该 Node 上。
-
消息投递到 Cluster 时,虽然无论发送给哪个 Node 都可以保证消息投递,但如果稳定发送给固定 Node,则可以保证 Locality 更好的发挥作用。
再平衡
Cluster 内部 Nodes 状态变化,比如增加、移除、不可达,必然涉及 Shard 和 Node 映射关系的调整,这就是再平衡。再平衡具体包含两种情况:
-
增加 Node。Shard 需要从旧 Node 迁移到新 Node,或者新 Shard 直接在新 Node 创建。
-
移除 Node。这个过程多数是不可控的,相当于在剩余 Nodes 上重新创建 Shard,可以参考前文介绍的创建过程。
因此再平衡主要考虑 两个问题 :
-
增加 Node 时,Shard 和 Node 的映射关系如何更新。
-
Shard 状态如何迁移。
新增 Node
-
SC 感知到新增 Node,根据 Rebalance 策略,计算出需要将 Node B 上的 Shard(S1) 迁移到新 Node 上,并对新 Node 初始化——创建 Shard Region,S1'。然后开启整个迁移流程。
-
暂停所有 SR 关于 S1 位置的查询
-
通知所有 SR 停止向 S1 发送消息,将输入全部缓存在本地。目的在于让停止向 S1 发送新消息,让它处理完残留后再向新位置迁移。
-
S1 处理完所有消息后,告知 SC 自己已经没有遗留工作,可以关闭。对于有状态的 shard,还需要完成状态到 S1' 的同步。之后 SC 认为迁移流程结束。
-
SC 关闭 Node B 中的 S1.
-
SC 恢复 SR 关于 S1 位置的查询,并主动通知 S1 新地址——S1'。
-
缓存的消息或者新进消息,根据 S1' 新地址被路由到新 Node。
Shard 状态迁移
如果 Shard 中 Entity 是有状态的,且状态不可丢,那么需要将 Entity 的状态同步到新节点。不过同步方式和业务场景密切相关,不应该也没必要由 Shard 底层提供解决方案,只要做好流程控制即可。一般有两种方式:
- <Inputs, replay>
-
将导致状态变更的消息(Inputs)按序保存,并持久化到第三方存储,比如消息中间件 kafka。新 Entity 同步时将这些事件拉取到本地重播(replay)。这种方式要求内部逻辑能够依据输入完全恢复,在处理随机种子,绝对时间戳等情况时需要非常谨慎,如果消息量较大恢复时间较长。一个典型例子就是 Binlog。
- <Status, set>
-
将状态(Status)保存起来,类似一张快照(Snapshot),新 Entity 将快照数据设置到本地即可。这种方式保存的数据量可能较大,而且对更新频率要求较高否则会导致信息丢失。
实际应用时建议结合需求处理:
-
session 数据真的不可丢吗?有损服务是否可以接收?至少消极行为裁决,是不需要在迁移时状态同步的。最差情况就是迁移过程中一些消极行为没有检测到,但这比例很低。
-
session 数据不可丢。需结合业务特点判断 Binlog 和 Snapshot,哪种方案更适合,甚至两者结合。
顺序
细心的读者,可能会发现一个问题。如果任何一个 Session 的数据随意发送给任何一个 Node,虽然最终路由给正确的 Entity,但顺序是无法保证的。这实际上是网络通信的基本问题。从 A 到 B,如果有多条通路,那么无法保证 A 发送消息的顺序和 B 接收消息的顺序一致。这时有两种解决方案:
-
保证通信链路是单一的。对 Cluster Sharding 而言意味着,同一 Session 的消息应该发送给同一 Node,任意一个 Node 皆可,但应保持不变。
-
Entity 收到消息后重新排序。需要维持一定的缓存空间,可能会增加处理延迟。
下面结合消极行为裁决,我们来看看如何利用 sharding 解决各种问题。
Kafka 应用
我们再回顾下游戏单局数据处理的例子:应用举例——消极比赛。单局数据有多个应用场景,比如:单局结算、玩家生涯指标、大盘数据统计、消极行为裁决。这是典型的 Publish-Subscribe 场景,可以使用 Kafka 保存单局事件,做到不同服务间的解耦。由于消极行为裁决要求保留消息的原始顺序,所以在通信链路上应该保证唯一性。
Game Cluster 是单局服务集群,一个单局只会存在于一个 Node 上,比如 Game 3 在 Node 2 上。Game 3 作为 Session,将 Game ID 作为 Sessioin ID,并作为消息 Key 推送到 Kafka 中。因此可以保证,同一 Game 的所有消息会 按序 进入同一 Partition 。裁决服务集群作为 Consumer Group,从 Kafka 拉数据。每个 Partition 只会由一个固定的 Consumer 消费,在 Consumer Group 稳定的情况下,Partition 到 Consumer 的映射关系是稳定的,到 Entity 的链路也是稳定的。因此可以做到 Entity 3 稳定有序 的消费 Game 3 产生的数据。
Consumer Rebalance
这样似乎就够了,但实际生产环境很难保证 Consumer Group 维持不变,比如:
-
Consumer Group 扩容
-
Consumer 消费速度过慢,被 kafka 踢掉
-
Consumer 故障
-
如果 Consumer 基于 K8s 托管,可能根据资源使用情况调整容器位置
如图所示,Partition 3 本来由 Consumer 1 消费。Game 1 中的事件按先后顺序分别是 M1,M2,M3。当 M1 和 M2 被 C1 消费后,Consumer 3 加入成功,Partition 3 被分配给 C3。那么 Game 1 中的 M3 则转由 C3 处理。显然 G1 数据被截断分拆给不同的 Consumer 处理,结果是不正确的。
Sharding
我们期望的效果是 Partition 无论被哪个 Consumer 消费,同一 Session 的数据总能转发给同一 Entity 处理。经过前面的介绍,sharding 是非常适合的,应用流程如下所示:
-
单局的消息类型定义为 GameMsgType(GameID, SeqID, GameData) ,GameID 是单局的唯一标记,等价于 Session ID.
-
单局服务将 GameID 作为消息 Key 推送到 Kafka 中。保证了同一单局内的消息按序进入同一 Partition.
-
消极行为裁决服务作为消费端接入 Kafka。需要启用 Sharding 机制,先定义 Sharding 体系 EntityTypeKey ,消息类型是 GameMsgType ,名称是 Consumer Group ID。这样可以保证在 Kafka 的所有消费者中唯一。
-
定义 Extractor,从消息中获得 EntityID。需要能够体现 Session 和 Entity 的稳定映射关系,在当前场景下,可以视为 GameID。
-
根据 EntityID 获取 ShardID: ShardID=EntityID%ShardCount(>= PartitionCount),ShardCount 是一个逻辑概念,不应小于 Partition 数量。
-
再平衡策略。当 SC 接收 SR 请求为 Shard 分配 Node 时,优先保证分配给接收消息的 Node。好处在于 consumer 收到消息后收到消息后直接本地处理,减少一次网络转发。
总结
Sharding 技术,旨在实现逻辑处理单元和物理节点的解耦,将复杂的路由逻辑隐藏在集群内部实现,降低外部访问的复杂度。如果配合消息队列以及其它数据恢复技术,还可支持消息有序,节点状态迁移,比较适合做为分布式系统中细粒度有状态服务的路由解决方案。
Footnotes
Tyler Akidau, Slava Chernyak & Reuven lax. Streaming Systems: The What, Where, When and How of Large-Scale Data Processing[M].O'REILLY, 2018-07-12.