0


【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录


本文主要介绍Flink 的10种常用的operator(window、distinct、join等)及以具体可运行示例进行说明.
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。

本专题分为五篇,即:
【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter
【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations
【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等
【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project
【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)

一、Flink的23种算子说明及示例

本文示例中使用的maven依赖和java bean 参考本专题的第一篇中的maven和java bean。

9、first、distinct、join、outjoin、cross

具体事例详见例子及结果。

  1. importjava.util.ArrayList;importjava.util.Arrays;importjava.util.List;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.common.functions.JoinFunction;importorg.apache.flink.api.common.operators.Order;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.api.java.operators.DataSource;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;importorg.datastreamapi.User;/**
  2. * @author alanchan
  3. *
  4. */publicclassTestFirst_Join_Distinct_OutJoin_CrossDemo{publicstaticvoidmain(String[] args)throwsException{ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();joinFunction(env);
  5. env.execute();}publicstaticvoidunionFunction(StreamExecutionEnvironment env)throwsException{List<String> info1 =newArrayList<>();
  6. info1.add("team A");
  7. info1.add("team B");List<String> info2 =newArrayList<>();
  8. info2.add("team C");
  9. info2.add("team D");List<String> info3 =newArrayList<>();
  10. info3.add("team E");
  11. info3.add("team F");List<String> info4 =newArrayList<>();
  12. info4.add("team G");
  13. info4.add("team H");DataStream<String> source1 = env.fromCollection(info1);DataStream<String> source2 = env.fromCollection(info2);DataStream<String> source3 = env.fromCollection(info3);DataStream<String> source4 = env.fromCollection(info4);
  14. source1.union(source2).union(source3).union(source4).print();// team A// team C// team E// team G// team B// team D// team F// team H}publicstaticvoidcrossFunction(ExecutionEnvironment env)throwsException{// cross,求两个集合的笛卡尔积,得到的结果数为:集合1的条数 乘以 集合2的条数List<String> info1 =newArrayList<>();
  15. info1.add("team A");
  16. info1.add("team B");List<Tuple2<String,Integer>> info2 =newArrayList<>();
  17. info2.add(newTuple2("W",3));
  18. info2.add(newTuple2("D",1));
  19. info2.add(newTuple2("L",0));DataSource<String> data1 = env.fromCollection(info1);DataSource<Tuple2<String,Integer>> data2 = env.fromCollection(info2);
  20. data1.cross(data2).print();// (team A,(W,3))// (team A,(D,1))// (team A,(L,0))// (team B,(W,3))// (team B,(D,1))// (team B,(L,0))}publicstaticvoidouterJoinFunction(ExecutionEnvironment env)throwsException{// Outjoin,跟sql语句中的left join,right join,full join意思一样// leftOuterJoin,跟join一样,但是左边集合的没有关联上的结果也会取出来,没关联上的右边为null// rightOuterJoin,跟join一样,但是右边集合的没有关联上的结果也会取出来,没关联上的左边为null// fullOuterJoin,跟join一样,但是两个集合没有关联上的结果也会取出来,没关联上的一边为nullList<Tuple2<Integer,String>> info1 =newArrayList<>();
  21. info1.add(newTuple2<>(1,"shenzhen"));
  22. info1.add(newTuple2<>(2,"guangzhou"));
  23. info1.add(newTuple2<>(3,"shanghai"));
  24. info1.add(newTuple2<>(4,"chengdu"));List<Tuple2<Integer,String>> info2 =newArrayList<>();
  25. info2.add(newTuple2<>(1,"深圳"));
  26. info2.add(newTuple2<>(2,"广州"));
  27. info2.add(newTuple2<>(3,"上海"));
  28. info2.add(newTuple2<>(5,"杭州"));DataSource<Tuple2<Integer,String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer,String>> data2 = env.fromCollection(info2);// left join// eft join:7> (1,shenzhen,深圳)// left join:2> (3,shanghai,上海)// left join:8> (4,chengdu,未知)// left join:16> (2,guangzhou,广州)
  29. data1.leftOuterJoin(data2).where(0).equalTo(0).with(newJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple3<Integer,String,String>>(){@OverridepublicTuple3<Integer,String,String>join(Tuple2<Integer,String> first,Tuple2<Integer,String> second)throwsException{Tuple3<Integer,String,String> tuple =newTuple3();if(second ==null){
  30. tuple.setField(first.f0,0);
  31. tuple.setField(first.f1,1);
  32. tuple.setField("未知",2);}else{// 另外一种赋值方式,和直接用构造函数赋值相同
  33. tuple.setField(first.f0,0);
  34. tuple.setField(first.f1,1);
  35. tuple.setField(second.f1,2);}return tuple;}}).print("left join");// right join// right join:2> (3,shanghai,上海)// right join:7> (1,shenzhen,深圳)// right join:15> (5,--,杭州)// right join:16> (2,guangzhou,广州)
  36. data1.rightOuterJoin(data2).where(0).equalTo(0).with(newJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple3<Integer,String,String>>(){@OverridepublicTuple3<Integer,String,String>join(Tuple2<Integer,String> first,Tuple2<Integer,String> second)throwsException{Tuple3<Integer,String,String> tuple =newTuple3();if(first ==null){
  37. tuple.setField(second.f0,0);
  38. tuple.setField("--",1);
  39. tuple.setField(second.f1,2);}else{// 另外一种赋值方式,和直接用构造函数赋值相同
  40. tuple.setField(first.f0,0);
  41. tuple.setField(first.f1,1);
  42. tuple.setField(second.f1,2);}return tuple;}}).print("right join");// fullOuterJoin// fullOuterJoin:2> (3,shanghai,上海)// fullOuterJoin:8> (4,chengdu,--)// fullOuterJoin:15> (5,--,杭州)// fullOuterJoin:16> (2,guangzhou,广州)// fullOuterJoin:7> (1,shenzhen,深圳)
  43. data1.fullOuterJoin(data2).where(0).equalTo(0).with(newJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple3<Integer,String,String>>(){@OverridepublicTuple3<Integer,String,String>join(Tuple2<Integer,String> first,Tuple2<Integer,String> second)throwsException{Tuple3<Integer,String,String> tuple =newTuple3();if(second ==null){
  44. tuple.setField(first.f0,0);
  45. tuple.setField(first.f1,1);
  46. tuple.setField("--",2);}elseif(first ==null){
  47. tuple.setField(second.f0,0);
  48. tuple.setField("--",1);
  49. tuple.setField(second.f1,2);}else{// 另外一种赋值方式,和直接用构造函数赋值相同
  50. tuple.setField(first.f0,0);
  51. tuple.setField(first.f1,1);
  52. tuple.setField(second.f1,2);}return tuple;}}).print("fullOuterJoin");}publicstaticvoidjoinFunction(ExecutionEnvironment env)throwsException{List<Tuple2<Integer,String>> info1 =newArrayList<>();
  53. info1.add(newTuple2<>(1,"shenzhen"));
  54. info1.add(newTuple2<>(2,"guangzhou"));
  55. info1.add(newTuple2<>(3,"shanghai"));
  56. info1.add(newTuple2<>(4,"chengdu"));List<Tuple2<Integer,String>> info2 =newArrayList<>();
  57. info2.add(newTuple2<>(1,"深圳"));
  58. info2.add(newTuple2<>(2,"广州"));
  59. info2.add(newTuple2<>(3,"上海"));
  60. info2.add(newTuple2<>(5,"杭州"));DataSource<Tuple2<Integer,String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer,String>> data2 = env.fromCollection(info2);//// join:2> ((3,shanghai),(3,上海))// join:16> ((2,guangzhou),(2,广州))// join:7> ((1,shenzhen),(1,深圳))
  61. data1.join(data2).where(0).equalTo(0).print("join");// join2:2> (3,上海,shanghai)// join2:7> (1,深圳,shenzhen)// join2:16> (2,广州,guangzhou)DataSet<Tuple3<Integer,String,String>> data3 = data1.join(data2).where(0).equalTo(0).with(newJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple3<Integer,String,String>>(){@OverridepublicTuple3<Integer,String,String>join(Tuple2<Integer,String> first,Tuple2<Integer,String> second)throwsException{returnnewTuple3<Integer,String,String>(first.f0, second.f1, first.f1);}});
  62. data3.print("join2");}publicstaticvoidfirstFunction(ExecutionEnvironment env)throwsException{List<Tuple2<Integer,String>> info =newArrayList<>();
  63. info.add(newTuple2(1,"Hadoop"));
  64. info.add(newTuple2(1,"Spark"));
  65. info.add(newTuple2(1,"Flink"));
  66. info.add(newTuple2(2,"Scala"));
  67. info.add(newTuple2(2,"Java"));
  68. info.add(newTuple2(2,"Python"));
  69. info.add(newTuple2(3,"Linux"));
  70. info.add(newTuple2(3,"Window"));
  71. info.add(newTuple2(3,"MacOS"));DataSet<Tuple2<Integer,String>> dataSet = env.fromCollection(info);// 前几个// dataSet.first(4).print();// (1,Hadoop)// (1,Spark)// (1,Flink)// (2,Scala)// 按照tuple2的第一个元素进行分组,查出每组的前2个// dataSet.groupBy(0).first(2).print();// (3,Linux)// (3,Window)// (1,Hadoop)// (1,Spark)// (2,Scala)// (2,Java)// 按照tpule2的第一个元素进行分组,并按照倒序排列,查出每组的前2个
  72. dataSet.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print();// (3,Window)// (3,MacOS)// (1,Spark)// (1,Hadoop)// (2,Scala)// (2,Python)}publicstaticvoiddistinctFunction(ExecutionEnvironment env)throwsException{List list =newArrayList<Tuple3<Integer,Integer,Integer>>();
  73. list.add(newTuple3<>(0,3,6));
  74. list.add(newTuple3<>(0,2,5));
  75. list.add(newTuple3<>(0,3,6));
  76. list.add(newTuple3<>(1,1,9));
  77. list.add(newTuple3<>(1,2,8));
  78. list.add(newTuple3<>(1,2,8));
  79. list.add(newTuple3<>(1,3,9));DataSet<Tuple3<Integer,Integer,Integer>> source = env.fromCollection(list);// 去除tuple3中元素完全一样的
  80. source.distinct().print();// (1,3,9)// (0,3,6)// (1,1,9)// (1,2,8)// (0,2,5)// 去除tuple3中第一个元素一样的,只保留第一个// source.distinct(0).print();// (1,1,9)// (0,3,6)// 去除tuple3中第一个和第三个相同的元素,只保留第一个// source.distinct(0,2).print();// (0,3,6)// (1,1,9)// (1,2,8)// (0,2,5)}publicstaticvoiddistinctFunction2(ExecutionEnvironment env)throwsException{DataSet<User> source = env.fromCollection(Arrays.asList(newUser(1,"alan1","1","1@1.com",18,3000),newUser(2,"alan2","2","2@2.com",19,200),newUser(3,"alan1","3","3@3.com",18,1000),newUser(5,"alan1","5","5@5.com",28,1500),newUser(4,"alan2","4","4@4.com",20,300)));// source.distinct("name").print();// User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)// User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=3000.0)
  81. source.distinct("name","age").print();// User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=3000.0)// User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)// User(id=5, name=alan1, pwd=5, email=5@5.com, age=28, balance=1500.0)// User(id=4, name=alan2, pwd=4, email=4@4.com, age=20, balance=300.0)}publicstaticvoiddistinctFunction3(ExecutionEnvironment env)throwsException{DataSet<User> source = env.fromCollection(Arrays.asList(newUser(1,"alan1","1","1@1.com",18,-1000),newUser(2,"alan2","2","2@2.com",19,200),newUser(3,"alan1","3","3@3.com",18,-1000),newUser(5,"alan1","5","5@5.com",28,1500),newUser(4,"alan2","4","4@4.com",20,-300)));// 针对balance增加绝对值去重
  82. source.distinct(newKeySelector<User,Double>(){@OverridepublicDoublegetKey(User value)throwsException{returnMath.abs(value.getBalance());}}).print();// User(id=5, name=alan1, pwd=5, email=5@5.com, age=28, balance=1500.0)// User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)// User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=-1000.0)// User(id=4, name=alan2, pwd=4, email=4@4.com, age=20, balance=-300.0)}publicstaticvoiddistinctFunction4(ExecutionEnvironment env)throwsException{List<String> info =newArrayList<>();
  83. info.add("Hadoop,Spark");
  84. info.add("Spark,Flink");
  85. info.add("Hadoop,Flink");
  86. info.add("Hadoop,Flink");DataSet<String> source = env.fromCollection(info);
  87. source.flatMap(newFlatMapFunction<String,String>(){@OverridepublicvoidflatMap(String value,Collector<String> out)throwsException{System.err.print("come in ");for(String token : value.split(",")){
  88. out.collect(token);}}});
  89. source.distinct().print();}}

10、Window

KeyedStream → WindowedStream
Window 函数允许按时间或其他条件对现有 KeyedStream 进行分组。 以下是以 10 秒的时间窗口聚合:

  1. inputStream.keyBy(0).window(Time.seconds(10));

Flink 定义数据片段以便(可能)处理无限数据流。 这些切片称为窗口。 此切片有助于通过应用转换处理数据块。 要对流进行窗口化,需要分配一个可以进行分发的键和一个描述要对窗口化流执行哪些转换的函数。要将流切片到窗口,可以使用 Flink 自带的窗口分配器。 我们有选项,如 tumbling windows, sliding windows, global 和 session windows。
具体参考系列文章
6、Flink四大基石之Window详解与详细示例(一)
6、Flink四大基石之Window详解与详细示例(二)
7、Flink四大基石之Time和WaterMaker详解与详细示例(watermaker基本使用、kafka作为数据源的watermaker使用示例以及超出最大允许延迟数据的接收实现)

11、WindowAll

DataStream → AllWindowedStream
windowAll 函数允许对常规数据流进行分组。 通常,这是非并行数据转换,因为它在非分区数据流上运行。
与常规数据流功能类似,也有窗口数据流功能。 唯一的区别是它们处理窗口数据流。 所以窗口缩小就像 Reduce 函数一样,Window fold 就像 Fold 函数一样,并且还有聚合。

  1. dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));// Last 5 seconds of data

这适用于非并行转换的大多数场景。所有记录都将收集到 windowAll 算子对应的一个任务中。

具体参考系列文章
6、Flink四大基石之Window详解与详细示例(一)
6、Flink四大基石之Window详解与详细示例(二)
7、Flink四大基石之Time和WaterMaker详解与详细示例(watermaker基本使用、kafka作为数据源的watermaker使用示例以及超出最大允许延迟数据的接收实现)

12、Window Apply

WindowedStream → DataStream
AllWindowedStream → DataStream
将通用 function 应用于整个窗口。下面是一个手动对窗口内元素求和的 function。

如果你使用 windowAll 转换,则需要改用 AllWindowFunction。

  1. windowedStream.apply(newWindowFunction<Tuple2<String,Integer>,Integer,Tuple,Window>(){publicvoid apply (Tuple tuple,Window window,Iterable<Tuple2<String,Integer>> values,Collector<Integer> out)throwsException{int sum =0;for(value t: values){
  2. sum += t.f1;}
  3. out.collect (newInteger(sum));}});// 在 non-keyed 窗口流上应用 AllWindowFunction
  4. allWindowedStream.apply (newAllWindowFunction<Tuple2<String,Integer>,Integer,Window>(){publicvoid apply (Window window,Iterable<Tuple2<String,Integer>> values,Collector<Integer> out)throwsException{int sum =0;for(value t: values){
  5. sum += t.f1;}
  6. out.collect (newInteger(sum));}});

13、Window Reduce

WindowedStream → DataStream
对窗口应用 reduce function 并返回 reduce 后的值。

  1. windowedStream.reduce (newReduceFunction<Tuple2<String,Integer>>(){publicTuple2<String,Integer>reduce(Tuple2<String,Integer> value1,Tuple2<String,Integer> value2)throwsException{returnnewTuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);}});

14、Aggregations on windows

WindowedStream → DataStream
聚合窗口的内容。min和minBy之间的区别在于,min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。

  1. windowedStream.sum(0);
  2. windowedStream.sum("key");
  3. windowedStream.min(0);
  4. windowedStream.min("key");
  5. windowedStream.max(0);
  6. windowedStream.max("key");
  7. windowedStream.minBy(0);
  8. windowedStream.minBy("key");
  9. windowedStream.maxBy(0);
  10. windowedStream.maxBy("key");

以上,本文主要介绍Flink 的10种常用的operator(window、distinct、join等)及以具体可运行示例进行说明.
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为五篇,即:
【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter
【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations
【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等
【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project
【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)

标签: flink 数据库 android

本文转载自: https://blog.csdn.net/chenwewi520feng/article/details/134781144
版权归原作者 一瓢一瓢的饮 alanchan 所有, 如有侵权,请联系我们删除。

“【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等”的评论:

还没有评论