Flink
实现另外一种
TypeInfomation
是
BasicArrayTypeInfo
,对应的是
Java
基本类型数组(装箱)或
String
对象的数组,如下代码通过使用
Array
数组和
List
集合创建
DataStream
数据集。
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
//通过 List 集合创建数据集
DataStreamSource<Integer> integerDataStreamSource1 = env.fromCollection(integers);
Java Tuples类型
通过定义
TupleTypeInfo
来描述
Tuple
类型数据,
Flink
在
Java
接口中定义了元祖类
Tuple
供用户使用。
Flink Tuples
是固定长度固定类型的
Java Tuple
实现,不支持空值存储。目前支持任意的
Flink Java Tuple
类型字段数量上限为
25
,如果字段数量超过上限,可以通过继承
Tuple
类的方式进行拓展。如下代码所示,创建
Tuple
数据类型数据集。
//通过实例化 Tuple2 创建具有两个元素的数据集
DataStreamSource<Tuple2<String, Integer>> tuple2DataStreamSource = env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
//通过实例化 Tuple3 创建具有三个元素的数据集
DataStreamSource<Tuple3<String, Integer, Long>> tuple3DataStreamSource = env.fromElements(new Tuple3<>("a", 1, 3L), new Tuple3<>("b", 2, 3L));
Scala Case Class类型
Flink
通过实现
CaseClassTypeInfo
支持任意的
Scala Case Class
,包括
Scala tuples
类型,支持的字段数量上限为
22
,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义
WordCount Case Class
数据类型,然后通过
fromElements
方法创建
input
数据集,调用
keyBy()
方法对数据集根据 word字段重新分区。
//定义 WordCount Case Class 数据结构
case class WordCount(word: Sring, count: Int)
//通过 fromElements 方法创建数据集
val input = env.fromElements(WordCount("hello", 1),WordCount("word",2))
val keyStream1 = input.keyBy("word")//根据word字段为分区字段,
val keyStream2 = input.keyBy(0)//也可以通过制定position分区
通过使用
Scala Tuple
创建
DataStream
数据集,其他的使用方式和
Case Class
相似。需要注意的是,如果根据名称获取字段,可以使用
Tuple
中的默认字段名称。
//通过实例化Scala Tuple2 创建具有两个元素的数据集
val tupleStream: DataStream[Tuple2[String,Int]] = env.fromElements(("a",1),("b",2));
//使用默认名字段获取字段,表示第一个 tuple字段,相当于下标0
tuple2DataStreamSource.keyBy("\_1");
POJOs 类型
POJOs
类可以完成复杂数据结构的定义,
Flink
通过实现
PojoTypeInfo
来描述任意的
POJOs
,包括
Java
和
Scala
类。在
Flink
中使用
POJOs
类可以通过字段名称获取字段,例如
dataStream.join(otherStream).where("name").equalTo("personName")
,对于用户做数据处理则非常透明和简单,如代码所示。如果在
Flink
中使用
POJOs
数据类型,需要遵循以下要求:
【1】
POJOs
类必须是
Public
修饰且必须独立定义,不能是内部类;
【2】
POJOs
类中必须含有默认空构造器;
【3】
POJOs
类中所有的
Fields
必须是
Public
或者具有
Public
修饰的
getter
和
setter
方法;
【4】
POJOs
类中的字段类型必须是
Flink
支持的。
//类和属性具有 public 修饰
public class Persion{
public String name;
public Integer age;
//具有默认的空构造器
public Persion(){}
public Persion(String name,Integer age){
this.name = name;
this.age = age;
};
}
定义好
POJOs Class
后,就可以在 Flink环境中使用了,如下代码所示,使用
fromElements
接口构建
Person
类的数据集。
POJOs
类仅支持字段名称指定字段,如代码中通过
Person name
来指定
Keyby
字段。
DataStreamSource<Persion> persionDataStreamSource = env.fromElements(new Persion("zzx", 18), new Persion("fj", 16));
persionData.keyBy("name").sum("age");
Flink Value类型
Value
数据类型实现了
org.apache.flink.types.Value
,其中包括
read()
和
write()
两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前
Flink
提供了內建的
Value
类型有
IntValue、DoubleValue
以及
StringValue
等,用户可以结合原生数据类型和
Value
类型使用。
特殊数据类型
在
Flink
中也支持一些比较特殊的数据数据类型,例如
Scala
中的
List
、
Map
、
Either
、
Option
、
Try
数据类型,以及
Java中Either
数据类型,还有
Hadoop
的
Writable
数据类型。如下代码所示,创建
Map
和
List
类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像
POJOs
类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助
Types Hint
帮助
Flink
推断数据类型信息,关于
Tyeps Hmt
介绍可以参考下一小节。
//创建 map 类型数据集
Map map = new HashMap<>();
map.put("name","zzx");
map.put("age",12);
env.fromElements(map);
//创建 List 类型数据集
env.fromElements(Arrays.asList(1,2,3,4,5),Arrays.asList(3,4,5));
TypeInformation信息获取: 通常情况下
Flink
都能正常进行数据类型推断,并选择合适的
serializers
以及
comparators
。但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,使得
Flink
并不能很容易地获取到数据集中的数据类型信息。同时在
Scala API
和
Java API
中,
Flink
分别使用了不同的方式重构了数据类型信息。
Scala API类型信息
Scala API
通过使用
Manifest
和类标签,在编译器运行时获取类型信息,即使是在函数定义中使用了泛型,也不会像
Java API
出现类型擦除的问题,这使得
Scala API
具有非常精密的类型管理机制。同时在Flink中使用到
Scala Macros
框架,在编译代码的过程中推断函数输入参数和返回值的类型信息,同时在
Flink
中注册成
TypeInformation
以支持上层计算算子使用。
当使用
Scala API
开发
Flink
应用,如果使用到
Flink
已经通过
TypeInformation
定义的数据类型,
TypeInformation
类不会自动创建,而是使用隐式参数的方式引入,代码不会直接抛出编码异常,但是当启动
Flink
应用程序时就会报
could not find implicit value for evidence parameter of type TypeInformation
的错误。这时需要将
TypeInformation
类隐式参数引入到当前程序环境中,代码实例如下:
import org.apache.flink.api.scala._
Java API类型信息
由于
Java
的泛型会出现类型擦除问题,
Flink
通过
Java
反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。同时类型推断在当输出类型依赖于输入参数类型时相对比较容易做到,但是如果函数的输出类型不依赖于输入参数的类型信息,这个时候就需要借助于类型提示
Ctype Himts
来告诉系统函数中传入的参数类型信息和输出参数信息。如代码清单通过在
returns
方法中传入
TypeHint
实例指定输出参数类型,帮助
Flink
系统对输出类型进行数据类型参数的推断和收集。
//定义泛型函数,输入参数 T,O 输出参数为 O
class MyMapFucntion<T,O> implements MapFunction<T,O>{
@Override
public O map(T t) throws Exception {
//定义计算逻辑
return null;
}
}
//通过 List 集合创建数据集
DataStreamSource<Integer> input = env.fromCollection(integers);
input.flatMap(new MyMapFucntion<String,Integer>()).returns(new TypeHint<Integer>() {//通过returns方法指定返回参数类型
})
在使用
Java API
定义
POJOs
类型数据时,
PojoTypeInformation
为
POJOs
类中的所有字段创建序列化器,对于标准的类型,例如
Integer
、
String
、
Long
等类型是通过
Flink
自带的序列化器进行数据序列化,对于其他类型数据都是直接调用
Kryo
序列化工具来进行序列化。通常情况下,如果
Kryo
序列化工具无法对
POJOs
类序列化时,可以使用
Avro
对
POJOs
类进行序列化,如下代码通过在
ExecutionConfig
中调用
enableForceAvro()
来开启
Avro
序列化。
//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 avro 序列化
env.getConfig().enableForceAvro();
如果用户想使用
Kryo
序列化工具来序列化
POJOs
所有字段,则在
ExecutionConfig
中调用
enableForceKryo()
来开启
Kryo
序列化。
版权归原作者 2401_86640747 所有, 如有侵权,请联系我们删除。