文章目录
前言
本篇介绍RabbitMQ的
延迟队列
,
死信队列
,以及利用这些实现订单关单,库存解锁的分布式事务最终一致性解决方案。
对应视频
P292-P300
一、延迟&死信队列
1、死信队列
死信队列是一种处理"不合格"消息的机制,当消息在原队列中无法被正常消费时,会被转发到另一个队列(死信队列)供后续分析或处理。
死信产生的原因:
- 启动手动ACK时,消息被拒收,并且没有重新放回队列。
- 队列设置了TTL,消息到期后仍然没有被消费。
- 队列中的消息达到了最大长度,最早入队的消息会被移除。
死信队列本质上就是普通队列,在构造时的
Map<String, Object> arguments
参数传递了
x-dead-letter-exchange
(死信交换机),
x-dead-letter-routing-key
(死信路由键),该队列中的消息一旦满足上面的条件,就会根据路由键将消息发到指定的交换机上。
2、延迟队列
延迟队列用于将消息延迟一段时间后再投递到指定的目标队列。其实现方式通常是通过
死信队列
结合TTL过期时间。需要在定义死信队列时,
Map<String, Object> arguments
参数额外传递一个
x-message-ttl
(过期时间)。如果需要设计一套延迟队列,通常如下:
二、库存解锁的分布式事务最终一致性
在前篇中提到,单体事务的
@Transcational
注解无法控制远程调用的服务的回滚,同时使用seata的AT模式对于下单这样的高并发场景性能损失大。根据BASE理论的最终一致性,可以有如下的方案:
2.1、队列架构设计
需要定义两个队列,一个交换机,两个绑定关系:
@ConfigurationpublicclassMyMQConfig{// /**// * 队列,交换机是懒加载的,只有第一次监听消息发现不存在的时候才会创建// * @param message// * @param channel// */// @RabbitListener// public void listener(Message message, Channel channel){//// }/**
* 创建交换机
* @return
*/@BeanpublicExchangestockEventExchange(){returnnewTopicExchange("stock-event-exchange",true,false);}/**
* 延迟队列 50分钟
* @return
*/@BeanpublicQueuestockDelayQueue(){HashMap<String,Object> map =newHashMap<>();//死信交换机
map.put("x-dead-letter-exchange","stock-event-exchange");//路由键
map.put("x-dead-letter-routing-key","stock.release");//过期时间
map.put("x-message-ttl",120000);returnnewQueue("stock.delay.queue",true,false,false, map);}/**
* 解锁库存消息队列
* @return
*/@BeanpublicQueuestockReleaseQueue(){returnnewQueue("stock.release.stock.queue",true,false,false);}/**
* 将延迟队列绑定到交换机
* @return
*/@BeanpublicBindingdelayQueueToExchange(){returnnewBinding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}/**
* 解锁库存消息队列绑定到交换机
* @return
*/@BeanpublicBindingstockReleaseToExchange(){returnnewBinding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}}
2、业务设计
根据上面的绑定关系:
- 库存服务在锁定库存后,向
stock-event-exchange
中使用stock.locked
路由键发送一条消息。 - 交换机根据路由键找到
stock.delay.queue
队列。 stock.delay.queue
队列中的消息在40分钟后到期,带着stock.release
路由键重新回到交换机。- 交换机根据路由键,将消息转发到
stock.release.stock.queue
队列。 - 库存服务监听
stock.release.stock.queue
队列。
库存服务监听到
stock.release.stock.queue
队列的消息后,应该进行判断:
- 40分钟前锁定的库存工作单是否还存在?如果不存在,说明是库存服务出现问题,库存和订单都进行了回滚,此时无需解锁库存。
- 根据orderSn查询订单表,可能会有两种情况: 1. 订单表不存在,说明是下单和扣库存之后又出现了异常,订单回滚了,库存没有回滚,需要解锁库存。 2. 订单表存在,就要判定状态,如果状态是
已取消
才需要解锁库存。
业务实现关键代码:
类上加
@RabbitListener(queues = "stock.release.stock.queue")
注解,监听指定的队列。
@Override@Transactional(rollbackFor =Exception.class)publicvoidstockNum(StockLockVO vo){
log.info("Seata全局事务id=================>{}",RootContext.getXID());String orderSn = vo.getOrderSn();List<CartItem> cartItems = vo.getCartItems();ArrayList<HasStock> hasStocks =newArrayList<>();//锁定库存之前先创建一个工作单WareOrderTaskEntity wareOrderTaskEntity =newWareOrderTaskEntity();
wareOrderTaskEntity.setOrderSn(orderSn);
wareOrderTaskDao.insert(wareOrderTaskEntity);for(CartItem cartItem : cartItems){HasStock hasStock =newHasStock();Long skuId = cartItem.getSkuId();//找到这个skuId在哪个仓库有库存List<Long> wareIds = wareSkuDao.selectWareIdBySkuId(skuId);
hasStock.setSkuId(skuId);
hasStock.setWareIds(wareIds);
hasStock.setCount(cartItem.getCount());
hasStocks.add(hasStock);}for(HasStock hasStock : hasStocks){boolean singleLocked =false;Long skuId = hasStock.getSkuId();Integer count = hasStock.getCount();List<Long> wareIds = hasStock.getWareIds();if(CollectionUtils.isEmpty(wareIds)){thrownewNoStockException(skuId);}//依次锁定库存,这个仓库库存锁定完了,还需要继续锁定,就锁定下一个仓库的for(Long wareId : wareIds){int resCount = wareSkuDao.stockLock(skuId, wareId, count);if(resCount >0){
singleLocked =true;//锁定成功,创建库存工作单详情WareOrderTaskDetailEntity wareOrderTaskDetailEntity =newWareOrderTaskDetailEntity();
wareOrderTaskDetailEntity.setWareId(wareId);
wareOrderTaskDetailEntity.setTaskId(wareOrderTaskEntity.getId());
wareOrderTaskDetailEntity.setSkuNum(hasStock.getCount());
wareOrderTaskDetailEntity.setSkuId(skuId);
wareOrderTaskDetailEntity.setLockStatus(1);
wareOrderTaskDetailDao.insert(wareOrderTaskDetailEntity);//向消息队列发送延迟消息 用于后续判断是否需要解锁库存StockLockedTO stockLockedTO =newStockLockedTO();
stockLockedTO.setId(wareOrderTaskEntity.getId());
stockLockedTO.setDetailId(wareOrderTaskDetailEntity.getId());
rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked", stockLockedTO);break;}}//某个skuId所有仓库都没有锁住if(!singleLocked){thrownewNoStockException(skuId);}}}
监听
stock.release.stock.queue
队列,进行库存解锁的逻辑:
/**
* 监听stock.release.stock.queue队列,进行库存解锁
*/@RabbitHandlerpublicvoidstockRelease(StockLockedTO stockLockedTO,Message message,Channel channel)throwsIOException{
log.info("订单已关闭,准备被动解锁库存");Long id = stockLockedTO.getId();Long detailId = stockLockedTO.getDetailId();//首先用上面两个id查询wareOrderTaskDao和wareOrderTaskDetailDao//如果两张表都不存在,说明是锁库存的时候就报错了,不需要进行处理,订单也跟着回滚了WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskDao.selectById(id);WareOrderTaskDetailEntity wareOrderTaskDetailEntity = wareOrderTaskDetailDao.selectById(detailId);if(!ObjectUtils.isEmpty(wareOrderTaskDetailEntity)&&!ObjectUtils.isEmpty(wareOrderTaskEntity)){//根据orderSn查询订单表(远程调用订单服务),如果订单表不存在,说明是下单和扣库存之后又出现了异常,订单回滚了,这里也要解锁库存OrderEntity order =null;try{
order = orderRemotesServiceClient.getOrder(wareOrderTaskEntity.getOrderSn());}catch(Exception e){//远程调用失败,拒收消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}Long wareId = wareOrderTaskDetailEntity.getWareId();Long skuId = wareOrderTaskDetailEntity.getSkuId();Integer skuNum = wareOrderTaskDetailEntity.getSkuNum();if(ObjectUtils.isEmpty(order)){//必须解锁库存this.doStockRelease(wareId, skuId, skuNum,detailId);}if(order.getStatus()==4){this.doStockRelease(wareId, skuId, skuNum,detailId);}//手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}// //如果两张表都不存在,说明是锁库存的时候就报错了,不需要进行处理,订单也跟着回滚了else{
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
三、订单关单的分布式事务最终一致性
2.1、队列架构设计
需要定义两个队列,一个交换机,三个绑定关系
这一套的绑定关系中,与库存解锁最大的不同在于,订单关单后,还需要通过交换机主动给库存服务的
stock.release.stock.queue
队列发送一条消息作为兜底,主动要求库存服务判断是否需要关单。
原因在于,最初的设计中,订单延迟30分钟关单,库存服务延迟40分钟判断是否需要解锁库存。而订单服务因为各种原因,没能及时消费消息修改订单状态,此时库存解锁就先于关单执行,去查订单的状态还是新建状态,库存服务就不会解锁,并且将消息消费完成。订单服务在这个时候去改状态,库存就无法再次解锁了。
@ConfigurationpublicclassMyMQConfig{/**
* 创建交换机
* @return
*/@BeanpublicExchangeorderEventExchange(){returnnewTopicExchange("order-event-exchange",true,false);}/**
* 定义死信队列,消息到达三十分钟后重新回到order-event-exchange交换机
* @return
*/@BeanpublicQueueorderDelayQueue(){HashMap<String,Object> map =newHashMap<>();//死信交换机
map.put("x-dead-letter-exchange","order-event-exchange");//路由键
map.put("x-dead-letter-routing-key","order.release.order");//过期时间
map.put("x-message-ttl",60000);returnnewQueue("order.delay.queue",true,false,false,map);}/**
* 定义订单释放队列
* @return
*/@BeanpublicQueueorderReleaseOrderQueue(){returnnewQueue("order.release.order.queue",true,false,false);}/**
* 将order.delay.queue队列和order-event-exchange交换机绑定,路由键order.create.order
* @return
*/@BeanpublicBindingorderCreateOrderBinding(){returnnewBinding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}/**
* 将order.release.order.queue队列和order-event-exchange交换机绑定,路由键order.release.order
* @return
*/@BeanpublicBindingorderReleaseOrderBinding(){returnnewBinding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}/**
* 用于订单关单后,将消息通过order-event-exchange,路由键order.release.other.#,发送给stock.release.order.queue
* @return
*/@BeanpublicBindingorderReleaseOtherOrderBinding(){returnnewBinding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);}}
2、业务设计
根据上面的绑定关系:
- 订单服务下单完成后,向
order-event-exchange
中使用order.created.order
路由键发送一条消息。 - 交换机根据路由键找到
order.delay.queue
队列。 order.delay.queue
队列中的消息在40分钟后到期,带着order.release.order
路由键重新回到交换机。- 交换机根据路由键,将消息转发到
order.release.order.queue
队列。 - 订单服务监听
order.release.order.queue
队列。 - 订单服务进行判断是否需要关单,如果需要关单,关单完成后再次向
order-event-exchange
使用order.release.other.#
路由键发送一条消息。 - 库存服务监听
stock.release.stock.queue
队列,再次判断是否应该解锁库存。
订单服务关键代码:
类上加
@RabbitListener(queues = "order.release.order.queue")
注解,监听指定队列。
/**
* 提交订单
* @param dto 前端传递的订单信息
* @return
*/@Override@Transactional(rollbackFor =Exception.class)//这里的事务仅仅能保证订单数据入表,不能保证远程查库存// @GlobalTransactionalpublicSubmitOrderResponseVOsubmitOrder(SubmitOrderDTO dto){//... 创建订单,锁定库存业务 //发送消息到order-event-exchange交换机 30分钟后从order.release.order.queue 队列中取消息,判断状态,是否需要关单this.releaseOrder(vo.getOrder());return vo;}privatevoidreleaseOrder(OrderEntity order){
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order", order);}@OverridepublicvoiddoReleaseOrder(OrderEntity order){Integer status = order.getStatus();//需要关单if(Objects.equals(status,OrderStatusEnum.CREATE_NEW.getCode())){OrderEntity orderEntityForUpd =newOrderEntity();
orderEntityForUpd.setId(order.getId());
orderEntityForUpd.setStatus(OrderStatusEnum.CANCLED.getCode());updateById(orderEntityForUpd);//order-event-exchange发消息到 stock.release.order.queue 库存服务监听//为了防止一种极端情况,就是订单关单由于系统卡顿,一直无法进行关单,库存消息优先到期,判断状态一直解锁不了库存//所以在关单之后主动给库存服务发一个消息进行兜底
rabbitTemplate.convertAndSend("order-event-exchange","order.release.other", order);}}
这里写一个类专门去监听消息:
@Slf4j@Component@RabbitListener(queues ="order.release.order.queue")publicclassMyRabbitListener{@AutowiredprivateOrderService orderService;@RabbitHandlerpublicvoidreleaseOrder(OrderEntity order,Message message,Channel channel)throwsIOException{
log.info("接收到关单请求,订单号:{}",order.getOrderSn());try{
orderService.doReleaseOrder(order);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
库存服务中,进行处理:
/**
* 监听 关单后 主动发送给队列的消息
* @param order
* @param message
* @param channel
* @throws IOException
*/@RabbitHandlerpublicvoidstockRelease(OrderEntity order,Message message,Channel channel)throwsIOException{
log.info("订单已关闭,准备主动解锁库存");try{//再次判断工作单详情表的状态,筛选出是1的订单进行关单String orderSn = order.getOrderSn();WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskDao.selectOne(newQueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));Long id = wareOrderTaskEntity.getId();List<WareOrderTaskDetailEntity> wareOrderTaskDetailEntities = wareOrderTaskDetailDao.selectList(newQueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id));List<WareOrderTaskDetailEntity> collect = wareOrderTaskDetailEntities.stream().filter(wareOrderTaskDetailEntity -> wareOrderTaskDetailEntity.getLockStatus()==1).collect(Collectors.toList());if(!CollectionUtils.isEmpty(collect)){for(WareOrderTaskDetailEntity wareOrderTaskDetailEntity : collect){//解锁doStockRelease(wareOrderTaskDetailEntity.getWareId(), wareOrderTaskDetailEntity.getSkuId(), wareOrderTaskDetailEntity.getSkuNum(),id);}}//手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){//重新放回队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}
3、消息丢失、积压、重复消费解决方案
3.1、消息丢失
关于消息丢失,在RabbitMQ基础篇中已经提到,可以通过回调函数,以及手动ACK的方式解决。除此之外,在进行业务代码编写时,还需要加入重试机制,利用try-catch捕获异常,拒绝签收并重新放回队列。但是不应该无限重试。合理的做法是创建一个消息日志表,记录消息的信息和状态,并且定期扫描数据库对未发送成功的消息进行重发。
3.2、消息重复
可能会存在这样一种情况,消息已经成功消费,并执行了业务逻辑,在手动ACK的时候系统宕机或出现了异常(执行业务代码和手动ACK不是原子性操作,分为了两步。),这时消息会重新回到ready状态,重新投递。这样就类似于重复提交,可以在业务代码中加上状态位条件校验,或者每条消息附带一个唯一标识(messageId),在处理前检查该消息是否已经处理过,避免重复操作。
3.3、消息积压
RabbitMQ队列中的消息堆积过多,消费者无法及时处理,导致系统性能下降或消息延迟。如果业务允许,可以对消息进行批量处理(Batch Processing),一次消费多条消息,减少每条消息的处理开销。同时对于不重要或时效性强的消息,如果积压严重,可以设置消息的TTL或使用死信队列,将过期或处理失败的消息转移或丢弃。也可以增加消费者实例来并行处理消息,缓解单个消费者处理慢的问题,或对于耗时较长的操作,可以将其改为异步处理,快速从队列中取出消息,避免队列积压。
下一篇:秒杀服务。
版权归原作者 西岭千秋雪_ 所有, 如有侵权,请联系我们删除。