

A Brief Guide to Operating Kafka on Ubuntu

Starting the Kafka Services

**1. First, start the ZooKeeper service (from /usr/local/kafka):**

```
bin/zookeeper-server-start.sh config/zookeeper.properties
```

**2. Start the Kafka service:**

```
bin/kafka-server-start.sh config/server.properties
```
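Both scripts occupy the terminal while they run. The stock start scripts also accept a -daemon flag for running in the background (a convenience sketch, not part of the original steps):

```
# Start ZooKeeper and the Kafka broker as background daemons
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
```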

Message Routing

Uplink: from the central control software to Unreal Engine 4.

Downlink: from Unreal Engine 4 to the central control software.

Message Topics

**CC2UE:** the uplink message topic.

**UE2CC:** the downlink message topic.

Topic Operations

1. Create the topics:

```
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic CC2UE
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic UE2CC
```

2. Delete a topic:

```
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic CC2UE
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic UE2CC
```

3. List all topics:

```
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```
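After creating a topic, its partition count, replication factor, and leader placement can be checked with the same script's --describe action (shown here for the CC2UE topic by way of example):

```
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic CC2UE
```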

Client Testing

Producer Test

Open a producer (Producer) service, then type messages to send them:

```
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
```

```
ubuntu@ubuntu:/usr/local/kafka/kafka_2.12-3.0.0$ bin/kafka-console-producer.sh --broker-list 192.168.165.242:9092 --topic test
>hello
>world
>
```

Consumer Test

Open a consumer (Consumer) service:

```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
```

```
ubuntu@ubuntu:/usr/local/kafka/kafka_2.12-3.0.0$ bin/kafka-console-consumer.sh --bootstrap-server ubuntu:9092 --topic test --from-beginning
hello
world
```
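Beyond the basic test above, the console consumer can also join a consumer group so that offsets are committed and a restarted consumer resumes where it left off; a small sketch (the group name is illustrative):

```
# Consume as part of a named group; offsets are tracked per group
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic test --group console-test-group
```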

Message Lifecycle Settings

For details on the message retention time (the message lifecycle), see the "Kafka Message Retention Mechanism" section below.

Setting the Message Retention Time

Edit the **Log Retention Policy** section of the **/usr/local/kafka/kafka_2.12-3.0.0/config/server.properties** configuration file and change the following two parameters:

- log.retention.ms: how long messages are retained, in milliseconds.
- log.retention.check.interval.ms: how often the cleaner polls for logs that meet the deletion criteria, in milliseconds; here 10000 (10 seconds).

The relevant part of the file:

```
############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
#log.retention.hours=168
log.retention.ms=5

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
#log.retention.check.interval.ms=300000
log.retention.check.interval.ms=10000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
```
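Editing server.properties changes the broker-wide default and requires a restart. If only one topic needs a different retention, Kafka's kafka-configs.sh tool can set a per-topic override at runtime; a minimal sketch, reusing the CC2UE topic from earlier:

```
# Override retention for the CC2UE topic only (broker defaults stay untouched)
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name CC2UE \
  --add-config retention.ms=10000

# Verify the override
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe \
  --entity-type topics --entity-name CC2UE
```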

Kafka Message Retention Mechanism

How It Works

Messages that producers save to the broker are kept in local log files such as logs/__consumer_offsets-xx/00000000000000000000.log.

By default these files are not retained forever: once the retention time or size limit is exceeded, Kafka deletes them.

First, Kafka evaluates the log.retention conditions segment by segment to decide whether each segment is deletable.

Segments that meet the conditions are marked as deletable, and the current cleaning position is recorded in the cleaner-offset-checkpoint file.

The deletion itself is carried out by the LogCleaner component, which writes null into the log file, leaving it empty. (Note: this step could be expanded further.)
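To look inside a segment while studying this mechanism, the kafka-dump-log.sh tool that ships with Kafka can decode .log files; a minimal sketch (the segment path below is illustrative; the real one depends on log.dirs and the topic name):

```
# Print batch metadata and message payloads from one segment file
bin/kafka-dump-log.sh --print-data-log \
  --files /tmp/kafka-logs/CC2UE-0/00000000000000000000.log
```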

Retention Control Parameters

- log.retention.bytes: delete a log file once it grows beyond this size.
- log.retention.hours: delete files once this retention time is exceeded. "The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property." Unit: hours; default 168 (7 days).
- log.retention.minutes: delete files once this retention time is exceeded.
- log.retention.ms: delete files once this retention time is exceeded.
- log.retention.check.interval.ms: how often the cleaner polls for logs that meet the deletion criteria, in milliseconds; default 300000 (5 minutes).
- log.segment.bytes=1024: the maximum size of each log segment; once exceeded, a new segment is created. Not the focus of this experiment.

Relationships Between the Parameters

- Time parameter precedence: ms > minutes > hours.
- When both a time parameter and a size parameter are set, whichever condition is met first wins.
- Example:

```
log.retention.hours=168
log.retention.bytes=1024
```

Once a file reaches 1024 bytes it is deleted, even if 168 hours have not yet passed.

Experiment

```
log.retention.hours=168
log.retention.bytes=1024
log.segment.bytes=1024
log.retention.check.interval.ms=10000
```

The segment size exceeded log.segment.bytes=1024, so a new log segment was created.

Once a segment is older than 7 days or larger than 1024 bytes, it is detected as deletable.
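One way to observe this during the experiment is simply to poll the topic's log directory (the path is an assumption; the actual location is whatever log.dirs in server.properties points to):

```
# List segment files once per second: new segments appear when
# log.segment.bytes is exceeded, and deletable ones disappear after the
# next retention check (log.retention.check.interval.ms)
watch -n 1 'ls -l /tmp/kafka-logs/test-0/'
```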

Kafka Visualization

There are many Kafka visualization tools; common ones include Kafka Monitor, Kafka Manager (CMAK), and Kafka Eagle. The rest of this section covers deploying and configuring Kafka Manager. For CMAK, see the official GitHub page:

https://github.com/yahoo/CMAK

Preparation

Note: starting with version 3.0.0.2, CMAK is built with Java 11, so the Java environment used to launch it must also be Java 11; otherwise it fails with a version-mismatch error. The JDK on my server was 1.8.

Check the Java version:

```
ubuntu@ubuntu:~$ java -version
openjdk version "1.8.0_342"
OpenJDK Runtime Environment (build 1.8.0_342-8u342-b07-0ubuntu1~20.04-b07)
OpenJDK 64-Bit Server VM (build 25.342-b07, mixed mode)
ubuntu@ubuntu:~$
```

Upgrading the JDK

Upgrade Java 8 to Java 11.

Installing JDK 11 on Ubuntu

1. Search the online packages:

```
apt-cache search java11
```

2. Install the chosen JDK version:

```
sudo apt-get install openjdk-11-jdk
```

```
root@ubuntu:/home/ubuntu# apt-get install openjdk-11-jdk
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following additional packages will be installed:
  openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
Suggested packages:
  openjdk-11-demo openjdk-11-source visualvm fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei | fonts-wqy-zenhei
The following NEW packages will be installed:
  openjdk-11-jdk openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
0 upgraded, 4 newly installed, 0 to remove and 538 not upgraded.
Need to get 263 MB of archives.
After this operation, 407 MB of additional disk space will be used.
Do you want to continue? [Y/n] Y
Get:1 http://cn.archive.ubuntu.com/ubuntu focal-updates/main amd64 openjdk-11-jre-headless amd64 11.0.16+8-0ubuntu1~20.04 [37.4 MB]
1% [1 openjdk-11-jre-headless 1,944 kB/37.4 MB 5%]
```

3. Check the installed version:

```
root@ubuntu:/home/ubuntu# java -version
openjdk version "11.0.16" 2022-07-19
OpenJDK Runtime Environment (build 11.0.16+8-post-Ubuntu-0ubuntu120.04)
OpenJDK 64-Bit Server VM (build 11.0.16+8-post-Ubuntu-0ubuntu120.04, mixed mode, sharing)
root@ubuntu:/home/ubuntu#
```
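If Java 8 remains installed alongside Java 11, the system default can be switched with Ubuntu's standard update-alternatives mechanism (not shown in the original transcript):

```
# Pick the Java 11 entry from the interactive menu
sudo update-alternatives --config java
```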

Deploying CMAK

1. Download CMAK

You can get the latest CMAK release from the https://github.com/yahoo/CMAK/tree/master page (under Releases) and download it.

Or:

```
wget https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
```

At the time of my download the latest CMAK version was already 3.0.0.6; I downloaded 3.0.0.5.

```
ubuntu@ubuntu:~$ wget https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
--2022-09-08 10:04:21-- https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
```

2. Unzip and move it into place:

```
unzip cmak-3.0.0.5.zip
sudo mv cmak-3.0.0.5 /usr/local/kafka/
```

3. Configure CMAK

Edit the /usr/local/kafka/cmak-3.0.0.5/conf/application.conf configuration file:

```
root@ubuntu:/usr/local/kafka/cmak-3.0.0.5/conf# cat application.conf
# Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
# See accompanying LICENSE file.

# This is the main configuration file for the application.
# ~~~~~

# Secret key
# ~~~~~
# The secret key is used to secure cryptographics functions.
# If you deploy your application to several instances be sure to use the same key!
play.crypto.secret="^<csmm5Fx4d=r2HEX8pelM3iBkFVv?k[mc;IZE<_Qoq8EkX_/7@Zt6dP05Pzea3U"
play.crypto.secret=${?APPLICATION_SECRET}
play.http.session.maxAge="1h"

# The application languages
# ~~~~~
play.i18n.langs=["en"]

play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader

# Settings prefixed with 'kafka-manager.' will be deprecated, use 'cmak.' instead.
# https://github.com/yahoo/CMAK/issues/713
#kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
#kafka-manager.zkhosts=${?ZK_HOSTS}
cmak.zkhosts="localhost:2181"          ### set the ZooKeeper address here
cmak.zkhosts=${?ZK_HOSTS}

pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"

application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature", "KMScheduleLeaderElectionFeature"]

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
}

akka.logger-startup-timeout = 60s

basicAuthentication.enabled=true       ### logging in to the web UI requires an account
basicAuthentication.enabled=${?KAFKA_MANAGER_AUTH_ENABLED}

basicAuthentication.ldap.enabled=false
basicAuthentication.ldap.enabled=${?KAFKA_MANAGER_LDAP_ENABLED}
basicAuthentication.ldap.server=""
basicAuthentication.ldap.server=${?KAFKA_MANAGER_LDAP_SERVER}
basicAuthentication.ldap.port=389
basicAuthentication.ldap.port=${?KAFKA_MANAGER_LDAP_PORT}
basicAuthentication.ldap.username=""
basicAuthentication.ldap.username=${?KAFKA_MANAGER_LDAP_USERNAME}
basicAuthentication.ldap.password=""
basicAuthentication.ldap.password=${?KAFKA_MANAGER_LDAP_PASSWORD}
basicAuthentication.ldap.search-base-dn=""
basicAuthentication.ldap.search-base-dn=${?KAFKA_MANAGER_LDAP_SEARCH_BASE_DN}
basicAuthentication.ldap.search-filter="(uid=$capturedLogin$)"
basicAuthentication.ldap.search-filter=${?KAFKA_MANAGER_LDAP_SEARCH_FILTER}
basicAuthentication.ldap.group-filter=""
basicAuthentication.ldap.group-filter=${?KAFKA_MANAGER_LDAP_GROUP_FILTER}
basicAuthentication.ldap.connection-pool-size=10
basicAuthentication.ldap.connection-pool-size=${?KAFKA_MANAGER_LDAP_CONNECTION_POOL_SIZE}
basicAuthentication.ldap.ssl=false
basicAuthentication.ldap.ssl=${?KAFKA_MANAGER_LDAP_SSL}
basicAuthentication.ldap.ssl-trust-all=false
basicAuthentication.ldap.ssl-trust-all=${?KAFKA_MANAGER_LDAP_SSL_TRUST_ALL}

basicAuthentication.username="admin"       ### login username
basicAuthentication.username=${?KAFKA_MANAGER_USERNAME}
basicAuthentication.password="password"    ### login password
basicAuthentication.password=${?KAFKA_MANAGER_PASSWORD}

basicAuthentication.realm="Kafka-Manager"
basicAuthentication.excluded=["/api/health"] # ping the health of your instance without authentification

kafka-manager.consumer.properties.file=${?CONSUMER_PROPERTIES_FILE}
root@ubuntu:/usr/local/kafka/cmak-3.0.0.5/conf#
```

4. Start the CMAK service

Go into the /usr/local/kafka/cmak-3.0.0.5 directory and start CMAK:

```
bin/cmak
```
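Note that bin/cmak runs in the foreground and stops when the terminal closes; a simple way to keep it running in the background (my own suggestion, not part of the original steps):

```
# Run CMAK detached from the terminal and capture its output
nohup bin/cmak > cmak.out 2>&1 &
```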

5. Change the port

CMAK listens on port 9000 by default. To use a different port, run:

```
$ bin/cmak -Dconfig.file=/path/to/application.conf -Dhttp.port=8080
```

6. Log in to the web UI

Point a browser at http://192.168.165.241:9000 and enter the username and password configured above (admin / password).

7. Create resources through the web UI

Resources no longer need to be created from the terminal.

Troubleshooting

Changing the Kafka Cluster's IP Addresses

In day-to-day maintenance you sometimes need to change the IP addresses of the Kafka cluster. When that happens, the Kafka client in the central control software fails and can no longer connect to the cluster.

To keep clients working after the cluster's IP addresses change, the hosts file on every client node must be updated in step: the hostname-to-IP mappings in C:\Windows\System32\drivers\etc\hosts must stay consistent with the Kafka cluster's actual addresses, or connections will fail.
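For example, if the brokers advertise themselves by hostname, the client node's hosts file might look like the sketch below (hostnames and addresses are illustrative, not from the original):

```
# C:\Windows\System32\drivers\etc\hosts on the control-software node
# These entries must track the brokers' current IP addresses
192.168.165.241  kafka-node1
192.168.165.242  kafka-node2
192.168.165.243  kafka-node3
```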

Tags: ubuntu kafka

Reposted from: https://blog.csdn.net/mageriletu2012/article/details/140228899
Copyright belongs to the original author, 码村长. If there is any infringement, please contact us for removal.
