0


SpringBoot连接多个RabbitMQ

目 录

1. 前 言

在 SpringBoot 中整合单个 RabbitMQ 使用,是很简单的,只需要引入依赖,然后在配置里面配置好 MQ 的连接地址、账号、密码等信息,然后使用即可。但如果 MQ 的连接地址是多个,那这种连接方式就不奏效了。

前段时间,我开发的一个项目就遇到了这样的问题。那个项目,好几个关联方,每个关联方用的 MQ 的地址都不相同,也就意味着我这边要连接几个 RabbbitMQ 地址。SpringBoot 连接多个 RabbitMQ,怎么搞?

使用默认的连接方式是行不通的,我已经试过,而要实现 SpringBoot 连接多个 RabbitMQ,只能自定义重写一些东西,分别配置才可以,下面一起来走一下试试。

2. 重 写

首先要明确的是,下面的两个类是需要重写的:

  • RabbitTemplate:往队列里面丢消息时,需要用到
  • RabbitAdmin:声明队列、声明交换机、绑定队列和交换机用到

这里,我定义两个关联方,一个是 one,一个是 two,分别重写与它们的连接工厂。

2.1 重写与关联方one的连接工厂

packagecom.yuhuofei.mq.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.AcknowledgeMode;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.support.CorrelationData;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Primary;/**
 * @author yuhuofei
 * @version 1.0
 * @description 重写与关联方one的连接工厂
 * @date 2022/10/3 16:57
 */@Slf4j@ConfigurationpublicclassOneMQConfig{@Value("${one.spring.rabbitmq.host}")privateString host;@Value("${one.spring.rabbitmq.port}")privateint port;@Value("${one.spring.rabbitmq.username}")privateString username;@Value("${one.spring.rabbitmq.password}")privateString password;@Value("${one.spring.rabbitmq.virtual-host}")privateString virtualHost;/**
     * 定义与one的连接工厂
     */@Bean(name ="oneConnectionFactory")@PrimarypublicConnectionFactoryoneConnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name ="oneRabbitTemplate")@PrimarypublicRabbitTemplateoneRabbitTemplate(@Qualifier("oneConnectionFactory")ConnectionFactory connectionFactory){RabbitTemplate oneRabbitTemplate =newRabbitTemplate(connectionFactory);
        oneRabbitTemplate.setMandatory(true);
        oneRabbitTemplate.setConnectionFactory(connectionFactory);
        oneRabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){/**
             * 确认消息送到交换机(Exchange)回调
             * @param correlationData
             * @param ack
             * @param cause
             */@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
                log.info("确认消息送到交换机(Exchange)结果:");
                log.info("相关数据:{}", correlationData);boolean ret =false;if(ack){
                    log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());//下面可自定义业务逻辑处理,如入库保存信息等}else{
                    log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);//下面可自定义业务逻辑处理,如入库保存信息等}}});

        oneRabbitTemplate.setReturnCallback(newRabbitTemplate.ReturnCallback(){/**
             * 只要消息没有投递给指定的队列 就触发这个失败回调
             * @param message  投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 当时这个消息发给那个交换机
             * @param routingKey 当时这个消息用那个路由键
             */@OverridepublicvoidreturnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey){//获取消息idString messageId = message.getMessageProperties().getMessageId();// 内容String result =null;try{
                    result =newString(message.getBody(),"UTF-8");}catch(Exception e){
                    log.error("消息发送失败{}", e);}
                log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);//下面可自定义业务逻辑处理,如入库保存信息等}});return oneRabbitTemplate;}@Bean(name ="oneFactory")@PrimarypublicSimpleRabbitListenerContainerFactoryoneFactory(@Qualifier("oneConnectionFactory")ConnectionFactory connectionFactory,SimpleRabbitListenerContainerFactoryConfigurer configurer){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);return factory;}@Bean(name ="oneRabbitAdmin")@PrimarypublicRabbitAdminoneRabbitAdmin(@Qualifier("oneConnectionFactory")ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}}

2.2 重写与关联方two的连接工厂

packagecom.yuhuofei.mq.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.AcknowledgeMode;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.support.CorrelationData;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * @author yuhuofei
 * @version 1.0
 * @description 重写与关联方two的连接工厂
 * @date 2022/10/3 17:52
 */@Slf4j@ConfigurationpublicclassTwoMQConfig{@Value("${two.spring.rabbitmq.host}")privateString host;@Value("${two.spring.rabbitmq.port}")privateint port;@Value("${two.spring.rabbitmq.username}")privateString username;@Value("${two.spring.rabbitmq.password}")privateString password;@Value("${two.spring.rabbitmq.virtualHost}")privateString virtualHost;/**
     * 定义与two的连接工厂
     */@Bean(name ="twoConnectionFactory")publicConnectionFactorytwoConnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name ="twoRabbitTemplate")publicRabbitTemplatetwoRabbitTemplate(@Qualifier("twoConnectionFactory")ConnectionFactory connectionFactory){RabbitTemplate twoRabbitTemplate =newRabbitTemplate(connectionFactory);
        twoRabbitTemplate.setMandatory(true);
        twoRabbitTemplate.setConnectionFactory(connectionFactory);
        twoRabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){/**
             * 确认消息送到交换机(Exchange)回调
             * @param correlationData
             * @param ack
             * @param cause
             */@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
                log.info("确认消息送到交换机(Exchange)结果:");
                log.info("相关数据:{}", correlationData);boolean ret =false;if(ack){
                    log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());//下面可自定义业务逻辑处理,如入库保存信息等}else{
                    log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);//下面可自定义业务逻辑处理,如入库保存信息等}}});

        twoRabbitTemplate.setReturnCallback(newRabbitTemplate.ReturnCallback(){/**
             * 只要消息没有投递给指定的队列 就触发这个失败回调
             * @param message  投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 当时这个消息发给那个交换机
             * @param routingKey 当时这个消息用那个路由键
             */@OverridepublicvoidreturnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey){//获取消息idString messageId = message.getMessageProperties().getMessageId();// 内容String result =null;try{
                    result =newString(message.getBody(),"UTF-8");}catch(Exception e){
                    log.error("消息发送失败{}", e);}
                log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);//下面可自定义业务逻辑处理,如入库保存信息等}});return twoRabbitTemplate;}@Bean(name ="twoFactory")publicSimpleRabbitListenerContainerFactorytwoFactory(@Qualifier("twoConnectionFactory")ConnectionFactory connectionFactory,SimpleRabbitListenerContainerFactoryConfigurer configurer){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);return factory;}@Bean(name ="twoRabbitAdmin")publicRabbitAdmintwoRabbitAdmin(@Qualifier("twoConnectionFactory")ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}}

2.3 创建队列及交换机并绑定

packagecom.yuhuofei.mq.config;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.TopicExchange;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Configuration;importjavax.annotation.PostConstruct;importjavax.annotation.Resource;/**
 * @author yuhuofei
 * @version 1.0
 * @description 创建队列、交换机并绑定
 * @date 2022/10/3 18:15
 */publicclassQueueConfig{@Resource(name ="oneRabbitAdmin")privateRabbitAdmin oneRabbitAdmin;@Resource(name ="twoRabbitAdmin")privateRabbitAdmin twoRabbitAdmin;@Value("${one.out.queue}")privateString oneOutQueue;@Value("${one.out.queue}")privateString oneRoutingKey;@Value("${two.output.queue}")privateString twoOutQueue;@Value("${two.output.queue}")privateString twoRoutingKey;@Value("${one.topic.exchange.name}")privateString oneTopicExchange;@Value("${two.topic.exchange.name}")privateString twoTopicExchange;@PostConstructpublicvoidoneRabbitInit(){//声明交换机
        oneRabbitAdmin.declareExchange(newTopicExchange(oneTopicExchange,true,false));//声明队列
        oneRabbitAdmin.declareQueue(newQueue(oneOutQueue,true,false,false));//绑定队列及交换机
        oneRabbitAdmin.declareBinding(BindingBuilder.bind(newQueue(oneOutQueue,true,false,false)).to(newTopicExchange(oneTopicExchange,true,false)).with(oneRoutingKey));}@PostConstructpublicvoidtwoRabbitInit(){//声明交换机
        twoRabbitAdmin.declareExchange(newTopicExchange(twoTopicExchange,true,false));//声明队列
        twoRabbitAdmin.declareQueue(newQueue(twoOutQueue,true));//绑定队列及交换机
        twoRabbitAdmin.declareBinding(BindingBuilder.bind(newQueue(twoOutQueue,true,false,false)).to(newTopicExchange(twoTopicExchange,true,false)).with(twoRoutingKey));}}

2.4 配置信息

这里的配置信息,需要与各自的关联方约定好再配置

# 与关联方one的MQ配置
one.spring.rabbitmq.host=one.mq.com
one.spring.rabbitmq.port=5672
one.spring.rabbitmq.username=xxxxx
one.spring.rabbitmq.password=xxxxx
one.spring.rabbitmq.virtual-host=/xxxxx
one.out.queue=xxxaa.ssssd.cffs.xxxx
one.topic.exchange.name=oneTopExchange

# 与关联方two的MQ配置
two.spring.rabbitmq.host=two.mq.com
two.spring.rabbitmq.port=5672
two.spring.rabbitmq.username=aaaaaaa
two.spring.rabbitmq.password=aaaaaaa
two.spring.rabbitmq.virtualHost=/aaaaaaa
two.out.queue=ddddd.sssss.hhhhh.eeee
two.topic.exchange.name=twoTopExchange

2.5 注意点

在连接多个 MQ 的情况下,需要在某个连接加上 @Primary 注解(见 2.1 中的代码),表示主连接,默认使用这个连接,如果不加,服务会起不来

3. 使 用

3.1 作为消费者

由于在前面的 2.3 中,声明了队列及交换机,并进行了绑定,那么作为消费者,监听相应的队列,获取关联方发送的消息进行处理即可。这里用监听关联方 one 的出队列做展示,two 的类似。

需要注意的地方是,在监听队列时,需要指定 ContainerFactory。

packagecom.yuhuofei.mq.service;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;importjava.nio.charset.StandardCharsets;/**
 * @author yuhuofei
 * @version 1.0
 * @description 监听关联方one的消息
 * @date 2022/10/3 18:38
 */@Slf4j@ServicepublicclassOneReceive{@RabbitListener(queues ="${one.out.queue}", containerFactory ="oneFactory")publicvoidlistenOne(Message message,Channel channel){//获取MQ返回的数据
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);String data =newString(message.getBody(),StandardCharsets.UTF_8);
        log.info("MQ返回的数据:{}", data);//下面进行业务逻辑处理}}

3.1 作为生产者

使用之前重写的 RabbitTemplate ,向各个关联方指定的队列发送消息。

packagecom.yuhuofei.mq.service;importcom.google.gson.JsonObject;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageProperties;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.amqp.rabbit.support.CorrelationData;importorg.springframework.stereotype.Service;importjavax.annotation.Resource;/**
 * @author yuhuofei
 * @version 1.0
 * @description 向关联方的队列发送消息
 * @date 2022/10/3 18:47
 */@Slf4j@ServicepublicclassSendMessage{@Resource(name ="oneRabbitTemplate")privateRabbitTemplate oneRabbitTemplate;@Resource(name ="twoRabbitTemplate")privateRabbitTemplate twoRabbitTemplate;publicvoidsendToOneMessage(String messageId,OneMessageConverter message){String exchange = message.getExchange();String routingKey = message.getRoutingKey();JsonObject data = message.getData();MessageProperties messageProperties =newMessageProperties();
        messageProperties.setContentType("application/json");Message info =newMessage(data.toString().getBytes(), messageProperties);
        info.getMessageProperties().setMessageId(messageId);
        oneRabbitTemplate.convertAndSend(exchange, routingKey, info,newCorrelationData(messageId));}publicvoidsendToTwoMessage(String messageId,TwoMessageConverter message){String exchange = message.getExchange();String routingKey = message.getRoutingKey();JsonObject data = message.getData();MessageProperties messageProperties =newMessageProperties();
        messageProperties.setContentType("application/json");Message info =newMessage(data.toString().getBytes(), messageProperties);
        info.getMessageProperties().setMessageId(messageId);
        twoRabbitTemplate.convertAndSend(exchange, routingKey, info,newCorrelationData(messageId));}}

本文转载自: https://blog.csdn.net/Crezfikbd/article/details/127153971
版权归原作者 yuhuofei2021 所有, 如有侵权,请联系我们删除。

“SpringBoot连接多个RabbitMQ”的评论:

还没有评论