⭐简单说两句⭐
✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~作者:****小叮当撩代码,CSDN后端领域新星创作者 |阿里云专家博主
CSDN个人主页:小叮当撩代码
🔎GZH:
哆啦A梦撩代码
🎉欢迎关注🔎点赞👍收藏⭐️留言📝
文章目录
❤️时间语义
💕时间的分类
Flink中,时间通常分为三类
EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间
IngestionTime:摄入时间,是事件/数据到达流处理系统的时间
ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间
💛水位线Watermark
✅水位线
Flink的三种时间语义中,处理时间和摄入时间都可以不用设置Watermark。如果我们要使用事件时间Event Time语义,以下两项配置缺一不可:
- 使用一个时间戳为数据流中每个事件的Event Time赋值
- 生成Watermark
Event Time是每个事件的元数据,如果不设置,Flink并不知道每个事件的发生时间,我们必须要为每个事件的Event Time赋值一个时间戳。
有了Event Time时间戳,我们还必须生成Watermark。Watermark是Flink插入到数据流中的一种特殊的数据结构,它包含一个时间戳,并假设后续不会有小于该时间戳的数据。下图展示了一个乱序数据流,其中方框是单个事件,方框中的数字是其对应的Event Time时间戳,圆圈为Watermark,圆圈中的数字为Watermark对应的时间戳。
一个包含Watermark的乱序数据流
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)
Watermark 是一个单独计算出来的时间戳
Watermark可以通过改变窗口的触发时机 在 一定程度上解决数据乱序或延迟达到的问题
Watermark >= 窗口结束时间 时 就会触发窗口计算(窗口中得有数据)
延迟或乱序严重的数据还是丢失, 但是可以通过调大最大允许的延迟时间(乱序度) 来解决, 或 使用侧道输出流来单独收集延迟或乱序严重的数据,保证数据不丢失!
🍏分布式环境下水位线的传播
在多并行度下,每个并行有一个水印
比如并行度是6,那么程序中就有6个watermark
分别属于这6个并行度(线程)
那么,触发条件以6个水印中最小的那个为准
平时测试水位线强烈建议将并行度设为1
🍊代码实战
需求
实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
我们循序渐进先写一版没有Watermark的
代码清单
importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.source.SourceFunction;importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importjava.text.SimpleDateFormat;importjava.util.Random;importjava.util.UUID;/**
* @author tiancx
*/publicclassWatermarkDemo{@Data// set get toString@AllArgsConstructor@NoArgsConstructorpublicstaticclassOrderInfo{//格式化的时间privateString time;privateString orderId;privateint uid;privateint money;privatelong timeStamp;}publicstaticclassMySourceimplementsSourceFunction<OrderInfo>{boolean flag =true;@Overridepublicvoidrun(SourceFunction.SourceContext ctx)throwsException{// 源源不断的产生数据Random random =newRandom();while(flag){OrderInfo orderInfo =newOrderInfo();
orderInfo.setOrderId(UUID.randomUUID().toString());
orderInfo.setUid(random.nextInt(3));
orderInfo.setMoney(random.nextInt(101));
orderInfo.setTimeStamp(System.currentTimeMillis());long timeStamp = orderInfo.getTimeStamp();//转成yyyy-MM-dd HH:mm:ssString format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(timeStamp);
orderInfo.setTime(format);System.out.println("数据:"+ orderInfo);
ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥@Overridepublicvoidcancel(){
flag =false;}}publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//加载数据DataStreamSource<OrderInfo> source = env.addSource(newMySource());//keyby分组KeyedStream<OrderInfo,Integer> keyBy = source.keyBy(OrderInfo::getUid);//开窗计算(滚动窗口)SingleOutputStreamOperator<OrderInfo> sum = keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money");
sum.print();
env.execute();}}
我们再写一版有水位线的
代码清单
importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importorg.apache.commons.lang.time.DateFormatUtils;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.source.SourceFunction;importorg.apache.flink.streaming.api.functions.windowing.WindowFunction;importorg.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importorg.apache.flink.util.Collector;importjava.text.SimpleDateFormat;importjava.time.Duration;importjava.util.Random;importjava.util.UUID;/**
* @author tiancx
*/publicclassWatermarkDemo{@Data// set get toString@AllArgsConstructor@NoArgsConstructorpublicstaticclassOrderInfo{//格式化的时间privateString time;privateString orderId;privateint uid;privateint money;privatelong timeStamp;}publicstaticclassMySourceimplementsSourceFunction<OrderInfo>{boolean flag =true;@Overridepublicvoidrun(SourceFunction.SourceContext ctx)throwsException{// 源源不断的产生数据Random random =newRandom();while(flag){OrderInfo orderInfo =newOrderInfo();
orderInfo.setOrderId(UUID.randomUUID().toString());
orderInfo.setUid(random.nextInt(3));
orderInfo.setMoney(random.nextInt(101));
orderInfo.setTimeStamp(System.currentTimeMillis()-1000*2);long timeStamp = orderInfo.getTimeStamp();//转成yyyy-MM-dd HH:mm:ssString format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(timeStamp);
orderInfo.setTime(format);// System.out.println("数据:" + orderInfo);
ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥@Overridepublicvoidcancel(){
flag =false;}}publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);//加载数据DataStreamSource<OrderInfo> source = env.addSource(newMySource());// 在转换算子之前,加载数据之后,添加水印// 添加使用event以及watermark进行操作SingleOutputStreamOperator<OrderInfo> watermarks = source.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(newSerializableTimestampAssigner<OrderInfo>(){@OverridepubliclongextractTimestamp(OrderInfo element,long recordTimestamp){System.out.println("数据:"+ element +"系统时间:"+ recordTimestamp);return element.getTimeStamp();}}));//keyby分组KeyedStream<OrderInfo,Integer> keyBy = watermarks.keyBy(OrderInfo::getUid);//开窗计算(滚动窗口)SingleOutputStreamOperator<String> sum = keyBy.window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(newWindowFunction<OrderInfo,String,Integer,TimeWindow>(){@Overridepublicvoidapply(Integer key,TimeWindow window,Iterable<OrderInfo> input,Collector<String> out)throwsException{String startTime =DateFormatUtils.format(window.getStart(),"yyyy-MM-dd HH:mm:ss");String endTime =DateFormatUtils.format(window.getEnd(),"yyyy-MM-dd HH:mm:ss");String waterTime =DateFormatUtils.format(window.maxTimestamp(),"yyyy-MM-dd HH:mm:ss");int sumMoney =0;for(OrderInfo orderInfo : input){
sumMoney += orderInfo.getMoney();}
out.collect("uid="+ key +",starttime="+ startTime +",endTime="+ endTime +",totalMoney="+ sumMoney);}});
sum.print("窗口计算:");
env.execute();}
我们看下运行结果
🌽自定义水位线生成器
我们上面使用的是Flink帮我们内置的
我们还可以使用自定义水位线生成器
🌶️周期性水位线生成器(Periodic Generator)
假如我们想周期性地生成Watermark,这个周期是可以设置的,默认情况下是每200毫秒生成一个Watermark,或者说Flink每200毫秒调用一次生成Watermark的方法。我们可以在执行环境中设置这个周期:
env.getConfig.setAutoWatermarkInterval(1000L)
使用方式
DataStream<MyType> stream =...DataStream<MyType> withTimestampsAndWatermarks = stream
.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(...).withTimestampAssigner(...));
代码清单
importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importorg.apache.commons.lang.time.DateFormatUtils;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.eventtime.*;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.source.SourceFunction;importorg.apache.flink.streaming.api.functions.windowing.WindowFunction;importorg.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importorg.apache.flink.util.Collector;importjava.text.SimpleDateFormat;importjava.util.Random;importjava.util.UUID;/**
* @author tiancx
*/publicclassWatermarkDemo{@Data// set get toString@AllArgsConstructor@NoArgsConstructorpublicstaticclassOrderInfo{//格式化的时间privateString time;privateString orderId;privateint uid;privateint money;privatelong timeStamp;}publicstaticclassMySourceimplementsSourceFunction<OrderInfo>{boolean flag =true;@Overridepublicvoidrun(SourceFunction.SourceContext ctx)throwsException{// 源源不断的产生数据Random random =newRandom();while(flag){OrderInfo orderInfo =newOrderInfo();
orderInfo.setOrderId(UUID.randomUUID().toString());
orderInfo.setUid(random.nextInt(3));
orderInfo.setMoney(random.nextInt(101));
orderInfo.setTimeStamp(System.currentTimeMillis()-1000*2);long timeStamp = orderInfo.getTimeStamp();//转成yyyy-MM-dd HH:mm:ssString format =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(timeStamp);
orderInfo.setTime(format);// System.out.println("数据:" + orderInfo);
ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥@Overridepublicvoidcancel(){
flag =false;}}publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);//加载数据DataStreamSource<OrderInfo> source = env.addSource(newMySource());// 在转换算子之前,加载数据之后,添加水印// 添加使用event以及watermark进行操作SingleOutputStreamOperator<OrderInfo> watermarks = source.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(x ->newMyPeriodicGenerator()).withTimestampAssigner(newSerializableTimestampAssigner<OrderInfo>(){@OverridepubliclongextractTimestamp(OrderInfo element,long recordTimestamp){System.out.println("数据:"+ element +"系统时间:"+ recordTimestamp);return element.getTimeStamp();}}));//keyby分组KeyedStream<OrderInfo,Integer> keyBy = watermarks.keyBy(OrderInfo::getUid);//开窗计算(滚动窗口)SingleOutputStreamOperator<String> sum = keyBy.window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(newWindowFunction<OrderInfo,String,Integer,TimeWindow>(){@Overridepublicvoidapply(Integer key,TimeWindow window,Iterable<OrderInfo> input,Collector<String> out)throwsException{String startTime =DateFormatUtils.format(window.getStart(),"yyyy-MM-dd HH:mm:ss");String endTime =DateFormatUtils.format(window.getEnd(),"yyyy-MM-dd HH:mm:ss");String waterTime =DateFormatUtils.format(window.maxTimestamp(),"yyyy-MM-dd HH:mm:ss");int sumMoney =0;for(OrderInfo orderInfo : input){
sumMoney += orderInfo.getMoney();}
out.collect("uid="+ key +",starttime="+ startTime +",endTime="+ endTime +",totalMoney="+ sumMoney);}});
sum.print("窗口计算:");
env.execute();}publicstaticclassMyPeriodicGeneratorimplementsWatermarkGenerator<OrderInfo>{privatelong maxOutOfOrderness =3000;// 3 secondsprivatelong currentMaxTimestamp;@OverridepublicvoidonEvent(OrderInfo event,long eventTimestamp,WatermarkOutput output){// 更新currentMaxTimestamp为当前遇到的最大值
currentMaxTimestamp =Math.max(currentMaxTimestamp, eventTimestamp);}@OverridepublicvoidonPeriodicEmit(WatermarkOutput output){// Watermark比currentMaxTimestamp最大值慢3秒
output.emitWatermark(newWatermark(currentMaxTimestamp - maxOutOfOrderness));}}}
🫑断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的事件时,就立
即发出水位线。我们把发射水位线的逻辑写在 onEvent 方法当中即可。
🧃迟到数据处理
waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,对于延迟的数据Flink也有自己的解决办法:
主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据
设置允许延迟的时间是通过allowedLateness(lateness: Time)设置
保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存
获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取
🫖设置窗口延迟关闭
Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。
以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
【Tips】: 延迟关闭只能用到event time上
☕️使用侧流接收迟到的数据
侧输出机制:可以将错过水印又错过allowedLateness允许的时间的数据,单独的存放到一个DataStream中,然后开发人员可以自定逻辑对这些超级迟到数据进行处理。
处理主要使用两个方式:
对窗口对象调用sideOutputLateData(OutputTag outputTag)方法,将数据存储到一个地方
对DataStream对象调用getSideOutput(OutputTag outputTag)方法,取出这些被单独处理的数据的DataStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)
【都看到这了,点点赞点点关注呗,爱你们】😚😚
💬
✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~
作者:****小叮当撩代码,CSDN后端领域新星创作者 |阿里云专家博主
CSDN个人主页:小叮当撩代码
🔎GZH:
哆啦A梦撩代码
🎉欢迎关注🔎点赞👍收藏⭐️留言📝
版权归原作者 小叮当撩编程 所有, 如有侵权,请联系我们删除。