一、什么是状态
无状态计算的例子: 例如一个加法算子,第一次输入
2+3=5
那么以后我多次数据
2+3
的时候得到的结果都是
5
。得出的结论就是,相同的输入都会得到相同的结果,与次数无关。
有状态计算的例子: 访问量的统计,我们都知道
Nginx
的访问日志一个请求一条日志,基于此我们就可以统计访问量。如下,
/api/a
这个
url
第一此访问的时候,返回的结果就是
count1
,但当第二次访问的时候,返回的结果变成了
2
。为什么
Flink
知道之前已经处理过一次
hello world
,这就是
state
发挥作用了,这里是被称为
keyed state
存储了之前需要统计的数据,
keyby
接口的调用会创建
keyed stream
对
key
进行划分,这是使用
keyed state
的前提。得出的结论就是,相同的输入得到不同的结果,与次数有关。这就是有状态的数据。
什么场景下会大量使用到这种状态数据啦?简单举几个例子:
【1】去重的需求中,比如说我们只想知道这
100
个同事都属于那几个部门的等等。
【2】窗口计算,已进入未触发的数据。比如,我们一分钟统计一次,
1-2
之间的
1.5
这个时候的数据对于
2
来说就是一个有状态的数据,因为
2
的结果与
1.5
有关。
【3】机器学习/深度学习,训练的模型及参数。这对于机器学习的同学深入感触。比如,第一次输入
hello
,机器会给我一个反馈,那么下次会基于这个反馈做进一步的学习处理。那么上一步的结果对于我而言就是一种有状态的输入。
【4】访问历史数据,需要与昨日进行对比。昨日的数据对于今日而言也属于一种状态。你品,你细品。
为什么要管理状态,用内存不香吗?首先流失作业是有它的标准的,不是什么东西随随便便就说自己这个是流失处理。首先,7*24小时运行,高可靠,你内存不行吧,你的容量总有用完的时候吧。其次,数据不丢失不重,恰好计算一次,你内存要实现需要备份和恢复,你还总伴随着小部分数据的丢失吧。最后,数据实时产生,不延迟,你内存不够横向扩展时,你需要延迟吧。
理想的状态管理就是下面描述的样子,
Flink
也都帮我们实现了。
二、状态的类型
Managed State & Raw State
Managed StateRaw State状态管理方式Flink Runtime 管理 —自动存储,自动恢复 —内存管理上有优化用户自己管理(Flink不知道你在State中存储的数据结构的) —要自己实例化状态数据结构已知的数据结构 —value,list,map…字节数据 —byte[]推荐使用场景大多数情况下均可使用自定义 Operator 时可以使用(当Managed State 不够时使用)
Managed Stated 分为:
Keyed Stated
和
Operator State
【1】Keyed Stated: 只能用于
keyBy
生成的
KeyedStream
上的算子。每一个
key
对应一个
State
,一个
Operator
实例处理多个
Key
,访问相应的多个
State
。相同
Key
会在相同的实例中处理。整个过程如果没有
keyBy
操作,它是没有
KeyedStream
的,而
Keyed Stated
只能应用在
KeyedStream
上。
并发改变:
State
随着
Key
在实例间迁移。例如:实例
A
中之前处理
KeyA
与
KeyB
,后面我扩展了实例
B
,那么 实例
A
就只需要处理
KeyA
,
KeyB
就交给 实例
B
进行处理。安装状态进行分离,可以理解为分布式。
通过 RuntimeContext 访问,说明
Operator
是一个
Rich Function
,否则是拿不到
RuntimeContext
。
支持的数据结构:
ValueState
、
ListState
、
ReducingState
、
AggregatingState
、
MapState
【2】Operator State: 可以用于所有的算子,常用于
source
上,例如
FlinkKafkaConsumer
。一个
Operator
实例对应一个
State
,所以一个
Operator
中会处理多个
key
,可以理解为集群。
并发改变:
Operator State
没有
key
,并发改变的时候就需要重新分配。内置了两种方案:均匀分配和合并后每个得到全量。
访问方式: 实现
CheckpointedFunction
或
ListCheckpointed
接口。
支持的数据结构:
ListState
三、Keyed State 使用示例
什么是 keyed state: 对于
keyed state
,有两个特点:
【1】只能应用于KeyedStream 的函数与操作中,例如
Keyed UDF
,
window state
;
【2】
keyed state
是已经分区 / 划分好的,每一个 key 只能属于某一个 keyed state;
对于如何理解已经分区的概念,我们需要看一下
keyby
的语义,大家可以看到下图左边有三个并发,右边也是三个并发,左边的词进来之后,通过
keyby
会进行相应的分发。例如对于
hello word
,
hello
这个词通过
hash
运算永远只会到右下方并发的
task
上面去。
什么是 operator state
【1】又称为
non-keyed state
,每一个
operator state
都仅与一个
operator
的实例绑定。
【2】常见的
operator state
是
source state
,例如记录当前
source
的
offset
再看一段使用
operator state
的
word count
代码:
这里的
fromElements
会调用
FromElementsFunction
的类,其中就使用了类型为
list state
的
operator state
。如下几种
Keyed State
之间的依赖关系,都是
state
的子类。它们的访问方式和数据结构都有一定的区别。
状态数据类型访问接口备注ValueState单个值[update(T) 修改/T value 获取]例如 WordCount 用 word 做 key,state就是单个的数值。这个单个也可以是字符串、对象等都有可能。访问方式只有上面两种。MapStateMapput(UK key, UV value) putAll(Map<UK,UV> map) remove(UK key) boolean contains(UK key) UV get(UK key) Iterable<Map.Entry> entries() Iterable<Map.Entry> iterator() Iterable keys() Iterable values()能够操作具体的对象的keyListStateListadd/ addAll(List) update(List) Iterable get()ReducingState单个值add/ addAll(List) update(List) T get()与 List 是同一个父类,这个add是直接将数据更新进了 Reducing的结果里面。举个例子,例如我们统计1分钟的结果,list是先将数据添加到list中,等到1分钟的时候全来出来统计。而 Reducing是来一条就统计一条结果。好处是节省内存。AggregatingState单个值add(IN)/OUT get()与 List 是同一个父类,与Reducing的不同是,Reducing输入和输出的类型都是相同的。而Aggregating 是可以不同的。例如,我要计算一个平局值,Reducing是算好返回,而Aggregating会返回总和和个数。
举个
ValueState
的案例
finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//获取数据流DataStream<Event> events = env.addSource(source);DataStream<Alert> alerts = events
// 生成 keyedStata 通过 sourceAddress.keyBy(Event::sourceAddress)// StateMachineMapper 状态机.flatMap(newStateMachineMapper());//我么看下状态机怎么写 实现 RichFlatMapFunction@SuppressWarnings("serial")staticclassStateMachineMapperextendsRichFlatMapFunction<Event,Alert>{privateValueState<LeaderLatch.State> currentState;@Overridepublicvoidopen(Configuration conf){// 获取一个 valueState
currentState =getRuntimeContext().getState(newValueStateDescriptor<>("state",State.class));}//来一条数据处理一条@OverridepublicvoidflatMap(Event evt,Collector<Alert> out)throwsException{// 获取 valueState state = currentState.value();if(state ==null){
state =State.Initial;//State 是本地的变量}// 把事件对状态的影响加上去,得到一个状态State nextState = state.transition(evt.type());//判断状态是否合法if(nextState ==State.InvalidTransition){//扔出去
out.collect(newAlert(evt.sourceAddress(), state, evt.type()));}//是否不能继续转化了,例如取消的订单elseif(nextState.isTerminal()){// 从 state 中清楚掉
currentState.clear();}else{// 修改状态
currentState.update(nextState);}}}
四、CheckPoint 与 state 的关系
Checkpoint
是从
source
触发到下游所有节点完成的一次全局操作。下图可以有一个对
Checkpoint
的直观感受,红框里面可以看到一共触发了
569K
次
Checkpoint
,然后全部都成功完成,没有
fail
的。
state 其实就是 Checkpoint 所做的主要持久化备份的主要数据,看下图的具体数据统计,其
state
也就
9kb
大小 。
五、状态如何保存和恢复
Checkpoint
定时制作分布式快照,对程序的状态进行备份。发生故障时,将整个作业的
Task
都回滚到最后一次成功
Checkpoint
中的状态,然后从保存的点继续处理。
必要条件: 数据源支持重发(如果不重发,丢失的消息就真的丢了)
一致性语义: 恰好一次(如果
p
相同,单线程,多个线程时,可能有的算子对其已经计算了一次了,有的没有就需要注意),至少一次。
// 获取运行环境finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//状态数据//两个checkpoint 触发间隔设置1S,越频繁追的数据就越少,io消耗也越大
env.enableCheckpointing(1000);//EXACTLY_ONCE语义说明 Checkpoint是要对替的,这样消息不会重复,也不会对丢。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//两个checkpoint 最少等待500ms 例如第一个checkpoint做了700ms按理300ms后就要做下一个checkpoint。但是它们之间的等待时间300ms<500ms 此时,就会延长200ms减少checkpoint过于频繁,影响业务。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//checkpoint多久超时,如果这个checkpoint在1分钟内还没做完,那就失败了
env.getCheckpointConfig().setCheckpointTimeout(60000);//同时最多有多少个checkpoint进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//当重新分配并发度,拆分task时,是否保存checkpoint。如果不保存就需要使用savepoint来保存数据,放到外部的介质中。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
Checkpoint vs Savepoint
CheckpointSavepoint触发管理方式由Flink自动触发并管理由用户手动触发并管理主要用途在 Task 发生异常时快速恢复,例如网络抖动导致的超时异常有计划的进行备份,使作业能停止后再恢复,例如修改代码、调整并发。特点轻量、自动从故障中服务、在作业停止后默认清除持久、以标准格式存储,允许代码或配置发生变化、手动触发 savepoint 恢复。
可选的状态存储方式:
【1】
MemoryStateBackend
:构造方法:
MemoryStateBackend(int maxStateSize,boolean asynchronousSnapshots)
存储方式:
State
:
TaskManager
内存。
Checkpoint
:
JobManager
内存。
容量限制: 单个
State maxStateSize
默认
5M
。
maxStateSize <= akka.framesize
默认
10M
。总大小不超过
JobManager
内存。
推荐使用场景: 本地测试,几乎无状态的作业,比如
ETL/JobManager
不容易挂,或影响不大的情况。不推荐在生产场景使用。
【2】FsStateBackend: 构造方法:
FsStateBackend(URL checkpointDataUri,boolean asynchronousSnapshots)
存储方式:
State
:
TaskManager
内存。
Checkpoint
:外部文件系统(本地或
HDFS
)。
容量限制: 单个
TaskManager
上
State
总量不超过它的内存。总大小不超过配置的文件系统容量(会定期清理)。
推荐使用场景: 常规使用状态的作业,例如分钟级窗口聚合、
join
。需要开启
HA
的作业。可以在生产环境使用。
【3】RocksDBStateBackend: 构造方法:
RocksDBStateBackend(URL checkpointDataUri,boolean enableIncrementalCheckpointing)
存储方式:
State
:
TaskManager
上的
KV
数据库(实际使用内存+磁盘)。
Checkpoint
:外部文件系统(本地或
HDFS
)。
容量限制: 单个
TaskManager
上
State
总量不超过它的内存+磁盘,单个
key
最大
2G
。总大小不超过配置的文件系统容量。
推荐使用场景: 超大状态的作业,例如天级窗口聚合。需要开启
HA
的作业。对状态读写性能要求比较高的作业。可以在生产环境使用。
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。