0


@DSTransactional注解原理

  • 转载自芋道源码

文章目录


一、AOP实现步骤

一句话:使用自定义注解(切点)+interceptor(增强Advice)构成织入。

1.定义注解 DSTransactional

代码如下(示例):

ipackage com.baomidou.dynamic.datasource.annotation;importjava.lang.annotation.*;/**
 * multi data source transaction
 *
 * @author funkye
 */@Target({ElementType.TYPE,ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic@interfaceDSTransactional{}

2.定义拦截器(增强)DynamicLocalTransactionAdvisor

代码如下(示例):

packagecom.baomidou.dynamic.datasource.aop;importcom.baomidou.dynamic.datasource.tx.ConnectionFactory;importcom.baomidou.dynamic.datasource.tx.TransactionContext;importlombok.extern.slf4j.Slf4j;importorg.aopalliance.intercept.MethodInterceptor;importorg.aopalliance.intercept.MethodInvocation;importorg.springframework.util.StringUtils;importjava.util.UUID;/**
 * @author funkye
 */@Slf4jpublicclassDynamicLocalTransactionAdvisorimplementsMethodInterceptor{@OverridepublicObjectinvoke(MethodInvocation methodInvocation)throwsThrowable{if(!StringUtils.isEmpty(TransactionContext.getXID())){return methodInvocation.proceed();}boolean state =true;Object o;String xid =UUID.randomUUID().toString();TransactionContext.bind(xid);try{
            o = methodInvocation.proceed();}catch(Exception e){
            state =false;throw e;}finally{ConnectionFactory.notify(state);TransactionContext.remove();}return o;}}

3.AspectJ定义注解为切点,并配置织入(关键代码)

publicclassDynamicDataSourceAutoConfigurationimplementsInitializingBean{@Role(value =BeanDefinition.ROLE_INFRASTRUCTURE)@ConditionalOnProperty(prefix =DynamicDataSourceProperties.PREFIX, name ="seata", havingValue ="false", matchIfMissing =true)@BeanpublicAdvisordynamicTransactionAdvisor(){AspectJExpressionPointcut pointcut =newAspectJExpressionPointcut();
        pointcut.setExpression("@annotation(com.baomidou.dynamic.datasource.annotation.DSTransactional)");returnnewDefaultPointcutAdvisor(pointcut,newDynamicLocalTransactionAdvisor());}}

二.connection代理实现

Spring自带事务@Transactional的实现在一个事务里,只能有一个数据库connection,在动态多数据源里的现象就是只有第一个数据源,后面切的都失效了。
所以要想实现动态多数据源下的统一提交和回滚,就不能用Spring自带的。

PS:在很多数据库相关项目里,connection这个词是有歧义的,可能有的含义包括:事务、会话、数据库连接。
spring自带事务明显默认了一个事务会话就是一个数据库链接这种老思想。

1.从增强实现开始

从DynamicLocalTransactionAdvisor增强的invoke方法来看具体逻辑:
首先用到了TransactionContext,一个基于ThreadLocal的账本,记录了当前事务的xid。看下面代码注释。

publicclassDynamicLocalTransactionAdvisorimplementsMethodInterceptor{@OverridepublicObjectinvoke(MethodInvocation methodInvocation)throwsThrowable{// 1-1. 如果有xid,直接反射调用原方法,说明会话已经创建。if(!StringUtils.isEmpty(TransactionContext.getXID())){return methodInvocation.proceed();}// 1-2. 如果没有xid,说明新会话,首先生成xid,绑到上下文上。boolean state =true;Object o;String xid =UUID.randomUUID().toString();TransactionContext.bind(xid);try{
            o = methodInvocation.proceed();}catch(Exception e){// 1-3. 执行原方法,如果有异常,修改状态为false
            state =false;throw e;}finally{// 1-4. 调用会话的notify方法,处理状态ConnectionFactory.notify(state);// 1-5. 删除会话上下文TransactionContext.remove();}return o;}}

2. TransactionContext 事务上下文

publicclassTransactionContext{//记录了当前事务的xidprivatestaticfinalThreadLocal<String>CONTEXT_HOLDER=newThreadLocal<>();/**
     * Gets xid.
     *
     * @return the xid
     */publicstaticStringgetXID(){String xid =CONTEXT_HOLDER.get();if(!StringUtils.isEmpty(xid)){return xid;}returnnull;}/**
     * Unbind string.
     *
     * @return the string
     */publicstaticStringunbind(String xid){CONTEXT_HOLDER.remove();return xid;}/**
     * bind string.
     *
     * @return the string
     */publicstaticStringbind(String xid){CONTEXT_HOLDER.set(xid);return xid;}/**
     * remove
     */publicstaticvoidremove(){CONTEXT_HOLDER.remove();}}

3. ConnectionFactory 会话工厂(代表会话)

再来看1-4用到的ConnectionFactory,也是一个基于ThreadLocal的账本,记录了该【会话】中用到的所有【连接】。

publicclassConnectionFactory{privatestaticfinalThreadLocal<Map<String,ConnectionProxy>>CONNECTION_HOLDER=newThreadLocal<Map<String,ConnectionProxy>>(){@OverrideprotectedMap<String,ConnectionProxy>initialValue(){returnnewConcurrentHashMap<>();}};// 3-1:将【数据库连接】存到会话的CONNECTION_HOLDER中publicstaticvoidputConnection(String ds,ConnectionProxy connection){Map<String,ConnectionProxy> concurrentHashMap =CONNECTION_HOLDER.get();if(!concurrentHashMap.containsKey(ds)){try{// 3-2:禁用了自动提交,相当于先执行数据库操作,但暂停了commit。等待3-4循环批量处理// 注意,这个connection是proxy,也就是真正的数据库连接,不是会话
                connection.setAutoCommit(false);}catch(SQLException e){
                e.printStackTrace();}// 把新的连接放入Map
            concurrentHashMap.put(ds, connection);}}publicstaticConnectionProxygetConnection(String ds){returnCONNECTION_HOLDER.get().get(ds);}// 3-3:前面1-4调用的方法:publicstaticvoidnotify(Boolean state){try{Map<String,ConnectionProxy> concurrentHashMap =CONNECTION_HOLDER.get();for(ConnectionProxy connectionProxy : concurrentHashMap.values()){// 3-4:循环调用了所有数据库连接的notify方法。有一个false就都rollback了。
                connectionProxy.notify(state);}}finally{// 3-5:会话结束,删除数据库连接账本CONNECTION_HOLDER.remove();}}}

4. ConnectionProxy 数据库连接代理

数据库连接

publicclassConnectionProxyimplementsConnection{privateConnection connection;privateString ds;publicConnectionProxy(Connection connection,String ds){this.connection = connection;this.ds = ds;}// 4-1:前面303调用的方法publicvoidnotify(Boolean commit){try{if(commit){
                connection.commit();// 状态为true,则提交}else{
                connection.rollback();// 状态为false,则提交}
            connection.close();}catch(Exception e){
            log.error(e.getLocalizedMessage(), e);}}@Overridepublicvoidcommit()throwsSQLException{// connection.commit();}// ....略}

5. AbstractRoutingDataSource 的改造:让动态数据库连接获取connection时登记到会话账本

publicabstractclassAbstractRoutingDataSourceextendsAbstractDataSource{protectedabstractDataSourcedetermineDataSource();@OverridepublicConnectiongetConnection()throwsSQLException{String xid =TransactionContext.getXID();if(StringUtils.isEmpty(xid)){// 非DSTransactionreturndetermineDataSource().getConnection();}else{// DSTransactionString ds =DynamicDataSourceContextHolder.peek();ConnectionProxy connection =ConnectionFactory.getConnection(ds);return connection ==null?getConnectionProxy(ds,determineDataSource().getConnection()): connection;}}@OverridepublicConnectiongetConnection(String username,String password)throwsSQLException{String xid =TransactionContext.getXID();if(StringUtils.isEmpty(xid)){// 非DSTransactionreturndetermineDataSource().getConnection(username, password);}else{// DSTransactionString ds =DynamicDataSourceContextHolder.peek();ConnectionProxy connection =ConnectionFactory.getConnection(ds);return connection ==null?getConnectionProxy(ds,determineDataSource().getConnection(username, password)): connection;}}privateConnectiongetConnectionProxy(String ds,Connection connection){ConnectionProxy connectionProxy =newConnectionProxy(connection, ds);// 调用了3-1ConnectionFactory.putConnection(ds, connectionProxy);return connectionProxy;}// ... 略}

使用注意

1.不可与Transactional混用
2.目前只支持统一提交和回滚,更复杂的请用seata
3.3.4版本之前不加@DS注解会报错

标签: java spring spring boot

本文转载自: https://blog.csdn.net/wangchengqi1997/article/details/127821521
版权归原作者 芝士小王 所有, 如有侵权,请联系我们删除。

“@DSTransactional注解原理”的评论:

还没有评论