

59. Flink CEP - Introduction to Flink's complex event processing with examples (4) - Late data handling and three practical examples

Flink Series Articles

I. The Flink column

The Flink column introduces individual topics systematically, each accompanied by concrete examples.

  • 1. Flink Deployment series: covers the basics of deploying and configuring Flink.
  • 2. Flink Fundamentals series: covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
  • 3. Flink Table API and SQL Basics series: covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
  • 4. Flink Table API and SQL Advanced and Applications series: the application-oriented part of the Table API and SQL, more closely tied to real production use and covering more demanding development topics.
  • 5. Flink Monitoring series: covers topics related to day-to-day operations and monitoring.

II. The Flink Examples column

The Flink Examples column complements the Flink column: it generally does not explain concepts, but instead provides concrete, ready-to-use examples. This column is not further divided into sub-directories; each link title indicates what the article covers.

For the entry point to all articles in both columns, see: the Flink series article index.


This article introduces time handling in Flink's CEP library, its optional configuration parameters, and three examples that are very useful in real work.

If you need more background, you can find more systematic and up-to-date material in my Flink column.

Apart from the Maven dependencies, this article requires no other dependencies.

This topic is split into the following parts:
59. Flink CEP - Introduction to Flink's complex event processing with examples (1) - Getting started
59. Flink CEP - Introduction to Flink's complex event processing with examples (2) - The Pattern API
59. Flink CEP - Introduction to Flink's complex event processing with examples (3) - Pattern selection and timeout handling
59. Flink CEP - Introduction to Flink's complex event processing with examples (4) - Late data handling and three practical examples
59. Flink CEP - Introduction to Flink's complex event processing with examples (complete edition)

IV. Time in the CEP library

1. Handling late events by event time

In CEP, the order in which events are processed matters. When working with event time, to guarantee that events are processed in the correct order, an incoming event is first placed in a buffer, where events are kept sorted by ascending timestamp. When a watermark arrives, all buffered events with timestamps smaller than the watermark are processed. This means that the data between two watermarks is processed in timestamp order.

To guarantee that events spanning watermarks are handled in event-time order, Flink CEP assumes that the watermarks are correct and treats any event whose timestamp is smaller than the latest watermark as late. Late events are not processed further. You can, however, register a side-output tag to collect events that arrive after the latest watermark, like this:

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream
    .sideOutputLateData(lateDataOutputTag)
    .select(new PatternSelectFunction<Event, ComplexEvent>() {...});

DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);

2. Time context

Inside a PatternProcessFunction, just as inside an IterativeCondition, the user can access a context that implements TimeContext, shown below:

/**
 * Enables access to time-related characteristics such as the current processing time or the
 * timestamp of the event currently being processed.
 * Used in {@link PatternProcessFunction} and {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}.
 */
@PublicEvolving
public interface TimeContext {

    /**
     * Timestamp of the event currently being processed.
     *
     * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}, this
     * will be set to the time the event entered the CEP operator.
     */
    long timestamp();

    /** Returns the current processing time. */
    long currentProcessingTime();
}

This context gives the user access to the time characteristics of the processed events (the incoming record in the case of IterativeCondition, and the match result in the case of PatternProcessFunction). Calling TimeContext#currentProcessingTime always returns the current processing time, and it should be preferred over calls such as System.currentTimeMillis().

When using EventTime, TimeContext#timestamp() returns the value of the assigned timestamp. When using ProcessingTime, it equals the point in time at which the event entered the CEP operator (or, in a PatternProcessFunction, the time at which the match was produced). This means that the value stays consistent across multiple calls to the method.
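As a minimal sketch (not from the original post) of how this context is used in practice, the snippet below reads the TimeContext from inside an IterativeCondition, whose Context implements TimeContext. The Event type and its getTimestamp() getter are placeholder assumptions:

// Sketch only: "Event" and its getTimestamp() getter are hypothetical placeholders.
Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
        .next("second")
        .where(new IterativeCondition<Event>() {
            @Override
            public boolean filter(Event value, Context<Event> ctx) throws Exception {
                // ctx implements TimeContext: timestamp() is the event-time timestamp of the
                // record being evaluated (under processing time, its arrival at the CEP operator);
                // currentProcessingTime() should be preferred over System.currentTimeMillis().
                long currentTs = ctx.timestamp();
                // accept "second" only if it occurs within 5 seconds of every matched "first" event
                for (Event first : ctx.getEventsForPattern("first")) {
                    if (currentTs - first.getTimestamp() > 5000L) {
                        return false;
                    }
                }
                return true;
            }
        });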

V. Optional parameter settings

These options configure the capacity of Flink CEP's SharedBuffer cache. They can speed up the CEP operator and bound the number of elements cached in memory.

Limiting memory usage this way is only effective when state.backend.type is set to rocksdb: elements exceeding the cache capacity are then spilled to the RocksDB state store instead of being kept in in-memory state, so these options help bound memory. By contrast, when state.backend is set to a non-RocksDB backend, the cache degrades performance: compared with the previous Map-based cache, the state part holds more elements evicted from the guava-cache, which adds some overhead to state handling under copy-on-write.

[Figure: table of the CEP SharedBuffer cache configuration options]
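As a minimal sketch of how such cache options might be set when building the job: the option keys below are taken from the Flink 1.17 documentation for this feature, are not part of the original post, and should be verified against the Flink version actually in use.

// Sketch only: option keys assumed from the Flink 1.17 documentation, verify before use.
Configuration conf = new Configuration();
conf.setString("pipeline.cep.sharedbuffer.entry-cache-slots", "2048");
conf.setString("pipeline.cep.sharedbuffer.event-cache-slots", "2048");
// Bounding memory this way is only effective with the RocksDB state backend.
conf.setString("state.backend.type", "rocksdb");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);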

VI. Examples

This section demonstrates how to use CEP through several examples. Some are simplified versions of real cases from work, others were constructed purely to demonstrate CEP features. The output of each example is included as comments in its code.

1. Maven dependencies

All examples in this article use the following dependencies.

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep</artifactId>
        <version>1.17.2</version>
    </dependency>
</dependencies>

2. Example: output each user's three consecutive failed logins, allowing data to arrive up to 10 s late

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Output each user's three consecutive failed logins, allowing data to arrive up to 10 s late
 */
public class TestLoginFailDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof LoginEvent) {
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId()
                        && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus())
                        && this.timestamp == loginEvent.getTimestamp();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assign timestamps/watermarks (allowing 10 s of out-of-orderness) and key the stream by user id
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // Define the pattern: three consecutive failed login events
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return value.getStatus().equals("F");
                    }
                })
                .next("second")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return value.getStatus().equals("F");
                    }
                })
                .next("third")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return value.getStatus().equals("F");
                    }
                });

        // Apply the pattern to the stream to detect matching complex events, producing a PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // Select the matched events and print them
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
                        // Individual patterns are singletons, so each List contains exactly one element
                        LoginEvent first = map.get("first").get(0);
                        LoginEvent second = map.get("second").get(0);
                        LoginEvent third = map.get("third").get(0);
                        // return first.toString() + "\n" + second.toString() + "\n" + third.toString();
                        return map.get("first").toString() + " \n"
                                + map.get("second").toString() + " \n"
                                + map.get("third").toString();
                    }
                })
                .print("Users with three consecutive failed logins:\n");
        // Users with three consecutive failed logins:
        // :9> TestLoginFailDemo.LoginEvent(userId=1001, ip=192.168.10.1, status=F, timestamp=2)
        // TestLoginFailDemo.LoginEvent(userId=1001, ip=192.168.10.2, status=F, timestamp=3)
        // TestLoginFailDemo.LoginEvent(userId=1001, ip=192.168.10.6, status=F, timestamp=5)

        env.execute();
    }
}
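A note on this pattern: next() requires strict contiguity, so a match needs three failure events with no other event from the same user in between. The 10-second bounded out-of-orderness watermark lets CEP sort the buffered events by timestamp, so the success at timestamp 6 (which arrives last in the collection) is placed between user 1002's failures at timestamps 4 and 7. As a result only user 1001, with failures at timestamps 2, 3 and 5, produces a match.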

3. Example: find IPs that access the same URL three times or more within 10 seconds (the accesses need not be consecutive)

This can be used to monitor whether an interface is being attacked, which should have fairly broad applicability.

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Find IPs that access the same URL three times or more within 10 seconds; the accesses need not be consecutive
 */
public class TestRepeatAccessDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LogMessage {
        private String ip;
        private String url;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof LogMessage) {
                LogMessage logMessage = (LogMessage) obj;
                return this.ip.equals(logMessage.getIp())
                        && this.url.equals(logMessage.getUrl())
                        && this.timestamp == logMessage.getTimestamp();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LogMessage> logMessageList = Arrays.asList(
            new LogMessage("192.168.10.1", "URL1", 2L),
            new LogMessage("192.168.10.1", "URL1", 3L),
            new LogMessage("192.168.10.1", "URL2", 4L),
            new LogMessage("192.168.10.1", "URL2", 5L),
            new LogMessage("192.168.10.8", "URL1", 6L),
            new LogMessage("192.168.10.1", "URL1", 7L));

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class RiskLogList extends LogMessage {
        private int count;
    }

    static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assign timestamps/watermarks and key the stream by ip + url
        DataStream<LogMessage> logMessageDS = env.fromCollection(logMessageList)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<LogMessage>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((logMessage, rs) -> logMessage.getTimestamp()))
                .keyBy(logMessage -> logMessage.getIp() + logMessage.getUrl());

        // Define the pattern: one event followed by two more within 10 seconds
        Pattern<LogMessage, ?> logMessagePattern = Pattern.<LogMessage>begin("first")
                .followedBy("second").times(2)
                .within(Time.seconds(10));

        // Apply the pattern to the stream to detect matching complex events, producing a PatternStream
        PatternStream<LogMessage> patternStream = CEP.pattern(logMessageDS, logMessagePattern);

        // Select the matched events and print them
        patternStream.process(new PatternProcessFunction<LogMessage, String>() {
            @Override
            public void processMatch(Map<String, List<LogMessage>> match, Context ctx, Collector<String> out)
                    throws Exception {
                LogMessage logMessage1 = match.get("first").get(0);
                LogMessage logMessage2 = match.get("second").get(0);
                LogMessage logMessage3 = match.get("second").get(1);
                boolean flag = logMessage1.getUrl().equals(logMessage2.getUrl())
                        && logMessage1.getUrl().equals(logMessage3.getUrl());
                if (flag) {
                    out.collect(logMessage1.getIp() + "  url:" + logMessage1.getUrl()
                            + "   timestamp:" + logMessage1.getTimestamp()
                            + "  timestamp2:" + logMessage2.getTimestamp()
                            + "  timestamp3:" + logMessage3.getTimestamp());
                }
            }
        }).print("Output:\n");
        // Console output:
        // Output::1> 192.168.10.1 url:URL1 timestamp:2 timestamp2:3 timestamp3:7

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }
}
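A note on the design: begin("first").followedBy("second").times(2) simply collects three events per key within 10 seconds, and followedBy (relaxed contiguity) allows unrelated events in between, which is why 192.168.10.1 / URL1 is reported at timestamps 2, 3 and 7 even though accesses to URL2 happened in between. The URL comparison inside processMatch is a defensive check; since the stream is already keyed by ip + url, all events of one key share the same ip and url.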

4. Example: monitor server temperatures and raise alerts

Monitor machines whose temperature exceeds the configured threshold three times and record each occurrence as a risk event; when, within one hour, a risk record's average temperature is not lower than the previous one's (i.e. the average is trending upward), raise an alert.

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Record a risk event when a machine's temperature exceeds the configured threshold three times,
 *               and raise an alert when a risk record's average temperature is not lower than the previous one
 */
public class TestMachineMonitoring {

    // Basic machine information
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class MechineInfo {
        private int mechineId;
        private String mechineName;
        private int temperature;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof MechineInfo) {
                MechineInfo mechineInfo = (MechineInfo) obj;
                return this.mechineId == mechineInfo.getMechineId()
                        && this.mechineName.equals(mechineInfo.getMechineName())
                        && this.timestamp == mechineInfo.getTimestamp()
                        && this.temperature == mechineInfo.getTemperature();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    // Average temperature over three readings of a machine
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class MechineRiskInfo {
        private int mechineId;
        private double avgTemperature;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof MechineRiskInfo) {
                MechineRiskInfo mechineRiskInfo = (MechineRiskInfo) obj;
                return this.mechineId == mechineRiskInfo.getMechineId()
                        && this.avgTemperature == mechineRiskInfo.getAvgTemperature()
                        && this.timestamp == mechineRiskInfo.getTimestamp();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    // Alert notification information
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class MechineAlertInfo {
        private int mechineId;
        private String email;
        private double avgTemperature;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof MechineAlertInfo) {
                MechineAlertInfo mechineAlertInfo = (MechineAlertInfo) obj;
                return this.mechineId == mechineAlertInfo.getMechineId()
                        && this.email == mechineAlertInfo.getEmail()
                        && this.avgTemperature == mechineAlertInfo.getAvgTemperature()
                        && this.timestamp == mechineAlertInfo.getTimestamp();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    // Initial stream data
    static List<MechineInfo> mechineInfoList = Arrays.asList(
            new MechineInfo(1, "m1", 331, 2L),
            new MechineInfo(1, "m1", 321, 4L),
            new MechineInfo(1, "m1", 311, 5L),
            new MechineInfo(1, "m1", 361, 7L),
            new MechineInfo(1, "m1", 351, 9L),
            new MechineInfo(1, "m1", 341, 11L),
            new MechineInfo(2, "m11", 121, 3L),
            new MechineInfo(3, "m21", 101, 4L),
            new MechineInfo(4, "m31", 98, 5L),
            new MechineInfo(5, "m41", 123, 6L));

    // Risk data collection
    // static List<MechineRiskInfo> mechineRiskInfoList = new ArrayList();
    // Alert data collection
    // static Map<String, MechineAlertInfo> mechineAlertInfoMap = new HashMap<String, MechineAlertInfo>();

    // Alert threshold temperature
    private static final double TEMPERATURE_SETTING = 100;

    static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assign timestamps/watermarks and key the stream by machine id
        DataStream<MechineInfo> mechineInfoStream = env.fromCollection(mechineInfoList)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<MechineInfo>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((logMessage, rs) -> logMessage.getTimestamp()))
                .keyBy(mechineInfo -> mechineInfo.getMechineId());

        // Pattern 1: match readings whose temperature is at or above the configured threshold
        Pattern<MechineInfo, ?> mechineInfoPattern = Pattern.<MechineInfo>begin("first")
                .where(new SimpleCondition<MechineInfo>() {
                    @Override
                    public boolean filter(MechineInfo value) throws Exception {
                        return value.getTemperature() >= TEMPERATURE_SETTING;
                    }
                })
                .followedBy("second")
                .where(new SimpleCondition<MechineInfo>() {
                    @Override
                    public boolean filter(MechineInfo value) throws Exception {
                        return value.getTemperature() >= TEMPERATURE_SETTING;
                    }
                }).times(2)
                .within(Time.minutes(10));

        PatternStream<MechineInfo> patternStream = CEP.pattern(mechineInfoStream, mechineInfoPattern);

        // Select matches and compute the average of the three temperatures
        DataStream<MechineRiskInfo> mechineRiskInfoStream = patternStream
                .process(new PatternProcessFunction<TestMachineMonitoring.MechineInfo, MechineRiskInfo>() {
                    @Override
                    public void processMatch(Map<String, List<MechineInfo>> match, Context ctx,
                            Collector<MechineRiskInfo> out) throws Exception {
                        MechineInfo firstMechineInfo = match.get("first").get(0);
                        MechineInfo secondMechineInfo1 = match.get("second").get(0);
                        MechineInfo secondMechineInfo2 = match.get("second").get(1);
                        // System.out.printf("mechineInfo:id=%s,name=%s,t=%s,ts=%s",
                        //         firstMechineInfo.getMechineId(),
                        //         firstMechineInfo.getMechineName(), firstMechineInfo.getTemperature(),
                        //         firstMechineInfo.getTimestamp() + "\n");
                        // System.out.printf("secondMechineInfo1:id=%s,name=%s,t=%s,ts=%s",
                        //         secondMechineInfo1.getMechineId(),
                        //         secondMechineInfo1.getMechineName(), secondMechineInfo1.getTemperature(),
                        //         secondMechineInfo1.getTimestamp() + "\n");
                        // System.out.printf("secondMechineInfo2:id=%s,name=%s,t=%s,ts=%s",
                        //         secondMechineInfo2.getMechineId(),
                        //         secondMechineInfo2.getMechineName(), secondMechineInfo2.getTemperature(),
                        //         secondMechineInfo2.getTimestamp() + "\n");

                        out.collect(new MechineRiskInfo(
                                firstMechineInfo.getMechineId(),
                                (firstMechineInfo.getTemperature() + secondMechineInfo1.getTemperature()
                                        + secondMechineInfo2.getTemperature()) / 3,
                                ctx.timestamp()));
                    }
                })
                .keyBy(mechineRiskInfo -> mechineRiskInfo.getMechineId());

        mechineRiskInfoStream.print("mechineRiskInfoStream:");

        // Pattern 2: compare two consecutive risk records; if the trend is rising, raise an alert and set the contact
        Pattern<MechineRiskInfo, ?> mechineRiskInfoPattern = Pattern.<MechineRiskInfo>begin("step1")
                .next("step2")
                .within(Time.hours(1));

        PatternStream<MechineRiskInfo> patternStream2 = CEP.pattern(mechineRiskInfoStream, mechineRiskInfoPattern);

        // Select the alert information and set the notification email
        DataStream<MechineAlertInfo> mechineAlertInfoList = patternStream2
                .process(new PatternProcessFunction<TestMachineMonitoring.MechineRiskInfo, MechineAlertInfo>() {
                    @Override
                    public void processMatch(Map<String, List<MechineRiskInfo>> match, Context ctx,
                            Collector<MechineAlertInfo> out) throws Exception {
                        MechineRiskInfo mechineRiskInfo1 = match.get("step1").get(0);
                        MechineRiskInfo mechineRiskInfo2 = match.get("step2").get(0);
                        MechineAlertInfo mechineAlertInfo = null;
                        if (mechineRiskInfo1.getAvgTemperature() <= mechineRiskInfo2.getAvgTemperature()) {
                            mechineAlertInfo = new MechineAlertInfo(
                                    mechineRiskInfo1.getMechineId(),
                                    "[email protected]",
                                    mechineRiskInfo2.getAvgTemperature(),
                                    ctx.currentProcessingTime());

                            out.collect(mechineAlertInfo);
                        }
                    }
                });

        mechineAlertInfoList.print("mechineAlertInfoList:");
        // mechineAlertInfoList::11> TestMachineMonitoring.MechineAlertInfo(mechineId=1, [email protected], avgTemperature=331.0, timestamp=1705366481553)
        // mechineAlertInfoList::11> TestMachineMonitoring.MechineAlertInfo(mechineId=1, [email protected], avgTemperature=341.0, timestamp=1705366481566)
        // mechineAlertInfoList::11> TestMachineMonitoring.MechineAlertInfo(mechineId=1, [email protected], avgTemperature=351.0, timestamp=1705366481567)

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }
}
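Two details are worth noting here. The first PatternProcessFunction stamps each MechineRiskInfo with ctx.timestamp(), the event time of the match, which is what allows the second pattern with within(Time.hours(1)) to be applied to the keyed risk stream; the alert records, by contrast, use ctx.currentProcessingTime(), which is why the printed timestamps are wall-clock values such as 1705366481553. Also, the three-reading average is computed with integer division before being stored in a double field, so any fractional part would be dropped.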

To recap, this article introduced time handling in Flink's CEP library, its optional configuration parameters, and three examples that are very useful in real work.

This topic is split into the following parts:
59. Flink CEP - Introduction to Flink's complex event processing with examples (1) - Getting started
59. Flink CEP - Introduction to Flink's complex event processing with examples (2) - The Pattern API
59. Flink CEP - Introduction to Flink's complex event processing with examples (3) - Pattern selection and timeout handling
59. Flink CEP - Introduction to Flink's complex event processing with examples (4) - Late data handling and three practical examples
59. Flink CEP - Introduction to Flink's complex event processing with examples (complete edition)

Tags: flink, big data, kafka

This article is reposted from: https://blog.csdn.net/chenwewi520feng/article/details/135615696
Copyright belongs to the original author 一瓢一瓢的饮 alanchanchn. In case of infringement, please contact us for removal.
