

[Flink Extras] 1. Flink's 23 common operators explained with detailed examples (1): map, flatmap and filter

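Flink Article Series

I. Flink Column

The Flink column introduces individual topics systematically, each illustrated with concrete examples.

  • 1. Flink deployment series: basic content on deploying and configuring Flink.
  • 2. Flink basics series: Flink fundamentals such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, and the four cornerstones.
  • 3. Flink Table API and SQL basics series: basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.
  • 4. Flink Table API and SQL advanced and applications series: applied Table API and SQL topics that are closer to real production use and involve more development effort.
  • 5. Flink monitoring series: content related to day-to-day operations and monitoring.

II. Flink Examples Column

The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides standalone, ready-to-use examples. This column has no sub-sections; each article's title shows what it covers.

Entry point for all articles in both columns: Flink series article index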




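This article introduces three common Flink operators (map, flatMap and filter) and illustrates each with runnable examples.
For more systematic coverage, see the author's Flink column.
Apart from the Maven dependencies, the examples need nothing else.

This topic is split into five articles:
[Flink Extras] 1. Flink's 23 common operators explained with detailed examples (1): map, flatmap and filter
[Flink Extras] 1. Flink's 23 common operators explained with detailed examples (2): keyby, reduce and Aggregations
[Flink Extras] 1. Flink's 23 common operators explained with detailed examples (3): window, distinct, join, etc.
[Flink Extras] 1. Flink's 23 common operators explained with detailed examples (4): union, window join, connect, outputtag, cache, iterator, project
[Flink Extras] 1. Flink's 23 common operators explained with detailed examples (complete edition)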

I. Descriptions and examples of Flink's 23 operators

1. Maven dependencies

Unless stated otherwise, all examples below use these Maven dependencies.

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <!-- Flink 1.15+ publishes flink-clients, flink-streaming-java and flink-table-api-java-bridge
         without a Scala suffix; the former blink planner is published as flink-table-planner_2.12 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.4</version>
    </dependency>
</dependencies>

2. Java bean

The User class used by the examples below:

package org.datastreamapi;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private int id;
    private String name;
    private String pwd;
    private String email;
    private int age;
    private double balance;
}

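3. map

[DataStream->DataStream]
map is one of the simplest transformations: its input is a DataStream and its output is also a DataStream. The function is applied to every element, and each input element produces exactly one output element.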
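To make the one-in, one-out contract concrete without starting a Flink job, here is a minimal plain-Java sketch. `MapFn`, `applyMap` and `doubled` are hypothetical helpers that only mimic the shape of Flink's `MapFunction` (unlike Flink's interface, `MapFn.map` does not declare `throws Exception`). The data matches mapFunction1 and mapFunction2 below; the real operator runs in parallel subtasks, so its printed order differs from the list order shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the map contract (no Flink involved): one output per input.
// MapFn is a hypothetical stand-in that mimics the shape of Flink's MapFunction.
class MapSketch {

    @FunctionalInterface
    interface MapFn<IN, OUT> {
        OUT map(IN value);
    }

    // Applies fn to every element in order, so the output size always equals the input size
    static <IN, OUT> List<OUT> applyMap(List<IN> input, MapFn<IN, OUT> fn) {
        List<OUT> out = new ArrayList<>(input.size());
        for (IN value : input) {
            out.add(fn.map(value));
        }
        return out;
    }

    // Same data as mapFunction1/mapFunction2: the numbers 1..10, each multiplied by 2
    static List<Integer> doubled() {
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        return applyMap(data, i -> 2 * i);
    }

    public static void main(String[] args) {
        System.out.println(doubled()); // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    }
}
```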

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;

/**
 * @author alanchan
 *
 */
public class TestMapDemo {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source, transformation and sink: each mapFunctionN builds its own source and prints its result
        mapFunction5(env);
        // execute
        env.execute();
    }

    // Build a list of 1..10 and multiply each number by 2 (anonymous inner class)
    public static void mapFunction1(StreamExecutionEnvironment env) throws Exception {
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataStreamSource<Integer> source = env.fromCollection(data);
        SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer inValue) throws Exception {
                return inValue * 2;
            }
        });

        // print() prefixes each line with the 1-based index of the subtask that emitted it,
        // so the prefixes and the order vary from run to run
        sink.print();
        // 9> 12
        // 4> 2
        // 10> 14
        // 8> 10
        // 13> 20
        // 7> 8
        // 12> 18
        // 11> 16
        // 5> 4
        // 6> 6
    }

    // Build a list of 1..10 and multiply each number by 2 (lambda)
    public static void mapFunction2(StreamExecutionEnvironment env) throws Exception {
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataStreamSource<Integer> source = env.fromCollection(data);
        SingleOutputStreamOperator<Integer> sink = source.map(i -> 2 * i);
        sink.print();
        // 3> 4
        // 4> 6
        // 9> 16
        // 7> 12
        // 10> 18
        // 2> 2
        // 6> 10
        // 5> 8
        // 8> 14
        // 11> 20
    }

    // Build the User source
    public static DataStreamSource<User> source(StreamExecutionEnvironment env) {
        DataStreamSource<User> source = env.fromCollection(Arrays.asList(
                new User(1, "alan1", "1", "[email protected]", 12, 1000),
                new User(2, "alan2", "2", "[email protected]", 19, 200),
                new User(3, "alan1", "3", "[email protected]", 28, 1500),
                new User(5, "alan1", "5", "[email protected]", 15, 500),
                new User(4, "alan2", "4", "[email protected]", 30, 400)));
        return source;
    }

    // Lambda: double each user's balance and add 5 to the age
    public static SingleOutputStreamOperator<User> mapFunction3(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<User> source = source(env);
        SingleOutputStreamOperator<User> sink = source.map((MapFunction<User, User>) user ->
                // Return a new User instead of modifying the input record in place
                new User(user.getId(), user.getName(), user.getPwd(), user.getEmail(),
                        user.getAge() + 5, user.getBalance() * 2));
        sink.print();
        // 10> User(id=1, name=alan1, pwd=1, [email protected], age=17, balance=2000.0)
        // 14> User(id=4, name=alan2, pwd=4, [email protected], age=35, balance=800.0)
        // 11> User(id=2, name=alan2, pwd=2, [email protected], age=24, balance=400.0)
        // 12> User(id=3, name=alan1, pwd=3, [email protected], age=33, balance=3000.0)
        // 13> User(id=5, name=alan1, pwd=5, [email protected], age=20, balance=1000.0)
        return sink;
    }

    // Lambda: after balance * 2 and age + 5, keep the users with balance >= 2000 and age >= 20
    public static SingleOutputStreamOperator<User> mapFunction4(StreamExecutionEnvironment env) throws Exception {
        SingleOutputStreamOperator<User> sink = mapFunction3(env)
                .filter(user -> user.getBalance() >= 2000 && user.getAge() >= 20);
        sink.print();
        // The output also contains the 5 records printed by mapFunction3;
        // only User id=3 passes this filter
        // 15> User(id=1, name=alan1, pwd=1, [email protected], age=17, balance=2000.0)
        // 1> User(id=3, name=alan1, pwd=3, [email protected], age=33, balance=3000.0)
        // 16> User(id=2, name=alan2, pwd=2, [email protected], age=24, balance=400.0)
        // 3> User(id=4, name=alan2, pwd=4, [email protected], age=35, balance=800.0)
        // 2> User(id=5, name=alan1, pwd=5, [email protected], age=20, balance=1000.0)
        // 1> User(id=3, name=alan1, pwd=3, [email protected], age=33, balance=3000.0)
        return sink;
    }

    // Lambda: after the steps above, use flatMap to emit only the users with balance >= 3000
    public static SingleOutputStreamOperator<User> mapFunction5(StreamExecutionEnvironment env) throws Exception {
        SingleOutputStreamOperator<User> sink = mapFunction4(env)
                .flatMap((FlatMapFunction<User, User>) (user, out) -> {
                    if (user.getBalance() >= 3000) {
                        out.collect(user);
                    }
                })
                // The Collector's generic type is erased in a lambda, so declare the output type
                .returns(User.class);

        sink.print();
        // The output also contains the prints of mapFunction3 (5 records) and mapFunction4 (id=3);
        // this flatMap emits only User id=3
        // 8> User(id=5, name=alan1, pwd=5, [email protected], age=20, balance=1000.0)
        // 7> User(id=3, name=alan1, pwd=3, [email protected], age=33, balance=3000.0)
        // 6> User(id=2, name=alan2, pwd=2, [email protected], age=24, balance=400.0)
        // 9> User(id=4, name=alan2, pwd=4, [email protected], age=35, balance=800.0)
        // 5> User(id=1, name=alan1, pwd=1, [email protected], age=17, balance=2000.0)
        // 7> User(id=3, name=alan1, pwd=3, [email protected], age=33, balance=3000.0)
        // 7> User(id=3, name=alan1, pwd=3, [email protected], age=33, balance=3000.0)
        return sink;
    }
}
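The printed output of mapFunction4 and mapFunction5 is easier to read once the data is traced by hand. The sketch below replays the same three steps with plain Java collections instead of Flink, using a hypothetical, trimmed `SimpleUser` class (only `id`, `age` and `balance`) in place of the Lombok `User` bean. It assumes the same input values as `source(env)` above and shows that only the user with id 3 satisfies both conditions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java trace (not Flink) of mapFunction3 -> mapFunction4 -> mapFunction5.
// SimpleUser is a hypothetical stand-in for the Lombok User bean, keeping only the fields used here.
class UserPipelineSketch {

    static final class SimpleUser {
        final int id;
        final int age;
        final double balance;

        SimpleUser(int id, int age, double balance) {
            this.id = id;
            this.age = age;
            this.balance = balance;
        }
    }

    // Same id/age/balance values as source(env)
    static List<SimpleUser> source() {
        return Arrays.asList(
                new SimpleUser(1, 12, 1000),
                new SimpleUser(2, 19, 200),
                new SimpleUser(3, 28, 1500),
                new SimpleUser(5, 15, 500),
                new SimpleUser(4, 30, 400));
    }

    // mapFunction3: age + 5 and balance * 2, building new objects instead of mutating the input
    static List<SimpleUser> step3(List<SimpleUser> in) {
        return in.stream()
                .map(u -> new SimpleUser(u.id, u.age + 5, u.balance * 2))
                .collect(Collectors.toList());
    }

    // mapFunction4: keep balance >= 2000 and age >= 20
    static List<SimpleUser> step4(List<SimpleUser> in) {
        return in.stream()
                .filter(u -> u.balance >= 2000 && u.age >= 20)
                .collect(Collectors.toList());
    }

    // mapFunction5: a flatMap that emits a record only when balance >= 3000
    static List<SimpleUser> step5(List<SimpleUser> in) {
        List<SimpleUser> out = new ArrayList<>();
        for (SimpleUser u : in) {
            if (u.balance >= 3000) {
                out.add(u); // plays the role of out.collect(user)
            }
        }
        return out;
    }

    static List<Integer> ids(List<SimpleUser> users) {
        return users.stream().map(u -> u.id).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SimpleUser> after3 = step3(source());
        List<SimpleUser> after4 = step4(after3);
        List<SimpleUser> after5 = step5(after4);
        System.out.println(ids(after3)); // [1, 2, 3, 5, 4]
        System.out.println(ids(after4)); // [3]
        System.out.println(ids(after5)); // [3]
    }
}
```

User 1 reaches a balance of exactly 2000 but is only 17 after the age step, which is why it is dropped by the filter even though its balance qualifies.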

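4. flatMap

[DataStream->DataStream]
flatMap takes one record and emits zero, one, or more records: each element is turned into zero or more elements, and the results are flattened into a single stream.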
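As a rough plain-Java sketch of this contract (not Flink code), the hypothetical `FlatMapFn` interface below receives each element plus a `java.util.function.Consumer` that plays the role of Flink's `Collector`; every value passed to the consumer is appended to one flat output list. The same four sentences as in the `source(env)` method below produce 12 words, while an empty line produces none.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of the flatMap contract (no Flink): each input may emit zero, one or many outputs.
// FlatMapFn and the Consumer-based "collector" are hypothetical stand-ins for FlatMapFunction/Collector.
class FlatMapSketch {

    @FunctionalInterface
    interface FlatMapFn<IN, OUT> {
        void flatMap(IN value, Consumer<OUT> out);
    }

    // Everything emitted for every input ends up in one flat list
    static <IN, OUT> List<OUT> applyFlatMap(List<IN> input, FlatMapFn<IN, OUT> fn) {
        List<OUT> result = new ArrayList<>();
        for (IN value : input) {
            fn.flatMap(value, result::add);
        }
        return result;
    }

    // Splits each sentence on spaces; empty tokens (e.g. from an empty line) are not emitted
    static List<String> words(List<String> sentences) {
        return applyFlatMap(sentences, (String line, Consumer<String> out) -> {
            for (String word : line.split(" ")) {
                if (!word.isEmpty()) {
                    out.accept(word);
                }
            }
        });
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList(
                "i am alanchan", "i like hadoop", "i like flink", "and you ?", "");
        List<String> result = words(sentences);
        System.out.println(result.size()); // 12
        System.out.println(result);
    }
}
```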

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TestFlatMapDemo {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        flatMapFunction3(env);

        env.execute();
    }

    // Build a String source with four sentences
    public static DataStreamSource<String> source(StreamExecutionEnvironment env) {
        List<String> info = new ArrayList<>();
        info.add("i am alanchan");
        info.add("i like hadoop");
        info.add("i like flink");
        info.add("and you ?");
        DataStreamSource<String> dataSource = env.fromCollection(info);
        return dataSource;
    }

    // Split each sentence on spaces (anonymous inner class)
    public static void flatMapFunction1(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<String> source = source(env);
        SingleOutputStreamOperator<String> sink = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] splits = value.split(" ");
                for (String split : splits) {
                    out.collect(split);
                }
            }
        });
        sink.print();
        // 11> and
        // 10> i
        // 8> i
        // 9> i
        // 8> am
        // 10> like
        // 11> you
        // 10> flink
        // 8> alanchan
        // 9> like
        // 11> ?
        // 9> hadoop
    }

    // Lambda implementation
    public static void flatMapFunction2(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<String> source = source(env);
        SingleOutputStreamOperator<String> sink = source.flatMap((FlatMapFunction<String, String>) (input, out) -> {
            String[] splits = input.split(" ");
            for (String split : splits) {
                out.collect(split);
            }
        }).returns(String.class); // the Collector's type is erased in a lambda, so declare the output type

        sink.print();
        // 6> i
        // 8> and
        // 8> you
        // 8> ?
        // 5> i
        // 7> i
        // 5> am
        // 5> alanchan
        // 6> like
        // 7> like
        // 6> hadoop
        // 7> flink
    }

    // Lambda implementation using Arrays.stream and a method reference
    public static void flatMapFunction3(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<String> source = source(env);
        SingleOutputStreamOperator<String> sink = source
                .flatMap((String input, Collector<String> out) -> Arrays.stream(input.split(" ")).forEach(out::collect))
                .returns(String.class);

        sink.print();
        // 8> i
        // 11> and
        // 10> i
        // 9> i
        // 10> like
        // 11> you
        // 8> am
        // 11> ?
        // 10> flink
        // 9> like
        // 9> hadoop
        // 8> alanchan
    }
}

5. filter

[DataStream->DataStream]
filter evaluates a condition for each element and keeps only the elements for which the condition returns true; all other elements are dropped.
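A minimal plain-Java sketch of the filter rule, assuming nothing beyond `java.util`: `applyFilter` is a hypothetical helper built on `java.util.function.Predicate`, and `plusOneThenGreaterThanFive` replays the `map(i -> i + 1)` followed by `filter(value > 5)` combination used in filterFunction1 and filterFunction2 below. Because the map step shifts 1..10 to 2..11, six values survive rather than five.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java sketch (no Flink) of filter semantics and of filterFunction1/filterFunction2.
class FilterSketch {

    // Keeps the elements for which the predicate returns true, preserving their order
    static <T> List<T> applyFilter(List<T> input, Predicate<T> keep) {
        List<T> out = new ArrayList<>();
        for (T value : input) {
            if (keep.test(value)) {
                out.add(value);
            }
        }
        return out;
    }

    static List<Integer> plusOneThenGreaterThanFive() {
        List<Integer> shifted = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            shifted.add(i + 1); // the map step: 1..10 becomes 2..11
        }
        return applyFilter(shifted, v -> v > 5);
    }

    public static void main(String[] args) {
        System.out.println(plusOneThenGreaterThanFive()); // [6, 7, 8, 9, 10, 11]
    }
}
```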

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;

/**
 * @author alanchan
 *
 */
public class TestFilterDemo {

    // Build the User source
    public static DataStreamSource<User> sourceUser(StreamExecutionEnvironment env) {
        DataStreamSource<User> source = env.fromCollection(Arrays.asList(
                new User(1, "alan1", "1", "[email protected]", 12, 1000),
                new User(2, "alan2", "2", "[email protected]", 19, 200),
                new User(3, "alan1", "3", "[email protected]", 28, 1500),
                new User(5, "alan1", "5", "[email protected]", 15, 500),
                new User(4, "alan2", "4", "[email protected]", 30, 400)));
        return source;
    }

    // Build an Integer source with the numbers 1..10
    public static DataStreamSource<Integer> sourceList(StreamExecutionEnvironment env) {
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataStreamSource<Integer> source = env.fromCollection(data);
        return source;
    }

    // Add 1 to each number, then keep the results greater than 5 (anonymous inner classes)
    public static void filterFunction1(StreamExecutionEnvironment env) throws Exception {
        DataStream<Integer> source = sourceList(env);
        SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value + 1;
            }
        }).filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value > 5;
            }
        });
        sink.print();
        // 1> 10
        // 14> 7
        // 16> 9
        // 13> 6
        // 2> 11
        // 15> 8
    }

    // Same as above, lambda implementation
    public static void filterFunction2(StreamExecutionEnvironment env) throws Exception {
        DataStream<Integer> source = sourceList(env);
        SingleOutputStreamOperator<Integer> sink = source.map(i -> i + 1).filter(value -> value > 5);
        sink.print();
        // 12> 7
        // 15> 10
        // 11> 6
        // 13> 8
        // 14> 9
        // 16> 11
    }

    // Select the users whose id is greater than 3
    public static void filterFunction3(StreamExecutionEnvironment env) throws Exception {
        DataStream<User> source = sourceUser(env);
        SingleOutputStreamOperator<User> sink = source.filter(user -> user.getId() > 3);
        sink.print();
        // 14> User(id=5, name=alan1, pwd=5, [email protected], age=15, balance=500.0)
        // 15> User(id=4, name=alan2, pwd=4, [email protected], age=30, balance=400.0)
    }

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        filterFunction3(env);
        env.execute();
    }
}
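mapFunction5 in the map section uses flatMap purely as a filter: it emits the record when the condition holds and emits nothing otherwise. The plain-Java sketch below (hypothetical helpers `filter` and `filterViaFlatMap`, not Flink API) illustrates why that works: a filter is the special case of flatMap in which each element yields zero or one outputs. It uses the user ids from `sourceUser(env)` and the `id > 3` condition from filterFunction3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java sketch (no Flink): filter expressed directly and as a "flatMap" emitting 0 or 1 elements.
class FilterAsFlatMapSketch {

    static <T> List<T> filter(List<T> input, Predicate<T> p) {
        List<T> out = new ArrayList<>();
        for (T v : input) {
            if (p.test(v)) {
                out.add(v);
            }
        }
        return out;
    }

    static <T> List<T> filterViaFlatMap(List<T> input, Predicate<T> p) {
        List<T> out = new ArrayList<>();
        for (T v : input) {
            // The flatMap step: an empty list when p fails, a one-element list when it holds
            List<T> emitted = p.test(v) ? Collections.singletonList(v) : Collections.<T>emptyList();
            out.addAll(emitted); // flattening
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 5, 4); // user ids from sourceUser(env)
        Predicate<Integer> idAboveThree = id -> id > 3;  // the condition from filterFunction3
        System.out.println(filter(ids, idAboveThree));           // [5, 4]
        System.out.println(filterViaFlatMap(ids, idAboveThree)); // [5, 4]
    }
}
```

In Flink itself, `filter` states the intent more directly; the flatMap form mainly pays off when the same function also needs to emit several records or change the element type.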



Reposted from: https://blog.csdn.net/chenwewi520feng/article/details/134781106
Copyright belongs to the original author, 一瓢一瓢的饮 alanchan.
