FlinkSql使用ES sink并指定主键,为什么数据还是会被覆盖?
1. 问题描述
根据ES connector文档中的描述,创建ES表并指定主键后将采用upsert模式。
但是在实际的使用过程中却发现部分数据仍然存在被直接覆盖的问题。
举个例子,假如ES中本来的数据如下图中示例1所示,在FlinkSql中要写入的数据如示例2所示,按照upsert的理解执行结果应该如示例3所示,但实际结果为示例4所示,即对整个json文档进行了覆盖,而不是部分字段的更新。
实际发生问题的场景如下:
- 情形一:在任务启动前es中已经存在数据,任务启动后,发现部分主键key对应的json文档直接被覆盖成任务中新写入的数据
- 情形二:任务中存在A和B两个insert into语句往同一个es表中写入数据,A和B除主键key之外其他写入的字段不完全相同,任务启动后,发现部分主键key对应的数据只包含A或B的结果。 上述两种情景的sql中均包含join、groupBy等操作。
2. 问题排查
针对上述问题,在网上查找一番后并未直接找到结果。大多数给出的答案正如官方文档所述,会执行upsert操作。
针对自己在实际使用过程中遇到的问题,尝试从源码中一探究竟。
接下来,首先需要找到源码的切入点,从切入点设置断点进行逐步分析。
2.1. 自定义sink原理
源码基于flink 1.14.4
根据官方文档中对自定义sink的描述,connector sink的的工作原理如下
元数据的内容由create table语句所定义,通过
CatalogTable
的实例表示,该实例表示
Catalog
中未解析的元数据,包含可以在create table语句中表示的所有特征,框架将其解析为
ResolvedCatalogTable
实例(一个经过验证的CatalogTable),然后再将其传递给
DynamicTableSinkFactory
,最终传入到
DynamicTableSink
,用于insert into语句的写入。
**
DynamicTableSinkFactory
负责提供connector的具体实现逻辑。用于将
CatalogTable
的元数据转化为
DynamicTableSink
。**
其接口中仅有一个方法,返回
DynamicTableSink
实例。
publicinterfaceDynamicTableSinkFactoryextendsDynamicTableFactory{/**
* 根据CatalogTable的元数据转化为DynamicTableSink,CatalogTable通过org.apache.flink.table.factories.DynamicTableFactory.Context中获取
* 实现应该此方法中执行create table中with选项的验证
*/DynamicTableSinkcreateDynamicTableSink(Context context);}
**
DynamicTableSink
负责将动态表sink到外部存储。**
动态表的内容可以被视为一个changelog,所有的更改都会连续写出,直到changelog结束为止。给定的
ChangelogMode
指示sink在运行时接受的一组更改。
该接口的实例可以看作是最终生成用于写入实际数据的具体运行时实现的工场。
最后planner将调用
getSinkRuntimeProvider
方法来获取运行时实现的提供程序,即
SinkRuntimeProvider
的实例。
**因此自定义sink的核心是通过
DynamicTableSink#getSinkRuntimeProvider
方法向planner提供一个
SinkRuntimeProvider
实例,该实例中包含了向外部存储写入数据的关键。**
publicinterfaceDynamicTableSink{/**
* 返回sink在运行时接受的一组更改
* planner可以提出建议,但sink拥有最终决定权。如果planner不支持此模式,它将引发错误。
*/ChangelogModegetChangelogMode(ChangelogMode requestedMode);/**
* 返回用于写入数据的运行时实现,即SinkRuntimeProvider的实例
* 表运行时独立于provider接口,要求sink实现接受内部数据结构(RowData)
*/SinkRuntimeProvidergetSinkRuntimeProvider(Context context);interfaceContext{booleanisBounded();<T>TypeInformation<T>createTypeInformation(DataType consumedDataType);DataStructureConvertercreateDataStructureConverter(DataType consumedDataType);}interfaceDataStructureConverterextendsRuntimeConverter{@NullableObjecttoExternal(@NullableObject internalStructure);}/**
* 提供用于写入数据的实际运行时实现
* SinkProvider 是sink的核心接口。
* flink-table-api-java-bridge 中的 SinkFunctionProvider 和 OutputFormatProvider 可用于向后兼容。 *
*/interfaceSinkRuntimeProvider{// marker interface}}
了解了自定义sink的实现原理后,接下里看下es sink的具体实现。
2.2. ES sink实现
本文中使用的es connector版本为
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.11</artifactId><version>1.14.4</version></dependency>
DynamicTableSinkFactory
和
DynamicTableSink
在es sink中的实现分别为
Elasticsearch6DynamicSinkFactory
,
Elasticsearch6DynamicSink
。
Elasticsearch6DynamicSinkFactory
的实现如下,获取到scheme后对schema和配置参数进行验证,最后返回
Elasticsearch6DynamicSink
实例。
publicclassElasticsearch6DynamicSinkFactoryimplementsDynamicTableSinkFactory{@OverridepublicDynamicTableSinkcreateDynamicTableSink(Context context){// 从CatalogTable中获取schemaTableSchema tableSchema = context.getCatalogTable().getSchema();// 验证主键ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);finalFactoryUtil.TableFactoryHelper helper =FactoryUtil.createTableFactoryHelper(this, context);// 得到formatfinalEncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(SerializationFormatFactory.class,FORMAT_OPTION);
helper.validate();Configuration configuration =newConfiguration();
context.getCatalogTable().getOptions().forEach(configuration::setString);Elasticsearch6Configuration config =newElasticsearch6Configuration(configuration, context.getClassLoader());// 验证参数validate(config, configuration);// 最终返回Elasticsearch6DynamicSinkreturnnewElasticsearch6DynamicSink(
format, config,TableSchemaUtils.getPhysicalSchema(tableSchema));}}
Elasticsearch6DynamicSink
的实现如下,
getChangelogMode
方法表示ES sink接受除UPDATE_BEFORE类型之外的其他更改。
getSinkRuntimeProvider
方法返回了
SinkFunctionProvider
实例,为
SinkRuntimeProvider
的子接口。
通过实现
SinkFunctionProvider
接口中的
SinkFunction<RowData> createSinkFunction()
方法,从而得到
SinkFunction
实例。
finalclassElasticsearch6DynamicSinkimplementsDynamicTableSink{@OverridepublicChangelogModegetChangelogMode(ChangelogMode requestedMode){ChangelogMode.Builder builder =ChangelogMode.newBuilder();for(RowKind kind : requestedMode.getContainedKinds()){// 不接受update_before类型的更改if(kind !=RowKind.UPDATE_BEFORE){
builder.addContainedKind(kind);}}return builder.build();}@OverridepublicSinkFunctionProvidergetSinkRuntimeProvider(Context context){return()->{SerializationSchema<RowData> format =this.format.createRuntimeEncoder(context, schema.toRowDataType());// 负责生成真正和es交互的ActionRequest实例,如IndexRequest、UpdateRequest、DeleteRequest等finalRowElasticsearchSinkFunction upsertFunction =newRowElasticsearchSinkFunction(IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
config.getDocumentType(),
format,XContentType.JSON,REQUEST_FACTORY,KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));finalElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(), upsertFunction);
builder.setFailureHandler(config.getFailureHandler());
builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
builder.setBulkFlushMaxSizeMb((int)(config.getBulkFlushMaxByteSize()>>20));
builder.setBulkFlushInterval(config.getBulkFlushInterval());
builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);// we must overwrite the default factory which is defined with a lambda because of a bug// in shading lambda serialization shading see FLINK-18006if(config.getUsername().isPresent()&& config.getPassword().isPresent()&&!StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())&&!StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())){
builder.setRestClientFactory(newAuthRestClientFactory(
config.getPathPrefix().orElse(null),
config.getUsername().get(),
config.getPassword().get()));}else{
builder.setRestClientFactory(newDefaultRestClientFactory(config.getPathPrefix().orElse(null)));}// 最终是得到ElasticsearchSinkfinalElasticsearchSink<RowData> sink = builder.build();if(config.isDisableFlushOnCheckpoint()){
sink.disableFlushOnCheckpoint();}return sink;};}}publicinterfaceSinkFunctionProviderextendsDynamicTableSink.SinkRuntimeProvider,ParallelismProvider{/** 返回SinkFunction的实例 */SinkFunction<RowData>createSinkFunction();}
因此planner在ES sink的实现中最终得到了
ElasticsearchSink
的实例。该实例负责调用ES相应API与ES进行数据交互。
2.3. 实际与ES的交互
通过
ElasticsearchSink
的继承关系,可以发现该类直接继承
ElasticsearchSinkBase
,并最终继承了
SinkFunction
。
SinkFunction
是实现自定义sink的根接口。每条写入sink的数据都将会调用该接口中的
invoke
方法。
ElasticsearchSinkBase
对此方法的实现如下,在其中通过调用
ElasticsearchSinkFunction#process
方法进行数据写入。
privatefinalElasticsearchSinkFunction<T> elasticsearchSinkFunction;@Overridepublicvoidinvoke(T value,Context context)throwsException{checkAsyncErrorsAndRequests();// 调用ElasticsearchSinkFunction的process方法
elasticsearchSinkFunction.process(value,getRuntimeContext(), requestIndexer);}
在ES sink中
ElasticsearchSinkFunction
的唯一实现是
RowElasticsearchSinkFunction
类,其关键代码如下。
classRowElasticsearchSinkFunctionimplementsElasticsearchSinkFunction<RowData>{privatefinalRequestFactory requestFactory;@Override/**
* 每条数据和ES实际交互的方法
*/publicvoidprocess(RowData element,RuntimeContext ctx,RequestIndexer indexer){// 根据每条数据的RowKind类型得到es api的UpdateRequest或DeleteRequest实例switch(element.getRowKind()){caseINSERT:caseUPDATE_AFTER:// insert和update_after时,进行更新processUpsert(element, indexer);break;caseUPDATE_BEFORE:caseDELETE:// update_before时,删除数据processDelete(element, indexer);break;default:thrownewTableException("Unsupported message kind: "+ element.getRowKind());}}// 对数据进行IndexRequest或UpdateRequest请求privatevoidprocessUpsert(RowData row,RequestIndexer indexer){finalbyte[] document = serializationSchema.serialize(row);finalString key = createKey.apply(row);if(key !=null){// 指定主键时,得到UpdateRequestfinalUpdateRequest updateRequest =
requestFactory.createUpdateRequest(
indexGenerator.generate(row), docType, key, contentType, document);
indexer.add(updateRequest);}else{// 未指定主键时,得到IndexRequestfinalIndexRequest indexRequest =
requestFactory.createIndexRequest(
indexGenerator.generate(row), docType, key, contentType, document);
indexer.add(indexRequest);}}// 对数据进行DeleteRequest请求privatevoidprocessDelete(RowData row,RequestIndexer indexer){finalString key = createKey.apply(row);finalDeleteRequest deleteRequest =
requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
indexer.add(deleteRequest);}}
其中
requestFactory.createUpdateRequest
、
requestFactory.createIndexRequest
、
requestFactory.createDeleteRequest
的方法分别如下
privatestaticclassElasticsearch6RequestFactoryimplementsRequestFactory{@OverridepublicUpdateRequestcreateUpdateRequest(String index,String docType,String key,XContentType contentType,byte[] document){returnnewUpdateRequest(index, docType, key).doc(document, contentType).upsert(document, contentType);}@OverridepublicIndexRequestcreateIndexRequest(String index,String docType,String key,XContentType contentType,byte[] document){returnnewIndexRequest(index, docType, key).source(document, contentType);}@OverridepublicDeleteRequestcreateDeleteRequest(String index,String docType,String key){returnnewDeleteRequest(index, docType, key);}}
到此,我们已经找到了ES sink和ES进行交互的具体逻辑,对于changelog中每条数据根据其
RowKind
类型决定执行INSERT、UPDATE还是DELETE请求。并且根据是否指定主键,细分为INSERT或UPDATE。
- 指定主键,只会进行UPDATE和DELETE操作。
- 未指定主键,只进行INSERT操作。
综上所述,在指定主键的情况下,如果changelog中只有INSERT和UPDATE_AFTER类型的数据时,那么仅调用UpdateRequest API将数据写入ES,数据只会执行Upsert操作。但是针对实际结果中发生数据覆盖的情况,只可能是存在DELETE类型的数据,导致其执行了processDelete方法,先将数据删除,随着changelog中数据不断到来再次执行processUpsert方法。最终导致结果中只保留了最新写入的数据,即看到的数据覆盖的情况。
DELETE类型的数据是否会真正生成?什么时机生成?
3. 验证演示
验证使用的示例代码如下,kafka中为随机生成的账单id从1到5,时间递增的账单数据。SQL加工逻辑为汇总每个id、每小时的账单金额,将结果写入到使用账单id作为主键的es表中。
publicclassEsConnectorSQLTest{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =EnvConf.getEnvWithUI();StreamTableEnvironment tEnv =StreamTableEnvironment.create(env);
env.setParallelism(1);// 创建source表,交易表
tEnv.executeSql("CREATE TABLE transactions (\n"+" account_id BIGINT,\n"+" amount BIGINT,\n"+" transaction_time TIMESTAMP(3),\n"+" WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n"+") WITH (\n"+" 'connector' = 'kafka',\n"+" 'topic' = 'transactions',\n"+" 'properties.bootstrap.servers' = '******',\n"+" 'scan.startup.mode' = 'latest-offset',\n"+" 'format' = 'csv'\n"+")");// 创建sink表,消费报告表
tEnv.executeSql("CREATE TABLE spend_report (\n"+" account_id BIGINT,\n"+" log_ts TIMESTAMP(3),\n"+" amount BIGINT\n,"+" PRIMARY KEY (account_id) NOT ENFORCED"+") WITH (\n"+" 'connector' = 'elasticsearch-6',\n"+" 'hosts' = ******n"+" 'index' = '******',\n"+" 'document-type' = '******',\n"+" 'sink.bulk-flush.max-actions' = '1',\n"+" 'username' = '******',\n"+" 'password' = '******',\n"+" 'format' = 'json'\n"+")");// 从交易表中写数据到消费报告表中Table report =report(tEnv.from("transactions"));
report.executeInsert("spend_report");// 通过如下代码观察select的执行结果数据,changelog// report.execute().print();}publicstaticTablereport(Table transactions){/*
* 将每个账号在小时内的金额进行汇总
* */return transactions.select(
$("account_id"),
$("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
$("amount")).groupBy($("account_id"), $("log_ts")).select(
$("account_id"),
$("log_ts"),
$("amount").sum().as("amount"));}}
通过在
RowElasticsearchSinkFunction#process
方法中设置断点,可以观察到对于同一个key的确产生了
-D
类型的数据,导致删除了ES中数据,后续再执行upsert方法时,由于之前的数据已经被删除,所以更新后只剩下了新写入的数据。
通过观察示例代码main方法中
report.execute().print()
结果,对于各个id首次聚合结果类型为
+I
,当数据执行更新时,先生成
-U
类型结果,将原先的结果回撤,然后紧跟着生成
+U
类型的最新结果。
selece的结果中并不会产生
-D
类型的数据,但是实际到达
RowElasticsearchSinkFunction#process
方法时却包含了
-D
类型的数据,因此
-D
类型的数据并不是有select语句直接得到的。
+----+----------------------+-------------------------+----------------------+
| op | account_id | log_ts | amount |
+----+----------------------+-------------------------+----------------------+
| +I | 5 | 2565-04-12 05:00:00.000 | 461 |
| +I | 1 | 2565-04-12 05:00:00.000 | 517 |
| +I | 2 | 2565-04-12 05:00:00.000 | 630 |
| +I | 3 | 2565-04-12 05:00:00.000 | 640 |
| +I | 4 | 2565-04-12 05:00:00.000 | 356 |
| -U | 5 | 2565-04-12 05:00:00.000 | 461 |
| +U | 5 | 2565-04-12 05:00:00.000 | 1140 |
| -U | 1 | 2565-04-12 05:00:00.000 | 517 |
| +U | 1 | 2565-04-12 05:00:00.000 | 688 |
| +I | 2 | 2565-04-12 06:00:00.000 | 35 |
| +I | 3 | 2565-04-12 06:00:00.000 | 524 |
| +I | 4 | 2565-04-12 06:00:00.000 | 290 |
| +I | 5 | 2565-04-12 06:00:00.000 | 612 |
| +I | 1 | 2565-04-12 06:00:00.000 | 12 |
| -U | 2 | 2565-04-12 06:00:00.000 | 35 |
| +U | 2 | 2565-04-12 06:00:00.000 | 903 |
| -U | 3 | 2565-04-12 06:00:00.000 | 524 |
| +U | 3 | 2565-04-12 06:00:00.000 | 1353 |
3.1. SinkMateriallizer任务
回到示例代码中,数据的加工逻辑是对数据进行Group by后的聚合处理,查看其作业DAG,可以发现有3个任务节点,分别是KafkaSource->GroupAggregate->SinkMateriallizer,前两个任务节点的作用容易理解,第三个名为SinkMaterializer节点除了最后将数据写入到ES外,还做了什么操作?算子名和算子中的
upsertMaterialize=[true]
表示什么含义?
根据算子名称并结合断点分析,数据会先进入到
GroupAggFunction#processElement
方法中执行聚合计算,并将聚合结果继续发送给了下游
SinkUpsertMaterializer#processElement
方法中,聚合结果最终在此方法中生成供上述
RowElasticsearchSinkFunction#process
方法中写入到ES的数据,接下来重点了解下
SinkUpsertMaterializer#processElement
的具体实现。
!!! note “”
SinkUpsertMaterializer类的作用是为了解决changelog中乱序流造成的结果不正确问题。
该维护了一个
List<RowData>
的状态,该状态中保存着changelog中相同key的结果数据,并且根据已保存到状态中的数据进行乱序处理。
publicclassSinkUpsertMaterializerextendsTableStreamOperator<RowData>implementsOneInputStreamOperator<RowData,RowData>{// Buffer of emitted insertions on which deletions will be applied first.// RowKind可能是+I或+U,并且在应用删除时将被忽略privatetransientValueState<List<RowData>> state;@OverridepublicvoidprocessElement(StreamRecord<RowData> element)throwsException{finalRowData row = element.getValue();List<RowData> values = state.value();if(values ==null){
values =newArrayList<>(2);}switch(row.getRowKind()){caseINSERT:caseUPDATE_AFTER:
row.setRowKind(values.isEmpty()?INSERT:UPDATE_AFTER);
values.add(row);
collector.collect(row);break;caseUPDATE_BEFORE:caseDELETE:finalint lastIndex = values.size()-1;finalint index =removeFirst(values, row);if(index ==-1){LOG.info(STATE_CLEARED_WARN_MSG);return;}if(values.isEmpty()){// 此处会将元素的RowKind重新赋值为-D
row.setRowKind(DELETE);
collector.collect(row);}elseif(index == lastIndex){// Last row has been removed, update to the second last onefinalRowData latestRow = values.get(values.size()-1);
latestRow.setRowKind(UPDATE_AFTER);
collector.collect(latestRow);}break;}if(values.isEmpty()){
state.clear();}else{
state.update(values);}}privateintremoveFirst(List<RowData> values,RowData remove){finalIterator<RowData> iterator = values.iterator();int i =0;while(iterator.hasNext()){finalRowData row = iterator.next();// 忽略RowKind类型,对RowData中其他数据进行比较
remove.setRowKind(row.getRowKind());if(equaliser.equals(row, remove)){
iterator.remove();return i;}
i++;}return-1;}}
processElement方法大致处理逻辑如下流程图所示。
举个例子,
情形一:假如source表中有数据如左侧所示,经过示例sql处理后,输出了右侧的changelog结果
changelog
+----+------------+---------------+--------+
| op | account_id | log_hour | amount |
+----+------------+---------------+--------+
| +I | 3 | 2565-04-12 05 | 640 |
| +I | 3 | 2565-04-12 06 | 524 |
| -U | 3 | 2565-04-12 06 | 524 |
| +U | 3 | 2565-04-12 06 | 1353 |
此种情况下changelog数据经过processElement方法后输出结果为:
- 当第一条数据到达后,数据中RowKind=+I,状态为空,输出RowKind=+I的结果到下游;
- 当第二条数据到达后,数据中RowKind=+I,状态不为空,输出RowKind=+U的结果到下游,这时状态中保存了changlog中第一条和第二条数据内容;
- 当第三条数据到达后,数据中RowKind=-U,状态集合中最大index=1(lastindex),依次比较集合中元素和当前数据内容是否相同,发现集合中第二条数据和当前数据一致,因此返回index=1,并将从集合中移除index=1的元素。此时集合中仅剩amount=640的一个元素; 接下来满足lastindex=index的条件,因此获取集合中最后一条数据,即amount=640的那条数据,输出Rowkind= +U的结果到下游;
- 当第四条数据到达后,数据中RowKind=+U,状态不为空,输出RowKind=+U的结果到下游。
情形二:假如source表中有数据如左侧所示,经过示例sql处理后,输出了右侧的changelog结果
changelog
+----+------------+---------------+--------+
| op | account_id | log_hour | amount |
+----+------------+---------------+--------+
| +I | 3 | 2565-04-12 06 | 524 |
| -U | 3 | 2565-04-12 06 | 524 |
| +U | 3 | 2565-04-12 06 | 1353 |
此种情况下changelog数据经过processElement方法后输出结果为:
- 当第一条数据到达后,数据中RowKind=+I,状态为空,输出RowKind=+I的结果到下游;
- 当第二条数据到达后,数据中RowKind=-U,集合中最大index=0(lastindex),依次比较集合中元素和当前数据内容是否相同,发现集合中第一条数据和当前数据一致,因此返回index=0,并将从集合中移除index=0的元素。此时集合为空。 接下来满足了集合为空的条件,输出Rowkind=-D的结果到下游;
- 当第三条数据到达后,数据中RowKind=+U,状态为空,输出RowKind=+I的结果到下游。
4. 总结
导致写入到ES的数据从结果上看没有执行upsert而是执行了insert的根本原因是中间过程中执行了delete操作,将es中原始数据删除,然后继续写入了新数据。如果在delete执行后未继续写入新数据前停止任务,观察到的结果过将会是ES中数据条数减少。
而导致从ES中删除数据的原因是因为经过SinkUpsertMaterializer类处理后,将部分
RowKind=-U
的数据转化成了
RowKind=-D
类型的数据,从而触发ES sink执行
DeleteRequest
请求。
版权归原作者 有数的编程笔记 所有, 如有侵权,请联系我们删除。