随笔记录
1. 安装zookeeper
docker会自动拉取对应镜像
# docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime -e TZ=Asia/Shanghai wurstmeister/zookeeper
[root@localhost Docker-Compose-Master]# mkdir zookeeper
[root@localhost Docker-Compose-Master]# ls
docker-compose.yml kafka zookeeper
[root@localhost Docker-Compose-Master]# cd zookeeper/
[root@localhost zookeeper]# ls
[root@localhost zookeeper]#
[root@localhost zookeeper]# docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime -e TZ=Asia/Shanghai wurstmeister/zookeeper
Unable to find image 'wurstmeister/zookeeper:latest' locally
latest: Pulling from wurstmeister/zookeeper
a3ed95caeb02: Pull complete
ef38b711a50f: Pull complete
e057c74597c7: Pull complete
666c214f6385: Pull complete
c3d6a96f1ffc: Pull complete
3fe26a83e0ca: Pull complete
3d3a7dd3a3b1: Pull complete
f8cc938abe5f: Pull complete
9978b75f7a58: Pull complete
4d4dbcc8f8cc: Pull complete
8b130a9baa49: Pull complete
6b9611650a73: Pull complete
5df5aac51927: Pull complete
76eea4448d9b: Pull complete
8b66990876c6: Pull complete
f0dd38204b6f: Pull complete
Digest: sha256:7a7fd44a72104bfbd24a77844bad5fabc86485b036f988ea927d1780782a6680
Status: Downloaded newer image for wurstmeister/zookeeper:latest
8dbbc5f4768e37b6049e7830e2c233476b629bdf3bafdf2eef9b0d2eb127b6c2
[root@localhost zookeeper]#
======================================================================================
# 以上命令参数解释说明:
docker run:用于创建并启动一个新的容器
--name zookeeper:指定容器的名称为 "zookeeper"
-p 2181:2181:将主机的端口 2181 映射到容器的端口 2181,允许从主机上的其他应用程序访问 ZooKeeper 服务
-v /etc/localtime:/etc/localtime:将主机的系统时间配置文件挂载到容器内,以便容器内的时间与主机保持同步
-e TZ=Asia/Shanghai: 设置容器系统时区
wurstmeister/zookeeper:指定要使用的容器镜像
总结:
执行该命令后,Docker 将使用 wurstmeister/zookeeper 镜像创建一个名为 "zookeeper" 的容器。ZooKeeper 是一个开源的分布式协调服务,该容器提供了运行 ZooKeeper 服务器所需的环境。
通过将主机的端口 2181 映射到容器的端口 2181,可以轻松地访问在容器中运行的 ZooKeeper 服务。使用 -v 参数将主机的系统时间配置文件挂载到容器内,可以确保容器的时间与主机保持一致。
这条命令执行后,ZooKeeper 容器将在后台运行,并且您可以使用 docker ps 命令来查看正在运行的容器。
2. 安装Kafka
2.1 拉取kafka image
# 拉取kafka 镜像
# docker pull wuristmeister/kafka
[root@localhost kafka]# pwd
/home/magx/Docker-Compose-Master/kafka
[root@localhost kafka]#
[root@localhost kafka]# ll
total 8
-rw-r--r--. 1 root root 3112 Dec 4 17:48 docker-compose-kafka.yml
drwxr-xr-x. 5 root root 4096 Dec 4 16:40 kafka-docker
[root@localhost kafka]#
[root@localhost kafka]# docker pull wurstmeister/kafka
Using default tag: latest
latest: Pulling from wurstmeister/kafka
42c077c10790: Pull complete
44b062e78fd7: Pull complete
b3ba9647f279: Pull complete
10c9a58bd495: Pull complete
ed9bd501c190: Pull complete
03346d650161: Pull complete
539ec416bc55: Pull complete
Digest: sha256:2d4bbf9cc83d9854d36582987da5f939fb9255fb128d18e3cf2c6ad825a32751
Status: Downloaded newer image for wurstmeister/kafka:latest
docker.io/wurstmeister/kafka:latest
[root@localhost kafka]#
======================================================================================
# 以上命令参数解释说明:
docker pull:用于从 Docker 镜像仓库中拉取(下载)一个镜像
wurstmeister/kafka:要拉取的镜像的名称
总结:
执行该命令后,Docker 将尝试从 Docker 镜像仓库中下载名为 wurstmeister/kafka 的镜像。这个镜像是由 wurstmeister 团队维护的 Kafka 镜像,Kafka 是一个流行的分布式流处理平台。
注意:
执行该命令需要在网络环境良好的情况下,并且 Docker 需要与 Docker 镜像仓库建立连接。下载完成后,可以使用 docker images 命令来查看已下载的镜像列表,确认 wurstmeister/kafka 镜像已成功下载
2.2 查询本地docker images
# 查询本地docker 镜像文件
# docker images
[root@localhost kafka]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
hello-world latest 9c7a54a9a43c 7 months ago 13.3kB
wurstmeister/kafka latest a692873757c0 19 months ago 468MB
wurstmeister/zookeeper latest 3f43f72cb283 4 years ago 510MB
[root@localhost kafka]#
[root@localhost kafka]#
2.3 查看本地 容器(docker container)
2.3.1 查看本地已启动的 docker container
# 查询本地已启动docker 容器
# docker ps
[root@localhost kafka]#
[root@localhost kafka]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost kafka]#
2.3.2 查看所有容器的列表,包括已停止的容器。
# 查看本地所有 docker container
# docker ps -a 命令来查看所有容器的列表,包括已停止的容器。
它会显示容器的 ID、状态、创建时间等信息。
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
913b2a1d7f07 wurstmeister/kafka "start-kafka.sh" 11 minutes ago Exited (143) 8 minutes ago kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost kafka]#
2.3.3 停止的启动的某个容器
# 停止某个容器
# docker stop <container_id>
[root@localhost ~]# docker ps # 停止前,查询已启动容器list
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 29 hours ago Up 29 hours 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
[root@localhost ~]# docker stop b03ba55d79cb # 停止kafka 容器
b03ba55d79cb
[root@localhost ~]#
# 停止后,查询已启动容器list
[root@localhost ~]#
[root@localhost ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
# 停止后,查询所有容器list
[root@localhost ~]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 29 hours ago Exited (143) 8 seconds ago kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost ~]#
2.3.4 启动某个容器
# 启动某个容器
# docker start <container_id or container_name>
[root@localhost ~]# docker start kafka # 以container_name 启动容器
kafka
[root@localhost ~]#
[root@localhost ~]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 29 hours ago Up 3 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
2.4 删除指定容器
# 要删除某个 Docker 容器,您可以使用 docker rm 命令,并提供要删除的容器的标识符或名称作为参数
# docker rm <CONTAINER_ID> # 容器标识符: container_ID
or
# docker rm <CONTAINER_NAME> #容器名称: container_name
# 删除前查询本地所有docker 容器
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
913b2a1d7f07 wurstmeister/kafka "start-kafka.sh" 19 minutes ago Exited (143) 16 minutes ago kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost kafka]#
[root@localhost kafka]#
# 删除指定docker 容器
[root@localhost kafka]#
[root@localhost kafka]# docker rm 913b2a1d7f07 # docker rm <container_ID>
913b2a1d7f07
[root@localhost kafka]#
# 删除容器后,再次查询本地所有容器,不再显示已删除的容器
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost kafka]#
2.5 启动kafka 镜像
2.5.0 挂在自定义配置文件
# 为避免后期修改kafka 容器配置文件,直接挂在配置文件
1)本地创建同名 配置文件 consumer.properties
2)修改配置文件中 bootstrap.server=<容器所在主机IP:9092>
3) 执行 docker run -v <本地同名配置文件路径>:<容器挂在配置文件目录>
[root@localhost kafka]# pwd
/home/magx/Docker-Compose-Master/kafka
[root@localhost kafka]#
[root@localhost kafka]# ll
total 12
-rw-r--r--. 1 root root 1224 Dec 25 14:40 consumer.properties # 本地同名配置文件
-rw-r--r--. 1 root root 3112 Dec 4 17:48 docker-compose-kafka.yml
drwxr-xr-x. 5 root root 4096 Dec 4 16:40 kafka-docker
[root@localhost kafka]#
[root@localhost kafka]#
2.5.1 启动kafaka container
#启动kakfa 容器
[root@localhost kafka]#
[root@localhost kafka]# docker run -d --name kafka -v /etc/localtime:/etc/localtime:ro -p 9092:9092 -v /home/magx/Docker-Compose-Master/kafka/consumer.properties:/opt/kafka/config/consumer.properties -e TZ=Asia/Shanghai --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.247:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
caac221a5a61b11a524f4c2601e02f300cb7229b69f4667d600c3827443be312
[root@localhost kafka]#
[root@localhost kafka]#
或者:
docker run -d --name kafka -v /etc/localtime:/etc/localtime:ro -p 9092:9092 -e TZ=Asia/Shanghai --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
======================================================================================
通过以上命令,我们链接了ZooKeeper容器,并且设置了几个环境变量来配置Kafka。
在这个命令中:
--name kafka: 设置容器的名字为“kafka”。
-v /etc/localtime:/etc/localtime:ro 文件挂载到容器内的相应位置。这将使容器内部的时间与主机系统时间保持一致;
ro选项将/etc/localtime文件挂载为只读模式,以防止容器内部意外修改主机系统时间
-v /home/magx/Docker-Compose-Master/kafka/consumer.properties:/opt/kafka/config/consumer.properties
本地同名文件所在目录 : 容器中挂在目录
-p 9092:9092: 将容器的9092端口映射到宿主机的9092端口。
-e TZ=Asia/Shanghai: 设置容器系统时区
--link zookeeper:zookeeper: 连接到名为“zookeeper”的另一个Docker容器,并且在当前的容器中可以通过zookeeper这个别名来访问它。
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181: 设置环境变量,指定ZooKeeper的连接字符串。
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092: 设置环境变量,指定Kafka的advertised listeners。
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: 设置环境变量,指定Kafka的listeners。
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1: 设置环境变量,指定offsets topic的副本因子。
wurstmeister/kafka: 使用的Docker镜像名字
确保在运行这个命令之前ZooKeeper容器已经在运行,并且可以通过zookeeper:2181来访问。
如果你的ZooKeeper容器有一个不同的名字或者你使用的是不同的网络设置,需要相应地调整--link和KAFKA_ZOOKEEPER_CONNECT的值
2.5.2 验证kafka 容器已启动
# docker ps # 查询 已启动 docker container
#docker ps -a # 查询 所有 docker container
[root@localhost kafka]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 27 seconds ago Up 26 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost kafka]#
[root@localhost kafka]#
[root@localhost kafka]# docker ps -a # 查询所有 docker container
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 41 seconds ago Up 40 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost kafka]#
2.6 创建测试主题
2.6.1 进入kafka容器
# 进入kafka容器
# docker exec -it kafka /bin/bash
[root@localhost kafka]#
[root@localhost kafka]# docker exec -it kafka /bin/bash
root@b03ba55d79cb:/#
root@b03ba55d79cb:/#
2.6.2 创建topic
# 在Kafka容器中,运行以下命令创建一个测试主题
# 进入kafka 容器后,创建topic
# kafka-topics.sh --create --topic <topic_name> --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181
[root@localhost kafka]# docker exec -it kafka /bin/bash # 进入kafka 容器
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# kafka-topics.sh --create --topic test1221 --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181 # 创建 topic
Created topic test1221.
root@b03ba55d79cb:/#
注意: 如果topic 包含 . 或者 _ 时,在执行创建topic过程中,会出现一个警告信息,提示主题名称中句点和下划线的使用限制。最后,命令成功执行并显示创建主题的结果
root@b03ba55d79cb:/# kafka-topics.sh --create --topic "alarm_warning" --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic alarm_warning.
root@b03ba55d79cb:/#
注意:
在执行命令之后,出现了一个警告信息,提示由于指标名称的限制,主题名称中的句点('.')或下划线('_')可能会发生冲突。为了避免问题,最好只使用其中一种符号,而不是同时使用
2.6.3 查询已创建的topic
# 查询已创建的所有topic
[root@localhost grafana_loki_vector]# docker exec -it kafka /bin/bash # 进入容器
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# kafka-topics.sh --list --zookeeper zookeeper:2181 # 查询topic
__consumer_offsets
alarm_warning
mag_test
test
test1221
test2013
test2023
test20231221
root@b03ba55d79cb:/#
2.6.4 在创建的主题中生产消息
# 在创建的主题中,生产kafka 消息
# kafka-console-producer.sh --broker-list localhost:9092 --topic <主题名>
[root@localhost kafka]# docker exec -it kafka /bin/bash
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# kafka-topics.sh --create --topic test1221 --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181
Created topic test1221.
root@b03ba55d79cb:/#
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# kafka-console-producer.sh --broker-list localhost:9092 --topic test1221
>hell ka^H^H
>hello kafka-122^H^H
>
>hello kafka-20131221
>
>topci test2023? y/n
>topci test1221!
>e\^H
>e
>
>^Croot@b03ba55d79cb:/# #通过 Ctrl + C 退出kafka 生产者
root@b03ba55d79cb:/#
2.6.5 kafka 消费者消费消息
在另一个终端窗口中后,操作如下:
1)需要先进入kakfa 容器
# docker exec -it kafka /bin/bash
2)打开一个消费者来读取测试主题的消息
#kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <主题名> --from-beginning
--from-beginning 为可选参数: 每次消费 该主题所有消息
不带 --from-beginning 参数: 每次仅仅消费启动kafka 消费者后,该主题最新的消息
ex: kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <主题名>
注意: kafka 消费者消费消息之前要先进入 kafka 容器
======================================================================================
[root@localhost ~]#
[root@localhost ~]# docker exec -it kafka /bin/bash #进入kafka 容器
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1221 --from-beginning
hell ka
hello kafka-122
hello kafka-20131221
topci test2023? y/n
topci test1221!
e\
e
^CProcessed a total of 11 messages #通过 Ctrl + C 退出kafka 消费者
root@b03ba55d79cb:/#
到此,基本完成使用Docker运行ZooKeeper和Kafka,并进行基本验证的过程。
3 安装完kafka 容器后,修改docker 中kafka 容器配置文件
3.1 进入kafka 容器
# 进入kafka 容器
# docker exec -it <container_name_or_id> /bin/bash
[root@localhost ~]# docker exec -it kafka /bin/bash
root@b03ba55d79cb:/#
3.2 修改配置文件
3.2.1 安装 vim
# 在一些Docker镜像中,可能没有预安装vi编辑器。你可以使用其他可用的编辑器来修改Kafka配置文件
# apt-get update
# apt-get install vim
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# vi /opt/kafka/config/server.properties
bash: vi: command not found
root@b03ba55d79cb:/# vim /opt/kafka/config/server.properties
bash: vim: command not found
root@b03ba55d79cb:/#
# vim: 一种功能丰富的终端文本编辑器。可以使用以下命令安装并使用vim
root@b03ba55d79cb:/# apt-get update
Get:1 http://deb.debian.org/debian bullseye InRelease [116 kB]
Get:2 http://security.debian.org/debian-security bullseye-security InRelease [48.4 kB]
Get:3 https://download.docker.com/linux/debian bullseye InRelease [43.3 kB]
Get:4 http://deb.debian.org/debian bullseye-updates InRelease [44.1 kB]
Get:5 http://security.debian.org/debian-security bullseye-security/main amd64 Packages [261 kB]
Get:6 https://download.docker.com/linux/debian bullseye/stable amd64 Packages [28.1 kB]
Get:7 http://deb.debian.org/debian bullseye/main amd64 Packages [8062 kB]
Get:8 http://deb.debian.org/debian bullseye-updates/main amd64 Packages [17.7 kB]
Fetched 8621 kB in 1min 57s (74.0 kB/s)
Reading package lists... Done
root@b03ba55d79cb:/#
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# apt-get install vim
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
libgpm2 vim-common vim-runtime xxd
Suggested packages:
gpm ctags vim-doc vim-scripts
The following NEW packages will be installed:
libgpm2 vim vim-common vim-runtime xxd
0 upgraded, 5 newly installed, 0 to remove and 32 not upgraded.
Need to get 8174 kB of archives.
After this operation, 36.9 MB of additional disk space will be used.
Do you want to continue? [Y/n] y
Get:1 http://deb.debian.org/debian bullseye/main amd64 xxd amd64 2:8.2.2434-3+deb11u1 [192 kB]
Get:2 http://deb.debian.org/debian bullseye/main amd64 vim-common all 2:8.2.2434-3+deb11u1 [226 kB]
Get:3 http://deb.debian.org/debian bullseye/main amd64 libgpm2 amd64 1.20.7-8 [35.6 kB]
Get:4 http://deb.debian.org/debian bullseye/main amd64 vim-runtime all 2:8.2.2434-3+deb11u1 [6226 kB]
......
......
......
3.2.2 修改配置文件
# 修改配置文件以下字段内容:
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your_ip_address:9092
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# vim /opt/kafka/config/server.properties
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# cat /opt/kafka/config/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=-1
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# listeners=PLAINTEXT://0.0.0.0:9092
listeners=PLAINTEXT://192.168.2.247:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://192.168.2.247:9092 #//your_ip_address:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/kafka/kafka-logs-b03ba55d79cb
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zookeeper:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
port=9092
root@b03ba55d79cb:/#
注意:修改kafka 配置文件后,重启kafka 服务即可生效。
若重启kafka 容器(比如:docker start/restart <kafka container_ID or container_Name>)会自动调用生成kafka 容器时命令(docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka )
故,若第一次创建 容器时配置写错,即便进入容器修改配置文件,只要执行 docker start / restart <container_ID or container_Name> 的命令,容器配置文件会再次恢复,修改配置不生效。
以上有效解决放法,删除容器,重新创建,具体操作 3.3 修改配置,重启重建容器
重启容器中kafka 服务,配置更改生效,为避免已后麻烦,最好直接删除容器,修改配置参数,重建容器
以上有效解决放法,删除容器,重新创建,具体操作 3.3 修改配置,重启重建容器
3.3 重启重建容器
3.3.1 停止运行的kafka 容器
# 修改配置文件后,重启kafka 容器后,再次查看配置文件,修改没有保存,为此,删除kafka 缓存
1. 停止运行的kafka 容器
2. 删除 kafka 容器
3. 重建新kafka 容器
[root@localhost ~]# docker ps # 查询运行的kafka 容器
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 31 hours ago Up 5 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
[root@localhost ~]#
[root@localhost ~]# docker stop kafka # 停止 kafka 容器
kafka
[root@localhost ~]#
[root@localhost ~]#
备注:
重启容器命令: docker restart <container_id or container_name >
# docker restart <container_id>
[root@localhost ~]#
[root@localhost ~]# docker ps # 重启前
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 29 hours ago Up 28 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
[root@localhost ~]#
[root@localhost ~]# docker restart b03ba55d79cb # 重启kafka 容器
b03ba55d79cb
[root@localhost ~]#
[root@localhost ~]#
[root@localhost ~]# docker ps # 重启后
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b03ba55d79cb wurstmeister/kafka "start-kafka.sh" 30 hours ago Up 3 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost ~]#
3.3.2 删除kafka 容器
[root@localhost ~]#
[root@localhost ~]# docker rm kafka # 删除 kafka 容器
kafka
[root@localhost ~]#
[root@localhost ~]# docker ps -a # 查询所有容器
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost ~]#
3.3.3 删除Kafka数据目录
删除Kafka数据目录:Kafka存储数据的目录通常位于容器内部的/var/lib/kafka路径。删除该目录以清除Kafka的数据缓存。注意,删除数据目录将导致所有Kafka数据的丢失,包括主题、消费者偏移量等
[root@localhost ~]#
[root@localhost ~]# rm -rf /var/lib/kafka # 删除Kafka数据目录
[root@localhost ~]#
[root@localhost ~]#
3.3.4 重建kafk 容器
# 重建kafk 容器
[root@localhost ~]#
[root@localhost ~]# docker run -d --name kafka -v /etc/localtime:/etc/localtime:ro -p 9092:9092 --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.247:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
563f64beaba4621a714fc44d6a4f81f9464fab682330eddb51a737bdeb001934
[root@localhost ~]#
[root@localhost ~]#
[root@localhost ~]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
563f64beaba4 wurstmeister/kafka "start-kafka.sh" 8 seconds ago Up 7 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 weeks ago Up 2 weeks 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 2 weeks ago Exited (0) 2 weeks ago infallible_rosalind
[root@localhost ~]#
4. 安装完kafka 容器后,设置容器与主机时间保持一致
[root@localhost ~]# date # 设置之前,主机时间
Fri Dec 22 17:06:37 CST 2023
[root@localhost ~]#
[root@localhost ~]#
[root@localhost ~]# docker exec -it kafka /bin/bash # 进入kafka 容器
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# date # kafka container 时间
Fri Dec 22 09:06:50 UTC 2023
root@b03ba55d79cb:/#
# kafka容器内部,可以使用以下命令来设置与主机系统时间同步
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# date # 设置之后,kafka container 时间
Fri Dec 22 17:08:13 CST 2023
root@b03ba55d79cb:/#
root@b03ba55d79cb:/# exit
exit
[root@localhost ~]#
[root@localhost ~]# date # 设置之后,主机时间
Fri Dec 22 17:08:18 CST 2023
[root@localhost ~]#
注意:
为了避免后期设置时间同步问题,在创建 容器时添加 参数 保证容器与主机时间一致
-v /etc/localtime:/etc/localtime:ro
ro选项将/etc/localtime文件挂载为只读模式,以防止容器内部意外修改主机系统时间
至此,已结束kafka 部署
5. 远程连接kafka 容器失败原因定位
因远程连接kafka 容器失败,曾多次频繁删除kafka 容器,导致kafka 容器内 生产者生产的消息,消费者接收不到。
5.1 删除zookeeper 容器
[root@localhost kafka]# docker stop zookeeper
zookeeper
[root@localhost kafka]#
[root@localhost kafka]#
[root@localhost kafka]#
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8dbbc5f4768e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 3 weeks ago Exited (137) 6 seconds ago zookeeper
b38e9b5b6a2e hello-world "/hello" 3 weeks ago Exited (0) 3 weeks ago infallible_rosalind
[root@localhost kafka]#
[root@localhost kafka]#
[root@localhost kafka]#
[root@localhost kafka]# docker rm zookeeper
zookeeper
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b38e9b5b6a2e hello-world "/hello" 3 weeks ago Exited (0) 3 weeks ago infallible_rosalind
[root@localhost kafka]#
[root@localhost kafka]#
5.2 删除kafka 容器
5.3 重新创建 zookeeper 容器
# 创建 zookeeper 容器
[root@localhost kafka]# docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime -e TZ=Asia/Shanghai wurstmeister/zookeeper
a893cce0d4652933da78da1da3a64ade5e530d42fc4806eb5c8448a13305867f
[root@localhost kafka]#
[root@localhost kafka]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a893cce0d465 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 4 seconds ago Up 3 seconds 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost kafka]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a893cce0d465 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 13 seconds ago Up 12 seconds 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
b38e9b5b6a2e hello-world "/hello" 3 weeks ago Exited (0) 3 weeks ago infallible_rosalind
[root@localhost kafka]#
[root@localhost kafka]#
[root@localhost kafka]#
[root@localhost kafka]# docker ps # 查询启动容器
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a893cce0d465 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 18 seconds ago Up 17 seconds 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost kafka]#
5.4 创建 kafka 容器
#创建 kafka 容器
[root@localhost kafka]#
[root@localhost kafka]# docker run -d --name kafka -v /etc/localtime:/etc/localtime:ro -p 9092:9092 -v /home/magx/Docker-Compose-Master/kafka/consumer.properties:/opt/kafka/config/consumer.properties -e TZ=Asia/Shanghai --link zookeeper:zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.247:9092 --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
c4f7b69648a8e6fe6e3916d14c75c2a9d402d07bbff8e8eb53910834000647e6
[root@localhost kafka]#
[root@localhost kafka]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
c4f7b69648a8 wurstmeister/kafka "start-kafka.sh" 4 seconds ago Up 3 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
a893cce0d465 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" About a minute ago Up About a minute 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
[root@localhost kafka]#
[root@localhost kafka]# docker exec -it kafka /bin/bash
root@c4f7b69648a8:/#
root@c4f7b69648a8:/#
root@c4f7b69648a8:/#
root@c4f7b69648a8:/#
root@c4f7b69648a8:/#
root@c4f7b69648a8:/# kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
alarm_warning
root@c4f7b69648a8:/#
root@c4f7b69648a8:/#
5.5 创建 topic
# 创建topic
root@c4f7b69648a8:/#
root@c4f7b69648a8:/# kafka-topics.sh --create --topic kafka_consumer --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic kafka_consumer.
root@c4f7b69648a8:/#
root@c4f7b69648a8:/# kafka-topics.sh --create --topic test1 --partitions 1 --replication-factor 1 --zookeeper zookeeper:2181
Created topic test1.
root@c4f7b69648a8:/#
root@c4f7b69648a8:/#
root@c4f7b69648a8:/# kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
alarm_warning
kafka_consumer
test1
root@c4f7b69648a8:/#
root@c4f7b69648a8:/#
5.6 远程连接kafka 容器
# 远程连接配置kafka 容器所在 主机IP:9092
# kafka 消费者消费消息
# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic_name>
[root@localhost kafka]# docker exec -it kafka /bin/bash # 进入容器
root@c4f7b69648a8:/#
root@c4f7b69648a8:/# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic alarm_warning
{"currentIndex":0,"header":{"headerMap":{"35":"381166"}},"mapList":[{"10007":"1"},{"4322 0":"金额:[3000000,5205900],数量:[300000,80000],价格涨跌幅:[0.02,0.0398387],市场成交量占 比:[0.3,0.444444],区间内最低买入成交价: 63.53,昨收盘价: 62,下单数量: 50000,当前交易价格: 66,挂单数量: 0,挂单金额: 0,成交数量: 30000,成交金额: 1905900,市场成交数量: 130000","660 05":"2","28005":"2","66004":"0","44":"660000","38":"50000","28000":"20231225","28001":"1 703496682550","11001":"四川路桥","64103":"2408-4096个账户-4096个证券","93020":"1","64101 ":"2410","28101":"1565740725","91001":"830035","64102":"1101","11436":"20231225","37":"2 ","48":"600038","65098":"1","1301":"1","66003":"1","66002":"5"}]}
版权归原作者 芝麻馅汤圆儿 所有, 如有侵权,请联系我们删除。