介绍
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(ConsumerAcknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitM0自己消息处理状态。
回执三种模式
- ack:成功处理消息,RabbitMO从队列中删除该消息
- nack:消息处理失败,RabbitMO需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式
配置文件
- none 不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
- manual 手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
- auto 自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,如果是业务异常,会自动返回nack,如果是消息处理或校验异常,自动返回reiect。
spring:rabbitmq:host: 112.68.123.287
port:5673#通信端口virtual-host: /cdn #虚拟主机名称username: c
password: 123456aa
listener:simple:acknowledge-mode: auto #确认机制 默认none
失败重试机制
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理飙升。
异常返回nack RabbitMQ需要再次投递消息,如果因为某些原因导致一直投递失败,就会造成循环给系统带来压力。可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列
spring:rabbitmq:host: 118.99.13.229
port:5673#通信端口virtual-host: /csdn #虚拟主机名称username: csdn
password: 123456aa
listener:simple:retry:enabled:true#开启消费者失败重试stateless:true# true无状态 false有状态。如果业务包含事务改为falsemax-attempts:3#最大重试次数acknowledge-mode: auto #确认机制 默认none
失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理。
3种不同的实现
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
目前RepublishMessageRecoverer 是比较可靠的策略
@Configuration@ConditionalOnProperty(value ="spring.rabbitmq.listener.simple.retry",name ="enabled",havingValue ="true")//开启失败重试的情况下才生效publicclassErrorConfig{@BeanpublicDirectExchangeerrorDirectExchange(){returnnewDirectExchange("error.direct");}//创建交换机@BeanpublicQueueerrorQueue(){returnnewQueue("error.queue");}//创建队列@BeanpublicBindingerrorBinding(Queue errorQueue,DirectExchange exchange){returnBindingBuilder.bind(errorQueue).to(exchange).with("error");}//交换机与队列绑定@BeanpublicMessageRecoverermessageRecoverer(RabbitTemplate rabbitTemplate){returnnewRepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}//RepublishMessageRecoverer}
版权归原作者 生产队的驴. 所有, 如有侵权,请联系我们删除。