AApache Flink 继续快速发展,是 Apache 最活跃的社区之一。共有 240 多位贡献者热情参与 Flink 1.16,完成了 19 个 FLIP和 1100 多个问题,为社区带来了许多令人兴奋的功能。
Flink 已经是流计算领域的佼佼者。流批一体化的理念逐渐被大家所认可,并在越来越多的企业中成功落地。之前的流批集成强调统一的API和统一的计算框架。今年 Flink 在此基础上推出了 Streaming Warehouse,进一步升级了流批融合的概念:真正完成了流批计算和流批存储的融合,从而实现了流的实时性。-批量集成分析。
在 1.16 版本中,Flink 社区在对流和批处理方面都完成了很多改进:
(1)在批处理方面,完成了易用性、稳定性和性能方面的全方位提升。 1.16 是 Fink 批处理的里程碑版本,也是迈向成熟的重要一步。
1)易用性:引入 SQL 网关,与 HiveServer2 完全兼容。用户可以轻松提交 Flink SQL 作业和 Hive SQL 作业,同时也可以轻松接入原有的 Hive 生态。
2)功能:Flink SQL 用户支持通过 Join Hint 指定 Join 策略,避免不合理的执行计划;Hive SQL 的兼容性达到了 94%,用户可以以极低的成本完成从 Hive 到 Flink 的迁移。
3)稳定性:通过预测执行减少作业的长尾,提高作业的整体运行稳定性;支持自适应HashJoin,通过故障回滚机制避免作业失败。
4)性能:对多分区表进行动态分区剪枝,提高处理效率。 TPC-DS在10TB规模数据集下性能提升30%;支持混合Shuffle模式,提高资源利用率和处理性能。
(2)在流处理方面,也进行了多项重大改进:
1)Changelog State Backend 可以为用户提供秒甚至毫秒级的检查点,极大地提升了容错体验,为事务性 Sink 作业提供了更小的端到端延迟体验。
2)维度表关联在流处理中被广泛使用。引入通用缓存机制加快维表查询速度,引入可配置异步模式提高维表查询吞吐量,引入可重试查询机制解决维表更新延迟问题。这些功能非常实用,解决了用户经常抱怨的痛点,支持更丰富的场景。
3)从 Flink SQL 诞生的第一天开始,一些非确定性的操作可能会导致用户作业的结果不正确或者作业的运行异常,给用户带来很大的麻烦。在1.16中,我们花了很多精力解决了大部分问题,未来会继续改进。
随着流批集成的进一步完善和 Flink Table Store(0.2 版本已经发布)的不断迭代,Flink 社区正在逐步推动 Streaming Warehouse 从概念走向现实,走向成熟。
1.理解流式仓库
流式仓库(Streaming Warehouse),更准确的说,其实就是“让数仓流式传输”,就是让整个数仓的所有分层数据都实时流动起来,从而实现端到端的纯流。结束实时性能。Streaming Service,并使用统一的API和计算框架来处理和分析流中的所有数据。
2.批处理
得益于我们对流处理的长期投入,流处理已成为流计算领域的佼佼者。在批处理方面,我们也付出了更多的努力,使其成为一个优秀的批处理引擎。流批处理的整体体验也会更加流畅。
2.1 SQL 网关
我们从各个渠道的反馈中了解到,SQL Gateway一直是用户期待的功能,尤其是批处理用户。在 1.16 中,这个特性终于完成了(设计见 FLIP-91)。SQL Gateway 是 SQL Client 的扩展和增强,支持多租户和插件 API 协议(Endpoint),解决了 SQL Client 只能服务单个用户,无法连接外部服务或组件的问题。目前,SQL Gateway 已经支持 REST API 和 HiveServer2 协议。用户可以通过cURL、Postman、HTTP客户端等多种编程语言提交流作业、批处理作业,甚至OLAP作业,链接到SQL Gateway。
2.2 Hive 语法兼容
为了降低 Hive 到 Flink 的迁移成本,我们在这个版本中引入了 HiveServer2 协议,并持续提升 Hive 语法的兼容性。
HiveServer2 协议允许用户使用 Hive JDBC/Beeline 与 SQL Gateway 交互,因此 Hive 生态系统(DBeaver、Apache Superset、Apache DolphinScheduler 和 Apache Zeppelin)可以轻松迁移到 Flink。当用户使用 HiveServer2 协议连接 SQL Gateway 时,SQL Gateway 会自动注册到 Hive Catalog,自动切换到 Hive 方言,自动批量提交作业。用户可以获得与直接使用 HiveServer2 相同的体验。
Hive 语法已成为大数据处理的事实标准。Flink 改进了与 Hive 语法的兼容性,并增加了对生产中常用的几种 Hive 语法的支持。兼容 Hive 语法可以帮助用户将现有的 Hive SQL 任务迁移到 Flink,也方便熟悉 Hive 语法的用户使用 Hive 语法编写 SQL 查询注册在 Flink 中的表。至此,基于Hive qtest测试集(包含12K SQL案例),Hive 2.3版本的查询兼容性达到94.1%,如果排除ACID查询语句,则达到97.3%。
2.3 Join Hint
Hint一直是业界用来干预执行计划以改善优化器缺陷的常用解决方案。 Join 是批处理作业中使用最广泛的算子,Flink 支持多种 join 策略。缺少统计信息或优化器的成本模型不完善会导致选择错误的 Join 策略,从而导致工作缓慢甚至有风险的失败。通过指定Join Hint,用户允许优化器尽可能选择用户指定的Join策略,从而避免优化器的各种不足,保证批处理作业的生产可用性。
2.4 自适应哈希联接
对于批处理作业,数据倾斜很常见,此时使用 HashJoin 可能会失败,这是一种非常糟糕的体验。为了解决这个问题,我们引入了一个自适应的HashJoin:一旦Join算子的操作失败,它可以自动回退到SortMergeJoin,它是Task粒度的。这种机制保证了HashJoin算子总是成功的,从而提高了作业的稳定性。
2.5 批处理的预测执行
为了解决问题机器导致的批处理慢的问题,Flink 1.16 引入了预测执行。问题机器是指存在硬件问题、突发 I/O 高或 CPU 负载高的机器,这些问题会使该机器上的任务运行速度比其他机器上的任务慢得多,从而影响批处理作业的整体执行时间。
当启用预测执行时,Flink 会持续检测慢任务。一旦检测到慢速任务,该任务所在的机器将被识别为问题机器,并通过黑名单机制(FLIP-224)将其列入黑名单。调度器将为慢速任务创建新的执行实例,并将它们部署到未被破解的节点,而现有的执行实例将继续运行。新的执行实例和旧的执行实例将处理相同的输入数据并产生相同的结果数据。一旦任何一个执行实例首先完成,它将被认为是该任务的唯一完成执行实例,并且该任务的其余执行实例将被取消。
大多数现有资源都可以使用推测执行(FLIP-245)。只有当 Source 使用 SourceEvent 时,它必须额外实现 SupportsHandleExecutionAttemptSourceEvent 接口以支持推测执行。目前 Sink 不支持推测执行,因此 Sink 上不会发生推测执行。
我们还改进了 Web UI 和 REST API (FLIP-249) 以显示任务的多个执行实例和被黑的 TaskManager。
2.6 混合shuffle模式
我们为批处理引入了一种新的 Hybrid Shuffle模式。它结合了 Blocking Shuffle 和 Pipeline Shuffle 的优点(主要用于流式传输):
与 Blocking Shuffle 一样,它不需要上游和下游任务同时运行,这使得作业可以用很少的资源执行。
与 Pipeline Shuffle 一样,它不需要在执行下游任务之前完成上游任务,这在资源充足的情况下减少了作业的整体执行时间。
用户可以选择不同的放置策略,以满足减少数据放置或降低任务重启成本的不同需求。
注意:此功能是实验性的,默认关闭。
2.7 Blocking shuffle 进一步改进
在这个版本中,我们进一步改进了 Blocking Shuffle 的可用性和性能,包括自适应网络缓冲区分配、顺序 IO 优化和结果分区重用,允许多个消费者节点重用同一个物理结果分区,以减少磁盘 IO 和存储。在 TPC-DS 10TB 规模测试中,这些优化可以实现 7% 的整体性能提升。此外,还引入了两种更高的压缩算法(LZO 和 ZSTD)。相比默认的 LZ4 压缩算法,可以进一步减少存储空间,但需要一定的 CPU 开销。
2.8 动态分区裁剪
对于批处理作业,分区表在生产中的使用比非分区表更广泛。目前 Flink 已经支持静态分区剪枝,即在优化阶段,优化器将 Filter 中与 Partition 相关的过滤条件推送到 Source Connector,以减少不必要的分区读取。星型模型是数据集市模型中最简单、应用最广泛的模型。我们发现很多用户的作业不能使用静态分区裁剪,因为分区裁剪信息只能在执行过程中确定,这就需要动态分区裁剪。技术,即运行时根据其他相关表的数据确定分区剪枝信息,以减少对分区表中无效分区的读取。在 TPC-DS 10TB 规模数据集上验证,该功能可提升 30% 的性能。
3.流处理
在 1.16 中,我们在 Checkpoint、SQL、Connector 等方面进行了改进,确保 Flink 在流计算方面继续保持领先。
3.1 广义增量检查点
Changelog State Backend 旨在使检查点间隔更短且更可预测。该版本在易用性和与其他状态后端的兼容性方面进行了许多改进,使其可以投入生产。
(1)支持状态迁移
(2)支持故障转移时从本地恢复
(3)引入文件缓存以优化恢复过程的性能
(4)支持从检查点切换
(5)优化监控体验:
1)扩展了Changelog的监控指标
2)在 Flink WebUI 上显示 Changelog 相关配置
表 1:Changelog Enabled / Changelog Disabled on Value State的对比。
3.2 RocksDB 重新缩放改进和性能测试
对于使用 Flink 构建的云服务应用程序,Rescaling 是一项非常频繁的操作。该版本使用 RocksDB 的区间删除来优化增量 RocksDB State Backend 的重缩放性能。间隔删除用于避免重缩放过程中的大量扫描和单点删除操作。对于需要删除大量状态的扩展并发,单个并发的恢复速度可以提高 2 到 10 倍。
3.3 提升 State Backend 的监控体验和可用性
此版本还改进了状态后端的监控体验和可用性。以前,RocksDB 的日志位于自己的 DB 目录中,这使得调试 RocksDB 变得不那么容易。此版本默认将 RocksDB 日志保留在 Flink 的日志目录中。添加了与 RocksDB 相关的统计信息以帮助调试 DB 级性能,例如 DB 中的总块缓存命中/失败计数。
3.4 支持透支缓冲
Overdraft Buffers(Overdraft Buffers)旨在减轻子任务在背压下被阻塞的可能性,并且可以通过设置 taskmanager.network.memory.max-overdraft-buffers-per-gate来启用。
从 1.16 开始,一个 Flink 子任务可以申请 5 个(默认)额外的透支缓冲区。过度绘制缓冲区会略微增加作业的内存使用量,但可以大大减少检查点间隔,尤其是在打开未对齐检查点的情况下。透支缓冲区只会在当前 Subtask 被下游 Subtasks 背压且当前 Subtask 需要请求多个网络缓冲区(Network Buffer)来完成当前操作时使用。更多细节可以在文档中找到。
3.5 对齐检查点超时
此版本更新了从对齐检查点 (AC) 切换到未对齐检查点 (UC) 的时间。UC开启时,如果配置了execution.checkpointing.aligned-checkpoint-timeout,每个Checkpoint在启动时仍然是一个AC,但是当全局Checkpoint持续时间超过aligned-checkpoint-timeout时,如果AC仍然没有的话 完成后,Checkpoint 将转换为 UC。
以前,对于一个 Substask,从 AC 到 UC 的切换需要等待所有上游屏障到达才能开始。在背压严重的情况下,下游的 Substask 在 checkpointing-timeout到期之前可能无法完全接受 checkpointing-timeout。到所有障碍,导致检查点失败。
在这个版本中,如果上游子任务中的barrier在execution.checkpointing.aligned-checkpoint-timeout内无法发送到下游,Flink会先让上游子任务切换到UC,将barrier发送到下游,从而 减少背压下检查点超时的概率。更多细节可以在文档中找到。
3.6 流计算中的非确定性
Flink SQL 用户经常抱怨理解流处理成本太高,痛点之一是流处理中的非确定性(而且往往不直观),这可能导致错误的结果或异常。而这些痛点在 Flink SQL 的早期就已经存在。
对于复杂的流式传输作业,现在可以在运行之前检测并解决潜在的正确性问题。如果问题不能完全解决,可以通过详细的消息提示用户如何调整 SQL 以避免引入非确定性问题。更多细节可以在文档中找到。
3.7 维度表增强
维度表关联在流处理中被广泛使用,在 1.16 中我们为此添加了一些优化和增强:
它支持通用缓存机制和相关指标,可以加快维表查询。
通过作业配置或查询提示支持可配置的异步模式(ALLOW_UNORDERED),在不影响正确性的情况下大大提高了查询吞吐量。
可重试查询机制为用户提供了更多解决维度表数据延迟更新问题的手段。
3.8 异步 I/O 支持重试
针对异步I/O引入了内置的重试机制,对用户现有代码透明,可以灵活满足用户的重试和异常处理需求。
4.PyFlink
在 Flink 1.15 中,我们引入了一种新的执行模式:“线程”模式。在这种模式下,用户定义的 Python 函数将通过 JNI 在 JVM 中执行,而不是在单独的 Python 进程中执行。但是,在 Flink 1.15 中,该功能仅在 Table API 和 SQL 上 Python 标量函数的执行上支持。在此版本中,我们对该功能提供了更全面的支持,Python DataStream API 以及 Table API 和 SQL 的 Python 表值函数也支持该功能。
最重要的是,我们将继续添加 Python API 的最后几个缺失的功能。在这个版本中,我们对 Python DataStream API 提供了更全面的支持:支持旁路输出、Broadcast State 等功能,并完善了对窗口功能的支持。我们还在 Python DataStream API 中添加了对更多连接器和格式的支持,例如对 Elasticsearch、Kinesis、Pulsar、Hybrid Source 等连接器以及 Orc 和 Parquet 等格式的支持。有了这些功能,Python API 已经基本对齐了 Java 和 Scala API 中的大部分重要功能,用户已经可以使用 Python 语言完成大部分类型的 Flink 作业的开发。
5.其他
5.1 新语法
1.16 扩展了几个 DDL 语法,帮助用户更好地使用 SQL:
(1)USING JAR[32]支持动态加载UDF jar包,方便平台开发者轻松管理UDF和提交相关作业。
(2)CREATE TABLE AS SELECT(CTAS) 允许用户基于现有表和查询创建新表。
(3)ANALYZE TABLE允许用户手动为原始表生成统计信息,以便优化器生成更好的执行计划。
5.2 在数据流中缓存
支持通过 DataStream#cache 缓存 Transformation 执行结果。第一次计算中间结果时会生成缓存的中间结果,以便将来的作业可以重用它。如果缓存丢失,将重新计算原始 Transformation 以获得结果。目前此功能仅在批处理模式下支持。此功能对于 Python 中的 ML 和交互式编程非常有用。
5.3 历史服务器和已完成作业的信息增强
在此版本中,我们增强了查看已完成作业信息的体验。
(1)JobManager/HistoryServer WebUI 提供详细的执行时间指标,包括任务在每个执行状态中花费的时间,以及运行期间的总忙/空闲/背压时间。
(2)JobManager/HistoryServer WebUI 提供按 Task 或 TaskManager 维度分组的主要子任务指标的聚合。
(3)JobManager/HistoryServer WebUI 提供更多环境信息,包括环境变量、JVM 选项和类路径。
(4)HistoryServer 现在支持浏览来自外部日志归档服务的日志。
5.4 Protobuf 格式
Flink 现在支持 Protocol Buffers(Protobuf) 格式,它允许您直接在 Table API 或 SQL 应用程序中使用这种格式。
5.5 为异步Sink引入可配置的 RateLimitingStrategy
异步接收器在 1.15 中实现,允许用户轻松实现自定义异步接收器。在此版本中,我们对其进行了扩展以支持可配置的 RateLimitingStrategy。这意味着接收器的实现者现在可以自定义其异步接收器在请求失败时的行为方式,具体取决于特定的接收器。如果未指定 RateLimitingStrategy,则默认为 AIMDScalingStrategy。
版权归原作者 大数据研习社 所有, 如有侵权,请联系我们删除。