Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
文章目录
本文详细的介绍了table api 与datastream api的集成,分为9个部分进行说明,即概述、相互转换的三个入门示例、集成说明、批处理模式下的处理、insert-only的处理、changelog流处理、管道示例、TypeInformation与DataType的转换、旧版本table与datastream转换,并以可运行的示例进行说明。
本文是将本专栏中的三个部分合并成一个文章,文章较长。
其他三篇文章如下:
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
本文依赖flink、kafka集群能正常使用。
本文分为9个部分,即概述、相互转换的三个入门示例、集成说明、批处理模式下的处理、insert-only的处理、changelog流处理、管道示例、TypeInformation与DataType的转换、旧版本table与datastream转换。
本文的示例是在Flink 1.17版本中运行。
一、Table API 与 DataStream API集成
1、概述
在定义数据处理管道时,Table API和DataStream API同样重要。
DataStream API在一个相对低级的命令式编程API中提供流处理的原语(即时间、状态和数据流管理)。Table API抽象了许多内部构件,并提供了结构化和声明性API。
这两个API都可以处理有界和无界流。
在处理历史数据时,需要管理有界流。无界流发生在实时处理场景中,这些场景可能先使用历史数据进行初始化。
为了有效执行,这两个API都以优化的批处理执行模式提供处理有界流。然而,由于批处理只是流的一种特殊情况,因此也可以在常规流执行模式下运行有界流的管道。
一个API中的管道可以端到端定义,而不依赖于另一个API。然而,出于各种原因,混合这两种API可能是有用的:
- 在DataStream API中实现主管道(main pipeline)之前,使用表生态系统(table ecosystem)轻松访问目录(catalogs )或连接到外部系统。
- 在DataStream API中实现主管道之前,访问一些SQL函数以进行无状态数据规范化和清理。
- 如果table API中不存在更低级的操作(例如自定义计时器处理),则不时切换到DataStream API。
Flink提供了特殊的桥接功能,以使与DataStream API的集成尽可能顺利。
在DataStream 和Table API之间切换会增加一些转换开销。例如,部分处理二进制数据的表运行时(即RowData)的内部数据结构需要转换为更用户友好的数据结构(即Row)。通常,这个开销可以忽略。
- maven依赖 本篇文章,如果没有特殊说明,将使用如下maven依赖
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-gateway</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-uber</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.2</version></dependency><!-- flink连接器 --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.24.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><!-- <scope>provided</scope> --></dependency></dependencies>
2、 DataStream 和 Table 相互转换示例
Flink提供了专门的StreamTableEnvironment,用于与DataStream API集成。这些环境使用其他方法扩展常规TableEnvironment,并将DataStream API中使用的StreamExecutionEnvironments作为参数。
1)、示例1 - toDataStream
下面的代码展示了如何在两个API之间来回切换的示例。表的列名和类型自动从DataStream的TypeInformation派生。由于DataStream API本机不支持变更日志处理,因此代码假设在流到表和表到流转换期间仅附加/仅插入语义。
importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.apache.flink.types.Row;/**
* @author alanchan
*
*/publicclassConvertingDataStreamAndTableDemo{/**
* @param args
* @throws Exception
*/publicstaticvoidmain(String[] args)throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、创建输入流DataStream<String> dataStream = env.fromElements("alan","alanchan","alanchanchn");// 3、将datastream 转为 tableTable inputTable = tenv.fromDataStream(dataStream);// 4、创建视图,该步骤不是必须,将姓名转为大写
tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT UPPER(f0) FROM InputTable");// 5、将table转成datastream进行输出DataStream<Row> resultStream = tenv.toDataStream(resultTable);
resultStream.print();
env.execute();}}
- 示例输出
12>+I[ALAN]14>+I[ALANCHANCHN]13>+I[ALANCHAN]
fromDataStream和toDataStream的完整语义可以在下面的部分中找到。特别是,本节讨论了如何使用更复杂的嵌套类型来影响模式派生。它还包括使用事件时间和水印。
根据查询的类型,在许多情况下,生成的动态表是一个管道,它不仅在将表转换为数据流时产生仅插入的更改,而且还产生收回和其他类型的更新。在表到流转换期间,这可能会导致类似于以下内容的异常
Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].
在这种情况下,需要再次修改查询或切换到ChangelogStream。
2)、示例2 - toChangelogStream
下面的示例显示如何转换更新表。
每个结果行表示更改日志中的一个条目,该条目具有更改标志,可以通过对其调用row.getKind()来查询。在本例中,alan的第二个分数在更改之前(-U)创建更新,在更改之后(+U)创建更新。
本示例仅仅以一个方法来展示,避免没有必要的代码,运行框架参考上述示例。
publicstaticvoidtest2()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、创建输入流DataStream<Row> dataStream = env.fromElements(Row.of("alan",18),Row.of("alanchan",19),Row.of("alanchanchn",20),Row.of("alan",20));// 3、将datastream 转为 tableTable inputTable = tenv.fromDataStream(dataStream).as("name","salary");// 4、创建视图,该步骤不是必须
tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");// 5、将table转成datastream进行输出DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);
resultStream.print();
env.execute();}
- 运行结果
2>+I[alan,18]16>+I[alanchan,19]16>+I[alanchanchn,20]2>-U[alan,18]2>+U[alan,38]
fromChangelogStream和toChangelogStream的完整语义可以在下面的部分中找到。特别是,本节讨论了如何使用更复杂的嵌套类型来影响模式派生。它包括使用事件时间和水印。它讨论了如何为输入和输出流声明主键和变更日志模式。
上面的示例显示了如何通过为每个传入记录连续发出逐行更新来增量计算最终结果。然而,在输入流有限(即有界)的情况下,通过利用批处理原理可以更有效地计算结果。
在批处理中,可以在连续的阶段中执行运算符,这些阶段在发出结果之前使用整个输入表。例如,连接操作符可以在执行实际连接之前对两个有界输入进行排序(即排序合并连接算法),或者在使用另一个输入之前从一个输入构建哈希表(即哈希连接算法的构建/探测阶段)。
DataStream API和Table API都提供专门的批处理运行时模式。
3)、示例3 - 通过仅切换标志来处理批处理和流数据
下面的示例说明了统一管道能够通过仅切换标志来处理批处理和流数据。
publicstaticvoidtest3()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、创建输入流DataStream<Row> dataStream = env.fromElements(Row.of("alan",18),Row.of("alanchan",19),Row.of("alanchanchn",20),Row.of("alan",20));// 3、将datastream 转为 tableTable inputTable = tenv.fromDataStream(dataStream).as("name","salary");// 4、创建视图,该步骤不是必须
tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");// 5、将table转成datastream进行输出DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);
resultStream.print();
env.execute();}
- 运行结果
注意比较和示例2的输出区别
+I[alanchan,19]+I[alan,38]+I[alanchanchn,20]
一旦将changelog 应用于外部系统(例如键值存储),可以看到两种模式都能够产生完全相同的输出表。通过在发出结果之前使用所有输入数据,批处理模式的更改日志仅由仅插入的更改组成。有关更多细节,请参阅下面的专用批处理模式部分。
3、集成说明
将Table API与DataStream API相结合的项目需要添加以下桥接模块之一。
它们包括对 flink-table-api-java或flink-table-api-scala的可传递依赖性,以及相应的特定于语言的DataStream api模块。
1)、maven依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.17.1</version><scope>provided</scope></dependency>
2)、import
使用DataStream API和Table API的Java或Scala版本声明公共管道需要以下导入。
// imports for Java DataStream APIimport org.apache.flink.streaming.api.*;import org.apache.flink.streaming.api.environment.*;// imports for Table API with bridging to Java DataStream APIimport org.apache.flink.table.api.*;import org.apache.flink.table.api.bridge.java.*;
3)、Configuration
TableEnvironment将采用传递的StreamExecutionEnvironment.中的所有配置选项。然而,不能保证对StreamExecutionEnvironment配置的进一步更改在实例化后传播到StreamTableEnvironment。在规划期间,将选项从Table API传播到DataStream API。
我们建议在切换到Table API之前尽早在DataStream API中设置所有配置选项。
importjava.time.ZoneId;importorg.apache.flink.streaming.api.CheckpointingMode;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;// create Java DataStream APIStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// set various configuration early
env.setMaxParallelism(256);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class,CustomKryoSerializer.class);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// then switch to Java Table APIStreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);// set configuration early
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));// start defining your pipelines in both APIs...
4)、执行行为
这两个API都提供了执行管道的方法。换句话说:如果被请求,它们将编译一个作业图( job graph),该作业图将提交到集群并触发以执行。结果将流式传输到声明的sinks。
通常,这两个API都在方法名称中使用术语“执行”来标记这种行为。然而,Table API和DataStream API之间的执行行为略有不同。
1、DataStream API
DataStream API的StreamExecutionEnvironment使用生成器模式(builder pattern)来构造复杂的管道。管道可能会拆分为多个分支,这些分支可能以sink结尾,也可能不以sink结尾。环境缓冲(environment buffers)所有这些定义的分支,直到提交作业。
StreamExecutionEnvironment.execute()提交整个构建的管道,然后清除构建器。换句话说:不再声明sources 和sinks ,并且可以向生成器中添加新的管道。因此,每个DataStream程序通常以对StreamExecutionEnvironment.execute()的调用结束。或者,DataStream.executeAndCollect()隐式定义了一个sink,用于将结果流式传输到本地客户端。
2、Table API
在Table API中,分支管道仅在StatementSet中受支持,其中每个分支必须声明一个最终sink。TableEnvironment和StreamTableEnvironment都不提供专用的通用execute()方法。相反,它们提供了提交单个source-to-sink管道或语句集的方法:
finalstaticString sinkSQL ="CREATE TABLE OutputTable (\n"+" userId INT,\r\n"+" age INT,\r\n"+" balance DOUBLE,\r\n"+" userName STRING,\r\n"+" t_insert_time TIMESTAMP(3)\r\n"+") WITH (\n"+" 'connector' = 'print'\n"+")";finalstaticString sinkSQL2 ="CREATE TABLE OutputTable2 (\n"+" userId INT,\r\n"+" age INT,\r\n"+" balance DOUBLE,\r\n"+" userName STRING,\r\n"+" t_insert_time TIMESTAMP(3)\r\n"+") WITH (\n"+" 'connector' = 'print'\n"+")";finalstaticString sourceSQL ="CREATE TABLE InputTable (\r\n"+" userId INT,\r\n"+" age INT,\r\n"+" balance DOUBLE,\r\n"+" userName STRING,\r\n"+" t_insert_time AS localtimestamp,\r\n"+" WATERMARK FOR t_insert_time AS t_insert_time\r\n"+") WITH (\r\n"+" 'connector' = 'datagen',\r\n"+" 'rows-per-second'='10',\r\n"+" 'fields.userId.kind'='sequence',\r\n"+" 'fields.userId.start'='1',\r\n"+" 'fields.userId.end'='20',\r\n"+" 'fields.balance.kind'='random',\r\n"+" 'fields.balance.min'='1',\r\n"+" 'fields.balance.max'='100',\r\n"+" 'fields.age.min'='1',\r\n"+" 'fields.age.max'='100',\r\n"+" 'fields.userName.length'='6'\r\n"+");";publicstaticvoidtest4()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);//sinkSQL//sourceSQL// 建表
tenv.executeSql(sourceSQL);//
tenv.executeSql(sinkSQL);
tenv.executeSql(sinkSQL2);//插入表数据,方式一
tenv.from("InputTable").insertInto("OutputTable").execute();
tenv.executeSql("select * from OutputTable");
tenv.from("InputTable").execute().print();//插入表数据,方式二
tenv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");
tenv.executeSql("select * from OutputTable");//插入表数据,方式三
tenv.createStatementSet().addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable").addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable").execute();// 输出
tenv.from("InputTable").execute().print();
tenv.executeSql("SELECT * FROM InputTable").print();
env.execute();}
- 输出结果
3>+I[3,99,36.20987556045243, d23888,2023-11-13T14:49:58.812]15>+I[15,39,68.30743253178122,43bec8,2023-11-13T14:49:58.812]2>+I[2,62,47.280395949976885,7bae4e,2023-11-13T14:49:58.812]16>+I[16,52,42.10205629532836,6baf0e,2023-11-13T14:49:58.812]10>+I[10,25,58.008035887440094, d43dea,2023-11-13T14:49:58.812]13>+I[13,36,70.9215559827798,01bb28,2023-11-13T14:49:58.812]12>+I[12,38,30.31004698340413,322ba8,2023-11-13T14:49:58.812]6>+I[6,17,32.28909358733212,13bf88,2023-11-13T14:49:58.812]9>+I[9,49,44.52802246768357, e8280c,2023-11-13T14:49:58.812]8>+I[8,80,18.03487847824154,803b2a,2023-11-13T14:49:58.812]5>+I[5,61,54.43695775227862,063f08,2023-11-13T14:49:58.812]7>+I[7,64,33.886576642098404,443dea,2023-11-13T14:49:58.812]14>+I[14,92,63.71527772015468,123848,2023-11-13T14:49:58.812]11>+I[11,22,30.745102844313315, e62848,2023-11-13T14:49:58.812]4>+I[4,78,88.60724929598506,55bca8,2023-11-13T14:49:58.812]1>+I[1,82,62.50149215989057,0bba0c,2023-11-13T14:49:58.812]3>+I[19,67,14.244993215937432, e6c911,2023-11-13T14:49:59.806]1>+I[17,67,91.05078612782468,560b6c,2023-11-13T14:49:59.807]4>+I[20,95,82.12047947156385,1ac5b2,2023-11-13T14:49:59.807]2>+I[18,81,25.384055001988084, fe98d1,2023-11-13T14:49:59.806]+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+| op | userId | age | balance | userName | t_insert_time |+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+|+I |1|91|22.629318048042723|923e08 |2023-11-1314:49:59.800||+I |2|67|75.26915785038814|342baa |2023-11-1314:49:59.803||+I |3|68|74.06076023217011|1dbbce |2023-11-1314:49:59.803||+I |4|26|79.47471729272772|083e2e |2023-11-1314:49:59.802||+I |5|97|82.56249330491859|4a3c6e |2023-11-1314:49:59.804||+I |6|32|81.74903214944425| fdac4e |2023-11-1314:49:59.800||+I |7|67|94.80154136831771| f7acea |2023-11-1314:49:59.800||+I |8|53|50.85073238739004| cfbd0c |2023-11-1314:49:59.800||+I |9|69|93.64054547476522|7fa9ec |2023-11-1314:49:59.801||+I |10|66|61.92366658766452|05b86a |2023-11-1314:49:59.803||+I |11|81|95.61717698776191| efa8ce |2023-11-1314:49:59.797||+I |12|8|63.573174957723076|0fbfec |2023-11-1314:49:59.802||+I |13|85|52.938510850778734|43bfa8 |2023-11-1314:49:59.803||+I |14|26|5.130287258770441|083c6c |2023-11-1314:49:59.797||+I |15|35|73.3318749510538|0e3b4c |2023-11-1314:49:59.802||+I |16|84|16.24326410122912| ac2d6e |2023-11-1314:49:59.802||+I |18|41|32.38455189801736| b07afb |2023-11-1314:50:00.804||+I |19|24|77.6947569111452|7f72ac |2023-11-1314:50:00.803||+I |20|92|82.53929937026987|051fb9 |2023-11-1314:50:00.802||+I |17|93|12.784194121509948| bce5d9 |2023-11-1314:50:00.801|+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+20rowsinset
To combine both execution behaviors, every call to StreamTableEnvironment.toDataStream or StreamTableEnvironment.toChangelogStream will materialize (i.e. compile) the Table API sub-pipeline and insert it into the DataStream API pipeline builder. This means that StreamExecutionEnvironment.execute() or DataStream.executeAndCollect must be called afterwards. An execution in Table API will not trigger these “external parts”.
为了组合这两种执行行为,对StreamTableEnvironment.toDataStream或StreamTableEnviron.toChangelogStream的每次调用都将具体化(materialize )(即编译)Table API子管道(sub-pipeline),并将其插入DataStream API管道生成器(builder)中。这意味着之后必须调用StreamExecutionEnvironment.execute()或DataStream.executeAndCollect。Table API中的执行不会触发这些“外部部件(external parts)”。
// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print();// (2)// executes a Table API end-to-end pipeline as a Flink job and prints locally,// thus (1) has still not been executed
table.execute().print();// executes the DataStream API pipeline with the sink defined in (1) as a// Flink job, (2) was already running before
env.execute();
上述示例中有具体应用。
4、批处理模式
批处理运行时模式是有界Flink程序的专用执行模式。
一般来说,有界性是数据源的一个属性,它告诉我们来自该源的所有记录在执行之前是否已知,或者新数据是否会显示,可能是无限期的。反过来,如果作业的所有源都有界,则作业是有界的,否则作业是无界的。
另一方面,流运行时模式可用于有界作业和无界作业。
有关不同执行模式的更多信息,请参阅相应的DataStream API部分。
Table API和SQL计划器为这两种模式中的任何一种提供了一组专门的优化器规则和运行时运算符。
截至Flink 版本 1.17,运行时模式不是从源自动派生的,因此,在实例化StreamTableEnvironment时,必须显式设置或将从StreamExecutionEnvironment采用运行时模式:
importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.apache.flink.table.api.EnvironmentSettings;// adopt mode from StreamExecutionEnvironmentStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);// or// set mode explicitly for StreamTableEnvironment// it will be propagated to StreamExecutionEnvironment during planningStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env,EnvironmentSettings.inBatchMode());
在将运行时模式设置为BATCH之前,必须满足以下先决条件:
- 所有源都必须声明自己是有界的。
- 截至Flink 版本 1.17,表源必须发出仅插入更改。
- 运算符需要足够的堆外内存用于排序和其他中间结果。
- 所有表操作必须在批处理模式下可用。截至Flink 版本 1.17,其中一些仅在流媒体模式下可用。请查看相应的表API和SQL页面。
批处理执行具有以下含义(以及其他含义):
- 渐进水印(Progressive watermarks)既不会生成,也不会在运算符中使用。但是,源在关闭之前会发出最大水印(maximum watermark)。
- 根据execution.batch-shuffle-mode,任务之间的交换可能会被阻塞。这也意味着与在流模式下执行相同管道相比,可能会减少资源需求。
- 检查点已禁用。插入了人工状态后端。
- 表操作不会产生增量更新,而只会产生一个完整的最终结果,该结果将转换为仅插入的变更日志流。
由于批处理可以被视为流处理的特殊情况,因此我们建议首先实现流管道,因为它是有界和无界数据的最通用实现。
理论上,流管道可以执行所有操作符。然而,在实践中,一些操作可能没有多大意义,因为它们将导致不断增长的状态,因此不受支持。全局排序是一个仅在批处理模式下可用的示例。简单地说:应该可以在批处理模式下运行工作流管道,但不一定相反。
下面的示例演示如何使用DataGen表源处理批处理模式。许多源提供了隐式使连接器有界的选项,例如,通过定义终止偏移量或时间戳。在我们的示例中,我们使用number-of-rows选项限制行数。
publicstaticvoidtest5()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);//建表Table table =
tenv.from(TableDescriptor.forConnector("datagen").option("number-of-rows","5")// make the source bounded.schema(Schema.newBuilder().column("uid",DataTypes.TINYINT()).column("payload",DataTypes.STRING()).build()).build());//转datastream,并输出
tenv.toDataStream(table).keyBy(r -> r.<Byte>getFieldAs("uid")).map(r ->"alan_payload: "+ r.<String>getFieldAs("payload")).executeAndCollect().forEachRemaining(System.out::println);
env.execute();}
- 输出
alan_payload: 143dc81ed1cf71d9b7a4f8088cae78b5fd919f0ba2bc57e24828c18dea47fb9e84f4ce6a74d0f18285c8c66b9587947a81b1
alan_payload: c3bc0a98d286c9db33a02896bca16ac327f267183e16bc42c813741297ed3f51b998dc45d23231d2ca06677072c21b222369
alan_payload: ce3bae6e08c4dbef6b4d4517b426c76792b788126747c494110a48e6b4909920602643e37323e64038e64cc2d359476e7495
alan_payload: b22c2ac79d2e9be20caf3c311d12637dc42422f7d25132750b4afbb8e8dd341d0f767e42e70874f7207cf5a24c7d1caea713
alan_payload: d1bb8a7fe2077efaa61dc4befe8fef884c257c5c201c62bbac11787a222b70df021e16cba32d5cfc42527589af45dc968c7f
1)、Changelog Unification
在大多数情况下,当从流模式切换到批处理模式时,管道定义本身在Table API和DataStream API中都可以保持不变,反之亦然。然而,如前所述,由于避免了批处理模式中的增量操作,因此产生的变更日志流(changelog streams)可能会不同。
依赖于事件时间并利用水印作为完整性标记的基于时间的操作(Time-based operations)能够生成独立于运行时模式的仅插入变更日志流(insert-only changelog stream)。
下面的Java示例演示了一个Flink程序,该程序不仅在API级别上统一,而且在生成的changelog流中统一。
该示例使用基于两个表(ts)中的时间属性的 interval join来联接SQL中的两个表,即UserTable和OrderTable。
它使用DataStream API实现自定义运算符,该运算符使用KeyedProcessFunction和值状态(value state)对用户名进行重复数据消除。
运行结果见输出注释部分。
publicstaticvoidtest6()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);StreamTableEnvironment tenv =StreamTableEnvironment.create(env);//数据源 userStreamDataStream<Row> userStream = env
.fromElements(Row.of(LocalDateTime.parse("2023-11-13T17:50:00"),1,"alan"),Row.of(LocalDateTime.parse("2023-11-13T17:55:00"),2,"alanchan"),Row.of(LocalDateTime.parse("2023-11-13T18:00:00"),2,"alanchanchn")).returns(Types.ROW_NAMED(newString[]{"ts","uid","name"},Types.LOCAL_DATE_TIME,Types.INT,Types.STRING));//数据源 orderStream DataStream<Row> orderStream = env
.fromElements(Row.of(LocalDateTime.parse("2023-11-13T17:52:00"),1,122),Row.of(LocalDateTime.parse("2023-11-13T17:57:00"),2,239),Row.of(LocalDateTime.parse("2023-11-13T18:01:00"),2,999)).returns(Types.ROW_NAMED(newString[]{"ts","uid","amount"},Types.LOCAL_DATE_TIME,Types.INT,Types.INT));//创建视图 UserTable
tenv.createTemporaryView("UserTable",
userStream,Schema.newBuilder().column("ts",DataTypes.TIMESTAMP(3)).column("uid",DataTypes.INT()).column("name",DataTypes.STRING()).watermark("ts","ts - INTERVAL '1' SECOND").build());//创建视图 OrderTable
tenv.createTemporaryView("OrderTable",
orderStream,Schema.newBuilder().column("ts",DataTypes.TIMESTAMP(3)).column("uid",DataTypes.INT()).column("amount",DataTypes.INT()).watermark("ts","ts - INTERVAL '1' SECOND").build());// 建立OrderTable 和 UserTable 关联关系Table joinedTable =
tenv.sqlQuery("SELECT U.name, O.amount "+"FROM UserTable U, OrderTable O "+"WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES");//将table转成datastreamDataStream<Row> joinedStream = tenv.toDataStream(joinedTable);
joinedStream.print();// +I[alanchan, 239]// +I[alanchanchn, 999]// +I[alan, 122]
env.execute();}
- 使用ProcessFunction和ValueState现自定义运算符 在上面的例子中,加入下面的代码即可,运行结果是将姓名输出
// 使用ProcessFunction和值状态实现自定义运算符
joinedStream
.keyBy(r -> r.<String>getFieldAs("name")).process(newKeyedProcessFunction<String,Row,String>(){ValueState<String> seen;@Overridepublicvoidopen(Configuration parameters){
seen =getRuntimeContext().getState(newValueStateDescriptor<>("seen",String.class));}@OverridepublicvoidprocessElement(Row row,Context ctx,Collector<String> out)throwsException{String name = row.getFieldAs("name");if(seen.value()==null){
seen.update(name);
out.collect(name);}}}).print();// alan// alanchan// alanchanchn
5、Handling of (Insert-Only) Streams 处理(仅插入)流
StreamTableEnvironment提供了以下方法进行datastream的转换API:
- **fromDataStream(DataStream)**:将仅插入更改和任意类型的流解释为表。默认情况下,不会传播事件时间和水印。
- **fromDataStream(DataStream, Schema)**:将仅插入更改和任意类型的流解释为表。可选模式允许丰富列数据类型,并添加时间属性、水印策略、其他计算列或主键。
- **createTemporaryView(String, DataStream)**:注册一个可以在sql中访问的流名称(虚表、视图)。它是createTemporaryView(String,fromDataStream(DataStream))的快捷方式。
- **createTemporaryView(String, DataStream, Schema)**:注册一个可以在sql中访问的流名称(虚表、视图)。 它是createTemporaryView(String,fromDataStream(DataStream,Schema))的快捷方式。
- **toDataStream(Table)**:将表转换为仅插入更改的流。默认的流记录类型为org.apache.flink.types.Row。将单个rowtime属性列写回DataStream API的记录中。水印也会传播。
- **toDataStream(Table, AbstractDataType)**:将表转换为仅插入更改的流。该方法接受数据类型来表示所需的流记录类型。planner 可以插入隐式转换和重新排序列,以将列映射到(可能是嵌套的)数据类型的字段。
- **toDataStream(Table, Class)**:toDataStream(Table,DataTypes.of(Class))的快捷方式,用于反射地快速创建所需的数据类型。
从Table API的角度来看,和DataStream API的转换类似于读取或写入在SQL中使用CREATE Table DDL定义的虚拟表连接器。
虚拟CREATE TABLE name(schema)WITH(options)语句中的模式部分可以自动从DataStream的类型信息中派生、丰富或完全使用org.apache.flink.table.api.Schema手动定义。
The virtual DataStream table connector exposes the following metadata for every row:
虚拟DataStream table 连接器为每一行暴露以下元数据:
KeyData TypeDescriptionR/WrowtimeTIMESTAMP_LTZ(3) NOT NULLStream record’s timestamp.R/W
虚拟DataStream table source实现SupportsSourceWatermark,因此允许调用source_WATERMARK()内置函数作为水印策略,以采用来自DataStream API的水印。
1)、fromDataStream 示例
下面的代码展示了如何将fromDataStream用于不同的场景。其输出结果均在每个步骤的输出注释部分。
importjava.time.Instant;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Schema;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;/**
* @author alanchan
*
*/publicclassTestFromDataStreamDemo{@NoArgsConstructor@AllArgsConstructor@DatapublicstaticclassUser{publicString name;publicInteger score;publicInstant event_time;}publicstaticvoidtest1()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、创建数据源DataStream<User> dataStream =
env.fromElements(newUser("alan",4,Instant.ofEpochMilli(1000)),newUser("alanchan",6,Instant.ofEpochMilli(1001)),newUser("alanchanchn",10,Instant.ofEpochMilli(1002)));// 示例1、显示table的数据类型// 说明了不需要基于时间的操作时的简单用例。Table table = tenv.fromDataStream(dataStream);// table.printSchema();// (// `name` STRING,// `score` INT,// `event_time` TIMESTAMP_LTZ(9)// )// 示例2、增加一列,并显示table的数据类型// 这些基于时间的操作应在处理时间内工作的最常见用例。Table table2 = tenv.fromDataStream(
dataStream,Schema.newBuilder().columnByExpression("proc_time","PROCTIME()").build());// table2.printSchema();// (// `name` STRING,// `score` INT,// `event_time` TIMESTAMP_LTZ(9),// `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()// )// 示例3、增加rowtime列,并增加watermarkTable table3 =
tenv.fromDataStream(
dataStream,Schema.newBuilder().columnByExpression("rowtime","CAST(event_time AS TIMESTAMP_LTZ(3))").watermark("rowtime","rowtime - INTERVAL '10' SECOND").build());// table3.printSchema();// (// `name` STRING,// `score` INT,// `event_time` TIMESTAMP_LTZ(9),// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND// )// 示例4、增加rowtime列,并增加watermark(SOURCE_WATERMARK()水印策略假设已经实现了,本部分仅仅是展示用法)// 基于时间的操作(如窗口或间隔联接)应成为管道的一部分时最常见的用例。Table table4 =
tenv.fromDataStream(
dataStream,Schema.newBuilder().columnByMetadata("rowtime","TIMESTAMP_LTZ(3)").watermark("rowtime","SOURCE_WATERMARK()").build());// table4.printSchema();// (// `name` STRING,// `score` INT,// `event_time` TIMESTAMP_LTZ(9),// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()// ) // 示例5、修改event_time类型长度,增加event_time的水印策略(SOURCE_WATERMARK()水印策略假设已经实现了,本部分仅仅是展示用法)// 完全依赖于用户的声明。这对于用适当的数据类型替换DataStream API中的泛型类型(在Table API中是RAW)很有用。Table table5 =
tenv.fromDataStream(
dataStream,Schema.newBuilder().column("event_time","TIMESTAMP_LTZ(3)").column("name","STRING").column("score","INT").watermark("event_time","SOURCE_WATERMARK()").build());
table5.printSchema();// (// `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,// `name` STRING,// `score` INT// )
env.execute();}publicstaticvoidmain(String[] args)throwsException{test1();}}
由于DataType比TypeInformation更丰富,我们可以轻松地启用不可变POJO和其他复杂的数据结构。
下面的Java示例显示了可能的情况。
另请检查DataStream API的“数据类型和序列化”页面,以获取有关那里支持的类型的更多信息。
packageorg.tablesql.convert;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.DataTypes;importorg.apache.flink.table.api.Schema;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;/**
* @author alanchan
*
*/publicclassTestFromDataStreamDemo{// user2的属性都加上了final修饰符publicstaticclassUser2{publicfinalString name;publicfinalInteger score;publicUser2(String name,Integer score){this.name = name;this.score = score;}}publicstaticvoidtest2()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);//the DataStream API does not support immutable POJOs yet, the class will result in a generic type that is a RAW type in Table API by defaul//DataStream API尚不支持不可变POJO,该类的结果默认情况下将是一个Table API中是RAW类型的泛型。// 2、创建数据源DataStream<User2> dataStream = env.fromElements(newUser2("Alice",4),newUser2("Bob",6),newUser2("Alice",10));// 示例1:输出表结构Table table = tenv.fromDataStream(dataStream);// table.printSchema();// (// `f0` RAW('org.tablesql.convert.TestFromDataStreamDemo$User2', '...')// )// 示例2:声明式输出表结构// 在自定义模式中使用table API的类型系统为列声明更有用的数据类型,并在下面的“as”投影中重命名列Table table2 = tenv
.fromDataStream(
dataStream,Schema.newBuilder().column("f0",DataTypes.of(User2.class)).build()).as("user");// table2.printSchema(); // (// `user` *org.tablesql.convert.TestFromDataStreamDemo$User2<`name` STRING, `score` INT>*// )//示例3:数据类型可以如上所述反射地提取或显式定义//Table table3 = tenv
.fromDataStream(
dataStream,Schema.newBuilder().column("f0",DataTypes.STRUCTURED(User2.class,DataTypes.FIELD("name",DataTypes.STRING()),DataTypes.FIELD("score",DataTypes.INT()))).build()).as("user");
table3.printSchema();// (// `user` *org.tablesql.convert.TestFromDataStreamDemo$User2<`name` STRING, `score` INT>*// )
env.execute();}publicstaticvoidmain(String[] args)throwsException{test2();}}
2)、createTemporaryView 示例
DataStream可以直接注册为视图。
从DataStream 创建的视图只能注册为临时视图。由于它们的内联/匿名性质,无法在永久目录(permanent catalog)中注册它们。
下面的代码展示了如何对不同的场景使用createTemporaryView。每个示例中的运行结果均在输出部分以注释展示。
importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Schema;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;/**
* @author alanchan
*
*/publicclassTestCreateTemporaryViewDemo{publicstaticvoidtest1()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、创建数据源DataStream<Tuple2<Long,String>> dataStream = env.fromElements(Tuple2.of(12L,"alan"),Tuple2.of(0L,"alanchan"));// 示例1:创建视图、输出表结构
tenv.createTemporaryView("MyView", dataStream);
tenv.from("MyView").printSchema();// (// `f0` BIGINT NOT NULL,// `f1` STRING// )// 示例2:创建视图、输出表结构,使用Schema显示定义列,类似于fromDataStream的定义//在这个例子中,输出的NOT NULL没有定义
tenv.createTemporaryView("MyView",
dataStream,Schema.newBuilder().column("f0","BIGINT").column("f1","STRING").build());
tenv.from("MyView").printSchema();// (// `f0` BIGINT,// `f1` STRING// )// 示例3:创建视图,并输出表结构// 在创建视图前修改(或定义)列名称,as一般是指重命名,原名称是f0、f1
tenv.createTemporaryView("MyView",
tenv.fromDataStream(dataStream).as("id","name"));
tenv.from("MyView").printSchema();// (// `id` BIGINT NOT NULL,// `name` STRING// )
env.execute();}/**
* @param args
* @throws Exception
*/publicstaticvoidmain(String[] args)throwsException{test1();}}
3)、toDataStream示例
下面的代码展示了如何在不同的场景中使用toDataStream。每个示例中的运行结果均在输出部分以注释展示。
importjava.time.Instant;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.DataTypes;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;/**
* @author alanchan
*
*/publicclassTestToDataStreamDemo{@NoArgsConstructor@AllArgsConstructor@DatapublicstaticclassUser{publicString name;publicInteger score;publicInstant event_time;}staticfinalStringSQL="CREATE TABLE GeneratedTable "+"("+" name STRING,"+" score INT,"+" event_time TIMESTAMP_LTZ(3),"+" WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"+")"+"WITH ('connector'='datagen')";publicstaticvoidtest1()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、建表
tenv.executeSql(SQL);Table table = tenv.from("GeneratedTable");// 示例1:table 转 datastream// 使用默认的Row实例转换// 由于`event_time`是单个行时间属性,因此它被插入到DataStream元数据中,并传播水印// DataStream<Row> dataStream = tenv.toDataStream(table);// dataStream.print();// 以下是示例性输出,实际上是连续的数据 // 10> +I[9b979ecef142c06746ff2be0f79f4afe7ef7089f60f267184e052c12ef5f2c2a144c73d3653bee51b351ed5b20ecaf0673ec, -1424631858, 2023-11-14T02:58:56.071Z]// 1> +I[444998c8992accc54e2c10cac4f4a976cda516d84817a8fd728c9d013da3d87e91d28537a564f09fb07308142ca83c2548e9, -1240938499, 2023-11-14T02:58:56.071Z]// 12> +I[fa42df01fe1f789535df26f81c2e58c02feaeba60338e4cfb7c8fdb06ed96c69b46e9a966d93d0cf811b24dd9434a8ef2253, 2039663083, 2023-11-14T02:58:56.070Z]// 1> +I[25aa121a0d656a5355c32148a0c68cc39ac05443bd7de6a0c499a2daae85868422dd024c6803598133dc26a607cd1e60e747, 1912789884, 2023-11-14T02:58:56.071Z]// 示例2:table 转 datastream// 从类“User”中提取数据类型,planner重新排序字段,并在可能的情况下插入隐式转换,以将内部数据结构转换为所需的结构化类型// 由于`event_time`是单个行时间属性,因此它被插入到DataStream元数据中,并传播水印DataStream<User> dataStream2 = tenv.toDataStream(table,User.class);// dataStream2.print();// 以下是示例性输出,实际上是连续的数据 // 4> TestToDataStreamDemo.User(name=e80b612e48443a292c11e28159c73475b9ef9531b91d5712420753d5d6041a06f5de634348210b151f4fc220b4ec91ed5c72, score=2146560121, event_time=2023-11-14T03:01:17.657Z)// 14> TestToDataStreamDemo.User(name=290b48dea62368bdb35567f31e5e2690ad8b5dd50c1c0f7184f15d2e85b24ea84155f1edef875f4c96e3a2133a320fcb6e41, score=2062379192, event_time=2023-11-14T03:01:17.657Z)// 12> TestToDataStreamDemo.User(name=a0b31a03ad951b53876445001bbc74178c9818ece7d5e53166635d40cb8ef07980eabd7463ca6be38b34b1f0fbd4e2251df0, score=16953697, event_time=2023-11-14T03:01:17.657Z)// 示例3:table 转 datastream// 数据类型可以如上所述反射地提取或显式定义DataStream<User> dataStream3 =
tenv.toDataStream(
table,DataTypes.STRUCTURED(User.class,DataTypes.FIELD("name",DataTypes.STRING()),DataTypes.FIELD("score",DataTypes.INT()),DataTypes.FIELD("event_time",DataTypes.TIMESTAMP_LTZ(3))));
dataStream3.print();// 以下是示例性输出,实际上是连续的数据 // 9> TestToDataStreamDemo.User(name=49550693e3cb3a41cd785504c699684bf2015f0ebff5918dbdea454291c265d316773f2d9507ce73dd18f91a2f5fdbd6e500, score=744771891, event_time=2023-11-14T03:06:13.010Z)// 2> TestToDataStreamDemo.User(name=60589709fe41decb647fcf4e2f91d45c82961bbe64469f3ea8a9a12b0cac071481ec9cfd65a9c218e3799986dd72ab80e457, score=-1056249244, event_time=2023-11-14T03:06:13.010Z)// 15> TestToDataStreamDemo.User(name=d0a179f075c8b521bf5ecb08a32f6c715b5f2c616f815f8173c0a1c2961c53774faf396ddf55a44db49abe8085772f35d75c, score=862651361, event_time=2023-11-14T03:06:13.010Z)
env.execute();}publicstaticvoidmain(String[] args)throwsException{test1();}}
toDataStream仅支持非更新表。通常,基于时间的操作(如windows, interval joins或MATCH_RECOGNIZE子句)非常适合于在 insert-only pipelines的简单操作(如投影(projections )和过滤)。
具有生成更新的操作的管道可以使用toChangelogStream。
6、Handling of Changelog Streams处理变化流
在内部,Flink的表运行时是一个changelog处理器。
StreamTableEnvironment提供了以下方法来暴露change data capture(CDC)功能:
- **fromChangelogStream(DataStream)**:将变更日志条目流(stream of changelog entries)解释为表。流记录类型必须为org.apache.flink.types.Row,因为其RowKind标志在运行时评估(evaluated )。默认情况下,不会传播事件时间和水印。该方法期望将包含所有类型更改的changelog(在org.apache.flink.types.RowKind中枚举)作为默认的ChangelogMode。
- **fromChangelogStream(DataStream, Schema)**:允许为DataStream定义类似于fromDataStream(DataStream ,schema )的schema 。否则,语义等于fromChangelogStream(DataStream)。
- **fromChangelogStream(DataStream, Schema, ChangelogMode)**:提供关于如何将stream 解释为changelog的完全控制。传递的ChangelogMode有助于planner 区分insert-only, upsert, or retract行为。
- **toChangelogStream(Table)**:fromChangelogStream(DataStream)的反向操作。它生成一个包含org.apache.flink.types.Row实例的流,并在运行时为每个记录设置RowKind标志。该方法支持各种更新表。如果输入表包含单个rowtime 列(single rowtime column),则它将传播到流记录的时间戳中(stream record’s timestamp)。水印也将被传播。
- **toChangelogStream(Table, Schema)**:fromChangelogStream(DataStream,Schema)的反向操作。该方法可以丰富生成的列数据类型。如果需要,planner 可以插入隐式转换。可以将rowtime写出为元数据列。
- **toChangelogStream(Table, Schema, ChangelogMode)**:提供关于如何将表转换为变更日志流(convert a table to a changelog stream)的完全控制。传递的ChangelogMode有助于planner 区分insert-only, upsert, or retract 行为。
从Table API的角度来看,和DataStream API的转换类似于读取或写入在SQL中使用CREATE Table DDL定义的虚拟表连接器。
由于fromChangelogStream的行为类似于fromDataStream。
此虚拟连接器还支持读取和写入流记录的rowtime 元数据。
虚拟表源实现SupportsSourceWatermark。
1)、fromChangelogStream示例
下面的代码展示了如何将fromChangelogStream用于不同的场景。
importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Schema;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.apache.flink.table.connector.ChangelogMode;importorg.apache.flink.types.Row;importorg.apache.flink.types.RowKind;/**
* @author alanchan
*
*/publicclassTestFromChangelogStreamDemo{//the stream as a retract stream//默认ChangelogMode应该足以满足大多数用例,因为它接受所有类型的更改。publicstaticvoidtest1()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、创建数据源DataStream<Row> dataStream =
env.fromElements(Row.ofKind(RowKind.INSERT,"alan",12),Row.ofKind(RowKind.INSERT,"alanchan",5),Row.ofKind(RowKind.UPDATE_BEFORE,"alan",12),Row.ofKind(RowKind.UPDATE_AFTER,"alan",100));// 3、changlogstream转为tableTable table = tenv.fromChangelogStream(dataStream);// 4、创建视图
tenv.createTemporaryView("InputTable", table);//5、聚合查询
tenv.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0").print();// +----+--------------------------------+-------------+// | op | name | score |// +----+--------------------------------+-------------+// | +I | alanchan | 5 |// | +I | alan | 12 |// | -D | alan | 12 |// | +I | alan | 100 |// +----+--------------------------------+-------------+// 4 rows in set
env.execute();}//the stream as an upsert stream (without a need for UPDATE_BEFORE)//展示了如何通过使用upsert模式将更新消息的数量减少50%来限制传入更改的类型以提高效率。//通过为toChangelogStream定义主键和upsert changelog模式,可以减少结果消息的数量。publicstaticvoidtest2()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);//2、创建数据源DataStream<Row> dataStream =
env.fromElements(Row.ofKind(RowKind.INSERT,"alan",12),Row.ofKind(RowKind.INSERT,"alanchan",5),Row.ofKind(RowKind.UPDATE_AFTER,"alan",100));// 3、转为tableTable table =
tenv.fromChangelogStream(
dataStream,Schema.newBuilder().primaryKey("f0").build(),ChangelogMode.upsert());// 4、创建视图
tenv.createTemporaryView("InputTable", table);// 5、聚合查询
tenv.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0").print();// +----+--------------------------------+-------------+ // | op | name | score |// +----+--------------------------------+-------------+// | +I | alanchan | 5 |// | +I | alan | 12 |// | -U | alan | 12 |// | +U | alan | 100 |// +----+--------------------------------+-------------+// 4 rows in set
env.execute();}publicstaticvoidmain(String[] args)throwsException{// test1();test2();}}
2)、toChangelogStream示例
下面的代码展示了如何将toChangelogStream用于不同的场景。
importstaticorg.apache.flink.table.api.Expressions.$;importstaticorg.apache.flink.table.api.Expressions.row;importjava.time.Instant;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.ProcessFunction;importorg.apache.flink.table.api.DataTypes;importorg.apache.flink.table.api.Schema;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.apache.flink.table.data.StringData;importorg.apache.flink.types.Row;importorg.apache.flink.util.Collector;/**
* @author alanchan
*
*/publicclassTestToChangelogStreamDemo{staticfinalStringSQL="CREATE TABLE GeneratedTable "+"("+" name STRING,"+" score INT,"+" event_time TIMESTAMP_LTZ(3),"+" WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"+")"+"WITH ('connector'='datagen')";//以最简单和最通用的方式转换为DataStream(无事件时间)publicstaticvoidtest1()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、构建数据源并聚合查询Table simpleTable = tenv
.fromValues(row("alan",12),row("alan",2),row("alanchan",12)).as("name","score").groupBy($("name")).select($("name"), $("score").sum());// 3、将table转成datastream,并输出
tenv
.toChangelogStream(simpleTable).executeAndCollect().forEachRemaining(System.out::println);// +I[alanchan, 12]// +I[alan, 12]// -U[alan, 12]// +U[alan, 14]
env.execute();}//以最简单和最通用的方式转换为DataStream(使用事件时间)//由于`event_time`是schema的单个时间属性,因此它默认设置为流记录的时间戳;同时,它仍然是Row的一部分publicstaticvoidtest2()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、建表并填入数据
tenv.executeSql(SQL);Table table = tenv.from("GeneratedTable");DataStream<Row> dataStream = tenv.toChangelogStream(table);
dataStream.process(newProcessFunction<Row,Void>(){@OverridepublicvoidprocessElement(Row row,Context ctx,Collector<Void> out){System.out.println(row.getFieldNames(true));// [name, score, event_time]// timestamp exists twiceassert ctx.timestamp()== row.<Instant>getFieldAs("event_time").toEpochMilli();}});
env.execute();}//转换为DataStream,但将time属性写出为元数据列,这意味着它不再是physical schema的一部分publicstaticvoidtest3()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、建表并填入数据
tenv.executeSql(SQL);Table table = tenv.from("GeneratedTable");DataStream<Row> dataStream = tenv.toChangelogStream(
table,Schema.newBuilder().column("name","STRING").column("score","INT").columnByMetadata("rowtime","TIMESTAMP_LTZ(3)").build());// the stream record's timestamp is defined by the metadata; it is not part of the Row
dataStream.process(newProcessFunction<Row,Void>(){@OverridepublicvoidprocessElement(Row row,Context ctx,Collector<Void> out){// prints: [name, score]System.out.println(row.getFieldNames(true));// timestamp exists onceSystem.out.println(ctx.timestamp());}});
env.execute();}//可以使用更多的内部数据结构以提高效率//这里提到这只是为了完整性,因为使用内部数据结构增加了复杂性和额外的类型处理//将TIMESTAMP_LTZ列转换为`Long`或将STRING转换为`byte[]`可能很方便,如果需要,结构化类型也可以表示为`Row`publicstaticvoidtest4()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、建表并填入数据
tenv.executeSql(SQL);Table table = tenv.from("GeneratedTable");DataStream<Row> dataStream = tenv.toChangelogStream(
table,Schema.newBuilder().column("name",DataTypes.STRING().bridgedTo(StringData.class)).column("score",DataTypes.INT()).column("event_time",DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class)).build());
dataStream.print();// 12> +I[1b6717eb5d93058ac3b40458a8a549a5e2fbb3b0fa146b36b7c58b5ebc1606cfc26ff9e4ebc3277832b9a8a0bfa1451d6608, 836085755, 1699941384531]// 9> +I[6169d2f3a4766f5fce51cba66ccd33772ab72a690381563426417c75766f99de8b1fd5c3c7fc5ec48954df9299456f433fa9, -766105729, 1699941384531]// 10> +I[e5a815e53d8fdf91b9382d7b15b6c076c5449e27b7ce505520c4334aba227d9a2fefd3333b2609704334b6fb866c244cf03d, 1552621997, 1699941384531]
env.execute();}publicstaticvoidmain(String[] args)throwsException{// test1();// test2();// test3();test4();}}
示例test4()中数据类型支持哪些转换的更多信息,请参阅table API的数据类型页面。
toChangelogStream(Table).executeAndCollect()的行为等于调用Table.execute().collect()。然而,toChangelogStream(表)对于测试可能更有用,因为它允许访问DataStream API中后续ProcessFunction中生成的水印。
7、Adding Table API Pipelines to DataStream API 示例
单个Flink作业可以由多个相邻运行的断开连接的管道组成。
Table API中定义的Source-to-sink管道可以作为一个整体附加到StreamExecutionEnvironment,并在调用DataStream API中的某个执行方法时提交。
源不一定是table source,也可以是以前转换为Table API的另一个DataStream管道。因此,可以将 table sinks用于DataStream API程序。
通过使用StreamTableEnvironment.createStatementSet()创建的专用StreamStatementSet实例可以使用该功能。通过使用语句集,planner 可以一起优化所有添加的语句,并在调用StreamStatement set.attachAsDataStream()时提供一个或多个添加到StreamExecutionEnvironment的端到端管道( end-to-end pipelines)。
下面的示例演示如何将表程序添加到一个作业中的DataStream API程序。
importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.DiscardingSink;importorg.apache.flink.table.api.DataTypes;importorg.apache.flink.table.api.Schema;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.TableDescriptor;importorg.apache.flink.table.api.bridge.java.StreamStatementSet;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;/**
* @author alanchan
*
*/publicclassTestTablePipelinesToDataStreamDemo{/**
* @param args
* @throws Exception
*/publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);StreamStatementSet statementSet = tenv.createStatementSet();// 建立数据源TableDescriptor sourceDescriptor =TableDescriptor.forConnector("datagen").option("number-of-rows","3").schema(Schema.newBuilder().column("myCol",DataTypes.INT()).column("myOtherCol",DataTypes.BOOLEAN()).build()).build();// 建立sinkTableDescriptor sinkDescriptor =TableDescriptor.forConnector("print").build();// add a pure Table API pipelineTable tableFromSource = tenv.from(sourceDescriptor);
statementSet.add(tableFromSource.insertInto(sinkDescriptor));// use table sinks for the DataStream API pipelineDataStream<Integer> dataStream = env.fromElements(1,2,3);Table tableFromStream = tenv.fromDataStream(dataStream);
statementSet.add(tableFromStream.insertInto(sinkDescriptor));// attach both pipelines to StreamExecutionEnvironment (the statement set will be cleared after calling this method)
statementSet.attachAsDataStream();// define other DataStream API parts
env.fromElements(4,5,6).addSink(newDiscardingSink<>());// use DataStream API to submit the pipelines
env.execute();// 1> +I[287849559, true]// +I[1]// +I[2]// +I[3]// 3> +I[-1058230612, false]// 2> +I[-995481497, false]}}
8、 TypeInformation 和 DataType 转换
DataStream API使用org.apache.flink.api.common.typeinfo.TypeInformation的实例来描述在流中传输的记录类型。特别是,它定义了如何将记录从一个DataStream操作符序列化和反序列化到另一个。它还可以帮助将状态序列化为savepoints and checkpoints。
Table API使用自定义数据结构在内部表示记录,并向用户暴露org.apache.flink.table.types.DataType,以声明数据结构转换为的外部格式,以便在 sources, sinks, UDFs, or DataStream API中更容易使用。
DataType比TypeInformation更丰富,因为它还包括有关逻辑SQL类型的详细信息。因此,在转换期间将隐式添加一些细节。
表的列名和类型自动从DataStream的TypeInformation派生。使用DataStream.getType()检查是否已通过DataStream API的反射类型提取工具正确检测到类型信息。如果最外层记录的TypeInformation是CompositeType,则在派生 table’s schema时,它将在第一级被展平(flattened )。
DataStream API并不总是能够基于反射提取更特定的TypeInformation。这通常是默默进行的,并转换成由通用Kryo序列化器支持的GenericTypeInfo。
例如,不能反射地分析Row类,并且始终需要显式类型信息声明。如果在DataStream API中没有声明适当的类型信息,则该行将显示为原始数据类型,并且table API无法访问其字段。在Java中使用.map(…).returns(TypeInformation)来显式声明类型信息。
1)、TypeInformation to DataType
将TypeInformation转换为DataType时适用以下规则:
- TypeInformation的所有子类都映射到逻辑类型,包括与Flink的内置序列化器对齐(aligned)的为空性(nullability )。
- TupleTypeInfoBase的子类被转换为行(用于row)或结构化类型(用于tuples、POJO和case类)。
- 默认情况下,BigDecimal转换为DECIMAL(38,18)。
- PojoTypeInfo字段的顺序由构造函数确定,所有字段都作为其参数。如果在转换过程中未找到,则字段顺序将按字母顺序排列。
- 不能表示为列出的org.apache.flink.table.api.DataTypes之一的GenericTypeInfo和其他TypeInformation将被视为黑盒原始类型。当前会话配置用于具体化原始类型的序列化程序(materialize the serializer of the raw type)。然后将无法访问复合嵌套字段。
- 有关完整的转换逻辑,请参阅TypeInfoDataTypeConverter.java 源码。
使用DataTypes.of(TypeInformation)在自定义schema 声明或UDF中调用上述逻辑。
2)、DataType to TypeInformation
表运行时将确保正确地将输出记录序列化到DataStream API的第一个运算符。
需要考虑DataStream API的类型信息语义。
9、Legacy Conversion旧版转换
以下部分介绍了API中将在未来版本中删除的过时部分。
特别是,这些部分可能没有很好地集成到最近的许多新功能和重构中。
1)、将 DataStream 转换成表
DataStream 可以直接转换为 StreamTableEnvironment 中的 Table。 结果视图的架构取决于注册集合的数据类型。
importstaticorg.apache.flink.table.api.Expressions.$;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.apache.flink.types.Row;....publicstaticvoidtestDataStreamToTable()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);DataStream<Row> dataStream = env.fromElements(Row.of("alan",18),Row.of("alanchan",19),Row.of("alanchanchn",20),Row.of("alan",20));Table table = tenv.fromDataStream(dataStream, $("name"), $("age"));// table.execute().print();// +----+--------------------------------+-------------+// | op | name | age |// +----+--------------------------------+-------------+// | +I | alan | 18 |// | +I | alanchan | 19 |// | +I | alanchanchn | 20 |// | +I | alan | 20 |// +----+--------------------------------+-------------+// 4 rows in setDataStream<Tuple2<String,Integer>> dataStream2 = env.fromElements(Tuple2.of("alan",18),Tuple2.of("alanchan",19),Tuple2.of("alanchanchn",20),Tuple2.of("alan",20));Table table2 = tenv.fromDataStream(dataStream2,$("name"),$("age"));
table2.execute().print();// +----+--------------------------------+-------------+// | op | name | age |// +----+--------------------------------+-------------+// | +I | alan | 18 |// | +I | alanchan | 19 |// | +I | alanchanchn | 20 |// | +I | alan | 20 |// +----+--------------------------------+-------------+// 4 rows in set
env.execute();}
2)、将表转换成 DataStream
Table 可以被转换成 DataStream。 通过这种方式,定制的 DataStream 程序就可以在 Table API 或者 SQL 的查询结果上运行了。
将 Table 转换为 DataStream 时,你需要指定生成的 DataStream 的数据类型,即,Table 的每行数据要转换成的数据类型。 通常最方便的选择是转换成 Row 。 以下列表概述了不同选项的功能:
- Row: 字段按位置映射,字段数量任意,支持 null 值,无类型安全(type-safe)检查。
- POJO: 字段按名称映射(POJO 必须按Table 中字段名称命名),字段数量任意,支持 null 值,无类型安全检查。
- Case Class: 字段按位置映射,不支持 null 值,有类型安全检查。
- Tuple: 字段按位置映射,字段数量少于 22(Scala)或者 25(Java),不支持 null 值,无类型安全检查。
- Atomic Type: Table 必须有一个字段,不支持 null 值,有类型安全检查。
流式查询(streaming query)的结果表会动态更新,即,当新纪录到达查询的输入流时,查询结果会改变。因此,像这样将动态查询结果转换成 DataStream 需要对表的更新方式进行编码。
将 Table 转换为 DataStream 有两种模式:
- Append Mode: 仅当动态 Table 仅通过INSERT更改进行修改时,才可以使用此模式,即,它仅是追加操作,并且之前输出的结果永远不会更新。
- Retract Mode: 任何情形都可以使用此模式。它使用 boolean 值对 INSERT 和 DELETE 操作的数据进行标记。
importstaticorg.apache.flink.table.api.Expressions.$;importstaticorg.apache.flink.table.api.Expressions.row;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.typeutils.TupleTypeInfo;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.DataTypes;//import org.apache.flink.table.api.DataTypes;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.apache.flink.types.Row;importlombok.Data;/**
* @author alanchan
*
*/publicclassTestLegacyConversionDataStreamAndTableDemo{@DatapublicstaticclassUser{privateString name;privateint age;}publicstaticvoidtestTableToDataStream()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// Table table = tenv.fromValues(// DataTypes.Row(// DataTypes.FIELD("name", DataTypes.STRING()),// DataTypes.FIELD("age", DataTypes.INT()),// row("john", 35),// row("sarah", 32)));Table table = tenv.fromValues(DataTypes.ROW(DataTypes.FIELD("name",DataTypes.STRING()),DataTypes.FIELD("age",DataTypes.INT())),row("alan",18),row("alanchan",19),row("alanchanchn",20));// Convert the Table into an append DataStream of Row by specifying the classDataStream<Row> dsRow = tenv.toAppendStream(table,Row.class);// dsRow.print();// 1> +I[alanchanchn, 20]// 15> +I[alan, 18]// 16> +I[alanchan, 19]// Convert the Table into an append DataStream of Tuple2<String, Integer> with TypeInformationTupleTypeInfo<Tuple2<String,Integer>> tupleType =newTupleTypeInfo<>(Types.STRING,Types.INT);DataStream<Tuple2<String,Integer>> dsTuple = tenv.toAppendStream(table, tupleType);// dsTuple.print();// 3> (alanchan,19)// 2> (alan,18)// 4> (alanchanchn,20)// Convert the Table into a retract DataStream of Row.// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE.DataStream<Tuple2<Boolean,Row>> retractStream = tenv.toRetractStream(table,Row.class);// retractStream.print();// 10> (true,+I[alan, 18])// 8> (true,+I[alanchan, 19])// 9> (true,+I[alanchanchn, 20])DataStream<User> users = tenv.toAppendStream(table,User.class);
users.print();// 7> TestLegacyConversionDataStreamAndTableDemo.User(name=alan, age=18)// 8> TestLegacyConversionDataStreamAndTableDemo.User(name=alanchan, age=19)// 9> TestLegacyConversionDataStreamAndTableDemo.User(name=alanchanchn, age=20)
env.execute();}publicstaticvoidmain(String[] args)throwsException{testTableToDataStream();}}
一旦 Table 被转化为 DataStream,必须使用 StreamExecutionEnvironment 的 execute 方法执行该 DataStream 作业。
3)、数据类型到 Table Schema 的映射
Flink 的 DataStream API 支持多样的数据类型。 例如 Tuple(Scala 内置,Flink Java tuple 和 Python tuples)、POJO 类型、Scala case class 类型以及 Flink 的 Row 类型等允许嵌套且有多个可在表的表达式中访问的字段的复合数据类型。其他类型被视为原子类型。下面,我们讨论 Table API 如何将这些数据类型类型转换为内部 row 表示形式,并提供将 DataStream 转换成 Table 的样例。
数据类型到 table schema 的映射有两种方式:基于字段位置或基于字段名称。
- 基于位置映射介绍及示例
基于位置的映射可在保持字段顺序的同时为字段提供更有意义的名称。这种映射方式可用于具有特定的字段顺序的复合数据类型以及原子类型。如 tuple、row 以及 case class 这些复合数据类型都有这样的字段顺序。然而,POJO 类型的字段则必须通过名称映射。可以将字段投影出来,但不能使用as(Java 和 Scala) 或者 alias(Python)重命名。
定义基于位置的映射时,输入数据类型中一定不能存在指定的名称,否则 API 会假定应该基于字段名称进行映射。如果未指定任何字段名称,则使用默认的字段名称和复合数据类型的字段顺序,或者使用 f0 表示原子类型。
importstaticorg.apache.flink.table.api.Expressions.$;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;......publicstaticvoidtestDataStreamToTableByPosition()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);DataStream<Tuple2<String,Integer>> dataStream2 = env.fromElements(Tuple2.of("alan",18),Tuple2.of("alanchan",19),Tuple2.of("alanchanchn",20),Tuple2.of("alan",20));Table table = tenv.fromDataStream(dataStream2, $("name"));
table.execute().print();// +----+--------------------------------+// | op | name |// +----+--------------------------------+// | +I | alan |// | +I | alanchan |// | +I | alanchanchn |// | +I | alan |// +----+--------------------------------+// 4 rows in setTable table2 = tenv.fromDataStream(dataStream2, $("name"), $("age"));
table2.execute().print();// +----+--------------------------------+-------------+// | op | name | age |// +----+--------------------------------+-------------+// | +I | alan | 18 |// | +I | alanchan | 19 |// | +I | alanchanchn | 20 |// | +I | alan | 20 |// +----+--------------------------------+-------------+// 4 rows in set
env.execute();}
- 基于字段名称介绍及示例
基于名称的映射适用于任何数据类型包括 POJO 类型。这是定义 table schema 映射最灵活的方式。映射中的所有字段均按名称引用,并且可以通过 as 重命名。字段可以被重新排序和映射。
若果没有指定任何字段名称,则使用默认的字段名称和复合数据类型的字段顺序,或者使用 f0 表示原子类型。
importstaticorg.apache.flink.table.api.Expressions.$;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;..............publicstaticvoidtestDataStreamToTableByName()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(Tuple2.of("alan",18),Tuple2.of("alanchan",19),Tuple2.of("alanchanchn",20),Tuple2.of("alan",20));// convert DataStream into Table with field "f1" onlyTable table = tenv.fromDataStream(dataStream, $("f1"));
table.execute().print();// +----+-------------+// | op | f1 |// +----+-------------+// | +I | 18 |// | +I | 19 |// | +I | 20 |// | +I | 20 |// +----+-------------+// 4 rows in set// convert DataStream into Table with swapped fieldsTable table2 = tenv.fromDataStream(dataStream, $("f1"), $("f0"));
table2.execute().print();// +----+-------------+--------------------------------+// | op | f1 | f0 |// +----+-------------+--------------------------------+// | +I | 18 | alan |// | +I | 19 | alanchan |// | +I | 20 | alanchanchn |// | +I | 20 | alan |// +----+-------------+--------------------------------+// 4 rows in set// convert DataStream into Table with swapped fields and field names "name" and "age"Table table3 = tenv.fromDataStream(dataStream, $("f1").as("name"), $("f0").as("age"));
table3.execute().print();// +----+-------------+--------------------------------+// | op | name | age |// +----+-------------+--------------------------------+// | +I | 18 | alan |// | +I | 19 | alanchan |// | +I | 20 | alanchanchn |// | +I | 20 | alan |// +----+-------------+--------------------------------+// 4 rows in set
env.execute();}
1、原子类型映射介绍及示例
Flink 将基础数据类型(Integer、Double、String)或者通用数据类型(不可再拆分的数据类型)视为原子类型。 原子类型的 DataStream 会被转换成只有一条属性的 Table。 属性的数据类型可以由原子类型推断出,还可以重新命名属性。
importstaticorg.apache.flink.table.api.Expressions.$;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;..................publicstaticvoidtest1()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);DataStream<String> dataStream = env.fromElements("alan","alanchan","alanchanchn");// Convert DataStream into Table with field name "myName"Table table = tenv.fromDataStream(dataStream, $("myName"));
table.execute().print();// +----+--------------------------------+// | op | myName |// +----+--------------------------------+// | +I | alan |// | +I | alanchan |// | +I | alanchanchn |// +----+--------------------------------+// 3 rows in set
env.execute();}
2、Tuple类型和 Case Class类型映射介绍及示例
Flink 支持 Scala 的内置 tuple 类型并给 Java 提供自己的 tuple 类型。 两种 tuple 的 DataStream 都能被转换成表。 可以通过提供所有字段名称来重命名字段(基于位置映射)。 如果没有指明任何字段名称,则会使用默认的字段名称。 如果引用了原始字段名称(对于 Flink tuple 为f0、f1 … …,对于 Scala tuple 为_1、_2 … …),则 API 会假定映射是基于名称的而不是基于位置的。 基于名称的映射可以通过 as 对字段和投影进行重新排序。
importstaticorg.apache.flink.table.api.Expressions.$;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;/**
* @author alanchan
*
*/publicclassTestLegacyConversionDataStreamAndTableDemo2{publicstaticvoidtestDataStreamToTableByPosition()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);DataStream<Tuple2<String,Integer>> dataStream2 = env.fromElements(Tuple2.of("alan",18),Tuple2.of("alanchan",19),Tuple2.of("alanchanchn",20),Tuple2.of("alan",20));Table table = tenv.fromDataStream(dataStream2, $("name"));// table.execute().print();// +----+--------------------------------+// | op | name |// +----+--------------------------------+// | +I | alan |// | +I | alanchan |// | +I | alanchanchn |// | +I | alan |// +----+--------------------------------+// 4 rows in setTable table2 = tenv.fromDataStream(dataStream2, $("name"), $("age"));
table2.execute().print();// +----+--------------------------------+-------------+// | op | name | age |// +----+--------------------------------+-------------+// | +I | alan | 18 |// | +I | alanchan | 19 |// | +I | alanchanchn | 20 |// | +I | alan | 20 |// +----+--------------------------------+-------------+// 4 rows in set
env.execute();}publicstaticvoidtestDataStreamToTableByName()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(Tuple2.of("alan",18),Tuple2.of("alanchan",19),Tuple2.of("alanchanchn",20),Tuple2.of("alan",20));// convert DataStream into Table with field "f1" onlyTable table = tenv.fromDataStream(dataStream, $("f1"));
table.execute().print();// +----+-------------+// | op | f1 |// +----+-------------+// | +I | 18 |// | +I | 19 |// | +I | 20 |// | +I | 20 |// +----+-------------+// 4 rows in set// convert DataStream into Table with swapped fieldsTable table2 = tenv.fromDataStream(dataStream, $("f1"), $("f0"));
table2.execute().print();// +----+-------------+--------------------------------+// | op | f1 | f0 |// +----+-------------+--------------------------------+// | +I | 18 | alan |// | +I | 19 | alanchan |// | +I | 20 | alanchanchn |// | +I | 20 | alan |// +----+-------------+--------------------------------+// 4 rows in set// convert DataStream into Table with swapped fields and field names "name" and// "age"Table table3 = tenv.fromDataStream(dataStream, $("f1").as("name"), $("f0").as("age"));
table3.execute().print();// +----+-------------+--------------------------------+// | op | name | age |// +----+-------------+--------------------------------+// | +I | 18 | alan |// | +I | 19 | alanchan |// | +I | 20 | alanchanchn |// | +I | 20 | alan |// +----+-------------+--------------------------------+// 4 rows in set
env.execute();}publicstaticvoidmain(String[] args)throwsException{testDataStreamToTableByPosition();testDataStreamToTableByName();}}
3、POJO 类型映射介绍及示例
Flink 支持 POJO 类型作为复合类型。
在不指定字段名称的情况下将 POJO 类型的 DataStream 转换成 Table 时,将使用原始 POJO 类型字段的名称。名称映射需要原始名称,并且不能按位置进行。字段可以使用别名(带有 as 关键字)来重命名,重新排序和投影。
importstaticorg.apache.flink.table.api.Expressions.$;importjava.time.Instant;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;...............@NoArgsConstructor@AllArgsConstructor@DatapublicstaticclassUser{publicString name;publicInteger age;publicInstant event_time;}publicstaticvoidtest2()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 2、创建数据源DataStream<User> dataStream =
env.fromElements(newUser("alan",4,Instant.ofEpochMilli(1000)),newUser("alanchan",6,Instant.ofEpochMilli(1001)),newUser("alanchanchn",10,Instant.ofEpochMilli(1002)));// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)Table table = tenv.fromDataStream(dataStream, $("age").as("myAge"), $("name").as("myName"),$("event_time").as("eventTime"));// table.execute().print();// +----+-------------+--------------------------------+-------------------------+// | op | myAge | myName | eventTime |// +----+-------------+--------------------------------+-------------------------+// | +I | 4 | alan | 1970-01-01 08:00:01.000 |// | +I | 6 | alanchan | 1970-01-01 08:00:01.001 |// | +I | 10 | alanchanchn | 1970-01-01 08:00:01.002 |// +----+-------------+--------------------------------+-------------------------+// 3 rows in set// convert DataStream into Table with projected field "name" (name-based)Table table2 = tenv.fromDataStream(dataStream, $("name"));
table2.execute().print();// +----+--------------------------------+// | op | name |// +----+--------------------------------+// | +I | alan |// | +I | alanchan |// | +I | alanchanchn |// +----+--------------------------------+// 3 rows in set// convert DataStream into Table with projected and renamed field "myName" (name-based)Table table3 = tenv.fromDataStream(dataStream, $("name").as("myName"));
table3.execute().print();// +----+--------------------------------+// | op | myName |// +----+--------------------------------+// | +I | alan |// | +I | alanchan |// | +I | alanchanchn |// +----+--------------------------------+// 3 rows in set
env.execute();}
4、Row类型映射介绍及示例
Row 类型支持任意数量的字段以及具有 null 值的字段。字段名称可以通过 RowTypeInfo 指定,也可以在将 Row 的 DataStream 转换为 Table 时指定。 Row 类型的字段映射支持基于名称和基于位置两种方式。 字段可以通过提供所有字段的名称的方式重命名(基于位置映射)或者分别选择进行投影/排序/重命名(基于名称映射)。
官方示例好像有些错误,如果定义的是Row类型,在转换的时候,$(“name”).as(“myName”)是会报错的,因为row的字段名称只有f0、f1,所以不会有name。
importstaticorg.apache.flink.table.api.Expressions.$;importjava.time.Instant;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.apache.flink.types.Row;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;.....publicstaticvoidtest3()throwsException{// 1、创建运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);DataStream<Row> dataStream = env.fromElements(Row.of("alan",18),Row.of("alanchan",19),Row.of("alanchanchn",20),Row.of("alan",20));// Convert DataStream into Table with renamed field names "myName", "myAge"// (position-based)Table table = tenv.fromDataStream(dataStream, $("myName"), $("myAge"));// table.execute().print();// +----+--------------------------------+-------------+// | op | myName | myAge |// +----+--------------------------------+-------------+// | +I | alan | 18 |// | +I | alanchan | 19 |// | +I | alanchanchn | 20 |// | +I | alan | 20 |// +----+--------------------------------+-------------+// 4 rows in set// Convert DataStream into Table with renamed fields "myName", "myAge"// (name-based)Table table2 = tenv.fromDataStream(dataStream, $("f0").as("myName"), $("f1").as("myAge"));
table2.execute().print();// +----+--------------------------------+-------------+// | op | myName | myAge |// +----+--------------------------------+-------------+// | +I | alan | 18 |// | +I | alanchan | 19 |// | +I | alanchanchn | 20 |// | +I | alan | 20 |// +----+--------------------------------+-------------+// 4 rows in set// Convert DataStream into Table with projected field "name" (name-based)Table table3 = tenv.fromDataStream(dataStream, $("name"));// table3.execute().print();// +----+--------------------------------+// | op | name |// +----+--------------------------------+// | +I | alan |// | +I | alanchan |// | +I | alanchanchn |// | +I | alan |// +----+--------------------------------+// 4 rows in set// Convert DataStream into Table with projected and renamed field "myName"// (name-based)Table table4 = tenv.fromDataStream(dataStream, $("f0").as("myName"));
table4.execute().print();// +----+--------------------------------+// | op | myName |// +----+--------------------------------+// | +I | alan |// | +I | alanchan |// | +I | alanchanchn |// | +I | alan |// +----+--------------------------------+// 4 rows in set
env.execute();}
本文详细的介绍了table api 与datastream api的集成,分为9个部分进行说明,即概述、相互转换的三个入门示例、集成说明、批处理模式下的处理、insert-only的处理、changelog流处理、管道示例、TypeInformation与DataType的转换、旧版本table与datastream转换,并以可运行的示例进行说明。
本文是将本专栏中的三个部分合并成一个文章,文章较长。
其他三篇文章如下:
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
版权归原作者 一瓢一瓢的饮 alanchan 所有, 如有侵权,请联系我们删除。