目录
一、序言
在有些业务场景中,消息是不能丢的,比如分布式事务资金动账,出账方扣款,那么入账方就一定要收款。以前写了一篇分布式事务的文章,里面的跨地区转账就是一个实际案例。
消息是有可能丢的,比如生产者在发送消息时
broker
服务挂了,消息没有来得及落盘,这时消息就彻底丢了。
保证MQ消息可靠传输主要有两个方面,一方面是消息生产者确保消息一定发送成功,另一方面是消费者确保消息一定被处理。
二、生产者确保消息发送成功
1、为什么需要Publisher Confirms
在Spring AMQP中
AmqpTemplate
的实现
RabbitTemplate
已经支持 Publisher Confirms and Returns,所谓的publisher confirms意思就是消息发布者确认消息是否已经被发送。
在RabbitMQ官方文档描述中,持久化的消息在Broker重启时也是应该存活的,这里的词用的是应该,因为消息有可能在落地磁盘前Broker就挂了,导致消息丢失。
最直接的解决方案是通过事务,但是通过事务有两个问题:
- 事务阻塞:发布者必须等待Broker处理完每条消息。
- 事务很重:每次提交都会要求触发
fsync()
,强制磁盘,这个过程需要花很长的时间。
备注:在RabbitMQ官方测试中,通过事务去保证,发布10000条消息需要花至少4分钟的时间。
而通过
Publisher Confirm机制
,一旦Broker处理完就会确认消息,而且这个过程是异步的,生产者可以流式发布消息,不需要等待Broker,并且Broker会批量高效将消息落盘。
2、哪些消息会被确认处理成功
当Broker确认消息时,会通知消息发布者消息是否被成功处理,成功处理的基本规则如下:
- 无法路由的
mandatory
(必须有符合条件的队列)和immediate
(必须有消费者在线)类型在被basic.return
后会被确认。 - 非持久化消息在入队时会被确认。
- 持久化消息当持久化到磁盘或者被消费者消费时会被确认。
三、消费者保证消息被处理
消费者端确保消息消费很简单,关闭消息自动确认就好,开启消息手动确认。当然有些场景消息只能被处理一次,可以通过分布式锁来实现。
四、Spring RabbitMQ支持代码示例
1、 application.yml
server:port:8080spring:rabbitmq:addresses: localhost:5672username: admin
password: admin
virtual-host: /
publisher-returns:truepublisher-confirm-type: correlated
listener:type: simple
simple:acknowledge-mode: manual
concurrency:5max-concurrency:20prefetch:5template:mandatory:true
备注:
- 这里一定要设置
spring.rabbitmq.publisher-returns
为true,并且设置spring.rabbitmq.publisher-confirm-type
为correlated,同时设置spring.rabbitmq.template.mandatory
为true。- 上面我们将消费者的确认模式改为了手动确认。
2、RabbigtMQ配置
@ConfigurationpublicclassRabbitReliableTransportConfig{/**
* RabbitTemplate消息转换器配置,自动将对象转换为json字符串
*
* @return
*/@BeanpublicMessageConverterjackson2JsonMessageConverter(){Jackson2JsonMessageConverter messageConverter =newJackson2JsonMessageConverter();
messageConverter.setClassMapper(newDefaultJackson2JavaTypeMapper());return messageConverter;}@BeanpublicQueuereliableQueue(){returnQueueBuilder.durable("reliable-queue").build();}}
3、可靠生产者配置
@Slf4j@Component@RequiredArgsConstructorpublicclassRabbitMqReliableProducer{privatefinalRabbitTemplate rabbitTemplate;publicvoidsendReliableMsg(String body){// 发送可靠消息ReliableMsgDTO reliableMsgDTO =ReliableMsgDTO.builder().body(body).build();CorrelationData correlationData =newCorrelationData();
rabbitTemplate.convertAndSend("reliable-queue", reliableMsgDTO, correlationData);// 发送确认逻辑CompletableFuture<Confirm> future = correlationData.getFuture().completable();
future.whenComplete((confirm, throwable)->{if(confirm.isAck()){
log.info("消息已经被成功发送, 消息内容:{}",JSON.toJSONString(reliableMsgDTO));return;}
log.warn("消息发送未成功发送, 原因:{}, 消息内容:{}", confirm.getReason(),JSON.toJSONString(reliableMsgDTO), throwable);// 5秒后再发送LockSupport.parkNanos(5*1000*1000*1000L);
rabbitTemplate.convertSendAndReceive(reliableMsgDTO, correlationData);});}}
4、可靠消费者配置
@Slf4j@ComponentpublicclassRabbitMQReliableConsumer{@RabbitListener(queues ="reliable-queue")publicvoidhandleMsgFromQueue(ReliableMsgDTO reliableMsgDTO,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long tag)throwsIOException{
channel.basicAck(tag,false);// channel.basicNack(tag, false, false);
log.info("Message received from queue, message body: {}",JSON.toJSONString(reliableMsgDTO));}}
备注:这里我们开启了消息的手动确认,如果消息处理失败没有确认,那么消息将会在下次消费者参加连接时再次被投递。
5、测试用例
测试结果如下,每当消息发送至Broker成功后会触发回调,如果消息发送失败将会触发重新发送。
2024-01-2018:13:11.399INFO12316---[78.107.127:5672] c.u.r.i.p.RabbitMqReliableProducer : 消息已经被成功发送,消息内容:{"body":"hello"}2024-01-2018:13:11.399INFO12316---[ntContainer#0-5] c.u.r.i.c.RabbitMQReliableConsumer : Message received from queue, message body:{"body":"hello"}
版权归原作者 凌波漫步& 所有, 如有侵权,请联系我们删除。