使用RabbitMQ实现订单超时管理
方案分析
- JDK延迟队列
- 定时任务
- 被动取消
- Redis Sorted Set
- Redis事件通知
- 时间轮算法
- RabbitMQ
JDK延迟队列
该方案是利用JDK自带的JUC包中的DelayQueue队列。
pubilc classDelayQueue<E extend Delay>extendsAbstractQueue<E>implementsBlockingQueue<E>
这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入
DelayQueue
中的对象,必须实现
Delayed
接口
offer()
: 添加元素poll()
: 获取并移出队列的超时元素,没有则返回空take()
: 获取并移出队列的超时元素,没有则wait当前线程,直到有元素满足超时条件时返回结果
定时任务
这种方式是最简单的,启动一个计划任务,每隔一定时间(假设1分钟)去扫描数据库一次,通过订单时间来判断是否超时,然后进行UPDATE 或DELETE操作。如Quartz
被动取消
利用懒加载的思想,当用户或商户查询订单时,再判断该订单是否超时,超时则进行业务处理。
这种方式依赖于用户的查询操作触发,如果用户不进行查询订单的操作,该订单就永远不会被取消。所以,实际应用中,也是被动取消+定时任务的组合方式来实现。这种情况下定时任务的时间可以设置的稍微“长”一点。
Redis Sorted Set
Redis有序集合(Sorted Set)每个元素都会关联一个double类型的分数score。Redis可以通过分数来为集合中的成员进行从小到大的排序。
添加元素: ZADD key score member [[score member] [score member]...]
按顺序查询元素: ZRANGE key start end [WITHSCORES]
查询元素score: ZSCORE key member
移出元素: ZREM key member [member ...]
该方案可以将订单超时时间戳与订单编号分别设置为score和member。系统扫描第一个元素判断是否超时,超时则进行业务处理。
然而,这一版存在一个致命的硬伤,在高并发条件下,多个消费者会取到同一个订单编号,又需要编写Lua脚本保证原子性或使用分布式锁,用了分布式锁性能又下降了。
RabbitMQ
RabbitMQ is the most widely deployed open source message broker
延迟队列
队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
延迟队列,最重要的特性就体现在它的延时属性上,跟普通队列不一样的是,普通队列中的元素总是等着希望被早点取出消费,而延迟队列中的元素则是希望在指定时间被取出消费,所以延迟队列中的元素是都是带时间属性的。
简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。
本文使用RabbitMQ也是通过延迟队列的机制来实现订单超时的处理。然而 RabbitMg自身并没有延迟队列这个功能,实现该功能一般有以下两种方式:
- 利用
TTL(Time To Live)
和DLX(Dead Letter Exchanges)
实现延迟队列 - 利用延迟队列插件
rabbitmq_delayed_message_exchange
实现
RabbitMQ延迟队列实现
实现方案一
- RabbitMQ配置类
@ConfigurationpublicclassRabbitMQConfig{// 声明 4个路由key 4个队列 2个交换机 属性// 延迟交换机publicstaticfinalStringDELAY_EXCHANGE_NAME="delay.exchange";// 延迟队列publicstaticfinalStringDELAY_Queue_A_NAME="delay.queue.a";publicstaticfinalStringDELAY_Queue_B_NAME="delay.queue.b";// 延迟队列路由keypublicstaticfinalStringDELAY_Queue_A_ROUTING_KEY="delay.queue.a.routingkey";publicstaticfinalStringDELAY_Queue_B_ROUTING_KEY="delay.queue.b.routingkey";// 死信交换机publicstaticfinalStringDEAD_LETTER_EXCHANGE_NAME="dead.letter.exchange";// 死信队列publicstaticfinalStringDEAD_LETTER_Queue_A_NAME="dead.letter.queue.a";publicstaticfinalStringDEAD_LETTER_Queue_B_NAME="dead.letter.queue.b";// 死信队列路由keypublicstaticfinalStringDEAD_LETTER_Queue_A_ROUTING_KEY="dead.letter.delay_10s.routingkey";publicstaticfinalStringDEAD_LETTER_Queue_B_ROUTING_KEY="dead.letter.delay_60s.routingkey";// 声明延迟交换机@BeanpublicDirectExchangedelayExchange(){returnnewDirectExchange(DELAY_EXCHANGE_NAME);}// 声明死信交换机@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange(DEAD_LETTER_EXCHANGE_NAME);}// 声明延迟队列A 延迟10s 并且绑定到对应的死信交换机中@BeanpublicQueuedelayQueueA(){Map<String,Object> args =newHashMap<>();
args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key",DEAD_LETTER_Queue_A_ROUTING_KEY);
args.put("x-message-ttl",10000);returnQueueBuilder.durable(DELAY_Queue_A_NAME).withArguments(args).build();}// 声明延迟队列A 延迟60s 并且绑定到对应的死信交换机中@BeanpublicQueuedelayQueueB(){Map<String,Object> args =newHashMap<>();
args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key",DEAD_LETTER_Queue_B_ROUTING_KEY);
args.put("x-message-ttl",60000);returnQueueBuilder.durable(DELAY_Queue_B_NAME).withArguments(args).build();}// 声明延迟队列A的绑定关系@BeanpublicBindingdelayBindingA(@Qualifier("delayQueueA")Queue queue,@Qualifier("delayExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_A_ROUTING_KEY);}// 声明延迟队列A的绑定关系@BeanpublicBindingdelayBindingB(@Qualifier("delayQueueB")Queue queue,@Qualifier("delayExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_B_ROUTING_KEY);}// 声明死信队列A,用于接收延迟10s处理的消息@BeanpublicQueuedeadLetterQueueA(){returnnewQueue(DEAD_LETTER_Queue_A_NAME);}// 声明死信队列B,用于接收延迟60s处理的消息@BeanpublicQueuedeadLetterQueueB(){returnnewQueue(DEAD_LETTER_Queue_B_NAME);}// 声明死信队列A的绑定关系@BeanpublicBindingdeadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue,@Qualifier("deadLetterExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_Queue_A_ROUTING_KEY);}// 声明死信队列B的绑定关系@BeanpublicBindingdeadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue,@Qualifier("deadLetterExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_Queue_B_ROUTING_KEY);}}
- 枚举类
@Getter@AllArgsConstructorpublicenumDelayTypeEnum{DELAY_10s(1),DELAY_60s(2);privateInteger type;publicstaticDelayTypeEnumgetDelayTypeEnum(Integer type){if(Objects.equals(type,DELAY_10s.type)){returnDELAY_10s;}if(Objects.equals(type,DELAY_60s.type)){returnDELAY_60s;}returnnull;}}
- 生产者
@ComponentpublicclassDelayMsgProducer{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsend(String msg,DelayTypeEnum type){switch(type){caseDELAY_10s:
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_Queue_A_ROUTING_KEY, msg);break;caseDELAY_60s:
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_Queue_B_ROUTING_KEY, msg);break;default:}}}
- 消费者
@Component@Slf4jpublicclassDeadLetterQueueConsumer{// 监听死信队列A@RabbitListener(queues =DEAD_LETTER_Queue_A_NAME)publicvoidreceiveA(Message message){// 获取消息String msg =newString(message.getBody());// 记录日志
log.info("当前时间: {}, 死信队列A收到消息: {}",LocalDateTime.now(), msg);}// 监听死信队列B@RabbitListener(queues =DEAD_LETTER_Queue_B_NAME)publicvoidreceiveB(Message message){// 获取消息String msg =newString(message.getBody());// 记录日志
log.info("当前时间: {}, 死信队列B收到消息: {}",LocalDateTime.now(), msg);}}
- 测试
实现方案二
方案一的问题可以通过将TTL设置在消息属性里来解决,然后添加一个延迟队列,用于接收设置为任意延迟时长的消息,再添加一个相应的死信队列和routingkey 即可,如下图:
- RabbitMQ配置文件
@ConfigurationpublicclassRabbitMQConfig{// 声明 2个交换机 2个队列 2个路由key 属性// 延迟交换机publicstaticfinalStringDELAY_EXCHANGE_NAME="delay.exchange";// 延迟队列publicstaticfinalStringDELAY_Queue_NAME="delay.queue";// 延迟队列路由keypublicstaticfinalStringDELAY_Queue_ROUTING_KEY="delay.queue.routingkey";// 死信交换机publicstaticfinalStringDEAD_LETTER_EXCHANGE_NAME="dead.letter.exchange";// 死信队列publicstaticfinalStringDEAD_LETTER_Queue_NAME="dead.letter.queue";// 死信队列路由keypublicstaticfinalStringDEAD_LETTER_Queue_ROUTING_KEY="dead.letter.routingkey";// 声明延迟交换机@BeanpublicDirectExchangedelayExchange(){returnnewDirectExchange(DELAY_EXCHANGE_NAME);}// 声明死信交换机@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange(DEAD_LETTER_EXCHANGE_NAME);}// 声明延迟队列 并且绑定到对应的死信交换机中@BeanpublicQueuedelayQueue(){Map<String,Object> args =newHashMap<>();
args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key",DEAD_LETTER_Queue_ROUTING_KEY);returnQueueBuilder.durable(DELAY_Queue_NAME).withArguments(args).build();}// 声明延迟队列的绑定关系@BeanpublicBindingdelayBinding(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_ROUTING_KEY);}// 声明死信队列@BeanpublicQueuedeadLetterQueue(){returnnewQueue(DEAD_LETTER_Queue_NAME);}// 声明死信队列的绑定关系@BeanpublicBindingdeadLetterBindingA(@Qualifier("deadLetterQueue")Queue queue,@Qualifier("deadLetterExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_Queue_ROUTING_KEY);}}
- 生产者
@ComponentpublicclassDelayMsgProducer{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsend(String message,String delayTime){
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_Queue_ROUTING_KEY, message, msg ->{// 设置消息到期时间
msg.getMessageProperties().setExpiration(delayTime);return msg;});}}
- 消费者
@Component@Slf4jpublicclassDeadLetterQueueConsumer{// 监听死信队列@RabbitListener(queues =DEAD_LETTER_Queue_NAME)publicvoidreceiveA(Message message){// 获取消息String msg =newString(message.getBody());// 记录日志
log.info("当前时间: {}, 死信队列收到消息: {}",LocalDateTime.now(), msg);}}
- controller
@RestController@Slf4j@RequestMapping("/rabbitmq")publicclassRabbitMQController{@ResourceprivateDelayMsgProducer producer;@ApiOperation(value ="发送消息")@GetMapping("/send/{msg}/{delayTime}")publicvoidsend(@PathVariableString msg,@PathVariableString delayTime){
log.info("当前时间: {}, 消息: {}, 延迟时间: {}",LocalDateTime.now(), msg, delayTime);
producer.send(msg, delayTime);}}
- 测试 问题:10秒的消息没有被提前释放,发送两条消息,如果后面的消息过期时间比前面的短,则要等着前面消息的过期时间到了再一起被释放
实现方案三(插件)
linux环境docker安装rabbitmq插件
- 下载插件
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.8.5
docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez b40e96969299:/plugins/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
方案二的问题可以通过安装RabbitMQ的社区插件
rabbitmq_delayed_message_exchange
来解决。
安装插件后会生成新的Exchange类型
x-delayed-message
,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列,而是存储在
mnesia
(一个分布式数据库)中,随后监测消息延迟时间,如达到可投递时间时将其通过
x-delayed-type
类型标记的交换机投递至目标队列。
- 配置类
@ConfigurationpublicclassRabbitMQConfig{// 声明 1个交换机 1个队列 1个路由key 属性// 延迟交换机publicstaticfinalStringDELAY_EXCHANGE_NAME="delay.exchange";// 延迟队列publicstaticfinalStringDELAY_Queue_NAME="delay.queue";// 延迟队列路由keypublicstaticfinalStringDELAY_Queue_ROUTING_KEY="delay.queue.routingkey";// 声明延迟交换机@BeanpublicCustomExchangedelayExchange(){Map<String,Object> args =newHashMap<>();
args.put("x-delayed-type","direct");returnnewCustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false, args);}// 声明延迟队列@BeanpublicQueuedelayQueue(){returnQueueBuilder.durable(DELAY_Queue_NAME).build();}// 声明延迟队列的绑定关系@BeanpublicBindingdelayBinding(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange")CustomExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_ROUTING_KEY).noargs();}}
- 生产者
@ComponentpublicclassDelayMsgProducer{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsend(String message,Integer delayTime){
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_Queue_ROUTING_KEY, message, msg ->{// 设置消息到期时间
msg.getMessageProperties().setDelay(delayTime);return msg;});}}
- 消费者
@Component@Slf4jpublicclassDeadLetterQueueConsumer{// 监听死信队列@RabbitListener(queues =RabbitMQConfig.DELAY_Queue_NAME)publicvoidreceiveA(Message message){// 获取消息String msg =newString(message.getBody());// 记录日志
log.info("当前时间: {}, 延迟队列收到消息: {}",LocalDateTime.now(), msg);}}
- 测试 - 没有因为前面一条消息延迟时间长而影响下一条消息
订单超时实战
- yml
spring:rabbitmq:host: 192.168.183.139
port:5672username: admin
password: admin
virtual-host: my_vhost
datasource:driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=falseusername: root
password:123456mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
map-underscore-to-camel-case:truemapper-locations: classpath:mapper/*.xmltype-aliases-package: com.zdz.entity
global-config:db-config:id-type: auto
# 自定义配置order:delay:time:60000# 1分钟 单位毫秒
- enum
@Getter@AllArgsConstructorpublicenumOrderStatus{//0待确定 1已确定 2已收获 3已取消 4已完成 5已作废no_confirm(0,"待确定"),has_confirm(1,"已确定"),has_receive(2,"已收获"),cancel(3,"已取消 "),complete(4,"已完成"),discard(5,"已作废");privateInteger status;privateString message;}
@Getter@AllArgsConstructorpublicenumPayStatus{//0 等待支付 1 已支付 2 部分支付no_pay(0,"等待支付"),has_pay(1,"已支付"),part_pay(2,"部分支付");privateInteger status;//状态privateString message;//描述}
- 配置类
@ConfigurationpublicclassRabbitMQConfig{// 声明 1个交换机 1个队列 1个路由key 属性// 延迟交换机publicstaticfinalStringDELAY_EXCHANGE_NAME="delay.exchange";// 延迟队列publicstaticfinalStringDELAY_Queue_NAME="delay.queue";// 延迟队列路由keypublicstaticfinalStringDELAY_Queue_ROUTING_KEY="delay.queue.routingkey";// 声明延迟交换机@BeanpublicCustomExchangedelayExchange(){Map<String,Object> args =newHashMap<>();
args.put("x-delayed-type","direct");returnnewCustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false, args);}// 声明延迟队列@BeanpublicQueuedelayQueue(){returnQueueBuilder.durable(DELAY_Queue_NAME).build();}// 声明延迟队列的绑定关系@BeanpublicBindingdelayBinding(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange")CustomExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_ROUTING_KEY).noargs();}}
- service
@ServicepublicclassOrderServiceImplextendsServiceImpl<OrderMapper,Order>implementsOrderService{@ResourceprivateOrderMapper orderMapper;@ResourceprivateDelayMsgProducer producer;@Value("${order.delay.time}")privateInteger orderDelayTime;/**
* 新增订单
* @param order
* @return
*/@Transactional@OverridepublicMap<String,Object>saveOrder(Order order){// 订单编号
order.setOrderSn(IdUtil.getSnowflake(1,1).nextIdStr());// 订单状态 0 待支付
order.setOrderStatus(OrderStatus.no_confirm.getStatus());// 支付状态 0 等待支付
order.setPayStatus(PayStatus.no_pay.getStatus());// 下单时间
order.setOrderTime(newDate());// 新增订单int insert = orderMapper.insert(order);Map<String,Object> map =newHashMap<>();if(insert >0){
map.put("code",200);
map.put("msg","订单已提交");// 发送消息到队列, 设置消息延迟时间
producer.send(order.getOrderSn(), orderDelayTime);}else{
map.put("code",400);
map.put("msg","订单提交失败");}return map;}/**
* 根据用户id查询订单列表
* @param userId
* @return
*/publicList<Order>getAllByUserId(Integer userId){QueryWrapper<Order> queryWrapper =newQueryWrapper<>();
queryWrapper.eq("user_id", userId);return orderMapper.selectList(queryWrapper);}}
- controller
@PostMapping("/save")@ApiOperation(value ="提交订单")publicMap<String,Object>saveOrder(@RequestBodyOrder order){
log.info("订单信息: {}", order);return orderService.saveOrder(order);}
- producer
@Component@Slf4jpublicclassDelayMsgProducer{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidsend(String message,Integer delayTime){
log.info("发送订单编号到队列,当前时间: {}, 订单编号: {}",LocalDateTime.now(), message);
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_Queue_ROUTING_KEY, message, msg ->{// 设置消息到期时间
msg.getMessageProperties().setDelay(delayTime);return msg;});}}
- consumer
@Component@Slf4jpublicclassDeadLetterQueueConsumer{@ResourceprivateOrderMapper orderMapper;@ResourceprivateOrderActionMapper orderActionMapper;// 监听延迟队列@RabbitListener(queues =RabbitMQConfig.DELAY_Queue_NAME)publicvoidreceiveA(Message message){// 获取消息String orderSn =newString(message.getBody());// 记录日志
log.info("当前时间: {}, 订单编号: {}",LocalDateTime.now(), orderSn);// 根据订单编号查询订单QueryWrapper<Order> wrapper =newQueryWrapper<>();
wrapper.eq("order_sn", orderSn);Order order = orderMapper.selectOne(wrapper);
log.info("订单信息: {}", order);// 如果订单不为空并且支付状态是未支付并且订单状态是待确认if(order !=null&&OrderStatus.no_confirm.getStatus().equals(order.getOrderStatus())&&PayStatus.no_pay.getStatus().equals(order.getPayStatus())){// 设置订单状态为3 已取消
order.setOrderStatus(OrderStatus.cancel.getStatus());// 根据订单编写修改订单int result = orderMapper.updateById(order);if(result >0){OrderAction orderAction =newOrderAction();
orderAction.setOrderSn(orderSn);
orderAction.setOrderStatus(OrderStatus.cancel.getStatus());
orderAction.setPayStatus(PayStatus.no_pay.getStatus());
orderAction.setActionNote("支付超时,订单已取消");
orderAction.setActionTime(newDate());
orderAction.setStatusDesc("支付超时,订单已取消");// 新增订单操作
orderActionMapper.insert(orderAction);}}}}
- 测试
版权归原作者 富贵135 所有, 如有侵权,请联系我们删除。