Tx's Blog
Published on

RocketMQ流存储:流场景的诉求是什么?

Authors

什么是流场景?

如图所示,Rocketmq在在线业务场景中的主要的应用是连接业务应用,解耦业务架构的上下游系统,比如交易系统的解耦。当用户触发了某个业务流程,比如购买,消息系统需要保证低延迟异步与下游的应用通信,执行消息数据对应的业务逻辑,触发下一个业务流程。每个消息的处理都是不相关的,无状态的。

而流场景主要侧重于数据集成,连接各种数据组件(rocketmq-connects),进行数据分发,解耦数据架构的上下游系统。例如日志解决方案,采集日志数据,进行ETL将日志数据分发到搜索引擎、流计算、数据仓库等。除了日志外,数据库Binlog分发、页面点击流也是常见的数据源。在这种场景下,由于是离线业务,对低延迟的需求较弱,对批处理吞吐量要求较高。在消息消费阶段,不再是无状态的单条消息消费,而是批量转储,或者批量流计算。

image-20241011171051735

流存储特性

流存储基础

普通消息和流场景消息的访问方式不同:

  • 由于用在数据集成场景,对于大规模的数据集成,不可避免的要涉及到数据的分片,基于数据分片来连接上下游数据系统。为了提升数据集成的质量,需要 Topic 的分区数不变,这样才能保证同一个分区的数据不会错乱。在消息的读写方式上,不再是指定 Topic 读写,而是指定 Topic 分片,也就是队列,进行读写操作。

  • 作为流存储系统,下游的消费通常会是一些流计算引擎,用于有状态计算。为了支撑流计算引擎的容错处理,它需要支持 checkpoint 机制,类似于为流计算引擎提供 redolog,能够按照队列位点重放消息,重新恢复流计算的状态。它也会要求分片内有序,同一个 key 的数据会 hash 到同一个分片,用于实现 keyby 的操作。

image-20241011171502610

流存储弹性

RocketMQ经典扩容模式

如果要将Topic A的容量扩容一倍,一般做法是新增一台机器,然后创建Topic A,新增等同数量的队列。这时,分片数量也会扩容一倍,无法满足流场景存储固定分区的场景。

而Kafka的扩容模式是,添加一个机器,并将原机器的部分分区迁移到新机器上,这样保证了分区数量不变。但是,当分区数特别多且数据量大时,将导致流量风暴的问题,且整个扩容时间不可控。

image-20241011173658470

RocketMQ 5.0 静态Topic扩容模式

为了解决流存储的扩容难题,RocketMQ 5.0提供了一种新的模式,引入静态Topic。可以做到分区数不变,且扩容过程无需数据迁移,且可以实现秒级的扩容。

它的实现关键点是引入逻辑队列的概念。对于用户来说,访问对象不再是原来绑定到某个Broker的物理队列,而是Topic全局的逻辑队列,每个逻辑队列会对应一个或者多个物理队列。如图所示,Topic A进行扩容一倍的操作,最初LogicQueue1对应的物理队列是Broker1-mq1。在扩容完成后,Broker1变为只读状态,LogicQueue1的最新读写操作都变为在Broker2-mq1中。生产者最新的消息都会发往 broker2-mq1。消费者如果读最新数据,则直接从 broke2-mq1 的物理队列里面去读取。如果它读的是老数据的话,读请求讲转发到旧物理队列 broker1-mq1。这样就完成了整个静态Topic的扩容流程。既保证的分区数不变,又实现了没有数据迁移,降低了大量的数据复制,提升了系统的稳定性。

image-20241011174431293

高吞吐

在业务场景中,消息的数据承载的是业务事件,比如说订单操作、物流操作,特点是数据包小,数据规模小,但是每条数据的价值都特别高,访问模式偏向于在线、单条事务的短平快访问模式。

而在流场景中,更多是离线数据,比如说日志、监控数据等。其特点是数据规模有数量级的提升,但单条数据的价值比较低,访问模式偏向于离线批量传输。

image-20241011174901981

在RocketMQ 5.0中,引入了端到端的批量消息。就是从客户端开始,在发送阶段,消息在客户端攒批到一定数量,直接一个RPC请求发到broker端。broker存储阶段,直接把整批消息存储,用批量索引的技术,一批消息只会构建一个索引,大幅度提升索引构建速度。在消费阶段,也是按照整批数据读取到消费端,先进行解包操作,最后执行消费逻辑。这样整个Broker的消息TPS可以从原先的十万级提升至百万级。

image-20241011190831899

有状态流

流存储通常会对接流计算引擎,比如Flink、Spark等。流计算引擎设计的一些有状态计算,如数据聚合,求平均、求综合、事件窗口等算子都需要维护计算状态。

在RocketMQ 5.0中,新增了CompactTopic的类型,是一种以流为核心的类KV服务,在不引入外部KV的情况下可以维护流的状态。例如用于股票价格流场景,股票交易,用户只关注每只股票的最新价格。

在CompactTopic中,没条消息就是一对KV。如果用常规的Topic,那么同一个Key的持续更新会占用大量的空间,影响读的效率。在生命周期管理上,也会因为磁盘占用过高,按照FIFO的方式,旧数据被整批删除。而对于CompactTopic来说,Broker会定期对同一个Key的消息进行合并,节约存储空间,用户对Topic的流式访问,也只会读取到每个Key的最新值。

image-20241011191528203

流数据Schema

随着RocketMQ的数据生态的繁荣,数据集成设计的上下游组件越来越多,提升数据治理能力也变得迫在眉睫。因此在RocketMQ 5.0引入schema的概念,为消息增加了结构化的描述。它带来了几个好处:1.提高类型安全,避免消息结构化导致上下游数据集成不兼容。2.提升数据集成的研发效率,上下游通过schema注册中心获取消息结构,节约沟通成本,内置高效序列化机制,无需编写重复的序列化代码。同时在流表融合的大背景下,消息schema能和数据库表结构的概念完成映射,提升流式SQL亲和度。

下图就是消息schema的架构。首先会有一个 schema 注册中心的组件,维护 schema 的数据,数据存储基于CompactTopic。在消息收发的过程中,客户端都会先去获取 schema 的格式,进行格式的校验,用内置的序列工具进行序列化,从而完成整个消息收发的链路。

image-20241011192121731

以下是一个代码示例,生产者和消费者直接包装业务对象即可完成消息的发送和消费:

image-20241011192330525

典型案例

  1. 日志采集和流SQL分析,首先通过批量的索引,提升日志采集的吞吐量,降低机器成本。为日志消息引入schema,包含三个字段:User_id(用户ID)、Item_id、behavior(用户行为)。在下游对接了Flink和RSQLDB,完成流式SQL分析。
image-20241011192754872
  1. 异构数据库的同步。如下图,有两个数据库,一个是按照买家 ID 的维度进行分库分表的。另外一个是按照卖家 ID 的维度进行分库分表,我们需要实时同步两个数据库的订单状态。基于 RocketMQ 的的流存储的能力,上游按照订单的 ID 去对 Binlog 进行分片,确保同一个记录的 binlog 数据能分发到同一个队列。在消费阶段按照顺序重放队列里的 binlog 数据,把数据同步到卖家库。当流量不足时, RocketMQ 对静态 Topic 进行扩容,分区数不变,保障了数据同步的正确性。
image-20241011192959085

结论

RocketMQ 5.0在流场景中的应用为数据架构的解耦带来了极大的便利。无论是静态Topic的扩容模式,还是高效的批量消息处理,都展示了RocketMQ在流存储领域的强大能力。通过引入Schema,RocketMQ不仅提升了数据集成的效率,还增强了系统的稳定性和扩展性。

参考

常见问题解答

  1. 什么是RocketMQ的静态Topic?

    静态Topic是RocketMQ 5.0引入的一种新模式,旨在扩容时保持分区数不变,避免数据迁移,实现秒级扩容。

  2. RocketMQ如何支持高吞吐的流场景?

    RocketMQ 5.0通过引入端到端的批量消息处理机制,大幅提升了消息传输效率,使得Broker的消息TPS从十万级提升至百万级。

  3. 什么是CompactTopic?

    CompactTopic是一种类KV服务的流存储类型,用于维护流计算中的状态,如股票价格流,确保每个Key的最新值被有效存储和读取。

  4. 如何在RocketMQ中使用Schema?

    Schema是在RocketMQ 5.0中引入的,为消息增加结构化描述,以提高类型安全性和数据集成效率。通过Schema注册中心,客户端可以获取和校验Schema格式,从而完成消息的序列化和反序列化。

  5. RocketMQ如何实现异构数据库的同步?

    RocketMQ通过流存储能力,将上游的Binlog数据按订单ID分片,并在消费阶段顺序重放队列中的Binlog数据,从而实现异构数据库的订单状态同步。