1.Keyed DataStream
使用 keyed state,首先需要为
DataStream
指定 key(主键);这个 key 用于状态分区(数据流中的 Record 也会被分区)可以使用
DataStream
中 Java/Scala API 的
keyBy(KeySelector)
或者是 Python API 的
key_by(KeySelector)
来指定 key,将生成
KeyedStream
。
Key selector 函数接收单条 Record 作为输入,返回这条记录的 key,该 key 可以为任何类型,但是它的计算产生方式必须具备确定性,Flink 的数据模型不基于 key-value 对,将数据集在物理上封装成 key 和 value 是没有必要的,Key 是“虚拟”的,用以操纵分组算子。
案例: key selector 函数。
// some ordinary POJO
public class WC {
public String word;
public int count;
public String getWord() {
return word;
}
}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
.keyBy(WC::getWord);
Flink 有两种不同定义 key 的方式:
可以通过 tuple 字段索引,或者选取对象字段的表达式来指定 key 即 Tuple Keys 和 Expression Keys。
2.使用 Keyed State
a)概述
keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下,即这些状态仅可在
KeyedStream
上使用,在Java/Scala API上可以通过
stream.keyBy(...)
得到
KeyedStream
,在Python API上可以通过
stream.key_by(...)
得到
KeyedStream
。
支持的状态类型如下
ValueState
: 保存一个可以更新和检索的值(每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)这个值可以通过update(T)
进行更新,通过T value()
进行检索。ListState
: 保存一个元素的列表,可以往这个列表中追加数据,并在当前的列表上进行检索,通过add(T)
或者addAll(List)
添加元素,通过Iterable get()
获得整个列表,还可以通过update(List)
覆盖当前的列表。ReducingState
: 保存一个单值,表示添加到状态的所有值的聚合,使用add(T)
增加的元素会用提供的ReduceFunction
进行聚合。AggregatingState
: 保留一个单值,表示添加到状态的所有值的聚合,和ReducingState
相反的是,聚合类型可能与添加到状态的元素的类型不同,使用add(IN)
添加的元素会用指定的AggregateFunction
进行聚合。MapState
: 维护了一个映射列表,可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器,使用put(UK,UV)
或者putAll(Map)
添加映射,使用get(UK)
检索特定 key,使用entries()
,keys()
和values()
分别检索映射、键和值的可迭代视图,还可以通过isEmpty()
来判断是否包含任何键值对。
**所有类型的状态还有一个
clear()
方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。**
状态对象仅用于与状态交互,状态本身不一定存储在内存中,还可能在磁盘或其他位置;从状态中获取的值取决于输入元素所代表的 key,在不同 key 上调用同一个接口,可能得到不同的值。
必须创建一个
StateDescriptor
,才能得到对应的状态句柄,它保存了状态名称, 状态所持有值的类型,可能包含用户指定的函数,例如
ReduceFunction
,根据不同的状态类型,可以创建
ValueStateDescriptor
,
ListStateDescriptor
,
AggregatingStateDescriptor
,
ReducingStateDescriptor
或
MapStateDescriptor
。
**状态通过
RuntimeContext
进行访问,只能在 rich functions 中使用,
RichFunction
中
RuntimeContext
提供如下方法**:
ValueState getState(ValueStateDescriptor)
ReducingState getReducingState(ReducingStateDescriptor)
ListState getListState(ListStateDescriptor)
AggregatingState getAggregatingState(AggregatingStateDescriptor)
MapState getMapState(MapStateDescriptor)
案例:
FlatMapFunction
使用状态
实现了计数窗口,把元组的第一个元素当作 key,该函数将出现的次数以及总和存储在 “ValueState” 中,一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。
注意:会为每个不同的 key(元组中第一个元素)保存一个单独的值。
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(value -> value.f0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
b)状态有效期 (TTL)
参数配置配置2配置3状态的可见性NeverReturnExpired 不返回过期数据ReturnExpiredIfNotCleanedUp 会返回过期但未清理的数据无TTL 更新策略OnCreateAndWrite 仅在创建和写入时更新OnReadAndWrite 读取和写入时更新无状态清理策略cleanupFullSnapshot 全量快照时进行清理cleanupIncrementally 增量数据清理cleanupInRocksdbCompactFilter RocksDB 压缩过滤器
任何类型的 keyed state 都可以有 **有效期 (TTL)**,如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值,所有状态类型都支持单元素的 TTL,列表元素和映射元素将独立到期。
在使用状态 TTL 前,需要先构建一个
StateTtlConfig
配置对象,然后把配置传递到 state descriptor 中启用 TTL 功能:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
TTL 配置有以下几个选项:
newBuilder
的第一个参数表示数据的有效期,是必选项;
TTL 的更新策略(默认是
OnCreateAndWrite
):
StateTtlConfig.UpdateType.OnCreateAndWrite
- 仅在创建和写入时更新StateTtlConfig.UpdateType.OnReadAndWrite
- 读取时也更新(注意: 如果同时将状态的可见性配置为StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp
, 那么在PyFlink作业中,状态的读缓存将会失效,这将导致一部分的性能损失)
**数据在过期但还未被清理时的可见性配置如下(默认为
NeverReturnExpired
)**:
StateTtlConfig.StateVisibility.NeverReturnExpired
- 不返回过期数据(注意: 在PyFlink作业中,状态的读写缓存都将失效,这将导致一部分的性能损失)StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp
- 会返回过期但未清理的数据
NeverReturnExpired
情况下,过期数据就像不存在一样,不管是否被物理删除,这对于不能访问过期数据的场景下非常有用,比如敏感数据,
ReturnExpiredIfNotCleanedUp
在数据被物理删除前都会返回。
注意:
- 状态上次的修改时间会和数据一起保存在 state backend 中,开启该特性会增加状态数据的存储;Heap state backend 会额外存储一个包括用户状态以及时间戳的 Java 对象,RocksDB state backend 会在每个状态值(list 或者 map 的每个元素)序列化后增加 8 个字节。
- 暂时只支持基于 processing time 的 TTL。
- 尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。
- TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。
- 不建议 checkpoint 恢复前后将 state TTL 从短调长,这可能会产生潜在的数据错误。
- 当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null;如果用户值序列化器不支持 null, 可以用
NullableSerializer
包装一层。 - 启用 TTL 配置后,
StateDescriptor
中的defaultValue
(已标记deprecated
)将会失效,在此基础上,用户需要手动管理那些实际值为 null 或已过期的状态默认值。
c)过期数据的清理
默认情况下,过期数据会在读取的时候被删除,例如
ValueState#value
,会有后台线程定期清理(需要 StateBackend 支持)可以通过
StateTtlConfig
配置关闭后台清理。
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.disableCleanupInBackground()
.build();
可以配置更细粒度的后台清理策略,当前的实现中
HeapStateBackend
依赖增量数据清理,
RocksDBStateBackend
利用压缩过滤器进行后台清理。
d)全量快照时进行清理
可以启用全量快照时进行清理的策略,可以减少整个快照的大小,当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据,该策略可以通过
StateTtlConfig
进行配置。
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.cleanupFullSnapshot()
.build();
这种策略在
RocksDBStateBackend
的增量 checkpoint 模式下无效。
注意:
- 这种清理方式可以在任何时候通过
StateTtlConfig
启用或者关闭,比如在从 savepoint 恢复时。
e)增量数据清理
可以选择增量式清理状态数据,在状态访问或/和处理时进行,如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器,每次触发增量清理时,从迭代器中选择已经过期的数进行清理。
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.cleanupIncrementally(10, true)
.build();
该策略有两个参数
- 第一个参数表示每次清理时检查状态的条目数,在每个状态访问时触发;
- 第二个参数表示是否在处理每条记录时触发清理,Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。
注意:
- 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
- 增量清理会增加数据处理的耗时。
- 现在仅 Heap state backend 支持增量清除机制,在 RocksDB state backend 上启用该特性无效。
- 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用,但异步快照则没有这个问题。
- 对已有的作业,这个清理方式可以在任何时候通过
StateTtlConfig
启用或禁用该特性,比如从 savepoint 重启后。
f)在 RocksDB 压缩时清理
如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器,RocksDB 会周期性的对数据进行合并压缩从而减少存储空间,Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
.cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
.build();
Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 可以通过
StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)
方法指定处理状态的条数;时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能;RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。
定期压缩可以加速过期状态条目的清理,特别是对于很少访问的状态条目,比这个值早的文件将被选取进行压缩,并重新写入与之前相同的 Level 中,该功能可以确保文件定期通过压缩过滤器压缩,可以通过
StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)
方法设定定期压缩的时间,定期压缩的时间的默认值是 30 天,可以将其设置为 0 以关闭定期压缩或设置一个较小的值以加速过期状态条目的清理,但它将会触发更多压缩。
还可以通过配置开启 RocksDB 过滤器的 debug 日志:
log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
注意:
- 压缩时调用 TTL 过滤器会降低速度,TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查,对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。
- 对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 java 序列化器, 从而确定下一个未过期数据的位置。
- 对已有的作业,这个清理方式可以在任何时候通过
StateTtlConfig
启用或禁用该特性,比如从 savepoint 重启后。 - 定期压缩功能只在 TTL 启用时生效。
版权归原作者 猫猫爱吃小鱼粮 所有, 如有侵权,请联系我们删除。