检查点(Checkpoint)
发生故障之后怎么办?最简单的想法当然是重启机器、重启应用。这里的问题在于,流处理应用中的任务都是有状态的,而为了快速访问这些状态一般会直接放在堆内存里;现在重启应用,内存中的状态已经丢失,就意味着之前的计算全部白费了,需要从头来过。就像编写文档或是玩 RPG 游戏,因为宕机没保存而要重来一遍是一件令人崩溃的事情;所以就有了存档,这样即使遇到宕机也可以读档继续了。
在流处理中,我们同样可以用存档读档的思路,把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。进一步地,我们知道在有状态的流处理中, 任务继续处理新数据,并不需要“之前的计算结果”,而是需要任务“之前的状态”。所以我们最终的选择,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点” (checkpoint)。
检查点是 Flink 容错机制的核心。这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把 checkpoint 叫作“一致性检查点”。
检查点的保存
什么时候进行检查点的保存呢?最理想的情况下,我们应该“随时”保存,也就是每处理完一个数据就保存一下当前的状态;这样如果在处理某条数据时出现故障,我们只要回到上一个数据处理完之后的状态,然后重新处理一遍这条数据就可以。这样重复处理的数据最少,完全没有多余操作,可以做到最低的延迟。然而实际情况不会这么完美。
周期性的触发保存
“随时存档”确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据 就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会受到影响。所以更好的方式是,每隔一段时间去做一次存档,这样既不会影响数据的正常处理,也不会有太大的延迟——毕竟故障恢复的情况不是随时发生的。**在 Flink 中,检查点的保存是周期性触发的,间隔时间可以进行设置。**
所以检查点作为应用状态的一份“存档”,其实就是所有任务状态在同一时间点的一个“快照”(snapshot),它的触发是周期性的。具体来说,当每隔一段时间检查点保存操作被触发时, 就把每个任当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点。
保存的时间点
这里有一个关键问题:当检查点的保存被触发时,任务有可能正在处理某个数据,这时该怎么办呢? 最简单的想法是,可以在某个时刻“按下暂停键”,让所有任务停止处理数据。这样状态就不再更改,大家可以一起复制保存;保存完毕之后,再同时恢复数据处理就可以了。 然而仔细思考就会发现这有很多问题。这种想法其实是粗暴地“停止一切来拍照”,在保存检查点的过程中,任务完全中断了,这会造成很大的延迟;我们之前为了实时性做出的所有设计就毁在了做快照上。另一方面,我们做快照的目的是为了故障恢复;现在的快照中,有些任务正在处理数据,那它保存的到底是处理到什么程度的状态呢?举个例子,我们在程序中某一步操作中自定义了一个 ValueState,处理的逻辑是:当遇到一个数据时,状态先加 1;而后 经过一些其他步骤后再加 1。现在停止处理数据,状态到底是被加了 1 还是加了 2 呢?这很重要,因为状态恢复之后,我们需要知道当前数据从哪里开始继续处理。要满足这个要求,就必须将暂停时的所有环境信息都保存下来——而这显然是很麻烦的。 为了解决这个问题,我们不应该“一刀切”把所有任务同时停掉,而是至少得先把手头正在处理的数据弄完。这样的话,我们在检查点中就不需要保存所有上下文信息,只要知道当前处理到哪个数据就可以了。 但这样依然会有问题:分布式系统的节点之间需要通过网络通信来传递数据,如果我们保存检查点的时候刚好有数据在网络传输的路上,那么下游任务是没法将数据保存起来的;故障重启之后,我们只能期待上游任务重新发送这个数据。然而上游任务是无法知道下游任务是否收到数据的,只能盲目地重发,这可能导致下游将数据处理两次,结果就会出现错误。 所以我们最终的选择是:当所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。 其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(source) 任务向数据源重新提交偏移量、请求重放数据就可以了。这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;Kafka 就是满足这些要求的一个最好的例子,我们会在后面详细讨论。
保存的具体流程
检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。下面我们通过一 个具体的例子,来详细描述一下检查点具体的保存过程。 回忆一下我们最初实现的统计词频的程序——WordCount。这里为了方便,我们直接从数 据源读入已经分开的一个个单词,例如这里输入的就是: “hello”“world”“hello”“flink”“hello”“world”“hello”“flink”…… 对应的代码就可以简化为:
SingleOutputStreamOperator<Tuple2<String,Long>> wordCountStream =
env.addSource(...).map(word ->Tuple2.of(word,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG));.keyBy(t -> t.f0).sum(1);
源(Source)任务从外部数据源读取数据,并记录当前的偏移量,作为算子状态(Operator State)保存下来。然后将数据发给下游的 Map 任务,它会将一个单词转换成(word, count)二元组,初始 count 都是 1,也就是(“hello”, 1)、(“world”, 1)这样的形式;这是一个无状态的算子任务。进而以 word 作为键(key)进行分区,调用.sum()方法就可以对 count 值进行求和统计了; Sum 算子会把当前求和的结果作为按键分区状态(Keyed State)保存下来。最后得到的就是当前单词的频次统计(word, count),如图所示。
当我们需要保存检查点(checkpoint)时,就是在所有任务处理完同一条数据后,对状态做个快照保存下来。例如上图中,已经处理了 3 条数据:“hello”“world”“hello”,所以我们 会看到 Source 算子的偏移量为 3;后面的 Sum 算子处理完第三条数据“hello”之后,此时已 经有 2 个“hello”和 1 个“world”,所以对应的状态为“hello”-> 2,“world”-> 1(这里 KeyedState 底层会以 key-value 形式存储)。此时所有任务都已经处理完了前三个数据,所以我们可以把当前的状态保存成一个检查点,写入外部存储中。至于具体保存到哪里,这是由状态后端的配置项 “ 检 查 点 存 储 ” ( CheckpointStorage )来决定的,可以有作业管理器的堆内存 (JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)两种选择。一般情况下,我们会将检查点写入持久化的分布式文件系统。
从检查点恢复状态
在运行流处理程序时,Flink 会周期性地保存检查点。当发生故障时,就需要找到最近一次成功保存的检查点来恢复状态。
例如在上节的 word count 示例中,我们处理完三个数据后保存了一个检查点。之后继续运行,又正常处理了一个数据“flink”,在处理第五个数据“hello”时发生了故障,如图 所示。
这里 Source 任务已经处理完毕,所以偏移量为 5;Map 任务也处理完成了。而 Sum 任务在处理中发生了故障,此时状态并未保存。
接下来就需要从检查点来恢复状态了。具体的步骤为:
(1)重启应用
遇到故障之后,第一步当然就是重启。我们将应用重新启动后,所有任务的状态会清空,如图 10-4 所示。
(2)读取检查点,重置状态
找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink 内部所有任务的状态,就恢复到了保存检查点的那一时刻,也就是刚好处理完第三个数据的时候,如图 所示。这里 key 为“flink”并没有数据到来,所以初始为 0。
(3)重放数据
从检查点恢复状态后还有一个问题:如果直接继续处理数据,那么保存检查点之后、到发生故障这段时间内的数据,也就是第 4、5 个数据(“flink”“hello”)就相当于丢掉了;这会造成计算结果的错误。
为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过 Source 任务向外部数据源重新提交偏移量(offset)来实现,如图 所示。
这样,整个系统的状态已经完全回退到了检查点保存完成的那一时刻。
(4)继续处理数据
接下来,我们就可以正常处理数据了。首先是重放第 4、5 个数据,然后继续读取后面的数据,如图 所示。
这里我们也可以发现,想要正确地从检查点中读取并恢复状态,必须知道每个算子任务状态的类型和它们的先后顺序(拓扑结构);因此为了可以从之前的检查点中恢复状态,我们在改动程序、修复 bug 时要保证状态的拓扑顺序和类型不变。状态的拓扑结构在 JobManager 上可以由 JobGraph 分析得到,而检查点保存的定期触发也是由 JobManager 控制的;所以故障恢复的过程需要 JobManager 的参与。
检查点算法
我们已经知道,Flink 保存检查点的时间点,是所有任务都处理完同一个输入数据的时候。 但是不同的任务处理数据的速度不同,当第一个 Source 任务处理到某个数据时,后面的 Sum 任务可能还在处理之前的数据;而且数据经过任务处理之后类型和值都会发生变化,面对着“面目全非”的数据,不同的任务怎么知道处理的是“同一个”呢?
一个简单的想法是,当接到 JobManager 发出的保存检查点的指令后,Source 算子任务处 理完当前数据就暂停等待,不再读取新的数据了。这样我们就可以保证在流中只有需要保存到 检查点的数据,只要把它们全部处理完,就可以保证所有任务刚好处理完最后一个数据;这时把所有状态保存起来,合并之后就是一个检查点了。样做最大的问题,当先保存完状态的任务需要等待其他任务时,就导致了资源的闲置和性能的降低。
所以更好的做法是,在不暂停整体流处理的前提下,将状态备份保存到检查点。在 Flink 中,采用了基于 Chandy-Lamport 算法的分布式快照,下面我们就来详细了解一下。
检查点分界线(Barrier)
我们现在的目标是,在不暂停流处理的前提下,让每个任务“认出”触发检查点保存的那个数据。 所以我们可以借鉴水位线(watermark)的设计,在数据流中插入一个特殊的数据结构, 专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source 任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。
这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫作检查点的 “分界线”(Checkpoint Barrier)。
与水位线很类似,检查点分界线也是一条特殊的数据,由 Source 算子注入到常规的数据流中,它的位置是限定好的,不能超过其他数据,也不能被后面的数据超过。检查点分界线中带有一个检查点 ID,这是当前要保存的检查点的唯一标识,如图 所示。
这样,分界线就将一条流逻辑上分成了两部分:分界线之前到来的数据导致的状态更改, 都会被包含在当前分界线所表示的检查点中;而基于分界线之后的数据导致的状态更改,则会被包含在之后的检查点中。
在 JobManager 中有一个“检查点协调器”(checkpoint coordinator),专门用来协调处理检 查点的相关工作。检查点协调器会定期向 TaskManager 发出指令,要求保存检查点(带着检查点 ID);TaskManager 会让所有的 Source 任务把自己的偏移量(算子状态)保存起来,并将带有检查点 ID 的分界线(barrier)插入到当前的数据流中,然后像正常的数据一样像下游传递; 之后 Source 任务就可以继续读入新的数据了。
每个算子任务只要处理到这个 barrier,就把当前的状态进行快照;在收到 barrier 之前, 还是正常地处理之前的数据,完全不受影响。比如上图中,Source 任务收到 1 号检查点保存 指令时,读取完了三个数据,所以将偏移量 3 保存到外部存储中;而后将 ID 为 1 的 barrier 注 入数据流;与此同时,Map 任务刚刚收到上一条数据“hello”,而 Sum 任务则还在处理之前的 第二条数据(world, 1)。下游任务不会在这时就立刻保存状态,而是等收到 barrier 时才去做快照,这时可以保证前三个数据都已经处理完了。同样地,下游任务做状态快照时,也不会影响 上游任务的处理,每个任务的快照保存并行不悖,不会有暂停等待的时间。
分布式快照算法
通过在流中插入分界线(barrier),我们可以明确地指示触发检查点保存的时间。在一条单一的流上,数据依次进行处理,顺序保持不变;不过对于分布式流处理来说,想要一直保持 数据的顺序就不是那么容易了。 我们先回忆一下水位线(watermark)的处理:上游任务向多个并行下游任务传递时,需要广播出去;而多个上游任务向同一个下游任务传递时,则需要下游任务为每个上游并行任务维护一个“分区水位线”,取其中最小的那个作为当前任务的事件时钟。 那 barier 在并行数据流中的传递,是不是也有类似的规则呢? watermark 指示的是“之前的数据全部到齐了”,而 barrier 指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。 具体实现上,Flink 使用了 Chandy-Lamport 算法的一种变体,被称为“异步分界线快照” (asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行下游任务发送 barrier 时,需要广播出去;而当多个上游任务向同一个下游任务传递 barrier 时, 需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区 的 barrier 都到齐,才可以开始状态的保存。 为了详细解释检查点算法的原理,我们对之前的 word count 程序进行扩展,考虑所有算子并行度为 2 的场景,如图 所示。
我们有两个并行的 Source 任务,会分别读取两个数据流(或者是一个源的不同分区)。这里每条流中的数据都是一个个的单词:“hello”“world”“hello”“flink”交替出现。此时第一 条流的 Source 任务(为了方便,下文中我们直接叫它“Source 1”,其他任务类似)读取了 3 个数据,偏移量为 3;而第二条流的 Source 任务(Source 2)只读取了一个“hello”数据,偏 移量为 1。第一条流中的第一个数据“hello”已经完全处理完毕,所以 Sum 任务的状态中 key 为 hello 对应着值 1,而且已经发出了结果(hello, 1);第二个数据“world”经过了 Map 任务的 转换,还在被 Sum 任务处理;第三个数据“hello”还在被 Map 任务处理。而第二条流的第一 个数据“hello”同样已经经过了 Map 转换,正在被 Sum 任务处理。
接下来就是检查点保存的算法。具体过程如下:
(1)JobManager 发送指令,触发检查点的保存;Source 任务保存状态,插入分界线 JobManager 会周期性地向每个 TaskManager 发送一条带有新检查点 ID 的消息,通过这 种方式来启动检查点。收到指令后,TaskManger 会在所有 Source 任务中插入一个分界线 (barrier),并将偏移量保存到远程的持久化存储中,如图 所示。
并行的 Source 任务保存的状态为 3 和 1,表示当前的 1 号检查点应该包含:第一条流中截至第三个数据、第二条流中截至第一个数据的所有状态更改。可以发现 Source 任务做这些 的时候并不影响后面任务的处理,Sum 任务已经处理完了第一条流中传来的(world, 1),对应的状态也有了更改。
(2)状态快照保存完成,分界线向下游传递
状态存入持久化存储之后,会返回通知给 Source 任务;Source 任务就会向 JobManager 确认检查点完成,然后像数据一样把 barrier 向下游任务传递,如图 所示。
由于 Source 和 Map 之间是一对一(forward)的传输关系(这里没有考虑算子链 operator chain),所以 barrier 可以直接传递给对应的 Map 任务。之后 Source 任务就可以继续读取新的数据了。与此同时,Sum 1 已经将第二条流传来的(hello,1)处理完毕,更新了状态。
(3)向下游多个并行子任务广播分界线,执行分界线对齐
Map 任务没有状态,所以直接将 barrier 继续向下游传递。这时由于进行了 keyBy 分区, 所以需要将 barrier 广播到下游并行的两个 Sum 任务,如图所示。同时,Sum 任务可能 收到来自上游两个并行 Map 任务的 barrier,所以需要执行“分界线对齐”操作。
此时的 Sum 2 收到了来自上游两个 Map 任务的 barrier,说明第一条流第三个数据、第二 条流第一个数据都已经处理完,可以进行状态的保存了;而 Sum 1 只收到了来自 Map 2 的 barrier,所以这时需要等待分界线对齐。在等待的过程中,如果分界线尚未到达的分区任务 Map 1 又传来了数据(hello, 1),说明这是需要保存到检查点的,Sum 任务应该正常继续处理数据,状态更新为 3;而如果分界线已经到达的分区任务 Map 2 又传来数据,这已经是下一个检查点要保存的内容了,就不应立即处理,而是要缓存起来、等到状态保存之后再做处理。
(4)分界线对齐后,保存状态到持久化存储
各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成 之后,同样将 barrier 向下游继续传递,并通知 JobManager 保存完毕,如图 所示。
这个过程中,每个任务保存自己的状态都是相对独立的,互不影响。我们可以看到,当 Sum 将当前状态保存完毕时,Source 1 任务已经读取到第一条流的第五个数据了。
(5)先处理缓存数据,然后正常继续处理
完成检查点保存之后,任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据。
当 JobManager 收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。
由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压(backpressure)时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。为了应对这种场景,Flink 1.11 之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据(in-flight data)也保存进检查点。这样,当我们遇到一个分区 barrier 时就不需等待对齐,而是可以直接启动状态的保存了。
版权归原作者 ambitfly 所有, 如有侵权,请联系我们删除。