0


11、Flink 的 Keyed State 详解

1.Keyed DataStream

使用 keyed state,首先需要为

DataStream

指定 key(主键);这个 key 用于状态分区(数据流中的 Record 也会被分区)可以使用

DataStream

中 Java/Scala API 的

keyBy(KeySelector)

或者是 Python API 的

key_by(KeySelector)

来指定 key,将生成

KeyedStream

Key selector 函数接收单条 Record 作为输入,返回这条记录的 key,该 key 可以为任何类型,但是它的计算产生方式必须具备确定性,Flink 的数据模型不基于 key-value 对,将数据集在物理上封装成 key 和 value 是没有必要的,Key 是“虚拟”的,用以操纵分组算子。

案例: key selector 函数。

// some ordinary POJO
public class WC {
  public String word;
  public int count;

  public String getWord() { 
    return word; 
  }
}

DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
  .keyBy(WC::getWord);

Flink 有两种不同定义 key 的方式

可以通过 tuple 字段索引,或者选取对象字段的表达式来指定 key 即 Tuple Keys 和 Expression Keys。

2.使用 Keyed State
a)概述

keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下,即这些状态仅可在

KeyedStream

上使用,在Java/Scala API上可以通过

stream.keyBy(...)

得到

KeyedStream

,在Python API上可以通过

stream.key_by(...)

得到

KeyedStream

支持的状态类型如下

  • ValueState: 保存一个可以更新和检索的值(每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
  • ListState: 保存一个元素的列表,可以往这个列表中追加数据,并在当前的列表上进行检索,通过 add(T) 或者 addAll(List) 添加元素,通过 Iterable get() 获得整个列表,还可以通过 update(List) 覆盖当前的列表。
  • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合,使用 add(T) 增加的元素会用提供的 ReduceFunction 进行聚合。
  • AggregatingState: 保留一个单值,表示添加到状态的所有值的聚合,和 ReducingState 相反的是,聚合类型可能与添加到状态的元素的类型不同,使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
  • MapState: 维护了一个映射列表,可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器,使用 put(UK,UV) 或者 putAll(Map) 添加映射,使用 get(UK) 检索特定 key,使用 entries()keys()values() 分别检索映射、键和值的可迭代视图,还可以通过 isEmpty() 来判断是否包含任何键值对。

**所有类型的状态还有一个

clear()

方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。**

状态对象仅用于与状态交互,状态本身不一定存储在内存中,还可能在磁盘或其他位置;从状态中获取的值取决于输入元素所代表的 key,在不同 key 上调用同一个接口,可能得到不同的值。

必须创建一个

StateDescriptor

,才能得到对应的状态句柄,它保存了状态名称, 状态所持有值的类型,可能包含用户指定的函数,例如

ReduceFunction

,根据不同的状态类型,可以创建

ValueStateDescriptor

ListStateDescriptor

AggregatingStateDescriptor

,

ReducingStateDescriptor

MapStateDescriptor

**状态通过

RuntimeContext

进行访问,只能在 rich functions 中使用,

RichFunction

RuntimeContext

提供如下方法**:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState getAggregatingState(AggregatingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

案例

FlatMapFunction

使用状态

实现了计数窗口,把元组的第一个元素当作 key,该函数将出现的次数以及总和存储在 “ValueState” 中,一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。

注意:会为每个不同的 key(元组中第一个元素)保存一个单独的值。

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
b)状态有效期 (TTL)

参数配置配置2配置3状态的可见性NeverReturnExpired 不返回过期数据ReturnExpiredIfNotCleanedUp 会返回过期但未清理的数据无TTL 更新策略OnCreateAndWrite 仅在创建和写入时更新OnReadAndWrite 读取和写入时更新无状态清理策略cleanupFullSnapshot 全量快照时进行清理cleanupIncrementally 增量数据清理cleanupInRocksdbCompactFilter RocksDB 压缩过滤器
任何类型的 keyed state 都可以有 **有效期 (TTL)**,如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值,所有状态类型都支持单元素的 TTL,列表元素和映射元素将独立到期。

在使用状态 TTL 前,需要先构建一个

StateTtlConfig

配置对象,然后把配置传递到 state descriptor 中启用 TTL 功能:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

TTL 配置有以下几个选项

newBuilder

的第一个参数表示数据的有效期,是必选项;

TTL 的更新策略(默认是

OnCreateAndWrite

):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新
  • StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新(注意: 如果同时将状态的可见性配置为 StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp, 那么在PyFlink作业中,状态的读缓存将会失效,这将导致一部分的性能损失)

**数据在过期但还未被清理时的可见性配置如下(默认为

NeverReturnExpired

)**:

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据(注意: 在PyFlink作业中,状态的读写缓存都将失效,这将导致一部分的性能损失)
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据
NeverReturnExpired

情况下,过期数据就像不存在一样,不管是否被物理删除,这对于不能访问过期数据的场景下非常有用,比如敏感数据,

ReturnExpiredIfNotCleanedUp

在数据被物理删除前都会返回。

注意:

  • 状态上次的修改时间会和数据一起保存在 state backend 中,开启该特性会增加状态数据的存储;Heap state backend 会额外存储一个包括用户状态以及时间戳的 Java 对象,RocksDB state backend 会在每个状态值(list 或者 map 的每个元素)序列化后增加 8 个字节。
  • 暂时只支持基于 processing time 的 TTL。
  • 尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。
  • TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。
  • 不建议 checkpoint 恢复前后将 state TTL 从短调长,这可能会产生潜在的数据错误。
  • 当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null;如果用户值序列化器不支持 null, 可以用 NullableSerializer 包装一层。
  • 启用 TTL 配置后,StateDescriptor 中的 defaultValue(已标记 deprecated)将会失效,在此基础上,用户需要手动管理那些实际值为 null 或已过期的状态默认值。
c)过期数据的清理

默认情况下,过期数据会在读取的时候被删除,例如

ValueState#value

,会有后台线程定期清理(需要 StateBackend 支持)可以通过

StateTtlConfig

配置关闭后台清理。

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    .disableCleanupInBackground()
    .build();

可以配置更细粒度的后台清理策略,当前的实现中

HeapStateBackend

依赖增量数据清理,

RocksDBStateBackend

利用压缩过滤器进行后台清理。

d)全量快照时进行清理

可以启用全量快照时进行清理的策略,可以减少整个快照的大小,当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据,该策略可以通过

StateTtlConfig

进行配置。

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    .cleanupFullSnapshot()
    .build();

这种策略在

RocksDBStateBackend

的增量 checkpoint 模式下无效。

注意:

  • 这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时。
e)增量数据清理

可以选择增量式清理状态数据,在状态访问或/和处理时进行,如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器,每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    .cleanupIncrementally(10, true)
    .build();

该策略有两个参数

  • 第一个参数表示每次清理时检查状态的条目数,在每个状态访问时触发;
  • 第二个参数表示是否在处理每条记录时触发清理,Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。

注意:

  • 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
  • 增量清理会增加数据处理的耗时。
  • 现在仅 Heap state backend 支持增量清除机制,在 RocksDB state backend 上启用该特性无效。
  • 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用,但异步快照则没有这个问题。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
f)在 RocksDB 压缩时清理

如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器,RocksDB 会周期性的对数据进行合并压缩从而减少存储空间,Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Duration.ofSeconds(1))
    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
    .build();

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 可以通过

StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)

方法指定处理状态的条数;时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能;RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。

定期压缩可以加速过期状态条目的清理,特别是对于很少访问的状态条目,比这个值早的文件将被选取进行压缩,并重新写入与之前相同的 Level 中,该功能可以确保文件定期通过压缩过滤器压缩,可以通过

StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)

方法设定定期压缩的时间,定期压缩的时间的默认值是 30 天,可以将其设置为 0 以关闭定期压缩或设置一个较小的值以加速过期状态条目的清理,但它将会触发更多压缩。

还可以通过配置开启 RocksDB 过滤器的 debug 日志:

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

  • 压缩时调用 TTL 过滤器会降低速度,TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查,对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。
  • 对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 java 序列化器, 从而确定下一个未过期数据的位置。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
  • 定期压缩功能只在 TTL 启用时生效。
标签: flink 大数据

本文转载自: https://blog.csdn.net/m0_50186249/article/details/138369800
版权归原作者 猫猫爱吃小鱼粮 所有, 如有侵权,请联系我们删除。

“11、Flink 的 Keyed State 详解”的评论:

还没有评论