

Big Data - 130 - Flink CEP in Detail - CEP Development Workflow and Case Study: Implementing Malicious Login Detection

Hit the follow button!!! Thank you very much!! Updates are ongoing!!!

Topics covered so far:

  • Hadoop (complete)
  • HDFS (complete)
  • MapReduce (complete)
  • Hive (complete)
  • Flume (complete)
  • Sqoop (complete)
  • Zookeeper (complete)
  • HBase (complete)
  • Redis (complete)
  • Kafka (complete)
  • Spark (complete)
  • Flink (in progress!)

Section Overview

In the previous section we covered:

  • Flink CEP core components
  • CEP application scenarios
  • Advantages of CEP


Extracting Timeout Events

When a pattern defines a detection window with the within keyword, partial event sequences may be discarded because they exceed the window length. To handle these timed-out partial matches, the select and flatSelect API calls allow you to specify a timeout handler.
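The idea can be sketched outside Flink: a partial match whose window has expired is routed to a timeout branch instead of being silently dropped. The following is a plain-Java illustration, not the Flink API; all names here are illustrative.

```java
import java.util.*;

// Plain-Java sketch of timeout routing: a partial match that has not
// completed before (startTs + windowMs) when the watermark arrives is
// handed to the "timedOut" branch instead of being dropped.
class TimeoutRouting {
    public static Map<String, List<Long>> route(List<Long> partialStartTimes,
                                                long watermark, long windowMs) {
        Map<String, List<Long>> out = new HashMap<>();
        out.put("live", new ArrayList<>());      // still inside the window
        out.put("timedOut", new ArrayList<>());  // goes to the timeout handler
        for (long startTs : partialStartTimes) {
            if (watermark > startTs + windowMs) {
                out.get("timedOut").add(startTs);
            } else {
                out.get("live").add(startTs);
            }
        }
        return out;
    }
}
```

In Flink itself this routing is done internally by the NFA; the timeout handler you pass to select/flatSelect plays the role of the "timedOut" branch.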

FlinkCEP Development Workflow

  • Convert the data in the DataSource into a DataStream.
  • Define a Pattern, and combine the DataStream with the Pattern into a PatternStream.
  • Transform the PatternStream into a DataStream via operators such as select and process.
  • Process the resulting DataStream further and sink it to the target store.

The select method:

SingleOutputStreamOperator<PayEvent> result = patternStream.select(
        orderTimeoutOutput,
        new PatternTimeoutFunction<PayEvent, PayEvent>() {
            @Override
            public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws Exception {
                return map.get("begin").get(0);
            }
        },
        new PatternSelectFunction<PayEvent, PayEvent>() {
            @Override
            public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
                return map.get("pay").get(0);
            }
        });

This applies a select function to the detected pattern sequences: for each matched sequence, the provided PatternSelectFunction is invoked, and it may produce exactly one result element.
It also applies a timeout function to timed-out partial pattern sequences: for each partial sequence, the provided PatternTimeoutFunction is invoked, and it likewise may produce exactly one result element.
You can obtain the timed-out data stream as a side output of the SingleOutputStreamOperator returned by select, using the same OutputTag that was passed to the select call.

Nondeterministic Finite Automaton (NFA)

At runtime, FlinkCEP compiles the user's pattern logic into such an NFA graph (an NFA object).
A finite state machine therefore works by starting from the initial state and transitioning automatically according to each input it receives.
[Figure: a two-state machine (S1, S2) over the inputs 0 and 1]

The state machine in the figure detects whether a binary number contains an even number of 0s. As the figure shows, the only inputs are 1 and 0.
Starting from state S1, only an input of 0 transitions to S2; likewise, from S2 only an input of 0 transitions back to S1. So once the binary input has been consumed, if the machine ends in the accepting state S1, the input contains an even number of 0s.
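The two-state machine described above is small enough to sketch directly in Java; the state names mirror the figure and are otherwise illustrative.

```java
// Minimal sketch of the state machine from the figure: S1 means "an even
// number of 0s seen so far" (including zero), S2 means "an odd number".
class EvenZerosDfa {
    public static boolean hasEvenZeros(String bits) {
        boolean inS1 = true;                // start state S1: zero 0s seen, even
        for (char c : bits.toCharArray()) {
            if (c == '0') inS1 = !inS1;     // input 0 toggles S1 <-> S2
            // input 1 keeps the current state
        }
        return inS1;                        // accepting iff we end in S1
    }
}
```

For example, `hasEvenZeros("1001")` is true (two 0s) while `hasEvenZeros("10")` is false (one 0).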

CEP Development Workflow

The FlinkCEP development workflow:

  • Convert the data in the DataSource into a DataStream; assign Watermarks and apply keyBy.
  • Define a Pattern, and combine the DataStream with the Pattern into a PatternStream.
  • Transform the PatternStream into a DataStream via operators such as select and process.
  • Process the resulting DataStream further and sink it to the target store.

Adding the Dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

Case 1: Malicious Login Detection

Find accounts with consecutive failed logins within 5 seconds.
Here is the sample data:

new CepLoginBean(1L, "fail", 1597905234000L),
new CepLoginBean(1L, "success", 1597905235000L),
new CepLoginBean(2L, "fail", 1597905236000L),
new CepLoginBean(2L, "fail", 1597905237000L),
new CepLoginBean(2L, "fail", 1597905238000L),
new CepLoginBean(3L, "fail", 1597905239000L),
new CepLoginBean(3L, "success", 1597905240000L)

Overall Approach

  • Ingest the data.
  • Assign Watermarks on the source.
  • keyBy the watermarked stream on user ID.
  • Build the Pattern.
  • Match the pattern against the data stream.
  • Extract the matched data.

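Before looking at the Flink code, the logic the pattern expresses (two consecutive "fail" events within 5 seconds per user, emitting the first event of the pair) can be sketched in plain Java over the sample data. This is a simplification that ignores watermarks and out-of-order events; names and the array encoding are illustrative.

```java
import java.util.*;

// Plain-Java sketch of the CEP pattern's matching logic: for each user,
// every pair of consecutive "fail" events within 5 seconds is a match,
// and we emit the first ("start") event of the pair.
class MaliciousLoginSketch {
    // each event: {userId, failFlag (1 = fail, 0 = success), timestamp}
    public static List<long[]> matches(List<long[]> events) {
        List<long[]> out = new ArrayList<>();
        Map<Long, long[]> lastPerUser = new HashMap<>();
        for (long[] e : events) {
            long[] prev = lastPerUser.put(e[0], e);  // previous event of this user
            if (prev != null && prev[1] == 1 && e[1] == 1
                    && e[2] - prev[2] <= 5_000) {
                out.add(prev);                       // emit the "start" event
            }
        }
        return out;
    }
}
```

On the sample data above, only user 2 matches, twice: at timestamps 1597905236000 and 1597905237000, which is exactly what the Flink job prints.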
Writing the Code

package icu.wzk;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class FlinkCepLoginTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // 1. Source: the sample login events
        DataStreamSource<CepLoginBean> data = env.fromElements(
                new CepLoginBean(1L, "fail", 1597905234000L),
                new CepLoginBean(1L, "success", 1597905235000L),
                new CepLoginBean(2L, "fail", 1597905236000L),
                new CepLoginBean(2L, "fail", 1597905237000L),
                new CepLoginBean(2L, "fail", 1597905238000L),
                new CepLoginBean(3L, "fail", 1597905239000L),
                new CepLoginBean(3L, "success", 1597905240000L)
        );

        // 2. Assign event-time timestamps and watermarks (500 ms out-of-orderness)
        SingleOutputStreamOperator<CepLoginBean> watermarks = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepLoginBean>() {
                    @Override
                    public WatermarkGenerator<CepLoginBean> createWatermarkGenerator(
                            WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepLoginBean>() {
                            // start below any real timestamp so the first
                            // watermark does not jump ahead of the data
                            long maxTimestamp = Long.MIN_VALUE + 500L;
                            final long maxOutOfOrderness = 500L;

                            @Override
                            public void onEvent(CepLoginBean event, long eventTimestamp,
                                               WatermarkOutput output) {
                                maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));

        // 3. Key by user ID
        KeyedStream<CepLoginBean, Long> keyed = watermarks
                .keyBy(new KeySelector<CepLoginBean, Long>() {
                    @Override
                    public Long getKey(CepLoginBean value) throws Exception {
                        return value.getUserId();
                    }
                });

        // 4. Pattern: two consecutive "fail" events within 5 seconds
        Pattern<CepLoginBean, CepLoginBean> pattern = Pattern
                .<CepLoginBean>begin("start")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean,
                                          Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .next("next")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean,
                                          Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .within(Time.seconds(5));

        // 5. Apply the pattern and extract the first event of each match
        PatternStream<CepLoginBean> patternStream = CEP.pattern(keyed, pattern);
        SingleOutputStreamOperator<CepLoginBean> process = patternStream
                .process(new PatternProcessFunction<CepLoginBean, CepLoginBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepLoginBean>> map, Context context,
                                             Collector<CepLoginBean> collector) throws Exception {
                        System.out.println("map: " + map);
                        List<CepLoginBean> start = map.get("start");
                        collector.collect(start.get(0));
                    }
                });

        process.print();
        env.execute("FlinkCepLoginTest");
    }
}

class CepLoginBean {

    private Long userId;
    private String operation;
    private Long timestamp;

    public CepLoginBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getOperation() { return operation; }
    public void setOperation(String operation) { this.operation = operation; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "CepLoginBean{" + "userId=" + userId
                + ", operation='" + operation + '\''
                + ", timestamp=" + timestamp + '}';
    }
}

Run Results

The program output is shown below. Note that user 2 matches twice, because the strict next contiguity produces one match for each consecutive pair of failures:

map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}
map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905238000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}

Process finished with exit code 0


Tags: big data, flink, java

This article is reproduced from: https://blog.csdn.net/w776341482/article/details/142130145
Copyright belongs to the original author 武子康. In case of infringement, please contact us for removal.
