Kafka Consumer Group Partition Assignment in Practice


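The problem: a consumer group consists of multiple consumers, and a topic consists of multiple partitions. Which consumer consumes the data of which partition?

Kafka ships four mainstream partition assignment strategies: Range, RoundRobin, Sticky, and CooperativeSticky.

The strategy is set through the consumer configuration parameter partition.assignment.strategy. The parameter accepts a list, so several strategies can be configured at the same time.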
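
As a minimal sketch of that setting, the snippet below builds the configuration with plain java.util.Properties and lists two assignors in order of preference. The class name AssignmentStrategyConfigSketch and the helper build are hypothetical; the property key and the assignor class names are the ones used elsewhere in this article.

```java
import java.util.Properties;

class AssignmentStrategyConfigSketch {
    // Hypothetical helper: joins assignor class names into the comma-separated
    // form accepted by the partition.assignment.strategy list setting.
    static Properties build(String... assignorClassNames) {
        Properties properties = new Properties();
        properties.put("partition.assignment.strategy", String.join(",", assignorClassNames));
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build(
                "org.apache.kafka.clients.consumer.RangeAssignor",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        System.out.println(properties.getProperty("partition.assignment.strategy"));
    }
}
```

The resulting Properties object can be merged into the consumer configuration; the entries are read in order of preference.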

Note: Kafka 3.0 and later clients default to Range + CooperativeSticky; earlier clients default to Range only.

Create the topic (7 partitions)

    [root@VM-16-3-centos bin]# ./kafka-topics.sh --create --bootstrap-server 150.158.33.191:9092 --replication-factor 1 --partitions 7 --topic test
    Created topic test.

Describe the topic

    [root@VM-16-3-centos bin]# ./kafka-topics.sh --describe --bootstrap-server 150.158.33.191:9092 --topic test
    Topic: test TopicId: l-eKz8zgRlmnhTVEh854wA PartitionCount: 7 ReplicationFactor: 1 Configs: segment.bytes=1073741824
    Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
    Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
    Topic: test Partition: 2 Leader: 1 Replicas: 1 Isr: 1
    Topic: test Partition: 3 Leader: 1 Replicas: 1 Isr: 1
    Topic: test Partition: 4 Leader: 1 Replicas: 1 Isr: 1
    Topic: test Partition: 5 Leader: 1 Replicas: 1 Isr: 1
    Topic: test Partition: 6 Leader: 1 Replicas: 1 Isr: 1

Consumer code

    import ch.qos.logback.classic.Level;   // assumed: Logback is the SLF4J backend
    import ch.qos.logback.classic.Logger;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.LoggerFactory;

    import java.util.Properties;

    public class Common {
        public static Properties getProperties() {
            // Set the root logger to INFO
            Logger logger = (Logger) LoggerFactory.getLogger("ROOT");
            logger.setLevel(Level.INFO);

            // Configuration
            Properties properties = new Properties();
            // Cluster address
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "150.158.33.191:9092");
            // Deserializers
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Consumer group ID (any name works)
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-partition-assignment-strategy");

            // Assignment strategy: one of Range, RoundRobin, Sticky, CooperativeSticky.
            // Uncomment exactly one of the lines below.
            // Range
            // properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");
            // RoundRobin
            // properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
            // Sticky
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");
            // CooperativeSticky
            // properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
            return properties;
        }
    }

Consumer1

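    import com.alibaba.fastjson.JSON;   // assumed: JSON comes from fastjson
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;

    @Slf4j
    public class Consumer1 {
        public static void main(String[] args) {
            Properties properties = Common.getProperties();
            // 1. Create a consumer
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            // 2. Subscribe to the topic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("test");
            kafkaConsumer.subscribe(topics);
            // 3. Consume data
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(3));
                // Print the consumed records
                log.info("consumerRecords:{}", JSON.toJSONString(consumerRecords));
            }
        }
    }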

Consumer2

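    import com.alibaba.fastjson.JSON;   // assumed: JSON comes from fastjson
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;

    @Slf4j
    public class Consumer2 {
        public static void main(String[] args) {
            Properties properties = Common.getProperties();
            // 1. Create a consumer
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            // 2. Subscribe to the topic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("test");
            kafkaConsumer.subscribe(topics);
            // 3. Consume data
            while (true) {
                // Wait up to 3 seconds for a batch of records
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(3));
                // Print the consumed records
                log.info("consumerRecords:{}", JSON.toJSONString(consumerRecords));
            }
        }
    }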

Consumer3

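    import com.alibaba.fastjson.JSON;   // assumed: JSON comes from fastjson
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;

    @Slf4j
    public class Consumer3 {
        public static void main(String[] args) {
            Properties properties = Common.getProperties();
            // 1. Create a consumer
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            // 2. Subscribe to the topic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("test");
            kafkaConsumer.subscribe(topics);
            // 3. Consume data
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(3));
                // Print the consumed records
                log.info("consumerRecords:{}", JSON.toJSONString(consumerRecords));
            }
        }
    }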

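RangeAssignor: range assignment strategy

Range works per topic. Within one topic, the partitions are sorted by partition number and the consumers are sorted alphabetically.
For example: with 7 partitions and 3 consumers, the sorted partitions are 0, 1, 2, 3, 4, 5, 6 and the sorted consumers are C0, C1, C2.

The number of partitions divided by the number of consumers determines how many partitions each consumer gets. If the division leaves a remainder, the first few consumers each take one extra partition.
For example: 7 / 3 = 2 remainder 1, so C0 takes one extra partition. 8 / 3 = 2 remainder 2, so C0 and C1 each take one extra partition.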
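
The division rule above can be sketched in a few lines of plain Java. This is an illustration of the arithmetic for a single topic, not Kafka's RangeAssignor source; the class RangeAssignmentSketch and the method assign are hypothetical names, and the consumer list is assumed to be sorted already.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeAssignmentSketch {
    // Assigns partitions 0..partitionCount-1 of ONE topic to consumers that are
    // assumed to be sorted already (C0, C1, ...).
    static Map<String, List<Integer>> assign(int partitionCount, List<String> sortedConsumers) {
        int perConsumer = partitionCount / sortedConsumers.size();
        int extra = partitionCount % sortedConsumers.size();
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < sortedConsumers.size(); i++) {
            // The first `extra` consumers each take one additional partition.
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            assignment.put(sortedConsumers.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 7 partitions, 3 consumers: prints {C0=[0, 1, 2], C1=[3, 4], C2=[5, 6]}
        System.out.println(assign(7, List.of("C0", "C1", "C2")));
        // 8 partitions, 3 consumers: prints {C0=[0, 1, 2], C1=[3, 4, 5], C2=[6, 7]}
        System.out.println(assign(8, List.of("C0", "C1", "C2")));
    }
}
```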

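Advantage: each consumer gets a contiguous range of partitions.
Disadvantage: the assignment is prone to skew, which hurts more as the data volume grows.
For example: with N topics whose partition counts do not divide evenly, C0 takes one extra partition for every topic, so the more topics there are, the further ahead C0 gets: it ends up with N more partitions than the other consumers.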
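
To put a number on that skew, the sketch below applies the range rule independently to every topic and sums the partitions per consumer. It assumes all topics have the same partition count and that every consumer subscribes to all of them; RangeSkewSketch and totals are hypothetical names used only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeSkewSketch {
    // Total partitions per consumer when the range rule is applied to each of
    // `topicCount` topics, all with `partitionsPerTopic` partitions.
    static Map<String, Integer> totals(int topicCount, int partitionsPerTopic, List<String> sortedConsumers) {
        int base = partitionsPerTopic / sortedConsumers.size();
        int extra = partitionsPerTopic % sortedConsumers.size();
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (int i = 0; i < sortedConsumers.size(); i++) {
            // The same leading consumers get the extra partition for every topic.
            int perTopic = base + (i < extra ? 1 : 0);
            totals.put(sortedConsumers.get(i), perTopic * topicCount);
        }
        return totals;
    }

    public static void main(String[] args) {
        // 10 topics x 7 partitions, 3 consumers: prints {C0=30, C1=20, C2=20}
        System.out.println(totals(10, 7, List.of("C0", "C1", "C2")));
    }
}
```

With 10 topics, C0 carries 10 more partitions than each of the others, which matches the "N more partitions" statement above.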

The startup log shows that the assignment strategy is RangeAssignor

    16:26:33.181 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    ...
    group.id = test-partition-assignment-strategy
    ...
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    ...

After consumer 1 starts it is the only member of the group, so all partitions are assigned to consumer 1

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1

Starting consumer 2 triggers a rebalance: the first 4 partitions go to consumer 1 and the last 3 go to consumer 2

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-33f526a4-e034-4809-9bca-46f2bd817196 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-33f526a4-e034-4809-9bca-46f2bd817196 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-33f526a4-e034-4809-9bca-46f2bd817196 /222.222.120.75 consumer-1

Starting consumer 3 triggers another rebalance: the first 3 partitions go to consumer 1, the middle 2 to consumer 2, and the last 2 to consumer 3

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-2b3167af-5a69-462c-b687-c935f45e9736 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-33f526a4-e034-4809-9bca-46f2bd817196 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-33f526a4-e034-4809-9bca-46f2bd817196 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1

Stopping consumer 1 triggers a reassignment: the first 4 partitions go to consumer 2 and the last 3 go to consumer 3

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-33f526a4-e034-4809-9bca-46f2bd817196 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-33f526a4-e034-4809-9bca-46f2bd817196 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-33f526a4-e034-4809-9bca-46f2bd817196 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-33f526a4-e034-4809-9bca-46f2bd817196 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1

Stop consumer 2: all partitions go to consumer 3

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-62bc74c4-5dd2-4b66-84ce-cd5fd47c3144 /183.196.130.69 consumer-1

Stop consumer 3: the group has no active members left

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    Consumer group 'test-partition-assignment-strategy' has no active members.
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 1 0 0 0 - - -
    test-partition-assignment-strategy test 0 0 0 0 - - -
    test-partition-assignment-strategy test 3 0 0 0 - - -
    test-partition-assignment-strategy test 2 0 0 0 - - -
    test-partition-assignment-strategy test 5 0 0 0 - - -
    test-partition-assignment-strategy test 4 0 0 0 - - -
    test-partition-assignment-strategy test 6 0 0 0 - - -

While a rebalance is in progress, the same command also shows a warning and no partition owners

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    Warning: Consumer group 'test-partition-assignment-strategy' is rebalancing.
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 1 0 0 0 - - -
    test-partition-assignment-strategy test 0 0 0 0 - - -
    test-partition-assignment-strategy test 3 0 0 0 - - -
    test-partition-assignment-strategy test 2 0 0 0 - - -
    test-partition-assignment-strategy test 5 0 0 0 - - -
    test-partition-assignment-strategy test 4 0 0 0 - - -
    test-partition-assignment-strategy test 6 0 0 0 - - -

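RoundRobinAssignor: round-robin strategy

RoundRobin works across all topics the group subscribes to. It lists all partitions and all consumers, sorts them (the Java client orders consumers by member ID and partitions by topic name and partition number, not by hash code), and then hands the partitions out to the consumers one at a time in a round-robin loop.
For example: C0 takes partition 0, C1 takes partition 1, C2 takes partition 2, C0 takes partition 3, and so on.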
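
A minimal sketch of that loop, assuming a single topic that every consumer subscribes to and a consumer list that is already sorted. RoundRobinAssignmentSketch and assign are hypothetical names; this is not Kafka's RoundRobinAssignor source.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RoundRobinAssignmentSketch {
    // Hands partitions 0..partitionCount-1 to the sorted consumers in turn.
    static Map<String, List<Integer>> assign(int partitionCount, List<String> sortedConsumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : sortedConsumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            // Partition p goes to consumer number p modulo the consumer count.
            String owner = sortedConsumers.get(partition % sortedConsumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 7 partitions, 3 consumers: prints {C0=[0, 3, 6], C1=[1, 4], C2=[2, 5]}
        System.out.println(assign(7, List.of("C0", "C1", "C2")));
    }
}
```

Which real consumer plays the role of C0, C1, or C2 depends on how the member IDs sort, so the grouping of partitions is what to compare against the tool output, not the consumer labels.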

Advantage: the load is balanced across consumers.

    17:18:42.829 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    ...
    group.id = test-partition-assignment-strategy
    ...
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RoundRobinAssignor]
    ...

Start consumer 1

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1

Start consumer 2: the partitions now alternate between the two consumers

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1

Start consumer 3: the partitions are dealt out across the three consumers in turn

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-b7ad17fe-940b-4854-a08f-11770c7319b1 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-9b44d652-996d-4e0a-a159-0766430fb915 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-9b44d652-996d-4e0a-a159-0766430fb915 /183.196.130.69 consumer-1

Stop consumer 1: the partitions alternate between consumers 2 and 3

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-5f8a7dcf-4772-4c72-b4c1-b0c17430c164 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-9b44d652-996d-4e0a-a159-0766430fb915 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-9b44d652-996d-4e0a-a159-0766430fb915 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-9b44d652-996d-4e0a-a159-0766430fb915 /183.196.130.69 consumer-1

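StickyAssignor: sticky assignment strategy

Sticky assignment means the result is "sticky": before computing a new assignment, the assignor takes the previous assignment into account and changes as little of it as possible, which saves a good deal of overhead.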
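
The idea can be illustrated with a toy rebalance that adds one consumer. The sketch below is a deliberate simplification and not the algorithm StickyAssignor actually implements: it keeps every existing owner and only moves partitions from the most loaded consumer to the least loaded one until the sizes differ by at most one. StickyRebalanceSketch and addConsumer are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StickyRebalanceSketch {
    // Adds `newConsumer` to `previous` and moves as few partitions as needed
    // to make the assignment balanced (sizes differ by at most 1).
    static Map<String, List<Integer>> addConsumer(Map<String, List<Integer>> previous, String newConsumer) {
        Map<String, List<Integer>> next = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> entry : previous.entrySet()) {
            next.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        next.put(newConsumer, new ArrayList<>());
        while (true) {
            String max = null;
            String min = null;
            for (String consumer : next.keySet()) {
                if (max == null || next.get(consumer).size() > next.get(max).size()) max = consumer;
                if (min == null || next.get(consumer).size() < next.get(min).size()) min = consumer;
            }
            if (next.get(max).size() - next.get(min).size() <= 1) {
                return next; // balanced: stop moving partitions
            }
            // Move one partition from the most loaded to the least loaded consumer.
            List<Integer> donor = next.get(max);
            next.get(min).add(donor.remove(donor.size() - 1));
        }
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> previous = new LinkedHashMap<>();
        previous.put("c1", List.of(2, 4, 5, 6));
        previous.put("c2", List.of(0, 1, 3));
        // c2 keeps all of its partitions; two partitions move from c1 to c3.
        System.out.println(addConsumer(previous, "c3"));
    }
}
```

Which two partitions move is an arbitrary choice in this sketch; the point is only that consumers who already hold a fair share are left alone.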

Change the assignment strategy to StickyAssignor

    17:44:21.267 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    ...
    group.id = test-partition-assignment-strategy
    ...
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.StickyAssignor]
    ...

Start consumer 1

    [root@VM-16-3-centos bin]# sh /usr/local/kafka/kafka_2.11-2.4.1/bin/kafka-consumer-groups.sh --bootstrap-server 150.158.33.191:9092 --describe --group test-partition-assignment-strategy
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1

Start consumer 2: consumer 1 keeps 4 of its partitions and 3 move to consumer 2

    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-4df266ba-d439-4328-bf2b-cf90e0ff2b32 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-4df266ba-d439-4328-bf2b-cf90e0ff2b32 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-4df266ba-d439-4328-bf2b-cf90e0ff2b32 /183.196.130.69 consumer-1

Start consumer 3. The consumer whose ID ends in cf90e0ff2b32 keeps all of its partitions, and two partitions move from the consumer ending in 6b9c48068869 to the new consumer ending in 0f93b7d3aa77, which shows the assignor changing as little as possible

    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
    test-partition-assignment-strategy test 0 0 0 0 consumer-1-4df266ba-d439-4328-bf2b-cf90e0ff2b32 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 1 0 0 0 consumer-1-4df266ba-d439-4328-bf2b-cf90e0ff2b32 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 3 0 0 0 consumer-1-4df266ba-d439-4328-bf2b-cf90e0ff2b32 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 2 0 0 0 consumer-1-91e69dba-1789-4040-a103-0f93b7d3aa77 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 5 0 0 0 consumer-1-91e69dba-1789-4040-a103-0f93b7d3aa77 /183.196.130.69 consumer-1
    test-partition-assignment-strategy test 4 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
    test-partition-assignment-strategy test 6 0 0 0 consumer-1-90751324-f462-42ac-b069-6b9c48068869 /222.222.120.75 consumer-1
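
To compare how disruptive the strategies are, the sketch below counts how many partitions changed owner when the third consumer joined, using the RoundRobin and Sticky assignments recorded in this article. The labels c1, c2, c3 are shorthand for the first, second, and third consumer started in each run, and MovedPartitionsSketch and countMoved are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MovedPartitionsSketch {
    // Returns how many partitions have a different owner in `after` than in `before`.
    static int countMoved(Map<String, List<Integer>> before, Map<String, List<Integer>> after) {
        Map<Integer, String> previousOwner = new HashMap<>();
        before.forEach((consumer, partitions) ->
                partitions.forEach(partition -> previousOwner.put(partition, consumer)));
        int moved = 0;
        for (Map.Entry<String, List<Integer>> entry : after.entrySet()) {
            for (int partition : entry.getValue()) {
                if (!entry.getKey().equals(previousOwner.get(partition))) {
                    moved++;
                }
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // RoundRobin run: two consumers, then three.
        Map<String, List<Integer>> rrBefore = Map.of("c1", List.of(1, 3, 5), "c2", List.of(0, 2, 4, 6));
        Map<String, List<Integer>> rrAfter = Map.of("c1", List.of(2, 5), "c2", List.of(0, 3, 6), "c3", List.of(1, 4));
        // Sticky run: two consumers, then three.
        Map<String, List<Integer>> stickyBefore = Map.of("c1", List.of(2, 4, 5, 6), "c2", List.of(0, 1, 3));
        Map<String, List<Integer>> stickyAfter = Map.of("c1", List.of(4, 6), "c2", List.of(0, 1, 3), "c3", List.of(2, 5));

        System.out.println("RoundRobin moved: " + countMoved(rrBefore, rrAfter));      // 4
        System.out.println("Sticky moved: " + countMoved(stickyBefore, stickyAfter));  // 2
    }
}
```

In these runs RoundRobin changed the owner of 4 of the 7 partitions, while Sticky moved only the 2 partitions the new consumer needed.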

Reposted from: https://blog.csdn.net/weixin_41883161/article/details/142105228
Copyright belongs to the original author, 九转成圣. If this infringes your rights, please contact us for removal.
