0


Rabbitmq之ConfirmCallback与ReturnCallback使用

文章目录

前言

在工作中使用Rabbitmq传输数据时,可能会因为数据、网络等问题,导致数据发送或者接收失败;

如果对此类问题没有做好处理,就会存在丢失数据的问题,为此,引入了

ConfirmCallback

ReturnCallback

,来保证系统能够做到更好的数据监听、以及消费失败的数据做好相应的补偿;

ConfirmCallback

ReturnCallback

也被称为Rabbitmq的消息确认机制;

有哪些问题

首先,下面为消息从生产者 ——> 消费者的流程图:
Rabbitmq之ConfirmCallback与ReturnCallback使用

不过如果应用到生产环境中会出现两个问题:

  1. 生产者发出的消息可能因为种种原因,并没有发送到交换器,而生产者却不知道;
  2. 交换器接收到的消息,并没有发送到队列中,而生产者却不知道;

如何解决

为了解决以上两个问题,系统引入了

ConfirmCallback

ReturnCallback

  • ConfirmCallback为发送Exchange(交换器)时回调,成功或者失败都会触发;
  • ReturnCallback为路由不到队列时触发,成功则不触发;

也就是说,前者是为了监听消息**是否到达了

Exchange

,后者是为了监听消息是否到达了队列**,如果这两个步骤遇到了问题,则生产者也好做出相应处理(例如:消息补偿,不过这并不是本篇的重点);

如果消息在消费端消费失败了怎么办?

失败就失败了,在实际场景中,数据库是需要为发送成功的消息做标记的,如果消息没有做标记(消费失败),则会采用定时任务重新发送,不过会涉及到幂等性的问题,这里会另起一篇文章:基于RabbitMQ实现最终一致性解决方案,在此不再赘述;

Demo

注入回调

@PostConstructpublicvoidinit(){//消息未送达队列触发回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
            log.error("消息发送失败,未送达队列,message:{},replyCode:{},replyText:{},exchange:{},exchange:{}", JSON.toJSONString(message), replyCode, replyText, exchange, routingKey);MqMsg msg = JSON.parseObject(newString(message.getBody()),MqMsg.class);// 更新数据库 设置消息的状态为发送失败});//消息进入到Exchange触发回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{String id =Objects.requireNonNull(correlationData.getId());if(!ack){
                log.error("消息未发送成功,返回信息:{}", cause);//设置消息的状态为发送失败}else{// 更新数据库 设置消息的状态为发送成功}});}

生产者

@ResponseBody@GetMapping("/send")publicStringsend(){UserVo userVo =newUserVo();//组装消息内容MessageProperties properties =newMessageProperties();//消息唯一ID,用力防止幂等性
    properties.setMessageId(userVo.getId().toString());Message message =newMessage(JSON.toJSONString(userVo).getBytes(StandardCharsets.UTF_8), properties);// 发送消息时,需要根据业务设置唯一id,发送方确认时,还需要使用唯一id去修改数据状态
    rabbitTemplate.convertAndSend("demoExchange","demoRoutingKey", message);return"发送成功";}

消费者

@Slf4j@Component@RabbitListener(queues ="demo_data_queue")publicclassHelloReceiver{int status =0;@RabbitHandlerpublicvoidprocess(JSONObject jsonObject,Channel channel,Message message)throwsException{// 单条消息的大小限制,一般设为0或不设置,不限制大小int prefecthSize =0;// 不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack 注意在自动应答下不生效int prefecthCount =1;// 表示是否应用于channel上,即是channel级别还是consumer级别boolean global =false;

        channel.basicQos(prefecthSize,prefecthCount,global);

        log.info("收到消息:{}", jsonObject);Thread.sleep(10000);try{
            log.info("message:{}", message.getMessageProperties().getDeliveryTag());}catch(Exception e){
            status =1;
            e.printStackTrace();
            log.info("message:{}", message.getMessageProperties().getDeliveryTag());}finally{// 在这里执行成功或失败if(status ==0){//成功消费消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}elseif(status ==1){//丢弃这条消息,如果最后一个参数设置为true的话,消息将重回队列末尾,重复消费
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}}}}

哪里不清晰的朋友欢迎留言

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/xianyun1992/article/details/117914415
版权归原作者 素人岳 所有, 如有侵权,请联系我们删除。

“Rabbitmq之ConfirmCallback与ReturnCallback使用”的评论:

还没有评论