✨ RabbitMQ:发布确认高级
📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件
有时候因为一些问题,会导致RabbitMQ重启,在重启期间,生产者消息投递失败,导致消息丢失,需要手动处理和恢复,我们需要保证消息的可靠传递。
1.发布确认
1.1发布确认机制方案
1.2全局配置文件
在application.properties全局配置文件中添加spring.rabbitmq.publish-confirm-type属性,这个属性有以下几种值
- none:禁用发布确认模式(默认)0
- correlated:发布消息成功到交换机后会触发回调方法
- simple:有两种效果 - 第一种效果是和correlated一样会触发回调方法- 第二种效果是在发布消息成功以后使用rabbitTemplate调用waitForConfirms或者waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判单下一步的逻辑- waitForConfirmsOrDie方法如果返回false则会关闭信道,那么接下来就无法发送消息到broker
# RabbitMQ/配置
#服务器地址
spring.rabbitmq.host=192.168.88.136
#服务端口号
spring.rabbitmq.port=5672
#虚拟主机名称
spring.rabbitmq.virtual-host=/myhost
#用户名
spring.rabbitmq.username=admin
#密码
spring.rabbitmq.password=123456
#设置生产者发布确认模式
spring.rabbitmq.publisher-confirm-type=correlated
1.3配置类
packagecom.zyh.config;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
* @author zengyihong
* @create 2022--10--06 10:06
*/@ConfigurationpublicclassConfirmConfig{//确认交换机publicstaticfinalString CONFIRM_EXCHANGE ="confirm_exchange";//确认队列publicstaticfinalString CONFIRM_QUEUE ="confirm_queue";//路由keypublicstaticfinalString CONFIRM_ROUTING_KEY ="key1";/**
* 声明确认交换机
*
* @return
*/@BeanpublicDirectExchangeconfirmExchange(){returnnewDirectExchange(CONFIRM_EXCHANGE);}/**
* 声明确认队列
*
* @return
*/@BeanpublicQueueconfirmQueue(){returnQueueBuilder.durable(CONFIRM_QUEUE).build();}/**
* 把确认交换机和确认队列进行绑定
* @param queue
* @param exchange
* @return
*/@BeanpublicBindingqueueBinding(@Qualifier("confirmQueue")Queue queue,@Qualifier("confirmExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);}}
1.4生产者
packagecom.zyh.controller;importcom.zyh.config.ConfirmConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.Resource;importjava.util.Date;/**
* @author zengyihong
* @create 2022--10--06 10:15
*/@Slf4j@RestController@RequestMapping("/confirm")publicclassConfirmController{@ResourceprivateRabbitTemplate rabbitTemplate;/**
* 生产者发送消息
*
* @param message
*/@GetMapping("/sendConfirmMessage/{message}")publicvoidsendMessage(@PathVariableString message){
log.info("生产者发送消息:{}",message);
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_ROUTING_KEY, message);}}
1.5消费者
packagecom.zyh.consumer;importcom.zyh.config.ConfirmConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.UnsupportedEncodingException;/**
* @author zengyihong
* @create 2022--10--06 10:20
*/@Slf4j@ComponentpublicclassConfirmConsumer{@RabbitListener(queues =ConfirmConfig.CONFIRM_QUEUE)publicvoidreceiveConfirmMessage(Message message){try{//获取消息String msg =newString(message.getBody(),"UTF-8");//记录日志
log.info("消费者接收到确认队列中的消息:{}"+msg);}catch(UnsupportedEncodingException e){
e.printStackTrace();}}}
1.6测试
正常运行结果如图所示,如果rabbitmq出现故障的话,那么结果是不会显示出来的,我们可以通过回调接口来监测运行结果
1.7回调接口
packagecom.zyh.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjavax.annotation.Resource;/**
* @author zengyihong
* @create 2022--10--06 10:36
*/@Slf4j@ComponentpublicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback{@ResourceprivateRabbitTemplate rabbitTemplate;//依赖注入rabbitTemplate之后再设置它的回调对象@PostConstructpublicvoidinit(){//把当前类MyCallBack实现类注入到RabbitTemplate中确认回调接口中
rabbitTemplate.setConfirmCallback(this);}/**
* 不管交换机有没有接收到消息,都会执行这个回调方法
* @param correlationData
* @param ack
* @param cause
*/@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){//获取消息idString id = correlationData !=null? correlationData.getId():"";//判断交换机是否接收到消息if(ack){
log.info("交换机已经收到id为{}的消息", id);}else{
log.info("交换机还没有收到id为{}的消息,原因是{}", id, cause);}}}
1.8改写生产者代码
packagecom.zyh.controller;importcom.zyh.config.ConfirmConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.Resource;/**
* @author zengyihong
* @create 2022--10--06 10:15
*/@Slf4j@RestController@RequestMapping("/confirm")publicclassConfirmController{@ResourceprivateRabbitTemplate rabbitTemplate;/**
* 生产者发送消息
*
* @param message
*/@GetMapping("/sendConfirmMessage/{message}")publicvoidsendMessage(@PathVariableString message){//指定消息id为1的数据CorrelationData correlationData1 =newCorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData1);CorrelationData correlationData2 =newCorrelationData("2");//key2是一个不存在的路由key
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,"key2", message,correlationData2);
log.info("生产者发送消息:{}",message);}}
1.9测试
交换机收到两条信息,但是消费者只能消费一条消息,因为第二条消息的路由key和交换机的binding key不一样,也没有其他队列可以接收这条消息,所以就被丢弃了。
2.回退消息
2.1Mandatory参数
如果我们仅仅开启了生产者确认机制,那么当交换机接收到消息以后,会直接给生产者发送确认消息,但是如果发现消息不可以路由,就会直接把消息丢弃,此时消费者接收不到消息,而且这个时候生产者也不知道消息被丢弃了,这样就导致消息丢失。我们可以通过设置mandatory参数,使得消息在传递过程中出现不可到达的目的地的时候可以把消息返回给生产者
2.2在全局配置文件中开启回退消息
# RabbitMQ/配置
#服务器地址
spring.rabbitmq.host=192.168.88.136
#服务端口号
spring.rabbitmq.port=5672
#虚拟主机名称
spring.rabbitmq.virtual-host=/myhost
#用户名
spring.rabbitmq.username=admin
#密码
spring.rabbitmq.password=123456
#设置生产者发布确认模式
spring.rabbitmq.publisher-confirm-type=correlated
#开启消息回退
spring.rabbitmq.publisher-returns=true
2.3生产者
packagecom.zyh.controller;importcom.zyh.config.ConfirmConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.Resource;/**
* @author zengyihong
* @create 2022--10--06 10:15
*/@Slf4j@RestController@RequestMapping("/confirm")publicclassConfirmController{@ResourceprivateRabbitTemplate rabbitTemplate;/**
* 生产者发送消息
*
* @param message
*/@GetMapping("/sendConfirmMessage/{message}")publicvoidsendMessage(@PathVariableString message){//指定消息id为1的数据CorrelationData correlationData1 =newCorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData1);CorrelationData correlationData2 =newCorrelationData("2");//key2是一个不存在的路由key
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,"key2", message,correlationData2);
log.info("生产者发送消息:{}",message);}}
2.4回调接口
packagecom.zyh.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjavax.annotation.Resource;/**
* @author zengyihong
* @create 2022--10--06 10:36
*/@Slf4j@ComponentpublicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{@ResourceprivateRabbitTemplate rabbitTemplate;//依赖注入rabbitTemplate之后再设置它的回调对象@PostConstructpublicvoidinit(){//把当前类MyCallBack实现类注入到RabbitTemplate中确认回调接口中
rabbitTemplate.setConfirmCallback(this);//把当前类MyCallBack实现类注入到RabbitTemplate中消息回退接口中
rabbitTemplate.setReturnsCallback(this);}/**
* 不管交换机有没有接收到消息,都会执行这个回调方法
* @param correlationData
* @param ack
* @param cause
*/@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){//获取消息idString id = correlationData !=null? correlationData.getId():"";//判断交换机是否接收到消息if(ack){
log.info("交换机已经收到id为{}的消息", id);}else{
log.info("交换机还没有收到id为{}的消息,原因是{}", id, cause);}}@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
log.error("消息{}----->被交换机{}退回,退回原因:{},路由key:{}",newString(returnedMessage.getMessage().getBody()),
returnedMessage.getExchange(),
returnedMessage.getReplyText(),
returnedMessage.getRoutingKey());}}
2.5消费者
packagecom.zyh.consumer;importcom.zyh.config.ConfirmConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.UnsupportedEncodingException;/**
* @author zengyihong
* @create 2022--10--06 10:20
*/@Slf4j@ComponentpublicclassConfirmConsumer{@RabbitListener(queues =ConfirmConfig.CONFIRM_QUEUE)publicvoidreceiveConfirmMessage(Message message){try{//获取消息String msg =newString(message.getBody(),"UTF-8");//记录日志
log.info("消费者接收到确认队列中的消息:{}",msg);}catch(UnsupportedEncodingException e){
e.printStackTrace();}}}
2.6测试
3.备份交换机
Rabbitmq——备份交换机
版权归原作者 不断前进的皮卡丘 所有, 如有侵权,请联系我们删除。