0


SpringBoot 利用 ThreadPoolTaskExecutor 批量插入数十万条数据

SpringBoot 利用 ThreadPoolTaskExecutor 批量插入万条数据

在批处理插入数据时,如果在单线程环境下是非常耗时的,本篇文章将采用单线程和多线程进行对比,利用

ThreadPoolTaskExecutor

进行多线程批处理插入65w数据,然后和单线程进行对比,最终得到性能优化。

image-20230112191339858

image-20230113135223468

yml 文件配置

# 异步线程池配置thread:pool:corePoolSize:8# 核心线程数maxPoolSize:20# 设置最大线程数keepAliveSeconds:300# 设置线程活跃时间queueCapacity:100# 设置队列容量prefixName: async-service-# 线程名称前缀

spring 容器注入线程池 bean 对象

@Data@ConfigurationProperties(prefix ="thread.pool")publicclassThreadPoolConfig{/**
     * 核心线程数
     */privateInteger corePoolSize;/**
     * 设置最大线程数
     */privateInteger maxPoolSize;/**
     * 设置线程活跃时间
     */privateInteger keepAliveSeconds;/**
     * 设置队列容量
     */privateInteger queueCapacity;/**
     * 线程名称前缀
     */privateString prefixName;}
@Configuration@EnableAsync@Slf4jpublicclassThreadPoolExecutorConfig{privateThreadPoolConfig threadPoolConfig;publicThreadPoolExecutorConfig(ThreadPoolConfig threadPoolConfig){this.threadPoolConfig = threadPoolConfig;}@Bean(name ="asyncServiceExecutor")publicExecutorasyncServiceExecutor(){
        log.info("start asyncServiceExecutor");ThreadPoolTaskExecutor executor =newThreadPoolTaskExecutor();
        executor.setCorePoolSize(threadPoolConfig.getCorePoolSize());
        executor.setMaxPoolSize(threadPoolConfig.getMaxPoolSize());
        executor.setQueueCapacity(threadPoolConfig.getQueueCapacity());
        executor.setKeepAliveSeconds(threadPoolConfig.getKeepAliveSeconds());
        executor.setThreadNamePrefix(threadPoolConfig.getPrefixName());// 拒绝策略
        executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());// 初始化
        executor.initialize();return executor;}}

创建异步线程业务类

@Service@Slf4jpublicclassAsyncServiceImplimplementsAsyncService{@Override@Async("asyncServiceExecutor")publicvoidexecuteAsync(List<StandardStation> list,StandardStationService standardStationService,CountDownLatch countDownLatch){try{
            log.info("start executeAsync");// 异步线程需要做的事情
            standardStationService.saveBatch(list);
            log.info("end executeAsync");}finally{// 无论上面程序是否异常必须执行 countDown,否则 await 无法释放
            countDownLatch.countDown();}}}

创建单线程批量插入具体业务方法

/**
 * 单线程插入 650000 条数据
 */@TestpublicvoidtestSingleThread(){// 10000 条数据List<StandardStation> standardStationList = list.stream().map(info ->{StandardStation standardStation =newStandardStation();BeanUtils.copyProperties(info, standardStation);return standardStation;}).collect(Collectors.toList());// 单线程 每 100 条数据插入一次List<List<StandardStation>> lists =Lists.partition(standardStationList,100);long startTime =System.currentTimeMillis();
    lists.forEach(listSub -> standardStationService.saveBatch(listSub));long endTime =System.currentTimeMillis();
    log.info("共耗时:{} 秒",(endTime - startTime)/1000);}

结果:

image-20230113135734803

创建多线程批量插入具体业务方法

/**
 * 多线程插入 650000 条数据
 */@TestpublicvoidtestMultiThread(){// 10000 条数据List<StandardStation> standardStationList = list.stream().map(info ->{StandardStation standardStation =newStandardStation();BeanUtils.copyProperties(info, standardStation);return standardStation;}).collect(Collectors.toList());// 每 100 条数据插入开一个线程List<List<StandardStation>> lists =Lists.partition(standardStationList,100);CountDownLatch countDownLatch =newCountDownLatch(lists.size());long startTime =System.currentTimeMillis();
    lists.forEach(listSub -> asyncService.executeAsync(listSub, standardStationService, countDownLatch));try{// 保证之前的所有的线程都执行完成,才会走下面的
        countDownLatch.await();}catch(InterruptedException e){
        log.error("阻塞异常:"+ e.getMessage());}long endTime =System.currentTimeMillis();
    log.info("共耗时:{} 秒",(endTime - startTime)/1000);}

结果:

image-20230113135830051

从上述的结果可以看出,使用多线程后,批处理插入大量数据的耗时大大减少,由此可见多线程的好处。

标签: spring boot java spring

本文转载自: https://blog.csdn.net/m0_52781902/article/details/128673369
版权归原作者 lili要努力 所有, 如有侵权,请联系我们删除。

“SpringBoot 利用 ThreadPoolTaskExecutor 批量插入数十万条数据”的评论:

还没有评论