0


mybatisPlus批量插入性能优化

背景:物联网平台背景,传感器采集频率干到了1000Hz,分了100多张表出来,还是把mysql干炸了。当前单表数据量在1000来w,从kafka上拉数据异步批量插入,每次插入数据量1500条,测试的时候还没问题,结果上线没多久,kafka服务器直接挂了,赶忙看日志,kafka服务器堆积了几十G的数据,再去看生产环境日志,发现到最后单次批量插入用时固定在10多秒,甚至20多秒,kafka直接把消费端踢出了消费组…从而kafka消息一直没有消费,总重导致kafka数据堆积挂掉了…

在这样的情况下:采取的处理方案无非就分库分表,减少单表数据量,降低数据库压力;提高批量插入效率,提高消费者消费速度。
本文主要把精力放在如何提高批量插入效率上。

使用的mybatisplus的批量插入方法:saveBatch(),之前就看到过网上都在说在jdbc的url路径上加上
rewriteBatchedStatements=true 参数mysql底层才能开启真正的批量插入模式。

保证5.1.13以上版本的驱动,才能实现高性能的批量插入。 MySQL JDBC驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,批量插入实际上是单条插入,直接造成较低的性能。只有把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。 另外这个选项对INSERT/UPDATE/DELETE都有效。

可是我之前已经添加了,而且数据表目前是没有建立索引的,即使是在1000来w的数据量下进行1500条的批量插入也不可能消耗20来秒吧,于是矛盾转移到saveBatch方法,使用版本:V3.4.3.4
查看源码:

publicbooleansaveBatch(Collection<T> entityList,int batchSize){String sqlStatement =this.getSqlStatement(SqlMethod.INSERT_ONE);returnthis.executeBatch(entityList, batchSize,(sqlSession, entity)->{
            sqlSession.insert(sqlStatement, entity);});}
protected<E>booleanexecuteBatch(Collection<E> list,int batchSize,BiConsumer<SqlSession,E> consumer){returnSqlHelper.executeBatch(this.entityClass,this.log, list, batchSize, consumer);}
publicstatic<E>booleanexecuteBatch(Class<?> entityClass,Log log,Collection<E> list,int batchSize,BiConsumer<SqlSession,E> consumer){Assert.isFalse(batchSize <1,"batchSize must not be less than one",newObject[0]);return!CollectionUtils.isEmpty(list)&&executeBatch(entityClass, log,(sqlSession)->{int size = list.size();int i =1;for(Iterator var6 = list.iterator(); var6.hasNext();++i){E element = var6.next();
                consumer.accept(sqlSession, element);if(i % batchSize ==0|| i == size){
                    sqlSession.flushStatements();}}});}

最终来到了executeBatch()方法,可以看到这很明显是在一条一条循环插入,通过sqlSession.flushStatements()将一个个单条插入的insert语句分批次进行提交,而且是同一个sqlSession,这相比遍历集合循环insert来说有一定的性能提升,但是这并不是sql层面真正的批量插入。

通过查阅相关文档后,发现mybatisPlus提供了sql注入器,我们可以自定义方法来满足业务的实际开发需求。
sql注入器官网
sql注入器官方示例
在mybtisPlus的核心包下提供的默认可注入方法有这些:
在这里插入图片描述
在扩展包下,mybatisPlus还为我们提供了可扩展的可注入方法:
在这里插入图片描述
AlwaysUpdateSomeColumnById: 根据Id更新每一个字段,全量更新不忽略null字段,解决mybatis-plus中updateById默认会自动忽略实体中null值字段不去更新的问题;
InsertBatchSomeColumn: 真实批量插入,通过单SQL的insert语句实现批量插入;
Upsert: 更新or插入,根据唯一约束判断是执行更新还是删除,相当于提供insert on duplicate key update支持。

可以发现mybatisPlus已经提供好了InsertBatchSomeColumn的方法,我们只需要把这个方法添加进我们的sql注入器即可。

publicMappedStatementinjectMappedStatement(Class<?> mapperClass,Class<?> modelClass,TableInfo tableInfo){KeyGenerator keyGenerator =NoKeyGenerator.INSTANCE;SqlMethod sqlMethod =SqlMethod.INSERT_ONE;List<TableFieldInfo> fieldList = tableInfo.getFieldList();String insertSqlColumn = tableInfo.getKeyInsertSqlColumn(true,false)+this.filterTableFieldInfo(fieldList,this.predicate,TableFieldInfo::getInsertSqlColumn,"");//------------------------------------拼接批量插入语句----------------------------------------String columnScript ="("+ insertSqlColumn.substring(0, insertSqlColumn.length()-1)+")";String insertSqlProperty = tableInfo.getKeyInsertSqlProperty(true,"et.",false)+this.filterTableFieldInfo(fieldList,this.predicate,(i)->{return i.getInsertSqlProperty("et.");},"");
        insertSqlProperty ="("+ insertSqlProperty.substring(0, insertSqlProperty.length()-1)+")";String valuesScript =SqlScriptUtils.convertForeach(insertSqlProperty,"list",(String)null,"et",",");//------------------------------------------------------------------------------------------String keyProperty =null;String keyColumn =null;if(tableInfo.havePK()){if(tableInfo.getIdType()==IdType.AUTO){
                keyGenerator =Jdbc3KeyGenerator.INSTANCE;
                keyProperty = tableInfo.getKeyProperty();
                keyColumn = tableInfo.getKeyColumn();}elseif(null!= tableInfo.getKeySequence()){
                keyGenerator =TableInfoHelper.genKeyGenerator(this.getMethod(sqlMethod), tableInfo,this.builderAssistant);
                keyProperty = tableInfo.getKeyProperty();
                keyColumn = tableInfo.getKeyColumn();}}String sql =String.format(sqlMethod.getSql(), tableInfo.getTableName(), columnScript, valuesScript);SqlSource sqlSource =this.languageDriver.createSqlSource(this.configuration, sql, modelClass);returnthis.addInsertMappedStatement(mapperClass, modelClass,this.getMethod(sqlMethod), sqlSource,(KeyGenerator)keyGenerator, keyProperty, keyColumn);}

接下来就通过SQL注入器实现真正的批量插入

默认的sql注入器

publicclassDefaultSqlInjectorextendsAbstractSqlInjector{publicDefaultSqlInjector(){}publicList<AbstractMethod>getMethodList(Class<?> mapperClass,TableInfo tableInfo){if(tableInfo.havePK()){return(List)Stream.of(newInsert(),newDelete(),newDeleteByMap(),newDeleteById(),newDeleteBatchByIds(),newUpdate(),newUpdateById(),newSelectById(),newSelectBatchByIds(),newSelectByMap(),newSelectCount(),newSelectMaps(),newSelectMapsPage(),newSelectObjs(),newSelectList(),newSelectPage()).collect(Collectors.toList());}else{this.logger.warn(String.format("%s ,Not found @TableId annotation, Cannot use Mybatis-Plus 'xxById' Method.", tableInfo.getEntityType()));return(List)Stream.of(newInsert(),newDelete(),newDeleteByMap(),newUpdate(),newSelectByMap(),newSelectCount(),newSelectMaps(),newSelectMapsPage(),newSelectObjs(),newSelectList(),newSelectPage()).collect(Collectors.toList());}}}

继承DefaultSqlInjector自定义sql注入器

/**
 * @author zhmsky
 * @date 2022/8/15 15:13
 */publicclassMySqlInjectorextendsDefaultSqlInjector{@OverridepublicList<AbstractMethod>getMethodList(Class<?> mapperClass){List<AbstractMethod> methodList =super.getMethodList(mapperClass);//更新时自动填充的字段,不用插入值
        methodList.add(newInsertBatchSomeColumn(i -> i.getFieldFill()!=FieldFill.UPDATE));return methodList;}}

将自定义的sql注入器注入到Mybatis容器中

/**
 * @author zhmsky
 * @date 2022/8/15 15:15
 */@ConfigurationpublicclassMybatisPlusConfig{@BeanpublicMySqlInjectorsqlInjector(){returnnewMySqlInjector();}}

继承 BaseMapper 添加自定义方法

/**
 * @author zhmsky
 * @date 2022/8/15 15:17
 */publicinterfaceCommonMapper<T>extendsBaseMapper<T>{/**
     * 真正的批量插入
     * @param entityList
     * @return
     */intinsertBatchSomeColumn(List<T> entityList);}

对应的mapper层接口继承上面自定义的mapper

*@author zhmsky
 *@since2021-12-01*/@MapperpublicinterfaceUserMapperextendsCommonMapper<User>{}

最后直接调用UserMapper的insertBatchSomeColumn()方法即可实现真正的批量插入。

@TestvoidcontextLoads(){for(int i =0; i <5; i++){User user =newUser();
            user.setAge(10);
            user.setUsername("zhmsky");
            user.setEmail("[email protected]");
            userList.add(user);}long l =System.currentTimeMillis();
        userMapper.insertBatchSomeColumn(userList);long l1 =System.currentTimeMillis();System.out.println("-------------------:"+(l1-l));
        userList.clear();}

查看日志输出信息,观察执行的sql语句,
在这里插入图片描述
发现这才是真正意义上的sql层面的批量插入。
但是,到这里并没有结束,mybatisPlus官方提供的insertBatchSomeColumn方法不支持分批插入,也就是有多少直接全部一次性插入,这就可能会导致最后的sql拼接语句特别长,超出了mysql的限制,于是我们还要实现一个类似于saveBatch的分批的批量插入方法。

添加分批插入

模仿原来的saveBatch方法:

*@author zhmsky
 *@since2021-12-01*/@ServicepublicclassUserServiceImplextendsServiceImpl<UserMapper,User>implementsUserService{@Override@Transactional(rollbackFor ={Exception.class})publicbooleansaveBatch(Collection<User> entityList,int batchSize){try{int size = entityList.size();int idxLimit =Math.min(batchSize, size);int i =1;//保存单批提交的数据集合List<User> oneBatchList =newArrayList<>();for(Iterator<User> var7 = entityList.iterator(); var7.hasNext();++i){User element = var7.next();
                oneBatchList.add(element);if(i == idxLimit){
                    baseMapper.insertBatchSomeColumn(oneBatchList);//每次提交后需要清空集合数据
                    oneBatchList.clear();
                    idxLimit =Math.min(idxLimit + batchSize, size);}}}catch(Exception e){
            log.error("saveBatch fail", e);returnfalse;}returntrue;}}

测试:

@TestvoidcontextLoads(){for(int i =0; i <20; i++){User user =newUser();
            user.setAge(10);
            user.setUsername("zhmsky");
            user.setEmail("[email protected]");
            userList.add(user);}long l =System.currentTimeMillis();
        userService.saveBatch(userList,10);long l1 =System.currentTimeMillis();System.out.println("-------------------:"+(l1-l));
        userList.clear();}

输出结果:
在这里插入图片描述
分批插入已满足,到此收工结束了。

接下来最重要的测试下性能

在这里插入图片描述
当前数据表的数据量在100w多条,在此基础上分别拿原始的saveBatch(假的批量插入)和 insertBatchSomeColumn(真正的批量插入)进行性能对比----(jdbc均开启rewriteBatchedStatements):

原来的假的批量插入:

@Testvoidinsert(){for(int i =0; i <50000; i++){User user =newUser();
            user.setAge(10);
            user.setUsername("zhmsky");
            user.setEmail("[email protected]");
            userList.add(user);}long l =System.currentTimeMillis();
        userService.saveBatch(userList,1000);long l1 =System.currentTimeMillis();System.out.println("原来的saveBatch方法耗时:"+(l1-l));}

在这里插入图片描述
自定义的insertBatchSomeColumn:

@TestvoidcontextLoads(){for(int i =0; i <50000; i++){User user =newUser();
            user.setAge(10);
            user.setUsername("zhmsky");
            user.setEmail("[email protected]");
            userList.add(user);}long l =System.currentTimeMillis();
        userService.saveBatch(userList,1000);long l1 =System.currentTimeMillis();System.out.println("自定义的insertBatchSomeColumn方法耗时:"+(l1-l));
        userList.clear();}

在这里插入图片描述
分批插入5w条数据,自定义的真正意义上的批量插入耗时减少了3秒左右,用insertBatchSomeColum分批插入1500条数据耗时650毫秒,这速度已经挺快了
在这里插入图片描述


本文转载自: https://blog.csdn.net/weixin_42194695/article/details/126349842
版权归原作者 zhm_sky 所有, 如有侵权,请联系我们删除。

“mybatisPlus批量插入性能优化”的评论:

还没有评论