Springboot集成多个RabbitMQ数据源创建队列混乱该怎么解决?
背景
某服务配置了两个RabbitMQ数据源,并且在这两个数据源中分别建立一个exchange以及queue,但启动服务后发现所有的exchange和queue都被创建到某一个数据源中,服务启动失败。
问题代码
RabbitMQ配置类
importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Primary;@ConfigurationpublicclassRabbitMQConfig{@Value("${spring.rabbitmq.concurrentConsumers}")int devAlarmConcurrentConsumers;@Value("${spring.rabbitmq.maxConcurrentConsumers}")int devAlarmMaxConcurrentConsumers;@Value("${spring.rabbitmq.addresses}")privateString host;@Value("${spring.rabbitmq.port}")privateint port;@Value("${spring.rabbitmq.username}")privateString username;@Value("${spring.rabbitmq.password}")privateString password;@Value("${spring.rabbitmq.virtual-host}")privateString virtual_host;@Value("${spring.rabbitmq2.concurrentConsumers}")int concurrentConsumers2;@Value("${spring.rabbitmq2.maxConcurrentConsumers}")int maxConcurrentConsumers2;@Value("${spring.rabbitmq2.host}")privateString host2;@Value("${spring.rabbitmq2.port}")privateint port2;@Value("${spring.rabbitmq2.username}")privateString username2;@Value("${spring.rabbitmq2.password}")privateString password2;@Value("${spring.rabbitmq2.virtual-host}")privateString virtual_host2;// ====================== 第一个mq数据源(主数据源)@Primary@Bean(name ="firstConnectionFactory")publicConnectionFactoryfirstConnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost(this.host);
connectionFactory.setPort(this.port);
connectionFactory.setUsername(this.username);
connectionFactory.setPassword(this.password);
connectionFactory.setVirtualHost(this.virtual_host);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name ="firstRabbitTemplate")publicRabbitTemplatefirstRabbitTemplate(@Qualifier("firstConnectionFactory")ConnectionFactory connectionFactory){RabbitTemplate rabbtiTemplate =newRabbitTemplate(connectionFactory);return rabbtiTemplate;}@Bean(name ="firstContainerFactory")publicSimpleRabbitListenerContainerFactoryfirstContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer,@Qualifier("firstConnectionFactory")ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory containerFactory=newSimpleRabbitListenerContainerFactory();
rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory);
containerFactory.setConcurrentConsumers(devAlarmConcurrentConsumers);
containerFactory.setMaxConcurrentConsumers(devAlarmMaxConcurrentConsumers);return containerFactory;}// ====================== 第二个mq数据源@Bean(name ="secondConnectionFactory")publicConnectionFactorysecondConnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost(this.host2);
connectionFactory.setPort(this.port2);
connectionFactory.setUsername(this.username2);
connectionFactory.setPassword(this.password2);
connectionFactory.setVirtualHost(this.virtual_host2);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name ="secondRabbitTemplate")publicRabbitTemplatesecondRabbitTemplate(@Qualifier("secondConnectionFactory")ConnectionFactory connectionFactory){RabbitTemplate rabbtiTemplate =newRabbitTemplate(connectionFactory);return rabbtiTemplate;}@Bean(name ="secondContainerFactory")publicSimpleRabbitListenerContainerFactorysecondContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer,@Qualifier("secondConnectionFactory")ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory containerFactory=newSimpleRabbitListenerContainerFactory();
rabbitListenerContainerFactoryConfigurer.configure(containerFactory,connectionFactory);
containerFactory.setConcurrentConsumers(concurrentConsumers2);
containerFactory.setMaxConcurrentConsumers(maxConcurrentConsumers2);return containerFactory;}}
消息监听类一
importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassFirstReceiver{@RabbitListener(bindings =@QueueBinding(
value =@Queue("test.first.queue"),
exchange =@Exchange(name ="test.first.exchange", type =ExchangeTypes.TOPIC),
key ="test.first"), containerFactory ="firstContainerFactory")@RabbitHandlerpublicvoidhandleMsg(Message message){// 代码逻辑...}}
消息监听类二
importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassSecondReceiver{@RabbitListener(bindings =@QueueBinding(
value =@Queue("test.second.queue"),
exchange =@Exchange(name ="test.second.exchange", type =ExchangeTypes.FANOUT)), containerFactory ="secondContainerFactory")@RabbitHandlerpublicvoidhandleMsg(Message message){// 代码逻辑...}}
追根溯源
有了当前错误现象,进行如下步骤的问题根源定位:
- 查看log日志,未发现异常
- 修改log日志级别为DEBUG,查看日志,如图:
如图所示,所有的exchange和queue都被同一个MQ数据源创建了。
- 查看源码
源码定位
发现【org.springframework.amqp.rabbit.core.RabbitAdmin】类有相关加载exchange和queue的逻辑,如图所示:
断点确认
在开发环境服务中打断点,发现,谁能创建这个exchange和queue的原则是,是否指定了exchange和queue是被那个MQ数据源绑定的,也就是需要用到【
RabbitAdmin.java
】实现【
MQ数据源、exchange、queue
】三者之间的绑定,如果没有定义这个RabbitAdmin,那就会默认都被
MQ主数据源
创建并绑定。
MQ主数据源:在一个集成了多个MQ数据源的Springboot项目中,必须要有一个被@Primary注解的主数据源,否则项目启动失败。
代码修改
RabbitMQ配置类
在原类基础上创建两个RabbitAdmin:
@ConfigurationpublicclassRabbitMQConfig{@Bean(value ="firstRabbitAdmin")publicRabbitAdminfirstRabbitAdmin(@Qualifier("firstConnectionFactory")ConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@Bean(value ="secondRabbitAdmin")publicRabbitAdminsecondRabbitAdmin(@Qualifier("secondConnectionFactory")ConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}}
消息监听类一
在原类的@RabbitListener注解中新增admins参数:
importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassFirstReceiver{@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="test.first.queue",admins ="firstRabbitAdmin"),
exchange =@Exchange(name ="test.first.exchange", type =ExchangeTypes.TOPIC,admins ="firstRabbitAdmin"),
key ="test.first",
admins ="firstRabbitAdmin"), containerFactory ="firstContainerFactory")@RabbitHandlerpublicvoidhandleMsg(Message message){// 代码逻辑...}}
消息监听类二
在原类的@RabbitListener注解中新增admins参数:
importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;@Slf4j@ComponentpublicclassSecondReceiver{@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="test.second.queue",admins ="secondRabbitAdmin"),
exchange =@Exchange(name ="test.second.exchange", type =ExchangeTypes.FANOUT,admins ="secondRabbitAdmin"),
admins ="secondRabbitAdmin"), containerFactory ="secondContainerFactory")@RabbitHandlerpublicvoidhandleMsg(Message message){// 代码逻辑...}}
服务启动
最终两个MQ数据源可以分别创建对应的exchange和queue了。
版权归原作者 她又在丛中笑 所有, 如有侵权,请联系我们删除。