0


Springboot集成多个RabbitMQ数据源创建队列混乱该怎么解决?

Springboot集成多个RabbitMQ数据源创建队列混乱该怎么解决?

背景

某服务配置了两个RabbitMQ数据源,并且在这两个数据源中分别建立一个exchange以及queue,但启动服务后发现所有的exchange和queue都被创建到某一个数据源中,服务启动失败。
在这里插入图片描述

问题代码

RabbitMQ配置类

importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Primary;@ConfigurationpublicclassRabbitMQConfig{@Value("${spring.rabbitmq.concurrentConsumers}")int devAlarmConcurrentConsumers;@Value("${spring.rabbitmq.maxConcurrentConsumers}")int devAlarmMaxConcurrentConsumers;@Value("${spring.rabbitmq.addresses}")privateString host;@Value("${spring.rabbitmq.port}")privateint port;@Value("${spring.rabbitmq.username}")privateString username;@Value("${spring.rabbitmq.password}")privateString password;@Value("${spring.rabbitmq.virtual-host}")privateString virtual_host;@Value("${spring.rabbitmq2.concurrentConsumers}")int concurrentConsumers2;@Value("${spring.rabbitmq2.maxConcurrentConsumers}")int maxConcurrentConsumers2;@Value("${spring.rabbitmq2.host}")privateString host2;@Value("${spring.rabbitmq2.port}")privateint port2;@Value("${spring.rabbitmq2.username}")privateString username2;@Value("${spring.rabbitmq2.password}")privateString password2;@Value("${spring.rabbitmq2.virtual-host}")privateString virtual_host2;// ====================== 第一个mq数据源(主数据源)@Primary@Bean(name ="firstConnectionFactory")publicConnectionFactoryfirstConnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
        connectionFactory.setHost(this.host);
        connectionFactory.setPort(this.port);
        connectionFactory.setUsername(this.username);
        connectionFactory.setPassword(this.password);
        connectionFactory.setVirtualHost(this.virtual_host);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name ="firstRabbitTemplate")publicRabbitTemplatefirstRabbitTemplate(@Qualifier("firstConnectionFactory")ConnectionFactory connectionFactory){RabbitTemplate rabbtiTemplate =newRabbitTemplate(connectionFactory);return rabbtiTemplate;}@Bean(name ="firstContainerFactory")publicSimpleRabbitListenerContainerFactoryfirstContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer,@Qualifier("firstConnectionFactory")ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory containerFactory=newSimpleRabbitListenerContainerFactory();
        rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory);
        containerFactory.setConcurrentConsumers(devAlarmConcurrentConsumers);
        containerFactory.setMaxConcurrentConsumers(devAlarmMaxConcurrentConsumers);return containerFactory;}// ====================== 第二个mq数据源@Bean(name ="secondConnectionFactory")publicConnectionFactorysecondConnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
        connectionFactory.setHost(this.host2);
        connectionFactory.setPort(this.port2);
        connectionFactory.setUsername(this.username2);
        connectionFactory.setPassword(this.password2);
        connectionFactory.setVirtualHost(this.virtual_host2);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name ="secondRabbitTemplate")publicRabbitTemplatesecondRabbitTemplate(@Qualifier("secondConnectionFactory")ConnectionFactory connectionFactory){RabbitTemplate rabbtiTemplate =newRabbitTemplate(connectionFactory);return rabbtiTemplate;}@Bean(name ="secondContainerFactory")publicSimpleRabbitListenerContainerFactorysecondContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer,@Qualifier("secondConnectionFactory")ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory containerFactory=newSimpleRabbitListenerContainerFactory();
        rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory);
        containerFactory.setConcurrentConsumers(concurrentConsumers2);
        containerFactory.setMaxConcurrentConsumers(maxConcurrentConsumers2);return containerFactory;}}

消息监听类一

importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassFirstReceiver{@RabbitListener(bindings =@QueueBinding(
        value =@Queue("test.first.queue"),
        exchange =@Exchange(name ="test.first.exchange", type =ExchangeTypes.TOPIC),
        key ="test.first"), containerFactory ="firstContainerFactory")@RabbitHandlerpublicvoidhandleMsg(Message message){// 代码逻辑...}}

消息监听类二

importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassSecondReceiver{@RabbitListener(bindings =@QueueBinding(
        value =@Queue("test.second.queue"),
        exchange =@Exchange(name ="test.second.exchange", type =ExchangeTypes.FANOUT)), containerFactory ="secondContainerFactory")@RabbitHandlerpublicvoidhandleMsg(Message message){// 代码逻辑...}}

追根溯源

有了当前错误现象,进行如下步骤的问题根源定位:

  1. 查看log日志,未发现异常
  2. 修改log日志级别为DEBUG,查看日志,如图:在这里插入图片描述

如图所示,所有的exchange和queue都被同一个MQ数据源创建了。

  3. 查看源码

源码定位

发现【org.springframework.amqp.rabbit.core.RabbitAdmin】类有相关加载exchange和queue的逻辑,如图所示:
在这里插入图片描述

断点确认

在开发环境服务中打断点后发现:某个exchange和queue由哪个MQ数据源创建,取决于是否为该exchange和queue指定了所属的MQ数据源。也就是说,需要用【RabbitAdmin.java】实现【MQ数据源、exchange、queue】三者之间的绑定;如果没有定义这个RabbitAdmin,所有的exchange和queue默认都会被MQ主数据源创建并绑定。

MQ主数据源:在一个集成了多个MQ数据源的Springboot项目中,必须要有一个被@Primary注解的主数据源,否则项目启动失败。

代码修改

RabbitMQ配置类

在原类基础上创建两个RabbitAdmin:

@ConfigurationpublicclassRabbitMQConfig{@Bean(value ="firstRabbitAdmin")publicRabbitAdminfirstRabbitAdmin(@Qualifier("firstConnectionFactory")ConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@Bean(value ="secondRabbitAdmin")publicRabbitAdminsecondRabbitAdmin(@Qualifier("secondConnectionFactory")ConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}}

消息监听类一

在原类的@RabbitListener注解中新增admins参数:

importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassFirstReceiver{@RabbitListener(bindings =@QueueBinding(
        value =@Queue(value ="test.first.queue",admins ="firstRabbitAdmin"),
        exchange =@Exchange(name ="test.first.exchange", type =ExchangeTypes.TOPIC,admins ="firstRabbitAdmin"),
        key ="test.first",
        admins ="firstRabbitAdmin"), containerFactory ="firstContainerFactory")@RabbitHandlerpublicvoidhandleMsg(Message message){// 代码逻辑...}}

消息监听类二

在原类的@RabbitListener注解中新增admins参数:

importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassSecondReceiver{@RabbitListener(bindings =@QueueBinding(
        value =@Queue(value ="test.second.queue",admins ="secondRabbitAdmin"),
        exchange =@Exchange(name ="test.second.exchange", type =ExchangeTypes.FANOUT,admins ="secondRabbitAdmin"),
        admins ="secondRabbitAdmin"), containerFactory ="secondContainerFactory")@RabbitHandlerpublicvoidhandleMsg(Message message){// 代码逻辑...}}

服务启动

在这里插入图片描述
最终两个MQ数据源可以分别创建对应的exchange和queue了。


本文转载自: https://blog.csdn.net/m0_63164811/article/details/140493562
版权归原作者 她又在丛中笑 所有, 如有侵权,请联系我们删除。

“Springboot集成多个RabbitMQ数据源创建队列混乱该怎么解决?”的评论:

还没有评论