为了在三台虚拟机上部署ZooKeeper和Kafka集群,每台主机上各有一个ZooKeeper实例和一个Kafka实例,你可以使用
docker-compose
来简化这一过程。下面是一个基本的示例配置,用于在每台主机上部署ZooKeeper和Kafka。
准备工作
- 确保每台虚拟机都安装了Docker和Docker Compose。
- 确定每台虚拟机的IP地址: - 192.168.88.130- 192.168.88.131- 192.168.88.132
1、创建 Docker Compose 文件
在每台虚拟机上创建一个
docker-compose.yml
文件。这个文件将定义ZooKeeper和Kafka的服务。
docker-compose.yml
节点1
version: '3'
services:
zk1:
container_name: zk1
hostname: zk1
image: wurstmeister/zookeeper:latest
restart: always
environment:
- ZOO_MY_ID=1
- ZOO_SERVERS=server.1=0.0.0.0:2888:3888,server.2=192.168.88.131:2888:3888,server.3=192.168.88.132:2888:3888
volumes:
- ./zk_conf/zoo.cfg:/opt/zookeeper-3.4.13/conf/zoo.cfg
- ./zk_data:/opt/zookeeper-3.4.13/data
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka1:
container_name: kafka1
hostname: kafka1
image: wurstmeister/kafka:latest
restart: always
environment:
- KAFKA_BROKER_ID=1
- KAFKA_MIN_INSYNC_REPLICAS=1
- KAFKA_DEFAULT_REPLICATION_FACTOR=3
- KAFKA_ZOOKEEPER_CONNECT=192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.88.130:9092
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
ports:
- 9092:9092
节点2
version: '3'
services:
zk2:
container_name: zk2
hostname: zk2
image: wurstmeister/zookeeper:latest
restart: always
environment:
- ZOO_MY_ID=2
- ZOO_SERVERS=server.1=192.168.88.130:2888:3888,server.2=0.0.0.0:2888:3888,server.3=192.168.88.132:2888:3888
volumes:
- ./zk_conf/zoo.cfg:/opt/zookeeper-3.4.13/conf/zoo.cfg
- ./zk_data:/opt/zookeeper-3.4.13/data
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka2:
container_name: kafka2
hostname: kafka2
image: wurstmeister/kafka:latest
restart: always
environment:
- KAFKA_BROKER_ID=2
- KAFKA_MIN_INSYNC_REPLICAS=1
- KAFKA_DEFAULT_REPLICATION_FACTOR=3
- KAFKA_ZOOKEEPER_CONNECT=192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.88.131:9092
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
ports:
- 9092:9092
节点3
version: '3'
services:
zk3:
container_name: zk3
hostname: zk3
image: wurstmeister/zookeeper:latest
restart: always
environment:
- ZOO_MY_ID=3
- ZOO_SERVERS=server.1=192.168.88.130:2888:3888,server.2=192.168.88.131:2888:3888,server.3=0.0.0.0:2888:3888
volumes:
- ./zk_conf/zoo.cfg:/opt/zookeeper-3.4.13/conf/zoo.cfg
- ./zk_data:/opt/zookeeper-3.4.13/data
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka3:
container_name: kafka3
hostname: kafka3
image: wurstmeister/kafka:latest
restart: always
environment:
- KAFKA_BROKER_ID=3
- KAFKA_MIN_INSYNC_REPLICAS=1
- KAFKA_DEFAULT_REPLICATION_FACTOR=3
- KAFKA_ZOOKEEPER_CONNECT=192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.88.132:9092
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
ports:
- 9092:9092
2、创建配置文件
根据每台主机的情况,你需要修改compose文件中环境变量,以确保每台主机上的ZooKeeper和Kafka实例具有唯一的ID。每台需要修改为本机IP,集群配置不变。
在每台主机上面创建目录
mkdir /data/zk_kafka/zk_conf -pmkdir /data/zk_kafka/zk_data
cd /data/zk_kafka
vim zk_conf/zoo.cfg # 三台主机都需要创建tickTime=2000initLimit=10syncLimit=5dataDir=/opt/zookeeper-3.4.13/data
clientPort=2181autopurge.snapRetainCount=3autopurge.purgeInterval=1server.1=0.0.0.0:2888:3888 # 节点1配置server.2=192.168.88.131:2888:3888 # 节点2到时配置0.0.0.0,节点3一致这样配置。server.3=192.168.88.132:2888:3888
# 第一台echo1> zk_data/myid
# 第二台echo2> zk_data/myid
# 第三台echo3> zk_data/myid
3、启动 Docker Compose
在每台虚拟机上,使用以下命令启动Docker Compose:
docker-composeps
NAME COMMAND SERVICE STATUS PORTS
kafka1 "start-kafka.sh" kafka1 running 0.0.0.0:9092->9092/tcp
zk1 "/bin/sh -c '/usr/sb…" zk1 running 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp
这将启动ZooKeeper和Kafka服务,并将它们作为守护进程在后台运行。
4、验证ZooKeeper
- 使用ZooKeeper客户端工具(如
zkCli.sh
)连接到ZooKeeper。
要验证ZooKeeper是否正常运行,你可以使用ZooKeeper自带的命令行工具
zkCli.sh
来进行一些基本的操作,比如查看集群状态、检查节点数据等。以下是如何验证ZooKeeper集群是否正常运行的步骤:
验证zookeeper角色
# 节点1dockerexec-it zk1 bash
./bin/zkServer.sh status
# 输出结果
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower
# 节点2dockerexec-it zk2 bash
./bin/zkServer.sh status
# 输出结果
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower
# 节点3dockerexec-it zk3 bash
./bin/zkServer.sh status
# 输出结果
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: leader
可以看到zookeeper三台集群各自的角色。
验证集群同步
./bin/zkCli.sh
create /test "hello zookeeper"# 输出结果
Created /test
查看临时节点
get /test
# 输出结果
hello zookeeper
cZxid = 0x200000002
ctime = Fri Sep 06 08:06:31 UTC 2024
mZxid = 0x200000002
mtime = Fri Sep 06 08:06:31 UTC 2024
pZxid = 0x200000002
cversion =0
dataVersion =0
aclVersion =0
ephemeralOwner = 0x0
dataLength =15
numChildren =0
连接到其他ZooKeeper实例,并检查刚才创建的临时节点是否存在:
dockerexec-it zk2 bash
./bin/zkCli.sh
get /test
# 输出结果
hello zookeeper
cZxid = 0x200000002
ctime = Fri Sep 06 08:06:31 UTC 2024
mZxid = 0x200000002
mtime = Fri Sep 06 08:06:31 UTC 2024
pZxid = 0x200000002
cversion =0
dataVersion =0
aclVersion =0
ephemeralOwner = 0x0
dataLength =15
numChildren =0
如果其他ZooKeeper实例也显示了这个临时节点,则说明集群同步正常。
总结
通过以上步骤,你可以验证ZooKeeper是否正常运行,并检查集群的状态。如果一切正常,你应该能看到集群成员的角色以及数据同步情况。如果遇到任何问题,可以通过查看ZooKeeper的日志文件或使用更详细的命令来诊断问题。
5、验证Kafka
- 使用Kafka命令行工具(如
kafka-topics.sh
)创建主题并验证集群状态。
5.1 创建主题
dockerexec-it kafka1 bashcd /opt/kafka/
./bin/kafka-topics.sh --create--zookeeper192.168.88.130:2181 --topictest--partitions3 --replication-factor 3# 输出结果
Created topic test.
这里创建了一个名为
test-topic
的主题,它有3个分区和3个副本,以确保高可用性。
验证主题是否成功创建:
# 到另外两台查看新创建的test# 查看topics# 192.168.88.131
./bin/kafka-topics.sh --list--zookeeper192.168.88.131:2181
test# 192.168.88.132
./bin/kafka-topics.sh --list--zookeeper192.168.88.132:2181
test
你应该能看到刚刚创建的主题
test
。
查看主题的详细信息:
./bin/kafka-topics.sh --describe--topictest --bootstrap-server 192.168.88.130:9092
# 输出结果
Topic: test TopicId: jKMCnaYeSvayrZatlgZp8A PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
这段信息描述了一个名为
test
的 Kafka 主题的配置和分区状态。以下是对这些信息的详细解释:
主题配置
- Topic:
test
- TopicId:
jKMCnaYeSvayrZatlgZp8A
(这是主题的唯一标识符) - PartitionCount:
3
(主题被分为3个分区) - ReplicationFactor:
3
(每个分区的副本因子为3,意味着每个分区的数据会被复制到3个不同的Kafka broker上) - Configs: -
min.insync.replicas=2
: 表示生产者在发送消息时,至少需要有2个副本同步确认后才算成功。-segment.bytes=1073741824
: 每个分区的segment文件的最大大小为1GB。 分区状态 - Partition 0: - Leader:
1
(Broker 1是该分区的领导者,负责处理所有对该分区的读写请求)- Replicas:1,2,3
(该分区的副本存储在Broker 1, 2, 3上)- Isr:1,2,3
(In-Sync Replicas,即与领导者同步的副本列表,这里所有副本都是同步的) - Partition 1: - Leader:
2
(Broker 2是该分区的领导者)- Replicas:2,3,1
- Isr:2,3,1
- Partition 2: - Leader:
3
(Broker 3是该分区的领导者)- Replicas:3,1,2
- Isr:3,1,2
这个主题的配置和状态显示了一个典型的高可用性和高可靠性的Kafka主题设置。如果你需要进行任何操作,如调整配置或监控状态,这些信息都是非常重要的基础数据。
这些信息对于监控和维护Kafka集群的健康状态非常重要。例如,如果某个分区的ISR列表中缺少某些副本,可能意味着这些副本落后于领导者,或者存在其他同步问题。在这种情况下,可能需要进一步的调查和干预。
5.2 发布消息
向主题发布一条消息:
./bin/kafka-console-producer.sh --broker-list 192.168.88.130:9092 --topictest
然后输入一些消息,例如:
>hello kafka
按回车键发送每条消息。
消费消息
同时在另一个终端窗口中,启动一个消费者来消费消息:
# 节点2
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.131:9092 --topictest --from-beginning
# 输出结果
hello kafka
验证Kafka集群的状态,确保所有的Broker都在集群中并且正常工作:
./bin/kafka-topics.sh --describe --bootstrap-server 192.168.88.131:9092
......
检查输出结果中的
Leader
和
Replicas
部分,确保每个分区都有一个Leader,并且所有的副本都在同步中。
总结
通过以上步骤,你可以验证Kafka集群是否正常运行,并检查集群的状态。如果一切正常,你应该能看到主题列表、主题描述、发布的消息以及消费的消息。如果遇到任何问题,可以通过查看Kafka Broker的日志文件或使用更详细的命令来诊断问题。
版权归原作者 云卷云舒处 所有, 如有侵权,请联系我们删除。