rabbitMQ在发送消息时,会出现交换机不存在(交换机名字写错等消息),这种情况如何会退给生产者重新处理?【交换机层】
生产者发送消息时,消息未送达到指定的队列,如何消息回退?
核心:对类RabbitTemplate.ConfirmCallback 和RabbitTemplate.ReturnCallback的重写。
RabbitTemplate.ConfirmCallback:交换机在收到消息或者没收到消息时会被触发
RabbitTemplate.ReturnCallback:消息进入交换机,不能达到指定目的地时被出发。
开启交换机确认
开启消息不可达回退
配置文件不开启 这两项
spring:rabbitmq:# 交换机进行确认消息publisher-confirm-type: correlated
# 交换机不可以路由消息时 消息回退publisher-returns:true
配置类声明
packagecom.esint.configs;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
* 发布确认
*
*/@ConfigurationpublicclassConfirmConfig{//交换机publicstaticfinalStringCONFIRM_EXCHANGE="confirm.exchange";//队列publicstaticfinalStringCONFIRM_QUEUE="confirm.queue";//routing-keypublicstaticfinalStringCONFIRM_ROUTING_KEY="key1";//声明 交换机@Bean("confirmExchange")publicDirectExchangeconfirmExchange(){returnnewDirectExchange(CONFIRM_EXCHANGE);}//声明 队列@Bean("confrimQueue")publicQueueconfrimQueue(){returnQueueBuilder.durable(CONFIRM_QUEUE).build();}//绑定@BeanpublicBindingqueueBindingExchange(@Qualifier("confrimQueue")Queue confrimQueue,@Qualifier("confirmExchange")DirectExchange confirmExchange){returnBindingBuilder.bind(confrimQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}}
消费者:
packagecom.esint.controller;importcom.esint.configs.ConfirmConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@Slf4j@RestController@RequestMapping("/confirm")publicclassProducerController{@AutowiredprivateRabbitTemplate rabbitTemplate;//发消息@GetMapping("/sendMessage/{message}")publicvoidsendMessage(@PathVariableString message){//普通发送模式 无是否发送成功回调CorrelationData correlationData =newCorrelationData("101");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_ROUTING_KEY+"123",message);
log.info("发送消息为:{}",message);}}
消费者:
packagecom.esint.consumer;importcom.esint.configs.ConfirmConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassConsumer{@RabbitListener(queues =ConfirmConfig.CONFIRM_QUEUE)publicvoidreceiveConfrimMessage(Message message){
log.info("接收到的消息为:"+newString(message.getBody()));}}
核心修改的重写的类:
packagecom.esint.consumer;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;@Slf4j@ComponentpublicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{/**
* 注入:本类为实现了RabbitTemplate的内部类,所以在RabbitTemplate发送消息的时候不会调用到我们自己的实现,所以需要把这个类在注入到RabbitTemplate中。
*/@AutowiredprivateRabbitTemplate rabbitTemplate;@PostConstructpublicvoidinit(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);}/**
* RabbitTemplate.ConfirmCallback 是在【生产者】发送【交换机】 交换机的感知回应调去方法
*
* 交换机确认回调方法
* 1.交换机接收消息成功
* 参数1 correlationData保存了回调消息ID和相关信息
* 参数2 交换机收到消息 true
* 参数3 失败原因 为 null
* 2.交换机接受消息失败
* 参数1 correlationData保存了回调消息ID和相关信息
* 参数2 交换机收到消息 false
* 参数3 失败原因
* @param correlationData 来源于生产者 所以在发消息时 需要带有这个属性
* @param ack
* @param cause
*/@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){String id = correlationData !=null? correlationData.getId():"";if(ack){
log.info("交换机确认收到 ID:{}",id);}else{
log.info("交换机未收到ID:{}的消息,原因:{}",id,cause);//这里实现发送交换机失败的存储逻辑}}/**
* 回退消息
* 在消息传递过程不可达目标地时 返还给生产者 只有消息不可达,才会执行这个方法
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/@OverridepublicvoidreturnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey){
log.error("消息{} 被交换机{} 退回,原因:{} 路由:{}",newString(message.getBody()),exchange,replyText,routingKey);//这里实现发送消息不到达的逻辑 发送消息无法被逻辑 默认就会被交换机丢掉 这里重写后 可以在这里处理存储}}
故意发送一个错误路由时:
消息能发出 交换机有确认 消息可以被回退
版权归原作者 溜达的大象 所有, 如有侵权,请联系我们删除。