- 转载自芋道源码
文章目录
一、AOP实现步骤
一句话:使用自定义注解(切点)+interceptor(增强Advice)构成织入。
1.定义注解 DSTransactional
代码如下(示例):
ipackage com.baomidou.dynamic.datasource.annotation;importjava.lang.annotation.*;/**
* multi data source transaction
*
* @author funkye
*/@Target({ElementType.TYPE,ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic@interfaceDSTransactional{}
2.定义拦截器(增强)DynamicLocalTransactionAdvisor
代码如下(示例):
packagecom.baomidou.dynamic.datasource.aop;importcom.baomidou.dynamic.datasource.tx.ConnectionFactory;importcom.baomidou.dynamic.datasource.tx.TransactionContext;importlombok.extern.slf4j.Slf4j;importorg.aopalliance.intercept.MethodInterceptor;importorg.aopalliance.intercept.MethodInvocation;importorg.springframework.util.StringUtils;importjava.util.UUID;/**
* @author funkye
*/@Slf4jpublicclassDynamicLocalTransactionAdvisorimplementsMethodInterceptor{@OverridepublicObjectinvoke(MethodInvocation methodInvocation)throwsThrowable{if(!StringUtils.isEmpty(TransactionContext.getXID())){return methodInvocation.proceed();}boolean state =true;Object o;String xid =UUID.randomUUID().toString();TransactionContext.bind(xid);try{
o = methodInvocation.proceed();}catch(Exception e){
state =false;throw e;}finally{ConnectionFactory.notify(state);TransactionContext.remove();}return o;}}
3.AspectJ定义注解为切点,并配置织入(关键代码)
publicclassDynamicDataSourceAutoConfigurationimplementsInitializingBean{@Role(value =BeanDefinition.ROLE_INFRASTRUCTURE)@ConditionalOnProperty(prefix =DynamicDataSourceProperties.PREFIX, name ="seata", havingValue ="false", matchIfMissing =true)@BeanpublicAdvisordynamicTransactionAdvisor(){AspectJExpressionPointcut pointcut =newAspectJExpressionPointcut();
pointcut.setExpression("@annotation(com.baomidou.dynamic.datasource.annotation.DSTransactional)");returnnewDefaultPointcutAdvisor(pointcut,newDynamicLocalTransactionAdvisor());}}
二.connection代理实现
Spring自带事务@Transactional的实现在一个事务里,只能有一个数据库connection,在动态多数据源里的现象就是只有第一个数据源,后面切的都失效了。
所以要想实现动态多数据源下的统一提交和回滚,就不能用Spring自带的。
PS:在很多数据库相关项目里,connection这个词是有歧义的,可能有的含义包括:事务、会话、数据库连接。
spring自带事务明显默认了一个事务会话就是一个数据库链接这种老思想。
1.从增强实现开始
从DynamicLocalTransactionAdvisor增强的invoke方法来看具体逻辑:
首先用到了TransactionContext,一个基于ThreadLocal的账本,记录了当前事务的xid。看下面代码注释。
publicclassDynamicLocalTransactionAdvisorimplementsMethodInterceptor{@OverridepublicObjectinvoke(MethodInvocation methodInvocation)throwsThrowable{// 1-1. 如果有xid,直接反射调用原方法,说明会话已经创建。if(!StringUtils.isEmpty(TransactionContext.getXID())){return methodInvocation.proceed();}// 1-2. 如果没有xid,说明新会话,首先生成xid,绑到上下文上。boolean state =true;Object o;String xid =UUID.randomUUID().toString();TransactionContext.bind(xid);try{
o = methodInvocation.proceed();}catch(Exception e){// 1-3. 执行原方法,如果有异常,修改状态为false
state =false;throw e;}finally{// 1-4. 调用会话的notify方法,处理状态ConnectionFactory.notify(state);// 1-5. 删除会话上下文TransactionContext.remove();}return o;}}
2. TransactionContext 事务上下文
publicclassTransactionContext{//记录了当前事务的xidprivatestaticfinalThreadLocal<String>CONTEXT_HOLDER=newThreadLocal<>();/**
* Gets xid.
*
* @return the xid
*/publicstaticStringgetXID(){String xid =CONTEXT_HOLDER.get();if(!StringUtils.isEmpty(xid)){return xid;}returnnull;}/**
* Unbind string.
*
* @return the string
*/publicstaticStringunbind(String xid){CONTEXT_HOLDER.remove();return xid;}/**
* bind string.
*
* @return the string
*/publicstaticStringbind(String xid){CONTEXT_HOLDER.set(xid);return xid;}/**
* remove
*/publicstaticvoidremove(){CONTEXT_HOLDER.remove();}}
3. ConnectionFactory 会话工厂(代表会话)
再来看1-4用到的ConnectionFactory,也是一个基于ThreadLocal的账本,记录了该【会话】中用到的所有【连接】。
publicclassConnectionFactory{privatestaticfinalThreadLocal<Map<String,ConnectionProxy>>CONNECTION_HOLDER=newThreadLocal<Map<String,ConnectionProxy>>(){@OverrideprotectedMap<String,ConnectionProxy>initialValue(){returnnewConcurrentHashMap<>();}};// 3-1:将【数据库连接】存到会话的CONNECTION_HOLDER中publicstaticvoidputConnection(String ds,ConnectionProxy connection){Map<String,ConnectionProxy> concurrentHashMap =CONNECTION_HOLDER.get();if(!concurrentHashMap.containsKey(ds)){try{// 3-2:禁用了自动提交,相当于先执行数据库操作,但暂停了commit。等待3-4循环批量处理// 注意,这个connection是proxy,也就是真正的数据库连接,不是会话
connection.setAutoCommit(false);}catch(SQLException e){
e.printStackTrace();}// 把新的连接放入Map
concurrentHashMap.put(ds, connection);}}publicstaticConnectionProxygetConnection(String ds){returnCONNECTION_HOLDER.get().get(ds);}// 3-3:前面1-4调用的方法:publicstaticvoidnotify(Boolean state){try{Map<String,ConnectionProxy> concurrentHashMap =CONNECTION_HOLDER.get();for(ConnectionProxy connectionProxy : concurrentHashMap.values()){// 3-4:循环调用了所有数据库连接的notify方法。有一个false就都rollback了。
connectionProxy.notify(state);}}finally{// 3-5:会话结束,删除数据库连接账本CONNECTION_HOLDER.remove();}}}
4. ConnectionProxy 数据库连接代理
数据库连接
publicclassConnectionProxyimplementsConnection{privateConnection connection;privateString ds;publicConnectionProxy(Connection connection,String ds){this.connection = connection;this.ds = ds;}// 4-1:前面303调用的方法publicvoidnotify(Boolean commit){try{if(commit){
connection.commit();// 状态为true,则提交}else{
connection.rollback();// 状态为false,则提交}
connection.close();}catch(Exception e){
log.error(e.getLocalizedMessage(), e);}}@Overridepublicvoidcommit()throwsSQLException{// connection.commit();}// ....略}
5. AbstractRoutingDataSource 的改造:让动态数据库连接获取connection时登记到会话账本
publicabstractclassAbstractRoutingDataSourceextendsAbstractDataSource{protectedabstractDataSourcedetermineDataSource();@OverridepublicConnectiongetConnection()throwsSQLException{String xid =TransactionContext.getXID();if(StringUtils.isEmpty(xid)){// 非DSTransactionreturndetermineDataSource().getConnection();}else{// DSTransactionString ds =DynamicDataSourceContextHolder.peek();ConnectionProxy connection =ConnectionFactory.getConnection(ds);return connection ==null?getConnectionProxy(ds,determineDataSource().getConnection()): connection;}}@OverridepublicConnectiongetConnection(String username,String password)throwsSQLException{String xid =TransactionContext.getXID();if(StringUtils.isEmpty(xid)){// 非DSTransactionreturndetermineDataSource().getConnection(username, password);}else{// DSTransactionString ds =DynamicDataSourceContextHolder.peek();ConnectionProxy connection =ConnectionFactory.getConnection(ds);return connection ==null?getConnectionProxy(ds,determineDataSource().getConnection(username, password)): connection;}}privateConnectiongetConnectionProxy(String ds,Connection connection){ConnectionProxy connectionProxy =newConnectionProxy(connection, ds);// 调用了3-1ConnectionFactory.putConnection(ds, connectionProxy);return connectionProxy;}// ... 略}
使用注意
1.不可与Transactional混用
2.目前只支持统一提交和回滚,更复杂的请用seata
3.3.4版本之前不加@DS注解会报错
版权归原作者 芝士小王 所有, 如有侵权,请联系我们删除。