文章目录
一、前言
至此,seata系列的内容包括:
- can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
- Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
- Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
- 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
- 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
- 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
- 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
- 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
- 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
- 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
- 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
- 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
Seata最核心的全局事务执行流程,前面我们已经聊到了Seata全局事务的开启,本文接着聊Seata全局事务中执行具体业务操作时,DB操作是如何执行的(含:全局锁keys、undologs的构建(AT模式))?
二、本地事务SQL执行流程
全局事务的整体执行流程体现在TransactionalTemplate#execute()方法中:
具体代码 和 注释:
publicObjectexecute(TransactionalExecutor business)throwsThrowable{// 1. Get transactionInfoTransactionInfo txInfo = business.getTransactionInfo();if(txInfo ==null){thrownewShouldNeverHappenException("transactionInfo does not exist");}// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.// 获取当前事务,根据ThreadLocal,获取当前线程本地变量副本中的xid,进而判断是否存在一个全局事务// 刚开始一个全局事务时,肯定是没有全局事务的GlobalTransaction tx =GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation.// 从全局事务的配置里 获取事务传播级别,默认是REQUIRED(如果存在则加入,否则开启一个新的)Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder =null;try{// 根据事务的隔离级别做不同的处理switch(propagation){case NOT_SUPPORTED:// If transaction is existing, suspend it.if(existingTransaction(tx)){// 事务存在,则挂起事务(默认将xid从RootContext中移除)
suspendedResourcesHolder = tx.suspend();}// Execute without transaction and return.return business.execute();case REQUIRES_NEW:// If transaction is existing, suspend it, and then begin new transaction.if(existingTransaction(tx)){
suspendedResourcesHolder = tx.suspend();
tx =GlobalTransactionContext.createNew();}// Continue and execute with new transactionbreak;case SUPPORTS:// If transaction is not existing, execute without transaction.if(notExistingTransaction(tx)){return business.execute();}// Continue and execute with new transactionbreak;case REQUIRED:// If current transaction is existing, execute with current transaction,// else continue and execute with new transaction.break;case NEVER:// If transaction is existing, throw exception.if(existingTransaction(tx)){thrownewTransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));}else{// Execute without transaction and return.return business.execute();}case MANDATORY:// If transaction is not existing, throw exception.if(notExistingTransaction(tx)){thrownewTransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}// Continue and execute with current transaction.break;default:thrownewTransactionException("Not Supported Propagation:"+ propagation);}// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.if(tx ==null){// 创建全局事务(角色为事务发起者),并关联全局事务管理器
tx =GlobalTransactionContext.createNew();}// set current tx config to holderGlobalLockConfig previousConfig =replaceGlobalLockConfig(txInfo);try{// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,// else do nothing. Of course, the hooks will still be triggered.// 开启全局事务,如果事务角色是'GlobalTransactionRole.Launcher',发送开始事务请求到seata-server(TC)beginTransaction(txInfo, tx);Object rs;try{// Do Your Business// 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;// 每个分支事务都会去运行
rs = business.execute();}catch(Throwable ex){// 3. The needed business exception to rollback.// 如果全局事务执行发生了异常,则回滚;completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.// 全局事务和分支事务运行无误,提交事务;commitTransaction(tx);return rs;}finally{//5. clear// 全局事务完成之后做一些清理工作resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}}finally{// If the transaction is suspended, resume it.if(suspendedResourcesHolder !=null){// 如果有挂起的全局事务,则恢复全局事务
tx.resume(suspendedResourcesHolder);}}}
在前一篇文章: 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务,我们已经聊到了开启全局事务,本文继续聊开启全局事务之后,本地事务中的SQL执行流程。
1、DataSourceProxy 数据库资源代理入口
在Spring Cloud 整合Seata实现分布式事务一文中有聊到Spring Cloud 集成Seata 的AT模式,需要写一个配置类
DataSourceConfig
,其中会注入一个Bean(
DataSourceProxy
):
到这里,博主有一个问题:注入DataSourceProxy到Spring容器中之后,哪里会用到它?执行数据增删改查时如何切换到代理数据源?
1)哪里使用了DataSourceProxy?
从源码来看,有一个Spring AOP抽象类
AbstractAutoProxyCreator
的子类
SeataAutoDataSourceProxyCreator
;
Spring 通过 AbstractAutoProxyCreator来创建 AOP 代理,其实现了BeanPostProcessor 接口,在 bean 初始化完成之后会创建它的代理,让后将代理对象增加到Spring容器。
在Seata 中,SeataAutoDataSourceProxyCreator的主要作用是为数据源
DataSource
添加
Advisor
,当数据源执行操作时,便会进入到
SeataAutoDataSourceProxyAdvice
类中处理;
因此,当数据源执行CRUD操作时,由于添加了AOP代理,会进入到SeataAutoDataSourceProxyAdvice#invoke()方法中:
咦,这里没有DataSourceProxy呀,只有SeataDataSourceProxy,从命名来看,这俩类总感觉有点关系!
2)SeataDataSourceProxy
从DataSourceProxy类的继承结构来看,DataSourceProxy实现了
SeataDataSourceProxy
接口;**因此
SeataAutoDataSourceProxyAdvice#invoke()方法中
动态代理类实际就是
DataSourceProxy
。**
2、本地事务SQL的执行流程(execute)
1)执行本地事务SQL的入口
JDBC的执行流程:
- 第一步:注册驱动;
- 第二步:获取与数据库的连接Connection;
- 第三步:获取数据库操作对象Statement;
- 第四步:执行sql语句(DQL、DML…),并且返回结果集;
- 第五步:处理查询结果集;
- 第六步:释放资源、关闭连接;
try{//加载数据库驱动Class.forName("com.mysql.cj.jdbc.Driver");}catch(ClassNotFoundException e){// do something}Connection conn =DriverManager.getConnection(URL, USER_NAME, PASSWORD);PreparedStatement pst = conn.prepareStatement("update user set name=? where id = ?");
pst.setString(1,"bobDog");
pst.setInt(2,1);int updateRes = pst.executeUpdate();if(updateRes >0){System.out.println("更新成功!");}
Seata代理的数据库资源DataSource底层也是JDBC操作数据库,所以也需要先获取数据库连接Connection、再根据数据库连接获取数据库操作对象Statement、接着再通过Statement#execute()执行SQL。在Seata中的表现为:
- 先获取seata代理的数据库连接ConnectionProxy;
- 再根据ConnectionProxy获取一个数据库操作对象
StatementProxy
或PreparedStatementProxy
;- 然后再利用数据库操作对象
StatementProxy
或PreparedStatementProxy
的execute() 或 executeUpdate() 方法执行SQL语句。
StatementProxy
或
PreparedStatementProxy
增强了所有的execute方法,由ExecuteTemplate选择需要的Executor执行来sql。
**下面以常见的更新操作(
PreparedStatementProxy#executeUpdate()
)为例:**
ExecuteTemplate#execute()
重载方法调用链路如下:
publicstatic<T,SextendsStatement>Texecute(List<SQLRecognizer> sqlRecognizers,StatementProxy<S> statementProxy,StatementCallback<T,S> statementCallback,Object... args)throwsSQLException{// 没获取到全局锁,并且事务模式不是ATif(!RootContext.requireGlobalLock()&&BranchType.AT !=RootContext.getBranchType()){// Just work as original statementreturn statementCallback.execute(statementProxy.getTargetStatement(), args);}// 获取DB的类型String dbType = statementProxy.getConnectionProxy().getDbType();if(CollectionUtils.isEmpty(sqlRecognizers)){
sqlRecognizers =SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);}Executor<T> executor;if(CollectionUtils.isEmpty(sqlRecognizers)){
executor =newPlainExecutor<>(statementProxy, statementCallback);}else{if(sqlRecognizers.size()==1){SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);// 数据库操作类型switch(sqlRecognizer.getSQLType()){case INSERT:
executor =EnhancedServiceLoader.load(InsertExecutor.class, dbType,newClass[]{StatementProxy.class,StatementCallback.class,SQLRecognizer.class},newObject[]{statementProxy, statementCallback, sqlRecognizer});break;case UPDATE:
executor =newUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case DELETE:
executor =newDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case SELECT_FOR_UPDATE:
executor =newSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case INSERT_ON_DUPLICATE_UPDATE:switch(dbType){caseJdbcConstants.MYSQL:caseJdbcConstants.MARIADB:
executor =newMySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;default:thrownewNotSupportYetException(dbType +" not support to INSERT_ON_DUPLICATE_UPDATE");}break;default:
executor =newPlainExecutor<>(statementProxy, statementCallback);break;}}else{
executor =newMultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);}}T rs;try{// 通过Executor真正的执行
rs = executor.execute(args);}catch(Throwable ex){if(!(ex instanceofSQLException)){// Turn other exception into SQLException
ex =newSQLException(ex);}throw(SQLException) ex;}return rs;}
- 如果当前事务不需要获取全局锁,并且不是AT模式,则以original statement的方式执行。默认Seata Client层面不需要获取全局锁,事务模式是AT模式。
- 获取到的DB类型,比如MySQL、Oracle…,博主的项目DBType是MYSQL。
- 获取SQL DML类型,并根据DML类型,选择不同的Executor。这里可以看做是策略模式。
因为示例是Update类型,所以最终选择的Executor是UpdateExecutor。
2)执行本地事务SQL逻辑
UpdateExecutor#execute()方法中会执行本地事务SQL,UpdateExecutor的类继承图如下:
除了数据更新前后的Image构造体现在UpdateExecutor类的方法中,其余方法均在其父类
BaseTransactionalExecutor
中,包括execute()方法。
@OverridepublicTexecute(Object... args)throwsThrowable{// 从全局事务上下文中获取xidString xid =RootContext.getXID();if(xid !=null){// 将xid绑定到ConnectionContext中,后续提交本地事务时会用到
statementProxy.getConnectionProxy().bind(xid);}// RootContext.requireGlobalLock()检查是否需要全局锁,默认不需要
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());returndoExecute(args);}
开始执行本地事务SQL时:
- 首先从全局事务上下文RootContext中获取到xid,如果存在全局事务xid,则将xid绑定到数据库连接的上下文ConnectionContext中;
- 从全局事务上下文RootContext获取是否全局锁标识,默认不需要;如果需要获取全局锁,则将数据库连接上下文ConnectionContext中的
isGlobalLockRequire
设置为true;- 调用doExecute()方法真正开始执行SQL;
UpdateExecutor#doExecutor()方法:
开启了全局事务之后,DML语句的本地事务不会自动提交。
**即使自动提交没有关闭,
AbstractDMLBaseExecutor#doExecute(Object… args)
方法中也会先将其关闭,然后再以非自动提交的方式执行SQL,走ConnectionProxy提交本地事务,然后再将自动提交设置为true;这一块逻辑体现在
executeAutoCommitTrue()
方法中:**
protectedTexecuteAutoCommitTrue(Object[] args)throwsThrowable{ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try{
connectionProxy.changeAutoCommit();returnnewLockRetryPolicy(connectionProxy).execute(()->{T result =executeAutoCommitFalse(args);
connectionProxy.commit();return result;});}catch(Exception e){// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);if(!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()){
connectionProxy.getTargetConnection().rollback();}throw e;}finally{
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);}}
正常情况下都是直接以非自动提交的方式执行,即执行executeAutoCommitFalse()方法:
protectedTexecuteAutoCommitFalse(Object[] args)throwsException{if(!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType())&&isMultiPk()){thrownewNotSupportYetException("multi pk only support mysql!");}// 根据SQL语句构建before image,目标SQL执行之前的数据镜像:从数据库根据ID主键等信息查询出更新前的数据;TableRecords beforeImage =beforeImage();// 真正的去执行SQL语句,但是本地事务还没有提交T result = statementCallback.execute(statementProxy.getTargetStatement(), args);int updateCount = statementProxy.getUpdateCount();if(updateCount >0){// 目标SQL执行之后的数据镜像:从数据库根据ID主键等信息查询出更新后的数据;TableRecords afterImage =afterImage(beforeImage);// 准备好undo log数据prepareUndoLog(beforeImage, afterImage);}return result;}
由于AbstractDMLBaseExecutor提供了公用的
executeAutoCommitFalse()
给Insert、Delete、Update类型的Executor使用,所以无论是Insert、Delete还是Update操作都会走
AbstractDMLBaseExecutor#executeAutoCommitFalse()
方法执行SQL。不过MySQL的
MySQLInsertOrUpdateExecutor
是个个例,其执行SQL的逻辑由自己实现(有兴趣可以自己看一下MySQLInsertOrUpdateExecutor)。
以非自动提交执行SQL的流程如下:
beforeImage()
– 根据SQL语句构建before image,查询目标sql执行前的数据快照; - Update、Delete操作从数据库根据ID主键等信息查询出更新前的数据;- Insert操作直接返回空的TableRecords,其中只包含TableMeta,没有数据记录;- 执行SQL语句,但是本地事务还没有提交;
afterImage()
– 构建after image,查询目标SQL执行之后的数据快照; - Insert、Update操作从数据库根据ID主键等信息查询出更新后的数据;- Delete操作直接返回空的TableRecords,其中只包含TableMeta,没有数据记录;prepareUndoLog(beforeImage, afterImage)
--> 将before image 和 after image合并作为回滚日志undo log,保存到当前数据库连接上下文ConnectionContext中。其中还包括构建当前本地事务要占用所有全局锁key信息,然后将其保存到当前数据库连接上下文ConnectionContext中。
下面就这几步展开看一看;
1> 构建before image
此处依旧以Update为例:
点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)
2> 执行SQL
最终使用源Statement执行SQL;
3> 构建after image
执行完SQL之后,再构建SQL查询出当前最新的数据记录作为after image;
点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)
4> 预处理undo log
将before image 和 after image合并作为回滚日志undo log,存储到当前数据库连接上下文ConnectionContext中。
protectedvoidprepareUndoLog(TableRecords beforeImage,TableRecords afterImage)throwsSQLException{if(beforeImage.getRows().isEmpty()&& afterImage.getRows().isEmpty()){return;}if(SQLType.UPDATE == sqlRecognizer.getSQLType()){if(beforeImage.getRows().size()!= afterImage.getRows().size()){thrownewShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");}}ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();TableRecords lockKeyRecords = sqlRecognizer.getSQLType()==SQLType.DELETE ? beforeImage : afterImage;// 1、构建全局锁key信息,针对更新的一批数据主键ID构建这批数据的全局锁key// 例如:table_name:id_1101String lockKeys =buildLockKey(lockKeyRecords);if(null!= lockKeys){// 将lockKeys信息保存到ConnectionContext中,在注册分支事务时,再将全局锁信息放入到TC中进行检查、存储
connectionProxy.appendLockKey(lockKeys);// 2、构建undo logSQLUndoLog sqlUndoLog =buildUndoItem(beforeImage, afterImage);// 将undo log信息保存到ConnectionContext中
connectionProxy.appendUndoLog(sqlUndoLog);}}
点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)
由于关闭了AutoCommit,所以在Statement.execute()执行完SQL之后,需要“手动”提交本地事务。
3、本地事务SQL的提交(commit)
回到ConnectionProxy#commit()方法,这里是“手动”提交本地事务的入口;
@Overridepublicvoidcommit()throwsSQLException{try{// 由LockRetryPolicy负责提交事务,LockRetryPolicy中包含全局锁的概念,支持retry重试策略
lockRetryPolicy.execute(()->{doCommit();returnnull;});}catch(SQLException e){if(targetConnection !=null&&!getAutoCommit()&&!getContext().isAutoCommitChanged()){rollback();}throw e;}catch(Exception e){thrownewSQLException(e);}}
本地事务的提交又会委托给LockRetryPolicy的execute方法来执行;
1)LockRetryPolicy重试机制
LockRetryPolicy是ConnectionProxy的静态内部类,其中包含了全局锁的概念,支持retry策略,当出现全局锁冲突时支持多次重试获取全局锁。
默认情况下execute()方法中:
LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT
为TRUE,可以通过配置client.rm.lock.retryPolicyBranchRollbackOnConflict=false
属性改变;connection.getContext().isAutoCommitChanged()
为FALSE;
所以默认情况下,都会走重试获取全局锁的逻辑:
doRetryOnLockConflict()
方法。(当然可以选择开启自动提交事务、并设置属性
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
,这样便不会走重试获取全局锁逻辑。)
protected<T>TdoRetryOnLockConflict(Callable<T> callable)throwsException{LockRetryController lockRetryController =newLockRetryController();while(true){try{return callable.call();}catch(LockConflictException lockConflict){// 出现全局锁冲突,回滚本地事务onException(lockConflict);// AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is releasedif(connection.getContext().isAutoCommitChanged()&& lockConflict.getCode()==TransactionExceptionCode.LockKeyConflictFailFast){
lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);}// 线程睡眠10ms,然后再重试,超过重试次数,抛出异常结束流程
lockRetryController.sleep(lockConflict);}catch(Exception e){// 出现非全局锁冲突的异常,则直接报错返回onException(e);throw e;}}}
在
doRetryOnLockConflict()
方法中:
- 如果因为全局锁冲突导致提交本地事务失败,先回滚本地事务,然后会判断重试次数(
lockRetryTimes
,默认30次)再进行重试,重试之前会让线程睡眠一段时间(lockRetryInterval
,默认10ms)。如果重试次数已经够了,则直接抛出异常结束流程。 - 如果因为其他异常(包括超过重试次数)导致提交本地事务失败,直接回滚本地事务、抛出异常结束流程。
**上面的
ConnectionProxy#onException()
方法中负责回滚本地事务、清理当前连接的ConnectionContext中的undo log信息、全局锁keys信息;**
了解完了全局锁冲突引起的重试机制,下面接着看本地事务的提交流程。
2)本地事务提交流程
LockRetryPolicy#execute()方法中会运行方法的入参Callable,在ConnectionProxy#commit()方法中传入的到LockRetryPolicy#execute()方法中的Callable为:
()->{doCommit();returnnull;}
doCommit()方法:
privatevoiddoCommit()throwsSQLException{// 当前DML操作在全局事务中时,判定条件:ConnectionContext中包含xidif(context.inGlobalTransaction()){processGlobalTransactionCommit();}elseif(context.isGlobalLockRequire()){// 如果使用了@GlobalLock,需要获取全局锁processLocalCommitWithGlobalLocks();}else{// 不在分布式事务中,则以原生connection提交本地事务
targetConnection.commit();}}
doCommit()方法中分三种情况进行不同的处理:
- 如果当前DML操作在全局事务中,即:当前连接的ConnectionContext中包含xid,则以处理全局事务方式(
processGlobalTransactionCommit(
)提交本地事务;- 如果使用了@GlobalLock,需要获取全局锁,再以原生connection提交本地事务;
- 否则如果事务不在分布式事务中,则以原生connection提交本地事务;
正常我们使用分布式事务,一般肯定是要以全局事务的方式执行DML操作;即:默认会进入到processGlobalTransactionCommit():
privatevoidprocessGlobalTransactionCommit()throwsSQLException{try{// 向远程的TC中注册分支事务,并检查、增加全局行锁register();}catch(TransactionException e){// 出现异常时,回滚本地事务 再重试。// 大多数情况是因为全局锁冲突走到这里。recognizeLockKeyConflictException(e, context.buildLockKeys());}try{// 回滚日志管理组件,持久化undo logUndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);// 提交本地事务
targetConnection.commit();}catch(Throwable ex){
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);// 上报分支事务执行失败,用于监控report(false);thrownewSQLException(ex);}// 上报分支事务执行成功,默认不会上报if(IS_REPORT_SUCCESS_ENABLE){report(true);}// 重置连接的ConnectionContext
context.reset();}
以全局事务的方式提交本地事务会做四件事:
- 通过netty请求TC,注册分支事务,并检查、增加全局行锁; - 如果出现异常,则回滚本地事务。若异常类型为全局锁冲突
LockConflictException
,则进入重试策略;其他异常类型则直接抛出SQLException
;- 将执行SQL时保存到ConnectionContext中的undo log 回滚日志 保存到DB;
- 提交本地事务,真正将业务数据和回滚日志 持久化到DB;
- 向TC上报本地事务提交结果; - 如果持久化undo log 或 提交本地事务出现异常,则上报分支事务执行失败;- 如果本地事务提交成功,上报分支事务执行成功;默认并不会上报。
最后,清空当前数据库连接的ConnectionContext。
点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html);
- 分支事务的注册细节见下一篇文章(【微服务41】分布式事务Seata源码解析九:分支事务如何注册到全局事务);
- undo log持久化细节 见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志);
三、总结
AT模式下本地事务的SQL执行流程,即RM的分支事务执行流程,主要包括一下几步:
- 开始执行本地事务的SQL之前,从全局事务上下文RootContext中获取到xid,然后将xid绑定到数据库连接的上下文ConnectionContext中;
- 构建before image,查询目标sql执行前的数据快照;
- 执行目标SQL语句,但是本地事务还没有提交;
- 构建after image,查询目标SQL执行之后的数据快照;
- 将before image 和 after image合并作为回滚日志undo log,保存到当前数据库连接上下文ConnectionContext中;
- 构建当前本地事务要占用所有全局锁key信息,然后将其保存到当前数据库连接上下文ConnectionContext中;
- 通过netty请求TC,注册分支事务,并检查、增加全局行锁;这里可能会出现全局锁冲突 导致注册分支事务失败,所以有一个重试机制;
- 将执行SQL时保存到ConnectionContext中的undo log 回滚日志 保存到DB(undo_log表);
- 提交本地事务;
- 向TC上报本地事务提交结果;
- 最后清空当前数据库连接的ConnectionContext,恢复现场。
整个SQL提交可以理解为两阶段提交:
- 一阶段:先注册分支事务,检查全局锁。
- 二阶段:插入undolog、提交本地事务。
版权归原作者 秃秃爱健身 所有, 如有侵权,请联系我们删除。