需求描述:原SpringBoot工程已经配置了一个RabbitMQ,现在需要再配置一个RabbitMQ,要求两个RabbitMQ各自向自己的队列推送消息,互不干扰。
一、单个RabbitMQ配置
1.1、导入Maven坐标
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.4</version>
</dependency>
1.2、yaml配置
# Single-broker settings read by Spring Boot's RabbitMQ auto-configuration.
# The keys must live under the `spring` root to be picked up.
spring:
  rabbitmq:
    host: xx.xxx.xxx.xxx
    port: xxxx
    username: xxxx
    password: xxxxxx
    virtual-host: xxxx
    publisher-returns: true
    # `publisher-confirms` was replaced by `publisher-confirm-type` in Spring Boot 2.2;
    # `correlated` matches the CorrelationData passed by the producer.
    publisher-confirm-type: correlated
    listener:
      simple:
        default-requeue-rejected: true
        retry:
          enabled: false
          max-attempts: 3
          initial-interval: 5000
1.3、java配置类
1.3.1、交换机配置
package com.ruoyi.report.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ExchangeConfig {

    /** Name under which the direct exchange is declared. */
    public static final String ecoa_exchange = "ecoaExchange";

    /**
     * Declares the direct exchange named {@link #ecoa_exchange}.
     *
     * <p>The exchange is created durable and not auto-deleted. A direct exchange
     * delivers a message to the queues whose binding key equals the routing key.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange ecoaExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(ecoa_exchange, durable, autoDelete);
    }
}
1.3.2、队列配置
package com.ruoyi.report.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Declares the queue used for ECOA file-upload messages.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:26)
 */
@Component
public class QueueConfig {

    /** Name of the ECOA file-upload queue. */
    private static final String ecoa_file_upload_queue = "ecoa_file_upload_queue";

    /**
     * Declares the ECOA file-upload queue.
     *
     * <p>The queue is durable, not exclusive to the declaring connection, and not
     * auto-deleted when it is no longer in use.
     *
     * @return the queue definition
     */
    @Bean
    public Queue ecoaFileUploadDispatchQueue() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(ecoa_file_upload_queue, durable, exclusive, autoDelete);
    }
}
1.3.3、绑定配置
package com.ruoyi.report.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Binds the ECOA file-upload queue to the ECOA direct exchange.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:31)
 */
@Component
public class BindingConfig {

    /** Routing key that links the file-upload queue to the exchange. */
    public static final String ECOA_file_upload_key = "ecoa_file_upload_key";

    @Autowired
    private QueueConfig queueConfig;

    @Autowired
    private ExchangeConfig exchangeConfig;

    /**
     * Builds the binding queue -> exchange using {@link #ECOA_file_upload_key}.
     *
     * <p>NOTE(review): the queue and exchange are obtained by calling the {@code @Bean}
     * methods on the injected holders directly. If those holders are plain
     * {@code @Component} classes, such calls presumably create fresh objects instead of
     * returning the container-managed beans; confirm this is acceptable (a binding
     * is built from the queue and exchange names).
     *
     * @return the binding definition
     */
    @Bean
    public Binding ecoaFileUploadDispatchBinding() {
        return BindingBuilder
                .bind(queueConfig.ecoaFileUploadDispatchQueue())
                .to(exchangeConfig.ecoaExchange())
                .with(ECOA_file_upload_key);
    }
}
1.3.4、连接配置
package com.ruoyi.report.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Single-broker configuration: exposes a {@link RabbitTemplate} built on the injected
 * {@link ConnectionFactory}.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:14)
 */
@Configuration
public class RabbitMqConfig {

    /** Connection factory injected from the application context. */
    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * Creates the template used to send and receive messages.
     *
     * <p>No confirm or return callback is registered here; this is the place to add one.
     *
     * @return a template bound to the injected connection factory
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory);
    }
}
1.4、生产者与消费者操作配置
1.4.1、生产者操作配置
package com.ruoyi.report.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.report.config.BindingConfig;
import com.ruoyi.report.config.ExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * Producer helper that publishes JSON messages to the ECOA exchange.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:36)
 */
@Component
public class MessageUtils {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Serializes {@code message} to JSON and publishes it to
     * {@link ExchangeConfig#ecoa_exchange} with routing key
     * {@link BindingConfig#ECOA_file_upload_key}.
     *
     * <p>Dates are written as {@code yyyy-MM-dd HH:mm:ss} and null strings as empty
     * strings. A random UUID is attached as correlation data.
     *
     * @param message the payload object to serialize and send
     */
    public void sendMessage(Object message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        String json = JSON.toJSONStringWithDateFormat(
                message, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty);
        // Encode explicitly as UTF-8: the no-arg getBytes() depends on the platform
        // default charset and can corrupt non-ASCII text on some hosts.
        byte[] body = json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message msg = MessageBuilder.withBody(body)
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
        rabbitTemplate.convertAndSend(
                ExchangeConfig.ecoa_exchange, BindingConfig.ECOA_file_upload_key, msg, correlationId);
    }
}
1.4.2、消费者操作配置
package com.ruoyi.report.consumer;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName PrintFeedbackConsumer
* @Description
* @Author Mr.Huang
* @Date 2024/4/30 10:23
* @Version 1.0
**/
@Slf4j
@Component
public class PrintFeedbackConsumer {
@Autowired
private PrintSendLogService printSendLogService;
@RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
public void receiveMq(Message message, Channel channel) {
try {
String body = new String(message.getBody());
log.info("接受【Print结果推送】RabbitMQ消息:"+body);
JSONObject objJson = JSONObject.parseObject(body);
Thread.sleep(1000);
PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
printSendLogService.updatePrintSendLog(printResult);
}catch (Exception e){
log.error("",e);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
二、多个RabbitMQ配置
**Maven坐标与上面单个RabbitMQ配置一致**
2.1、yaml配置
# Two-broker settings. These are custom keys resolved through @Value placeholders
# such as ${spring.rabbitmq.first.host}, so they must live under the `spring` root.
# Only host, port, username, password and virtual-host are read by the Java
# configuration class shown below; the publisher-* and listener.* keys under
# `first` / `second` are not bound by that class and take effect only if the
# configuration is extended to read them.
spring:
  rabbitmq:
    first:
      host: xx.xxx.xxx.xxx
      port: xxxx
      username: xxxx
      password: xxxxxx
      virtual-host: xxxx
      publisher-returns: true
      publisher-confirms: true
      listener:
        simple:
          default-requeue-rejected: true
          retry:
            enabled: false
            max-attempts: 3
            initial-interval: 5000
    second:
      host: xx.xxx.xxx.xxx
      port: xxxx
      username: xxxx
      password: xxxxxx
      publisher-returns: true
      publisher-confirms: true
      virtual-host: xxxx
      listener:
        simple:
          default-requeue-rejected: true
          retry:
            enabled: false
            max-attempts: 3
            initial-interval: 5000
2.2、java配置类
package com.ruoyi.report.config;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
 * Wires two independent RabbitMQ brokers into one application.
 *
 * <p>For each broker this class provides a connection factory, a
 * {@link RabbitTemplate}, a listener container factory, and a startup bean that
 * declares the broker's exchange, queues and bindings. The first (ECOA) broker's
 * connection factory and template are marked {@code @Primary}.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:14)
 */
@Configuration
public class RabbitMqConfig {

    // Queue and routing key on the first broker (ECOA).
    public static final String ECOA_file_upload_queue = "ecoa_file_upload_queue";
    public static final String ECOA_file_upload_key = "ecoa_file_upload_key";

    // Queues and routing keys on the second broker (print platform).
    public static final String print_tms_dispatch_info_queue = "print_tms_dispatch_info_queue";
    public static final String print_4pl_dispatch_info_feedback_queue = "print_4pl_dispatch_info_feedback_queue";
    public static final String print_tms_dispatch_info_key = "print_tms_dispatch_info_key";
    public static final String print_4pl_dispatch_info_feedback_key = "print_4pl_dispatch_info_feedback_key";

    /** Exchange name on the first broker. */
    public static final String EXCHANGE = "ecoaExchange";
    /** Exchange name on the second broker. */
    public static final String EXCHANGE2 = "tms_exchange";

    /**
     * Connection factory for the first broker, built from
     * {@code spring.rabbitmq.first.*}.
     *
     * @param host        broker host
     * @param port        broker port
     * @param username    login user
     * @param password    login password
     * @param virtualHost virtual host to connect to
     * @return the primary connection factory
     */
    @Bean(name = "ECOAConnectionFactory")
    @Primary
    public ConnectionFactory ECOAConnectionFactory(@Value("${spring.rabbitmq.first.host}") String host,
                                                   @Value("${spring.rabbitmq.first.port}") int port,
                                                   @Value("${spring.rabbitmq.first.username}") String username,
                                                   @Value("${spring.rabbitmq.first.password}") String password,
                                                   @Value("${spring.rabbitmq.first.virtual-host}") String virtualHost) {
        return newConnectionFactory(host, port, username, password, virtualHost);
    }

    /**
     * Connection factory for the second broker, built from
     * {@code spring.rabbitmq.second.*}.
     *
     * @param host        broker host
     * @param port        broker port
     * @param username    login user
     * @param password    login password
     * @param virtualHost virtual host to connect to
     * @return the connection factory for the print broker
     */
    @Bean(name = "printConnectionFactory")
    public ConnectionFactory printConnectionFactory(@Value("${spring.rabbitmq.second.host}") String host,
                                                    @Value("${spring.rabbitmq.second.port}") int port,
                                                    @Value("${spring.rabbitmq.second.username}") String username,
                                                    @Value("${spring.rabbitmq.second.password}") String password,
                                                    @Value("${spring.rabbitmq.second.virtual-host}") String virtualHost) {
        return newConnectionFactory(host, port, username, password, virtualHost);
    }

    /**
     * Template for publishing to the first broker.
     *
     * @param connectionFactory the first broker's connection factory
     * @return the primary template
     */
    @Bean(name = "ECOARabbitTemplate")
    @Primary
    public RabbitTemplate fplRabbitTemplate(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Template for publishing to the second broker.
     *
     * @param connectionFactory the second broker's connection factory
     * @return the template for the print broker
     */
    @Bean(name = "printRabbitTemplate")
    public RabbitTemplate tcscRabbitTemplate(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Listener container factory for consumers of the first broker.
     *
     * @param configurer        Spring Boot configurer applied to the factory
     * @param connectionFactory the first broker's connection factory
     * @return the container factory to reference from {@code @RabbitListener}
     */
    @Bean(name = "ECOAContainerFactory")
    public SimpleRabbitListenerContainerFactory ECOAContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                     @Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
        return newContainerFactory(configurer, connectionFactory);
    }

    /**
     * Listener container factory for consumers of the second broker.
     *
     * @param configurer        Spring Boot configurer applied to the factory
     * @param connectionFactory the second broker's connection factory
     * @return the container factory to reference from {@code @RabbitListener}
     */
    @Bean(name = "printContainerFactory")
    public SimpleRabbitListenerContainerFactory printContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                      @Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
        return newContainerFactory(configurer, connectionFactory);
    }

    /**
     * Declares the exchange, queue and binding on the first broker at startup.
     *
     * <p>Declaration failures are reported and otherwise ignored so that the
     * application still starts; a failure to open the connection or channel
     * propagates.
     *
     * @param connectionFactory the first broker's connection factory
     * @return the marker value {@code "ECOAQueue"}
     */
    @Bean
    public String runECOAQueue(@Qualifier("ECOAConnectionFactory") ConnectionFactory connectionFactory) {
        System.out.println("configuration ECOAQueue ........................");
        declareDirectTopology(connectionFactory, EXCHANGE, new String[][] {
                // Document push -> ECOA queue.
                {ECOA_file_upload_queue, ECOA_file_upload_key},
        });
        return "ECOAQueue";
    }

    /**
     * Declares the exchange, queues and bindings on the second broker at startup.
     *
     * <p>Declaration failures are reported and otherwise ignored so that the
     * application still starts; a failure to open the connection or channel
     * propagates.
     *
     * @param connectionFactory the second broker's connection factory
     * @return the marker value {@code "printQueue"}
     */
    @Bean
    public String runPrintQueue(@Qualifier("printConnectionFactory") ConnectionFactory connectionFactory) {
        System.out.println("configuration printQueue ........................");
        declareDirectTopology(connectionFactory, EXCHANGE2, new String[][] {
                // Document push -> print platform queue.
                {print_tms_dispatch_info_queue, print_tms_dispatch_info_key},
                // Print platform feedback queue.
                {print_4pl_dispatch_info_feedback_queue, print_4pl_dispatch_info_feedback_key},
        });
        return "printQueue";
    }

    /** Builds a caching connection factory from the given connection settings. */
    private static CachingConnectionFactory newConnectionFactory(String host, int port, String username,
                                                                 String password, String virtualHost) {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }

    /** Builds a listener container factory: 1 to 5 consumers, prefetch 1, then the Boot configurer. */
    private static SimpleRabbitListenerContainerFactory newContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setMaxConcurrentConsumers(5);
        factory.setConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        // The configurer runs last, as in the original ordering.
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    /**
     * Declares a durable direct exchange plus durable queues bound to it, then closes
     * the channel used for the declarations.
     *
     * @param connectionFactory factory for the target broker
     * @param exchange          exchange name
     * @param queueAndKeyPairs  rows of {@code {queueName, routingKey}}, declared in order
     */
    private static void declareDirectTopology(ConnectionFactory connectionFactory, String exchange,
                                              String[][] queueAndKeyPairs) {
        Connection connection = connectionFactory.createConnection();
        Channel channel = connection.createChannel(false);
        try {
            channel.exchangeDeclare(exchange, "direct", true, false, null);
            for (String[] pair : queueAndKeyPairs) {
                channel.queueDeclare(pair[0], true, false, false, null);
                channel.queueBind(pair[0], exchange, pair[1]);
            }
        } catch (Exception e) {
            // Best effort: report and continue so a declaration problem does not block startup.
            e.printStackTrace();
        } finally {
            // Always release the channel; the original left it open.
            try {
                channel.close();
            } catch (Exception closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}
注意:需将原单个MQ的交换机、队列、绑定配置类注释掉(或删除),只保留这一个配置类即可。该配置类已经完成了对应交换机、队列的声明与绑定,只需注意队列名、交换机名和路由键不要绑定错。
2.3、生产者与消费者操作配置
2.3.1、生产者操作配置
package com.ruoyi.report.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.report.config.RabbitMqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.UUID;
/**
 * Producer helper that publishes JSON messages to either of the two brokers.
 *
 * @author Mr.Huang
 * @version 1.0 (2023/9/22 16:36)
 */
@Component
public class MessageUtils {

    @Resource(name = "ECOARabbitTemplate")
    private RabbitTemplate ECOARabbitTemplate;

    @Resource(name = "printRabbitTemplate")
    private RabbitTemplate printRabbitTemplate;

    /**
     * Sends a message to the ECOA broker: exchange {@link RabbitMqConfig#EXCHANGE},
     * routing key {@link RabbitMqConfig#ECOA_file_upload_key}.
     *
     * @param message the payload object to serialize as JSON and send
     */
    public void sendMessage(Object message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        ECOARabbitTemplate.convertAndSend(
                RabbitMqConfig.EXCHANGE, RabbitMqConfig.ECOA_file_upload_key, toJsonMessage(message), correlationId);
    }

    /**
     * Sends a message to the print broker: exchange {@link RabbitMqConfig#EXCHANGE2},
     * routing key {@link RabbitMqConfig#print_tms_dispatch_info_key}.
     *
     * @param message the payload object to serialize as JSON and send
     */
    public void sendPrintMessage(Object message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        printRabbitTemplate.convertAndSend(
                RabbitMqConfig.EXCHANGE2, RabbitMqConfig.print_tms_dispatch_info_key, toJsonMessage(message), correlationId);
    }

    /**
     * Serializes the payload to a JSON AMQP message (dates as
     * {@code yyyy-MM-dd HH:mm:ss}, null strings as empty strings).
     */
    private static Message toJsonMessage(Object payload) {
        String json = JSON.toJSONStringWithDateFormat(
                payload, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteNullStringAsEmpty);
        // Encode explicitly as UTF-8: the no-arg getBytes() depends on the platform
        // default charset and can corrupt non-ASCII text on some hosts.
        return MessageBuilder.withBody(json.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
    }
}
2.3.2、消费者操作配置
package com.ruoyi.report.consumer;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName PrintFeedbackConsumer
* @Description
* @Author Mr.Huang
* @Date 2024/4/30 10:23
* @Version 1.0
**/
@Slf4j
@Component
public class PrintFeedbackConsumer {
@Autowired
private PrintSendLogService printSendLogService;
@RabbitListener(queues = {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory = "printContainerFactory")
public void receiveMq(Message message, Channel channel) {
try {
String body = new String(message.getBody());
log.info("接受【Print结果推送】RabbitMQ消息:"+body);
JSONObject objJson = JSONObject.parseObject(body);
Thread.sleep(1000);
PrintResult printResult = JSONObject.toJavaObject(objJson, PrintResult.class);
printSendLogService.updatePrintSendLog(printResult);
}catch (Exception e){
log.error("",e);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
与单个RabbitMQ消费者操作一致,只是注意要消费的队列和连接工厂不要搞错了
三、总结
配置单个RabbitMQ时不需要关心底层的连接工厂是如何配置的,只要把yaml内容填好,Spring Boot就会自动配置连接工厂,只需要声明交换机、队列并将它们绑定起来即可。当需要配置多个MQ时,才需要自己手动配置连接工厂;并且不限于两个RabbitMQ,可以按这个格式配置更多个。唯一需要注意的是不要把这些队列和交换机搞混。
版权归原作者 Abcdzzr 所有, 如有侵权,请联系我们删除。