0


【微服务40】分布式事务Seata源码解析八:AT模式下本地事务的执行流程

文章目录

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务

Seata最核心的全局事务执行流程,前面我们已经聊到了Seata全局事务的开启,本文接着聊Seata全局事务中执行具体业务操作时,DB操作是如何执行的(含:全局锁keys、undologs的构建(AT模式))?

在这里插入图片描述

二、本地事务SQL执行流程

全局事务的整体执行流程体现在TransactionalTemplate#execute()方法中:

在这里插入图片描述

具体代码 和 注释:

publicObjectexecute(TransactionalExecutor business)throwsThrowable{// 1. Get transactionInfoTransactionInfo txInfo = business.getTransactionInfo();if(txInfo ==null){thrownewShouldNeverHappenException("transactionInfo does not exist");}// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.// 获取当前事务,根据ThreadLocal,获取当前线程本地变量副本中的xid,进而判断是否存在一个全局事务// 刚开始一个全局事务时,肯定是没有全局事务的GlobalTransaction tx =GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation.// 从全局事务的配置里 获取事务传播级别,默认是REQUIRED(如果存在则加入,否则开启一个新的)Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder =null;try{// 根据事务的隔离级别做不同的处理switch(propagation){case NOT_SUPPORTED:// If transaction is existing, suspend it.if(existingTransaction(tx)){// 事务存在,则挂起事务(默认将xid从RootContext中移除)
                    suspendedResourcesHolder = tx.suspend();}// Execute without transaction and return.return business.execute();case REQUIRES_NEW:// If transaction is existing, suspend it, and then begin new transaction.if(existingTransaction(tx)){
                    suspendedResourcesHolder = tx.suspend();
                    tx =GlobalTransactionContext.createNew();}// Continue and execute with new transactionbreak;case SUPPORTS:// If transaction is not existing, execute without transaction.if(notExistingTransaction(tx)){return business.execute();}// Continue and execute with new transactionbreak;case REQUIRED:// If current transaction is existing, execute with current transaction,// else continue and execute with new transaction.break;case NEVER:// If transaction is existing, throw exception.if(existingTransaction(tx)){thrownewTransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));}else{// Execute without transaction and return.return business.execute();}case MANDATORY:// If transaction is not existing, throw exception.if(notExistingTransaction(tx)){thrownewTransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}// Continue and execute with current transaction.break;default:thrownewTransactionException("Not Supported Propagation:"+ propagation);}// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.if(tx ==null){// 创建全局事务(角色为事务发起者),并关联全局事务管理器
            tx =GlobalTransactionContext.createNew();}// set current tx config to holderGlobalLockConfig previousConfig =replaceGlobalLockConfig(txInfo);try{// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,//    else do nothing. Of course, the hooks will still be triggered.// 开启全局事务,如果事务角色是'GlobalTransactionRole.Launcher',发送开始事务请求到seata-server(TC)beginTransaction(txInfo, tx);Object rs;try{// Do Your Business// 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;// 每个分支事务都会去运行
                rs = business.execute();}catch(Throwable ex){// 3. The needed business exception to rollback.// 如果全局事务执行发生了异常,则回滚;completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.// 全局事务和分支事务运行无误,提交事务;commitTransaction(tx);return rs;}finally{//5. clear// 全局事务完成之后做一些清理工作resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}}finally{// If the transaction is suspended, resume it.if(suspendedResourcesHolder !=null){// 如果有挂起的全局事务,则恢复全局事务
            tx.resume(suspendedResourcesHolder);}}}

在前一篇文章: 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务,我们已经聊到了开启全局事务,本文继续聊开启全局事务之后,本地事务中的SQL执行流程。

在这里插入图片描述

1、DataSourceProxy 数据库资源代理入口

在Spring Cloud 整合Seata实现分布式事务一文中有聊到Spring Cloud 集成Seata 的AT模式,需要写一个配置类

DataSourceConfig

,其中会注入一个Bean(

DataSourceProxy

):

在这里插入图片描述

到这里,博主有一个问题:注入DataSourceProxy到Spring容器中之后,哪里会用到它?执行数据增删改查时如何切换到代理数据源?

1)哪里使用了DataSourceProxy?

在这里插入图片描述

从源码来看,有一个Spring AOP抽象类

AbstractAutoProxyCreator

的子类

SeataAutoDataSourceProxyCreator

Spring 通过 AbstractAutoProxyCreator来创建 AOP 代理,其实现了BeanPostProcessor 接口,在 bean 初始化完成之后会创建它的代理,让后将代理对象增加到Spring容器。

在Seata 中,SeataAutoDataSourceProxyCreator的主要作用是为数据源

DataSource

添加

Advisor

,当数据源执行操作时,便会进入到

SeataAutoDataSourceProxyAdvice

类中处理;

在这里插入图片描述

因此,当数据源执行CRUD操作时,由于添加了AOP代理,会进入到SeataAutoDataSourceProxyAdvice#invoke()方法中:

在这里插入图片描述

咦,这里没有DataSourceProxy呀,只有SeataDataSourceProxy,从命名来看,这俩类总感觉有点关系!

2)SeataDataSourceProxy

在这里插入图片描述

从DataSourceProxy类的继承结构来看,DataSourceProxy实现了

SeataDataSourceProxy

接口;**因此

SeataAutoDataSourceProxyAdvice#invoke()方法中

动态代理类实际就是

DataSourceProxy

。**

2、本地事务SQL的执行流程(execute)

1)执行本地事务SQL的入口

在这里插入图片描述
JDBC的执行流程:

  1. 第一步:注册驱动;
  2. 第二步:获取与数据库的连接Connection;
  3. 第三步:获取数据库操作对象Statement;
  4. 第四步:执行sql语句(DQL、DML…),并且返回结果集;
  5. 第五步:处理查询结果集;
  6. 第六步:释放资源、关闭连接;
try{//加载数据库驱动Class.forName("com.mysql.cj.jdbc.Driver");}catch(ClassNotFoundException e){// do something}Connection conn =DriverManager.getConnection(URL, USER_NAME, PASSWORD);PreparedStatement pst = conn.prepareStatement("update user set name=? where id = ?");
pst.setString(1,"bobDog");
pst.setInt(2,1);int updateRes = pst.executeUpdate();if(updateRes >0){System.out.println("更新成功!");}

Seata代理的数据库资源DataSource底层也是JDBC操作数据库,所以也需要先获取数据库连接Connection、再根据数据库连接获取数据库操作对象Statement、接着再通过Statement#execute()执行SQL。在Seata中的表现为:

  1. 先获取seata代理的数据库连接ConnectionProxy;在这里插入图片描述
  2. 再根据ConnectionProxy获取一个数据库操作对象 StatementProxyPreparedStatementProxy在这里插入图片描述
  3. 然后再利用数据库操作对象 StatementProxyPreparedStatementProxy 的execute() 或 executeUpdate() 方法执行SQL语句。在这里插入图片描述在这里插入图片描述
StatementProxy

PreparedStatementProxy

增强了所有的execute方法,由ExecuteTemplate选择需要的Executor执行来sql。

**下面以常见的更新操作(

PreparedStatementProxy#executeUpdate()

)为例:**

ExecuteTemplate#execute()

重载方法调用链路如下:

在这里插入图片描述

publicstatic<T,SextendsStatement>Texecute(List<SQLRecognizer> sqlRecognizers,StatementProxy<S> statementProxy,StatementCallback<T,S> statementCallback,Object... args)throwsSQLException{// 没获取到全局锁,并且事务模式不是ATif(!RootContext.requireGlobalLock()&&BranchType.AT !=RootContext.getBranchType()){// Just work as original statementreturn statementCallback.execute(statementProxy.getTargetStatement(), args);}// 获取DB的类型String dbType = statementProxy.getConnectionProxy().getDbType();if(CollectionUtils.isEmpty(sqlRecognizers)){
        sqlRecognizers =SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                dbType);}Executor<T> executor;if(CollectionUtils.isEmpty(sqlRecognizers)){
        executor =newPlainExecutor<>(statementProxy, statementCallback);}else{if(sqlRecognizers.size()==1){SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);// 数据库操作类型switch(sqlRecognizer.getSQLType()){case INSERT:
                    executor =EnhancedServiceLoader.load(InsertExecutor.class, dbType,newClass[]{StatementProxy.class,StatementCallback.class,SQLRecognizer.class},newObject[]{statementProxy, statementCallback, sqlRecognizer});break;case UPDATE:
                    executor =newUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case DELETE:
                    executor =newDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case SELECT_FOR_UPDATE:
                    executor =newSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case INSERT_ON_DUPLICATE_UPDATE:switch(dbType){caseJdbcConstants.MYSQL:caseJdbcConstants.MARIADB:
                            executor =newMySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;default:thrownewNotSupportYetException(dbType +" not support to INSERT_ON_DUPLICATE_UPDATE");}break;default:
                    executor =newPlainExecutor<>(statementProxy, statementCallback);break;}}else{
            executor =newMultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);}}T rs;try{// 通过Executor真正的执行
        rs = executor.execute(args);}catch(Throwable ex){if(!(ex instanceofSQLException)){// Turn other exception into SQLException
            ex =newSQLException(ex);}throw(SQLException) ex;}return rs;}
  1. 如果当前事务不需要获取全局锁,并且不是AT模式,则以original statement的方式执行。默认Seata Client层面不需要获取全局锁,事务模式是AT模式。
  2. 获取到的DB类型,比如MySQL、Oracle…,博主的项目DBType是MYSQL。
  3. 获取SQL DML类型,并根据DML类型,选择不同的Executor。这里可以看做是策略模式。

因为示例是Update类型,所以最终选择的Executor是UpdateExecutor。

2)执行本地事务SQL逻辑

UpdateExecutor#execute()方法中会执行本地事务SQL,UpdateExecutor的类继承图如下:

在这里插入图片描述

除了数据更新前后的Image构造体现在UpdateExecutor类的方法中,其余方法均在其父类

BaseTransactionalExecutor

中,包括execute()方法。

@OverridepublicTexecute(Object... args)throwsThrowable{// 从全局事务上下文中获取xidString xid =RootContext.getXID();if(xid !=null){// 将xid绑定到ConnectionContext中,后续提交本地事务时会用到
        statementProxy.getConnectionProxy().bind(xid);}// RootContext.requireGlobalLock()检查是否需要全局锁,默认不需要
    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());returndoExecute(args);}

开始执行本地事务SQL时:

  1. 首先从全局事务上下文RootContext中获取到xid,如果存在全局事务xid,则将xid绑定到数据库连接的上下文ConnectionContext中;
  2. 从全局事务上下文RootContext获取是否全局锁标识,默认不需要;如果需要获取全局锁,则将数据库连接上下文ConnectionContext中的isGlobalLockRequire设置为true;
  3. 调用doExecute()方法真正开始执行SQL;

UpdateExecutor#doExecutor()方法:

在这里插入图片描述

开启了全局事务之后,DML语句的本地事务不会自动提交。

**即使自动提交没有关闭,

AbstractDMLBaseExecutor#doExecute(Object… args)

方法中也会先将其关闭,然后再以非自动提交的方式执行SQL,走ConnectionProxy提交本地事务,然后再将自动提交设置为true;这一块逻辑体现在

executeAutoCommitTrue()

方法中:**

protectedTexecuteAutoCommitTrue(Object[] args)throwsThrowable{ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try{
        connectionProxy.changeAutoCommit();returnnewLockRetryPolicy(connectionProxy).execute(()->{T result =executeAutoCommitFalse(args);
            connectionProxy.commit();return result;});}catch(Exception e){// when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);if(!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()){
            connectionProxy.getTargetConnection().rollback();}throw e;}finally{
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);}}

正常情况下都是直接以非自动提交的方式执行,即执行executeAutoCommitFalse()方法:

protectedTexecuteAutoCommitFalse(Object[] args)throwsException{if(!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType())&&isMultiPk()){thrownewNotSupportYetException("multi pk only support mysql!");}// 根据SQL语句构建before image,目标SQL执行之前的数据镜像:从数据库根据ID主键等信息查询出更新前的数据;TableRecords beforeImage =beforeImage();// 真正的去执行SQL语句,但是本地事务还没有提交T result = statementCallback.execute(statementProxy.getTargetStatement(), args);int updateCount = statementProxy.getUpdateCount();if(updateCount >0){// 目标SQL执行之后的数据镜像:从数据库根据ID主键等信息查询出更新后的数据;TableRecords afterImage =afterImage(beforeImage);// 准备好undo log数据prepareUndoLog(beforeImage, afterImage);}return result;}

在这里插入图片描述

由于AbstractDMLBaseExecutor提供了公用的

executeAutoCommitFalse()

给Insert、Delete、Update类型的Executor使用,所以无论是Insert、Delete还是Update操作都会走

AbstractDMLBaseExecutor#executeAutoCommitFalse()

方法执行SQL。不过MySQL的

MySQLInsertOrUpdateExecutor

是个个例,其执行SQL的逻辑由自己实现(有兴趣可以自己看一下MySQLInsertOrUpdateExecutor)。

以非自动提交执行SQL的流程如下:

  1. beforeImage() – 根据SQL语句构建before image,查询目标sql执行前的数据快照; - Update、Delete操作从数据库根据ID主键等信息查询出更新前的数据;- Insert操作直接返回空的TableRecords,其中只包含TableMeta,没有数据记录;
  2. 执行SQL语句,但是本地事务还没有提交;
  3. afterImage() – 构建after image,查询目标SQL执行之后的数据快照; - Insert、Update操作从数据库根据ID主键等信息查询出更新后的数据;- Delete操作直接返回空的TableRecords,其中只包含TableMeta,没有数据记录;
  4. prepareUndoLog(beforeImage, afterImage) --> 将before image 和 after image合并作为回滚日志undo log,保存到当前数据库连接上下文ConnectionContext中。其中还包括构建当前本地事务要占用所有全局锁key信息,然后将其保存到当前数据库连接上下文ConnectionContext中。

下面就这几步展开看一看;

1> 构建before image

此处依旧以Update为例:

在这里插入图片描述

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)

2> 执行SQL

在这里插入图片描述
最终使用源Statement执行SQL;

3> 构建after image

执行完SQL之后,再构建SQL查询出当前最新的数据记录作为after image;

在这里插入图片描述

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)

4> 预处理undo log

将before image 和 after image合并作为回滚日志undo log,存储到当前数据库连接上下文ConnectionContext中。

protectedvoidprepareUndoLog(TableRecords beforeImage,TableRecords afterImage)throwsSQLException{if(beforeImage.getRows().isEmpty()&& afterImage.getRows().isEmpty()){return;}if(SQLType.UPDATE == sqlRecognizer.getSQLType()){if(beforeImage.getRows().size()!= afterImage.getRows().size()){thrownewShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");}}ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();TableRecords lockKeyRecords = sqlRecognizer.getSQLType()==SQLType.DELETE ? beforeImage : afterImage;// 1、构建全局锁key信息,针对更新的一批数据主键ID构建这批数据的全局锁key// 例如:table_name:id_1101String lockKeys =buildLockKey(lockKeyRecords);if(null!= lockKeys){// 将lockKeys信息保存到ConnectionContext中,在注册分支事务时,再将全局锁信息放入到TC中进行检查、存储
        connectionProxy.appendLockKey(lockKeys);// 2、构建undo logSQLUndoLog sqlUndoLog =buildUndoItem(beforeImage, afterImage);// 将undo log信息保存到ConnectionContext中
        connectionProxy.appendUndoLog(sqlUndoLog);}}

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)

由于关闭了AutoCommit,所以在Statement.execute()执行完SQL之后,需要“手动”提交本地事务。

3、本地事务SQL的提交(commit)

回到ConnectionProxy#commit()方法,这里是“手动”提交本地事务的入口;

@Overridepublicvoidcommit()throwsSQLException{try{// 由LockRetryPolicy负责提交事务,LockRetryPolicy中包含全局锁的概念,支持retry重试策略
        lockRetryPolicy.execute(()->{doCommit();returnnull;});}catch(SQLException e){if(targetConnection !=null&&!getAutoCommit()&&!getContext().isAutoCommitChanged()){rollback();}throw e;}catch(Exception e){thrownewSQLException(e);}}

本地事务的提交又会委托给LockRetryPolicy的execute方法来执行;

1)LockRetryPolicy重试机制

LockRetryPolicy是ConnectionProxy的静态内部类,其中包含了全局锁的概念,支持retry策略,当出现全局锁冲突时支持多次重试获取全局锁。

在这里插入图片描述

默认情况下execute()方法中:

  • LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT为TRUE,可以通过配置client.rm.lock.retryPolicyBranchRollbackOnConflict=false属性改变;
  • connection.getContext().isAutoCommitChanged()为FALSE;

所以默认情况下,都会走重试获取全局锁的逻辑:

doRetryOnLockConflict()

方法。(当然可以选择开启自动提交事务、并设置属性

client.rm.lock.retryPolicyBranchRollbackOnConflict=true

,这样便不会走重试获取全局锁逻辑。)

protected<T>TdoRetryOnLockConflict(Callable<T> callable)throwsException{LockRetryController lockRetryController =newLockRetryController();while(true){try{return callable.call();}catch(LockConflictException lockConflict){// 出现全局锁冲突,回滚本地事务onException(lockConflict);// AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is releasedif(connection.getContext().isAutoCommitChanged()&& lockConflict.getCode()==TransactionExceptionCode.LockKeyConflictFailFast){
                lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);}// 线程睡眠10ms,然后再重试,超过重试次数,抛出异常结束流程
            lockRetryController.sleep(lockConflict);}catch(Exception e){// 出现非全局锁冲突的异常,则直接报错返回onException(e);throw e;}}}

doRetryOnLockConflict()

方法中:

  • 如果因为全局锁冲突导致提交本地事务失败,先回滚本地事务,然后会判断重试次数(lockRetryTimes,默认30次)再进行重试,重试之前会让线程睡眠一段时间(lockRetryInterval,默认10ms)。如果重试次数已经够了,则直接抛出异常结束流程。在这里插入图片描述
  • 如果因为其他异常(包括超过重试次数)导致提交本地事务失败,直接回滚本地事务、抛出异常结束流程。

**上面的

ConnectionProxy#onException()

方法中负责回滚本地事务、清理当前连接的ConnectionContext中的undo log信息、全局锁keys信息;**

在这里插入图片描述

了解完了全局锁冲突引起的重试机制,下面接着看本地事务的提交流程。

2)本地事务提交流程

在这里插入图片描述

LockRetryPolicy#execute()方法中会运行方法的入参Callable,在ConnectionProxy#commit()方法中传入的到LockRetryPolicy#execute()方法中的Callable为:

()->{doCommit();returnnull;}

doCommit()方法:

privatevoiddoCommit()throwsSQLException{// 当前DML操作在全局事务中时,判定条件:ConnectionContext中包含xidif(context.inGlobalTransaction()){processGlobalTransactionCommit();}elseif(context.isGlobalLockRequire()){// 如果使用了@GlobalLock,需要获取全局锁processLocalCommitWithGlobalLocks();}else{// 不在分布式事务中,则以原生connection提交本地事务
        targetConnection.commit();}}

doCommit()方法中分三种情况进行不同的处理:

  1. 如果当前DML操作在全局事务中,即:当前连接的ConnectionContext中包含xid,则以处理全局事务方式(processGlobalTransactionCommit()提交本地事务;
  2. 如果使用了@GlobalLock,需要获取全局锁,再以原生connection提交本地事务;在这里插入图片描述
  3. 否则如果事务不在分布式事务中,则以原生connection提交本地事务;在这里插入图片描述

正常我们使用分布式事务,一般肯定是要以全局事务的方式执行DML操作;即:默认会进入到processGlobalTransactionCommit():

privatevoidprocessGlobalTransactionCommit()throwsSQLException{try{// 向远程的TC中注册分支事务,并检查、增加全局行锁register();}catch(TransactionException e){// 出现异常时,回滚本地事务 再重试。// 大多数情况是因为全局锁冲突走到这里。recognizeLockKeyConflictException(e, context.buildLockKeys());}try{// 回滚日志管理组件,持久化undo logUndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);// 提交本地事务
        targetConnection.commit();}catch(Throwable ex){
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);// 上报分支事务执行失败,用于监控report(false);thrownewSQLException(ex);}// 上报分支事务执行成功,默认不会上报if(IS_REPORT_SUCCESS_ENABLE){report(true);}// 重置连接的ConnectionContext
    context.reset();}

以全局事务的方式提交本地事务会做四件事:

  1. 通过netty请求TC,注册分支事务,并检查、增加全局行锁; - 如果出现异常,则回滚本地事务。若异常类型为全局锁冲突LockConflictException,则进入重试策略;其他异常类型则直接抛出SQLException
  2. 将执行SQL时保存到ConnectionContext中的undo log 回滚日志 保存到DB;
  3. 提交本地事务,真正将业务数据和回滚日志 持久化到DB;
  4. 向TC上报本地事务提交结果; - 如果持久化undo log 或 提交本地事务出现异常,则上报分支事务执行失败;- 如果本地事务提交成功,上报分支事务执行成功;默认并不会上报。

最后,清空当前数据库连接的ConnectionContext。

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html);

  • 分支事务的注册细节见下一篇文章(【微服务41】分布式事务Seata源码解析九:分支事务如何注册到全局事务);
  • undo log持久化细节 见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志);

三、总结

AT模式下本地事务的SQL执行流程,即RM的分支事务执行流程,主要包括一下几步:

  • 开始执行本地事务的SQL之前,从全局事务上下文RootContext中获取到xid,然后将xid绑定到数据库连接的上下文ConnectionContext中;
  1. 构建before image,查询目标sql执行前的数据快照;
  2. 执行目标SQL语句,但是本地事务还没有提交;
  3. 构建after image,查询目标SQL执行之后的数据快照;
  4. 将before image 和 after image合并作为回滚日志undo log,保存到当前数据库连接上下文ConnectionContext中;
  5. 构建当前本地事务要占用所有全局锁key信息,然后将其保存到当前数据库连接上下文ConnectionContext中;
  6. 通过netty请求TC,注册分支事务,并检查、增加全局行锁;这里可能会出现全局锁冲突 导致注册分支事务失败,所以有一个重试机制;
  7. 将执行SQL时保存到ConnectionContext中的undo log 回滚日志 保存到DB(undo_log表);
  8. 提交本地事务;
  9. 向TC上报本地事务提交结果;
  10. 最后清空当前数据库连接的ConnectionContext,恢复现场。

整个SQL提交可以理解为两阶段提交:

  • 一阶段:先注册分支事务,检查全局锁。
  • 二阶段:插入undolog、提交本地事务。

本文转载自: https://blog.csdn.net/Saintmm/article/details/127292437
版权归原作者 秃秃爱健身 所有, 如有侵权,请联系我们删除。

“【微服务40】分布式事务Seata源码解析八:AT模式下本地事务的执行流程”的评论:

还没有评论