0


记一次 Flink mongoDB CDC 到Kafka遇到的问题

背景

最近在做一个数据接入的部分事情,从mongo导入到 adb,趁着做的事情聊一下Flink内部的一些机制。
首先这会拆分两个部分,一部分是从 mongo 到 Kafka,另一部分是从 Kafka 到 adb,其中遇到了一些问题,比如说 CDC 的机制,
upset kafka source 和 kafka source的一些区别等
mongo 的版本为 4.4.x

分析

mongo -> kafka

一开始时候 Flink source 是 mongo cdc sink 选择是 正常的 kafka
部分配置如下:

  1. // source
  2. CREATE TABLE products (
  3. ...
  4. PRIMARY KEY(_id) NOT ENFORCED
  5. ) WITH (
  6. 'connector' = 'mongodb-cdc',
  7. 'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  8. 'username' = 'flinkuser',
  9. 'password' = 'flinkpw',
  10. 'database' = 'inventory',
  11. 'collection' = 'products'
  12. );
  13. // sink
  14. CREATE TABLE KafkaTable (
  15. `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  16. `user_id` BIGINT,
  17. `item_id` BIGINT,
  18. `behavior` STRING
  19. ) WITH (
  20. ..
  21. 'connector' = 'kafka',
  22. 'key.json.ignore-parse-errors' = 'true',
  23. 'format' = 'debezium-json',
  24. )

这里选择的

  1. format

  1. debezium-json

,这在后续读取kafka数据进行

  1. Row Number over

操作的时候,会报错:

  1. StreamPhysicalOverAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[]..)

从意思来看

  1. Row Number over

操作是不支持 CDC产生的数据的(CDC会产生 +i +U -U 等数据),于是选择了 upsert kafka,

  1. upsert kafka

这里会有一个解释:

  1. 作为 sinkupsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)

所以这里我们选择把 kakfa的数据转换成的正常的 数据流,而不是CDC数据,因为我们最终存储的 Adb 是可以支持upsert操作。
可以看到Flink 物理计划中 会额外多出一个

  1. StreamExecChangelogNormalize

算子,该流的具体流如下:

  1. mongo(4.4.x) -> StreamExecChangelogNormalize -> ConstraintEnforcer(NotNullEnforcer(fields=[_id])) -> kafkaSink

在这里插入图片描述

可以看到

  1. StreamExecChangelogNormalize

是在kafka sink之前的,也就是说

  1. StreamExecChangelogNormalize

是用来Flink用来产生CDC数据的,Flink SQL Planner 会自动为 Upsert 类型的 Source 生成一个 ChangelogNormalize 节点,并按照上述操作将其转换为完整的变更流;代价则是该算子节点需要存储体积巨大的 State 数据。具体可以参考深入解读 MongoDB CDC 的设计与实现,产生的CDC数据流如下:

  1. StreamExecChangelogNormalize.translateToPlanInternal
  2. ||
  3. \/
  4. ProcTimeDeduplicateKeepLastRowFunction.processElement
  5. ||
  6. \/
  7. ProcTimeDeduplicateKeepLastRowFunction.processLastRowOnChangelog
  1. processLastRowOnChangelog

这里会存有 keyedState 状态,但是为了补足这个带有CDC的数据的,所以这里得有依赖状态在flink端进行状态的转换,具体可以看:DeduplicateFunctionHelper.processLastRowOnChangelog 方法:

  1. static void processLastRowOnChangelog(
  2. RowData currentRow,
  3. boolean generateUpdateBefore,
  4. ValueState<RowData> state,
  5. Collector<RowData> out,
  6. boolean isStateTtlEnabled,
  7. RecordEqualiser equaliser)
  8. throws Exception {
  9. RowData preRow = state.value();
  10. RowKind currentKind = currentRow.getRowKind();
  11. if (currentKind == RowKind.INSERT || currentKind == RowKind.UPDATE_AFTER) {
  12. if (preRow == null) {
  13. // the first row, send INSERT message
  14. currentRow.setRowKind(RowKind.INSERT);
  15. out.collect(currentRow);
  16. } else {
  17. if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {
  18. // currentRow is the same as preRow and state cleaning is not enabled.
  19. // We do not emit retraction and update message.
  20. // If state cleaning is enabled, we have to emit messages to prevent too early
  21. // state eviction of downstream operators.
  22. return;
  23. } else {
  24. if (generateUpdateBefore) {
  25. preRow.setRowKind(RowKind.UPDATE_BEFORE);
  26. out.collect(preRow);
  27. }
  28. currentRow.setRowKind(RowKind.UPDATE_AFTER);
  29. out.collect(currentRow);
  30. }
  31. }
  32. // normalize row kind
  33. currentRow.setRowKind(RowKind.INSERT);
  34. // save to state
  35. state.update(currentRow);
  36. } else {
  37. // DELETE or UPDATER_BEFORE
  38. if (preRow != null) {
  39. // always set to DELETE because this row has been removed
  40. // even the input is UPDATE_BEFORE, there may no UPDATE_AFTER after it.
  41. preRow.setRowKind(RowKind.DELETE);
  42. // output the preRow instead of currentRow,
  43. // because preRow always contains the full content.
  44. // currentRow may only contain key parts (e.g. Kafka tombstone records).
  45. out.collect(preRow);
  46. // clear state as the row has been removed
  47. state.clear();
  48. }
  49. // nothing to do if removing a non-existed row
  50. }
  51. }

kafka -> adb

一开始时候的kafka 我们选择了常规的 kafka source

  1. CREATE TABLE KafkaTable (
  2. ...
  3. ) WITH (
  4. 'connector' = 'kafka',
  5. 'topic' = 'user_behavior',
  6. 'properties.bootstrap.servers' = 'localhost:9092',
  7. 'properties.group.id' = 'testGroup',
  8. 'scan.startup.mode' = 'earliest-offset',
  9. 'format' = 'json'
  10. )

这里获取到的数据就是 正常的json数据,而不是 debezium-json数据,具体区别,可以参考下面的说明。
如果选择一开始的 kafka sink是 kafka的话 可以看到这里的物理计划流向为:

  1. kafka -> SinkMaterializer -> adb sink

对于为什么会出现 SinkMaterializer, 为了解决 changlog的乱序问题,为下游提供一个正确的upsert视图, 产生

  1. SinkMaterializer

物理算子的数据流如下:

  1. StreamExecSink
  2. ||
  3. \/
  4. createSinkTransformation // 这里有 final boolean needMaterialization = !inputInsertOnly && upsertMaterialize; 会插入SinkUpsertMaterializer算子
  5. ||
  6. \/
  7. SinkUpsertMaterializer //table.exec.state.ttl的设置
  8. ||
  9. \/
  10. SinkUpsertMaterializer.processElement // 这里有 keyed state

当然

  1. SinkUpsertMaterializer

这个算子也是可以通过配置 table.exec.sink.upsert-materialize 控制的

,因为我们现在选择 kafka sink的是

  1. upsert kafka

这里会消除掉cdc数据,所以不存在以上的

  1. SinkUpsertMaterializer

.

debezium-json的格式与 json的格式区别

  1. debezium-json 变成 {before:, after:, op:} before after里才是真正的数据
  2. json 直接就是 json格式数据

这里的区别的具体数据流可以参考如下,主要是 对mongo数据的处理:

  1. KafkaDynamicTableFactory.createDynamicTableSink
  2. ||
  3. \/
  4. KafkaDynamicSink.getSinkRuntimeProvider
  5. ||
  6. \/
  7. final SerializationSchema<RowData> valueSerialization =
  8. createSerialization(context, valueEncodingFormat, valueProjection, null);
  9. ||
  10. \/
  11. DynamicKafkaRecordSerializationSchema 这里会用到
  12. ||
  13. \/
  14. valueSerialized = valueSerialization.serialize(valueRow);

valueSerialization 这会有多种序列化的方式,如:

  1. debezium-json

对应

  1. DebeziumJsonSerializationSchema
  1. json

对应

  1. JsonRowDataSerializationSchema
标签: flink kafka 大数据

本文转载自: https://blog.csdn.net/monkeyboy_tech/article/details/142903665
版权归原作者 鸿乃江边鸟 所有, 如有侵权,请联系我们删除。

“记一次 Flink mongoDB CDC 到Kafka遇到的问题”的评论:

还没有评论