《Flink 详解》系列(已完结),共包含以下
10
10
10 篇文章:
- 【大数据】Flink 详解(一):基础篇(架构、并行度、算子)
- 【大数据】Flink 详解(二):核心篇 Ⅰ(窗口、WaterMark)
- 【大数据】Flink 详解(三):核心篇 Ⅱ(状态 State)
- 【大数据】Flink 详解(四):核心篇 Ⅲ(Checkpoint、Savepoint、Exactly-Once)
- 【大数据】Flink 详解(五):核心篇 Ⅳ(反压、序列化、内存模型)
- 【大数据】Flink 详解(六):源码篇 Ⅰ(作业提交、Local 方式、YARN 方式、K8s 方式)
- 【大数据】Flink 详解(七):源码篇 Ⅱ(作业图、执行图、调度、作业生命周期、Task Slot)
- 【大数据】Flink 详解(八):SQL 篇 Ⅰ(Flink SQL)
- 【大数据】Flink 详解(九):SQL 篇 Ⅱ(Flink SQL CEP)
- 【大数据】Flink 详解(十):SQL 篇 Ⅲ(Flink SQL CDC)
😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!
Flink 详解(十):SQL 篇 Ⅲ
102.Flink CDC 了解吗?什么是 Flink SQL CDC Connectors?
在 Flink
1.11
引入了 CDC 机制,CDC 的全称是
Change Data Capture
,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。
Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors,开源地址:https://github.com/ververica/flink-cdc-connectors。
目前(
1.13
版本)支持的 Connectors 如下:
另外支持解析 Kafka 中
debezium-json
和
canal-json
格式的 Change Log,通过 Flink 进行计算或者直接写入到其他外部数据存储系统(比如 Elasticsearch),或者将 Changelog Json 格式的 Flink 数据写入到 Kafka。
Flink CDC Connectors 和 Flink 之间的版本映射:
103.Flink CDC 原理介绍一下
在最新 CDC 调研报告中,
Debezium
和
Canal
是目前最流行使用的 CDC 工具,这些 CDC 工具的核心原理是 抽取数据库日志 获取变更。在经过一系列调研后,目前 Debezium(支持全量、增量同步,同时支持 MySQL、PostgreSQL、Oracle 等数据库),使用较为广泛。
Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将
changelog
转换为 Flink SQL 认识的 RowData 数据。(以下右侧是 Debezium 的数据格式,左侧是 Flink 的 RowData 数据格式)。
RowData 代表了一行的数据,在 RowData 上面会有一个元数据的信息 RowKind,RowKind 里面包括了 插入(
+I
)、更新前(
-U
)、更新后(
+U
)、删除(
-D
),这样和数据库里面的
binlog
概念十分类似。
通过 Debezium 采集的数据,包含了旧数据(
before
)和新数据行(
after
)以及元数据信息(
source
),
op
的
u
表示是
update
更新操作标识符(
op
字段的值
c
,
u
,
d
,
r
分别对应
create
,
update
,
delete
,
read
),
ts_ms
表示同步的时间戳。
104.通过 CDC 设计一种 Flink SQL 的 ETL 一体化的实时数仓
设计图如下:
通过 Flink CDC Connectors 替换
Debezium + Kafka
的数据采集模块,实现 Flink SQL 的 ETL 一体化,以 MySQL 为 Source 源,Flink CDC 中间件为插件,ES、Kafka 或者其他为 Sink,这样设计的优点如下:
- 开箱即用,简单易上手。
- 减少维护的组件,简化实时链路,减轻部署成本。
- 减小端到端延迟。
- Flink 自身支持
Exactly Once
的读取和计算。 - 数据不落地,减少存储成本。
- 支持全量和增量流式读取。
binlog
采集位点可回溯。
105.Flink SQL CDC 如何实现一致性保障(源码分析)
Flink SQL CDC 用于获取数据库变更日志的 Source 函数是
DebeziumSourceFunction
,且最终返回的类型是 RowData,该函数实现了
CheckpointedFunction
,即通过 Checkpoint 机制来保证发生
failure
时不会丢数,实现
exactly once
语义,这部分在函数的注释中有明确的解释。
/**
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
* from databases into Flink.
* 通过Checkpoint机制来保证发生failure时不会丢数,实现exactly once语义
* <p>The source function participates in checkpointing and guarantees that no data is lost
* during a failure, and that the computation processes elements "exactly once".
* 注意:这个Source Function不能同时运行多个实例
* <p>Note: currently, the source function can't run in multiple parallel instances.
*
* <p>Please refer to Debezium's documentation for the available configuration properties:
* https://debezium.io/documentation/reference/1.2/development/engine.html#engine-properties</p>
*/@PublicEvolvingpublicclassDebeziumSourceFunction<T>extendsRichSourceFunction<T>implementsCheckpointedFunction,ResultTypeQueryable<T>{}
为实现
CheckpointedFunction
,需要实现以下两个方法:
publicinterfaceCheckpointedFunction{// 做快照,把内存中的数据保存在checkpoint状态中voidsnapshotState(FunctionSnapshotContext var1)throwsException;// 程序异常恢复后从checkpoint状态中恢复数据voidinitializeState(FunctionInitializationContext var1)throwsException;}
接下来我们看看
DebeziumSourceFunction
中都记录了哪些状态。
/** Accessor for state in the operator state backend.
offsetState中记录了读取的binlog文件和位移信息等,对应Debezium中的
*/privatetransientListState<byte[]> offsetState;/**
* State to store the history records, i.e. schema changes.
* historyRecordsState记录了schema的变化等信息
* @see FlinkDatabaseHistory
*/privatetransientListState<String> historyRecordsState;
我们发现在 Flink SQL CDC 是一个相对简易的场景,没有中间算子,是通过 Checkpoint 持久化
binglog
消费位移和
schema
变化信息的快照,来实现 Exactly Once。
106.Flink SQL GateWay 了解吗?
Flink SQL Gateway 是 Flink 集群的 任务网关,支持以 RestAPI 的形式提交查询、插入、删除等任务,如下图所示:
总体架构如下图所示:
107.Flink SQL GateWay 创建会话讲解一下?
创建会话流程图如下:
- 传入参数包含
name
名称、planner
执行引擎(Blink 或原生的 Flink)、executetype
(streaming
或者batch
)、properties
(配置参数,如并发度等)。 - 在 SessionMnager 中,会根据这些参数创建对应的 SessionContext。
SessionContext sessionContext =newSessionContext(sessionName, sessionId, sessionEnv, defaultContext);
- 将创建 Session 放入 Map 集合中,最后返回对应的 SessionId,方便后续使用。
sessions.put(sessionId,session);return sessionId;
108.Flink SQL GateWay 如何处理并发请求?多个提交怎么处理?
SQL GateWay 内部维护 SessionManager,里面通过 Map 维护了各个 Session,每个 Session 的任务执行是独立的。同一个 Session 通过 ExecuteContext 内部的
tEnv
(
TableEnvironment
)按顺序提交。
109.如何维护多个 SQL 之间的关联性?
在每个 Session 中单独维护了
tEnv
,同一个 Session 中的操作其实是在一个
env
中执行的。因此只要是同一个 Session 中的任务,内部使用的
tEnv
就是同一个。这样就可以实现在一个 Session 中,先创建一个
view
,然后执行一个
select
,最后执行一个
insert
。
110.SQL 字符串如何提交到集群成为代码?
Session 中维护了
tEnv
,SQL 会通过
tEnv
编译生成 Pipeline(即 DAG 图),在
batch
模式下是 Plan 执行计划;在
stream
模式下是 StreamGraph。然后 Session 内部会创建一个 ProgramDeployer 代码发布器,根据 Flink 中配置的
target
创建不同的
excutor
。最后调用
executor.execute
方法提交 Pipeline 和 Config 执行。
版权归原作者 G皮T 所有, 如有侵权,请联系我们删除。