kafka跨域通信代理
在项目部署过程中遇到kafka需要走代理跨域通信的情景,搭建了一套环境模拟实验,以此记录。
场景描述
两套kafka集群KafkaS和KafkaC分别位于两个不互通的网络域,跨域互访需要经过nginx代理机,现需要确认nginx、kafka的配置。
模拟思路
利用docker在一台虚拟机上创建两个网络不互通的kafka集群,同时在宿主机上部署nginx。
模拟环境说明
基础环境
1、宿主机:Ubuntu 22.10
2、工具:docker、docker-compose
kafka版本
1、kafka镜像/zookeeper镜像:wurstmeister/zookeeper、wurstmeister/kafka
2、实验时版本:zookeeper
环境部署
基础软件安装
安装docker、docker-compose、nginx
sudoaptinstall docker.io docker-compose nginx -y
编写kafka的docker-compose.yml文件
1、网络配置
networks:kafka_server_net:ipam:config:-subnet: 172.33.0.0/16
kafka_client_net:ipam:config:-subnet: 172.34.0.0/16
2、zookeeper配置
zookeeperS:image: wurstmeister/zookeeper
container_name: zookeeperS
restart: always
networks:kafka_server_net:ipv4_address: 172.33.0.10
zookeeperC的配置类似。
3、kafka配置
kafkaS1:image: wurstmeister/kafka
depends_on:[ zookeeperS ]container_name: kafkaS1
environment:HOSTNAME: kafkaS1
KAFKA_BROKER_ID:0KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS1:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka
# extra_hosts可以把地址映射添加到hosts文件里,后面有用。其中,统一集群的kafka映射为容器的IP地址,不同集群的kafka映射为宿主机地址,72.130是宿主机IP。extra_hosts:kafkaS1: 172.33.0.11
kafkaS2: 172.33.0.12
kafkaS3: 172.33.0.13
kafkaC1: 192.168.72.128
kafkaC2: 192.168.72.128
kafkaC3: 192.168.72.128
networks:kafka_server_net:ipv4_address: 172.33.0.11
kafkaC的配置类似
4、完整的docker-compose文件
version:'3.8'services:# kafkaS && zookeeperSzookeeperS:image: wurstmeister/zookeeper
container_name: zookeeperS
restart: always
networks:kafka_server_net:ipv4_address: 172.33.0.10
kafkaS1:image: wurstmeister/kafka
depends_on:[ zookeeperS ]container_name: kafkaS1
environment:HOSTNAME: kafkaS1
KAFKA_BROKER_ID:10KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS1:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka
networks:kafka_server_net:ipv4_address: 172.33.0.11
extra_hosts:kafkaS1: 172.33.0.11
kafkaS2: 172.33.0.12
kafkaS3: 172.33.0.13
kafkaC1: 192.168.72.128
kafkaC2: 192.168.72.128
kafkaC3: 192.168.72.128
kafkaS2:image: wurstmeister/kafka
depends_on:[ zookeeperS ]container_name: kafkaS2
environment:HOSTNAME: kafkaS2
KAFKA_BROKER_ID:11KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS2:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka
networks:kafka_server_net:ipv4_address: 172.33.0.12
extra_hosts:kafkaS1: 172.33.0.11
kafkaS2: 172.33.0.12
kafkaS3: 172.33.0.13
kafkaC1: 192.168.72.128
kafkaC2: 192.168.72.128
kafkaC3: 192.168.72.128
kafkaS3:image: wurstmeister/kafka
depends_on:[ zookeeperS ]container_name: kafkaS3
environment:HOSTNAME: kafkaS3
KAFKA_BROKER_ID:12KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS3:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka
networks:kafka_server_net:ipv4_address: 172.33.0.13
extra_hosts:kafkaS1: 172.33.0.11
kafkaS2: 172.33.0.12
kafkaS3: 172.33.0.13
kafkaC1: 192.168.72.128
kafkaC2: 192.168.72.128
kafkaC3: 192.168.72.128
# kafkaC && zookeeperCzookeeperC:image: wurstmeister/zookeeper
container_name: zookeeperC
restart: always
networks:kafka_client_net:ipv4_address: 172.34.0.10
kafkaC1:image: wurstmeister/kafka
depends_on:[ zookeeperC ]container_name: kafkaC1
environment:HOSTNAME: kafkaC1
KAFKA_BROKER_ID:20KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaC1:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperC:2181/kafka
networks:kafka_client_net:ipv4_address: 172.34.0.11
extra_hosts:kafkaS1: 192.168.72.128
kafkaS2: 192.168.72.128
kafkaS3: 192.168.72.128
kafkaC1: 172.34.0.11
kafkaC2: 172.34.0.12
kafkaC3: 172.34.0.13
kafkaC2:image: wurstmeister/kafka
depends_on:[ zookeeperC ]container_name: kafkaC2
environment:HOSTNAME: kafkaC2
KAFKA_BROKER_ID:21KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaC2:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperC:2181/kafka
networks:kafka_client_net:ipv4_address: 172.34.0.12
extra_hosts:kafkaS1: 192.168.72.128
kafkaS2: 192.168.72.128
kafkaS3: 192.168.72.128
kafkaC1: 172.34.0.11
kafkaC2: 172.34.0.12
kafkaC3: 172.34.0.13
kafkaC3:image: wurstmeister/kafka
depends_on:[ zookeeperC ]container_name: kafkaC3
environment:HOSTNAME: kafkaC3
KAFKA_BROKER_ID:22KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaC3:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperC:2181/kafka
networks:kafka_client_net:ipv4_address: 172.34.0.13
extra_hosts:kafkaS1: 192.168.72.128
kafkaS2: 192.168.72.128
kafkaS3: 192.168.72.128
kafkaC1: 172.34.0.11
kafkaC2: 172.34.0.12
kafkaC3: 172.34.0.13
# networksnetworks:kafka_server_net:ipam:config:-subnet: 172.33.0.0/16
kafka_client_net:ipam:config:-subnet: 172.34.0.0/16
环境验证
docker-compose up -d启动所有容器后,开始验证环境是否符合要求。
1、网络连通性验证
本场景应当kafkaS和kafkaC集群间不能互访,内部三个节点可以互访;宿主机和两个集群可以互访。
1-1、kafka集群内部互访
先进入容器内:
docker -exec -it kafkaS1 bash
kafka容器没有telnet、ping等工具测试网络,但我们可以用nc:
nc -vz kafkaS2 9092
nc -vz zookeeperS 2181
以上测试应该会回显succeed
1-2、kafka集群间验证
nc -vz kafkaC1 9092
测试结果是refused,是否与我们预期不一样?别慌,这是因为kafkaC1被映射为宿主机的地址,因此是正常的,要测试kafka集群间连通性,应该直接用ip测试,这次结果就是超时了。
nc -vz 172.34.0.11 9092
1-3、代理(宿主机)与kafka的连通性验证
上一部分已经验证过,也可以在宿主机上Telnet端口,正常情况是通的。
2、kafka可用性验证
2-1、分别在两个kafka集群测试统一集群内kafka能否成功收发消息。
# 进入kafka测试脚本所在目录cd /opt/kafka/bin/
# 创建主题
kafka-topics.sh --create--zookeeper zookeeperS:2181/kafka --replication-factor 1--partitions2--topic testtopic
# 模拟生产消息
kafka-console-producer.sh --topic=testtopic --broker-list kafkaS1:9092,kafkaS2:9092,kafkaS3:9092
# 随便输入内容进行测试
# 进入另一个kafka容器,模拟消费
kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --from-beginning --topic testtopic
消费的窗口能看到生产的消息,则验证通过。
至此,模拟环境搭建完成,下面记录如何实现跨域通信。
解决方案
Kafka通信机制
kafka通过二次交互建立会话:服务端在第一次交互时会返回一个地址供客户端建立会话,这个交互机制可以防止中间人攻击,但对走代理的场景来说,配置就比较麻烦了。
一个kafka服务端会提供给多个客户端访问(含不经代理的访问),配置为返回代理地址不太合适,
解决思路
kafka返回相同值给客户端,客户端却需要识别成各不相同、自己能连接的地址,这个问题很自然地就能想到用hosts解决。
docker-compose配置里已经给kafka节点都加上了extra_hosts,并根据连通性做好主机名和IP的映射。
代理配置
映射加上之后,只需要再把nginx代理配好就能成功互访了。
这里我用四层代理进行转发。
在nginx.conf文件里新建stream块,和http同级。
在nginx配置目录下新建kafkaproxy.conf,具体如下:
# 新建配置文件sudonano /etc/nginx/conf.d/stream/kafkaproxy.conf
配置内容:
upstream KAFKA_SERVER
{
server 172.33.0.11:9092;
server 172.33.0.12:9092;
server 172.33.0.13:9092;
}
server {
listen *:9092;
proxy_pass KAFKA_SERVER;
proxy_connect_timeout 2s;
proxy_timeout 1m;
}
# 配置完毕后重载配置sudo nginx -s reload
验证是否满足要求
1、模拟发送消息
进入kafkaS任意节点,模拟发送消息
kafka-console-producer.sh --topic=testtopic --broker-list kafkaS1:9092,kafkaS2:9092,kafkaS3:9092
2、模拟接收消息
kafka-console-consumer.sh --bootstrap-server kafkaS1:9092,kafkaS2:9092,kafkaS3:9092 --from-beginning --topic testtopic
能相互通信
因为–from-beginning参数没去掉,所以有一些之前产生的消息。
但此时仍然是有问题的,通过代理访问kafka服务端的时候,有时会消费不到数据。这是负载均衡的问题,如果要避免,还是得配置多监听。
版权归原作者 三水有余 所有, 如有侵权,请联系我们删除。