实际业务中有在一个 RabbitMQ 中添加多个
virtual host
(又叫vhost)的情况,现记录SpringBoot 的配置方式如下,该配置同时满足多机部署配置。
假设我们需要分别配置名为
/primary
和
/second
的 vhost(vhost通常以
/
开头,实际中可按业务需求取名)。
1. SpringBoot 配置文件
spring:rabbitmq:# 可满足多机或多virtual host 配置primary:host: IP
port:5672username: guest
password: guest
virtual-host: /primary
second:host: IP
port:5672username: guest
password: guest
virtual-host: /second
# ***** 下面为可选配置 *****listener:simple:# 消息确认模式 其有三种配置方式,分别是none、manual和auto,默认autoacknowledge-mode: manual
# 最小的消费者数量concurrency:1# 最大的消费者数量maxConcurrency:16# 一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量prefetch:1retry:# 关闭自动重试enabled:falsetemplate:mandatory:trueretry:# 启动发送重试策略enabled:true# 初始重试间隔为1sinitial-interval:1000# 重试的最大次数max-attempts:3# 重试间隔最多10smax-interval:10000# 每次重试的因子1.0 等差multiplier:1.0
2. RabbitTemplate 配置和使用
2.1 RabbitTemplate 配置
读取不同的配置,创建两个
ConnectionFactory
,并分别创建对应的
RabbitTemplate
。
为方便使用,下面的配置中,primary 配置通过
@Primary
注解标记为 优先使用的 Bean,其 ConnectionFactory 和 RabbitTemplate 在
@Bean
中不单独命名,采用默认名,但需注意命名不要与second 混淆;second ConnectionFactory 和 RabbitTemplate 在
@Bean
中单独标记为 secondConnectionFactory 和 secondRabbitTemplate,且在装配
RabbitTemplate
时要通过
@Qualifier
指定使用secondConnectionFactory。
packagecom.xxx.amqp.config;importcn.hutool.core.util.RandomUtil;importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;importorg.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Primary;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassRabbitMqConfiguration{/*
* ===================================================
* primary virtual-host /primary
* ===================================================
*/@Bean@PrimarypublicConnectionFactoryconnectionFactory(@Value("${spring.rabbitmq.primary.host}")String host,@Value("${spring.rabbitmq.primary.port}")int port,@Value("${spring.rabbitmq.primary.username}")String username,@Value("${spring.rabbitmq.primary.password}")String password,@Value("${spring.rabbitmq.primary.virtual-host}")String virtualHost){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}@Bean@PrimarypublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){returnnewRabbitTemplate(connectionFactory);}/*
* ===================================================
* second virtual-host /second
* ===================================================
*/@Bean("secondConnectionFactory")publicConnectionFactorysecondConnectionFactory(@Value("${spring.rabbitmq.second.host}")String host,@Value("${spring.rabbitmq.second.port}")int port,@Value("${spring.rabbitmq.second.username}")String username,@Value("${spring.rabbitmq.second.password}")String password,@Value("${spring.rabbitmq.second.virtual-host}")String virtualHost){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}@Bean("secondRabbitTemplate")publicRabbitTemplateotbSyncRabbitTemplate(@Qualifier("secondConnectionFactory")ConnectionFactory connectionFactory){returnnewRabbitTemplate(connectionFactory);}}
2.2 RabbitTemplate 注入使用
建议优先使用
@Resource
注解注入。
如果要使用 primaryRabbitTemplate ,可直接注入使用,无需指定
name
:
@ResourceprivateRabbitTemplate rabbitTemplate;
如果要使用 secondRabbitTemplate,则须特别指定
name
属性:
@Resource(name ="secondRabbitTemplate")privateRabbitTemplate secondRabbitTemplate;
或者,如果你倾向于使用
@Autowired
注入,则需要同时使用
@Qualifier
注解指定
name
@Autowired@Qualifier("secondRabbitTemplate")privateRabbitTemplate secondRabbitTemplate;
3. 消息监听配置
这里采用
@RabbitListener
注解的方式监听消息消费。
如果要监听
/primary
,可在
@RabbitListener
注解中只声明要监听的
queues
,Spring会自动监听默认vhost
/primary
中的queue;但是如果要监听
/second
的消息,在声明
queues
(例如,使用 second-test-queue)的同时,还要在消费端配置中声明相应的
containerFactory
。
首先,消费端
RabbitTemplate
配置同上,在此基础上增加下面
RabbitListenerContainerFactory
配置(该配置为示例,具体场景可按需设置):
@Bean(name ="secondListenerContainer")publicSimpleRabbitListenerContainerFactorysecondRabbitListenerContainer(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("secondConnectionFactory")ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);return factory;}
然后在消息监听类方法的
@RabbitListener
中声明使用该
containerFactory
packagecom.xxx.amqp.consumer;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageProperties;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.IOException;@Slf4j@ComponentpublicclassSecondTestRabbitListener{@RabbitListener(queues ="second-test-queue", containerFactory ="secondListenerContainer")publicvoidupdateCoreMainCategoryFlag(Message message,Channel channel)throwsIOException{String messageBody =IOUtils.toString(message.getBody(),StandardCharsets.UTF_8.toString());
log.info("消息体内容: {}", messageBody);// 消息处理逻辑// ...// 处理结束后手动 ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
4. 参考
[1] SpringBoot配置多个RabbitMQ
版权归原作者 留围冰 所有, 如有侵权,请联系我们删除。