Flink Checkpoint 详解
一、checkpoint简介
Checkpoint是Flink实现容错机制最核心的功能,是Flink可靠性的基石,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。
二、checkpoint原理
1. Checkpoint Barrier
Barrier是Flink分布式快照的核心概念之一,称之为屏障或者数据栅栏(可以理解为快照的分界线)。Barrier是一种特殊的内部消息,在进行Checkpoint的时候Flink会在数据流源头处周期性地注入Barrier,这些Barrier会作为数据流的一部分,一起流向下游节点并且不影响正常的数据流。Barrier的作用是将无界数据流从时间上切分成多个窗口,每个窗口对应一系列连续的快照中的一个,每个Barrier都带有一个快照ID,一个Barrier生成之后,在这之前的数据都进入此快照,在这之后的数据则进入下一个快照。
单并发:如图所示,当ID为n的Checkpoint Barrier到达每个算子后,表示要对n-1和n之间状态的更新做快照。
多并发:如图所示,每一个入流都会有barrier传过来,这里就会涉及到一个对齐的概念(详细介绍如下),如果没有对齐的话,当flink任务失败重启的话,则会重复消费barrier到达差之间的数据。
数据对齐:先到的入流后续的数据会先存到缓存里等待其他流的barrier到,等做完checkpoint并将barrier下发之后再正常的向下发送数据,步骤如下
第一步:算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐。
第二步:算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。
第三步:第二个输入通道的Checkpoint Barrier抵达该算子子任务,该算子子任务执行快照,将状态写入State Backend,然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。
第四步:对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。
2. checkpoint整体步骤
首先,Flink的检查点协调器(Checkpoint Coordinator)触发一次Checkpoint(Trigger Checkpoint),这个请求会发送给Source的各个子任务。
各Source算子子任务接收到这个Checkpoint请求之后,会将自己的状态写入到状态后端,生成一次快照,并且会向下游广播Checkpoint Barrier。
Source算子做完快照后,还会给Checkpoint Coodinator发送一个确认,告知自己已经做完了相应的工作。这个确认中包括了一些元数据,其中就包括刚才备份到State Backend的状态句柄,或者说是指向状态的指针。至此,Source完成了一次Checkpoint,之后source算子就会把barrier往下传递
下游算子接受barrier之后都需要经历一遍对齐、快照、确认的工作,当最后所有Sink算子确认完成快照之后,说明ID为n的Checkpoint执行结束,Checkpoint Coordinator向State Backend写入一些本次Checkpoint的元数据。
3. 异步快照
当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。同时,Flink启动一个后台线程,它创建本地状态的一份拷贝,这个线程用来将本地状态的拷贝同步到State Backend上,一旦数据同步完成,再给Checkpoint Coordinator发送确认信息。拷贝一份数据肯定占用更多内存,这时可以利用写入时复制(Copy-on-Write)的优化策略。Copy-on-Write指:如果这份内存数据没有任何修改,那没必要生成一份拷贝,只需要有一个指向这份数据的指针,通过指针将本地数据同步到State Backend上;如果这份内存数据有一些更新,那再去申请额外的内存空间并维护两份数据,一份是快照时的数据,一份是更新后的数据。
4. checkpoint存储结构
checkpoint由算子写的状态文件和checkpoint调度器写的元数据文件两部分组成,存储在状态后端。
三、精确一次
确保精确一次(exactly once)
当流处理应用程序发生错误的时候,结果可能会产生丢失或者重复。Flink 根据你为应用程序和集群的配置,可以产生以下结果:
Flink 不会从快照中进行恢复(at most once)
没有任何丢失,但是你可能会得到重复冗余的结果(at least once)
没有丢失或冗余重复(exactly once)
Flink 通过回退和重新发送 source 数据流从故障中恢复,当理想情况被描述为精确一次时,这并不意味着每个事件都将被精确一次处理。相反,这意味着 每一个事件都会影响 Flink 管理的状态精确一次。
Barrier 只有在需要提供精确一次的语义保证时需要进行对齐(Barrier alignment)。如果不需要这种语义,可以通过配置 CheckpointingMode.AT_LEAST_ONCE 关闭 Barrier 对齐来提高性能。
端到端精确一次
为了实现端到端的精确一次,以便 sources 中的每个事件都仅精确一次对 sinks 生效,必须满足以下条件:
你的 sources 必须是可重放的,并且
你的 sinks 必须是事务性的(或幂等的)
四、状态后端
前面已经分享了Flink的快照机制,其中State Backend起到了持久化存储数据的重要功能。Flink将State Backend抽象成了一种插件,并提供了三种State Backend,每种State Backend对数据的保存和恢复方式略有不同。接下来我们开始详细了解一下Flink的State Backend。
- MemoryStateBackend
从名字中可以看出,这种State
Backend主要基于内存,它将数据存储在Java的堆区。当进行分布式快照时,所有算子子任务将自己内存上的状态同步到JobManager的堆上,一个作业的所有状态要小于JobManager的内存大小。这种方式显然不能存储过大的状态数据,否则将抛出OutOfMemoryError异常。因此,这种方式只适合调试或者实验,不建议在生产环境下使用。下面的代码告知一个Flink作业使用内存作为State
Backend,并在参数中指定了状态的最大值,默认情况下,这个最大值是5MB。
env.setStateBackend(newMemoryStateBackend(MAX_MEM_STATE_SIZE))
如果不做任何配置,默认情况是使用内存作为State Backend。
- FsStateBackend
这种方式下,数据持久化到文件系统上,文件系统包括本地磁盘、HDFS以及包括Amazon、阿里云在内的云存储服务。使用时,我们要提供文件系统的地址,尤其要写明前缀,比如:file://、hdfs://或s3://。此外,这种方式支持Asynchronous
Snapshot,默认情况下这个功能是开启的,可加快数据同步速度。
// 使用HDFS作为State Backend
env.setStateBackend(newFsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"))// 使用阿里云OSS作为State Backend
env.setStateBackend(newFsStateBackend("oss://<your-bucket>/<object-name>"))// 使用Amazon作为State Backend
env.setStateBackend(newFsStateBackend("s3://<your-bucket>/<endpoint>"))// 关闭Asynchronous Snapshot
env.setStateBackend(newFsStateBackend(checkpointPath,false))
Flink的本地状态仍然在TaskManager的内存堆区上,直到执行快照时状态数据会写到所配置的文件系统上。因此,这种方式能够享受本地内存的快速读写访问,也能保证大容量状态作业的故障恢复能力。
- RocksDBStateBackend
这种方式下,本地状态存储在本地的RocksDB上。RocksDB是一种嵌入式Key-Value数据库,数据实际保存在本地磁盘上。比起FsStateBackend的本地状态存储在内存中,RocksDB利用了磁盘空间,所以可存储的本地状态更大。然而,每次从RocksDB中读写数据都需要进行序列化和反序列化,因此读写本地状态的成本更高。快照执行时,Flink将存储于本地RocksDB的状态同步到远程的存储上,因此使用这种State
Backend时,也要配置分布式存储的地址。Asynchronous Snapshot在默认情况也是开启的。此外,这种State Backend允许增量快照(Incremental Checkpoint),Incremental
Checkpoint的核心思想是每次快照时只对发生变化的数据增量写到分布式存储上,而不是将所有的本地状态都拷贝过去。Incremental
Checkpoint非常适合超大规模的状态,快照的耗时将明显降低,同时,它的代价是重启恢复的时间更长。默认情况下,Incremental
Checkpoint没有开启,需要我们手动开启。
// 开启Incremental Checkpoint
val enableIncrementalCheckpointing =true
env.setStateBackend(newRocksDBStateBackend(checkpointPath, enableIncrementalCheckpointing))
相比FsStateBackend,RocksDBStateBackend能够支持的本地和远程状态都更大,Flink社区已经有TB级的案例。
除了上述三种之外,开发者也可以自行开发State Backend的具体实现。
五、配置推荐
几个配置参考原则
1.checkpoint不宜过于频繁,因为它的目的是记录状态,如果间隔太小,可能一段时间内状态并没有发生变化,会无意义浪费资源,影响性能
2.checkpoint目的是保存状态和offset,状态过大会导致checkpoint耗时过长,占用网络IO
3.状态大小跟业务的窗口大小,吞吐量,以及无窗口groupby和regular join(与时间间隔无关的join)配置的min和max保留时长有关
几个重要配置项参考(其他配置项请参考官方文档按需配置)
1.checkpoint周期:不宜频繁,一般在分钟级别,1~10分钟,不建议超过10分钟。出于的考虑是:如果任务失败(没有savepoint),重新恢复任务肯定只能从checkpoint恢复,那么周期越短,需要重新处理的数据越少,实时性恢复的越快。
2.耗时超时配置项checkpoint.timeout.ms:一般不动,至少大于checkpoint周期,可配置为最大能接受checkpoint周期间隔即可,默认是10分钟
3.between.checkpoints.min.ms:是两个cp最小间隔,目的是防止checkpoint积压:由于状态过大导致checkpoint耗时很长,上一个还没结束,下一个就已经开始这样积压起来。
4.checkpoints.num-retained:checkpoint保留数,默认值1.如果希望任务失败时可以重跑更长时间的数据,那么可以根据需要增加此保留数,无非是增加磁盘存储
5.其中还有个:任务取消后保留Checkpoint目录:这个涉及checkpoint保留策略,生产上按需设置,个人建议是取消作业仍然保留检查点:业务升级时如果改动了算子逻辑,需要重跑一段时间数据时可以指定从前几个checkpoint重启;但是要注意手动清理,尤其配置保留多个checkpoint时,会占用磁盘空间,savepoint同理
版权归原作者 阳呀么阳阳阳 所有, 如有侵权,请联系我们删除。