0


RabbitMQ(三)SpringBoot整合,可靠性投递,死信队列,延迟队列,消费端限流,消息超时

文章目录

整合Springboot

概述

  • 搭建环境
  • 基础设定:交换机名称、队列名称、绑定关系
  • 发送消息:使用RabbitTemplate
  • 接收消息:使用@RabbitListener注解

消费者

pom

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>

yml

spring:rabbitmq:host: 192.168.217.134
    port:5672username: guest
    password:123456virtual-host: /
logging:level:com.atguigu.mq.listener.MyMessageListener: info

Listener

@Component@Slf4jpublicclassMyMessageListener{publicstaticfinalStringEXCHANGE_DIRECT="exchange.direct.order";publicstaticfinalStringROUTING_KEY="order";publicstaticfinalStringQUEUE_NAME="queue.order";//    写法一:监听 + 在 RabbitMQ 服务器上创建交换机、队列@RabbitListener(bindings =@QueueBinding(
           value =@Queue(value =QUEUE_NAME, durable ="true"),
           exchange =@Exchange(value =EXCHANGE_DIRECT),
           key ={ROUTING_KEY}))//    写法二:监听//     @RabbitListener(queues = {QUEUE_NAME})publicvoidprocessMessage(String dataString,Message message,Channel channel){
        log.info("消费端接收到了消息:"+ dataString);}}

生产者

pom

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>

yml

spring:rabbitmq:host: 192.168.217.134
    port:5672username: guest
    password:123456virtual-host: /

RabbitTemplate

@SpringBootTestpublicclassRabbitMQTest{publicstaticfinalStringEXCHANGE_DIRECT="exchange.direct.order";publicstaticfinalStringROUTING_KEY="order";@AutowiredprivateRabbitTemplate rabbitTemplate;@Testpublicvoidtest01SendMessage(){
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello Rabbit!SpringBoot!");}}

消息可靠性投递

故障原因

  1. 消息没有发送到消息队列上 后果:消费者拿不到消息,业务功能缺失,数据错误
  2. 消息成功存入消息队列,但是消息队列服务器宕机了 原本保存在内存中的消息也丢失了 即使服务器重新启动,消息也找不回来了 后果:消费者拿不到消息,业务功能缺失,数据错误
  3. 消息成功存入消息队列,但是消费端出现问题,例如:宕机、抛异常等等 后果:业务功能缺失,数据错误

解决方案

  • 故障情况1:消息没有发送到消息队列 - 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机队列来确认, 如果没有成功发送到消息队列服务器上,那就可以尝试重新发送- 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机

在这里插入图片描述

  • 故障情况2:消息队列服务器宕机导致内存中消息丢失 - 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
  • 故障情况3:消费端宕机或抛异常导致消息没有成功被消费 - 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息- 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)

关联交换机和备份交换机
在这里插入图片描述

生产者端消息确认机制(故障情况1)

故障原因1 解决方案:消息没有发送到消息队列上

pom

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>

YAML
publisher-confirm-type,publisher-returns两个必须要增加的配置,如果没有则功能不生效

# producerspring:rabbitmq:host: 192.168.217.134
    port:5672username: guest
    password:123456virtual-host: /
    publisher-confirm-type: CORRELATED # 交换机的确认publisher-returns:true# 队列的确认logging:level:com.atguigu.mq.config.MQProducerAckConfig: info

创建配置类

// 用于出现推送失败的情况下查看返回值importjakarta.annotation.PostConstruct;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Configuration;@Configuration@Slf4jpublicclassRabbitConfigimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;@PostConstructpublicvoidinitRabbitTemplate(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);}@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){// 消息发送到交换机成功或失败时调用这个方法
        log.info("confirm() 回调函数打印 CorrelationData:"+ correlationData);
        log.info("confirm() 回调函数打印 ack:"+ ack);
        log.info("confirm() 回调函数打印 cause:"+ cause);}@OverridepublicvoidreturnedMessage(ReturnedMessage returned){// 发送到队列失败时才调用这个方法
        log.info("returnedMessage() 回调函数 消息主体: "+newString(returned.getMessage().getBody()));
        log.info("returnedMessage() 回调函数 应答码: "+ returned.getReplyCode());
        log.info("returnedMessage() 回调函数 描述:"+ returned.getReplyText());
        log.info("returnedMessage() 回调函数 消息使用的交换器 exchange : "+ returned.getExchange());
        log.info("returnedMessage() 回调函数 消息使用的路由键 routing : "+ returned.getRoutingKey());}}

API说明
①ConfirmCallback接口

这是RabbitTemplate内部的一个接口,源代码如下:

/**
     * A callback for publisher confirmations.
     *
     */@FunctionalInterfacepublicinterfaceConfirmCallback{/**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */voidconfirm(@NullableCorrelationData correlationData,boolean ack,@NullableString cause);}

生产者端发送消息之后,回调confirm()方法

  • ack参数值为true:表示消息成功发送到了交换机
  • ack参数值为false:表示消息没有发送到交换机

②ReturnCallback接口

同样也RabbitTemplate内部的一个接口,源代码如下:

/**
     * A callback for returned messages.
     *
     * @since 2.3
     */@FunctionalInterfacepublicinterfaceReturnsCallback{/**
         * Returned message callback.
         * @param returned the returned message and metadata.
         */voidreturnedMessage(ReturnedMessage returned);}

注意:接口中的returnedMessage()方法仅在消息没有发送到队列时调用

ReturnedMessage类中主要属性含义如下:
属性名类型含义messageorg.springframework.amqp.core.Message消息以及消息相关数据replyCodeint应答码,类似于HTTP响应状态码replyTextString应答码说明exchangeString交换机名称routingKeyString路由键名称

故障情况2解决方案

指定队列名称默认自动持久化,还可设置是否自动删除队列
在这里插入图片描述

故障情况3解决方案

# consumerspring:rabbitmq:host: 192.168.217.134
    port:5672username: guest
    password:123456virtual-host: /
    listener:simple:acknowledge-mode: manual # 把消息确认模式改为手动确认prefetch:1# 每次从队列中取回消息的数量

deliveryTag:交付标签机制,每一个消息进入队列时,broker都会生成一个唯一标识
在这里插入图片描述
消息复制到各个队列,但deliveryTag各不相同
在这里插入图片描述

importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.IOException;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.concurrent.TimeUnit;@Component@Slf4jpublicclassMyMessageListener{publicstaticfinalStringQUEUE_NAME="queue.order";publicstaticfinalStringQUEUE_NORMAL="queue.normal.video";publicstaticfinalStringQUEUE_DEAD_LETTER="queue.dead.letter.video";publicstaticfinalStringQUEUE_DELAY="queue.test.delay";publicstaticfinalStringQUEUE_PRIORITY="queue.test.priority";@RabbitListener(queues ={QUEUE_NAME})publicvoidprocessMessage(String dataString,Message message,Channel channel)throwsIOException{// 获取当前消息的 deliveryTaglong deliveryTag = message.getMessageProperties().getDeliveryTag();try{// 核心操作
            log.info("消费端 消息内容:"+ dataString);System.out.println(10/0);// 核心操作成功:返回 ACK 信息
            channel.basicAck(deliveryTag,false);}catch(Exception e){// 获取当前消息是否是重复投递的//      redelivered 为 true:说明当前消息已经重复投递过一次了//      redelivered 为 false:说明当前消息是第一次投递Boolean redelivered = message.getMessageProperties().getRedelivered();// 核心操作失败:返回 NACK 信息// requeue 参数:控制消息是否重新放回队列//      取值为 true:重新放回队列,broker 会重新投递这个消息//      取值为 false:不重新放回队列,broker 会丢弃这个消息if(redelivered){// 如果当前消息已经是重复投递的,说明此前已经重试过一次啦,所以 requeue 设置为 false,表示不重新放回队列
                channel.basicNack(deliveryTag,false,false);}else{// 如果当前消息是第一次投递,说明当前代码是第一次抛异常,尚未重试,所以 requeue 设置为 true,表示重新放回队列在投递一次// 第二个参数:boolean multiple表示是否一次消费多条消息,false表示只确认该序列号对应的消息,true则表示确认该序列号对应的消息以及比该序列号小的所有消息,比如我先发送2条消息,他们的序列号分别为2,3,并且他们都没有被确认,还留在队列中,那么如果当前消息序列号为4,那么当multiple为true,则序列号为2、3的消息也会被一同确认。
                channel.basicNack(deliveryTag,false,true);}// reject 表示拒绝// 辨析:basicNack() 和 basicReject() 方法区别// basicNack()能控制是否批量操作// basicReject()不能控制是否批量操作// channel.basicReject(deliveryTag, true);}}}

消费端限流

概念

在这里插入图片描述
一个参数:

prefetch
# consumerspring:rabbitmq:host: 192.168.217.134
    port:5672username: guest
    password:123456virtual-host: /
    listener:simple:acknowledge-mode: manual # 把消息确认模式改为手动确认prefetch:1# 每次从队列中取回消息的数量

消息超时

概念

  • 消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除- 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这 个队列中的消息全部使用同一个过期时间。- 消息本身:给具体的某个消息设定过期时间
  • 如果两个层面都做了设置,那么哪个时间短,哪个生效

队列层面:配置队列过期

5000毫秒过期
在这里插入图片描述

消息本身:配置消息过期

@SpringBootTestpublicclassRabbitMQTest{publicstaticfinalStringEXCHANGE_TIMEOUT="exchange.test.timeout";publicstaticfinalStringROUTING_KEY_TIMEOUT="routing.key.test.timeout";@Testpublicvoidtest04SendMessage(){// 创建消息后置处理器对象MessagePostProcessor postProcessor = message ->{// 设置消息的过期时间,单位是毫秒
            message.getMessageProperties().setExpiration("7000");return message;};

        rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT,ROUTING_KEY_TIMEOUT,"Test timeout", postProcessor);}}

在这里插入图片描述

死信队列

概念

概念:当一个消息无法被消费,它就变成了死信。

  • 死信产生的原因大致有下面三种: - 拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false- 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储 了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变 成死信- 超时:消息到达超时时间未被消费
  • 死信的处理方式大致有下面三种: - 丢弃:对不重要的消息直接丢弃,不做处理- 入库:把死信写入数据库,日后处理- 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)

创建死信交换机和死信队列

和创建普通队列一样
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建正常队列,绑定死信队列

在这里插入图片描述在这里插入图片描述
绑定队列到交换机
在这里插入图片描述

代码

@TestpublicvoidtestSendMessageButReject(){  
    rabbitTemplate  
            .convertAndSend(EXCHANGE_NORMAL,ROUTING_KEY_NORMAL,"测试死信情况1:消息被拒绝");}

①监听正常队列

@RabbitListener(queues ={QUEUE_NORMAL})publicvoidprocessMessageNormal(Message message,Channel channel)throwsIOException{// 监听正常队列,但是拒绝消息
    log.info("★[normal]消息接收到,但我拒绝。");
    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}

②监听死信队列

@RabbitListener(queues ={QUEUE_DEAD_LETTER})publicvoidprocessMessageDead(String dataString,Message message,Channel channel)throwsIOException{// 监听死信队列  
    log.info("★[dead letter]dataString = "+ dataString);
    log.info("★[dead letter]我是死信监听方法,我接收到了死信消息");
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

测试超出队列长度进入死信队列

@TestpublicvoidtestSendMultiMessage(){for(int i =0; i <20; i++){
        rabbitTemplate.convertAndSend(EXCHANGE_NORMAL,ROUTING_KEY_NORMAL,"测试死信情况2:消息数量超过队列的最大容量"+ i);}}

执行循环20次的代码两次
正常队列:最大长度为10
在这里插入图片描述

死信队列:没有设置最大长度,所以推送失败的消息都进入死信队列

在这里插入图片描述
过一段时间正常队列中消息超时,进入死信队列
在这里插入图片描述

延迟队列

  • 方案1:借助消息超时时间+死信队列(就是刚刚我们测试的例子)
  • 方案2:给RabbitMQ安装插件

方案1:借助消息超时时间+死信队列

在这里插入图片描述

方案2:给RabbitMQ安装插件

插件安装
https://www.rabbitmq.com/community-plugins.html

docker inspect rabbitmq

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data
# 登录进入容器内部dockerexec-it rabbitmq /bin/bash

# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 退出Docker容器exit# 重启Docker容器docker restart rabbitmq

检查是否安装

创建新交换机时可以在type中看到x-delayed-message选项
在这里插入图片描述
在这里插入图片描述
关于x-delayed-type参数的理解:

原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢?

这里就额外使用x-delayed-type来指定交换机本身的类型

测试

生产者

@Testpublicvoidtest05SendMessageDelay(){// 创建消息后置处理器对象MessagePostProcessor postProcessor = message ->{// 设置消息过期时间(以毫秒为单位)// x-delay 参数必须基于 x-delayed-message-exchange 插件才能生效
        message.getMessageProperties().setHeader("x-delay","10000");return message;};// 发送消息
    rabbitTemplate.convertAndSend(EXCHANGE_DELAY,ROUTING_KEY_DELAY,"Test delay message by plugin "+newSimpleDateFormat("HH:mm:ss").format(newDate()),
            postProcessor);}

消费者

//已创建队列@Component@Slf4jpublicclassMyDelayMessageListener{publicstaticfinalStringQUEUE_DELAY="queue.delay.video";@RabbitListener(queues ={QUEUE_DELAY})publicvoidprocess(String dataString,Message message,Channel channel)throwsIOException{  
        log.info("[生产者]"+ dataString);
        log.info("[消费者]"+newSimpleDateFormat("hh:mm:ss").format(newDate()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
//未创建情况@Component@Slf4jpublicclassMyDelayMessageListener{publicstaticfinalStringEXCHANGE_DELAY="exchange.delay.video";publicstaticfinalStringROUTING_KEY_DELAY="routing.key.delay.video";publicstaticfinalStringQUEUE_DELAY="queue.delay.video";@RabbitListener(bindings =@QueueBinding(  
        value =@Queue(value =QUEUE_DELAY, durable ="true", autoDelete ="false"),  
        exchange =@Exchange(  
                value =EXCHANGE_DELAY,   
                durable ="true",   
                autoDelete ="false",   
                type ="x-delayed-message",   
                arguments =@Argument(name ="x-delayed-type", value ="direct")),  
        key ={ROUTING_KEY_DELAY}))publicvoidprocess(String dataString,Message message,Channel channel)throwsIOException{  
        log.info("[生产者]"+ dataString);  
        log.info("[消费者]"+newSimpleDateFormat("hh:mm:ss").format(newDate()));  
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}

本文转载自: https://blog.csdn.net/qq_45742250/article/details/139177420
版权归原作者 Lucky_Turtle 所有, 如有侵权,请联系我们删除。

“RabbitMQ(三)SpringBoot整合,可靠性投递,死信队列,延迟队列,消费端限流,消息超时”的评论:

还没有评论