Preface
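In the previous article, we used KeyedProcessFunction to implement a simple malicious-login detection case. While building it, though, we found the process fairly cumbersome. Is there a better way? This is where CEP comes in.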
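CEP provides a rich and flexible set of event-matching patterns. That makes a requirement such as "detect two consecutive failed logins within a given time window" very easy to express with CEP, without the extra hassle of registering timers.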
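To make the requirement concrete, here is a plain-Java sketch (no Flink dependency) of the check that a "failed login immediately followed by another failed login, within 2 seconds, per user" pattern performs. The `Event` class and the `detect` helper are hypothetical simplifications, not part of the article's code. Two behaviors are assumptions about Flink CEP that are worth verifying against your Flink version: the window is treated as "gap strictly below 2 seconds", and overlapping pairs are all reported (three failures in a row yield two alerts), which is how we understand CEP's default NO_SKIP after-match strategy.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConsecutiveFailSketch {

    // Hypothetical, simplified stand-in for LoginEvent: only the fields the check needs
    static final class Event {
        final long userId;
        final String state;   // "success" or "fail"
        final long tsMillis;  // event time in milliseconds

        Event(long userId, String state, long tsMillis) {
            this.userId = userId;
            this.state = state;
            this.tsMillis = tsMillis;
        }
    }

    // Per user, walk events in timestamp order and report every pair of adjacent failures
    // whose gap is below the window (assumed strict boundary, see lead-in).
    static List<String> detect(List<Event> events, long windowMillis) {
        // Group events by user, like keyBy(userId); LinkedHashMap keeps first-seen user order
        Map<Long, List<Event>> byUser = new LinkedHashMap<>();
        for (Event e : events) {
            byUser.computeIfAbsent(e.userId, k -> new ArrayList<>()).add(e);
        }
        List<String> alerts = new ArrayList<>();
        for (Map.Entry<Long, List<Event>> entry : byUser.entrySet()) {
            List<Event> seq = new ArrayList<>(entry.getValue());
            seq.sort(Comparator.comparingLong(e -> e.tsMillis)); // event-time order (stable for ties)
            for (int i = 0; i + 1 < seq.size(); i++) {
                Event first = seq.get(i);
                Event second = seq.get(i + 1); // strict contiguity: only the very next event counts
                boolean bothFail = "fail".equalsIgnoreCase(first.state)
                        && "fail".equalsIgnoreCase(second.state);
                if (bothFail && second.tsMillis - first.tsMillis < windowMillis) {
                    alerts.add(entry.getKey() + ": " + first.tsMillis + " -> " + second.tsMillis);
                }
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1L, "fail", 1000),
                new Event(1L, "fail", 2000),     // 1 s after the previous failure: alert
                new Event(2L, "fail", 1000),
                new Event(2L, "success", 1500),  // a success in between breaks the sequence
                new Event(2L, "fail", 2000),
                new Event(3L, "fail", 1000),
                new Event(3L, "fail", 5000));    // 4 s gap: outside the 2 s window
        detect(events, 2000).forEach(System.out::println); // prints "1: 1000 -> 2000"
    }
}
```

The same bookkeeping is what the KeyedProcessFunction version had to do by hand with state and timers; with CEP it collapses into a pattern declaration.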
Here is the code:
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.net.URL;
import java.util.List;
import java.util.Map;
public class LoginFailCep {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Use event-time semantics (pre-1.12 API; from Flink 1.12 on this call is deprecated and event time is the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Read login records from a CSV file, one event per line
        String path = "E:\\code-self\\flink_study\\src\\main\\resources\\LoginLog.csv";
        DataStream<LoginEvent> loginEventStream = env.readTextFile(path)
                .map(line -> {
                    String[] fields = line.split(",");
                    // Long.valueOf instead of the deprecated new Long(String) constructor
                    return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
                })
                // Timestamps in the file are in seconds; tolerate up to 3 s of out-of-order data
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(LoginEvent element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Process the stream with CEP
        // 1. Define the pattern: a failed login immediately followed (strict contiguity, next())
        //    by another failed login, with the whole match completing within 2 seconds
        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equalsIgnoreCase(loginEvent.getLoginState());
                    }
                })
                .next("secondFail")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equalsIgnoreCase(loginEvent.getLoginState());
                    }
                })
                .within(Time.seconds(2));

        // 2. Apply the pattern to the stream keyed by user ID to obtain a PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        // 3. Select the matched complex events and convert each match into a warning
        SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginMatchWarning());

        warningStream.print();
        env.execute("login fail job");
    }

    // Turns one match into a warning; the map keys are the pattern names defined above
    public static class LoginMatchWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
        @Override
        public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
            LoginEvent firstFailEvent = map.get("firstFail").get(0);
            LoginEvent secondFailEvent = map.get("secondFail").get(0);
            return new LoginFailWarning(firstFailEvent.getUserId(), firstFailEvent.getTimestamp(),
                    secondFailEvent.getTimestamp(), "fail 2 times");
        }
    }
}
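The job depends on two entity classes, LoginEvent and LoginFailWarning, that are not reproduced in this article. Judging only from how the code above uses them (a four-argument constructor fed from the CSV columns, getUserId/getLoginState/getTimestamp, and a four-argument warning constructor), a minimal sketch could look like the following. The field names `ip`, `firstFailTime`, `lastFailTime` and `warningMsg` are assumptions, not the original classes. They are shown as public static nested classes of a hypothetical `LoginEntities` holder only so the sketch fits in one file; in the project each would normally be its own top-level class.

```java
// Hypothetical reconstruction, inferred only from how LoginFailCep uses these classes.
public class LoginEntities {

    // One login record: userId, a second CSV column (assumed to be the client IP),
    // login state ("success" / "fail"), and a timestamp in seconds
    public static class LoginEvent {
        private Long userId;
        private String ip;
        private String loginState;
        private Long timestamp;

        // Public no-arg constructor plus getters/setters lets Flink treat the class as a POJO
        public LoginEvent() {
        }

        public LoginEvent(Long userId, String ip, String loginState, Long timestamp) {
            this.userId = userId;
            this.ip = ip;
            this.loginState = loginState;
            this.timestamp = timestamp;
        }

        public Long getUserId() { return userId; }
        public void setUserId(Long userId) { this.userId = userId; }
        public String getIp() { return ip; }
        public void setIp(String ip) { this.ip = ip; }
        public String getLoginState() { return loginState; }
        public void setLoginState(String loginState) { this.loginState = loginState; }
        public Long getTimestamp() { return timestamp; }
        public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    }

    // Warning emitted per match: the user and the timestamps of the two failures
    public static class LoginFailWarning {
        private Long userId;
        private Long firstFailTime;
        private Long lastFailTime;
        private String warningMsg;

        public LoginFailWarning() {
        }

        public LoginFailWarning(Long userId, Long firstFailTime, Long lastFailTime, String warningMsg) {
            this.userId = userId;
            this.firstFailTime = firstFailTime;
            this.lastFailTime = lastFailTime;
            this.warningMsg = warningMsg;
        }

        public Long getUserId() { return userId; }
        public void setUserId(Long userId) { this.userId = userId; }
        public Long getFirstFailTime() { return firstFailTime; }
        public void setFirstFailTime(Long firstFailTime) { this.firstFailTime = firstFailTime; }
        public Long getLastFailTime() { return lastFailTime; }
        public void setLastFailTime(Long lastFailTime) { this.lastFailTime = lastFailTime; }
        public String getWarningMsg() { return warningMsg; }
        public void setWarningMsg(String warningMsg) { this.warningMsg = warningMsg; }

        // print() writes toString(), so override it to get readable warnings
        @Override
        public String toString() {
            return "LoginFailWarning{userId=" + userId + ", firstFailTime=" + firstFailTime
                    + ", lastFailTime=" + lastFailTime + ", warningMsg='" + warningMsg + "'}";
        }
    }
}
```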
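The two entity classes used in the code (LoginEvent and LoginFailWarning) are the same as in the previous article. Now let's run the code and look at the result.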
The output shows the IDs of the users who failed to log in twice in a row, together with the timestamps of the two failed attempts.
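One detail helps when reading this output. Because the job runs in event time, the CEP operator buffers incoming events and evaluates them in timestamp order only once the watermark has reached their timestamps. With `BoundedOutOfOrdernessTimestampExtractor(Time.seconds(3))`, the watermark trails the largest timestamp seen so far by 3 seconds, so a warning whose second failure happened at second t can only appear after an event at t + 3 or later has been read (or once the bounded file is exhausted and Flink emits its final watermark). The plain-Java sketch below models only that arithmetic. The class and method names are hypothetical, and it ignores that watermarks are emitted periodically (every 200 ms by default under event time), which can delay output further.

```java
import java.util.List;

public class WatermarkSketch {

    static final long MAX_OUT_OF_ORDERNESS_MS = 3_000L; // Time.seconds(3) in the job

    // Mirrors extractTimestamp(): the CSV timestamps are in seconds, Flink expects milliseconds
    static long toMillis(long seconds) {
        return seconds * 1000L;
    }

    // Watermark after the given event timestamps (in seconds) have been seen:
    // largest timestamp so far minus the out-of-orderness bound
    static long watermarkAfter(List<Long> seenSeconds) {
        long maxTs = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS; // avoids underflow before the first event
        for (long s : seenSeconds) {
            maxTs = Math.max(maxTs, toMillis(s));
        }
        return maxTs - MAX_OUT_OF_ORDERNESS_MS;
    }

    // Whether an event at eventSeconds can already be evaluated by the (event-time) CEP operator
    static boolean released(long eventSeconds, List<Long> seenSeconds) {
        return watermarkAfter(seenSeconds) >= toMillis(eventSeconds);
    }

    public static void main(String[] args) {
        List<Long> seen = List.of(100L, 101L, 103L);
        System.out.println(watermarkAfter(seen)); // 100000
        System.out.println(released(101L, seen)); // false: still waiting for a later event
        System.out.println(released(100L, seen)); // true
    }
}
```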
Copyright belongs to the original author, 逆风飞翔的小叔. In case of infringement, please contact us for removal.