消息队列那么多,为什么建议深入了解下RabbitMQ?

消息队列那么多,为什么建议深入了解下RabbitMQ?

你为啥要在项目中选择xxx消息中间件?

提起消息队列,也许你的脑海里会不自觉地蹦出好多概念:JMS、Kafka、RocketMQ、AMQP、RabbitMQ、ActiveMQ、Pulsar、Redis Stream...如果你的项目中恰好用到了其中的一个消息中间件,那么你出去面试或者与同事交流技术的时候,对方很大概率会问你:为啥要选择xxx消息中间件?

如果你刚好只了解你正在用的消息中间件,那么你只能回答:因为只会xxx...

消息队列那么多,为什么建议深入了解下RabbitMQ?

不...这绝对不是你想要的结局!在对方准备看你的笑话之前,你也许可以主动发起还击,把所有的框架的诞生背景、优缺点,适用场景等都说一遍,从概念到原理,从特性到源码。在说完了之后,为了不让对方感觉到尴尬,你应该故意停顿片刻,暗示对方自己不想再聊这个话题了,好让对方有喘息的机会,让他趁早切换话题,给他一个台阶下。

消息队列那么多,为什么建议深入了解下RabbitMQ?

为了让自己能有如此实力,你务必要对这些常见的消息中间件有比较深入的了解。

我们先来看看这些技术的发展史。

MQ技术发展史:

如下图所示:

消息队列那么多,为什么建议深入了解下RabbitMQ?

Pub/Sub模式以及第一个消息中间件诞生的故事

1985年在高盛,Ranape 找到了他的第一个客户,并确定了他的软件总线要解决的问题:金融交易。

当时,一个交易员的摊位挤满了不同的终端,用于提供交易员完成工作所需的每种类型的信息。 Teknekron 看到了替换所有这些终端及其应用程序的机会:通过Ranape软件总线取而代之,只需保留一个工作站即可,其显示程序可以作为消费者插入Teknekron软件总线,并允许交易者“订阅”其想要查看的信息。 Pub/Sub 诞生了,世界上第一个现代消息队列软件也诞生了:Teknekron的The Information Bus(TIB)。

而RabbitMQ作为传统的消息中间件,被大量应用于各种古老的项目,你第一个要拿下的就是它了,本文将带您从以下各个方面了解RabbitMQ相关知识:

关于其他的消息中间件,我会在下篇文章中继续给大家分享。

RabbitMQ是一种使用Erlang语言编写的开源的消息中间件,最初实现了AMQP(高级消息队列协议),后来通过插件架构进行了扩展,支持STOMP(面向流文本的消息传递协议)、MQTT(MQ遥控传输)等协议。

详细关于RabbitMQ支持的消息协议,参考官网:Which protocols does RabbitMQ support?[1]

更多内容欢迎关注公众号Java架构杂谈,或者我的博客IT宅itzhai.com

1. RabbitMQ优势

RabbitMQ支持多种客户端,如Python、Java、.NET、C、Ruby等,在易用性、扩展性、高可用性等方面表现都不错,并且可以与SpringAMQP完美整合,API丰富易用。

RabbitMQ程序健壮、稳定、易用,跨平台、支持多种语言,管理界面简单易用,功能全面,文档相对比较齐全,社区活跃。

2. AMQP简介

AMQP,全称为:Advanced Message Queuing Protocol,高级消息队列协议,是面向消息中间件的开放标准的二进制应用层协议。AMQP的核心特性是:面向消息、排队、路由(包括点对点和发布订阅)、可靠性和安全性。这些功能使其非常适合在应用程序之间传递业务消息,AMQP还可以用作物联网IoT协议。

目前,AMQP 1.0已经被批准为国际标准,具体规范文档,可以进一步阅读:OASIS Advanced Message Queuing Protocol (AMQP) Version 1.0[2]

而RabbitMQ最初是为了支持AMQP 0-9-1[1:1]而开发的,因此,该协议是RabbitMQ Broker支持的核心协议。

下面我们就简要介绍下AMQP 0-9-1协议[3]。这部分内容,概念会比较多,稍微有点枯燥,但是可以说RabbitMQ就是按照这个协议去实现的,所以熟悉这个协议很重要。

完整的AMQP文档可以从这里下载:AMQP Working Group 0-9-1[4]

2.1 AMQP模型概述

2.1.1 AMQP 0-9-1

AMQP 0-9-1 是一个二进制协议,定义了非常强大的消息传递语义。对于客户端来说,这是一个相对容易实现的协议,因此有大量客户端库可用于许多不同的编程语言和环境。

AMQP 0-9-1通常划分为两层:

消息队列那么多,为什么建议深入了解下RabbitMQ?

可以在不改变协议的应用程序相关功能的情况下用任意的传输协议来替换传输层,也可以将传输层用于不同的高级协议。

2.1.2 AMQP 0-9-1模型简介

如下图,消息Broker(代理)从消息发布者(发布消息的应用程序,也称为生产者)接收消息并将它们路由到消费者(处理消息的应用程序)。

由于AMQP是一个网络协议,因此,生产者消费者代理都可以部署在不同的机器上。AMQP模型如下图所示:

消息队列那么多,为什么建议深入了解下RabbitMQ?

消息发布到交换机(exchanges)(通常将其比作邮局或邮箱),然后使用称为绑定(Bindings)的规则将消息副本分发到队列(queues)。然后代理(brokers)要么将消息传递(deliver)给订阅队列的消费者(consumers),要么消费者主动按需从队列中获取(fetch)/拉取(pull)消息。

消息元数据:发布消息的时候,发布者可以指定各种消息元数据(消息属性),其中一些元数据可能由代理使用,其余的元数据仅由接收消息的应用程序使用。

消息确认:由于网络是不可靠的,并且应用程序可能无法正确处理消息,因此AMQP 0-9-1模型有一个消息确认的概念:当消息传递给消费者时,消费者会自动或者由开发人员在应用程序中手动指定通知代理Broker,代理只会在收到消息(或消息组)的通知时从队列中完全删除该消息。

死信队列:在某些情况下,例如,当消息无法路由时,消息可能会返回给发布者、或者丢弃掉、或者将其放入所谓的死信队列(如果代理扩展支持),发布者通过使用某些参数来选择如何处理此类情况。

队列(queues)/交换机(exchanges)和绑定(bindings)统称为AMQP实体。

2.1.3 AMQP 0-9-1 是一个可编程的协议

AMQP 0-9-1是一种可编程的协议,因为AMQP 0-9-1实体路由方案由应用程序本身定义,而不是代理管理员。因此AMQP制定了一些规定来实现这些协议操作:

这为应用程序开发人员提供了很大的自由,但也要求他们了解潜在的定义冲突。在实践中,定义冲突很少见,通常表示为配置错误。

应用程序声明它们需要的AMQP 0-9-1实体,定义必要的路由方案,并在不需要使用它们时进行删除。

2.2 交换机(Exchanges)和交换机类型

交换机是发送消息的AMQP 0-9-1实体。交换机收到一条消息,并将其路由到零个或者多个队列中。咳咳,Java架构杂谈提醒大家,不要联想到了网络的交换机(Network switch),只是中文名称一样而已。

使用的路由算法取决于交换机类型和称为绑定的规则。以下是AMQP 0-9-1 Broker提供的四种交换机类型:

交换类型

默认的预定义名称

直连交换机(Direct exchange)

空字符串和amq.direct

扇形交换机(Fanout exchange)

amq.fanout

主题交换机(Topic exchange)

amq.topic

头信息交换机(Headers exchange)

amq.match 和RabbitMQ中的 amq.headers

除了交换类型之外,交换机还声明了许多属性,关键属性有:

2.2.1 默认交换机

默认交换机是由Broker预先声明的匿名直连交换机。使用默认交换机的时候,每个新建队列都会自动绑定到默认交换机上,绑定的路由键与队列名称相同,默认交换机看起来可以将消息直接传递到队列。

2.2.2 直连交换机

交换机根据消息路由键(router_key)将消息传递到队列,消息将会投递到与路由键名称和队列名称相同的队列上。直接交换机是消息单播路由的理想选择(尽管它们也可以用于多播路由)。

直连交换机如下图所示:

消息队列那么多,为什么建议深入了解下RabbitMQ?

如上图,具有路由键"itzhai.com"的消息达到交换机之后,则会路由到Queue1中。

直连交换机通常用于以循环的方式在多个消费者之间分配任务,也就是说,消息的负载均衡是发生在消费者之间而不是队列之间

2.2.3 扇形交换机

扇形交换机将消息路由到绑定到它的所有队列,并且忽略路由键。也就是说,当新消息发布到该交换机时,该消息的副本将投递到所有绑定该交换机的队列。扇形交换机是消息广播路由的理想选择

扇形交换机如下图所示:

消息队列那么多,为什么建议深入了解下RabbitMQ?

使用扇形交换机的案例都非常相似:

2.2.4 主题交换机

主题交换机根据消息路由键和和用于将队列绑定到交换机的模式匹配字符串之间的匹配将消息路由到一个或者多个队列。

也就是说通过消息的路由键去匹配到绑定到交换机的路由键匹配字符串,如果匹配上了,就进行投递消息。

routing key模糊匹配的通配符如下:

routing key通过.分隔字符串。

主题交换机如下图所示:

消息队列那么多,为什么建议深入了解下RabbitMQ?

当生产者发送的routing_key=itzhai.com的时候,会把消息路由投递到Queue1和Queue2。

当生产者发送的routing_key=www.itzhai.com的时候,会把消息路由投递到Queue3。

当需要有针对性的选择多个接收消息的消费者或者应用的时候,主题交换机都可以被列入考虑的范围。常见的使用场景:

2.2.5 头交换机

头交换机不依赖路由键的匹配规则来路由消息,而是根据发送消息内容中的请求头属性进行匹配。

头交换机类似于直连交换机,但是直连交换机的路由键必须是一个字符串,而请求头信息则没有这个约束,它们甚至可以是整数或者字典。因此可以用作路由键不必是字符串的直连交换。

绑定一个队列到头交换机上的时候,会同时绑定多个用于匹配的头信息。

投递消息的时候,可以携带一个x-match参数,指定是否要求必须所有的头信息都匹配(all)才能进行投递,还是只要匹配任何一个就可以了(any)。

注意:以字符串x-打头的头属性,不会作为匹配项。

2.3 队列(Queues)

AMQP 0-9-1 中的队列与其他消息队列和任务队列系统中的队列类似,用于存储即将被消费的消息。一般地,队列与交换机共享一些属性,但队列也有一些特定的属性:

关于队列的声明:

在使用队列之前,必须先声明它。声明队列的时候,如果队列尚不存在,则将创建一个队列;如果队列已存在,并且属性与声明中的属性相同,则不用重新创建一个;如果现有队列属性与声明的队列属性不同,将会引发406(PRECONDITION_FAILED)的通道级异常。

2.3.1 队列名称

应用程序可以设置队列名称,如果设置为空字符串,Broker会为它们生成一个唯一的队列名称,在队列声明响应体中一起返回给客户端。队列名称为255个字节以内的UTF-8字符。

amq开头的队列名称,保留给Broker内部使用,如果尝试使用此类名称声明一个队列将导致通道级别异常:403(ACCESS_REFUSED)

2.3.2 队列持久化

持久化的队列的元数据会存储在磁盘上,当Broker重启之后,队列依然存在。没有被持久化的队列称为暂存队列。发布的消息也有同样的区分,也就是说,持久化的队列并不会使得路由到它的消息也具有持久性,需要手动把消息也标记为持久化才能保证消息的持久性。

2.4 绑定(Bindings)

绑定是交换机将消息路由到队列的规则。为了让交换机能够正确的把消息投递到对应的队列,需要把交换机和队列通过路由键绑定起来,路由键就像是一个过滤器,决定了消息是否可以投递给消息队列。

如果一条消息不能被路由到任何队列(例如,因为它被发布到的交换机没有绑定),它要么被丢弃,要么返回给发布者,这取决于发布者设置的消息属性。

2.5 消费者

如果消息只是存储在队列里没有被消费,是没有什么实际作用的。在AMQP 0-9-1中,有两种途径可以进行消息的消费:

如果应用程序对某一个特定队列的消息感兴趣,则可以注册一个消费者,对队列进行订阅。每个队列可以有多个消费者,当然也可以注册一个独享的消费者,这个时候其他消费者会被排除在外。

每个消费者(订阅)都有一个称为消费者标签的字符串类型的标识符,可以用它来退订消息。

2.5.1 消息确认

消费者应用程序可能偶尔无法处理单个消息或有时会崩溃,另外网络问题也有可能导致问题。这就提出了一个问题:Broker何时应该从队列中删除消息?AMQP 0-9-1 规范中约定让消费者对此进行控制,有两种确认模式:

在显示模式下,应用程序选择何时发送确认消息。如果消费者在没有发送确认的情况下就挂掉了,那么Broker会将其重新投递给另一个消费者,如果此时没有可用的消费者,那么Broker将等到至少有一个消费者注册到该队列时,再尝试重新投递消息。

另外,如果应用程序崩溃(当连接关闭时 AMQP Broker会感知到这一点),并且AMQP Broker在预期的时间内未收到消息确认,则消息将重新入队,如果此时有其他消费者,可能立即传递给另一个消费者。为此,我们的消费者做好业务的幂等处理也是非常重要的

2.5.2 拒绝消息

当消费者接收到消息之后,可能处理成功或者失败。应用程序可以通过拒绝消息向Broker表明消息处理失败了(或者当时无法完成)。拒绝消息的时候,应用程序可以要求Broker丢弃消息或者重新入队。

当队列中只有一个消费者的时候,请确保您不会通过不断地拒绝消息和重新入队导致消息在同一个消费者身上无限循环的情况发生。

在AMQP中,basic.reject方法用来执行拒绝消息的操作。

2.5.3 预取消息

在多个消费者共享一个队列的情况,能够制定每个消费者在发送下一个ack之前可以一次性接收多少条消息,这是非常有用的特性。这可以在试图批量发布消息的时候,起到简单的负载均衡和提高消息吞吐量的作用。

请注意:RabbitMQ仅支持通道级预取计数,不支持基于连接或者大小的预取。

2.6 消息属性和有效负载

AMQP 0-9-1模型中的消息是具有属性的,有些属性非常常见,以至于AMQP 0-9-1明确定义了它们,例如:

有些属性是被AMQP的Broker所使用的,但是大多数是开放给接收它们的应用程序用的。有些属性是可选的,称为消息头(headers),它们类似于HTTP协议的X-Headers,消息属性需要在消息被发布时定义

消息体:AMQP消息除了属性之外,还包括一个有效载荷Payload(消息实际携带的数据),AMQP Broker视其为一个透明的字节数组来对待。Broker不会修改payload。消息可能只包含属性而没有payload。payload通常使用JSON、Thrift、Protocol Buffers和MessagePack等序列化格式来序列化成结构化的数据,以便进行发布,协议对等方通常使用Content typeContent encoding字段来传达此信息。

消息持久化:消息可以作为持久性发布,这使得Broker将他们持久化到磁盘。如果服务器重启之后,系统可以确保接收到的持久化消息不会丢失。简单的将消息发布到持久化的交换机或者被路由到持久化的队列中,是不会让消息也持久化的,消息是否持久化完全取决于消息本身的持久模式。将消息发布为持久性会影响性能,就像数据存储一样,持久性以一定的性能成本作为代价。

2.7 AMQP 0-9-1 方法

AMQP 0-9-1中定义了许多操作方法,详细参考:AMQP 0-9-1参考[5]。

很多方法都有对应的响应方法,有些甚至有不止一种可能的响应,如basic.get,响应可能为:get-ok或者get-empty

如下是声明一个交换机和响应成功的方法:

消息队列那么多,为什么建议深入了解下RabbitMQ?

2.8 连接(Connections)

AMQP 0-9-1 连接通常是长连接,AMQP 0-9-1 是一种使用TCP提供可靠投递的应用层协议。AMQP 0-9-1连接使用身份认证机制并提供TLS (SSL)保护。当应用程序不再需要连接到Broker时,它应该优雅地关闭其 AMQP 0-9-1 连接,而不是突然关闭底层 TCP 连接。

2.9 通道(Channels)

某些应用程序需要同时开启连接到Broker,但是,同时保持许多TCP连接是不可取的,这样会消耗系统资源并且使得配置防火墙更加困难。

AMQP 0-9-1通过通道复用技术通过通道的形式实现在一个TCP连接上面支持多个连接(虚拟的链接)。同一个TCP连接中有多个通道,通道之间的通信是完全隔离的。客户端的每个协议操作都携带了一个通道ID,代理和客户端都是用它来确定该操作所走的通道。

消息队列那么多,为什么建议深入了解下RabbitMQ?

通道仅存在于TCP连接上下文中,一旦TCP连接关闭,其上所有通道也跟着关闭。

一般的,我们会给每个线程打开一个新的通道进行通信。

2.10 虚拟主机

为了让单个代理可以托管多个隔离的环境(用户组、交换机、队列等),AMQP中提供了虚拟主机,这类似于许多流行的Web服务器使用的虚拟主机。协议客户端在连接协商期间需要指定想要使用的虚拟主机。

2.11. AMQP Client架构

推荐的AQMP Client架构须由下面多个抽象层组成:

消息队列那么多,为什么建议深入了解下RabbitMQ?

AMQP就介绍到这里了,接下来Java架构杂谈带大家详细看看RabbitMQ。

3. RabbitMQ架构

RabbitMQ的整体架构如下图所示:

消息队列那么多,为什么建议深入了解下RabbitMQ?

Broker:Broker中按虚拟主机(virtual host)划分,每个虚拟主机下面有自己的交换机(exchange)和消息队列(queue),以及交换机和队列的绑定routing_key(有些人会把这个key称为binding_key);

生产端:一般地,同一个客户端(client)里面的每个生产者(producer)创建一个专门的通道(channel),复用同一个TCP连接(connection),每个生产者可以往Broker发布消息,发布消息的时候,需指定虚拟主机,以及虚拟主机上的交换机,并且消息需要带上routing_key;

消费端:一般地,同一个客户端(client)里面的每个消费者(consumer)创建一个专门的通道(channel),复用同一个TCP连接,每个消费者指定一个消息队列进行消费。同一个消息队列,可以有多个消费者共同消费,但消息队列里面的同一条消息,只会由一个消费者消费,多个消费者相当于给消息队列做了负载均衡。

针对默认交换机直连交换机主题交换机生产端带入的routing_key交换机与队列之间绑定的routing_key(binding_key)进行匹配,匹配上了,就把消息投递给对应的消息队列。

针对扇形交换机,直接把消息投递给所有与扇形交换机绑定的队列。

rabbitmqctl是管理RabbitMQ服务器节点的主要命令行工具,相关完成命令介绍参考:rabbitmqctl(8)[6]

4. RabbitMQ特性

4.1 消息ACK机制[7]

ACK (Acknowledge character),即是确认字符,消息的接收方需要告诉发送方已确认接收消息,这是实现可靠消息投递的必备特性。

MQ系统中,涉及到ACK的流程如下图所示:

消息队列那么多,为什么建议深入了解下RabbitMQ?

4.1.1 生产端ACK之CONFIRM消息机制

如上图所示:

如果Producer没有收到ack,那么可以重发消息,直到收到ack为止。为了避免无限的给Broker投递消息,应该设置一个重试上限,并记录下发送失败的消息。在这个过程中,MQ Server可能会收到重复消息。

在RabbitMQ中,生产端的ACK通过ConfirmListener机制来实现:

消息队列那么多,为什么建议深入了解下RabbitMQ?

在channel中开启确认模式confirmSelect(),然后在channel中添加监听,用来监听Broker返回的应答。

Broker何时给生产端发送ACK?

对于不可路由的消息,一旦交换机验证消息不会路由到任何队列,Broker将发出ack,如果开启了Return消息机制(下一小节讲解),那么Broker会先发送basic.return消息给客户端,再发送basic.ack消息。示例代码如下:

String message = "Hello itzhai.com...."; // Confirm消息机制 channel.addConfirmListener(new TestConfirmListener()); // Return消息机制 channel.addReturnListener(new TestRetrunListener()); // 错误的路由键,但交换机的名称正确 String errorRoutingKey = "itzhai.com.test1"; boolean mandatory = true; channel.basicPublish(exchangeName, errorRoutingKey, mandatory, basicProperties, message.getBytes());

执行以上代码,生产者将依次收到basic.return(Return消息),basic.ack(Confirm消息)。

消息队列那么多,为什么建议深入了解下RabbitMQ?

对于可路由的消息,当所有队列都接收到消息的之后,Broker向生产端发送ACK。如果路由到的是持久队列,并且是持久消息,那么这个ACK就意味着消息持久化到了磁盘。

也就是说,路由到持久队列持久消息的ACK将在将消息持久化到磁盘后发送

RabbitMQ消息持久化的性能如何?

RabbitMQ持久化消息的刷盘策略:为了尽可能减少fsync(2)的调用次数,RabbitMQ在间隔一段时间(几百毫秒)或者在队列空闲的时候将消息分批保存到磁盘中。

这就意味着,在正常的负载下,生产端接收Broker的ACK时延可达几百毫秒。为了提高吞吐量,强烈建议生产端应用程序异步处理ACK,或者批量发布消息,并等待ACK。

4.1.2 生产端ACK之RETURN消息机制

Return消息机制用于处理一些不可路由的消息。发送消息的时候,如果指定的routing_key路由不到队列,这个时候就可以通过ReturnListener监听这种异常情况。

4.1.3 消费端ACK

消息队列那么多,为什么建议深入了解下RabbitMQ?

如上图所示:

当Broker一直没有收到消费端的ACK,则会重发消息,这个过程一般采用指数退避策略,时间间隔按指数增长。

Rabbit中的消费端ACK

在RabbitMQ中,消费端的ACK可以是自动的,或者手动的。

手动ACK签收

通过以下方法关闭自动ack签收(入参autoAck设置为false):

Channel.java String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

然后自定义一个支持ack的Consumer:

public class TestAckConsumer extends DefaultConsumer { ... public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try{ ... // 成功消费的ack boolean multiple = false; channel.basicAck(envelope.getDeliveryTag(), multiple); }catch (Exception e) { // 未成功消费的ack,设置为不重回队列,即立刻删除消息 boolean multiple = false; boolean requeue = false; channel.basicNack(envelope.getDeliveryTag(), multiple, requeue); } } }

channel中有三种ack相关的方法:

basic.nack与basic.reject的区别就是,basic.nack支持批量手动确认,basic.nack是RabbitMQ对AMQP 0-9-1协议的扩展。

自动ACK签收

使用自动确认模式,消息在发送之后就立刻被标记为投递消费成功。如果消费者的TCP连接或者通道在真正投递成功之前就关闭了,那么Broker发送的消息将会丢失。自动确认模式是以降低消息投递的可靠性来换取更高的消费端吞吐量(只要消费端处理速度能够跟上)

如何避免消费过载的问题(消费端限流)?

使用自动模式可以提高吞吐量,但是前提是消费端要能够处理得过来,如果处理不过来,就会在消费端的内存中积压消息,直至把内存耗尽。因此,自动确认模式仅推荐用于能够以稳定的速度高效地处理消息的消费者。

为了避免消费过载问题,我们一般使用手动确认模式,配合通道预取限制一起使用:

// 每条消息的大小限制,0表示不限制 int prefetchSize = 0; // MQ Server每次推送的消息的最大条数,0表示不限制 int prefetchCount = 1; // true 表示配置应用于整个通道,false表示只应用于消费级别 boolean global = false; channel.basicQos(prefetchSize, prefetchCount, global); // 队列名称 String queueName = "com.itzhai.queue"; // 设置为手动确认模式 boolean autoAck = false; // 消费者对象实例 Consumer consumer = new ItzhaiTestConsumer(channel); channel.basicConsume(queueName, autoAck, consumer);

如何提高手动ACK签收的效率

如果不需要严格控制发送消费端ACK的时间,即,只要消费者成功接收到消息,不管有没有消费成功,都允许进行ACK回复,那么就可以通过批量ACK签收的功能更来提高签收的消效率。做法如下:

// 手动签收模式 boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // 注意,这里设置为批量签收 boolean mutiple = true; // 签收deliveryTag以及deliveryTag之前的所有消息 channel.basicAck(deliveryTag, mutiple); } });

这样执行basicAck,deliveryTag以及deliveryTag之前的所有消息都将会被签收。

消息队列那么多,为什么建议深入了解下RabbitMQ?

什么时候需要让消息重回队列?

有时候消费者太繁忙导致无法立即处理接收到的消息,但是其他实例可能可以处理。这种情况,就可以拒绝消息,并且让消息重回队列。

另外,可以使用channel.basicNack方法一次拒绝或者重新排队多条消息:

// 指定批量拒绝策略 boolean multiple = true; // 指定拒绝之后重新入队 boolean requeue = true; channel.basicNack(envelope.getDeliveryTag(), multiple, requeue);

极端情况下,如果所有消费者因为暂时无法处理接收的消息,会导致消息不断的循环重回入队,导致消耗网络带宽和CPU资源。为了避免这种情况,可以跟踪重回队列的消息数量,决定是否需要永久拒绝消息(丢弃消息)还是延迟重回队列的时间。

4.2 消息的顺序性能够得到保证吗?

一般情况下,在单个通道上发布的消息,Rabbit会按照消息发布的相通顺序向生产端发送ACK消息,但也不是绝对的。发布ACK的确切时刻取决于消息的传递模式(持久化或瞬时),以及消息路由到的队列的属性。也就是说,不同的消息在不同的时间准备好进行确认,确认消息可以以不同的顺序达到。所以,应用程序尽可能不要依赖于消息的顺序性。

4.3 消息处理的幂等性如何处理?

无论是生产端还是消费端的ACK,都有可能因为网络或者程序问题导致ACK消息没有及时送达,这个时候会导致重复的消息投递。如何保证消费同一条消息的情况下不影响业务,这就需要保证消息处理的幂等性。

也就是说,针对同一条消息,无论消费者消费多少次,产生的效果始终应该跟消费一次的保持一致,并且返回的ACK结果也是一致的。

常用的实现消息处理幂等性的方法:

4.4 死信队列[8]

如果消息队列中的消息没有被正常消费掉,那么该消息就会成为一个死信(Dead Letter),这条消息可以被重新发送到另一个交换机上,后面这个交换机就是死信交换机(DLX),死信交换机绑定的队列就是死信队列。在以下情况下导致的消息未被正常消费,均会使消息变为死信:

死信队列也是一个正常的交换机,它可以是任何常见的交换机类型,与常规交换机声明没有区别。

DLX可以有客户端使用队列参数(arguments)进行定义,或者在服务器中使用策略(policy)进行定义,在policy和arguments都定义了的情况下,arguments中指定的那个会否决policy中指定的那个。

通过policy启用死信队列:

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

通过arguments启用死信队列:

// 声明一个交换机,作为死信交换机 channel.exchangeDeclare("some.exchange.name", "direct"); Map args = new HashMap(); args.put("x-dead-letter-exchange", "some.exchange.name"); channel.queueDeclare("myqueue", false, false, false, args);

5. 持久化消息就意味着消息的可靠性吗?如何实现可靠性投递?

消息可靠性需要考虑生产端投递消息的可靠性以及保证消费端最终成功地消费消息。

虽然通过生产端的ACK机制,可以确保消息成功的投递到了RabbitMQ中,保证投递的消息不丢失。但是如果生产端不知道消费者究竟有没有成功的消费了消息,那也就无法实现可靠性投递了。

而生产端投递消息的过程中,通常会涉及到生产端的事务提交,要保证消息跟随事务提交而发送,也是需要考虑的问题。

如何实现可靠投递呢?这里留给大家思考,关键设计要点:

提示的还不够具体?我再上一张图:

消息队列那么多,为什么建议深入了解下RabbitMQ?

有更好的方案的朋友,欢迎在评论区留言交流,也许你就是评论区最靓的仔。

6. RabbitMQ更多使用场景

通过给消息设置TTL,超时时候放入死信队列进行处理,可以实现延迟队列,当然,RabbitMQ也有专门的延迟队列插件可以使用;

另外,也可以使用RabbitMQ模拟RPC调用,参考上一节实现消息可靠性投递的例子。

更多的使用场景欢迎大家进行补充。

关于更多消息中间件的文章,欢迎关注Java架构杂谈,或者我的博客IT宅(itzhai.com),我会持续的输出相关内容。


我精心整理了一份Redis宝典给大家,涵盖了Redis的方方面面,面试官懂的里面有,面试官不懂的里面也有,有了它,不怕面试官连环问,就怕面试官一上来就问你Redis的Redo Log是干啥的?毕竟这种问题我也不会。

消息队列那么多,为什么建议深入了解下RabbitMQ?

展开阅读全文

页面更新:2024-02-18

标签:队列   死信   消息   扇形   路由   绑定   交换机   应用程序   持久   中间件   属性   通道   消费者   协议   模式   建议   科技

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top