1.消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告诉RabbitMQ自己消息处理状态。
回执值含义ack成功处理消息,RabbitMQ从队列中删除该消息nack消息处理失败,RabbitMQ需要再次投递消息reject消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择回执值的处理方式
spring:# RabbitMQ相关配置rabbitmq:listener:simple:# ACK处理方式acknowledge-mode: auto
处理方式含义none不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ中删除。非常不安全,不建议使用manual手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活auto自动模式。SpringAMQP利用AOP对消息处理逻辑做了环绕增强,当业务正常执行时自动返回ack。当业务出现异常时,根据异常判断返回不同结果:(1)业务异常:自动返回nack (2)消息处理或校验异常:自动返回reject
2.消费失败处理
(1)失败重试机制
在auto模式中,出现nack回执的情况下,消息会不断重新入队到队列,再重新发给消费者,然后再次异常,再次重新入队,无限循环,导致MQ的消息处理飙升,带来不必要的压力。利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制测重新入队
spring:# RabbitMQ相关配置rabbitmq:# 配置虚拟主机名称virtual-host: system1
# RabbitMQ管理端口号:15672;服务端口号:5672port:5672# 配置使用者的名称和密码username: guest
password: guest
# 配置消费者信息listener:simple:# 设置消费者回执值的处理模式acknowledge-mode: auto
retry:# 开启消费者失败重试enabled:true# 初始的失败重试时间间隔initial-interval: 1000ms
# 间隔递增系数,下次等待时长 = multiplier * 上次等待时长multiplier:1# 最大重试次数max-attempts:3# true无状态;false有状态。如果业务中包含事务,需要设置为falsestateless:true
(2)失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,该接口有三种不同的实现
实现类作用RejectAndDontRequeueRecoverer重试耗尽后,直接reject,丢弃消息。默认是这种方式ImmediateRequeueMessageRecoverer重试耗尽后,返回nack,消息重新入队RepublishMessageRecoverer重试耗尽后,将失败消息投递到指定的交换机
RepublishMessageRecoverer 实现类代码示例
importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.TopicExchange;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;importorg.springframework.boot.autoconfigure.condition.ConditionalOnProperty;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@Configuration@ConditionalOnProperty(name ="spring.rabbitmq.listener.simple.retry.enabled", havingValue ="true")publicclassMessageErrorConfiguration{// 定义交换机@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange("topic-exchange-1");}// 定义队列@BeanpublicQueuemessageErrorQueue(){returnnewQueue("message-error-queue-1");}// 定义交换机和队列的绑定方式@BeanpublicBindingmessageErrorBinding(){returnBindingBuilder.bind(messageErrorQueue()).to(topicExchange()).with("message.error");}//定义RepublishMessageRecoverer实现类消息失败处理策略@BeanpublicRepublishMessageRecovererrepublishMessageRecoverer(RabbitTemplate rabbitTemplate){returnnewRepublishMessageRecoverer(rabbitTemplate,"topic-exchange-1","message.error");}}
3.业务幂等性
在有消费者确认机制和消费失败处理的情况下可以保证每个消息至少被消费一次。也可能存在消费者已经消费了消息,但是因为某种原因MQ没接收到回执信号导致MQ再次给消费者发消息的重复消费现象。这时要考虑重复消费带来的业务幂等性,以下是两种保证业务幂等性的解决方案
幂等:在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的
(1)唯一消息ID
给每个消息都生成一个唯一ID,利用ID区分是否是重复消息:每一条消息都生成一个唯一的ID,与消息一起投递给消费者。消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库。如果下次收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理
@BeanpublicJackson2JsonMessageConverterjackson2JsonMessageConverter(){Jackson2JsonMessageConverter jackson2JsonMessageConverter =newJackson2JsonMessageConverter();
jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
(2)业务判断
结合业务逻辑,基于业务本身做判断
版权归原作者 Java界第一深情 所有, 如有侵权,请联系我们删除。