0


谷粒商城のRabbitMQ高级篇&最终一致性解决方案。

文章目录


前言

  本篇介绍RabbitMQ的

延迟队列

死信队列

,以及利用这些实现订单关单,库存解锁的分布式事务最终一致性解决方案。
  对应视频

P292-P300

一、延迟&死信队列

1、死信队列

  死信队列是一种处理"不合格"消息的机制,当消息在原队列中无法被正常消费时,会被转发到另一个队列(死信队列)供后续分析或处理。
  死信产生的原因:

  • 启动手动ACK时,消息被拒收,并且没有重新放回队列。
  • 队列设置了TTL,消息到期后仍然没有被消费。
  • 队列中的消息达到了最大长度,最早入队的消息会被移除。

  死信队列本质上就是普通队列,在构造时的

Map<String, Object> arguments

参数传递了

x-dead-letter-exchange

(死信交换机),

x-dead-letter-routing-key

(死信路由键),该队列中的消息一旦满足上面的条件,就会根据路由键将消息发到指定的交换机上。
在这里插入图片描述

2、延迟队列

  延迟队列用于将消息延迟一段时间后再投递到指定的目标队列。其实现方式通常是通过

死信队列

结合TTL过期时间。需要在定义死信队列时,

Map<String, Object> arguments

参数额外传递一个

x-message-ttl

(过期时间)。如果需要设计一套延迟队列,通常如下:
在这里插入图片描述

二、库存解锁的分布式事务最终一致性

  在前篇中提到,单体事务的

@Transcational

注解无法控制远程调用的服务的回滚,同时使用seata的AT模式对于下单这样的高并发场景性能损失大。根据BASE理论的最终一致性,可以有如下的方案:

2.1、队列架构设计

在这里插入图片描述
  需要定义两个队列,一个交换机,两个绑定关系:

@ConfigurationpublicclassMyMQConfig{//    /**//     * 队列,交换机是懒加载的,只有第一次监听消息发现不存在的时候才会创建//     * @param message//     * @param channel//     *///    @RabbitListener//    public void listener(Message message, Channel channel){////    }/**
     * 创建交换机
     * @return
     */@BeanpublicExchangestockEventExchange(){returnnewTopicExchange("stock-event-exchange",true,false);}/**
     * 延迟队列 50分钟
     * @return
     */@BeanpublicQueuestockDelayQueue(){HashMap<String,Object> map =newHashMap<>();//死信交换机
        map.put("x-dead-letter-exchange","stock-event-exchange");//路由键
        map.put("x-dead-letter-routing-key","stock.release");//过期时间
        map.put("x-message-ttl",120000);returnnewQueue("stock.delay.queue",true,false,false, map);}/**
     * 解锁库存消息队列
     * @return
     */@BeanpublicQueuestockReleaseQueue(){returnnewQueue("stock.release.stock.queue",true,false,false);}/**
     * 将延迟队列绑定到交换机
     * @return
     */@BeanpublicBindingdelayQueueToExchange(){returnnewBinding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}/**
     * 解锁库存消息队列绑定到交换机
     * @return
     */@BeanpublicBindingstockReleaseToExchange(){returnnewBinding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}}

2、业务设计

  根据上面的绑定关系:

  1. 库存服务在锁定库存后,向stock-event-exchange中使用stock.locked路由键发送一条消息。
  2. 交换机根据路由键找到stock.delay.queue队列。
  3. stock.delay.queue队列中的消息在40分钟后到期,带着stock.release路由键重新回到交换机。
  4. 交换机根据路由键,将消息转发到stock.release.stock.queue队列。
  5. 库存服务监听stock.release.stock.queue队列。

  库存服务监听到

stock.release.stock.queue

队列的消息后,应该进行判断:

  • 40分钟前锁定的库存工作单是否还存在?如果不存在,说明是库存服务出现问题,库存和订单都进行了回滚,此时无需解锁库存。
  • 根据orderSn查询订单表,可能会有两种情况:   1. 订单表不存在,说明是下单和扣库存之后又出现了异常,订单回滚了,库存没有回滚,需要解锁库存。   2. 订单表存在,就要判定状态,如果状态是已取消才需要解锁库存。

  业务实现关键代码:
  类上加

@RabbitListener(queues = "stock.release.stock.queue")

注解,监听指定的队列。

@Override@Transactional(rollbackFor =Exception.class)publicvoidstockNum(StockLockVO vo){
        log.info("Seata全局事务id=================>{}",RootContext.getXID());String orderSn = vo.getOrderSn();List<CartItem> cartItems = vo.getCartItems();ArrayList<HasStock> hasStocks =newArrayList<>();//锁定库存之前先创建一个工作单WareOrderTaskEntity wareOrderTaskEntity =newWareOrderTaskEntity();
        wareOrderTaskEntity.setOrderSn(orderSn);
        wareOrderTaskDao.insert(wareOrderTaskEntity);for(CartItem cartItem : cartItems){HasStock hasStock =newHasStock();Long skuId = cartItem.getSkuId();//找到这个skuId在哪个仓库有库存List<Long> wareIds = wareSkuDao.selectWareIdBySkuId(skuId);
            hasStock.setSkuId(skuId);
            hasStock.setWareIds(wareIds);
            hasStock.setCount(cartItem.getCount());
            hasStocks.add(hasStock);}for(HasStock hasStock : hasStocks){boolean singleLocked =false;Long skuId = hasStock.getSkuId();Integer count = hasStock.getCount();List<Long> wareIds = hasStock.getWareIds();if(CollectionUtils.isEmpty(wareIds)){thrownewNoStockException(skuId);}//依次锁定库存,这个仓库库存锁定完了,还需要继续锁定,就锁定下一个仓库的for(Long wareId : wareIds){int resCount = wareSkuDao.stockLock(skuId, wareId, count);if(resCount >0){
                    singleLocked =true;//锁定成功,创建库存工作单详情WareOrderTaskDetailEntity wareOrderTaskDetailEntity =newWareOrderTaskDetailEntity();
                    wareOrderTaskDetailEntity.setWareId(wareId);
                    wareOrderTaskDetailEntity.setTaskId(wareOrderTaskEntity.getId());
                    wareOrderTaskDetailEntity.setSkuNum(hasStock.getCount());
                    wareOrderTaskDetailEntity.setSkuId(skuId);
                    wareOrderTaskDetailEntity.setLockStatus(1);
                    wareOrderTaskDetailDao.insert(wareOrderTaskDetailEntity);//向消息队列发送延迟消息 用于后续判断是否需要解锁库存StockLockedTO stockLockedTO =newStockLockedTO();
                    stockLockedTO.setId(wareOrderTaskEntity.getId());
                    stockLockedTO.setDetailId(wareOrderTaskDetailEntity.getId());
                    rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked", stockLockedTO);break;}}//某个skuId所有仓库都没有锁住if(!singleLocked){thrownewNoStockException(skuId);}}}

  监听

stock.release.stock.queue

队列,进行库存解锁的逻辑:

/**
     * 监听stock.release.stock.queue队列,进行库存解锁
     */@RabbitHandlerpublicvoidstockRelease(StockLockedTO stockLockedTO,Message message,Channel channel)throwsIOException{
        log.info("订单已关闭,准备被动解锁库存");Long id = stockLockedTO.getId();Long detailId = stockLockedTO.getDetailId();//首先用上面两个id查询wareOrderTaskDao和wareOrderTaskDetailDao//如果两张表都不存在,说明是锁库存的时候就报错了,不需要进行处理,订单也跟着回滚了WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskDao.selectById(id);WareOrderTaskDetailEntity wareOrderTaskDetailEntity = wareOrderTaskDetailDao.selectById(detailId);if(!ObjectUtils.isEmpty(wareOrderTaskDetailEntity)&&!ObjectUtils.isEmpty(wareOrderTaskEntity)){//根据orderSn查询订单表(远程调用订单服务),如果订单表不存在,说明是下单和扣库存之后又出现了异常,订单回滚了,这里也要解锁库存OrderEntity order =null;try{
                order = orderRemotesServiceClient.getOrder(wareOrderTaskEntity.getOrderSn());}catch(Exception e){//远程调用失败,拒收消息
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}Long wareId = wareOrderTaskDetailEntity.getWareId();Long skuId = wareOrderTaskDetailEntity.getSkuId();Integer skuNum = wareOrderTaskDetailEntity.getSkuNum();if(ObjectUtils.isEmpty(order)){//必须解锁库存this.doStockRelease(wareId, skuId, skuNum,detailId);}if(order.getStatus()==4){this.doStockRelease(wareId, skuId, skuNum,detailId);}//手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}//  //如果两张表都不存在,说明是锁库存的时候就报错了,不需要进行处理,订单也跟着回滚了else{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}

三、订单关单的分布式事务最终一致性

2.1、队列架构设计

  需要定义两个队列,一个交换机,三个绑定关系在这里插入图片描述

  这一套的绑定关系中,与库存解锁最大的不同在于,订单关单后,还需要通过交换机主动给库存服务的

stock.release.stock.queue

队列发送一条消息作为兜底,主动要求库存服务判断是否需要关单。
  原因在于,最初的设计中,订单延迟30分钟关单,库存服务延迟40分钟判断是否需要解锁库存。而订单服务因为各种原因,没能及时消费消息修改订单状态,此时库存解锁就先于关单执行,去查订单的状态还是新建状态,库存服务就不会解锁,并且将消息消费完成。订单服务在这个时候去改状态,库存就无法再次解锁了。

@ConfigurationpublicclassMyMQConfig{/**
     * 创建交换机
     * @return
     */@BeanpublicExchangeorderEventExchange(){returnnewTopicExchange("order-event-exchange",true,false);}/**
     * 定义死信队列,消息到达三十分钟后重新回到order-event-exchange交换机
     * @return
     */@BeanpublicQueueorderDelayQueue(){HashMap<String,Object> map =newHashMap<>();//死信交换机
        map.put("x-dead-letter-exchange","order-event-exchange");//路由键
        map.put("x-dead-letter-routing-key","order.release.order");//过期时间
        map.put("x-message-ttl",60000);returnnewQueue("order.delay.queue",true,false,false,map);}/**
     * 定义订单释放队列
     * @return
     */@BeanpublicQueueorderReleaseOrderQueue(){returnnewQueue("order.release.order.queue",true,false,false);}/**
     * 将order.delay.queue队列和order-event-exchange交换机绑定,路由键order.create.order
     * @return
     */@BeanpublicBindingorderCreateOrderBinding(){returnnewBinding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}/**
     * 将order.release.order.queue队列和order-event-exchange交换机绑定,路由键order.release.order
     * @return
     */@BeanpublicBindingorderReleaseOrderBinding(){returnnewBinding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}/**
     * 用于订单关单后,将消息通过order-event-exchange,路由键order.release.other.#,发送给stock.release.order.queue
     * @return
     */@BeanpublicBindingorderReleaseOtherOrderBinding(){returnnewBinding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);}}

2、业务设计

  根据上面的绑定关系:

  1. 订单服务下单完成后,向order-event-exchange中使用order.created.order路由键发送一条消息。
  2. 交换机根据路由键找到order.delay.queue队列。
  3. order.delay.queue队列中的消息在40分钟后到期,带着order.release.order路由键重新回到交换机。
  4. 交换机根据路由键,将消息转发到order.release.order.queue队列。
  5. 订单服务监听order.release.order.queue队列。
  6. 订单服务进行判断是否需要关单,如果需要关单,关单完成后再次向order-event-exchange使用order.release.other.#路由键发送一条消息。
  7. 库存服务监听stock.release.stock.queue队列,再次判断是否应该解锁库存。

  订单服务关键代码:
  类上加

@RabbitListener(queues = "order.release.order.queue")

注解,监听指定队列。

/**
     * 提交订单
     * @param dto   前端传递的订单信息
     * @return
     */@Override@Transactional(rollbackFor =Exception.class)//这里的事务仅仅能保证订单数据入表,不能保证远程查库存//    @GlobalTransactionalpublicSubmitOrderResponseVOsubmitOrder(SubmitOrderDTO dto){//... 创建订单,锁定库存业务            //发送消息到order-event-exchange交换机 30分钟后从order.release.order.queue 队列中取消息,判断状态,是否需要关单this.releaseOrder(vo.getOrder());return vo;}privatevoidreleaseOrder(OrderEntity order){
        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order", order);}@OverridepublicvoiddoReleaseOrder(OrderEntity order){Integer status = order.getStatus();//需要关单if(Objects.equals(status,OrderStatusEnum.CREATE_NEW.getCode())){OrderEntity orderEntityForUpd =newOrderEntity();
            orderEntityForUpd.setId(order.getId());
            orderEntityForUpd.setStatus(OrderStatusEnum.CANCLED.getCode());updateById(orderEntityForUpd);//order-event-exchange发消息到 stock.release.order.queue 库存服务监听//为了防止一种极端情况,就是订单关单由于系统卡顿,一直无法进行关单,库存消息优先到期,判断状态一直解锁不了库存//所以在关单之后主动给库存服务发一个消息进行兜底
            rabbitTemplate.convertAndSend("order-event-exchange","order.release.other", order);}}

  这里写一个类专门去监听消息:

@Slf4j@Component@RabbitListener(queues ="order.release.order.queue")publicclassMyRabbitListener{@AutowiredprivateOrderService orderService;@RabbitHandlerpublicvoidreleaseOrder(OrderEntity order,Message message,Channel channel)throwsIOException{
        log.info("接收到关单请求,订单号:{}",order.getOrderSn());try{
            orderService.doReleaseOrder(order);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}

  库存服务中,进行处理:

/**
     * 监听 关单后 主动发送给队列的消息
     * @param order
     * @param message
     * @param channel
     * @throws IOException
     */@RabbitHandlerpublicvoidstockRelease(OrderEntity order,Message message,Channel channel)throwsIOException{
        log.info("订单已关闭,准备主动解锁库存");try{//再次判断工作单详情表的状态,筛选出是1的订单进行关单String orderSn = order.getOrderSn();WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskDao.selectOne(newQueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));Long id = wareOrderTaskEntity.getId();List<WareOrderTaskDetailEntity> wareOrderTaskDetailEntities = wareOrderTaskDetailDao.selectList(newQueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id));List<WareOrderTaskDetailEntity> collect = wareOrderTaskDetailEntities.stream().filter(wareOrderTaskDetailEntity -> wareOrderTaskDetailEntity.getLockStatus()==1).collect(Collectors.toList());if(!CollectionUtils.isEmpty(collect)){for(WareOrderTaskDetailEntity wareOrderTaskDetailEntity : collect){//解锁doStockRelease(wareOrderTaskDetailEntity.getWareId(), wareOrderTaskDetailEntity.getSkuId(), wareOrderTaskDetailEntity.getSkuNum(),id);}}//手动确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){//重新放回队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}

3、消息丢失、积压、重复消费解决方案

3.1、消息丢失

  关于消息丢失,在RabbitMQ基础篇中已经提到,可以通过回调函数,以及手动ACK的方式解决。除此之外,在进行业务代码编写时,还需要加入重试机制,利用try-catch捕获异常,拒绝签收并重新放回队列。但是不应该无限重试。合理的做法是创建一个消息日志表,记录消息的信息和状态,并且定期扫描数据库对未发送成功的消息进行重发。

3.2、消息重复

  可能会存在这样一种情况,消息已经成功消费,并执行了业务逻辑,在手动ACK的时候系统宕机或出现了异常(执行业务代码和手动ACK不是原子性操作,分为了两步。),这时消息会重新回到ready状态,重新投递。这样就类似于重复提交,可以在业务代码中加上状态位条件校验,或者每条消息附带一个唯一标识(messageId),在处理前检查该消息是否已经处理过,避免重复操作。

3.3、消息积压

  RabbitMQ队列中的消息堆积过多,消费者无法及时处理,导致系统性能下降或消息延迟。如果业务允许,可以对消息进行批量处理(Batch Processing),一次消费多条消息,减少每条消息的处理开销。同时对于不重要或时效性强的消息,如果积压严重,可以设置消息的TTL或使用死信队列,将过期或处理失败的消息转移或丢弃。也可以增加消费者实例来并行处理消息,缓解单个消费者处理慢的问题,或对于耗时较长的操作,可以将其改为异步处理,快速从队列中取出消息,避免队列积压。


下一篇:秒杀服务。


本文转载自: https://blog.csdn.net/2301_77599076/article/details/143092239
版权归原作者 西岭千秋雪_ 所有, 如有侵权,请联系我们删除。

“谷粒商城のRabbitMQ高级篇&最终一致性解决方案。”的评论:

还没有评论