0


Flink cdc3.0动态变更表结构——源码解析

文章目录

前言

上一篇Flink cdc3.0同步实例 介绍了最新的一些功能和问题,本篇来看下新功能之一的动态变更表结构的具体实现。
在 Flink 中,应用程序由流数据流组成,这些数据流是由用户定义的Operators进行转换。
在这里插入图片描述
Flink CDC 3.0 框架中流动的数据类型被称为Event,代表外部系统产生的变更事件。每个事件都标有发生更改的表 ID 。事件分为

SchemaChangeEvent

DataChangeEvent

,分别代表表结构和数据的变化。处理schema变更的Operators对应图中的

SchemaOperator


在这里插入图片描述
(以下代码使用Flink Release 3.0.0)

源码解析

1. 接收schema变更事件

我们以添加字段触发的

AddColumnEvent

为例,它实现了

SchemaChangeEvent

SchemaOperator

当接收到有

AddColumnEvent

事件时,会在

processElement 

中调用

handleSchemaChangeEvent

处理。
在这里插入图片描述

2. 发起schema变更请求

说明下这里的response实际是直接返回的

new SchemaChangeResponse(true)

, 由于构造的

shouldSendFlushEvent

直接传入true, 所以后续也会进入if条件。我们接着

requestSchemaChange

方法看
在这里插入图片描述
由于知道response是直接创建的已知结果,因此

responseFuture.get()

也不会阻塞。我们接着来看

toCoordinator.sendRequestToCoordinator(getOperatorID(), new SerializedValue<>(request));

的实现
在这里插入图片描述

3. schema变更请求具体处理

通过几层的调用,上述变更请求会走到

SchemaRegistry

handleCoordinationRequest(CoordinationRequest request)

,我们的请求是

SchemaChangeRequest

,所以会调用

requestHandler.handleSchemaChangeRequest(schemaChangeRequest);

在这里插入图片描述
这里可以看到response 是直接创建的

SchemaChangeResponse(true)

。 接着

schemaManager.applySchemaChange(request.getSchemaChangeEvent());

注册新的schema。

在这里插入图片描述
另外还有个重点,在

startToWaitForReleaseRequest

方法中会重置responseFuture, 原本的response通过return返回了。而PendingSchemaChange中的response重置,主要就是为了等schema变更完成设计。(主线程会再次发起请求调用responseFuture.get() ,忽略这里会不理解后面为什么会阻塞)
在这里插入图片描述

4. 广播刷新事件并阻塞

回到第二部分,因为response是一个明确对象没有阻塞,返回后会直接广播

FlushEvent

schemaChangeEvent

(再次发起

schemaChangeEvent

不是很理解)。之后

requestReleaseUpstream

请求调用

responseFuture.get()

会阻塞,因为response在第三步已经重置为

new CompletableFuture<>()

, 利用的1.8的特性。这也是收到变更事件后要保证sink端变更才能发放数据。
在这里插入图片描述

5. 处理FlushEvent

FlushEvent

由什么Operator处理,在官方架构图中其实没有指出,但是图标可以看出是通过sink端完成,我们可以找到

DataSinkWriterOperator

类,有对

FlushEvent

的处理。
在这里插入图片描述
实际调用

SchemaRegistry::handleEventFromOperator

方法,重点在

requestHandler.flushSuccess(flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask());

在这里插入图片描述
其中

applySchemaChange

就是在具体的sink端变更,下面会展开。 当变更完成后会执行

waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));

,实际就通知第4部分的response这里处理完了,可以正常放开数据流。
在这里插入图片描述

6. 修改sink端schema

每个sink端有自定义的

metadataApplier


在这里插入图片描述
我们以

DorisMetadataApplier

为例,

applyAddColumnEvent

会构造

addFieldSchema

,然后在

schemaChangeManager

中转换为对应的sql执行。
在这里插入图片描述

结尾

以上就是这两天对源码跟进的记录,后续思考使用local环境Debug中间过程。
flink cdc debug动态表结构变更


本文转载自: https://blog.csdn.net/yyoc97/article/details/136045023
版权归原作者 yyoc97 所有, 如有侵权,请联系我们删除。

“Flink cdc3.0动态变更表结构——源码解析”的评论:

还没有评论