📑需求:多线程插入,(保证原子性,要么都成功,要么都失败)其中一个线程报错,所有线程回滚
为什么做不了?
首先事务的四大特性(ACID)特性都知道吧,面试中张口就来!!!
- 原子性(Atomicity):一个事务要么全部提交成功,要么全部失败回滚,不能只执行其中的一部分操作
- 一致性(Consistency):事务的执行不能破坏数据库数据的完整性和一致性,一个事务在执行之前和执行之后,数据库都必须处于一致性状态。如果数据库系统在运行过程中发生故障,有些事务尚未完成就被迫中断,这些未完成的事务对数据库所作的修改有一部分已写入物理数据库,这是数据库就处于一种不正确的状态,也就是不一致的状态
- 隔离性(Isolation):事务的隔离性是指在并发环境中,并发的事务时相互隔离的,一个事务的执行不能不被其他事务干扰。不同的事务并发操作相同的数据时,每个事务都有各自完成的数据空间,即一个事务内部的操作及使用的数据对其他并发事务时隔离的,并发执行的各个事务之间不能相互干扰。
- 持久性(Durability):一旦事务提交,那么它对数据库中的对应数据的状态的变更就会永久保存到数据库中。即使发生系统崩溃或机器宕机等故障,只要数据库能够重新启动,那么一定能够将其恢复到事务成功结束的状态
有了这四个特性,在回头看多线程事务的概念,隔离性(划重点,要考)
还是没看懂?没关系给你举个荔枝🍒,线程A、B、C三个线程做操作,B出现问题,希望A、C一起回滚,这就是想要的多线程,但是A、B、C三个线程是各跑各的,B出错要带着A、C一起回滚,这就造成了各个事务之前的互相干扰,破坏了隔离性吗!事务的特性里面卡的死死的,所以多线程理论上是行不通的。
说到隔离性。那么,Spring 的源码里面,对于事务的隔离性是如何保证的呢? 答案就是 ThreadLocal。
在事务开启的时候,把当前的链接保存在了 ThreadLocal 里面,从而保证了多线程之间的隔离性,就是每个线程里面都自自玩自己的,我们不可能打破 ThreadLocal 的使用规则,让各个线程共享同一个 ThreadLocal 吧?所以,无论从理论上,还是代码实现上,我都认为这个需求是不能实现的。
但是我在冲浪的时候看了很多博客是用编程式事务,例如100w的数据,开10个线程有一个线程出现异常,需要全部回滚,这边随便找了一个贴出来看看()
@Component
public class SelfTransactionManager {
private TransactionStatus transactionStatus;
//获取事务源
@Autowired
private PlatformTransactionManager platformTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
/**
* 手动开启事务
*/
public TransactionStatus begin() {
transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
return transactionStatus;
}
/**
* 提交事务
*/
public void commit(TransactionStatus transactionStatus) {
platformTransactionManager.commit(transactionStatus);
}
/**
* 回滚事务
*/
public void rollBack() {
platformTransactionManager.rollback(transactionStatus);
}
}
@Service
@AllArgsConstructor
public class BookingInfoServiceImpl implements IBookingInfoService {
private BookingInfoMapper bookingInfoMapper;
private SelfTransactionManager selfTransactionManager;
private TaskExecutor taskExecutor;//自定义的线程池
@Override
@Transactional(rollbackFor=Exception.class)
public boolean test() {
// final InheritableThreadLocal<Boolean> IS_OK = new InheritableThreadLocal<>();
// IS_OK.set(Boolean.TRUE);
AtomicBoolean IS_OK = new AtomicBoolean(true);
//子线程计数器
CountDownLatch childMonitor = new CountDownLatch(1);
//子线程结果集
CopyOnWriteArrayList<Boolean> childResponse = new CopyOnWriteArrayList<>();
//主线程计数器
CountDownLatch mainMonitor = new CountDownLatch(1);
/**
* ================================子线程1=======================================
*/
CompletableFuture.runAsync(() -> {
TransactionStatus transactionStatus = selfTransactionManager.begin();
boolean result = false;
try {
//当前子线程开始插入数据
//---------业务代码----------
BookingInfo bookingInfo = new BookingInfo();
bookingInfo.setCreateTime(DateUtil.date());
bookingInfo.setPhoneNumber("13215592666");
bookingInfo.setNickName("ceshi0");
result = bookingInfoMapper.insertBookingInfo(bookingInfo) < 1 ? false : true;
//---------业务代码----------
//插入结果 true or false
childResponse.add(result);
//执行到这里,等待主线程执行完,因为当前子线程要拿到 IS_OK 的结果,IS_OK的结果由主线程决策
mainMonitor.await();
//根据主线程决策的IS_OK,判断是回滚还是提交
if (IS_OK.get()) {
//提交
selfTransactionManager.commit(transactionStatus);
}else {
//回滚
selfTransactionManager.rollBack();
}
} catch (Exception e) {
e.printStackTrace();
//因为会发生异常 而主线程一直在 while (true) 循环 childResponse的size,这里一定要保证进行add,否则可能会出现死循环
childResponse.add(result);
//异常回滚
selfTransactionManager.rollBack();
} finally {
//子线程执行完计数器-1
childMonitor.countDown();
}
}, taskExecutor);
/**
* ================================主线程========================================
*/
try {
//---------业务代码----------
BookingInfo bookingInfo2 = new BookingInfo();
bookingInfo2.setPhoneNumber("13215592000");
bookingInfo2.setNickName("ceshi2");
bookingInfo2.setCreateTime(new Date());
boolean result = bookingInfoMapper.insertBookingInfo(bookingInfo2) < 1 ? false : true;
//---------业务代码----------
//主线程插入不成功,不成功让 IS_OK = false;
if (!result) {
throw new RuntimeException();
}
//循环获取子线程结果
while (true) {
if (childResponse.size() == 1) {
break;
}
}
System.out.println(childResponse.size());
//子线程中存在不成功让 IS_OK = false;
if (childResponse.contains(Boolean.FALSE)) {
throw new RuntimeException();
}
} catch (Exception e) {
e.printStackTrace();
//IS_OK = false; 等待主线程走完子线程就能得知 IS_OK 为false
IS_OK.set(Boolean.FALSE);
//这里抛出异常 是为了当前主线程事务自动进行回滚
throw e;
} finally {
//到这个方法执行完就意味着mainMonitor中主线程走完了,接着子线程会开始执行
mainMonitor.countDown();
}
return true;
}
}
实现思路:
首先设置一个全局变量Boolean,默认是可以提交的true,在子线程,通过编程式事务开启事务,然后插入数据,一条线程负责10w,但是不提交,同时通知主线程,我准备好了,进入等待状态。
如果子线程出现异常,那就通知主线程,我这边发生异常,然后自己回滚,最后主线程收集10个子线程的状态,如果有一个线程出现问题,那么全局变量就设置为不可提交false,然后唤醒所有子线程,进行回滚。
所有子线程都正常的情况:
线程1 执行插入业务...
线程2 执行插入业务...
线程3 执行插入业务...
线程n 执行插入业务...
线程1 准备就绪,判断其它线程状态可否提交...
线程2 准备就绪,判断其它线程状态可否提交...
线程3 准备就绪,判断其它线程状态可否提交...
线程n 准备就绪,判断其它线程状态可否提交...
线程1 事务提交..
线程2 事务提交...
线程3 事务提交...
线程n 事务提交...
有子线程出现异常的情况:
线程1 执行插入业务...
线程2 执行插入业务...
线程3 执行插入业务...
线程n 执行插入业务...
线程1 准备就绪,判断其它线程状态可否提交...
线程2 出现异常,开始事务回滚...
线程3 准备就绪,判断其它线程状态可否提交...
线程n 准备就绪,判断其它线程状态可否提交...
main线程:有线程执行失败,全局变量设为false
线程1 事务回滚..
线程2 事务回滚...
线程3 事务回滚...
线程n 事务回滚..
看结果符合了预期!!!
但是...
如果在回头细品代码的时候发现,只是成功了一半出现了一个类似于两阶段提交(2PC)的一致性协议,这个实现方式实际上就是编程式事务配合二阶段提交(2PC)使用。破绽就出在 2PC 上。
二阶段提交的问题:
协调者说可以提交了,但是参与者挂了...
举例:10个线程提交肯定有先后顺序,前面6个提交了,已经写入DB,后面4个还没提交,那不就丢了4个
虽然,从某种角度上,绕开了事务的隔离性,但是有一定概率出现数据一致性问题,虽然概率比较小,所以称这种方案叫:基于运气编程,用运气换时间。
这是小编在开发学习使用和总结的小Demo, 这中间或许也存在着不足,希望可以得到大家的理解和建议。如有侵权联系小编!
版权归原作者 小影~ 所有, 如有侵权,请联系我们删除。