0


17.RabbitMQ.记录

基础

一.初识MQ

异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方 (谁调的方法)
  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方 (处理方法)

MQ 就是存放消息的队列, 就是上面异步调用的Broker

开发语言是Erlang, 可用性较高, 吞吐量一般, 消息延迟微妙级, 消息可靠性高

二.RabbitMQ

安装

docker pull rabbitmq 安装最新的

或者直接运行run命令也行, 没有镜像他会自己下载

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \ 控制台地址
 -p 5672:5672 \ 处理消息的端口
 --network hmall \ 这个没有可以不写
 -d \
 rabbitmq:3.8-management

访问地址: http://ip:15672/

收发消息

概念:

publisher:消息发送者

exchange: 交换机, 负责路由消息

queue: 队列,存储消息 (他和上面的交换机,就属于MQ服务器的范畴 broker; virtual-host: 虚拟主机,起数据隔离的作用 (有可能一个公司都用这一个mq, 用这个去隔离不同的业务))

consumer: 消息的消费者

案例: 需求: 新建队列hello1 和 hello2 , 向默认交换机发送一条消息, 查看小时 是否到达队列

数据隔离

案例: 需求: 新建一个用户hmall, 为hamall用户创建一个virtual host, 测试不同的虚拟主机之间的数据隔离现象

添加一个用户

创建一个虚拟主机

三.SpringAMQP

由于

RabbitMQ

采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与

RabbitMQ

交互

java实现AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
spring: #无论是消费者还是发送者都得加这个配置
  rabbitmq:
    host: 192.168.0.82 #ip
    port: 5672 #端口
    virtual-host: /heima #虚拟主机
    username: hmall #创建的用户
    password: 123 #密码
发送者
// 在控制台创建了simple.queue队列之后
// 执行
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testSendMessage2Queue() {
        String queueName = "simple.queue"; // 队列名字 和自己创建的队列的名字要相同
        String msg = "hello"; // 发送的消息
        rabbitTemplate.convertAndSend(queueName, msg);
    }
}   // 会在队列中收到该消息
接受者  要启动接受者服务
@Slf4j
@Component
public class MqListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "simple.queue") // 监听的哪个队列
    public void listenSimpleQueue(String msg) {
        System.out.println("收到的消息为: " + msg); // 当消息发送过来后直接输出
    }

}

队列 WorkQueues

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

案例: 需求, 创建一个队列, 让发送者一秒往里面发50条, 创建两个消费者, 一个处理消息快, 一个处理消息慢

    /**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */
    @Test
    public void testWorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "work.queue";
        // 消息
        String message = "hello, message_";
        for (int i = 0; i < 50; i++) {
            // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }

发送了50条消息

处理消息快的,  结果是处理了25条
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }

处理消息慢的,  结果是处理了25条
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }

Work queues 这个无论消息处理者,处理的快慢(就是不管消息处理没处理完), 都会把消息平均的给他们发相同的数量

就是假如发了一条消息, 任意一个消费者把他消费了就行, 假如有50条消息, 会均摊一人25条

消费者消息推送限制

默认情况下,mq会将消息依次轮训投递给队列上的每个消费者, 但没有考虑消费者是否已经处理完消息, 可能出现消息的堆积.(就是上面的情况)

解决方案: 加配置 (谁处理的快, 就会多处理一些消息, 假如有20条,处理快的处理了18条)

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息, 处理完成才能获取下一个消息
确保同一时刻最多投递给消费者1条消息

交换机类型

上面的是 发送者 直接将消息发送到 一个队列中 , 消息被一个消费者 消费掉就没了,

但在实际生产中, 一个消息可能要发送到多个微服务中, 就是发送到多个队列中,

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

Fanout交换机

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息
在控制台给交换机绑定了两个队列之后
// 发送消息
@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "hmall.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

// 下面这俩队列都能接收
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

Direct交换机

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
声明一个名为hmall.direct的交换机
声明队列direct.queue1,绑定hmall.direct,bindingKey为blud和red
声明队列direct.queue2,绑定hmall.direct,bindingKey为yellow和red
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
在publisher中编写测试方法,向hmall.direct发送消息

// 消息发送
@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "hmall.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);  
// 绑定相同key("red")的队列会收到消息, 如果有多个队列绑定的都是这个, 多个队列都会收到消息
}

// 消息接收  
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

Topic交换机

Topic

类型的

Exchange

Direct

相比,都是可以根据

RoutingKey

把消息路由到不同的队列。 只不过

Topic

类型

Exchange

可以让队列在绑定

BindingKey

的时候使用通配符!

BindingKey

一般都是有一个或多个单词组成,多个单词之间以

.

分割,例如:

item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:- china.news- china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:- china.news- japan.news

/**
 * topicExchange
 */
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "hmall.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

声明队列和交换机

由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

// 广播交换机   发消息直接玩交换机发就行
@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
}

// 订阅交换机
@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }
}

基于注解声明

// 其他的地方不用定义任何东西. 发消息直接往hmall.direct交换机发就行
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

消息转换器

Spring的消息发送代码接收的消息体是一个Object: 而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

// 在发送者 和 接受者服务都引入依赖
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>
// 注意,如果项目中引入了`spring-boot-starter-web`依赖,则无需再次引入`Jackson`依赖。

配置消息转换器,在

publisher

consumer

两个服务的启动类中添加一个Bean即可(两边都得有):

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}
// 消息转换器中添加的messageId可以便于我们将来做幂等性判断。

消费者接收Object

我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:

@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
    System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}

四.在实际业务中

// 支付模块    
@Override
@Transactional
public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {
    // 1.查询支付单
    PayOrder po = getById(payOrderDTO.getId());
    // 2.判断状态
    if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){
        // 订单不是未支付,状态异常
        throw new BizIllegalException("交易已支付或关闭!");
    }
    // 3.尝试扣减余额
    userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());
    // 4.修改支付单状态
    boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());
    if (!success) {
        throw new BizIllegalException("交易已支付或关闭!");
    }
    // 5.修改订单状态
    // tradeClient.markOrderPaySuccess(po.getBizOrderNo());

// 当支付成功之后去给订单模块发个消息, 修改订单状态, 这里建议用try包起来这样消息发送失败也不会回滚这个方法
    try {
        rabbitTemplate.convertAndSend("pay.topic", "pay.success", po.getBizOrderNo());
    } catch (Exception e) {
        log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);
    }
}

// 收到支付发过来的消息 修改订单状态  
// 订单模块  
@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "mark.order.pay.queue", durable = "true"),
            exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),
            key = "pay.success"
    ))
    public void listenPaySuccess(Long orderId){
        orderService.markOrderPaySuccess(orderId);
    }

问题: 如果消息发送失败, 订单模块没收到会怎么样??

高级

问题: 如果消息发送失败, 订单模块没收到会怎么样??

还有就是当消息被投递到交换机但是还没有被消费的时候, mq挂了? 会怎么样?

一.发送者的可靠性

消息从生产者到消费者的每一步都可能导致消息丢失:

  • 发送消息时丢失:- 生产者发送消息时连接MQ失败- 生产者发送消息到达MQ后未找到Exchange- 生产者发送消息到达MQ的Exchange后,未找到合适的Queue- 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:- 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:- 消息接收后尚未处理突然宕机- 消息接收后处理过程中抛出异常

综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

(1.发送者发的时候, 2.发到mq,mq不会宕机丢失之类的, 3消费者确保要消费了)

我们要做的是确保生产者一定能把消息发送到MQ。

1.生产者重试机制

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当

RabbitTemplate

与MQ连接超时后,多次重试。 修改配置文件

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间,就是失败之后第一次等多久开始下一次重试
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

2.生产者确认机制

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。 不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制包括

Publisher** Confirm**

Publisher **Return**

两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。 具体如图所示:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功(入队的那一刻就返回ack),返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化(得保存在磁盘之后才能返回ack),返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中

ack

nack

属于Publisher Confirm机制,

ack

是投递成功;

nack

是投递失败。而

return

则属于Publisher Return机制。 默认两种机制都是关闭状态,需要通过配置文件来开启。

实现生产者确认
1.开启生产者确认

在publisher模块的

application.yaml

中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制 一般不需要配置

这里

publisher-confirm-type

有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般我们推荐使用

correlated

回调机制

我们使用了回调机制就得写一个回调函数,来处理回调, 下面两个

2.定义ReturnCallback

每个

RabbitTemplate

只能配置一个

ReturnCallback

,因此我们可以在配置类中统一设置(因为只有一个我们可以在项目启动的时候去配置)。

版本升级之后写法: 在发送者配置类里面

/**
 * mq 发送者确认机制 的回执 的配置
 *
 * */
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware { // ApplicationContextAware 是容器通知

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.debug("收到消息的 return callback, exchange:{}, key:{}, msg:{}, code:{}, text:{}",
                        returned.getExchange(),
                        returned.getRoutingKey(),
                        returned.getMessage(),
                        returned.getReplyCode(),
                        returned.getReplyText());
            }
        });
    }
}

下面这种和上面这种相同

我们在publisher模块定义一个配置类:

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}
3.定义ConfirmCallback

发送消息, 指定消息id, 消息ConfirmCallback 每次发消息的时候定义回调

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆(每个消息都有一个id)
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个

Future

来返回,我们可以提前给

CorrelationData

中的

Future

添加回调函数来处理消息回执:

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData(); // 这个里面要有自己的id
    // 2.给Future添加ConfirmCallback  (多线程Future异步执行,执行成功后得到结果)
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) { // 这个方法失败不是ack也不是nack而是表示spring内部处理的时候失败了,和mq没有关系,基本上不会触发, 就是表回调没拿到
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) { // 指mq的回调成功了, 拿到了回调
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
            // 这里表示消息发送失败, 在这里面执行重发的逻辑
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

生产者确认需要额外的网络和系统开销, 尽量不要使用. 影响效率

如果一定要用, 无需开启publisher-Return机制, 因为一般路由失败是自己业务问题.

对于nack消息可以有限次数重试, 依然失败则记录异常记录.

二.MQ的可靠性

在默认情况下, mq会将接收到的信息,保内存中以降低消息收发的延迟. 这样会导致两个问题

  • 一但mq宕机, 内存中的消息会丢失
  • 内存空间有限, 当消费者故障或消息处理过慢时, 会导致消息积压, 引发mq阻塞(当mq满了之后,mq会将一些老的消息存到磁盘, 但这个过程是阻塞的, 新的消息进不来)

两个方式解决: mq3.6版本之前用数据持久化, 之后用Lazy Queue

数据持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,mq实现数据持久化包括3个方面:

  • 交换机持久化
  • 队列持久化
  • 消息持久化
   // 测试非持久化状态下  向mq发送100万条消息
     @Test
    void testPageOut() {
        Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build(); // 非持久化
        for (int i = 0; i < 1000000; i++) {
            rabbitTemplate.convertAndSend("simple.queue", message);
        }
    } // 结果是 当mq消息满了之后, mq会阻塞,无法处理新的消息, 一段时间之后才接着处理

   // 测试持久化状态下  向mq发送100万条消息
    @Test
    void testPageOut() {
        Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build(); // 持久化
        for (int i = 0; i < 1000000; i++) {
            rabbitTemplate.convertAndSend("simple.queue", message);
        }
    } // 结果是 当mq消息满了之后, mq不会阻塞,只是处理消息的能力稍微降低, 一段时间之后恢复正常处理

数据持久化确实解决了内存存储的安全问题, 也不会有pageout造成的阻塞, 但是他的效率不是很好

Lazy Queue

mq是3.6版本开始, 加了lazy queue, 就是**惰性队列; **3.12版本之后都是lazy queue模式无法更改

惰性队列的特征:

  • 接收到消息后直接存入磁盘而非内存 (内存中只保留最近的消息, 默认2048条)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存 (读取消息时也是从磁盘再读取)
  • 支持百万条的消息存储

基于bean模式开启

@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 开启Lazy模式
            .build();
}

基于注解

@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy") // 添加这个参数
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}
更新已有队列为lazy模式

对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。 可以基于命令行设置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues 

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues:策略的作用对象,是所有的队列

当然,也可以在控制台配置policy,进入在控制台的

Admin

页面,点击

Policies

,即可添加配置:

三.消费者的可靠性

问题:

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。 但问题来了:RabbitMQ如何得知消费者的处理状态呢?

解决:

消费者确认机制

为了确认消费者是否成功处理了消息, mq提供了消费者确认机制. 当消费者处理消息结束之后, 应该向mq发送一个回执, 告知mq自己消息状态. 回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息 (队列将消息发给消费者,如果消费者处理成功给mq队列返回ack,那么这个队列就删除这个消息)
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过

try catch

机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • **none**:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用 (默认)
  • **manual**:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • **auto**:自动模式SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回nack- 如果是消息处理或校验异常,自动返回reject;

返回Reject的常见异常有:

Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:

  • o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
  • o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
  • o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.
  • o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message<Foo> but Message<Bar> is received.
  • java.lang.NoSuchMethodException: Added in version 1.6.3.
  • java.lang.ClassCastException: Added in version 1.6.3.

通过下面的配置可以修改SpringAMQP的ACK处理方式: none 不做处理

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new MessageConversionException("故意的");
    }
    log.info("消息处理完成");
}

测试可以发现:当消息处理发生异常时消息依然RabbitMQ删除了

我们再次把确认机制修改为auto: 一般我们使用auto

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

当我们把配改为**

auto

,消息处理失败后,会回到RabbitMQ,并重新投递到消费者**。

问题: 极端情况下, 他可能一直在这里失败, 一直重复投递, 就有问题

消费者失败处理- 失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力.

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是**reject**

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

问题: 消息重试后没成功就被丢弃了, 对消息可靠性要求高的的不合适, (像改订单状态, 必须保证消息投递到,修改状态)

消费者失败处理- 失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。 因此Spring允许我们自定义重试次数耗尽后消息处理策略,这个策略是由

MessageRecovery

接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是**

RepublishMessageRecoverer

**,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

消费者加配置
spring:
  rabbitmq:
    host: 192.168.0.88
    port: 5672
    virtual-host: /heima
    username: hmall
    password: 123
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto #消费者确认机制
        retry:
          enabled: true # 开启重试机制

1)在consumer服务中定义处理失败消息的交换机和队列

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") // 这个注解的意思是当某个属性满足时 生效,
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    // 定义一个RepublishMessageRecoverer,关联队列和交换机
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
消费者如何保证消息一定被消费?
  • 开启消费者确认机制atuo, 由spring确认消息处理成功后返回ack, 异常时返回nack
  • 开启消费者重试机制, 并设置MessageRecoverer, 多次重试失败后将消息投递到异常交换机, 交由人工处理(多次失败之后,会将异常信息投递到异常交换机,我们可以在这个交换机队列中看到异常信息,进行排查)

问题: 上面这些操作, 就保证消息至少被消费一次,但是消费者到底消费了几次消息我们无法确定

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:

f(x) = f(f(x))

,例如求绝对值函数。 在程序开发中,则是**指同一个业务,执行一次或多次对业务状态的影响是一致的(重复消费多次的结果应该是一样的)**。下面这个几个是幂等的操作 例如:

  • 根据id删除数据 (删100次id=2的数据, 都是删掉id=2的数据)
  • 查询数据 (就是我查一次id=2的数据, 和我查询100次id=2的数据返回的都是一样的数据)
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。 然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。 举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案

  • 唯一消息ID
  • 业务状态判断
唯一消息ID
  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息去数据库查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一ID呢? 其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。 以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

问题: 这样存在的问题就是, 之前拿到消息去判断就行了, 现在还需要拿到消息id,然后保存在数据库可, 对本来业务有侵入. 而且要操作数据库, 对性能有影响.

业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。 例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。(就是假如收到两条消息都是修改订单状态从未支付到已经支付,消费第一条将状态改为了已经支付,然后判断这个订单已经支付了, 所以就不用再次消费第二条消息了)

相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

以支付修改订单的业务为例,我们需要修改

OrderServiceImpl

中的

markOrderPaySuccess

方法:

mq支付结果监听器:

package com.hmall.trade.listeners;

import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;

@Component
@RequiredArgsConstructor
public class PayStatusListener {

    private final IOrderService orderService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "mark.order.pay.queue", durable = "true"),
            exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),
            key = "pay.success"
    ))
    public void listenOrderPay(Long orderId) {
       /* // 1.查询订单
        Order order = orderService.getById(orderId);
        // 2.判断订单状态是否为未支付 
        if(order == null || order.getStatus() != 1){ // 状态1为为付款, 除了1的都表示支付完了
            // 订单不存在,或者状态异常
            return; // 这里不需要抛异常, 因为抛异常,mq就会重试
        }
        // 3.如果未支付,标记订单状态为已支付
        orderService.markOrderPaySuccess(orderId);*/

        // update order set status = 2 where id = ? AND status = 1
        //orderService.lambdaUpdate()
               // .set(Order::getStatus, 2)
              //  .set(Order::getPayTime, LocalDateTime.now())
              //  .eq(Order::getId, orderId)
               // .eq(Order::getStatus, 1)
                //.update();
    }
}

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。我们可以合并上述操作为这样:

@Override
public void markOrderPaySuccess(Long orderId) { // 用这个就不需要上面的1,2判断步骤操作了
    // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
     orderService.lambdaUpdate() // mp里面的操作
            .set(Order::getStatus, 2)
            .set(Order::getPayTime, LocalDateTime.now())
            .eq(Order::getId, orderId)
            .eq(Order::getStatus, 1)
            .update();
}

注意看,上述代码等同于这样的SQL语句:

UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

兜底方案

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢? 有没有其它兜底方案,能够确保订单的支付状态一致呢?

其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

(就是本来交易服务就是修改订单状态的服务要一直被动的等待支付服务发送消息,那边交易成功才能发过来消息. 我们可以定期查询订单支付状态, 这样即时mq通知失败也能确保订单支付状态的最终一致性)

图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

不过需要注意的是,交易服务并不知道用户会在什么时候支付,如果查询的时机不正确(比如查询的时候用户正在支付中),可能查询到的支付状态也不正确。 那么问题来了,我们到底该在什么时间主动查询支付状态呢?

这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。 定时任务大家之前学习过,具体的实现这里就不再赘述了。

至此,消息可靠性的问题已经解决了。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

四.延迟消息

延迟消息: 生产者发送消息时指定一个时间, 消费者不会立刻收到消息, 而是在指定时间之后才收到消息.

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过

****dead-letter-exchange****

属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

控制台绑定死信队列

延迟消息插件 DelayExchange插件

mq推出的, 该插件的原理是设计了一种支持延迟消息功能的交换机, 当消息投递到交换机后可以暂存一定时间 , 到期后再投递到队列.

安装

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins  // 因为我们当时启动容器的时候将这个目录挂载出来了

结果如下:

[
    {
        "CreatedAt": "2024-10-12T10:54:46+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]

插件目录被挂载到了

/var/lib/docker/volumes/mq-plugins/_data

这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:

声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于

@Bean

的方式:

package com.itheima.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}
发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() { // 以后这个发送方法可以简化写, 改成一个类
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。 因此,不建议设置延迟时间过长的延迟消息

订单状态同步问题

假如订单超时支付时间为30分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。 但是大多数情况下用户支付都会在1分钟内完成,我们发送的消息却要在MQ中停留30分钟,额外消耗了MQ的资源。因此,我们最好多检测几次订单支付状态,而不是在最后第30分钟才检测。 例如:我们在用户下单后的第10秒、20秒、30秒、45秒、60秒、1分30秒、2分、...30分分别设置延迟消息,如果提前发现订单已经支付,则后续的检测取消即可。 这样就可以有效避免对MQ资源的浪费了。

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体,处于通用性考虑,我们将其定义到

hm-common

模块下:

package com.hmall.common.domain;

import com.hmall.common.utils.CollUtils;
import lombok.Data;

import java.util.List;

@Data
public class MultiDelayMessage<T> {
    /**
     * 消息体
     */
    private T data;
    /**
     * 记录延迟时间的集合
     */
    private List<Long> delayMillis;

    public MultiDelayMessage(T data, List<Long> delayMillis) {
        this.data = data;
        this.delayMillis = delayMillis;
    }
    public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){
        return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
    }

    /**
     * 获取并移除下一个延迟时间
     * @return 队列中的第一个延迟时间
     */
    public Long removeNextDelay(){
        return delayMillis.remove(0);
    }

    /**
     * 是否还有下一个延迟时间
     */
    public boolean hasNextDelay(){
        return !delayMillis.isEmpty();
    }
}

定义常量

package com.hmall.trade.constants;

public interface MqConstants {
    String DELAY_EXCHANGE = "trade.delay.topic";
    String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
    String DELAY_ORDER_ROUTING_KEY = "order.query";
}

抽取延迟类

package com.hmall.common.mq;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {

    private final int delay;

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelay(delay);
        return message;
    }
}

订单里面延迟修改订单状态

        // 5.延迟检测订单状态消息
        try {
            MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L);
            rabbitTemplate.convertAndSend(
                    MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
                    new DelayMessageProcessor(msg.removeNextDelay().intValue())
            );
        } catch (AmqpException e) {
            log.error("延迟消息发送异常!", e);
        }

消息监听 收到订单发的延迟消息,进行逻辑处理

package com.hmall.trade.listeners;

import com.hmall.common.constants.MqConstants;
import com.hmall.common.domain.MultiDelayMessage;
import com.hmall.common.mq.DelayMessageProcessor;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class OrderStatusCheckListener {

    private final IOrderService orderService;
    private final RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
            exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE, delayed = "true", type = ExchangeTypes.TOPIC),
            key = MqConstants.DELAY_ORDER_ROUTING_KEY
    ))
    public void listenOrderDelayMessage(MultiDelayMessage<Long> msg) {
        // 1.查询订单状态
        Order order = orderService.getById(msg.getData());
        // 2.判断是否已经支付
        if (order == null || order.getStatus() == 2) {
            // 订单不存在或者已经被处理
            return;
        }
        // TODO 3.主动查询 去支付服务模块 查询真正的支付状态
        boolean isPay = false;
        // 3.1.已支付,标记订单状态为已支付
        if (isPay) {
            orderService.markOrderPaySuccess(order.getId()); // 修改订单状态为已经支付
            return;
        }

        // 4.判断是否存在延迟时间    若还没有支付看看后面还有没有延迟时间
        if (msg.hasNextDelay()) {
            // 4.1.存在,重发延迟消息
            Long nextDelay = msg.removeNextDelay();
            rabbitTemplate.convertAndSend(
                    MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY,
                    msg, new DelayMessageProcessor(nextDelay.intValue()));
            return;
        }
        
        // 5.不存在,取消订单  后续没有延迟时间了,例如过了30分钟了还没支付, 就把订单取消
        orderService.cancelOrder(order.getId());
    }
}

抽取共享mq配置

我们将mq的配置抽取到nacos中,方便各个微服务共享配置。 在nacos中定义一个名为

shared-mq.xml

的配置文件,内容如下:

spring:
  rabbitmq:
    host: ${hm.mq.host:192.168.150.101} # 主机名
    port: ${hm.mq.port:5672} # 端口
    virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机
    username: ${hm.mq.un:hmall} # 用户名
    password: ${hm.mq.pw:123} # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
标签: rabbitmq java

本文转载自: https://blog.csdn.net/m0_67308643/article/details/142868616
版权归原作者 忘记带蟹堡黄了 所有, 如有侵权,请联系我们删除。

“17.RabbitMQ.记录”的评论:

还没有评论