学习目录
一、offset的基本概述
offset定义:消费者再消费的过程中通过offset来记录消费数据的具体位置
offset存放的位置:从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic(系统主题)中,名为__consumer_offsets,即offset维护在系统主题中
说明:__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact(压缩存储),也就是每个 group.id+topic+分区号就保留最新数据
1.面试题☆☆☆
问:消费者的offset维护在什么位置
答:在0.9版本之前维护在zookeeper当中,0.9版本之后维护在系统主题当中
二、自动提交offset
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能
自动提交offset的相关参数如下:
- enable.auto.commit:是否开启自动提交offset功能,默认是true
- auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
packagecom.kafka.consumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.ArrayList;importjava.util.Properties;/**
* @author wangbo
* @version 1.0
*//**
* 自动提交offset
*/publicclassCustomConsumer_03{publicstaticvoidmain(String[] args){//配置Properties properties =newProperties();//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");//多写一个,避免其中一台挂掉,保证数据的可靠性//反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组ID 可以任意起
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//自动提交,默认为true采用自动提交,为false则为手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//提交时间间隔,默认为5000毫秒,即5s。我们修改为2秒
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,2000);//1.创建一个消费者 "","hello"KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<>(properties);//2.订阅主题 first3ArrayList<String> topics =newArrayList<String>();
topics.add("first3");
kafkaConsumer.subscribe(topics);//3.消费数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据//循环打印消费的数据 consumerRecords.forfor(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}}
三、手动提交offset
虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)
- 相同点:都会将本次提交的一批数据最高的偏移量提交
- 不同点:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败
比较
同步提交:必须等待offset提交完毕,再去消费下一批数据 ,由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低
异步提交:发送完提交offset请求后,就开始消费下一批数据了,由于同步提交 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式
同步提交和异步提交的API代码
packagecom.kafka.consumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.ArrayList;importjava.util.Properties;/**
* @author wangbo
* @version 1.0
*//**
* offset 同步提交
*/publicclassCustomConsumer_04{publicstaticvoidmain(String[] args){//配置Properties properties =newProperties();//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");//多写一个,避免其中一台挂掉,保证数据的可靠性//反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组ID 可以任意起
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//手动提交 需要将参数改为false
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//1.创建一个消费者 "","hello"KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<>(properties);//2.订阅主题 first3ArrayList<String> topics =newArrayList<String>();
topics.add("first3");
kafkaConsumer.subscribe(topics);//3.消费数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据//循环打印消费的数据 consumerRecords.forfor(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}//手动提交offset
kafkaConsumer.commitSync();//同步提交// kafkaConsumer.commitAsync(); //异步提交}}}
四、指定offset位置消费
问题引入:当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
可以通过设置offset的消费位置,进行开始消费
auto.offset.reset = earliest | latest | none 默认是 latest
- earliest:自动将偏移量重置为最早的偏移量
- latest(默认值):自动将偏移量重置为最新偏移量
- none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
API中通过下面参数进行配置
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
还有一种是在任意指定 offset 位移开始消费
packagecom.kafka.consumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.ArrayList;importjava.util.Properties;importjava.util.Set;/**
* @author wangbo
* @version 1.0
*//**
* 1. 指定offset位置进行消费
*/publicclassCustomConsumer_05{publicstaticvoidmain(String[] args){//配置Properties properties =newProperties();//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");//多写一个,避免其中一台挂掉,保证数据的可靠性//反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组ID 可以任意起
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//1.创建一个消费者 "","hello"KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<>(properties);//2.订阅主题 first3ArrayList<String> topics =newArrayList<String>();
topics.add("first3");
kafkaConsumer.subscribe(topics);//指定位置进行消费//获取分区信息,返回一个分区集合Set<TopicPartition> assignment = kafkaConsumer.assignment();//保证分区的分配方案指定完毕while(assignment.size()==0){//说明没有分区分配方案
kafkaConsumer.poll(Duration.ofSeconds(1));//通过拉去数据,来获取分区分配方案//获取分区信息,返回一个分区集合,相当于更新一下
assignment = kafkaConsumer.assignment();}//遍历分区集合,拿到所有的分区信息,指定消费的offsetfor(TopicPartition topicPartition : assignment){
kafkaConsumer.seek(topicPartition,100);//指定offset为100,从100的位置进行消费数据}//3.消费数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据//循环打印消费的数据 consumerRecords.forfor(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}}
五、指定时间消费
需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
packagecom.kafka.consumer;importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.*;/**
* @author wangbo
* @version 1.0
*//**
* 1. 指定时间消费,把时间转换为对应的offset
*/publicclassCustomConsumer_06{publicstaticvoidmain(String[] args){//配置Properties properties =newProperties();//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");//多写一个,避免其中一台挂掉,保证数据的可靠性//反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组ID 可以任意起
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//1.创建一个消费者 "","hello"KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<>(properties);//2.订阅主题 first3ArrayList<String> topics =newArrayList<String>();
topics.add("first3");
kafkaConsumer.subscribe(topics);//==============================================================================================================//指定位置进行消费//获取分区信息,返回一个分区集合Set<TopicPartition> assignment = kafkaConsumer.assignment();//保证分区的分配方案指定完毕while(assignment.size()==0){//说明没有分区分配方案
kafkaConsumer.poll(Duration.ofSeconds(1));//通过拉去数据,来获取分区分配方案//获取分区信息,返回一个分区集合,相当于更新一下
assignment = kafkaConsumer.assignment();}//--------------------------------------------------------------------------------------------------------------//把时间转换为对应的offset//Key:TopicPartition主题分区 value:对应的时间HashMap<TopicPartition,Long> topicPartitionLongHashMap =newHashMap<>();//封装对应的集合,对集合中添加数据for(TopicPartition topicPartition : assignment){
topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis()-1*24*3600*1000);//当前时间-1天的时间 = 一天前的时间}//需要传入一个Map集合Map<TopicPartition,OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);//--------------------------------------------------------------------------------------------------------------//遍历分区集合,拿到所有的分区信息,指定消费的offsetfor(TopicPartition topicPartition : assignment){OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);//获取集合中分区对应的value值,下面通过offset()方法进行转换
kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());//指定offset为100,从100的位置进行消费数据}//==============================================================================================================//3.消费数据while(true){ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据//循环打印消费的数据 consumerRecords.forfor(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}}
六、消费者事物
问题引入:
- 重复消费:已经消费了数据,但是 offset 没提交
- 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费
为了解决以上问题,保证数据的精确一次性消费,需要使用消费者事物的方式进行处理
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定,跟生产者事物类似 p57
七、数据积压(提高吞吐量)
- 如果是Kafka消费能力不足,则可以考虑增 加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)
- 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压
版权归原作者 王博1999 所有, 如有侵权,请联系我们删除。