

A Deep Dive into the Flink IntervalJoin Source Code

IntervalJoin is implemented on top of connect; along the way, the corresponding IntervalJoinOperator is created.

@PublicEvolving
public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);

    // Clean the closure of the user-defined function so that it can be serialized
    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf =
            left.getExecutionEnvironment().clean(processJoinFunction);

    // Build the IntervalJoinOperator that backs the interval join
    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator = new IntervalJoinOperator<>(
            lowerBound,
            upperBound,
            lowerBoundInclusive,
            upperBoundInclusive,
            left.getType().createSerializer(left.getExecutionConfig()),
            right.getType().createSerializer(right.getExecutionConfig()),
            cleanedUdf
    );

    // Built on connect: the user-defined function is applied to every joined pair
    return left
            .connect(right)
            // Key both streams with keySelector1 and keySelector2. This yields a keyed
            // ConnectedStreams, on which the Transformation for IntervalJoinOperator is built next
            .keyBy(keySelector1, keySelector2)
            // Build the TwoInputTransformation for IntervalJoinOperator
            .transform("Interval Join", outputType, operator);
}

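The operator that wraps the user-defined function is then used to build the corresponding TwoInputTransformation, so that it can take part in building the Transformation tree.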

/**
 * Creates the Transformation for a StreamOperator so that it can take part in building the Transformation tree
 */
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(
        String functionName,
        TypeInformation<R> outTypeInfo,
        TwoInputStreamOperator<IN1, IN2, R> operator) {

    inputStream1.getType();
    inputStream2.getType();

    // Create the TwoInputTransformation for IntervalJoinOperator
    TwoInputTransformation<IN1, IN2, R> transform = new TwoInputTransformation<>(
            inputStream1.getTransformation(),
            inputStream2.getTransformation(),
            functionName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    if (inputStream1 instanceof KeyedStream && inputStream2 instanceof KeyedStream) {
        KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
        KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) inputStream2;

        TypeInformation<?> keyType1 = keyedInput1.getKeyType();
        TypeInformation<?> keyType2 = keyedInput2.getKeyType();
        // Both inputs must be keyed by the same key type
        if (!(keyType1.canEqual(keyType2) && keyType1.equals(keyType2))) {
            throw new UnsupportedOperationException("Key types if input KeyedStreams "
                    + "don't match: " + keyType1 + " and " + keyType2 + ".");
        }

        transform.setStateKeySelectors(keyedInput1.getKeySelector(), keyedInput2.getKeySelector());
        transform.setStateKeyType(keyType1);
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, transform);

    // Add the TwoInputTransformation for IntervalJoinOperator to the Transformation tree
    getExecutionEnvironment().addOperator(transform);

    return returnStream;
}

Because the operator runs on a ConnectedStreams, a StreamRecord arriving on either the left or the right stream is processed as soon as it arrives:

@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
    // Handle an element from the left stream
    processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}

@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
    // Handle an element from the right stream (bounds are negated and swapped)
    processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}
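
The negated, swapped bounds passed by processElement2 follow from rearranging the join condition. A short derivation, writing $t_L$ and $t_R$ for the left and right timestamps (symbols introduced here only for illustration):

```latex
\begin{aligned}
t_L + \text{lowerBound} \;\le\; t_R \;\le\; t_L + \text{upperBound}
\quad\Longleftrightarrow\quad
t_R - \text{upperBound} \;\le\; t_L \;\le\; t_R - \text{lowerBound}
\end{aligned}
```

Seen from a right element, the relative lower bound is therefore -upperBound and the relative upper bound is -lowerBound, which matches the arguments in processElement2.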

Both sides share the same processing logic:

/**
 * Processes an element from either the left or the right stream
 */
@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(
        final StreamRecord<THIS> record,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
        final long relativeLowerBound,
        final long relativeUpperBound,
        // Whether the current element comes from the left stream
        final boolean isLeft) throws Exception {

    // Value of the current left or right StreamRecord
    final THIS ourValue = record.getValue();
    // Timestamp of the current left or right StreamRecord
    final long ourTimestamp = record.getTimestamp();

    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in "
                + "interval stream joins need to have timestamps meaningful timestamps.");
    }

    // Lateness check: is the record's timestamp less than the current watermark?
    if (isLate(ourTimestamp)) {
        return;
    }

    // Write the current StreamRecord into its own MapState (left into left, right into right)
    addToBuffer(ourBuffer, ourValue, ourTimestamp);

    /**
     * Iterate over the other side's MapState and find the StreamRecords that join
     */
    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
        // Key of the other side's MapState, i.e. a timestamp
        final long timestamp = bucket.getKey();

        // If this bucket's timestamp is outside the join window (relative to the current
        // StreamRecord's timestamp), it does not join, so skip it.
        // This check is the core of deciding which StreamRecords join.
        if (timestamp < ourTimestamp + relativeLowerBound ||
                timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }

        // Otherwise the bucket joins: take its value, the List<BufferEntry<OTHER>> stored under that timestamp
        for (BufferEntry<OTHER> entry : bucket.getValue()) {
            // Emit the joined left and right elements downstream
            // (this calls processElement() of the user-defined function)
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }

    // After the nested loops have emitted the results, compute the cleanup time
    // (current timestamp + relative upper bound if that bound is positive, else the current timestamp)
    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

    // Register a timer that cleans up the expired data kept in MapState
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}

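First, the timestamp of the current StreamRecord is read and checked for lateness. The record counts as late if its timestamp is less than the current watermark, in which case it is dropped without being buffered or joined.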

/**
 * Checks whether the current StreamRecord is late: is its timestamp less than the current watermark?
 */
private boolean isLate(long timestamp) {
    // Get the current watermark
    long currentWatermark = internalTimerService.currentWatermark();
    // Lateness condition
    return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
}
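
The boundary cases of this condition are easy to get wrong, so here is a minimal standalone sketch of the same predicate. Passing the watermark as a parameter and the class name LatenessSketch are assumptions made for illustration; the operator reads the watermark from internalTimerService instead.

```java
final class LatenessSketch {
    /** Same condition as isLate(), with the watermark passed in explicitly (assumption of this sketch). */
    static boolean isLate(long timestamp, long currentWatermark) {
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    public static void main(String[] args) {
        System.out.println(isLate(5L, 10L));            // true: older than the watermark
        System.out.println(isLate(10L, 10L));           // false: equal to the watermark is not late
        System.out.println(isLate(5L, Long.MIN_VALUE)); // false: no watermark has been seen yet
    }
}
```

The comparison is strict, so a record whose timestamp equals the watermark is still accepted.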

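Next, the current StreamRecord is written into its own MapState. Note that left and right each have their own MapState, which uses the timestamp as the key and a List as the value (because several records may carry the same timestamp).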

/**
 * Writes the current StreamRecord into its own MapState (left into left, right into right)
 */
private static <T> void addToBuffer(
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
        final T value,
        final long timestamp) throws Exception {
    // Look up the bucket in MapState, using the timestamp as the key
    List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
    if (elemsInBucket == null) {
        elemsInBucket = new ArrayList<>();
    }
    // Wrap the value in a BufferEntry (initially marked as not yet joined) and add it to the List
    elemsInBucket.add(new BufferEntry<>(value, false));
    // Put the List back into MapState (timestamp as the key)
    buffer.put(timestamp, elemsInBucket);
}
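
To make the bucket layout concrete, the sketch below replays the same get / add / put sequence on a plain HashMap. The HashMap stands in for MapState and the class name BufferSketch is hypothetical; neither is how Flink stores state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BufferSketch {
    /** Appends value to the bucket keyed by timestamp, creating the bucket if needed. */
    static <T> void addToBuffer(Map<Long, List<T>> buffer, T value, long timestamp) {
        List<T> bucket = buffer.get(timestamp);
        if (bucket == null) {
            bucket = new ArrayList<>();
        }
        bucket.add(value);
        // Written back explicitly, mirroring the put() in the operator.
        buffer.put(timestamp, bucket);
    }

    public static void main(String[] args) {
        Map<Long, List<String>> buffer = new HashMap<>();
        addToBuffer(buffer, "a", 100L);
        addToBuffer(buffer, "b", 100L); // same timestamp, so it lands in the same bucket
        addToBuffer(buffer, "c", 200L);
        System.out.println(buffer.get(100L)); // [a, b]
        System.out.println(buffer.get(200L)); // [c]
    }
}
```

With an in-memory map the final put is redundant, but with a state backend that hands back deserialized copies it is likely what persists the change, which would explain why the operator always writes the list back.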

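Then come the nested for loops, which decide which StreamRecords satisfy the join condition. The current StreamRecord's timestamp, together with the configured lower and upper bounds, forms a time filter that is compared against every timestamp key in the other side's MapState.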

/**
 * Iterate over the other side's MapState and find the StreamRecords that join
 */
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
    // Key of the other side's MapState, i.e. a timestamp
    final long timestamp = bucket.getKey();

    // If this bucket's timestamp is outside the join window (relative to the current
    // StreamRecord's timestamp), it does not join, so skip it.
    // This check is the core of deciding which StreamRecords join.
    if (timestamp < ourTimestamp + relativeLowerBound ||
            timestamp > ourTimestamp + relativeUpperBound) {
        continue;
    }

    // Otherwise the bucket joins: take its value, the List<BufferEntry<OTHER>> stored under that timestamp
    for (BufferEntry<OTHER> entry : bucket.getValue()) {
        // Emit the joined left and right elements downstream
        // (this calls processElement() of the user-defined function)
        if (isLeft) {
            collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
        } else {
            collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
        }
    }
}
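
A worked example may help here. The sketch below runs the same window check over a TreeMap that stands in for the other side's MapState; the class name IntervalScanSketch, the sample bounds (lowerBound = -2, upperBound = 1) and the sample records are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class IntervalScanSketch {
    /**
     * Returns every buffered value whose bucket timestamp lies in
     * [ourTimestamp + relativeLowerBound, ourTimestamp + relativeUpperBound].
     */
    static <T> List<T> matches(Map<Long, List<T>> otherBuffer, long ourTimestamp,
                               long relativeLowerBound, long relativeUpperBound) {
        List<T> joined = new ArrayList<>();
        for (Map.Entry<Long, List<T>> bucket : otherBuffer.entrySet()) {
            long timestamp = bucket.getKey();
            if (timestamp < ourTimestamp + relativeLowerBound
                    || timestamp > ourTimestamp + relativeUpperBound) {
                continue; // outside the join window
            }
            joined.addAll(bucket.getValue());
        }
        return joined;
    }

    public static void main(String[] args) {
        long lowerBound = -2L;
        long upperBound = 1L;

        Map<Long, List<String>> rightBuffer = new TreeMap<>();
        rightBuffer.put(7L, Arrays.asList("r7"));
        rightBuffer.put(8L, Arrays.asList("r8"));
        rightBuffer.put(11L, Arrays.asList("r11a", "r11b"));
        rightBuffer.put(12L, Arrays.asList("r12"));
        // Left element at t=10 sees right timestamps in [8, 11].
        System.out.println(matches(rightBuffer, 10L, lowerBound, upperBound)); // [r8, r11a, r11b]

        Map<Long, List<String>> leftBuffer = new TreeMap<>();
        leftBuffer.put(10L, Arrays.asList("l10"));
        leftBuffer.put(12L, Arrays.asList("l12"));
        // Right element at t=9 uses the negated, swapped bounds and sees left timestamps in [8, 11].
        System.out.println(matches(leftBuffer, 9L, -upperBound, -lowerBound)); // [l10]
    }
}
```

Note that the loop visits every bucket on the other side for the current key, so the cost of one incoming record grows with the number of buffered timestamps, not just with the number of matches.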

Once a key satisfies the time filter, its List (the value) is taken out and each entry is sent downstream, which in practice means handing it to the user-defined function.

/**
 * Sends the StreamRecords that satisfy the IntervalJoin condition downstream,
 * which in practice means handing them to the user-defined function
 */
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
    // The joined result carries the larger of the two timestamps
    final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

    collector.setAbsoluteTimestamp(resultTimestamp);
    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

    // Hand the joined pair to the user-defined function, which runs the developer's logic
    userFunction.processElement(left, right, context, collector);
}

This filtering step is the core of IntervalJoin.

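Finally, the operator computes when the StreamRecord kept in MapState should be cleaned up, because records cannot be buffered forever. It does so by registering an event-time timer with InternalTimerService. The trigger time is the current StreamRecord's timestamp plus the relative upper bound when that bound is positive (upperBound for a left element, -lowerBound for a right element); otherwise it is the record's own timestamp.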

// After the nested loops have emitted the results, compute the cleanup time
// (current timestamp + relative upper bound if that bound is positive, else the current timestamp)
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

// Register a timer that cleans up the expired data kept in MapState
if (isLeft) {
    internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
    internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}
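
A few concrete numbers show how the ternary behaves for each side. This is only a sketch: the class name CleanupTimeSketch and the sample bounds are assumptions, and no timer is actually registered.

```java
final class CleanupTimeSketch {
    /** Same expression as in processElement(): cleanup is never scheduled before the element's own timestamp. */
    static long cleanupTime(long ourTimestamp, long relativeUpperBound) {
        return (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    }

    public static void main(String[] args) {
        long lowerBound = -2L; // sample bounds, chosen for illustration
        long upperBound = 1L;

        // Left element: the relative upper bound is upperBound.
        System.out.println(cleanupTime(10L, upperBound));  // 11
        // Right element: the relative upper bound is -lowerBound.
        System.out.println(cleanupTime(10L, -lowerBound)); // 12
        // A non-positive relative upper bound falls back to the element's own timestamp.
        System.out.println(cleanupTime(10L, -3L));         // 10
    }
}
```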

Because IntervalJoinOperator implements the Triggerable interface, once a registered timer fires, onEventTime() removes the matching timestamp entry from the corresponding MapState.

/**
 * Called when a timer registered with InternalTimerService fires; removes the expired bucket from MapState
 */
@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
    long timerTimestamp = timer.getTimestamp();
    String namespace = timer.getNamespace();

    logger.trace("onEventTime @ {}", timerTimestamp);

    switch (namespace) {
        case CLEANUP_NAMESPACE_LEFT: {
            // Map the timer time back to the timestamp key of the left bucket
            long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
            logger.trace("Removing from left buffer @ {}", timestamp);
            // Clean the left buffer
            leftBuffer.remove(timestamp);
            break;
        }
        case CLEANUP_NAMESPACE_RIGHT: {
            // Map the timer time back to the timestamp key of the right bucket
            long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
            logger.trace("Removing from right buffer @ {}", timestamp);
            // Clean the right buffer
            rightBuffer.remove(timestamp);
            break;
        }
        default:
            throw new RuntimeException("Invalid namespace " + namespace);
    }
}
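
The two ternaries in onEventTime() are the inverse of the cleanup-time computation in processElement(): they turn the timer time back into the bucket key under which the record was stored. The sketch below checks that round trip for a few sample bounds; the class and method names are hypothetical and only the arithmetic is taken from the code above.

```java
final class CleanupRoundTripSketch {
    /** Timer time registered for a left element (its relative upper bound is upperBound). */
    static long leftTimer(long ts, long upperBound) {
        return (upperBound > 0L) ? ts + upperBound : ts;
    }

    /** Timer time registered for a right element (its relative upper bound is -lowerBound). */
    static long rightTimer(long ts, long lowerBound) {
        long relativeUpperBound = -lowerBound;
        return (relativeUpperBound > 0L) ? ts + relativeUpperBound : ts;
    }

    /** Bucket key recovered in onEventTime() for the left buffer. */
    static long leftBucket(long timerTimestamp, long upperBound) {
        return (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
    }

    /** Bucket key recovered in onEventTime() for the right buffer. */
    static long rightBucket(long timerTimestamp, long lowerBound) {
        return (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
    }

    /** True if both sides recover the original element timestamp from the timer time. */
    static boolean roundTrips(long ts, long lowerBound, long upperBound) {
        return leftBucket(leftTimer(ts, upperBound), upperBound) == ts
                && rightBucket(rightTimer(ts, lowerBound), lowerBound) == ts;
    }

    public static void main(String[] args) {
        System.out.println(roundTrips(10L, -2L, 1L));  // true: window spans both directions
        System.out.println(roundTrips(10L, 1L, 3L));   // true: window entirely in the future
        System.out.println(roundTrips(10L, -3L, -1L)); // true: window entirely in the past
        System.out.println(roundTrips(10L, 0L, 0L));   // true: exact-timestamp join
    }
}
```

In every case the recovered key equals the original timestamp, so the timer removes exactly the bucket that the element was added to.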

This article is reposted from: https://blog.csdn.net/qq_36299025/article/details/132228261
Copyright belongs to the original author, 墨玉浮白. In case of infringement, please contact us for removal.
