1、State Backends 概述
Flink 提供了多种 state backends,它用于指定状态的存储方式和位置。
状态可以位于 Java 的堆或堆外内存;取决于你的 state backend,Flink 也可以自己管理应用程序的状态。
为了让应用程序可以维护非常大的状态,Flink 可以自己管理内存(如果有必要可以溢写到磁盘)默认情况下,所有 Flink Job 会使用 Flink 配置文件中指定的 state backend,但配置文件中指定的 state backend 会被 Job 中指定的 state backend 覆盖。
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
env.configure(config);
2、使用State Backends
1.State Backends 作用
用 Data Stream API 编写的程序通常以各种形式保存状态:
- 在 Window 触发之前要么收集元素、要么聚合;
- 转换函数可以使用 key/value 格式的状态接口来存储状态;
- 转换函数可以实现
CheckpointedFunction
接口,使其本地变量具有容错能力;
在启动 CheckPoint 机制时,状态会随着 CheckPoint 而持久化,以防止数据丢失、保障恢复时的一致性;状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 State Backend。
2.可用的 State Backends
Flink 内置了以下 state backends :
- HashMapStateBackend
- EmbeddedRocksDBStateBackend
默认使用 HashMapStateBackend。
a)HashMapStateBackend
在 HashMapStateBackend 内部,数据以 Java 对象的形式存储在堆中;Key/value 形式的状态和窗口算子会持有一个 hash table,其中存储着状态值、触发器。
HashMapStateBackend 的适用场景:
- 有较大 state,较长 window 和较大 key/value 状态的 Job。
- 所有的高可用场景。
建议将 managed memory 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。
与 EmbeddedRocksDBStateBackend 不同的是,由于 HashMapStateBackend 将数据以对象形式存储在堆中,因此重用这些对象数据是不安全的。
b)EmbeddedRocksDBStateBackend
EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录;不同于
HashMapStateBackend
中的 java 对象,数据被以序列化字节数组的方式存储,这种方式由序列化器决定,因此 key 之间的比较是以字节序的形式进行而不是使用 Java 的
hashCode
或
equals()
方法。
EmbeddedRocksDBStateBackend 会使用异步的方式生成 snapshots。
EmbeddedRocksDBStateBackend 的局限:
- 由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节;RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节,会在下一次获取数据时失败,这是当前 RocksDB JNI 的限制。
EmbeddedRocksDBStateBackend 的适用场景:
- 状态非常大、窗口非常长、key/value 状态非常大的 Job。
- 所有高可用的场景。
注意:
- 保留的状态大小仅受磁盘空间的限制;
- 与状态存储在内存中的 HashMapStateBackend 相比,EmbeddedRocksDBStateBackend 允许存储非常大的状态;
- 使用 EmbeddedRocksDBStateBackend 将会使应用程序的最大吞吐量降低,所有的读写都必须序列化、反序列化,比基于堆内存的 state backend 的效率要低很多;
- 因为需要序列化、反序列化,重用放入 EmbeddedRocksDBStateBackend 的对象是安全的。
EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend。
可以使用 RocksDB 的本地指标(metrics),但默认是关闭的;每个 slot 中的 RocksDB instance 的内存大小是有限制的。
3.选择合适的 State Backend
**在选择
HashMapStateBackend
和
RocksDB
时,是在性能与可扩展性之间权衡**;
HashMapStateBackend
非常快,因为每个状态的读取和算子对于 objects 的更新都是在 Java 的 heap 上;但是状态的大小受限于集群中可用的内存。RocksDB
可以根据可用的 disk 空间扩展,并且只有它支持增量 snapshot;然而,每个状态的读取和更新都需要(反)序列化,而且在 disk 上进行读操作的性能要比基于内存的 state backend 慢一个数量级。
在 Flink 1.13 版本中统一了 savepoints 的二进制格式;可以生成 savepoint 并且之后使用另一种 state backend 读取它。
4.设置 State Backend
默认使用 jobmanager 做为 state backend,每一个 Job 的 state backend 配置会覆盖默认的 state backend 配置。
设置每个 Job 的 State Backend
StreamExecutionEnvironment
可以对每个 Job 的 State Backend 进行设置,如下所示:
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
env.configure(config);
在 IDE 中使用
EmbeddedRocksDBStateBackend
,需要添加以下依赖到 Flink 项目中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.19.0</version>
<scope>provided</scope>
</dependency>
5.设置默认的(全局的) State Backend
在 Flink 配置文件中,通过键
state.backend.type
设置默认的 State Backend。
可选值包括 jobmanager (HashMapStateBackend),rocksdb (EmbeddedRocksDBStateBackend), 或使用实现了 state backend 工厂 StateBackendFactory 的类的全限定类名, 例如: EmbeddedRocksDBStateBackend 对应为
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory
。
**
state.checkpoints.dir
选项指定了所有 State Backend 写 CheckPoint 数据和写元数据文件的目录**。
配置文件的部分示例如下所示:
# 存储 operator state 快照的 State Backend
state.backend: hashmap
# 存储快照的目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
6.RocksDB State Backend 进阶
a)增量快照
RocksDB 支持增量快照,不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,可以显著减少快照完成的耗时。
一个增量快照是基于(通常多个)前序快照构建的,由于 RocksDB 内部存在 compaction 机制对 sst 文件进行合并,Flink 的增量快照也会定期重新设立起点(rebase),因此增量链条不会一直增长,旧快照包含的文件也会逐渐过期并被自动清理。
- 和基于全量快照的恢复时间相比,如果网络带宽是瓶颈,那么基于增量快照恢复可能会消耗更多时间,因为增量快照包含的 sst 文件之间可能存在数据重叠导致需要下载的数据量变大;
- 而当 CPU 或者 IO 是瓶颈的时候,基于增量快照恢复会更快,因为从增量快照恢复不需要解析 Flink 的统一快照格式来重建本地的 RocksDB 数据表,而是可以直接基于 sst 文件加载。
状态数据量很大时,推荐使用增量快照,但这并不是默认的快照机制,需要通过配置手动开启该功能:
- 在 Flink 配置文件中设置:
state.backend.incremental: true
或者 - 在代码中按照右侧方式配置(来覆盖默认配置):
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
注意:一旦启用了增量快照,网页上展示的
Checkpointed Data Size
只代表增量上传的数据量,而不是一次快照的完整数据量。
b)内存管理
Flink 致力于控制整个进程的内存消耗,以确保 Flink 任务管理器(TaskManager)有良好的内存使用,保证既不会在容器(Docker/Kubernetes, Yarn等)环境中由于内存超用被杀掉,也不会因为内存利用率过低导致不必要的数据落盘或是缓存命中率下降,致使性能下降。
因此,Flink 默认将 RocksDB 的可用内存配置为任务管理器的单槽(per-slot)托管内存量;即大多数应用程序不需要调整 RocksDB 配置,简单的增加 Flink 的托管内存即可改善内存相关性能问题。
也可以选择不使用 Flink 自带的内存管理,而是手动为 RocksDB 的每个列族(ColumnFamily)分配内存(每个算子的每个 state 都对应一个列族);提供了对 RocksDB 进行更细粒度控制的途径,但是需要自行保证总内存消耗不会超过(尤其是容器)环境的限制。
RocksDB 使用托管内存
默认打开,可以通过
state.backend.rocksdb.memory.managed
控制。
Flink 并不直接控制 RocksDB 的 native 内存分配,而是通过配置 RocksDB 来确保其使用的内存正好与 Flink 的托管内存预算相同;这是在任务槽(per-slot)级别上完成的(托管内存以任务槽为粒度计算)。
为了设置 RocksDB 实例的总内存使用量,Flink 对同一个任务槽上的所有 RocksDB 实例使用共享的 cache 以及 write buffer manager;共享 cache 将对 RocksDB 中内存消耗的三个主要来源(块缓存、索引和bloom过滤器、MemTables)设置上限。
Flink还提供了两个参数来控制写路径(MemTable)和读路径(索引及过滤器,读缓存)之间的内存分配;当看到 RocksDB 由于缺少写缓冲内存(频繁刷新)或读缓存未命中而性能不佳时,可以使用这些参数调整读写间的内存分配。
state.backend.rocksdb.memory.write-buffer-ratio
,默认值0.5
,即 50% 的给定内存会分配给写缓冲区使用。state.backend.rocksdb.memory.high-prio-pool-ratio
,默认值0.1
,即 10% 的 block cache 内存会优先分配给索引及过滤器,强烈建议不要将此值设置为零,以防止索引和过滤器被频繁踢出缓存而导致性能问题;此外默认将L0级的过滤器和索引固定到缓存中以提高性能。
注意 上述机制开启时将覆盖用户在
PredefinedOptions 和 RocksDBOptionsFactory
中对 block cache 和 write buffer 进行的配置。
注意仅面向专业用户:若要手动控制内存,可以将
state.backend.rocksdb.memory.managed
设置为
false
,并通过
ColumnFamilyOptions
配置 RocksDB;或者复用上述 cache/write-buffer-manager 机制,但将内存大小设置为与 Flink 的托管内存大小无关的固定大小(通过
state.backend.rocksdb.memory.fixed-per-slot
/
state.backend.rocksdb.memory.fixed-per-tm
选项)在这两种情况下,用户都需要确保在 JVM 之外有足够的内存可供 RocksDB 使用。
c)计时器(内存 vs. RocksDB)
当选择 RocksDB 作为 State Backend 时,默认情况下计时器也存储在 RocksDB 中;这是一种健壮且可扩展的方式,允许应用程序使用很多个计时器。
另一方面,在 RocksDB 中维护计时器会有一定的成本,因此 Flink 也提供了将计时器存储在 JVM 堆上而使用 RocksDB 存储其他状态的选项,当计时器数量较少时,基于堆的计时器可以有更好的性能。
**通过将
state.backend.rocksdb.timer-service.factory
配置项设置为
heap
(而不是默认的
rocksdb
)来将计时器存储在堆上。**
注意在 RocksDB state backend 中使用基于堆的计时器的组合当前不支持计时器状态的异步快照,其他状态(如 keyed state)可以被异步快照。
d)开启 RocksDB 原生监控指标
可以使用 Flink 的监控指标系统来汇报 RocksDB 的原生指标,也可以选择性的指定特定指标进行汇报。
注意: 启用 RocksDB 的原生指标可能会对应用程序的性能产生负面影响。
列族(ColumnFamily)级别的预定义选项
注意 在引入 RocksDB 使用托管内存 功能后,此机制应限于在专家调优或故障处理中使用。
使用预定义选项,可以在每个 RocksDB 列族上应用一些预定义的配置,例如配置内存使用、线程、Compaction 设置等;目前每个算子的每个状态都在 RocksDB 中有专门的一个列族存储。
选择要应用的预定义选项:
- 通过
state.backend.rocksdb.predefined-options
配置项将选项名称设置进 Flink 配置文件。 - 通过程序设置:
EmbeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)
。
该选项的默认值是
DEFAULT
,对应
PredefinedOptions.DEFAULT
。
从 Flink 配置文件中读取列族选项
RocksDB State Backend 会将【Advanced RocksDB State Backends Options】的所有配置项全部加载;可以通过关闭 RocksDB 使用托管内存的功能并将需要的设置选项加入配置文件来配置底层的列族选项。
通过 RocksDBOptionsFactory 配置 RocksDB 选项
注意 在引入 RocksDB 使用托管内存功能后,此机制应限于在专家调优或故障处理中使用。
通过配置一个
RocksDBOptionsFactory
来手动控制 RocksDB 的选项,您可以对列族的设置进行细粒度控制,例如内存使用、线程、Compaction 设置等,目前每个算子的每个状态都在 RocksDB 中有专门的一个列族存储。
**将
RocksDBOptionsFactory
传递给 RocksDB State Backend**:
- 通过
state.backend.rocksdb.options-factory
选项将工厂实现类的名称设置到 Flink 配置文件。 - 通过程序设置,例如
EmbeddedRocksDBStateBackend.setRocksDBOptions(new MyOptionsFactory());
。
注意 通过程序设置的
RocksDBOptionsFactory
将覆盖 Flink 配置文件的设置,且
RocksDBOptionsFactory
设置的优先级高于预定义选项(
PredefinedOptions
)。
注意 RocksDB 是一个本地库,它直接从进程分配内存, 而不是从JVM分配内存,分配给 RocksDB 的任何内存都必须被考虑在内,通常需要将这部分内存从任务管理器(
TaskManager
)的JVM堆中减去;否则可能会导致JVM进程由于分配的内存超过申请值而被 YARN 等资源管理框架终止。
自定义
ConfigurableRocksDBOptionsFactory
示例 (需要将实现类全名设置到
state.backend.rocksdb.options-factory
)
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
public static final ConfigOption<Integer> BLOCK_RESTART_INTERVAL = ConfigOptions
.key("my.custom.rocksdb.block.restart-interval")
.intType()
.defaultValue(16)
.withDescription(
" Block restart interval. RocksDB has default block restart interval as 16. ");
private int blockRestartInterval = BLOCK_RESTART_INTERVAL.defaultValue();
@Override
public DBOptions createDBOptions(DBOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
return currentOptions
.setIncreaseParallelism(4)
.setUseFsync(false);
}
@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
return currentOptions.setTableFormatConfig(
new BlockBasedTableConfig()
.setBlockRestartInterval(blockRestartInterval));
}
@Override
public RocksDBOptionsFactory configure(ReadableConfig configuration) {
this.blockRestartInterval = configuration.get(BLOCK_RESTART_INTERVAL);
return this;
}
}
7.开启 Changelog
a)介绍
Changelog 旨在减少 checkpointing 的时间,也可以减少 exactly-once 模式下的端到端延迟。
一般 checkpoint 的持续时间受如下因素影响:
- Barrier 到达和对齐时间,可以通过 Unaligned checkpoints 和 Buffer debloating 解决。
- 快照制作时间(所谓同步阶段),可以通过异步快照解决。
- 快照上传时间(异步阶段),可以用增量 checkpoints 来减少上传时间;但是,大多数支持增量 checkpoint 的状态后端会定期执行合并类型的操作,这会导致除了新的变更之外还要重新上传旧状态;在大规模部署中,每次 checkpoint 中至少有一个 task 上传大量数据的可能性往往非常高。
开启 Changelog 功能之后,Flink 会不断上传状态变更并形成 changelog;创建 checkpoint 时,只有 changelog 中的相关部分需要上传;而配置的状态后端则会定期在后台进行快照,快照成功上传后,相关的changelog 将会被截断。
基于此,异步阶段的持续时间减少(另外因为不需要将数据刷新到磁盘,同步阶段持续时间也减少了),特别是长尾延迟得到了改善,同时,还可以获得以下好处:
- 更稳定、更低的端到端时延。
- Failover 后数据重放更少。
- 资源利用更加稳定。
但是,资源使用会变得更高:
- 将会在 DFS 上创建更多文件
- 将使用更多的 IO 带宽用来上传状态变更
- 将使用更多 CPU 资源来序列化状态变更
- Task Managers 将会使用更多内存来缓存状态变更
虽然 Changelog 增加了少量的日常 CPU 和网络带宽资源使用, 但会降低峰值的 CPU 和网络带宽使用量。
恢复时间取决于
state.backend.changelog.periodic-materialize.interval
的设置,changelog 可能会变得冗长,因此重放会花费更多时间;但是恢复时间加上 checkpoint 持续时间仍然可能低于不开启 changelog 功能的时间,从而在故障恢复的情况下也能提供更低的端到端延迟;当然,取决于上述时间的实际比例,有效恢复时间也有可能会增加。
b)安装
标准的 Flink 发行版包含 Changelog 所需要的 JAR包,请确保添加所需的文件系统插件。
c)配置
YAML 中的示例配置:
state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem # 当前只支持 filesystem 和 memory(仅供测试用)
dstl.dfs.base-path: s3://<bucket-name> # 类似于 state.checkpoints.dir
请将如下配置保持默认值:
execution.checkpointing.max-concurrent-checkpoints: 1
通过编程方式为每个作业开启或关闭 Changelog:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableChangelogStateBackend(true);
d)监控
如果 task 因写状态变更而被反压,它将在 UI 中被显示为忙碌(红色)。
e)升级现有作业
开启 Changelog
支持从 savepoint 或 checkpoint 恢复:
- 给定一个没有开启 Changelog 的作业
- 创建一个 savepoint 或一个 checkpoint
- 更改配置(开启 Changelog)
- 从创建的 snapshot 恢复
关闭 Changelog
支持从 savepoint 或 checkpoint 恢复:
- 给定一个开启 Changelog 的作业
- 创建一个 savepoint 或一个 checkpoint
- 更改配置(关闭 Changelog)
- 从创建的 snapshot 恢复
f)限制
- 最多同时创建一个 checkpoint
- 到 Flink 1.15 为止,只有
filesystem
changelog 实现可用 - 尚不支持 NO_CLAIM 模式
8.自旧版本迁移
a)概述
从 Flink 1.13 开始,社区改进了 state backend 的公开类,帮助理解本地状态存储和 checkpoint 存储;这个变化并不会影响 state backend 和 checkpointing 过程的运行时实现和机制,用户可以将现有作业迁移到新的 API,同时不会损失原有 state。
b)MemoryStateBackend
旧版本的
MemoryStateBackend
等价于使用 HashMapStateBackend 和 JobManagerCheckpointStorage。
使用 Flink 配置文件
state.backend: hashmap
# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager
代码配置
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
env.configure(config);
c)FsStateBackend
旧版本的 FsStateBackend 等价于使用 HashMapStateBackend 和 FileSystemCheckpointStorage。
使用 Flink 配置文件
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
代码配置
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///checkpoint-dir");
env.configure(config);
// Advanced FsStateBackend configurations, such as write buffer size
// can be set manually by using CheckpointingOptions.
config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 4 * 1024);
env.configure(config);
d)RocksDBStateBackend
旧版本的 RocksDBStateBackend 等价于使用 EmbeddedRocksDBStateBackend 和 FileSystemCheckpointStorage。
使用 Flink 配置文件
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
代码配置
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///checkpoint-dir");
env.configure(config);
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using CheckpointingOptions.
config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 4 * 1024);
env.configure(config);
9.总结
1.状态可以存储在 TM 的内存/rocksdb[文件系统]中,进行 Checkpoint 时可以存储在 JM [内存]/文件系统中;
2.开启 Changelog 可以减少 checkpointing 的时间,也可以减少 exactly-once 模式下的端到端延迟。
3.开启 ChangelogStateBackend 限制[最多同时创建一个 checkpoint\只有 filesystem changelog 可用\不支持 NO_CLAIM 模式]
4.选择 HashMapStateBackend 和 RocksDB 时,是在性能与可扩展性之间权衡:
- HashMapStateBackend 非常快,因为每个状态的读取和算子对于 objects 的更新都是在 Java 的 heap 上;但是状态的大小受限于集群中可用的内存。
- RocksDB 可以根据可用的 disk 空间扩展,并且只有它支持增量 snapshot;但每个状态的读取和更新都需要(反)序列化,而且在 disk 上进行读操作的性能要比基于内存的 state backend 慢一个数量级。
版权归原作者 猫猫爱吃小鱼粮 所有, 如有侵权,请联系我们删除。