🐇明明跟你说过:个人主页
🏅个人专栏:《大数据前沿:技术与应用并进》🏅
🔖行路有良友,便是天堂🔖
一、引言
1、Kafka简介
Apache Kafka 是一个开源的分布式流处理平台,它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的顶级项目。Kafka 设计用于处理实时数据流,提供了一种高效、可扩展、持久化的方式来进行数据发布和订阅。它通常被描述为一种分布式发布-订阅消息队列,但它实际上超越了传统消息队列的概念。
2、Kafka核心优势
- 高吞吐量:
- Kafka 能够处理海量数据,支持每秒数十万条消息的读写操作,即使在大规模部署中也能保持高性能。
- 通过高效的文件系统设计和内存管理机制,Kafka 能够在处理大量数据的同时保持低延迟。
- 持久性和可靠性:
- Kafka 将数据存储在磁盘上,并支持数据复制(replication),确保即使在节点故障的情况下也能保证数据的可靠性和持久性。
- 数据以追加的方式写入日志文件,减少了磁盘的随机写操作,提高了写入速度和数据完整性。
- 可扩展性:
- Kafka 具有良好的水平扩展能力,可以通过增加更多的节点来提升系统的处理能力和存储容量。
- 分布式架构使得 Kafka 能够轻松地在多台服务器上部署,并且能够动态扩展和收缩集群大小。
- 灵活的发布-订阅模型:
- Kafka 支持发布-订阅模式,允许多个消费者订阅同一个主题,并且消费者可以独立消费消息。
- 消费者可以控制自己的消费进度,不会影响其他消费者的状态,实现了消息消费的解耦。
二、环境准备
1、服务器
准备3台或者5台Linux服务器,用来组建高可用集群,这里使用3台Centos 7.9来进行搭建,大家也可以使用其他的Linux发行版本
配置如下:
2、服务器环境初始化
3台机器都要执行
关闭Selinux
vi /etc/selinux/config
#修改成如下
SELINUX=disabled
之后重启服务器
reboot
关闭并禁用防火墙
[root@kafka1 ~]# systemctl stop firewalld && systemctl disable firewalld
修改 /etc/hosts
vi /etc/hosts
# 添加以下内容
192.168.40.100 kafka1
192.168.40.101 kafka2
192.168.40.102 kafka3
修改镜像源
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
三、安装zookeeper
为什么安装Kafka时,要先安装zookeeper:
ZooKeeper 是一个分布式的协调服务,它为分布式应用程序提供了一套完整的协调服务功能,包括命名服务、配置管理、集群管理和同步等。Kafka 利用 ZooKeeper 来管理其集群中的多个组件,确保系统的稳定性和一致性。
1、上传tar包
apache-zookeeper-3.8.0-bin.tar.gz
tar包可以去官网进行下载 Apache ZooKeeperhttps://zookeeper.apache.org/releases.html#download
解压tar包至 /opt 下
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt
2、编辑配置文件
vim /opt/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
# 输入如下内容
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/zkData
dataLogDir=/opt/zookeeper/zkLog
clientPort=2181
server.1=kafka1:2188:3888
server.2=kafka2:2188:3888
server.3=kafka3:2188:3888
4lw.commands.whitelist=*
tickTime=2000:
- tickTime 定义了 ZooKeeper 服务器之间的心跳间隔时间(毫秒)。它是 ZooKeeper 中最基本的单位时间。默认值通常是 2000 毫秒(即 2 秒)。
initLimit=10:
- initLimit 定义了初始同步阶段的最大超时时间(心跳次数)。这意味着在初始同步阶段,跟随者(follower)必须在 initLimit * tickTime 毫秒内完成与领导者(leader)的同步。例如,这里设置为 20 秒(10 * 2000 毫秒)。
syncLimit=5:
- syncLimit 定义了在领导者和跟随者之间发送消息的最大超时时间(心跳次数)。这意味着在同步阶段,跟随者必须在 syncLimit * tickTime 毫秒内响应领导者的请求。例如,这里设置为 10 秒(5 * 2000 毫秒)。
dataDir=/opt/zookeeper/zkData:
- dataDir 指定 ZooKeeper 服务器用来存储快照(snapshot)的目录。
dataLogDir=/opt/zookeeper/zkLog:
- dataLogDir 指定 ZooKeeper 服务器用来存储事务日志(transaction logs)的目录。这是从 ZooKeeper 3.4.6 开始引入的一个配置项,使得日志和数据可以分开存储。
clientPort=2181:
- clientPort 指定客户端连接到 ZooKeeper 服务器的端口,默认为 2181。
server.1=ka1:2188:3888:
- server.N 表示第 N 台服务器的信息,格式为 hostname:peerPort:leaderPort。peerPort 是服务器之间通信的端口,leaderPort 是选举领导者时使用的端口。
*4lw.commands.whitelist=:
- 4lw.commands.whitelist 指定客户端可以执行的命令白名单。* 表示允许所有命令。
3、创建数据目录
mkdir -p /opt/zookeeper/zkData
mkdir -p /opt/zookeeper/zkLog
创建集群ID文件
在3台机器上分别执行
[root@kafka1 bin]# echo 1 > /opt/zookeeper/zkData/myid
[root@kafka2 bin]# echo 2 > /opt/zookeeper/zkData/myid
[root@kafka3 bin]# echo 3 > /opt/zookeeper/zkData/myid
4、安装JAVA
yum install -y java-1.8.0-openjdk-devel
5、启动zookeeper
cd /opt/apache-zookeeper-3.8.0-bin/bin/
./zkServer.sh start ../conf/zoo.cfg &
查看状态
[root@kafka1 bin]# ps -aux | grep zook
四、Kafka集群搭建
1、上传tar包
资源包大家可以到官网下载
解压至指定目录
[root@kafka1 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
[root@kafka2 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
[root@kafka3 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
2、编辑配置文件
在kafka1上执行
[root@kafka1 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties
#输入如下内容
broker.id=0
listeners=PLAINTEXT://kafka1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
在kafka2上执行
[root@kafka2 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties
#输入如下内容
broker.id=1
listeners=PLAINTEXT://kafka2:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
在kafka3上执行
[root@kafka3 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties
#输入如下内容
broker.id=2
listeners=PLAINTEXT://kafka3:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
broker.id=0
- 这是 Kafka broker 的唯一标识符。每个 broker 必须有唯一的 ID。这里的值为 0,意味着这是一个集群中的单个 broker 或者是第一个 broker。
listeners=PLAINTEXT://kafka1:9092
- 定义了 broker 监听的网络接口和端口。此处使用 PLAINTEXT 协议,意味着没有加密。kafka1:9092 表示监听名为 kafka1 的主机上的 9092 端口。
num.network.threads=3
- 指定了用于网络请求处理的线程数。网络请求包括接收来自生产者的消息、发送消息给消费者等操作。这里设置为 3 个线程。
num.io.threads=8
- 指定了用于处理 I/O 请求的线程数。I/O 请求包括磁盘上的读写操作。这里设置为 8 个线程。
socket.send.buffer.bytes=102400
- 设置了发送套接字的缓冲区大小(单位:字节)。此配置影响网络数据包的发送速度。此处设置为 102400 字节。
socket.receive.buffer.bytes=102400
- 设置了接收套接字的缓冲区大小(单位:字节)。此配置影响网络数据包的接收速度。此处设置为 102400 字节。
socket.request.max.bytes=104857600
- 定义了从客户端接收的最大请求大小(单位:字节)。这有助于防止因过大请求而导致的内存溢出。此处设置为 104857600 字节,即约 100MB。
log.dirs=/opt/kafka-logs
- 指定了日志文件存储的位置。日志文件包含了 Kafka topic 的数据。这里设置的日志目录为 /opt/kafka-logs。
num.partitions=5
- 指定了默认主题分区的数量。分区越多,通常意味着更高的并发度。这里设置的主题默认分区数为 5。
default.replication.factor=2
- 指定了创建新主题时的默认复制因子。复制因子决定了每个分区的副本数量。这里设置的复制因子为 2,意味着每个分区有 2 份副本。
num.recovery.threads.per.data.dir=1
- 指定了用于恢复日志段的线程数。每个数据目录可以有不同的线程数。这里设置为 1 个线程。
offsets.topic.replication.factor=1
- 指定了 _consumer_offsets 主题的复制因子。此主题用于存储消费者的偏移量信息。这里设置的复制因子为 1,意味着只有一个副本。
transaction.state.log.replication.factor=1
- 指定了 _transactions 主题的复制因子。此主题用于记录事务状态。这里设置的复制因子为 1,意味着只有一个副本。
transaction.state.log.min.isr=1
- 指定了 _transactions 主题的最小 ISR(In-Sync Replicas)数量。ISR 是与 leader 同步的副本集合。这里设置的最小 ISR 数量为 1。
log.retention.hours=168
- 指定了日志数据保留的时间长度(单位:小时)。这里设置的日志保留时间为 168 小时,即 7 天。
log.segment.bytes=1073741824
- 指定了日志段的最大大小(单位:字节)。一旦达到这个大小,Kafka 就会创建一个新的日志段。这里设置的日志段大小为 1073741824 字节,即 1GB。
log.retention.check.interval.ms=300000
- 指定了检查日志清理的间隔时间(单位:毫秒)。这里设置的检查间隔为 300000 毫秒,即 5 分钟。
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
- 指定了 ZooKeeper 服务器列表。ZooKeeper 用于协调 Kafka 集群。这里设置的 ZooKeeper 服务器为 kafka1:2181, kafka2:2181, kafka3:2181,路径为 /kafka。
zookeeper.connection.timeout.ms=18000
- 指定了 Kafka 与 ZooKeeper 之间的连接超时时间(单位:毫秒)。这里设置的超时时间为 18000 毫秒,即 18 秒。请注意,zookeeper.connection.timeout.ms 在配置中出现了两次,应该是误写,只需要保留一次即可。
group.initial.rebalance.delay.ms=0
- 指定了消费者组初始重新平衡的延迟时间(单位:毫秒)。这里设置的延迟时间为 0,即立即开始重新平衡。
3、创建数据目录
在3台机器上分别执行
mkdir /opt/kafka-logs
4、启动kafka
[root@kafka1 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka1 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@kafka2 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka2 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@kafka3 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka3 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
5、查看端口状态
[root@kafka1 bin]# netstat -antupl
五、测试
1、创建Topic
前面我们已经将kafka集群搭建起来了,接下来创建一个Topic进行写入测试,如果不清楚Topic是什么,可以翻看作者之前的文章。
在kafka1上执行
[root@kafka1 bin]# cd /opt/kafka_2.13-3.1.0/bin
[root@kafka1 bin]# ./kafka-topics.sh --bootstrap-server=192.168.40.100:9092 --topic test --create --partitions=3 --replication-factor=2
- --bootstrap-server:指定 Kafka broker 的地址和端口号。这里的 192.168.40.100:9092 指定了 broker 的 IP 地址为 192.168.40.100,端口号为 9092。
- --topic:指定要操作的主题名称。在这个例子中,主题名为 test。
- --create:告诉 Kafka 创建一个新主题。如果主题已经存在,这条命令将会失败,除非你配置了允许创建已存在的主题。
- --partitions:指定主题的分区数。分区数决定了主题能够并行处理消息的能力。在这个例子中,主题 test 将会有 3 个分区。
- --replication-factor:指定主题的复制因子。复制因子决定了每个分区的副本数量,这对于数据的冗余和可靠性非常重要。在这个例子中,主题 test 的每个分区将会有 2 个副本。
查看Topic
[root@kafka1 bin]# ./kafka-topics.sh --bootstrap-server=192.168.40.100:9092 --list
2、生产消息
[root@kafka1 bin]# ./kafka-console-producer.sh --bootstrap-server 192.168.40.100:9092 --topic test
向我们刚刚创建的test Topic写入几条消息
3、消费消息
[root@kafka1 bin]# ./kafka-console-consumer.sh --bootstrap-server=192.168.40.100:9092 --topic test --from-beginning
如果能看到之前生产的消息,则证明集群搭建成功
💕💕💕每一次的分享都是一次成长的旅程,感谢您的陪伴和关注。希望这些关于大数据的文章能陪伴您走过技术的一段旅程,共同见证成长和进步!😺😺😺
🧨🧨🧨让我们一起在技术的海洋中探索前行,共同书写美好的未来!!!
版权归原作者 明明跟你说过 所有, 如有侵权,请联系我们删除。