0


SpringAMQP — RabbitMQ操作工具

1. Spring AMQP 简介

Spring AMQP(Spring for Advanced Message Queuing Protocol) 是 Spring 框架的一个子项目,用于简化与消息代理(如 RabbitMQ)的集成。Spring AMQP 提供了基于 AMQP 协议的抽象层,使得 Java 程序员能够更轻松地使用消息队列完成异步通信、消息分发和数据流处理。Spring AMQP 的核心模块是

  1. spring-rabbit

,它封装了与 RabbitMQ 的交互。

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于

  1. RabbitMQ

采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与

  1. RabbitMQ

交互。并且

  1. RabbitMQ

官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

官方地址:https://spring.io/projects/spring-amqp/

2. Spring AMQP 的核心组件

  • RabbitTemplate:Spring AMQP 提供的主要操作类,用于向队列发送和接收消息。它提供了简单且易用的 API 进行消息发送和接收。
  • AmqpAdmin:用于管理 RabbitMQ 的资源,例如创建和删除队列、交换机、绑定等。
  • @RabbitListener 和 @RabbitHandler:注解驱动的消息监听机制,可以在类或方法上使用 @RabbitListener 注解来监听指定队列的消息。
  • MessageConverter:用于将 Java 对象和消息之间进行相互转换,Spring AMQP 提供了多种消息转换器,例如 Jackson JSON、SimpleMessageConverter 等,方便消息的序列化和反序列化。

发送消息示例:

  1. // 交换机名称
  2. String exchangeName = "cyt.topic"; // 指定要发送消息的交换机名称。这里使用了一个名为 "cyt.topic" 的 Topic 类型交换机
  3. // 消息内容
  4. String message = "喜报!孙悟空大战哥斯拉,胜!"; // 要发送的消息内容,可以是任何文本,当前内容为示例新闻
  5. // 发送消息到指定交换机和路由键
  6. rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
  7. // 使用 RabbitTemplate 将消息发送到指定的交换机。
  8. // 参数解释:
  9. // - exchangeName:要发送消息的交换机名称 "cyt.topic"。
  10. // - "china.news":消息的路由键,适用于 Topic 交换机,用于匹配绑定队列的路由模式。
  11. // - message:要发送的消息内容。
  12. // 如果有队列绑定到 "cyt.topic" 交换机,并且匹配 "china.news" 的路由键(例如 "china.*"),
  13. // 则消息会被路由到该队列并被消费者接收处理。

3. 交换机类型

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

3.1 Fanout交换机

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

3.2 Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

3.3 Topic交换机

  1. Topic

类型的

  1. Exchange

  1. Direct

相比,都是可以根据

  1. RoutingKey

把消息路由到不同的队列。

只不过

  1. Topic

类型

  1. Exchange

可以让队列在绑定

  1. BindingKey

的时候使用通配符!

  1. BindingKey

一般都是有一个或多个单词组成,多个单词之间以

  1. .

分割,例如:

  1. item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

4. 声明交换机和队列

程序启动时检查队列和交换机是否存在,如果不存在自动创建。

4.1 Direct模式的交换机和队列

  1. // 消费者1 - 监听绑定的 direct.queue1 队列
  2. @RabbitListener(bindings = @QueueBinding(
  3. // 定义并绑定队列
  4. value = @Queue(name = "direct.queue1"), // 创建一个名称为 direct.queue1 的队列
  5. // 定义并绑定交换机
  6. exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
  7. // 绑定的路由键
  8. key = {"red", "blue"} // 路由键,指定该消费者会接收 "red" 和 "blue" 路由键的消息
  9. ))
  10. public void listenDirectQueue1(String msg){
  11. // 接收到消息时打印输出
  12. System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
  13. }
  14. // 消费者2 - 监听绑定的 direct.queue2 队列
  15. @RabbitListener(bindings = @QueueBinding(
  16. // 定义并绑定队列
  17. value = @Queue(name = "direct.queue2"), // 创建一个名称为 direct.queue2 的队列
  18. // 定义并绑定交换机
  19. exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
  20. // 绑定的路由键
  21. key = {"red", "yellow"} // 路由键,指定该消费者会接收 "red" 和 "yellow" 路由键的消息
  22. ))
  23. public void listenDirectQueue2(String msg){
  24. // 接收到消息时打印输出
  25. System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
  26. }

4.2 Topic模式的交换机和队列

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "topic.queue1"),
  3. exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
  4. key = "china.#"
  5. ))
  6. public void listenTopicQueue1(String msg){
  7. System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
  8. }
  9. @RabbitListener(bindings = @QueueBinding(
  10. value = @Queue(name = "topic.queue2"),
  11. exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
  12. key = "#.news"
  13. ))
  14. public void listenTopicQueue2(String msg){
  15. System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
  16. }

5. 消息转换器

Spring的消息发送代码接收的消息体是一个Object。

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

5.1 引入依赖

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.dataformat</groupId>
  3. <artifactId>jackson-dataformat-xml</artifactId>
  4. <version>2.9.10</version>
  5. </dependency>

注意,如果项目中引入了

  1. spring-boot-starter-web

依赖,则无需再次引入

  1. Jackson

依赖。

5.2 配置JSON转换器

配置消息转换器,在服务的启动类中添加一个Bean即可:

  1. @Bean
  2. public MessageConverter messageConverter(){
  3. // 1.定义消息转换器
  4. Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
  5. // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
  6. jackson2JsonMessageConverter.setCreateMessageIds(true);
  7. return jackson2JsonMessageConverter;
  8. }

6. 使用示例

6.1 引入依赖

  1. <!--消息发送-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

6.2 配置

  1. spring:
  2. rabbitmq:
  3. host: 192.168.1.101 # 你的虚拟机IP
  4. port: 5672 # 端口
  5. virtual-host: /cyt# 虚拟主机
  6. username: cyt# 用户名
  7. password: 123 # 密码

6.3 接收消息

在服务中定义一个消息监听类:

  1. package com.cyt.trade.listener;
  2. @Component
  3. @RequiredArgsConstructor
  4. public class PayStatusListener {
  5. // 注入订单服务,用于更新订单状态
  6. private final IOrderService orderService;
  7. /**
  8. * 监听支付成功的消息队列,当接收到支付成功的消息时,更新订单状态。
  9. *
  10. * @RabbitListener 注解用于声明这是一个消息监听方法。
  11. * @QueueBinding 注解定义队列、交换机和路由键的绑定关系。
  12. * - @Queue 用于定义队列的属性,例如队列名和是否持久化。
  13. * - @Exchange 用于定义交换机的属性,例如交换机名和类型。
  14. * - key 指定路由键,将匹配该路由键的消息发送到此队列。
  15. *
  16. * @param orderId 支付成功的订单ID,从消息中提取的参数。
  17. */
  18. @RabbitListener(bindings = @QueueBinding(
  19. value = @Queue(name = "trade.pay.success.queue", durable = "true"), // 定义队列名称和持久化
  20. exchange = @Exchange(name = "pay.topic"), // 指定交换机名称
  21. key = "pay.success" // 路由键,用于匹配支付成功的消息
  22. ))
  23. public void listenPaySuccess(Long orderId) {
  24. // 调用订单服务,更新订单为支付成功状态
  25. orderService.markOrderPaySuccess(orderId);
  26. }
  27. }

6.4 发送消息

在需要发送消息的地方使用rabbitTemplate即可发送消息。

  1. rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());

本文转载自: https://blog.csdn.net/qq_46637011/article/details/143618414
版权归原作者 cyt涛 所有, 如有侵权,请联系我们删除。

“SpringAMQP — RabbitMQ操作工具”的评论:

还没有评论