0


Flink 保存点

Flink 保存点

保存点 (Savepoint) :用户手动触发保存状态

保存点的场景:

  • 版本管理和归档存储
  • 更新 Flink 版本
  • 更新应用程序
  • 调整并行度
  • 暂停应用程序

设置算子 ID :

DataStream<String> stream = env.addSource(newStatefulSource()).uid("source-id").map(newStatefulMapper()).uid("mapper-id").print();

创建保存点 :

  • obId :镜像保存的作业 ID
  • targetDirector(可选) :保存点存储的路径
bin/flink savepoint :jobId [:targetDirectory]
flink-conf.yaml

:

# 保存点的默认路径state.savepoints.dir: hdfs:///flink/savepoints

单作业 :

// 保存点的默认路径
env.setDefaultSavepointDir("hdfs:///flink/savepoints");

停止作业时,创建保存点 :

bin/flink stop --savepointPath [:targetDirectory] :jobId

从保存点重启应用 :

  • -s : 保存点的路径
bin/flink run -s :savepointPath [:runArgs]

报错 :

javax.annotation.Nullable

找不到,解决 :

<dependency><groupId>com.google.code.findbugs</groupId><artifactId>jsr305</artifactId><version>1.3.9</version></dependency>

提交 flink 作业 :

bin/flink run-application -d -t yarn-application \
-Dstate.backend=hashmap \
-c com.cpucode.checkpoint.SavepointDemo \
FlinkTutorial-1.0-SNAPSHOT.jar

优雅停止 , 并触发保存点

  • 要求 source 实现 StoppableFunction 接口
bin/flink stop \
-p savepoint路径 job-id \
-yid application-id

立即停止 , 并触发保存点 :

bin/flink cancel \
-s savepoint路径 job-id \
-yid application-id

savepoint 恢复作业, 并修改状态后端

bin/flink run-application -d -t yarn-application \
-s hdfs://hadoop102:8020/sp/savepoint-267cc0-47a214b019d5 \
-Dstate.backend=rocksdb \
-c com.cpucode.checkpoint.SavepointDemo \
FlinkTutorial-1.0-SNAPSHOT.jar    

checkpoint 恢复作业

  • 恢复时 , 不支持切换状态后端
bin/flink run-application -d -t yarn-application \
-Dstate.backend=rocksdb \
-s hdfs://hadoop102:8020/chk/532f87ef4146b2a2968a1c137d33d4a6/chk-175 \
-c com.cpucode.checkpoint.SavepointDemo \
./FlinkTutorial-1.0-SNAPSHOT.jar
标签: flink hadoop 大数据

本文转载自: https://blog.csdn.net/qq_44226094/article/details/131274163
版权归原作者 cpuCode 所有, 如有侵权,请联系我们删除。

“Flink 保存点”的评论:

还没有评论