点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
- Flink 并行度
- Flink 并行度详解
- Flink 并行度 案例

状态类型
Flink根据是否需要保存中间结果,把计算分为有状态计算和无状态计算。
- 有状态计算:依赖之前或之后的事件
- 无状态计算:独立
根据数据结构不同,Flink定义了多种State,应用于不同的场景。
- ValueState:即类型为T的单值状态,这个状态与对应的Key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过 value() 方法获取状态值
- ListState:即Key上的状态值为一个列表,可以通过add方法往列表中附加值,也可以通过get()方法返回一个Iterable来遍历状态值
- ReducingState:这种状态通过用户传入的ReduceFunction,每次调用add方法添加值的时候,会调用ReduceFunction,最后合并到一个单一的状态值。
- FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态会在未来的Flink版本当中删除)
- MapState:即状态值为一个Map,用户通过put和putAll方法添加元素
State按照是否有Key划分为:
- KeyedState
- OperatorState
案例1 利用State求平均值
实现思路
- 读数据源
- 将数据源根据Key分组
- 按照Key分组策略,对流式数据调用状态化处理:实例化出一个状态实例,随着流式数据的到来更新状态,最后输出结果
编写代码
packageicu.wzk;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.api.common.state.ValueState;importorg.apache.flink.api.common.state.ValueStateDescriptor;importorg.apache.flink.api.common.typeinfo.TypeHint;importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;publicclassFlinkStateTest01{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<Tuple2<Long,Long>> data = env
.fromElements(Tuple2.of(1L,3L),Tuple2.of(1L,5L),Tuple2.of(1L,7L),Tuple2.of(1L,4L),Tuple2.of(1L,2L));KeyedStream<Tuple2<Long,Long>,Long> keyed = data
.keyBy(newKeySelector<Tuple2<Long,Long>,Long>(){@OverridepublicLonggetKey(Tuple2<Long,Long> value)throwsException{return value.f0;}});SingleOutputStreamOperator<Tuple2<Long,Long>> flatMapped = keyed
.flatMap(newRichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Long>>(){privatetransientValueState<Tuple2<Long,Long>> sum;@OverridepublicvoidflatMap(Tuple2<Long,Long> value,Collector<Tuple2<Long,Long>> out)throwsException{Tuple2<Long,Long> currentSum = sum.value();if(currentSum ==null){
currentSum =Tuple2.of(0L,0L);}// 更新
currentSum.f0 +=1L;
currentSum.f1 += value.f1;System.out.println("currentValue: "+ currentSum);// 更新状态值
sum.update(currentSum);// 如果 count >= 5 清空状态值 重新计算if(currentSum.f0 >=5){
out.collect(newTuple2<>(value.f0, currentSum.f1 / currentSum.f0));
sum.clear();}}@Overridepublicvoidopen(Configuration parameters)throwsException{ValueStateDescriptor<Tuple2<Long,Long>> descriptor =newValueStateDescriptor<>("average",TypeInformation.of(newTypeHint<Tuple2<Long,Long>>(){}));
sum =getRuntimeContext().getState(descriptor);}});
flatMapped.print();
env.execute("Flink State Test");}}
运行结果

执行分析


Keyed State
表示和Key相关的一种State, 只能用于KeyedStream类型数据集对应的Function和Operator之上,KeyedState是OperatorState的特例,区别在于KeyedState事先按照Key对数据集进行了区分,每个KeyState仅对应一个Operator和Key的组合。
KeyedState可以通过KeyGroups进行管理,主要用于当算子并行度发生变化时,自动重新分布KeyedState数据。在系统运行过程中,一个Keyed算子实例可能运行一个或者多个KeyGroups的Keys。
Operator State
与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的Key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State 支持算子实例并行度发生变化时自动重新分配状态数据。
同时在Flink中KeyedState和OperatorState均具有两种形式,其中一种为托管状态(Managed State)形式,由FlinkRuntime中控制和管理状态数据,并将状态数据转换为内存HashTables或RocksDB的对象存储,然后将这些状态数据通过内部的接口持久话到CheckPoints中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发CheckPoint中,当从CheckPoint恢复任务时,算子自己再返序列化出状态的数据结构。
DataStreamAPI支持使用ManagedState和RawState两种状态形式,在Flink中推荐用户使用ManagedState管理状态数据,主要原因是ManagedState能够更好地支持状态数据的重平衡以及更加完善的内存管理。
状态描述

State既然是暴露给用户的,那么就需要有一些属性需要指定:
- State名称
- Value Serializer
- State Type Info
在对应的StateBackend中,会去调用对应的create方法获取到stateDescriptor中的值。
Flink通过StateDescriptor来定义一个状态,这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息,与上面的状态对应,从StateDescriptor派生ValueStateDescriptor、ListStateDescriptor等等
- ValueState getState(ValueStateDescriptor)
- ReducingState getReducingState(ReducingStateDescriptor)
- ListState getListState(ListStateDescriptor)
- FoldingState getFoldingState(FoldingStateDescriptor)
- MapState getMapState(MapStateDescriptot)
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。