0


深入理解 Kafka 的 ConsumerRebalanceListener

深入理解 Kafka 的 ConsumerRebalanceListener

在分布式系统中,数据的一致性和可靠性是至关重要的。Apache Kafka 作为一个流行的分布式流处理平台,提供了强大的数据传输和处理能力。在 Kafka 中,消费者组(Consumer Group)的概念允许多个消费者实例共同处理一个主题的数据。然而,当消费者实例的个数发生变化时,如何确保数据的平衡和一致性呢?这就引出了我们今天要讨论的主题——

ConsumerRebalanceListener

ConsumerRebalanceListener 简介

ConsumerRebalanceListener

是 Kafka 提供的一个回调接口,用户可以实现该接口来监听分区重新平衡(partition rebalance)事件。当消费者组中的分区分配发生变化时,Kafka 会触发重新平衡操作。这个接口有两个主要的方法:

  1. onPartitionsRevoked(Collection<TopicPartition> partitions):在重新平衡操作期间,当消费者需要放弃一些分区时调用。
  2. onPartitionsAssigned(Collection<TopicPartition> partitions):在分区重新分配完成并且消费者开始获取数据之前调用,并且只有在调用 Consumer#poll() 方法时才会触发。

示例配置

在开始之前,我们需要配置 Kafka 服务器和相关的生产者与消费者属性。以下是示例配置代码:

packagecom.logicbig.example;importjava.util.Properties;publicclassExampleConfig{publicstaticfinalStringBROKERS="localhost:9092";publicstaticPropertiesgetProducerProps(){Properties props =newProperties();
      props.put("bootstrap.servers",BROKERS);
      props.put("acks","all");
      props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");return props;}publicstaticPropertiesgetConsumerProps(){Properties props =newProperties();
      props.setProperty("bootstrap.servers",BROKERS);
      props.setProperty("group.id","testGroup");
      props.setProperty("enable.auto.commit","false");
      props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
      props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return props;}}

创建示例主题

接下来,我们创建一个具有 3 个分区的主题:

packagecom.logicbig.example;importorg.apache.kafka.clients.admin.AdminClient;importorg.apache.kafka.clients.admin.AdminClientConfig;importorg.apache.kafka.clients.admin.NewTopic;importjava.util.Collections;importjava.util.Properties;importjava.util.stream.Collectors;publicclassTopicCreator{publicstaticvoidmain(String[] args)throwsException{createTopic("example-topic-2020-6-24",3);}privatestaticvoidcreateTopic(String topicName,int numPartitions)throwsException{Properties config =newProperties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,ExampleConfig.BROKERS);AdminClient admin =AdminClient.create(config);// 检查主题是否已存在boolean alreadyExists = admin.listTopics().names().get().stream().anyMatch(existingTopicName -> existingTopicName.equals(topicName));if(alreadyExists){System.out.printf("topic already exits: %s%n", topicName);}else{// 创建新主题System.out.printf("creating topic: %s%n", topicName);NewTopic newTopic =newNewTopic(topicName, numPartitions,(short)1);
          admin.createTopics(Collections.singleton(newTopic)).all().get();}// 描述主题System.out.println("-- describing topic --");
      admin.describeTopics(Collections.singleton(topicName)).all().get().forEach((topic, desc)->{System.out.println("Topic: "+ topic);System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
                           desc.partitions().stream().map(p ->Integer.toString(p.partition())).collect(Collectors.joining(",")));});

      admin.close();}}

使用 ConsumerRebalanceListener

最后,我们通过一个示例来展示

ConsumerRebalanceListener

的使用:

packagecom.logicbig.example;importorg.apache.kafka.clients.consumer.ConsumerRebalanceListener;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.TopicPartition;importjava.time.Duration;importjava.util.*;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.TimeUnit;importjava.util.stream.Collectors;publicclassConsumerRebalanceListenerExample{publicstaticvoidmain(String[] args)throwsInterruptedException{ExecutorService executorService =Executors.newFixedThreadPool(3);for(int i =0; i <3; i++){int finalI = i;
          executorService.execute(()->startConsumer("consumer-"+ finalI));Thread.sleep(3000);}
      executorService.shutdown();
      executorService.awaitTermination(3,TimeUnit.MINUTES);}privatestaticKafkaConsumer<String,String>startConsumer(String name){Properties consumerProps =ExampleConfig.getConsumerProps();KafkaConsumer<String,String> consumer =newKafkaConsumer<>(consumerProps);
      consumer.subscribe(Collections.singleton("example-topic-2020-6-24"),newConsumerRebalanceListener(){@OverridepublicvoidonPartitionsRevoked(Collection<TopicPartition> partitions){System.out.printf("onPartitionsRevoked - consumerName: %s, partitions: %s%n", name,formatPartitions(partitions));}@OverridepublicvoidonPartitionsAssigned(Collection<TopicPartition> partitions){System.out.printf("onPartitionsAssigned - consumerName: %s, partitions: %s%n", name,formatPartitions(partitions));}});System.out.printf("starting consumerName: %s%n", name);
      consumer.poll(Duration.ofSeconds(10));System.out.printf("closing consumerName: %s%n", name);
      consumer.close();return consumer;}privatestaticList<String>formatPartitions(Collection<TopicPartition> partitions){return partitions.stream().map(topicPartition ->String.format("topic: %s, partition: %s", topicPartition.topic(), topicPartition.partition())).collect(Collectors.toList());}}

在这个示例中,我们首先启动了一个消费者实例,随后每隔 3 秒启动一个新的消费者实例。可以看到,随着消费者实例的增加和关闭,分区的分配也在不断变化。通过

ConsumerRebalanceListener

,我们可以在分区分配变化时执行一些自定义操作。

总结

通过本文的示例,我们可以看到

ConsumerRebalanceListener

在 Kafka 中的应用。它允许我们在分区重新平衡时执行一些自定义操作,从而更好地管理消费者组中的分区分配。这对于确保数据的一致性和可靠性至关重要。

项目依赖和技术

以下是本示例项目中使用的一些关键依赖和技术:

  • Apache Kafka 2.5.0
  • JDK 8
  • Maven 3.5.4

希望本文能帮助你更好地理解 Kafka 中的分区重新平衡机制以及如何使用

ConsumerRebalanceListener

标签: kafka linq 分布式

本文转载自: https://blog.csdn.net/m0_62153576/article/details/140897600
版权归原作者 t0_54coder 所有, 如有侵权,请联系我们删除。

“深入理解 Kafka 的 ConsumerRebalanceListener”的评论:

还没有评论