目录
一、自动提交offset的相关参数
- 官网文档
- 参数解释参数描述enable.auto.commi默认值为 true,消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
- 图解分析
二、消费者(自动提交 offset)代码示例
- 消费者自动提交 offset代码
// 自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 提交时间间隔 1秒properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
- 消费者自动提交 offset代码完整代码
packagecom.xz.kafka.consumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.ArrayList;importjava.util.Properties;publicclassCustomConsumerAutoOffset{publicstaticvoidmain(String[] args){// 配置Properties properties =newProperties();// 连接 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 提交时间间隔 1秒 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);// 1 创建一个消费者 "", "hello"KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics =newArrayList<>(); topics.add("sevenTopic"); kafkaConsumer.subscribe(topics);// 3 消费数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}}
标签:
kafka
本文转载自: https://blog.csdn.net/li1325169021/article/details/132794902
版权归原作者 小志的博客 所有, 如有侵权,请联系我们删除。
版权归原作者 小志的博客 所有, 如有侵权,请联系我们删除。