0


字节Java面试必问:真的,搞定kafka看这一篇就够了_工作1年的java不会kafka

Kafka 集群包含一个或多个服务器,每个 Kafka 中服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。

broker 是集群的组成部分,每个集群中都会有一个 broker 同时充当了

集群控制器(Leader)

的角色,它是由集群中的活跃成员选举出来的。每个集群中的成员都有可能充当 Leader,Leader 负责管理工作,包括将分区分配给 broker 和监控 broker。集群中,一个分区从属于一个 Leader,但是一个分区可以分配给多个 broker(非Leader),这时候会发生分区复制。这种复制的机制为分区提供了消息冗余,如果一个 broker 失效,那么其他活跃用户会重新选举一个 Leader 接管。

producer

生产者,即消息的发布者,其会将某 topic 的消息发布到相应的 partition 中。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。

consumer

消费者,即消息的使用者,一个消费者可以消费多个 topic 的消息,对于某一个 topic 的消息,其只会消费同一个 partition 中的消息

在了解完 Kafka 的基本概念之后,我们通过搭建 Kafka 集群来进一步深刻认识一下 Kafka。

确保安装环境

安装 Java 环境

在安装 Kafka 之前,先确保Linux 环境上是否有 Java 环境,使用

java -version

命令查看 Java 版本,推荐使用Jdk 1.8 ,如果没有安装 Java 环境的话,可以按照这篇文章进行安装(https://www.cnblogs.com/zs-notes/p/8535275.html)

安装 Zookeeper 环境

Kafka 的底层使用 Zookeeper 储存元数据,确保一致性,所以安装 Kafka 前需要先安装 Zookeeper,Kafka 的发行版自带了 Zookeeper ,可以直接使用脚本来启动,不过安装一个 Zookeeper 也不费劲

Zookeeper 单机搭建

Zookeeper 单机搭建比较简单,直接从 www.apache.org/dyn/closer.… 官网下载一个稳定版本的 Zookeeper ,这里我使用的是

3.4.10

,下载完成后,在 Linux 系统中的

/usr/local

目录下创建 zookeeper 文件夹,使用

xftp

工具(xftp 和 xshell 工具都可以在官网 www.netsarang.com/zh/xshell/ 申请免费的家庭版)把下载好的 zookeeper 压缩包放到 /usr/local/zookeeper 目录下。

如果下载的是一个 tar.gz 包的话,直接使用

tar -zxvf zookeeper-3.4.10.tar.gz

解压即可

如果下载的是 zip 包的话,还要检查一下 Linux 中是否有 unzip 工具,如果没有的话,使用

yum install unzip

安装 zip 解压工具,完成后使用

unzip zookeeper-3.4.10.zip

解压即可。

解压完成后,cd 到

/usr/local/zookeeper/zookeeper-3.4.10

,创建一个 data 文件夹,然后进入到 conf 文件夹下,使用

mv zoo_sample.cfg zoo.cfg

进行重命名操作

然后使用 vi 打开 zoo.cfg ,更改一下

dataDir = /usr/local/zookeeper/zookeeper-3.4.10/data

,保存。

进入bin目录,启动服务输入命令

./zkServer.sh start

输出下面内容表示搭建成功

关闭服务输入命令,

./zkServer.sh stop

使用

./zkServer.sh status

可以查看状态信息。

Zookeeper 集群搭建
准备条件

准备条件:需要三个服务器,这里我使用了CentOS7 并安装了三个虚拟机,并为各自的虚拟机分配了

1GB

的内存,在每个

/usr/local/

下面新建 zookeeper 文件夹,把 zookeeper 的压缩包挪过来,解压,完成后会有 zookeeper-3.4.10 文件夹,进入到文件夹,新建两个文件夹,分别是

data

log

文件夹

注:上一节单机搭建中已经创建了一个data 文件夹,就不需要重新创建了,直接新建一个 log 文件夹,对另外两个新增的服务需要新建这两个文件夹。

设置集群

新建完成后,需要编辑 conf/zoo.cfg 文件,三个文件的内容如下

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data
dataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/log
clientPort=12181
server.1=192.168.1.7:12888:13888
server.2=192.168.1.8:12888:13888
server.3=192.168.1.9:12888:13888

server.1 中的这个 1 表示的是服务器的标识也可以是其他数字,表示这是第几号服务器,这个标识要和下面我们配置的

myid

的标识一致可以。

192.168.1.7:12888:13888

为集群中的 ip 地址,第一个端口表示的是 master 与 slave 之间的通信接口,默认是 2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口,默认是 3888

现在对上面的配置文件进行解释

tickTime

: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间

维持心跳

的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

initLimit

:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒

syncLimit

: 这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒

dataDir

: 快照日志的存储路径

dataLogDir

: 事务日志的存储路径,如果不配置这个那么事务日志会默认存储到dataDir指定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事务日志、快照日志太多

clientPort

: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

创建 myid 文件

在了解完其配置文件后,现在来创建每个集群节点的 myid ,我们上面说过,这个 myid 就是

server.1

的这个 1 ,类似的,需要为集群中的每个服务都指定标识,使用

echo

命令进行创建

# server.1
echo "1" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.2
echo "2" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.3
echo "3" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
启动服务并测试

配置完成,为每个 zk 服务启动并测试,我在 windows 电脑的测试结果如下

启动服务(每台都需要执行)

cd /usr/local/zookeeper/zookeeper-3.4.10/bin
./zkServer.sh start

检查服务状态

使用

./zkServer.sh status

命令检查服务状态

192.168.1.7 — follower

192.168.1.8 — leader

192.168.1.9 — follower

zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。

Kafka 集群搭建

准备条件
  • 搭建好的 Zookeeper 集群
  • Kafka 压缩包

/usr/local

下新建

kafka

文件夹,然后把下载完成的 tar.gz 包移到 /usr/local/kafka 目录下,使用

tar -zxvf 压缩包

进行解压,解压完成后,进入到 kafka_2.12-2.3.0 目录下,新建 log 文件夹,进入到 config 目录下

我们可以看到有很多 properties 配置文件,这里主要关注

server.properties

这个文件即可。

kafka 启动方式有两种,一种是使用 kafka 自带的 zookeeper 配置文件来启动(可以按照官网来进行启动,并使用单个服务多个节点来模拟集群http://kafka.apache.org/quickstart#quickstart_multibroker),一种是通过使用独立的zk集群来启动,这里推荐使用第二种方式,使用 zk 集群来启动

修改配置项

需要为

每个服务

都修改一下配置项,也就是

server.properties

, 需要更新和添加的内容有

broker.id=0 //初始是0,每个 server 的broker.id 都应该设置为不一样的,就和 myid 一样 我的三个服务分别设置的是 1,2,3
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

配置项的含义

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.1.7 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的连接端口
启动 Kafka 集群并测试
  • 启动服务,进入到 /usr/local/kafka/kafka_2.12-2.3.0/bin 目录下
# 启动后台进程
./kafka-server-start.sh -daemon ../config/server.properties
  • 检查服务是否启动
# 执行命令 jps
6201 QuorumPeerMain
7035 Jps
6972 Kafka
  • kafka 已经启动
  • 创建 Topic 来验证是否创建成功
# cd .. 往回退一层 到 /usr/local/kafka/kafka_2.12-2.3.0 目录下
bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan

对上面的解释

–replication-factor 2 复制两份

–partitions 1 创建1个分区

–topic 创建主题

查看我们的主题是否出创建成功

bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181

启动一个服务就能把集群启动起来

在一台机器上创建一个发布者

# 创建一个broker,发布者
./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic

在一台服务器上创建一个订阅者

# 创建一个consumer, 消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

注意:这里使用 --zookeeper 的话可能出现

zookeeper is not a recognized option

的错误,这是因为 kafka 版本太高,需要使用

--bootstrap-server

指令

测试结果

发布

标签: java 面试 kafka

本文转载自: https://blog.csdn.net/2401_87167773/article/details/142347750
版权归原作者 2401_87167773 所有, 如有侵权,请联系我们删除。

“字节Java面试必问:真的,搞定kafka看这一篇就够了_工作1年的java不会kafka”的评论:

还没有评论