0


Kubernetes 部署 Kafka 集群

docker-compose 部署 kafka

  • 镜像地址 kafka官网 kafka镜像 zookeeper镜像
  • Kafka 4.0 将移除zookeeper,仅支持KRaft 所以我们使用KRaft模式,这也是kafka:3.4的默认模式.
  • 由于这是一个非 root 的容器,挂载的文件和目录必须具有 UID 1001 的适当权限sudo chown -R 1001:1001 ./kafka_data
  • 创建kafka容器 docker compose -f docker-compose.yml up -d
version:"3"services:kafka:image: bitnami/kafka:3.4.1
    ports:-"9092:9092"volumes:-"./kafka_data:/bitnami"environment:# KRaft settings- KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093# Listeners- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  • 集群方式可参考链接

在 Kubernetes 上部署 Kafka 集群

  1. yaml示例使用的是KRaft模式
  2. 创建无头Service kafka-headless,用于kafka间相互通信
  3. 创建Service kafka, 用于外部访问kafka
  4. 启动脚本写入ConfigMap, 只是在默认的启动脚本前增加了环境变量KAFKA_CFG_NODE_ID赋值: 1. 集群中每个副本都需要设置KAFKA_CFG_NODE_ID,且必须为整数2. StatefulSet中将副本名称metadata.name赋值给环境变量MY_POD_NAME参考文档。(也可以直接使用环境变量HOSTNAME,副本名称是默认的主机名)3. 截取环境变量MY_POD_NAME最后的序号,赋值给环境变量KAFKA_CFG_NODE_ID,比如:MY_POD_NAME=kafka-0,那么 KAFKA_CFG_NODE_ID=04. 环境变量KAFKA_CFG_CONTROLLER_QUORUM_VOTERS也可以在此脚本中自动生成,可参考。yaml示例中直接设置成了3个。
  5. 非root容器需要设置securityContext
  6. 使用模板volumeClaimTemplates动态创建存储,每个副本挂载单独的存储。例子里使用的是华为云现有的storageClassName: csi-disk,如果没有声明StorageClass,可以参考文档提前创建。
apiVersion: v1
kind: Service
metadata:name: kafka-headless
  labels:app: kafka
spec:type: ClusterIP
  clusterIP: None
  ports:-name: kafka-client
    port:9092targetPort: kafka-client
  -name: controller
    port:9093targetPort: controller   
  selector:app: kafka
---#部署 Service,用于外部访问 KafkaapiVersion: v1
kind: Service
metadata:name: kafka
  labels:app: kafka
spec:type: ClusterIP
  ports:-name: kafka-client
    port:9092targetPort: kafka-client
  selector:app: kafka
---# 分别在 StatefulSet 中的每个 Pod 中获取相应的序号作为 KAFKA_CFG_NODE_ID(只能是整数),然后再执行启动脚本apiVersion: v1
kind: ConfigMap
metadata:name: ldc-kafka-scripts
data:setup.sh:|-#!/bin/bash
    export KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-} 
    exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
---apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka
  labels:app: kafka
spec:selector:matchLabels:app: kafka
  serviceName: kafka-headless
  podManagementPolicy: Parallel
  replicas:3# 部署完成后,将会创建 3 个 Kafka 副本updateStrategy:type: RollingUpdate
  template:metadata:labels:app: kafka
    spec:affinity:podAntiAffinity:# 工作负载反亲和preferredDuringSchedulingIgnoredDuringExecution:# 尽量满足如下条件-weight:1podAffinityTerm:labelSelector:# 选择Pod的标签,与工作负载本身反亲和matchExpressions:-key:"app"operator: In
                    values:- kafka
              topologyKey:"kubernetes.io/hostname"# 在节点上起作用    containers:-name: kafka
        image: bitnami/kafka:3.4.1
        imagePullPolicy:"IfNotPresent"command:- /opt/leaderchain/setup.sh
        env:-name: BITNAMI_DEBUG
          value:"true"# true 详细日志# KRaft settings -name: MY_POD_NAME # 用于生成 KAFKA_CFG_NODE_IDvalueFrom:fieldRef:fieldPath: metadata.name            
        -name: KAFKA_CFG_PROCESS_ROLES
          value:"controller,broker"-name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
          value:"[email protected]:9093,[email protected]:9093,[email protected]:9093"-name: KAFKA_KRAFT_CLUSTER_ID
          value:"Jc7hwCMorEyPprSI1Iw4sW"# Listeners            -name: KAFKA_CFG_LISTENERS
          value:"PLAINTEXT://:9092,CONTROLLER://:9093"-name: KAFKA_CFG_ADVERTISED_LISTENERS
          value:"PLAINTEXT://:9092"-name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
          value:"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"-name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
          value:"CONTROLLER"-name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
          value:"PLAINTEXT"ports:-containerPort:9092name: kafka-client                  
        -containerPort:9093name: controller
          protocol: TCP                     
        volumeMounts:-mountPath: /bitnami/kafka
          name: data
        -mountPath: /opt/leaderchain/setup.sh
          name: scripts
          subPath: setup.sh
          readOnly:truesecurityContext:fsGroup:1001runAsUser:1001volumes:-configMap:defaultMode:493name: ldc-kafka-scripts
        name: scripts                   
  volumeClaimTemplates:-apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:name: data
      annotations:everest.io/disk-volume-type: SAS
      labels:failure-domain.beta.kubernetes.io/region: cn-south-1failure-domain.beta.kubernetes.io/zone: cn-south-2b      
    spec:accessModes:["ReadWriteOnce"]storageClassName: csi-disk
      resources:requests:storage: 10Gi

主题Topic

  1. 查看帮助(容器中kafka的脚本目录为:/opt/bitnami/kafka/bin)sh kafka-topics.sh --help
  2. 获取所有的主题sh kafka-topics.sh --bootstrap-server localhost:9092 --list
  3. 创建一个Topic –partitions(分区数量) –topic(主题名) –replication-factor(副本数量,不能大于broker的数量)sh kafka-topics.sh --create --topic myTopic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092
  4. 查询 Topic 的详细信息sh kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic myTopic
  5. 删除 Topic (Topic 中所有的消息数据都将被永久删除,且无法恢复)sh kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic myTopic
  6. 增加主题分区数量 (如果要减少分区的数量,只能删除Topic,然后重新创建)sh kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --alter --partitions 3
  7. 修改数据过期时间 (kafka默认的只保存7天的数据,retention.ms=-1表示不过期)sh kafka-topics.sh --bootstrap-server localhost:9092 -topic myTopic --alter --config retention.ms=259200000
  8. 修改多字段sh kafka-topics.sh --bootstrap-server localhost:9092 -topic myTopic --alter --config retention.ms=259200000 max.message.bytes=128000
  9. 修改 Topic 副本数 1. 编写分配脚本get-reassign-tpl.jsonecho '{"topics":[{"topic":"myTopic"}],"version": 1}' > get-reassign-tpl.json2. 执行分配计划,用于生成json格式的文件sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file get-reassign-tpl.json --broker-list "0,1,2" --generate3. 复制上一步返回结果中的json,修改副本字段replicas,填写broker.id,生成reassign.json文件echo '{"version":1,"partitions":[{"topic":"myTopic","partition":0,"replicas":[1,2]}]}' > reassign.json4. 利用上一步生成的reassign.json,进行topic的重新分配sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute5. 查看分配的进度sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --verify6. 分配完成,再次查询详情sh kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic myTopic

测试发送消息和接收消息

  1. 开启一个 Producer(生产者)窗口,然后生产几条信息
sh kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic
>hello
>world
  1. 创建一个 Consumer(消费者)窗口:sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --consumer-property group.id=myGroup --from-beginning –from-beginning 如果消费者尚未建立消费偏移量(offset),那么就从Topic的第一条消息开始消费 –consumer-property group.id=myGroup 消费者的group.id,不设置会自动生成如果存在group.id相同的多个消费者窗口,只会有其中一个消费者收到消息
  2. 列出所有主题中的所有用户组sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  3. 查询消费者组详情(数据积压情况)sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup LogEndOffset:下一条将要被加入到日志的消息的位移 CurrentOffset:当前消费的位移 LAG:消息堆积量:消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量也称之为消费滞后量
  4. 更多操作,可以参考官方文档

Open-Source Web UI for Apache Kafka

KRaft模式的kafka没有zookeeper,图形客户端工具

offsetexplorer

无法连接,找到一套开源的 Web UI,Docker、Helm下的安装方式可参考官方文档。
使用

kubectl apply

命令安装

provectuslabs/kafka-ui

:

  1. kubectl apply -f k8s.kafka-ui.yaml
  2. k8s.kafka-ui.yaml示例如下:
apiVersion: v1
kind: Service
metadata:name: kafka-ui
  labels:app: kafka-ui
spec:type: NodePort
  ports:-name: web
    port:8080targetPort: web
    nodePort:0selector:app: kafka-ui
---apiVersion: apps/v1
kind: Deployment
metadata:name: kafka-ui
  labels:app: kafka-ui
spec:selector:matchLabels:app: kafka-ui
  replicas:1template:metadata:labels:app: kafka-ui
    spec:containers:-name: kafka-ui
        image: provectuslabs/kafka-ui:latest
        # imagePullPolicy: "IfNotPresent"env:-name: KAFKA_CLUSTERS_0_NAME
          value:"kafka-c0"-name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
          value:"kafka-0.kafka-headless:9092"-name: DYNAMIC_CONFIG_ENABLED
          value:"true"-name: AUTH_TYPE # https://docs.kafka-ui.provectus.io/configuration/authentication/basic-authenticationvalue:"LOGIN_FORM"-name: SPRING_SECURITY_USER_NAME
          value:"name_admin"-name: SPRING_SECURITY_USER_PASSWORD
          value:"password_123456"ports:-name: web
          containerPort:8080
标签: kafka kubernetes docker

本文转载自: https://blog.csdn.net/fangling86/article/details/131073435
版权归原作者 fangling86 所有, 如有侵权,请联系我们删除。

“Kubernetes 部署 Kafka 集群”的评论:

还没有评论