前言

RocketMQ是一款开源的分布式消息队列系统,具有高吞吐量、低延迟和高可靠性的特点,广泛应用于金融、电商、物流等高并发场景。其架构设计支持海量数据的实时传输和处理,确保消息的顺序性和一致性。本文将深入探讨RocketMQ的架构设计原理和最佳实践,帮助开发者在分布式系统中更好地利用这款强大的消息中间件。

架构设计

网络拓扑架构

  • 网络架构拓扑:2M / M-S / NM-NS /主从切换/ 云服务(计算存储分离,如 pulsar)

Untitled

核心模块组成

  • Common 公共模块
  • NameServer 模块(注册中心)
  • Remoting 通信模块(基于 Netty)
  • Client 模块(生产消费者)
  • Broker 模块(MQServer 服务模块)
  • Store 模块(存储层)

核心优势

  • 核心设计:
    • Broker
      • 实现 FastFail 机制,在写盘出现瓶颈的时候,根据内存序列化的延迟时间会自动触发自我保护的机制;
    • Client
      • 生产端:Latency 消息投递路由策略,保证延迟更低的 Broker 优先投递,延迟高的 Broker 自动剔除与恢复,提升了生产端消息发送的性能;
      • 消费端:采用拉取模式,长轮询模式(longpolling)降低 Broker 主动推送消息的压力,在 Consumer 端做拉取;
    • Remoting
      • 基于 Netty,整体设计非常的轻量、高性能;采用 json 序列化方式;
    • Store
      • 存储结构:CommitLog + ConsumeQueue + IndexFile
      • 内存序列化:顺序写机制,充分借力于 OS PageCache 与 ZeroCopy 机制,通过 MappedByteBuffer(内存映射)对文件进行读写操作;优秀的隔离策略将内存操作与物理操作线程异步化等等
      • 读写分离:DirectBuffer 堆外内存写、MappedByteBuffer 内存映射读
      • 物理文件内存预加载、预热机制等优秀的思想
      • JNA 锁内存:提升文件读写效率、防止资源不足时发生内存 SWAP
  • 易用性好,容错性强,API 功能强大;
  • 生产者
    • 支持消息同步、异步、单向、事务消息,各种灵活性配置
    • 支持消息批量发送、消息压缩、自定义属性透传设置等
  • 消费者
    • 支持集群、广播模式,消费端支持推拉、定时消费模式
    • 消费方式支持并发、顺序消息
    • 支持消息过滤 Tag、支持标准 SQL 语法过滤
    • 消费端 rebalance 策略丰富,默认采用一致性哈希,并且可以根据场景做自定义扩展
    • 容错性支持消息重试策略、精准消息回溯(按时间戳)、支持流控、支持并发跨度设置等等

轨迹跟踪

  • 全链路消息轨迹流程

Untitled

  • Client 端(生产者)发送消息流程图

Untitled

  • Broker 层接收消息流程图

Untitled

  • Store 层持久化消息流程图

Untitled

  • Client 端(消费端)消息拉取流程图

Untitled

  • Broker 端接收拉取请求流程图

Untitled

  • Client 端(消费端)消息消费流程图

Untitled

存储设计

  • 业务层存储交互领域设计

Untitled

  • DefaultMessageStore 核心功能实现
    • FlushConsumeQueueService: 消息队列文件 ConsumerQueue 刷盘线程
    • CleanCommitLogService:定时清理 CommitLog 文件;
    • CleanConsumeQueueService :定时清理 ConsumeQueue 文件;
    • IndexService:消息查询索引服务,用于查询消息;
    • AllocateMappedFileService:MappedFile 分配服务,用于创建或删除 MappedFile;
    • ReputMessageService:异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引文件)数据;
    • HAService:用于主从数据同步;
    • ScheduleMessageService:用于延迟消息的处理服务;
    • StoreCheckpoint:文件检查点服务,用于故障恢复;
    • StoreStatsService:存储的指标统计信息服务;
    • BrokerStatsManager:Broker 统计信息服务;
  • 底层存储核心类结构

Untitled

  • 核心存储文件设计
    • commitLog 文件结构设计
      • 单个文件大小默认为 1G;文件名长度为 20 位,左边补零,剩余为起始偏移量,比如:00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824
      • 若第一个文件写满了,则会生成第二个文件,文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息是顺序写入日志文件,当文件满以后,写入下一个文件
    • ConsumeQueue 文件结构设计
      • ConsumeQueue 文件夹的组织方式如下 topic/queue/file 三层组织结构;具体存储路径为:{usr.home}/store/consumequeue/{topic}/{queueId}/{fileName}
      • ConsumeQueue 文件与 CommitLog 一样也是采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 CommitLog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M
    • Index 文件结构设计
      • IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。Index 文件的存储位置是:{usr.home}\store\index{fileName},文件名 fileName 是以创建时的时间戳命名的

      • 固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 结构,故 RocketMQ 的索引文件其底层实现为 Hash 索引

        Untitled

最佳实践

生产端

  • 关于生产者应用的业务场景
    • 一般性业务使用同步消息发送,注意两点即可
      • 在代码的最后发送消息
      • 完善且细粒度、准确的进行 try catch finally
    • 对于高并发、对性能要求高的场景下,采用异步消息,注意点
      • 在回调函数进行标记处理,但要注意 Broker ACK 响应请求抖动、断路问题,需要定时任务补偿
    • 对于顺序消息,一般而言非必要场景不需要使用
      • 一般的使用场景就是通过其业务 id(或关键路由字段),比如做 hash 操作等,可以路由到指定的队列;就如我们每个门店编号消费消息就可以采用其顺序消息;类似 kafka 的 partition,把相同业务属性的消息投递到一个队列里,保证局部有序且做消费区分
  • 对于重要不可丢失的消息
    • 选择事务消息最佳,rocketmq 事务消息并不重,相当于一次同步消息,只是增加了 Broker 端的存储成本而已
    • 本质上事务消息并不会加数据库事务(如果需要自己需要手工开启、提交/回滚)
    • 要注意消息回调事务检查问题:TransactionalMessageServiceImpl#check 方法里的 needdiscard 方法,通过 TRANSACTION_CHECK_TIMES 属性设置了回查的次数(默认 15 次);这个属性也会带入到生产者回调检查函数里,也就是 15 次会查就会默认丢弃该消息(所以我们一定要在会查函数里面把这个参数取到,判断其 TRANSACTION_CHECK_TIMES 次数,再次做 failover 记录人工处理
  • 生产者到底是单条发送还是批量发送?
    • 一般而言,我们倾向于消息的单条发送,rocketmq 默认消费 api 也是只处理一条消息,也说明了官方的倾向性
    • 批量消费的场景诉求
      • 首先可能是希望一批数据处理完成后一次性 commit;是要保障整体消息的原子性
      • 如果业务诉求实际是可接受整体完整性的延迟完整性,那么我们可以把批量中失败的消息记录后重回队列后补偿处理
      • 如果业务需要绝对的完整性,那么只能一次性提交,其中有失败就需要整体失败;这种情况可能会影响业务处理速度和性能,也会导致业务延迟和因为卡住
      • 如果既要追求性能和数据强一致性,那我们架构设计一般会配合其他组件来完成;单靠 MQ 不能彻底解决该类问题
  • 生产端 FastFail 机制流程

Untitled

生产端 Case

  • 发送消息失败

Untitled

Untitled

  • 为什么发送失败?
    • 首先在消息流量高峰期,一定是触发了两个情况
      • 第一种:RocketMQ 首先判断的就是是否 OS Pagecache busy,他判断的依据就是内存序列化写盘大于 1 秒,我理解这种情况应该是什么原因?系统资源瓶颈 or 消息大报文
      • 第二种:确实队列里消息量太大了,被流控了,通过 cleanExpiredRequestInQueue 字样就知道

Untitled

总结

RocketMQ通过其高效的架构设计和灵活的扩展性,为分布式系统提供了可靠的消息传输解决方案。理解其内部架构、消息模型和最佳实践,可以帮助开发者在高并发、分布式环境下实现稳定、高效的消息队列管理,提升系统的整体性能和可靠性。

参考链接

  1. Apache RocketMQ 官方文档