0


RabbitMQ的介绍

一、什么是MQ

MQ

(message queue),从字面意思上看就个 FIFO 先入先出的队列,只不过队列中存放的内容是 message 而已,它是一种具有接收数据、存储数据、发送数据等功能的技术服务。

消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。

二、为什么使用MQ

主要有三个作用:

  • 解耦。如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

  • 异步。如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。

  • 削峰。如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃

三、常用的MQ

1️⃣ ActiveMQ

优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。

2️⃣ Kafka

大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。

优点:性能卓越,吞吐量高,单机写入 TPS 约在百万条/秒,时效性 ms 级,可用性非常高;其次 kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据导致服务不可用,消费者采用 Pull 方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次。此外 kafka 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager,在日志领域比较成熟,被多家公司和多个开源项目使用;最后 kafka 在功能支持方便面它功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用。
缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢;

选用场景:Kafka 主要特点是基于Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。

3️⃣ RocketMQ

RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。

优点:单机吞吐量十万级,可用性非常高,采用分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,采用 java 语言实现。
缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在MQ核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码。

选用场景:天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。

4️⃣ RabbitMQ

2007 年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备、健壮、稳定、易用、跨平台、支持多种语言如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高。
缺点:商业版需要收费,学习成本较高。

选用场景:结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。

四、消息队列协议

什么是协议
协议:是在TCP/IP协议基础之上构建的种约定成的规范和机制,目的是让客户端进行沟通和通讯。并且这种协议下规范必须具有持久性,高可用,高可靠的性能。

为什么不直接采用TCP/IP协议去传递消息?因为TCP/IP协议太过于简单,并不能承载消息的内容和载体,因此在此之上增加一些内容,给消息的传递分发高可用提供基础。

我们知道消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,是采用底层的TCP/IP,UDP协议还是在这基础上自己构建等,而这些约定成俗的规范就称之为:协议。

所谓协议是指:

  1. 计算机底层操作系统和应用程序通讯时共同遵守的组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流。
  2. 和一般的网络应用程序的不同,它主要负责数据的接受和传递,所以性能比较的高。
  3. 协议对数据格式和计算机之间交换数据都必须严格遵守规范。

网络协议的三要素

  • 语法:语法是用户数据与控制信息的结构与格式,以及数据出现的顺序。
  • 语义:语义是解控制信息每个部分的意义。它规定了需要发出何种控制信息以及完成的动作与做出什么样的响应。
  • 时序:时序是对事件发生顺序的详细说明。

类比http请求协议

  1. 语法:htp规定了请求报文和响应报文的格式
  2. 语义:客户端主动发起请求称之为请求。(这是一种定义,同时你发起的是post/get请求)
  3. 时序:一个请求对应个响应。(定先有请求在有响应,这个是时序)

而消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka、OpenMessage协议

面试题:为什么消息中间件不直接使用http协议呢?

  1. 因为http请求报文头和响应报文头是比较复杂的,包含了cookie、数据的加密解密、状态码、晌应码等附加的功能,但是对于个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就够,要追求的是高性能。尽量简洁,快速。
  2. 大部分情况下http都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。

常用消息中间件协议

1. AMQP协议(Advanced Message Queuing Protocol—高级消息队列协议)

它由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

特性:分布式事务支、消息的持久化支持、高性能和高可靠的消息处理优势

AMQP典型的实现者是RabbitMQ、ACTIVEMQ等,其中RabbitMQ由Erlang开发

2. MQTT协议(Message Queueing Telemetry Transport—消息队列遥测传输协议)

它是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

特点:轻量、结构简单、传输快、不支持事务、没有持久化设计

应用场景:适用于计算能力有限、低带宽、网络不稳定的场景

支持者:RabbitMQ、ACTIVEMQ(默认情况下关闭,需要打开)

3. OpenMessage协议

是近几年由阿里、雅虎和滴滴出行、 Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。

特点:结构简单、解析速度快、支持事务和持久化设计

4. Kafka协议

基于TCP/IP的二进制协议。消息内部是通过长度来分割,由些基本数据类型组成。

特点:结构简单、解析速度快、无事务支持、有持久化设计

五、Rabbit快速入门

5.1、RabbitMQ 概念

RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于:它不处理快件而是接收,存储和转发消息数据。

5.2、RabbitMQ 特点

RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些RabbitMQ的特点,官网可查:

  • 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。

5.3、AMQP 协议

RabbitMQ是一种遵循

AMQP

协议的分布式消息中间件。

AMQP

全称 “Advanced Message Queuing Protocol”,高级消息队列协议。它是应用层协议的一个开发标准,为面向消息的中间件设计。

5.4、RabbitMQ 架构组成

  • Broker:就是 RabbitMQ 服务,用于接收和分发消息,接受客户端的连接,实现 AMQP 实体服务。
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange 或 queue 等。
  • Connection:连接,生产者/消费者与 Broker 之间的 TCP 网络连接。
  • Channel:网络信道,如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立连接的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销。
  • Message:消息,服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
  • Virtual Host:虚拟节点,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queue,同一个虚拟主机里面不能有相同名字的Exchange
  • Exchange:交换机,是 message 到达 broker 的第一站,用于根据分发规则、匹配查询表中的 routing key,分发消息到 queue 中去,不具备消息存储的功能。常用的类型有:direct、topic、fanout。
  • Bindings:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
  • Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息
  • Queue:消息队列,保存消息并将它们转发给消费者进行消费。

5.5、RabbitMQ 核心模块介绍


其中包含几个概念:

publisher:生产者,也就是发送消息的一方 (发送给交换机)

consumer:消费者,也就是消费消息的一方(和队列进行绑定(监听))

queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

    交换机只能路由消息,无法存储消息
     交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定 

virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,因为RabiitMQ性能很强,单个项目使用会造成巨大的浪费,所以多个项目,实现一套MQ,virtual host就是为了不同交换机产生隔离(和容器概念一样)

5.6、RabbitMQ 角色分类

  1. 超级管理员(administrator)- 权限:可登录管理控制台(在启用management plugin的情况下),查看所有的信息,包括用户、队列、交换机等,并且可以对用户、策略(policy)进行操作。- 说明:拥有RabbitMQ系统的最高权限,可以执行任何管理任务。
  2. 监控者(monitoring)- 权限:可登录管理控制台(在启用management plugin的情况下),同时可以查看RabbitMQ节点的相关信息,如进程数、内存使用情况、磁盘使用情况等。但无法对用户和策略进行操作。- 说明:适合需要监控RabbitMQ系统状态但不需要修改配置或用户信息的用户。
  3. 策略制定者(policymaker)- 权限:可登录管理控制台(在启用management plugin的情况下),同时可以对策略(policy)进行管理。但无法查看用户信息或修改用户权限。- 说明:适合需要管理RabbitMQ系统策略(如消息路由策略、队列策略等)的用户。
  4. 普通管理者(management)- 权限:仅可登录管理控制台(在启用management plugin的情况下),但无法看到节点信息,也无法对策略进行管理。通常可以操作exchange、queue等,但权限相对较低。- 说明:适合需要查看和管理自己相关的RabbitMQ资源(如队列、交换机等),但不需要访问系统级信息的用户。
  5. 其他(none或普通用户)- 权限:无法登录管理控制台,通常就是普通的生产者和消费者,通过AMQP协议与RabbitMQ进行交互。- 说明:这类用户主要关注于消息的发送和接收,不需要访问RabbitMQ的管理控制台。

5.7、RabbitMQ 消息模式

5.7.1、 简单模式

  • 组成:一个生产者、一个队列、一个消费者。
  • 工作流程:生产者将消息发送到队列,消费者从队列中获取消息。队列是存储消息的缓冲区,可以缓存消息。
  • 特点:适用于简单的消息传递场景,如日志收集、任务分发等。
5.7.2、 工作模式

  • 组成:一个生产者、一个队列、多个消费者。
  • 工作流程:生产者将消息发送到队列,多个消费者监听同一个队列并竞争获取消息。
  • 特点:通过多个消费者来提高消息处理效率,实现负载均衡。消费者之间公平地获取消息,避免某个消费者过载。
5.7.3、 发布订阅模式

  • 组成:一个生产者、一个交换机(Exchange)、多个队列、多个消费者。
  • 工作流程:生产者将消息发送到交换机,交换机以广播的形式将消息发送到所有绑定的队列,每个队列的消费者从自己的队列中获取消息。
  • 特点:适用于需要广播消息的场景,如实时消息推送、系统通知等。但无法对消息进行过滤,所有消费者都会收到相同的消息。
5.7.4、 路由模式

  • 组成:一个生产者、一个交换机(Direct类型)、多个队列、多个消费者。
  • 工作流程:生产者将消息发送到交换机,并指定一个路由键(RoutingKey)。交换机根据路由键将消息发送到匹配的队列,消费者从自己的队列中获取消息。
  • 特点:通过路由键实现消息的定向分发,将不同消息路由到不同的队列。适用于需要将不同消息路由到不同处理流程的场景。
5.7.5、 主题模式

  • 组成:一个生产者、一个交换机(Topic类型)、多个队列、多个消费者。
  • 工作流程:生产者将消息发送到交换机,并指定一个路由键。交换机根据路由键和队列的绑定规则(使用通配符)将消息发送到匹配的队列,消费者从自己的队列中获取消息。
  • 特点:支持更复杂的消息路由规则,通过通配符(*匹配一个单词,#匹配一个或多个单词)实现灵活的消息分发。适用于需要根据不同话题或事件类型路由消息的场景。

六、RabbitMQ 交换机

6.1、交换机的工作原理

在RabbitMQ中,生产者发送消息时不会直接将消息投递到队列中,而是先将消息投递到交换机中。交换机根据消息的路由键(Routing Key)和交换机类型,决定将消息路由到哪个队列。队列再将消息以推送或拉取的方式发送给消费者。

6.2、交换机的类型

RabbitMQ提供了多种类型的交换机,每种类型都有其独特的路由机制和应用场景,主要包括以下几种:

  1. 直连交换机(Direct Exchange)- 路由规则:消息的路由键与队列绑定时指定的路由键完全匹配。- 特点:简单直接,适用于路由键与队列一一对应的情况。
  2. 主题交换机(Topic Exchange)- 路由规则:使用通配符(和#)进行模糊匹配。匹配一个单词,#匹配零个或多个单词。- 特点:灵活性强,适用于需要将消息根据主题分类发送到多个队列的情况。
  3. 扇形交换机(Fanout Exchange)- 路由规则:忽略路由键,将消息广播到所有与交换机绑定的队列。- 特点:适用于实现发布-订阅模式,将消息广播给多个消费者。
  4. 首部交换机(Headers Exchange)- 路由规则:根据消息的头部信息(Headers)进行匹配。- 特点:提供了更灵活的匹配方式,但相对复杂度较高,使用较少。
  5. 默认交换机(Default Exchange)- 特点:默认交换机是一个隐式存在的直连交换机,无需显式声明。它使用队列名作为路由键。

七、RabbitMQ 机制

RabbitMQ的机制主要围绕消息传递和队列管理展开,它采用AMQP(高级消息队列协议)作为其消息传递协议,并通过一系列组件和策略来确保消息的可靠传递和高性能处理。

7.1、消息应答

默认情况下,RabbitMQ 一旦向消费者发送了一条消息后,便立即将该消息标记为删除。由于消费者处理一个消息可能需要一段时间,假如在处理消息中途消费者挂掉了,我们会丢失其正在处理的消息以及后续发送给该消费这的消息。

为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答意思就是:消费者在接收消息并且处理完该消息之后,才告知 RabbitMQ 可以把该消息删除了。

RabbitMQ提供了两种消息应答方式:自动应答(Automatic Acknowledgment)和手动应答(Manual Acknowledgment)。

自动应答

自动应答是RabbitMQ的默认行为。当消费者接收到消息时,RabbitMQ会立即认为该消息已经被成功处理,并将其从队列中移除,而无需消费者显式地发送任何确认信号。这种方式简单快捷,但存在消息丢失的风险,因为如果消费者在处理消息时发生崩溃或异常,那么这些消息将无法恢复。

在自动应答模式下,你不需要编写任何额外的代码来发送确认信号,因为RabbitMQ会自动为你处理。但是,为了说明如何设置自动应答,我们通常会看到在创建消费者时设置

autoAck

参数为

true

(尽管这是默认值)。

// ...(省略连接和通道创建的代码)  
  
boolean autoAck = true; // 设置为true以启用自动应答(这实际上是默认值)  
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });  
  
// DeliverCallback的实现...  
// 注意:在自动应答模式下,你不需要在DeliverCallback中调用channel.basicAck()

然而,由于自动应答是默认行为,因此在实际代码中,你很少会看到显式地将

autoAck

设置为

true

的示例。

所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

手动应答

手动应答要求消费者在成功处理消息后,显式地向RabbitMQ发送确认信号(通过调用

channel.basicAck()

方法)。RabbitMQ在收到确认信号之前,会将消息保留在队列中,并可能将其重新发送给其他消费者(如果启用了消息重新入队)。这种方式提高了消息的可靠性,但增加了代码的复杂性。

在手动应答模式下,你需要在消费者的消息处理逻辑中调用

channel.basicAck()

方法来发送确认信号。

// ...(省略连接和通道创建的代码)  
  
boolean autoAck = false; // 设置为false以启用手动应答  
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });  
  
// DeliverCallback的实现  
DeliverCallback deliverCallback = (consumerTag, delivery) -> {  
    String message = new String(delivery.getBody(), "UTF-8");  
    System.out.println(" [x] Received '" + message + "'");  
  
    // 模拟消息处理过程  
    try {  
        // ...(处理消息的逻辑)  
        // 假设处理成功  
  
        // 发送确认信号  
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
    } catch (IOException e) {  
        e.printStackTrace();  
        // 如果处理失败,可以选择发送nack或reject来拒绝消息  
        // channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);  
        // 或者  
        // channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);  
    }  
};

在上面的代码中,

DeliverCallback

的实现包含了消息处理逻辑和确认信号的发送。如果消息处理成功,则调用

channel.basicAck()

来发送确认信号;如果处理失败,则可以选择调用

channel.basicNack()

channel.basicReject()

来拒绝消息,并可以选择是否将消息重新入队。

请注意,在手动应答模式下,如果消费者在处理消息时发生崩溃或异常,并且没有发送确认信号,那么RabbitMQ将认为这些消息尚未被处理,并可能将它们重新发送给其他消费者(取决于你的队列和交换机配置)。

7.2、持久化

前面我们通过手动应答处理了消息丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它会清空队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。

队列持久化

队列持久化是指将队列的定义存储在磁盘上,以便在 RabbitMQ 服务器重启后队列仍然存在。默认情况下,队列是内存中的,并且在 RabbitMQ 服务器重启后会自动消失。

在 Java 代码中,你可以通过指定

durable

参数为

true

来创建一个持久化的队列。

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Send {  
  
    private final static String QUEUE_NAME = "hello_durable";  
  
    public static void main(String[] argv) throws Exception {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        try (Connection connection = factory.newConnection();  
             Channel channel = connection.createChannel()) {  
  
            // 声明一个持久化的队列  
            boolean durable = true;  
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);  
  
            String message = "Hello World!";  
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
            System.out.println(" [x] Sent '" + message + "'");  
        }  
    }  
}

注意:在上面的代码中,

queueDeclare

方法的第二个参数是

durable

,设置为

true

表示队列将被持久化。但是,即使队列是持久的,如果发布的消息不是持久的,那么在 RabbitMQ 重启后这些消息仍然会丢失。

消息持久化

消息持久化是指将消息存储在磁盘上,以确保即使在 RabbitMQ 服务器崩溃或重启后,消息也不会丢失。

在 Java 代码中,要发布持久化的消息,你需要确保队列是持久的(如上所示),并且还需要在发布消息时将

deliveryMode

设置为

2

MessageProperties.PERSISTENT_TEXT_PLAIN

也可以,但它是

2

的包装器)。

import com.rabbitmq.client.AMQP.BasicProperties;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Send {  
  
    private final static String QUEUE_NAME = "hello_durable";  
  
    public static void main(String[] argv) throws Exception {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        try (Connection connection = factory.newConnection();  
             Channel channel = connection.createChannel()) {  
  
            boolean durable = true;  
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);  
  
            String message = "Hello World!";  
            // 设置消息属性为持久化  
            BasicProperties props = new BasicProperties.Builder()  
                .deliveryMode(2) // 消息持久化  
                .contentType("text/plain")  
                .build();  
  
            channel.basicPublish("", QUEUE_NAME, props, message.getBytes());  
            System.out.println(" [x] Sent '" + message + "'");  
        }  
    }  
}

在上面的代码中,我们创建了一个

BasicProperties

对象,并将其

deliveryMode

设置为

2

,这表示消息将被持久化。然后,我们使用这个属性对象作为

basicPublish

方法的一个参数来发布消息。

请注意,即使队列和消息都被设置为持久化,RabbitMQ 也不能保证消息在发布后立即写入磁盘。它依赖于操作系统的调度和 RabbitMQ 的内部机制来决定何时将内存中的数据写入磁盘。因此,在极端情况下(如 RabbitMQ 服务器在消息写入磁盘之前崩溃),仍然有可能丢失一些消息。但是,通过持久化,你可以大大降低这种风险。

7.3、死信队列

📄 死信队列的概念:

  • 死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:

  • 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
  • 还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的原因:

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

消息被拒绝、消息TTL过期、队列达到最大长度。架构图如下所示:

image-20211128143849075

7.4、延迟队列

概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

使用场景

  1. 订单在十分钟之内未支付则自动取消。
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

RabbitMQ 中的 TTL

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL:

1️⃣ 消息设置TTL

便是针对每条消息设置TTL

2️⃣ 队列设置TTL

创建队列的时候设置队列的“x-message-ttl”属性

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)。而如果仅设置消息的 TTL 属性,即使消息过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;

还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

八、RabbitMQ如何在Docker部署

** 想要了解RabbitMQ如何在Docker里面安装启动可以查看下面这个链接**

Docker安装RabbitMq教程-CSDN博客

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/Heyi3416/article/details/141164140
版权归原作者 鹤一胖了 所有, 如有侵权,请联系我们删除。

“RabbitMQ的介绍”的评论:

还没有评论