文章目录
前言
在工作中使用Rabbitmq传输数据时,可能会因为数据、网络等问题,导致数据发送或者接收失败;
如果对此类问题没有做好处理,就会存在丢失数据的问题,为此,引入了
ConfirmCallback
与
ReturnCallback
,来保证系统能够做到更好的数据监听、以及消费失败的数据做好相应的补偿;
ConfirmCallback
与
ReturnCallback
也被称为Rabbitmq的消息确认机制;
有哪些问题
首先,下面为消息从生产者 ——> 消费者的流程图:
不过如果应用到生产环境中会出现两个问题:
- 生产者发出的消息可能因为种种原因,并没有发送到交换器,而生产者却不知道;
- 交换器接收到的消息,并没有发送到队列中,而生产者却不知道;
如何解决
为了解决以上两个问题,系统引入了
ConfirmCallback
与
ReturnCallback
:
ConfirmCallback
为发送Exchange
(交换器)时回调,成功或者失败都会触发;ReturnCallback
为路由不到队列时触发,成功则不触发;
也就是说,前者是为了监听消息**是否到达了
Exchange
,后者是为了监听消息是否到达了队列**,如果这两个步骤遇到了问题,则生产者也好做出相应处理(例如:消息补偿,不过这并不是本篇的重点);
如果消息在消费端消费失败了怎么办?
失败就失败了,在实际场景中,数据库是需要为发送成功的消息做标记的,如果消息没有做标记(消费失败),则会采用定时任务重新发送,不过会涉及到幂等性的问题,这里会另起一篇文章:基于RabbitMQ实现最终一致性解决方案,在此不再赘述;
Demo
注入回调
@PostConstructpublicvoidinit(){//消息未送达队列触发回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
log.error("消息发送失败,未送达队列,message:{},replyCode:{},replyText:{},exchange:{},exchange:{}", JSON.toJSONString(message), replyCode, replyText, exchange, routingKey);MqMsg msg = JSON.parseObject(newString(message.getBody()),MqMsg.class);// 更新数据库 设置消息的状态为发送失败});//消息进入到Exchange触发回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{String id =Objects.requireNonNull(correlationData.getId());if(!ack){
log.error("消息未发送成功,返回信息:{}", cause);//设置消息的状态为发送失败}else{// 更新数据库 设置消息的状态为发送成功}});}
生产者
@ResponseBody@GetMapping("/send")publicStringsend(){UserVo userVo =newUserVo();//组装消息内容MessageProperties properties =newMessageProperties();//消息唯一ID,用力防止幂等性
properties.setMessageId(userVo.getId().toString());Message message =newMessage(JSON.toJSONString(userVo).getBytes(StandardCharsets.UTF_8), properties);// 发送消息时,需要根据业务设置唯一id,发送方确认时,还需要使用唯一id去修改数据状态
rabbitTemplate.convertAndSend("demoExchange","demoRoutingKey", message);return"发送成功";}
消费者
@Slf4j@Component@RabbitListener(queues ="demo_data_queue")publicclassHelloReceiver{int status =0;@RabbitHandlerpublicvoidprocess(JSONObject jsonObject,Channel channel,Message message)throwsException{// 单条消息的大小限制,一般设为0或不设置,不限制大小int prefecthSize =0;// 不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack 注意在自动应答下不生效int prefecthCount =1;// 表示是否应用于channel上,即是channel级别还是consumer级别boolean global =false;
channel.basicQos(prefecthSize,prefecthCount,global);
log.info("收到消息:{}", jsonObject);Thread.sleep(10000);try{
log.info("message:{}", message.getMessageProperties().getDeliveryTag());}catch(Exception e){
status =1;
e.printStackTrace();
log.info("message:{}", message.getMessageProperties().getDeliveryTag());}finally{// 在这里执行成功或失败if(status ==0){//成功消费消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}elseif(status ==1){//丢弃这条消息,如果最后一个参数设置为true的话,消息将重回队列末尾,重复消费
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}}}}
哪里不清晰的朋友欢迎留言
版权归原作者 素人岳 所有, 如有侵权,请联系我们删除。