Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
- 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
- 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
- 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
- 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
- 5、Flink 监控系列 本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
本文介绍了Flink 的类库CEP的模式检测,主要介绍数据的三种选取方式以及延迟数据的处理。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)
三、检测模式
1、将模式应用到流上
将模式应用到事件流上只要调用 CEP 类的静态方法.pattern()即可,将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个 PatternStream:
// 代码片段,完整内容可以参考本文中的其他完整示例StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent,?> loginEventPattern =Pattern.begin(...);// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream =CEP.pattern(loginEventDS, loginEventPattern);
输入流根据你的使用场景可以是keyed或者non-keyed。
在 non-keyed 流上使用模式将会使你的作业并发度被设为1。
这里的 DataStream,也可以通过 keyBy 进行按键分区得到 KeyedStream,接下来对复杂事件的检测就会针对不同的 key 单独进行了。
模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。
默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;
如果是处理时间语义,那么所谓先后就是数据到达的顺序。
对于时间戳相同或是同时到达的事件,我们还可以在 CEP.pattern()中传入一个比较器作为第三个参数,用来进行更精确的排序:
// 代码片段DataStream<Event> input =...;Pattern<Event,?> pattern =...;EventComparator<Event> comparator =...;// 可选的PatternStream<Event> patternStream =CEP.pattern(loginEventDS, loginEventPattern, comparator);
得到 PatternStream 后,接下来要做的就是对匹配事件的检测处理了。
2、从模式中选取
PatternStream 的转换操作分成两种:选择提取(select)操作和处理(process)操作。与 DataStream 的转换类似,在调用API 时传入一个函数类,即选择操作传入的是一个 PatternSelectFunction,处理操作传入PatternProcessFunction。
1)、匹配事件的选择提取(PatternSelectFunction)
处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。
PatternSelectFunction:代码中基于 PatternStream 直接调用.select()方法,传入一个 PatternSelectFunction 作为参数。
- 示例
staticvoidtest1()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs)-> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent,?> loginEventPattern =Pattern.<LoginEvent>begin("first").where(newSimpleCondition<LoginEvent>(){@Overridepublicbooleanfilter(LoginEvent value)throwsException{return value.getStatus().equals("F");}}).next("second").where(newSimpleCondition<LoginEvent>(){@Overridepublicbooleanfilter(LoginEvent value)throwsException{return value.getStatus().equals("F");}}).next("third").where(newSimpleCondition<LoginEvent>(){@Overridepublicbooleanfilter(LoginEvent value)throwsException{return value.getStatus().equals("F");}});// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream =CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出
patternStream
.select(newPatternSelectFunction<LoginEvent,String>(){@OverridepublicStringselect(Map<String,List<LoginEvent>> map)throwsException{return map.get("first").toString()+" \n"+ map.get("second").toString()+" \n"+ map.get("third").toString();}}).print("输出信息:\n");// 控制台输出:
env.execute();}
PatternSelectFunction 是 Flink CEP 提供的一个函数类接口,需要实现一个 select()方法,这个方法每当检测到一组匹配的复杂事件时都会调用一次。它会将检测到的匹配事件保存在一个 Map 里,对应的 key 就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在 Map 里的 value 就是一个事件的列表(List)。
如果个体模式是单例的,那么 List 中只有一个元素,直接调用.get(0)就可以把它取出。
如果个体模式是循环的,List 中就有可能有多个元素了。
可以将匹配到的事件包装成 String 类型输出,代码如下:
staticvoidtest2()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs)-> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent,?> loginEventPattern =Pattern.<LoginEvent>begin("pattern").where(newSimpleCondition<LoginEvent>(){@Overridepublicbooleanfilter(LoginEvent value)throwsException{return value.getStatus().equals("F");}}).times(3)// 匹配三次.consecutive();// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream =CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出
patternStream
.select(newPatternSelectFunction<LoginEvent,String>(){@OverridepublicStringselect(Map<String,List<LoginEvent>> map)throwsException{// list中放了一个匹配了3个事件的模式return map.get("pattern").get(0).toString()+" \n"+ map.get("pattern").get(1).toString()+" \n"+ map.get("pattern").get(2).toString();}}).print("输出信息:\n");// 控制台输出:
env.execute();}
2)、匹配事件的选择提取(PatternFlatSelectFunction)
要实现一个flatSelect()方法,与 select()的不同就在于没有返回值,b并且多了一个收集器(Collector)参数 out,通过调用 out.collet()方法就可以实现多次发送输出数据了。
staticvoidtest3()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs)-> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent,?> loginEventPattern =Pattern.<LoginEvent>begin("pattern").where(newSimpleCondition<LoginEvent>(){@Overridepublicbooleanfilter(LoginEvent value)throwsException{return value.getStatus().equals("F");}}).times(3)// 匹配三次.consecutive();// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream =CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出
patternStream
.flatSelect(newPatternFlatSelectFunction<LoginEvent,String>(){@OverridepublicvoidflatSelect(Map<String,List<LoginEvent>> map,Collector<String> out)throwsException{
out.collect(// list中放了一个匹配了3个事件的模式
map.get("pattern").get(0).toString()+" \n"+ map.get("pattern").get(1).toString()+" \n"+ map.get("pattern").get(2).toString());}}).print("输出信息:\n");// 控制台输出:
env.execute();}
3)、匹配事件的通用处理(PatternProcessFunction)
在获得到一个PatternStream之后,你可以应用各种转换来发现事件序列。
推荐使用PatternProcessFunction。
PatternProcessFunction有一个processMatch的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List>的格式接收一个匹配,映射的键是你的模式序列中的每个模式的名称,值是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()和times())时, 对一个模式会有不止一个事件被接受。
PatternProcessFunction 中必须实现一个 processMatch()方法;这个方法与之前的 flatSelect()类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time),还可以调用.output()方法将数据输出到侧输出流。在 CEP 中,侧输出流一般被用来处理超时事件。
- 官方示例
classMyPatternProcessFunction<IN, OUT>extendsPatternProcessFunction<IN, OUT>{@OverridepublicvoidprocessMatch(Map<String,List<IN>> match,Context ctx,Collector<OUT> out)throwsException;IN startEvent = match.get("start").get(0);IN endEvent = match.get("end").get(0);
out.collect(OUT(startEvent, endEvent));}}
- 完整示例
packageorg.cep;importjava.time.Duration;importjava.util.Arrays;importjava.util.List;importjava.util.Map;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.cep.CEP;importorg.apache.flink.cep.PatternFlatSelectFunction;importorg.apache.flink.cep.PatternSelectFunction;importorg.apache.flink.cep.PatternStream;importorg.apache.flink.cep.functions.PatternProcessFunction;importorg.apache.flink.cep.pattern.Pattern;importorg.apache.flink.cep.pattern.conditions.IterativeCondition;importorg.apache.flink.cep.pattern.conditions.SimpleCondition;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/publicclassTestCEPDemo{@Data@NoArgsConstructor@AllArgsConstructorstaticclassLoginEvent{privateInteger userId;privateString ip;privateString status;privateLong timestamp;@Overridepublicbooleanequals(Object obj){if(obj instanceofLoginEvent){LoginEvent loginEvent =(LoginEvent) obj;returnthis.userId == loginEvent.getUserId()&&this.ip.equals(loginEvent.ip)&&this.status.equals(loginEvent.getStatus())&&this.timestamp == loginEvent.getTimestamp();}else{returnfalse;}}@OverridepublicinthashCode(){returnsuper.hashCode()+Long.hashCode(timestamp);}}finalstaticList<LoginEvent> loginEventList =Arrays.asList(newLoginEvent(1001,"192.168.10.1","F",2L),newLoginEvent(1001,"192.168.10.2","F",3L),newLoginEvent(1002,"192.168.10.8","F",4L),newLoginEvent(1001,"192.168.10.6","F",5L),newLoginEvent(1002,"192.168.10.8","F",7L),newLoginEvent(1002,"192.168.10.8","F",8L),newLoginEvent(1002,"192.168.10.8","S",6L),newLoginEvent(1003,"192.168.10.8","F",6L),newLoginEvent(1004,"192.168.10.3","S",4L));staticvoidtestProcess()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs)-> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent,?> loginEventPattern =Pattern.begin(Pattern.<LoginEvent>begin("first").where(newSimpleCondition<LoginEvent>(){@Overridepublicbooleanfilter(LoginEvent value)throwsException{return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream =CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出
patternStream
.flatSelect(newPatternFlatSelectFunction<LoginEvent,String>(){@OverridepublicvoidflatSelect(Map<String,List<LoginEvent>> pattern,Collector<String> out)throwsException{
out.collect(pattern.get("first").toString());}}).print("flatSelect输出信息:\n");
patternStream.process(newPatternProcessFunction<LoginEvent,String>(){@OverridepublicvoidprocessMatch(Map<String,List<LoginEvent>> match,Context ctx,Collector<String> out)throwsException{
out.collect(match.get("first").toString());}}).print("process输出信息:\n");// 控制台输出:
env.execute();}publicstaticvoidmain(String[] args)throwsException{testProcess();}}
PatternProcessFunction可以访问Context对象。有了它之后,你可以访问时间属性,比如currentProcessingTime或者当前匹配的timestamp (最新分配到匹配上的事件的时间戳)。
3、处理超时的部分匹配
当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction里另外实现这个接口。 TimedOutPartialMatchHandler提供了另外的processTimedOutMatch方法,这个方法对每个超时的部分匹配都会调用。
- 官方示例
classMyPatternProcessFunction<IN, OUT>extendsPatternProcessFunction<IN, OUT>implementsTimedOutPartialMatchHandler<IN>{@OverridepublicvoidprocessMatch(Map<String,List<IN>> match,Context ctx,Collector<OUT> out)throwsException;...}@OverridepublicvoidprocessTimedOutMatch(Map<String,List<IN>> match,Context ctx)throwsException;IN startEvent = match.get("start").get(0);
ctx.output(outputTag,T(startEvent));}}
processTimedOutMatch不能访问主输出。 但你可以通过Context对象把结果输出到侧输出。
前面提到的PatternProcessFunction是在Flink 1.8之后引入的,从那之后推荐使用这个接口来处理匹配到的结果。 用户仍然可以使用像select/flatSelect这样旧格式的API,它们会在内部被转换为PatternProcessFunction。
PatternStream<Event> patternStream =CEP.pattern(input, pattern);OutputTag<String> outputTag =newOutputTag<String>("side-output"){};SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
outputTag,newPatternFlatTimeoutFunction<Event,TimeoutEvent>(){publicvoidtimeout(Map<String,List<Event>> pattern,long timeoutTimestamp,Collector<TimeoutEvent> out)throwsException{
out.collect(newTimeoutEvent());}},newPatternFlatSelectFunction<Event,ComplexEvent>(){publicvoidflatSelect(Map<String,List<IN>> pattern,Collector<OUT> out)throwsException{
out.collect(newComplexEvent());}});DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
1)、使用 PatternProcessFunction 的侧输出流
在 Flink CEP 中 , 提供了一个专门捕捉超时的部分匹配事件的接口TimedOutPartialMatchHandler。这个接口需要实现一个 processTimedOutMatch()方法,可以将超时的已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。
官方推荐做法
完整示例如下:
packageorg.cep;importjava.time.Duration;importjava.util.Arrays;importjava.util.List;importjava.util.Map;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.cep.CEP;importorg.apache.flink.cep.PatternFlatSelectFunction;importorg.apache.flink.cep.PatternSelectFunction;importorg.apache.flink.cep.PatternStream;importorg.apache.flink.cep.PatternTimeoutFunction;importorg.apache.flink.cep.functions.PatternProcessFunction;importorg.apache.flink.cep.functions.TimedOutPartialMatchHandler;importorg.apache.flink.cep.pattern.Pattern;importorg.apache.flink.cep.pattern.conditions.IterativeCondition;importorg.apache.flink.cep.pattern.conditions.SimpleCondition;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;importorg.apache.flink.util.OutputTag;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/publicclassTestCEPDemo{@Data@NoArgsConstructor@AllArgsConstructorstaticclassLoginEvent{privateInteger userId;privateString ip;privateString status;privateLong timestamp;@Overridepublicbooleanequals(Object obj){if(obj instanceofLoginEvent){LoginEvent loginEvent =(LoginEvent) obj;returnthis.userId == loginEvent.getUserId()&&this.ip.equals(loginEvent.ip)&&this.status.equals(loginEvent.getStatus())&&this.timestamp == loginEvent.getTimestamp();}else{returnfalse;}}@OverridepublicinthashCode(){returnsuper.hashCode()+Long.hashCode(timestamp);}}finalstaticList<LoginEvent> loginEventList =Arrays.asList(newLoginEvent(1001,"192.168.10.1","F",2L),newLoginEvent(1001,"192.168.10.2","F",3L),newLoginEvent(1002,"192.168.10.8","F",4L),newLoginEvent(1001,"192.168.10.6","F",5L),newLoginEvent(1002,"192.168.10.8","F",7L),newLoginEvent(1002,"192.168.10.8","F",8L),newLoginEvent(1002,"192.168.10.8","S",6L),newLoginEvent(1003,"192.168.10.8","F",6L),newLoginEvent(1005,"192.168.10.8","F",26L),newLoginEvent(1004,"192.168.10.3","S",4L));// 推荐做法staticvoidtestProcessTimedOut()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs)-> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent,?> loginEventPattern =Pattern.begin(Pattern.<LoginEvent>begin("first").where(newSimpleCondition<LoginEvent>(){@Overridepublicbooleanfilter(LoginEvent value)throwsException{return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream =CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出OutputTag<String> outputTag =newOutputTag<String>("alan_ProcessTimedOut",TypeInformation.of(String.class));DataStream<String> resultStream = patternStream.process(newAlanProcessTimedOut(outputTag));// 正常流输出
resultStream.print("输出信息:\n");// 超时流输出,通过OutputTag((SingleOutputStreamOperator<String>) resultStream).getSideOutput(outputTag).print("timeout输出信息:\n");// 控制台输出:
env.execute();}publicstaticvoidmain(String[] args)throwsException{testProcessTimedOut();}staticclassAlanProcessTimedOutextendsPatternProcessFunction<LoginEvent,String>implementsTimedOutPartialMatchHandler<LoginEvent>{privateOutputTag<String> outputTag;publicAlanProcessTimedOut(OutputTag<String> outputTag){this.outputTag = outputTag;}// 超时匹配处理@OverridepublicvoidprocessTimedOutMatch(Map<String,List<LoginEvent>> match,Context ctx)throwsException{// OutputTag<LoginEvent> outputTag = new OutputTag<LoginEvent>("AlanProcessTimedOut");
ctx.output(outputTag, match.get("first").toString());}// 正常匹配处理@OverridepublicvoidprocessMatch(Map<String,List<LoginEvent>> match,Context ctx,Collector<String> out)throwsException{
out.collect(match.get("first").toString());}}}
2)、使用 PatternTimeoutFunction的侧输出流
PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力,这是官方推荐的做法。
Flink CEP 中也保留了简化版的PatternSelectFunction,它无法直接处理超时事件,不过可以通过调用 PatternStream的.select()方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。
PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout()方法,同样会将部分匹配的事件放在一个 Map 中作为参数传入,此外还有一个参数是当前的时间戳。提取部分匹配事件进行处理转换后,可以将通知或报警信息输出。
在调用 PatternStream 的.select()方法时需要传入三个参数:
- 侧输出流标签(OutputTag)
- 超时事件处理函数 PatternTimeoutFunction
- 匹配事件提取函数PatternSelectFunction
importjava.time.Duration;importjava.util.Arrays;importjava.util.List;importjava.util.Map;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.cep.CEP;importorg.apache.flink.cep.PatternFlatSelectFunction;importorg.apache.flink.cep.PatternSelectFunction;importorg.apache.flink.cep.PatternStream;importorg.apache.flink.cep.PatternTimeoutFunction;importorg.apache.flink.cep.functions.PatternProcessFunction;importorg.apache.flink.cep.functions.TimedOutPartialMatchHandler;importorg.apache.flink.cep.pattern.Pattern;importorg.apache.flink.cep.pattern.conditions.IterativeCondition;importorg.apache.flink.cep.pattern.conditions.SimpleCondition;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;importorg.apache.flink.util.OutputTag;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/publicclassTestCEPDemo{@Data@NoArgsConstructor@AllArgsConstructorstaticclassLoginEvent{privateInteger userId;privateString ip;privateString status;privateLong timestamp;@Overridepublicbooleanequals(Object obj){if(obj instanceofLoginEvent){LoginEvent loginEvent =(LoginEvent) obj;returnthis.userId == loginEvent.getUserId()&&this.ip.equals(loginEvent.ip)&&this.status.equals(loginEvent.getStatus())&&this.timestamp == loginEvent.getTimestamp();}else{returnfalse;}}@OverridepublicinthashCode(){returnsuper.hashCode()+Long.hashCode(timestamp);}}finalstaticList<LoginEvent> loginEventList =Arrays.asList(newLoginEvent(1001,"192.168.10.1","F",2L),newLoginEvent(1001,"192.168.10.2","F",3L),newLoginEvent(1002,"192.168.10.8","F",4L),newLoginEvent(1001,"192.168.10.6","F",5L),newLoginEvent(1002,"192.168.10.8","F",7L),newLoginEvent(1002,"192.168.10.8","F",8L),newLoginEvent(1002,"192.168.10.8","S",6L),newLoginEvent(1003,"192.168.10.8","F",6L),newLoginEvent(1005,"192.168.10.8","F",26L),newLoginEvent(1004,"192.168.10.3","S",4L));staticvoidtestProcessTimedOut2()throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs)-> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent,?> loginEventPattern =Pattern.begin(Pattern.<LoginEvent>begin("first").where(newSimpleCondition<LoginEvent>(){@Overridepublicbooleanfilter(LoginEvent value)throwsException{return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream =CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出OutputTag<String> outputTag =newOutputTag<String>("alan_ProcessTimedOut",TypeInformation.of(String.class));SingleOutputStreamOperator<String> resultStream = patternStream.select(outputTag,newPatternTimeoutFunction<LoginEvent,String>(){// 处理超时流@OverridepublicStringtimeout(Map<String,List<LoginEvent>> pattern,long timeoutTimestamp)throwsException{return pattern.get("first").toString()+" timeoutTimestamp:"+ timeoutTimestamp;}},newPatternSelectFunction<LoginEvent,String>(){// 处理正常流@OverridepublicStringselect(Map<String,List<LoginEvent>> pattern)throwsException{return pattern.get("first").toString();}});// 正常流输出
resultStream.print("输出信息:\n");// 超时流输出,通过OutputTag
resultStream.getSideOutput(outputTag).print("timeout输出信息:\n");// 控制台输出:
env.execute();}publicstaticvoidmain(String[] args)throwsException{testProcessTimedOut2();}}
以上,本文介绍了Flink 的类库CEP的模式检测,主要介绍数据的三种选取方式以及延迟数据的处理。
本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)
版权归原作者 一瓢一瓢的饮 alanchanchn 所有, 如有侵权,请联系我们删除。