Checkpointing
1.概述
Flink 中的每个方法或算子都能够是有状态的,状态化的方法在处理单个 元素/事件 的时候存储数据,为了让状态容错,Flink 需要为状态添加 checkpoint(检查点)。
2.开启与配置 Checkpoint
默认 checkpoint 是禁用的,通过调用
StreamExecutionEnvironment
的
enableCheckpointing(n)
来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。
checkpoint 其它属性包括
- checkpoint 存储: 可以设置检查点快照的持久化位置,默认使用 JobManager 堆内存,建议在生产中使用持久性文件系统。
- 精确一次(exactly-once)对比至少一次(at-least-once):可以在
enableCheckpointing(long interval, CheckpointingMode mode)
方法中传入 checkpoint 模式,推荐精确一次,至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。 - checkpoint 超时:如果 checkpoint 执行的时间超过了配置的阈值,进行中的 checkpoint 会被抛弃。
- checkpoints 之间的最小时间:在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展,如果值设为 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。使用 checkpoints 之间的最小时间,在 checkpoint 的执行时间超过平均值时不会受到影响(例如目标存储系统忽然变得很慢)这个值也意味着并发 checkpoint 的数目是一。
- checkpoint 可容忍连续失败次数:可容忍多少次连续的 checkpoint 失败,超过阈值之后会触发作业错误 fail over,默认次数为 0,不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over;可容忍的 checkpoint 失败情形:Job Manager的IOException,TaskManager 做 checkpoint 时异步部分的失败, checkpoint 超时等;TaskManager 做 checkpoint 时同步部分的失败会直接触发作业fail over;其它的 checkpoint 失败(如一个 checkpoint 被另一个 checkpoint 包含)会被忽略掉。
- 并发 checkpoint 的数目: 默认在上一个 checkpoint 未完成(失败或成功)的情况下,系统不会触发另一个 checkpoint,以便拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程;允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法调用比较耗时的外部服务),该选项不能和 “checkpoints 间的最小时间” 同时使用。
- externalized checkpoints: 配置周期存储 checkpoint 到外部系统中,Externalized checkpoints 将元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。
- 非对齐 checkpoints: 启用非对齐 checkpoints 以便在背压时大大减少创建 checkpoint 的时间,仅适用于精确一次(exactly-once)checkpoints 并且只有一个并发检查点。
- 部分任务结束的 checkpoints: 默认,即使 DAG 的部分已经处理完它们的所有记录,Flink也会继续执行 checkpoints。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 开启 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
3.相关配置选项
KeyDefaultTypeDescriptionstate.backend.incrementalfalseBoolean配置状态后端是否应创建增量检查点(如果可能);对于增量检查点,只存储与前一个检查点的diff,而不是完整的检查点状态,启用后,web UI中显示的或从其余API获取的状态大小仅表示增量检查点大小,而不是完整检查点大小,某些状态后端可能不支持增量检查点并忽略此选项。state.backend.local-recoveryfalseBoolean配置状态后端的本地恢复,默认禁用,本地恢复目前只支持keyed state backends(包括EmbeddedRocksDBStateBackend和HashMapStateBackend)。state.checkpoint-storage(none)String配置检查点存储,可通过名称指定 [“jobmanager” 或“filesystem”],也可以通过“CheckpointStorageFactory”的类名指定。state.checkpoint.cleaner.parallel-modetrueBoolean是否使用传递到 cleaner 中的 ExecutorService 并行丢弃检查点的状态state.checkpoints.create-subdirtrueBoolean是否在“state.checkpoints.dir”下创建以作业id命名的子目录,以存储检查点的数据文件和元数据,默认为true。state.checkpoints.dir(none)String默认目录,用于在Flink支持的文件系统中存储检查点的数据文件和元数据,存储路径必须可从所有参与的进程/节点(即所有TaskManager和JobManager)访问。state.checkpoints.num-retained1Integer要保留的已完成检查点的最大数量。state.savepoints.dir(none)String保存点的默认目录,由将存储点写入文件系统的状态后端(HashMapStateBackend、EmbeddedRocksDBStateBackend)使用。state.storage.fs.memory-threshold20 kbMemorySize状态数据文件的最小大小,所有小于的状态块都内联存储在根检查点元数据文件中,此配置的最大内存阈值为1MB。state.storage.fs.write-buffer-size4096Integer写入文件系统的检查点流的写入缓冲区的默认大小,实际写入缓冲区大小被确定为此选项和选项“state.storage.fs.memory threshold”的最大值。taskmanager.state.local.root-dirs(none)Stringconfig 参数定义根目录,存储用于本地恢复的基于文件的状态;本地恢复目前只覆盖keyed state backends,如果未配置,它将默认为<WORKING_DIR>/localState,<WORKING_DIR>可以通过
process.taskmanager.WORKING-dir进行配置
。
4.配置 State Backend
Flink 的 checkpointing 机制会将 timer 以及 stateful 的 operator 进行快照,然后存储下来, 包括连接器(connectors),窗口(windows)以及用户自定义的状态。
Checkpoint 存储在哪里取决于配置的 State Backend(比如 JobManager memory、 file system、 database)。
默认情况下,状态是保存在 TaskManagers 的内存中,checkpoint 保存在 JobManager 的内存中,为了持久化大体量状态, Flink 支持存储 checkpoint 状态到其他的 state backends 上。
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
env.configure(config);
5.迭代作业中的状态和 checkpoint
Flink 现在为没有迭代(iterations)的作业提供一致性的处理保证,在迭代作业上开启 checkpoint 会导致异常。
为了在迭代程序中强制进行 checkpoint,用户需要在开启 checkpoint 时设置一个特殊的标志:
env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true);
请注意在环形边上游走的记录(以及与之相关的状态变化)在故障时会丢失。
6.部分任务结束后的 Checkpoint
a)概述
从 1.14 版本开始 Flink 支持在部分任务结束后继续进行Checkpoint,如果一部分数据源是有限数据集,那么就可以。
从 1.15 版本开始,这一特性被默认打开,如果想要关闭这一功能,可以执行:
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
此时,结束的任务不会参与 Checkpoint 的过程,在实现自定义的算子或者 UDF (用户自定义函数)时需要考虑这一点。
为了支持部分任务结束后的 Checkpoint 操作,调整了 任务的生命周期 并且引入了 StreamOperator#finish 方法,在这一方法中,用户需要写出所有缓冲区中的数据。
在 finish 方法调用后的 checkpoint 中,这一任务不能再有缓冲区中的数据,因为在
finish()
后没有办法输出这些数据,大部分情况下,
finish()
后这一任务的状态为空,唯一的例外是如果其中某些算子中包含外部系统事务的句柄(例如为了实现恰好一次语义), 在这种情况下,在
finish()
后进行的 checkpoint 操作应该保留这些句柄,并且在结束 checkpoint(即任务退出前所等待的 checkpoint)时提交。
b)对 operator state 的影响
在部分 Task 结束后的 checkpoint 中,Flink 对
UnionListState
进行了特殊的处理,
UnionListState
一般用于实现对外部系统读取位置的一个全局视图(例如,用于记录所有 Kafka 分区的读取偏移)。
如果在算子的某个并发调用
close()
方法后丢弃它的状态,就会丢失它所分配的分区的偏移量信息,为了解决这一问题,对于使用
UnionListState
的算子,只允许在它的并发都在运行或都已结束的时候才能进行 checkpoint 操作。
ListState
一般不会用于类似的场景,仍然需要注意在调用
close()
方法后进行的 checkpoint 会丢弃算子的状态并且 这些状态在算子重启后不可用。
任何支持并发修改操作的算子也可以支持部分并发实例结束后的恢复操作,从这种类型的快照中恢复等价于将算子的并发改为正在运行的并发实例数。
c)任务结束前等待最后一次 Checkpoint
为了保证使用两阶段提交的算子可以提交所有的数据,任务会在所有算子都调用
finish()
方法后等待下一次 checkpoint 成功后退出。
注意:这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显,极端情况下,如果 checkpoint 的周期被设置为
Long.MAX_VALUE
,那么任务永远不会结束,因为下一次 checkpoint 不会进行。
版权归原作者 猫猫爱吃小鱼粮 所有, 如有侵权,请联系我们删除。