0


谷粒商城13——订单模块完成、分布式事务应用、RabbitMQ、Seata、电商项目订单场景的分析完善

文章目录

十二、订单模块

修改前端页面路径,将静态资源放到 nginx

1.整合Spring Session

导包:

<!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 整合spring session--><dependency><groupId>org.springframework.session</groupId><artifactId>spring-session-data-redis</artifactId></dependency>

配置文件:

spring.redis.host=192.168.137.128
spring.redis.port=6379
# session 存储方式
spring.session.store-type=redis
# session 过期时间
server.servlet.session.timeout=30m
# Spring Session 的刷新模式,
# spring.session.redis.flush-mode=on_save 
# 命名空间 (默认 ‘spring:session ’)
# spring.session.redis.namespace=spring:session 

配置类:

@ConfigurationpublicclassMySessionConfig{@BeanpublicCookieSerializercookieSerializer(){DefaultCookieSerializer cookieSerializer =newDefaultCookieSerializer();

        cookieSerializer.setDomainName("gulimall.com");//父域
        cookieSerializer.setCookieName("GULISESSION");//cookie namereturn cookieSerializer;}@BeanpublicRedisSerializer<Object>springSessionDefaultRedisSerializer(){returnnewGenericJackson2JsonRedisSerializer();}}

2.订单信息

电商系统涉及到 3 流,分别时信息流,资金流,物流,而订单系统作为中枢将三者有机的集 合起来。

订单模块是电商系统的枢纽,在订单这个环节上需求获取多个模块的数据和信息,同时对这 些信息进行加工处理后流向下个环节,这一系列就构成了订单的信息流通。

image-20220814082558608

  • 用户信息用户信息包括用户账号、用户等级、用户的收货地址、收货人、收货人电话等组成,用户账 户需要绑定手机号码,但是用户绑定的手机号码不一定是收货信息上的电话。用户可以添加 多个收货信息,用户等级信息可以用来和促销系统进行匹配,获取商品折扣,同时用户等级 还可以获取积分的奖励等
  • 订单基础信息订单基础信息是订单流转的核心,其包括订单类型、父/子订单、订单编号、订单状态、订 单流转的时间等。(1)订单类型包括实体商品订单和虚拟订单商品等,这个根据商城商品和服务类型进行区 分。(2)同时订单都需要做父子订单处理,之前在初创公司一直只有一个订单,没有做父子订 单处理后期需要进行拆单的时候就比较麻烦,尤其是多商户商场,和不同仓库商品的时候, 父子订单就是为后期做拆单准备的。(3)订单编号不多说了,需要强调的一点是父子订单都需要有订单编号,需要完善的时候 可以对订单编号的每个字段进行统一定义和诠释。(4)订单状态记录订单每次流转过程,后面会对订单状态进行单独的说明。(5)订单流转时间需要记录下单时间,支付时间,发货时间,结束时间/关闭时间等等
  • 商品信息商品信息从商品库中获取商品的 SKU 信息、图片、名称、属性规格、商品单价、商户信息 等,从用户下单行为记录的用户下单数量,商品合计价格等。
  • 优惠信息优惠信息记录用户参与的优惠活动,包括优惠促销活动,比如满减、满赠、秒杀等,用户使 用的优惠券信息,优惠券满足条件的优惠券需要默认展示出来,具体方式已在之前的优惠券 篇章做过详细介绍,另外还虚拟币抵扣信息等进行记录。 为什么把优惠信息单独拿出来而不放在支付信息里面呢? 因为优惠信息只是记录用户使用的条目,而支付信息需要加入数据进行计算,所以做为区分。
  • 支付信息(1)支付流水单号,这个流水单号是在唤起网关支付后支付通道返回给电商业务平台的支 付流水号,财务通过订单号和流水单号与支付通道进行对账使用。(2)支付方式用户使用的支付方式,比如微信支付、支付宝支付、钱包支付、快捷支付等。 支付方式有时候可能有两个——余额支付+第三方支付。(3)商品总金额,每个商品加总后的金额;运费,物流产生的费用;优惠总金额,包括促 销活动的优惠金额,优惠券优惠金额,虚拟积分或者虚拟币抵扣的金额,会员折扣的金额等 之和;实付金额,用户实际需要付款的金额。 用户实付金额=商品总金额+运费-优惠总金额
  • 物流信息 物流信息包括配送方式,物流公司,物流单号,物流状态,物流状态可以通过第三方接口来 获取和向用户展示物流每个状态节点。

3.订单状态

  1. 待付款用户提交订单后,订单进行预下单,目前主流电商网站都会唤起支付,便于用户快速完成支 付,需要注意的是待付款状态下可以对库存进行锁定,锁定库存需要配置支付超时时间,超 时后将自动取消订单,订单变更关闭状态。
  2. 已付款/待发货用户完成订单支付,订单系统需要记录支付时间,支付流水单号便于对账,订单下放到 WMS 系统,仓库进行调拨,配货,分拣,出库等操作。
  3. 待收货/已发货仓储将商品出库后,订单进入物流环节,订单系统需要同步物流信息,便于用户实时知悉物 品物流状态
  4. 已完成用户确认收货后,订单交易完成。后续支付侧进行结算,如果订单存在问题进入售后状态
  5. 已取消付款之前取消订单。包括超时未付款或用户商户取消订单都会产生这种订单状态。
  6. 售后中用户在付款后申请退款,或商家发货后用户申请退换货。

4.订单流程

订单流程是指从订单产生到完成整个流转的过程,从而行程了一套标准流程规则。而不同的 产品类型或业务类型在系统中的流程会千差万别,比如上面提到的线上实物订单和虚拟订单 的流程,线上实物订单与 O2O 订单等,所以需要根据不同的类型进行构建订单流程。

不管类型如何订单都包括正向流程和逆向流程,对应的场景就是购买商品和退换货流程,正 向流程就是一个正常的网购步骤:订单生成–>支付订单–>卖家发货–>确认收货–>交易成功。 而每个步骤的背后,订单是如何在多系统之间交互流转的,可概括如下图

image-20220814083709833

  1. 订单创建与支付(1) 、订单创建前需要预览订单,选择收货信息等 (2) 、订单创建需要锁定库存,库存有才可创建,否则不能创建 (3) 、订单创建后超时未支付需要解锁库存 (4) 、支付成功后,需要进行拆单,根据商品打包方式,所在仓库,物流等进行拆单 (5) 、支付的每笔流水都需要记录,以待查账 (6) 、订单创建,支付成功等状态都需要给 MQ 发送消息,方便其他系统感知订阅
  2. 逆向流程(1) 、修改订单,用户没有提交订单,可以对订单一些信息进行修改,比如配送信息, 优惠信息,及其他一些订单可修改范围的内容,此时只需对数据进行变更即可。(2) 、订单取消,用户主动取消订单和用户超时未支付,两种情况下订单都会取消订 单,而超时情况是系统自动关闭订单,所以在订单支付的响应机制上面要做支付的 限时处理,尤其是在前面说的下单减库存的情形下面,可以保证快速的释放库存。 另外需要需要处理的是促销优惠中使用的优惠券,权益等视平台规则,进行相应补 回给用户。(3) 、退款,在待发货订单状态下取消订单时,分为缺货退款和用户申请退款。如果是 全部退款则订单更新为关闭状态,若只是做部分退款则订单仍需进行进行,同时生 成一条退款的售后订单,走退款流程。退款金额需原路返回用户的账户。(4) 、发货后的退款,发生在仓储货物配送,在配送过程中商品遗失,用户拒收,用户 收货后对商品不满意,这样情况下用户发起退款的售后诉求后,需要商户进行退款 的审核,双方达成一致后,系统更新退款状态,对订单进行退款操作,金额原路返 回用户的账户,同时关闭原订单数据。仅退款情况下暂不考虑仓库系统变化。如果 发生双方协调不一致情况下,可以申请平台客服介入。在退款订单商户不处理的情 况下,系统需要做限期判断,比如 5 天商户不处理,退款单自动变更同意退款

5.登录拦截

@ComponentpublicclassLoginUserInterceptorimplementsHandlerInterceptor{publicstaticThreadLocal<MemberResponseTo> loginUser =newThreadLocal<>();@OverridepublicbooleanpreHandle(HttpServletRequest request,HttpServletResponse response,Object handler)throwsException{MemberResponseTo attribute =(MemberResponseTo) request.getSession().getAttribute(AuthServerConstant.LOGIN_USER);if(attribute ==null){
            request.getSession().setAttribute("msg","请先进行登录!");
            response.sendRedirect("http:auth.gulimall.com/login.html");returnfalse;}else{
            loginUser.set(attribute);returntrue;}}}
@ConfigurationpublicclassOrderWebConfigurationimplementsWebMvcConfigurer{@AutowiredLoginUserInterceptor loginUserInterceptor;@OverridepublicvoidaddInterceptors(InterceptorRegistry registry){
        registry.addInterceptor(loginUserInterceptor).addPathPatterns("/**");}}

6.订单确认页

@OverridepublicOrderConfirmVoconfirmOrder()throwsExecutionException,InterruptedException{OrderConfirmVo confirmVo =newOrderConfirmVo();MemberResponseTo memberResponseTo =LoginUserInterceptor.loginUser.get();CompletableFuture<Void> getAddressFuture =CompletableFuture.runAsync(()->{//1.远程查询所有的收货地址列表List<MemberAddressVo> address = memberFeignService.getAddress(memberResponseTo.getId());
        confirmVo.setMemberAddressVos(address);}, executor);CompletableFuture<Void> getCartFuture =CompletableFuture.runAsync(()->{//2.远程查询购物车所有选中的购物项List<OrderItemVo> itemVos = cartFeignService.getCurrentUserCartItems(memberResponseTo.getId());
        confirmVo.setItems(itemVos);}, executor).thenRunAsync(()->{List<OrderItemVo> items = confirmVo.getItems();List<Long> collect = items.stream().map(item -> item.getSkuId()).collect(Collectors.toList());List<SkuHasStockVo> skuHasStockVos = wmsFeignService.hasStock(collect);if(skuHasStockVos !=null){Map<Long,Boolean> map = skuHasStockVos.stream().collect(Collectors.toMap(SkuHasStockVo::getSkuId,SkuHasStockVo::isHasStock));
            confirmVo.setStocks(map);}},executor);//3.查询用户积分Integer integration = memberResponseTo.getIntegration();
    confirmVo.setIntegration(integration);//4.其他数据自动计算//TODO 5.防重令牌CompletableFuture.allOf(getAddressFuture,getCartFuture).get();return confirmVo;}

image-20220814164031740

7.接口幂等性

为了防止提交订单的按钮点击多次并且成功提交到数据库,需要包装

接口幂等性

7.1 幂等性概念

概念:

  • 接口幂等性就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因 为多次点击而产生了副作用;
  • 比如说支付场景,用户购买了商品支付扣款成功,但是返回结 果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结 果成功,用户查询余额返发现多扣钱了,流水记录也变成了两条...,这就没有保证接口 的幂等性。

场景:

  • 用户多次点击按钮
  • 用户页面回退再次提交
  • 微服务互相调用,由于网络问题,导致请求失败
  • feign 触发重试机制 其他业务情况

幂等性以sql 为例,有些操作是天然幂等的:

  • SELECT * FROM table WHER id=?,无论执行多少次都不会改变状态,是天然的幂等。
  • UPDATE tab1 SET col1=1 WHERE col2=2,无论执行成功多少次状态都是一致的,也是幂等操作。
  • delete from user where userid=1,多次操作,结果一样,具备幂等性
  • insert into user(userid,name) values(1,‘a’) 如 userid 为唯一主键,即重复操作上面的业务,只 会插入一条用户数据,具备幂等性。
  • UPDATE tab1 SET col1=col1+1 WHERE col2=2,每次执行的结果都会发生变化,不是幂等的。
  • insert into user(userid,name) values(1,‘a’) 如 userid 不是主键,可以重复,那上面业务多次操 作,数据都会新增多条,不具备幂等性。

7.2 幂等性解决方案

7.2.1 token机制

获取token --》对比token—》删除token

  1. 服务端提供了发送 token 的接口。我们在分析业务的时候,哪些业务是存在幂等问题的, 就必须在执行业务前,先去获取 token,服务器会把 token 保存到 redis 中。
  2. 然后调用业务接口请求时,把 token 携带过去,一般放在请求头部。
  3. 服务器判断 token 是否存在 redis 中,存在表示第一次请求,然后删除 token,继续执行业务。
  4. 如果判断 token 不存在 redis 中,就表示是重复操作,直接返回重复标记给 client,这样 就保证了业务代码,不被重复执行。

危险性:

  1. 先删除 token 还是后删除 token;1. 先删除可能导致,业务确实没有执行,重试还带上之前 token,由于防重设计导致, 请求还是不能执行。2. 后删除可能导致,业务处理成功,但是服务闪断,出现超时,没有删除 token,别 人继续重试,导致业务被执行两边3. 我们最好设计为先删除 token,如果业务调用失败,就重新获取 token 再次请求。
  2. Token 获取、比较和删除必须是原子性1. redis.get(token) 、token.equals、redis.del(token)如果这两个操作不是原子,可能导 致,高并发下,都 get 到同样的数据,判断都成功,继续业务并发执行2. 可以在 redis 使用 lua 脚本完成这个操作if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 e

image-20220814184638877

1.确认订单时添加防重令牌

进入订单确认页就生成令牌

publicclassOrderConstant{publicstaticfinalString USER_ORDER_TOKEN_PREFIX ="order:token:";}
@OverridepublicOrderConfirmVoconfirmOrder()throwsExecutionException,InterruptedException{.......//TODO 5.防重令牌String token = UUID.randomUUID().toString().replace("-","");
    redisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX+memberResponseTo.getId(),token,30,TimeUnit.MINUTES);
    confirmVo.setOrderToken(token);CompletableFuture.allOf(getAddressFuture,getCartFuture).get();.......return confirmVo;}

将后端生成的令牌回显给前端提交订单按钮,并以表单的形式提交订单相关数据

<formaction="http://order.gulimall.com/submitOrder"method="post"><inputid="addrIdInput"type="hidden"name="addrId"><inputid="payPriceInput"type="hidden"name="payPrice"><inputtype="hidden"name="orderToken"th:value="${orderConfirmData.orderToken}"><buttonclass="tijiao"type="submit">提交订单</button></form>
2.确认订单,给前端确认订单页面返回数据
@OverridepublicOrderConfirmVoconfirmOrder()throwsExecutionException,InterruptedException{OrderConfirmVo confirmVo =newOrderConfirmVo();MemberResponseTo memberResponseTo =LoginUserInterceptor.loginUser.get();CompletableFuture<Void> getAddressFuture =CompletableFuture.runAsync(()->{//1.远程查询所有的收货地址列表List<MemberAddressVo> address = memberFeignService.getAddress(memberResponseTo.getId());
        confirmVo.setMemberAddressVos(address);}, executor);CompletableFuture<Void> getCartFuture =CompletableFuture.runAsync(()->{//2.远程查询购物车所有选中的购物项List<OrderItemVo> itemVos = cartFeignService.getCurrentUserCartItems(memberResponseTo.getId());
        confirmVo.setItems(itemVos);}, executor).thenRunAsync(()->{List<OrderItemVo> items = confirmVo.getItems();List<Long> collect = items.stream().map(item -> item.getSkuId()).collect(Collectors.toList());List<SkuHasStockVo> skuHasStockVos = wmsFeignService.hasStock(collect);if(skuHasStockVos !=null){Map<Long,Boolean> map = skuHasStockVos.stream().collect(Collectors.toMap(SkuHasStockVo::getSkuId,SkuHasStockVo::isHasStock));
            confirmVo.setStocks(map);}},executor);//3.查询用户积分Integer integration = memberResponseTo.getIntegration();
    confirmVo.setIntegration(integration);//4.其他数据自动计算//TODO 5.防重令牌String token = UUID.randomUUID().toString().replace("-","");
    redisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX+memberResponseTo.getId(),token,30,TimeUnit.MINUTES);
    confirmVo.setOrderToken(token);CompletableFuture.allOf(getAddressFuture,getCartFuture).get();return confirmVo;}
3.提交订单创建订单

对应controller:

/**
 * 下单功能
 * @param vo
 * @return
 */@PostMapping("/submitOrder")publicStringsubmitOrder(OrderSubmitVo vo,Model model,RedirectAttributes redirectAttributes){try{SubmitOrderResponseVo submitOrderResponseVo = orderService.submitOrder(vo);Integer code = submitOrderResponseVo.getCode();if(code ==0){//下单成功来到支付选中页
            model.addAttribute("submitOrderResp",submitOrderResponseVo);return"pay";}else{//下单失败回到订单确认页重新确认订单String msg ="下单失败:";switch(submitOrderResponseVo.getCode()){case1:
                    msg +="订单信息过期,请刷新再次提交!";break;case2:
                    msg +="订单商品价格发生变化,青确认后再次提交!";break;}
            redirectAttributes.addFlashAttribute("msg",msg);return"redirect:http://order.gulimall.com/toTrade";}}catch(Exception e){if(e instanceofNoStockException){String msg ="下单失败,商品无库存!";
            redirectAttributes.addFlashAttribute("msg",msg);}return"redirect:http://order.gulimall.com/toTrade";}}

提交订单:

是一个事务,远程调用仓库服务的锁库存失败时,抛出异常、回滚事务,数据库中的订单删除

@Transactional@OverridepublicSubmitOrderResponseVosubmitOrder(OrderSubmitVo vo){
    submitVoThreadLocal.set(vo);SubmitOrderResponseVo responseVo =newSubmitOrderResponseVo();MemberResponseTo memberResponseTo =LoginUserInterceptor.loginUser.get();
    responseVo.setCode(0);//1.验证令牌[令牌的对比和删除必须具备原子性]  使用lua脚本删除//脚本返回 0:令牌校验失败 1:令牌校验成功String orderToken = vo.getOrderToken();String script ="if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Long result =(Long) redisTemplate.execute(newDefaultRedisScript<Long>(script,Long.class),Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseTo.getId()), orderToken);if(result ==0L){//验证失败
        responseVo.setCode(1);//1:令牌验证失败return responseVo;}else{//验证成功//创建订单、验令牌、验价格、锁库存//1.创建订单、订单项等信息OrderCreateTo order =createOrder();//2.验价BigDecimal payAmount = order.getOrder().getPayAmount();BigDecimal payPrice = vo.getPayPrice();if(Math.abs(payAmount.subtract(payPrice).doubleValue())<0.01){//金额对比成功//3.保存订单saveOrder(order);//4.库存锁定 只要有异常回滚订单数据//订单号、所有订单项(skuId、skuName、num)//锁库存WareSkuLockVo lockVo =newWareSkuLockVo();
            lockVo.setOrderSn(order.getOrder().getOrderSn());List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map(item ->{OrderItemVo orderItemVo =newOrderItemVo();
                orderItemVo.setSkuId(item.getSkuId());
                orderItemVo.setCount(item.getSkuQuantity());
                orderItemVo.setTitle(item.getSkuName());return orderItemVo;}).collect(Collectors.toList());
            lockVo.setLocks(orderItemVos);//远程锁库存R r = wmsFeignService.orderLockStock(lockVo);if(r.getCode()==0){//锁成功
                responseVo.setCode(0);
                responseVo.setOrder(order.getOrder());return responseVo;}else{//失败//5.1 锁定库存失败String msg =(String) r.get("msg");thrownewNoStockException(msg);}}else{
            responseVo.setCode(2);//2:验价错误return responseVo;}}}

保存订单的方法:

/**
 * 保存订单
 * @param order
 */privatevoidsaveOrder(OrderCreateTo order){OrderEntity orderEntity = order.getOrder();
    orderEntity.setModifyTime(newDate());this.save(orderEntity);List<OrderItemEntity> orderItems = order.getOrderItems();
    orderItemService.saveBatch(orderItems);}/**
 * 创建订单
 * @return
 */privateOrderCreateTocreateOrder(){OrderCreateTo orderCreateTo =newOrderCreateTo();//1.生成订单号String orderSn =IdWorker.getTimeId();OrderEntity entity =buildOrder(orderSn);//订单的一些信息//2.获取所有的订单项List<OrderItemEntity> orderItemEntities =buildOrderItems(orderSn);//3.计算价格相关computePrice(entity,orderItemEntities);

    orderCreateTo.setOrder(entity);
    orderCreateTo.setOrderItems(orderItemEntities);return orderCreateTo;}privatevoidcomputePrice(OrderEntity entity,List<OrderItemEntity> orderItemEntities){BigDecimal total =newBigDecimal("0.0");BigDecimal coupon =newBigDecimal("0.0");//优惠券BigDecimal integration =newBigDecimal("0.0");//积分BigDecimal promotion =newBigDecimal("0.0");//打折优惠BigDecimal gift =newBigDecimal("0.0");BigDecimal growth =newBigDecimal("0.0");//订单的总额叠加每一个订单的总额信息for(OrderItemEntity orderItemEntity : orderItemEntities){BigDecimal realAmount = orderItemEntity.getRealAmount();
        promotion = promotion.add(orderItemEntity.getPromotionAmount());
        coupon = coupon.add(orderItemEntity.getCouponAmount());
        integration = integration.add(orderItemEntity.getIntegrationAmount());
        gift = gift.add(newBigDecimal(orderItemEntity.getGiftIntegration().toString()));
        growth = growth.add(newBigDecimal(orderItemEntity.getGiftGrowth().toString()));

        total = total.add(realAmount);}//1.订单价格相关 应付价格=订单总额+运费
    entity.setTotalAmount(total);//应付价格
    entity.setPayAmount(total.add(entity.getFreightAmount()));
    entity.setPromotionAmount(promotion);
    entity.setIntegrationAmount(integration);
    entity.setCouponAmount(coupon);//设置积分等信息
    entity.setIntegration(gift.intValue());
    entity.setGrowth(growth.intValue());
    entity.setDeleteStatus(0);//未删除}/**
 * 订单的一些信息
 * @param orderSn
 * @return
 */privateOrderEntitybuildOrder(String orderSn){OrderEntity entity =newOrderEntity();
    entity.setOrderSn(orderSn);
    entity.setMemberId(LoginUserInterceptor.loginUser.get().getId());//获取收货地址信息OrderSubmitVo orderSubmitVo = submitVoThreadLocal.get();FareVo fareResp = wmsFeignService.getFare(orderSubmitVo.getAddrId());//设置收获信息
    entity.setFreightAmount(fareResp.getFare());
    entity.setReceiverCity(fareResp.getAddress().getCity());
    entity.setReceiverDetailAddress(fareResp.getAddress().getDetailAddress());
    entity.setReceiverName(fareResp.getAddress().getName());
    entity.setReceiverPhone(fareResp.getAddress().getPhone());
    entity.setReceiverPostCode(fareResp.getAddress().getPostCode());
    entity.setReceiverProvince(fareResp.getAddress().getProvince());
    entity.setReceiverRegion(fareResp.getAddress().getRegion());//设置订单相关状态信息
    entity.setStatus(OrderStatusEnum.CREATE_NEW.getCode());return entity;}/**
 * 构建所有的订单项
 * @return
 */privateList<OrderItemEntity>buildOrderItems(String orderSn){//最后确定每个购物项的价格MemberResponseTo memberResponseTo =LoginUserInterceptor.loginUser.get();//获取当前购物车选中的商品List<OrderItemVo> currentUserCartItems = cartFeignService.getCurrentUserCartItems(memberResponseTo.getId());if(currentUserCartItems !=null&& currentUserCartItems.size()>0){//对选中的商品遍历封装List<OrderItemEntity> collect = currentUserCartItems.stream().map(cartItem ->{OrderItemEntity itemEntity =buildOrderItem(cartItem);
            itemEntity.setOrderSn(orderSn);return itemEntity;}).collect(Collectors.toList());return collect;}returnnull;}/**
 * 构建某一个订单项
 * @param cartItem
 * @return
 */privateOrderItemEntitybuildOrderItem(OrderItemVo cartItem){OrderItemEntity itemEntity =newOrderItemEntity();//1.订单信息【buildOrderItems已做】//2.商品的spu信息Long skuId = cartItem.getSkuId();R r = productFeignService.getSpuInfoBySkuId(skuId);SpuInfoVo spuInfo = r.getData(newTypeReference<SpuInfoVo>(){});
    itemEntity.setSpuId(spuInfo.getId());
    itemEntity.setSpuBrand(spuInfo.getBrandId().toString());
    itemEntity.setSpuName(spuInfo.getSpuName());
    itemEntity.setCategoryId(spuInfo.getCatalogId());//3.商品的sku信息
    itemEntity.setSkuId(cartItem.getSkuId());
    itemEntity.setSkuName(cartItem.getTitle());
    itemEntity.setSkuPic(cartItem.getImage());
    itemEntity.setSkuPrice(cartItem.getPrice());//将一个集合转换成字符串,元素之间用“;“隔开String skuAttr =StringUtils.collectionToDelimitedString(cartItem.getSkuAttrValues(),";");
    itemEntity.setSkuAttrsVals(skuAttr);
    itemEntity.setSkuQuantity(cartItem.getCount());//4.优惠信息【不做】//5.积分信息
    itemEntity.setGiftGrowth(cartItem.getPrice().multiply(newBigDecimal(cartItem.getCount().toString())).intValue());
    itemEntity.setGiftIntegration(cartItem.getPrice().multiply(newBigDecimal(cartItem.getCount().toString())).intValue());//6.订单项的价格信息
    itemEntity.setPromotionAmount(newBigDecimal("0"));
    itemEntity.setCouponAmount(newBigDecimal("0"));
    itemEntity.setIntegrationAmount(newBigDecimal("0"));//不算优惠的价格BigDecimal orign = itemEntity.getSkuPrice().multiply(newBigDecimal(itemEntity.getSkuQuantity()));//优惠后的价格BigDecimal subtract = orign.subtract(itemEntity.getPromotionAmount()).subtract(itemEntity.getCouponAmount()).subtract(itemEntity.getIntegrationAmount());
    itemEntity.setRealAmount(subtract);return itemEntity;}
4.锁库存
  • 当订单创建成功,进行锁库存,防止支付时,没有库存导致支付失败
  • 查询仓库库存,修改锁库存数
  • 远程调用仓库服务,进行锁库存
  • 当库存不足的时,抛出异常,事务回滚

写一个内部类,查询商品在哪个仓库有库存

调用接口:

/**
 *
 * @return
 */@PostMapping("/lock/order")publicRorderLockStock(@RequestBodyWareSkuLockVo vo){try{Boolean lockStockResultVos = wareSkuService.orderLockStock(vo);returnR.ok();}catch(NoStockException e){returnR.error(BizCodeEnume.NO_STOCK_EXCEPTION.getCode(),BizCodeEnume.NO_STOCK_EXCEPTION.getMsg());}}
@DataclassSkuWareHasStock{privateLong skuId;privateInteger num;//锁多少件privateList<Long> wareIds;}/**
 * 为某个订单锁定库存
 * @param vo
 * @return
 */@Transactional@OverridepublicBooleanorderLockStock(WareSkuLockVo vo){//1.找到每个商品在哪个仓库有库存List<OrderItemVo> locks = vo.getLocks();List<SkuWareHasStock> collect = locks.stream().map(item ->{SkuWareHasStock stock =newSkuWareHasStock();Long skuId = item.getSkuId();
        stock.setSkuId(skuId);
        stock.setNum(item.getCount());//查询这个商品在哪个仓库有库存List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(skuId);
        stock.setWareIds(wareIds);return stock;}).collect(Collectors.toList());//2.锁定库存for(SkuWareHasStock stock : collect){Boolean skuLock =false;Long skuId = stock.getSkuId();List<Long> wareIds = stock.getWareIds();if(wareIds.size()==0|| wareIds ==null){thrownewNoStockException(skuId);}for(Long wareId : wareIds){//成功返回1,否则就是0Long count = wareSkuDao.lockSkuStock(skuId,wareId,stock.getNum());if(count ==1){//锁成功了
                skuLock =true;break;}//锁失败了 继续下一个仓库}//只要有一件商品锁失败了,订单就锁失败  【抛出异常 回滚事务】if(!skuLock){thrownewNoStockException(skuId);}}returntrue;}
  • 用总库存减去锁定的库存判断仓库库存是否还有
  • 同一个商品可以在多个仓库有库存

image-20220816163354418

查询仓库库存的sql:

<selectid="listWareIdHasSkuStock"resultType="java.lang.Long">
    SELECT ware_id FROM `wms_ware_sku` WHERE sku_id=#{skuId} AND stock-stock_locked>0
</select>

锁库存的sql:

<updateid="lockSkuStock">
    UPDATE `wms_ware_sku` SET stock_locked = stock_locked+#{num}
    WHERE sku_id=#{skuId} AND ware_id=#{wareId} AND stock-stock_locked>=#{num}
</update>

7.2.2 各种锁机制

  1. 数据库悲观锁 - ​ select * from xxxx where id = 1 for update; - 悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,需要根据实际情况选用。 另外要注意的是,id 字段一定是主键或者唯一索引,不然可能造成锁表的结果,处理起来会 非常麻烦。
  2. 数据库乐观锁 - update t_goods set count = count -1 , version = version + 1 where good_id=2 and version = 1 - 这种方法适合在更新的场景中, 根据 version 版本,也就是在操作库存前先获取当前商品的 version 版本号,然后操作的时候 带上此 version 号。我们梳理下,我们第一次操作库存时,得到 version 为 1,调用库存服务 version 变成了 2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订 单服务传如的 version 还是 1,再执行上面的 sql 语句时,就不会执行;因为 version 已经变 为 2 了,where 条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。- 乐观锁主要使用于处理读多写少的问题
  3. 业务层分布式锁 - 如果多个机器可能在同一时间同时处理相同的数据,比如多台机器定时任务都拿到了相同数 据处理,我们就可以加分布式锁,锁定此数据,处理完成后释放锁。获取到锁的必须先判断 这个数据是否被处理过

7.2.3 各种唯一约束

  1. 数据库唯一约束 - 插入数据,应该按照唯一索引进行插入,比如订单号,相同的订单就不可能有两条记录插入。- 我们在数据库层面防止重复。 这个机制是利用了数据库的主键唯一约束的特性,解决了在 insert 场景时幂等问题。但主键 的要求不是自增的主键,这样就需要业务生成全局唯一的主键。- 如果是分库分表场景下,路由规则要保证相同请求下,落地在同一个数据库和同一表中,要 不然数据库主键约束就不起效果了,因为是不同的数据库和表主键不相关。
  2. redis set 防重 - 很多数据需要处理,只能被处理一次,比如我们可以计算数据的 MD5 将其放入 redis 的 set, 每次处理数据,先看这个 MD5 是否已经存在,存在就不处理。
  3. 防重表 - 使用订单号 orderNo 做为去重表的唯一索引,把唯一索引插入去重表,再进行业务操作,且 他们在同一个事务中。- 这个保证了重复请求时,因为去重表有唯一约束,导致请求失败,避 免了幂等问题。- 这里要注意的是,去重表和业务表应该在同一库中,这样就保证了在同一个 事务,即使业务操作失败了,也会把去重表的数据回滚。这个很好的保证了数据一致性。 之前说的 redis 防重也算
  4. 全局请求唯一 id - 调用接口时,生成一个唯一 id,redis 将数据保存到集合中(去重),存在即处理过。 可以使用 nginx 设置每一个请求的唯一 id; proxy_set_header X-Request-Id $request_id

十三、分布式事务

前面使用本地事务,抛出异常回滚事务。

但是在多个微服务相互调用的情况下,本地事务很容易产生下面这种情况:

  • 远程调用的微服务执行成功了,但是由于网络等原因导致返回结果超时,没有返回数据报失败异常 或 报超时异常而导致当前事务回滚
  • 同时远程调用的微服务执行成功,没有事务回滚,那么数据库中的数据是错乱的

又如果:

  • 订单服务一个业务同时调用库存服务和积分服务,先执行的库存服务执行成功了
  • 但是积分服务执行失败了,那么积分服务和订单服务的业务是可以回滚的,但库存服务的业务就无法回滚了

因此本地事务只能控制本地方法的调用,对于远程调用,因此要求统一的分布式事务管理

image-20220816223644029

1.本地事务

  1. 事务的基本性质数据库事务的几个特性:原子性(Atomicity )、一致性( Consistency )、隔离性或独立性( Isolation) 和持久性(Durabilily),简称就是 ACID- 原子性:一系列的操作整体不可拆分,要么同时成功,要么同时失败- 一致性:数据在事务的前后,业务整体一致。 转账。A:1000;B:1000; 转 200 事务成功; A:800 B:1200- 隔离性:事务之间互相隔离。- 持久性:一旦事务成功,数据一定会落盘在数据库。 在以往的单体应用中,我们多个业务操作使用同一条连接操作不同的数据表,一旦有异常, 我们可以很容易的整体回滚; Business:我们具体的业务代码 Storage:库存业务代码;扣库存 Order:订单业务代码;保存订单 Account:账号业务代码;减账户余额 比如买东西业务,扣库存,下订单,账户扣款,是一个整体;必须同时成功或者失败 一个事务开始,代表以下的所有操作都在同一个连接里面;
  2. 事务的隔离级别- READ UNCOMMITTED(读未提交) 该隔离级别的事务会读到其它未提交事务的数据,此现象也称之为脏读。- READ COMMITTED(读提交) 一个事务可以读取另一个已提交的事务,多次读取会造成不一样的结果,此现象称为不可重 复读问题,Oracle 和 SQL Server 的默认隔离级别。- REPEATABLE READ(可重复读) 该隔离级别是 MySQL 默认的隔离级别,在同一个事务里,select 的结果是事务开始时时间 点的状态,因此,同样的 select 操作读到的结果会是一致的,但是,会有幻读现象。MySQL 的 InnoDB 引擎可以通过 next-key locks 机制(参考下文"行锁的算法"一节)来避免幻读。- SERIALIZABLE(序列化) 在该隔离级别下事务都是串行顺序执行的,MySQL 数据库的 InnoDB 引擎会给读操作隐式 加一把读共享锁,从而避免了脏读、不可重读复读和幻读问题。
  3. 事务的传播行为- PROPAGATION_REQUIRED:如果当前没有事务,就创建一个新事务,如果当前存在事务, 就加入该事务,该设置是最常用的设置。- PROPAGATION_SUPPORTS:支持当前事务,如果当前存在事务,就加入该事务,如果当 前不存在事务,就以非事务执行。- PROPAGATION_MANDATORY:支持当前事务,如果当前存在事务,就加入该事务,如果 当前不存在事务,就抛出异常。- PROPAGATION_REQUIRES_NEW:创建新事务,无论当前存不存在事务,都创建新事务。- PROPAGATION_NOT_SUPPORTED:以非事务方式执行操作,如果当前存在事务,就把当 前事务挂起。- PROPAGATION_NEVER:以非事务方式执行,如果当前存在事务,则抛出异常。- PROPAGATION_NESTED:如果当前存在事务,则在嵌套事务内执行。如果当前没有事务, 则执行与 PROPAGATION_REQUIRED 类似的操作。
  4. SpringBoot 事务关键点- 事务的自动配置 TransactionAutoConfiguration- 事务的坑 在同一个类里面,编写两个方法,内部调用的时候,会导致事务设置失效。原因是没有用到 代理对象的缘故。 - 解决:- 0)、导入 spring-boot-starter-aop- 1)、@EnableTransactionManagement(proxyTargetClass = true)- 2)、@EnableAspectJAutoProxy(exposeProxy=true)- 3)、AopContext.currentProxy() 调用方法

2.分布式事务

image-20220817145218079

分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个 东西,特别是在微服务架构中,几乎可以说是无法避免

2.1 CAP定理

CAP 原则又称 CAP 定理,指的是在一个分布式系统中

  • 一致性(Consistency): - 在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访 问同一份最新的数据副本)
  • 可用性(Availability) - 在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据 更新具备高可用性)
  • 分区容错性(Partition tolerance) - 大多数分布式系统都分布在多个子网络。每个子网络就叫做一个区(partition)。 分区容错的意思是,区间通信可能失败。比如,一台服务器放在中国,另一台服务 器放在美国,这就是两个区,它们之间可能无法通信。

CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。

image-20220817145754571

一般来说,分区容错无法避免,因此可以认为 CAP 的 P 总是成立。CAP 定理告诉我们,剩下的 C 和 A 无法同时做到。

分布式系统中实现一致性的有raft 算法、paxos。raft算法:http://thesecretlivesofdata.com/raft/

对于多数大型互联网应用的场景,主机众多、部署分散,而且现在的集群规模越来越大,所
以节点故障、网络故障是常态,而且要保证服务可用性达到 99.99999%(N 个 9),即保证
P 和 A,舍弃 C,即不能保证强一致,但是可以弥补强一致

2.2 BASE理论

是对 CAP 理论的延伸,思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但以采用适当的采取弱一致性,即最终一致性。即不能保证强一致,但是可以弥补强一致
BASE 是指

  • 基本可用(Basically Available) - 基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、 - 功能上的可用性),允许损失部分可用性。需要注意的是,基本可用绝不等价于系统不可用。- 响应时间上的损失:正常情况下搜索引擎需要在 0.5 秒之内返回给用户相应的查询结果,但由于出现故障(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到了 1~2 秒。- 功能上的损失:购物网站在购物高峰(如双十一)时,为了保护系统的稳定性,部分消费者可能会被引导到一个降级页面。
  • 软状态( Soft State)(处于失败、成功的中间状态) - 软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据会有多个副本,允许不同副本同步的延时就是软状态的体现。mysql replication 的异步复制也是一种体现。
  • 最终一致性( Eventual Consistency) - 最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况

强一致性、弱一致性、最终一致性

  • 从客户端角度,多进程并发访问时,更新过的数据在不同进程如何获取的不同策略,决定了 不同的一致性。
  • 对于关系型数据库,要求更新过的数据能被后续的访问都能看到,这是强一 致性。
  • 如果能容忍后续的部分或者全部访问不到,则是弱一致性。
  • 如果经过一段时间后要求 能访问到更新后的数据,则是最终一致性

3.分布式事务实现模式

3.1 2PC模式

数据库支持的 2PC【2 phase commit 二阶提交】,又叫做 XA Transactions。 MySQL 从 5.5 版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。

其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:

  • 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是 否可以提交.
  • 第二阶段:事务协调器要求每个数据库提交数据。 其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务 中的那部分信息

image-20220817154056328

  • XA 协议比较简单,而且一旦商业数据库实现了 XA 协议,使用分布式事务的成本也比较低。
  • XA 性能不理想,特别是在交易下单链路,往往并发量很高,XA 无法满足高并发场景
  • XA 目前在商业数据库支持的比较理想,在 mysql 数据库中支持的不太理想,mysql 的XA 实现,没有记录 prepare 阶段日志,主备切换回导致主库与备库数据不一致。
  • 许多 nosql 也没有支持 XA,这让 XA 的应用场景变得非常狭隘。
  • 也有 3PC,引入了超时机制(无论协调者还是参与者,在向对方发送请求后,若长时间未收到回应则做出相应处理)

3.2 柔性事务-TCC事务补偿型方案

  • 刚性事务:遵循 ACID 原则,强一致性。
  • 柔性事务:遵循 BASE 理论,最终一致性;
  • 与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。

image-20220817154758877

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
  • 所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

image-20220817173548937

3.3 柔性事务-最大努力通知型方案

  • 按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。
  • 这种 方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。
  • 这种 方案也是结合 MQ 进行实现,例如:通过 MQ 发送 http 请求,设置最大通知次数。达到通 知次数后即不再通知。

案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对 账文件),支付宝的支付成功异步回调

3.4 柔性事务-可靠消息+最终一致性方案(异步确保型)

实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只 记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确 认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。

防止消息丢失:

/**
* 1、做好消息确认机制(pulisher,consumer【手动 ack】)
* 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一
遍
*/
CREATE TABLE `mq_message` (
    `message_id` char(32)NOT NULL, 
    `content` text, 
    `to_exchane` varchar(255)DEFAULT NULL, 
    `routing_key` varchar(255)DEFAULT NULL, 
    `class_type` varchar(255)DEFAULT NULL,
    `message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
    `create_time` datetime DEFAULT NULL, 
    `update_time` datetime DEFAULT NULL,PRIMARY KEY (`message_id`)) ENGINE=InnoDBDEFAULT CHARSET=utf8mb4

4.整合Seata

官方文档:Seata 是什么

4.1 Seate初识

  • TC (Transaction Coordinator) - 事务协调者维护全局和分支事务的状态,驱动全局事务提交或回滚。

  • TM (Transaction Manager) - 事务管理器定义全局事务的范围:开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - 资源管理器管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

image-20220817174856143

4.2 创建回滚日志表 UNDO_LOG表

SEATA AT 模式需要

UNDO_LOG

-- 注意此处0.3.0+ 增加唯一索引 ux_undo_logCREATETABLE`undo_log`(`id`bigint(20)NOTNULLAUTO_INCREMENT,`branch_id`bigint(20)NOTNULL,`xid`varchar(100)NOTNULL,`context`varchar(128)NOTNULL,`rollback_info`longblobNOTNULL,`log_status`int(11)NOTNULL,`log_created`datetimeNOTNULL,`log_modified`datetimeNOTNULL,`ext`varchar(100)DEFAULTNULL,PRIMARYKEY(`id`),UNIQUEKEY`ux_undo_log`(`xid`,`branch_id`))ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8;

4.3 安装事务协调器TC

https://github.com/seata/seata/releases,下载服务器软件包,将其解压缩

修改配置文件

image-20220817183654768

image-20220817183946582

image-20220817184112568

image-20220817184216482

启动:

image-20220817184615500

启动成功:

image-20220817184648474

image-20220817184713752

4.4 项目整合seata

导入依赖:

<!--        分布式事务seata--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><version>2.0.1.RELEASE</version><exclusions><exclusion><groupId>io.seata</groupId><artifactId>seata-all</artifactId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId><version>${seata.version}</version></dependency>

所有想要用分布式事务的微服务都要使用seata DataSourceProxy 代理自己的数据源

packagecom.henu.soft.merist.gulimall.order.config;importcom.zaxxer.hikari.HikariDataSource;importio.seata.rm.datasource.DataSourceProxy;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.autoconfigure.jdbc.DataSourceProperties;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.util.StringUtils;importjavax.sql.DataSource;@ConfigurationpublicclassMySeataConfig{@AutowiredDataSourceProperties dataSourceProperties;@BeanpublicDataSourcedataSource(DataSourceProperties dataSourceProperties){HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();if(StringUtils.hasText(dataSourceProperties.getName())){
            dataSource.setPoolName(dataSourceProperties.getName());}returnnewDataSourceProxy(dataSource);}}

每个微服务放入 file.conf、registry.conf 文件,并配置分组

image-20220817193150601

在父(大)事务上添加注解

@GlobalTransactional

,每个远程的小事务还用

@Transactional

image-20220817193404661

重启测试

对于普通的业务像后台管理之类的,可以使用 Seata 的 AT 模式

但是在高并发情况下,Seata 并不适用,需要适用前面的

柔性事务-可靠消息+最终一致性方案(异步确保型)

模式

  • 为了高并发,库存服务自己回滚。订单服务在完成之后,发送消息给库存服务
  • 延时队列实现定时任务(定时任务太耗费资源),扫描数据库表保存的锁库存记录,根据订单状态判断,然后将失败的锁库存自动解锁

5.RabbitMQ 延时队列

5.1 场景分析

定时任务缺点:

  • 消耗系统内存、增加了数据库的压力
  • 存在较大的时间误差
  • image-20220817204655755

使用RabbitMQ的延时队列

  • 订单提交之后,先被放到消息队列,到达指定时间30分钟后发送给逻辑业务进行数据库订单保存
  • 订单提交之后,库存锁定成功信息先被放到消息队列,达到指定时间40分钟后检查订单,订单不存在的话自动解锁库存
  • 其实延时队列就是保证 订单状态更新后(已支付、手动取消),判断库存锁定是否逻辑正确,不正确就更正

5.2 概念

消息的TTL(Time To Live)就是消息的存活时间

RabbitMQ可以对队列和消息分别设置TTL

  • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的 设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
  • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队 列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。

死信 DLX(Dead Letter Exchanges)

  • 一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列, 一个路由可以对应很多队列。(什么是死信)满足以下三个条件就会成为死信:1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不 会被再次放在队列里,被其他消费者使用。*(basic.reject/basic.nackrequeue=false*2. 上面的消息的TTL到了,消息过期了。3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
  • Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有 消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
  • 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息 被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列

设置队列过期时间实现延时队列:

image-20220818084819401

设置消息过期时间实现延时队列:

image-20220818084850068

5.3 延时队列定时关单模拟

image-20220818085223576

改进:

image-20220818085601126

image-20220818100324569

5.4 代码实现

订单模块

配置类:

packagecom.henu.soft.merist.gulimall.order.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.Exchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.TopicExchange;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;@ConfigurationpublicclassMyMQConfig{/**
     * 容器钟的Binding、Queue、Exchange 都会自动创建(RabbitMQ 没有的情况)
     * @return
     */@BeanpublicQueueorderDelayQueue(){HashMap<String,Object> arguments =newHashMap<>();/**
         * x-dead-letter-exchange: order-event-exchange
         * x-dead-letter-routing-key: order.release.order
         * x-message-ttl: 60000
         */
        arguments.put("x-dead-letter-exchange","order-event-exchange");
        arguments.put("x-dead-letter-routing-key","order.release.order");
        arguments.put("x-message-ttl",60000);/**
         Queue(String name,  队列名字
         boolean durable,  是否持久化
         boolean exclusive,  是否排他
         boolean autoDelete, 是否自动删除
         Map<String, Object> arguments) 属性
         */Queue queue =newQueue("order.delay.queue",true,false,false,arguments);return queue;}@BeanpublicQueueorderReleaseOrderQueue(){Queue queue =newQueue("order.release.order.queue",true,false,false);return queue;}@BeanpublicExchangeorderEventExchange(){/**
         *   String name,
         *   boolean durable,
         *   boolean autoDelete,
         *   Map<String, Object> arguments
         */returnnewTopicExchange("order-event-exchange",true,false);}@BeanpublicBindingorderCreateOrderBinding(){/**
         * String destination, 目的地(队列名或者交换机名字)
         * DestinationType destinationType, 目的地类型(Queue、Exhcange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         */returnnewBinding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@BeanpublicBindingorderReleaseOrderBinding(){returnnewBinding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}}

加上监听 进行测试:

@RabbitListener(queues ="order.release.order.queue")publicvoidlistener(OrderEntity entity,Channel channel,Message message)throwsIOException{System.out.println("收到过期的订单信息:准备关闭订单"+entity.getOrderSn());
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}==========================================================================@AutowiredRabbitTemplate rabbitTemplate;@ResponseBody@GetMapping("/test/createOrder")publicStringcreateOrderTest(){//订单下单成功OrderEntity entity =newOrderEntity();
        entity.setOrderSn(UUID.randomUUID().toString());
        entity.setModifyTime(newDate());

        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);return"OK";}

发送请求,消息存储在队列中:

image-20220818094230147

一分钟后:

image-20220818094350775

库存模块

packagecom.henu.soft.merist.gulimall.ware.config;importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.amqp.support.converter.MessageConverter;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;@ConfigurationpublicclassMyRabbitConfig{@BeanpublicMessageConvertermessageConverter(){//在容器中导入Json的消息转换器returnnewJackson2JsonMessageConverter();}@RabbitListener(queues ="stock.release.stock.queue")publicvoidhandle(Message message){}@BeanpublicExchangestockEventExchange(){returnnewTopicExchange("stock-event-exchange",true,false);}@BeanpublicQueuestockReleaseStockQueue(){returnnewQueue("stock.release.stock.queue",true,false,false);}@BeanpublicQueuestockDelayQueue(){HashMap<String,Object> arguments =newHashMap<>();/**
         * x-dead-letter-exchange: order-event-exchange
         * x-dead-letter-routing-key: order.release.order
         * x-message-ttl: 60000
         */
        arguments.put("x-dead-letter-exchange","order-event-exchange");
        arguments.put("x-dead-letter-routing-key","order.release.order");
        arguments.put("x-message-ttl",60000);returnnewQueue("stock.delay.queue",true,false,false,arguments);}/**
     * 绑定
     */@BeanpublicBindingstockReleaseBinding(){//String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> argumentsreturnnewBinding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchanges","stock.release.#",null);}/**
     * 绑定
     */@BeanpublicBindingstockLockedBinding(){//String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> argumentsreturnnewBinding("stock.delay.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchanges","stock.delay",null);}}

image-20220818101418131

保存订单详情发送给mq

for(Long wareId : wareIds){//成功返回1,否则就是0Long count = wareSkuDao.lockSkuStock(skuId,wareId,stock.getNum());if(count ==1){//锁成功了
        skuLock =true;//TODO 告诉MQ库存锁成功WareOrderTaskDetailEntity entity =newWareOrderTaskDetailEntity(null, skuId,"", stock.getNum(), taskEntity.getId(), wareId,1);
        wareOrderTaskDetailService.save(entity);StockLockedTo lockedTo =newStockLockedTo();
        lockedTo.setId(taskEntity.getId());StockDetailTo stockDetailTo =newStockDetailTo();BeanUtils.copyProperties(entity,stockDetailTo);
        lockedTo.setDetail(stockDetailTo);

        rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo);break;}//锁失败了 继续下一个仓库}

5.5 流程分析

锁库存成功给 延时队列发送消息,延时时间到,检查订单状态,确认要解锁库存,发送给交换机,发送给解锁库存的队列

image-20220818144428072

监听解锁库存消息队列

image-20220818144724654

收到解锁消息后进行判断释放解锁

/**
 * 收到解锁库存的消息
 * @param to
 * @param message
 */@RabbitHandlerpublicvoidhandleStockLockedRelease(StockLockedToto,Message message,Channel channel)throwsIOException{System.out.println("收到解锁库存的消息");StockDetailTo detail =to.getDetail();Long detailId = detail.getId();//解锁//1、查询数据库关于这个订单的锁定库存信息//有:证明库存锁定成功了//  解锁:查询订单情况//      1、没有这个订单。解锁//      2、有这个订单//          订单状态:已取消,解锁库存//                  未取消,不解锁//没有:库存锁定失败,库存回滚,无需解锁WareOrderTaskDetailEntity byId = wareOrderTaskDetailService.getById(detailId);if(byId !=null){//解锁Long id =to.getId();//库存工作单的idWareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);String orderSn = taskEntity.getOrderSn();R r = orderFeignService.getOrder(orderSn);if(r.getCode()==0){//订单数据返回成功OrderVo data = r.getData(newTypeReference<OrderVo>(){});if(data ==null|| data.getStatus()==4){//订单不存在or订单被取消//解锁库存unLockStock(detail.getId(),detail.getWareId(),detail.getSkuNum(),detailId);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}else{//订单数据返回失败//消息拒绝以后重新放到队列,让别人继续消费解锁
           channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}else{//无需解锁}}/**
 *解锁库存
 */privatevoidunLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId){
    wareSkuDao.unlockStock(skuId,wareId,num);}

开启消息队列的手动ack

image-20220818151436432

spring:rabbitmq:listener:simple:acknowledge-mode: manual

代码优化:

packagecom.henu.soft.merist.gulimall.ware.listener;importcom.henu.soft.merist.common.to.mq.StockLockedTo;importcom.henu.soft.merist.gulimall.ware.service.WareSkuService;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.io.IOException;@Slf4j@Component@RabbitListener(queues ={"stock.release.stock.queue"})publicclassStockReleaseListener{@AutowiredWareSkuService wareSkuService;/**
     * 收到解锁库存的消息
     * @param to
     * @param message
     */@RabbitHandlerpublicvoidhandleStockLockedRelease(StockLockedToto,Message message,Channel channel)throwsIOException{System.out.println("收到解锁库存的消息");try{
           wareSkuService.unlockStock(to);
           channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){
           channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
=================================WareSkuServiceImpl.java===================================@OverridepublicvoidunlockStock(StockLockedToto){StockDetailTo detail =to.getDetail();Long detailId = detail.getId();//解锁//1、查询数据库关于这个订单的锁定库存信息//有:证明库存锁定成功了//  解锁:查询订单情况//      1、没有这个订单。解锁//      2、有这个订单//          订单状态:已取消,解锁库存//                  未取消,不解锁//没有:库存锁定失败,库存回滚,无需解锁WareOrderTaskDetailEntity byId = wareOrderTaskDetailService.getById(detailId);if(byId !=null){//解锁Long id =to.getId();//库存工作单的idWareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);String orderSn = taskEntity.getOrderSn();R r = orderFeignService.getOrder(orderSn);if(r.getCode()==0){//订单数据返回成功OrderVo data = r.getData(newTypeReference<OrderVo>(){});if(data ==null|| data.getStatus()==4){//订单不存在or订单被取消//解锁库存unLockStock(detail.getId(),detail.getWareId(),detail.getSkuNum(),detailId);}}else{//订单数据返回失败//消息拒绝以后重新放到队列,让别人继续消费解锁thrownewRuntimeException("远程服务失败");}}else{//无需解锁}}/**
 *解锁库存
 */privatevoidunLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId){
    wareSkuDao.unlockStock(skuId,wareId,num);}

5.6 判断关闭订单

订单状态:

packagecom.henu.soft.merist.gulimall.order.enume;publicenumOrderStatusEnum{CREATE_NEW(0,"待付款"),PAYED(1,"已付款"),SENDED(2,"已发货"),RECIEVED(3,"已完成"),CANCLED(4,"已取消"),SERVICING(5,"售后中"),SERVICED(6,"售后完成");privateInteger code;privateString msg;OrderStatusEnum(Integer code,String msg){this.code = code;this.msg = msg;}publicIntegergetCode(){return code;}publicStringgetMsg(){return msg;}}
packagecom.henu.soft.merist.gulimall.order.listener;importcom.henu.soft.merist.gulimall.order.entity.OrderEntity;importcom.henu.soft.merist.gulimall.order.service.OrderService;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.io.IOException;@Service@RabbitListener(queues ="order.release.order.queue")publicclassOrderCloseListener{@AutowiredOrderService orderService;@RabbitHandlerpublicvoidlistener(OrderEntity entity,Channel channel,Message message)throwsIOException{System.out.println("收到过期的订单信息:准备关闭订单"+entity.getOrderSn());try{
            orderService.closeOrder(entity);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
@OverridepublicvoidcloseOrder(OrderEntity entity){//查询当前这个订单的最新状态OrderEntity orderEntity =this.getById(entity.getId());if(orderEntity.getStatus()==OrderStatusEnum.CREATE_NEW.getCode()){//关单OrderEntity update =newOrderEntity();
        update.setId(entity.getId());
        update.setStatus(OrderStatusEnum.CANCLED.getCode());this.updateById(update);}}

问题:

由于机器卡顿消息延迟等情况,导致在订单释放前,库存就解锁了,此时订单依然存在,从而导致库存解锁失败。image-20220818162915225

解决方法:

双重解锁:订单释放时,也发送解锁消息

实际上,订单释放时发送解锁消息应该是主动逻辑,而库存的自动解锁应该是被动逻辑用来辅助主动逻辑,这样思考就很清晰了。

image-20220818163305981

image-20220818164215301

image-20220818164438077

image-20220818165438584

5.7 如何保证消息的可靠性

1.消息丢失

  • 消息发送出去,由于网络问题没有抵达服务器 - 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式- 做好日志记录,每个消息状态是否都被服务器收到都应该记录- 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发
  • 消息抵达Broker,Broker要将消息写入queue、磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。 publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
  • 自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机 - 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队

2.消息重复

  • 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者 - 消息消费失败,由于重试机制,自动又将消息发送出去- 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送- 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标志
  • 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理
  • rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的

3.消息积压

  • 消费者宕机积压
  • 消费者消费能力不足积压
  • 发送者发送流量太大 - 上线更多的消费者,进行正常消费- 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

本文转载自: https://blog.csdn.net/qq_52476654/article/details/126410063
版权归原作者 HotRabbit. 所有, 如有侵权,请联系我们删除。

“谷粒商城13——订单模块完成、分布式事务应用、RabbitMQ、Seata、电商项目订单场景的分析完善”的评论:

还没有评论