RocketMQ实现原理

一、MQ协议

AMQP

应用场景:主要是面向服务端,机器数量不会超过1万台,需要高性能、高吞吐量,RabbitMQ就是基础此协议。

AMQP全称Advanced Message Queuing Protocol(高级消息队列协议), 是一个二进制协议,具有以下特性:多通道、可协商、异步、安全、高效等。协议主要分为两层:

功能层:定义了一系列指令在应用层工作。

传输层:通过多路复用、分帧、内容编码、心跳检测、数据定义、错误处理等方式,实现数据在 客户端<-->服务端 双端的传输。

分层的好处:可以用任意协议替换传输层,而不需要修改功能层。也可以对不同的高级协议使用相同的传输层。

整体架构

AMQ模型

数据流:生产者把消息发送到服务端,由Exchange将消息路由到不同的消息队列中,消息队列进行存储然后再消息转发给消费者。

更多细节可以查看:https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf

MQTT协议

应用场景:面向移动端,海量连接,使用卫星网络,带宽小、不稳定性因素多。实现有Mosquitto。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议) ,是用于消息队列服务的轻量级、发布订阅、端到端网络协议,运行在TCP/IP协议之上,为资源或网络带宽有限的远程设备提供实时有效的消息服务。

优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统。

MQTT架构

用温度传感器的例子去理解整体架构:发布者首先定义一个温度Topic,随后发布温度值到队列中,消息发布后,另一侧的手机或PC设备订阅温度Topic,然后接收到发布的温度值消息。

Kafka、RocketMQ

Kafka、RocketMQ都是使用的自定义协议,用于处理海量流式数据,具有分布式、高性能、高可靠、低延迟、高扩展性等特征。

Kafka架构图

RocketMQ架构图

二、使用场景

异步解耦

用户支付完订单后,比较耗时并且不需要同步反馈的动作放入mq中,后续物流、短信等系统消费到后再进行对应的业务处理,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦。

削峰


比如618、双11这种大促活动,C端流量很大,比如用户购买商品时,需要扣减库存,但是库存系统无法承载海量调用量,则需要把跟库存相关的操作都放于队列中。

同时可以利用时间窗口降低数据库压力:比如热门商品的库存是1000,共有900人在一秒内秒杀了,则可以通过设置消费时间窗口为1s,将相同商品的库存扣减聚合起来,只对数据库进行一次更新。

延迟消息

定点发布文章、视频等等类似的场景,都可以利用延迟消息的特性。

事务消息

需要保证两个系统之前的数据最终一致,比如订单生成成功则一定要发出通知,失败则不发通知。

三、架构设计

详情:https://github.com/apache/rocketmq/tree/develop/docs/cn

技术架构

RocketMQ架构上主要分为四部分,如上图所示:

为什么RocketMQ不使用Zookeeper作为注册中心呢?

我认为有以下几个点是不使用zookeeper的原因:

  1. 根据CAP理论,同时最多只能满足两个点,而zookeeper满足的是CP,也就是说zookeeper并不能保证服务的可用性,zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
  2. 基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而zookeeper的写是不可扩展的,而zookeeper要解决这个问题只能通过划分领域,划分多个zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
  3. 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
  4. 消息发送应该弱依赖注册中心,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

部署架构

RocketMQ 网络部署特点

结合部署架构图,描述集群工作流程:

四、工作流程

4.1 Topic创建、更新

4.11 集群模式


假设ClusterA下存在broker-a、broker-b,在ClusterA中创建TopicA,读写queue均设置为3,则会分别在broker-a和broker-b中创建相同数量的读写queue。

4.12 broker模式

以broker模式创建的Topic在不同的broker之间的queue允许不同,比如TopicA在broker-a中读写queue为6,而broker-b中的读写queue设置为4。

4.13 读写queue

为什么需要分别设置读和写queue?

用途:topic路由信息

1、当w=r时,同一个组的consumer会均分队列进行消费

如下,w=r=5

2、当w > r时,会导致部分queue无法消费

如下场景,w=5, r=4,会出现queue4无人消费情况。

3、特别说明,当consumer实例数量>读queue数量时,会导致部分consumer没有消费

设置读写队列主要针对topic扩缩容场景,考虑如下情况:

topic原来w=5,r=5; 现在需要进行缩容,则先把w=3,r=5,待queue3、queue4消费完成后,再把r置为3

初始如下:

缩容,把w设置为3:

queue-3、queue-4消费完成,设置r为3,完成缩容:

4.2 消息发送

以broker双主模式部署举例,TopicA在broker-a中读写队列数量为4,在broker-b中读写队列数量为4,则消息发送过程如下:

4.3 消息存储

消息发送到broker后,数据存储流程如下:

CommitLog:消息主体以及元数据的存储主体,单个文件大小默认1G,思考大小为什么是1G?

ConsumeQueue:ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构。

IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。

Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。

消息刷盘

(1) 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。

(2) 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

4.4 顺序消息

4.41 业务使用场景

4.42 分区顺序

一个Partition(queue)内所有的消息按照先进先出的顺序进行发布和消费。

在MQ的模型中,顺序需要由3个阶段去保障:

  1. 消息被发送时保持顺序。
  2. 消息被存储时保持和发送的顺序一致。
  3. 消息被消费时保持和存储的顺序一致。

Producer端

Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的选择,比如通过把唯一键如id经过hash算法固定写到同一分区。

Consumer端

如何保证一个队列只被一个消费者消费?

创建消息拉取任务时,消息客户端向broker端申请锁定MessageQueue,使得一个MessageQueue同一个时刻只能被一个消费客户端消费。

消息消费时,多线程针对同一个消息队列的消费先尝试使用synchronized申请独占锁,加锁成功才能进行消费,使得一个MessageQueue同一个时刻只能被一个消费客户端中一个线程消费。

4.43 全局顺序

需要设置topic下读写队列数量为1,一个Topic内所有的消息按照先进先出的顺序进行发布和消费.但是全局顺序极大的降低了系统的吞吐量,不符合mq的设计初衷。

4.5 延迟消息

场景:用户下单后如果30分钟未支付,则该订单需要被关闭。

思考:如何实现任意粒度的延迟消息?

4.6 事务消息

事务消息流程:

1.事务消息发送及提交:

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2.补偿流程:

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

还未commit的半消息对用户不可见,那么,如何做到写入消息但是对用户不可见呢?

RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

4.7 死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

4.8 Rebalance

触发Rebalance的根本因素无非是两个:1 ) 订阅Topic的队列数量变化 2)消费者组信息变化。导致二者发生变化的典型场景如下所示:

topic queue变更&& broker实例变更

broker日常运维时的停止/启动或者broker异常宕机,也有可能导致队列数量发生变化,不论是停止/启动/扩容导致的所有变化最终都会上报给NameServer。客户端可以给NameServer发送GET_ROUTEINTO_BY_TOPIC请求,来获得某个Topic的完整路由信息。如果发现队列信息发生变化,则触发Reabalance。

consumer 变更

影响

五、缺点

架构设计决定了数据需保存在本地,本地数据的管理限制了扩展性,提高了运维成本,同时吞吐量受本地磁盘的限制。新一代计算存储分离的云原生消息队列应运而生,具体可查看pulsar的相关设计:https://pulsar.apache.org/docs/2.10.x/concepts-architecture-overview。

展开阅读全文

页面更新:2024-06-17

标签:死信   队列   集群   路由   顺序   原理   消费者   协议   事务   消息   信息

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top