一、概述
在本地IDEA测试中,使用本地文件存储系统,作为checkpoint的存储系统,将备份数据存储到本地文件中,作业停止后,从本地备份数据启动Flink程序。
主要分为两步:
1)备份数据
2)从备份数据启动
二、备份数据
备份数据的配置,和使用HDFS文件体统类似,只不过路径填写成本地文件系统的路径,注意格式需要是 file://///,和HDFS文件系统的配置略有不同。文件具体存储的位置,在idea安装路径的根路径下。比如本人IDEA安装在D盘下,checkpoint地址配置为 file:///Users/flink/checkpoints/TestCheckPoint,那么生成的备份点数据在 D:\Users\flink\checkpoints\TestCheckPoint 目录下。
部分代码如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 禁用全局任务链
env.disableOperatorChaining();
String brokers = "********.com:9092,********.com:9092,********.com:9092";
String sourceTopic = "0000-checkpoint-test-source";
String resultTopic = "0000-checkpoint-test-result";
String groupId = "demo";
String checkPointPath = "file:///Users/flink/checkpoints/TestCheckPoint";
StateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);
CheckpointConfig conf = env.getCheckpointConfig();
// 任务流取消和故障应保留检查点
conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
conf.setCheckpointInterval(10000);//milliseconds
conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds
conf.setMinPauseBetweenCheckpoints(10 * 1000);//相邻两次checkpoint之间的时间间隔
conf.setCheckpointStorage(checkPointPath);
生成的备份数据如下
三、从备份数据启动
主要区别,在从备份点数据恢复运行程序。
如果是在yarn集群运行,在启动指令中加入 -s hdfs://ns1/flink//chk- \ 即可;而在本地运行,需要将备份点路径设置到运行环境中,可以通过启动指令设置,也可以在代码中直接设置。为了展示方便,这里直接在代码中设置。
比如,从备份点 D:\Users\flink\checkpoints\TestCheckPoint\3fc3902734d9316b3d8947508e95eabd\chk-9 恢复运行,需要将备份点设置到环境变量中,部分代码如下:
Configuration configuration = new Configuration();
configuration.setString("execution.savepoint.path", "file:///Users/flink/checkpoints/TestCheckPoint/3fc3902734d9316b3d8947508e95eabd/chk-9");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 禁用全局任务链
env.disableOperatorChaining();
String brokers = "kafka-log1.test.xl.com:9092,kafka-log2.test.xl.com:9092,kafka-log3.test.xl.com:9092";
String sourceTopic = "0000-checkpoint-test-source";
String resultTopic = "0000-checkpoint-test-result";
String groupId = "demo";
String checkPointPath = "file:///Users/flink/checkpoints/TestCheckPoint";
StateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);
CheckpointConfig conf = env.getCheckpointConfig();
// 任务流取消和故障应保留检查点
conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
conf.setCheckpointInterval(10000);//milliseconds
conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds
conf.setMinPauseBetweenCheckpoints(10 * 1000);//相邻两次checkpoint之间的时间间隔
conf.setCheckpointStorage(checkPointPath);
启动程序后,将会从备份点读取状态数据,继续进行计算。
版权归原作者 Johnson8702 所有, 如有侵权,请联系我们删除。