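Series progress so far:
- Hadoop (complete)
- HDFS (complete)
- MapReduce (complete)
- Hive (complete)
- Flume (complete)
- Sqoop (complete)
- Zookeeper (complete)
- HBase (complete)
- Redis (complete)
- Kafka (complete)
- Spark (complete)
- Flink (in progress)
Chapter contents
The previous section covered:
- Core components of Flink CEP
- CEP use cases
- Advantages of CEP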

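Extracting timed-out events
When a pattern defines a detection window with the within keyword, partial event sequences that exceed the window length are discarded. To handle these timed-out partial matches, the select and flatSelect API calls let you specify a timeout handler.
FlinkCEP development workflow
- Convert the data from the DataSource into a DataStream.
- Define a Pattern, then combine the DataStream and the Pattern into a PatternStream.
- Convert the PatternStream back into a DataStream with operators such as select and process.
- Process the resulting DataStream further, then sink it to the target store.
The select method: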
SingleOutputStreamOperator<PayEvent> result = patternStream.select(
        orderTimeoutOutput,
        new PatternTimeoutFunction<PayEvent, PayEvent>() {
            @Override
            public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws Exception {
                // Timed-out partial match: only the "begin" event exists
                return map.get("begin").get(0);
            }
        },
        new PatternSelectFunction<PayEvent, PayEvent>() {
            @Override
            public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
                // Complete match: return the "pay" event
                return map.get("pay").get(0);
            }
        });
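The select function is applied to every detected pattern sequence: for each match, the supplied PatternSelectFunction is called, and it can produce exactly one result element.
The timeout function is applied to every timed-out partial pattern sequence: for each partial match, the supplied PatternTimeoutFunction is called, and it too can produce exactly one result element.
To obtain the stream of timed-out data, call getSideOutput with the same OutputTag on the SingleOutputStreamOperator returned by the select operation.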
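To make the routing concrete, here is a minimal plain-Java sketch (no Flink dependency) of what the two functions amount to: a sequence that completes inside the window goes to the main output, while a partial sequence that exceeds the window goes to a separate timed-out output. The class `TimeoutRoutingSketch`, its `route` method, the order IDs, and the strict `<` comparison against the window are all assumptions made for illustration; none of them are part of the FlinkCEP API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: mimics how complete matches and timed-out
// partial matches end up in two different outputs. This is not FlinkCEP code.
class TimeoutRoutingSketch {
    final List<String> mainOutput = new ArrayList<>();      // plays the role of the select result
    final List<String> timedOutOutput = new ArrayList<>();  // plays the role of the side output

    /**
     * @param beginTs      timestamp of the "begin" event
     * @param payTs        timestamp of the "pay" event, or null if it never arrived
     * @param withinMillis length of the detection window in milliseconds
     */
    void route(String orderId, long beginTs, Long payTs, long withinMillis) {
        boolean completedInTime = payTs != null && payTs - beginTs < withinMillis;
        if (completedInTime) {
            mainOutput.add(orderId);
        } else {
            timedOutOutput.add(orderId);
        }
    }

    public static void main(String[] args) {
        TimeoutRoutingSketch sketch = new TimeoutRoutingSketch();
        sketch.route("order-1", 0L, 3_000L, 5_000L);  // paid inside the window
        sketch.route("order-2", 0L, null, 5_000L);    // never paid
        sketch.route("order-3", 0L, 9_000L, 5_000L);  // paid after the window closed
        System.out.println("matched:   " + sketch.mainOutput);
        System.out.println("timed out: " + sketch.timedOutOutput);
    }
}
```

In this sketch only order-1 lands in the main output; the other two are treated as timed out, which mirrors the split between the select result and the side output described above.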
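Nondeterministic finite automaton (NFA)
At runtime, FlinkCEP converts the user's pattern logic into an NFA graph (an NFA object).
A finite state machine works by starting in an initial state and transitioning automatically from state to state according to each input it receives.
The state machine in the figure above checks whether a binary number contains an even number of 0s. As the figure shows, there are only two possible inputs: 1 and 0.
Starting from state S1, only the input 0 causes a transition to S2; likewise, in state S2 only the input 0 causes a transition back to S1. An input of 1 leaves the state unchanged. Once the whole binary input has been consumed, if the machine is in the accepting state, that is, it stopped in S1, the binary number contains an even number of 0s.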
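The even-zeros machine described above can be sketched in a few lines of plain Java. Note that this particular machine is deterministic (each state has exactly one successor per input), so it is a simplified illustration of state transitions rather than of the NFA that FlinkCEP builds. The names `EvenZeroMachine`, `step`, and `hasEvenZeros` are made up for this example.

```java
// Two-state machine from the text: S1 means an even number of 0s has been
// seen so far, S2 means an odd number. S1 is both start and accepting state.
class EvenZeroMachine {
    enum State { S1, S2 }

    // One transition: 0 flips the state, 1 keeps it.
    static State step(State current, char input) {
        if (input == '0') {
            return current == State.S1 ? State.S2 : State.S1;
        }
        if (input == '1') {
            return current;
        }
        throw new IllegalArgumentException("Input must be 0 or 1, got: " + input);
    }

    // Feeds every character through the machine and checks the final state.
    static boolean hasEvenZeros(String bits) {
        State state = State.S1;
        for (char c : bits.toCharArray()) {
            state = step(state, c);
        }
        return state == State.S1;
    }

    public static void main(String[] args) {
        System.out.println(hasEvenZeros("1001")); // two 0s  -> true
        System.out.println(hasEvenZeros("10"));   // one 0   -> false
    }
}
```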
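CEP development workflow
FlinkCEP development workflow:
- Convert the data from the DataSource into a DataStream, assign watermarks, and apply keyBy.
- Define a Pattern, then combine the DataStream and the Pattern into a PatternStream.
- Convert the PatternStream back into a DataStream with operators such as select and process.
- Process the resulting DataStream further, then sink it to the target store.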
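The watermark step in the workflow above is the least obvious one, so here is a hedged plain-Java sketch of the arithmetic behind a bounded-out-of-orderness watermark: track the largest event timestamp seen so far and report it minus a fixed tolerance. The class `BoundedWatermarkSketch` and its method names are hypothetical and do not depend on Flink; in a real job this logic lives inside a WatermarkGenerator.

```java
// Hypothetical, Flink-free illustration of a bounded-out-of-orderness watermark.
class BoundedWatermarkSketch {
    private final long maxOutOfOrderness; // tolerated lateness in milliseconds
    private long maxTimestamp;            // largest event timestamp seen so far

    BoundedWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low, but offset by the tolerance so that the subtraction in
        // currentWatermark() cannot underflow before the first event arrives.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    // Called once per event; older events never move the maximum backwards.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark trails the largest timestamp by the tolerance.
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(500L);
        wm.onEvent(1597905234000L);
        wm.onEvent(1597905233000L); // out-of-order event, does not lower the watermark
        System.out.println(wm.currentWatermark()); // 1597905233500
    }
}
```

The design point worth noting is the initial value: starting from the largest possible long would pin the watermark at the maximum and make every real event look late, while starting from the smallest possible long without the offset would overflow on the first subtraction.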
Add the dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
Case 1: Malicious login detection
Find accounts with consecutive login failures within 5 seconds.
The input data:
new CepLoginBean(1L, "fail", 1597905234000L),
new CepLoginBean(1L, "success", 1597905235000L),
new CepLoginBean(2L, "fail", 1597905236000L),
new CepLoginBean(2L, "fail", 1597905237000L),
new CepLoginBean(2L, "fail", 1597905238000L),
new CepLoginBean(3L, "fail", 1597905239000L),
new CepLoginBean(3L, "success", 1597905240000L)
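Overall approach
- Read the data.
- Assign watermarks on the data source.
- Group the watermarked stream by ID with keyBy.
- Define the Pattern.
- Apply the pattern to the data stream.
- Extract the successfully matched data.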
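Before wiring this up in Flink, it can help to see what the pattern should find. The following Flink-free sketch applies the rule "a fail immediately followed by another fail from the same user, less than 5 seconds apart" to the sample data. The class `ConsecutiveFailSketch`, the `Login` holder, and the output format are hypothetical; the sketch also assumes events arrive in timestamp order, which a real job cannot take for granted (that is what the watermark step is for).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java stand-in for keyBy + "fail next fail within 5s".
class ConsecutiveFailSketch {
    static final class Login {
        final long userId;
        final String operation;
        final long timestamp;

        Login(long userId, String operation, long timestamp) {
            this.userId = userId;
            this.operation = operation;
            this.timestamp = timestamp;
        }
    }

    // Returns one "userId:firstTs->secondTs" entry per matched pair.
    static List<String> findMatches(List<Login> events, long withinMillis) {
        Map<Long, Login> previousByUser = new HashMap<>(); // last event per user (the "keyBy")
        List<String> matches = new ArrayList<>();
        for (Login current : events) {
            Login previous = previousByUser.put(current.userId, current);
            if (previous != null
                    && "fail".equals(previous.operation)
                    && "fail".equals(current.operation)
                    && current.timestamp - previous.timestamp < withinMillis) {
                matches.add(current.userId + ":" + previous.timestamp + "->" + current.timestamp);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Login> data = Arrays.asList(
                new Login(1L, "fail", 1597905234000L),
                new Login(1L, "success", 1597905235000L),
                new Login(2L, "fail", 1597905236000L),
                new Login(2L, "fail", 1597905237000L),
                new Login(2L, "fail", 1597905238000L),
                new Login(3L, "fail", 1597905239000L),
                new Login(3L, "success", 1597905240000L));
        // Only user 2 has back-to-back failures, giving two overlapping pairs.
        findMatches(data, 5_000L).forEach(System.out::println);
    }
}
```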
The code
package icu.wzk;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class FlinkCepLoginTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // 1. Read the data
        DataStreamSource<CepLoginBean> data = env.fromElements(
                new CepLoginBean(1L, "fail", 1597905234000L),
                new CepLoginBean(1L, "success", 1597905235000L),
                new CepLoginBean(2L, "fail", 1597905236000L),
                new CepLoginBean(2L, "fail", 1597905237000L),
                new CepLoginBean(2L, "fail", 1597905238000L),
                new CepLoginBean(3L, "fail", 1597905239000L),
                new CepLoginBean(3L, "success", 1597905240000L)
        );

        // 2. Assign timestamps and watermarks
        SingleOutputStreamOperator<CepLoginBean> watermarks = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepLoginBean>() {
                    @Override
                    public WatermarkGenerator<CepLoginBean> createWatermarkGenerator(
                            WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepLoginBean>() {
                            long maxOutOfOrderness = 500L;
                            // Start from the smallest usable value (the original used
                            // Long.MAX_VALUE, which Math.max could never replace);
                            // the offset keeps the subtraction below from underflowing.
                            long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

                            @Override
                            public void onEvent(CepLoginBean event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));

        // 3. Group by user ID
        KeyedStream<CepLoginBean, Long> keyed = watermarks
                .keyBy(new KeySelector<CepLoginBean, Long>() {
                    @Override
                    public Long getKey(CepLoginBean value) throws Exception {
                        return value.getUserId();
                    }
                });

        // 4. Define the pattern: a "fail" immediately followed by another "fail" within 5 seconds
        Pattern<CepLoginBean, CepLoginBean> pattern = Pattern
                .<CepLoginBean>begin("start")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .next("next")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .within(Time.seconds(5));

        // 5. Apply the pattern to the keyed stream
        PatternStream<CepLoginBean> patternStream = CEP.pattern(keyed, pattern);

        // 6. Extract the matched data
        SingleOutputStreamOperator<CepLoginBean> process = patternStream
                .process(new PatternProcessFunction<CepLoginBean, CepLoginBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepLoginBean>> map, Context context,
                                             Collector<CepLoginBean> collector) throws Exception {
                        System.out.println("map: " + map);
                        List<CepLoginBean> start = map.get("start");
                        collector.collect(start.get(0));
                    }
                });

        process.print();
        env.execute("FlinkCepLoginTest");
    }
}

class CepLoginBean {

    private Long userId;
    private String operation;
    private Long timestamp;

    public CepLoginBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getOperation() {
        return operation;
    }

    public void setOperation(String operation) {
        this.operation = operation;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "CepLoginBean{" +
                "userId=" + userId +
                ", operation='" + operation + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
Results
The program prints:
map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}
map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905238000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}
Process finished with exit code 0
Screenshot of the run:
Copyright belongs to the original author, 武子康. If there is any infringement, please contact us and we will remove it.