

Source Code Analysis: How FlinkKafkaConsumer Supports Emitting Punctuated Watermarks

Background

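FlinkKafkaConsumer can emit a watermark when it receives a particular record from a Kafka partition, for example a special record that marks the end of a complete logical record. This article walks through the source code that emits these punctuated watermarks.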
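To make the idea concrete, here is a minimal, Flink-free sketch of the decision a punctuated assigner makes for every record. It only mimics the contract of `checkAndGetNextWatermark` (return a watermark, or `null` for none). The `Event` type and its `endMarker` flag are hypothetical stand-ins for "the special record that marks the end of a complete record"; they are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Flink-free model of a punctuated watermark generator.
 * Assumption: a hypothetical Event whose endMarker flag marks the end of a
 * complete logical record; only such events produce a watermark.
 */
public class PunctuatedWatermarkSketch {

    /** Hypothetical Kafka payload: an event time plus an end-of-record flag. */
    static final class Event {
        final long timestamp;
        final boolean endMarker;

        Event(long timestamp, boolean endMarker) {
            this.timestamp = timestamp;
            this.endMarker = endMarker;
        }
    }

    /** Mirrors the checkAndGetNextWatermark contract: a watermark timestamp, or null for "none". */
    static Long checkAndGetNextWatermark(Event event, long extractedTimestamp) {
        return event.endMarker ? extractedTimestamp : null;
    }

    /** Runs events through the generator and collects the watermarks it would emit. */
    static List<Long> watermarksFor(List<Event> events) {
        List<Long> emitted = new ArrayList<>();
        for (Event e : events) {
            Long wm = checkAndGetNextWatermark(e, e.timestamp);
            if (wm != null) {
                emitted.add(wm);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1_000L, false),
                new Event(2_000L, false),
                new Event(3_000L, true),   // end of the first logical record
                new Event(4_000L, false),
                new Event(5_000L, true));  // end of the second logical record
        System.out.println(watermarksFor(events)); // [3000, 5000]
    }
}
```

Records between markers still flow downstream; they simply do not move the watermark.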

Source code analysis of punctuated watermark emission

1. First, the runFetchLoop method in KafkaFetcher:

```java
public void runFetchLoop() throws Exception {
    try {
        // kick off the actual Kafka consumer
        consumerThread.start();

        while (running) {
            // this blocks until we get the next records
            // it automatically re-throws exceptions encountered in the consumer thread
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            // get the records for each topic partition
            for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle());

                // called for every partition consumed by this operator subtask
                partitionConsumerRecordsHandler(partitionRecords, partition);
            }
        }
    } finally {
        // this signals the consumer thread that no more work is to be done
        consumerThread.shutdown();
    }
}
```
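The per-partition dispatch in this loop can be modeled without Kafka or Flink as below. It is a simplified sketch: partitions and records are plain strings (an assumption), and the empty-list default mirrors how Kafka's `ConsumerRecords.records(TopicPartition)` returns an empty list for a partition that has no data in the batch. It shows that the handler runs once per subscribed partition on every poll, and that a partition with no new records hands the handler nothing to process, so it never reaches the watermark logic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Flink-free model of one iteration of runFetchLoop: take one polled batch and
 * hand each subscribed partition its own slice of records.
 * Assumption: partitions are strings such as "topic-0" and records are strings.
 */
public class FetchLoopSketch {

    static void dispatchBatch(
            Map<String, List<String>> polledBatch,
            List<String> subscribedPartitions,
            BiConsumer<List<String>, String> partitionHandler) {
        for (String partition : subscribedPartitions) {
            // Like ConsumerRecords.records(tp): a partition with no data yields an empty list
            List<String> partitionRecords = polledBatch.getOrDefault(partition, List.of());
            partitionHandler.accept(partitionRecords, partition);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> batch = new LinkedHashMap<>();
        batch.put("topic-0", List.of("a", "b"));
        batch.put("topic-1", List.of("c"));

        List<String> seen = new ArrayList<>();
        dispatchBatch(batch, List.of("topic-0", "topic-1", "topic-2"),
                (records, partition) -> seen.add(partition + "=" + records));
        System.out.println(seen); // [topic-0=[a, b], topic-1=[c], topic-2=[]]
    }
}
```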

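2. Next, look at how partitionConsumerRecordsHandler processes the records, and the watermark, of each partition assigned to the current operator subtask. For each Kafka record it ends up calling emitRecordsWithTimestamps: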

```java
protected void emitRecordsWithTimestamps(
        Queue<T> records,
        KafkaTopicPartitionState<T, KPH> partitionState,
        long offset,
        long kafkaEventTimestamp) {
    // emit the records, using the checkpoint lock to guarantee
    // atomicity of record emission and offset state update
    synchronized (checkpointLock) {
        T record;
        while ((record = records.poll()) != null) {
            long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
            // send the Kafka record to the downstream operator
            sourceContext.collectWithTimestamp(record, timestamp);

            // this might emit a watermark, so do it after emitting the record.
            // It handles this partition's watermark: it records the partition's watermark and,
            // when the conditions are met, updates the watermark of the whole operator subtask.
            partitionState.onEvent(record, timestamp);
        }
        partitionState.setOffset(offset);
    }
}
```
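The ordering guarantees in this method (record first, then the possible watermark, then the offset, all under one lock) can be sketched as follows. This is a simplified, Flink-free model: the `"time:payload"` record format and the rule that a payload ending in `END` triggers a watermark are hypothetical, standing in for `extractTimestamp` and the punctuated assigner.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * Flink-free model of emitRecordsWithTimestamps.
 * Assumptions: a record is "time:payload", and a payload ending in "END" is a
 * hypothetical punctuation marker that triggers a watermark.
 */
public class EmitOrderSketch {
    private final Object checkpointLock = new Object();
    final List<String> output = new ArrayList<>(); // stands in for the downstream output
    long offset = -1L;

    void emitRecordsWithTimestamps(Queue<String> records, long newOffset) {
        synchronized (checkpointLock) {
            String record;
            while ((record = records.poll()) != null) {
                long timestamp = Long.parseLong(record.substring(0, record.indexOf(':')));
                // 1. emit the record downstream with its timestamp
                output.add("record " + record);
                // 2. only afterwards may the punctuation rule emit a watermark
                if (record.endsWith("END")) {
                    output.add("watermark " + timestamp);
                }
            }
            // 3. the offset is updated inside the same critical section
            offset = newOffset;
        }
    }

    public static void main(String[] args) {
        EmitOrderSketch sketch = new EmitOrderSketch();
        Queue<String> queue = new ArrayDeque<>(List.of("100:a", "200:END"));
        sketch.emitRecordsWithTimestamps(queue, 42L);
        System.out.println(sketch.output); // [record 100:a, record 200:END, watermark 200]
        System.out.println(sketch.offset); // 42
    }
}
```

Emitting the marker record before its watermark means the record itself is never considered late relative to the watermark it triggered.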

3. Handling the watermark of each partition:

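```java
// KafkaTopicPartitionStateWithWatermarkGenerator: forward the event to this partition's generator
public void onEvent(T event, long timestamp) {
    watermarkGenerator.onEvent(event, timestamp, immediateOutput);
}

// AssignerWithPunctuatedWatermarksAdapter: ask the user's punctuated assigner for a watermark
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
    final org.apache.flink.streaming.api.watermark.Watermark next =
            wms.checkAndGetNextWatermark(event, eventTimestamp);

    if (next != null) {
        output.emitWatermark(new Watermark(next.getTimestamp()));
    }
}
```

Here, `output.emitWatermark(new Watermark(next.getTimestamp()))` calls the following method in WatermarkOutputMultiplexer:

```java
public void emitWatermark(Watermark watermark) {
    long timestamp = watermark.getTimestamp();
    // update this partition's watermark and record whether it actually advanced
    boolean wasUpdated = state.setWatermark(timestamp);

    // if it's higher than the max watermark so far we might have to update the
    // combined watermark. combinedWatermark is the lowest watermark of this operator
    // subtask, i.e. a subtask-level watermark rather than a partition-level one.
    if (wasUpdated && timestamp > combinedWatermark) {
        updateCombinedWatermark();
    }
}

// each partition's watermark is updated as follows
public boolean setWatermark(long watermark) {
    this.idle = false;
    final boolean updated = watermark > this.watermark;
    this.watermark = Math.max(watermark, this.watermark);
    return updated;
}
```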
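A standalone model of the per-partition state makes the two properties of `setWatermark` easy to check: the watermark never moves backwards, and any watermark clears the idle flag. `triggersCombinedUpdate` is a hypothetical helper that mirrors the condition in `emitWatermark`, and the initial value `Long.MIN_VALUE` is an assumption of this sketch.

```java
/**
 * Flink-free model of the per-partition watermark state from step 3.
 * All names here are illustrative, not Flink's actual classes.
 */
public class PartitionWatermarkSketch {

    static final class OutputState {
        private long watermark = Long.MIN_VALUE; // assumed initial value
        private boolean idle = false;

        boolean setWatermark(long newWatermark) {
            this.idle = false; // any watermark marks the partition as active again
            final boolean updated = newWatermark > this.watermark;
            this.watermark = Math.max(newWatermark, this.watermark); // never moves backwards
            return updated;
        }

        void setIdle(boolean idle) { this.idle = idle; }
        long getWatermark() { return watermark; }
        boolean isIdle() { return idle; }
    }

    /** Hypothetical helper: true when emitWatermark would go on to call updateCombinedWatermark(). */
    static boolean triggersCombinedUpdate(OutputState state, long timestamp, long combinedWatermark) {
        boolean wasUpdated = state.setWatermark(timestamp);
        return wasUpdated && timestamp > combinedWatermark;
    }

    public static void main(String[] args) {
        OutputState partition0 = new OutputState();
        System.out.println(partition0.setWatermark(3_000L)); // true: advanced
        System.out.println(partition0.setWatermark(1_000L)); // false: lower watermark ignored
        System.out.println(partition0.getWatermark());       // 3000
    }
}
```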

4. Finally, the method that emits the operator-subtask-level watermark:

```java
private void updateCombinedWatermark() {
    long minimumOverAllOutputs = Long.MAX_VALUE;

    boolean hasOutputs = false;
    boolean allIdle = true;
    for (OutputState outputState : watermarkOutputs) {
        if (!outputState.isIdle()) {
            minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());
            allIdle = false;
        }
        hasOutputs = true;
    }

    // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
    // at its initial Long.MAX_VALUE state and we must not emit that
    if (!hasOutputs) {
        return;
    }

    if (allIdle) {
        underlyingOutput.markIdle();
    } else if (minimumOverAllOutputs > combinedWatermark) {
        combinedWatermark = minimumOverAllOutputs;
        underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));
    }
}
```
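The following Flink-free sketch reimplements the same minimum-over-non-idle-outputs logic so it can be run in isolation. The class and field names are hypothetical, and the `underlyingOutput` list stands in for the real downstream output. Its `main` shows why idleness matters: once one partition stops receiving marker records, its stale watermark keeps the subtask-level watermark pinned, however far the other partition advances.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Flink-free model of updateCombinedWatermark: the subtask-level watermark is the
 * minimum over all non-idle partitions and is emitted only when it advances.
 */
public class CombinedWatermarkSketch {

    static final class OutputState {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    final List<OutputState> watermarkOutputs = new ArrayList<>();
    final List<String> underlyingOutput = new ArrayList<>(); // stands in for the downstream output
    long combinedWatermark = Long.MIN_VALUE;

    OutputState newPartition() {
        OutputState state = new OutputState();
        watermarkOutputs.add(state);
        return state;
    }

    void updateCombinedWatermark() {
        long minimumOverAllOutputs = Long.MAX_VALUE;
        boolean hasOutputs = false;
        boolean allIdle = true;
        for (OutputState outputState : watermarkOutputs) {
            if (!outputState.idle) {
                minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.watermark);
                allIdle = false;
            }
            hasOutputs = true;
        }
        if (!hasOutputs) {
            return; // Long.MAX_VALUE is not a valid watermark, so emit nothing
        }
        if (allIdle) {
            underlyingOutput.add("idle");
        } else if (minimumOverAllOutputs > combinedWatermark) {
            combinedWatermark = minimumOverAllOutputs;
            underlyingOutput.add("watermark " + minimumOverAllOutputs);
        }
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch subtask = new CombinedWatermarkSketch();
        OutputState p0 = subtask.newPartition();
        OutputState p1 = subtask.newPartition();

        p0.watermark = 5_000L;   // p0 saw a marker record at t=5000, then went quiet
        p1.watermark = 100_000L; // p1 keeps receiving marker records
        subtask.updateCombinedWatermark();
        System.out.println(subtask.underlyingOutput); // [watermark 5000]

        p1.watermark = 200_000L;
        subtask.updateCombinedWatermark();            // minimum is still 5000, nothing new
        System.out.println(subtask.underlyingOutput); // [watermark 5000]
    }
}
```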

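Looking at this flow, does it mean that idleness (marking a quiet partition idle so it stops holding back the watermark) is not supported when punctuated watermarks are used? The answer is yes. On this path the punctuated adapter only ever calls emitWatermark, and setWatermark always resets the partition's idle flag to false; nothing here marks a single partition idle. A partition that stops receiving marker records therefore keeps its old watermark and holds back the minimum computed in updateCombinedWatermark.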

Tags: big data, flink

Reposted from: https://blog.csdn.net/lixia0417mul2/article/details/133823145
Copyright belongs to the original author, lixia0417mul2. If this infringes your rights, please contact us and it will be removed.
