前言
最近在做数据写入服务的性能优化,主要是基于Mybatis-Plus实现一套批量写数据的服务,不过该服务是支持整个平台所有需要持久化的业务实体。所以这种服务不仅仅有insert操作还有update的操作。根据以往的MySQL数据库写入经验,主要总结了两套批量插入、批量插入更新的优化思路。
应用场景
1、纯插入、纯更新
2、插入+更新共存
可行性分析
在本地环境使用JMeter验证MySQL插入性能,以t_xxx表(55个字段)表为例,分别用1个线程、2个线程、10个线程测试写入Mysql,不同批次大小批量Replace into结果如下:
批次大小
线程数
TPS
10/20/50/100/200/500
1/2/10
空表replace into(全是insert)
50w数据表replace into(全是update)
10
1
125*10=1250
99*10=990
20
106*20=2120
76*20=1520
50
70*50=3500
40*50=2000
100
42*100=4200
22*100=2200
200
12*200=2400
10*200=2000
500
3.6*500=1800
1.2*500=600
10
2
210*10=2100
190*10=1900
20
186*20=3820
138*20=2760
50
128*50=6400
74*50=3700
100
73*100=7300
42*100=4200
200
22*200=4400
20*200=4000
500
6.7*500=3350
6.1*500=3050
10
10
1200*10=12000
641*10=6410
20
905*20=18100
429*20=8580
50
525*50=26250
192*50=9600
100
230*100=23000
85*100=8500
200
110*200=22000
42*200=8400
500
25*500=12500
16*500=8000
- 无论空表insert、50w表update,最优的批次都是在50~100,所以最大500一个批次写库是否合理?
- 50w表单线程插入Replace写入TPS=2000~4200
思路
主要从两个方面来考虑这个问题:
- SQL本身的执行效率
- 网络I/O
纯插入、纯更新
批量插入的时候,一般有两种思路:
(1)用一个 for 循环,把数据一条一条的插入(这种需要开启批处理)
insert into t_xx(a) values(1);
insert into t_xx(a) values(2);
开启批处理简单的讲就是openSession的时候带上参数
ExecutorType.BATCH
,可以几乎无损优化你的代码性能。
SqlSession session = sessionFactory.openSession(ExecutorType.BATCH);
(2)生成一条插入 sql,类似这种
insert into t_xx(a) values(1),(2);
- 这种方案的优势在于只有一次网络 IO,即使分片处理也只是数次网络 IO,所以这种方案不会在网络 IO 上花费太多时间。
- 当然这种方案有好几个劣势,一是 SQL 太长了,甚至可能需要分片后批量处理;二是无法充分发挥 PreparedStatement 预编译的优势,SQL 要重新解析且无法复用;三是最终生成的 SQL 如果太长了,数据库管理器解析这么长的 SQL 也需要时间。
插入+更新共存
批量插入和更新,一般采用
(1)insert、update语句分开批量处理
第一种这种和上述纯插入、纯更新处理类似,但是一般这种处理比较麻烦的点在于如何分类。一般对于区分可能需要根据UK查询一把。
(2)使用replace into语句批量处理
replace into t_xx(a) values(1),(2);
这种方案主要的原理是delete + insert,这种如果在多线程下执行出现死锁的概率很大。
目前基于mybatis-plus自定义SQL注入器方法实现,主要代码如下:
@Override
public MappedStatement injectMappedStatement(Class<?> mapperClass, Class<?> modelClass, TableInfo tableInfo) {
KeyGenerator keyGenerator = new NoKeyGenerator();
String scriptSql = "<script>\n%s\nVALUES %s\n</script>";
String replaceSql = "REPLACE INTO %s %s";
List<TableFieldInfo> fieldList = tableInfo.getFieldList();
String insertSqlColumn = tableInfo.getKeyInsertSqlColumn(false) +
this.filterTableFieldInfo(fieldList, predicate, TableFieldInfo::getInsertSqlColumn, EMPTY);
String columnScript = LEFT_BRACKET + insertSqlColumn.substring(0, insertSqlColumn.length() - 1) + RIGHT_BRACKET;
String insertSqlProperty = tableInfo.getKeyInsertSqlProperty(ENTITY_DOT, false) +
this.filterTableFieldInfo(fieldList, predicate, i -> i.getInsertSqlProperty(ENTITY_DOT), EMPTY);
insertSqlProperty = LEFT_BRACKET + insertSqlProperty.substring(0, insertSqlProperty.length() - 1) + RIGHT_BRACKET;
String valuesScript = SqlScriptUtils.convertForeach(insertSqlProperty, "list", null, ENTITY, COMMA);
String keyProperty = null;
String keyColumn = null;
if (tableInfo.havePK()) {
if (tableInfo.getIdType() == IdType.AUTO) {
keyGenerator = new NoKeyGenerator();
keyProperty = tableInfo.getKeyProperty();
keyColumn = tableInfo.getKeyColumn();
} else {
if (null != tableInfo.getKeySequence()) {
keyGenerator = TableInfoHelper.genKeyGenerator(functionName, tableInfo, builderAssistant);
keyProperty = tableInfo.getKeyProperty();
keyColumn = tableInfo.getKeyColumn();
}
}
}
String sql = String.format(scriptSql, String.format(replaceSql, tableInfo.getTableName(), columnScript), valuesScript);
SqlSource sqlSource = languageDriver.createSqlSource(configuration, sql, modelClass);
return this.addInsertMappedStatement(mapperClass, modelClass, functionName, sqlSource, keyGenerator, keyProperty, keyColumn);
}
(3)使用insert into tb values() on duplicate key update批量处理
insert into t_xx(a) values(1),(2) on duplicate key update a=values(a)
这种方案主要的原理是根据uk是否冲突判断,是否执行insert或者update,这种如果在多线程下执行会出现死锁,但是冲突概率相比较方案2要小很多。
目前基于mybatis-plus自定义SQL注入器方法实现,主要代码如下:
@Override
public MappedStatement injectMappedStatement(Class<?> mapperClass, Class<?> modelClass, TableInfo tableInfo) {
KeyGenerator keyGenerator = new NoKeyGenerator();
String scriptSql = "<script>\n%s\nVALUES %s on duplicate key update %s\n</script>";
String insertSql = "INSERT INTO %s %s";
String insertSqlColumn = tableInfo.getKeyInsertSqlColumn(false) +
this.filterTableFieldInfo(fieldList, predicate, TableFieldInfo::getInsertSqlColumn, EMPTY);
String columnScript = LEFT_BRACKET + insertSqlColumn.substring(0, insertSqlColumn.length() - 1) + RIGHT_BRACKET;
String updateSqlColumn = getUpdateSqlProperties(insertSqlColumn);
String insertSqlProperty = tableInfo.getKeyInsertSqlProperty(ENTITY_DOT, false) +
this.filterTableFieldInfo(fieldList, predicate, i -> i.getInsertSqlProperty(ENTITY_DOT), EMPTY);
insertSqlProperty = LEFT_BRACKET + insertSqlProperty.substring(0, insertSqlProperty.length() - 1) + RIGHT_BRACKET;
String valuesScript = SqlScriptUtils.convertForeach(insertSqlProperty, "list", null, ENTITY, COMMA);
String keyProperty = null;
String keyColumn = null;
if (tableInfo.havePK()) {
if (tableInfo.getIdType() == IdType.AUTO) {
keyGenerator = new NoKeyGenerator();
keyProperty = tableInfo.getKeyProperty();
keyColumn = tableInfo.getKeyColumn();
} else {
if (null != tableInfo.getKeySequence()) {
keyGenerator = TableInfoHelper.genKeyGenerator(functionName, tableInfo, builderAssistant);
keyProperty = tableInfo.getKeyProperty();
keyColumn = tableInfo.getKeyColumn();
}
}
}
String sql = String.format(scriptSql, String.format(insertSql, tableInfo.getTableName(), columnScript), valuesScript, updateSqlColumn);
SqlSource sqlSource = languageDriver.createSqlSource(configuration, sql, modelClass);
return this.addInsertMappedStatement(mapperClass, modelClass, functionName, sqlSource, keyGenerator, keyProperty, keyColumn);
}
版权归原作者 keep_touching 所有, 如有侵权,请联系我们删除。