0


2024/5/13 SpringBoot配置多个RabbitMQ

需求描述:原SpringBoot工程已经配置了一个RabbitMQ,现需求是再配置一个RabbitMQ,实现效果是不同RabbitMQ推送到不同的队列中,且互不干扰影响使用。

一、单个RabbitMQ配置

1.1、导入Maven坐标

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.4.4</version>
        </dependency>

1.2、yaml配置

  rabbitmq:
    host: xx.xxx.xxx.xxx
    port: xxxx
    username: xxxx
    password: xxxxxx
    virtual-host: xxxx
    publisher-returns: true
    publisher-confirms: true
    listener:
      simple:
        default-requeue-rejected: true
        retry:
          enabled: false
          max-attempts: 3
          initial-interval: 5000

1.3、java配置类

1.3.1、交换机配置

package com.ruoyi.report.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Declares the direct exchange used by the single-broker setup.
 */
@Component
public class ExchangeConfig {

    /** Name of the direct exchange. */
    public static final String ecoa_exchange = "ecoaExchange";

    /**
     * Defines the direct exchange named by {@link #ecoa_exchange}.
     *
     * <p>The exchange is durable, so it does not need to be re-created after
     * a broker restart, and it is not auto-deleted. A direct exchange routes
     * a message to the queues whose binding key matches the routing key.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange ecoaExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(ecoa_exchange, durable, autoDelete);
    }

}

1.3.2、队列配置

package com.ruoyi.report.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Declares the queue used by the single-broker setup.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:26)
 */
@Component
public class QueueConfig {

    /** Name of the file-upload queue. */
    private static final String ecoa_file_upload_queue = "ecoa_file_upload_queue";

    /**
     * Defines the file-upload queue.
     *
     * @return the queue definition
     */
    @Bean
    public Queue ecoaFileUploadDispatchQueue() {
        // durable: the queue survives a broker restart and need not be re-created.
        final boolean durable = true;
        // exclusive: whether the queue is only usable by the declaring connection.
        final boolean exclusive = false;
        // autoDelete: whether the queue is removed once it is no longer in use.
        final boolean autoDelete = false;
        return new Queue(ecoa_file_upload_queue, durable, exclusive, autoDelete);
    }
}

1.3.3、绑定配置

package com.ruoyi.report.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Binds the file-upload queue to the ECOA exchange for the single-broker setup.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:31)
 */
@Component
public class BindingConfig {

    /** Provides the queue definition to bind. */
    @Autowired
    private QueueConfig queueConfig;

    /** Provides the exchange definition to bind to. */
    @Autowired
    private ExchangeConfig exchangeConfig;

    /** Routing key that connects the exchange to the file-upload queue. */
    public static final String ECOA_file_upload_key = "ecoa_file_upload_key";

    /**
     * Defines the binding queue -> exchange using {@link #ECOA_file_upload_key}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding ecoaFileUploadDispatchBinding() {
        return BindingBuilder
                .bind(queueConfig.ecoaFileUploadDispatchQueue())
                .to(exchangeConfig.ecoaExchange())
                .with(ECOA_file_upload_key);
    }
}

1.3.4、连接配置

package com.ruoyi.report.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Exposes a {@link RabbitTemplate} for the single-broker setup.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:14)
 */
@Configuration
public class RabbitMqConfig {

    /** Connection factory injected from the application context. */
    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * Creates the template used to send and receive messages.
     * Confirm/return callbacks could be registered on it here; none are set.
     *
     * @return a template backed by the injected connection factory
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory);
    }
}

1.4、生产者与消费者操作配置

1.4.1、生产者操作配置

package com.ruoyi.report.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.report.config.BindingConfig;
import com.ruoyi.report.config.ExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Producer helper that publishes JSON messages to the ECOA exchange.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:36)
 */
@Component
public class MessageUtils {

    /** Date pattern applied when serializing date fields to JSON. */
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Serializes {@code message} to JSON and publishes it to
     * {@link ExchangeConfig#ecoa_exchange} with routing key
     * {@link BindingConfig#ECOA_file_upload_key}.
     *
     * <p>A random UUID is attached as correlation data for each send.
     *
     * @param message the payload object to serialize and send
     */
    public void sendMessage(Object message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        String json = JSON.toJSONStringWithDateFormat(
                message, DATE_FORMAT, SerializerFeature.WriteNullStringAsEmpty);
        // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform
        // default charset, which can corrupt non-ASCII text on some hosts.
        Message msg = MessageBuilder
                .withBody(json.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
        rabbitTemplate.convertAndSend(
                ExchangeConfig.ecoa_exchange, BindingConfig.ECOA_file_upload_key, msg, correlationId);
    }
}

1.4.2、消费者操作配置

package com.ruoyi.report.consumer;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @ClassName PrintFeedbackConsumer
 * @Description
 * @Author Mr.Huang
 * @Date 2024/4/30 10:23
 * @Version 1.0
 **/
@Slf4j
@Component
public class PrintFeedbackConsumer {

    @Autowired
    private PrintSendLogService printSendLogService;

    @RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
    public void receiveMq(Message message, Channel channel) {
        try {
            String body = new String(message.getBody());
            log.info("接受【Print结果推送】RabbitMQ消息:"+body);
            JSONObject objJson = JSONObject.parseObject(body);
            Thread.sleep(1000);
            PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
            printSendLogService.updatePrintSendLog(printResult);
        }catch (Exception e){
            log.error("",e);
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

二、多个RabbitMQ配置

   **Maven坐标与上面单个RabbitMQ配置一致**

2.1、yaml配置

  rabbitmq:
    first:
      host: xx.xxx.xxx.xxx
      port: xxxx
      username: xxxx
      password: xxxxxx
      virtual-host: xxxx
      publisher-returns: true
      publisher-confirms: true
      listener:
          simple:
            default-requeue-rejected: true
            retry:
              enabled: false
              max-attempts: 3
              initial-interval: 5000          
    second:
      host: xx.xxx.xxx.xxx
      port: xxxx
      username: xxxx
      password: xxxxxx
      publisher-returns: true
      publisher-confirms: true
      virtual-host: xxxx
      listener:
          simple:
            default-requeue-rejected: true
            retry:
              enabled: false
              max-attempts: 3
              initial-interval: 5000 

2.2、java配置类

package com.ruoyi.report.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Wires two independent RabbitMQ brokers ("ECOA" and "print"): one connection
 * factory, one {@link RabbitTemplate} and one listener container factory per
 * broker, plus startup declaration of each broker's exchange, queues and bindings.
 *
 * <p>The ECOA connection factory and template are marked {@code @Primary}.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:14)
 */
@Configuration
public class RabbitMqConfig {
    // First broker (ECOA): queue and routing key.
    public static final String ECOA_file_upload_queue = "ecoa_file_upload_queue";
    public static final String ECOA_file_upload_key = "ecoa_file_upload_key";

    // Second broker (print platform): queues and routing keys.
    public static final String print_tms_dispatch_info_queue = "print_tms_dispatch_info_queue";
    public static final String print_4pl_dispatch_info_feedback_queue = "print_4pl_dispatch_info_feedback_queue";
    public static final String print_tms_dispatch_info_key = "print_tms_dispatch_info_key";
    public static final String print_4pl_dispatch_info_feedback_key = "print_4pl_dispatch_info_feedback_key";

    /** Exchange declared on the first broker. */
    public static final String EXCHANGE = "ecoaExchange";
    /** Exchange declared on the second broker. */
    public static final String EXCHANGE2 = "tms_exchange";

    /**
     * Connection factory for the first broker, built from the
     * {@code spring.rabbitmq.first.*} properties.
     *
     * <p>Only host, port, username, password and virtual-host are read here;
     * other keys placed under the same prefix are not consumed by this class.
     */
    @Bean(name = "ECOAConnectionFactory")
    @Primary
    public ConnectionFactory ECOAConnectionFactory(@Value("${spring.rabbitmq.first.host}") String host,
                                                  @Value("${spring.rabbitmq.first.port}") int port,
                                                  @Value("${spring.rabbitmq.first.username}") String username,
                                                  @Value("${spring.rabbitmq.first.password}") String password,
                                                  @Value("${spring.rabbitmq.first.virtual-host}") String virtualHost) {
        return buildConnectionFactory(host, port, username, password, virtualHost);
    }

    /**
     * Connection factory for the second broker, built from the
     * {@code spring.rabbitmq.second.*} properties.
     *
     * <p>Only host, port, username, password and virtual-host are read here;
     * other keys placed under the same prefix are not consumed by this class.
     */
    @Bean(name = "printConnectionFactory")
    public ConnectionFactory printConnectionFactory(@Value("${spring.rabbitmq.second.host}") String host,
                                                   @Value("${spring.rabbitmq.second.port}") int port,
                                                   @Value("${spring.rabbitmq.second.username}") String username,
                                                   @Value("${spring.rabbitmq.second.password}") String password,
                                                   @Value("${spring.rabbitmq.second.virtual-host}") String virtualHost) {
        return buildConnectionFactory(host, port, username, password, virtualHost);
    }

    /** Template for publishing to the first broker. */
    @Bean(name = "ECOARabbitTemplate")
    @Primary
    public RabbitTemplate fplRabbitTemplate(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /** Template for publishing to the second broker. */
    @Bean(name = "printRabbitTemplate")
    public RabbitTemplate tcscRabbitTemplate(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /** Listener container factory bound to the first broker. */
    @Bean(name = "ECOAContainerFactory")
    public SimpleRabbitListenerContainerFactory ECOAContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                    @Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
        return buildContainerFactory(configurer, connectionFactory);
    }

    /** Listener container factory bound to the second broker. */
    @Bean(name = "printContainerFactory")
    public SimpleRabbitListenerContainerFactory printContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                     @Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
        return buildContainerFactory(configurer, connectionFactory);
    }

    /**
     * Declares the first broker's exchange, queue and binding at startup.
     *
     * <p>Declaration is best effort: an exception raised while declaring (or
     * while closing the channel) is printed and startup continues. Failure to
     * open the connection or channel still propagates.
     *
     * @return the marker string {@code "ECOAQueue"}
     */
    @Bean
    public String runECOAQueue(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
        System.out.println("configuration ECOAQueue ........................");
        Connection connection = connectionFactory.createConnection();
        Channel channel = connection.createChannel(false);
        // try-with-resources closes the channel once the declarations are done;
        // previously it was never closed.
        try (Channel declaring = channel) {
            declaring.exchangeDeclare(EXCHANGE, "direct", true, false, null);
            // Queue receiving documents pushed to ECOA.
            declaring.queueDeclare(ECOA_file_upload_queue, true, false, false, null);
            declaring.queueBind(ECOA_file_upload_queue, EXCHANGE, ECOA_file_upload_key);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Returned after the try block rather than from a finally block, so
        // that Errors are no longer silently discarded.
        return "ECOAQueue";
    }

    /**
     * Declares the second broker's exchange, queues and bindings at startup.
     *
     * <p>Declaration is best effort: an exception raised while declaring (or
     * while closing the channel) is printed and startup continues. Failure to
     * open the connection or channel still propagates.
     *
     * @return the marker string {@code "printQueue"}
     */
    @Bean
    public String runPrintQueue(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
        System.out.println("configuration printQueue ........................");
        Connection connection = connectionFactory.createConnection();
        Channel channel = connection.createChannel(false);
        // try-with-resources closes the channel once the declarations are done;
        // previously it was never closed.
        try (Channel declaring = channel) {
            declaring.exchangeDeclare(EXCHANGE2, "direct", true, false, null);
            // Queue for documents pushed to the print platform.
            declaring.queueDeclare(print_tms_dispatch_info_queue, true, false, false, null);
            declaring.queueBind(print_tms_dispatch_info_queue, EXCHANGE2, print_tms_dispatch_info_key);
            // Queue for feedback coming back from the print platform.
            declaring.queueDeclare(print_4pl_dispatch_info_feedback_queue, true, false, false, null);
            declaring.queueBind(print_4pl_dispatch_info_feedback_queue, EXCHANGE2, print_4pl_dispatch_info_feedback_key);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Returned after the try block rather than from a finally block, so
        // that Errors are no longer silently discarded.
        return "printQueue";
    }

    /** Builds a caching connection factory from the five basic connection settings. */
    private static ConnectionFactory buildConnectionFactory(String host, int port, String username,
                                                            String password, String virtualHost) {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }

    /** Builds a listener container factory (1..5 consumers, prefetch 1) for the given connection factory. */
    private static SimpleRabbitListenerContainerFactory buildContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setMaxConcurrentConsumers(5);
        factory.setConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        // The configurer runs last, as before, and attaches the connection factory.
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

注意:需将原MQ:交换机、队列、绑定配置类注释掉,只留这一个配置文件即可,这个配置文件已经将对应的:交换机、队列绑定好,只是需要注意队列名字、交换机不要绑定错了

2.3、生产者与消费者操作配置

2.3.1、生产者操作配置

package com.ruoyi.report.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.report.config.RabbitMqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * Producer helper that publishes JSON messages to either of the two brokers.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:36)
 */
@Component
public class MessageUtils {

    /** Date pattern applied when serializing date fields to JSON. */
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    @Resource(name = "ECOARabbitTemplate")
    private RabbitTemplate ECOARabbitTemplate;

    @Resource(name = "printRabbitTemplate")
    private RabbitTemplate printRabbitTemplate;

    /**
     * Sends a message to ECOA: publishes to {@link RabbitMqConfig#EXCHANGE}
     * with routing key {@link RabbitMqConfig#ECOA_file_upload_key}.
     *
     * @param message the payload object to serialize and send
     */
    public void sendMessage(Object message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        ECOARabbitTemplate.convertAndSend(
                RabbitMqConfig.EXCHANGE, RabbitMqConfig.ECOA_file_upload_key, toJsonMessage(message), correlationId);
    }

    /**
     * Sends a message to the print platform: publishes to
     * {@link RabbitMqConfig#EXCHANGE2} with routing key
     * {@link RabbitMqConfig#print_tms_dispatch_info_key}.
     *
     * @param message the payload object to serialize and send
     */
    public void sendPrintMessage(Object message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        printRabbitTemplate.convertAndSend(
                RabbitMqConfig.EXCHANGE2, RabbitMqConfig.print_tms_dispatch_info_key, toJsonMessage(message), correlationId);
    }

    /** Serializes the payload to JSON and wraps it in an AMQP message with a JSON content type. */
    private static Message toJsonMessage(Object payload) {
        String json = JSON.toJSONStringWithDateFormat(
                payload, DATE_FORMAT, SerializerFeature.WriteNullStringAsEmpty);
        // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform
        // default charset, which can corrupt non-ASCII text on some hosts.
        return MessageBuilder
                .withBody(json.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
    }
}

2.3.2、消费者操作配置

package com.ruoyi.report.consumer;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @ClassName PrintFeedbackConsumer
 * @Description
 * @Author Mr.Huang
 * @Date 2024/4/30 10:23
 * @Version 1.0
 **/
@Slf4j
@Component
public class PrintFeedbackConsumer {

    @Autowired
    private PrintSendLogService printSendLogService;

    @RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
    public void receiveMq(Message message, Channel channel) {
        try {
            String body = new String(message.getBody());
            log.info("接受【Print结果推送】RabbitMQ消息:"+body);
            JSONObject objJson = JSONObject.parseObject(body);
            Thread.sleep(1000);
            PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
            printSendLogService.updatePrintSendLog(printResult);
        }catch (Exception e){
            log.error("",e);
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

与单个RabbitMQ消费者操作一致,只是注意要消费的队列和连接工厂不要搞错了

三、总结

配置单个RabbitMQ时不需要关心底层的连接工厂是如何配置的,当把yaml内容填好它会自动配置连接工厂,只需要把交换机、队列、配置绑定起来即可。 当需要配置多个mq时才需要自己手动配置连接工厂,并不是只能配置两个RabbitMQ,可以按这个格式配置更多个。唯一注意的是不要把这些队列和交换机搞混了即可。


本文转载自: https://blog.csdn.net/Abcdzzr/article/details/138804832
版权归原作者 Abcdzzr 所有, 如有侵权,请联系我们删除。

“2024/5/13 SpringBoot配置多个RabbitMQ”的评论:

还没有评论