目前公司使用 jeepluscloud 版本,该版本没有集成消息队列,这里记录一下集成 RabbitMQ 的过程。这个框架跟 RuoYi 的微服务版本结构一模一样,所以熟悉 RuoYi 的话也可以快速上手。
1.项目结构图:
配置类的东西做成一个公共的模块
rabbitmq模块:
2.核心配置
1.pom文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven descriptor of the shared RabbitMQ module (artifact jeeplus-common-rabbitmq). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Inherits from the jeeplus-common parent; the version comes from the ${revision} property. -->
<parent>
<artifactId>jeeplus-common</artifactId>
<groupId>org.jeeplus</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jeeplus-common-rabbitmq</artifactId>
<packaging>jar</packaging>
<dependencies>
<!-- Spring Boot AMQP starter; no explicit version is declared here. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Core common module, pinned to the same version as the parent. -->
<dependency>
<groupId>org.jeeplus</groupId>
<artifactId>jeeplus-common-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
</project>
2.ConditionalOnRabbit
package com.jeeplus.common.rabbit.conditional;
import org.springframework.context.annotation.Conditional;
import java.lang.annotation.*;
/**
 * Registers the annotated type as a bean only when RabbitMQ support is enabled.
 *
 * <p>Intended use: the RabbitMQ dependency is still on the classpath but the broker is
 * not used and the feature has been switched off in the configuration file. Putting this
 * annotation on a class that carries {@code @RabbitListener} keeps that class out of the
 * Spring container, which avoids the endless reconnect attempts (and the resulting stream
 * of exceptions) that would otherwise disturb development and normal use.
 *
 * <p>The decision itself is delegated to {@link OnRabbitEnable}.
 *
 * @author xxm
 * @since 2022/12/12
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Conditional(OnRabbitEnable.class)
public @interface ConditionalOnRabbit {
}
3.OnRabbitEnable
package com.jeeplus.common.rabbit.conditional;
import com.jeeplus.common.rabbit.configuration.RabbitMqProperties;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
/**
 * Condition that matches only when RabbitMQ support is enabled in the configuration.
 *
 * <p>It is used to keep {@code @RabbitListener} beans from being registered while
 * RabbitMQ is switched off; otherwise the listeners would keep trying to reconnect.
 *
 * @author xxm
 * @since 2022/12/12
 */
public class OnRabbitEnable implements Condition {

    /** Property prefix under which {@link RabbitMqProperties} is bound. */
    private static final String RABBIT_PROPERTIES_PREFIX = "com.jeeplus.common.rabbit";

    /**
     * Binds {@link RabbitMqProperties} from the environment and reports its enable flag.
     *
     * @param context  condition context supplying the environment to bind from
     * @param metadata metadata of the annotated element (not consulted)
     * @return the bound {@code enable} value; when nothing is bound, the value of a
     *         freshly constructed {@link RabbitMqProperties}
     */
    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        Binder environmentBinder = Binder.get(context.getEnvironment());
        // Fallback used when no property under the prefix is present.
        RabbitMqProperties defaults = new RabbitMqProperties();
        RabbitMqProperties effective = environmentBinder
                .bind(RABBIT_PROPERTIES_PREFIX, RabbitMqProperties.class)
                .orElse(defaults);
        return effective.isEnable();
    }
}
4.BootxRabbitListenerConfigurer
package com.jeeplus.common.rabbit.configuration;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
/**
 * Rabbit listener configurer that installs the shared message handler method factory
 * on the listener endpoint registrar.
 *
 * @author xxm
 * @since 2021/6/25
 */
@Configuration
public class BootxRabbitListenerConfigurer implements RabbitListenerConfigurer {

    /** Handler method factory injected through the constructor. */
    private final DefaultMessageHandlerMethodFactory jsonHandlerMethodFactory;

    /**
     * Creates the configurer.
     *
     * @param jsonHandlerMethodFactory factory to hand to the listener registrar
     */
    public BootxRabbitListenerConfigurer(DefaultMessageHandlerMethodFactory jsonHandlerMethodFactory) {
        this.jsonHandlerMethodFactory = jsonHandlerMethodFactory;
    }

    /**
     * Sets the injected factory as the registrar's message handler method factory.
     *
     * @param registrar registrar for the {@code @RabbitListener} endpoints
     */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(this.jsonHandlerMethodFactory);
    }
}
5.RabbitMqConfigurer
package com.jeeplus.common.rabbit.configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
/**
 * Message queue configuration: enables Rabbit listener support and wires JSON
 * (Jackson) conversion for both sending and receiving.
 *
 * @author xxm
 * @since 2021/6/25
 */
@EnableRabbit
@Configuration
public class RabbitMqConfigurer {

    /**
     * Registers the {@link RabbitTemplate} with a Jackson JSON message converter.
     *
     * @param connectionFactory connection factory the template sends through
     * @param objectMapper      the application's Jackson mapper, so messages use the
     *                          same serialization settings as the rest of the system
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ObjectMapper objectMapper) {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter(objectMapper);
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonConverter);
        return template;
    }

    /**
     * Registers the handler method factory used by listener methods.
     *
     * @param objectMapper the application's Jackson mapper
     * @return a factory whose converter is a Jackson-based messaging converter
     */
    @Bean
    public DefaultMessageHandlerMethodFactory jsonHandlerMethodFactory(ObjectMapper objectMapper) {
        // This converter is what lets a @Payload parameter be deserialized
        // automatically from the message body.
        MappingJackson2MessageConverter payloadConverter = new MappingJackson2MessageConverter();
        payloadConverter.setObjectMapper(objectMapper);

        DefaultMessageHandlerMethodFactory handlerFactory = new DefaultMessageHandlerMethodFactory();
        handlerFactory.setMessageConverter(payloadConverter);
        return handlerFactory;
    }
}
6.RabbitMqConfigurer(代码与上一节相同)
package com.jeeplus.common.rabbit.configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
/**
 * Message queue configuration: enables Rabbit listener support and wires JSON
 * (Jackson) conversion for both sending and receiving.
 *
 * @author xxm
 * @since 2021/6/25
 */
@EnableRabbit
@Configuration
public class RabbitMqConfigurer {

    /**
     * Registers the {@link RabbitTemplate} with a Jackson JSON message converter.
     *
     * @param connectionFactory connection factory the template sends through
     * @param objectMapper      the application's Jackson mapper, so messages use the
     *                          same serialization settings as the rest of the system
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ObjectMapper objectMapper) {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter(objectMapper);
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonConverter);
        return template;
    }

    /**
     * Registers the handler method factory used by listener methods.
     *
     * @param objectMapper the application's Jackson mapper
     * @return a factory whose converter is a Jackson-based messaging converter
     */
    @Bean
    public DefaultMessageHandlerMethodFactory jsonHandlerMethodFactory(ObjectMapper objectMapper) {
        // This converter is what lets a @Payload parameter be deserialized
        // automatically from the message body.
        MappingJackson2MessageConverter payloadConverter = new MappingJackson2MessageConverter();
        payloadConverter.setObjectMapper(objectMapper);

        DefaultMessageHandlerMethodFactory handlerFactory = new DefaultMessageHandlerMethodFactory();
        handlerFactory.setMessageConverter(payloadConverter);
        return handlerFactory;
    }
}
7.RabbitMqProperties
package com.jeeplus.common.rabbit.configuration;
import com.jeeplus.common.rabbit.conditional.ConditionalOnRabbit;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* MQTT配置
*
* @author xxm
* @since 2022/12/12
*/
@Getter
@Setter
@ConfigurationProperties("com.jeeplus.common.rabbit")
public class RabbitMqProperties {
/**
* 是否开启 RabbitMQ功能,
* @see ConditionalOnRabbit 配合此注解使用
*/
private boolean enable = false;
}
8.RabbitMqCommonAutoConfiguration
package com.jeeplus.common.rabbit;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * RabbitMQ configuration entry class of the common rabbit module. This is the class
 * listed in the module's {@code spring.factories} under {@code EnableAutoConfiguration}.
 *
 * <p>NOTE(review): {@code @SpringBootApplication} is normally reserved for an
 * application's main class. Presumably it is used here so that the classes under
 * {@code com.jeeplus.common.rabbit} are picked up by component scanning — confirm, and
 * consider a narrower combination such as {@code @Configuration} plus
 * {@code @ComponentScan}.
 *
 * @author xxm
 * @since 2022/5/3
 */
@SpringBootApplication
public class RabbitMqCommonAutoConfiguration {
}
9.org.springframework.boot.autoconfigure.AutoConfiguration.imports
# One fully-qualified auto-configuration class name per line.
com.jeeplus.common.rabbit.RabbitMqCommonAutoConfiguration
10.spring.factories
## Registers the custom starter's auto-configuration class
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.jeeplus.common.rabbit.RabbitMqCommonAutoConfiguration
3. nacos配置
哪个服务模块需要使用消息队列,就在该模块对应的 yml 配置中添加 RabbitMQ 连接信息:
# RabbitMQ connection settings; Spring Boot binds them under the spring.rabbitmq prefix.
# If the service's yml already has a top-level `spring:` key, merge `rabbitmq:` into it
# instead of declaring `spring:` a second time.
spring:
  rabbitmq:
    host: localhost
    # NOTE(review): RabbitMQ's default AMQP port is 5672 - confirm that 5627 is intended.
    port: 5627
    username: root
    password: root123
    virtual-host: /
    # Correlated publisher confirms; needed for the RabbitTemplate confirm callback.
    publisher-confirm-type: correlated
    listener:
      simple:
        # Listener methods acknowledge or reject each delivery themselves.
        acknowledge-mode: manual
4.服务中调用rabbitmq
建立两个包,配置类和监听类
1.mq模板配置
package com.jeeplus.duxin.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
 * RabbitTemplate setup for this service: JSON message conversion plus publisher
 * confirm / return callbacks.
 *
 * @author lgn
 * @date 2023/10/28 10:15
 */
@Configuration
public class MyRabbitConfig {

    /** Template currently being configured; assigned inside {@link #rabbitTemplate}. */
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds the primary {@link RabbitTemplate}, gives it the JSON converter and
     * registers the callbacks.
     *
     * @param connectionFactory connection factory the template sends through
     * @return the configured template
     */
    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        this.rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return this.rabbitTemplate;
    }

    /**
     * @return a Jackson-based JSON message converter
     */
    @Bean
    public MessageConverter messageConverter() {
        MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }

    /**
     * Customizes the RabbitTemplate.
     *
     * <ol>
     *   <li>Callback once the broker has received the message: set
     *       {@code spring.rabbitmq.publisher-confirms: true} and register a confirm
     *       callback.</li>
     *   <li>Callback about delivery to the queue: set
     *       {@code spring.rabbitmq.publisher-returns: true} and
     *       {@code spring.rabbitmq.template.mandatory: true}, and register a
     *       ReturnCallback.</li>
     *   <li>Consumer-side acknowledgement: guarantees every message is consumed
     *       correctly; only then may the broker delete it.</li>
     * </ol>
     */
    // @PostConstruct // would run this method once the MyRabbitConfig object has been created
    public void initRabbitTemplate() {
        // Confirm callback: ack is true as soon as the message reaches the broker.
        //   correlationData - unique correlation data of the message (its unique id)
        //   ack             - whether the message was received successfully
        //   cause           - reason for the failure
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String confirmLine = "confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]";
            System.out.println(confirmLine);
        });

        // Return callback: fires whenever a message was not delivered to the target queue.
        //   message    - details of the message that failed to be delivered
        //   replyCode  - reply status code
        //   replyText  - reply text
        //   exchange   - exchange the message was sent to
        //   routingKey - routing key the message was sent with
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String returnLine = "Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]";
            System.out.println(returnLine);
        });
    }
}
2.服务交换机 队列设置
初始化交换机,队列,建立绑定。
package com.jeeplus.duxin.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
 * Exchange, queue and binding declarations for this service.
 *
 * <p>Queue, Exchange and Binding beans in the container are created automatically on
 * RabbitMQ when they do not exist there yet.
 *
 * @author lgn
 * @date 2023/10/28 10:16
 */
@Configuration
public class MyRabbitMQConfig {

    /*
     * Reference sample (disabled): a delay queue whose expired messages are dead-lettered.
     *
     *   Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,
     *         Map<String, Object> arguments)
     *
     *   HashMap<String, Object> arguments = new HashMap<>();
     *   arguments.put("x-dead-letter-exchange", "order-event-exchange");
     *   arguments.put("x-dead-letter-routing-key", "order.release.order");
     *   arguments.put("x-message-ttl", 60000); // message TTL: 1 minute
     *   Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
     *
     * Further disabled samples declared, on topic exchange "order-event-exchange":
     *   queue "order.delay.queue"         <- routing key "order.create.order"
     *   queue "order.release.order.queue" <- routing key "order.release.order"
     *   queue "stock.release.stock.queue" <- routing key "order.release.other.#"
     *   queue "order.seckill.order.queue" <- routing key "order.seckill.order"
     */

    /** Topic exchange of the BOM module. */
    private static final String BOM_EXCHANGE = "bom-event-exchange";
    /** Topic exchange used for calling the C++ module. */
    private static final String C_EXCHANGE = "c-event-exchange";

    private static final String BOM_MAINTENANCE_QUEUE = "bom.maintenance.queue";
    private static final String STOCK_DOC_QUEUE = "stock.doc.queue";
    private static final String C_CREATE_FILE_QUEUE = "c.creatfile.queue";
    private static final String C_DEAL_QUEUE = "c.deal.queue";

    /**
     * Exchange of the BOM module.
     *
     * @return a topic exchange (name, durable = true, autoDelete = false)
     */
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange(BOM_EXCHANGE, true, false);
    }

    /**
     * BOM queue.
     *
     * @return the BOM maintenance queue
     */
    @Bean
    public Queue bomMaintenanceQueue() {
        return durableQueue(BOM_MAINTENANCE_QUEUE);
    }

    /**
     * BOM: binds the queue to the exchange and sets the routing key.
     *
     * @author lgn
     * @date 2023/10/28 10:33
     * @return Binding
     */
    @Bean
    public Binding bomCreateBinding() {
        return queueBinding(BOM_MAINTENANCE_QUEUE, BOM_EXCHANGE, "bom.maintenance.create");
    }

    /**
     * Product stock document queue.
     *
     * @return the stock document queue
     */
    @Bean
    public Queue stockDocQueue() {
        return durableQueue(STOCK_DOC_QUEUE);
    }

    /**
     * Stock document (StockDoc): binds the queue to the exchange and sets the routing key.
     *
     * @author lgn
     * @date 2023/10/28 10:33
     * @return Binding
     */
    @Bean
    public Binding docCreateBinding() {
        return queueBinding(STOCK_DOC_QUEUE, BOM_EXCHANGE, "stock.doc.create");
    }

    /**
     * Exchange used for calling the C++ module.
     *
     * @return a topic exchange (name, durable = true, autoDelete = false)
     */
    @Bean
    public Exchange cEventExchange() {
        return new TopicExchange(C_EXCHANGE, true, false);
    }

    /**
     * Queue for C++ record-file generation.
     *
     * @return the record-file queue
     */
    @Bean
    public Queue cCreatFileQueue() {
        return durableQueue(C_CREATE_FILE_QUEUE);
    }

    /**
     * Queue for C++ signing.
     *
     * @return the signing queue
     */
    @Bean
    public Queue cDealQueue() {
        return durableQueue(C_DEAL_QUEUE);
    }

    /**
     * Creates the binding for the record-file queue.
     *
     * @author lgn
     * @date 2023/10/30 9:34
     * @return Binding
     */
    @Bean
    public Binding cCreatFileCreateBinding() {
        return queueBinding(C_CREATE_FILE_QUEUE, C_EXCHANGE, "c.creatFile.create");
    }

    /**
     * Creates the binding for the signing queue.
     *
     * @author lgn
     * @date 2023/10/30 9:34
     * @return Binding
     */
    @Bean
    public Binding cDealBinding() {
        return queueBinding(C_DEAL_QUEUE, C_EXCHANGE, "c.deal.create");
    }

    /** Builds a queue with durable = true, exclusive = false, autoDelete = false. */
    private static Queue durableQueue(String name) {
        return new Queue(name, true, false, false);
    }

    /**
     * Builds a binding whose destination is a queue.
     *
     * Binding arguments: destination (queue or exchange name), destination type
     * (queue or exchange), exchange, routing key, extra arguments (none here).
     */
    private static Binding queueBinding(String queueName, String exchangeName, String routingKey) {
        return new Binding(queueName,
                Binding.DestinationType.QUEUE,
                exchangeName,
                routingKey,
                null);
    }
}
3.监听队列 接收消息
消费方消费消息
package com.jeeplus.duxin.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 监听路由信息
* @author lgn
* @date 2023/10/28 10:33
*/
@Slf4j
@Component
//@RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送时类型一致
//@RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue 里面的消息。
@RabbitListener(queues = "bom.maintenance.queue")
public class MQTestListener {
//@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
//@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。
@RabbitHandler
public void listener(String info,Channel channel, Message message) throws IOException {
System.out.println("=============接收消息开始执行:"+info);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
5.使用
使用起来也非常方便:
在业务service中直接调用,生产者消息发送。
/**
 * Producer-side demo: publishes a fixed test payload to the BOM exchange.
 *
 * @author lgn
 * @date 2023/10/28 10:03
 * @return always {@code null}
 */
public String mqTest() {
    // TODO: once the order has been created successfully, send the message to MQ
    final String exchange = "bom-event-exchange";
    final String routingKey = "bom.maintenance.create";
    final String payload = "1234";
    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    return null;
}
希望对你有用!
版权归原作者 抹香鲸之海 所有, 如有侵权,请联系我们删除。