文章目录
- 文档内出现的
${KAFKA_BROKERS}
表示 kafka 的连接地址,${ZOOKEEPER_CONNECT}
表示 zk 的连接地址,需要替换成自己的实际 ip 地址
创建一个演示 topic
kafka-topics.sh --create--zookeeper${ZOOKEEPER_CONNECT} --replication-factor 1--partitions3--topic test-topic-update
查看 topic 详情
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS}--describe--topic test-topic-update
总共是六个 kafka 节点,三分区一副本,分散在三个不同的 kafka 节点
Topic:test-topic-update PartitionCount:3 ReplicationFactor:1 Configs:segment.bytes=1073741824
Topic: test-topic-update Partition: 0 Leader: 5 Replicas: 5 Isr: 5
Topic: test-topic-update Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: test-topic-update Partition: 2 Leader: 0 Replicas: 0 Isr: 0
- 关于输出内容的概念 -
分区(Partition)
: - 主题(Topic)在 Kafka 中的数据被分成一个或多个分区。每个分区是一个有序且持久化的消息日志。- 分区允许 Kafka 集群进行水平扩展,使多个消费者能够并行地处理主题的消息。- 消费者组中的每个消费者负责处理一个或多个分区的消息。-领导者(Leader)
: - 每个分区都有一个领导者,领导者负责处理该分区的所有读写请求。- 生产者向领导者发送消息,消费者从领导者读取消息。- 领导者也负责维护分区的复制和同步。-副本(Replicas)
: - 为了提高数据的冗余和可用性,每个分区可以有多个副本,包括一个领导者副本和零个或多个追随者副本。- 领导者副本处理写请求,追随者副本用于数据冗余和读请求。-同步副本集(In-Sync Replicas,ISR)
: - 同步副本集是指在分区的所有副本中,与领导者副本保持同步的副本。- 领导者和同步副本集中的副本是可用于读取的,其他追随者副本可能会有一些延迟。
生产一些数据
- 手动生产 300 条数据
kafka-verifiable-producer.sh --broker-list ${KAFKA_BROKERS}--topic test-topic-update --max-messages 300
使用消费者组消费数据
- 消费者组不存在的情况下,没有返回被消费的数据,过两三秒之后,可以中断这个命令,然后使用下面的
--describe
来验证
kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS}--topic test-topic-update --group test-topic-update-group
查看消费组内的 topic 消费情况
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS}--describe--group test-topic-update-group
目前三百条都被消费了,使用上面的生产数据的命令,再生产300条,模拟 topic 有数据的场景
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 2 100 100 0 - - -
test-topic-update-group test-topic-update 0 100 100 0 - - -
test-topic-update-group test-topic-update 1 100 100 0 - - -
生产完数据后,再次查看,返回结果如下
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 2 100 200 100 - - -
test-topic-update-group test-topic-update 0 100 200 100 - - -
test-topic-update-group test-topic-update 1 100 200 100 - - -
增加分区
- 在增加分区的场景下比较方便,直接使用
--alter
就能实现,这里将原来的 3 分区改成 12 分区
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS}--alter--topic test-topic-update --partitions12
无新数据产生,有旧数据未消费
查看 topic 情况
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS}--describe--topic test-topic-update
可以看到,分区已经更新成 12 个了,也可以看出,kafka 在动态增加分区的时候,是均分的,都会按照类似下面的 5-1-0-3-2-4 这样的顺序去均分(当然,前提是分区数和节点数是倍数关系)
Topic:test-topic-update PartitionCount:12 ReplicationFactor:1 Configs:segment.bytes=1073741824
Topic: test-topic-update Partition: 0 Leader: 5 Replicas: 5 Isr: 5
Topic: test-topic-update Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: test-topic-update Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-update Partition: 3 Leader: 3 Replicas: 3 Isr: 3
Topic: test-topic-update Partition: 4 Leader: 2 Replicas: 2 Isr: 2
Topic: test-topic-update Partition: 5 Leader: 4 Replicas: 4 Isr: 4
Topic: test-topic-update Partition: 6 Leader: 5 Replicas: 5 Isr: 5
Topic: test-topic-update Partition: 7 Leader: 1 Replicas: 1 Isr: 1
Topic: test-topic-update Partition: 8 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-update Partition: 9 Leader: 3 Replicas: 3 Isr: 3
Topic: test-topic-update Partition: 10 Leader: 2 Replicas: 2 Isr: 2
Topic: test-topic-update Partition: 11 Leader: 4 Replicas: 4 Isr: 4
查看消费组内的分区情况
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS}--describe--group test-topic-update-group
因为没有新数据进入,也没有消费旧数据,此时还是显示的原先的信息
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 2 100 200 100 - - -
test-topic-update-group test-topic-update 0 100 200 100 - - -
test-topic-update-group test-topic-update 1 100 200 100 - - -
将未消费的 300 条数据进行消费
kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS}--topic test-topic-update --group test-topic-update-group --max-messages 300
消费完成后,再次查看消费组的情况
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS}--describe--group test-topic-update-group
此时就变成正常的 12 分区了
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 0 100 100 0 - - -
test-topic-update-group test-topic-update 7 0 0 0 - - -
test-topic-update-group test-topic-update 5 0 0 0 - - -
test-topic-update-group test-topic-update 1 100 100 0 - - -
test-topic-update-group test-topic-update 6 0 0 0 - - -
test-topic-update-group test-topic-update 2 100 100 0 - - -
test-topic-update-group test-topic-update 3 0 0 0 - - -
test-topic-update-group test-topic-update 10 0 0 0 - - -
test-topic-update-group test-topic-update 9 0 0 0 - - -
test-topic-update-group test-topic-update 8 0 0 0 - - -
test-topic-update-group test-topic-update 11 0 0 0 - - -
test-topic-update-group test-topic-update 4 0 0 0 - - -
这里为了方便验证,我把 topic 删了后重建了,下面这个删除 topic 的命令,大家别随意执行,会删除数据的
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS}--delete--topic test-topic-update
有新数据产生,有旧数据未消费
- 同样,先扩容分区
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS}--alter--topic test-topic-update --partitions12
未生产新数据的时候,查看消费者组的信息同样是没有更新分区信息
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS}--describe--group test-topic-update-group
此时,手动使用命令模拟新数据进来
kafka-verifiable-producer.sh --broker-list ${KAFKA_BROKERS}--topic test-topic-update --max-messages 100
通过命令查看消费者组的情况
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS}--describe--group test-topic-update-group
此时显示的是老分区,而且只显示了 8+8+9=25 条数据
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 2 100 108 8 - - -
test-topic-update-group test-topic-update 0 100 108 8 - - -
test-topic-update-group test-topic-update 1 100 109 9 - - -
手动消费一下数据试试
kafka-console-consumer.sh --bootstrap-server ${KAFKA_BROKERS}--topic test-topic-update --group test-topic-update-group --max-messages 100
发现返回的信息里面,只显示25条数据
kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS}--describe--group test-topic-update-group
但是观察消费者组的情况,显示的是都消费了,看起来,应该是和 topic 加入新消费者组的情况一样,不展示,但实际消费数据了(这块是个人的理解,具体的原理需要有兴趣的大佬深究一下,希望能赐教带我飞)
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic-update-group test-topic-update 0 108 108 0 - - -
test-topic-update-group test-topic-update 7 9 9 0 - - -
test-topic-update-group test-topic-update 5 8 8 0 - - -
test-topic-update-group test-topic-update 1 109 109 0 - - -
test-topic-update-group test-topic-update 6 9 9 0 - - -
test-topic-update-group test-topic-update 2 108 108 0 - - -
test-topic-update-group test-topic-update 3 8 8 0 - - -
test-topic-update-group test-topic-update 10 8 8 0 - - -
test-topic-update-group test-topic-update 9 8 8 0 - - -
test-topic-update-group test-topic-update 8 8 8 0 - - -
test-topic-update-group test-topic-update 11 8 8 0 - - -
test-topic-update-group test-topic-update 4 9 9 0 - - -
增加副本
创建 json 文件
kafka-reassign-partitions.sh
是 Kafka 提供的命令行工具,用于重新分配主题分区的副本。这个工具允许你重新定义主题分区副本的分布,以实现负载均衡、故障恢复或集群扩展等目的
之前的 topic 是 1 副本,12 分区,按照之前的 5-1-0-3-2-4 的顺序来分配第一个副本,然后按照 4-3-2-0-1-5 的顺序来分配第二个副本,我这里的 json 文件就命名为:add_rep_test_topic_update.json,大家可以以自己实际来命名
{"version":1,"partitions":[{"topic":"test-topic-update","partition":0,"replicas":[5,4]},{"topic":"test-topic-update","partition":1,"replicas":[1,3]},{"topic":"test-topic-update","partition":2,"replicas":[0,2]},{"topic":"test-topic-update","partition":3,"replicas":[3,0]},{"topic":"test-topic-update","partition":4,"replicas":[2,1]},{"topic":"test-topic-update","partition":5,"replicas":[4,5]},{"topic":"test-topic-update","partition":6,"replicas":[5,4]},{"topic":"test-topic-update","partition":7,"replicas":[1,3]},{"topic":"test-topic-update","partition":8,"replicas":[0,2]},{"topic":"test-topic-update","partition":9,"replicas":[3,0]},{"topic":"test-topic-update","partition":10,"replicas":[2,1]},{"topic":"test-topic-update","partition":11,"replicas":[4,5]}]}
使用指定的 json 文件增加 topic 的副本数
kafka-reassign-partitions.sh --zookeeper${ZOOKEEPER_CONNECT}--execute --reassignment-json-file add_rep_test_topic_update.json
使用指定的 json 文件查看 topic 的副本数增加的进度
kafka-reassign-partitions.sh --zookeeper${ZOOKEEPER_CONNECT}--verify --reassignment-json-file add_rep_test_topic_update.json
通过命令返回的内容,可以看出都成功了
Reassignment of partition test-topic-update-0 completed successfully
Reassignment of partition test-topic-update-7 completed successfully
Reassignment of partition test-topic-update-5 completed successfully
Reassignment of partition test-topic-update-1 completed successfully
Reassignment of partition test-topic-update-6 completed successfully
Reassignment of partition test-topic-update-2 completed successfully
Reassignment of partition test-topic-update-3 completed successfully
Reassignment of partition test-topic-update-10 completed successfully
Reassignment of partition test-topic-update-9 completed successfully
Reassignment of partition test-topic-update-8 completed successfully
Reassignment of partition test-topic-update-11 completed successfully
Reassignment of partition test-topic-update-4 completed successfully
查看 topic 情况
kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS}--describe--topic test-topic-update
现在的 topic 变成了 12 分区,2 副本的状态了
Topic:test-topic-update PartitionCount:12 ReplicationFactor:2 Configs:segment.bytes=1073741824
Topic: test-topic-update Partition: 0 Leader: 5 Replicas: 5,4 Isr: 5,4
Topic: test-topic-update Partition: 1 Leader: 1 Replicas: 1,3 Isr: 3,1
Topic: test-topic-update Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: test-topic-update Partition: 3 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: test-topic-update Partition: 4 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: test-topic-update Partition: 5 Leader: 4 Replicas: 4,5 Isr: 5,4
Topic: test-topic-update Partition: 6 Leader: 5 Replicas: 5,4 Isr: 4,5
Topic: test-topic-update Partition: 7 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: test-topic-update Partition: 8 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: test-topic-update Partition: 9 Leader: 3 Replicas: 3,0 Isr: 0,3
Topic: test-topic-update Partition: 10 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: test-topic-update Partition: 11 Leader: 4 Replicas: 4,5 Isr: 4,5
版权归原作者 陈小肚 所有, 如有侵权,请联系我们删除。