Flink 检查点配置
启用检查点
开启自动保存快照 (默认:关闭) :
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 每隔 1 秒启动一次检查点保存
env.enableCheckpointing(1000);
间隔调整 :
- 对性能的影响更小,就调大间隔时间
- 为了更好的容错性,就以调小间隔时间
检查点存储
检查点存储 (CheckpointStorage) : 持久化存储位置
- JobManager 的堆内存 (JobManagerCheckpointStorage) : 默认
- 文件系统 (FileSystemCheckpointStorage) : 常用 , (HDFS , S3)
// 配置存储检查点到 JobManager 堆内存
env.getCheckpointConfig().setCheckpointStorage(newJobManagerCheckpointStorage());// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(newFileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
通用增量
Rocksdb 状态后端 : 启用增量 checkpoint
- Flink 1.15 后 , HashMap , Rocksdb 都能开启通用的增量 checkpoint
EmbeddedRocksDBStateBackend backend =newEmbeddedRocksDBStateBackend(true);
增量 checkpoint 过程 :
- 带状态的算子任务 , 将状态更改 , 写入变更日志(记录状态)
- 状态物化:状态表定期保存,独立于检查点
- 状态物化完成后,状态变更日志 , 就截断到相应的点
注意点 :
- HDFS : 文件数变多
- 上传变更日志 : IO 宽带较大
- 序列化状态变更 : CPU 消耗较大
- 缓存状态变更 : TaskManager 内存消耗较大
- Checkpint 最大并发 = 1
- Flink 1.15 , Memory 测试阶段
- 不支持 NO_ClAIM 模式
配置文件 :
state.backend.changelog.enabled:truestate.backend.changelog.storage: filesystem
# 存储 changelog 数据dstl.dfs.base-path: hdfs://hadoop102:8020/changelog
execution.checkpointing.max-concurrent-checkpoints:1execution.savepoint-restore-mode: CLAIM
代码配置 :
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-changelog</artifactId><version>${flink.version}</version><scope>runtime</scope></dependency>
// 开启changelog:
env.enableChangelogStateBackend(true);
最终检查点
当有界数据 , 部分Task 完成 , Flink 1.14 后 , 它们依然能进行检查点
禁用 (Flink 1.15 后, 默认启用) :
Configuration config =newConfiguration();// 禁用最终检查点
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,false);StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(config);
配置建议
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 获取所有配置CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 检查点模式 (CheckpointingMode) : // 精确一次 : exactly-once (默认)// 至少一次 : at-least-once (效率更高)
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 最大并发检查点数量(maxConcurrentCheckpoints): // 检查点最多有多少个
checkpointConfig.setMaxConcurrentCheckpoints(1)// 启用非对齐的检查点保存// 限制: CheckpointingMode= exctly-once , 并发的检查点 = 1
checkpointConfig.enableUnalignedCheckpoints();// 默认: 0: 用非对齐的检查点// > 0: 用 对齐的检查点(barrier对齐)// 当对齐时间 > 阈值, 为: 非对齐检查点(barrier非对齐)
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));// 超时时间 (checkpointTimeout) : // 检查点保存的超时时间,当超时就丢弃// 单位 : 长整型毫秒数
checkpointConfig.setCheckpointTimeout(60000)//最小间隔时间 (minPauseBetweenCheckpoints): // 上个 checkpoint 完成后, 最快多久触发另个 checkpoint
checkpointConfig.setMinPauseBetweenCheckpoints(500)// 开启检查点的外部持久化// DELETE_ON_CANCELLATION: 作业取消时, 自动删除外部检查点,但作业失败退出,就保留检查点// RETAIN_ON_CANCELLATION:作业取消时, 也保留外部检查点
checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)// 检查点异常时, 是否整个任务失败// true : 失败提出// false: 丢弃, 并继续运行
checkpointConfig.setFailOnCheckpointingErrors(true)
版权归原作者 cpuCode 所有, 如有侵权,请联系我们删除。