一、背景
在生产环境下,rabbitmq机器出现磁盘空间不足的报警,发现是某个队列的消息只有生产,迟迟没有消费。


可以得到的信息是:
- 队列queue是data_center_file_change_queue
- 队列绑定的交换机是resourceChangeExchange,见下图
- 队列所在的vhost是/

之所以出现rabbitmq消息堆积,是因为消费程序所在的vhost与消息生产者所在的vhost不是同一个。
下面将逐步说明其排查过程,以及解决方法。
二、服务架构

在微服务框架下,两个服务之间,通过rabbitmq来解耦。
三、问题排查
消息没有被消费,首先怀疑的一点是:发送方或者消费方修改了消息体,因为出现异常,导致消费失败,从而重新放入了队列。
1、尝试解码消息内容

base64解密工具,输入上面的消息体,进行解密。

2、对比分析生产者和消费者的java代码
近期没有修改过报文体,而且我看消费者的代码,都有对异常进行捕获。
也就是说,消费端无论是消费成功还是失败,该消息就算在出现异常的时候,也会被视为已消费。(并不会出现消息堆积)
3、生产和消费的双方是否处于同一个vhost
检索队列名称,发现同样的队列名,在不同的vhost都存在。详见下图:

进一步查看程序的配置,得知消息生产程序并未指定vhost,默认使用的是/
而消息消费程序,指定了vhost,如此导致两边的vhost不一致。
这是导致rabbitmq出现严重的消息堆积的根源。
4、保证程序的vhost一致
- 消费端程序配置的vhost
修改前:
修改后:
- 生产端程序配置的vhost

四、验证

经过一段时间,看到堆积的消息正被慢慢消费掉。
最后积压的消息全部被消费成功。

五、程序的代码示例
1、消息生产者
@ConfigurationpublicclassRabbitConfig{@BeanpublicFanoutExchangeresourceChangeExchange(){returnnewFanoutExchange("resourceChangeExchange");}}
@Resource(name ="rabbitTemplate")privateAmqpTemplate rabbitTemplate;Map<String,Object> map =Maps.newHashMap();
map.put("xxxx","");this.rabbitTemplate.convertAndSend("resourceChangeExchange","", map);
2、消息消费者
publicinterfaceRabbitMQ{/**
* 资源中心文件变动队列 名 (变动事件)
*/StringDATA_CENTER_FILE_CHANGE_QUEUE="data_center_file_change_queue";/**
* 资源中心文件变动队列 名 (变动事件) Direct交换机名
*/StringDATA_CENTER_FILE_CHANGE_DIRECT_EXCHANGE="resourceChangeExchange";}
@ConfigurationpublicclassRabbitMqConfig{@BeanpublicQueuedataCenterFileChangeQueue(){returnnewQueue(RabbitMQ.DATA_CENTER_FILE_CHANGE_QUEUE,true,false,false,null);}@BeanpublicFanoutExchangedataCenterFileChangeDirectExchange(){returnnewFanoutExchange(RabbitMQ.DATA_CENTER_FILE_CHANGE_DIRECT_EXCHANGE,true,false);}}
- 消费mq消息
@RabbitListener(queues =RabbitMQ.DATA_CENTER_FILE_CHANGE_QUEUE)publicvoidlistener(Map<String,String> messageMap,Message message,Channel channel){// messageMap 是mq消息体}
版权归原作者 天草二十六_简村人 所有, 如有侵权,请联系我们删除。