学习文档:Flink 官方文档 - DataStream API - 状态与容错 - 使用状态
相关文档:
- 有状态流处理背后的概念:Flink|《Flink 官方文档 - 概念透析 - 有状态流处理》学习笔记
- Redis 过期 key 的删除机制:Redis|过期 key 的删除机制
学习笔记如下:
键控流(Keyed DataStream)
如果要使用键控状态,则必须要为 DataStream 指定 key。这个主键将用于对数据流中的记录分区,同时也会用于状态分区。
可以使用 DataStream 中的
keyBy(KeySelector)
(Java / Scala)或
key_by(KeySelector)
来指定 key,在指定 key 后,数据流将变成键控流(KeyedStream),并允许使用基于 Keyed state 的操作。
KeySelector 接受每条记录作为输入,并返回这条记录的 key。该 key 可以是任何类型,但它的计算产生方式必须是具有确定性的(详见 Flink|《Flink 官方文档 - 概念透析 - 有状态流处理》学习笔记)。例如:
// some ordinary POJOpublicclassWC{publicString word;publicint count;publicStringgetWord(){return word;}}DataStream<WC> words =// [...]KeyedStream<WC> keyed = words
.keyBy(WC::getWord);
Flink 的数据类型并不基于 key - value 对,因此实际上将数据集在物理上封装为 key 和 value 是没有必要的。
键控状态(Keyed State)
以下 Keyed State 只能在 KeyedStream 上使用:
ValueState<T>
:保存一个可以更新和检索的值;这个值可以通过update(T)
进行更新,通过T value()
进行检索。ListState<T>
:保存一个元素的列表;可以通过add(T)
或者addAll(List<T>)
添加元素,通过Iterable<T> get()
获取整个列表,通过update(List<T>)
覆盖当前的列表。ReducingState<T>
:保存一个值,表示添加到状态的所有值聚合后的结果;使用add(T)
添加元素,并使用提供的reduceFunction
进行聚合。AggregatingState<IN, OUT>
:保存一个值,表示添加到状态的所有值聚合后的结果;使用add(T)
添加元素,并使用提供的AggregateFunction
进行聚合。与ReducingState
不同的时,聚合类型可能与添加到状态的元素类型不同。MapState<UK, UV>
:保存一个映射列表;可以使用put(UK, UV)
或putAll(Map<UK, UV>)
添加映射,使用get(UK)
来检索特定的 key,使用entires()
、keys()
、values()
分别检索映射、键和值的可迭代视图,使用isEmpty()
判断是否包含任何键值对。
所有的类型状态还有一个
clear()
方法,用于清除当前 key 下的状态数据,也就是当前输入元素的 key。
需要注意的是:
- 这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。
- 从状态中获取的值取决于输入元素所代表的 Key,在不同 key 上调用同一个接口,可能得到不同的值。
在使用中,必须创建一个
StateDescriptor
,才能获得对应的状态句柄。在状态句柄中,记录了状态名称、状态所持有值的类型以及用户所指定的函数。根据不同的状态类型,可以创建
ValueStateDescriptor
、
ListStateDescriptor
、
AggregatingStateDescriptor
、
ReducingStateDescriptor
或
MapStateDescriptor
。
状态通过
RuntimeContext
进行访问,因此只能在
rich functions
中使用。
样例:计数窗口,这个 UDF 会计算每两个相邻的元素的平均值并发送到下游。
publicclassCountWindowAverageextendsRichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Long>>{/** * The ValueState handle. The first field is the count, the second field a running sum. */privatetransientValueState<Tuple2<Long,Long>> sum;@OverridepublicvoidflatMap(Tuple2<Long,Long> input,Collector<Tuple2<Long,Long>> out)throwsException{// access the state valueTuple2<Long,Long> currentSum = sum.value();// update the count currentSum.f0 +=1;// add the second field of the input value currentSum.f1 += input.f1;// update the state sum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif(currentSum.f0 >=2){ out.collect(newTuple2<>(input.f0, currentSum.f1 / currentSum.f0)); sum.clear();}}@Overridepublicvoidopen(Configuration config){ValueStateDescriptor<Tuple2<Long,Long>> descriptor =newValueStateDescriptor<>("average",// the state nameTypeInformation.of(newTypeHint<Tuple2<Long,Long>>(){}),// type informationTuple2.of(0L,0L));// default value of the state, if nothing was set sum =getRuntimeContext().getState(descriptor);}}// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env) env.fromElements(Tuple2.of(1L,3L),Tuple2.of(1L,5L),Tuple2.of(1L,7L),Tuple2.of(1L,4L),Tuple2.of(1L,2L)).keyBy(value -> value.f0).flatMap(newCountWindowAverage()).print();// the printed output will be (1,4) and (1,5)
- 定义 UDF 的
ValueState
类型的私有属性sum
,其值为一个元组,元组中第一个元素用于存储计数结果,第二个元素存储求和结果。- 在
open()
方法中,定义了状态句柄ValueStateDescriptor
,定义了状态名称、状态类型和状态的初始值,并将其状态存储到属性sum
中。- 使用
sum.value()
获取当前状态的值- 使用
sum.updaste()
更新当前状态的值- 使用
sum.clear()
清空当前状态的值
状态有效期(TTL)
任何类型的 Keyed State 都可以设置有效期(TTL)。如果配置了 TTL 且状态已过期,则会尽最大可能清除对应的值。
任何状态类型都支持单元素的 TTL。这意味着列表元素和映射元素将单独计算到期时间。
在使用 TTL 前,需要先构建
StateTtlConfig
配置对象,然后把配置传递到 State Descriptor 中启用 TTL 功能。
TTL 配置的选项
- 数据的有效期:
newBuilder()
的第一个参数,必选 - 更新策略:
setUpdateType()
的第一个参数,可选,默认为onCreateAndWrite
-StateTtlConfig.UpdateType.onCreateAndWrite
:仅在创建和写入时更新-StateTtlConfig.UpdateType.onReadAndWrite
:在读取时也更新 - 数据在未被清理时的可见性配置:
setStateVisibility()
的第一个参数,可选,默认为NeverReturnedExpired
-StateTtlConfig.StateVisibility.NeverReturnExpired
- 不返回过期数据-StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp
- 会返回过期但未清理的数据 - 关门后台清理:
disableCleanupInBackground()
,可选,添加则关闭过期数据的后台清理 - 开启全量快照时清理:
cleanupFullSnapshot()
,可选,添加则开启在全来那个快照时进行清理 - 开启 Heap Backend 增量数据清理:
cleanupIncrementally()
,可选,添加则在访问和处理时进行检查过期数据并清理 - 开启 RcoksDB Backend 压缩时数据清理:
cleanupInRocksdbCompactFilter()
,可选,添加在开启压缩时数据清理
需要注意的是:因为在开启 TTL 特性后,状态上次的修改时间会和数据一起保存在 state backend 中,所以开启这个特性会增加状态数据的存储。
TTL 的清理策略
默认情况下,过期数据会在读取的时候被删除,同时也会有后台进程定期清理。
在实现上,
HeapStateBackend
依赖增量数据清理,
RocksDBStateBackend
利用压缩过滤器进行后台清理。
- 全量快照时进行清理:在全量快照时进行清理的策略,可以减少整体快照的大小。当前实现中不会清理本地状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。这种清理策略可以在任何时候通过
StateTtlConfig
启动或者关闭。 - 增量数据清理:如果开启增量式清理状态数据,在会状态访问和处理时进行清理。对于开启了增量数据清理策略的状态,会在存储后端保留一个所有状态的惰性全局迭代器,每次出发增量清理时,从迭代器中选择已经过期的数据进行清理。该策略有两个参数,第一个表示每次清理时检查状态的条目数,第二个参数表示是否在处理每条记录时都触发清理。Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。
- 压缩时清理:RcoksDB 会周期性地对数据进行合并压缩从而减少存储空间,Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的数据。该策略有一个参数,该参数表示每处理多少条数据进行一次清理。
Flink 的状态清理策略与 Redis 的被动清理 + 主动清理有很多相似之处,详见 Redis|过期 key 的删除机制。
算子状态(Operator State)
算子状态是绑定到一个并行算子实例的状态。例如,Kafka Connector 是 Flink 中就使用了算子状态,Kafka consumer 的每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。当并行度改变的时候,算子状态支持将状态重新分发给各并行算子实例。
算子状态通常用于实现 source / sink,以及一些没有 key 而无法对 state 进行分区的场景。
广播状态(Broadcast State)
广播状态时一种特殊的算子状态,用于将状态广播到所有下游任务。通过广播状态,可以保持所有子任务状态相同。
广播状态与其他算子状态的差异:
- 它具有 map 格式
- 仅在输入为一个广播数据流和一个非广播数据流的算子中可用
- 可以拥有多个不同名称的广播状态
使用算子状态
通过实现
CheckpointedFunction
接口来使用算子状态。在
CheckpointedFunction
中提供了访问 non-keyed state 的方法,需要实现如下两个方法:
void snapshotState(FunctionSnapshotContext context) throws Exception
:在进行 checkpoint 时调用void initializeState(FunctionInitializationContext context) throws Exception
:在 UDF 初始化时调用,这里的初始化包括第一次启动时的初始化,以及从 checkpoint 恢复的初始化。
当前算子状态会以 list 的形式存在,这些状态彼此独立,方便在改变并发后进行状态的重新分派。有如下几种重新分配的模式:
Even-split redistribution
:每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。Union redistribution
:每个算子保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,每个算子都将得到所有的状态数据。如果状态的数量很大时不要使用这个特性,可能导致内存溢出的问题。
样例:
SinkFunction
在
CheckpointedFunction
中进行数据缓存,然后统一发送到下游。
publicclassBufferingSinkimplementsSinkFunction<Tuple2<String,Integer>>,CheckpointedFunction{privatefinalint threshold;privatetransientListState<Tuple2<String,Integer>> checkpointedState;privateList<Tuple2<String,Integer>> bufferedElements;publicBufferingSink(int threshold){this.threshold = threshold;this.bufferedElements =newArrayList<>();}@Overridepublicvoidinvoke(Tuple2<String,Integer> value,Context contex)throwsException{ bufferedElements.add(value);if(bufferedElements.size()>= threshold){for(Tuple2<String,Integer> element: bufferedElements){// send it to the sink} bufferedElements.clear();}}@OverridepublicvoidsnapshotState(FunctionSnapshotContext context)throwsException{ checkpointedState.clear();for(Tuple2<String,Integer> element : bufferedElements){ checkpointedState.add(element);}}@OverridepublicvoidinitializeState(FunctionInitializationContext context)throwsException{ListStateDescriptor<Tuple2<String,Integer>> descriptor =newListStateDescriptor<>("buffered-elements",TypeInformation.of(newTypeHint<Tuple2<String,Integer>>(){})); checkpointedState = context.getOperatorStateStore().getListState(descriptor);if(context.isRestored()){for(Tuple2<String,Integer> element : checkpointedState.get()){ bufferedElements.add(element);}}}
initializeState()
:接受一个FunctionInitializationContext
参数,并用来初始化 non-keyed state 的容器,这个容器是一个ListState
类型的checkpointedState
吗,用于在 checkpoint 时保存 non-keyed state 对戏那个。与 keyed state 类似,在初始化时状态句柄descriptor
时,也会包括状态名称、状态类型等信息。如果是从 checkpoint 中恢复(即context.isRestored()
),则将checkpointedState
中的元素读取并添加到bufferedElements
中。snapshotState()
:在快照时,清空checkpointedState
并将bufferedElements
中缓存的元素全部添加到checkpointedState
中。invoke()
:将传入的数据添加到bufferedElements
中进行缓存;当缓存数量达到阈值后统一写出并将缓存清空。
在调用
getOperatorStateStore()
后,调用不同的获取状态对象的接口,会使用不同的状态分配算法。例如调用
getUnionListState(descriptor)
会使用 union redistribution 算法,而调用
getListState(descriptor)
则会使用 even-split redistribution 算法。
使用带状态的 Source Function
样例:
publicstaticclassCounterSourceextendsRichParallelSourceFunction<Long>implementsCheckpointedFunction{/** current offset for exactly once semantics */privateLong offset =0L;/** flag for job cancellation */privatevolatileboolean isRunning =true;/** 存储 state 的变量. */privateListState<Long> state;@Overridepublicvoidrun(SourceContext<Long> ctx){finalObject lock = ctx.getCheckpointLock();while(isRunning){// output and state update are atomicsynchronized(lock){ ctx.collect(offset); offset +=1;}}}@Overridepublicvoidcancel(){ isRunning =false;}@OverridepublicvoidinitializeState(FunctionInitializationContext context)throwsException{ state = context.getOperatorStateStore().getListState(newListStateDescriptor<>("state",LongSerializer.INSTANCE));// 从我们已保存的状态中恢复 offset 到内存中,在进行任务恢复的时候也会调用此初始化状态的方法for(Long l : state.get()){ offset = l;}}@OverridepublicvoidsnapshotState(FunctionSnapshotContext context)throwsException{ state.clear(); state.add(offset);}}
run()
:为了保证更新状态以及输出的原子性(用于实现 exactly-once 语义),需要在发送数据前获取数据源的全局锁。snapshotState()
:在快照时,我们存储当前偏移量即可。initializeState()
:在启动或恢复时,我们需要恢复偏移量。
版权归原作者 长行 所有, 如有侵权,请联系我们删除。