0


RabbitMQ确认机制

介绍

概要:RabbitMQ中间件的引入对于整个系统来说是一把双刃剑,在对系统进行解耦的同时也降低了消息的可靠性,但是对于某些系统来说我们又必须保证我们的消息是不会丢失的,因此rabbitmq给提供了以下一些功能来保证消息的可靠性,本文我们主要讲解消息可靠性中的 发送端确认机制 以及 消费端确认机制,需要保证消费者、RabbitMQ自己和消费者都不能丢消息

RabbitMQ通过 publisher confirm 机制来实现的消息发送端确认。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从 1 开始),一旦消息被投递到所有匹配的队列之后 (如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。

img

​ 在生产者向rabbitmq发送消息的整个流程中,生产者首先是要将消息发送给 交换机,然后交换机 根据指定的 路由键 把消息路由到指定的消息队列中,然后消费者从对应的消息队列对消息进行消费,因此我们要实现生产端的消息确认就需要保证 消息发送到交换机 以及 交换机路由消息到队列 的时候消息是不会丢失的

img

1、导入依赖
<dependencies><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>
2、配置文件
server:port:7001#RabbitMQspring:rabbitmq:username: admin
    password: admin
    host: 192.168.132.128
    port:5672virtual-host: /
    publisher-returns:truelistener:simple:concurrency:10#消费者数量max-concurrency:10#最大消费者的数量prefetch:1#限流(消费者每次从队列获取的消息数量)auto-startup:true#启动时自动启动容器acknowledge-mode: manual #手动ack#        retry:#          enabled: true#          max-attempts: 3 # 重试次数#          max-interval: 10000   # 重试最大间隔时间#          initial-interval: 2000  # 重试初始间隔时间#          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间publisher-confirm-type: correlated   # 开启确认机制/老版 publisher-confirms: true
3、创建交换机
1、Direct交换机模式
packageedu.hunan.rabbitmq.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectConfig{//声明注册Direct交换机模式@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("direct_exchange");}//声明队列@BeanpublicQueuedirectQueue(){/**
         * 参数详解
         * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
         * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         */returnnewQueue("direct_queue",true);}@BeanpublicQueuedirectQueue2(){/**
         * 参数详解
         * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
         * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         */returnnewQueue("direct_queue2",true);}//交换机和队列绑定@BeanpublicBindingdirectBinding(){returnBindingBuilder.bind(directQueue()).to(directExchange()).with("direct1");}@BeanpublicBindingdirectBinding2(){returnBindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");}}
2、FanOut交换机模式
packageedu.hunan.rabbitmq.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanOutConfig{//声明注册FanOut交换机模式@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanout_exchange");}//声明队列@BeanpublicQueuefanoutQueue(){/**
         * 参数详解
         * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
         * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         */returnnewQueue("fanout_queue",true);}//交换机和队列绑定@BeanpublicBindingfanoutBinding(){returnBindingBuilder.bind(fanoutQueue()).to(fanoutExchange());}}
4、初始化回调方法
packageedu.hunan.rabbitmq.service;importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjavax.annotation.PostConstruct;@ServicepublicclassRabbitMQService{@AutowiredprivateRabbitTemplate rabbitTemplate;/**
     * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
     */@PostConstructpublicvoidinit(){/**
         * 消息投递到交换机后触发回调
         * 使用该功能需要开启确认,spring-boot中配置如下:
         * publisher-confirm-type: correlated   # 开启确认机制/老版 publisher-confirms: true
         */
        rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){if(ack){System.err.println("消息投递成功,消息已确认->"+cause+"\t"+correlationData);}else{/**
                     * 如果消息投递失败需要设置重发   如果一直重发失败投递到死信队列中/数据库 进行手动排查
                     * 重发数据库+定时任务实现
                     */System.err.println("消息投递失败,消息未确认->"+cause+"\t"+correlationData);}}});/**
         * 通过实现ReturnsCallback接口
         * 如果消息从交换机投递到队列中失败时触发
         * 比如根据发送消息指定Routingkey找不到队列时触发
         * 使用该功能需要开启确认,spring-boot中配置如下:
         * spring.rabbitmq.publisher-returns = true
         */
        rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returned){//需要将接收失败的保存到DB中并且手动排错System.err.println("队列接收消息失败,消息被退回"+returned);}});}}
5、生产者
packageedu.hunan.rabbitmq.service;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;/**
 * 生产者
 */@ServicepublicclassDirectServiceImpl{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidmake(String message){//交换机名称String exchangeName ="direct_exchange";//如果有多个队列 通过routingkey 投递到指定的队列String routingkey ="direct1";//投递到交换机中System.err.println("生产者准备开始投递消息");
        rabbitTemplate.convertAndSend(exchangeName,routingkey,message);}}
6、消费者
packageedu.hunan.rabbitmq.service;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.messaging.handler.annotation.Headers;importorg.springframework.messaging.handler.annotation.Payload;importorg.springframework.stereotype.Service;importjava.io.IOException;/**
 * 消费者
 */@Service@RabbitListener(queues ="direct_queue")publicclassConsumerDirectService{@RabbitHandlerpublicvoidreviceMessage(@PayloadStringMessage2,@HeadersChannel channel,Message message)throwsIOException,InterruptedException{System.err.println("测试异步不影响前端返回");/**
             * 接收失败设置重试  拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发 消息则丢失
             * 方式1 使用自带的重试机制并且控制重试次数  重试机制不能用try/catch否则会死循环 而是将异常抛出
             * 触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。
             * 方式2 将接收失败的消息使用死行队列接盘   死行队列+try/catch
             */try{//处理消息System.err.println("消费者消息接收成功-》"+Message2);//           确认消息已经消费成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){System.err.println("消费处理异常"+userId+"\t"+e);//             拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发/**
             * 接收失败设置重试  拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发 消息则丢失
             * 方式1 使用自带的重试机制并且控制重试次数  重试机制不能用try/catch否则会死循环 而是将异常抛出
             * 触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。
             * 方式2 将接收失败的消息使用死信队列接盘   死信队列+try/catch
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}}
标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/m0_60500421/article/details/132430218
版权归原作者 .小罗同学 所有, 如有侵权,请联系我们删除。

“RabbitMQ确认机制”的评论:

还没有评论