0


Linux平台Kafka高可用集群部署全攻略

🐇明明跟你说过:个人主页

🏅个人专栏:《大数据前沿:技术与应用并进》🏅

🔖行路有良友,便是天堂🔖

一、引言

1、Kafka简介

Apache Kafka 是一个开源的分布式流处理平台,它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的顶级项目。Kafka 设计用于处理实时数据流,提供了一种高效、可扩展、持久化的方式来进行数据发布和订阅。它通常被描述为一种分布式发布-订阅消息队列,但它实际上超越了传统消息队列的概念。

2、Kafka核心优势

  1. 高吞吐量:
  • Kafka 能够处理海量数据,支持每秒数十万条消息的读写操作,即使在大规模部署中也能保持高性能。
  • 通过高效的文件系统设计和内存管理机制,Kafka 能够在处理大量数据的同时保持低延迟。
  1. 持久性和可靠性:
  • Kafka 将数据存储在磁盘上,并支持数据复制(replication),确保即使在节点故障的情况下也能保证数据的可靠性和持久性。
  • 数据以追加的方式写入日志文件,减少了磁盘的随机写操作,提高了写入速度和数据完整性。
  1. 可扩展性:
  • Kafka 具有良好的水平扩展能力,可以通过增加更多的节点来提升系统的处理能力和存储容量。
  • 分布式架构使得 Kafka 能够轻松地在多台服务器上部署,并且能够动态扩展和收缩集群大小。
  1. 灵活的发布-订阅模型:
  • Kafka 支持发布-订阅模式,允许多个消费者订阅同一个主题,并且消费者可以独立消费消息。
  • 消费者可以控制自己的消费进度,不会影响其他消费者的状态,实现了消息消费的解耦。

二、环境准备

1、服务器

准备3台或者5台Linux服务器,用来组建高可用集群,这里使用3台Centos 7.9来进行搭建,大家也可以使用其他的Linux发行版本

配置如下:

2、服务器环境初始化

3台机器都要执行

关闭Selinux

vi /etc/selinux/config

#修改成如下
SELINUX=disabled

之后重启服务器

reboot

关闭并禁用防火墙

[root@kafka1 ~]# systemctl stop firewalld && systemctl disable firewalld

修改 /etc/hosts

vi /etc/hosts

# 添加以下内容
192.168.40.100  kafka1
192.168.40.101  kafka2
192.168.40.102  kafka3

修改镜像源

curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo

三、安装zookeeper

为什么安装Kafka时,要先安装zookeeper:

ZooKeeper 是一个分布式的协调服务,它为分布式应用程序提供了一套完整的协调服务功能,包括命名服务、配置管理、集群管理和同步等。Kafka 利用 ZooKeeper 来管理其集群中的多个组件,确保系统的稳定性和一致性。

1、上传tar包

apache-zookeeper-3.8.0-bin.tar.gz

tar包可以去官网进行下载 Apache ZooKeeperhttps://zookeeper.apache.org/releases.html#download

解压tar包至 /opt 下

tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt

2、编辑配置文件

vim /opt/apache-zookeeper-3.8.0-bin/conf/zoo.cfg

# 输入如下内容
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/zkData
dataLogDir=/opt/zookeeper/zkLog
clientPort=2181
server.1=kafka1:2188:3888
server.2=kafka2:2188:3888
server.3=kafka3:2188:3888
4lw.commands.whitelist=*

tickTime=2000:

  • tickTime 定义了 ZooKeeper 服务器之间的心跳间隔时间(毫秒)。它是 ZooKeeper 中最基本的单位时间。默认值通常是 2000 毫秒(即 2 秒)。

initLimit=10:

  • initLimit 定义了初始同步阶段的最大超时时间(心跳次数)。这意味着在初始同步阶段,跟随者(follower)必须在 initLimit * tickTime 毫秒内完成与领导者(leader)的同步。例如,这里设置为 20 秒(10 * 2000 毫秒)。

syncLimit=5:

  • syncLimit 定义了在领导者和跟随者之间发送消息的最大超时时间(心跳次数)。这意味着在同步阶段,跟随者必须在 syncLimit * tickTime 毫秒内响应领导者的请求。例如,这里设置为 10 秒(5 * 2000 毫秒)。

dataDir=/opt/zookeeper/zkData:

  • dataDir 指定 ZooKeeper 服务器用来存储快照(snapshot)的目录。

dataLogDir=/opt/zookeeper/zkLog:

  • dataLogDir 指定 ZooKeeper 服务器用来存储事务日志(transaction logs)的目录。这是从 ZooKeeper 3.4.6 开始引入的一个配置项,使得日志和数据可以分开存储。

clientPort=2181:

  • clientPort 指定客户端连接到 ZooKeeper 服务器的端口,默认为 2181。

server.1=ka1:2188:3888:

  • server.N 表示第 N 台服务器的信息,格式为 hostname:peerPort:leaderPort。peerPort 是服务器之间通信的端口,leaderPort 是选举领导者时使用的端口。

*4lw.commands.whitelist=

  • 4lw.commands.whitelist 指定客户端可以执行的命令白名单。* 表示允许所有命令。

3、创建数据目录

mkdir -p /opt/zookeeper/zkData 
mkdir -p /opt/zookeeper/zkLog

创建集群ID文件

在3台机器上分别执行

[root@kafka1 bin]# echo 1 >  /opt/zookeeper/zkData/myid
[root@kafka2 bin]# echo 2 >  /opt/zookeeper/zkData/myid
[root@kafka3 bin]# echo 3 >  /opt/zookeeper/zkData/myid

4、安装JAVA

yum install -y java-1.8.0-openjdk-devel

5、启动zookeeper

cd /opt/apache-zookeeper-3.8.0-bin/bin/ 
./zkServer.sh start ../conf/zoo.cfg &

查看状态

[root@kafka1 bin]# ps -aux | grep zook

四、Kafka集群搭建

1、上传tar包

资源包大家可以到官网下载

https://kafka.apache.org/

解压至指定目录

[root@kafka1 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
[root@kafka2 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
[root@kafka3 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/

2、编辑配置文件

在kafka1上执行

[root@kafka1 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties

#输入如下内容
broker.id=0
listeners=PLAINTEXT://kafka1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

在kafka2上执行

[root@kafka2 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties

#输入如下内容
broker.id=1
listeners=PLAINTEXT://kafka2:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

在kafka3上执行

[root@kafka3 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties

#输入如下内容
broker.id=2
listeners=PLAINTEXT://kafka3:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

broker.id=0

  • 这是 Kafka broker 的唯一标识符。每个 broker 必须有唯一的 ID。这里的值为 0,意味着这是一个集群中的单个 broker 或者是第一个 broker。

listeners=PLAINTEXT://kafka1:9092

  • 定义了 broker 监听的网络接口和端口。此处使用 PLAINTEXT 协议,意味着没有加密。kafka1:9092 表示监听名为 kafka1 的主机上的 9092 端口。

num.network.threads=3

  • 指定了用于网络请求处理的线程数。网络请求包括接收来自生产者的消息、发送消息给消费者等操作。这里设置为 3 个线程。

num.io.threads=8

  • 指定了用于处理 I/O 请求的线程数。I/O 请求包括磁盘上的读写操作。这里设置为 8 个线程。

socket.send.buffer.bytes=102400

  • 设置了发送套接字的缓冲区大小(单位:字节)。此配置影响网络数据包的发送速度。此处设置为 102400 字节。

socket.receive.buffer.bytes=102400

  • 设置了接收套接字的缓冲区大小(单位:字节)。此配置影响网络数据包的接收速度。此处设置为 102400 字节。

socket.request.max.bytes=104857600

  • 定义了从客户端接收的最大请求大小(单位:字节)。这有助于防止因过大请求而导致的内存溢出。此处设置为 104857600 字节,即约 100MB。

log.dirs=/opt/kafka-logs

  • 指定了日志文件存储的位置。日志文件包含了 Kafka topic 的数据。这里设置的日志目录为 /opt/kafka-logs。

num.partitions=5

  • 指定了默认主题分区的数量。分区越多,通常意味着更高的并发度。这里设置的主题默认分区数为 5。

default.replication.factor=2

  • 指定了创建新主题时的默认复制因子。复制因子决定了每个分区的副本数量。这里设置的复制因子为 2,意味着每个分区有 2 份副本。

num.recovery.threads.per.data.dir=1

  • 指定了用于恢复日志段的线程数。每个数据目录可以有不同的线程数。这里设置为 1 个线程。

offsets.topic.replication.factor=1

  • 指定了 _consumer_offsets 主题的复制因子。此主题用于存储消费者的偏移量信息。这里设置的复制因子为 1,意味着只有一个副本。

transaction.state.log.replication.factor=1

  • 指定了 _transactions 主题的复制因子。此主题用于记录事务状态。这里设置的复制因子为 1,意味着只有一个副本。

transaction.state.log.min.isr=1

  • 指定了 _transactions 主题的最小 ISR(In-Sync Replicas)数量。ISR 是与 leader 同步的副本集合。这里设置的最小 ISR 数量为 1。

log.retention.hours=168

  • 指定了日志数据保留的时间长度(单位:小时)。这里设置的日志保留时间为 168 小时,即 7 天。

log.segment.bytes=1073741824

  • 指定了日志段的最大大小(单位:字节)。一旦达到这个大小,Kafka 就会创建一个新的日志段。这里设置的日志段大小为 1073741824 字节,即 1GB。

log.retention.check.interval.ms=300000

  • 指定了检查日志清理的间隔时间(单位:毫秒)。这里设置的检查间隔为 300000 毫秒,即 5 分钟。

zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka

  • 指定了 ZooKeeper 服务器列表。ZooKeeper 用于协调 Kafka 集群。这里设置的 ZooKeeper 服务器为 kafka1:2181, kafka2:2181, kafka3:2181,路径为 /kafka。

zookeeper.connection.timeout.ms=18000

  • 指定了 Kafka 与 ZooKeeper 之间的连接超时时间(单位:毫秒)。这里设置的超时时间为 18000 毫秒,即 18 秒。请注意,zookeeper.connection.timeout.ms 在配置中出现了两次,应该是误写,只需要保留一次即可。

group.initial.rebalance.delay.ms=0

  • 指定了消费者组初始重新平衡的延迟时间(单位:毫秒)。这里设置的延迟时间为 0,即立即开始重新平衡。

3、创建数据目录

在3台机器上分别执行

mkdir /opt/kafka-logs

4、启动kafka

[root@kafka1 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka1 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@kafka2 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka2 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@kafka3 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka3 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

5、查看端口状态

[root@kafka1 bin]# netstat -antupl

五、测试

1、创建Topic

前面我们已经将kafka集群搭建起来了,接下来创建一个Topic进行写入测试,如果不清楚Topic是什么,可以翻看作者之前的文章。

在kafka1上执行

[root@kafka1 bin]# cd /opt/kafka_2.13-3.1.0/bin
[root@kafka1 bin]# ./kafka-topics.sh --bootstrap-server=192.168.40.100:9092 --topic test --create --partitions=3 --replication-factor=2
  • --bootstrap-server:指定 Kafka broker 的地址和端口号。这里的 192.168.40.100:9092 指定了 broker 的 IP 地址为 192.168.40.100,端口号为 9092。
  • --topic:指定要操作的主题名称。在这个例子中,主题名为 test。
  • --create:告诉 Kafka 创建一个新主题。如果主题已经存在,这条命令将会失败,除非你配置了允许创建已存在的主题。
  • --partitions:指定主题的分区数。分区数决定了主题能够并行处理消息的能力。在这个例子中,主题 test 将会有 3 个分区。
  • --replication-factor:指定主题的复制因子。复制因子决定了每个分区的副本数量,这对于数据的冗余和可靠性非常重要。在这个例子中,主题 test 的每个分区将会有 2 个副本。

查看Topic

[root@kafka1 bin]# ./kafka-topics.sh --bootstrap-server=192.168.40.100:9092 --list

2、生产消息

[root@kafka1 bin]# ./kafka-console-producer.sh --bootstrap-server 192.168.40.100:9092 --topic test

向我们刚刚创建的test Topic写入几条消息

3、消费消息

[root@kafka1 bin]# ./kafka-console-consumer.sh --bootstrap-server=192.168.40.100:9092 --topic test --from-beginning

如果能看到之前生产的消息,则证明集群搭建成功

💕💕💕每一次的分享都是一次成长的旅程,感谢您的陪伴和关注。希望这些关于大数据的文章能陪伴您走过技术的一段旅程,共同见证成长和进步!😺😺😺

🧨🧨🧨让我们一起在技术的海洋中探索前行,共同书写美好的未来!!!


本文转载自: https://blog.csdn.net/weixin_53269650/article/details/142088476
版权归原作者 明明跟你说过 所有, 如有侵权,请联系我们删除。

“Linux平台Kafka高可用集群部署全攻略”的评论:

还没有评论