0


【RabbitMQ】使用手册

RabbitMQ

同步调用

优点:时效性强,等待到结果后才返回

缺点:扩展性差、性能下降(调用链越长耗时越久)、级联失败问题(一个调用点卡住,后面的链路都不能执行)

异步调用

异步调用通常是基于消息通知的方式,包含三个角色:

消息发送者:投递消息的人,就是原来的调用者

消息接收者:接收和处理消息的人,就是原来的服务提供者

消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器

image-20240620225011677

优点:

耦合度低,拓展性强

异步调用,无需等待,性能好

故障隔离,下游服务故障不影响上游业务

缓存消息,流量削峰填谷

缺点:

不能立即得到调用结果,时效性差

不确定下游业务执行是否成功

业务安全依赖于Broker(消息代理)的可靠性

MQ技术选型

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

RabbitMQ安装

  1. 查找镜像:docker search rabbitmqimage-20240622142056336
  2. 拉取镜像:docker pull rabbitmq:3.8.19,指定拉取版本为3.18.19,如果不指定则默认拉取latestimage-20240622142226656
  3. 查看镜像:docker imagesimage-20240622142422222
  4. 启动镜像:设置账号登录为admin,登录密码为admin,不指定镜像版本,默认启动rabbitmq:latestdocker run \-eRABBITMQ_DEFAULT_USER=admin \-eRABBITMQ_DEFAULT_PASS=admin \-v mq-plugins:/plugins \--name mq \--hostname localhost \-p15672:15672 \-p5672:5672 \-d\rabbitmq:3.8.19- 15672是RabbitMQ的后台管理端口- 5672是RabbitMQ的AQMP端口- /plugins是RabbitMQ容器存放插件的路径image-20240622142648269
  5. 查看容器:docker ps外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  6. 进入RabbitMQ容器:docker exec -it 4df /bin/bash外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  7. 开启RabbitMQ后台访问:rabbitmq-plugins enable rabbitmq_managementimage-20240622143150816
  8. 退出容器bash:exitimage-20240622143240346
  9. 网页访问RabbitMQ后台:访问http://localhost:15672,账号admin,密码admin![image-20240622143517023](https://img-blog.csdnimg.cn/img_convert/edb4d1802a6988d88d02627a78f8629e.png)

常见问题:

  1. 后台管理系统的可视化界面中出现:All stable feature flags must be enabled after completing an upgrade解决方案:点击Admin -> Feature Flags,确保所有稳定的特性标志都是启用状态。如果有任何标志未启用,请将其启用。外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  2. 后台管理系统的可视化界面中出现:Stats in management UI are disabled on this node外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传解决方案:进入RabbitMQ容器,运行命令:echo management_agent.disable_metrics_collector=false>/etc/rabbitmq/conf.d/management_agent.disable_metrics_collector.conf,退出RabbitMQ容器,然后运行docker restart 容器id重启RabbitMQ容器。外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  3. 后台管理系统的可视化界面中 Overview 不显示图形的问题解决方案:同《2. 后台管理系统的可视化界面中出现:Stats in management UI are disabled on this node》

RabbitMQ介绍

  • publisher:消息发送者
  • comsumer:消息消费者
  • queue:队列-存储消息
  • exchange:交换机-接收发送者发送的消息,并将消息路由到与其绑定的队列
  • virtual-host:虚拟主机-将数据隔离(多个项目使用同一个RabbitMQ时,可以为每个项目建立一个virtual-host,将不同项目之间的exchange和queue隔离)

image-20240621002222015

Work Queues(任务模型)

任务模型简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。同一个消息只会被一个消费者处理。多个消费者绑定到一个队列,可以加快消息处理速度。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。因此我们需要修改消费者的application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息,消费者处理完后再投递下一条消息。

image-20240623005915858

Fanout交换机

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式。 Fanout交换机会将收到的消息复制成n份,然后将消息发送到n个与其绑定的队列中。

image-20240623010648252

应用场景:用户支付成功后,交易服务更新订单状态,短信服务通知用户,积分服务为用户增加积分。

实现:交易服务的queue、短信服务的queue、积分服务的queue都绑定到Fanout交换机,用户支付成功后,支付服务将消息发送到Fanout交换机,这样交易服务、短信服务、积分服务九都能收到这条消息了。

案例演示

实现思路:

  1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
  2. 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  4. 在publisher中编写测试方法,向hmall.fanout发送消息

代码实现:

image-20240623013810879

发送者:

@SpringBootTestclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){String exchangeName="hmall.fanout";String message="hello everyone";                            rabbitTemplate.convertAndSend(exchangeName,null,message);}}

消费者:

@Component@Slf4jpublicclassSpringRabbitListener{@RabbitListener(queues ="fanout.queue1")publicvoidlistenerWorkQueue1(String message){
        log.info("消费者1接收到消息:{},---{}",message,LocalTime.now());}@RabbitListener(queues ="fanout.queue2")publicvoidlistenerWorkQueue2(String message){
        log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());}}

消费者输出:

image-20240623015031061

Direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个Bindingkey(可以为每一个Queue指定相同的Bindingkey,实现和Fanout交换机相同的功能)。
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

image-20240623124250467

应用场景:用户取消后,只需要给交易服务发送消息,通知交易服务更新订单状态,而不需要给短信服务和积分服务发送消息。

案例演示

实现思路:

  1. 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
  2. 在RabbitMQ控制台中,声明交换机hmall. direct,将两个队列与其绑定,routeKey 为blue时路由到direct.queue1,为yellow时路由到direct.queue2,为red时路由到direct.queue1和direct.queue2
  3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  4. 在publisher中编写测试方法,利用不同的RoutingKey向hmall. direct发送消息

代码实现:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

消费者

@Component@Slf4jpublicclassSpringRabbitListener{@RabbitListener(queues ="direct.queue1")publicvoidlistenerWorkQueue1(String message){
        log.info("消费者1接收到消息:{},---{}",message,LocalTime.now());}@RabbitListener(queues ="direct.queue2")publicvoidlistenerWorkQueue2(String message){
        log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());}}

发送者

@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){//交换机名称String exchangeName="hmall.direct";//消息String message_blue="hello blue";String message_yellow="hello yellow";String message_red="hello red";//发送消息
 rabbitTemplate.convertAndSend(exchangeName,"blue",message_blue);
 rabbitTemplate.convertAndSend(exchangeName,"yellow",message_yellow);
 rabbitTemplate.convertAndSend(exchangeName,"red",message_red);}

消费者输出:

image-20240623130816236

Topic交换机

TopicExchange也是基于routingkey做消息路由,但是routingkey通常是多个单词的组合,并且以.分割。

Queue与Exchange指定routingkey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

案例演示

实现思路:

  1. 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
  2. 在RabbitMQ控制台中,声明交换机hmall. topic,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
  4. 在publisher中编写测试方法,利用不同的RoutingKey向hmall. topic发送消息

代码实现:

image-20240623133252103

消费者:

@Component@Slf4jpublicclassSpringRabbitListener{@RabbitListener(queues ="topic.queue1")publicvoidlistenerWorkQueue1(String message){
        log.info("消费者1接收到消息:{},---{}",message,LocalTime.now());}@RabbitListener(queues ="topic.queue2")publicvoidlistenerWorkQueue2(String message){
        log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());}}

发送者1:

@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){//交换机名称String exchangeName="hmall.topic";//消息String message="中国新闻";//发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}

消费者输出:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

发送者2:

@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){//交换机名称String exchangeName="hmall.topic";//消息String message="中国天气";//发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);}

消费者输出:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

AMQP

Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

AmqpTemplate和RabbitTemplate

AmqpTemplate

是一个接口,定义了基本的 AMQP 操作,如发送消息、接收消息、转换消息等。它提供了与 AMQP(包括 RabbitMQ)通信的基本功能的抽象。

RabbitTemplate

AmqpTemplate

的默认实现类,专门用于与 RabbitMQ 进行交互。它实现了

AmqpTemplate

接口,并提供了更多与 RabbitMQ 交互的具体功能和配置选项。

RabbitTemplate

AmqpTemplate

更加丰富,提供了一些额外的高级特性和配置选项,如事务支持、消息确认机制、消息转换器等。这些功能可以更好地满足与 RabbitMQ 高级交互需求。

综上所述,

AmqpTemplate

是一个通用的 AMQP 操作接口,而

RabbitTemplate

是对其的具体实现,提供了更多与 RabbitMQ 交互的功能和默认配置,使得在 Spring 应用中使用 RabbitMQ 变得更加简单和方便。

RabbitMQ使用

后台可视化界面操作

  • 创建用户外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  • 创建虚拟主机外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  • 为用户添加可访问的虚拟主机image-20240622222126051外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传注意:当前登录用户默认有权限访问其创建的所有虚拟主机。
  • 创建队列image-20240622232912672- Durability:Durable:持久化队列,Rabbit服务器重启后,这个队列还会存在Transient:临时队列,Rabbit服务器重启后,这个队列将会被删除
  • 查看队列的消费者image-20240623000921437
  • 向队列中发布消息外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  • 获取队列中消息队列中可以存储消息。当队列中的消息未被消费时,消息将存储在队列中,此时可以查看队列中的消息。image-20240623002855751- Act Mode:Nack message requeue true:获取消息,但是不做ack应答确认,消息重新入队Ack message requeue false:获取消息,应答确认,消息不重新入队,将会从队列中删除reject requeue true:拒绝获取消息,消息重新入队reject requeue false:拒绝获取消息,消息不重新入队,将会被删除- Encoding:可以选择将消息进行base64编码- Messages:从队列中获取的消息数量
  • 清理消息外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

代码操作

  1. 引入依赖<!-- AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  2. application.yaml中配置RabbitMQspring:rabbitmq:host: 192.168.1.2 # RabbitMQ地址port:5672# 端口virtual-host: /hmall # 虚拟主机username: jack # 用户名password: jack # 密码
创建队列和交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

image-20240623135444692

如果已经存在交换机、队列、绑定关系,运行代码时则不会进行创建,而且也不会报错。

通常发送者只需要关心消息发送,消费者关心队列、交换机、以及绑定关系,所以创建操作一般写在消费者中。

Sping提供了基于java bean和基于

@RabbitListener

注解两种方式创建。

  • 基于bean代码演示:
packagecom.itheima.consumer.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfiguration{//声明交换机@BeanpublicFanoutExchangefanoutExchange(){//        方式1//        return new FanoutExchange("hmall.fanout");//        方式2returnExchangeBuilder.fanoutExchange("hmall.fanout").build();}//声明队列@BeanpublicQueuefanoutQueue1(){//        方式1//        return new Queue("fanout.queue1",true);//        方式2returnQueueBuilder.durable("fanout.queue1").build();}//将队列和交换机绑定@BeanpublicBindingfanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){//spring会去找beanName='fanoutQueue1'的bean作为参数传进来returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@BeanpublicQueuefanoutQueue2(){returnQueueBuilder.durable("fanout.queue2").build();}@BeanpublicBindingfanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){//spring会去找beanName='fanoutQueue2'的bean作为参数传进来returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
  • 基于@RabbitListener注解代码演示:
@Component@Slf4jpublicclassSpringRabbitListener{@RabbitListener(bindings =@QueueBinding(//将交换机和队列绑定
            value =@Queue(name="direct.queue1",durable ="true"),//如果没有队列direct.queue1则创建队列,并监听队列direct.queue1
            exchange =@Exchange(name ="hmall.direct",type =ExchangeTypes.DIRECT),//如果没有交换机hmall.direct则创建交换机
            key ={"blue","red"}//routingKey))publicvoidlistenerWorkQueue1(String message){
        log.info("消费者1接收到消息:{},---{}",message,LocalTime.now());}@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name="direct.queue2",durable ="true"),
            exchange =@Exchange(name ="hmall.direct",type =ExchangeTypes.DIRECT),
            key ={"yellow","red"}))publicvoidlistenerWorkQueue2(String message){
        log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());}}
发送消息
  • 直接发送给队列方法:public void convertAndSend(String routingKey, final Object object),直接发给队列时,routingKey相当于队列名。@AutowiredprivateRabbitTemplate rabbitTemplate;@Testpublicvoid testSimp leQueue(){//队列名称String queueName ="simple. queue";//消息String message ="hello, spring amqp!";//发送消息 rabbitTemplate.convertAndSend(queueName, message);}注意:队列不显示绑定交换机时,默认还是会绑定到defalut exchange上外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  • 发送给Fanout Exchange方法:public void convertAndSend(String exchange, String routingKey, final Object object),使用Fanout Exchange时,routingKey相当于队列名,发送给Fanout Exchange时,routingKey传null或""@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){//交换机名称String exchangeName="hmall.fanout";//消息String message="hello everyone";//发送消息rabbitTemplate.convertAndSend(exchangeName,null,message);}
  • 发送给direct交换机方法:public void convertAndSend(String exchange, String routingKey, final Object object),routingKey就是交换机和队列绑定时的routingKey@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){//交换机名称String exchangeName="hmall.direct";//消息String message_blue="hello blue";String message_yellow="hello yellow";String message_red="hello red";//发送消息 rabbitTemplate.convertAndSend(exchangeName,"blue",message_blue); rabbitTemplate.convertAndSend(exchangeName,"yellow",message_yellow); rabbitTemplate.convertAndSend(exchangeName,"red",message_red);}
  • 发送给topic交换机方法:方法:public void convertAndSend(String exchange, String routingKey, final Object object)``````@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){//交换机名称String exchangeName="hmall.topic";//消息String message="中国新闻";//发送消息 rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}
接收消息
@Component@Slf4jpublicclassSpringRabbitListener{@RabbitListener(queues ="队列名")publicvoidlistenerSimpleQueue(String message){
        log.info("消费者收到消息:{}",message);}}
配置消息转换器

convertAndSend方法会先将消息进行序列化,然后再发送。

Spring的对消息对象的处理是由org.springframework.amap.support.converter.Messageconverter来处理的。而
默认实现是SimpleMessageConverter,如果消息实现了Serializable接口,则会使用serialize方法进行序列化,而serialize方法是基于JDK的Objectoutputstream完成序列化的。存在下列问题:

  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

image-20240623151642723

建议采用JSON序列化代替默认的JDK序列化,要做两件事情:

  1. 在publisher和consumer中都要引入jackson依赖,发送者和消费者要使用相同的消息转换器:<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
  2. 在publisher和consumer中都要配置MessageConverter:@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}

测试:

  • 使用默认的消息转换器发送者:packagecom.itheima.publisher;importlombok.AllArgsConstructor;importlombok.Data;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importjava.io.Serializable;@SpringBootTestclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){User jack =newUser("jack",18); rabbitTemplate.convertAndSend("testConvertMessage", jack);}}@Data@AllArgsConstructorclassUserimplementsSerializable{//要实现Serializable接口,否则convertAndSend方法进行消息转换时会抛出异常privateString name;privateInteger age;}查看消息:image-20240623155543865消费者:packagecom.itheima.consumer.mq;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.Serializable;@Component@Slf4jpublicclassSpringRabbitListener{@RabbitListener(queues ="testConvertMessage")publicvoidlistenerWorkQueue(User message){ log.info("消费者接收到消息:{}",message);}}@Data@AllArgsConstructor@NoArgsConstructorclassUserimplementsSerializable{privateString name;privateInteger age;}消费者输出:image-20240623155819000
  • 配置消息转换器发送者:packagecom.itheima.publisher;importlombok.AllArgsConstructor;importlombok.Data;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){User jack =newUser("jack",18); rabbitTemplate.convertAndSend("testConvertMessage", jack);}}@Data@AllArgsConstructorclassUser{privateString name;privateInteger age;}查看消息:image-20240623154053513消费者:packagecom.itheima.consumer.mq;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclassSpringRabbitListener{@RabbitListener(queues ="testConvertMessage")publicvoidlistenerWorkQueue(User message){//自动将json字符串转为User独享 log.info("消费者接收到消息:{}",message);}}@Data@AllArgsConstructor@NoArgsConstructor//消费者将消息转为User对象时,User对象一定要有空参构造器classUser{privateString name;privateInteger age;}消费者输出:外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

消息可靠性

消息丢失三种情况:

  • 发送者到MQ服务器时消息丢失
  • MQ服务器宕机导致消息丢失
  • MQ服务器将消息发送给消费者时消息丢失

发送者的可靠性

发送者重连

有的时候由于网络波动,可能会出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制,默认重连机制是关闭的。

spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间,超过1秒钟还没有连上MQ则表示连接超时template:retry:enabled:true# 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier:1# 失败后下次的等待时长倍数,下次等待时长 initial-interval * multipliermax-attempts:3# 最大重试次数

案例演示:

  1. 停止MQimage-20240623173037488
  2. 开启重连spring:rabbitmq:host: 192.168.1.2 port:5672virtual-host: /hmall username: jack password: jack connection-timeout: 1s template:retry:enabled:trueinitial-interval: 1000ms multiplier:1max-attempts:3
  3. 发送者发送消息packagecom.itheima.publisher;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){ rabbitTemplate.convertAndSend("testConvertMessage","你好");}}
  4. 消息发送失败外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能,如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

发送者确认

SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确人机制后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是MQ路由失败。此时会通过PublisherReturn返回路由异常原因,然后PublisherConfirm返回ACK,告知发送者投递成功
  • 临时消息投递到了MQ,并且入队成功,PublisherConfirm返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队且完成持久化,PublisherConfirm返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

开启发送者确认机制:

  1. 开启配置spring:rabbitmq:publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型publisher-returns:true# 开局publisher return机制这里publisher-confirm-type有三种模式可选:- none:关闭confirm机制- simple:同步阻塞等待MQ的回执消息- correlated:MQ异步回调方式返回回执消息
  2. 为RabbitTemplate配置ReturnsCallback每个RabbitTemplate只能配置一个ReturnsCallback,因此需要在项目启动过程中配置:外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  3. 每次发送消息时,指定消息ID、消息ConfirmCallbackimage-20240623175841421

案例演示:

  1. 开启发送者确认配置spring:rabbitmq:host: 192.168.1.2 # RabbitMQ地址port:5672# 端口virtual-host: /hmall # 虚拟主机username: jack # 用户名password: jack # 密码publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型publisher-returns:true# 开局publisher return机制
  2. 定义ReturnsCallbackpackagecom.itheima.publisher;importlombok.AllArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.context.annotation.Configuration;importjavax.annotation.PostConstruct;@Configuration@Slf4j@AllArgsConstructorpublicclassMqConfig{privatefinalRabbitTemplate rabbitTemplate;@PostConstructpublicvoidinit(){ rabbitTemplate.setReturnsCallback(returnedMessage ->{ log.info("监听到了消息return callback"); log.info("exchange: {}", returnedMessage.getExchange()); log.info("routingKey: {}", returnedMessage.getRoutingKey()); log.info("message:{}", returnedMessage.getMessage()); log.info("replyCode: {}", returnedMessage.getReplyCode()); log.info("replyText: {}", returnedMessage.getReplyText());});}}
  3. 定义ConfirmCallback并发送消息3.1 发送成功packagecom.itheima.publisher;importlombok.extern.slf4j.Slf4j;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.util.concurrent.ListenableFutureCallback;importjava.util.UUID;importjava.util.concurrent.TimeUnit;@SpringBootTest@Slf4jclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestConfirmCallback(){//0. 创建CorrelationData,并设置消息IDCorrelationData cd =newCorrelationData(UUID.randomUUID().toString()); cd.getFuture().addCallback(newListenableFutureCallback<CorrelationData.Confirm>(){@OverridepublicvoidonFailure(Throwable ex){ log.error("spring amqp 处理确认结果异常", ex);}@OverridepublicvoidonSuccess(CorrelationData.Confirm result){if(result.isAck()){ log.info("收到ConfirmCallback ack,消息发送成功!");}else{ log.info("收到ConfirmCallback nack,消息发送失败!", result.getReason());}}});//1. 交换机名称String exchangeName ="hmall.direct";//2. 消息String message ="测试发送者确认";//3. 发送消息 rabbitTemplate.convertAndSend(exchangeName,"blue", message, cd);//4. 此单元测试方法执行完,main线程就结束了,因此需要睡眠2s接收回调函数try{TimeUnit.SECONDS.sleep(2);}catch(InterruptedException e){ e.printStackTrace();}}}外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传3.2 发送失败-路由失败rabbitTemplate.convertAndSend(exchangeName,"blue22", message, cd);外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

注意:发送者确认机制需要发送者和MQ进行确认,会大大影响消息发送的效率,通常情况下不建议开启发送者确认机制。

MQ的可靠性

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞
数据持久化

RabbitMQ实现数据持久化包括3个方面,设置为持久化后,重启MQ,交换机、队列、消息也不会丢失。

  • 交换机持久化(新建交换机默认就是持久化)image-20240623213739596D表示持久化image-20240623214323156
  • 队列持久化(新建队列默认就是持久化)image-20240623213820123
  • 消息持久化(可视化界面发送消息时默认是非持久化,SpringAmqp发送消息时默认是持久化的)外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

案例演示:

MQ接收非持久化消息

发送者发送1百万条非持久化消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

发送耗时:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

MQ收到了一百万条非持久化消息

注意:本测试使用的MQ是3.13.3,默认使用的是Lazy Queue模式:所有的消息直接存入磁盘,不再存储到内存,所以In memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,Persistent表示存入磁盘且持久化的消息的数量)

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

重启MQ后,一百万条非持久化消息全部丢失

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

MQ接收持久化消息

发送者发送1百万条持久化消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

发送耗时:

image-20240623222929928

MQ收到了一百万条持久化消息

注意:本测试使用的MQ是3.13.3,默认使用的是Lazy Queue模式:所有的消息直接存入磁盘,不再存储到内存,所以In memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,Persistent表示存入磁盘且持久化的消息的数量)

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

重启MQ后,一百万条持久化消息不会丢失

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

结论

在接收非持久化消息时,MQ收到消息后会先将消息存到内存中的队列中,队列满了之后会把先收到的消息存到磁盘中(这个行为称为paged out,paged out会导致MQ阻塞),然后再继续接收消息,把消息存进内存中的队列中,队列满了之后再把队列中的消息存入磁盘中,以此类推。

在接收持久化消息时,MQ会直接将消息存到磁盘中,不会等内存中的队列满了之后再将消息保存到磁盘中。

发送一千万条非持久化消息耗时:

image-20240623223503631

发送一千万条持久化消息耗时:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

从上面发送者发送一百万条消息的耗时来看,发送持久化消息比发送非持久化消息耗时更少(不需要paged out),而且持久化消息在MQ重启后不会丢失,所以建议发送持久化消息。

Lazy Queue

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘,不再存储到内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)

在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

3.12版本之前的MQ设置Lazy Queue模式有三种方式:

  • 可视化界面设置要设置一个队列为情性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:image-20240623224928445
  • Spring Bean方式设置image-20240623225142076
  • 注解方式设置外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

非Lazy Queue模式+持久化消息和Lazy Queue模式+持久化消息MQ接收消息速度对比:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

消费者的可靠性

消费者确认机制

消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。MQ将一条消息发送给消费者后,MQ上的这条消息处理待确认状态,当消费者处理消息结束后,应该向RabbitMO发送一个回执,告知RabbitMQ自己消息处理状态:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto:自动模式(默认模式)。SpringAMQP利用 AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常(比如throw new RuntimeException),会自动返回nack- 如果是消息处理或校验异常,自动返回reject外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

案例演示-自动模式

  1. 消费者配置spring:rabbitmq:host: 192.168.1.2 # RabbitMQ地址port:5672# 端口virtual-host: /hmall # 虚拟主机username: jack # 用户名password: jack # 密码listener:simple:prefetch:1acknowledge-mode: auto
  2. 消费者外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  3. 发送者外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传查看消息状态:image-20240624004948237- 因为消费者抛出业务异常,所以会给MQ发送nack,然后MQ不停地向消费者投递消息image-20240624005751210查看消息内容image-20240624005115377- 查看队列中的消息,提示队列是空的,所以得出结论:待确认的消息不保存在队列中

案例演示-手动模式

  1. 消费者配置spring:rabbitmq:host: 192.168.1.2 # RabbitMQ地址port:5672# 端口virtual-host: /hmall # 虚拟主机username: jack # 用户名password: jack # 密码listener:simple:prefetch:1acknowledge-mode: manual
  2. 消费者image-20240624233521995
  3. 发送者3.1 发送者发送ackxxxx外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传程序会运行到消费者在21行的断点处,消费者输出image-20240624233857040查看消息状态image-20240624234215403放行消费者21行断点,查看消息状态image-202406242344129163.2 发送者发送nackxxxx,程序会运行到消费者在26行的断点处,消费者输出外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传查看消息状态外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传放行断点,消费者输出image-20240624235531700查看消息状态,消息又被重新放回了队列,并且MQ又将消息投递给了消费者外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传取消断点26行断点,消费者不停地输出,说明MQ不停地向消费者进行消息重投image-20240624235844464查看消息状态image-20240625000011663停掉消费者进程,查看消息状态image-202406250002474133.3 发送者发送xxxx,消费者停在30行的断点处,消费者输出image-20240625001012535查看消息状态外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传放行断点,查看消息状态外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
失败重试机制

SpringAMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

案例演示

  1. 消费者配置
spring:rabbitmq:host: 192.168.1.2 # RabbitMQ地址port:5672# 端口virtual-host: /hmall # 虚拟主机username: jack # 用户名password: jack # 密码listener:simple:prefetch:1acknowledge-mode: auto
        retry:enabled:trueinitial-interval: 1000ms
          multiplier:1max-attempts:3stateless:true
  1. 消费者外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  2. 发送者image-20240624010548871
  3. 消费者输出外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  4. 查看消息状态image-20240624011251261

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer(默认):重试耗尽后,给MQ返回reject,MQ收到reject后会将消息丢弃。
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

将失败处理策略改为RepublishMessageRecoverer:

  1. 首先,定义接收失败消息的交换机、队列及其绑定关系。
  2. 然后,定义RepublishMessageRecoverer:image-20240624012025791

案例演示

  1. 定义接收失败消息的交换机、队列、绑定关系、RepublishMessageRecoverer

image-20240624012657886

  1. 消费者外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  2. 消费者输出外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  3. 查看error.queue上的消息image-20240624013151477
业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),例如求绝对值的函数。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。

image-20240624013938181

消除非幂等性的手段

  • 唯一消息id外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

案例演示:

  1. 配置消息转换器外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  2. 发送者发送消息image-20240624014854578
  3. 查看消息image-20240624014817996
  4. 消费者使用Message接收

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 业务判断image-20240624015630994

延迟消息

延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。

延迟任务:设置在一定时间之后才执行的任务

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信 (dead letter)

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

image-20240624021939871

案例演示

  1. 消费者中定义交换机和队列,并监听外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  2. 定义交换机和队列,并将交换机dlx.direct声明为死信交换机,并与队列normal.queue绑定。image-20240624184949769
  3. 查看队列、交换机、绑定关系外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传image-20240624190459639外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  4. 发送者,发送消息时设置消息的死亡时间外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  5. 大约10s后消费者收到消息外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传image-20240624193050394
  6. 说明向交换机normal.direct中投递消息M,指定routingKey为hi,消息M的死亡时间设置为10s,消息M会被路由到队列normal.queue中,因为队列normal.queue没有消费者监听,碰巧队列normal.queue绑定了死信交换机dlx.direct,所以投递到队列normal.queue的消息M死亡后,会被转投到死信交换机dlx.direct中,因为指定的routingKey为hi,所以死信交换机dlx.direct会将消息M路由到队列dlx.queue中,而队列dlx.queue有消费者C监听,所以消费者C会消费消息M。这就实现了消息M延迟10秒后被消费。

延迟消息插件

使用死信队列可以实现延迟消息,但这种方法过于繁琐。为了简化这一过程,RabbitMQ的官方推出了一款插件,该插件原生支持延迟消息功能。该插件的运作原理是设计了一种特殊的交换机,当消息投递到这种交换机时,它能够暂存一段时间,直到达到设定的延迟时间后再将消息投递到相应的队列。这种设计大大简化了延迟消息的处理过程,提高了系统的效率和可靠性。

下载

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

选择下载的版本,插件的版本要和RabbitMQ的版本保持一致

  • 查看RabbitMQ的版本:docker run -d--name containerId -p5672:5672 rabbitmq:3-management外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  • 选择3.13.x版本的插件image-20240624201530414

安装

  • 将插件复制到RabbitMQ容器中dockercp 插件路径 容器ID或名称:/plugins/
  • 安装dockerexec-it 容器ID或名称 rabbitmq-plugins enable rabbitmq_delayed_message_exchangeimage-20240624213854511

使用

  1. 创建延迟交换机,三种方式- 图形化界面操作image-20240624214941680- 注解方式外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传- SpringBean方式外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  2. 查看延迟交换机外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  3. 发送消息时需要通过消息头x-delay来设置过期时间外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
  4. 消费者大约10s后收到消息image-20240624215445214外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

本文参考文档

https://b11et3un53m.feishu.cn/wiki/A9SawKUxsikJ6dk3icacVWb4n3g

https://blog.csdn.net/karry_zzj/article/details/119513541

https://blog.csdn.net/weixin_42050545/article/details/121487823

标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/weixin_43654975/article/details/140165516
版权归原作者 a coding ape 所有, 如有侵权,请联系我们删除。

“【RabbitMQ】使用手册”的评论:

还没有评论