SpringBoot高效批量插入百万数据
前言:我相信很多小伙伴和我一样在初学的时候,面对几万几十万数据插入,不知道如何下手,面对百万级别数据时更是不知所措,我们大部分初学者,很多人都喜欢for循环插入数据,或者是开启多个线程,然后分批使用for循环插入,当我们需要将大量数据存储到数据库中时,传统的逐条插入方式显然效率低下,并且容易导致性能瓶颈。而批量插入是一种更加高效的方式,可以大幅提高数据的插入速度,特别是在数据量较大的情况下。本文将介绍如何使用 Spring Boot 实现高效批量插入百万数据,以解决传统逐条插入方式存在的性能问题。我们将使用不同的插入方式来比较。
1.抛出问题
传统的单条插入存在什么问题:
- 性能问题:如果循环插入的数据量比较大,每次插入都需要与数据库建立连接、执行插入操作,这将导致频繁的网络通信和数据库操作,性能会受到影响。可以考虑批量插入数据来提高性能,例如使用 JDBC 的批处理功能或使用框架提供的批处理方法。
- 事务问题:默认情况下,Spring Boot 的事务管理是基于注解的,每次循环插入数据都会开启一个新的事务,这可能导致事务管理的开销过大。可以考虑将整个循环插入数据放在一个事务中,或者使用编程式事务管理来控制事务的粒度。
- 数据库连接问题:在循环过程中,如果每次都重新获取数据库连接,可能会导致连接资源的浪费和性能下降。可以考虑使用连接池技术来管理数据库连接,确保连接的复用和高效利用。
- 异常处理问题:在循环插入数据时,可能会出现插入失败、异常等情况。需要适当处理异常,例如记录错误日志、回滚事务等,以确保数据的完整性和一致性。
2.前期准备工作
框架:springboot+mybatis plus +mysql
- 准备工作 创建一个简单的springboot项目,pom依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.15</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
- 创建测试需要使用的表
CREATETABLE`student`(`id`intNOTNULLAUTO_INCREMENT,`name`varchar(255)DEFAULTNULL,`age`intDEFAULTNULL,`addr`varchar(255)DEFAULTNULL,`addr_Num`varchar(255)CHARACTERSET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULTNULL,PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=8497107DEFAULTCHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
- 修改application.yml配置
server:port:8090spring:datasource:url: jdbc:mysql://localhost:3306/boot_study?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=trueusername: root
password:123456driver-class-name: com.mysql.cj.jdbc.Driver
- 创建实体类StudentDO 注意:在实际业务中,我们应该明确去定义controller service 层的数据模型,数据传输XxxDTO、数据表实体映射XxxDO、返回给前台数据实体XxxVO,这些模型数据都需要根据实际情况在Service实现类和Controller层转换 这里为了演示方便就不按规范定义
@TableName(value ="student")@DatapublicclassStudentDO{/** 主键 type:自增 */@TableId(type =IdType.AUTO)privateInteger id;/** 名字 */privateString name;/** 年龄 */privateInteger age;/** 地址 */privateString addr;/** 地址号 @TableField:与表字段映射 */@TableField(value ="addr_num")privateString addrNum;publicStudentDO(String name,int age,String addr,String addrNum){this.name = name;this.age = age;this.addr = addr;this.addrNum = addrNum;}}
- controller定义
@RestController@RequestMapping("/student")publicclassStudentController{@AutowiredprivateStudentMapper studentMapper;@AutowiredprivateStudentService studentService;@AutowiredprivateSqlSessionFactory sqlSessionFactory;@AutowiredprivateThreadPoolTaskExecutor taskExecutor;@AutowiredprivatePlatformTransactionManager transactionManager;}
- service和impl定义
publicinterfaceStudentServiceextendsIService<StudentDO>{}//实现定义@ServicepublicclassStudentServiceImplextendsServiceImpl<StudentMapper,StudentDO>implementsStudentService{}
- Mapper定义
publicinterfaceStudentMapperextendsBaseMapper<StudentDO>{@Insert("<script>"+"insert into student (name, age, addr, addr_num) values "+"<foreach collection='studentDOList' item='item' separator=','> "+"(#{item.name}, #{item.age},#{item.addr}, #{item.addrNum}) "+"</foreach> "+"</script>")publicintinsertSplice(@Param("studentDOList")List<StudentDO> studentDOList);}
3.测试示例演示
模拟100万条数据不同方式插入
- for循环单条插入(不建议) 这里100万数据大概要20分钟以上,所以以10万条数据类推 10万条数据总耗时348.864秒
@GetMapping("/for")publicvoid insertForData (){long start =System.currentTimeMillis();for(int i =0; i <1000000; i++){StudentDOStudentDO=newStudentDO("张三"+i, i,"地址"+i, i+"号");
studentMapper.insert(StudentDO);}long end =System.currentTimeMillis();System.out.println("插入数据耗费时间:"+(end-start));}
结果:实际上不知道等了多久很慢很慢,总体时间差不多半个多小时,因为这里的for循环进行单条插入时,每次都是在获取连接(Connection)、释放连接和资源关闭等操作上,(如果数据量大的情况下)极其消耗资源,导致时间长。
2. xml拼接foreach sql插入(大量数据不建议)
10万条数据插入数据耗费时间:3.554秒
@GetMapping("/sql")publicvoid insertSqlData (){long start =System.currentTimeMillis();ArrayList<StudentDO> arrayList =newArrayList<>();for(int i =0; i <100000; i++){StudentDOStudentDO=newStudentDO("张三"+i, i,"地址"+i, i+"号");
arrayList.add(StudentDO);}
studentMapper.insertSplice(arrayList);long end =System.currentTimeMillis();System.out.println("插入数据耗费时间:"+(end-start));}
结果:我们在Mapper里面是要insert注解拼接,拼接结果就是将所有的数据集成在一条SQL语句的value值上,其由于提交到服务器上的insert语句少了,相就不需要每次获取连接(Connection)、释放连接和资源关闭,网络负载少了,插入的性能有了提高。但是在数据量大的情况下可能会出现内存溢出、解析SQL语句耗时等情况。
3. mybatis-plus 批量插入saveBatch(推荐)
10万条数据插入数据耗费时间:2.481秒,在数据量不大的情况下和上面差不多
50万条数据插入数据耗费时间:12.473秒
@GetMapping("/batch")publicvoid insertSaveBatchData (){long start =System.currentTimeMillis();ArrayList<StudentDO> arrayList =newArrayList<>();for(int i =0; i <100000; i++){StudentDOStudentDO=newStudentDO("张三"+i, i,"地址"+i, i+"号");
arrayList.add(StudentDO);}
studentService.saveBatch(arrayList);long end =System.currentTimeMillis();System.out.println("插入数据耗费时间:"+(end-start));}
结果:使用MyBatis-Plus实现IService接口中批处理saveBatch()方法,可以很明显的看到性能有了提升,我们可以查看一下源码,它的底层实现原理利用分片处理(batchSize = 1000) + 分批提交事务的操作,来提高插入性能,并没有在连接上消耗性能,MySQLJDBC驱动默认情况下忽略saveBatch()方法中的executeBatch()语句,将需要批量处理的一组SQL语句进行拆散,执行时一条一条给MySQL数据库,造成实际上是分片插入,即与单条插入方式相比,有提高,但是性能未能得到实质性的提高。
- 手动开启批处理模式+批量插入手动提交(推荐) 10万条数据插入数据耗费时间:2.481秒, 50万条数据插入数据耗费时间:13.436秒 和上面相比不管是大数据量还是小数据量两者都是差不多
@GetMapping("/forSaveBatch")publicvoid insertforSaveBatchData (){//创建批量插入SqlSessionSqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false);StudentMapper studentMapper = sqlSession.getMapper(StudentMapper.class);long start =System.currentTimeMillis();ArrayList<StudentDO> arrayList =newArrayList<>();for(int i =0; i <500000; i++){StudentDOStudentDO=newStudentDO("张三"+i, i,"地址"+i, i+"号");
studentMapper.insert(StudentDO);}
sqlSession.commit();
sqlSession.close();long end =System.currentTimeMillis();System.out.println("插入数据耗费时间:"+(end-start));}
结果:手动开启批处理,手动处理关闭自动提交事务,共用同一个SqlSession之后,for循环单条插入的性能得到实质性的提高;由于同一个SqlSession省去对资源相关操作的耗能、减少对事务处理的时间等,从而极大程度上提高执行效率。
5. ThreadPoolTaskExecutor分割多线程插入(大数据量强烈推荐)
50万条数据插入数据耗费时间:3。536秒,插入速度直接是前面的4倍,是不是很疑惑这样就快了这么多?
原理:多线程批量插入的过程,首先定义了一个线程池(ThreadPoolTaskExecutor),用于管理线程的生命周期和执行任务。然后,我们将要插入的数据列表按照指定的批次大小分割成多个子列表,并开启多个线程来执行插入操作,通过 TransactionManager 获取事务管理器,并使用 TransactionDefinition 定义事务属性。然后,在每个线程中,我们通过 transactionManager.getTransaction() 方法获取事务状态,并在插入操作中使用该状态来管理事务。
在插入操作完成后,我们再根据操作结果调用transactionManager.commit()或 transactionManager.rollback() 方法来提交或回滚事务。在每个线程执行完毕后,都会调用 CountDownLatch 的 countDown() 方法,以便主线程等待所有线程都执行完毕后再返回。
@GetMapping("/threadPoolInsert")publicvoid insertThreadPoolBatchData (){ArrayList<StudentDO> arrayList =newArrayList<>();for(int i =0; i <500000; i++){StudentDOStudentDO=newStudentDO("张三"+i, i,"武汉"+i, i+"号");
arrayList.add(StudentDO);}int count = arrayList.size();int pageSize =10000;int threadNum = count % pageSize ==0? count / pageSize: count / pageSize +1;CountDownLatch downLatch =newCountDownLatch(threadNum);long start =System.currentTimeMillis();for(int i =0; i < threadNum; i++){//开始序号int startIndex = i * pageSize;//结束序号int endIndex =Math.min(count,(i+1)*pageSize);//分割listList<StudentDO>StudentDOs= arrayList.subList(startIndex, endIndex);
taskExecutor.execute(()->{DefaultTransactionDefinition definition =newDefaultTransactionDefinition();TransactionStatus status = transactionManager.getTransaction(definition);try{
studentMapper.insertSplice(StudentDOs);
transactionManager.commit(status);}catch(Exception e){
transactionManager.rollback(status);
e.printStackTrace();}finally{//执行完后 计数
downLatch.countDown();}});}try{//等待
downLatch.await();}catch(InterruptedException e){thrownewRuntimeException(e);}long end =System.currentTimeMillis();System.out.println("插入数据耗费时间:"+(end-start));}
- ThreadPoolTaskExecutor分割异步插入 除了上面多线程分割插入,我们也可以使用多线程异步插入其实和上面插入的原理是差不多,下面演示异步插入 - 修改application.yml增加配置 这个参数根据自己的电脑配置合理设置
async:executor:thread:core_pool_size:35max_pool_size:35queue_capacity:99999name:prefix: async-testDB-
- 自定义ThreadPoolTaskExecutor配置
@EnableAsync@ConfigurationpublicclassExecutorConfig{@Value("${async.executor.thread.core_pool_size}")privateint corePoolSize;@Value("${async.executor.thread.max_pool_size}")privateint maxPoolSize;@Value("${async.executor.thread.queue_capacity}")privateint queueCapacity;@Value("${async.executor.thread.name.prefix}")privateString namePrefix;@Bean(name ="asyncServiceExecutor")publicExecutorasyncServiceExecutor(){ThreadPoolTaskExecutor taskExecutor =newThreadPoolTaskExecutor();//设置核心线程数
taskExecutor.setCorePoolSize(corePoolSize);//设置最大线程数
taskExecutor.setMaxPoolSize(maxPoolSize);//设置队列容量
taskExecutor.setQueueCapacity(queueCapacity);//设置线程名前缀
taskExecutor.setThreadNamePrefix(namePrefix);//设置拒绝策略// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
taskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());return taskExecutor;}}
- 定义异步service和实现类
//接口服务publicinterfaceAsyncService{voidexecuteAsync(List<StudentDO> studentDOList,CountDownLatch countDownLatch);}//实现类@ServicepublicclassAsyncServiceImplextendsServiceImplimplementsAsyncService{@AutowiredprivateStudentService studentService;@Async("asyncServiceExecutor")@OverridepublicvoidexecuteAsync(List<StudentDO> studentDOList,CountDownLatch countDownLatch){try{//异步线程要做的事情
studentService.saveBatch(studentDOList);}finally{
countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放}}}
- control代码 这里需要注意修改, 因为我们在ExecutorConfig配置类里面重新设置了ThreadPoolTaskExecutor @Autowired private ThreadPoolTaskExecutor taskExecutor; 改为 @Autowired private Executor taskExecutor;
@GetMapping("/asyncInsertData")publicvoidasyncInsertData(){List<StudentDO> studentDOList =getTestData();//测试每100条数据插入开一个线程long start =System.currentTimeMillis();List<List<StudentDO>> lists =ListUtil.split(studentDOList,10000);CountDownLatch countDownLatch =newCountDownLatch(lists.size());for(List<StudentDO> listSub:lists){
asyncService.executeAsync(listSub,countDownLatch);}try{
countDownLatch.await();//保证之前的所有的线程都执行完成,才会走下面的;// 这样就可以在下面拿到所有线程执行完的集合结果}catch(Exception e){
e.printStackTrace();}long end =System.currentTimeMillis();System.out.println("插入数据耗费时间:"+(end-start));}publicList<StudentDO>getTestData(){ArrayList<StudentDO> arrayList =newArrayList<>();for(int i =0; i <500000; i++){StudentDO studentDO =newStudentDO("张三"+i, i,"武汉"+i, i+"号");
arrayList.add(studentDO);}return arrayList;}
50万条数据插入数据耗费时间:2.604秒,这里插入和上面差不多因为他们使用的都是多线程插入
总结:经过上面的示例演示我们心里已经有谱了,知道什么时候该使用哪一种数据插入方式,针对对不同线程数的测试,发现不是线程数越多越好,具体多少合适,通常的算法:CPU核心数量*2 +2 个线程。
实际要根据每个人的电脑配置情况设置合适的线程数,可以根据下面这个公式获取:
int processNum =Runtime.getRuntime().availableProcessors();// 返回可用处理器的Java虚拟机的数量int corePoolSize =(int)(processNum /(1-0.2));int maxPoolSize =(int)(processNum /(1-0.5));
版权归原作者 seeyouagain_ 所有, 如有侵权,请联系我们删除。