

A Quick Guide to Operating Kafka on Ubuntu

Starting the Kafka service

**1. Start the ZooKeeper service first (from /usr/local/kafka):**

bin/zookeeper-server-start.sh config/zookeeper.properties

**2. Start the Kafka service:**

bin/kafka-server-start.sh config/server.properties

Message routing

Upstream: from the central controller to Unreal Engine 4.

Downstream: from Unreal Engine 4 to the central controller.

Message topics

**CC2UE:** topic for upstream messages.

**UE2CC:** topic for downstream messages.

Topic operations

1. Create topics

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic CC2UE
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic UE2CC

2. Delete topics

bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic CC2UE
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic UE2CC

3. List all topics

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

Client testing

Producer test

Start a Producer session, then type messages to send them.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
ubuntu@ubuntu:/usr/local/kafka/kafka_2.12-3.0.0$ bin/kafka-console-producer.sh --broker-list 192.168.165.242:9092 --topic test
>hello
>world
>

Consumer test

Start a Consumer session:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
ubuntu@ubuntu:/usr/local/kafka/kafka_2.12-3.0.0$ bin/kafka-console-consumer.sh --bootstrap-server ubuntu:9092 --topic test --from-beginning
hello
world

Configuring the message lifecycle

For details on message retention time (the message lifecycle), see the "Kafka message retention mechanism" section below.

Setting the message retention time

Edit the **Log Retention Policy** section of the **/usr/local/kafka/kafka_2.12-3.0.0/config/server.properties** configuration file and modify the following two parameters:

log.retention.ms
Message retention time, in milliseconds.

log.retention.check.interval.ms
Polling interval at which the cleaner checks whether log segments meet the deletion criteria, in milliseconds.
Set here to 10000 (10 seconds).

Excerpt from the file:

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
#log.retention.hours=168
log.retention.ms=5

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
#log.retention.check.interval.ms=300000
log.retention.check.interval.ms=10000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
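The retention settings are plain millisecond counts, which are easy to get wrong by a factor of 1000. A quick sanity check with shell arithmetic (no Kafka involved) converts human-friendly durations into the values these settings expect:

```shell
# Convert human-friendly durations to the millisecond values that the
# log.retention.ms / log.retention.check.interval.ms settings expect.
echo $((10 * 1000))                  # 10 seconds -> 10000
echo $((7 * 24 * 60 * 60 * 1000))    # 7 days     -> 604800000
```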

Kafka message retention mechanism

How it works

Messages a producer sends to a broker are stored locally in files such as logs/__consumer_offsets-xx/00000000000000000000.log. By default these files are not kept forever: once the retention time or size limit is exceeded, Kafka deletes them.

First, Kafka evaluates each segment against the log.retention conditions, deciding segment by segment whether it is eligible for deletion. If a segment meets the conditions, it is marked deletable, and the current cleaning position is recorded in the checkpoint file cleaner-offset-checkpoint.

The cleanup itself is performed by the LogCleaner component, which writes null into the log file, leaving the file emptied. Note: this could be expanded further.

Retention parameters

log.retention.bytes
Delete the file once it grows larger than this value.

log.retention.hours
Delete the file once it has been retained longer than this many hours.
The number of hours to keep a log file before deleting it (in hours), tertiary to the log.retention.ms property.
Unit: hours. Default: 168 (7 days).

log.retention.minutes
Delete the file once it has been retained longer than this many minutes.

log.retention.ms
Delete the file once it has been retained longer than this many milliseconds.

log.retention.check.interval.ms
Polling interval at which the cleaner checks whether log segments meet the deletion criteria.
Unit: milliseconds. Default: 300000 (5 minutes).

log.segment.bytes=1024
The size of each log segment; once a segment exceeds this size, a new one is created. Not the focus of this experiment.

Relationships between the parameters

Precedence of the time parameters

ms > minutes > hours

When both time and size parameters are set, whichever condition is met first triggers the deletion.

Example:

log.retention.hours=168
log.retention.bytes=1024

Once the file reaches 1024 bytes, it is deleted even if 168 hours have not yet elapsed.
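The ms > minutes > hours precedence can be sketched as a small shell helper (hypothetical, for illustration only; Kafka resolves this internally): if log.retention.ms is set it wins, otherwise minutes, otherwise hours.

```shell
# Hypothetical helper mirroring Kafka's precedence: ms > minutes > hours.
# Arguments: <ms> <minutes> <hours>; an empty argument means "not configured".
effective_retention_ms() {
  if [ -n "$1" ]; then
    echo "$1"                        # ms wins outright
  elif [ -n "$2" ]; then
    echo $(($2 * 60 * 1000))         # minutes override hours
  else
    echo $(($3 * 60 * 60 * 1000))    # fall back to hours
  fi
}

effective_retention_ms "" "" 168    # only hours set    -> 604800000
effective_retention_ms "" 10 168    # minutes set too   -> 600000
effective_retention_ms 5  10 168    # ms set            -> 5
```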

Experiment

log.retention.hours=168
log.retention.bytes=1024
log.segment.bytes=1024
log.retention.check.interval.ms=10000

The segment size exceeded log.segment.bytes=1024, so a new log segment was created.

Once the retention time exceeds 7 days or the size exceeds 1024 bytes, the segment is detected as eligible for deletion.
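The size condition can be checked locally the same way (a sketch only; the 1024-byte threshold mirrors log.retention.bytes from the experiment, and the temp file stands in for a segment):

```shell
# Sketch of the size-based check: a stand-in segment larger than the
# threshold is flagged, mirroring log.retention.bytes=1024.
threshold=1024
f=$(mktemp)
head -c 2048 /dev/zero > "$f"       # create a 2048-byte stand-in segment
size=$(stat -c %s "$f")
if [ "$size" -gt "$threshold" ]; then
  echo "eligible for deletion"
fi
rm -f "$f"
```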

Kafka visualization

There are many Kafka visualization tools; common ones include Kafka Monitor, Kafka Manager (CMAK), and Kafka Eagle. Below we cover deploying and configuring Kafka Manager. For CMAK, see the official GitHub page:

https://github.com/yahoo/CMAK

Preparation

Note: starting with version 3.0.0.2, CMAK is built with Java 11, so the Java runtime used to launch it must also be Java 11; otherwise it fails with a version-mismatch error. My server was running JDK 1.8.

Check the Java version:

ubuntu@ubuntu:~$ java -version
openjdk version "1.8.0_342"
OpenJDK Runtime Environment (build 1.8.0_342-8u342-b07-0ubuntu1~20.04-b07)
OpenJDK 64-Bit Server VM (build 25.342-b07, mixed mode)
ubuntu@ubuntu:~$ 

Upgrading the JDK

Upgrade from Java 8 to Java 11.

Installing JDK 11 on Ubuntu

1. Search for the package online

apt-cache search java11

2. Install the chosen JDK version

sudo apt-get install openjdk-11-jdk
root@ubuntu:/home/ubuntu# apt-get install openjdk-11-jdk
Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following additional packages will be installed:
  openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
Suggested packages:
  openjdk-11-demo openjdk-11-source visualvm fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei | fonts-wqy-zenhei
The following NEW packages will be installed:
  openjdk-11-jdk openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
0 upgraded, 4 newly installed, 0 to remove and 538 not upgraded.
Need to get 263 MB of archives.
After this operation, 407 MB of additional disk space will be used.
Do you want to continue? [Y/n] Y
Get:1 http://cn.archive.ubuntu.com/ubuntu focal-updates/main amd64 openjdk-11-jre-headless amd64 11.0.16+8-0ubuntu1~20.04 [37.4 MB]1% [1 openjdk-11-jre-headless 1,944 kB/37.4 MB 5%]

3. Check the installed version

root@ubuntu:/home/ubuntu# java -version
openjdk version "11.0.16" 2022-07-19
OpenJDK Runtime Environment (build 11.0.16+8-post-Ubuntu-0ubuntu120.04)
OpenJDK 64-Bit Server VM (build 11.0.16+8-post-Ubuntu-0ubuntu120.04, mixed mode, sharing)
root@ubuntu:/home/ubuntu#

Deploying CMAK

1. Download CMAK

You can get the latest CMAK release from the project page:

https://github.com/yahoo/CMAK/tree/master

Or download it directly:

wget https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip

At the time of download the latest CMAK version was already 3.0.0.6; I downloaded 3.0.0.5.

ubuntu@ubuntu:~$ wget https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
--2022-09-08 10:04:21--  https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.

2. Unzip

unzip cmak-3.0.0.5.zip
sudo mv cmak-3.0.0.5 /usr/local/kafka/

3. Configure CMAK

Edit the /usr/local/kafka/cmak-3.0.0.5/conf/application.conf configuration file:

root@ubuntu:/usr/local/kafka/cmak-3.0.0.5/conf# cat application.conf
# Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
# See accompanying LICENSE file.

# This is the main configuration file for the application.
# ~~~~~
# Secret key
# ~~~~~
# The secret key is used to secure cryptographics functions.
# If you deploy your application to several instances be sure to use the same key!
play.crypto.secret="^<csmm5Fx4d=r2HEX8pelM3iBkFVv?k[mc;IZE<_Qoq8EkX_/7@Zt6dP05Pzea3U"
play.crypto.secret=${?APPLICATION_SECRET}
play.http.session.maxAge="1h"

# The application languages
# ~~~~~
play.i18n.langs=["en"]

play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader

# Settings prefixed with 'kafka-manager.' will be deprecated, use 'cmak.' instead.
# https://github.com/yahoo/CMAK/issues/713
#kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
#kafka-manager.zkhosts=${?ZK_HOSTS}
cmak.zkhosts="localhost:2181"        ### set the ZooKeeper address here
cmak.zkhosts=${?ZK_HOSTS}

pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature", "KMScheduleLeaderElectionFeature"]

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
}

akka.logger-startup-timeout = 60s

basicAuthentication.enabled=true        ### logging in to the web UI requires an account
basicAuthentication.enabled=${?KAFKA_MANAGER_AUTH_ENABLED}
basicAuthentication.ldap.enabled=false
basicAuthentication.ldap.enabled=${?KAFKA_MANAGER_LDAP_ENABLED}
basicAuthentication.ldap.server=""
basicAuthentication.ldap.server=${?KAFKA_MANAGER_LDAP_SERVER}
basicAuthentication.ldap.port=389
basicAuthentication.ldap.port=${?KAFKA_MANAGER_LDAP_PORT}
basicAuthentication.ldap.username=""
basicAuthentication.ldap.username=${?KAFKA_MANAGER_LDAP_USERNAME}
basicAuthentication.ldap.password=""
basicAuthentication.ldap.password=${?KAFKA_MANAGER_LDAP_PASSWORD}
basicAuthentication.ldap.search-base-dn=""
basicAuthentication.ldap.search-base-dn=${?KAFKA_MANAGER_LDAP_SEARCH_BASE_DN}
basicAuthentication.ldap.search-filter="(uid=$capturedLogin$)"
basicAuthentication.ldap.search-filter=${?KAFKA_MANAGER_LDAP_SEARCH_FILTER}
basicAuthentication.ldap.group-filter=""
basicAuthentication.ldap.group-filter=${?KAFKA_MANAGER_LDAP_GROUP_FILTER}
basicAuthentication.ldap.connection-pool-size=10
basicAuthentication.ldap.connection-pool-size=${?KAFKA_MANAGER_LDAP_CONNECTION_POOL_SIZE}
basicAuthentication.ldap.ssl=false
basicAuthentication.ldap.ssl=${?KAFKA_MANAGER_LDAP_SSL}
basicAuthentication.ldap.ssl-trust-all=false
basicAuthentication.ldap.ssl-trust-all=${?KAFKA_MANAGER_LDAP_SSL_TRUST_ALL}
basicAuthentication.username="admin"        ### login username
basicAuthentication.username=${?KAFKA_MANAGER_USERNAME}
basicAuthentication.password="password"        ### login password
basicAuthentication.password=${?KAFKA_MANAGER_PASSWORD}
basicAuthentication.realm="Kafka-Manager"
basicAuthentication.excluded=["/api/health"] # ping the health of your instance without authentification

kafka-manager.consumer.properties.file=${?CONSUMER_PROPERTIES_FILE}
root@ubuntu:/usr/local/kafka/cmak-3.0.0.5/conf#

4. Start the CMAK service

Change into the /usr/local/kafka/cmak-3.0.0.5 directory and start CMAK:

bin/cmak

5. Changing the port

CMAK listens on port 9000 by default. To change the port, run:

$ bin/cmak -Dconfig.file=/path/to/application.conf -Dhttp.port=8080

6. Logging in via the web

Open http://192.168.165.241:9000 in a browser and enter the username and password:

admin
password

7. Create resources through the web UI

There is no longer any need to create resources from the terminal.

Troubleshooting

Changing the IP addresses of a Kafka cluster

During routine maintenance it is sometimes necessary to change the IP addresses of the Kafka cluster. When this happens, the Kafka client in the central control software throws errors and cannot connect to the cluster.

To keep clients running normally after the cluster's IP addresses change, the hosts file on each client node must be updated at the same time: the hostname-to-IP mappings in C:\Windows\System32\drivers\etc\hosts must stay consistent with the Kafka cluster's hostnames and new IP addresses, otherwise connections fail.
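For example, if the broker is advertised under the hostname ubuntu (the hostname used in the consumer test earlier) and the cluster moves to a new address, the client's hosts entry must be updated to match (the IP below is illustrative):

```
# C:\Windows\System32\drivers\etc\hosts — example entry (illustrative IP)
192.168.165.242   ubuntu
```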


This article is reproduced from: https://blog.csdn.net/mageriletu2012/article/details/140228899
Copyright belongs to the original author, 码村长. In case of infringement, please contact us for removal.
