0


SpringCloud集成Seata saga模式案例

文章目录

一、前言

更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html

至此,seata系列的内容已出:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
  13. 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)
  14. 分布式事务Seata源码解析九:分支事务如何注册到全局事务
  15. 分布式事务Seata源码解析十:AT模式回滚日志undo log详细构建过程
  16. 分布式事务Seata源码解析11:全局事务执行流程之两阶段全局事务提交
  17. 分布式事务Seata源码解析12:全局事务执行流程之全局事务回滚
  18. Spring Cloud整合Seata实现TCC分布式事务模式案例
  19. 分布式事务Seata源码解析13:TCC事务模式实现原理
  20. 分布式事务Seata TCC空回滚/幂等/悬挂问题、解决方案(seata1.5.1如何解决?)
  21. Seata XA模式概述+案例
  22. saga模式、Seata saga模式详介

至此,Seata常用的AT模式、TCC模式 和 XA模式已完结,SAGA模式也已做了基本介绍,本文接着聊Spring Cloud 如何集成Seata saga模式实现全局事务、分支事务

二、Seata saga模式介绍

官方文档地址:https://seata.io/zh-cn/docs/user/saga.html

Seata提供的Saga模式目前只能通过状态机引擎来实现,整体机制为:

  1. 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件; - 换言之,需要开发者手工的进行Saga业务流程绘制,并将其转换为JSON配置文件;
  2. 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点; - 注意: 异常发生时是否进行补偿也可由用户自定义决定,可以选择不配置;
  3. 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚; - 在程序启动时,会根据saga状态图加载业务处理流程(包括:服务补偿处理);
  4. 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能;

1、示例状态图

在这里插入图片描述

2、“状态机”介绍

seata saga的状态语言在一定程度上参考了AWS Step Functions

1)“状态机”属性

  • Name: 表示状态机的名称,必须唯一
  • Comment: 状态机的描述
  • Version: 状态机定义版本
  • StartState: 启动时运行的第一个"状态"
  • States: 状态列表,是一个map结构,key是"状态"的名称,在状态机内必须唯一
  • IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新
  • IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新

2)“状态”属性

  1. Type: “状态” 的类型,比如有: - ServiceTask: 执行调用服务任务- Choice: 单条件选择路由- CompensationTrigger: 触发补偿流程- Succeed: 状态机正常结束- Fail: 状态机异常结束- SubStateMachine: 调用子状态机- CompensateSubMachine: 用于补偿一个子状态机
  2. ServiceName: 服务名称,通常是服务的beanId(也就是Spring容器中的beanName)- 无论是SpringCloud,还是Dubbo、HSF…,最重要的就是配置这个beanId。
  3. ServiceMethod: 服务方法名称(也就是:Spring Bean中的某个方法名)
  4. CompensateState: 该"状态"的补偿"状态"
  5. Loop: 标识该事务节点是否为循环事务, 即由框架本身根据循环属性的配置, 遍历集合元素对该事务节点进行循环执行
  6. Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用 SpringEL, 如果是常量直接写值即可
  7. Ouput: 将服务返回的参数赋值到状态机上下文中, 是一个map结构,key为放入到状态机上文时的key(状态机上下文也是一个map),value中$.是表示SpringEL表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数
  8. Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个map结构,key是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是SpringEL表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值
  9. Catch: 捕获到异常后的路由
  10. Next: 服务执行完成后下一个执行的"状态"
  11. Choices: Choice类型的"状态"里, 可选的分支列表, 分支中的Expression为SpringEL表达式, Next为当表达式成立时执行的下一个"状态"
  12. ErrorCode: Fail类型"状态"的错误码
  13. Message: Fail类型"状态"的错误信息

3)更多状态相关内容

更多详细的状态语言使用示例见github:
https://github.com/seata/seata/tree/develop/test/src/test/java/io/seata/saga/engine

三、SpringCloud 集成 seata saga

官方提供的saga案例地址:https://github.com/seata/seata-samples/tree/master/saga

在这里插入图片描述

然而并没有提供SpringCloud与saga模式集成的案例;以下介绍SpringCloud与saga模式集成的案例。

1、saga模式状态机相关信息

1)状态机配置相关的三个表

首先,我们需要 在使用状态机开启saga分支事务的 服务对应的数据库连接中创建三个表(以MYSQL为例):

CREATETABLEIFNOTEXISTS`seata_state_machine_def`(`id`VARCHAR(32)NOTNULLCOMMENT'id',`name`VARCHAR(128)NOTNULLCOMMENT'name',`tenant_id`VARCHAR(32)NOTNULLCOMMENT'tenant id',`app_name`VARCHAR(32)NOTNULLCOMMENT'application name',`type`VARCHAR(20)COMMENT'state language type',`comment_`VARCHAR(255)COMMENT'comment',`ver`VARCHAR(16)NOTNULLCOMMENT'version',`gmt_create`DATETIME(3)NOTNULLCOMMENT'create time',`status`VARCHAR(2)NOTNULLCOMMENT'status(AC:active|IN:inactive)',`content`TEXTCOMMENT'content',`recover_strategy`VARCHAR(16)COMMENT'transaction recover strategy(compensate|retry)',PRIMARYKEY(`id`))ENGINE=InnoDBDEFAULTCHARSET= utf8mb4;CREATETABLEIFNOTEXISTS`seata_state_machine_inst`(`id`VARCHAR(128)NOTNULLCOMMENT'id',`machine_id`VARCHAR(32)NOTNULLCOMMENT'state machine definition id',`tenant_id`VARCHAR(32)NOTNULLCOMMENT'tenant id',`parent_id`VARCHAR(128)COMMENT'parent id',`gmt_started`DATETIME(3)NOTNULLCOMMENT'start time',`business_key`VARCHAR(48)COMMENT'business key',`start_params`TEXTCOMMENT'start parameters',`gmt_end`DATETIME(3)COMMENT'end time',`excep`BLOBCOMMENT'exception',`end_params`TEXTCOMMENT'end parameters',`status`VARCHAR(2)COMMENT'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',`compensation_status`VARCHAR(2)COMMENT'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',`is_running`TINYINT(1)COMMENT'is running(0 no|1 yes)',`gmt_updated`DATETIME(3)NOTNULL,PRIMARYKEY(`id`),UNIQUEKEY`unikey_buz_tenant`(`business_key`,`tenant_id`))ENGINE=InnoDBDEFAULTCHARSET= utf8mb4;CREATETABLEIFNOTEXISTS`seata_state_inst`(`id`VARCHAR(48)NOTNULLCOMMENT'id',`machine_inst_id`VARCHAR(128)NOTNULLCOMMENT'state machine instance id',`name`VARCHAR(128)NOTNULLCOMMENT'state name',`type`VARCHAR(20)COMMENT'state type',`service_name`VARCHAR(128)COMMENT'service name',`service_method`VARCHAR(128)COMMENT'method name',`service_type`VARCHAR(16)COMMENT'service type',`business_key`VARCHAR(48)COMMENT'business key',`state_id_compensated_for`VARCHAR(50)COMMENT'state compensated for',`state_id_retried_for`VARCHAR(50)COMMENT'state retried for',`gmt_started`DATETIME(3)NOTNULLCOMMENT'start time',`is_for_update`TINYINT(1)COMMENT'is service for update',`input_params`TEXTCOMMENT'input parameters',`output_params`TEXTCOMMENT'output parameters',`status`VARCHAR(2)NOTNULLCOMMENT'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',`excep`BLOBCOMMENT'exception',`gmt_updated`DATETIME(3)COMMENT'update time',`gmt_end`DATETIME(3)COMMENT'end time',PRIMARYKEY(`id`,`machine_inst_id`))ENGINE=InnoDBDEFAULTCHARSET= utf8mb4;

数据库表的出处,见seata官方地址:https://github.com/seata/seata/blob/1.5.2/script/client/saga/db/mysql.sql

在这里插入图片描述

2)状态图

状态机设计器演示(在线画图工具)地址:http://seata.io/saga_designer/index.html

在这里插入图片描述

2、项目代码

整体代码结构:

在这里插入图片描述

此处案例和saga官方提供的一样,仅示范saga模式的使用,不涉及RPC、业务表操作,若读者想丰富案例,可在笔者的

todo

标注处自行添加。

0)pom.xml

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/><!-- lookup parent from repository --></parent><modelVersion>4.0.0</modelVersion><version>0.0.1-SNAPSHOT</version><groupId>com.saint</groupId><artifactId>saga-trade</artifactId><properties><java.version>1.8</java.version><druid.version>1.2.8</druid.version><mysql.version>8.0.22</mysql.version><!--seata1.5.2 版本源码验证--><spring-boot.version>2.3.12.RELEASE</spring-boot.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version><spring-cloud-alibaba.version>2.2.9.RELEASE</spring-cloud-alibaba.version><druid.version>1.2.8</druid.version><mysql.version>8.0.22</mysql.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.10</version></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><!--整合spring cloud--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><!--整合spring cloud alibaba--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>${druid.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

1)线程池配置 – MyThreadFactory

importjava.util.concurrent.ThreadFactory;importjava.util.concurrent.atomic.AtomicInteger;/**
 * 自定义线程工厂
 */publicclassMyThreadFactoryimplementsThreadFactory{privatefinalAtomicInteger threadNumber;privateThreadGroup group;privateString namePrefix;publicMyThreadFactory(String namePrefix){this.threadNumber =newAtomicInteger(1);SecurityManager s =System.getSecurityManager();this.group = s !=null? s.getThreadGroup():Thread.currentThread().getThreadGroup();this.namePrefix = namePrefix +"_THREAD_";}@OverridepublicThreadnewThread(Runnable r){Thread t =newThread(this.group, r,this.namePrefix +this.threadNumber.getAndIncrement(),0L);return t;}}

2)seata saga相关配置 – SagaConfiguration

importcom.alibaba.druid.pool.DruidDataSource;importio.seata.saga.engine.config.DbStateMachineConfig;importio.seata.saga.engine.impl.ProcessCtrlStateMachineEngine;importio.seata.saga.rm.StateMachineEngineHolder;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.sql.DataSource;importjava.util.concurrent.LinkedBlockingQueue;importjava.util.concurrent.ThreadPoolExecutor;importjava.util.concurrent.TimeUnit;/**
 * @author Saint
 */@ConfigurationpublicclassSagaConfiguration{@Bean@ConfigurationProperties(prefix ="spring.datasource")publicDataSourcedataSource(){returnnewDruidDataSource();}@BeanpublicThreadPoolExecutorsagaThreadPool(){ThreadPoolExecutor executor =newThreadPoolExecutor(1,20,30,TimeUnit.SECONDS,newLinkedBlockingQueue<>(2000),newMyThreadFactory("SAGA_ASYNC_EXE_"),newThreadPoolExecutor.AbortPolicy());return executor;}@BeanpublicDbStateMachineConfigdbStateMachineConfig(){DbStateMachineConfig config =newDbStateMachineConfig();
        config.setDataSource(dataSource());
        config.setResources(newString[]{"statelang/*.json"});
        config.setEnableAsync(true);
        config.setApplicationId("saga-trade");
        config.setTxServiceGroup("saint-trade-tx-group");
        config.setThreadPoolExecutor(sagaThreadPool());return config;}@BeanpublicProcessCtrlStateMachineEnginestateMachineEngine(){ProcessCtrlStateMachineEngine engine =newProcessCtrlStateMachineEngine();
        engine.setStateMachineConfig(dbStateMachineConfig());return engine;}@BeanpublicStateMachineEngineHolderstateMachineEngineHolder(){StateMachineEngineHolder holder =newStateMachineEngineHolder();
        holder.setStateMachineEngine(stateMachineEngine());return holder;}}

3)库存服务 – InventoryService

InventoryService提供了两个方法:一个

reduce()

、一个reduce()对应的补偿方法

compensateReduce()

packagecom.saint.saga.trade.service;/**
 * Inventory Actions
 */publicinterfaceInventoryService{/**
     * reduce
     *
     * @param businessKey 业务上的唯一标识
     * @param count
     * @return
     */booleanreduce(String businessKey,int count);/**
     * increase
     *
     * @param businessKey 业务上的唯一标识
     * @return
     */booleancompensateReduce(String businessKey);}

InventoryServiceImpl

packagecom.saint.saga.trade.service.impl;importcom.saint.saga.trade.service.InventoryService;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.stereotype.Service;/**
 * 库存
 *
 * @author Saint
 */@Service(value ="inventoryService")publicclassInventoryServiceImplimplementsInventoryService{privatestaticfinalLogger LOGGER =LoggerFactory.getLogger(InventoryActionImpl.class);@Overridepublicbooleanreduce(String businessKey,int count){
        LOGGER.info("reduce inventory succeed, count: "+ count +", businessKey:"+ businessKey);// todo rpc / httpreturntrue;}@OverridepublicbooleancompensateReduce(String businessKey){
        LOGGER.info("compensate reduce inventory succeed, businessKey:"+ businessKey);// todo rpc / httpreturntrue;}}

4)账户余额服务 – BalanceService

BalanceService提供了两个方法:一个

reduce()

、一个reduce()对应的补偿方法

compensateReduce()

packagecom.saint.saga.trade.service;importjava.math.BigDecimal;importjava.util.Map;/**
 * Balance Actions
 */publicinterfaceBalanceService{/**
     * reduce
     *
     * @param businessKey 业务上的唯一标识
     * @param amount
     * @param params
     * @return
     */booleanreduce(String businessKey,BigDecimal amount,Map<String,Object> params);/**
     * compensateReduce
     *
     * @param businessKey 业务上的唯一标识
     * @param params
     * @return
     */booleancompensateReduce(String businessKey,Map<String,Object> params);}

BalanceServiceImpl

packagecom.saint.saga.trade.service.impl;importcom.saint.saga.trade.service.BalanceService;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.stereotype.Service;importjava.math.BigDecimal;importjava.util.Map;/**
 * 账户余额
 *
 * @author Saint
 */@Service(value ="balanceService")publicclassBalanceServiceImplimplementsBalanceService{privatestaticfinalLogger LOGGER =LoggerFactory.getLogger(BalanceServiceImpl.class);@Overridepublicbooleanreduce(String businessKey,BigDecimal amount,Map<String,Object> params){if(params !=null){Object throwException = params.get("throwException");if(throwException !=null&&"true".equals(throwException.toString())){thrownewRuntimeException("reduce balance failed");}}
        LOGGER.info("reduce balance succeed, amount: "+ amount +", bizCode:"+ businessKey);// todo rpc / httpreturntrue;}@OverridepublicbooleancompensateReduce(String businessKey,Map<String,Object> params){if(params !=null){Object throwException = params.get("throwException");if(throwException !=null&&"true".equals(throwException.toString())){thrownewRuntimeException("compensate reduce balance failed");}}
        LOGGER.info("compensate reduce balance succeed, businessKey:"+ businessKey);// todo rpc / httpreturntrue;}}

5)启动类 – SagaTradeApplication

@SpringBootApplicationpublicclassSagaTradeApplication{publicstaticvoidmain(String[] args){ConfigurableApplicationContext run =SpringApplication.run(SagaTradeApplication.class, args);InventoryService bean = run.getBean(InventoryService.class);BalanceService bean1 = run.getBean(BalanceService.class);}}

6) 状态图对应的JSON文件 – reduce_inventory_and_balance.json

{"Name":"reduceInventoryAndBalance","Comment":"reduce inventory then reduce balance in a transaction","StartState":"ReduceInventory","Version":"0.0.1","States":{"ReduceInventory":{"Type":"ServiceTask","ServiceName":"inventoryService","ServiceMethod":"reduce","CompensateState":"CompensateReduceInventory","Next":"ChoiceState","Input":["$.[businessKey]","$.[count]"],"Output":{"reduceInventoryResult":"$.#root"},"Status":{"#root == true":"SU","#root == false":"FA","$Exception{java.lang.Throwable}":"UN"}},"ChoiceState":{"Type":"Choice","Choices":[{"Expression":"[reduceInventoryResult] == true","Next":"ReduceBalance"}],"Default":"Fail"},"ReduceBalance":{"Type":"ServiceTask","ServiceName":"balanceService","ServiceMethod":"reduce","CompensateState":"CompensateReduceBalance","Input":["$.[businessKey]","$.[amount]",{"throwException":"$.[mockReduceBalanceFail]"}],"Output":{"compensateReduceBalanceResult":"$.#root"},"Status":{"#root == true":"SU","#root == false":"FA","$Exception{java.lang.Throwable}":"UN"},"Catch":[{"Exceptions":["java.lang.Throwable"],"Next":"CompensationTrigger"}],"Next":"Succeed"},"CompensateReduceInventory":{"Type":"ServiceTask","ServiceName":"inventoryService","ServiceMethod":"compensateReduce","Input":["$.[businessKey]"]},"CompensateReduceBalance":{"Type":"ServiceTask","ServiceName":"balanceService","ServiceMethod":"compensateReduce","Input":["$.[businessKey]"]},"CompensationTrigger":{"Type":"CompensationTrigger","Next":"Fail"},"Succeed":{"Type":"Succeed"},"Fail":{"Type":"Fail","ErrorCode":"PURCHASE_FAILED","Message":"purchase failed"}}}

状态图流程解析

在这里插入图片描述

7)application.yml

server:port:9099spring:application:name: saga-trade
  datasource:url: jdbc:mysql://127.0.0.1:3306/seata_saga?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=falseusername: root
    password:123456driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:show-sql:trueseata:tx-service-group: saint-trade-tx-group

8)file.conf

transport {
  # tcp udt unix-domain-socket
  type ="TCP"
  #NIO NATIVE
  server ="NIO"
  #enable heartbeat
  heartbeat =true
  # the client batch send request enable
  enableClientBatchSendRequest =true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix ="NettyBoss"
    workerThreadPrefix ="NettyServerNIOWorker"
    serverExecutorThread-prefix ="NettyServerBizHandler"
    shareBossWorker =false
    clientSelectorThreadPrefix ="NettyClientSelector"
    clientSelectorThreadSize =1
    clientWorkerThreadPrefix ="NettyClientWorkerThread"
    # netty boss thread size,will not be used forUDT
    bossThreadSize =1
    #auto default pin or 8
    workerThreadSize ="default"}
  shutdown {
    # when destroy server, wait seconds
    wait =3}
  serialization ="seata"
  compressor ="none"}
service {
  #transaction service group mapping
  vgroupMapping.saint-trade-tx-group ="seata-server-sh"
  #only support when registry.type=file, please don't set multiple addresses
  seata-server-sh.grouplist ="127.0.0.1:8091"
  #degrade, current not support
  enableDegrade =false
  #disable seata
  disableGlobalTransaction =false}

client {
  rm {
    asyncCommitBufferLimit =10000
    lock {
      retryInterval =10
      retryTimes =30
      retryPolicyBranchRollbackOnConflict =true}
    reportRetryCount =5
    tableMetaCheckEnable =false
    reportSuccessEnable =false}
  tm {
    commitRetryCount =5
    rollbackRetryCount =5}
  undo {
    dataValidation =true
    logSerialization ="jackson"
    logTable ="undo_log"}
  log {
    exceptionRate =100}}

9)开启状态机入口 – TradeController

状态机支持两种执行方式:同步执行、异步执行;

在这里插入图片描述

  • 同步执行API:StateMachineEngine#startWithBusinessKey();
  • 异步执行API:StateMachineEngine#startWithBusinessKeyAsync(…, AsyncCallback) - 其中的AsyncCallback为异步执行结束之后的回调函数
packagecom.saint.saga.trade.controller;importio.seata.saga.engine.AsyncCallback;importio.seata.saga.engine.StateMachineEngine;importio.seata.saga.proctrl.ProcessContext;importio.seata.saga.statelang.domain.ExecutionStatus;importio.seata.saga.statelang.domain.StateMachineInstance;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RequestMethod;importorg.springframework.web.bind.annotation.RestController;importjava.math.BigDecimal;importjava.util.HashMap;importjava.util.Map;/**
 * @author Saint
 */@RestController@RequestMapping("saga")@Slf4jpublicclassTradeController{@AutowiredprivateStateMachineEngine stateMachineEngine;/**
     * POST请求 http://localhost:9099/saga/commit?amount=50&count=2
     */@RequestMapping(value ="/commit", method =RequestMethod.POST)publicStringcommit(Integer amount,Integer count){String businessKey =String.valueOf(System.currentTimeMillis());Map<String,Object> startParams =generateStartParams(amount, count,false);// 1、sync testStateMachineInstance instance = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance",null,
                businessKey, startParams);// 2、async test//        StateMachineInstance instance = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams,//                CALL_BACK);//        waitingForFinish(instance);// PS: instance is not nullif(!ExecutionStatus.SU.equals(instance.getStatus())){
            log.error("saga transaction execute failed. XID: {}", instance.getId());return"rollback";}

        log.info("saga transaction commit succeed. XID: {}", instance.getId());return"succeed";}/**
     * POST请求 http://localhost:9099/saga/rollback?amount=50&count=2
     */@RequestMapping(value ="/rollback", method =RequestMethod.POST)publicStringrollback(Integer amount,Integer count){String businessKey =String.valueOf(System.currentTimeMillis());// unique difference is hereMap<String,Object> startParams =generateStartParams(amount, count,true);// 1、sync testStateMachineInstance instance = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance",null,
                businessKey, startParams);// 2、async test//        StateMachineInstance instance = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams,//                CALL_BACK);//        waitingForFinish(instance);// PS: instance is not nullif(!ExecutionStatus.SU.equals(instance.getStatus())){
            log.error("saga transaction execute failed. XID: {}", instance.getId());return"rollback";}

        log.info("saga transaction commit succeed. XID: {}", instance.getId());return"succeed";}/**
     * parameters to be used in the state machine(状态机需要用到的参数在这里组装)
     */privateMap<String,Object>generateStartParams(Integer amount,Integer count,Boolean mockFail){String businessKey =String.valueOf(System.currentTimeMillis());Map<String,Object> startParams =newHashMap<>(8);
        startParams.put("businessKey", businessKey);
        startParams.put("count",10);
        startParams.put("amount",newBigDecimal(String.valueOf(amount)));if(mockFail)
            startParams.put("mockReduceBalanceFail",true);return startParams;}privatestaticvolatileObject lock =newObject();privatestaticAsyncCallback CALL_BACK =newAsyncCallback(){@OverridepublicvoidonFinished(ProcessContext context,StateMachineInstance stateMachineInstance){synchronized(lock){
                lock.notifyAll();}}@OverridepublicvoidonError(ProcessContext context,StateMachineInstance stateMachineInstance,Exception exp){synchronized(lock){
                lock.notifyAll();}}};privatestaticvoidwaitingForFinish(StateMachineInstance inst){synchronized(lock){if(!ExecutionStatus.RU.equals(inst.getStatus()))return;try{
                lock.wait();}catch(InterruptedException e){
                log.error("occur exception, ", e);}}}}

3、测试 / 验证

1)启动seata-server服务;

参考博文:超细的Spring Cloud 整合Seata实现分布式事务(排坑版)进行seata-server的配置和启动;

在这里插入图片描述

**注意:本文使用的seata版本是

1.5.2

,切勿使用成参考博文中的1.3.0。**

在这里插入图片描述

seata server1.5.2启动成功后控制台输出:

在这里插入图片描述

2)启动seata-client(saga-trade)

在这里插入图片描述

3)事务提交

执行 POST类型请求:http://localhost:9099/saga/commit?amount=50&count=2

在这里插入图片描述

saga-trade控制台输出:

在这里插入图片描述

seata-server日志:

在这里插入图片描述

4)事务回滚

执行 POST类型请求:http://localhost:9099/saga/commit?amount=50&count=2

在这里插入图片描述

saga-trade控制台输出:

在这里插入图片描述

seata-server日志:

在这里插入图片描述

三、总结

seata的saga模式适用于长流程 或 长事务场景。saga模式复杂的地方在于引入状态机,需要自己根据业务定义状态机的流程,然后把定义好的流程用json文件导入到工程中。

此外,saga模式需要开发者自定义回滚事件,并要考虑空补偿、悬挂、幂等三种问题,即:允许空补偿、做防悬挂控制、做幂等控制。读者可以参考TCC模式中的解决方案(分布式事务Seata TCC空回滚/幂等/悬挂问题、解决方案(seata1.5.1如何解决?))实现;


本文转载自: https://blog.csdn.net/Saintmm/article/details/129911538
版权归原作者 秃秃爱健身 所有, 如有侵权,请联系我们删除。

“SpringCloud集成Seata saga模式案例”的评论:

还没有评论