Flink series articles
I. The Flink column
The Flink column introduces one topic at a time, each illustrated with concrete examples.
- 1. Flink deployment series: the basics of deploying and configuring Flink.
- 2. Flink fundamentals series: Flink fundamentals such as terminology, architecture, the programming model, the programming guides, basic DataStream API usage, and the four cornerstones.
- 3. Flink Table API and SQL fundamentals series: basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
- 4. Flink Table API and SQL advanced series: the applied side of the Table API and SQL, closer to real production scenarios and covering topics of higher development difficulty.
- 5. Flink monitoring series: topics related to day-to-day operations and monitoring.
II. The Flink examples column
The Flink examples column supplements the Flink column: rather than introducing concepts, it provides concrete, ready-to-run examples. This column has no further sub-categories; each article's title indicates what it covers.
The entry point for all articles in both columns: Flink 系列文章汇总索引
This article shows how to write Flink programs with Java lambda expressions and contrasts the lambda and non-lambda styles.
The examples run entirely inside the IDE and do not depend on any external environment.
The article has three parts: the Maven dependencies for the example code, simple examples with their limitations, and a lambda vs. non-lambda comparison.
The examples were run against Flink 1.17.
I. Java Lambda Expressions
Java 8 introduced several new language features designed for faster and clearer coding. The most important of them, lambda expressions, opened the door to functional programming. A lambda expression allows you to implement and pass functions in a concise way without declaring an additional (anonymous) class.
Flink supports lambda expressions for all operators of the Java API; however, whenever a lambda expression uses Java generics, the type information has to be declared explicitly.
This article shows how to use lambda expressions and describes their current limitations.
For a general introduction to the Flink API, see the DataStream API programming guide series:
48、Flink DataStream API 编程指南(1)- DataStream 入门示例
48、Flink DataStream API 编程指南(2)- DataStream的source、transformation、sink、调试
48、Flink DataStream API 编程指南(3)- 完整版
1. Maven dependencies
```xml
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```
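Note: the User POJO used in part 3 below relies on Lombok annotations (@Data, @AllArgsConstructor, @NoArgsConstructor), which the pom above does not include. If you want to run that example, you will also need a Lombok dependency along the following lines (the version shown is an assumption; pick whichever release matches your JDK):

```xml
<!-- assumption: only needed for the Lombok-annotated User POJO in part 3 -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
    <scope>provided</scope>
</dependency>
```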
2. Examples and limitations
The following example implements a simple inline map() function that squares its input using a lambda expression.
You do not need to declare the data types of the input i or of the output of the map() function, because the Java compiler infers them.
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 */
public class TestLambdaDemo {

    public static void test1() throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // source / transformation / sink
        env.fromElements(1, 2, 3)
                // return the square of i
                .map(i -> i * i)
                .print("平方结果:");

        // execute
        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }
}
```

Output:

```
平方结果::5> 9
平方结果::4> 4
平方结果::3> 1
```
Because OUT here is Integer rather than a generic type, Flink can extract the result type automatically from the implementation of the method signature OUT map(IN value).
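For reference, this is roughly what Flink's MapFunction interface looks like (an abridged sketch of org.apache.flink.api.common.functions.MapFunction); it is this single abstract method that the type extractor inspects:

```java
// abridged sketch of Flink's org.apache.flink.api.common.functions.MapFunction
@FunctionalInterface
public interface MapFunction<T, O> extends Function, Serializable {

    // a lambda implements this single abstract method; when O is a concrete
    // type such as Integer, Flink can extract the result type from it directly
    O map(T value) throws Exception;
}
```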
Functions like flatMap() are different: the signature void flatMap(IN value, Collector<OUT> out) is compiled by the Java compiler into void flatMap(IN value, Collector out), so Flink can no longer infer the output type automatically.
Flink will then most likely throw an exception like the following:
```
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
Otherwise the type has to be specified explicitly using type information.
```
In this case, the type information needs to be specified explicitly; otherwise the output is treated as type Object, which leads to inefficient serialization.
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestLambdaDemo {

    public static void test2() throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // source
        DataStreamSource<Integer> input = env.fromElements(1, 2, 3);

        // transformation
        // the collector type must be declared
        input.flatMap((Integer number, Collector<String> out) -> {
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < number; i++) {
                builder.append("a");
                out.collect(builder.toString());
            }
        })
                // provide the type information explicitly
                .returns(Types.STRING)
                // sink
                .print();

        // execute
        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test2();
    }
}
```

Output:

```
3> a
4> a
5> a
4> aa
5> aa
5> aaa
```
A similar problem occurs when a map() function returns a generic type. In the example below, the method signature Tuple2<Integer, Integer> map(Integer value) is erased to Tuple2 map(Integer value).
```java
public static void test3() throws Exception {
    // env
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1, 2, 3)
            // no information about the fields of Tuple2
            .map(i -> Tuple2.of(i, i))
            .print();

    // execute
    env.execute();
}
```
This method fails with the following exception:
```
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'test3(TestLambdaDemo.java:59)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:543)
	at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1237)
	at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:937)
	at org.tablesql.filesystem.TestLambdaDemo.test3(TestLambdaDemo.java:60)
	at org.tablesql.filesystem.TestLambdaDemo.main(TestLambdaDemo.java:66)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:568)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:154)
	at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:575)
	at org.tablesql.filesystem.TestLambdaDemo.test3(TestLambdaDemo.java:59)
	... 1 more
```
In general, such problems can be solved in several ways; the following four approaches all work:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 */
public class TestLambdaDemo2 {

    public static void test1() throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1) use an explicit ".returns(...)"
        env.fromElements(1, 2, 3)
                .map(i -> Tuple2.of(i, i))
                .returns(Types.TUPLE(Types.INT, Types.INT))
                .print();

        // execute
        env.execute();
    }

    public static void test2() throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2) use a class instead of the lambda
        env.fromElements(1, 2, 3)
                .map(new AlanMapper())
                .print();

        // execute
        env.execute();
    }

    public static class AlanMapper implements MapFunction<Integer, Tuple2<Integer, Integer>> {
        @Override
        public Tuple2<Integer, Integer> map(Integer i) {
            return Tuple2.of(i, i);
        }
    }

    public static void test3() throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 3) use an anonymous class instead of the lambda
        env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Integer value) throws Exception {
                        return Tuple2.of(value, value);
                    }
                })
                .print();

        // execute
        env.execute();
    }

    public static void test4() throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 4) or, as in this example, use a subclass of Tuple2 instead
        env.fromElements(1, 2, 3)
                .map(i -> new AlanDoubleTuple(i, i))
                .print();

        // execute
        env.execute();
    }

    public static class AlanDoubleTuple extends Tuple2<Integer, Integer> {

        // the default constructor is mandatory
        public AlanDoubleTuple() {
        }

        public AlanDoubleTuple(int f0, int f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    public static void main(String[] args) throws Exception {
        test1();
        test2();
        test3();
        test4();
    }
}
```
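The exception message earlier also hints at a fifth option: letting the function implement the ResultTypeQueryable interface, so that it reports its own produced type. A minimal sketch of that approach (this class is not part of the original four examples; the name AlanQueryableMapper is made up here in the article's naming style):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

// a minimal sketch: the mapper reports its own result type via getProducedType(),
// so no ".returns(...)" hint is needed at the call site
public class AlanQueryableMapper
        implements MapFunction<Integer, Tuple2<Integer, Integer>>,
                   ResultTypeQueryable<Tuple2<Integer, Integer>> {

    @Override
    public Tuple2<Integer, Integer> map(Integer i) {
        return Tuple2.of(i, i);
    }

    @Override
    public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
        return Types.TUPLE(Types.INT, Types.INT);
    }
}
```

With this in place, env.fromElements(1, 2, 3).map(new AlanQueryableMapper()).print() should work without any explicit type hint.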
3. Comparing lambda and non-lambda implementations
The examples below contrast lambda and non-lambda implementations of the same functionality; which style to use is largely a matter of personal preference.
```java
// User.java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private int id;
    private String name;
    private String pwd;
    private String email;
    private int age;
    private double balance;
}
```

```java
// TransformationMapDemo.java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 */
public class TransformationMapDemo {

    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        // transformation
        mapPartitionFunction8(env);
        // sink
        // execute
        env.execute();
    }

    // build a list, then emit each number multiplied by 2 — anonymous inner class version
    public static void mapFunction1(StreamExecutionEnvironment env) throws Exception {
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataStreamSource<Integer> source = env.fromCollection(data);
        SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer inValue) throws Exception {
                return inValue * 2;
            }
        });
        // source.print();
        sink.print();
    }

    // build a list, then emit each number multiplied by 2 — lambda version
    public static void mapFunction2(StreamExecutionEnvironment env) throws Exception {
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataStreamSource<Integer> source = env.fromCollection(data);
        SingleOutputStreamOperator<Integer> sink = source.map(i -> 2 * i);
        sink.print();
    }

    // User data source
    public static DataStreamSource<User> source(StreamExecutionEnvironment env) {
        DataStreamSource<User> source = env.fromCollection(Arrays.asList(
                new User(1, "alan1", "1", "[email protected]", 12, 1000),
                new User(2, "alan2", "2", "[email protected]", 19, 200),
                new User(3, "alan1", "3", "[email protected]", 28, 1500),
                new User(5, "alan1", "5", "[email protected]", 15, 500),
                new User(4, "alan2", "4", "[email protected]", 30, 400)));
        return source;
    }

    // lambda version: balance * 2 and age + 5 for every user
    public static SingleOutputStreamOperator<User> mapFunction3(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<User> source = source(env);
        SingleOutputStreamOperator<User> sink = source.map((MapFunction<User, User>) user -> {
            User user2 = user;
            user2.setAge(user.getAge() + 5);
            user2.setBalance(user.getBalance() * 2);
            return user2;
        });
        sink.print();
        return sink;
    }

    // lambda version: after balance * 2 and age + 5, keep users with balance >= 2000 and age >= 20
    public static SingleOutputStreamOperator<User> mapFunction4(StreamExecutionEnvironment env) throws Exception {
        SingleOutputStreamOperator<User> sink = mapFunction3(env)
                .filter(user -> user.getBalance() >= 2000 && user.getAge() >= 20);
        sink.print();
        return sink;
    }

    // lambda version: after the filtering above, collect users with balance >= 3000 via flatMap
    public static SingleOutputStreamOperator<User> mapFunction5(StreamExecutionEnvironment env) throws Exception {
        SingleOutputStreamOperator<User> sink = mapFunction4(env)
                .flatMap((FlatMapFunction<User, User>) (user, out) -> {
                    if (user.getBalance() >= 3000) {
                        out.collect(user);
                    }
                }).returns(User.class);
        sink.print();
        return sink;
    }

    // custom partitioning example — anonymous inner class version
    public static void mapPartitionFunction6(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<User> source = source(env);
        DataStream<User> userTemp = source.map(user -> {
            User user2 = user;
            user2.setAge(user.getAge() + 5);
            return user2;
        }).returns(User.class);

        // public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        //     return setConnectionType(new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
        // }
        DataStream<User> sink = userTemp.partitionCustom(new Partitioner<Integer>() {
            public int partition(Integer key, int numPartitions) {
                System.out.println("分区数:" + numPartitions);
                if (key < 20)
                    numPartitions = 0;
                else if (key >= 20 && key < 30)
                    numPartitions = 1;
                else if (key >= 0)
                    numPartitions = 2;
                System.out.println("分区数2:" + numPartitions);
                return numPartitions;
            }
        }, new KeySelector<User, Integer>() {
            @Override
            public Integer getKey(User value) throws Exception {
                return value.getAge();
            }
        });

        sink.map((MapFunction<User, User>) user -> {
            System.out.println("当前线程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());
            return user;
        }).returns(User.class);

        // System.out.println("并行数:" + sink.getParallelism());
        // output: 3 partitions, keyed by age
        // 当前线程ID:138,user:User(id=3, name=alan1, pwd=3, [email protected], age=33, balance=1500.0)
        // 当前线程ID:136,user:User(id=1, name=alan1, pwd=1, [email protected], age=17, balance=1000.0)
        // 当前线程ID:138,user:User(id=4, name=alan2, pwd=4, [email protected], age=35, balance=400.0)
        // 当前线程ID:140,user:User(id=2, name=alan2, pwd=2, [email protected], age=24, balance=200.0)
        // 当前线程ID:140,user:User(id=5, name=alan1, pwd=5, [email protected], age=20, balance=500.0)
        sink.print();
    }

    // custom partitioning example — lambda version
    public static void mapPartitionFunction7(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<User> source = source(env);
        DataStream<User> userTemp = source.map(user -> {
            User user2 = user;
            user2.setAge(user.getAge() + 5);
            return user2;
        }).returns(User.class);

        DataStream<User> sink = userTemp.partitionCustom((key, numPartitions) -> {
            if (key < 20)
                numPartitions = 0;
            else if (key >= 20 && key < 30)
                numPartitions = 1;
            else if (key >= 0)
                numPartitions = 2;
            return numPartitions;
        }, user -> user.getAge());
        sink.print();
    }

    // partition by odd/even user id, with id == 1 in a partition of its own
    public static void mapPartitionFunction8(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<User> source = source(env);
        DataStream<User> sink = source.partitionCustom(new CusPartitioner(), user -> user.getId());
        // sample partitioning output:
        // 1> User(id=2, name=alan2, pwd=2, [email protected], age=19, balance=200.0)
        // 当前线程ID:90,user:User(id=1, name=alan1, pwd=1, [email protected], age=12, balance=1000.0)
        // 当前线程ID:89,user:User(id=3, name=alan1, pwd=3, [email protected], age=28, balance=1500.0)
        // 2> User(id=3, name=alan1, pwd=3, [email protected], age=28, balance=1500.0)
        // 当前线程ID:88,user:User(id=2, name=alan2, pwd=2, [email protected], age=19, balance=200.0)
        // 当前线程ID:89,user:User(id=5, name=alan1, pwd=5, [email protected], age=15, balance=500.0)
        // 1> User(id=4, name=alan2, pwd=4, [email protected], age=30, balance=400.0)
        // 3> User(id=1, name=alan1, pwd=1, [email protected], age=12, balance=1000.0)
        // 当前线程ID:88,user:User(id=4, name=alan2, pwd=4, [email protected], age=30, balance=400.0)
        // 2> User(id=5, name=alan1, pwd=5, [email protected], age=15, balance=500.0)
        sink.map((MapFunction<User, User>) user -> {
            System.out.println("当前线程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());
            return user;
        }).returns(User.class);
        sink.print();
    }

    public static class CusPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            if (key == 1)
                numPartitions = 2;
            else if (key % 2 == 0) {
                numPartitions = 0;
            } else {
                numPartitions = 1;
            }
            return numPartitions;
        }
    }
}
```
This article has shown how to use Java lambda expressions in Flink and contrasted the lambda and non-lambda styles of writing the same programs.