0


flink支持的数据类型

Supported Data Types

Flink 对可以在

DataStream

中的元素类型施加了一些限制。这是因为在系统分析这些类型以确定高效的执行策略时需要考虑的因素。

数据类型分为七个不同的类别:

  • Java Tuples 和 Scala Case 类
  • Java POJOs(Plain Old Java Objects)
  • 基本类型(Primitive Types)
  • 普通类(Regular Classes)
  • 值类型(Values)
  • Hadoop Writables
  • 特殊类型(Special Types)

Tuples 和 Case 类

​ Tuples 是包含固定数量字段的复合类型,这些字段可以有不同的类型。Java API 提供了从 Tuple1 到 Tuple25 的类。Tuple 的每一个字段都可以是任意的 Flink 类型,包括更深层的 Tuples,从而形成嵌套的 Tuples。Tuple 的字段可以通过字段名直接访问,如

tuple.f4

,或者使用通用的获取方法

tuple.getField(int position)

。字段索引是从0开始的。需要注意的是,这与 Scala 的 Tuples 形成了对比,但与 Java 的一般索引方式更加一致。

以下是一个示例代码,展示了如何使用

Tuple2

创建一个

DataStream

并对其进行操作:

// 创建一个包含字符串及其对应计数的DataStreamDataStream<Tuple2<String,Integer>> wordCounts = env.fromElements(newTuple2<>("hello",1),// 使用字符串字面量初始化Tuple2newTuple2<>("world",2));// 映射操作,仅保留每个Tuple中的整数值
wordCounts.map(newMapFunction<Tuple2<String,Integer>,Integer>(){@OverridepublicIntegermap(Tuple2<String,Integer> value)throwsException{return value.f1;// 访问Tuple中的第二个元素}});// 根据Tuple中的字符串值进行键控
wordCounts.keyBy(value -> value.f0);// 使用Lambda表达式访问Tuple中的第一个元素

这段代码创建了一个包含单词和它们的计数的

DataStream

,然后通过映射函数来提取每个

Tuple2

中的整数值,并根据

Tuple2

中的字符串部分对流进行键控。

POJOs(Plain Old Java Objects)

如果某些 Java 和 Scala 类满足以下要求,Flink 将把它们视为特殊的 POJO 数据类型:

  • 该类必须是公共的(public)。
  • 必须有一个无参数的公共构造器(默认构造器)。
  • 所有字段要么是公共的(public),要么必须通过 getter 和 setter 方法可访问。对于名为 foo 的字段,getter 和 setter 方法必须分别命名为 getFoo() 和 setFoo()。
  • 字段的类型必须由已注册的序列化器支持。

​ POJOs 通常使用 PojoTypeInfo 表示,并且使用 PojoSerializer 进行序列化(使用 Kryo 作为可配置的回退选项)。例外情况是当 POJO 实际上是 Avro 类型(Avro Specific Records)或作为“Avro Reflect Types”生成时,在这种情况下,POJO 使用 AvroTypeInfo 表示并通过 AvroSerializer 序列化。如果你需要的话,也可以注册自己的自定义序列化器;更多信息请参见序列化章节。

​ Flink 会分析 POJO 类型的结构,即了解 POJO 的字段。因此,POJO 类型比一般类型更容易使用。此外,Flink 处理 POJOs 的效率也高于处理一般类型。

​ 你可以通过 flink-test-utils 中的

org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo()

来测试你的类是否符合 POJO 的要求。如果你想进一步确保 POJO 的任何字段都不会使用 Kryo 进行序列化,则应使用

assertSerializedAsPojoWithoutKryo()

下面的例子展示了一个带有两个公共字段的简单 POJO。

publicclassWordWithCount{publicString word;publicint count;publicWordWithCount(){}// 默认构造函数publicWordWithCount(String word,int count){// 构造函数this.word = word;this.count = count;}}// 创建一个包含单词及其计数的DataStreamDataStream<WordWithCount> wordCounts = env.fromElements(newWordWithCount("hello",1),newWordWithCount("world",2));// 根据单词进行键控
wordCounts.keyBy(value -> value.word);

​ 在这个例子中,我们创建了一个名为

WordWithCount

的 POJO 类,它有两个字段:一个字符串类型的

word

和一个整型的

count

。我们还定义了两个构造函数:一个是默认的无参构造函数,另一个则接收

word

count

作为参数。最后,我们创建了一个

DataStream

,并根据

word

字段对数据进行了键控。

基本类型(Primitive Types)

Flink 支持所有 Java 和 Scala 的基本类型,例如 Integer、String 和 Double。

普通类类型(General Class Types)

​ Flink 支持大多数 Java 和 Scala 类(API 定义的类和用户自定义的类)。对于包含无法序列化的字段(如文件指针、I/O 流或其他本地资源)的类有限制。遵循 Java Beans 规范的类通常工作得很好。

​ 所有未被识别为 POJO 类型(见上述 POJO 要求)的类都被 Flink 当作普通类类型处理。Flink 把这些数据类型当作黑盒对待,并不能访问其内容(比如为了高效排序)。普通类型使用 Kryo 序列化框架进行序列化/反序列化。

值类型(Values)

​ 值类型描述了它们自身的序列化和反序列化过程。它们通过实现 org.apache.flink.types.Value 接口中的 read 和 write 方法来提供特定于用途的序列化逻辑,而不是通过通用的序列化框架。当通用序列化极为低效时,使用值类型是合理的。例如,一种实现了稀疏向量作为数组的数据类型,由于数组大部分是零,可以为非零元素提供特殊的编码,而通用序列化会简单地写入所有数组元素。

​ org.apache.flink.types.CopyableValue 接口以类似的方式支持手动内部克隆逻辑。

​ Flink 预定义了对应于基本数据类型的值类型(ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。这些值类型作为基本数据类型的可变变体起作用:它们的值可以被改变,允许程序员重用对象,减轻垃圾收集器的压力。

Hadoop 可写类型(Hadoop Writables)

你可以使用实现了 org.apache.hadoop.Writable 接口的类型。在 write() 和 readFields() 方法中定义的序列化逻辑将用于序列化。

特殊类型(Special Types)

你可以使用一些特殊类型,包括 Scala 中的 Either、Option 和 Try。Java API 有自己的 Either 类型的实现。类似于 Scala 的 Either,它代表两种可能的类型之一,Left 或 Right。Either 可以用于错误处理或需要输出两种不同记录类型的运算符。

类型擦除与类型推断(Type Erasure & Type Inference)

注意:这一节只对 Java 相关。

Java 编译器在编译后会丢弃大量的泛型信息。这在 Java 中被称为类型擦除。这意味着在运行时,对象实例不再知道它的泛型类型。例如,DataStream 和 DataStream 的实例对于 JVM 来说看起来是一样的。

Flink 在准备程序执行(当调用程序的主方法时)时需要类型信息。Flink 的 Java API 试图通过各种方式重构被丢弃的类型信息,并将其显式地存储在数据集和运算符中。你可以通过 DataStream.getType() 获取类型。这个方法返回一个 TypeInformation 的实例,这是 Flink 内部表示类型的一种方式。

​ 类型推断有一定的局限性,并且在某些情况下需要程序员的“配合”。例如,在从集合创建数据集的方法中,如 StreamExecutionEnvironment.fromCollection(),你可以传递一个描述类型的参数。但是,像 MapFunction<I, O> 这样的泛型函数可能也需要额外的类型信息。

输入格式和函数可以通过实现 ResultTypeQueryable 接口来显式地告诉 API 它们的返回类型。函数被调用时的输入类型通常可以从前面操作的结果类型中推断出来。

Flink 中的类型处理

Flink 试图推断出分布式计算过程中交换和存储的数据类型的大量信息。可以把这想象成数据库推断表的模式。在大多数情况下,Flink 无缝地自行推断所有必要的信息,拥有类型信息使得 Flink 能够做一些很酷的事情:

  • Flink 对数据类型了解得越多,序列化和数据布局方案就越优化。这对于 Flink 的内存使用范式非常重要(尽可能在堆内/堆外对序列化数据进行操作,并使序列化非常廉价)。
  • 最后,这也使得用户在大多数情况下不必担心序列化框架或不得不注册类型。

​ 一般来说,在预编译阶段(即程序对 DataStream 的调用发生时,以及任何执行 execute()、print()、count() 或 collect() 的调用之前)就需要数据类型的这些信息。

​ 这意味着当你构建 DataStream 以及对其应用转换操作时,Flink 会尝试理解你正在处理的数据类型。这种类型信息的推断发生在实际执行之前,也就是说在你提交作业给 Flink 运行环境之前,Flink 会根据你的程序逻辑来决定如何处理数据。

​ Flink 的这种类型系统不仅有助于优化内存管理,还有助于优化整个数据处理流程,包括但不限于数据的序列化、反序列化、以及如何有效地在各个节点之间传输数据。这使得开发者能够专注于业务逻辑,而不必过多关注底层的技术细节。

最常见的问题

​ 用户最常遇到需要与 Flink 的数据类型处理交互的问题包括:

注册子类型:如果函数签名仅描述了超类型,但在执行期间实际上使用了这些类型的子类型,那么让 Flink 意识到这些子类型可能会大大提高性能。为此,需要对

StreamExecutionEnvironment

上的每个子类型调用

.registerType(clazz)

importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.api.common.typeinfo.Types;publicclassRegisterSubTypesExample{publicstaticvoidmain(String[] args)throwsException{finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 注册子类型
        env.registerType(Dog.class);
        env.registerType(Cat.class);
        env.registerType(Bird.class);// 创建包含子类型实例的DataStreamDataStream<Animal> animals = env.fromElements(newDog("Rex","German Shepherd"),newCat("Whiskers","Black"),newBird("Polly",true));// 打印动物的名字
        animals.map(animal -> animal.getName()).print();// 触发执行
        env.execute("Register Subtypes Example");}}

注册自定义序列化器:对于 Flink 自身无法透明处理的类型,默认情况下 Flink 会回退到 Kryo 进行序列化。然而,并不是所有的类型都能被 Kryo 无缝处理(也因此影响到了 Flink)。例如,默认情况下许多 Google Guava 集合类型并不能很好地工作。解决办法是为引起问题的类型注册额外的序列化器。调用

StreamExecutionEnvironment

.getConfig().addDefaultKryoSerializer(clazz, serializer)

来完成这项工作。许多库提供了额外的 Kryo 序列化器。有关如何使用自定义序列化器的更多详细信息,请参阅自定义序列化器部分。

添加类型提示:有时即使使用了各种技巧,Flink 也无法推断出泛型类型,在这种情况下,用户必须传递一个类型提示。这种情况通常只会在 Java API 中出现。类型提示部分对此进行了更详细的描述。

手动创建 TypeInformation:在某些 API 调用中,由于 Java 的泛型类型擦除,Flink 无法推断数据类型,这时可能需要手动创建 TypeInformation。有关详细信息,请参阅创建 TypeInformation 或 TypeSerializer 的部分。

​ 以上这些步骤可以帮助解决 Flink 在处理数据类型时遇到的一些常见问题,尤其是当涉及到自定义类、复杂数据结构或者当 Flink 无法自动推断类型信息时。正确处理这些问题可以显著提高应用程序的性能和可靠性。

Flink’s TypeInformation class

​ Flink 的

TypeInformation

类是所有类型描述符的基础类。它揭示了类型的一些基本属性,并能为类型生成序列化器,在某些特化中还能生成比较器(请注意,Flink 中的比较器不仅仅是定义顺序,它们基本上是处理键的工具)。

内部地,Flink 对类型进行了如下区分:

  • 基本类型:所有的 Java 原生类型及其装箱形式,加上 void、String、Date、BigDecimal 和 BigInteger。
  • 原生数组和对象数组
  • 复合类型
  • Flink Java Tuples(作为 Flink Java API 的一部分):最多支持 25 个字段,不支持 null 字段
  • Scala case 类(包括 Scala tuples):不支持 null 字段
  • Row:具有任意数量字段的元组,并支持 null 字段
  • POJOs:遵循某种类似 Bean 的模式的类
  • 辅助类型(Option、Either、Lists、Maps 等)
  • 泛型:这些不会被 Flink 自身序列化,而是由 Kryo 序列化POJOs 特别值得关注,因为它们支持创建复杂类型。它们对运行时也是透明的,并且可以被 Flink 非常高效地处理。

POJO 类型的规则

如果一个数据类型满足以下条件,Flink 将识别它为 POJO 类型(并允许按名称引用字段):

  • 类是公共的(public)并且独立的(没有非静态内部类)
  • 类具有一个无参数的公共构造器
  • 类(及所有父类)中所有非静态、非瞬态的字段要么是公共的(public,且非 final),要么有遵循 Java Beans 命名规范的公共 getter 和 setter 方法。

需要注意的是,如果用户定义的数据类型不能被识别为 POJO 类型,则必须作为 GenericType 处理,并使用 Kryo 进行序列化。

创建

TypeInformation

TypeSerializer

为了为一个类型创建

TypeInformation

对象,可以使用特定语言的方法:

Java

由于 Java 通常会擦除泛型类型信息,因此你需要将类型传递给

TypeInformation

的构造函数:

对于非泛型类型,可以直接传递

Class

TypeInformation<String> info =TypeInformation.of(String.class);

对于泛型类型,需要通过

TypeHint

来“捕获”泛型信息:

TypeInformation<Tuple2<String,Double>> info =TypeInformation.of(newTypeHint<Tuple2<String,Double>>(){});

内部地,这创建了一个匿名子类来捕获泛型信息,并保持到运行时。

要创建一个

TypeSerializer

,只需在

TypeInformation

对象上调用

createSerializer(config)

config

参数类型为

ExecutionConfig

,持有程序注册的自定义序列化器的信息。尽量传递程序的实际

ExecutionConfig

。通常可以通过从

DataStream

调用

getExecutionConfig()

来获得它。在函数(如

MapFunction

)内部,可以通过将函数实现为

Rich Function

并调用

getRuntimeContext().getExecutionConfig()

来获取它。

通过以上方式,我们可以为特定的数据类型创建

TypeInformation

TypeSerializer

,以便更好地控制数据的序列化和反序列化过程,进而提升性能。

在 Flink 的 Java API 中,类型信息的处理

在一般情况下,Java 会擦除掉泛型类型信息。Flink 试图通过反射重构尽可能多的类型信息,利用 Java 保留下来的一点信息(主要是函数签名和子类信息)。此逻辑还包括了一些简单的类型推断机制,用于处理函数的返回类型依赖于其输入类型的情况:

publicclassAppendOne<T>implementsMapFunction<T,Tuple2<T,Long>>{publicTuple2<T,Long>map(T value){returnnewTuple2<>(value,1L);}}

在某些情况下,Flink 无法重构所有的泛型类型信息。在这种情况下,用户需要通过类型提示来帮助系统。

Java API 中的类型提示

在 Flink 无法重构擦除的泛型类型信息的情况下,Java API 提供了所谓的类型提示。类型提示告诉系统由某个函数产生的数据流或数据集的类型:

DataStream<SomeType> result = stream
    .map(newMyGenericNonInferrableFunction<Long,SomeType>()).returns(SomeType.class);
returns

语句指定了产生的类型,在这个例子中是通过一个类来指定的。类型提示支持通过以下方式定义类型:

  • 通过 Class,对于非参数化类型(没有泛型);
  • 通过 TypeHint 形式的 returns(new TypeHint<Tuple2<Integer, SomeType>>() {})TypeHint 类可以捕获泛型类型信息,并将其保存到运行时(通过一个匿名子类)。

Java 8 Lambda 的类型提取

​ 对于 Java 8 的 Lambda 表达式的类型提取不同于非 Lambda 表达式,因为 Lambda 表达式并没有与扩展了函数接口的实现类关联。

目前,Flink 尝试找出哪个方法实现了 Lambda,并使用 Java 的泛型签名来确定参数类型和返回类型。然而,并不是所有的编译器都会为 Lambda 生成这些签名(截至本文档编写时,只有 Eclipse JDT 编译器版本 4.5 及以后可靠地生成了这些签名)。

POJO 类型的序列化

 PojoTypeInfo

为 POJO 内的所有字段创建序列化器。标准类型如

int

,

long

,

String

等由随 Flink 发布的序列化器处理。对于其他所有类型,我们会回退到 Kryo。

​ 如果 Kryo 无法处理某种类型,你可以请求

PojoTypeInfo

使用 Avro 来序列化 POJO。要做到这一点,你需要调用:

finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();

​ 请注意,Flink 会自动使用 Avro 序列化器来序列化由 Avro 生成的 POJO。

​ 如果你希望整个 POJO 类型都由 Kryo 序列化器处理,可以设置:

finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();

​ 如果 Kryo 无法序列化你的 POJO,你可以向 Kryo 添加一个自定义序列化器,使用:

env.getConfig().addDefaultKryoSerializer(Class<?> type,Class<?extendsSerializer<?>> serializerClass);

有多种变体的方法可供选择。这些方法允许你指定自定义的序列化逻辑,从而更好地控制序列化过程。

禁用 Kryo 回退

有时候,程序可能希望明确避免使用 Kryo 作为泛型的回退序列化器。最常见的原因是想要确保所有类型都能通过 Flink 自带的序列化器或用户定义的自定义序列化器高效地进行序列化。

下面的设置会在遇到将通过 Kryo 进行序列化的数据类型时抛出异常:

env.getConfig().disableGenericTypes();

使用工厂定义类型信息

​ 类型信息工厂允许插件式地将用户定义的类型信息整合进 Flink 的类型系统中。你需要实现

org.apache.flink.api.common.typeinfo.TypeInfoFactory

接口以返回自定义的类型信息。在类型提取阶段,如果相应的类型注解了

@org.apache.flink.api.common.typeinfo.TypeInfo

注解,那么就会调用该工厂。

类型信息工厂可以在 Java 和 Scala API 中使用。

在一个类型层次结构中,最接近的工厂将会在向上遍历时被选中,不过内置工厂具有最高优先级。工厂也比 Flink 的内置类型有更高的优先级,因此你应该清楚自己在做什么。

下面的例子展示了如何在 Java 中使用工厂来为自定义类型

MyTuple

注解并提供自定义类型信息。

注解过的自定义类型:

@TypeInfo(MyTupleTypeInfoFactory.class)publicclassMyTuple<T0, T1>{publicT0 myfield0;publicT1 myfield1;}

提供自定义类型信息的工厂:

publicclassMyTupleTypeInfoFactoryextendsTypeInfoFactory<MyTuple>{@OverridepublicTypeInformation<MyTuple>createTypeInfo(Type t,Map<String,TypeInformation<?>> genericParameters){returnnewMyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));}}

createTypeInfo(Type, Map<String, TypeInformation<?>>)

方法为工厂所针对的类型创建类型信息。参数提供了关于类型本身以及类型泛型参数的附加信息(如果可用的话)。

​ 如果你的类型包含可能需要从 Flink 函数的输入类型派生出来的泛型参数,请确保也实现

org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters

以实现泛型参数到类型信息的双向映射。

通过这种方式,你可以为特定的类型提供高度定制化的序列化和反序列化逻辑,从而优化性能或适应特定的应用场景。

标签: flink 大数据

本文转载自: https://blog.csdn.net/qq_35643146/article/details/143226516
版权归原作者 小白鼠666 所有, 如有侵权,请联系我们删除。

“flink支持的数据类型”的评论:

还没有评论