0


Flink多流转换(一)

8.1 分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子 DataStream,如图 所示。一般来说,我们会定义一些 筛选条件将符合条件的数据拣选出来放到对应的流里

8.1.1 简单实现

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调 用.filter()方法进行筛选,就可以得到拆分之后的流了。例如,我们可以将电商网站收集到的用户行为数据进行一个拆分,根据类型(type)的不 同,分为“Mary”的浏览数据、“Bob”的浏览数据等等。

  1. package com.atguigu.chapter08;
  2. import com.atguigu.chapter05.ClickSource;
  3. import com.atguigu.chapter05.Event;
  4. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  5. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  6. import org.apache.flink.api.java.tuple.Tuple2;
  7. import org.apache.flink.api.java.tuple.Tuple3;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.ProcessFunction;
  12. import org.apache.flink.util.Collector;
  13. import org.apache.flink.util.OutputTag;
  14. import java.time.Duration;
  15. public class SplitStreamTest {
  16. public static void main(String[] args) throws Exception {
  17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18. env.setParallelism(1);
  19. DataStreamSource<Event> stream = env.addSource(new ClickSource());
  20. stream.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
  21. .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
  22. @Override
  23. public long extractTimestamp(Event event, long l) {
  24. return event.timestamp;
  25. }
  26. }));
  27. //定义输出标签
  28. OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary"){};
  29. OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob"){};
  30. //
  31. SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
  32. @Override
  33. public void processElement(Event event, ProcessFunction<Event, Event>.Context context, Collector<Event> collector) throws Exception {
  34. if (event.user.equals("Mary")) {//侧输出流1
  35. context.output(MaryTag, Tuple3.of(event.user, event.url, event.timestamp));
  36. } else if (event.user.equals("Bob")) {//侧输出流2
  37. context.output(BobTag, Tuple3.of(event.user, event.url, event.timestamp));
  38. } else {//主流
  39. collector.collect(event);
  40. }
  41. }
  42. });
  43. processedStream.print("else");
  44. processedStream.getSideOutput(MaryTag).print("Mary");
  45. processedStream.getSideOutput(BobTag).print("Bob");
  46. env.execute();
  47. }
  48. }

8.2 基本合流操作

既然一条流可以分开,自然多条流就可以合并。在实际应用中,我们经常会遇到来源不同 的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的API 也更加丰富。

8.2.1 联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union),如图所示。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素, 数据类型不变。这种合流方式非常简单粗暴,就像公路上多个车道汇在一起一样。

在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参 数,就可以实现流的联合了;得到的依然是一个 DataStream:

  1. stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。

这里需要考虑一个问题。在事件时间语义下,水位线是时间的进度标志;不同的流中可能 水位线的进展快慢完全不同,如果它们合并在一起,水位线又该以哪个为准呢?

还以要考虑水位线的本质含义,是“之前的所有数据已经到齐了”;所以对于合流之后的 水位线,也是要以最小的那个为准,这样才可以保证所有流都不会再传来之前的数据。换句话 说,多流合并时处理的时效性是以最慢的那个流为准的。我们自然可以想到,这与之前介绍的 并行任务水位线传递的规则是完全一致的;多条流的合并,某种意义上也可以看作是多个并行 任务向同一个下游任务汇合的过程。

  1. package com.atguigu.chapter08;
  2. import com.atguigu.chapter05.ClickSource;
  3. import com.atguigu.chapter05.Event;
  4. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  5. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.streaming.api.functions.ProcessFunction;
  10. import org.apache.flink.util.Collector;
  11. import java.time.Duration;
  12. public class UnionTest {
  13. public static void main(String[] args) throws Exception {
  14. StreamExecutionEnvironment env =
  15. StreamExecutionEnvironment.getExecutionEnvironment();
  16. env.setParallelism(1);
  17. SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102", 7777)
  18. .map(data -> {
  19. String[] field = data.split(",");
  20. return new Event(field[0].trim(), field[1].trim(),
  21. Long.valueOf(field[2].trim()));
  22. })
  23. .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  24. .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
  25. @Override
  26. public long extractTimestamp(Event element, long
  27. recordTimestamp) {
  28. return element.timestamp;
  29. }
  30. })
  31. );
  32. stream1.print("stream1");
  33. SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop103", 7777)
  34. .map(data -> {
  35. String[] field = data.split(",");
  36. return new Event(field[0].trim(), field[1].trim(),
  37. Long.valueOf(field[2].trim()));
  38. })
  39. .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  40. .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
  41. @Override
  42. public long extractTimestamp(Event element, long
  43. recordTimestamp) {
  44. return element.timestamp;
  45. }
  46. })
  47. );
  48. stream2.print("stream2");
  49. // 合并两条流
  50. stream1.union(stream2)
  51. .process(new ProcessFunction<Event, String>() {
  52. @Override
  53. public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
  54. out.collect(" 水 位 线 : " + ctx.timerService().currentWatermark());
  55. }
  56. })
  57. .print();
  58. env.execute();
  59. }
  60. }

这里为了更清晰地看到水位线的进展,我们创建了两条流来读取 socket 文本数据,并从 数据中提取时间戳作为生成水位线的依据。用 union 将两条流合并后,用一个 ProcessFunction来进行处理,获取当前的水位线进行输出。我们会发现两条流中每输入一个数据,合并之后的 流中都会有数据出现;而水位线只有在两条流中水位线最小值增大的时候,才会真正向前推进。

我们可以来分析一下程序的运行:

在合流之后的 ProcessFunction 对应的算子任务中,逻辑时钟的初始状态如图所示。

(初始状态)

由于 Flink 会在流的开始处,插入一个负无穷大(Long.MIN_VALUE)的水位线,所以合 流后的 ProcessFunction 对应的处理任务,会为合并的每条流保存一个“分区水位线”,初始值 都是 Long.MIN_VALUE;而此时算子任务的水位线是所有分区水位线的最小值,因此也是Long.MIN_VALUE。

我们在第一条 socket 文本流输入数据[Alice, ./home, 1000] 时,水位线不会立即改变,只 有到水位线生成周期的时间点(200ms 一次)才会推进到 1000 - 1 = 999 毫秒;这与我们在 7.3.2小节中对事件时间定时器的测试是一致的。不过即使第一条水位线推进到了 999,由于另一条 流没有变化,所以合流之后的 Process 任务水位线仍然是初始值。

(第一条流数据到达)

如果这时我们在第二条 socket 文本流输入数据[Alice, ./home, 2000],那么第二条流的水位线会随之推进到 2000 – 1 = 1999 毫秒,Process 任务所保存的第二条流分区水位线更新为 1999; 这样两个分区水位线取最小值,Process 任务的水位线也就可以推进到 999 了。

(第二条流数据到达)

进而如果我们继续在第一条流中输入数据[Alice, ./home, 3000],Process 任务的第一条流分 区水位线就会更新为 2999,同时将算子任务的时钟推进到 1999。

(第一条流数据再次到达)

8.2.2 连接(Connect)

流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少 出现。除了联合(union),Flink 还提供了另外一种方便的合流操作——连接(connect)。顾名 思义,这种操作就是直接把两条流像接线一样对接起来。

**1. 连接流(ConnectedStreams) **

为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个 DataStream 中的 数据只能有唯一的类型,所以连接得到的并不是 DataStream,而是一个“连接流” (ConnectedStreams)。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中; 事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的 DataStream, 还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型 的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是 “一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到 同一个 DataStream 中。

在代码实现上,需要分为两步:首先基于一条 DataStream 调用**.connect()方法,传入另外 一条 DataStream 作为参数,将两条流连接起来,得到一个 ConnectedStreams;然后再调用同处 理方法得到 DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法**。

  1. package com.atguigu.chapter08;
  2. import com.atguigu.chapter05.Event;
  3. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  5. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.streaming.api.functions.co.CoMapFunction;
  10. import java.time.Duration;
  11. public class ConnectTest {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment env =
  14. StreamExecutionEnvironment.getExecutionEnvironment();
  15. env.setParallelism(1);
  16. DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);
  17. DataStreamSource<Long> stream2 = env.fromElements(4L, 5L, 6L,7L);
  18. ConnectedStreams<Long,Integer> connectedStream = stream2.connect(stream1);
  19. connectedStream.map(new CoMapFunction<Long, Integer, String>() {
  20. @Override
  21. public String map1(Long aLong) throws Exception {
  22. return "Long:"+aLong.toString();
  23. }
  24. @Override
  25. public String map2(Integer integer) throws Exception {
  26. return "Integer:"+integer.toString();
  27. }
  28. })
  29. .print();
  30. env.execute();
  31. }
  32. }

上面的代码中,ConnectedStreams 有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用.map()方法时传入的不再是一个简单的 MapFunction, 而是一个** CoMapFunction**,表示分别对两条流中的数据执行 map 操作。这个接口有三个类型 参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常 直白:.map1()就是对第一条流中数据的 map 操作,.map2()则是针对第二条流。这里我们将一 条 Integer 流和一条 Long 流合并,转换成 String 输出。所以当遇到第一条流输入的整型值时, 调用.map1();而遇到第二条流输入的长整型数据时,调用.map2():最终都转换为字符串输出, 合并成了一条字符串流。

值得一提的是,ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的 还是一个 ConnectedStreams:

  1. connectedStreams.keyBy(keySelector1, keySelector2);

2. CoProcessFunction

对于连接流 ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口 中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调 用。我们把这种接口叫作“协同处理函数”(co-process function)。与 CoMapFunction 类似,如 果是调用.flatMap()就需要传入一个 CoFlatMapFunction,需要实现 flatMap1()、flatMap2()两个 方法;而调用.process()时,传入的则是一个 CoProcessFunction。

抽象类 CoProcessFunction 在源码中定义如下:

  1. public abstract class CoProcessFunction<IN1, IN2, OUT> extends
  2. AbstractRichFunction {
  3. ...
  4. public abstract void processElement1(IN1 value, Context ctx, Collector<OUT>
  5. out) throws Exception;
  6. public abstract void processElement2(IN2 value, Context ctx, Collector<OUT>
  7. out) throws Exception;
  8. public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
  9. public abstract class Context {...}
  10. ...
  11. }

下面是 CoProcessFunction 的一个具体示例:我们可以实现一个实时对账的需求,也就是

app 的支付操作和第三方的支付操作的一个双流 Join。App 的支付事件和第三方的支付事件将 会互相等待 5 秒钟,如果等不来对应的支付事件,那么就输出报警信息。程序如下:

  1. package com.atguigu.chapter08;
  2. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  3. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  4. import org.apache.flink.api.common.state.ValueState;
  5. import org.apache.flink.api.common.state.ValueStateDescriptor;
  6. import org.apache.flink.api.common.typeinfo.Types;
  7. import org.apache.flink.api.java.tuple.Tuple;
  8. import org.apache.flink.api.java.tuple.Tuple3;
  9. import org.apache.flink.api.java.tuple.Tuple4;
  10. import org.apache.flink.configuration.Configuration;
  11. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
  15. import org.apache.flink.util.Collector;
  16. import java.time.Duration;
  17. public class BillCheckExample {
  18. public static void main(String[] args) throws Exception {
  19. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20. env.setParallelism(1);
  21. //来自app的支付日志
  22. SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(
  23. Tuple3.of("order-1", "app", 1000L),
  24. Tuple3.of("order-2", "app", 2000L),
  25. Tuple3.of("order-3", "app", 3500L)
  26. ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
  27. .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
  28. @Override
  29. public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {
  30. return stringStringLongTuple3.f2;
  31. }
  32. }));
  33. //来自第三方支付平台的支付日志
  34. SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdPartStream = env.fromElements(
  35. Tuple4.of("order-1", "third-party", "success", 3000L),
  36. Tuple4.of("order-3", "third-party", "success", 4000L)
  37. ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
  38. .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
  39. @Override
  40. public long extractTimestamp(Tuple4<String, String, String, Long> stringStringStringLongTuple4, long l) {
  41. return stringStringStringLongTuple4.f3;
  42. }
  43. }));
  44. //检测统一支付单在两条流中是否匹配,不匹配就报警
  45. /*
  46. appStream.keyBy(data -> data.f0)
  47. .connect(thirdPartStream.keyBy(data -> data.f0));
  48. */
  49. appStream.connect(thirdPartStream)
  50. .keyBy(data -> data.f0,data -> data.f0)
  51. .process(new OrderMatchResult())
  52. .print();
  53. env.execute();
  54. }
  55. //自定义实现CoProcessFunction
  56. public static class OrderMatchResult extends CoProcessFunction<Tuple3<String,String,Long>,Tuple4<String,String,String,Long>,String>{
  57. //定义状态变量,用来保存已经到达的事件
  58. private ValueState<Tuple3<String, String, Long>> appEventState;
  59. private ValueState<Tuple4<String, String, String, Long>> thirdPartEventState;
  60. @Override
  61. public void open(Configuration parameters) throws Exception {
  62. appEventState =getRuntimeContext().getState(
  63. new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
  64. );
  65. thirdPartEventState=getRuntimeContext().getState(
  66. new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
  67. );
  68. }
  69. @Override
  70. public void processElement1(Tuple3<String, String, Long> stringStringLongTuple3, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context context, Collector<String> collector) throws Exception {
  71. //来的是app event,看另一条流中事件是否来过
  72. if(thirdPartEventState.value() !=null){ //来过
  73. collector.collect("对账成功"+stringStringLongTuple3+" "+thirdPartEventState.value());
  74. //清空状态
  75. thirdPartEventState.clear();
  76. }else {
  77. //更新状态
  78. appEventState.update(stringStringLongTuple3);
  79. //注册一个定时器,开始等待另一条流的事件
  80. context.timerService().registerEventTimeTimer(stringStringLongTuple3.f2+5000L);
  81. }
  82. }
  83. @Override
  84. public void processElement2(Tuple4<String, String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context context, Collector<String> out) throws Exception {
  85. if (appEventState.value() != null){
  86. out.collect("对账成功:" + appEventState.value() + " " + value);
  87. // 清空状态
  88. appEventState.clear();
  89. } else {
  90. // 更新状态
  91. thirdPartEventState.update(value);
  92. // 注册一个 5 秒后的定时器,开始等待另一条流的事件
  93. context.timerService().registerEventTimeTimer(value.f3 + 5000L);
  94. }
  95. }
  96. @Override
  97. public void onTimer(long timestamp, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
  98. //定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来
  99. if(appEventState.value()!=null){
  100. out.collect("对账失败"+appEventState.value()+" "+"第三方支付平台信息未到");
  101. }
  102. if(thirdPartEventState.value()!=null){
  103. out.collect("对账失败"+thirdPartEventState.value()+" "+"app信息未到");
  104. }
  105. appEventState.clear();
  106. thirdPartEventState.clear();
  107. }
  108. }
  109. }

在程序中,我们声明了两个状态变量分别用来保存 App 的支付信息和第三方的支付信息

App 的支付信息到达以后,会检查对应的第三方支付信息是否已经先到达(先到达会保存在对 应的状态变量中),如果已经到达了,那么对账成功,直接输出对账成功的信息,并将保存第 三方支付消息的状态变量清空。如果 App 对应的第三方支付信息没有到来,那么我们会注册 一个 5 秒钟之后的定时器,也就是说等待第三方支付事件 5 秒钟。当定时器触发时,检查保存app 支付信息的状态变量是否还在,如果还在,说明对应的第三方支付信息没有到来,所以输 出报警信息。

标签: flink 大数据

本文转载自: https://blog.csdn.net/JiaXingNashishua/article/details/126915629
版权归原作者 大数据阿嘉 所有, 如有侵权,请联系我们删除。

“Flink多流转换(一)”的评论:

还没有评论