0


flink消费kafka数据,按照指定时间开始消费

在很多时候我们需要根据指定的时间戳来开始消费kafka中的数据
但是由于flink没有自带的方法
所以只能手动写逻辑来实现从
kafka中根据时间戳开始消费数据
使用OffsetsInitializer接口实现

importorg.apache.flink.api.java.utils.ParameterTool;importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;importorg.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;importorg.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.yaml.snakeyaml.nodes.CollectionNode;importjava.util.Collection;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;/**
 * 支持按topic指定开始消费时间戳
 *
 * @author 
 */publicclassKafkaOffsetsInitializerimplementsOffsetsInitializer{privateLogger logger =LoggerFactory.getLogger(KafkaOffsetsInitializer.class);privatestaticfinallong serialVersionUID =1L;/**
     * key:topic,value:开始消费时间戳
     */privateMap<String,Long> topicStartingTimestamps;privateParameterTool parameters;/**
     * @param topicStartingTimestamps
     * @param parameters
     */publicKafkaOffsetsInitializer(Map<String,Long> topicStartingTimestamps,ParameterTool parameters){this.topicStartingTimestamps = topicStartingTimestamps;this.parameters = parameters;}@OverridepublicMap<TopicPartition,Long>getPartitionOffsets(Collection<TopicPartition> partitions,PartitionOffsetsRetriever partitionOffsetsRetriever){//定义起始时间,初始offsetMap<TopicPartition,Long> startingTimestamps =newHashMap<>();Map<TopicPartition,Long> initialOffsets =newHashMap<>();//commited offsetMap<TopicPartition,Long> committedOffsets = partitionOffsetsRetriever.committedOffsets(partitions);//beginningOffsets the first offset for the given partitions.Map<TopicPartition,Long> beginningOffsets = partitionOffsetsRetriever.beginningOffsets(partitions);//endOffsets the for the given partitions.Map<TopicPartition,Long> endOffsets = partitionOffsetsRetriever.endOffsets(partitions);finallong now =System.currentTimeMillis();
        partitions.forEach(tp ->{//起始时间赋值为从redis中获取到相对应topic的时间Long startingTimestamp = topicStartingTimestamps.get(tp.topic());if(startingTimestamp ==null){//redis里没有取到消费开始时间从启动时间消费
                startingTimestamp = now;
                logger.info("从redis没有取到时间戳,topic:{},partition:{},使用当前时间:{},{}", tp.topic(), tp.partition(), now,newDate(now));}
            logger.info("读取时间戳,topic:{},partition:{},时间戳:{},{}", tp.topic(), tp.partition(), now,newDate(now));
            startingTimestamps.put(tp, startingTimestamp);});
        partitionOffsetsRetriever.offsetsForTimes(startingTimestamps).forEach((tp, offsetMetadata)->{long offsetForTime = beginningOffsets.get(tp);long offsetForCommit = beginningOffsets.get(tp);if(offsetMetadata !=null){
                offsetForTime = offsetMetadata.offset();
                logger.info("根据时间戳取到offset,topic:{},partition:{},offset:{}", tp.topic(), tp.partition(), offsetForTime);}Long commitedOffset = committedOffsets.get(tp);if(commitedOffset !=null){
                offsetForCommit = commitedOffset.longValue();
                logger.info("根据已提交offset取到offset,topic:{},partition:{},offset:{}", tp.topic(), tp.partition(), offsetForCommit);}
            logger.info("设置读取offset,topic:{},partition:{},offset:{},endOffset:{}", tp.topic(), tp.partition(),Math.max(offsetForTime, offsetForCommit), endOffsets.get(tp));//对比时间戳对应的offset和checkpoint保存的offset,取较大值//initialOffsets.put(tp, Math.max(offsetForTime, offsetForCommit));
            initialOffsets.put(tp, offsetForCommit);});return initialOffsets;}@OverridepublicOffsetResetStrategygetAutoOffsetResetStrategy(){returnOffsetResetStrategy.NONE;}}
标签: flink kafka

本文转载自: https://blog.csdn.net/Harden_zsc/article/details/132361142
版权归原作者 昌昌苦练背后 所有, 如有侵权,请联系我们删除。

“flink消费kafka数据,按照指定时间开始消费”的评论:

还没有评论