kafka服务启动
**1、先启动
ZooKeeper
服务(
/usr/local/kafka
):**
bin/zookeeper-server-start.sh config/zookeeper.properties
**2、启动
kafka
服务:**
bin/kafka-server-start.sh config/server.properties
消息路由
上行: 中控至虚幻4 。
下行: 虚幻4至中控 。
消息主题
**
CC2UE
:** 上行消息主题 。
**
UE2CC
:** 下行消息主题 。
主题操作
1、创建主题
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1--partitions1--topic CC2UE
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1--partitions1--topic UE2CC
2、删除主题
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic CC2UE
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic UE2CC
3、列出所有主题(查看所有主题)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
客户端测试
生产者测试
打开
Producer
(生产者)服务,然后键入(输入)发送信息即可。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topictest
ubuntu@ubuntu:/usr/local/kafka/kafka_2.12-3.0.0$
ubuntu@ubuntu:/usr/local/kafka/kafka_2.12-3.0.0$ bin/kafka-console-producer.sh --broker-list 192.168.165.242:9092 --topictest>hello
>world
>
消费者测试
打开
Customer
(消费者)服务:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topictest --from-beginning
ubuntu@ubuntu:/usr/local/kafka/kafka_2.12-3.0.0$
ubuntu@ubuntu:/usr/local/kafka/kafka_2.12-3.0.0$ bin/kafka-console-consumer.sh --bootstrap-server ubuntu:9092 --topictest --from-beginning
hello
world
消息生命周期设置
关于消息保留时长(消息生命周期)相关内容,请参考《kafka消息保留机制》章节。
设置消息保留时长
编辑 **
"/usr/local/kafka/kafka_2.12-3.0.0/config/server.properties"
** 配置文件的 **
Log Retention Policy
** 部分内容,修改如下两个参数:
log.retention.ms
消息保留时长,单位是:毫秒
log.retention.check.interval.ms
清理器检查日志符合被删除条件的轮询时间
单位是:毫秒
10000(10 秒)
部分截图:
############################ Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can# be set to delete segments after a period of time, or after a given size has accumulated.# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age#log.retention.hours=168log.retention.ms=5# A size-based retention policy for logs. Segments are pruned from the log unless the remaining# segments drop below log.retention.bytes. Functions independently of log.retention.hours.#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according# to the retention policies#log.retention.check.interval.ms=300000log.retention.check.interval.ms=10000############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).
kafka消息保留机制
运行机制
生产者保存到
broker
中的消息,会保存在本地的
logs/__consumer_offsets-xx/00000000000000000000.log
文件中。
默认情况,这些文件不会永久保留,当超过了保留时间或体积后,
kafka
会对这些文件进行删除。
首先,根据
log.retention
条件判断,以
segment
为单位,判断该
segment
是否为可删除。
如果满足条件,将其标记为可删除。并且在日志文件
cleaner-offset-checkpoint
中记录当前清理到的位置。
由组件
LogCleaner
实现,将
null
写入该
log
文件中,这样该文件就被置空了。注:此处可再展开。
时间控制参数
log.retention.bytes
当文件大于该值后,会删除该文件
log.retention.hours
当保留时间超过该时间后,删除文件
The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property
单位:小时
168(7天)
log.retention.minutes
当保留时间超过该时间后,删除文件。
log.retention.ms
当保留时间超过该时间后,删除文件。
log.retention.check.interval.ms
清理器检查日志符合被删除条件的轮询时间
单位是:毫秒
300000(5 minutes)
log.segment.bytes=1024
该值是每个日志片段的体积,如果超过该体积,就会新建一个日志。非本次实验重点。
参数间关系
时间参数优先级
ms>minutes>hours
当时间参数与空间参数都存在时,谁先满足,就执行谁。
例:
log.retentions.hours=168log.retentions.bytes=1024
当文件体积达到1024后,即便没有超过168小时,也会删除该文件。
实验
log.retention.hours=168log.retention.bytes=1024log.segment.bytes=1024log.retention.check.interval.ms=10000
分段体积超过了
log.segment.bytes
=1024,所以产生了新的日志。
时间超过7天或者体积超过1024 ,检测到可被删除的段。
kafka可视化
Kafka可视化工具有很多种,常见的有Kafka Monitor、Kafka Manager(CMAK)、Kafka Eagle等。下文就介绍Kafka Manager的部署和配置。CMAK参考github官网说明:
https://github.com/yahoo/CMAK
。
准备工作
注意: cmak从 3.0.0.2版本开始使用java 11编译,所以启动java环境也必须是java11的,要不然会报版本不适应错误。我的服务器上使用的jdk版本是1.8。
查看java版本:
ubuntu@ubuntu:~$ java-version
openjdk version "1.8.0_342"
OpenJDK Runtime Environment (build 1.8.0_342-8u342-b07-0ubuntu1~20.04-b07)
OpenJDK 64-Bit Server VM (build 25.342-b07, mixed mode)
ubuntu@ubuntu:~$
升级jdk
将 java8 升级到 java11。
ubuntu安装jdk11
1、直接在线安装
apt-cache search java11
2、选择安装的jdk版本
sudoapt-getinstall openjdk-11-jdk
oot@ubuntu:/home/ubuntu# apt-get install openjdk-11-jdk
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following additional packages will be installed:
openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
Suggested packages:
openjdk-11-demo openjdk-11-source visualvm fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei | fonts-wqy-zenhei
The following NEW packages will be installed:
openjdk-11-jdk openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
0 upgraded, 4 newly installed, 0 to remove and 538 not upgraded.
Need to get 263 MB of archives.
After this operation, 407 MB of additional disk space will be used.
Do you want to continue? [Y/n] Y
Get:1 http://cn.archive.ubuntu.com/ubuntu focal-updates/main amd64 openjdk-11-jre-headless amd64 11.0.16+8-0ubuntu1~20.04 [37.4 MB]1% [1 openjdk-11-jre-headless 1,944 kB/37.4 MB 5%]
3、查看安装后的版本
root@ubuntu:/home/ubuntu# java -version
openjdk version "11.0.16"2022-07-19
OpenJDK Runtime Environment (build 11.0.16+8-post-Ubuntu-0ubuntu120.04)
OpenJDK 64-Bit Server VM (build 11.0.16+8-post-Ubuntu-0ubuntu120.04, mixed mode, sharing)
root@ubuntu:/home/ubuntu#
部署CMAK
1、CMAK下载
可以从
https://github.com/yahoo/CMAK/tree/master
主页获取最新的
CMAK
Releases
版本,并下载。
或者
wget https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
我下载时cmak最新版本已经是3.0.0.6了,我下载的3.0.0.5版本。
ubuntu@ubuntu:~$ wget https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
--2022-09-08 10:04:21-- https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
2、解压
unzip cmak-3.0.0.5.zip
sudomv cmak-3.0.0.5 /usr/local/kafka/
3、配置CMAK
编辑
/usr/local/kafka/cmak-3.0.0.5/conf/application.conf
配置文件。
编辑
application.conf
文件:
root@ubuntu:/usr/local/kafka/cmak-3.0.0.5/conf# cat application.conf # Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0# See accompanying LICENSE file.# This is the main configuration file for the application.# ~~~~~# Secret key# ~~~~~# The secret key is used to secure cryptographics functions.# If you deploy your application to several instances be sure to use the same key!play.crypto.secret="^<csmm5Fx4d=r2HEX8pelM3iBkFVv?k[mc;IZE<_Qoq8EkX_/7@Zt6dP05Pzea3U"play.crypto.secret=${?APPLICATION_SECRET}play.http.session.maxAge="1h"# The application languages# ~~~~~play.i18n.langs=["en"]
play.http.requestHandler ="play.http.DefaultHttpRequestHandler"
play.http.context ="/"play.application.loader=loader.KafkaManagerLoader
# Settings prefixed with 'kafka-manager.' will be deprecated, use 'cmak.' instead.# https://github.com/yahoo/CMAK/issues/713#kafka-manager.zkhosts="kafka-manager-zookeeper:2181"#kafka-manager.zkhosts=${?ZK_HOSTS}cmak.zkhosts="localhost:2181"###填写zookeeper地址cmak.zkhosts=${?ZK_HOSTS}
pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature", "KMScheduleLeaderElectionFeature"]
akka {
loggers =["akka.event.slf4j.Slf4jLogger"]
loglevel ="INFO"}
akka.logger-startup-timeout = 60s
basicAuthentication.enabled=true ###登陆WEB UI需要账户登陆basicAuthentication.enabled=${?KAFKA_MANAGER_AUTH_ENABLED}basicAuthentication.ldap.enabled=false
basicAuthentication.ldap.enabled=${?KAFKA_MANAGER_LDAP_ENABLED}basicAuthentication.ldap.server=""basicAuthentication.ldap.server=${?KAFKA_MANAGER_LDAP_SERVER}basicAuthentication.ldap.port=389basicAuthentication.ldap.port=${?KAFKA_MANAGER_LDAP_PORT}basicAuthentication.ldap.username=""basicAuthentication.ldap.username=${?KAFKA_MANAGER_LDAP_USERNAME}basicAuthentication.ldap.password=""basicAuthentication.ldap.password=${?KAFKA_MANAGER_LDAP_PASSWORD}
basicAuthentication.ldap.search-base-dn=""
basicAuthentication.ldap.search-base-dn=${?KAFKA_MANAGER_LDAP_SEARCH_BASE_DN}
basicAuthentication.ldap.search-filter="(uid=$capturedLogin$)"
basicAuthentication.ldap.search-filter=${?KAFKA_MANAGER_LDAP_SEARCH_FILTER}
basicAuthentication.ldap.group-filter=""
basicAuthentication.ldap.group-filter=${?KAFKA_MANAGER_LDAP_GROUP_FILTER}
basicAuthentication.ldap.connection-pool-size=10
basicAuthentication.ldap.connection-pool-size=${?KAFKA_MANAGER_LDAP_CONNECTION_POOL_SIZE}basicAuthentication.ldap.ssl=false
basicAuthentication.ldap.ssl=${?KAFKA_MANAGER_LDAP_SSL}
basicAuthentication.ldap.ssl-trust-all=false
basicAuthentication.ldap.ssl-trust-all=${?KAFKA_MANAGER_LDAP_SSL_TRUST_ALL}basicAuthentication.username="admin"###登陆用户名basicAuthentication.username=${?KAFKA_MANAGER_USERNAME}basicAuthentication.password="password"###登陆密码basicAuthentication.password=${?KAFKA_MANAGER_PASSWORD}basicAuthentication.realm="Kafka-Manager"basicAuthentication.excluded=["/api/health"]# ping the health of your instance without authentification
kafka-manager.consumer.properties.file=${?CONSUMER_PROPERTIES_FILE}
root@ubuntu:/usr/local/kafka/cmak-3.0.0.5/conf#
4、启动CMAK服务
进入到
/usr/local/kafka/cmak-3.0.0.5
目录,启动
CMAK
。
bin/cmak
5、切换端口
CMAK默认的端口是9000,如果需要更换端口,则执行如下命令:
$ bin/cmak -Dconfig.file=/path/to/application.conf -Dhttp.port=8080
6、WEB登陆
浏览器输入
http://192.168.165.241:9000
输入用户名密码
admin
password
7、通过WEB UI创建相关资源
不需要再通过终端创建资源了。
故障诊断
kafka集群修改IP地址
在日常维护过程中,有时需要把
kafka
集群的IP地址进行修改,这个时候中控软件
kafka
客户端会异常,连接不上
kafka
集群。
为了保证修改
kafka
集群IP地址后,客户端正常运行,需要保证客户端所在节点
hosts
文件需要同步更新。
C:\Windows\System32\drivers\etc\hosts
文件中的节点域名映射需要与
kafka
集群域名IP地址保持一致,否则出错。
版权归原作者 码村长 所有, 如有侵权,请联系我们删除。