0


RabbitMQ中channel与try()结合导致的消息不消费或消息无法接收的问题分析与定位

RabbitMQ中channel与try()结合导致的消息不消费或消息无法接收的问题分析与定位

文章目录

项目场景:

使用 rabbitMQ 测试 topic 交换机的案例

关键信息
RabbitMQ、 try、消费者无法接收消息


一、 写在前面

想要直接参考解决方案,看最后一部分
想要看问题原因,看第三部分
想看如何分析,顺序浏览

  • 个人认为,交换机相对于队列数量更少,且与生产者更加相关,因此交给生产者声明更佳。
  • 一次声明之后,只要它还在,就不必重复声明。队列亦是如此,消费者声明后,只要还在,无需重复声明。
  • 如有错误,欢迎留言批评指正

二、问题描述

生产者声明topic交换机正常,消息发布正常
消费者运行无异常,但是无法消费消息

RabbitMQUtil源码

publicclassRabbitMQUtil{privatestaticfinalintMESSAGE_COUNT=20;publicstaticChannelgetChannel(){Channel channel =null;try{ConnectionFactory connectionFactory =newConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setPassword("guest");
            connectionFactory.setUsername("guest");Connection connection = connectionFactory.newConnection();
            channel = connection.createChannel();}catch(IOException e){
            e.printStackTrace();}catch(TimeoutException e){
            e.printStackTrace();}return channel;}}

消费者源码

public class TopicConsumer1 {
    private static String EXCHANGE_NAME = "topic_logs";
    private static String QUEUE_NAME = "Q1";

    public static void main(String[] args) {
        try (Channel channel = RabbitMQUtil.getChannel();){
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "q1.#");
            System.out.println("waiting for message ......");
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody(), "UTF-8"));
                System.out.println("接收队列:" + QUEUE_NAME + "  绑定键:" + message.getEnvelope().getRoutingKey());
            };
            //接收消息
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

生产者源码

publicclassTopicPublisher{privatestaticStringEXCHANGE_NAME="topic_logs";publicstaticvoidmain(String[] args){Channel channel =RabbitMQUtil.getChannel();try{
            channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);Map<String,String> bindingKeyMap =newHashMap<>();
            bindingKeyMap.put("q1.q2.rabbit","q2.q1.rabbit");
            bindingKeyMap.put("aa.q2.elephant","aa.q1.elephant");
            bindingKeyMap.put("q1.orange.fox","q2.orange.fox");
            bindingKeyMap.put("lazy.brown.q3","lazy.brown.q3");
            bindingKeyMap.put("q1.q2.q3","q1.q2.q3");for(Map.Entry<String,String> bindingKeyEntry:bindingKeyMap.entrySet()){String bindingKey =bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME,bindingKey,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息"+ message);}}catch(Exception e){
            e.printStackTrace();}}}
  1. 先启动消费者,声明交换机,但是这一次消息会丢失 进入 页面管理界面 http://127.0.0.1:15672/

在这里插入图片描述
运行后没有队列

  1. 然后关闭生产者,这次运行只是声明交换机,关闭之后,通道消失。这也是重点在这里插入图片描述
  2. 接下来启动消费者,等待消息消费在这里插入图片描述![在这里插入图片描述](https://img-blog.csdnimg.cn/9cbd4cf0e460466cbdf79de02337d7b0.png
  3. 注释生产者中交换机声明,再次运行,发布消息
// 这次运行把它注释掉//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); 

在这里插入图片描述
那么问题来了,消费者处于运行状态,但是接收不到消息,why?
在这里插入图片描述


三、原因分析:

消息是否能够被消费,需要从以下几点考虑:

  1. 通道已建立
  2. 队列有消息(从上图看,没问题)
  3. 该队列消息从归属问题来看,可交给消费者。(队列就是它声明的,没问题)
  4. 服务没挂(后端能进去,没挂)

这么来看,只能去排查第一点,通道是否还在开启
在这里插入图片描述
看似有一个通道,可是问题是我们生产者也在运行,那么应该需要两个通道,从通道信息也可以看出,这是生产者的通道。关闭生产者即可确认,此时消费者的通道已经被关闭。

为什么会自己关掉呢,回看源码,我们将通道建立写在了 try() 中,这就是问题所在。

try(Channel channel =RabbitMQUtil.getChannel();){

在try() 圆括号中的内容一般是需要自动关闭的资源,那么可以猜测,这是被这种机制自动关闭了。


四、解决方案:

取消 try 自动关闭通道,把它写在外面。

publicstaticvoidmain(String[] args){Channel channel =RabbitMQUtil.getChannel();try{
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"q1.#");System.out.println("waiting for message ......");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println(newString(message.getBody(),"UTF-8"));System.out.println("接收队列:"+QUEUE_NAME+"  绑定键:"+ message.getEnvelope().getRoutingKey());};//接收消息
            channel.basicConsume(QUEUE_NAME,true, deliverCallback, consumerTag ->{});}catch(IOException e){
            e.printStackTrace();}}

在这里插入图片描述


本文转载自: https://blog.csdn.net/qq_39754423/article/details/129278728
版权归原作者 ClimberZheng 所有, 如有侵权,请联系我们删除。

“RabbitMQ中channel与try()结合导致的消息不消费或消息无法接收的问题分析与定位”的评论:

还没有评论