什么是Cep?
在流式数据中(
事件流
),
筛选出符合条件
的一系列动作(
事件
)【复杂事件处理】
什么是 Flink-Cep?
Flink Cep库
Api
【
实时操作
】
官方文档
什么是Pattern?
Pattern就是Cep里的
规则
制定
Pattern分为
个体模式
,
组合模式(模式序列)
和
模式组
模式组是将
组合模式
作为
条件
的
个体模式
Cep开发流程
- DataStream 或
Keyedstream
- 定义
规则(Pattern)
- 将
规则
应用于KeyedStream
,生成PatternStream
- 将
PatternStream
,通过Select
方法,将符合规则
的数据输出
代码实战
依赖
<!-- Flink-Cep --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
Cep开发伪代码(个体模式和组合模式)
publicclassCepDemo{publicstaticvoidmain(String[] args){// 创建流式计算上下文环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 生成DataStreamDataStream<String> dataStream =null;// 生成KeyedStream (分组)KeyedStream<String,Tuple> keyedStream = dataStream.keyBy("");// 生成模式(规则) ( Pattern 对象)/* **********************
*
* 【个体模式】
* 1. 【单例】模式:只接收1个事件
* 2. 【循环】模式:能接收多个事件或1个事件, 单例模式 + 量词(times())
* *********************/// 生成名叫 “login” 的单个PatternPattern<String,String> pattern =Pattern.<String>begin("login").where(newSimpleCondition<String>(){@Overridepublicbooleanfilter(String s)throwsException{// Patter规则内容returnfalse;}}).times(3);/* **********************
* 【组合模式】
*
* 组合方式:
* 1. next: 严格紧邻 (连续)
* 2. fallowedBy: 宽松近邻 (非连续)
* 3. fallowedByAny: 非严格匹配,比 fallowedBy 更宽松
*
* *********************/// 生成了两个Patten所组成的Pattern序列,分别名叫 "login", "sale"Pattern<String,String> patterns =Pattern.<String>begin("login")//.where().followedBy("sale");//.where();// 将 Pattern 应用于 KeyedStream, 生成 PatternStream 对象PatternStream<String> patternStream =CEP.pattern(keyedStream, patterns);// 通过PatternStream 对象的 select() 方法, 将符合规则的数据提取输出DataStream<Object> patternResult = patternStream.select(newPatternSelectFunction<String,Object>(){/**
* @param map: key: 指的是Pattern的名称。 value: 符合这个Pattern的数据
*/@OverridepublicObjectselect(Map<String,List<String>> map)throwsException{returnnull;}});}}
【生成模式】
基于【个体模式】检测最近1分钟内登录失败超过3次的用户
CEP模式:允许这3次登录失败事件之间出现其他行为事件(不连续)【宽松近邻】
publicclassLoginFailBySingleton{publicstaticvoidmain(String[] args){// Kafka数据源DataStream<EventPO> eventStream =KafkaUtil.read(args);// 生成KeyedStream 用户id分组KeyedStream<EventPO,Integer> keyedStream = eventStream.keyBy((KeySelector<EventPO,Integer>)EventPO::getUser_id_int);// 生成模式 (规则/Pattern)Pattern.<EventPO>begin("login_fail_first")// Pattern名称/*
1. IterativeCondition 抽象类 表示通用的匹配规则
需要实现 filter(), 需要传入2个参数
2.SimpleCondition 是 IterativeCondition 的子类,表示简单的匹配规则
需要实现 filter(), 需要传入1个参数
*/.where(newSimpleCondition<EventPO>(){@Overridepublicbooleanfilter(EventPO eventPO){// 登录失败事件returnEventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name());}}).times(3)// 3次,宽松近邻.within(Time.seconds(60));// 最近一分钟(时间)}}
检测最近1分钟内【连续】登录失败超过3次的用户
基于【个体模式】
CEP模式:3次登录失败事件必须是连续的【严格紧邻】
添加该方法即可
.consecutive()
publicclassLoginFailByConsecutive{publicstaticvoidmain(String[] args){// KafkaDataStream<EventPO> eventStream =KafkaUtil.read(args);// 生成KeyedStreamKeyedStream<EventPO,Integer> keyedStream = eventStream.keyBy(newKeySelector<EventPO,Integer>(){@OverridepublicIntegergetKey(EventPO eventPO)throwsException{return eventPO.getUser_id_int();}});Pattern.<EventPO>begin("login_fail_first").where(newSimpleCondition<EventPO>(){@Overridepublicbooleanfilter(EventPO eventPO)throwsException{returnEventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name());}})/* **********************
* 1. 个体模式的循环模式 匹配的是 宽松近邻 (能够允许插入其他事件)
* 2. consecutive() 就指定匹配模式是 严格紧邻(连续)
* *********************/.times(3).consecutive()// 连续.within(Time.seconds(60));}}
宽松近邻与严格紧邻
宽松近邻
:不连续事件
严格紧邻
:连续事件
例子:
事件流1(连续登录失败的事件流):event_A(login_fail),event_B(login_fail),event_C(login_fail)【严格紧邻】
事件流2(不连续登录失败的事件流):event_A(login_fail),event_D(login_success),event_B(login_fail),event_C(login_fail)【宽松近邻】
基于【组合模式】
单体模式、组合模式通用
组合模式
.next(...)
严格紧邻(连续事件)
publicclassLoginFailByComposite{publicstaticvoidmain(String[] args){DataStream<EventPO> eventStream =KafkaUtil.read(args);KeyedStream<EventPO,Integer> keyedStream = eventStream.keyBy(newKeySelector<EventPO,Integer>(){@OverridepublicIntegergetKey(EventPO eventPO)throwsException{return eventPO.getUser_id_int();}});// 三个连续登录失败事件【组合模式】Pattern.<EventPO>begin("login_fail_first").where(newSimpleCondition<EventPO>(){@Overridepublicbooleanfilter(EventPO eventPO)throwsException{returnEventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name());}}).next("login_fail_second")// 严格紧邻.where(newSimpleCondition<EventPO>(){@Overridepublicbooleanfilter(EventPO eventPO)throwsException{returnEventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name());}}).next("login_fail_third").where(newSimpleCondition<EventPO>(){@Overridepublicbooleanfilter(EventPO eventPO)throwsException{returnEventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name());}}).within(Time.seconds(60));}}
基于【迭代条件】检测最近15分钟内IP更换次数超过3次的用户
注意
:
- 对于每个模式 (规则/Pattern)可以
设置条件
,判定到达的行为事件,是否能够进入到这个模式如:设置条件为只有登录成功这个行为事件,才能够进入到这个模式 - 条件的设置方法是:
where()
where() 的参数是IterativeCondition对象
- IterativeCondition 称为
迭代条件
能够设置较复杂的条件,尤其和循环模式相结合
publicclassIpChangeByIterative{publicstaticvoidmain(String[] args){DataStream<EventPO> eventStream =KafkaUtil.read(args);KeyedStream<EventPO,Integer> keyedStream = eventStream.keyBy((KeySelector<EventPO,Integer>)EventPO::getUser_id_int);Pattern<EventPO,?> pattern =Pattern.// 组合模式以begin开头,// 不设置条件,所有行为事件都可以进入到这个模式<EventPO>begin("ip")// 判断用户行为事件在15分钟内IP是否发生变化(更换IP之间可以有其他事件).followedBy("next").where(newIpChangeCondition())// 15分钟内IP发生变化次数超过3次.timesOrMore(3)// 满足条件的行为事件必须在最近15分钟内.within(Time.seconds(900));// 将模式应用到事件流/* **********************
*
* CEP.pattern(),
* 还可以有第3个参数,
* 第3个参数是比较器 EventComparator 对象,
* 可以对于同时进入模式的行为事件,进行更精确的排序
*
* *********************/PatternStream<EventPO> patternStream =CEP.pattern(keyedStream, pattern);// 提取数据...}}
**
判断条件
**
继承
IterativeCondition
类
Context
是上下文对象,
getEventsForPattern(...)
根据传入的模式名获取对应模式中已匹配的所有行为事件
publicclassIpChangeConditionextendsIterativeCondition<EventPO>{@Overridepublicbooleanfilter(EventPO eventPO,Context<EventPO> context)throwsException{boolean change =false;// 当前模式名称是"ip", 获取当前模式之前已经匹配的事件for(EventPO preEvent : context.getEventsForPattern("ip")){// 前一个行为事件的IPString preIP = preEvent.getEvent_context().getDevice().getIp();// 当前行为事件的IPStringIP= eventPO.getEvent_context().getDevice().getIp();// 判断前后行为事件的IP是否发生变化if(!Objects.equals(preIP,IP)){
change =true;break;}}return change;}}
用户在15分钟内的行为路径是"登录-领券-下单"(明显薅羊毛行为特征)
组合模式
publicclassClipCouponsRoute{publicstaticvoidmain(String[] args){DataStream<EventPO> eventStream =KafkaUtil.read(args);KeyedStream<EventPO,Integer> keyedStream = eventStream.keyBy((KeySelector<EventPO,Integer>)EventPO::getUser_id_int);// 生成模式 (规则/Pattern)【组合模式】Pattern<EventPO,?> pattern =Pattern// 过滤登录行为事件.<EventPO>begin("login").where(newSimpleCondition<EventPO>(){@Overridepublicbooleanfilter(EventPO eventPO)throwsException{returnEventConstant.LOGIN_SUCCESS.equals(eventPO.getEvent_type());}})// 宽松近邻:过滤领取优惠券行为事件.followedBy("receive").where(newSimpleCondition<EventPO>(){@Overridepublicbooleanfilter(EventPO eventPO)throwsException{returnEventConstant.COUPON_RECEIVE.equals(eventPO.getEvent_type());}})// 宽松近邻:过滤使用优惠券行为事件.followedBy("use").where(newSimpleCondition<EventPO>(){@Overridepublicbooleanfilter(EventPO eventPO)throwsException{returnEventConstant.COUPON_USE.equals(eventPO.getEvent_type());}})// 模式有效时间:15分钟内.within(Time.minutes(15));}}
【提取、输出事件流】
将模式应用到事件流生成
PatternStream
生成Pattern之后,就要提取输出事件流
// ...// 生成模式 (规则/Pattern)Pattern<EventPO,?> pattern =Pattern.<EventPO>begin("ip").followedBy("next").where(newIpChangeCondition()).timesOrMore(3).within(Time.seconds(900));// 将模式应用到事件流/* **********************
* CEP.pattern(),
* 还可以有第3个参数,
* 第3个参数是比较器 EventComparator 对象,
* 可以对于同时进入模式的行为事件,进行更精确的排序
* *********************/PatternStream<EventPO> patternStream =CEP.pattern(keyedStream, pattern);
PatternStream三个提取匹配事件方法
select()
: 参数是 PatternSelectFunction 对象,有返回值flatselect()
:参数是 PatternFlatSelectFunction 对象,无返回值,可以通过 Collector.collect() 以事件流输出process()
: 参数是 PatternProcessFunction 对象,无返回值,可以通过 Collector.collect() 以事件流输出也可以通过 Context对象获取上下文信息
建议使用 flatSelect(), 可以更加灵活;官方建议使用 process()
以15分钟IP变化为例,完整代码:
publicclassIpChangeByIterative{publicstaticvoidmain(String[] args){DataStream<EventPO> eventStream =KafkaUtil.read(args);KeyedStream<EventPO,Integer> keyedStream = eventStream.keyBy((KeySelector<EventPO,Integer>)EventPO::getUser_id_int);// 生成模式 (规则/Pattern)Pattern<EventPO,?> pattern =Pattern.<EventPO>begin("ip").followedBy("next").where(newIpChangeCondition()).timesOrMore(3).within(Time.seconds(900));// 将模式应用到事件流PatternStream<EventPO> patternStream =CEP.pattern(keyedStream, pattern);// 提取匹配事件DataStream<EventPO> result = patternStream.process(newIpChangeProcessFunction());// 执行规则命中的策略动作}}
publicclassIpChangeProcessFunctionextendsPatternProcessFunction<EventPO,EventPO>{/**
* @param map Map<模式名, 模式名对应匹配事件列表>
* @param context 上下文对象
* @param collector 输出事件流
*/@OverridepublicvoidprocessMatch(Map<String,List<EventPO>> map,Context context,Collector<EventPO> collector)throwsException{}}
提取输出事件流,下游算子处理
Flink-Cep基石 NFA状态转移流程
薅羊毛用户是有着明显目的的
正常用户行为事件流
:
来回比较不同商品价格,最终决定购买哪件商品。
薅羊毛用户行为事件流
:
带有很强的目的性,
“登录-领券-下单”
事件流一气呵成。
Cep底层原理
:
- CEP模式匹配:
每个模式包含多个状态
- CEP模式匹配:
状态转换的过程(NFA)
以羊毛党购买商品为例,状态变化流程:
匹配上事件,设置状态,三个状态都不一样,符合条件的事件,放到结果集中,与预先设置条件数量一致。
状态最后转换为最终状态,后续传递给下游算子计算。
CEP工作流程
:
- 定义一个一个的
Pattern
,如有多个Pattern,将Pattern串联起来
构成模式匹配的逻辑表达 - 将模式匹配分拆,
创建NFA对象
- NFA对象包含了这个模式匹配的
状态和状态转换表达式
- 状态变化、处理
版权归原作者 不进大厂不改名二号 所有, 如有侵权,请联系我们删除。