0


【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录


本文给出针对表字段的各种操作及验证。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文需要有kafka的运行环境。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、表的列操作

针对表的字段进行操作,具体示例如下,运行结果在源文件中。

importstaticorg.apache.flink.table.api.Expressions.$;importstaticorg.apache.flink.table.api.Expressions.row;importstaticorg.apache.flink.table.api.Expressions.and;importstaticorg.apache.flink.table.api.Expressions.concat;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.DataTypes;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.apache.flink.types.Row;/**
 * @author alanchan
 *
 */publicclassTestTableAPIOperationDemo{staticString sourceSql ="CREATE TABLE Alan_KafkaTable (\r\n"+"  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n"+"  `partition` BIGINT METADATA VIRTUAL,\r\n"+"  `offset` BIGINT METADATA VIRTUAL,\r\n"+"  `user_id` BIGINT,\r\n"+"  `item_id` BIGINT,\r\n"+"  `behavior` STRING\r\n"+") WITH (\r\n"+"  'connector' = 'kafka',\r\n"+"  'topic' = 'user_behavior',\r\n"+"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n"+"  'properties.group.id' = 'testGroup',\r\n"+"  'scan.startup.mode' = 'earliest-offset',\r\n"+"  'format' = 'csv'\r\n"+");";/**
     * @param args
     * @throws Exception
     */publicstaticvoidmain(String[] args)throwsException{//        test1();//        test2();test3();}staticvoidtest3()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 建表
        tenv.executeSql(sourceSql);Table table1 = tenv.from("Alan_KafkaTable");// 重命名字段。Table result = table1.as("a","b","c","d","e","f");DataStream<Tuple2<Boolean,Row>> resultDS = tenv.toRetractStream(result,Row.class);
        resultDS.print();//11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])//和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。Table table2 = result.where($("f").isEqual("login"));DataStream<Tuple2<Boolean,Row>> result2DS = tenv.toRetractStream(table2,Row.class);
        result2DS.print();//11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])Table table3 = result.where($("f").isNotEqual("login"));DataStream<Tuple2<Boolean,Row>> result3DS = tenv.toRetractStream(table3,Row.class);
        result3DS.print();// 没有匹配条件的记录,无输出Table table4 = result
                                    .filter(and(
                                                    $("f").isNotNull(),//                                                    $("d").isGreater(1)
                                                    $("e").isNotNull()));DataStream<Tuple2<Boolean,Row>> result4DS = tenv.toRetractStream(table4,Row.class);
        result4DS.print("test filter:");//test filter::11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
        
        env.execute();}/**
     * 和 SQL 查询中的 VALUES 子句类似。 基于提供的行生成一张内联表。
     * 
     * 你可以使用 row(...) 表达式创建复合行:
     * 
     * @throws Exception
     */staticvoidtest2()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);Table table = tenv.fromValues(row(1,"ABC"),row(2L,"ABCDE"));
        table.printSchema();//        (//                  `f0` BIGINT NOT NULL,//                  `f1` VARCHAR(5) NOT NULL//        )DataStream<Tuple2<Boolean,Row>> resultDS = tenv.toRetractStream(table,Row.class);
        resultDS.print();//        1> (true,+I[2, ABCDE])//        2> (true,+I[1, ABC])Table table2 = tenv.fromValues(DataTypes.ROW(DataTypes.FIELD("id",DataTypes.DECIMAL(10,2)),DataTypes.FIELD("name",DataTypes.STRING())),row(1,"ABCD"),row(2L,"ABCDEF"));
        table2.printSchema();//        (//                  `id` DECIMAL(10, 2),//                  `name` STRING//        )DataStream<Tuple2<Boolean,Row>> result2DS = tenv.toRetractStream(table2,Row.class);
        result2DS.print();//        15> (true,+I[2.00, ABCDEF])//        14> (true,+I[1.00, ABCD])
        env.execute();}/**
     * 和 SQL 查询的 FROM 子句类似。 执行一个注册过的表的扫描。
     * 
     * @throws Exception
     */staticvoidtest1()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 建表
        tenv.executeSql(sourceSql);// 查询//        tenv.from("Alan_KafkaTable").execute().print();// kafka输入数据// 1,1002,login// 应用程序控制台输出如下//        +----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+//        | op |              event_time |            partition |               offset |              user_id |              item_id |                       behavior |//        +----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+//        | +I | 2023-11-01 11:00:30.183 |                    0 |                    2 |                    1 |                 1002 |                          login |Table temp = tenv.from("Alan_KafkaTable");//和 SQL 的 SELECT 子句类似。 执行一个 select 操作Table result1 = temp.select($("user_id"), $("item_id").as("behavior"), $("event_time"));DataStream<Tuple2<Boolean,Row>> result1DS = tenv.toRetractStream(result1,Row.class);//        result1DS.print();// 11> (true,+I[1, 1002, 2023-11-01T11:00:30.183])//选择星号(*)作为通配符,select 表中的所有列。Table result2 = temp.select($("*"));DataStream<Tuple2<Boolean,Row>> result2DS = tenv.toRetractStream(result2,Row.class);
        result2DS.print();// 11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
        env.execute();}staticvoidtest5()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 建表
        tenv.executeSql(sourceSql);Table table = tenv.from("Alan_KafkaTable");//和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。Table result = table.groupBy($("user_id")).select($("user_id"), $("user_id").count().as("count(user_id)"));DataStream<Tuple2<Boolean,Row>> resultDS = tenv.toRetractStream(result,Row.class);
        resultDS.print();//        12> (true,+I[1, 1])
        
        env.execute();}staticvoidtest4()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);// 建表
        tenv.executeSql(sourceSql);Table table = tenv.from("Alan_KafkaTable");//执行字段添加操作。 如果所添加的字段已经存在,将抛出异常。Table result2 = table.addColumns($("behavior").plus(1).as("t_col1"));
        result2.printSchema();//        (//                  `event_time` TIMESTAMP(3),//                  `partition` BIGINT,//                  `offset` BIGINT,//                  `user_id` BIGINT,//                  `item_id` BIGINT,//                  `behavior` STRING,//                  `t_col1` STRING//                )Table result = table.addColumns($("behavior").plus(1).as("t_col3"),concat($("behavior"),"alanchan").as("t_col4"));
        result.printSchema();//        (//                  `event_time` TIMESTAMP(3),//                  `partition` BIGINT,//                  `offset` BIGINT,//                  `user_id` BIGINT,//                  `item_id` BIGINT,//                  `behavior` STRING,//                  `t_col3` STRING,//                  `t_col4` STRING//                )Table result3 = table.addColumns(concat($("behavior"),"alanchan").as("t_col4"));
        result3.printSchema();//        (//                  `event_time` TIMESTAMP(3),//                  `partition` BIGINT,//                  `offset` BIGINT,//                  `user_id` BIGINT,//                  `item_id` BIGINT,//                  `behavior` STRING,//                  `t_col4` STRING//                )//执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段。Table result4 = result3.addOrReplaceColumns(concat($("t_col4"),"alanchan").as("t_col"));
        result4.printSchema();//        (//                  `event_time` TIMESTAMP(3),//                  `partition` BIGINT,//                  `offset` BIGINT,//                  `user_id` BIGINT,//                  `item_id` BIGINT,//                  `behavior` STRING,//                  `t_col4` STRING,//                  `t_col` STRING//                )Table result5 = result4.dropColumns($("t_col4"), $("t_col"));
        result5.printSchema();//        (//                  `event_time` TIMESTAMP(3),//                  `partition` BIGINT,//                  `offset` BIGINT,//                  `user_id` BIGINT,//                  `item_id` BIGINT,//                  `behavior` STRING//                )//执行字段重命名操作。 字段表达式应该是别名表达式,并且仅当字段已存在时才能被重命名。Table result6 = result4.renameColumns($("t_col4").as("col1"), $("t_col").as("col2"));
        result6.printSchema();//        (//                  `event_time` TIMESTAMP(3),//                  `partition` BIGINT,//                  `offset` BIGINT,//                  `user_id` BIGINT,//                  `item_id` BIGINT,//                  `behavior` STRING,//                  `col1` STRING,//                  `col2` STRING//                )DataStream<Tuple2<Boolean,Row>> resultDS = tenv.toRetractStream(table,Row.class);
        resultDS.print();//        11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
        
        env.execute();}}

以上,本文给出针对表字段的各种操作及验证。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

标签: flink 大数据 kafka

本文转载自: https://blog.csdn.net/chenwewi520feng/article/details/135059842
版权归原作者 一瓢一瓢的饮 alanchanchn 所有, 如有侵权,请联系我们删除。

“【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作”的评论:

还没有评论