深入理解 Kafka 的 ConsumerRebalanceListener
在分布式系统中,数据的一致性和可靠性是至关重要的。Apache Kafka 作为一个流行的分布式流处理平台,提供了强大的数据传输和处理能力。在 Kafka 中,消费者组(Consumer Group)的概念允许多个消费者实例共同处理一个主题的数据。然而,当消费者实例的个数发生变化时,如何确保数据的平衡和一致性呢?这就引出了我们今天要讨论的主题——
ConsumerRebalanceListener
。
ConsumerRebalanceListener 简介
ConsumerRebalanceListener
是 Kafka 提供的一个回调接口,用户可以实现该接口来监听分区重新平衡(partition rebalance)事件。当消费者组中的分区分配发生变化时,Kafka 会触发重新平衡操作。这个接口有两个主要的方法:
onPartitionsRevoked(Collection<TopicPartition> partitions)
:在重新平衡操作期间,当消费者需要放弃一些分区时调用。onPartitionsAssigned(Collection<TopicPartition> partitions)
:在分区重新分配完成并且消费者开始获取数据之前调用,并且只有在调用Consumer#poll()
方法时才会触发。
示例配置
在开始之前,我们需要配置 Kafka 服务器和相关的生产者与消费者属性。以下是示例配置代码:
packagecom.logicbig.example;importjava.util.Properties;publicclassExampleConfig{publicstaticfinalStringBROKERS="localhost:9092";publicstaticPropertiesgetProducerProps(){Properties props =newProperties();
props.put("bootstrap.servers",BROKERS);
props.put("acks","all");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");return props;}publicstaticPropertiesgetConsumerProps(){Properties props =newProperties();
props.setProperty("bootstrap.servers",BROKERS);
props.setProperty("group.id","testGroup");
props.setProperty("enable.auto.commit","false");
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return props;}}
创建示例主题
接下来,我们创建一个具有 3 个分区的主题:
packagecom.logicbig.example;importorg.apache.kafka.clients.admin.AdminClient;importorg.apache.kafka.clients.admin.AdminClientConfig;importorg.apache.kafka.clients.admin.NewTopic;importjava.util.Collections;importjava.util.Properties;importjava.util.stream.Collectors;publicclassTopicCreator{publicstaticvoidmain(String[] args)throwsException{createTopic("example-topic-2020-6-24",3);}privatestaticvoidcreateTopic(String topicName,int numPartitions)throwsException{Properties config =newProperties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,ExampleConfig.BROKERS);AdminClient admin =AdminClient.create(config);// 检查主题是否已存在boolean alreadyExists = admin.listTopics().names().get().stream().anyMatch(existingTopicName -> existingTopicName.equals(topicName));if(alreadyExists){System.out.printf("topic already exits: %s%n", topicName);}else{// 创建新主题System.out.printf("creating topic: %s%n", topicName);NewTopic newTopic =newNewTopic(topicName, numPartitions,(short)1);
admin.createTopics(Collections.singleton(newTopic)).all().get();}// 描述主题System.out.println("-- describing topic --");
admin.describeTopics(Collections.singleton(topicName)).all().get().forEach((topic, desc)->{System.out.println("Topic: "+ topic);System.out.printf("Partitions: %s, partition ids: %s%n", desc.partitions().size(),
desc.partitions().stream().map(p ->Integer.toString(p.partition())).collect(Collectors.joining(",")));});
admin.close();}}
使用 ConsumerRebalanceListener
最后,我们通过一个示例来展示
ConsumerRebalanceListener
的使用:
packagecom.logicbig.example;importorg.apache.kafka.clients.consumer.ConsumerRebalanceListener;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.TopicPartition;importjava.time.Duration;importjava.util.*;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.TimeUnit;importjava.util.stream.Collectors;publicclassConsumerRebalanceListenerExample{publicstaticvoidmain(String[] args)throwsInterruptedException{ExecutorService executorService =Executors.newFixedThreadPool(3);for(int i =0; i <3; i++){int finalI = i;
executorService.execute(()->startConsumer("consumer-"+ finalI));Thread.sleep(3000);}
executorService.shutdown();
executorService.awaitTermination(3,TimeUnit.MINUTES);}privatestaticKafkaConsumer<String,String>startConsumer(String name){Properties consumerProps =ExampleConfig.getConsumerProps();KafkaConsumer<String,String> consumer =newKafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singleton("example-topic-2020-6-24"),newConsumerRebalanceListener(){@OverridepublicvoidonPartitionsRevoked(Collection<TopicPartition> partitions){System.out.printf("onPartitionsRevoked - consumerName: %s, partitions: %s%n", name,formatPartitions(partitions));}@OverridepublicvoidonPartitionsAssigned(Collection<TopicPartition> partitions){System.out.printf("onPartitionsAssigned - consumerName: %s, partitions: %s%n", name,formatPartitions(partitions));}});System.out.printf("starting consumerName: %s%n", name);
consumer.poll(Duration.ofSeconds(10));System.out.printf("closing consumerName: %s%n", name);
consumer.close();return consumer;}privatestaticList<String>formatPartitions(Collection<TopicPartition> partitions){return partitions.stream().map(topicPartition ->String.format("topic: %s, partition: %s", topicPartition.topic(), topicPartition.partition())).collect(Collectors.toList());}}
在这个示例中,我们首先启动了一个消费者实例,随后每隔 3 秒启动一个新的消费者实例。可以看到,随着消费者实例的增加和关闭,分区的分配也在不断变化。通过
ConsumerRebalanceListener
,我们可以在分区分配变化时执行一些自定义操作。
总结
通过本文的示例,我们可以看到
ConsumerRebalanceListener
在 Kafka 中的应用。它允许我们在分区重新平衡时执行一些自定义操作,从而更好地管理消费者组中的分区分配。这对于确保数据的一致性和可靠性至关重要。
项目依赖和技术
以下是本示例项目中使用的一些关键依赖和技术:
- Apache Kafka 2.5.0
- JDK 8
- Maven 3.5.4
希望本文能帮助你更好地理解 Kafka 中的分区重新平衡机制以及如何使用
ConsumerRebalanceListener
。
版权归原作者 t0_54coder 所有, 如有侵权,请联系我们删除。