Using Flink CEP to Find Users with Consecutive Login Failures Within 10 Seconds

1. What is CEP?

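Flink CEP, short for Flink Complex Event Processing, is a programming model for complex event processing built on top of DataStream. You can think of it as regular-expression matching over an unbounded stream: for the records in the stream (called events), it lets you match combinations of them.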

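[Figure: events of different shapes in a DataStream, matched against a Pattern made up of a circle and a triangle]

In the figure above, different shapes represent events with different attributes in a DataStream. Once a Pattern is defined as a circle plus a triangle, the data that fits it can be filtered out of the original DataStream quickly.

For example, many websites need to block users who log in maliciously: if a user enters the wrong password three times in a row, the account is locked. In this scenario, the login actions of all users form an unbounded DataStream, and three consecutive login failures form a Pattern. The job of the CEP programming model is to find all data in that unbounded login DataStream that matches the Pattern. The same result can be achieved with the DataStream APIs introduced earlier, but with considerably more effort; CEP offers a much simpler and more flexible way to implement it.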
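The regular-expression analogy can be made concrete without Flink. The sketch below is plain Java, not CEP: it encodes each login result as a character and searches for three failures in a row. The class and method names are made up for illustration, and the sketch ignores time windows entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrates the "regex over events" analogy with plain Java; this is not Flink CEP. */
final class RegexAnalogy {

    /**
     * Encodes each login result as one character ('F' = failure, 'S' = success)
     * and returns the start index of every run of three failures in a row.
     */
    static List<Integer> threeFailuresInARow(int[] loginResults) {
        StringBuilder encoded = new StringBuilder();
        for (int res : loginResults) {
            encoded.append(res == 1 ? 'F' : 'S'); // 1 = failure, as in the article
        }
        // The lookahead reports overlapping runs as well (FFFF contains two runs).
        Matcher matcher = Pattern.compile("(?=FFF)").matcher(encoded);
        List<Integer> starts = new ArrayList<>();
        while (matcher.find()) {
            starts.add(matcher.start());
        }
        return starts;
    }

    public static void main(String[] args) {
        int[] events = {1, 1, 0, 1, 1, 1, 1};
        System.out.println(threeFailuresInARow(events)); // prints [3, 4]
    }
}
```

A real stream never ends, so the input cannot be collected into a string first; CEP keeps partial matches in state and emits each match as soon as it completes.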

2. Code implementation

2.1 Adding the Maven dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.roy</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0</version>

    <properties>
        <flink.version>1.12.5</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- The dependency below is the one CEP mainly needs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.8.3-10.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2.2 Basic flow

// 1. Get the original event stream
DataStream<Event> input = ......;

// 2. Define the pattern
Pattern<Event, ?> pattern = .......;

// 3. Get the pattern stream
PatternStream<Event> patternStream = CEP.pattern(input, pattern);

// 4. Process the matches in the pattern stream into a result stream
DataStream<Result> resultStream = patternStream.process(
        new PatternProcessFunction<Event, Result>() {
            @Override
            public void processMatch(
                    Map<String, List<Event>> pattern,
                    Context ctx,
                    Collector<Result> out) throws Exception {
            }
        });

2.3 Complete code

Note: before running the code, start the socket server from section 2.4.

package com.roy.flink.project.userlogin;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * @desc Analysis of users with consecutive login failures within ten seconds,
 *       using Flink CEP for fast pattern matching.
 */
public class MyUserLoginAna {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Interval at which BoundedOutOfOrdernessWatermarks periodically emits watermarks
        env.getConfig().setAutoWatermarkInterval(1000L);
        // Testing with a socket
        env.setParallelism(1);

        // 1. Get the original event stream (replace 10.86.97.206 with the actual address)
        final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206", 7777);
        final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource
                .map(new MapFunction<String, UserLoginRecord>() {
                    @Override
                    public UserLoginRecord map(String s) throws Exception {
                        // Expected line format: userId,loginRes,loginTime
                        final String[] splitVal = s.split(",");
                        return new UserLoginRecord(
                                splitVal[0],
                                Integer.parseInt(splitVal[1]),
                                Long.parseLong(splitVal[2]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        // Mainly for out-of-order streams: late data has to be waited for,
                        // so a fixed delay must be configured.
                        WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>)
                                        (element, recordTimestamp) -> element.getLoginTime()));

        // 2. Define the pattern
        // 2.1: three login failures within 10 seconds (not necessarily consecutive)
//        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
//            @Override
//            public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
//                return 1 == userLoginRecord.getLoginRes();
//            }
//        }).times(3).within(Time.seconds(10));

        // 2.2: three consecutive login failures. next() means strictly consecutive;
        // use followedBy() for non-consecutive matching.
        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one")
                .where(new SimpleCondition<UserLoginRecord>() {
                    @Override
                    public boolean filter(UserLoginRecord value) throws Exception {
                        return 1 == value.getLoginRes();
                    }
                })
                .next("two")
                .where(new SimpleCondition<UserLoginRecord>() {
                    @Override
                    public boolean filter(UserLoginRecord value) throws Exception {
                        return 1 == value.getLoginRes();
                    }
                })
                .next("three")
                .where(new SimpleCondition<UserLoginRecord>() {
                    @Override
                    public boolean filter(UserLoginRecord value) throws Exception {
                        return 1 == value.getLoginRes();
                    }
                })
                .within(Time.seconds(10));

        // 3. Get the pattern stream
        final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);
        final MyProcessFunction myProcessFunction = new MyProcessFunction();

        // 4. Process the matches in the pattern stream into a result stream
        final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);
        badUserStream.print("badUser");
        env.execute("UserLoginAna");
    } // main

    public static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord, UserLoginRecord> {
        @Override
        public void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx,
                                 Collector<UserLoginRecord> out) throws Exception {
            // For 2.1: three login failures, not necessarily consecutive
//            final List<UserLoginRecord> records = match.get("start");
//            for (UserLoginRecord record : records) {
//                out.collect(record);
//            }

            // For 2.2: three consecutive login failures; emit the record that completed the match
            final List<UserLoginRecord> records = match.get("three");
            for (UserLoginRecord record : records) {
                out.collect(record);
            }
        } // processMatch
    } // MyProcessFunction
}
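One thing to keep in mind: the job above applies the pattern to the stream without keying it, so failures from different users can combine into a single match. If the goal is to flag individual users, the usual approach in Flink is to key the stream by userId before passing it to CEP.pattern. The following plain-Java sketch (no Flink involved; class and method names are hypothetical) models the intended per-user semantics of pattern 2.2, assuming events arrive in timestamp order. How it treats a gap of exactly 10 seconds is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "three consecutive failures within 10 s, per user". Not Flink code. */
final class PerUserFailureModel {

    static final long WINDOW_MS = 10_000L; // mirrors within(Time.seconds(10))

    /** Timestamps of the most recent consecutive failures per user (at most three kept). */
    private final Map<String, Deque<Long>> recentFailures = new HashMap<>();

    /**
     * Feeds one login event, assumed to arrive in timestamp order.
     * Returns true when this event completes three failures in a row for the
     * same user with less than WINDOW_MS between the first and the third.
     */
    boolean onEvent(String userId, int loginRes, long loginTime) {
        Deque<Long> failures = recentFailures.computeIfAbsent(userId, k -> new ArrayDeque<>());
        if (loginRes != 1) {        // a success breaks the run for this user
            failures.clear();
            return false;
        }
        failures.addLast(loginTime);
        if (failures.size() > 3) {
            failures.removeFirst(); // keep only the last three failures
        }
        return failures.size() == 3 && loginTime - failures.peekFirst() < WINDOW_MS;
    }

    public static void main(String[] args) {
        PerUserFailureModel model = new PerUserFailureModel();
        List<Boolean> results = new ArrayList<>();
        results.add(model.onEvent("u1", 1, 1_000L));
        results.add(model.onEvent("u2", 1, 2_000L)); // other user: does not affect u1's run
        results.add(model.onEvent("u1", 1, 3_000L));
        results.add(model.onEvent("u1", 1, 4_000L)); // third failure of u1 within 10 s
        System.out.println(results); // prints [false, false, false, true]
    }
}
```

With the unkeyed job, the first three events in this example (u1, u2, u1) would already count as three consecutive failures, which is probably not what a per-user lockout rule wants.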

The UserLoginRecord class is as follows:

public class UserLoginRecord {
    private String userId;
    private int loginRes;   // 0 = success, 1 = failure
    private long loginTime; // used as the event timestamp

    public UserLoginRecord() {
    }

    public UserLoginRecord(String userId, int loginRes, long loginTime) {
        this.userId = userId;
        this.loginRes = loginRes;
        this.loginTime = loginTime;
    }

    @Override
    public String toString() {
        return "UserLoginRecord{" +
                "userId='" + userId + '\'' +
                ", loginRes=" + loginRes +
                ", loginTime=" + loginTime +
                '}';
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public int getLoginRes() {
        return loginRes;
    }

    public void setLoginRes(int loginRes) {
        this.loginRes = loginRes;
    }

    public long getLoginTime() {
        return loginTime;
    }

    public void setLoginTime(long loginTime) {
        this.loginTime = loginTime;
    }
}
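The map function in the job splits each line on commas and parses the fields directly, so a blank or malformed line typed into the socket throws an exception and fails the task. A more defensive parser might look like the sketch below. It is only an illustration: LoginLineParser and ParsedLogin are hypothetical names, ParsedLogin stands in for UserLoginRecord to keep the example self-contained, and rejecting loginRes values other than 0 and 1 is an assumption.

```java
import java.util.Optional;

/** Sketch of defensive parsing for input lines of the form "userId,loginRes,loginTime". */
final class LoginLineParser {

    /** Minimal stand-in for the article's UserLoginRecord. */
    static final class ParsedLogin {
        final String userId;
        final int loginRes;   // 0 = success, 1 = failure
        final long loginTime; // milliseconds, used as the event timestamp

        ParsedLogin(String userId, int loginRes, long loginTime) {
            this.userId = userId;
            this.loginRes = loginRes;
            this.loginTime = loginTime;
        }
    }

    /** Returns the parsed record, or Optional.empty() if the line is malformed. */
    static Optional<ParsedLogin> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.split(",");
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            String userId = parts[0].trim();
            int loginRes = Integer.parseInt(parts[1].trim());
            long loginTime = Long.parseLong(parts[2].trim());
            if (userId.isEmpty() || (loginRes != 0 && loginRes != 1)) {
                return Optional.empty();
            }
            return Optional.of(new ParsedLogin(userId, loginRes, loginTime));
        } catch (NumberFormatException e) {
            return Optional.empty(); // non-numeric loginRes or loginTime
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("user1, 1, 1000").isPresent()); // prints true
        System.out.println(parse("user1,oops,1000").isPresent()); // prints false
        System.out.println(parse("").isPresent());                // prints false
    }
}
```

In the Flink job, one way to use such a parser would be to replace map with flatMap and emit a record only when the Optional is present, so that bad lines are dropped instead of stopping the job.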

2.4 Simulating the socket server (nc -lk)

[Image: screenshot of the socket server session]
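When typing test lines into the socket, remember that the job runs on event time, with loginTime interpreted as milliseconds. As far as I understand the CEP operator, it only processes an event once the watermark has reached that event's timestamp, so the third failure produces output only after a later line pushes the watermark forward. The plain-Java sketch below models how a bounded-out-of-orderness watermark with a 1-second bound trails the largest timestamp seen. WatermarkModel is a hypothetical name, and the formula follows my reading of Flink's BoundedOutOfOrdernessWatermarks.

```java
/** Plain-Java model of a bounded-out-of-orderness watermark. Not Flink code. */
final class WatermarkModel {

    private final long outOfOrdernessMs;
    private long maxTimestamp;

    WatermarkModel(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        // Start low enough that currentWatermark() cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    /** Records the timestamp of an incoming event. */
    void onEvent(long timestampMs) {
        maxTimestamp = Math.max(maxTimestamp, timestampMs);
    }

    /** Watermark = largest timestamp seen, minus the bound, minus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        WatermarkModel wm = new WatermarkModel(1_000L); // Duration.ofSeconds(1) in the job
        long[] timestamps = {1_000L, 2_000L, 3_000L, 4_001L};
        for (long ts : timestamps) {
            wm.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + wm.currentWatermark());
        }
        // After the event at 3000 the watermark is 1999, still below 3000.
        // After the event at 4001 the watermark is 3000 and covers the third failure.
    }
}
```

In practice this means that after sending three failure lines such as `user1,1,1000`, `user1,1,2000` and `user1,1,3000`, one more line with a timestamp of at least 4001 is needed before the match is printed, and the watermark itself is only emitted at the configured 1000 ms interval.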

2.5 IDEA console output

[Image: screenshot of the IDEA console output]


This article is reposted from: https://blog.csdn.net/taotao_guiwang/article/details/135925449
Copyright belongs to the original author, core512. In case of infringement, please contact us for removal.
