0


3、Flink执行模式(流/批)详解(上)

0、批模式和流模式对比表

类别流模式批模式任务调度所有任务需要持续运行,消耗资源大任务可以按Shuffle分阶段执行,消耗资源小Shuffle记录会被流水线式的持续发送到下游任务,在网络上进行缓冲可以保存Shuffle分阶段执行的中间结果State Backends & State使用StateBackend控制状态的存储方式和检查点的工作方式对输入按key分组排序,依次处理一个key的所有记录,在同一时间只保留一个key的状态,当处理到下一个key时,上一个key的状态将被丢弃处理顺序对于用户自定义函数,数据一到达就被处理广播输入》常规输入》keyed 输入事件时间存在数据乱序的可能不存在数据乱序的可能,可以按时间排序水位线使用Watermark(一个带有时间戳

T

的水印)标志再没有时间戳

t < T

的元素出现在当前key的末尾有一个

MAX_WATERMARK

,仅在每个key末尾触发定时器,在下一个key的开始有一个

MIN_WATERMARK

处理时间事件时间和处理时间存在相关性事件时间和处理时间相关性不存在,允许用户请求当前的处理时间,并注册处理时间计时器,仅在每个key末尾触发计时器故障恢复使用 checkpoint 进行故障恢复,发生故障时,从 checkpoint 重新启动所有正在运行的任务尝试回溯到之前的中间结果仍可获取的处理阶段,只有失败的任务才需要重新启动

1)概述
1.流处理和批处理概述

**DataStream API 的

流(STREAMING)

执行模式,适用于需要连续增量处理,而且常驻线上的无边界作业。**

**DataStream API 的

批(BATCH)

执行模式,类似于 MapReduce 等批处理框架,适用于已知输入、不会连续运行的的有边界作业。**

Flink 对流处理和批处理采取统一的处理方式,无论配置何种执行模式,在有界输入上执行的 DataStream 应用都会产生相同的最终 结果;在

模式执行的作业可能会产生增量更新(类似于数据库中的插入(upsert)操作),而

作业只在最后产生一个最终结果。

**通过启用

执行模式,Flink 可以对有边界作业进行额外的优化**:可以使用不同的关联(join)/ 聚合(aggregation)策略、不同 shuffle 实现来提高任务调度和故障恢复的效率等。

2.批执行模式选择时机

执行模式只能用于 有边界 的作业/Flink 程序。

边界:是数据源的一个属性,表明是否在执行之前已经知道来自该数据源的所有输入,或者新数据是否会无限期地出现;如果作业的所有源都是有边界的,则它就是有边界的,否则就是无边界的。

**

执行模式,既可用于有边界任务,也可用于无边界任务。**

**

执行模式,只用于有边界任务。**

**使用

模式运行有边界作业**:

  • 使用有边界作业的运行结果去初始化作业状态,并将该状态在之后的无边界作业中使用;例如,通过模式运行一个有边界作业,获取一个 savepoint,然后在一个无边界作业上恢复这个 savepoint【将 savepoint 作为 执行作业的附加输出】;
  • 为无边界数据源写测试代码时,使用有边界数据源更自然。
3.配置批执行模式

执行模式可以通过

execution.runtime-mode

配置,可选值如下:

  • STREAMING: 经典 DataStream 执行模式(默认)
  • BATCH: 在 DataStream API 上进行批量式执行
  • AUTOMATIC: 让系统根据数据源的边界性来决定

可以通过

bin/flink run ...

的命令行参数配置,或者在创建/配置

StreamExecutionEnvironment

时写进程序。

案例:通过命令行配置执行模式

bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>

案例:在代码中配置执行模式

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

注意不建议在程序中设置运行模式,而是在提交应用程序时使用命令行设置,保持应用程序代码的免配置可以让程序更加灵活,因为同一个应用程序可能在任何执行模式下执行。

4.执行行为
a)任务调度与网络 Shuffle

执行模式中,因为输入数据是有边界的,Flink可以使用更高效的数据结构和算法来进行任务调度和网络 shuffle。

案例:任务调度和网络传输的差异

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> source = env.fromElements(...);

source.name("source")
    .map(...).name("map1")
    .map(...).name("map2")
    .rebalance()
    .map(...).name("map3")
    .map(...).name("map4")
    .keyBy((value) -> value)
    .map(...).name("map5")
    .map(...).name("map6")
    .sinkTo(...).name("sink");
map()

flatMap()

filter()

,可以依靠算子链链在一起,直接将数据转发到下一个操作,不涉及网络 shuffle。

keyBy()

rebalance()

,需要在不同的任务并行实例之间传输数据,涉及网络 shuffle。

对于上面的例子,Flink 会将操作分组为以下任务

  • 任务1: sourcemap1map2
  • 任务2: map3map4
  • 任务3: map5map6sink

在任务1到任务2、任务2到任务3之间各有一次网络 shuffle,作业的可视化表示如下

在这里插入图片描述

i)流执行模式

在**

执行模式下所有任务需要持续运行;Flink可以通过整个管道立即处理新的记录,以达到需要的连续和低延迟的流处理;同时分配给某个作业的 TaskManagers 需要有足够的资源**来同时运行所有的任务。

网络 shuffle 是 流水线 式的,记录会立即发送给下游任务,在网络层上进行一些缓冲;当处理连续的数据流时,在任务(或任务管道)之间没有可以实体化的自然数据点(时间点),而在

执行模式下,中间的结果可以被实体化。

ii)批执行模式

在**

执行模式下作业的任务可以分阶段执行**;因为输入是有边界的,因此 Flink 可以在进入下一个阶段之前完全处理管道中的一个阶段;在上面的例子中,工作会有三个阶段,对应着被 shuffle 界线分开的三个任务。

分阶段处理要求 Flink 将任务的中间结果实体化到非永久存储中,让下游任务在上游任务下线后再读取,将增加处理的延迟,但这允许 Flink 在故障发生时回溯到最新的可用结果,而不是重新启动整个任务;同时

作业可以在更少的资源上执行

TaskManagers 将至少在下游任务开始消费前保留中间结果,在这之后,只要空间允许,中间结果就会被保留,以便任务失败回滚。

b)State Backends / State

模式下,Flink 使用 StateBackend 控制状态的存储方式和检查点的工作方式。

模式下,配置的 state backend 被忽略;对输入按 key 分组(使用排序),依次处理一个 key 的所有记录,以便在同一时间只保留一个 key 的状态,当进行到下一个 key 时,上一个 key 的状态将被丢弃。

c)处理顺序

**在

执行和

执行中,算子或用户自定义函数(UDF)处理记录的顺序可能不同。**

模式下,对于用户自定义函数,数据一到达就被处理。

模式下,Flink 确保数据有序,排序可以是特定调度任务、网络 shuffle、上文提到的 state backend 等。

将常见输入类型分为三类

  • *广播输入(broadcast input)*: 从广播流输入【广播状态】;
  • 常规输入(regular input): 从广播或 keyed 输入;
  • keyed 输入(keyed input): 从 KeyedStream 输入;

消费多种类型输入的函数或算子,处理顺序如下

  • 广播输入第一个处理
  • 常规输入第二个处理
  • keyed 输入最后处理

对于从多个常规或广播输入进行消费的函数 — 比如

CoProcessFunction

— Flink 有权从任一输入以任意顺序处理数据。

对于从多个keyed输入进行消费的函数 — 比如

KeyedCoProcessFunction

— Flink 先处理单一键中的所有记录再处理下一个。

d)事件时间/水印

模式下,存在数据乱序的可能,使用 Watermark(一个带有时间戳

T

的水印)标志再没有时间戳

t < T

的元素跟进。

模式下,输入的数据集是事先已知的,至少可以按照时间戳对元素进行排序,从而按照时间顺序进行处理,不存在数据乱序的可能。

综上:在

模式下,只需要在输入的末尾有一个与每个键相关的

MAX_WATERMARK

,如果输入流没有键,则在输入的末尾需要一个

MAX_WATERMARK

;所有注册的定时器都会在 时间结束 时触发,用户定义的

WatermarkAssigners

WatermarkStrategies

会被忽略;但配置

WatermarkStrategy

是有用的,因为它的

TimestampAssigner

仍然会被用来给记录分配时间戳。

e)处理时间

处理时间是指在处理记录的具体实例上,处理记录的机器上的时钟时间,基于处理时间的计算结果是不可重复的,因为同一条记录被处理两次,会有两个不同的时间戳。

模式下,事件时间和处理时间存在相关性;在

模式下事件时间的

1小时

也几乎是处理时间的

1小时

,使用处理时间可以用于早期(不完全)触发,给出预期结果的提示。

模式下,事件时间和处理时间相关性不存在,因为输入的数据集是静态的;允许用户请求当前的处理时间,并注册处理时间计时器,但与事件时间的情况一样,所有的计时器都要在输入结束时触发。

在作业执行过程中,处理时间不会提前,当整个输入处理完毕后,会快进到时间结束。

f)故障恢复

模式下,Flink 使用 checkpoint 进行故障恢复,在发生故障时,Flink 会从 checkpoint 重新启动所有正在运行的任务。

模式下,Flink 会尝试回溯到之前的中间结果仍可获取的处理阶段,只有失败的任务才需要重新启动,相比从 checkpoint 重新启动所有任务,可以提高作业的处理效率和整体处理时间。

5.重要的考虑因素
a)概述

模式下的行为变化

  • “滚动"操作,如 reduce() 或 sum(),会对模式下每一条新记录发出增量更新,在模式下,只发出最终结果。

模式下不支持

  • Checkpointing 和任何依赖于 checkpointing 的操作都不支持。
b)Checkpointing

批处理程序的故障恢复不使用检查点,因为没有 checkpoints,某些功能如 CheckpointListener ,以及因此,Kafka 的精确一次(EXACTLY_ONCE) 模式或

File Sink

的 OnCheckpointRollingPolicy 将无法工作。

仍然可以使用所有的状态原语(state primitives),只是用于故障恢复的机制会有所不同。

c)编写自定义算子

注意:**在

模式下运行良好的 Operator 可能会在

模式下产生错误的结果。**

  • 模式下,会逐个 key 处理记录;因此,Watermark 会在每个 key 之间从 MAX_VALUE 切换到 MIN_VALUE
  • Watermark 在一个算子中不总是递增的。
  • 定时器将首先按 key 的顺序触发,然后按每个 key 内的时间戳顺序触发。
  • 不支持手动更改 key 的操作。
6.总结
1、即使不配置执行模式,在有界输入上执行的DataStream应用会产生相同的最终结果,但是配置批处理模式执行有界作业后,可以执行额外的优化。

2、可以通过代码【不建议】和命令行【建议】配置运行模式。

3、相同的算子在批模式下和流模式下可能会产生不同的结果(水位线的触发=>定时器的触发、窗口的触发)。

4、批执行模式和流执行模式均可以配置重试策略(RestartStrategies)来重启任务,但只有流执行模式支持检查点(Checkpoint)。
标签: flink 大数据

本文转载自: https://blog.csdn.net/m0_50186249/article/details/138193314
版权归原作者 猫猫爱吃小鱼粮 所有, 如有侵权,请联系我们删除。

“3、Flink执行模式(流/批)详解(上)”的评论:

还没有评论