0


用好 kafka,你不得不知的那些工具

前言

工欲善其事,必先利其器。本文主要分享一下消息中间件 kafka 安装部署的过程,以及我平时在工作中针对 kafka 用的一些客户端工具和监控工具。

kafka 部署架构

一个 kafka 集群由多个

kafka broker

组成,每个

broker

将自己的元数据信息注册到

zookeeper

中,通过

zookeeper

关联形成一个集群。

prettyZoo 客户端

既然

kafka

依赖

zookeeper

,我难免就需要看看

zookeeper

中究竟存储了

kafka

的哪些数据,这边介绍一款高颜值的客户端工具

prettyZoo

PrettyZoo

是一款基于

Apache Curator

JavaFX

实现的

Zookeeper

图形化管理客户端,使用非常简单。

下载地址:

https://github.com/vran-dev/PrettyZoo
  • 连接

  • 界面化操作zookeeper

小 tips:

kafka

部署时配置文件中配置

zookeeper

地址的时候,可以采用如下的方式,带上目录,比如

xxxx:2181/kafka

或者

xxxx:2181/kafka1

,可以避免冲突。

#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

复制代码

kafka Tool 客户端

Kafka Tool

是一个用于管理和使用

Apache Kafka

集群的

GUI

应用程序。

Kafka Tool

提供了一个较为直观的 UI 可让用户快速查看

Kafka

集群中的对象以及存储在

topic

中的消息,提供了一些专门面向开发人员和管理员的功能。

下载地址:

https://www.kafkatool.com/index.html

kafka 监控工具

kafka

自身并没有继承监控管理系统,因此对

kafka

的监控管理比较不便,好在有大量的第三方监控管理系统来使用,这里介绍一款优秀的监控工具

Kafka Eagle

,可以用监控

Kafka

集群的整体运行情况。

下载地址

https://www.kafka-eagle.org/

,部署也很简单,根据官方文档一步一步来即可。

注意,kafka 需要开启 JMX 端口,即修改

kafka

的启动命令文件

kafka-server-start.sh

,如下图:

kafka 集群部署

一、zookeeper 集群部署

  1. 上传安装包
  2. 移动到指定文件夹
mv zookeeper-3.4.6.tar.gz /opt/apps/

复制代码

  1. 解压
tar -zxvf zookeeper-3.4.6.tar.gz

复制代码

  1. 修改配置文件
  • 进入配置文件目录
cd /opt/apps/zookeeper-3.4.6/conf

复制代码

  • 修改配置文件名称
mv zoo_sample.cfg zoo.cfg

复制代码

  • 编辑配置文件 vi zoo.cfg
## zk数据保存位置dataDir=/opt/apps/data/zkdata## 集群配置, hadoop1、hadoop2、hadoop3是主机名,后面是端口,没有被占用即可server.1=hadoop1:2888:3888 server.2=hadoop2:2888:3888 server.3=hadoop3:2888:3888

复制代码

  1. 创建数据目录
mkdir -p /opt/apps/data/zkdata

复制代码

  1. 生成一个 myid 文件,内容为它的id, 表示是哪个节点。
echo 1 > /opt/apps/data/zkdata/myid

复制代码

  1. 配置环境变量
vi /etc/profile #ZOOKEEPER_HOME export ZOOKEEPER_HOME=/opt/apps/zookeeper-3.4.6 export PATH=$PATH:$ZOOKEEPER_HOME/bin source /etc/profile

复制代码

  1. 在其他几个节点,即hadoop2, hadoop3上重复上面的步骤,但是 myid 文件的内容有所区别,分别是对应的 id。
echo 2 > /opt/apps/data/zkdata/myidecho 3 > /opt/apps/data/zkdata/myid

复制代码

  1. 启停集群
bin/zkServer.sh start zk 服务启动 bin/zkServer.sh status zk 查看服务状态bin/zkServer.sh stop zk 停止服务

复制代码

二、kafka 集群部署

  1. 官方下载地址:http://kafka.apache.org/downloads.html
  2. 上传安装包, 移动到指定文件夹
mv kafka_2.11-2.2.2.tgz /opt/apps/

复制代码

  1. 解压
tar -zxvf kafka_2.11-2.2.2.tgz

复制代码

  1. 修改配置文件
  • 进入配置文件目录
cd /opt/apps/kafka_2.11-2.2.2/config

复制代码

  • 编辑配置文件vi server.properties
#为依次增长的:0、1、2、3、4,集群中唯一 id broker.id=0 #数据存储的⽬录 log.dirs=/opt/apps/data/kafkadata #指定 zk 集群地址,注意这里加了一个目录 zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka

复制代码

其他的配置内容说明如下:

#broker 的全局唯一编号,不能重复,只能是数字。broker.id=0#处理网络请求的线程数量num.network.threads=3#用来处理磁盘 IO 的线程数量num.io.threads=8#发送套接字的缓冲区大小socket.send.buffer.bytes=102400#接收套接字的缓冲区大小socket.receive.buffer.bytes=102400#请求套接字的缓冲区大小socket.request.max.bytes=104857600#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔log.dirs=/opt/module/kafka/datas#topic 在当前 broker 上的分区个数num.partitions=1#用来恢复和清理 data 下数据的线程数量num.recovery.threads.per.data.dir=1# 每个 topic 创建时的副本数,默认时 1 个副本offsets.topic.replication.factor=1#segment 文件保留的最长时间,超时将被删除log.retention.hours=168#每个 segment 文件的大小,默认最大 1Glog.segment.bytes=1073741824# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期log.retention.check.interval.ms=300000#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

复制代码

  1. 配置环境变量
vi /etc/profile export KAFKA_HOME=/opt/apps/kafka_2.11-2.2.2 export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile

复制代码

  1. 在不同的节点上重复上面的步骤,但是需要修改配置文件server.properties中的broker.id
# broker.id标记是哪个kafka节点,不能重复broker.id=1 broker.id=2

复制代码

  1. 启停集群
# 启动集群bin/kafka-server-start.sh -daemon /opt/apps/kafka_2.11-2.2.2/config/server.properties # 停止集群 bin/kafka-server-stop.sh stop

复制代码

kafka 命令行工具

1. 主题命令行操作

  • 查看操作主题命令参数kafka-topics.sh

  • 查看当前服务器中的所有 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list

复制代码

  • 创建 first topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first

复制代码

选项说明:

--topic 定义 topic 名

--replication-factor 定义副本数

--partitions 定义分区数

  • 查看 first 主题的详情
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first

复制代码

  • 修改分区数(注意:分区数只能增加,不能减少
 bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

复制代码

  • 删除 topic
 bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first

复制代码

2. 生产者命令行操作

  • 查看操作生产者命令参数kafka-console-producer.sh

  • 发送消息
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first>hello world>xuyang hello

复制代码

3. 消费者命令行操作

  • 查看操作消费者命令参数kafka-console-consumer.sh

  • 消费消息
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

复制代码

  • 把主题中所有的数据都读取出来(包括历史数据)。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

复制代码

总结

本文分享了平时我在工作使用 kafka 以及 zookeeper 常用的一些工具,同时分享了 kafka 集群的部署,值得一提的是 kafka 部署配置 zookeeper 地址的时候,我们可以添加一个路径,比如

hadoop:2181/kafka

这种方式,那么 kafka 的元数据信息都会放到

/kafka

这个目录下,以防混淆。


本文转载自: https://blog.csdn.net/qq13321123/article/details/130880287
版权归原作者 进阶的架构师 所有, 如有侵权,请联系我们删除。

“用好 kafka,你不得不知的那些工具”的评论:

还没有评论