0


【大数据学习 | kafka】kafka的偏移量管理

1. 偏移量的概念

消费者在消费数据的时候需要将消费的记录存储到一个位置,防止因为消费者程序宕机而引起断点消费数据丢失问题,下一次可以按照相应的位置从kafka中找寻数据,这个消费位置记录称之为偏移量offset。

kafka0.9以前版本将偏移量信息记录到zookeeper中

新版本中偏移量信息记录在__consumer_offsets中,这个topic是系统生成的,不仅仅帮助管理偏移量信息还能分配consumer给哪个coordinator管理,是一个非常重要的topic

它的记录方式和我们知道的记录方式一样 groupid + topic + partition ==> offset

其中存储到__consumer_offsets中的数据格式也是按照k-v进行存储的,其中k是groupid + topic + partition
value值为offset的偏移量信息。

  1. [hexuan@hadoop106 ~]$ kafka-topics.sh --bootstrap-server hadoop106:9092 --list
  2. __consumer_offsets
  3. topic_a
  4. topic_b
  5. topic_c
  6. topic_e
  7. topic_f
  8. topic_g

可以看到系统生成的topic

因为之前我们消费过很多数据,现在可以查看一下记录在这个topic中的偏移量信息

其中存在一个kafka-consumer-groups.sh 命令

  1. # 查看消费者组信息
  2. kafka-consumer-groups.sh --bootstrap-server hadoop106:9092 --list
  3. # 查询具体信息
  4. kafka-consumer-groups.sh --bootstrap-server hadoop106:9092 --describe --group my-group
  5. # 查看活跃信息
  6. kafka-consumer-groups.sh --bootstrap-server hadoop106:9092 --describe --group my-group --members

查看消费者组信息:

  1. [hexuan@hadoop106 ~]$ kafka-consumer-groups.sh --bootstrap-server hadoop106:9092 --list
  2. hainiu_group
  3. hainiu_group2

当前使用组信息:

  1. [hexuan@hadoop106 ~]$ kafka-consumer-groups.sh --bootstrap-server hadoop106:9092 --describe --group hainiu_group
  2. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  3. hainiu_group topic_c 0 0 0 0 consumer-hainiu_group-1-41a9ebd6-99a3-4d83-b1d7-88a2a9295054 /192.168.154.1 consumer-hainiu_group-1
  4. hainiu_group topic_b 1 1438 1438 0 - - -
  5. hainiu_group topic_b 0 1440 1440 0 - - -
  6. hainiu_group topic_b 3 1417 1417 0 - - -
  7. hainiu_group topic_b 4 1473 1473 0 - - -
  8. hainiu_group topic_b 5 1440 1440 0 - - -
  9. hainiu_group topic_b 2 1407 1407 0 - - -
  10. hainiu_group topic_b 6 1391 1391 0 -

当前组消费偏移量信息:

  1. GROUP:组名
  2. TOPICtopic信息
  3. PARTITION:分区
  4. CURRENT-OFFSET:当前消费偏移量
  5. LOG-END-OFFSET:这个分区总共存在多少数据
  6. LAG:还差多少没消费
  7. CONSUMER-ID:随机消费者id
  8. HOST:主机名
  9. CLIENT-ID:客户端id

同时我们也可以查询**__consumer_offset中的原生数据:**

  1. kafka-console-consumer.sh --bootstrap-server hadoop106:9092 \
  2. --topic __consumer_offsets --from-beginning --formatter \
  3. kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter

使用元数据格式化方式查看偏移量信息数据

key展示的是groupid,topic,partition , value值展示的是当前的偏移量信息

并且在这个topic中是追加形式一致往里面写入的

2. 偏移量的自动管理

那么我们已经看到了偏移量的存储但是偏移量究竟是怎么提交的呢?

首先我们没有设置任何的偏移量提交的代码,这个是默认开启的,其中存在两个参数

  1. pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  2. //开启自动提交偏移量信息
  3. pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  4. //默认提交间隔5s

官网的设置参数为两个true和5000。

所以我们在没有开启默认提交的时候已经自动提交了

为了演示自动提交的效果我们引入一个参数

  1. auto.offset.reset

这个参数用于控制没有偏移量存储的时候,应该从什么位置进行消费数据

(因为偏移量自动提交默认是5秒一次,如果数据在5秒内消费完毕,则会造成偏移量并没有存储的情况)

其中参数值官网中给出三个

  1. [latest, earliest, none]
  2. latest:从最新位置消费
  3. earliest:最早位置消费数据
  4. none:如果不指定消费的偏移量直接报错

一定要记得一点,如果有偏移量信息那么以上的设置是无效的.

官方文档显示给出的该参数的默认值为lastest,即从最新位置开始消费。

现在我们设置读取位置为最早位置,并且消费数据,看看可不可以记录偏移量,断点续传

思路:

首先修改组id为一个新的组,然后从最早位置消费数据,如果记录了偏移量,那么重新启动消费者会看到,没有任何数据,因为之前记录了消费数据的位置

整体代码如下:

  1. package com.hainiu.kafka;
  2. import org.apache.kafka.clients.consumer.*;
  3. import org.apache.kafka.common.TopicPartition;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import java.time.Duration;
  6. import java.util.*;
  7. public class Consumer1 {
  8. public static void main(String[] args) {
  9. Properties pro = new Properties();
  10. pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
  11. pro.put(ConsumerConfig.GROUP_ID_CONFIG,"new_group");
  12. pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  13. pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  14. pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  15. pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  16. pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  17. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
  18. List<String> topics = Arrays.asList("topic_d","topic_e");
  19. consumer.subscribe(topics);
  20. while (true){
  21. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  22. Iterator<ConsumerRecord<String, String>> it = records.iterator();
  23. while(it.hasNext()){
  24. ConsumerRecord<String, String> record = it.next();
  25. System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
  26. }
  27. }
  28. }
  29. }

运行完毕打印数据

这个时候我们需要在5s之内关闭应用,然后重新启动,因为提交的间隔时间是5s

再次启动

我们发现数据依旧被消费出来了,证明之前的偏移量存储没有任何效果和作用,因为间隔时间是5s

现在我们等待5s后在关闭应用

发现没有任何数据产生,因为偏移量已经提交了

3. 偏移量的手动提交

如上的案例我们发现偏移量的管理如果交给系统自己管理,我们没有办法及时的修改和管理偏移量信息,这个时候我们需要手动来提交给管理偏移量,更加及时和方便

这个时候引入两个方法

  1. consumer.commitAsync();
  2. consumer.commitSync();

commitAsync 异步提交方式:只提交一次,不管成功与否不会重试

commitSync 同步提交方式:同步提交方式会一直提交到成功为止

一般我们都会选择异步提交方式,他们的功能都是将拉取到的一整批数据的最大偏移量直接提交到__consumer_offsets中,但是同步方式会很浪费资源,异步方式虽然不能保证稳定性但是我们的偏移量是一直递增存储的,所以偶尔提交不成功一个两个不影响我们的使用

  1. pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  2. //设定自动提交为false
  3. consumer.commitSync();
  4. consumer.commitAsync();
  5. //设定提交方式为手动提交

整体代码如下:

  1. package com.hainiu.kafka.consumer;
  2. /**
  3. * ClassName : consumer_offsets
  4. * Package : com.hainiu.kafka.consumer
  5. * Description
  6. *
  7. * @Author HeXua
  8. * @Create 2024/11/5 21:30
  9. * Version 1.0
  10. */
  11. import org.apache.kafka.clients.consumer.*;
  12. import org.apache.kafka.common.TopicPartition;
  13. import org.apache.kafka.common.serialization.StringDeserializer;
  14. import java.time.Duration;
  15. import java.util.*;
  16. public class Consumer_CommitSync {
  17. public static void main(String[] args) {
  18. Properties pro = new Properties();
  19. pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
  20. pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");
  21. pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  22. pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  23. pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  24. pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  25. // pro.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  26. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
  27. List<String> topics = Arrays.asList("topic_h");
  28. consumer.subscribe(topics);
  29. while (true){
  30. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  31. Iterator<ConsumerRecord<String, String>> it = records.iterator();
  32. while(it.hasNext()){
  33. ConsumerRecord<String, String> record = it.next();
  34. System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
  35. }
  36. consumer.commitAsync();
  37. // consumer.commitSync();
  38. }
  39. }
  40. }

现在先在topic中输入部分数据

然后启动消费者,当存在数据打印的时候马上关闭掉应用,在此启动会发现数据不会重新消费

  1. topic_h->5->12->null->1
  2. topic_h->5->13->null->2
  3. topic_h->5->14->null->3
  4. topic_h->5->15->null->4
  5. topic_h->5->16->null->5
  6. topic_h->5->17->null->6

偏移量已经提交不会重复消费数据

4. 断点消费数据

在没有偏移量的时候我们可以设定

auto.offset.reset进行数据的消费

可选参数有 latest earliest none等位置

但是如果存在偏移量以上的设定就不在好用了,我们需要根据偏移量的位置进行断点消费数据

但是有的时候我们需要指定位置消费相应的数据

这个时候我们需要使用到

  1. consumer.seek();
  2. //可以指定位置进行数据的检索

但是我们不能随意的指定消费者消费数据的位置,因为在启动消费者的时候,一个组中会存在多个消费者,每个人拿到的对应分区是不同的,所以我们需要知道这个消费者能够获取的分区是哪个,然后再指定相应的断点位置

这里我们就需要监控分区的方法展示出来所有订阅的分区信息

  1. consumer.subscribe(topics, new ConsumerRebalanceListener() {
  2. @Override
  3. public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  4. }
  5. @Override
  6. public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  7. }
  8. });

为了演示效果我们使用生产者在topic_d中增加多个消息

  1. package com.hainiu.kafka.consumer;
  2. /**
  3. * ClassName : Producer2
  4. * Package : com.hainiu.kafka.consumer
  5. * Description
  6. *
  7. * @Author HeXua
  8. * @Create 2024/11/5 23:01
  9. * Version 1.0
  10. */
  11. import org.apache.kafka.clients.producer.KafkaProducer;
  12. import org.apache.kafka.clients.producer.ProducerConfig;
  13. import org.apache.kafka.clients.producer.ProducerRecord;
  14. import org.apache.kafka.common.serialization.StringSerializer;
  15. import java.util.Properties;
  16. public class Producer2 {
  17. public static void main(String[] args) {
  18. Properties pro = new Properties();
  19. pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
  20. pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  21. pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  22. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
  23. for (int i = 0; i < 1000; i++) {
  24. ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic_d", "" + i, "message"+i);
  25. producer.send(record);
  26. }
  27. producer.close();
  28. }
  29. }

随机发送数据到不同的节点,使用随机key

然后使用断点消费数据

不设置任何的偏移量提交操作和断点位置

  1. package com.hainiu.kafka.consumer;
  2. /**
  3. * ClassName : ConsumerWithUDOffset
  4. * Package : com.hainiu.kafka.consumer
  5. * Description
  6. *
  7. * @Author HeXua
  8. * @Create 2024/11/5 23:03
  9. * Version 1.0
  10. */
  11. import org.apache.kafka.clients.consumer.*;
  12. import org.apache.kafka.common.TopicPartition;
  13. import org.apache.kafka.common.serialization.StringDeserializer;
  14. import java.time.Duration;
  15. import java.util.*;
  16. public class ConsumerWithUDOffset {
  17. public static void main(String[] args) {
  18. Properties pro = new Properties();
  19. pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
  20. pro.put(ConsumerConfig.GROUP_ID_CONFIG,"new1");
  21. pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  22. pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  23. pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
  24. pro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,6000);
  25. pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
  26. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
  27. List<String> topics = Arrays.asList("topic_h");
  28. // range roundRobin sticky cooperativeSticky
  29. consumer.subscribe(topics, new ConsumerRebalanceListener() {
  30. @Override
  31. public void onPartitionsRevoked(Collection<TopicPartition> collection) {
  32. }
  33. @Override
  34. public void onPartitionsAssigned(Collection<TopicPartition> collection) {
  35. for (TopicPartition topicPartition : collection) {
  36. consumer.seek(topicPartition,195);
  37. }
  38. }
  39. });
  40. while (true){
  41. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  42. Iterator<ConsumerRecord<String, String>> it = records.iterator();
  43. while(it.hasNext()){
  44. ConsumerRecord<String, String> record = it.next();
  45. System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
  46. }
  47. consumer.commitAsync();
  48. }
  49. }
  50. }

5. 时间断点

kafka没有给大家提供直接根据时间找到断点位置的方法,我们需要根据时间找到偏移量,然后根据偏移量进行数据消费

  1. consumer.offsetsForTimes();
  2. //通过这个方法找到对应时间的偏移量位置
  3. consumer.seek();
  4. //然后在通过这个方法根据断点进行消费数据

整体代码如下

  1. package com.hainiu.kafka;
  2. import org.apache.kafka.clients.consumer.*;
  3. import org.apache.kafka.common.TopicPartition;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import java.time.Duration;
  6. import java.util.*;
  7. public class Consumer1 {
  8. public static void main(String[] args) {
  9. Properties pro = new Properties();
  10. pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
  11. pro.put(ConsumerConfig.GROUP_ID_CONFIG,"new_group221");
  12. pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  13. pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  14. pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
  15. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
  16. List<String> topics = Arrays.asList("topic_e");
  17. consumer.subscribe(topics, new ConsumerRebalanceListener() {
  18. @Override
  19. public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  20. // no op
  21. }
  22. @Override
  23. public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  24. HashMap<TopicPartition, Long> map = new HashMap<>();
  25. for (TopicPartition partition : partitions) {
  26. map.put(partition,1675076400000L);
  27. //将时间和分区绑定在一起,然后合并在一起放入到检索方法中
  28. }
  29. Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map);
  30. //根据时间获取时间对应的偏移量位置
  31. for (Map.Entry<TopicPartition, OffsetAndTimestamp> en : offsets.entrySet()) {
  32. System.out.println(en.getKey()+"-->"+en.getValue());
  33. if(en.getValue() != null){
  34. consumer.seek(en.getKey(),en.getValue().offset());
  35. //获取每个分区的偏移量的位置,使用seek进行找寻数据
  36. }
  37. }
  38. }
  39. });
  40. while (true){
  41. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  42. Iterator<ConsumerRecord<String, String>> it = records.iterator();
  43. while(it.hasNext()){
  44. ConsumerRecord<String, String> record = it.next();
  45. System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
  46. }
  47. // consumer.commitAsync();
  48. }
  49. }
  50. }
标签: 大数据 sqlite oracle

本文转载自: https://blog.csdn.net/2301_80912559/article/details/143525859
版权归原作者 Vez'nan的幸福生活 所有, 如有侵权,请联系我们删除。

“【大数据学习 | kafka】kafka的偏移量管理”的评论:

还没有评论