1.kafka版本 3.6.1
2.docker 拉取kafka镜像
2.1启动docker,打开cmd输入以下命令,查看docker界面,已经拉取了镜像
docker pull bitnami/kafka:3.6.1
3.设置网络
3.1 在cmd中输入以下命令,这将使用指定的IP地址范围和网关地址创建一个名为netkafka****的自定义网络。这个网络将在后续的Kafka Broker容器配置中使用。
docker network create --subnet=172.23.0.0/25 --gateway=172.23.0.1 netkafka
4.运行kafka.yml文件
4.1 在cmd中进入该文件所在目录,执行以下命令(文件内容附文末,创建应该.txt文件,修改后缀为.yml即可)
docker-compose -f kafka.yml up -d
4.2 查看docker界面,kafka集群已经启动
5.集群消费者生产者测试
5.1 打开一个cmd,执行以下命令,进入kafka1容器
docker exec -it kafka1 bash
5.2 执行以下命令,进入目录
cd /opt/bitnami/kafka/bin
*5.3 执行以下命令,创建topic,创建一个副本为3、分区为5的topic*
./kafka-topics.sh --create --topic foo --partitions 5 --replication-factor 3 --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
*5.4 执行以下命令,查看topic详细信息*
kafka-topics.sh --describe --topic foo --bootstrap-server kafka1:9092, kafka2:9092, kafka3:9092
5.5 新打开一个cmd界面,执行以下命令进入kafak1容器
docker exec -it kafka1 bash
5.6 执行以下命令创建生产者
kafka-console-producer.sh --broker-list 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
5.7 新打开一个cmd界面,执行以下命令,进入kafka2容器
docker exec -it kafka2 bash
5.8 执行以下命令创建消费者
kafka-console-consumer.sh --bootstrap-server 172.23.0.11:9092,172.23.0.12:9092,172.23.0.13:9092 --topic foo
在生产者中输入消息
查看消费者是否收到消息,可以看见消费者中成功接收到消息
****6.****以下为kafka.yml文件内容
其中,需要修改的内容为
*1.修改宿主机ip KAFKA_CFG_ADVERTISED_LISTENERS(自己的主机IP)*
2.修改挂载路径
version: "3.6"
services:
kafka1:
container_name: kafka1
image: 'bitnami/kafka:3.6.1'
user: root
ports:
- '19092:9092'
- '19093:9093'
environment:
# 允许使用Kraft
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
#定义外网访问地址(宿主机ip地址和端口,标红处修改为自己主机IP)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://1.1.1.1:19092
- KAFKA_CFG_NODE_ID=1
- KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
- [email protected]:9093,[email protected]:9093,[email protected]:9093
- ALLOW_PLAINTEXT_LISTENER=yes
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
volumes:
#挂载路径,标红处修改为自己的路径
- /E/dockers/volume/kafka/broker01:/bitnami/kafka:rw
networks:
netkafka:
ipv4_address: 172.23.0.11
kafka2:
container_name: kafka2
image: 'bitnami/kafka:3.6.1'
user: root
ports:
- '29092:9092'
- '29093:9093'
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
#标红处修改为自己主机IP
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://1.1.1.1:29092
- KAFKA_CFG_NODE_ID=2
- KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI #哪一,三个节点保持一致
- [email protected]:9093,[email protected]:9093,[email protected]:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
volumes:
#挂载路径,标红处修改为自己的路径
- /E/dockers/volume/kafka/broker02:/bitnami/kafka:rw
networks:
netkafka:
ipv4_address: 172.23.0.12
kafka3:
container_name: kafka3
image: 'bitnami/kafka:3.6.1'
user: root
ports:
- '39092:9092'
- '39093:9093'
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 标红处修改为自己主机IP
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://1.1.1.1:39092
- KAFKA_CFG_NODE_ID=3
- KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
- [email protected]:9093,[email protected]:9093,[email protected]:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
volumes:
#挂载路径,标红处修改为自己的路径
- /E/dockers/volume/kafka/broker03:/bitnami/kafka:rw
networks:
netkafka:
ipv4_address: 172.23.0.13
networks:
name:
netkafka:
external: true
driver: bridge
name: netkafka
ipam:
driver: default
config:
- subnet: 172.23.0.0/25
gateway: 172.23.0.1
代码解释:
1. version: "3.6"`:指定了 Docker Compose 文件的版本。
3. kafka1:
、kafka2:
和 kafka3:
:这些是要创建的 Kafka 服务的配置部分。每个服务都包括了容器的名称、镜像、端口映射、环境变量、卷挂载、网络配置等信息。
4. container_name
: 指定了容器的名称。
5. image
: 指定了要使用的 Docker 镜像。
6. ports
: 指定了要映射的端口,格式为 <host_port>:<container_port>
。
7. environment
: 指定了环境变量,用于配置 Kafka 实例。
KAFKA_ENABLE_KRAFT=yes:启用 Kafka 中的 Kafka Raft 强一致性协议 (KRaft)。
****8.****KAFKA_CFG_PROCESS_ROLES=broker,controller:指定 Kafka 进程的角色,其中包括 “broker” 和 “controller”。
****9.****KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER:指定控制器监听器的名称为 “CONTROLLER”。
****10.****KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093:定义 Kafka 监听器的配置。这里使用了两个监听器,一个用于 “PLAINTEXT” 协议,监听 9092 端口,另一个用于 “CONTROLLER” 协议,监听 9093 端口。
****11.****KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT:定义监听器的安全协议映射。这里将 “CONTROLLER” 协议映射到 “PLAINTEXT” 协议。
****12.****KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.135:29092:定义 Kafka 在外部可访问的监听器地址。这里使用了 “PLAINTEXT” 协议和宿主机的 IP 地址。
****13.*****KAFKA_CFG_NODE_ID=1:指定当前 Kafka 节点的节点 ID。*
****14.****KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI:指定 Kafka KRaft 集群的 ID。确保三个节点的集群 ID 相同。
****15.****KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@172.23.0.11:9093,2@172.23.0.12:9093,3@172.23.0.13:9093:指定 KRaft 控制器的选举候选人列表。这里列出了三个节点的 IP 地址和端口。
****16.****ALLOW_PLAINTEXT_LISTENER=yes:允许使用 “PLAINTEXT” 协议的监听器。
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M:指定 Kafka 进程的 Java 堆内存大小。这里将最大堆内存 (Xmx) 设置为 512M,初始堆内存 (Xms) 设置为 256M。
18. volumes
: 定义了数据卷的挂载方式,用于持久化存储 Kafka 的数据。
1****9. networks
: 指定了要连接的网络。
2****0. networks -> netkafka
: 定义了名为 netkafka
的网络配置,其中包括了 IP 地址分配等信息。
解释:****关于网络配置部分,netkafka
是一个外部网络,其 IP 地址由 IP 地址管理 (IPAM) 驱动程序提供。它使用了子网 172.23.0.0/25
,并配置了网关地址为 172.23.0.1
。
版权归原作者 慕仙少白 所有, 如有侵权,请联系我们删除。