

Deploying ZooKeeper + Kafka + Kafka-UI with Docker Compose

Deploying Kafka + ZooKeeper with Docker Compose

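The images used here are the bitnami ones. If you use images from another project, the environment variable names may be different, so don't copy them verbatim: check which environment variables that image supports. The values themselves are passed through to the underlying Kafka/ZooKeeper configuration, so they should be the same.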
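The Bitnami naming rule can be seen in the variables used later in this article (for example, offsets.topic.replication.factor becomes KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR). Below is a minimal sketch of that mapping as a shell function, assuming only the simple rule "add the KAFKA_CFG_ prefix, turn dots into underscores, upper-case everything"; property names that already contain underscores or hyphens may follow extra rules in the image's own documentation. The function name kafka_cfg_env is hypothetical.

```bash
# kafka_cfg_env KEY: print the Bitnami-style environment variable name for a
# server.properties key. Assumption: simple rule only (dots -> underscores,
# upper-case, KAFKA_CFG_ prefix); keys with '_' or '-' are not special-cased.
kafka_cfg_env() {
  local key=$1
  printf 'KAFKA_CFG_%s\n' "$(printf '%s' "$key" | tr '.' '_' | tr '[:lower:]' '[:upper:]')"
}

kafka_cfg_env log.dirs                          # KAFKA_CFG_LOG_DIRS
kafka_cfg_env offsets.topic.replication.factor  # KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
```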

1. Deployment without authentication

1.1 Start ZooKeeper

Create the folders that will hold the mounted ZooKeeper data:

mkdir -p zookeeper/{conf,data,logs}

In the conf folder, create a custom configuration file, zoo.cfg:

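# Length of one tick (heartbeat interval) between servers, or between client and server, in milliseconds
tickTime=2000
# Number of ticks a follower may take to initially connect to and sync with the leader
initLimit=10
# Number of ticks a follower may fall behind the leader while syncing
syncLimit=5
# Port ZooKeeper listens on for client connections
clientPort=2181
# Directory for data (snapshot) files
dataDir=/opt/bitnami/zookeeper/data
# Directory for transaction log files
dataLogDir=/opt/bitnami/zookeeper/logs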

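Note: if dataLogDir is not set, the transaction logs are also written to the data directory. This can seriously hurt ZooKeeper's performance, because under high throughput ZooKeeper produces a large volume of transaction logs and snapshots, which then compete for the same disk.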

Change the owner of the folder (the Bitnami images run as the non-root user with UID 1001):

chown -R 1001:1001 zookeeper
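The three steps above (create the folders, write zoo.cfg, change the owner) can be combined into one script. A sketch with a hypothetical function name, init_zookeeper_dirs; it writes the same settings as the zoo.cfg above (without the comments) and only runs chown when executed as root, since changing ownership to UID 1001 otherwise fails.

```bash
# init_zookeeper_dirs [BASE_DIR]: create BASE_DIR/{conf,data,logs} and write
# conf/zoo.cfg with the settings used in this article.
init_zookeeper_dirs() {
  local base=${1:-zookeeper}
  mkdir -p "$base/conf" "$base/data" "$base/logs"
  cat > "$base/conf/zoo.cfg" <<'EOF'
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
dataDir=/opt/bitnami/zookeeper/data
dataLogDir=/opt/bitnami/zookeeper/logs
EOF
  # The Bitnami container runs as UID 1001; chown needs root privileges.
  if [ "$(id -u)" -eq 0 ]; then
    chown -R 1001:1001 "$base"
  fi
}
```

Run it from the directory that holds docker-compose.yaml, e.g. `sudo bash -c '. ./init.sh && init_zookeeper_dirs zookeeper'` if the function is saved in a (hypothetical) init.sh.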

docker-compose.yaml:

version: "3"
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.9
    container_name: zookeeper
    hostname: zookeeper
    privileged: true
    restart: always
    environment:
      # Quoted so that YAML does not turn "yes" into a boolean
      ALLOW_ANONYMOUS_LOGIN: "yes"
    volumes:
      - ./zookeeper/data:/opt/bitnami/zookeeper/data
      - ./zookeeper/logs:/opt/bitnami/zookeeper/logs
      - ./zookeeper/conf/zoo.cfg:/opt/bitnami/zookeeper/conf/zoo.cfg
    ports:
      - "12181:2181"
    deploy:
      resources:
        limits:
          cpus: '4'
          memory: 4G
        reservations:
          cpus: '0.5'
          memory: 200M

The startup log looks like this:

[root@localhost deploy]# docker logs -f --tail 20 zookeeper
2024-05-03 03:10:19,022 [myid:] - INFO  [main:o.e.j.s.AbstractConnector@333] - Started ServerConnector@4b7dc788{HTTP/1.1, (http/1.1)}{0.0.0.0:8080}
2024-05-03 03:10:19,023 [myid:] - INFO  [main:o.e.j.s.Server@415] - Started @1256ms
2024-05-03 03:10:19,023 [myid:] - INFO  [main:o.a.z.s.a.JettyAdminServer@201] - Started AdminServer on address 0.0.0.0, port 8080 and command URL /commands
2024-05-03 03:10:19,030 [myid:] - INFO  [main:o.a.z.s.ServerCnxnFactory@169] - Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory
2024-05-03 03:10:19,032 [myid:] - WARN  [main:o.a.z.s.ServerCnxnFactory@309] - maxCnxns is not configured, using default value 0.
2024-05-03 03:10:19,034 [myid:] - INFO  [main:o.a.z.s.NIOServerCnxnFactory@652] - Configuring NIO connection handler with 10s sessionless connection timeout, 1 selector thread(s), 8 worker threads, and 64 kB direct buffers.
2024-05-03 03:10:19,036 [myid:] - INFO  [main:o.a.z.s.NIOServerCnxnFactory@660] - binding to port 0.0.0.0/0.0.0.0:2181
2024-05-03 03:10:19,057 [myid:] - INFO  [main:o.a.z.s.w.WatchManagerFactory@42] - Using org.apache.zookeeper.server.watch.WatchManager as watch manager
2024-05-03 03:10:19,057 [myid:] - INFO  [main:o.a.z.s.w.WatchManagerFactory@42] - Using org.apache.zookeeper.server.watch.WatchManager as watch manager
2024-05-03 03:10:19,059 [myid:] - INFO  [main:o.a.z.s.ZKDatabase@134] - zookeeper.snapshotSizeFactor = 0.33
2024-05-03 03:10:19,059 [myid:] - INFO  [main:o.a.z.s.ZKDatabase@154] - zookeeper.commitLogCount=500
2024-05-03 03:10:19,067 [myid:] - INFO  [main:o.a.z.s.p.SnapStream@61] - zookeeper.snapshot.compression.method = CHECKED
2024-05-03 03:10:19,067 [myid:] - INFO  [main:o.a.z.s.p.FileTxnSnapLog@480] - Snapshotting: 0x0 to /opt/bitnami/zookeeper/data/version-2/snapshot.0
2024-05-03 03:10:19,071 [myid:] - INFO  [main:o.a.z.s.ZKDatabase@291] - Snapshot loaded in 13 ms, highest zxid is 0x0, digest is 1371985504
2024-05-03 03:10:19,073 [myid:] - INFO  [main:o.a.z.s.p.FileTxnSnapLog@480] - Snapshotting: 0x0 to /opt/bitnami/zookeeper/data/version-2/snapshot.0
2024-05-03 03:10:19,088 [myid:] - INFO  [main:o.a.z.s.ZooKeeperServer@589] - Snapshot taken in 15 ms
2024-05-03 03:10:19,105 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::o.a.z.s.PrepRequestProcessor@138] - PrepRequestProcessor (sid:0) started, reconfigEnabled=false
2024-05-03 03:10:19,106 [myid:] - INFO  [main:o.a.z.s.RequestThrottler@75] - zookeeper.request_throttler.shutdownTimeout = 10000 ms
2024-05-03 03:10:19,134 [myid:] - INFO  [main:o.a.z.s.ContainerManager@83] - Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0
2024-05-03 03:10:19,136 [myid:] - INFO  [main:o.a.z.a.ZKAuditProvider@42] - ZooKeeper audit is disabled.

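ZooKeeper has started normally. The last log line says that ZooKeeper audit logging is disabled; that is ZooKeeper's default and has nothing to do with authentication. What lets clients connect without credentials is the environment variable added at startup:

ALLOW_ANONYMOUS_LOGIN=yes

which allows anyone to connect. When configuring Kafka password authentication later, we won't configure it this way.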

1.2 Check ZooKeeper status

Enter the container:

docker exec -it zookeeper bash
I have no name!@zookeeper:/$ cd /opt/bitnami/zookeeper/bin
I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$ ls
README.txt    zkCli.sh   zkServer-initialize.sh  zkSnapShotToolkit.cmd   zkSnapshotComparer.sh                  zkTxnLogToolkit.cmd
zkCleanup.sh  zkEnv.cmd  zkServer.cmd            zkSnapShotToolkit.sh    zkSnapshotRecursiveSummaryToolkit.cmd  zkTxnLogToolkit.sh
zkCli.cmd     zkEnv.sh   zkServer.sh             zkSnapshotComparer.cmd  zkSnapshotRecursiveSummaryToolkit.sh
I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$ 

Check the ZooKeeper status:

I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$ zkServer.sh status
/opt/bitnami/java/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/bitnami/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$ 
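For a scripted health check, the Mode: line of the output above is the part that matters. A minimal sketch of a parser; zk_mode is a hypothetical helper and assumes the output format shown above.

```bash
# zk_mode: read `zkServer.sh status` output on stdin and print the server mode
# (standalone, leader or follower); returns non-zero when no "Mode:" line is found.
zk_mode() {
  local mode
  mode=$(sed -n 's/^Mode: *//p')
  [ -n "$mode" ] && printf '%s\n' "$mode"
}
```

From the host, a possible call is `docker exec zookeeper zkServer.sh status 2>&1 | zk_mode`; stderr is merged in case some status lines are written there.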

1.3 Start Kafka

Create the mount directory:

mkdir -p kafka/data

chown -R 1001:1001 kafka

Add the kafka service to docker-compose.yaml (under services):

  kafka:
    image: docker.io/bitnami/kafka:3.4
    container_name: kafka
    hostname: kafka
    privileged: true
    restart: always
    environment:
      KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    volumes:
      - ./kafka/data:/opt/bitnami/kafka/data
    ports:
      - "9092:9092"
    deploy:
      resources:
        limits:
          cpus: '4'
          memory: 4G
        reservations:
          cpus: '0.5'
          memory: 200M
    depends_on:
      - zookeeper

Start Kafka:

[root@localhost kafka-with-zookeeper]# docker-compose up -d kafka
[+] Running 2/2
 ⠿ Container zookeeper  Running    0.0s
 ⠿ Container kafka      Started

Check the Kafka log:

[root@localhost ~]# docker logs -f --tail 20 kafka
[2024-05-03 04:12:03,381] INFO [ReplicaStateMachine controllerId=1001] Triggering online replica state changes (kafka.controller.ZkReplicaStateMachine)
[2024-05-03 04:12:03,391] INFO [ReplicaStateMachine controllerId=1001] Triggering offline replica state changes (kafka.controller.ZkReplicaStateMachine)
[2024-05-03 04:12:03,391] DEBUG [ReplicaStateMachine controllerId=1001] Started replica state machine with initial state -> Map() (kafka.controller.ZkReplicaStateMachine)
[2024-05-03 04:12:03,393] INFO [PartitionStateMachine controllerId=1001] Initializing partition state (kafka.controller.ZkPartitionStateMachine)
[2024-05-03 04:12:03,394] INFO [PartitionStateMachine controllerId=1001] Triggering online partition state changes (kafka.controller.ZkPartitionStateMachine)
[2024-05-03 04:12:03,396] INFO [RequestSendThread controllerId=1001] Controller 1001 connected to kafka:9092 (id: 1001 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
[2024-05-03 04:12:03,399] DEBUG [PartitionStateMachine controllerId=1001] Started partition state machine with initial state -> Map() (kafka.controller.ZkPartitionStateMachine)
[2024-05-03 04:12:03,400] INFO [Controller id=1001] Ready to serve as the new controller with epoch 1 (kafka.controller.KafkaController)
[2024-05-03 04:12:03,411] INFO [Controller id=1001] Partitions undergoing preferred replica election:  (kafka.controller.KafkaController)
[2024-05-03 04:12:03,412] INFO [Controller id=1001] Partitions that completed preferred replica election:  (kafka.controller.KafkaController)
[2024-05-03 04:12:03,412] INFO [Controller id=1001] Skipping preferred replica election for partitions due to topic deletion:  (kafka.controller.KafkaController)
[2024-05-03 04:12:03,413] INFO [Controller id=1001] Resuming preferred replica election for partitions:  (kafka.controller.KafkaController)
[2024-05-03 04:12:03,416] INFO [Controller id=1001] Starting replica leader election (PREFERRED) for partitions  triggered by ZkTriggered (kafka.controller.KafkaController)
[2024-05-03 04:12:03,431] INFO [Controller id=1001] Starting the controller scheduler (kafka.controller.KafkaController)
[2024-05-03 04:12:03,461] INFO [BrokerToControllerChannelManager broker=1001 name=forwarding]: Recorded new controller, from now on will use node kafka:9092 (id: 1001 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2024-05-03 04:12:03,551] INFO [BrokerToControllerChannelManager broker=1001 name=alterPartition]: Recorded new controller, from now on will use node kafka:9092 (id: 1001 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2024-05-03 04:12:08,434] INFO [Controller id=1001] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2024-05-03 04:12:08,435] TRACE [Controller id=1001] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2024-05-03 04:17:08,439] INFO [Controller id=1001] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2024-05-03 04:17:08,439] TRACE [Controller id=1001] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)

Kafka has started normally.

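1.4 Kafka configuration file

Enter the Kafka container and look at the contents of server.properties when, apart from the ZooKeeper connection, no environment variables have been set: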

I have no name!@kafka:/opt/bitnami/kafka/config$ cat server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
#broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/bitnami/kafka/data

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zookeeper:2181

# Timeout in ms for connecting to zookeeper
#zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
#group.initial.rebalance.delay.ms=0
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512

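Every value in this configuration file can also be set through an environment variable. Note that log.dirs here is /bitnami/kafka/data, while the compose file above mounts ./kafka/data at /opt/bitnami/kafka/data; if the data should be kept on the host, the mount target probably needs to match log.dirs.

Basic server settings:

broker.id: the ID of the broker, which must be unique within a cluster.

As you can see, the compose file does not set listeners or advertised.listeners, and both are commented out in the configuration file. If machines other than this host need to reach Kafka, these need to be configured, for example: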

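  • KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
  • KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
  • KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
  • KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

This defines three listeners:

  • The first is plaintext on port 9092, reachable through the service name kafka;
  • The second (CONTROLLER, port 9093) is for internal cluster communication;
  • The third is plaintext access for external machines on port 9094; in practice, replace localhost with the host machine's IP and publish port 9094 in the compose ports section.

For a single-node Kafka that only needs to accept connections from external machines, configuring KAFKA_CFG_LISTENERS and KAFKA_CFG_ADVERTISED_LISTENERS is enough.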
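Two easy mistakes with these variables are advertising a listener name that is not declared in KAFKA_CFG_LISTENERS, and declaring a custom name (such as EXTERNAL) without mapping it to a security protocol; Kafka refuses to start in both cases. A sketch of a consistency check, assuming the values are written exactly as in the list above and that the protocol map is set explicitly; check_listeners and its two helpers are hypothetical names, not part of Kafka or the Bitnami image.

```bash
# Print listener names, e.g. "PLAINTEXT" from "PLAINTEXT://:9092", one per line.
listener_names() {
  printf '%s\n' "$1" | tr ',' '\n' | sed -n 's#^\([A-Za-z0-9_]*\)://.*#\1#p'
}

# Print the names on the left-hand side of a "NAME:PROTOCOL,..." map.
protocol_map_names() {
  printf '%s\n' "$1" | tr ',' '\n' | cut -d: -f1
}

# check_listeners LISTENERS ADVERTISED_LISTENERS PROTOCOL_MAP
# Returns non-zero (and reports on stderr) if an advertised name is not declared
# in LISTENERS, or a declared name has no protocol in PROTOCOL_MAP.
check_listeners() {
  local declared mapped name status=0
  declared=$(listener_names "$1")
  mapped=$(protocol_map_names "$3")
  for name in $(listener_names "$2"); do
    if ! printf '%s\n' "$declared" | grep -qx "$name"; then
      echo "advertised listener $name is not declared in listeners" >&2
      status=1
    fi
  done
  for name in $declared; do
    if ! printf '%s\n' "$mapped" | grep -qx "$name"; then
      echo "listener $name has no entry in the protocol map" >&2
      status=1
    fi
  done
  return "$status"
}
```

Usage: `check_listeners "$KAFKA_CFG_LISTENERS" "$KAFKA_CFG_ADVERTISED_LISTENERS" "$KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP" && echo ok`.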

1.4 Producing and consuming with the Kafka command-line tools

Enter the Kafka container:

docker exec -it kafka bash
cd /opt/bitnami/kafka/bin/
I have no name!@kafka:/opt/bitnami/kafka/bin$ ls
connect-distributed.sh        kafka-console-producer.sh    kafka-leader-election.sh       kafka-run-class.sh                  kafka-verifiable-producer.sh
connect-mirror-maker.sh       kafka-consumer-groups.sh     kafka-log-dirs.sh              kafka-server-start.sh               trogdor.sh
connect-standalone.sh         kafka-consumer-perf-test.sh  kafka-metadata-quorum.sh       kafka-server-stop.sh                windows
kafka-acls.sh                 kafka-delegation-tokens.sh   kafka-metadata-shell.sh        kafka-storage.sh                    zookeeper-security-migration.sh
kafka-broker-api-versions.sh  kafka-delete-records.sh      kafka-mirror-maker.sh          kafka-streams-application-reset.sh  zookeeper-server-start.sh
kafka-cluster.sh              kafka-dump-log.sh            kafka-producer-perf-test.sh    kafka-topics.sh                     zookeeper-server-stop.sh
kafka-configs.sh              kafka-features.sh            kafka-reassign-partitions.sh   kafka-transactions.sh               zookeeper-shell.sh
kafka-console-consumer.sh     kafka-get-offsets.sh         kafka-replica-verification.sh  kafka-verifiable-consumer.sh
I have no name!@kafka:/opt/bitnami/kafka/bin$

Let's look at the KAFKA_HEAP_OPTS setting in these scripts:

I have no name!@kafka:/opt/bitnami/kafka/bin$ cat kafka-console-producer.sh 
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
I have no name!@kafka:/opt/bitnami/kafka/bin$ 
I have no name!@kafka:/opt/bitnami/kafka/bin$ cat kafka-console-consumer.sh 
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
I have no name!@kafka:/opt/bitnami/kafka/bin$ 

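As you can see, the producer and consumer scripts set only one environment variable, KAFKA_HEAP_OPTS, and only to -Xmx512M (and only when it is not already set). Keep this in mind: we will need to extend this value later for password authentication.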
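The check at the top of both scripts only applies the default when KAFKA_HEAP_OPTS is unset or empty. The same rule as a tiny shell function (effective_heap_opts is a hypothetical name), handy for confirming what the console tools will use in a given shell. Note that kafka-run-class.sh also passes KAFKA_OPTS to the JVM, which is where system properties such as -Djava.security.auth.login.config usually go, as the broker's compose file in section 2 does.

```bash
# effective_heap_opts: print the heap options the console producer/consumer
# scripts end up using; same rule as their `[ "x$KAFKA_HEAP_OPTS" = "x" ]` check.
effective_heap_opts() {
  printf '%s\n' "${KAFKA_HEAP_OPTS:--Xmx512M}"
}
```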

1.4.1 Create a topic

Create a topic named test with 10 partitions and a replication factor of 1:
I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --create --bootstrap-server kafka:9092 --partitions 10 --replication-factor 1 --topic test
Created topic test.
I have no name!@kafka:/opt/bitnami/kafka/bin$ 

Here the broker is addressed by the service name kafka; if KAFKA_CFG_ADVERTISED_LISTENERS is configured, you can also use the host machine's IP.

1.4.2 Describe a topic

To see how a topic is partitioned, use describe:
I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic test
Topic: test     TopicId: nAakv4u8SJO022gRe-ahVQ PartitionCount: 10      ReplicationFactor: 1    Configs: 
        Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 5    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 6    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 7    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 8    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 9    Leader: 1001    Replicas: 1001  Isr: 1001
I have no name!@kafka:/opt/bitnami/kafka/bin$ 

1.4.3 List all topics

Use list:
I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --list --bootstrap-server kafka:9092
test
I have no name!@kafka:/opt/bitnami/kafka/bin$

1.4.4 Delete a topic

I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic test
I have no name!@kafka:/opt/bitnami/kafka/bin$ 
I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --list --bootstrap-server kafka:9092               

I have no name!@kafka:/opt/bitnami/kafka/bin$ 

1.4.5 Send messages

First create a topic:

kafka-topics.sh --create --bootstrap-server kafka:9092 --partitions 10 --replication-factor 1 --topic test

Send two messages:

I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-console-producer.sh --broker-list kafka:9092 --topic test
>hello
>hello everyone
>

1.4.6 Consume messages

Open another terminal window and connect to Kafka as a consumer:

[root@localhost ~]# docker exec -it kafka bash
I have no name!@kafka:/$ 
I have no name!@kafka:/$ cd /opt/bitnami/kafka/bin/
I have no name!@kafka:/opt/bitnami/kafka/bin$ 
I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
hello
hello everyone

The two messages produced earlier have been consumed. If you produce more messages in the producer window, they show up in the consumer window as expected.

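2. Deployment with password authentication

Here we use SASL_PLAINTEXT, i.e. SASL authentication over an unencrypted connection; with the PLAIN mechanism the passwords are also written in plain text in the configuration file. For a more secure setup, you can configure SASL_SSL authentication with JKS certificates; see kafka-generate-ssl.sh.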

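2.1 Using SASL authentication

Here a password-less ZooKeeper is paired with a password-protected Kafka.

First create the configuration file secrets/kafka_server_jaas.conf, which holds the Kafka usernames and passwords: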

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="Abc123456"
    user_admin="Abc123456"
    user_producer="Abc123456"
    user_consumer="Abc123456";
};

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="Abc123456";
};

This configuration uses org.apache.kafka.common.security.plain.PlainLoginModule to select the PLAIN mechanism and defines the users:

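  • username and password are the credentials this broker uses when it initiates connections to the other brokers in the cluster;
  • Each entry of the form user_<name> defines a user that may connect to the broker; for example, user_producer="Abc123456" defines a user named producer with the password Abc123456;
  • The password given with username="admin" must be the same as the value of user_admin, otherwise authentication fails.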
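The last rule is easy to break when editing the file by hand. A minimal sketch of a check, assuming the layout shown above (a "KafkaServer {" block closed by a line starting with "}", with key="value" options); check_admin_password and jaas_value are hypothetical helpers, not Kafka tools.

```bash
# jaas_value KEY: read JAAS text on stdin and print the value of the first KEY="value".
jaas_value() {
  grep -o "$1=\"[^\"]*\"" | head -n 1 | sed "s/^$1=\"\(.*\)\"\$/\1/"
}

# check_admin_password FILE: inside the KafkaServer block, verify that the
# password of username="<user>" equals the value of user_<user>.
check_admin_password() {
  local server user pass expected
  # Keep only the lines from "KafkaServer {" up to the first line starting with "}".
  server=$(sed -n '/^KafkaServer[[:space:]]*{/,/^}/p' "$1")
  user=$(printf '%s\n' "$server" | jaas_value username)
  pass=$(printf '%s\n' "$server" | jaas_value password)
  expected=$(printf '%s\n' "$server" | jaas_value "user_$user")
  if [ -n "$user" ] && [ -n "$expected" ] && [ "$pass" = "$expected" ]; then
    echo "ok: password of $user matches user_$user"
  else
    echo "mismatch: password of '$user' differs from user_$user" >&2
    return 1
  fi
}
```

Usage: `check_admin_password secrets/kafka_server_jaas.conf`.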

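In the configuration above, the KafkaServer section creates three users: admin, producer and consumer (create as many users as your applications need; usernames and passwords are up to you). The KafkaClient section configures a single user, producer, whose username and password must match an account defined on the server side.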
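To avoid typos, the file can also be generated from variables. A sketch assuming the same three users and the same KafkaClient user (producer) as above, and passwords that contain no double quotes or backslashes; write_kafka_jaas is a hypothetical name. Because username/password and user_admin are filled from the same variable, the admin passwords always match.

```bash
# write_kafka_jaas OUT_FILE ADMIN_PASSWORD PRODUCER_PASSWORD CONSUMER_PASSWORD
# Generates a JAAS file with the users admin, producer and consumer; the
# KafkaClient section logs in as producer.
write_kafka_jaas() {
  local out=$1 admin_pass=$2 producer_pass=$3 consumer_pass=$4
  mkdir -p "$(dirname "$out")"
  cat > "$out" <<EOF
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="${admin_pass}"
    user_admin="${admin_pass}"
    user_producer="${producer_pass}"
    user_consumer="${consumer_pass}";
};

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="${producer_pass}";
};
EOF
}
```

Usage: `write_kafka_jaas secrets/kafka_server_jaas.conf 'Abc123456' 'Abc123456' 'Abc123456'`, then change the owner with chown as below.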

Change the owner of the configuration folder:

chown -R 1001:1001 secrets

Then change the kafka service in docker-compose to:

  kafka:
    image: docker.io/bitnami/kafka:3.4
    container_name: kafka
    hostname: kafka
    privileged: true
    restart: always
    environment:
      KAFKA_OPTS: '-Djava.security.auth.login.config=/opt/bitnami/kafka/secrets/kafka_server_jaas.conf'
      KAFKA_BROKER_ID: 0
      KAFKA_CFG_LISTENERS: SASL_PLAINTEXT://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: SASL_PLAINTEXT://192.168.104.147:9092
      KAFKA_CFG_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_CFG_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Override the defaults from the configuration file.
      # NOTE: a replication factor of 3 (and min ISR of 2) needs at least 3 brokers;
      # with a single broker Kafka cannot create these internal topics, so 1 is the usable value here.
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2
    volumes:
      - ./kafka/data:/opt/bitnami/kafka/data
      - ./secrets:/opt/bitnami/kafka/secrets
    ports:
      - "9092:9092"
    deploy:
      resources:
        limits:
          cpus: '4'
          memory: 4G
        reservations:
          cpus: '0.5'
          memory: 200M
    depends_on:
      - zookeeper

A successful startup log ends like this:

[2024-05-03 07:26:17,096] TRACE [Controller id=0] Leader imbalance ratio for broker 1001 is 1.0 (kafka.controller.KafkaController)
[2024-05-03 07:26:17,102] INFO [Controller id=0] Starting replica leader election (PREFERRED) for partitions  triggered by AutoTriggered (kafka.controller.KafkaController)
[2024-05-03 07:26:20,808] INFO [RequestSendThread controllerId=0] Controller 0 connected to 192.168.104.147:9092 (id: 0 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
[2024-05-03 07:26:20,850] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use node 192.168.104.147:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2024-05-03 07:26:20,859] INFO [Broker id=0] Add 60 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 0 epoch 2 with correlation id 1 (state.change.logger)
[2024-05-03 07:26:20,927] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use node 192.168.104.147:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)

Because the KAFKA_BROKER_ID environment variable is set, the broker ID has indeed changed from the default 1001 to the 0 we configured.

2.2 Client verification

Enter the container and test Kafka from the command line, starting with the producer command used before:

kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties

This command does not work: it hangs, and the broker log keeps printing the following message in a loop:

[2024-05-03 07:53:41,938] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9092-127.0.0.1:36094-17)(Unexpected Kafka request of type METADATA during SASL handshake.)(org.apache.kafka.common.network.Selector)

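Authentication keeps failing: the broker expected a SASL handshake but received a METADATA request, which means the client is not using SASL at all (evidently the producer.properties used here contains no SASL settings). So add a new configuration file, client.properties, in the secrets folder (mounted at /opt/bitnami/kafka/secrets), with the following content: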

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="Abc123456";

Then list the topics:

kafka-topics.sh --list --bootstrap-server kafka:9092 --command-config /opt/bitnami/kafka/secrets/client.properties
__consumer_offsets
test
test1

The topic list is displayed normally.
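kafka-topics.sh reads client.properties through --command-config, but the console producer and consumer take their client settings through different options (--producer.config and --consumer.config). A small lookup sketch covering the tools used in this article; cli_config_flag is a hypothetical helper.

```bash
# cli_config_flag TOOL: print the option a Kafka CLI tool uses to read a client
# properties file (such as client.properties with the SASL settings).
cli_config_flag() {
  case "$(basename "$1")" in
    kafka-console-producer.sh) printf '%s\n' '--producer.config' ;;
    kafka-console-consumer.sh) printf '%s\n' '--consumer.config' ;;
    kafka-topics.sh|kafka-consumer-groups.sh|kafka-configs.sh|kafka-acls.sh)
      printf '%s\n' '--command-config' ;;
    *) echo "unknown tool: $1" >&2; return 1 ;;
  esac
}
```

With the file above, the producer would then be started as `kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test1 --producer.config /opt/bitnami/kafka/secrets/client.properties`, and the consumer likewise with --consumer.config.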

Describe a topic:

I have no name!@kafka:/opt/bitnami/kafka/config$ kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic test --command-config /opt/bitnami/kafka/secrets/client.properties
Topic: test     TopicId: _rY4JuTlQ-KRWdhbmIpklQ PartitionCount: 10      ReplicationFactor: 1    Configs: 
        Topic: test     Partition: 0    Leader: none    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 1    Leader: none    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 2    Leader: none    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 3    Leader: none    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 4    Leader: none    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 5    Leader: none    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 6    Leader: none    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 7    Leader: none    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 8    Leader: none    Replicas: 1001  Isr: 1001
        Topic: test     Partition: 9    Leader: none    Replicas: 1001  Isr: 1001
I have no name!@kafka:/opt/bitnami/kafka/config$ 
I have no name!@kafka:/opt/bitnami/kafka/config$ kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic test1 --command-config /opt/bitnami/kafka/secrets/client.properties
Topic: test1    TopicId: pNlhOdhjQtaemjVB3jjM0g PartitionCount: 1       ReplicationFactor: 1    Configs: 
        Topic: test1    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
I have no name!@kafka:/opt/bitnami/kafka/config$ 
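In the first describe output, every partition of test reports Leader: none and still lists broker 1001 as its only replica, while test1 is led by broker 0. Since the broker ID was changed from 1001 to 0, and the startup log above still reports a leader imbalance for broker 1001, it seems likely that topics created during the first deployment (possibly including __consumer_offsets) have no live leader; that would be one plausible explanation for the produce/consume problem described next. A sketch that lists such partitions from describe output; leaderless_partitions is a hypothetical helper and assumes the field layout shown above.

```bash
# leaderless_partitions: read `kafka-topics.sh --describe` output on stdin and
# print "<topic> <partition>" for every partition whose leader is "none".
leaderless_partitions() {
  awk '/Partition: / {
    topic = ""; part = ""; leader = ""
    for (i = 1; i < NF; i++) {
      if ($i == "Topic:")     topic  = $(i + 1)
      if ($i == "Partition:") part   = $(i + 1)
      if ($i == "Leader:")    leader = $(i + 1)
    }
    if (leader == "none") print topic, part
  }'
}
```

Inside the container, a possible call is `kafka-topics.sh --describe --bootstrap-server kafka:9092 --command-config /opt/bitnami/kafka/secrets/client.properties | leaderless_partitions`; without --topic, describe covers all topics.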

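Now the problem: producing and consuming messages fails silently. The producer reports no error and neither does the consumer, but the consumer never receives any messages, which is quite frustrating.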

Since the command line wasn't working, I switched to another tool. I had hoped to keep things simple, but let's add kafka-ui and take a look.

3. Using the visual tool Kafka-UI

3.1 Basic usage of Kafka-UI

Add the Kafka-UI service to the docker-compose file above:

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    hostname: kafka-ui
    privileged: true
    restart: always
    environment:
      - DYNAMIC_CONFIG_ENABLED=true
      - AUTH_TYPE=LOGIN_FORM
      - SPRING_SECURITY_USER_NAME=admin
      - SPRING_SECURITY_USER_PASSWORD=admin123
    #volumes:
    #  - ~/kui/config.yml:/etc/kafkaui/dynamic_config.yaml
    ports:
      - "8080:8080"
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 2G
        reservations:
          cpus: '0.5'

With the configuration above, start the kafka-ui service; a normal startup log looks like this:

[root@localhost ~]# docker logs -f --tail 20 kafka-ui
Standard Commons Logging discovery in action with spring-jcl: please remove commons-logging.jar from classpath in order to avoid potential conflicts
(startup banner)
2024-05-04 02:37:44,032 WARN  [main] c.p.k.u.u.DynamicConfigOperations: Dynamic config file /etc/kafkaui/dynamic_config.yaml doesnt exist or not readable
2024-05-04 02:37:44,098 INFO  [main] c.p.k.u.KafkaUiApplication: Starting KafkaUiApplication using Java 17.0.6 with PID 1 (/kafka-ui-api.jar started by kafkaui in /)
2024-05-04 02:37:44,099 DEBUG [main] c.p.k.u.KafkaUiApplication: Running with Spring Boot v3.1.3, Spring v6.0.11
2024-05-04 02:37:44,100 INFO  [main] c.p.k.u.KafkaUiApplication: No active profile set, falling back to 1 default profile: "default"
2024-05-04 02:37:59,321 INFO  [main] c.p.k.u.c.a.BasicAuthSecurityConfig: Configuring LOGIN_FORM authentication.
2024-05-04 02:38:00,411 INFO  [main] o.s.b.a.e.w.EndpointLinksResolver: Exposing 3 endpoint(s) beneath base path '/actuator'
2024-05-04 02:38:03,523 INFO  [main] o.s.b.w.e.n.NettyWebServer: Netty started on port 8080
2024-05-04 02:38:03,615 INFO  [main] c.p.k.u.KafkaUiApplication: Started KafkaUiApplication in 22.402 seconds (process running for 25.788)

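No Kafka connection is configured at startup here; as in the official demo, only a login form is set up. Opening the UI shows a login page, where the username and password are the ones configured in the compose file above: admin and admin123.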

For the environment variables the project officially supports, see: misc-configuration-properties

The compose file also has a commented-out mount for a configuration file; we'll look at what that file contains later.

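After logging in, Kafka-UI opens its console at http://IP:8080/ui/clusters/create-new-cluster, where you configure the connection to a Kafka cluster.

Fill in the required fields, such as Cluster name. Read-only is normally left unchecked so that the UI can be used to test sending messages to Kafka. Next comes Bootstrap Servers; for a Kafka cluster you can list several servers here.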

If Kafka does not use SASL authentication, you can scroll to the bottom and click Validate to test whether Kafka is reachable.

Since SASL authentication is enabled here, in the Authentication section set Authentication Method to SASL/PLAIN and Security Protocol to SASL_PLAINTEXT, then check Secured with auth? and enter the username and password.

Note that in my tests the username and password had to be a user from Kafka's KafkaClient section rather than the one from KafkaServer; otherwise validation failed.

With the user configured in KafkaClient, producer, validation succeeds. In other words, the username and password entered here are used to connect to Kafka as a client.

Click the Submit button to save the current configuration; you are taken automatically to the Kafka-UI console page:
[Screenshot: Kafka-UI dashboard]
The dashboard now shows our cluster, test. Clicking Brokers, Topics and Consumers below it displays the cluster information correctly:
[Screenshots: Brokers, Topics and Consumers pages]
If anything is misconfigured when connecting to the cluster, these menus may not display the Kafka cluster information properly.

3.2 The Kafka-UI configuration file

In our script above, the mount for the file /etc/kafkaui/dynamic_config.yaml is commented out. Now let's look at the contents of this file:

[root@localhost ~]# docker exec -it kafka-ui sh
/ $ 
/ $ cd /etc/kafkaui/
/etc/kafkaui $ ls
dynamic_config.yaml
/etc/kafkaui $ 
/etc/kafkaui $ cat dynamic_config.yaml 
auth:
  type: LOGIN_FORM
kafka:
  clusters:
  - bootstrapServers: kafka:9092
    name: test
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required
        username="producer" password="Abc123456";
    readOnly: false
rbac:
  roles: []
webclient: {}
/etc/kafkaui $

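The file contains exactly the Kafka cluster connection settings configured above through the web page. The official documentation also says these settings can be supplied as environment variables. For example, KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS corresponds to the following in the config file: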

kafka:
  clusters:
    - bootstrapServers: xxx
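Kafka-UI runs on Spring Boot (the startup log above shows Spring Boot v3.1.3), so this mapping follows Spring Boot's relaxed binding for environment variables: lowercase the name, turn underscores into dots, and turn a purely numeric segment into a list index. The Python sketch below is a simplified reimplementation for illustration only; env_to_property is a hypothetical name, and Spring itself additionally matches bootstrapservers to bootstrapServers without regard to case.

```python
def env_to_property(name: str) -> str:
    """Simplified reverse of Spring Boot's env-var naming (illustrative sketch).

    KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS -> kafka.clusters[0].bootstrapservers
    """
    path = ""
    for part in name.lower().split("_"):
        if not part:
            continue  # ignore empty segments from doubled underscores
        if part.isdigit():
            path += f"[{part}]"  # numeric segment becomes a list index
        else:
            path += ("." if path else "") + part
    return path
```

Under this rule, KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL lands on kafka.clusters[0].properties.security.protocol, which is the security.protocol entry under properties in the file shown above.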

However, I tried converting the contents of the config file above directly into the following environment variables:

      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
      - KAFKA_CLUSTERS_0_READONLY=false
      - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT
      - KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN
      - KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="Abc123456";'

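With these, Kafka-UI reported an error at startup. So I commented them out again, started Kafka-UI, and configured the Kafka cluster connection in the web page instead, which works.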
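One possible explanation, offered as an assumption rather than a confirmed diagnosis: in the list form of Compose's environment: section, a line such as - KEY='value' is a single YAML string, so the single quotes are not stripped and end up inside the value. Kafka's JAAS parser would then read the whole quoted text as one token with no control flag (required, requisite, sufficient or optional) after it, which matches the "Login module control flag not specified in JAAS config" error in the stack trace below; that trace also names cluster local, the name set by KAFKA_CLUSTERS_0_NAME=local. The Python helper below (jaas_control_flag, hypothetical) roughly approximates that check with shlex; it is not Kafka's actual parser.

```python
import shlex
from typing import Optional

# Control flags accepted after the login module class name in a JAAS entry.
CONTROL_FLAGS = {"required", "requisite", "sufficient", "optional"}


def jaas_control_flag(value: str) -> Optional[str]:
    """Return the control flag after the login module class, or None if missing.

    Rough approximation: quoted text (single or double quotes) counts as one
    token, so a value wrapped in stray quotes yields no control flag.
    """
    try:
        tokens = shlex.split(value.strip().rstrip(";"))
    except ValueError:
        return None  # unbalanced quotes
    if len(tokens) < 2 or tokens[1] not in CONTROL_FLAGS:
        return None
    return tokens[1]
```

If this reading is right, removing the outer single quotes, or switching to the map form of environment: (KEY: 'value', where YAML itself strips the quotes), would be the first thing to try.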

If validation fails when you configure the Kafka account, the UI seems to be a bit flaky; retrying a few times may make it work.

If the account is configured incorrectly, connecting to Kafka fails. For example, I initially configured the KafkaClient account and got the following exception:

2024-05-04 02:28:06,338 ERROR [parallel-1] c.p.k.u.s.StatisticsService: Failed to collect cluster local info
java.lang.IllegalStateException: Error while creating AdminClient for Cluster local
        at com.provectus.kafka.ui.service.AdminClientServiceImpl.lambda$createAdminClient$5(AdminClientServiceImpl.java:56)
        at reactor.core.publisher.Mono.lambda$onErrorMap$28(Mono.java:3783)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
        at reactor.core.publisher.Operators.error(Operators.java:198)
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:135)
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4480)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
        at reactor.core.publisher.Operators.complete(Operators.java:137)
        at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:427)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:551)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:488)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:134)
        at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
        at com.provectus.kafka.ui.service.AdminClientServiceImpl.lambda$createAdminClient$2(AdminClientServiceImpl.java:53)
        at reactor.core.publisher.MonoSupplier.call(MonoSupplier.java:67)
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:127)
        ... 16 common frames omitted
Caused by: java.lang.IllegalArgumentException: Login module control flag not specified in JAAS config
        at org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:110)
        at org.apache.kafka.common.security.JaasConfig.<init>(JaasConfig.java:63)
        at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:93)
        at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:87)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:167)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:522)
        ... 22 common frames omitted
2024-05-04 02:28:06,339 DEBUG [parallel-1] c.p.k.u.s.ClustersStatisticsScheduler: Metrics updated for cluster: local

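Here it had to be the admin account from KafkaServer. If the KafkaServer admin account does not work either, try the KafkaClient account again. As long as Kafka itself is configured correctly, connecting here should not be much of a problem.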

4. Issues encountered

4.1 Exception when a consumer subscribes to a topic: Replication factor: 3 larger than available brokers: 1.

[2024-06-05 09:51:42,266] INFO [Admin Manager on Broker 0]: Error processing create topic request CreatableTopic(name='__consumer_offsets', numPartitions=50, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='compression.type', value='producer'), CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='segment.bytes', value='104857600')])(kafka.server.ZkAdminManager)
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

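This exception comes from the Kafka environment configuration. The error message shows an attempt to create a topic named __consumer_offsets with its replication factor (replicationFactor) set to 3, while the Kafka cluster currently has only 1 available broker. In Kafka, the replication factor is the number of brokers that each partition's data is copied to, which provides data redundancy and fault tolerance.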

To fix this, take one of the following approaches:

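Increase the number of brokers: make sure the Kafka cluster has at least 3 active brokers. You can do this by starting more Kafka instances and configuring them to join the cluster.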

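Reduce the replication factor: if you cannot add brokers for now, lower the replication factor of the __consumer_offsets topic so that it is less than or equal to the number of brokers in the cluster. For example, with only one broker the replication factor should be 1.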

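Check broker health: make sure all brokers are running normally and none has gone offline because of a failure. You can check broker status with Kafka's admin commands or a monitoring tool.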
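The rule behind this error fits in a few lines. The Python sketch below uses hypothetical helper names and only mirrors the message text from the log above: it checks a requested replication factor against the number of live brokers and, following the "reduce the replication factor" option, caps the value so it never exceeds the broker count.

```python
from typing import Optional


def replication_error(replication_factor: int, live_brokers: int) -> Optional[str]:
    """Return a Kafka-style error message if the topic cannot be created, else None."""
    if replication_factor > live_brokers:
        return (
            f"Replication factor: {replication_factor} "
            f"larger than available brokers: {live_brokers}."
        )
    return None


def capped_replication_factor(desired: int, live_brokers: int) -> int:
    """Never exceed the number of live brokers, and never go below 1."""
    return max(1, min(desired, live_brokers))
```

With a single broker, replication_error(3, 1) reproduces the message from the log, and capped_replication_factor(3, 1) gives 1, the value used below.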

My Kafka service has the following environment variables configured:

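      # Replaces the TOPIC_REPLICATION_FACTOR value from the default config file; set it to the number of nodes in the cluster
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Controls the replication factor of the transaction-related log topic
      KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1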

When the error occurred, KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR was set to 3 while the cluster had only one node, hence the error; it is now set to 1.
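In the Bitnami image, environment variables prefixed with KAFKA_CFG_ are translated into broker (server.properties) keys by dropping the prefix, lowercasing, and replacing underscores with dots. The Python sketch below mirrors that convention to show which broker settings the variables above control; kafka_cfg_to_property is a hypothetical helper, not part of the image.

```python
PREFIX = "KAFKA_CFG_"


def kafka_cfg_to_property(env_name: str) -> str:
    """Map a Bitnami-style KAFKA_CFG_* variable to a broker property name (sketch)."""
    if not env_name.startswith(PREFIX):
        raise ValueError(f"{env_name} does not start with {PREFIX}")
    # Drop the prefix, lowercase, and turn underscores into dots.
    return env_name[len(PREFIX):].lower().replace("_", ".")
```

Applied to the three variables above, this gives offsets.topic.replication.factor, transaction.state.log.replication.factor and transaction.state.log.min.isr. The offsets setting is applied when __consumer_offsets is auto-created, which fits with the reset of the ZooKeeper data described next.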

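After changing this setting, you may need to recreate or update the __consumer_offsets topic. Here I simply deleted the ZooKeeper data, restarted ZooKeeper, and then recreated the Kafka service. After that, producers could send messages and consumers could consume them normally.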

Tags: docker zookeeper kafka

This article is reposted from: https://blog.csdn.net/linmengmeng_1314/article/details/138412264
Copyright belongs to the original author, linmengmeng_1314. In case of infringement, please contact us for removal.
