0


十二、Flink自定义 FlatMap 方法

1、概述

1)作用

flatMap是将数据先map在打平,输入一个元素,可以输出0到多个元素

2)使用

1.匿名内部类

2.lambda表达式

3.实现FlatMapFunction接口

4.继承RichFlatMapFunction

2、代码实现
  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.common.functions.RichFlatMapFunction;
  3. import org.apache.flink.configuration.Configuration;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. public class MyFlatmapDemo {
  9. public static void main(String[] args) throws Exception {
  10. // 创建执行环境的配置,添加webUI的端口号
  11. Configuration configuration = new Configuration();
  12. configuration.setInteger("rest.port", 8081);
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
  14. // 从端口接入数据
  15. DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
  16. // 1、匿名内部类
  17. SingleOutputStreamOperator<String> flatMapStream1 = lines.flatMap(new FlatMapFunction<String, String>() {
  18. @Override
  19. public void flatMap(String value, Collector<String> out) throws Exception {
  20. for (String word : value.split(" ")) {
  21. out.collect(word);
  22. }
  23. }
  24. });
  25. // 2、lambda表达式
  26. SingleOutputStreamOperator<String> flatMapStream2 = lines.flatMap((FlatMapFunction<String, String>) (value, out) -> {
  27. for (String word : value.split(" ")) {
  28. out.collect(word);
  29. }
  30. }).returns(String.class);
  31. // 3、继承 FlatmapFunction
  32. SingleOutputStreamOperator<String> flatMapStream3 = lines.flatMap(new MyFlatmapFunc());
  33. // 4、继承 RichFlatMapFunction
  34. SingleOutputStreamOperator<String> flatMapStream4 = lines.flatMap(new MyRichFlatMapFunc());
  35. flatMapStream1.print();
  36. flatMapStream2.print();
  37. flatMapStream3.print();
  38. flatMapStream4.print();
  39. env.execute();
  40. }
  41. }
  42. class MyFlatmapFunc implements FlatMapFunction<String,String>{
  43. @Override
  44. public void flatMap(String value, Collector<String> out) throws Exception {
  45. for (String word : value.split(" ")) {
  46. out.collect(word);
  47. }
  48. }
  49. }
  50. class MyRichFlatMapFunc extends RichFlatMapFunction<String,String>{
  51. @Override
  52. public void open(Configuration parameters) throws Exception {
  53. super.open(parameters);
  54. System.out.println("可以在rich方法中创建状态和定时器");
  55. }
  56. @Override
  57. public void close() throws Exception {
  58. super.close();
  59. }
  60. @Override
  61. public void flatMap(String value, Collector<String> out) throws Exception {
  62. for (String word : value.split(" ")) {
  63. out.collect(word);
  64. }
  65. }
  66. }
3、执行结果

1)输入测试数据

  1. nc -lk 8888
  2. hello world

控制台输出执行结果

  1. 3> hello
  2. 3> world
  3. 5> hello
  4. 5> world
  5. 8> hello
  6. 8> world
  7. 7> hello
  8. 7> world
标签: flink big data java

本文转载自: https://blog.csdn.net/m0_50186249/article/details/122312668
版权归原作者 猫猫爱吃小鱼粮 所有, 如有侵权,请联系我们删除。

“十二、Flink自定义 FlatMap 方法”的评论:

还没有评论