

Detecting Malicious Logins with Flink CEP

Preface

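In the previous article we implemented a simple malicious-login detection example based on KeyedProcessFunction. In practice, though, that implementation turned out to be fairly tedious. Is there a better way? This is where CEP comes in.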

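CEP provides a rich and flexible set of event-matching patterns. That makes it a natural fit for tasks such as detecting two consecutive login-failure events within a given time window: the pattern is declared directly, with none of the manual timer registration that the KeyedProcessFunction approach required.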

The full code follows:

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

public class LoginFailCep {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Use event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Read the login events from a file
        String path = "E:\\code-self\\flink_study\\src\\main\\resources\\LoginLog.csv";

        DataStream<LoginEvent> loginEventStream = env.readTextFile(path)
                .map(line -> {
                    String[] fields = line.split(",");
                    // Long.valueOf replaces the deprecated new Long(String) constructor
                    return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(LoginEvent element) {
                        // Event timestamps are in seconds; Flink expects milliseconds
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Process the stream with CEP

        // 1. Define the pattern: two strictly consecutive failures within 2 seconds
        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail").where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return "fail".equalsIgnoreCase(loginEvent.getLoginState());
            }
        })
                .next("secondFail").where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equalsIgnoreCase(loginEvent.getLoginState());
                    }
                })
                .within(Time.seconds(2));

        // 2. Apply the pattern to the stream, keyed by user ID, to obtain a pattern stream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        // 3. Select the matched complex events and convert them into warnings
        SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginMatchWarning());

        warningStream.print();
        env.execute("login fail job");

    }

    public static class LoginMatchWarning implements PatternSelectFunction<LoginEvent,LoginFailWarning>{

        @Override
        public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
            LoginEvent firstFailEvent = map.get("firstFail").get(0);
            LoginEvent secondFailEvent = map.get("secondFail").get(0);
            return new LoginFailWarning(firstFailEvent.getUserId(),firstFailEvent.getTimestamp(),secondFailEvent.getTimestamp(),"fail 2 times");
        }
    }

}
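
The program depends on two entity classes, LoginEvent and LoginFailWarning, that are not listed in this article. The following is a minimal sketch of what they might look like, not the original definitions: only the constructor argument order and the getters getUserId, getLoginState and getTimestamp are taken from the code above. The field names ip, firstFailTime, lastFailTime and warningMsg are assumptions, as is the meaning of the second CSV column.

```java
// Hypothetical reconstruction of the two entity classes used by LoginFailCep.
// Declared package-private here so both fit in one file; in a Flink project
// each would normally be a public class in its own file.
class LoginEvent {
    private Long userId;
    private String ip;          // assumed meaning of the second CSV column
    private String loginState;  // compared with "fail" (case-insensitively) by the pattern
    private Long timestamp;     // in seconds; the job multiplies it by 1000

    // Flink's POJO type rules expect a public no-arg constructor
    public LoginEvent() {}

    public LoginEvent(Long userId, String ip, String loginState, Long timestamp) {
        this.userId = userId;
        this.ip = ip;
        this.loginState = loginState;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }
    public String getLoginState() { return loginState; }
    public void setLoginState(String loginState) { this.loginState = loginState; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "LoginEvent{userId=" + userId + ", ip=" + ip
                + ", loginState=" + loginState + ", timestamp=" + timestamp + "}";
    }
}

class LoginFailWarning {
    private Long userId;
    private Long firstFailTime;  // timestamp of the first failure in the match
    private Long lastFailTime;   // timestamp of the second failure in the match
    private String warningMsg;

    public LoginFailWarning() {}

    public LoginFailWarning(Long userId, Long firstFailTime, Long lastFailTime, String warningMsg) {
        this.userId = userId;
        this.firstFailTime = firstFailTime;
        this.lastFailTime = lastFailTime;
        this.warningMsg = warningMsg;
    }

    public Long getUserId() { return userId; }
    public Long getFirstFailTime() { return firstFailTime; }
    public Long getLastFailTime() { return lastFailTime; }
    public String getWarningMsg() { return warningMsg; }

    @Override
    public String toString() {
        return "LoginFailWarning{userId=" + userId + ", firstFailTime=" + firstFailTime
                + ", lastFailTime=" + lastFailTime + ", warningMsg=" + warningMsg + "}";
    }
}
```

Because warningStream.print() calls toString() on each warning, the exact output format depends on how the real LoginFailWarning implements it.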

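The two entity classes used in the code, LoginEvent and LoginFailWarning, can be found in the previous article's code. Now run the program and observe the result.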

The output identifies the IDs of users with two consecutive failed logins, together with the times at which those failures occurred.
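
To see why a particular pair of events is reported, the matching rule can be mimicked in plain Java without Flink. This is only a sketch of the semantics, not how CEP is implemented. It makes three assumptions: events arrive already ordered by timestamp; the within window excludes its upper bound, so two failures exactly 2 seconds apart do not match; and the default skip strategy applies, so three failures in a row produce two overlapping matches. The class ConsecutiveFailSketch, its Event type and the warning string format are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of: begin("firstFail").next("secondFail").within(window),
// evaluated separately for each user ID.
class ConsecutiveFailSketch {

    // Hypothetical minimal event: user ID, login state, timestamp in seconds
    static final class Event {
        final long userId;
        final String state;
        final long ts;

        Event(long userId, String state, long ts) {
            this.userId = userId;
            this.state = state;
            this.ts = ts;
        }
    }

    // Returns one "userId:firstTs->secondTs" string per matched pair.
    static List<String> detect(List<Event> events, long windowSeconds) {
        Map<Long, Event> previousByUser = new HashMap<>();
        List<String> warnings = new ArrayList<>();
        for (Event current : events) {
            Event previous = previousByUser.get(current.userId);
            // next(...) is strict contiguity: the failure must directly follow
            // the previous event of the same user, which must also be a failure
            boolean bothFail = previous != null
                    && "fail".equalsIgnoreCase(previous.state)
                    && "fail".equalsIgnoreCase(current.state);
            if (bothFail && current.ts - previous.ts < windowSeconds) {
                warnings.add(current.userId + ":" + previous.ts + "->" + current.ts);
            }
            // Every event, success or failure, becomes the new "previous" event
            previousByUser.put(current.userId, current);
        }
        return warnings;
    }
}
```

With a 2-second window, a user who fails at seconds 10 and 11 is reported, while a user who fails at seconds 10 and 13, or whose two failures are separated by a successful login, is not.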


This article is reposted from: https://blog.csdn.net/congge_study/article/details/123942129
Copyright belongs to the original author, 逆风飞翔的小叔. In case of infringement, please contact us for removal.
