0


使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费

文章目录


1、指定 Offset 消费

auto.offset.reset = earliest | latest | none 默认是 latest
(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

这个参数的力度太大了,不是从头,就是从尾
kafka提供了

seek方法

,可以让我们从分区的固定位置开始消费

seek(TopicPartition topicPartition,offset offset)

示例代码:

packagecom.bigdata.kafka.consumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.ArrayList;importjava.util.Properties;importjava.util.Set;publicclassCustomConsumerSeek{publicstaticvoidmain(String[] args){Properties properties =newProperties();// 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");// 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 关闭自动提交offset
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<String,String>(properties);// 2 订阅一个主题ArrayList<String> topics =newArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);// 执行计划// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size()==0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
            kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
            assignment = kafkaConsumer.assignment();}// 获取所有分区的offset =5 以后的数据/*for (TopicPartition tp:assignment) {
            kafkaConsumer.seek(tp,5);
        }*/// 获取分区0的offset =5 以后的数据//kafkaConsumer.seek(new TopicPartition("bigdata",0),5);for(TopicPartition tp:assignment){if(tp.partition()==0){
                kafkaConsumer.seek(tp,5);}}while(true){//1 秒中向kafka拉取一批数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String,String>record:records){// 打印一条数据System.out.println(record);// 可以打印记录中的很多内容,比如 key  value  offset topic 等信息System.out.println(record.value());}}}}

2、指定时间消费

示例代码:

packagecom.bigdata.consumer;importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.*;/**
 *  从某个特定的时间开始进行消费
 */publicclassCustomer05{publicstaticvoidmain(String[] args){// 其实就是mapProperties properties =newProperties();// 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");// 字段反序列化   key 和  value

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"testf");// 指定分区的分配方案  为轮询策略//properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");// 指定分区的分配策略为:Sticky(粘性)ArrayList<String> startegys =newArrayList<>();
        startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);// 创建一个kafka消费者的对象KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<String,String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<String> topics =newArrayList<>();
        topics.add("five");// list总可以设置多个主题的名称
        kafkaConsumer.subscribe(topics);// 因为消费者是不停的消费,所以是while true// 指定了获取分区数据的起始位置。// 这样写会报错的,因为前期消费需要指定计划,指定计划需要时间// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size()==0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
            kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
            assignment = kafkaConsumer.assignment();}Map<TopicPartition,Long> hashMap =newHashMap<>();for(TopicPartition partition:assignment){
            hashMap.put(partition,System.currentTimeMillis()-60*60*1000);}Map<TopicPartition,OffsetAndTimestamp> map = kafkaConsumer.offsetsForTimes(hashMap);for(TopicPartition partition:assignment){OffsetAndTimestamp offsetAndTimestamp = map.get(partition);
            kafkaConsumer.seek(partition,offsetAndTimestamp.offset());}while(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for(ConsumerRecordrecord:records){// 打印数据中的值System.out.println(record.value());System.out.println(record.offset());// 打印一条数据System.out.println(record);}}}}
标签: java kafka offset

本文转载自: https://blog.csdn.net/lzhlizihang/article/details/144035326
版权归原作者 lzhlizihang 所有, 如有侵权,请联系我们删除。

“使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费”的评论:

还没有评论