记一次线上RabbitMQ的堵塞问题
1、背景
RabbitMQ同步外省市运单到本系统中
2、问题
某天早上上班,发现运维群里有很多企业反馈,在系统中查不到自己最新的运单了,当时我赶到公司,打开系统一看,昨天晚上10:30点之后,运单就没有新增了,当时我首先想到的是交通部那边没有将运单推送到MQ的队列中,我打开服务器log,发现没有MQ消费相关的log,最早的MQ消费log还是昨天晚上10:30左右的。当时我赶紧让运维同事看了一下,发现MQ中堆积了一两千个消息没有消费。
并且再10:30左右的log出现了很多报错log
经过分析错误log,发现昨天晚上10:30左右,MQ消费的消息报错了(业务报错)
3、临时解决
1、根据报错原因,解决报错
解决报错之后,发布线上,发现消费端又开始正常消费了,经过一天的观察,MQ消费正常
4、问题原因
后面有时间的时候,想了一下MQ不消费的原因:
MQ设置的是手动应答,由于消费端消费报错,没有正常ack,导致MQ中出现了很多unacked的消息。这个时候MQ会认为消费端已经没有能力去消费消息了,就不会再发送消息给消费者了,但是消息生产者继续将消息推送到MQ中,导致ready消息越来越多,但又不消费了,就导致了消息堵塞。
上面的其实是MQ的一种保护消费者的机制:QOS(服务质量保证)
5、QOS(服务质量保证)
在手动应答模式下启用, 在消费端出现大量报错,无法正常ack的情况下,MQ出现一定数量unacked,MQ为了保护消费端不在报错,MQ将不在发送消息给消费者,进而保护消费端服务的正常运行。
可以通过设置参数:PrefetchCount(spring.rabbitmq.listener.simple.prefetch),来设置MQ支持的最大未正常确认消息数量。
spring:
# 消息队列
rabbitmq:
host: 1.1.1.1
username: 1
password: 1
virtual-host: 1
port: 5672
# 消息发送确认
publisher-confirm-type: correlated
# 开启发送失败退回
publisher-returns: true
listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次请求中预处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: manual
如上面Prefetch=2,那么当有两个消息没有正常ack的时候,MQ就会不再发送消息了
6、为什么重启之后,消息又正常消费了呢
因为重启之后,unacked的消息,会重新会排到队列开头重新被消费,那么后面正常的消息就能继续被推送
7、如何判断是否又堵塞的风险
堵塞是因为unacked数量达到了限制
允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。
channlCount就是由concurrency,max-concurrency决定的。
所以
min = concurrency * prefetch * 节点数量
max = max-concurrency * prefetch * 节点数量
结论
unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。
unacked_msg_count >= min 可能会出现堵塞。
unacked_msg_count >= max 队列一定阻塞
8、事故重现
1、环境
1、生产者
@PostMapping(value = "/pushOkMsg")
public R<String> pushOkMsg(@RequestParam(value = "num")Integer num,@RequestParam(value = "msg")String msg){
for (int integer = 0; integer < num; integer++) {
String msgId = UUID.randomUUID().toString().toLowerCase().replaceAll("-", "");
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(MqConfigV2.TEST_QUEUE_KEY_V1,msg.getBytes(StandardCharsets.UTF_8),correlationData);
}
return R.ok("success!!!");
}
2、生产者配置
3、队列
4、消费者
@RabbitListener(queues = MqConfigV2.TEST_QUEUE_KEY_V1,containerFactory = "customContainerFactory")
@RabbitHandlerpublic void test4(Message message, Channel channel, @Headers Map<String, Object> heads) throws Exception {
String data = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(MqConfigV2.BASE_YD_QUEUE+" 消息接收=" + data);
if("error".equals(data)){
throw new RuntimeException("系统报错了!!!");
}
//模拟业务处理
Thread.sleep(1000);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
2、重现出
1、先发送10条正常消息
系统正常
2、发送一条error消息
系统报错,消费失败,未正常ack,mq中出现一条unacked
3、再发送10条正常消息
由于只有一条unacked消息,小于配置的prefetch=2
系统正常消费
4、再发送一条error
MQ出现两条unacked
5、发送10条正常的消息
消费者不消费了,MQ消息也堵塞了,因为unacked=2,大于等于prefetch=2
问题重现成功了
版权归原作者 吴书松 所有, 如有侵权,请联系我们删除。