0


RabbitMQ堵塞问题复盘

记一次线上RabbitMQ的堵塞问题

1、背景

RabbitMQ同步外省市运单到本系统中

2、问题

某天早上上班,发现运维群里有很多企业反馈,在系统中查不到自己最新的运单了,当时我赶到公司,打开系统一看,昨天晚上10:30点之后,运单就没有新增了,当时我首先想到的是交通部那边没有将运单推送到MQ的队列中,我打开服务器log,发现没有MQ消费相关的log,最早的MQ消费log还是昨天晚上10:30左右的。当时我赶紧让运维同事看了一下,发现MQ中堆积了一两千个消息没有消费。
在这里插入图片描述
并且再10:30左右的log出现了很多报错log
经过分析错误log,发现昨天晚上10:30左右,MQ消费的消息报错了(业务报错)

3、临时解决

1、根据报错原因,解决报错
解决报错之后,发布线上,发现消费端又开始正常消费了,经过一天的观察,MQ消费正常

4、问题原因

后面有时间的时候,想了一下MQ不消费的原因:
MQ设置的是手动应答,由于消费端消费报错,没有正常ack,导致MQ中出现了很多unacked的消息。这个时候MQ会认为消费端已经没有能力去消费消息了,就不会再发送消息给消费者了,但是消息生产者继续将消息推送到MQ中,导致ready消息越来越多,但又不消费了,就导致了消息堵塞。
上面的其实是MQ的一种保护消费者的机制:QOS(服务质量保证)

5、QOS(服务质量保证)

在手动应答模式下启用, 在消费端出现大量报错,无法正常ack的情况下,MQ出现一定数量unacked,MQ为了保护消费端不在报错,MQ将不在发送消息给消费者,进而保护消费端服务的正常运行。
可以通过设置参数:PrefetchCount(spring.rabbitmq.listener.simple.prefetch),来设置MQ支持的最大未正常确认消息数量

spring:
  # 消息队列
  rabbitmq:
    host: 1.1.1.1
    username: 1
    password: 1
    virtual-host: 1
    port: 5672
    # 消息发送确认
    publisher-confirm-type: correlated
    # 开启发送失败退回
    publisher-returns: true
    listener:
      simple:
        # 消费端最小并发数
        concurrency: 1
        # 消费端最大并发数
        max-concurrency: 5
        # 一次请求中预处理的消息数量
        prefetch: 2
        # 手动应答
        acknowledge-mode: manual

如上面Prefetch=2,那么当有两个消息没有正常ack的时候,MQ就会不再发送消息了

6、为什么重启之后,消息又正常消费了呢

因为重启之后,unacked的消息,会重新会排到队列开头重新被消费,那么后面正常的消息就能继续被推送

7、如何判断是否又堵塞的风险

参考:https://www.codenong.com/cs109484329/

堵塞是因为unacked数量达到了限制
允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。
channlCount就是由concurrency,max-concurrency决定的。
所以

min = concurrency * prefetch * 节点数量
max = max-concurrency * prefetch * 节点数量

结论

unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。
unacked_msg_count >= min 可能会出现堵塞。
unacked_msg_count >= max 队列一定阻塞

8、事故重现

1、环境

1、生产者

@PostMapping(value = "/pushOkMsg")
public R<String> pushOkMsg(@RequestParam(value = "num")Integer num,@RequestParam(value = "msg")String msg){
    for (int integer = 0; integer < num; integer++) {
        String msgId = UUID.randomUUID().toString().toLowerCase().replaceAll("-", "");
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(MqConfigV2.TEST_QUEUE_KEY_V1,msg.getBytes(StandardCharsets.UTF_8),correlationData);
    }
    return R.ok("success!!!");
}

2、生产者配置

在这里插入图片描述

3、队列

在这里插入图片描述

4、消费者

@RabbitListener(queues = MqConfigV2.TEST_QUEUE_KEY_V1,containerFactory = "customContainerFactory")
@RabbitHandlerpublic void test4(Message message, Channel channel, @Headers Map<String, Object> heads) throws Exception {
    String data = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println(MqConfigV2.BASE_YD_QUEUE+" 消息接收=" + data);
    if("error".equals(data)){
        throw new RuntimeException("系统报错了!!!");
    }
    //模拟业务处理
    Thread.sleep(1000);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

2、重现出

1、先发送10条正常消息

系统正常
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2、发送一条error消息

系统报错,消费失败,未正常ack,mq中出现一条unacked
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3、再发送10条正常消息

由于只有一条unacked消息,小于配置的prefetch=2
系统正常消费
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4、再发送一条error

MQ出现两条unacked

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5、发送10条正常的消息

消费者不消费了,MQ消息也堵塞了,因为unacked=2,大于等于prefetch=2
问题重现成功了
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/RoronoaZoro1995/article/details/134071858
版权归原作者 吴书松 所有, 如有侵权,请联系我们删除。

“RabbitMQ堵塞问题复盘”的评论:

还没有评论