0


Springboot+RabbitMQ+ACK机制(生产方确认(全局、局部)、消费方确认)、知识盲区

RabbitMQ–了解中间件、常用的中间件、分布式系统使用中间件、Docker安装rabbitmq及遇到的问题、RabbitMQ核心组成、消息模式
Springboot整合RabbitMQ(Fanout、Direct、Topic模式)、设置队列信息TTL、死信队列、RabbitMQ磁盘监控,内存控制
Springboot+Rabbitmq消费者注解详解、改序列化方式
Docker简易部署RabbitMQ集群、分布式事务解决方案+案例(可靠生产、可靠消费)
Springboot+RabbitMQ+ACK机制(生产方确认(全局、局部)、消费方确认)、知识盲区

消息确认机制

前言

为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了

消息确认机制

(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

采用消息确认机制,只要将自动ack设置为false,消费者有足够的时间处理消息数据,消费者可以在处理完后续的业务逻辑后再进行提交ack,确保消息确实是被消费了,防止服务宕机可能导致的消息丢失。而MQ会一直等待消费者手动提交ack!!

在rabbitmq管理页面上可以详细看到队列中的消息情况:

在这里插入图片描述

  • ready: 队列中存在的消息,可以提供给消费者的消息数量
  • Unacked: 表示是发送给消费者了,但是消费者还未将ack反馈给MQ的消息数量(一般只有设置了手动ack时,当消费者获取到消息时才会有值)

盲区

自动ACK:

消费者

配置中如果是自动ack机制,

MQ将消息发送给消费者后直接就将消息给删除了

,这个的前提条件是消费者程序没有出现异常,如果消费者接收消息后处理时出现异常,那么MQ将会尝试重发消息给消费者直至达到了

消费者服务

中配置的最大重试次数后将会直接抛出异常不再重试。

手动ACK:

消费者

设置了手动ACK机制后,可以显式的提交/拒绝消息(这一步骤叫做发送ACK),如果消息被消费后正常被提交了ack,那么此消息可以说是流程走完了,然后MQ将此消息从队列中删除。而如果消息被消费后被拒绝了,消费者可选择让MQ重发此消息或者让MQ直接移除此消息。后面可以使用死信队列来进行接收这些被消费者拒绝的消息,再进行后续的业务处理。

认知

RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认。

其中发送方确认又分为:生产者到交换机到确认、交换机到队列的确认。(借用下大佬的图)

在这里插入图片描述

实践

发送方确认

  • ConfirmCallback()方法,是一个回调方法,生产者将消息发送给Broker(RabbitMQ服务),然后Broker给回调生产者的ConfirmCallback()方法告知生产者消息是否接收到。也就是确认消息是否正常到达 Exchange 中。# 我们需要在生产者中添加配置,表示开启发布者确认(注意新旧版本)spring.rabbitmq.publisher-confirm-type=correlated # 新版本spring.rabbitmq.publisher-confirms=true # 老版本
  • ReturnCallback()方法同样是一个回调方法,是交换机和队列之间的消息确认方式。启动消息失败返回,此方法是在交换器路由不到队列时触发回调,这个可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了# 在生产者中配置,表示发布者返回spring.rabbitmq.publisher-returns=true

使用

application.yml

spring:rabbitmq:host: 127.0.0.1
    port:5672username: admin
    password: admin
    # 消息确认(ACK)publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)publisher-returns:true#确认消息已发送到队列(Queue)

理解:springboot中需要给RabbitTemplate设置一些方法的回调即可。

通常情况下我们可以直接在配置类中设置好这些东西,但是可能由于某些业务需求,并不是所有的消息都使用常用的方式,也可以将我们的消息发送服务实现接口然后重写这些回调。

配置类方式(全局方式):

@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    rabbitTemplate.setMandatory(true);//确认消息送到交换机(Exchange)回调
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{System.out.println("\n确认消息送到交换机(Exchange)结果:");System.out.println("相关数据:"+ correlationData);System.out.println("是否成功:"+ ack);System.out.println("错误原因:"+ cause);});//确认消息送到队列(Queue)回调
    rabbitTemplate.setReturnsCallback(returnedMessage ->{System.out.println("\n确认消息送到队列(Queue)结果:");System.out.println("发生消息:"+ returnedMessage.getMessage());System.out.println("回应码:"+ returnedMessage.getReplyCode());System.out.println("回应信息:"+ returnedMessage.getReplyText());System.out.println("交换机:"+ returnedMessage.getExchange());System.out.println("路由键:"+ returnedMessage.getRoutingKey());});return rabbitTemplate;}

以接口的形式访问发送一下。注意:确认消息送到队列(Queue)回调,只有在出现错误时才回调。

在这里插入图片描述

发送服务类实现(局部方式)

将发送的服务类实现接口,实现回调

@ServicepublicclassSendMessageServiceimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{privatestaticLogger logger =LoggerFactory.getLogger(SendMessageService.class);@AutowiredpublicRabbitTemplate rabbitTemplate;publicvoidsendMessage(String str){
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setConfirmCallback(this);// CorrelationData构造函数中的id可以随便写,但是必须要非null而且是唯一的
        rabbitTemplate.convertAndSend("exchange","routingKey", str,newCorrelationData(UUID.randomUUID().toString()));}@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){System.out.println("sender return success"+ returnedMessage.toString());}@Overridepublicvoidconfirm(CorrelationData correlationData,boolean b,String s){
        logger.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");if(!b){
            logger.error("消息发送异常!");// 进行处理}else{
            logger.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), b, s);}}}

接口方式访问下。没问题

在这里插入图片描述

需要注意的是:配置类方式和局部方式只能选择其一,如果一个RabbitTemplate设置了两个或者多个ConfirmCallback/ReturnCallback,会报错的不支持。类似这样的报错:Only one ConfirmCallback/ReturnCallback is supported by each RabbitTemplate。在开发过程中需要注意!!

接收方确认

消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理,比如重新发送或者丢弃。

RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。

消息确认模式有:

  1. AcknowledgeMode.NONE:自动确认。
  2. AcknowledgeMode.AUTO:根据情况确认。
  3. AcknowledgeMode.MANUAL:手动确认。

消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。

  • Basic.Ack 命令:用于确认当前消息。
  • Basic.Nack 命令:用于否定当前消息(批量拒绝) 。
  • Basic.Reject 命令:用于拒绝当前消息(单量拒绝)。

配置,注意是simple模式的ack还是direct模式,或者两个都设置上

server:port:9000spring:rabbitmq:username: admin
    password: admin
    virtual-host: /
    listener:simple:acknowledge-mode: manual
      direct:acknowledge-mode: manual
    # 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了addresses: 192.168.0.101:5672,192.168.0.101:5673,192.168.0.101:5673

消费者接收数据

其中在调用basiAck或basicNack时必须要携带一个tag,它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。而在接收者方法上使用

@Header(AmqpHeaders.DELIVERY_TAG)

可以直接获取到这个tag。

@ComponentpublicclassMQConsumer{@AutowiredprivateDispatcherService dispatcherService;@RabbitListener(queues ="order.queue")publicvoidmessageConsumer(String orderMsg,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long tag)throwsException{try{System.out.println("消息:"+ orderMsg);JSONObject order =JSONObject.parseObject(orderMsg);String orderId = order.getString("orderId");// 派单处理
            dispatcherService.dispatch(orderId);System.out.println(1/0);// 出现异常// 手动确认
            channel.basicAck(tag,false);}catch(Exception e){// 如果出现异常的情况下 根据实际情况重发// 重发一次后,丢失// 参数1:消息的tag// 参数2:多条处理// 参数3:重发// false 不会重发,会把消息打入到死信队列// true 重发,建议不使用try/catch 否则会死循环// 手动拒绝消息
            channel.basicNack(tag,false,false);}}}

扩展

消息的接收者也可使用普通类实现

ChannelAwareMessageListener

接口,重写方法完成,这种是直接全局性接收的。没有最好的,只有最合适的,根据项目情况选择全局接收还是单个类接收自己监听的。

/**
 * 接收者
 *
 **/@ComponentpublicclassConsumerimplementsChannelAwareMessageListener{@OverridepublicvoidonMessage(Message message,Channel channel)throwsException{long deliveryTag = message.getMessageProperties().getDeliveryTag();try{if("queue_name".equals(message.getMessageProperties().getConsumerQueue())){System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());System.out.println("接收消息: "+newString(message.getBody(),"UTF-8"));System.out.println("执行queue_name中的消息的业务处理流程......");}if("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){System.out.println("消费的消息来自的队列名为:"+ message.getMessageProperties().getConsumerQueue());System.out.println("接收消息: "+newString(message.getBody(),"UTF-8"));System.out.println("执行fanout.A中的消息的业务处理流程......");}// 手动提交ack,并且批量确认消息
            channel.basicAck(deliveryTag,true);}catch(Exception e){
            e.printStackTrace();/**
             * 拒绝消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             */
            channel.basicReject(deliveryTag,true);}}}

参考

参考自

RabbitMQ消息确认机制(ACK)

springboot rabbitmq ACK手动确认


本文转载自: https://blog.csdn.net/weixin_45248492/article/details/126208554
版权归原作者 鸢尾の 所有, 如有侵权,请联系我们删除。

“Springboot+RabbitMQ+ACK机制(生产方确认(全局、局部)、消费方确认)、知识盲区”的评论:

还没有评论