0


[实时计算flink]动态CEP中规则的JSON格式定义

本文为您介绍CEP中规则的JSON格式相关信息。

目标人群

  • 客户风控平台开发人员:对Flink CEP较熟悉的平台研发人员应能快速学习本格式,并根据自身平台需求判断是否需要进一步封装。
  • 客户风控策略人员:只熟悉具体策略但缺乏Java经验的同学,在熟悉CEP概念的基础上,也可快速上手本格式的使用来编写新规则,使其在上线的风控作业中应用。

JSON格式定义

对于一个事件序列(Event Sequence)中的模式(Pattern),我们可以将其看作一个图(Graph),图中节点(Node)为针对某些事件(Event)的模式,节点之间的边(Edge)为事件选择策略(Event Selection Strategy),即如何从一类模式的匹配转移到另一类模式的匹配。每个图也可以看作一个更大的图的子节点,从而允许模式的嵌套。基于以上考虑,阿里云实时计算Flink定义了一套基于JSON的规范来描述CEP中的规则,进而方便规则的存储与修改,该规范中各个字段的含义如下。

  • 节点(Node)定义一个节点(Node)即一个完整的模式(Pattern),它包含如下属性。 字段名描述类型是否必填备注namePattern名称。string是一个唯一的字符串。 说明不同节点的名称不能重复。type该Node类型。enum(string)是- 对于包含子Pattern的节点,该字段值为COMPOSITE。- 对于无子Pattern的节点,该字段值为ATOMIC。quantifier量词,用于描述如何匹配该Pattern,例如只匹配一次。dict是请参见本文量词(Quantifier)定义。condition条件。dict否请参见本文条件(Condition)定义。
  • 量词(Quantifier)定义量词的作用是描述对于满足该Pattern的事件要如何匹配。例如模式"A*" 对应的量词properties为LOOPING,该Pattern内部的事件选择策略为SKIP_TILL_ANY。 字段名描述类型是否必填备注consumingStrategy事件选择策略。enum(string)是仅支持以下取值: - STRICT- SKIP_TILL_NEXT- SKIP_TILL_ANY取值及含义请参见本文连续性定义。times用于描述该Pattern需要匹配多少次。dict否取值示例如下。 "times": { "from": 3, "to": 3, "windowTime": { "unit": "MINUTES", "size": 12 } }, 其中from和to的数据类型均为integer,windowTime的单位可以为DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。 说明windowTime可以设为null,即"windowTime": null。properties描述该量词所具有的属性。array of enumString是取值及含义请参见本文量词属性含义。untilCondition停止条件。 说明仅可在LOOPING量词修饰的Pattern后使用。dict否取值及含义请参见本文条件(Condition)定义。
  • 条件(Condition)定义条件用于筛选符合某些要求的事件。例如要筛选浏览时长超过5分钟的客户,浏览时长超过5分钟即为一个条件。 字段名描述类型是否必填备注type条件类型。enum(string)是条件类型取值如下: - CLASS:对应用户自定义的条件。- AVIATOR:对应基于AVIATOR表达式的条件。- GROOVY:对应基于GROOVY表达式的条件。...其他可序列化的自定义字段。否... 目前我们支持以下几种Condition: - Class类型Condition 字段名描述类型是否必填备注type条件类型。enum(string)是固定值为Class。className类名。string是该class完整类名,例如com.alibaba.ververica.cep.demo.StartCondition。- 包含自定义参数的Condition 用户在使用普通的Class类型Condition时,只能传入类名(className),而无法动态地传入参数。在动态CEP支持中,为了提供更丰富的Condition表达能力,我们设计并实现了包含自定义参数的Condition(即CustomArgsCondition),从而允许用户在JSON中通过字符串数组来设置CustomArgsCondition所需参数, 进而动态构造CustomArgsCondition实例。这一特性允许用户动态更新Condition的参数,而无需修改Java代码。字段名描述类型是否必填备注type条件类型。enum(string)是固定值为Class。className类名。string是该class完整类名,例如com.alibaba.ververica.cep.demo.CustomMiddleCondition。args自定义参数。array of string是一个字符串数组。- 基于Aviator表达式的ConditionAviator是一个表达式求值引擎,可以动态地将表达式编译成字节码(详情请参见aviatorscript)。因此我们可以在作业中使用基于Aviator表达式的Condition,使得条件的阈值也可以动态修改,而无需修改Java代码重新编译运行。 字段名描述类型是否必填备注type类名。string是固定值为AVIATOR。expression表达式字符串。string是形如price > 10这样的表达式字符串(price变量名来自于Java代码中定义的字段)。 您可以将该字符串在数据库中的值进行修改。例如修改为price > 20,Flink CEP作业会动态加载price > 20构造新的AviatorCondition来处理之后的事件。- 基于Groovy表达式的ConditionGroovy是一个基于JVM平台的动态语言(Groovy语法可以参见syntax)。动态CEP支持使用Groovy表达式来定义条件(Condition),从而允许动态修改条件的阈值。 字段名描述类型是否必填备注type类名。string是固定值为GROOVY。expression表达式字符串。string是形如price > 5.0 && name.contains("mid")这样的表达式字符串(price、name等变量名来自于Java代码中定义的字段)。您可以将该字符串在数据库中的值进行修改。例如修改为price > 20 && name.contains("end"),Flink CEP作业会动态加载新的Groovy字符串并构造新的GroovyCondition来处理之后的事件。
  • 边(Edge)定义 字段名描述类型是否必填备注source源模式名称。string是无。target目标模式名称。string是无。type事件选择策略。dict是支持以下取值: - STRICT- SKIP_TILL_NEXT- SKIP_TILL_ANY- NOT_FOLLOW- NOT_NEXT取值及含义请参见本文连续性定义。
  • 图(GraphNode extends Node)定义 一个图(GraphNode)代表一个完整的Pattern序列,它的节点(nodes)是各个独立的Pattern,边(edges)代表如何从一类Pattern的匹配转移到另一类Pattern的匹配。 为了支持Pattern的嵌套(即GroupPattern),我们将一个GraphNode看作是Node的子类,即一个GraphNode可以作为一个更大的GraphNode中的Node。GraphNode相比于基础Node,额外多了以下2类字段: - 描述图的结构的nodes字段与edges字段。- 描述图内时间窗口策略的window字段与事件匹配后的跳出策略afterMatchSkipStrategy字段。 GraphNode的字段详情请参见下表。 字段名描述类型是否必填备注name该复合Pattern名称。String是一个唯一的字符串。 说明不同Graph名称不能重复。type该Node类型。enum(string)是固定值为COMPOSITE。version该Graph使用的JSON格式的版本号。Int是默认值为1。nodes该Pattern内嵌套的子Pattern。array of Node是不可以为空的array。edges嵌套的子Pattern的连接关系。array of Edge是可以为空的array。window- 当类型为FIRST_AND_LAST:代表该复合Pattern一次完整匹配之间的最大时间间隔。- 当类型为PREVIOUS_AND_CURRENT:代表该相邻2个子Pattern匹配之间的最大时间间隔。dict否取值示例如下。 "window": { "type": "FIRST_AND_LAST", "time": { "unit": "DAYS", "size": 1 }}单位可以为DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。数据类型为Long或Integer。afterMatchSkipStrategy该图内所有事件匹配后的跳过策略。dict是请参见本文事件匹配后的跳过策略(AfterMatchSkipStrategy)定义。quantifier量词,用于描述如何匹配该Pattern,例如只匹配一次。dict是请参见本文量词(Quantifier)定义。
  • 事件匹配后的跳过策略(AfterMatchSkipStrategy)定义 字段名描述类型是否必填备注type策略类型。enum(string)是参数取值如下: - NO_SKIP(默认值):每个成功的匹配都会被输出。- SKIP_TO_NEXT:丢弃以相同事件开始的所有部分匹配。- SKIP_PAST_LAST_EVENT:丢弃起始在这个匹配的开始和结束之间的所有部分匹配。- SKIP_TO_FIRST:丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。- SKIP_TO_LAST:丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。详情请参见匹配后跳过策略patternName策略针对的模式的名称。string否一个唯一的字符串。
  • 连续性定义 物理值含义STRICT严格连续。 所有匹配的事件中间没有任何不匹配的事件。SKIP_TILL_NEXT松散连续。允许匹配的事件之间出现不匹配的事件,不匹配的事件会被忽略。SKIP_TILL_ANY不确定松散连续。更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。NOT_NEXT紧接着的后续事件不能是某指定事件。NOT_FOLLOW某指定事件后续不出现。相关示例请参见事件处理(CEP)文档。
  • 量词属性含义 取值含义SINGLE代表该模式只出现一次。LOOPING代表该模式为循环模式,可能出现多次,类比正则表达式中的*与+。TIMES代表该模式会出现指定次数。GREEDY代表在匹配该模式时,会采用贪婪匹配策略,尽可能多地匹配。OPTIONAL代表该模式为可选模式。

示例一:普通Pattern示例

例如在电商大促的实时营销场景中,要找到在大促前10分钟时间窗口内满足指定条件的客户,来使用Flink 动态CEP规则针对性地调整营销策略。这些客户需要满足的条件如下:

  • 领取了某会场的优惠券。
  • 在购物车中添加了超过3次的商品。
  • 但最后没有结账付款。

为此,我们将领取某会场的优惠券定义为StartCondition,添加商品到购物车定义为MiddleCondition,结账定义为EndCondition。抽象出的模式为在大促前10分钟的时间窗口内,满足StartCondition的事件可以发生也可以不发生,满足MiddleCondition的事件发生了大于等于3次,但最后没有1个满足EndCondition的事件。它对应的Pattern用Java代码描述如下。

Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
            .where(new StartCondition())
            .optional()
            .followedBy("middle")
            .where(new MiddleCondition())
            .timesOrMore(3)
            .notFollowedBy("end")
            .where(new EndCondition())
            .within(Time.minutes(10));

其按本文档描述的JSON格式表达如下。

{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}

示例二:在Pattern中使用包含自定义参数的Condition

例如在实时营销场景中,假设我们给用户打上了一个人群标签,之后会根据用户所属的标签采取不同的营销策略,例如对于A类用户我们发送营销短信,对于B类用户我们发送优惠券等,而对于其他用户,我们不采取营销措施。针对上述需求,我们可以定义一个普通的Class类型Condition来解决,但当我们想调整策略,针对C类用户也发送优惠券时,如果使用的是普通的Class类型Condition,那么我们必须改写代码,重新编译并运行作业。这种情况下,我们可以使用包含自定义参数的Condition,在代码中定义好如何根据传入的参数进行策略的调整之后,我们只需要在数据库中修改传入的参数(即包含自定义参数的Condition的args字段的值),例如由["A", "B"] 改为["A", "B", "C"],即可实现营销策略的动态更新。

即假设初始Pattern中定义的Condition如下:

"condition": {
    "args": [
        "A", "B"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}

我们可将其修改为:

"condition": {
    "args": [
        "A", "B", "C"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}

关于该类Condition在具体业务场景的使用示例,详情请参见Demo。

说明

本文中aviatorscript和Demo属于第三方搭建的网站,访问时可能会存在无法打开或访问延迟的问题。

标签: flink json 数据库

本文转载自: https://blog.csdn.net/segwy/article/details/142654975
版权归原作者 soso1968 所有, 如有侵权,请联系我们删除。

“[实时计算flink]动态CEP中规则的JSON格式定义”的评论:

还没有评论