RabbitMQ-高级
消息队列的高级使用:主要是考虑处理实际使用过程各种异常情况,包括但不限于:
- 消息可靠性
- 重复消费 - 保证消息可靠,必然会对同一条消息进行多次发送、备份,此时需要保证消息的幂等性
消息可靠性:
- 消息的传递过程中,共有三个参与方 - 发送者- MQ- 消费者
- MQ与任意参与方直接的消息通过网络进行传递 - 因为网络中断、不稳定、延迟等,消息可能出现丢失
- MQ的消息默认保存在内存中(
3.1.2
版本前),突发断电等情况可能出现消息丢失 - 实际开发过程中,部分重要消息是万万不能丢失的🙅
需要注意的是:
- 发现了解MQ保证消息可靠性的理念、实现很有必要 - 实际开发过程中往往需要调用第三方服务 - 同样也会遇到调用失败的问题- 有时,调用第三方服务的过程还是异步执行的,- 如何确保调用成功,并成功处理响应,MQ的实现可以作为一个参考 或者也可以直接引入MQ,解耦调用过程
可靠性
保证消息不丢失,需要从三方面考虑:
- 发送者
- MQ
- 消费者
发送者可靠性
实际上,开启本功能耗费性能比较高,同时MQ和网络也远远没有想象的那么脆弱
- 需要根据业务对数据可靠性的要求,按需开启 绝对正确的废话~
发送者重连
- 保证发送者与MQ连接正常
发送者确认
- 保证发送者将消息发送给MQ
发送者重连
有时因为网络波动原因,可能会出现连接MQ失败的情况,可以通过配置开启连接失败后的重连机制
配置文件:
spring:rabbitmq:connection-timeout: 1s # MQ的连接超时时间template:retry:enabled:true# 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier:1# 失败后下次的等待时长倍数max-attempts:3# 最大重试次数
- 下次等待时长
0 -> 2 -> 5
-initial-interval * multiplier
- 重连过程是阻塞的,需要合理进行配置,否则会影响其他业务性能
- 必须保证可靠性时:可以异步执行发送消息的业务
发送者确认
SpringAMQP提供了
Publisher Confirm【消息发送到MQ】、Publisher Return【消息】
两种确认机制。当发送者发送消息给MQ后,MQ会返回确认结构给发送者,有以下情况:
- 返回ACK: - 消息投递到了MQ,但是路由失败(比如交换机未配置路由,这是程序员的失误)。此时通过
Publisher Return
返回路由异常的原因,然后返回ACK
,告知投递成功- 临时消息【不需要持久化】投递到了MQ,并且入队成功,返回ACK
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK
- 其他情况都会返回
NACK
,告知投递失败
spring:rabbitmq:publisher-confirm-type: correlated
publisher-returns:true
- 确认机制有三种类型: - none:关闭确认机制- simple:同步阻塞等待MQ回执消息- correlated:MQ异步调用方式返回回执消息
组件配置
返回确认机制是一个比较耗费性能的机制,一般不建议开启
@Slf4j@ConfigurationpublicclassMqConfig{@ResourcepublicRabbitTemplate rabbitTemplate;@PostConstructpublicvoidinit(){
rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
log.error("监听到消息return callback");
log.debug("交换机: {}", returnedMessage.getExchange());
log.debug("路由key: {}", returnedMessage.getRoutingKey());
log.debug("文本信息: {}", returnedMessage.getMessage());
log.debug("状态码: {}", returnedMessage.getReplyCode());
log.debug("状态文本: {}", returnedMessage.getReplyText());}});}}
示例:
2024-07-0316:50:57.922 ERROR 6928---[nectionFactory1]com.learndemo.config.MqConfig: 监听到消息return callback
2024-07-0316:50:57.922 DEBUG 6928---[.55.94.206:5672]com.learndemo.LearnDemoApplicationTests: 发送消息成功,收到 ack!2024-07-0316:50:57.922 DEBUG 6928---[nectionFactory1]com.learndemo.config.MqConfig: 交换机: learn.direct
2024-07-0316:50:57.923 DEBUG 6928---[nectionFactory1]com.learndemo.config.MqConfig: 路由key: q
2024-07-0316:50:57.923 DEBUG 6928---[nectionFactory1]com.learndemo.config.MqConfig: 文本信息:(Body:'hello'MessageProperties[headers={spring_returned_message_correlation=061a4ee2-ba31-4e5b-8ebb-dc938671a0b2}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])2024-07-0316:50:57.923 DEBUG 6928---[nectionFactory1]com.learndemo.config.MqConfig: 状态码:3122024-07-0316:50:57.923 DEBUG 6928---[nectionFactory1]com.learndemo.config.MqConfig: 状态文本: NO_ROUTE
每个消息都要单独指定回调对象:
@ResourcepublicRabbitTemplate rabbitTemplate;@TestvoidtestPublisherConfirm()throwsInterruptedException{// 1.创建CorrelationDataCorrelationData cd =newCorrelationData();// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(newListenableFutureCallback<CorrelationData.Confirm>(){@OverridepublicvoidonFailure(Throwable ex){// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);}@OverridepublicvoidonSuccess(CorrelationData.Confirm result){// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){// result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");}else{// result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息
rabbitTemplate.convertAndSend("learn.direct","q","hello", cd);// 给与时间接收回调Thread.sleep(20000);}
异常示例:
2024-07-03 16:49:18.620 ERROR 12588 --- [nectionFactory1] c.g.learndemo.LearnDemoApplicationTests : 发送消息失败,收到 nack, reason : channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'learn.direct' in vhost '/learn', class-id=60, method-id=40)
数据持久化
MQ为了保证性能,数据一般保存在内存之中
- 一旦宕机,数据【数据不仅仅指消息:还包括交换机、队列等信息】会丢失
- 内存空间有限,当消费者故障或处理过慢时,会导致消息堆积
可以通过三个方面进行持久化设计:
- 交换机持久化 -
- 队列持久化 -
- 消息持久化 - 消息如果在发送时设置不进行初始化 - 如果内存已满,会阻塞【PageOut】将消息进行持久化- 设置进行持久化,持久化过程不会阻塞-
@TestpublicvoidtestSendMessage(){// 1. 自定义构建消息Message message =MessageBuilder.withBody("Hello SpringAMQP".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)// 不进行持久化.build();// 2. 发送消息for(int i =0; i <10000; i++){
rabbitTemplate.convertAndSend("simple.queue", message);}}
Lazy Queue
3.12版本后,所有的队列都是LazyQueue,无法更改
一种特殊的队列,特点:
- 接收到的消息不再保存到内存,直接存入磁盘
- 消费者需要消费消息时才会从磁盘中读取并加载到内存(可以预加载消息,最多 2048 条)
配置
@BeanpublicQueuelazyQueue(){returnQueueBuilder.durable("lazy.queue")// 持久化队列.lazy()// 惰性队列.build();}
消费者可靠性
消费者确认
用于确认消费者是否成功处理了消息。当消费者处理消息结束后,应该向MQ发送一个回执,告知处理结果:
- ACK: - 消息处理成功,MQ从队列删除该消息
- NACK: - 消息处理失败,MQ需要再次投递改消息
- REJECT: - 消息处理失败,但不再进行处理,MQ从队列删除消息
SpringAMQP实现了消息确认功能,可以通过配置文件选择消息处理方式,有三种方式:
- none: 不推荐- 不进行处理,消费者收到消息里面返回ACK
- manual: - 手动进行:在代码中调用API发送
ACK
或者REJECT
- auto: 推荐 - 对消息处理逻辑使用AOP环绕增强 - 业务执行正常:ACK- 业务异常【抛出异常】:NACK- 消息处理或校验异常【消息相关的特定异常】:REJECT - 如:消息转换异常
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # none 关闭、 manual 手动
失败重试机制
消息处理失败,还可以在本地重试几次,如果都失败,执行后续异常逻辑
spring:
rabbitmq:
# 消费者确认机制
listener:
simple:
acknowledge-mode: auto
prefetch:1
# 开启本地失败重试
retry:
enabled:true
max-attempts:3 # 重试次数,默认3
initial-interval:1000 # 时间间隔,默认1s
multiplier:1 # 间隔倍增
开启重试模式后,重试次数耗尽,通过MessageRecoverer接口来处理,包含三种策略:
RejectAndDontRequeueRecoverer
- 次数耗尽,返回reject
,丢弃消息- 默认处理方式,很容易导致消息丢失,极不推荐ImmediateRequeueMessageRecoverer
- 次数耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
- 次数耗尽后,将消息投递到指定队列
修改默认策略:
@ConfigurationpublicclassErrorMessageConfig{@ResourceprivateRabbitTemplate rabbitTemplate;@BeanpublicDirectExchangeerrorExchange(){returnnewDirectExchange("error.direct");}@BeanpublicQueueerrorQueue(){returnnewQueue("error.queue");}@BeanpublicBindingerrorQueueBinding(Queue errorQueue,DirectExchange errorExchange){returnBindingBuilder.bind(errorQueue).to(errorExchange).with("error");}/**
* 配置
*/@BeanpublicMessageRecoverererrorMessageRecoverer(){returnnewRepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}}
@ConfigurationpublicclassErrorMessageConfig{@ResourceprivateRabbitTemplate rabbitTemplate;@BeanpublicMessageRecoverermessageRecoverer(){returnnewImmediateRequeueMessageRecoverer();}}
消费者重复消费
也被称为:幂等性问题,即对同一个请求,处理一次和多次,所造成的影响是一样的
通用方案-唯一的ID:
- 消息设置唯一的ID
- 将处理成功的消息ID保存到数据库之中
// 会在消息的properties字段设置消息ID@BeanpublicMessageConvertermessageConverter(){Jackson2JsonMessageConverter messageConverter =newJackson2JsonMessageConverter();
messageConverter.setCreateMessageIds(true);// 自动生成消息IDreturn messageConverter;}// 消费者获取消息ID@RabbitListener(queues ="simple.queue")publicvoidlistener(Message message){// 获取消息IDString messageId = message.getMessageProperties().getMessageId();}
- 缺点: - 读写数据库,性能不高
特定场景-业务状态:
- 比如说支付: - 接收到消息后修改为支付成功,只有未支付的订单才能变为支付成功- 状态只能由:A -> B,消息只会被正常 ‘处理一次’- 同一条消息只会对应一种状态转变- 状态转变只会发生一次
版权归原作者 goal66 所有, 如有侵权,请联系我们删除。