0


RabbitMQ

文章目录

RabbitMQ

1 介绍

RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP,Advanced Message Queuing Protocol)。它设计用于在分布式系统中传递消息,提供了一种可靠的、异步的通信方式,帮助不同的应用程序或组件之间进行解耦。

以下是 RabbitMQ 的一些主要特点和概念:

  1. 消息代理(Message Broker): RabbitMQ 充当消息代理,负责接收、存储和转发消息。
  2. 消息队列(Message Queue): RabbitMQ 使用消息队列来存储消息。生产者将消息发送到队列,然后消费者从队列中接收和处理消息。队列采用先进先出(FIFO)的原则,即先发送的消息会先被消费。
  3. 生产者(Producer): 生产者是消息的发送方,负责将消息发送到 RabbitMQ 的队列。
  4. 消费者(Consumer): 消费者是消息的接收方,负责从队列中获取消息并进行处理。
  5. 交换器(机)(Exchange): 交换机用于将消息路由到一个或多个队列。生产者将消息发送到交换机,而交换机根据规则将消息路由到相应的队列。
  6. 路由键(Routing Key): 路由键是用于指定消息路由规则的关键字。在发送消息时,生产者通过指定路由键将消息发送到交换器。在某些交换器类型中,路由键用于匹配与之绑定的队列,决定消息将被发送到哪个队列。
  7. 绑定(Binding): 绑定是交换机和队列之间的关联规则,它定义了消息应该如何从交换机路由到队列。
  8. 虚拟主机(Virtual Host): 虚拟主机提供了一种逻辑隔离机制,允许在同一物理服务器上运行多个独立的消息代理。
  9. 持久化(Durable): RabbitMQ 允许将队列和消息标记为持久的,确保在代理重启时消息不会丢失。

使用 RabbitMQ 可以有效地处理系统之间的异步通信,提高系统的可伸缩性和可维护性。它在分布式系统、微服务架构和异步任务处理等场景中广泛应用。

工作原理图:

在这里插入图片描述

1.1 为什么使用 RabbitMQ

RabbitMQ 提供了以下优势:

  • 解耦与可靠性: 通过消息队列,系统的不同部分可以独立工作,提高可维护性和可扩展性。消息的可靠传递确保消息不会丢失,即使某个组件不可用。
  • 异步通信: 消息队列支持异步通信,生产者将消息发送到队列,而消费者从队列中接收并处理消息,实现了松耦合和高效通信。
  • 处理负载峰值: RabbitMQ 能够缓冲和调整消息流,有助于处理系统中的负载峰值,防止系统过载。
  • 消息路由与灵活性: 不同类型的交换器使得消息能够以灵活的方式进行路由,满足多样化的应用场景。

1.2 RabbitMQ 的关键特性

  • 多种交换器类型: 包括直连、扇出、主题和头交换器,支持不同的消息路由策略。
  • 消息持久化: RabbitMQ 允许将消息和队列标记为持久的,确保消息不会在代理重启时丢失。
  • 灵活的消息路由: 使用 Routing Key 和交换器,可以根据需求定义复杂的消息路由规则。
  • 可扩展性与集群支持: RabbitMQ 提供了水平扩展的能力,支持构建高可用性的集群。
  • 安全性: 支持虚拟主机,提供权限控制和加密传输,确保消息的安全性。

2 RabbitMQ 安装与配置

2.1 先安装Docker

安装gcc

yum -yinstall gcc gcc-c++

安装软件包

yum install-y yum-utils device-mapper-persistent-data lvm2

设置镜像仓库

yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

更新yum

yum makecache fast

安装免费版本的docker-ce

yum -yinstall docker-ce

启动docker

systemctl start docker

入门hello-world

docker run hello-world

在这里插入图片描述

证明docker安装成功!

2.2 配置RabbitMQ

先执行

docker search rabbitmq:management

在这里插入图片描述

拉取镜像

docker pull macintoshplus/rabbitmq-management

查看镜像

docker images

在这里插入图片描述

创建并运行一个RabbitMQ容器:

设置容器的主机名为kdxing,设置容器指定名称为 rabbitmq,设置RabbitMQ的默认用户名和密码,

将容器的15672端口映射到主机的15672端口,15672端口是RabbitMQ的Web管理界面端口。

将容器的5672端口映射到主机的5672端口,5672端口是RabbitMQ的AMQP协议端口。

设置Docker镜像的名称或ID为c20

docker run -d--hostname kdxing --name rabbitmq -erabbitmq_default_user=guest -erabbitmq_user_pass=guest -p15672:15672 -p5672:5672 c20

查看容器

dockerps-a

在这里插入图片描述

然后打开浏览器输入http://192.168.64.128:15672,界面如下:

在这里插入图片描述

输入用户名和密码guest进入RabbitMQ的Web管理界面:

在这里插入图片描述

此时,RabbitMQ配置成功!

3 Spring AMQP入门案例

3.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

3.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

spring.application.name=mq-demo01

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 自定义一个属性设置队列
mq.queue.name=hello-queue01

3.3 创建生产者

创建一个生产者类,用于发送消息到 RabbitMQ:

packagecom.kdx.provider;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;@ComponentpublicclassSender{@AutowiredprivateAmqpTemplate amqpTemplate;@Value("${mq.queue.name}")privateString queueName;//发送消息publicvoidsend(String msg){
        amqpTemplate.convertAndSend(queueName,msg);}}

3.4 创建消费者

创建一个消费者类,用于接收并处理从 RabbitMQ 收到的消息:

packagecom.kdx.consumer;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;//接收Consumer消息的消费者@ComponentpublicclassReceiver{@RabbitListener(queues ={"${mq.queue.name}"})publicvoidprocess(String msg){System.out.println("Receiver:"+ msg);}}

3.5 创建配置类

packagecom.kdx.config;importorg.springframework.amqp.core.Queue;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassQueueConfig{@Value("${mq.queue.name}")privateString queueName;@BeanpublicQueuecreateQueue(){returnnewQueue(queueName);}}

3.6 测试

packagecom.kdx;importcom.kdx.provider.Sender;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassMqDemo01ApplicationTests{@AutowiredprivateSender sender;@TestvoidtestSend(){
        sender.send("RabbitMQ入门案例");}}

启动Application,测试testSend()方法,查看控制台:

在这里插入图片描述

发送消息和接收消息成功!

查看RabbitMQ的Web管理界面,点击队列,发现hello-queue01:

在这里插入图片描述

4 交换器(Exchange)类型

交换器是消息的分发中心,负责将消息路由到一个或多个队列。生产者将消息发送到交换器,而交换器根据规则将消息发送到与之绑定的队列。不同类型的交换器定义了不同的路由策略,包括直连交换器、扇出交换器、主题交换器和头交换器。

在 RabbitMQ 中,有几种不同类型的交换器(Exchange Types),每种类型都定义了不同的消息路由规则。以下是 RabbitMQ 支持的主要交换器类型:

4.1 Direct Exchange(直连交换器)

  • 根据消息的路由键(Routing Key)将消息直接发送到指定队列。
  • 在发送消息时,指定的 Routing Key 必须与队列绑定时指定的 Routing Key 相匹配。

4.2 Fanout Exchange(扇出交换器)

  • 将消息广播到绑定到该交换器的所有队列,无论消息的 Routing Key 是什么。
  • 不关心消息的 Routing Key,消息会被发送到所有与交换器绑定的队列。

4.3 Topic Exchange(主题交换器)

  • 使用通配符的方式进行消息的路由。
  • 在发送消息时,可以使用通配符模式匹配 Routing Key,将消息发送到与模式匹配的队列。
  • 通配符有两种:* 匹配一个单词,# 匹配零个或多个单词。

4.4 Headers Exchange(头交换器)

  • 根据消息的头部属性来进行路由。
  • 在发送消息时,可以通过设置消息的头部属性,交换器会根据头部属性匹配规则将消息发送到对应的队列。

4.5 System Exchange(默认交换器)

  • 默认交换器是一个特殊的直连交换器,无需指定交换器的名称。
  • 当消息的 Routing Key 与队列的名称匹配时,消息会被发送到该队列。

选择交换器类型取决于消息路由需求。例如,如果希望将消息直接发送到指定队列,可以选择 Direct Exchange;如果希望消息广播到所有队列,可以选择 Fanout Exchange;如果需要根据复杂的条件进行消息路由,可以选择 Topic Exchange 或 Headers Exchange。

面试题

RabbitMQ为什么需要信道? 为什么不是TCP直接通道 ?

  1. Tcp创建和销毁开销特别大。
  2. 如果不用信道,大量的请求过来,会造成性能的瓶颈。
  3. 信道的原理是一条线程一条信道,多条线程多条通道同用一条TCP连接。
  4. 一条TCP连接可能容纳无限的信道,处理高并发的请求不会造成性能瓶颈。

5 Direct Exchange案例

1.在consumer服务中,编写两个消费者方法,分别监听log.info和log.error

2.在publisher中编写测试方法,向log. direct发送消息

在这里插入图片描述

5.1 消费者

5.1.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

5.1.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8081spring.application.name=mq-demo02-consumer

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器mq.config.exchange=log.direct

#设置队列infomq.config.queue.info=log.info
#设置队列info的路由键mq.config.info.routing.key=log.info.routing.key

#设置队列errormq.config.queue.error=log.error
#设置队列error的路由键mq.config.error.routing.key=log.error.routing.key

5.1.3 ErrorReceiver

packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
        value =@Queue(value ="${mq.config.queue.error}",autoDelete ="false"),
        exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.DIRECT),
        key ="${mq.config.error.routing.key}"))publicclassErrorReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("errorReceiver:"+ msg);}}

5.1.4 InfoReceiver

packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
        value =@Queue(value ="${mq.config.queue.info}",autoDelete ="false"),
        exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.DIRECT),
        key ="${mq.config.info.routing.key}"))publicclassInfoReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("infoReceiver:"+ msg);}}

5.2 生产者

5.2.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

5.2.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8082

spring.application.name=mq-demo03-provider

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器
mq.config.exchange=log.direct

#设置队列info的路由键
mq.config.info.routing.key=log.info.routing.key

#设置队列error的路由键
mq.config.error.routing.key=log.error.routing.key

5.2.3 Sender

packagecom.kdx.provider;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;@ComponentpublicclassSender{@AutowiredprivateAmqpTemplate amqpTemplate;@Value("${mq.config.exchange}")privateString exchange;@Value("${mq.config.info.routing.key}")privateString routingKey1;@Value("${mq.config.error.routing.key}")privateString routingKey2;publicvoidsend1(String msg){
        amqpTemplate.convertAndSend(exchange,routingKey1,msg);}publicvoidsend2(String msg){
        amqpTemplate.convertAndSend(exchange,routingKey2,msg);}}

5.3 测试

packagecom.kdx;importcom.kdx.provider.Sender;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassMqDemo03ProviderApplicationTests{@AutowiredprivateSender sender;@TestvoidtestSend1(){
        sender.send1("hello mq 1");}@TestvoidtestSend2(){
        sender.send2("hello mq 2");}}

启动两个Application,执行testSend1和testSend2:

在这里插入图片描述

结果看到Direct交换器根据RoutingKey判断路由给哪个队列

6 Topic Exchange案例

1.在consumer服务中,编写三个消费者方法,分别监听log.info、log.error和log.all

2.在publisher中编写测试方法,向 topic发送消息

在这里插入图片描述

6.1 消费者

6.1.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

6.1.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8083spring.application.name=mq-demo05-consumer

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器mq.config.exchange=log.topic

#设置队列infomq.config.queue.info=log.info

#设置队列errormq.config.queue.error=log.error

#设置队列logsmq.config.queue.logs=log.all

6.1.3 ErrorReceiver

packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
        value =@Queue(value ="${mq.config.queue.error}",autoDelete ="false"),
        exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.TOPIC),
        key ="*.log.error"))publicclassErrorReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("errorReceiver:"+ msg);}}

6.1.4 InfoReceiver

packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
        value =@Queue(value ="${mq.config.queue.info}",autoDelete ="false"),
        exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.TOPIC),
        key ="*.log.info"))publicclassInfoReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("infoReceiver:"+ msg);}}

6.1.5 LogsReceiver

packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
        value =@Queue(value ="${mq.config.queue.logs}",autoDelete ="false"),
        exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.TOPIC),
        key ="*.log.*"))publicclassLogsReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("logsReceiver:"+ msg);}}

6.2 生产者

6.2.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

6.2.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8084spring.application.name=mq-demo04-provider

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器mq.config.exchange=log.topic

6.2.3 GoodServer

packagecom.kdx.provider;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;@ComponentpublicclassGoodServer{@AutowiredprivateAmqpTemplate amqpTemplate;@Value("${mq.config.exchange}")privateString exchange;publicvoidsend(String msg){
        amqpTemplate.convertAndSend(exchange,"good.log.debug","good.log.debug:"+ msg);
        amqpTemplate.convertAndSend(exchange,"good.log.info","good.log.info:"+ msg);
        amqpTemplate.convertAndSend(exchange,"good.log.warn","good.log.warn:"+ msg);
        amqpTemplate.convertAndSend(exchange,"good.log.error","good.log.error:"+ msg);}}

6.3 测试

packagecom.kdx;importcom.kdx.provider.GoodServer;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassMqDemo04ProviderApplicationTests{@AutowiredprivateGoodServer goodServer;@Testvoidtest1(){
        goodServer.send("hello mq");}}

启动消费者和生产者服务,执行test1():

在这里插入图片描述

结果看到使用通配符模式匹配 Routing Key,并将消息发送到与模式匹配的队列。

Topic交换器与队列绑定时的bindingKey可以指定通配符,而且Topic交换器接收的消息RoutingKey必须是多个单词,以

.

分割

7 Fanout Exchange案例

1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定

2.在consumer服务中,编写两个消费者方法,分别监听order.sms和order.push

3.在publisher中编写测试方法,向log.fanout发送消息

在这里插入图片描述

7.1 消费者

7.1.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

7.1.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8086spring.application.name=mq-demo07-consumer

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器mq.config.exchange=log.fanout

#设置队列Q1mq.config.queue.sms=order.sms

#设置队列Q2mq.config.queue.push=order.push

7.1.3 SmsReceiver

packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
        value =@Queue(value ="${mq.config.queue.sms}",autoDelete ="false"),
        exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.FANOUT)))publicclassSmsReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("sms:"+ msg);}}

7.1.4 PushReceiver

packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
        value =@Queue(value ="${mq.config.queue.push}",autoDelete ="false"),
        exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.FANOUT)))publicclassPushReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("push:"+ msg);}}

7.2 生产者

7.2.1 添加依赖

在 Maven 配置中添加 Spring AMQP 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

7.2.2 配置 RabbitMQ 连接

在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:

server.port=8085spring.application.name=mq-demo06-provider

spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器mq.config.exchange=log.fanout

7.2.3 Sender

packagecom.kdx.provider;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;@ComponentpublicclassSender{@AutowiredprivateAmqpTemplate amqpTemplate;@Value("${mq.config.exchange}")privateString exchange;publicvoidsend(String msg){
        amqpTemplate.convertAndSend(exchange,"",msg);//routingKey不写}}

7.3 测试

packagecom.kdx;importcom.kdx.provider.Sender;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassMqDemo06ProviderApplicationTests{@AutowiredprivateSender sender;@TestvoidtestSend(){
        sender.send("fanout广播");}}

启动消费者和生产者服务,测试testSend():

在这里插入图片描述

结果看到SmsReceiver和PushReceiver都接收到了交换器广播消息。

8 RabbitMQ 持久化

RabbitMQ 提供了消息的持久化机制,确保即使在 RabbitMQ 服务器重启后,消息仍然能够被恢复。这主要涉及到队列和消息的持久化。

在 5 Direct Echange的案例基础上修改测试方法

packagecom.kdx;importcom.kdx.provider.Sender;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassMqDemo03ProviderApplicationTests{@AutowiredprivateSender sender;@TestvoidtestSend1(){for(int i =1; i <1000; i++){try{Thread.sleep(2000);}catch(InterruptedException e){
                e.printStackTrace();}
            sender.send1("hello mq 1 ..."+ i);}}}

必须确保在

autoDelete = "false"


出现在Queue中:当所有的消费者客户连接断开后,是否自动删除队列。
出现在Exchange中:当所有的绑定队列都不再使用时,是否自动删除交换器。

启动消费者和生产者服务,执行testSend1()方法:

在这里插入图片描述

现在停止消费者服务,结束在

infoReceiver:hello mq 1 ...7

然后再重启消费者服务,控制台:

在这里插入图片描述

发现没有重新从

infoReceiver:hello mq 1 ...1

开始输出,而是接着7从

infoReceiver:hello mq 1 ...8

开始,这就是消息的持久化。

9 RabbitMQ ACK确认机制

RabbitMQ 的 Acknowledgment(简称 ack)机制是确保消息在消费者正确处理后才被确认的一种机制。它有助于提高消息传递的可靠性。在 RabbitMQ 中,有三种 Acknowledgment 模式:自动确认、手动确认(单条消息)、手动批量确认(foreach遍历)。

1. 自动确认模式(Automatic Acknowledgment)

在自动确认模式下,消息一旦被消费者接收,RabbitMQ 就会立即确认消息的接收。这种模式下,消费者无法明确知道消息是否被正确处理。

// 默认是自动确认模式@RabbitListener(queues ="myQueue")publicvoidhandleMessage(String message){// 处理消息的业务逻辑...}

2. 手动确认模式(Manual Acknowledgment)

在手动确认模式下,消费者需要显式地告诉 RabbitMQ 是否成功处理了消息。如果消费者成功处理消息,则调用

channel.basicAck

进行确认;如果处理失败,则可以调用

channel.basicNack

channel.basicReject

进行拒绝。

@RabbitListener(queues ="myQueue")publicvoidhandleMessage(Message message,Channel channel)throwsIOException{try{// 处理消息的业务逻辑...// 手动确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){// 处理异常,可以选择拒绝消息或者进行其他处理
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}

在手动确认模式下,消费者需要谨慎处理异常情况,以确保消息在处理失败时能够得到适当的处理。手动确认模式提供了更精细的控制,确保消息在被消费者正确处理后才被确认。


本文转载自: https://blog.csdn.net/k010908/article/details/134753453
版权归原作者 kdxing198 所有, 如有侵权,请联系我们删除。

“RabbitMQ”的评论:

还没有评论