0


SpringAMQP — RabbitMQ操作工具

1. Spring AMQP 简介

Spring AMQP(Spring for Advanced Message Queuing Protocol) 是 Spring 框架的一个子项目,用于简化与消息代理(如 RabbitMQ)的集成。Spring AMQP 提供了基于 AMQP 协议的抽象层,使得 Java 程序员能够更轻松地使用消息队列完成异步通信、消息分发和数据流处理。Spring AMQP 的核心模块是

spring-rabbit

,它封装了与 RabbitMQ 的交互。

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于

RabbitMQ

采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与

RabbitMQ

交互。并且

RabbitMQ

官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

官方地址:https://spring.io/projects/spring-amqp/

2. Spring AMQP 的核心组件

  • RabbitTemplate:Spring AMQP 提供的主要操作类,用于向队列发送和接收消息。它提供了简单且易用的 API 进行消息发送和接收。
  • AmqpAdmin:用于管理 RabbitMQ 的资源,例如创建和删除队列、交换机、绑定等。
  • @RabbitListener 和 @RabbitHandler:注解驱动的消息监听机制,可以在类或方法上使用 @RabbitListener 注解来监听指定队列的消息。
  • MessageConverter:用于将 Java 对象和消息之间进行相互转换,Spring AMQP 提供了多种消息转换器,例如 Jackson JSON、SimpleMessageConverter 等,方便消息的序列化和反序列化。

发送消息示例:

// 交换机名称
String exchangeName = "cyt.topic"; // 指定要发送消息的交换机名称。这里使用了一个名为 "cyt.topic" 的 Topic 类型交换机

// 消息内容
String message = "喜报!孙悟空大战哥斯拉,胜!"; // 要发送的消息内容,可以是任何文本,当前内容为示例新闻

// 发送消息到指定交换机和路由键
rabbitTemplate.convertAndSend(exchangeName, "china.news", message); 
// 使用 RabbitTemplate 将消息发送到指定的交换机。
// 参数解释:
// - exchangeName:要发送消息的交换机名称 "cyt.topic"。
// - "china.news":消息的路由键,适用于 Topic 交换机,用于匹配绑定队列的路由模式。
// - message:要发送的消息内容。
// 如果有队列绑定到 "cyt.topic" 交换机,并且匹配 "china.news" 的路由键(例如 "china.*"),
// 则消息会被路由到该队列并被消费者接收处理。

3. 交换机类型

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

3.1 Fanout交换机

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

3.2 Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

3.3 Topic交换机

Topic

类型的

Exchange

Direct

相比,都是可以根据

RoutingKey

把消息路由到不同的队列。

只不过

Topic

类型

Exchange

可以让队列在绑定

BindingKey

的时候使用通配符!

BindingKey

一般都是有一个或多个单词组成,多个单词之间以

.

分割,例如:

item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

4. 声明交换机和队列

程序启动时检查队列和交换机是否存在,如果不存在自动创建。

4.1 Direct模式的交换机和队列

// 消费者1 - 监听绑定的 direct.queue1 队列
@RabbitListener(bindings = @QueueBinding(
    // 定义并绑定队列
    value = @Queue(name = "direct.queue1"), // 创建一个名称为 direct.queue1 的队列
    // 定义并绑定交换机
    exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
    // 绑定的路由键
    key = {"red", "blue"} // 路由键,指定该消费者会接收 "red" 和 "blue" 路由键的消息
))
public void listenDirectQueue1(String msg){
    // 接收到消息时打印输出
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

// 消费者2 - 监听绑定的 direct.queue2 队列
@RabbitListener(bindings = @QueueBinding(
    // 定义并绑定队列
    value = @Queue(name = "direct.queue2"), // 创建一个名称为 direct.queue2 的队列
    // 定义并绑定交换机
    exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
    // 绑定的路由键
    key = {"red", "yellow"} // 路由键,指定该消费者会接收 "red" 和 "yellow" 路由键的消息
))
public void listenDirectQueue2(String msg){
    // 接收到消息时打印输出
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

4.2 Topic模式的交换机和队列

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

5. 消息转换器

Spring的消息发送代码接收的消息体是一个Object。

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

5.1 引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了

spring-boot-starter-web

依赖,则无需再次引入

Jackson

依赖。

5.2 配置JSON转换器

配置消息转换器,在服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

6. 使用示例

6.1 引入依赖

  <!--消息发送-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

6.2 配置

spring:
  rabbitmq:
    host: 192.168.1.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /cyt# 虚拟主机
    username: cyt# 用户名
    password: 123 # 密码

6.3 接收消息

在服务中定义一个消息监听类:

package com.cyt.trade.listener;

@Component
@RequiredArgsConstructor
public class PayStatusListener {

    // 注入订单服务,用于更新订单状态
    private final IOrderService orderService;

    /**
     * 监听支付成功的消息队列,当接收到支付成功的消息时,更新订单状态。
     * 
     * @RabbitListener 注解用于声明这是一个消息监听方法。
     * @QueueBinding 注解定义队列、交换机和路由键的绑定关系。
     *   - @Queue 用于定义队列的属性,例如队列名和是否持久化。
     *   - @Exchange 用于定义交换机的属性,例如交换机名和类型。
     *   - key 指定路由键,将匹配该路由键的消息发送到此队列。
     *
     * @param orderId 支付成功的订单ID,从消息中提取的参数。
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "trade.pay.success.queue", durable = "true"), // 定义队列名称和持久化
            exchange = @Exchange(name = "pay.topic"), // 指定交换机名称
            key = "pay.success" // 路由键,用于匹配支付成功的消息
    ))
    public void listenPaySuccess(Long orderId) {
        // 调用订单服务,更新订单为支付成功状态
        orderService.markOrderPaySuccess(orderId);
    }
}

6.4 发送消息

在需要发送消息的地方使用rabbitTemplate即可发送消息。

rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());

本文转载自: https://blog.csdn.net/qq_46637011/article/details/143618414
版权归原作者 cyt涛 所有, 如有侵权,请联系我们删除。

“SpringAMQP — RabbitMQ操作工具”的评论:

还没有评论