在很多时候，我们需要根据指定的时间戳来开始消费 Kafka 中的数据。但由于 Flink 没有自带的方法，只能手动编写逻辑来实现"从 Kafka 中根据时间戳开始消费数据"，具体做法是实现 OffsetsInitializer 接口。
importorg.apache.flink.api.java.utils.ParameterTool;importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;importorg.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;importorg.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.yaml.snakeyaml.nodes.CollectionNode;importjava.util.Collection;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;/**
* 支持按topic指定开始消费时间戳
*
* @author
*/publicclassKafkaOffsetsInitializerimplementsOffsetsInitializer{privateLogger logger =LoggerFactory.getLogger(KafkaOffsetsInitializer.class);privatestaticfinallong serialVersionUID =1L;/**
* key:topic,value:开始消费时间戳
*/privateMap<String,Long> topicStartingTimestamps;privateParameterTool parameters;/**
* @param topicStartingTimestamps
* @param parameters
*/publicKafkaOffsetsInitializer(Map<String,Long> topicStartingTimestamps,ParameterTool parameters){this.topicStartingTimestamps = topicStartingTimestamps;this.parameters = parameters;}@OverridepublicMap<TopicPartition,Long>getPartitionOffsets(Collection<TopicPartition> partitions,PartitionOffsetsRetriever partitionOffsetsRetriever){//定义起始时间,初始offsetMap<TopicPartition,Long> startingTimestamps =newHashMap<>();Map<TopicPartition,Long> initialOffsets =newHashMap<>();//commited offsetMap<TopicPartition,Long> committedOffsets = partitionOffsetsRetriever.committedOffsets(partitions);//beginningOffsets the first offset for the given partitions.Map<TopicPartition,Long> beginningOffsets = partitionOffsetsRetriever.beginningOffsets(partitions);//endOffsets the for the given partitions.Map<TopicPartition,Long> endOffsets = partitionOffsetsRetriever.endOffsets(partitions);finallong now =System.currentTimeMillis();
partitions.forEach(tp ->{//起始时间赋值为从redis中获取到相对应topic的时间Long startingTimestamp = topicStartingTimestamps.get(tp.topic());if(startingTimestamp ==null){//redis里没有取到消费开始时间从启动时间消费
startingTimestamp = now;
logger.info("从redis没有取到时间戳,topic:{},partition:{},使用当前时间:{},{}", tp.topic(), tp.partition(), now,newDate(now));}
logger.info("读取时间戳,topic:{},partition:{},时间戳:{},{}", tp.topic(), tp.partition(), now,newDate(now));
startingTimestamps.put(tp, startingTimestamp);});
partitionOffsetsRetriever.offsetsForTimes(startingTimestamps).forEach((tp, offsetMetadata)->{long offsetForTime = beginningOffsets.get(tp);long offsetForCommit = beginningOffsets.get(tp);if(offsetMetadata !=null){
offsetForTime = offsetMetadata.offset();
logger.info("根据时间戳取到offset,topic:{},partition:{},offset:{}", tp.topic(), tp.partition(), offsetForTime);}Long commitedOffset = committedOffsets.get(tp);if(commitedOffset !=null){
offsetForCommit = commitedOffset.longValue();
logger.info("根据已提交offset取到offset,topic:{},partition:{},offset:{}", tp.topic(), tp.partition(), offsetForCommit);}
logger.info("设置读取offset,topic:{},partition:{},offset:{},endOffset:{}", tp.topic(), tp.partition(),Math.max(offsetForTime, offsetForCommit), endOffsets.get(tp));//对比时间戳对应的offset和checkpoint保存的offset,取较大值//initialOffsets.put(tp, Math.max(offsetForTime, offsetForCommit));
initialOffsets.put(tp, offsetForCommit);});return initialOffsets;}@OverridepublicOffsetResetStrategygetAutoOffsetResetStrategy(){returnOffsetResetStrategy.NONE;}}
本文转载自: https://blog.csdn.net/Harden_zsc/article/details/132361142
版权归原作者 昌昌苦练背后 所有, 如有侵权,请联系我们删除。