0


kafka:消费者从指定时间的偏移开始消费(二)

我的前一篇博客《kafka:AdminClient获取指定主题的所有消费者的消费偏移(一)》为了忽略忽略掉上线之前的所有消息,从获取指定主题的所有消费者的消费偏移并计算出最大偏移来解决此问题。
但这个方案需要使用不常用的AdminClient类,而且如果该主题如果是第一次被消费者拉取消息时,因为得不到消费者的消费偏移,最后的结果,就是从0偏移开始拉取所有消息。并不能真正实现忽略上线之前所有消息的目的。
所以我又优化了方案。基本的原理就是使用

KafkaConsumer.offsetsForTimes

方法获取消费者的所有主题分区的指定时间的偏移,并将这个偏移作为消费开始的偏移(

KafkaConsumer.seek

方法) 。

@Testpublicvoidtest3SeekToTime(){// 配置Kafka消费者的属性Properties props =newProperties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("group.id","my_consumer_group");
        props.put("key.deserializer",StringDeserializer.class.getName());
        props.put("value.deserializer",StringDeserializer.class.getName());// 创建Kafka消费者实例try(Consumer<String,String> consumer =newKafkaConsumer<>(props)){boolean seek =false;/** 
             * 循环开始的时间,
             * 忽略该时间之前的消息
             */long startMills =System.currentTimeMillis();while(true){try{ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(4000));if(!seek){if(!records.isEmpty()){/** 
                             * 获取第一批消息时更新消息偏移到循环开始的时间
                             */
                            consumer.offsetsForTimes(Maps.asMap(consumer.assignment(),t->startMills)).forEach((k,v)->{if(null!= v){System.out.println("seek %s to %s",k,v.offset());
                                    consumer.seek(k,v.offset());}});
                            seek =true;}/** 跳过第一批获取到的消息,继续循环 */continue;}
                    records.forEach(record ->{String value = record.value();System.out.println("Received message: "+ value);});}catch(Exception e){
                    e.printStackTrace();}}}}
标签: kafka java 偏移

本文转载自: https://blog.csdn.net/10km/article/details/131941949
版权归原作者 10km 所有, 如有侵权,请联系我们删除。

“kafka:消费者从指定时间的偏移开始消费(二)”的评论:

还没有评论