0


【运维笔记】kafka跨域通信代理

kafka跨域通信代理

在项目部署过程中遇到kafka需要走代理跨域通信的情景,搭建了一套环境模拟实验,以此记录。

场景描述

两套kafka集群KafkaS和KafkaC分别位于两个不互通的网络域,跨域互访需要经过nginx代理机,现需要确认nginx、kafka的配置。

模拟思路

利用docker在一台虚拟机上创建两个网络不互通的kafka集群,同时在宿主机上部署nginx。

模拟环境说明

基础环境

1、宿主机:Ubuntu 22.10
2、工具:docker、docker-compose

kafka版本

1、kafka镜像/zookeeper镜像:wurstmeister/zookeeper、wurstmeister/kafka
2、实验时版本:zookeeper

环境部署

基础软件安装

安装docker、docker-compose、nginx

sudoaptinstall docker.io docker-compose nginx -y

编写kafka的docker-compose.yml文件

1、网络配置

networks:kafka_server_net:ipam:config:-subnet: 172.33.0.0/16
  kafka_client_net:ipam:config:-subnet: 172.34.0.0/16

2、zookeeper配置

zookeeperS:image: wurstmeister/zookeeper
    container_name: zookeeperS
    restart: always
    networks:kafka_server_net:ipv4_address: 172.33.0.10

zookeeperC的配置类似。

3、kafka配置

kafkaS1:image: wurstmeister/kafka
    depends_on:[ zookeeperS ]container_name: kafkaS1
    environment:HOSTNAME: kafkaS1
      KAFKA_BROKER_ID:0KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS1:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka
    # extra_hosts可以把地址映射添加到hosts文件里,后面有用。其中,统一集群的kafka映射为容器的IP地址,不同集群的kafka映射为宿主机地址,72.130是宿主机IP。extra_hosts:kafkaS1: 172.33.0.11
      kafkaS2: 172.33.0.12
      kafkaS3: 172.33.0.13
      kafkaC1: 192.168.72.128
      kafkaC2: 192.168.72.128
      kafkaC3: 192.168.72.128
    networks:kafka_server_net:ipv4_address: 172.33.0.11
        

kafkaC的配置类似

4、完整的docker-compose文件

version:'3.8'services:# kafkaS && zookeeperSzookeeperS:image: wurstmeister/zookeeper
    container_name: zookeeperS
    restart: always
    networks:kafka_server_net:ipv4_address: 172.33.0.10

  kafkaS1:image: wurstmeister/kafka
    depends_on:[ zookeeperS ]container_name: kafkaS1
    environment:HOSTNAME: kafkaS1
      KAFKA_BROKER_ID:10KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS1:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka
    networks:kafka_server_net:ipv4_address: 172.33.0.11
    extra_hosts:kafkaS1: 172.33.0.11
      kafkaS2: 172.33.0.12
      kafkaS3: 172.33.0.13
      kafkaC1: 192.168.72.128
      kafkaC2: 192.168.72.128
      kafkaC3: 192.168.72.128

  kafkaS2:image: wurstmeister/kafka
    depends_on:[ zookeeperS ]container_name: kafkaS2
    environment:HOSTNAME: kafkaS2
      KAFKA_BROKER_ID:11KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS2:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka
    networks:kafka_server_net:ipv4_address: 172.33.0.12
    extra_hosts:kafkaS1: 172.33.0.11
      kafkaS2: 172.33.0.12
      kafkaS3: 172.33.0.13
      kafkaC1: 192.168.72.128
      kafkaC2: 192.168.72.128
      kafkaC3: 192.168.72.128

  kafkaS3:image: wurstmeister/kafka
    depends_on:[ zookeeperS ]container_name: kafkaS3
    environment:HOSTNAME: kafkaS3
      KAFKA_BROKER_ID:12KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaS3:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperS:2181/kafka
    networks:kafka_server_net:ipv4_address: 172.33.0.13
    extra_hosts:kafkaS1: 172.33.0.11
      kafkaS2: 172.33.0.12
      kafkaS3: 172.33.0.13
      kafkaC1: 192.168.72.128
      kafkaC2: 192.168.72.128
      kafkaC3: 192.168.72.128

# kafkaC && zookeeperCzookeeperC:image: wurstmeister/zookeeper
    container_name: zookeeperC
    restart: always
    networks:kafka_client_net:ipv4_address: 172.34.0.10

  kafkaC1:image: wurstmeister/kafka
    depends_on:[ zookeeperC ]container_name: kafkaC1
    environment:HOSTNAME: kafkaC1
      KAFKA_BROKER_ID:20KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaC1:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperC:2181/kafka
    networks:kafka_client_net:ipv4_address: 172.34.0.11
    extra_hosts:kafkaS1: 192.168.72.128
      kafkaS2: 192.168.72.128
      kafkaS3: 192.168.72.128
      kafkaC1: 172.34.0.11
      kafkaC2: 172.34.0.12
      kafkaC3: 172.34.0.13

  kafkaC2:image: wurstmeister/kafka
    depends_on:[ zookeeperC ]container_name: kafkaC2
    environment:HOSTNAME: kafkaC2
      KAFKA_BROKER_ID:21KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaC2:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperC:2181/kafka
    networks:kafka_client_net:ipv4_address: 172.34.0.12
    extra_hosts:kafkaS1: 192.168.72.128
      kafkaS2: 192.168.72.128
      kafkaS3: 192.168.72.128
      kafkaC1: 172.34.0.11
      kafkaC2: 172.34.0.12
      kafkaC3: 172.34.0.13

  kafkaC3:image: wurstmeister/kafka
    depends_on:[ zookeeperC ]container_name: kafkaC3
    environment:HOSTNAME: kafkaC3
      KAFKA_BROKER_ID:22KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafkaC3:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT: zookeeperC:2181/kafka
    networks:kafka_client_net:ipv4_address: 172.34.0.13
    extra_hosts:kafkaS1: 192.168.72.128
      kafkaS2: 192.168.72.128
      kafkaS3: 192.168.72.128
      kafkaC1: 172.34.0.11
      kafkaC2: 172.34.0.12
      kafkaC3: 172.34.0.13

# networksnetworks:kafka_server_net:ipam:config:-subnet: 172.33.0.0/16
  kafka_client_net:ipam:config:-subnet: 172.34.0.0/16

环境验证

docker-compose up -d启动所有容器后,开始验证环境是否符合要求。
1、网络连通性验证
本场景应当kafkaS和kafkaC集群间不能互访,内部三个节点可以互访;宿主机和两个集群可以互访。
1-1、kafka集群内部互访
先进入容器内:

docker -exec -it kafkaS1 bash

kafka容器没有telnet、ping等工具测试网络,但我们可以用nc:

nc -vz kafkaS2 9092
nc -vz zookeeperS 2181

测试结果1

以上测试应该会回显succeed

1-2、kafka集群间验证

nc -vz kafkaC1 9092

测试结果2

测试结果是refused,是否与我们预期不一样?别慌,这是因为kafkaC1被映射为宿主机的地址,因此是正常的,要测试kafka集群间连通性,应该直接用ip测试,这次结果就是超时了。

nc -vz 172.34.0.11 9092

测试结果3

1-3、代理(宿主机)与kafka的连通性验证
上一部分已经验证过,也可以在宿主机上Telnet端口,正常情况是通的。

2、kafka可用性验证
2-1、分别在两个kafka集群测试统一集群内kafka能否成功收发消息。

# 进入kafka测试脚本所在目录cd /opt/kafka/bin/
# 创建主题
kafka-topics.sh --create--zookeeper zookeeperS:2181/kafka --replication-factor 1--partitions2--topic testtopic
# 模拟生产消息
kafka-console-producer.sh --topic=testtopic --broker-list kafkaS1:9092,kafkaS2:9092,kafkaS3:9092
# 随便输入内容进行测试
# 进入另一个kafka容器,模拟消费
kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --from-beginning --topic testtopic

消费的窗口能看到生产的消息,则验证通过。测试结果4
至此,模拟环境搭建完成,下面记录如何实现跨域通信。

解决方案

Kafka通信机制

kafka通过二次交互建立会话:服务端在第一次交互时会返回一个地址供客户端建立会话,这个交互机制可以防止中间人攻击,但对走代理的场景来说,配置就比较麻烦了。
一个kafka服务端会提供给多个客户端访问(含不经代理的访问),配置为返回代理地址不太合适,

解决思路

kafka返回相同值给客户端,客户端却需要识别成各不相同、自己能连接的地址,这个问题很自然地就能想到用hosts解决。
docker-compose配置里已经给kafka节点都加上了extra_hosts,并根据连通性做好主机名和IP的映射。

代理配置

映射加上之后,只需要再把nginx代理配好就能成功互访了。
这里我用四层代理进行转发。
在nginx.conf文件里新建stream块,和http同级。
在这里插入图片描述
在nginx配置目录下新建kafkaproxy.conf,具体如下:

# 新建配置文件sudonano /etc/nginx/conf.d/stream/kafkaproxy.conf

配置内容:

upstream KAFKA_SERVER
{
server 172.33.0.11:9092;
server 172.33.0.12:9092;
server 172.33.0.13:9092;
}
server {
listen *:9092;
proxy_pass KAFKA_SERVER;
proxy_connect_timeout 2s;
proxy_timeout 1m;
}

# 配置完毕后重载配置sudo nginx -s reload

验证是否满足要求

1、模拟发送消息
进入kafkaS任意节点,模拟发送消息

kafka-console-producer.sh --topic=testtopic --broker-list kafkaS1:9092,kafkaS2:9092,kafkaS3:9092

2、模拟接收消息

kafka-console-consumer.sh --bootstrap-server kafkaS1:9092,kafkaS2:9092,kafkaS3:9092 --from-beginning --topic testtopic

能相互通信
测试结果5
因为–from-beginning参数没去掉,所以有一些之前产生的消息。
但此时仍然是有问题的,通过代理访问kafka服务端的时候,有时会消费不到数据。这是负载均衡的问题,如果要避免,还是得配置多监听。

标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/u013943146/article/details/128274711
版权归原作者 三水有余 所有, 如有侵权,请联系我们删除。

“【运维笔记】kafka跨域通信代理”的评论:

还没有评论