0


RabbitMQ笔记

使用RabbitMQ实现订单超时管理

方案分析

  1. JDK延迟队列
  2. 定时任务
  3. 被动取消
  4. Redis Sorted Set
  5. Redis事件通知
  6. 时间轮算法
  7. RabbitMQ

JDK延迟队列

该方案是利用JDK自带的JUC包中的DelayQueue队列。

pubilc classDelayQueue<E extend Delay>extendsAbstractQueue<E>implementsBlockingQueue<E>

这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入

DelayQueue

中的对象,必须实现

Delayed

接口
在这里插入图片描述

  • offer(): 添加元素
  • poll(): 获取并移出队列的超时元素,没有则返回空
  • take(): 获取并移出队列的超时元素,没有则wait当前线程,直到有元素满足超时条件时返回结果

定时任务

这种方式是最简单的,启动一个计划任务,每隔一定时间(假设1分钟)去扫描数据库一次,通过订单时间来判断是否超时,然后进行UPDATE 或DELETE操作。如Quartz

被动取消

利用懒加载的思想,当用户或商户查询订单时,再判断该订单是否超时,超时则进行业务处理。
这种方式依赖于用户的查询操作触发,如果用户不进行查询订单的操作,该订单就永远不会被取消。所以,实际应用中,也是被动取消+定时任务的组合方式来实现。这种情况下定时任务的时间可以设置的稍微“长”一点。

Redis Sorted Set

Redis有序集合(Sorted Set)每个元素都会关联一个double类型的分数score。Redis可以通过分数来为集合中的成员进行从小到大的排序。

添加元素: ZADD key score member [[score member] [score member]...]
按顺序查询元素: ZRANGE key start end [WITHSCORES]
查询元素score: ZSCORE key member
移出元素: ZREM key member [member ...]

该方案可以将订单超时时间戳与订单编号分别设置为score和member。系统扫描第一个元素判断是否超时,超时则进行业务处理。
然而,这一版存在一个致命的硬伤,在高并发条件下,多个消费者会取到同一个订单编号,又需要编写Lua脚本保证原子性或使用分布式锁,用了分布式锁性能又下降了。

RabbitMQ

RabbitMQ is the most widely deployed open source message broker

延迟队列

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
延迟队列,最重要的特性就体现在它的延时属性上,跟普通队列不一样的是,普通队列中的元素总是等着希望被早点取出消费,而延迟队列中的元素则是希望在指定时间被取出消费,所以延迟队列中的元素是都是带时间属性的。
简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。
本文使用RabbitMQ也是通过延迟队列的机制来实现订单超时的处理。然而 RabbitMg自身并没有延迟队列这个功能,实现该功能一般有以下两种方式:

  • 利用TTL(Time To Live)DLX(Dead Letter Exchanges)实现延迟队列
  • 利用延迟队列插件rabbitmq_delayed_message_exchange实现

RabbitMQ延迟队列实现

实现方案一

在这里插入图片描述

  • RabbitMQ配置类
@ConfigurationpublicclassRabbitMQConfig{// 声明 4个路由key 4个队列 2个交换机 属性// 延迟交换机publicstaticfinalStringDELAY_EXCHANGE_NAME="delay.exchange";// 延迟队列publicstaticfinalStringDELAY_Queue_A_NAME="delay.queue.a";publicstaticfinalStringDELAY_Queue_B_NAME="delay.queue.b";// 延迟队列路由keypublicstaticfinalStringDELAY_Queue_A_ROUTING_KEY="delay.queue.a.routingkey";publicstaticfinalStringDELAY_Queue_B_ROUTING_KEY="delay.queue.b.routingkey";// 死信交换机publicstaticfinalStringDEAD_LETTER_EXCHANGE_NAME="dead.letter.exchange";// 死信队列publicstaticfinalStringDEAD_LETTER_Queue_A_NAME="dead.letter.queue.a";publicstaticfinalStringDEAD_LETTER_Queue_B_NAME="dead.letter.queue.b";// 死信队列路由keypublicstaticfinalStringDEAD_LETTER_Queue_A_ROUTING_KEY="dead.letter.delay_10s.routingkey";publicstaticfinalStringDEAD_LETTER_Queue_B_ROUTING_KEY="dead.letter.delay_60s.routingkey";// 声明延迟交换机@BeanpublicDirectExchangedelayExchange(){returnnewDirectExchange(DELAY_EXCHANGE_NAME);}// 声明死信交换机@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange(DEAD_LETTER_EXCHANGE_NAME);}// 声明延迟队列A 延迟10s 并且绑定到对应的死信交换机中@BeanpublicQueuedelayQueueA(){Map<String,Object> args =newHashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_Queue_A_ROUTING_KEY);
        args.put("x-message-ttl",10000);returnQueueBuilder.durable(DELAY_Queue_A_NAME).withArguments(args).build();}// 声明延迟队列A 延迟60s 并且绑定到对应的死信交换机中@BeanpublicQueuedelayQueueB(){Map<String,Object> args =newHashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_Queue_B_ROUTING_KEY);
        args.put("x-message-ttl",60000);returnQueueBuilder.durable(DELAY_Queue_B_NAME).withArguments(args).build();}// 声明延迟队列A的绑定关系@BeanpublicBindingdelayBindingA(@Qualifier("delayQueueA")Queue queue,@Qualifier("delayExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_A_ROUTING_KEY);}// 声明延迟队列A的绑定关系@BeanpublicBindingdelayBindingB(@Qualifier("delayQueueB")Queue queue,@Qualifier("delayExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_B_ROUTING_KEY);}// 声明死信队列A,用于接收延迟10s处理的消息@BeanpublicQueuedeadLetterQueueA(){returnnewQueue(DEAD_LETTER_Queue_A_NAME);}// 声明死信队列B,用于接收延迟60s处理的消息@BeanpublicQueuedeadLetterQueueB(){returnnewQueue(DEAD_LETTER_Queue_B_NAME);}// 声明死信队列A的绑定关系@BeanpublicBindingdeadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue,@Qualifier("deadLetterExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_Queue_A_ROUTING_KEY);}// 声明死信队列B的绑定关系@BeanpublicBindingdeadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue,@Qualifier("deadLetterExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_Queue_B_ROUTING_KEY);}}
  • 枚举类
@Getter@AllArgsConstructorpublicenumDelayTypeEnum{DELAY_10s(1),DELAY_60s(2);privateInteger type;publicstaticDelayTypeEnumgetDelayTypeEnum(Integer type){if(Objects.equals(type,DELAY_10s.type)){returnDELAY_10s;}if(Objects.equals(type,DELAY_60s.type)){returnDELAY_60s;}returnnull;}}
  • 生产者
@ComponentpublicclassDelayMsgProducer{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsend(String msg,DelayTypeEnum type){switch(type){caseDELAY_10s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_Queue_A_ROUTING_KEY, msg);break;caseDELAY_60s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_Queue_B_ROUTING_KEY, msg);break;default:}}}
  • 消费者
@Component@Slf4jpublicclassDeadLetterQueueConsumer{// 监听死信队列A@RabbitListener(queues =DEAD_LETTER_Queue_A_NAME)publicvoidreceiveA(Message message){// 获取消息String msg =newString(message.getBody());// 记录日志
        log.info("当前时间: {}, 死信队列A收到消息: {}",LocalDateTime.now(), msg);}// 监听死信队列B@RabbitListener(queues =DEAD_LETTER_Queue_B_NAME)publicvoidreceiveB(Message message){// 获取消息String msg =newString(message.getBody());// 记录日志
        log.info("当前时间: {}, 死信队列B收到消息: {}",LocalDateTime.now(), msg);}}
  • 测试在这里插入图片描述在这里插入图片描述

实现方案二

方案一的问题可以通过将TTL设置在消息属性里来解决,然后添加一个延迟队列,用于接收设置为任意延迟时长的消息,再添加一个相应的死信队列和routingkey 即可,如下图:

在这里插入图片描述

  • RabbitMQ配置文件
@ConfigurationpublicclassRabbitMQConfig{// 声明 2个交换机  2个队列 2个路由key  属性// 延迟交换机publicstaticfinalStringDELAY_EXCHANGE_NAME="delay.exchange";// 延迟队列publicstaticfinalStringDELAY_Queue_NAME="delay.queue";// 延迟队列路由keypublicstaticfinalStringDELAY_Queue_ROUTING_KEY="delay.queue.routingkey";// 死信交换机publicstaticfinalStringDEAD_LETTER_EXCHANGE_NAME="dead.letter.exchange";// 死信队列publicstaticfinalStringDEAD_LETTER_Queue_NAME="dead.letter.queue";// 死信队列路由keypublicstaticfinalStringDEAD_LETTER_Queue_ROUTING_KEY="dead.letter.routingkey";// 声明延迟交换机@BeanpublicDirectExchangedelayExchange(){returnnewDirectExchange(DELAY_EXCHANGE_NAME);}// 声明死信交换机@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange(DEAD_LETTER_EXCHANGE_NAME);}// 声明延迟队列 并且绑定到对应的死信交换机中@BeanpublicQueuedelayQueue(){Map<String,Object> args =newHashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_Queue_ROUTING_KEY);returnQueueBuilder.durable(DELAY_Queue_NAME).withArguments(args).build();}// 声明延迟队列的绑定关系@BeanpublicBindingdelayBinding(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_ROUTING_KEY);}// 声明死信队列@BeanpublicQueuedeadLetterQueue(){returnnewQueue(DEAD_LETTER_Queue_NAME);}// 声明死信队列的绑定关系@BeanpublicBindingdeadLetterBindingA(@Qualifier("deadLetterQueue")Queue queue,@Qualifier("deadLetterExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_Queue_ROUTING_KEY);}}
  • 生产者
@ComponentpublicclassDelayMsgProducer{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsend(String message,String delayTime){
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_Queue_ROUTING_KEY, message, msg ->{// 设置消息到期时间
            msg.getMessageProperties().setExpiration(delayTime);return msg;});}}
  • 消费者
@Component@Slf4jpublicclassDeadLetterQueueConsumer{// 监听死信队列@RabbitListener(queues =DEAD_LETTER_Queue_NAME)publicvoidreceiveA(Message message){// 获取消息String msg =newString(message.getBody());// 记录日志
        log.info("当前时间: {}, 死信队列收到消息: {}",LocalDateTime.now(), msg);}}
  • controller
@RestController@Slf4j@RequestMapping("/rabbitmq")publicclassRabbitMQController{@ResourceprivateDelayMsgProducer producer;@ApiOperation(value ="发送消息")@GetMapping("/send/{msg}/{delayTime}")publicvoidsend(@PathVariableString msg,@PathVariableString delayTime){
        log.info("当前时间: {}, 消息: {}, 延迟时间: {}",LocalDateTime.now(), msg, delayTime);
        producer.send(msg, delayTime);}}
  • 测试在这里插入图片描述 问题:10秒的消息没有被提前释放,发送两条消息,如果后面的消息过期时间比前面的短,则要等着前面消息的过期时间到了再一起被释放

实现方案三(插件)

linux环境docker安装rabbitmq插件

  1. 下载插件
  2. docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.8.5
  3. docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez b40e96969299:/plugins/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
  4. docker exec -it rabbitmq /bin/bash
  5. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

方案二的问题可以通过安装RabbitMQ的社区插件

rabbitmq_delayed_message_exchange

来解决。
安装插件后会生成新的Exchange类型

x-delayed-message

,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列,而是存储在

mnesia

(一个分布式数据库)中,随后监测消息延迟时间,如达到可投递时间时将其通过

x-delayed-type

类型标记的交换机投递至目标队列。
在这里插入图片描述
在这里插入图片描述

  • 配置类
@ConfigurationpublicclassRabbitMQConfig{// 声明 1个交换机  1个队列 1个路由key  属性// 延迟交换机publicstaticfinalStringDELAY_EXCHANGE_NAME="delay.exchange";// 延迟队列publicstaticfinalStringDELAY_Queue_NAME="delay.queue";// 延迟队列路由keypublicstaticfinalStringDELAY_Queue_ROUTING_KEY="delay.queue.routingkey";// 声明延迟交换机@BeanpublicCustomExchangedelayExchange(){Map<String,Object> args =newHashMap<>();
        args.put("x-delayed-type","direct");returnnewCustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false, args);}// 声明延迟队列@BeanpublicQueuedelayQueue(){returnQueueBuilder.durable(DELAY_Queue_NAME).build();}// 声明延迟队列的绑定关系@BeanpublicBindingdelayBinding(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange")CustomExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_ROUTING_KEY).noargs();}}
  • 生产者
@ComponentpublicclassDelayMsgProducer{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsend(String message,Integer delayTime){
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_Queue_ROUTING_KEY, message, msg ->{// 设置消息到期时间
            msg.getMessageProperties().setDelay(delayTime);return msg;});}}
  • 消费者
@Component@Slf4jpublicclassDeadLetterQueueConsumer{// 监听死信队列@RabbitListener(queues =RabbitMQConfig.DELAY_Queue_NAME)publicvoidreceiveA(Message message){// 获取消息String msg =newString(message.getBody());// 记录日志
        log.info("当前时间: {}, 延迟队列收到消息: {}",LocalDateTime.now(), msg);}}
  • 测试 -在这里插入图片描述 没有因为前面一条消息延迟时间长而影响下一条消息

订单超时实战

  • yml
spring:rabbitmq:host: 192.168.183.139
    port:5672username: admin
    password: admin
    virtual-host: my_vhost

  datasource:driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=falseusername: root
    password:123456mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    map-underscore-to-camel-case:truemapper-locations: classpath:mapper/*.xmltype-aliases-package: com.zdz.entity
  global-config:db-config:id-type: auto

# 自定义配置order:delay:time:60000# 1分钟 单位毫秒
  • enum
@Getter@AllArgsConstructorpublicenumOrderStatus{//0待确定 1已确定 2已收获 3已取消 4已完成 5已作废no_confirm(0,"待确定"),has_confirm(1,"已确定"),has_receive(2,"已收获"),cancel(3,"已取消 "),complete(4,"已完成"),discard(5,"已作废");privateInteger status;privateString message;}
@Getter@AllArgsConstructorpublicenumPayStatus{//0 等待支付 1 已支付 2 部分支付no_pay(0,"等待支付"),has_pay(1,"已支付"),part_pay(2,"部分支付");privateInteger status;//状态privateString message;//描述}
  • 配置类
@ConfigurationpublicclassRabbitMQConfig{// 声明 1个交换机  1个队列 1个路由key  属性// 延迟交换机publicstaticfinalStringDELAY_EXCHANGE_NAME="delay.exchange";// 延迟队列publicstaticfinalStringDELAY_Queue_NAME="delay.queue";// 延迟队列路由keypublicstaticfinalStringDELAY_Queue_ROUTING_KEY="delay.queue.routingkey";// 声明延迟交换机@BeanpublicCustomExchangedelayExchange(){Map<String,Object> args =newHashMap<>();
        args.put("x-delayed-type","direct");returnnewCustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false, args);}// 声明延迟队列@BeanpublicQueuedelayQueue(){returnQueueBuilder.durable(DELAY_Queue_NAME).build();}// 声明延迟队列的绑定关系@BeanpublicBindingdelayBinding(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange")CustomExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_ROUTING_KEY).noargs();}}
  • service
@ServicepublicclassOrderServiceImplextendsServiceImpl<OrderMapper,Order>implementsOrderService{@ResourceprivateOrderMapper orderMapper;@ResourceprivateDelayMsgProducer producer;@Value("${order.delay.time}")privateInteger orderDelayTime;/**
     * 新增订单
     * @param order
     * @return
     */@Transactional@OverridepublicMap<String,Object>saveOrder(Order order){// 订单编号
        order.setOrderSn(IdUtil.getSnowflake(1,1).nextIdStr());// 订单状态 0 待支付
        order.setOrderStatus(OrderStatus.no_confirm.getStatus());// 支付状态 0 等待支付
        order.setPayStatus(PayStatus.no_pay.getStatus());// 下单时间
        order.setOrderTime(newDate());// 新增订单int insert = orderMapper.insert(order);Map<String,Object> map =newHashMap<>();if(insert >0){
            map.put("code",200);
            map.put("msg","订单已提交");// 发送消息到队列, 设置消息延迟时间
            producer.send(order.getOrderSn(), orderDelayTime);}else{
            map.put("code",400);
            map.put("msg","订单提交失败");}return map;}/**
     * 根据用户id查询订单列表
     * @param userId
     * @return
     */publicList<Order>getAllByUserId(Integer userId){QueryWrapper<Order> queryWrapper =newQueryWrapper<>();
        queryWrapper.eq("user_id", userId);return orderMapper.selectList(queryWrapper);}}
  • controller
@PostMapping("/save")@ApiOperation(value ="提交订单")publicMap<String,Object>saveOrder(@RequestBodyOrder order){
        log.info("订单信息: {}", order);return orderService.saveOrder(order);}
  • producer
@Component@Slf4jpublicclassDelayMsgProducer{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsend(String message,Integer delayTime){
        log.info("发送订单编号到队列,当前时间: {}, 订单编号: {}",LocalDateTime.now(), message);
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_Queue_ROUTING_KEY, message, msg ->{// 设置消息到期时间
            msg.getMessageProperties().setDelay(delayTime);return msg;});}}
  • consumer
@Component@Slf4jpublicclassDeadLetterQueueConsumer{@ResourceprivateOrderMapper orderMapper;@ResourceprivateOrderActionMapper orderActionMapper;// 监听延迟队列@RabbitListener(queues =RabbitMQConfig.DELAY_Queue_NAME)publicvoidreceiveA(Message message){// 获取消息String orderSn =newString(message.getBody());// 记录日志
        log.info("当前时间: {}, 订单编号: {}",LocalDateTime.now(), orderSn);// 根据订单编号查询订单QueryWrapper<Order> wrapper =newQueryWrapper<>();
        wrapper.eq("order_sn", orderSn);Order order = orderMapper.selectOne(wrapper);
        log.info("订单信息: {}", order);// 如果订单不为空并且支付状态是未支付并且订单状态是待确认if(order !=null&&OrderStatus.no_confirm.getStatus().equals(order.getOrderStatus())&&PayStatus.no_pay.getStatus().equals(order.getPayStatus())){// 设置订单状态为3 已取消
            order.setOrderStatus(OrderStatus.cancel.getStatus());// 根据订单编写修改订单int result = orderMapper.updateById(order);if(result >0){OrderAction orderAction =newOrderAction();
                orderAction.setOrderSn(orderSn);
                orderAction.setOrderStatus(OrderStatus.cancel.getStatus());
                orderAction.setPayStatus(PayStatus.no_pay.getStatus());
                orderAction.setActionNote("支付超时,订单已取消");
                orderAction.setActionTime(newDate());
                orderAction.setStatusDesc("支付超时,订单已取消");// 新增订单操作
                orderActionMapper.insert(orderAction);}}}}
  • 测试在这里插入图片描述
标签: rabbitmq 笔记

本文转载自: https://blog.csdn.net/weixin_67154380/article/details/135115276
版权归原作者 富贵135 所有, 如有侵权,请联系我们删除。

“RabbitMQ笔记”的评论:

还没有评论