0


Flink系列Table API和SQL之:表和流的转换

Flink系列Table API和SQL之:表和流的转换

一、表和流的转换

  • 从创建表环境开始,历经表的创建、查询转换和输出,已经可以使用Table API和SQL进行完整的流处理了。不过在应用的开发过程中,我们测试业务逻辑一般不会直接将结果直接写入到外部系统,而是在本地控制台打印输出。对于DataStream非常容易,直接调用print()方法就可以看到结果数据流的内容了。但对于Table就比较悲剧,没有提供print()方法。
  • 在Flink中可以将Table再转换成DataStream,然后进行打印输出。这就涉及了表和流的转换

二、将表(Table)转换成流(DataStream)

调用toDataStream()方法

  • 将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。
Table aliceVisitTable = tableEnv.sqlQuery("SELECT user,url "+"FROM EventTable "+"WHERE user = 'Alice' ");

将表转换成数据流,这里需要将要转换的Table对象作为参数传入。

tableEnv.toDataStream(aliceVisitTable).print();

调用toChangelogStream()方法

tableEnv.createTemporaryView("clickTable",eventTable);Table aggResult = tableEnv.sqlQuery("select user,COUNT(url) as cnt from clickTable group by user");

tableEnv.toChangelogStream(aggResult).print("agg");

三、将流转换成表

调用fromDataStream()方法

  • 想要将一个DataStream转换成表也很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。例如,可以直接将事件流eventStream转换成一个表。
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

获取表环境

//创建表执行环境StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);

读取数据源

SingleOutputStreamOperator<Event> eventStream = env.addSource(...)

将数据流转换成表

Table eventTable = tableEnv.fromDataStream(eventStream);

由于流中的数据本身就是定义好的POJO类型Event,所以我们将流转换成表之后,每一行数据就对应着一个Event,而表中的列名就对应着Event中的属性。

另外,还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置。

提取Event中的timestamp和url作为表中的列

Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp"),$("url"));

需要注意的是,timestamp本身是SQL中的关键字,所以我们在定义表名、列名时要尽量避免。这时可以通过表达式的as()方法对字段进行重命名。

Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp").as("ts"),$("url"));

调用createTemporaryView()方法

  • 调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换。不过如果希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图。
  • 对于这种场景,更简洁的调用方式,可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后可以传入多个参数,用来指定表中的字段:
tableEnv.createTemporaryView("EventTable",eventStream,$("timestamp").as("ts"),$("url"));

这样接下来就可以直接在SQL中引用表EventTable了。

调用fromChangelogStream()方法
表环境还提供了一个方法fromChangelogStream(),可以将一个更新日志流转换成表。这个方法要求流中的数据类型只能是Row,而且每一个数据都需要指定当前航的更新类型(RowKind)。所以一般是由连接器帮我们实现的。

四、支持的数据类型

  • DataStream,流中的数据类型都是定义好的POJO类。如果DataStream中的类型是简单的基本类型,还可以直接转换成表么?这就涉及了Table中支持的数据类型。
  • 整体来看,DataStream中支持的数据类型,Table中也都是支持的,只不过在进行转换时需要注意一些细节。

原子类型:

  • 在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称做原子类型。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。
StreamTableEnvironment tableEnv =...;DataStream<Long> stream =...;

将数据流转换成动态表,动态表只有一个字段,重命名为myLong

Table table = tableEnv.fromDataStream(stream,$("myLong"));

Tuple类型

  • 当原子类型不做重命名时,默认的字段名就是"f0",容易想到,其实就是将原子类型看做了一元组Tuple1的处理结果。
  • Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元祖中元素的属性名f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。
StreamTableEnvironment tableEnv =...;DataStream<Tuple2<Long,Integer>> stream =...;

将数据流转换成只包含f1字段的表

Table table = tableEnv.fromDataStream(stream,$("f1"));

将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换

Table table = tableEnv.fromDataStream(stream,$("f1"),$("f0"));

将f1字段命名为myInt,f0命名为myLong

Table table = tableEnv.fromDataStream(stream,$("f1").as("myInt"),$("f0").as("myLong"));

Row类型

  • Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息。在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的模式结构(Schema)。除此之外,Row类型还附加了一个属性RowKind,用来表示当前行在更新操作中的类型。这样,Row就可以用来表示更新日志流(changelog stream)中的数据,从而架起了Flink中流和表的转换桥梁。
  • 所以在更新日志流中,元素的类型必须是Row,而且需要调用ofKind(0方法来指定更新类型。下面是一个具体的例子:
DataStream<Row> dataStream = env.fromElements(Row.ofKind(RowKind.INSERT,"Alice",12),Row.ofKind(RowKind.INSERT,"Bob",5),Row.ofKind(RowKind.UPDATE_BEFORE,"Alice",12),Row.ofKind(RowKind.UPDATE_AFTER,"Alice",100));

将更新日志流转换为表

Table table = tableEnv.fromChangelogStream(dataStream);

本文转载自: https://blog.csdn.net/zhengzaifeidelushang/article/details/128509493
版权归原作者 最笨的羊羊 所有, 如有侵权,请联系我们删除。

“Flink系列Table API和SQL之:表和流的转换”的评论:

还没有评论