最近面试问的最多的就是Flink如何进行容错的,总结一下关于checkpoint的知识点
1、checkpoint是什么
将任务运行时的状态做一个快照,保存到稳定的存储系统中,当任务发生重启时,可以从保存的状态中重新恢复任务。
原理可以这么理解,当数据x从source出发的时候,JM会给x一个令牌,告诉x已经被监视,x把令牌展示给经过的operator,operator就把自己对x的state进行汇报到JM(持久化),一直到最后,JM收到所有的反馈后,就认为x已经完成了。如果x还没完成,任务就挂掉了。在从checkpoint恢复的时候,是从最近的一次(可指定,一般用最近的,除非你最近的有问题,你要回滚)checkpoint恢复状态,告诉任务x走了那一步,到什么位置了。
2、checkpoint存的什么
一般来说存储的是state的数据,另外也可以保存kafka的offset(数据重放)
同时既然存储的是数据,那么也需要进行定期清理。
3、checkpoint存到什么地方?
三种存储方式:
第一种是基于内存的,叫做MemoryStateBackend,将状态维护在jvm的堆中,默认大小为5M,一般用于本地测试,或者较小数据规模的场景,问题是一旦任务失败死掉是无法恢复的。
第二种是基于文件的,叫做FsStateBackend,一般将数据维护在hdfs中,TM的状态数据存到hdfs中,JM的元数据存储还在内存中,也有大小限制,因为是全量将数据写到hdfs,那么你的数据大小必须小于TM可使用的内存,适合较大状态的数据,一旦失败可以flink -s 指定hdfs checkpoint位置进行重启,恢复任务状态,保证高可用。
第三种是基于DB的,叫做RocksDBStateBackend,RocksDB 是一种嵌入式的本地数据库,TM会直接与DB建立联系,进行增量的数据存储,适合大状态的数据存储,保证高可用,带来的影响无非是读的时候会变慢。
需要进行外部依赖引入
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.10.0</version></dependency>
4、清理冗余的state数据
两种策略清理checkpoint:
1、DELETE_ON_CANCELLATION:仅当作业失败时,作业的 Checkpoint 才会被保留用于任务恢复。当作业取消时,Checkpoint 状态信息会被删除,因此取消任务后,不能从 Checkpoint 位置进行恢复任务。
2、RETAIN_ON_CANCELLATION:当作业手动取消时,将会保留作业的 Checkpoint 状态信息。注意,这种情况下,需要手动清除该作业保留的 Checkpoint 状态信息,否则这些状态信息将永远保留在外部的持久化存储中。
两种策略清理state:
1、Flink1.8.0引入了基于TTL的对于过期状态的清理,属于惰性删除。
2、手动删除,比如每天用户的pv,每天定时删除key就可以了,案例见5.1中的博客
5、使用checkpoint的场景
既然是为了容错,那么肯定是某些因素导致不稳定,从前用到checkpoint,这儿举几个例子
1、任务意外挂掉,比如某台flink节点突然关机了,可以先看我之前写过的一个博客:https://blog.csdn.net/jklcl/article/details/119395449?spm=1001.2014.3001.5502 (Flink基于State做千万用户的pv)
2、系统迁移/升级,flink集群发生变化,只要hdfs没有被干掉,任务就能从丢失的状态下进行回复
3、代码bug,此处的bug不是致命的bug,只是业务逻辑上的bug修改修改,如果是对state中的数据进行了很大的改造,其实重启后的数据依然是不可用的。
版权归原作者 来一块提拉米苏 所有, 如有侵权,请联系我们删除。