Flink 保存点
保存点 (Savepoint) :用户手动触发保存状态
保存点的场景:
- 版本管理和归档存储
- 更新 Flink 版本
- 更新应用程序
- 调整并行度
- 暂停应用程序
设置算子 ID :
DataStream<String> stream = env.addSource(newStatefulSource()).uid("source-id").map(newStatefulMapper()).uid("mapper-id").print();
创建保存点 :
- obId :镜像保存的作业 ID
- targetDirector(可选) :保存点存储的路径
bin/flink savepoint :jobId [:targetDirectory]
flink-conf.yaml
:
# 保存点的默认路径state.savepoints.dir: hdfs:///flink/savepoints
单作业 :
// 保存点的默认路径
env.setDefaultSavepointDir("hdfs:///flink/savepoints");
停止作业时,创建保存点 :
bin/flink stop --savepointPath [:targetDirectory] :jobId
从保存点重启应用 :
- -s : 保存点的路径
bin/flink run -s :savepointPath [:runArgs]
报错 :
javax.annotation.Nullable
找不到,解决 :
<dependency><groupId>com.google.code.findbugs</groupId><artifactId>jsr305</artifactId><version>1.3.9</version></dependency>
提交 flink 作业 :
bin/flink run-application -d -t yarn-application \
-Dstate.backend=hashmap \
-c com.cpucode.checkpoint.SavepointDemo \
FlinkTutorial-1.0-SNAPSHOT.jar
优雅停止 , 并触发保存点
- 要求 source 实现 StoppableFunction 接口
bin/flink stop \
-p savepoint路径 job-id \
-yid application-id
立即停止 , 并触发保存点 :
bin/flink cancel \
-s savepoint路径 job-id \
-yid application-id
savepoint 恢复作业, 并修改状态后端
bin/flink run-application -d -t yarn-application \
-s hdfs://hadoop102:8020/sp/savepoint-267cc0-47a214b019d5 \
-Dstate.backend=rocksdb \
-c com.cpucode.checkpoint.SavepointDemo \
FlinkTutorial-1.0-SNAPSHOT.jar
checkpoint 恢复作业
- 恢复时 , 不支持切换状态后端
bin/flink run-application -d -t yarn-application \
-Dstate.backend=rocksdb \
-s hdfs://hadoop102:8020/chk/532f87ef4146b2a2968a1c137d33d4a6/chk-175 \
-c com.cpucode.checkpoint.SavepointDemo \
./FlinkTutorial-1.0-SNAPSHOT.jar
版权归原作者 cpuCode 所有, 如有侵权,请联系我们删除。