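Flink CEP (3): Dynamic Pattern Updates (with Source Code)

CEP jobs running in production inevitably face rule changes, and restarting and redeploying the job for every change is inelegant. This matters most in marketing and risk-control scenarios with strict real-time requirements: when the rule window is long (one or two weeks) and the state is large, restarts take longer, and abnormal behaviour that should be handled during the restart goes undetected.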

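1. Implementation analysis

  • External loading: a rule engine usually has a dedicated rule-management module where users create their own rules, so the Flink job has to load its rules from an external source.
  • Dynamic updates: the job needs to check periodically whether the rules have changed.
  • Clearing historical state: pattern matching is a sequence of NFAState transitions, so when a rule changes the historical state must be cleared.
  • API: an easy-to-use API should be exposed to users.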

2. Code implementation

   First, implement the user-facing API.
package cep.functions;

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;

import cep.pattern.Pattern;

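/**
 * @author StephenYou
 * Created on 2023-07-23
 * Description: dynamic Pattern interface (user-facing API); the same pattern applies to every key
 */
public interface DynamicPatternFunction<T> extends Function, Serializable {

    /**
     * Initialization hook, called once before the first pattern is loaded.
     * @throws Exception
     */
    public void init() throws Exception;

    /**
     * Supplies the new pattern to inject.
     * @return the pattern to compile and run
     */
    public Pattern<T,T> inject() throws Exception;

    /**
     * Length of one scan period, in milliseconds.
     * @return the polling period in ms
     */
    public long getPeriod() throws Exception;

    /**
     * Whether the rule has changed.
     * @return true if a new pattern should be injected
     */
    public boolean isChanged() throws Exception;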
}
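
    The interface leaves open how isChanged() decides that a rule has changed. One possible approach, sketched below without any Flink dependency, is to fingerprint the rule text fetched from the external store and report a change only when the fingerprint differs. RuleChangeDetector and its ruleSource supplier are hypothetical names introduced for illustration; they are not part of the SyCep source.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.function.Supplier;

/** Hypothetical helper: reports a change only when the rule text actually differs. */
class RuleChangeDetector {
    // Assumption: the rule is available as text, e.g. from a database or HTTP lookup.
    private final Supplier<String> ruleSource;
    // Fingerprint of the rule text seen most recently.
    private byte[] lastDigest;

    RuleChangeDetector(Supplier<String> ruleSource) {
        this.ruleSource = ruleSource;
    }

    /** Records the baseline fingerprint; mirrors DynamicPatternFunction.init(). */
    void init() {
        lastDigest = sha256(ruleSource.get());
    }

    /** Returns true when the rule text differs from the one seen on the previous call. */
    boolean isChanged() {
        byte[] digest = sha256(ruleSource.get());
        boolean changed = !Arrays.equals(digest, lastDigest);
        lastDigest = digest;
        return changed;
    }

    private static byte[] sha256(String text) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(text.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a mandatory algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }
}
```

    A DynamicPatternFunction implementation could delegate init() and isChanged() to such a helper, so that an unchanged rule does not trigger a state reset on every poll.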
    The intended usage of this API is as follows.
// Regular usage

CEP.pattern(dataStream, pattern);

// Dynamic Pattern

CEP.injectionPattern(dataStream, new UserDynamicPatternFunction());
    This requires modifying the CEP library source.

    Add the injectionPattern function.
public class CEP {
    /***
     * Dynamic injection pattern function 
     * @param input
     * @param dynamicPatternFunction
     * @return
     * @param <T>
     */
    public static <T> PatternStream<T> injectionPattern(
            DataStream<T> input,
            DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
        return new PatternStream<>(input, dynamicPatternFunction); 
    }
}
    Add a PatternStream constructor. Because the pattern is updated dynamically, the whole function has to be passed in.
public class PatternStream<T> {
    PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
        this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));
    }
}
    Modify PatternStreamBuilder.build so that the function is passed on to the operator.
        final CepOperator<IN, K, OUT> operator;
        if (patternFunction == null ) {
            operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy(),
                    processFunction,
                    lateDataOutputTag);
        } else {
            operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    patternFunction,
                    comparator,
                    null,
                    processFunction,
                    lateDataOutputTag);
        }
    Add the corresponding CepOperator constructor.
    public CepOperator(
            final TypeSerializer<IN> inputSerializer,
            final boolean isProcessingTime,
            final DynamicPatternFunction patternFunction,
            @Nullable final EventComparator<IN> comparator,
            @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
            final PatternProcessFunction<IN, OUT> function,
            @Nullable final OutputTag<IN> lateDataOutputTag) {
        super(function);

        this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
        this.patternFunction = patternFunction;
        this.isProcessingTime = isProcessingTime;
        this.comparator = comparator;
        this.lateDataOutputTag = lateDataOutputTag;

        if (afterMatchSkipStrategy == null) {
            this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
        } else {
            this.afterMatchSkipStrategy = afterMatchSkipStrategy;
        }
        this.nfaFactory = null;
    }
    Load the Pattern and build the NFA.
    @Override
    public void open() throws Exception {
        super.open();
        timerService =
                getInternalTimerService(
                        "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);

        // Initialize: load the first pattern from the user function
        if (patternFunction != null) {
            patternFunction.init();
            Pattern pattern = patternFunction.inject();
            afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();
            boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
            nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
            long period = patternFunction.getPeriod();
            // Register a timer that checks whether the rule has changed
            if (period > 0) {
                getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);
            }
        }

        nfa = nfaFactory.createNFA();
        nfa.open(cepRuntimeContext, new Configuration());

        context = new ContextFunctionImpl();
        collector = new TimestampedCollector<>(output);
        cepTimerService = new TimerServiceImpl();

        // metrics
        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    }
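
    The article does not show the body of the timer callback registered in open(). A plausible shape for it, written here as a Flink-free model, is: ask the function whether the rule changed, re-inject the pattern and bump the version if so, then schedule the next check (a timer registered with registerTimer fires once, so it has to be registered again). PatternRefresher and all of its members are hypothetical stand-ins, not the actual SyCep code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/** Hypothetical, Flink-free model of the periodic rule check. */
class PatternRefresher<P> {
    private final BooleanSupplier isChanged; // stands in for patternFunction.isChanged()
    private final Supplier<P> inject;        // stands in for patternFunction.inject()
    private final long periodMs;             // stands in for patternFunction.getPeriod()
    private final AtomicInteger refreshVersion = new AtomicInteger(0);
    private P currentPattern;

    PatternRefresher(BooleanSupplier isChanged, Supplier<P> inject, long periodMs) {
        this.isChanged = isChanged;
        this.inject = inject;
        this.periodMs = periodMs;
        this.currentPattern = inject.get(); // initial load, as in open()
    }

    /** Timer callback: returns the timestamp at which the next check should fire. */
    long onTimer(long timestamp) {
        if (isChanged.getAsBoolean()) {
            // The real operator would also recompile the NFA from the new pattern here.
            currentPattern = inject.get();
            // A higher version tells each key to drop its old state lazily.
            refreshVersion.incrementAndGet();
        }
        return timestamp + periodMs;
    }

    P currentPattern() {
        return currentPattern;
    }

    int version() {
        return refreshVersion.get();
    }
}
```

    Keeping the version bump in the callback and the actual clean-up in processElement means the timer thread never touches keyed state, which is only accessible while a key is in context.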
    State clean-up has two parts: clearing the match state data and clearing the timers.

    Performing the state clean-up:
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {

        if (patternFunction != null) {
            // The rule version has moved on: this key still holds state from the old rule
            if (needRefresh.value() < refreshVersion.get()) {
                // Clear the match state
                computationStates.clear();
                elementQueueState.clear();
                partialMatches.releaseCacheStatisticsTimer();
                // Clear the timers
                Iterable<Long> registerTime = registerTimeState.get();
                if (registerTime != null) {
                    Iterator<Long> iterator = registerTime.iterator();
                    while (iterator.hasNext()) {
                        Long l = iterator.next();
                        // Delete the timer
                        timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);
                        timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);
                        // Remove the entry from state
                        iterator.remove();
                    }
                }
                // Record the current version for this key
                needRefresh.update(refreshVersion.get());
            }
        }
}
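    The code above clears the state and updates the version as each record is processed. Next, the state and the version need to be initialized.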
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        // Initialize state
        if (patternFunction != null) {
            // Two flag states: the operator-wide rule version and the per-key version last seen
            refreshFlagState = context.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));
            // Default to version 0; this also covers a restore where the union state is empty
            refreshVersion = new AtomicInteger(0);
            if (context.isRestored()) {
                Iterator<Integer> restored = refreshFlagState.get().iterator();
                if (restored.hasNext()) {
                    refreshVersion.set(restored.next());
                }
            }
            needRefresh = context.getKeyedStateStore()
                    .getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));
        }
}
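
    Taken together, the version check in processElement and the two states initialized here amount to lazy, per-key invalidation: a rule change only bumps a counter, and each key discards its old state the next time one of its events arrives. The following Flink-free model illustrates the idea with plain maps. VersionedKeyedState and its members are hypothetical names chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, Flink-free model of version-based lazy state reset. */
class VersionedKeyedState {
    // Operator-wide rule version (refreshVersion in the operator).
    private final AtomicInteger refreshVersion = new AtomicInteger(0);
    // Version each key last saw (the needRefresh ValueState, default 0).
    private final Map<String, Integer> keyVersion = new HashMap<>();
    // Per-key buffer standing in for computationStates / elementQueueState.
    private final Map<String, List<String>> buffers = new HashMap<>();

    /** Called when the rule changes; no keyed state is touched here. */
    void bumpVersion() {
        refreshVersion.incrementAndGet();
    }

    /** Processes one event; state built under an older rule is dropped first. */
    void processElement(String key, String event) {
        int current = refreshVersion.get();
        if (keyVersion.getOrDefault(key, 0) < current) {
            buffers.remove(key);          // clear state built under the old rule
            keyVersion.put(key, current); // mark this key as up to date
        }
        buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
    }

    /** Returns the events currently buffered for a key (empty if none). */
    List<String> buffered(String key) {
        return buffers.getOrDefault(key, new ArrayList<>());
    }
}
```

    One consequence of this design, visible in the model, is that a key which receives no further events keeps its old state until something else (for example a state TTL) removes it.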

3. Testing and verification

    Configure the Pattern to change every 10 s.
 PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());
        patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {
            @Override
            public Map select(Map map) throws Exception {
                map.put("processingTime", System.currentTimeMillis());
                return map;
            }
        }).print();
        env.execute("SyCep");

    }

    public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3<String, Long, String>> {

        public TestDynamicPatternFunction() {
            this.flag = true;
        }

        boolean flag;
        int time = 0;
        @Override
        public void init() throws Exception {
            flag = true;
        }

        @Override
        public Pattern<Tuple3<String, Long, String>, Tuple3<String, Long, String>> inject()
                            throws Exception {

            // Two alternative patterns, selected by the flag
            if (flag) {
                Pattern pattern = Pattern
                        .<Tuple3<String, Long, String>>begin("start")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("success");
                            }
                        })
                        .times(1)
                        .followedBy("middle")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("fail");
                            }
                        })
                        .times(1)
                        .next("end");
                return pattern;
            } else {

                Pattern pattern = Pattern
                        .<Tuple3<String, Long, String>>begin("start2")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("success2");
                            }
                        })
                        .times(2)
                        .next("middle2")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("fail2");
                            }
                        })
                        .times(2)
                        .next("end2");
                return pattern;
            }
        }

        @Override
        public long getPeriod() throws Exception {
            return 10000;
        }

        @Override
        public boolean isChanged() throws Exception {
            flag = !flag ;
            time += getPeriod();
            System.out.println("change pattern : " + time);
            return true;
        }
    }

Printed output: as expected.

4. Source code

If you find this useful, please give the repository a star. ^_^

GitHub - StephenYou520/SyCep: CEP dynamic Pattern

This article is reposted from: https://blog.csdn.net/qq_35590459/article/details/131869596
Copyright belongs to the original author, StephenYYYou. In case of infringement, please contact us for removal.