本文为您介绍CEP中规则的JSON格式相关信息。
目标人群
- 客户风控平台开发人员:对Flink CEP较熟悉的平台研发人员应能快速学习本格式,并根据自身平台需求判断是否需要进一步封装。
- 客户风控策略人员:只熟悉具体策略但缺乏Java经验的同学,在熟悉CEP概念的基础上,也可快速上手本格式的使用来编写新规则,使其在上线的风控作业中应用。
JSON格式定义
对于一个事件序列(Event Sequence)中的模式(Pattern),我们可以将其看作一个图(Graph),图中节点(Node)为针对某些事件(Event)的模式,节点之间的边(Edge)为事件选择策略(Event Selection Strategy),即如何从一类模式的匹配转移到另一类模式的匹配。每个图也可以看作一个更大的图的子节点,从而允许模式的嵌套。基于以上考虑,阿里云实时计算Flink定义了一套基于JSON的规范来描述CEP中的规则,进而方便规则的存储与修改,该规范中各个字段的含义如下。
- 节点(Node)定义一个节点(Node)即一个完整的模式(Pattern),它包含如下属性。 字段名描述类型是否必填备注namePattern名称。string是一个唯一的字符串。 说明不同节点的名称不能重复。type该Node类型。enum(string)是- 对于包含子Pattern的节点,该字段值为COMPOSITE。- 对于无子Pattern的节点,该字段值为ATOMIC。quantifier量词,用于描述如何匹配该Pattern,例如只匹配一次。dict是请参见本文量词(Quantifier)定义。condition条件。dict否请参见本文条件(Condition)定义。
- 量词(Quantifier)定义量词的作用是描述对于满足该Pattern的事件要如何匹配。例如模式
"A*"
对应的量词properties为LOOPING,该Pattern内部的事件选择策略为SKIP_TILL_ANY。 字段名描述类型是否必填备注consumingStrategy事件选择策略。enum(string)是仅支持以下取值: - STRICT- SKIP_TILL_NEXT- SKIP_TILL_ANY取值及含义请参见本文连续性定义。times用于描述该Pattern需要匹配多少次。dict否取值示例如下。"times": { "from": 3, "to": 3, "windowTime": { "unit": "MINUTES", "size": 12 } },
其中from和to的数据类型均为integer,windowTime的单位可以为DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。 说明windowTime可以设为null,即"windowTime": null
。properties描述该量词所具有的属性。array of enumString是取值及含义请参见本文量词属性含义。untilCondition停止条件。 说明仅可在LOOPING量词修饰的Pattern后使用。dict否取值及含义请参见本文条件(Condition)定义。 - 条件(Condition)定义条件用于筛选符合某些要求的事件。例如要筛选浏览时长超过5分钟的客户,浏览时长超过5分钟即为一个条件。 字段名描述类型是否必填备注type条件类型。enum(string)是条件类型取值如下: - CLASS:对应用户自定义的条件。- AVIATOR:对应基于AVIATOR表达式的条件。- GROOVY:对应基于GROOVY表达式的条件。...其他可序列化的自定义字段。否... 目前我们支持以下几种Condition: - Class类型Condition 字段名描述类型是否必填备注type条件类型。enum(string)是固定值为Class。className类名。string是该class完整类名,例如
com.alibaba.ververica.cep.demo.StartCondition
。- 包含自定义参数的Condition 用户在使用普通的Class类型Condition时,只能传入类名(className),而无法动态地传入参数。在动态CEP支持中,为了提供更丰富的Condition表达能力,我们设计并实现了包含自定义参数的Condition(即CustomArgsCondition),从而允许用户在JSON中通过字符串数组来设置CustomArgsCondition所需参数, 进而动态构造CustomArgsCondition实例。这一特性允许用户动态更新Condition的参数,而无需修改Java代码。字段名描述类型是否必填备注type条件类型。enum(string)是固定值为Class。className类名。string是该class完整类名,例如com.alibaba.ververica.cep.demo.CustomMiddleCondition
。args自定义参数。array of string是一个字符串数组。- 基于Aviator表达式的ConditionAviator是一个表达式求值引擎,可以动态地将表达式编译成字节码(详情请参见aviatorscript)。因此我们可以在作业中使用基于Aviator表达式的Condition,使得条件的阈值也可以动态修改,而无需修改Java代码重新编译运行。 字段名描述类型是否必填备注type类名。string是固定值为AVIATOR。expression表达式字符串。string是形如price > 10这样的表达式字符串(price变量名来自于Java代码中定义的字段)。 您可以将该字符串在数据库中的值进行修改。例如修改为price > 20,Flink CEP作业会动态加载price > 20构造新的AviatorCondition来处理之后的事件。- 基于Groovy表达式的ConditionGroovy是一个基于JVM平台的动态语言(Groovy语法可以参见syntax)。动态CEP支持使用Groovy表达式来定义条件(Condition),从而允许动态修改条件的阈值。 字段名描述类型是否必填备注type类名。string是固定值为GROOVY。expression表达式字符串。string是形如price > 5.0 && name.contains("mid")这样的表达式字符串(price、name等变量名来自于Java代码中定义的字段)。您可以将该字符串在数据库中的值进行修改。例如修改为price > 20 && name.contains("end"),Flink CEP作业会动态加载新的Groovy字符串并构造新的GroovyCondition来处理之后的事件。 - 边(Edge)定义 字段名描述类型是否必填备注source源模式名称。string是无。target目标模式名称。string是无。type事件选择策略。dict是支持以下取值: - STRICT- SKIP_TILL_NEXT- SKIP_TILL_ANY- NOT_FOLLOW- NOT_NEXT取值及含义请参见本文连续性定义。
- 图(GraphNode extends Node)定义 一个图(GraphNode)代表一个完整的Pattern序列,它的节点(nodes)是各个独立的Pattern,边(edges)代表如何从一类Pattern的匹配转移到另一类Pattern的匹配。 为了支持Pattern的嵌套(即GroupPattern),我们将一个GraphNode看作是Node的子类,即一个GraphNode可以作为一个更大的GraphNode中的Node。GraphNode相比于基础Node,额外多了以下2类字段: - 描述图的结构的nodes字段与edges字段。- 描述图内时间窗口策略的window字段与事件匹配后的跳出策略afterMatchSkipStrategy字段。 GraphNode的字段详情请参见下表。 字段名描述类型是否必填备注name该复合Pattern名称。String是一个唯一的字符串。 说明不同Graph名称不能重复。type该Node类型。enum(string)是固定值为COMPOSITE。version该Graph使用的JSON格式的版本号。Int是默认值为1。nodes该Pattern内嵌套的子Pattern。array of Node是不可以为空的array。edges嵌套的子Pattern的连接关系。array of Edge是可以为空的array。window- 当类型为FIRST_AND_LAST:代表该复合Pattern一次完整匹配之间的最大时间间隔。- 当类型为PREVIOUS_AND_CURRENT:代表该相邻2个子Pattern匹配之间的最大时间间隔。dict否取值示例如下。
"window": { "type": "FIRST_AND_LAST", "time": { "unit": "DAYS", "size": 1 }}
单位可以为DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。数据类型为Long或Integer。afterMatchSkipStrategy该图内所有事件匹配后的跳过策略。dict是请参见本文事件匹配后的跳过策略(AfterMatchSkipStrategy)定义。quantifier量词,用于描述如何匹配该Pattern,例如只匹配一次。dict是请参见本文量词(Quantifier)定义。 - 事件匹配后的跳过策略(AfterMatchSkipStrategy)定义 字段名描述类型是否必填备注type策略类型。enum(string)是参数取值如下: - NO_SKIP(默认值):每个成功的匹配都会被输出。- SKIP_TO_NEXT:丢弃以相同事件开始的所有部分匹配。- SKIP_PAST_LAST_EVENT:丢弃起始在这个匹配的开始和结束之间的所有部分匹配。- SKIP_TO_FIRST:丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。- SKIP_TO_LAST:丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。详情请参见匹配后跳过策略patternName策略针对的模式的名称。string否一个唯一的字符串。
- 连续性定义 物理值含义STRICT严格连续。 所有匹配的事件中间没有任何不匹配的事件。SKIP_TILL_NEXT松散连续。允许匹配的事件之间出现不匹配的事件,不匹配的事件会被忽略。SKIP_TILL_ANY不确定松散连续。更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。NOT_NEXT紧接着的后续事件不能是某指定事件。NOT_FOLLOW某指定事件后续不出现。相关示例请参见事件处理(CEP)文档。
- 量词属性含义 取值含义SINGLE代表该模式只出现一次。LOOPING代表该模式为循环模式,可能出现多次,类比正则表达式中的*与+。TIMES代表该模式会出现指定次数。GREEDY代表在匹配该模式时,会采用贪婪匹配策略,尽可能多地匹配。OPTIONAL代表该模式为可选模式。
示例一:普通Pattern示例
例如在电商大促的实时营销场景中,要找到在大促前10分钟时间窗口内满足指定条件的客户,来使用Flink 动态CEP规则针对性地调整营销策略。这些客户需要满足的条件如下:
- 领取了某会场的优惠券。
- 在购物车中添加了超过3次的商品。
- 但最后没有结账付款。
为此,我们将领取某会场的优惠券定义为StartCondition,添加商品到购物车定义为MiddleCondition,结账定义为EndCondition。抽象出的模式为在大促前10分钟的时间窗口内,满足StartCondition的事件可以发生也可以不发生,满足MiddleCondition的事件发生了大于等于3次,但最后没有1个满足EndCondition的事件。它对应的Pattern用Java代码描述如下。
Pattern<Event, Event> pattern =
Pattern.<Event>begin("start")
.where(new StartCondition())
.optional()
.followedBy("middle")
.where(new MiddleCondition())
.timesOrMore(3)
.notFollowedBy("end")
.where(new EndCondition())
.within(Time.minutes(10));
其按本文档描述的JSON格式表达如下。
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": null,
"nodes": [
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "middle",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"LOOPING"
],
"times": {
"from": 3,
"to": 3,
"windowTime": null
},
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "start",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE",
"OPTIONAL"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
"type": "CLASS"
},
"type": "ATOMIC"
}
],
"edges": [
{
"source": "middle",
"target": "end",
"type": "NOT_FOLLOW"
},
{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
}
],
"window": {
"type": "FIRST_AND_LAST",
"time": {
"unit": "MINUTES",
"size": 10
}
},
"afterMatchStrategy": {
"type": "NO_SKIP",
"patternName": null
},
"type": "COMPOSITE",
"version": 1
}
示例二:在Pattern中使用包含自定义参数的Condition
例如在实时营销场景中,假设我们给用户打上了一个人群标签,之后会根据用户所属的标签采取不同的营销策略,例如对于A类用户我们发送营销短信,对于B类用户我们发送优惠券等,而对于其他用户,我们不采取营销措施。针对上述需求,我们可以定义一个普通的Class类型Condition来解决,但当我们想调整策略,针对C类用户也发送优惠券时,如果使用的是普通的Class类型Condition,那么我们必须改写代码,重新编译并运行作业。这种情况下,我们可以使用包含自定义参数的Condition,在代码中定义好如何根据传入的参数进行策略的调整之后,我们只需要在数据库中修改传入的参数(即包含自定义参数的Condition的args字段的值),例如由["A", "B"] 改为["A", "B", "C"],即可实现营销策略的动态更新。
即假设初始Pattern中定义的Condition如下:
"condition": {
"args": [
"A", "B"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}
我们可将其修改为:
"condition": {
"args": [
"A", "B", "C"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}
关于该类Condition在具体业务场景的使用示例,详情请参见Demo。
说明
本文中aviatorscript和Demo属于第三方搭建的网站,访问时可能会存在无法打开或访问延迟的问题。
版权归原作者 soso1968 所有, 如有侵权,请联系我们删除。