1. Spring AMQP 简介
Spring AMQP(Spring for Advanced Message Queuing Protocol) 是 Spring 框架的一个子项目,用于简化与消息代理(如 RabbitMQ)的集成。Spring AMQP 提供了基于 AMQP 协议的抽象层,使得 Java 程序员能够更轻松地使用消息队列完成异步通信、消息分发和数据流处理。Spring AMQP 的核心模块是
spring-rabbit
,它封装了与 RabbitMQ 的交互。
将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于
RabbitMQ
采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与
RabbitMQ
交互。并且
RabbitMQ
官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
官方地址:https://spring.io/projects/spring-amqp/
2. Spring AMQP 的核心组件
- RabbitTemplate:Spring AMQP 提供的主要操作类,用于向队列发送和接收消息。它提供了简单且易用的 API 进行消息发送和接收。
- AmqpAdmin:用于管理 RabbitMQ 的资源,例如创建和删除队列、交换机、绑定等。
- @RabbitListener 和 @RabbitHandler:注解驱动的消息监听机制,可以在类或方法上使用
@RabbitListener
注解来监听指定队列的消息。 - MessageConverter:用于将 Java 对象和消息之间进行相互转换,Spring AMQP 提供了多种消息转换器,例如 Jackson JSON、SimpleMessageConverter 等,方便消息的序列化和反序列化。
发送消息示例:
// 交换机名称
String exchangeName = "cyt.topic"; // 指定要发送消息的交换机名称。这里使用了一个名为 "cyt.topic" 的 Topic 类型交换机
// 消息内容
String message = "喜报!孙悟空大战哥斯拉,胜!"; // 要发送的消息内容,可以是任何文本,当前内容为示例新闻
// 发送消息到指定交换机和路由键
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
// 使用 RabbitTemplate 将消息发送到指定的交换机。
// 参数解释:
// - exchangeName:要发送消息的交换机名称 "cyt.topic"。
// - "china.news":消息的路由键,适用于 Topic 交换机,用于匹配绑定队列的路由模式。
// - message:要发送的消息内容。
// 如果有队列绑定到 "cyt.topic" 交换机,并且匹配 "china.news" 的路由键(例如 "china.*"),
// 则消息会被路由到该队列并被消费者接收处理。
3. 交换机类型
交换机的类型有四种:
Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
Headers:头匹配,基于MQ的消息头匹配,用的较少。
3.1 Fanout交换机
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
3.2 Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
3.3 Topic交换机
Topic
类型的
Exchange
与
Direct
相比,都是可以根据
RoutingKey
把消息路由到不同的队列。
只不过
Topic
类型
Exchange
可以让队列在绑定
BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以
.
分割,例如:
item.insert
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
4. 声明交换机和队列
程序启动时检查队列和交换机是否存在,如果不存在自动创建。
4.1 Direct模式的交换机和队列
// 消费者1 - 监听绑定的 direct.queue1 队列
@RabbitListener(bindings = @QueueBinding(
// 定义并绑定队列
value = @Queue(name = "direct.queue1"), // 创建一个名称为 direct.queue1 的队列
// 定义并绑定交换机
exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
// 绑定的路由键
key = {"red", "blue"} // 路由键,指定该消费者会接收 "red" 和 "blue" 路由键的消息
))
public void listenDirectQueue1(String msg){
// 接收到消息时打印输出
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
// 消费者2 - 监听绑定的 direct.queue2 队列
@RabbitListener(bindings = @QueueBinding(
// 定义并绑定队列
value = @Queue(name = "direct.queue2"), // 创建一个名称为 direct.queue2 的队列
// 定义并绑定交换机
exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
// 绑定的路由键
key = {"red", "yellow"} // 路由键,指定该消费者会接收 "red" 和 "yellow" 路由键的消息
))
public void listenDirectQueue2(String msg){
// 接收到消息时打印输出
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
4.2 Topic模式的交换机和队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
5. 消息转换器
Spring的消息发送代码接收的消息体是一个Object。
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
5.1 引入依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意,如果项目中引入了
spring-boot-starter-web
依赖,则无需再次引入
Jackson
依赖。
5.2 配置JSON转换器
配置消息转换器,在服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
6. 使用示例
6.1 引入依赖
<!--消息发送-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.2 配置
spring:
rabbitmq:
host: 192.168.1.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /cyt# 虚拟主机
username: cyt# 用户名
password: 123 # 密码
6.3 接收消息
在服务中定义一个消息监听类:
package com.cyt.trade.listener;
@Component
@RequiredArgsConstructor
public class PayStatusListener {
// 注入订单服务,用于更新订单状态
private final IOrderService orderService;
/**
* 监听支付成功的消息队列,当接收到支付成功的消息时,更新订单状态。
*
* @RabbitListener 注解用于声明这是一个消息监听方法。
* @QueueBinding 注解定义队列、交换机和路由键的绑定关系。
* - @Queue 用于定义队列的属性,例如队列名和是否持久化。
* - @Exchange 用于定义交换机的属性,例如交换机名和类型。
* - key 指定路由键,将匹配该路由键的消息发送到此队列。
*
* @param orderId 支付成功的订单ID,从消息中提取的参数。
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "trade.pay.success.queue", durable = "true"), // 定义队列名称和持久化
exchange = @Exchange(name = "pay.topic"), // 指定交换机名称
key = "pay.success" // 路由键,用于匹配支付成功的消息
))
public void listenPaySuccess(Long orderId) {
// 调用订单服务,更新订单为支付成功状态
orderService.markOrderPaySuccess(orderId);
}
}
6.4 发送消息
在需要发送消息的地方使用rabbitTemplate即可发送消息。
rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
版权归原作者 cyt涛 所有, 如有侵权,请联系我们删除。