Spring Boot: Batch-Inserting Tens of Thousands of Rows with ThreadPoolTaskExecutor
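Batch-inserting data on a single thread is very time-consuming. This article compares a single-threaded and a multithreaded approach: it uses ThreadPoolTaskExecutor to insert 650,000 rows in parallel batches, then compares the elapsed time with the single-threaded version to measure the performance gain.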
YAML file configuration
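```yaml
# Async thread pool configuration
thread:
  pool:
    corePoolSize: 8             # number of core threads
    maxPoolSize: 20             # maximum number of threads
    keepAliveSeconds: 300       # idle time before a thread above the core size is removed
    queueCapacity: 100          # capacity of the task queue
    prefixName: async-service-  # thread name prefix
```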
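The three sizing values interact in a fixed order: the pool first fills its core threads, then queues tasks, and only when the queue is full does it grow toward the maximum; anything beyond that goes to the rejection handler. The sketch below demonstrates this with a plain `java.util.concurrent.ThreadPoolExecutor` scaled down to core 2, queue 2, max 4 (the class and method names are illustrative, not from the article). It uses `AbortPolicy` only so rejections can be counted; the article's pool uses `CallerRunsPolicy` instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    /**
     * Submits {@code tasks} tasks that all block until released, then reports
     * {threads in pool, tasks waiting in queue, tasks rejected}.
     */
    static int[] submitBlocking(int core, int max, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 300, TimeUnit.SECONDS,        // keep-alive mirrors keepAliveSeconds: 300
                new ArrayBlockingQueue<>(queueCapacity),  // bounded queue, like queueCapacity
                new ThreadPoolExecutor.AbortPolicy());    // throw on rejection so it can be counted
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown(); // let every accepted task finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = submitBlocking(2, 4, 2, 7);
        // prints threads=4 queued=2 rejected=1
        System.out.printf("threads=%d queued=%d rejected=%d%n", s[0], s[1], s[2]);
    }
}
```

Tasks 1 and 2 start core threads, tasks 3 and 4 wait in the queue, tasks 5 and 6 push the pool to its maximum, and task 7 is rejected. With the article's values the pool therefore stays at 8 threads until 100 tasks are queued.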
Registering the thread pool bean in the Spring container
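```java
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
// @Component registers this class as a bean so ThreadPoolExecutorConfig can inject it;
// @EnableConfigurationProperties(ThreadPoolConfig.class) on a configuration class also works
@Component
@ConfigurationProperties(prefix = "thread.pool")
public class ThreadPoolConfig {
    /** Number of core threads */
    private Integer corePoolSize;
    /** Maximum number of threads */
    private Integer maxPoolSize;
    /** Idle keep-alive time, in seconds, for threads above the core size */
    private Integer keepAliveSeconds;
    /** Task queue capacity */
    private Integer queueCapacity;
    /** Thread name prefix */
    private String prefixName;
}
```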
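Because the fields are `Integer` while `ThreadPoolTaskExecutor`'s setters such as `setCorePoolSize` take `int`, a missing key under `thread.pool` would surface as a `NullPointerException` during unboxing, and `maxPoolSize < corePoolSize` makes the `ThreadPoolExecutor` constructor throw `IllegalArgumentException` at startup. The following is a minimal, Spring-free sketch of a fail-fast check; `PoolSettings` and `validate` are hypothetical names that mirror `ThreadPoolConfig`. (Spring Boot can also validate `@ConfigurationProperties` classes annotated with `@Validated` using Bean Validation constraints.)

```java
import java.util.ArrayList;
import java.util.List;

class PoolSettingsCheck {

    /** Hypothetical plain-Java mirror of ThreadPoolConfig (no Lombok, no Spring). */
    static final class PoolSettings {
        final Integer corePoolSize;
        final Integer maxPoolSize;
        final Integer keepAliveSeconds;
        final Integer queueCapacity;
        final String prefixName;

        PoolSettings(Integer corePoolSize, Integer maxPoolSize, Integer keepAliveSeconds,
                     Integer queueCapacity, String prefixName) {
            this.corePoolSize = corePoolSize;
            this.maxPoolSize = maxPoolSize;
            this.keepAliveSeconds = keepAliveSeconds;
            this.queueCapacity = queueCapacity;
            this.prefixName = prefixName;
        }
    }

    /** Returns the problems found; an empty list means the values can be applied to the executor. */
    static List<String> validate(PoolSettings s) {
        List<String> problems = new ArrayList<>();
        // The executor setters take int, so a null Integer would fail on unboxing
        if (s.corePoolSize == null || s.maxPoolSize == null
                || s.keepAliveSeconds == null || s.queueCapacity == null) {
            problems.add("a numeric thread.pool property is missing");
            return problems;
        }
        // The same rules ThreadPoolExecutor's constructor enforces with IllegalArgumentException
        if (s.corePoolSize < 0) problems.add("corePoolSize must be >= 0");
        if (s.maxPoolSize <= 0) problems.add("maxPoolSize must be > 0");
        if (s.maxPoolSize < s.corePoolSize) problems.add("maxPoolSize must be >= corePoolSize");
        if (s.keepAliveSeconds < 0) problems.add("keepAliveSeconds must be >= 0");
        return problems;
    }
}
```

With the article's values (8, 20, 300, 100, `async-service-`) the list comes back empty.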
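```java
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolExecutorConfig {

    private final ThreadPoolConfig threadPoolConfig;

    public ThreadPoolExecutorConfig(ThreadPoolConfig threadPoolConfig) {
        this.threadPoolConfig = threadPoolConfig;
    }

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(threadPoolConfig.getCorePoolSize());
        executor.setMaxPoolSize(threadPoolConfig.getMaxPoolSize());
        executor.setQueueCapacity(threadPoolConfig.getQueueCapacity());
        executor.setKeepAliveSeconds(threadPoolConfig.getKeepAliveSeconds());
        executor.setThreadNamePrefix(threadPoolConfig.getPrefixName());
        // Rejection policy: when the pool and the queue are both full, the submitting thread runs the task itself
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the executor
        executor.initialize();
        return executor;
    }
}
```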
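`CallerRunsPolicy` does not drop a rejected batch: it runs it synchronously on the thread that tried to submit it, which in the multithreaded test is the test thread itself. That slows the producer down (back-pressure) instead of losing data. The sketch below shows the effect with a one-thread pool and a one-slot queue; `CallerRunsDemo` is an illustrative name, not from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {

    /** Returns the name of the thread that executed the overflow (third) task. */
    static String threadOfOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        String[] ranOn = new String[1];
        // Task 1 occupies the only worker thread until released
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Task 2 fills the single queue slot
        pool.execute(() -> { });
        // Task 3: pool and queue are full, so CallerRunsPolicy runs it here, synchronously
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        release.countDown();
        pool.shutdown();
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("ran on: " + threadOfOverflowTask()); // prints "ran on: main"
    }
}
```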
Creating the async service class
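```java
import java.util.List;
import java.util.concurrent.CountDownLatch;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync(List<StandardStation> list,
                             StandardStationService standardStationService,
                             CountDownLatch countDownLatch) {
        try {
            log.info("start executeAsync");
            // Work done on the async thread: insert this batch
            standardStationService.saveBatch(list);
            log.info("end executeAsync");
        } finally {
            // countDown must run even if the code above throws; otherwise await() never returns
            countDownLatch.countDown();
        }
    }
}
```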
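The `finally` block is what keeps the waiting test from hanging: if `saveBatch` throws and `countDown()` is skipped, the latch never reaches zero. The plain-Java sketch below (illustrative names, with a simulated failure in place of a real insert) makes every third task throw and checks that the latch still reaches zero. It also uses the timed `await(long, TimeUnit)` overload, which returns `false` instead of blocking forever; that is an optional safety net, not something the article's code does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchInFinallyDemo {

    /** Runs {@code tasks} jobs where every third one fails; returns true if the latch reached zero in time. */
    static boolean allCountedDown(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            final int batchId = i;
            // submit() stores the exception in the returned Future, so no stack traces are printed
            pool.submit(() -> {
                try {
                    if (batchId % 3 == 0) {
                        throw new IllegalStateException("simulated saveBatch failure, batch " + batchId);
                    }
                    // a successful insert would happen here
                } finally {
                    latch.countDown(); // runs on success and on failure
                }
            });
        }
        try {
            // bounded wait: false means some task never counted down
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }
}
```

Moving `latch.countDown()` out of the `finally` and to the end of the `try` body would make the method return `false` after the 5-second timeout.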
Single-threaded batch insert method
```java
/**
 * Inserts 650,000 rows on a single thread.
 */
@Test
public void testSingleThread() {
    // Convert the 650,000 source records (`list`, defined elsewhere in the test class) into entities
    List<StandardStation> standardStationList = list.stream().map(info -> {
        StandardStation standardStation = new StandardStation();
        BeanUtils.copyProperties(info, standardStation);
        return standardStation;
    }).collect(Collectors.toList());
    // Single thread: insert 100 rows per batch
    List<List<StandardStation>> lists = Lists.partition(standardStationList, 100);
    long startTime = System.currentTimeMillis();
    lists.forEach(listSub -> standardStationService.saveBatch(listSub));
    long endTime = System.currentTimeMillis();
    log.info("Total time: {} s", (endTime - startTime) / 1000);
}
```
Result: [screenshot of the test output]
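Both tests split the entity list with Guava's `Lists.partition(list, 100)`, so 650,000 rows become 650,000 / 100 = 6,500 batches, each written by one `saveBatch` call. As a dependency-free illustration, here is a minimal stand-in built on `List.subList`; the `partition` helper is hypothetical. Guava's version returns a lazy list of views, while this sketch builds the list of views eagerly, but the chunks are the same.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class PartitionDemo {

    /** Consecutive sublist views of {@code size} elements; the last chunk may be shorter. */
    static <T> List<List<T>> partition(List<T> source, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be > 0");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < source.size(); from += size) {
            chunks.add(source.subList(from, Math.min(from + size, source.size())));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> rows = Collections.nCopies(650_000, 0);
        System.out.println(partition(rows, 100).size()); // prints 6500
    }
}
```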
Multithreaded batch insert method
```java
/**
 * Inserts 650,000 rows using multiple threads.
 */
@Test
public void testMultiThread() {
    // Convert the 650,000 source records (`list`, defined elsewhere in the test class) into entities
    List<StandardStation> standardStationList = list.stream().map(info -> {
        StandardStation standardStation = new StandardStation();
        BeanUtils.copyProperties(info, standardStation);
        return standardStation;
    }).collect(Collectors.toList());
    // Submit one async task per 100-row batch to the thread pool
    List<List<StandardStation>> lists = Lists.partition(standardStationList, 100);
    CountDownLatch countDownLatch = new CountDownLatch(lists.size());
    long startTime = System.currentTimeMillis();
    lists.forEach(listSub -> asyncService.executeAsync(listSub, standardStationService, countDownLatch));
    try {
        // Wait until every batch has finished before stopping the clock
        countDownLatch.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        log.error("Interrupted while waiting for the batches to finish", e);
    }
    long endTime = System.currentTimeMillis();
    log.info("Total time: {} s", (endTime - startTime) / 1000);
}
```
Result: [screenshot of the test output]
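The comparison can be reproduced without Spring or a database. In this sketch a hypothetical `fakeSaveBatch` sleeps for a fixed time to stand in for one `saveBatch` round trip, `runSequential` mirrors `testSingleThread`, and `runParallel` mirrors `testMultiThread` (one task per batch, latch released in `finally`). Sleeping threads parallelize perfectly, so the sketch shows the best case; a real database will not scale this cleanly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ParallelBatchDemo {

    /** Hypothetical stand-in for saveBatch: sleeps to mimic one database round trip. */
    static void fakeSaveBatch(List<Integer> batch, AtomicInteger saved, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        saved.addAndGet(batch.size());
    }

    /** Builds {@code count} batches of {@code size} dummy rows. */
    static List<List<Integer>> batches(int count, int size) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(Collections.nCopies(size, 0));
        }
        return out;
    }

    /** Sequential baseline, like testSingleThread; returns elapsed milliseconds. */
    static long runSequential(List<List<Integer>> batches, AtomicInteger saved, long latencyMs) {
        long start = System.nanoTime();
        for (List<Integer> b : batches) {
            fakeSaveBatch(b, saved, latencyMs);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** One task per batch, like testMultiThread; returns elapsed milliseconds. */
    static long runParallel(List<List<Integer>> batches, AtomicInteger saved, long latencyMs, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch latch = new CountDownLatch(batches.size());
        long start = System.nanoTime();
        for (List<Integer> b : batches) {
            pool.execute(() -> {
                try {
                    fakeSaveBatch(b, saved, latencyMs);
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        List<List<Integer>> bs = batches(20, 100);
        long seq = runSequential(bs, new AtomicInteger(), 30);
        long par = runParallel(bs, new AtomicInteger(), 30, 10);
        System.out.printf("sequential=%d ms, parallel=%d ms%n", seq, par);
    }
}
```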
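The results above show that multithreading greatly reduces the time needed to batch-insert a large volume of data. Instead of writing the 100-row batches one after another, the pool's worker threads write several batches at the same time. How large the gain is depends on how many concurrent inserts the database and its connection pool can actually serve.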
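A rough back-of-envelope estimate makes that bound explicit. Assume every 100-row `saveBatch` takes about the same time $t_b$, and let $k$ be the effective parallelism: the number of batches that can really be in flight at once. That is at most the 20 pool threads plus the calling thread (which runs batches itself under `CallerRunsPolicy`), and no more than the number of available database connections. The linear-scaling assumption is idealized; locks, disk and network contention make real gains smaller.

$$
\begin{aligned}
N_b &= \frac{650\,000}{100} = 6\,500 \ \text{batches} \\
T_{\text{single}} &\approx N_b \, t_b = 6\,500 \, t_b \\
T_{\text{multi}} &\approx \left\lceil \frac{N_b}{k} \right\rceil t_b, \qquad k = \min(\text{concurrent threads},\ \text{DB connections}) \le 21
\end{aligned}
$$

For example, if the application uses Spring Boot's default HikariCP pool, whose default `maximumPoolSize` is 10, then $k \le 10$ and $T_{\text{multi}} \approx \lceil 6\,500 / 10 \rceil \, t_b = 650 \, t_b$, an ideal speed-up of about 10x regardless of the 20 configured threads.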
Reprinted from: https://blog.csdn.net/m0_52781902/article/details/128673369
Copyright belongs to the original author, lili要努力. In case of infringement, please contact us and it will be removed.