文章目录
RabbitMQ
1 介绍
RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP,Advanced Message Queuing Protocol)。它设计用于在分布式系统中传递消息,提供了一种可靠的、异步的通信方式,帮助不同的应用程序或组件之间进行解耦。
以下是 RabbitMQ 的一些主要特点和概念:
- 消息代理(Message Broker): RabbitMQ 充当消息代理,负责接收、存储和转发消息。
- 消息队列(Message Queue): RabbitMQ 使用消息队列来存储消息。生产者将消息发送到队列,然后消费者从队列中接收和处理消息。队列采用先进先出(FIFO)的原则,即先发送的消息会先被消费。
- 生产者(Producer): 生产者是消息的发送方,负责将消息发送到 RabbitMQ 的队列。
- 消费者(Consumer): 消费者是消息的接收方,负责从队列中获取消息并进行处理。
- 交换器(机)(Exchange): 交换机用于将消息路由到一个或多个队列。生产者将消息发送到交换机,而交换机根据规则将消息路由到相应的队列。
- 路由键(Routing Key): 路由键是用于指定消息路由规则的关键字。在发送消息时,生产者通过指定路由键将消息发送到交换器。在某些交换器类型中,路由键用于匹配与之绑定的队列,决定消息将被发送到哪个队列。
- 绑定(Binding): 绑定是交换机和队列之间的关联规则,它定义了消息应该如何从交换机路由到队列。
- 虚拟主机(Virtual Host): 虚拟主机提供了一种逻辑隔离机制,允许在同一物理服务器上运行多个独立的消息代理。
- 持久化(Durable): RabbitMQ 允许将队列和消息标记为持久的,确保在代理重启时消息不会丢失。
使用 RabbitMQ 可以有效地处理系统之间的异步通信,提高系统的可伸缩性和可维护性。它在分布式系统、微服务架构和异步任务处理等场景中广泛应用。
工作原理图:
1.1 为什么使用 RabbitMQ
RabbitMQ 提供了以下优势:
- 解耦与可靠性: 通过消息队列,系统的不同部分可以独立工作,提高可维护性和可扩展性。消息的可靠传递确保消息不会丢失,即使某个组件不可用。
- 异步通信: 消息队列支持异步通信,生产者将消息发送到队列,而消费者从队列中接收并处理消息,实现了松耦合和高效通信。
- 处理负载峰值: RabbitMQ 能够缓冲和调整消息流,有助于处理系统中的负载峰值,防止系统过载。
- 消息路由与灵活性: 不同类型的交换器使得消息能够以灵活的方式进行路由,满足多样化的应用场景。
1.2 RabbitMQ 的关键特性
- 多种交换器类型: 包括直连、扇出、主题和头交换器,支持不同的消息路由策略。
- 消息持久化: RabbitMQ 允许将消息和队列标记为持久的,确保消息不会在代理重启时丢失。
- 灵活的消息路由: 使用 Routing Key 和交换器,可以根据需求定义复杂的消息路由规则。
- 可扩展性与集群支持: RabbitMQ 提供了水平扩展的能力,支持构建高可用性的集群。
- 安全性: 支持虚拟主机,提供权限控制和加密传输,确保消息的安全性。
2 RabbitMQ 安装与配置
2.1 先安装Docker
安装gcc
yum -yinstall gcc gcc-c++
安装软件包
yum install-y yum-utils device-mapper-persistent-data lvm2
设置镜像仓库
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
更新yum
yum makecache fast
安装免费版本的docker-ce
yum -yinstall docker-ce
启动docker
systemctl start docker
入门hello-world
docker run hello-world
证明docker安装成功!
2.2 配置RabbitMQ
先执行
docker search rabbitmq:management
拉取镜像
docker pull macintoshplus/rabbitmq-management
查看镜像
docker images
创建并运行一个RabbitMQ容器:
设置容器的主机名为kdxing,设置容器指定名称为 rabbitmq,设置RabbitMQ的默认用户名和密码,
将容器的15672端口映射到主机的15672端口,15672端口是RabbitMQ的Web管理界面端口。
将容器的5672端口映射到主机的5672端口,5672端口是RabbitMQ的AMQP协议端口。
设置Docker镜像的名称或ID为c20
docker run -d--hostname kdxing --name rabbitmq -erabbitmq_default_user=guest -erabbitmq_user_pass=guest -p15672:15672 -p5672:5672 c20
查看容器
dockerps-a
然后打开浏览器输入http://192.168.64.128:15672,界面如下:
输入用户名和密码guest进入RabbitMQ的Web管理界面:
此时,RabbitMQ配置成功!
3 Spring AMQP入门案例
3.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
3.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
spring.application.name=mq-demo01
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 自定义一个属性设置队列
mq.queue.name=hello-queue01
3.3 创建生产者
创建一个生产者类,用于发送消息到 RabbitMQ:
packagecom.kdx.provider;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;@ComponentpublicclassSender{@AutowiredprivateAmqpTemplate amqpTemplate;@Value("${mq.queue.name}")privateString queueName;//发送消息publicvoidsend(String msg){
amqpTemplate.convertAndSend(queueName,msg);}}
3.4 创建消费者
创建一个消费者类,用于接收并处理从 RabbitMQ 收到的消息:
packagecom.kdx.consumer;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;//接收Consumer消息的消费者@ComponentpublicclassReceiver{@RabbitListener(queues ={"${mq.queue.name}"})publicvoidprocess(String msg){System.out.println("Receiver:"+ msg);}}
3.5 创建配置类
packagecom.kdx.config;importorg.springframework.amqp.core.Queue;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassQueueConfig{@Value("${mq.queue.name}")privateString queueName;@BeanpublicQueuecreateQueue(){returnnewQueue(queueName);}}
3.6 测试
packagecom.kdx;importcom.kdx.provider.Sender;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassMqDemo01ApplicationTests{@AutowiredprivateSender sender;@TestvoidtestSend(){
sender.send("RabbitMQ入门案例");}}
启动Application,测试testSend()方法,查看控制台:
发送消息和接收消息成功!
查看RabbitMQ的Web管理界面,点击队列,发现hello-queue01:
4 交换器(Exchange)类型
交换器是消息的分发中心,负责将消息路由到一个或多个队列。生产者将消息发送到交换器,而交换器根据规则将消息发送到与之绑定的队列。不同类型的交换器定义了不同的路由策略,包括直连交换器、扇出交换器、主题交换器和头交换器。
在 RabbitMQ 中,有几种不同类型的交换器(Exchange Types),每种类型都定义了不同的消息路由规则。以下是 RabbitMQ 支持的主要交换器类型:
4.1 Direct Exchange(直连交换器)
- 根据消息的路由键(Routing Key)将消息直接发送到指定队列。
- 在发送消息时,指定的 Routing Key 必须与队列绑定时指定的 Routing Key 相匹配。
4.2 Fanout Exchange(扇出交换器)
- 将消息广播到绑定到该交换器的所有队列,无论消息的 Routing Key 是什么。
- 不关心消息的 Routing Key,消息会被发送到所有与交换器绑定的队列。
4.3 Topic Exchange(主题交换器)
- 使用通配符的方式进行消息的路由。
- 在发送消息时,可以使用通配符模式匹配 Routing Key,将消息发送到与模式匹配的队列。
- 通配符有两种:
*
匹配一个单词,#
匹配零个或多个单词。
4.4 Headers Exchange(头交换器)
- 根据消息的头部属性来进行路由。
- 在发送消息时,可以通过设置消息的头部属性,交换器会根据头部属性匹配规则将消息发送到对应的队列。
4.5 System Exchange(默认交换器)
- 默认交换器是一个特殊的直连交换器,无需指定交换器的名称。
- 当消息的 Routing Key 与队列的名称匹配时,消息会被发送到该队列。
选择交换器类型取决于消息路由需求。例如,如果希望将消息直接发送到指定队列,可以选择 Direct Exchange;如果希望消息广播到所有队列,可以选择 Fanout Exchange;如果需要根据复杂的条件进行消息路由,可以选择 Topic Exchange 或 Headers Exchange。
面试题
RabbitMQ为什么需要信道? 为什么不是TCP直接通道 ?
- Tcp创建和销毁开销特别大。
- 如果不用信道,大量的请求过来,会造成性能的瓶颈。
- 信道的原理是一条线程一条信道,多条线程多条通道同用一条TCP连接。
- 一条TCP连接可能容纳无限的信道,处理高并发的请求不会造成性能瓶颈。
5 Direct Exchange案例
1.在consumer服务中,编写两个消费者方法,分别监听log.info和log.error
2.在publisher中编写测试方法,向log. direct发送消息
5.1 消费者
5.1.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
5.1.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8081spring.application.name=mq-demo02-consumer
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器mq.config.exchange=log.direct
#设置队列infomq.config.queue.info=log.info
#设置队列info的路由键mq.config.info.routing.key=log.info.routing.key
#设置队列errormq.config.queue.error=log.error
#设置队列error的路由键mq.config.error.routing.key=log.error.routing.key
5.1.3 ErrorReceiver
packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="${mq.config.queue.error}",autoDelete ="false"),
exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.DIRECT),
key ="${mq.config.error.routing.key}"))publicclassErrorReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("errorReceiver:"+ msg);}}
5.1.4 InfoReceiver
packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="${mq.config.queue.info}",autoDelete ="false"),
exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.DIRECT),
key ="${mq.config.info.routing.key}"))publicclassInfoReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("infoReceiver:"+ msg);}}
5.2 生产者
5.2.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
5.2.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8082
spring.application.name=mq-demo03-provider
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器
mq.config.exchange=log.direct
#设置队列info的路由键
mq.config.info.routing.key=log.info.routing.key
#设置队列error的路由键
mq.config.error.routing.key=log.error.routing.key
5.2.3 Sender
packagecom.kdx.provider;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;@ComponentpublicclassSender{@AutowiredprivateAmqpTemplate amqpTemplate;@Value("${mq.config.exchange}")privateString exchange;@Value("${mq.config.info.routing.key}")privateString routingKey1;@Value("${mq.config.error.routing.key}")privateString routingKey2;publicvoidsend1(String msg){
amqpTemplate.convertAndSend(exchange,routingKey1,msg);}publicvoidsend2(String msg){
amqpTemplate.convertAndSend(exchange,routingKey2,msg);}}
5.3 测试
packagecom.kdx;importcom.kdx.provider.Sender;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassMqDemo03ProviderApplicationTests{@AutowiredprivateSender sender;@TestvoidtestSend1(){
sender.send1("hello mq 1");}@TestvoidtestSend2(){
sender.send2("hello mq 2");}}
启动两个Application,执行testSend1和testSend2:
结果看到Direct交换器根据RoutingKey判断路由给哪个队列
6 Topic Exchange案例
1.在consumer服务中,编写三个消费者方法,分别监听log.info、log.error和log.all
2.在publisher中编写测试方法,向 topic发送消息
6.1 消费者
6.1.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
6.1.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8083spring.application.name=mq-demo05-consumer
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器mq.config.exchange=log.topic
#设置队列infomq.config.queue.info=log.info
#设置队列errormq.config.queue.error=log.error
#设置队列logsmq.config.queue.logs=log.all
6.1.3 ErrorReceiver
packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="${mq.config.queue.error}",autoDelete ="false"),
exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.TOPIC),
key ="*.log.error"))publicclassErrorReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("errorReceiver:"+ msg);}}
6.1.4 InfoReceiver
packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="${mq.config.queue.info}",autoDelete ="false"),
exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.TOPIC),
key ="*.log.info"))publicclassInfoReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("infoReceiver:"+ msg);}}
6.1.5 LogsReceiver
packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="${mq.config.queue.logs}",autoDelete ="false"),
exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.TOPIC),
key ="*.log.*"))publicclassLogsReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("logsReceiver:"+ msg);}}
6.2 生产者
6.2.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
6.2.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8084spring.application.name=mq-demo04-provider
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器mq.config.exchange=log.topic
6.2.3 GoodServer
packagecom.kdx.provider;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;@ComponentpublicclassGoodServer{@AutowiredprivateAmqpTemplate amqpTemplate;@Value("${mq.config.exchange}")privateString exchange;publicvoidsend(String msg){
amqpTemplate.convertAndSend(exchange,"good.log.debug","good.log.debug:"+ msg);
amqpTemplate.convertAndSend(exchange,"good.log.info","good.log.info:"+ msg);
amqpTemplate.convertAndSend(exchange,"good.log.warn","good.log.warn:"+ msg);
amqpTemplate.convertAndSend(exchange,"good.log.error","good.log.error:"+ msg);}}
6.3 测试
packagecom.kdx;importcom.kdx.provider.GoodServer;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassMqDemo04ProviderApplicationTests{@AutowiredprivateGoodServer goodServer;@Testvoidtest1(){
goodServer.send("hello mq");}}
启动消费者和生产者服务,执行test1():
结果看到使用通配符模式匹配 Routing Key,并将消息发送到与模式匹配的队列。
Topic交换器与队列绑定时的bindingKey可以指定通配符,而且Topic交换器接收的消息RoutingKey必须是多个单词,以
.
分割
7 Fanout Exchange案例
1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定
2.在consumer服务中,编写两个消费者方法,分别监听order.sms和order.push
3.在publisher中编写测试方法,向log.fanout发送消息
7.1 消费者
7.1.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
7.1.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8086spring.application.name=mq-demo07-consumer
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器mq.config.exchange=log.fanout
#设置队列Q1mq.config.queue.sms=order.sms
#设置队列Q2mq.config.queue.push=order.push
7.1.3 SmsReceiver
packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="${mq.config.queue.sms}",autoDelete ="false"),
exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.FANOUT)))publicclassSmsReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("sms:"+ msg);}}
7.1.4 PushReceiver
packagecom.kdx.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Component@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="${mq.config.queue.push}",autoDelete ="false"),
exchange =@Exchange(value ="${mq.config.exchange}",type =ExchangeTypes.FANOUT)))publicclassPushReceiver{@RabbitHandlerpublicvoidprocess(String msg){System.out.println("push:"+ msg);}}
7.2 生产者
7.2.1 添加依赖
在 Maven 配置中添加 Spring AMQP 的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
7.2.2 配置 RabbitMQ 连接
在 Spring Boot 的配置文件(application.properties 或 application.yml)中添加 RabbitMQ 的连接信息:
server.port=8085spring.application.name=mq-demo06-provider
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#设置交换器mq.config.exchange=log.fanout
7.2.3 Sender
packagecom.kdx.provider;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;@ComponentpublicclassSender{@AutowiredprivateAmqpTemplate amqpTemplate;@Value("${mq.config.exchange}")privateString exchange;publicvoidsend(String msg){
amqpTemplate.convertAndSend(exchange,"",msg);//routingKey不写}}
7.3 测试
packagecom.kdx;importcom.kdx.provider.Sender;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassMqDemo06ProviderApplicationTests{@AutowiredprivateSender sender;@TestvoidtestSend(){
sender.send("fanout广播");}}
启动消费者和生产者服务,测试testSend():
结果看到SmsReceiver和PushReceiver都接收到了交换器广播消息。
8 RabbitMQ 持久化
RabbitMQ 提供了消息的持久化机制,确保即使在 RabbitMQ 服务器重启后,消息仍然能够被恢复。这主要涉及到队列和消息的持久化。
在 5 Direct Echange的案例基础上修改测试方法
packagecom.kdx;importcom.kdx.provider.Sender;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassMqDemo03ProviderApplicationTests{@AutowiredprivateSender sender;@TestvoidtestSend1(){for(int i =1; i <1000; i++){try{Thread.sleep(2000);}catch(InterruptedException e){
e.printStackTrace();}
sender.send1("hello mq 1 ..."+ i);}}}
必须确保在
autoDelete = "false"
:
出现在Queue中:当所有的消费者客户连接断开后,是否自动删除队列。
出现在Exchange中:当所有的绑定队列都不再使用时,是否自动删除交换器。
启动消费者和生产者服务,执行testSend1()方法:
现在停止消费者服务,结束在
infoReceiver:hello mq 1 ...7
然后再重启消费者服务,控制台:
发现没有重新从
infoReceiver:hello mq 1 ...1
开始输出,而是接着7从
infoReceiver:hello mq 1 ...8
开始,这就是消息的持久化。
9 RabbitMQ ACK确认机制
RabbitMQ 的 Acknowledgment(简称 ack)机制是确保消息在消费者正确处理后才被确认的一种机制。它有助于提高消息传递的可靠性。在 RabbitMQ 中,有三种 Acknowledgment 模式:自动确认、手动确认(单条消息)、手动批量确认(foreach遍历)。
1. 自动确认模式(Automatic Acknowledgment)
在自动确认模式下,消息一旦被消费者接收,RabbitMQ 就会立即确认消息的接收。这种模式下,消费者无法明确知道消息是否被正确处理。
// 默认是自动确认模式@RabbitListener(queues ="myQueue")publicvoidhandleMessage(String message){// 处理消息的业务逻辑...}
2. 手动确认模式(Manual Acknowledgment)
在手动确认模式下,消费者需要显式地告诉 RabbitMQ 是否成功处理了消息。如果消费者成功处理消息,则调用
channel.basicAck
进行确认;如果处理失败,则可以调用
channel.basicNack
或
channel.basicReject
进行拒绝。
@RabbitListener(queues ="myQueue")publicvoidhandleMessage(Message message,Channel channel)throwsIOException{try{// 处理消息的业务逻辑...// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){// 处理异常,可以选择拒绝消息或者进行其他处理
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}
在手动确认模式下,消费者需要谨慎处理异常情况,以确保消息在处理失败时能够得到适当的处理。手动确认模式提供了更精细的控制,确保消息在被消费者正确处理后才被确认。
版权归原作者 kdxing198 所有, 如有侵权,请联系我们删除。