RabbitMQ中channel与try()结合导致的消息不消费或消息无法接收的问题分析与定位
文章目录
项目场景:
使用 rabbitMQ 测试 topic 交换机的案例
关键信息
RabbitMQ、 try、消费者无法接收消息
一、 写在前面
想要直接参考解决方案,看最后一部分
想要看问题原因,看第三部分
想看如何分析,顺序浏览
- 个人认为,交换机相对于队列数量更少,且与生产者更加相关,因此交给生产者声明更佳。
- 一次声明之后,只要它还在,就不必重复声明。队列亦是如此,消费者声明后,只要还在,无需重复声明。
- 如有错误,欢迎留言批评指正。
二、问题描述
生产者声明topic交换机正常,消息发布正常
消费者运行无异常,但是无法消费消息
RabbitMQUtil源码
publicclassRabbitMQUtil{privatestaticfinalintMESSAGE_COUNT=20;publicstaticChannelgetChannel(){Channel channel =null;try{ConnectionFactory connectionFactory =newConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setPassword("guest");
connectionFactory.setUsername("guest");Connection connection = connectionFactory.newConnection();
channel = connection.createChannel();}catch(IOException e){
e.printStackTrace();}catch(TimeoutException e){
e.printStackTrace();}return channel;}}
消费者源码:
public class TopicConsumer1 {
private static String EXCHANGE_NAME = "topic_logs";
private static String QUEUE_NAME = "Q1";
public static void main(String[] args) {
try (Channel channel = RabbitMQUtil.getChannel();){
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "q1.#");
System.out.println("waiting for message ......");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "UTF-8"));
System.out.println("接收队列:" + QUEUE_NAME + " 绑定键:" + message.getEnvelope().getRoutingKey());
};
//接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
生产者源码
publicclassTopicPublisher{privatestaticStringEXCHANGE_NAME="topic_logs";publicstaticvoidmain(String[] args){Channel channel =RabbitMQUtil.getChannel();try{
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);Map<String,String> bindingKeyMap =newHashMap<>();
bindingKeyMap.put("q1.q2.rabbit","q2.q1.rabbit");
bindingKeyMap.put("aa.q2.elephant","aa.q1.elephant");
bindingKeyMap.put("q1.orange.fox","q2.orange.fox");
bindingKeyMap.put("lazy.brown.q3","lazy.brown.q3");
bindingKeyMap.put("q1.q2.q3","q1.q2.q3");for(Map.Entry<String,String> bindingKeyEntry:bindingKeyMap.entrySet()){String bindingKey =bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息"+ message);}}catch(Exception e){
e.printStackTrace();}}}
- 先启动消费者,声明交换机,但是这一次
消息会丢失
进入 页面管理界面 http://127.0.0.1:15672/
运行后没有队列
- 然后关闭生产者,这次运行只是声明交换机,关闭之后,通道消失。这也是重点
- 接下来启动消费者,等待消息消费![在这里插入图片描述](https://img-blog.csdnimg.cn/9cbd4cf0e460466cbdf79de02337d7b0.png
- 注释生产者中交换机声明,再次运行,发布消息
// 这次运行把它注释掉//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
那么问题来了,消费者处于运行状态,但是接收不到消息,why?
三、原因分析:
消息是否能够被消费,需要从以下几点考虑:
- 通道已建立
- 队列有消息(从上图看,没问题)
- 该队列消息从归属问题来看,可交给消费者。(队列就是它声明的,没问题)
- 服务没挂(后端能进去,没挂)
这么来看,只能去排查第一点,通道是否还在开启
看似有一个通道,可是问题是我们生产者也在运行,那么应该需要两个通道,从通道信息也可以看出,这是生产者的通道。关闭生产者即可确认,此时消费者的通道已经被关闭。
为什么会自己关掉呢,回看源码,我们将通道建立写在了 try() 中,这就是问题所在。
try(Channel channel =RabbitMQUtil.getChannel();){
在try() 圆括号中的内容一般是需要自动关闭的资源,那么可以猜测,这是被这种机制自动关闭了。
四、解决方案:
取消 try 自动关闭通道,把它写在外面。
publicstaticvoidmain(String[] args){Channel channel =RabbitMQUtil.getChannel();try{
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"q1.#");System.out.println("waiting for message ......");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println(newString(message.getBody(),"UTF-8"));System.out.println("接收队列:"+QUEUE_NAME+" 绑定键:"+ message.getEnvelope().getRoutingKey());};//接收消息
channel.basicConsume(QUEUE_NAME,true, deliverCallback, consumerTag ->{});}catch(IOException e){
e.printStackTrace();}}
版权归原作者 ClimberZheng 所有, 如有侵权,请联系我们删除。