0


在k8s中部署Kafka高可用集群超详细讲解

🐇明明跟你说过:个人主页

🏅个人专栏:《数据流专家:Kafka探索》🏅

🔖行路有良友,便是天堂🔖

一、引言

1、Kafka简介

阿帕奇Kafka(Apache Kafka)是一个开源的分布式流处理平台,由LinkedIn开发并于2011年开源,现在是Apache软件基金会的一部分。Kafka的设计初衷是为了处理实时数据流,具备高吞吐量、低延迟、高容错性和可扩展性的特点。

Kafka的核心概念

1. Producer(生产者):

  • 生产者是向Kafka主题(topic)发送消息的应用程序。它们负责将数据推送到Kafka集群。

2. Consumer(消费者):

  • 消费者是从Kafka主题中读取消息的应用程序。它们可以是单个消费者或消费者组(consumer group),每个消费者组中的消费者可以并行地读取数据。

3. Topic(主题):

  • 主题是Kafka中的一个逻辑分类,用于将消息进行分组。每个主题可以细分为多个分区(partition),每个分区内的消息是有序的,但不同分区之间没有顺序保证。

4. Partition(分区):

  • 分区是主题的一个子单元,每个分区可以存储大量消息,并且每个分区可以被多个消费者读取。分区使得Kafka能够水平扩展。

5. Broker(代理):

  • Broker是Kafka集群中的一个服务器节点,负责接收、存储和发送消息。一个Kafka集群可以包含多个broker,以提供高可用性和容错能力。

6. Zookeeper:

  • Kafka使用Zookeeper来进行分布式协调和管理集群状态。Zookeeper负责维护配置信息、分区的leader选举以及消费组的offset管理等。

Kafka的主要功能

1. 消息持久化:

  • Kafka将消息持久化到磁盘,并且允许设置保留策略,确保数据的可靠性。

2. 高吞吐量:

  • Kafka能够处理高吞吐量的数据流,适合处理大规模的数据流应用。

3. 低延迟:

  • Kafka的设计优化了数据传输路径,使得消息传输具有低延迟。

4. 扩展性:

  • Kafka的分区机制允许在集群中增加更多的broker和分区,从而实现水平扩展。

5. 容错性:

  • 通过复制机制,Kafka确保了数据的高可用性,即使个别节点故障,数据仍然能够恢复。

Kafka的应用场景

1. 日志收集和聚合:

  • 作为一种高效的日志收集系统,Kafka可以收集和聚合分布式系统的日志数据。

2. 实时数据流处理:

  • Kafka与流处理框架(如Apache Flink、Apache Storm等)结合,可以进行实时数据分析和处理。

3. 消息队列:

  • Kafka可以充当高吞吐量的消息队列系统,用于不同应用程序之间的解耦和异步通信。

4. 事件源:

  • Kafka可以用作事件源系统,捕获和存储所有变化事件,以便后续处理和回放。

2、为什么在Kubernetes中部署Kafka

  1. 弹性和可伸缩性:Kubernetes提供了强大的自动伸缩和调度功能,可以根据负载情况自动扩展Kafka集群的节点数量,以满足不同工作负载的需求。这种弹性能力使得Kafka能够更好地应对数据量的变化和突发性负载。
  2. 易于管理:Kubernetes提供了统一的管理接口和工具,简化了Kafka集群的部署、扩展、更新和监控。通过Kubernetes的控制面板,管理员可以轻松地管理Kafka集群的生命周期,而无需深入了解底层的部署细节。
  3. 高可用性:Kubernetes具有自动故障检测和恢复机制,可以在节点故障时自动重新调度Kafka的副本,确保数据的高可用性和持久性。
  4. 资源隔离:通过Kubernetes的命名空间和资源限制功能,可以实现Kafka集群与其他应用程序之间的资源隔离,避免因资源竞争导致的性能问题。

点击并拖拽以移动​编辑

二、Kubernetes基础

1、Kubernetes概述

Kubernetes(K8s)是一个开源的容器编排平台,最初由Google开发并于2014年发布,现已成为Cloud Native Computing Foundation(CNCF)的一部分。Kubernetes旨在简化容器化应用程序的部署、管理和扩展,提供了丰富的功能和工具,使得用户可以更轻松地构建和运行分布式系统。

Kubernetes的主要功能

1. 自动化部署和扩展:

  • Kubernetes提供了丰富的调度和自动伸缩功能,可以根据负载情况自动调度和扩展应用程序的副本数量,以适应不同的工作负载需求。

2. 服务发现和负载均衡:

  • Kubernetes通过Service对象提供了统一的服务发现和负载均衡功能,使得应用程序可以通过简单的域名访问其他服务,并自动实现负载均衡。

3. 存储管理:

  • Kubernetes支持多种存储后端和存储卷类型,可以为应用程序提供持久化存储和共享存储功能,如PersistentVolume、PersistentVolumeClaim等。

4. 自我修复:

  • Kubernetes具有自我修复和健康检查机制,可以监控容器和节点的健康状态,并在出现故障时自动进行修复和恢复。

2、Pods、Services、Deployments等基本概念

1. Pods(容器组)

Pod是Kubernetes中最小的可部署单元,它可以包含一个或多个紧密相关的容器。这些容器共享相同的网络命名空间和存储卷,并且通常在同一节点上运行。Pod提供了一种逻辑主机的概念,使得容器之间可以共享资源和通信。Pod通常用于部署一个应用程序的一组相关容器。

2. Services(服务)

服务是Kubernetes中一种抽象,用于定义一组Pod的逻辑集合,并为它们提供统一的访问入口。服务通过标签选择器(Selector)将请求路由到对应的Pod,并实现负载均衡和服务发现功能。Kubernetes支持多种类型的服务,如ClusterIP、NodePort、LoadBalancer和ExternalName。

3. Deployments(部署)

部署是Kubernetes中用于管理Pod副本数量和应用程序版本更新的对象。部署定义了应用程序的期望状态,包括所需的副本数量、容器镜像和其他配置信息。部署控制器负责根据部署的定义创建、更新和删除Pod实例,同时提供滚动更新和回滚的功能。

三、Kafka集群架构

1、Kafka集群的组成与工作原理

Kafka集群由多个服务器节点(Broker)组成,每个Broker负责存储和处理一部分数据。Kafka的工作原理基于发布/订阅模式,其中生产者(Producer)将消息发布到一个或多个主题(Topic),而消费者(Consumer)订阅这些主题并从Broker拉取消息进行处理。

工作原理

1. 消息存储:

  • 生产者将消息发布到主题,并根据主题的分区策略选择将消息存储到对应的分区中。每个分区的消息按顺序存储,并根据消息的偏移量(Offset)进行标识。

2. 消息复制:

  • Kafka通过副本集机制实现数据的高可用性和持久性。每个分区都有一个主副本(Leader Replica)和多个副本(Follower Replica),主副本负责处理读写请求,而副本用于备份数据。当主副本故障时,Kafka会自动选举一个新的主副本来接管工作。

3. 消息传输:

  • 生产者将消息发送到Broker的主副本,Broker负责将消息复制到副本并进行持久化存储。消费者从Broker的主副本拉取消息进行处理,可以选择从不同的副本读取消息以实现负载均衡和故障恢复。

4. 消息保留策略:

  • Kafka支持设置消息的保留策略,包括基于时间、基于大小或基于日志段的策略。一旦消息达到保留期限或超过指定的大小限制,Kafka将自动删除过期消息,以释放存储空间。

2、Kafka集群的高可用性设计

1. 多副本复制

  • Kafka通过在多个Broker之间复制分区的副本来实现数据的冗余和高可用性。每个分区通常有多个副本,其中一个是主副本(Leader Replica),负责处理读写请求;其他副本是从副本(Follower Replica),用于备份数据。当主副本发生故障时,Kafka会自动选举一个新的主副本来接管工作,确保数据的可用性和一致性。

2. 自动故障检测与恢复

  • Kafka集群内置了自动故障检测和恢复机制,可以监控Broker和分区的健康状态,并在发现故障时自动进行故障转移和数据恢复。当Broker或分区发生故障时,Kafka会触发重新选举过程,选举出新的主副本并将数据复制到新的副本中,以实现快速的故障恢复。

3. 副本分布和均衡

  • Kafka会将分区的副本分布在不同的Broker上,以确保数据的冗余和均衡。通过将副本分布在不同的机架、数据中心或云区域中,可以提高系统的容错能力,即使整个机架或数据中心发生故障,系统仍然能够继续运行。

4. 水平扩展和动态伸缩

  • Kafka支持水平扩展和动态伸缩,可以根据负载情况和性能需求来增加或减少Broker的数量和容量。通过添加更多的Broker和分区,可以提高系统的吞吐量和容量,同时降低单点故障的风险,实现更高的可用性和可伸缩性。

四、准备部署环境

1、准备k8s集群

这里我们使用的k8s集群版本为 1.23,也可以使用其他的版本,只是镜像导入命令不通,如果还未搭建k8s集群,请参考《在Centos中搭建 K8s 1.23 集群超详细讲解》这篇文章

2、准备StorageClass

因为我们要对Kafka中的数据进行持久化,避免Pod漂移后数据丢失,保证数据的完整性与可用性

如果还未创建存储类,请参考《k8s 存储类(StorageClass)创建与动态生成PV解析,(附带镜像)》这篇文件。

3、准备部署Kafka集群所需的镜像

在k8s Node节点上执行以下命令

[root@node1 ~]# docker pull zookeeper:3.8
[root@node1 ~]# docker pull kafka:3.1.0
[root@node2 ~]# docker pull zookeeper:3.8
[root@node2 ~]# docker pull kafka:3.1.0
[root@node3 ~]# docker pull zookeeper:3.8
[root@node3 ~]# docker pull kafka:3.1.0

五、部署Kafka集群

1、部署Zookeeper集群

为什么部署kafka前要部署zookeeper?

Kafka依赖Zookeeper来实现分布式协调和配置管理。在Kafka集群中,Zookeeper扮演着多种角色,包括:

  1. 配置管理:Kafka集群的配置信息和元数据存储在Zookeeper中,包括主题(topics)、分区(partitions)、副本(replicas)等配置信息。
  2. Leader选举:Kafka的分区(partitions)被分布式存储在集群中的多个Broker上,每个分区都有一个Leader和多个Follower。Zookeeper负责Leader选举,确保每个分区都有一个活跃的Leader。
  3. Broker注册:Kafka Broker在启动时会向Zookeeper注册自己的信息,包括地址、ID等,以便其他Broker和客户端发现和连接。
  4. 健康监测:Zookeeper监控Kafka集群中各个节点的健康状态,并在节点出现故障或宕机时触发相应的处理操作。

因此,在部署Kafka之前,需要先部署Zookeeper,确保Kafka集群正常运行所需的分布式协调和配置管理功能可用。没有Zookeeper,Kafka无法正常运行,并且无法实现高可用性、数据一致性和故障容错等特性。

编写部署Zookeeper集群的YAML文件

[root@master ~]# vim zook.yaml
# 添加如下内容
apiVersion: v1
kind: Namespace
metadata:
  name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-cluster  #无头服务的名称,需要通过这个获取ip,与主机的对应关系
  namespace: kafka
  labels:
    app: zookeeper
spec:
  ports:
    - port: 2181
      name: zookeeper
    - port: 2188
      name: cluster1
    - port: 3888
      name: cluster2
  clusterIP: None
  selector:
    app: zookeeper
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-nodeport-service-0
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: zookeeper-0
  ports:
    - protocol: TCP
      port: 80        # Service 暴露的端口
      targetPort: 2181   # Pod 中容器的端口
      nodePort: 32181   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-nodeport-service-1
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: zookeeper-1
  ports:
    - protocol: TCP
      port: 80        # Service 暴露的端口
      targetPort: 2181   # Pod 中容器的端口
      nodePort: 32182   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-nodeport-service-2
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: zookeeper-2
  ports:
    - protocol: TCP
      port: 80        # Service 暴露的端口
      targetPort: 2181   # Pod 中容器的端口
      nodePort: 32183   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: zookeeper-config
  namespace: kafka
  labels:
    app: zookeeper
data:             #具体挂载的配置文件
  zoo.cfg: |+
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/data
    dataLogDir=/datalog
    clientPort=2181
    server.1=zookeeper-0.zookeeper-cluster.kafka:2188:3888
    server.2=zookeeper-1.zookeeper-cluster.kafka:2188:3888
    server.3=zookeeper-2.zookeeper-cluster.kafka:2188:3888
    4lw.commands.whitelist=*
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: kafka
spec:
  serviceName: "zookeeper-cluster"   #填写无头服务的名称
  replicas: 3
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      initContainers:
      - name: set-zk-id
        image: busybox:latest
        command: ['sh', '-c', "hostname | cut -d '-' -f 2 | awk '{print $0 + 1}' > /data/myid"]
        volumeMounts:
        - name: data
          mountPath: /data
      containers:
      - name: zookeeper
        image: zookeeper:3.8
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "1000m"
        ports:
        - containerPort: 2181
          name: zookeeper
        - containerPort: 2188
          name: cluster1
        - containerPort: 3888
          name: cluster2
        volumeMounts:
          - name: zook-config            #挂载配置
            mountPath: /conf/zoo.cfg
            subPath: zoo.cfg
          - name: data
            mountPath: /data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
      volumes:
      - name: zook-config
        configMap:                                #configMap挂载
          name: zookeeper-config
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi

创建Zookeeper集群

[root@master ~]# kubectl apply -f  zook.yaml

查看Pod状态

2、部署Kafka集群

编写部署Kafka集群的YAML文件

[root@master ~]# vim kafka.yaml
# 输入如下内容 
apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster  #无头服务的名称,需要通过这个获取ip,与主机的对应关系
  namespace: kafka
  labels:
    app: kafka
spec:
  ports:
    - port: 9092
      name: kafka
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-nodeport-service-0
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: kafka0-0
  ports:
    - protocol: TCP
      port: 9092        # Service 暴露的端口
      targetPort: 9092   # Pod 中容器的端口
      nodePort: 30092   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
      name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-nodeport-service-1
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: kafka1-0
  ports:
    - protocol: TCP
      port: 9092        # Service 暴露的端口
      targetPort: 9092   # Pod 中容器的端口
      nodePort: 30093   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
      name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-nodeport-service-2
  namespace: kafka
spec:
  type: NodePort
  selector:
    statefulset.kubernetes.io/pod-name: kafka2-0
  ports:
    - protocol: TCP
      port: 9092        # Service 暴露的端口
      targetPort: 9092   # Pod 中容器的端口
      nodePort: 30094   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
      name: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka0
  namespace: kafka
spec:
  serviceName: "kafka-cluster"   #填写无头服务的名称
  replicas: 1
  selector:
    matchLabels:
      app: kafka0
  template:
    metadata:
      labels:
        app: kafka0
    spec:
      containers:
      - name: kafka
        image: kafka:3.1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "2000m"
        ports:
        - containerPort: 9092
          name: kafka
        command:
        - sh 
        - -c 
        - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=0 \
          --override listeners=PLAINTEXT://:9092 \
          --override advertised.listeners=PLAINTEXT://192.168.40.181:30092 \
          --override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \
          --override log.dirs=/var/lib/kafka/data \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        volumeMounts:
          - name: data0
            mountPath: /var/lib/kafka/data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_HEAP_OPTS
          value : "-Xms1g -Xmx1g"
        - name: JMX_PORT
          value: "5555"
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data0
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka1
  namespace: kafka
spec:
  serviceName: "kafka-cluster"   #填写无头服务的名称
  replicas: 1
  selector:
    matchLabels:
      app: kafka1
  template:
    metadata:
      labels:
        app: kafka1
    spec:
      containers:
      - name: kafka
        image: kafka:3.1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "2000m"
        ports:
        - containerPort: 9092
          name: kafka
        command:
        - sh 
        - -c 
        - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=1 \
          --override listeners=PLAINTEXT://:9092 \
          --override advertised.listeners=PLAINTEXT://192.168.40.181:30093 \
          --override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \
          --override log.dirs=/var/lib/kafka/data \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        volumeMounts:
          - name: data1
            mountPath: /var/lib/kafka/data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_HEAP_OPTS
          value : "-Xms1g -Xmx1g"
        - name: JMX_PORT
          value: "5555"
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data1
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka2
  namespace: kafka
spec:
  serviceName: "kafka-cluster"   #填写无头服务的名称
  replicas: 1
  selector:
    matchLabels:
      app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
      - name: kafka
        image: kafka:3.1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "500Mi"
            cpu: "500m"
          limits:
            memory: "1000Mi"
            cpu: "2000m"
        ports:
        - containerPort: 9092
          name: kafka
        command:
        - sh 
        - -c 
        - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=2 \
          --override listeners=PLAINTEXT://:9092 \
          --override advertised.listeners=PLAINTEXT://192.168.40.181:30094 \
          --override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \
          --override log.dirs=/var/lib/kafka/data \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        volumeMounts:
          - name: data2
            mountPath: /var/lib/kafka/data
        env:
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_HEAP_OPTS
          value : "-Xms1g -Xmx1g"
        - name: JMX_PORT
          value: "5555"
  volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv
    - metadata:
        name: data2
      spec:
        accessModes: ["ReadWriteMany"]
        storageClassName: nfs
        resources:
          requests:
            storage: 10Gi

创建Kafka集群

[root@master ~]# kubectl apply -f  kafka.yaml 

查看Pod状态

至此,Kafka集群部署完成

💕💕💕每一次的分享都是一次成长的旅程,感谢您的陪伴和关注。希望这些关于Kafka的文章能陪伴您走过技术的一段旅程,共同见证成长和进步!😺😺😺

🧨🧨🧨让我们一起在技术的海洋中探索前行,共同书写美好的未来!!!


本文转载自: https://blog.csdn.net/weixin_53269650/article/details/139469452
版权归原作者 明明跟你说过 所有, 如有侵权,请联系我们删除。

“在k8s中部署Kafka高可用集群超详细讲解”的评论:

还没有评论