0


RabbitMQ(二) - RabbitMQ与消息发布确认与返回、消费确认

RabbitMQ消息确认

SpringBoot与RabbitMQ整合后,对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致;

消息发布确认

  • 生产者给交换机发送消息后、若是不管了,则会出现消息丢失; - 解决方案1: 交换机接受到消息、给生产者一个答复ack, 若生产者没有收到ack, 可能出现消息丢失,因此重新发送消息;
  • 解决方案1隐藏问题:若是交换机发送了ack, 出现网络延迟,则生产者没有收到ack, 就会出现消息重复发送问题, 进而衍生幂等性问题; - 隐藏问题解决方案1:在数据库中增加一张去重表,设置唯一索引; 生产者在消息内容中,翻入唯一ID,消费者消费时、先从数据库查询是否存在,存在则不处理该消息; - 适用于并发低、业务严谨的场景- 隐藏问题解决方案2:利用Redis的String的setnx,若key存在,则不处理、若key存在,则执行业务; - 适用于短时间处理大量消息,且 key不会重复; -这就是大名鼎鼎的幂等性问题,贼讨厌这些专有名词;

业务开发中的幂等性

  1. 前端保存数据时、点击多次保存按钮,插入多条数据; - 解决方案 :前端限制按钮点击、 数据库设置业务唯一索引;
  2. 消息推送中,可能出现多条内容一样的消息,又不可以重复处理 ,需要幂等性处理; 上家公司中,后台给app客户端推送系统消息时、配置给所有用户推送消息, 其他服务给我的应用消息服务推送 RabbitMQ消息, 正常来说, 每次推送的消息, 设备ID和用户ID合起来唯一的,结果其他服务业务数据存在问题,有些旧数据没有清除, 导致相通的设备ID,用户ID, 一次给设备用户推送了十几条,安卓客户端当当当的响, 直接惊动了产品经理; 经过排查上,是上游数据有问题,代码又很老,其他服务负责人排查了好几天, 把问题数据清楚了, 结果后面又产生了问题数据; - 解决方案:由于会一次性处理几万条推送消息,因此对业务要求速度高,因此利用Redis的String的setNx, 以taskId + mobileDevId + userId + tenantId 组成了唯一key,若是存在,则不处理; key有限时间为60分钟, 就成功处理了该问题;- 吐槽:上游的业务问题,让下游服务做业务保证,属实离谱;

{
“taskId” :“xxxx”;
“mobileDevId” : “xxxx”;
“userId”:“xxx”;
“tenantId” : “xxx”;
“其他字段”: “…”
}

RabbitMQ发布确认与返回

SpringBoot发布确认与返回

配置:
第二个参数因为过时,所以要配置第三个参数为correlated,表示用来确认消息;

#生产者
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated

生产者:

  1. 通过RabbitTempldate#setConfirmCallback设置确认回调, 即交换器发送给ack给生产者,生产者调用ConfirmCallback回调, 若出现异常cause,则可重新推送; 通过RabbitTempldate#setReturnCallback设置返回回调;
  2. 通过template#waitForConfirms(xxx)表示等待xxx毫秒后确认,超时返回false; - 若返回false, 则进行业务补救处理;
publicclassConfirmProducerimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@AutowiredprivateRabbitTemplate template;@AutowiredprivateDirectExchange confirmExchange;AtomicInteger index =newAtomicInteger(0);AtomicInteger count =newAtomicInteger(0);privatefinalString[] keys ={"sms","mail"};@Scheduled(fixedDelay =1000, initialDelay =500)publicvoidsend()throwsIOException{//短信String sms ="{userName: xxx; phone:xxx}";HashMap<String,Object> map =newHashMap<>();
        map.put("userName","hanxin");
        map.put("phone", index.getAndIncrement());
        template.setMandatory(true);
        template.setConfirmCallback(this);
        template.setReturnCallback(this);
        template.convertAndSend(confirmExchange.getName(),"confirm", map);System.out.println("send sms confirm");}@Scheduled(fixedDelay =1000, initialDelay =500)publicvoidsend2()throwsIOException{
        template.invoke((operations)->{//短信String sms ="{userName: xxx; phone:xxx}";HashMap<String,Object> map =newHashMap<>();
            map.put("userName","hanxin");
            map.put("phone", index.getAndIncrement());//必须设置
            template.setMandatory(true);
            template.convertAndSend(confirmExchange.getName(),"confirm", map);System.out.println("send sms confirm");return template.waitForConfirms(1000);});}@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){System.out.println("receive confirm callback, ack = "+ ack);}@OverridepublicvoidreturnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey){System.out.println("receive return call : message:"+ message.getBody());System.out.println("receive return call : replyCode:"+ replyCode);System.out.println("receive return call : replyText:"+ replyText);System.out.println("receive return call : exchange:"+ exchange);System.out.println("receive return call : routingKey:"+ routingKey);}}

业务开发中,非严谨,追求性能高业务建议使用send,这个过程是异步确认的;
严谨业务建议使用send2, 同步等待相应,出现问题好确认;

交换机:

@ConfigurationpublicclassConfirmConfig{publicfinalstaticStringCONFIRM_QUEUE_NAME="confirmQueue";publicfinalstaticStringCONFIRM_EXCHANGE_NAME="confirmExchange";publicfinalstaticStringCONFIRM_ROUTING_NAME="confirm";@BeanpublicDirectExchangeconfirmExchange(){returnnewDirectExchange(CONFIRM_EXCHANGE_NAME);}@BeanpublicConfirmProducerconfirmProducer(){returnnewConfirmProducer();}}

运行结果:

receive status : true

RabbitMQ发布确认

若是使用原生Rabbit MQ客户端API,则有三种方式:

  1. 声明channel是需要交换机确认的
channel.confirmSelect();
  1. 发布单条消息
for(int i =0; i <MESSAGE_COUNT; i++){String body =String.valueOf(i);
    channel.basicPublish("", queue,null, body.getBytes());
    channel.waitForConfirmsOrDie(5_000);}

channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。

官方说明了,其实channel底层是异步工作的,会将channel阻塞住,然后异步等待服务端发送一个确认消息,才解除阻塞。但是我们在使用时,可以把他当作一个同步工具来看待。利用一个异步转同步功能,可利用JUC实现;

然后如果到了超时时间,还没有收到服务端的确认机制,那就会抛出异常。然后通常处理这个异常的方式是记录错误日志或者尝试重发消息,但是尝试重发时一定要注意不要使程序陷入死循环。

  1. 发送批量消息 条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认
int batchSize =100;int outstandingMessageCount =0;long start =System.nanoTime();for(int i =0; i <MESSAGE_COUNT; i++){String body =String.valueOf(i);
                ch.basicPublish("", queue,null, body.getBytes());
                outstandingMessageCount++;if(outstandingMessageCount == batchSize){
                    ch.waitForConfirmsOrDie(5_000);
                    outstandingMessageCount =0;}}if(outstandingMessageCount >0){
                ch.waitForConfirmsOrDie(5_000);}

存在隐藏问题:若是500条消息处理太久,超时了,则响应失败,消息重新入队、出现重新消费问题;

  1. 异步确认消息 实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。核心代码就是一个:
channel.addConfirmListener(ConfirmCallback var1,ConfirmCallback var2);

这三种确认机制都能够提升Producer发送消息的安全性。通常情况下,第三种异步确认机制的性能是最好的。第一种安全性最高。

消费者确认

当交换机接受消息后,就要转发给消费者;如何保证消息不丢失?重复消费?

SpringBoot消费确认

自动确认
  • 若是业务中,一些消息发送给消费者,若是消息出现异常,消费者返回通知交换机消息出现了异常,交换机会将消息重新入队;
  • 若是没有确认消息,交换机没有收到消息,会将消息会重新放入队列中,每次消费者启动都会把以前消费的消息重新消费;
  • SpringBoot整合RabbitMQ后, 设置参数max-attempts为最大重试次数、retry.enabled为开启重试机制;

spring.rabbitmq.listener.direct.retry.max-attempts=5
spring.rabbitmq.listener.direct.retry.enabled=true

  • 为什么要开启重试? 不开启重试、消费者处理消息发生异常后, RabbitMQ会丢弃该消息, 通常业务开发中,是不允许的。
  • 为什么设置重试次数? 不开启重试次数,则消息会一直重新入队,占用内存,若是错误消息过多,RabbitMQ内存爆了;
手动确认

在手动确认的模式下,不管是消费成功还是消费失败,一定要记得确认消息,不然消息会一直处于unack状态,直到消费者进程重启或者停止。
设置参数:

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费者:
通过使用channel#basicAck, basicNack, basicReject完成;

@RabbitListener(queues =ConfirmConfig.CONFIRM_QUEUE_NAME)publicclassConfirmConsumer{//    @RabbitHandler : 标记的方法只能有一个参数,类型为String ,若是传Map参数、则需要传入map参数// @RabbitListener:标记的方法可以传入Channel, Message参数@RabbitListener(queues =ConfirmConfig.CONFIRM_QUEUE_NAME)publicvoidlistenObjectQueue(Channel channel,Message message,Map<String,Object> msg)throwsIOException{System.out.println("接收到object.queue的消息"+ msg);System.out.println("消息ID : "+ message.getMessageProperties().getDeliveryTag());try{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(IOException exception){//拒绝确认消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);//拒绝消息//            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}@RabbitHandlerpublicvoidlistenObjectQueue2(Map<String,Object> msg)throwsIOException{System.out.println("接收到object.queue的消息"+ msg);}}
  • 确认收到一个或多个消息- void basicAck(long deliveryTag, boolean multiple) throws IOException;- deliveryTag :消息传递标识- multiple:是否批量确认,为true,则确认后,其他消息deliveryTag小于当前消息的deliveryTag的消息全部变为确认;(慎重)
  • 拒绝一个或多个消息:- void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;- deliveryTag:消息的传递标识。- multiple: 如果为true,则拒绝所有consumer获得的小于deliveryTag的消息。(慎重)- requeue: 设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队列。
  • 拒绝一个消息:- void basicReject(long deliveryTag, boolean requeue) throws IOException;- deliveryTag:消息的传递标识。- requeue: 设置为false 表示不再重新入队,如果配置了死信队列则进入死信队列。
  • channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。

个人还是推荐手动确认,可控性更高;
SpringBoot消费者手动确认调用的API与RabbitMQClient原生API一致,都是通过这三个方法完成确认操作;

业务开发中,@RabbitHandler注解用的少,因为注解标记的方法只能传入 消息内容参数, 无法传Channel, Message, 获取到的消息有限, 而@RabbitListener则相反;

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/yaoyaochengxian/article/details/132107742
版权归原作者 问道玄霄 所有, 如有侵权,请联系我们删除。

“RabbitMQ(二) - RabbitMQ与消息发布确认与返回、消费确认”的评论:

还没有评论