⭐简单说两句⭐
✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~作者:****小叮当撩代码,CSDN后端领域新星创作者 |阿里云专家博主
CSDN个人主页:小叮当撩代码
🔎GZH:
哆啦A梦撩代码
🎉欢迎关注🔎点赞👍收藏⭐️留言📝
Flink状态
Flink中的State
State概念
在 Flink 中,状态是流处理程序中非常重要的一部分,它允许你保存和访问数据,以实现复杂的计算逻辑。
可以简单理解为:历史计算结果
Flink中的算子任务的State分类通常分为两类
1️⃣ 有状态
有状态需要考虑历史的数据,相同的输入可能会得到不同的输出
比如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)
2️⃣ 无状态
无状态简单说就是不需要考虑历史的数据,相同的输入得到相同的结果
比如map、filter、flatmap算子都属于无状态,不需要依赖其他数据
✅ Flink默认已经支持了无状态和有状态计算!
状态分类
Flink中有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)
Managed State是由Flink管理的,Flink帮忙存储、恢复和优化
Raw State是开发者自己管理的,需要自己序列化
❇️通常情况下,我们采用托管状态来实现我们的需求!!!
托管状态
Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以Flink 能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。
很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。
🎨所以:我们又可以将托管状态分为两类:算子状态和按键分区状态。
键控状态Keyed State
Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。
需要注意的是键控状态只能在 KeyedStream 上进行使用,可以通过 stream.keyBy(…) 来得到 KeyedStream 。
Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):
ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。
ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。
ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。
AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。
FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。
MapState:维护 Map 类型的状态。
Code实操
例子1
使用KeyState中的ValueState来模拟实现maxBy
代码清单
importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.common.state.ValueState;importorg.apache.flink.api.common.state.ValueStateDescriptor;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
* @author tiancx
*/publicclassStateMaxByDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//加载数据DataStream<Tuple2<String,Integer>> source = env.fromElements(Tuple2.of("北京",1),Tuple2.of("上海",2),Tuple2.of("广州",3),Tuple2.of("北京",4),Tuple2.of("上海",5),Tuple2.of("广州",6),Tuple2.of("北京",3)).keyBy(t -> t.f0);
source.map(newRichMapFunction<Tuple2<String,Integer>,Tuple3<String,Integer,Integer>>(){//定义状态,用于存储最大值ValueState<Integer> maxValueState =null;//进行初始化@Overridepublicvoidopen(Configuration parameters)throwsException{//创建状态描述器ValueStateDescriptor<Integer> descriptor =newValueStateDescriptor<>("maxValueState",Integer.class);
maxValueState =getRuntimeContext().getState(descriptor);}@OverridepublicTuple3<String,Integer,Integer>map(Tuple2<String,Integer> value)throwsException{//获取当前值Integer currentVal = value.f1;Integer currentMax = maxValueState.value();if(currentMax ==null|| currentVal > currentMax){
maxValueState.update(currentVal);}returnTuple3.of(value.f0, value.f1, maxValueState.value());}}).print();
env.execute();}}
运行看结果
例子2
如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]
代码清单
importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.api.common.state.ListState;importorg.apache.flink.api.common.state.ListStateDescriptor;importorg.apache.flink.api.common.state.ValueState;importorg.apache.flink.api.common.state.ValueStateDescriptor;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;importjava.util.List;/**
* @author tiancx
*/publicclassStateDemo01{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<String> stream = env.socketTextStream("localhost",9999);DataStream<Tuple2<String,Integer>> source = stream.map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{String[] split = value.split(" ");returnTuple2.of(split[0],Integer.parseInt(split[1]));}}).keyBy(t -> t.f0);
source.flatMap(newRichFlatMapFunction<Tuple2<String,Integer>,Tuple2<String,List<Integer>>>(){ListState<Integer> listState =null;//存放超过38度的次数ValueState<Integer> valueState =null;@Overridepublicvoidopen(Configuration parameters)throwsException{ListStateDescriptor<Integer> listStateDescriptor =newListStateDescriptor<Integer>("listState",Integer.class);ValueStateDescriptor<Integer> descriptor =newValueStateDescriptor<>("valueState",Integer.class);
listState =getRuntimeContext().getListState(listStateDescriptor);
valueState =getRuntimeContext().getState(descriptor);}@OverridepublicvoidflatMap(Tuple2<String,Integer> value,Collector<Tuple2<String,List<Integer>>> out)throwsException{System.out.println("进入flatMap");Integer val = value.f1;if(valueState.value()==null){
valueState.update(0);}if(val >38){
listState.add(val);
valueState.update(valueState.value()+1);}if(valueState.value()>=3){List<Integer> list =(List<Integer>) listState.get();
out.collect(Tuple2.of(value.f0, list));
listState.clear();
valueState.clear();}}}).print();
env.execute();}}
输入
运行结果
算子状态OperatorState
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。
算 子 状 态 也 支 持 不 同 的 结 构 类 型 , 主 要 有 三 种 : ListState 、 UnionListState 和BroadcastState。
code实操
例子1:
在 map 算子中计算数据的个数
代码清单
importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.state.ListState;importorg.apache.flink.api.common.state.ListStateDescriptor;importorg.apache.flink.api.scala.typeutils.Types;importorg.apache.flink.runtime.state.FunctionInitializationContext;importorg.apache.flink.runtime.state.FunctionSnapshotContext;importorg.apache.flink.streaming.api.checkpoint.CheckpointedFunction;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
* @author tiancx
*/publicclassOperatorListStateDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);DataStreamSource<String> stream = env.socketTextStream("localhost",9999);
stream.map(newMyCountMapFunction()).print();
env.execute();}publicstaticclassMyCountMapFunctionimplementsMapFunction<String,Long>,CheckpointedFunction{privateLong count =0L;privateListState<Long> listState;@OverridepublicLongmap(String value)throwsException{return++count;}/**
* 本地变量持久化:将 本地变量拷贝到算子状态中,开启checkpoint 时才会调用 snapshotState 方法
*
* @param context the context for drawing a snapshot of the operator
* @throws Exception
*/@OverridepublicvoidsnapshotState(FunctionSnapshotContext context)throwsException{System.out.println("MyCountMapFunction.snapshotState");
listState.clear();
listState.add(count);}/**
* 初始化本地变量:程序启动和恢复时,从状态中把数据添加到本地变量,每个子任务调用一次
*
* @param context the context for initializing the operator
* @throws Exception
*/@OverridepublicvoidinitializeState(FunctionInitializationContext context)throwsException{System.out.println("MyCountMapFunction.initializeState");//从上下文初始化状态
listState = context
.getOperatorStateStore().getListState(newListStateDescriptor<>("listState",Types.LONG()));//从算子状态中把数据拷贝到本地变量if(context.isRestored()){for(Long aLong : listState.get()){
count += aLong;}}}}}
输入
运行结果
【都看到这了,点点赞点点关注呗,爱你们】😚😚
💬
✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~
作者:****小叮当撩代码,CSDN后端领域新星创作者 |阿里云专家博主
CSDN个人主页:小叮当撩代码
🔎GZH:
哆啦A梦撩代码
🎉欢迎关注🔎点赞👍收藏⭐️留言📝
版权归原作者 小叮当撩编程 所有, 如有侵权,请联系我们删除。