0


再谈Java类型擦除与其对Flink类型和序列化的影响

前言

本文前半部分的内容在很久之前讲过,但是最近又有交接到团队内的历史任务出现这方面导致的性能问题,故有必要再讲一次,并扩展一部分新内容。先通过两个例子来引入Java类型擦除。

Java类型擦除的表现

  • 例一

这段代码无法通过编译,提示两个方法签名冲突,因为擦除类型相同。如果去掉其中一个方法,反编译之后的代码如下。

public void foo(List list) { }
  • 例二

这段代码会返回

true

。并且Java只允许

List.class

的写法,不允许

List<T>.class

的写法。

认识类型擦除

我们知道,泛型是高级语言中比较令人头疼的问题,一般来讲要实现泛型有两种方式:

  • Code Sharing:对同一个原始类型下的泛型类型只生成同一份目标代码(在Java中就是字节码)。
  • Code Specialization:对每一个泛型类型都生成不同的目标代码。

Java使用的泛型实现是前者,而C++和C#使用的是后者,它们也分别称为“假”泛型和“真”泛型。Code Sharing通过类型擦除来保证只生成一份目标代码,但也导致程序在运行时对泛型类型没有感知,所以上述例子一的代码反编译后只剩下了

List

,例子二中的类型比较实际上都是

Class<? extends ArrayList>

的比较。如果Java也采用Code Specialization机制(想一想C++ Template)的话,所有

List<T>

就都是显式不同的类型了。

为什么Java要采用Code Sharing和类型擦除呢?主要有两点原因:一是Java泛型是到1.5版本才出现的特性,在此之前Java已经在无泛型的条件下经历了较长时间的发展,如果采用Code Specialization,就得对Java类型系统做伤筋动骨的改动,并且无法保证向前兼容性;二是Code Specialization对每个泛型类型都生成不同的目标代码,如果有10个不同泛型的

List

,就要生成10份字节码,加重解释和执行负担。

由此可见,类型擦除让JVM省了不少事,但是加重了编译器的工作量。编译器必须在运行期之前就进行检查,禁止模糊的或者不合法的泛型使用方式。再举一个例子。

这种用法也是不允许的,换句话说,里氏替换原则不适用于Java的泛型类型参数。这并不难理解:对于一个

List<Object>

而言,向其中添加字符串是完全合法的,但是如果方法传入的参数为

List<Integer>

的话就会直接造成

ClassCastException

,因此编译器会提前block掉这种可能性。

还没完,如果把

traverse()

方法参数中的

List<Object>

换成用通配符表示的

List<?>

,那么

traverse()

方法调用就没问题,但

list.add()

语句就会编译不通过。这是因为

list.add()

方法无法判断具体要加入列表的是

Object

的哪个子类实例,因此会用最简单粗暴的方法来处理,即全部拒绝。相对地,

list.get()

则是可以编译通过的,因为编译器能够通过

<? extends T>

<? super T>

得知泛型类型的上下界限。

如果泛型类型有界限,在类型擦除时会根据最左侧的泛型参数来替换,例如下面的泛型类。

class Test<T extends Comparable & Serializable> {
  private T value;
  public T getValue() { return value; }
  public void setValue(T value) { this.value = value; }
}

类型擦除后就会变成:

class Test {
  private Comparable value;
  public Comparable getValue() { return value; }
  public void setValue(Comparable value) { this.value = value; }
}

同理,如果没有规定T是哪个类的子类或者超类,就会替换为Object。

下面来看类型擦除为Flink类型体系带来的问题,并介绍Flink规避此问题的类型提示(Type Hint)机制。

Flink的类型提示机制

以Flink自带示例中的

SocketWindowWordCount

为例,如果我们将它的主逻辑改写成Lambda表达式,如下:

DataStream<WordWithCount> windowCounts = text
      .flatMap((String value, Collector<WordWithCount> out) -> {
        for (String word : value.split("\\s")) {
          out.collect(new WordWithCount(word, 1L));
        }
      })
      .keyBy("word")
      .timeWindow(Time.seconds(5))
      .reduce((a, b) ->
        new WordWithCount(a.word, a.count + b.count)
      );

执行时会抛出如下异常。

这说明程序无法在运行时推断出

flatMap()

算子的返回类型。为什么之前采用匿名内部类就没有问题?因为匿名内部类会被真正地编译为

.class

文件,而Lambda表达式是在运行时调用

invokedynamic

指令,亦即在第一次执行其逻辑时才会确定。因此Lambda表达式比起匿名内部类,会丢失更多的类型信息。看一下

flatMap()

算子的签名:

void flatMap(T value, Collector<O> out);

经过类型擦除,

Collector

的泛型参数被抹掉了(参看报错

The generic type parameters of Collector are missing

),自然就会抛出无法确定返回类型的异常。如果我们采用的不是

flatMap()

算子而是

map()

,就不会出现这种问题,因为

map()

的返回类型可以自动推断。

为了克服类型擦除带来的问题,Flink类型系统中内置了类型提示机制,即用户在调用此类算子之后,手动指明返回的类型信息。在

flatMap()

之后调用

returns()

方法,就可以指定返回类型了。

text.flatMap((String value, Collector<WordWithCount> out) -> {
        for (String word : value.split("\\s")) {
          out.collect(new WordWithCount(word, 1L));
        }
      })
      .returns(TypeInformation.of(WordWithCount.class));

但是,如果返回类型本身就有泛型,比如在Flink中常用的元组(

TupleX

),就得另外换一种写法,即通过继承

TypeHint

的匿名内部类保留泛型信息。

.returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() { }))

下面再来看一个比较隐匿的可能会引发性能问题的场景。

POJO序列化fallback问题与类型注入

我们知道,Flink对标准的Java POJO类型有专门的

PojoSerializer

序列化器支持,性能相当好。但是也有例外,考虑以下包含容器成员的POJO:

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class MyPojo {
    private String myId;
    private List<Integer> myTags;
    private Map<String, Integer> myFlags;
}

在Flink应用执行时,会提示

myTags

myFlags

两个字段是Generic Types,也就是要fallback到Kryo Serializer做序列化,而非Flink原生的

ListSerializer

MapSerializer

。如果我们调用

ExecutionConfig#disableGenericTypes()

方法来禁用fallback,则应用无法执行,并有异常提示

Generic types have been disabled and type java.util.Map is treated as a generic type

可见,由于类型擦除的存在,POJO中的所有泛型参数都无法识别,无法通过原生序列化器操作,故序列化性能会有数倍的下降,在涉及状态操作和网络传输时尤其明显。本文开头提到的出现问题的历史Flink任务,就是因为单个POJO中包含了十几个

Map

,且均有状态读写,近期因新业务上线,流量增大,导致大量时间耗费在序列化层面,任务严重反压。

由此可见,尽量让POJO内不包含泛型类型(多数情况下就是避免使用Java容器)是最好的,但如果必须使用的话,如何解决这个问题呢?答案是自行实现对应类型的

TypeInfoFactory

,并通过

@TypeInfo

注解对泛型字段做类型注入。例如,我们实现一个无嵌套的

Map

对应的

TypeInfoFactory

@SuppressWarnings("unchecked")
public class SimpleMapTypeInfoFactory<K, V> extends TypeInfoFactory<Map<K, V>> {
    @Override
    public TypeInformation<Map<K, V>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
        return Types.MAP(
            (TypeInformation<K>) genericParameters.get("K"),
            (TypeInformation<V>) genericParameters.get("V")
        );
    }
}

然后为

myFlags

字段注入此类型,在序列化时,

MapSerializer

就会对此字段生效,性能恢复正常。

@TypeInfo(SimpleMapTypeInfoFactory.class)
private Map<String, Integer> myFlags;

当然,对所有其他无嵌套的

Map<K, V>

类型字段,都可以复用上面的

SimpleMapTypeInfoFactory

类,无需重复编写。至于其他类型与组合,读者举一反三即可。

最后再提醒一句,

Set

不是Flink序列化器原生支持的类型,即不存在

SetSerializer

,故所有需要用到

Set<T>

的场合,都需要用

Map<T, Boolean>

代替,并注入上述

SimpleMapTypeInfoFactory

,以获得最佳性能。关于误用

Set

这档事,笔者之前另有文章说明,不再赘述。

The End

标签: java flink 开发语言

本文转载自: https://blog.csdn.net/nazeniwaresakini/article/details/143444269
版权归原作者 LittleMagics 所有, 如有侵权,请联系我们删除。

“再谈Java类型擦除与其对Flink类型和序列化的影响”的评论:

还没有评论