0


RabbitMQ如何实现延迟队列

1.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。很可惜,在RabbitMQ中并未提供延迟队列功能,但是我们有其他的方式可以实现延迟队列,方法就是TTL+死信队列,组合实现延迟队列的效果。

2.什么是TTL

TTL,全称Time To Live,消息过期时间设置。消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

队列过期后,会将队列所有消息全部移除。 一个队列中某一个消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉),如果不在队列顶端,那么是无效的,过期时间有队列的过期时间判定。

如果队列设置了,消息也设置了,那么会取时间短的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。
我门一般通过设置消息的x-message-ttl属性来设置时间

3.死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队 列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机,给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key,就能成功绑定了

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

4.实现延迟队列

延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。

这里模拟一个订单支付的功能,下单后30分钟如果没有付款,那么久自动取消订单并且回滚库存,使用的是spring框架,

以下是producer的spring配置文件中对交换机和队列的配置

   <!--
延迟队列:
1. 定义正常交换机(order_exchange)和队列(order_queue)
2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
3. 绑定,设置正常队列过期时间为30分钟
-->
    <!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
    <rabbit:queue id="order_queue" name="order_queue">
        <!-- 3. 绑定,设置正常队列过期时间为30分钟-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
            <entry key="x-message-ttl" value="20000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx">
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

为了测试方便,我就把延迟的时间设置为20s,

测试代码:

 @Test
    public void testDelay() throws InterruptedException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend("order_exchange", "order.msg", "订单信息: id=1,time=" + sdf.format(new Date()));
      
    }

consumer端配置

 <context:component-scan base-package="com.kkb.listener"/>
    <!--加载配置文件-->
    <context:property-placeholder
            location="classpath:rabbitmq.properties"/>
    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual" prefetch="1">
        <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/>

    </rabbit:listener-container>

代码

package com.kkb.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class OrderListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
             //1.接收转换消息
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            System.out.println(new String(message.getBody()));
            System.out.println("处理时间:"+sdf.format(new Date()));
            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            //3. 手动签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag, true, false);
        }
    }
}

分别启动消费者端consumer,然后生产者producer发送一条消息过去

运行结果如下:

从图中可以看到,两个时间间隔正好是20s左右,由此实现了延迟队列的功能

标签: java 后端 rabbitmq

本文转载自: https://blog.csdn.net/weixin_44273388/article/details/124285031
版权归原作者 青山故人丶 所有, 如有侵权,请联系我们删除。

“RabbitMQ如何实现延迟队列”的评论:

还没有评论