1. Cluster Environment
Virtual machines: 192.168.223.101/103/105
OS version: CentOS 7.9
JDK version: 11.0.18.0.1
ZooKeeper version: 3.7.1
Kafka version: 2.13-2.8.2
Note: both ZooKeeper and Kafka require a JDK, and the ZooKeeper and Kafka versions listed above both support JDK 11. If you are interested in the exact compatibility matrix between the three, consult the official docs on their respective websites; it is not repeated here.
2. Cluster Component Deployment
2.1 Installing the JDK
Install JDK 11 as the root user; the JDK directory is /usr/jdk-11.0.18.0.1.
cd /usr
tar -xzf jdk-11.0.18.0.1_linux-x64_bin.tar.gz
rm -f jdk-11.0.18.0.1_linux-x64_bin.tar.gz
# Grant read/execute permissions so other users and groups can use the JDK
chmod -R 755 jdk-11.0.18.0.1
2.2 Creating the User and Group
ZooKeeper and Kafka do not need to be installed or run as root, so for security we create a regular user and group (app:apps) to install and run them.
groupadd apps
useradd -d /app -g apps app
chmod -R 755 /app
chown -R app:apps /app
2.3 Configuring Java Environment Variables
First, switch to the app user and edit ~/.bash_profile with vi (if the file does not exist, vi will create it). Paste the following content into the file, save and exit, then run source ~/.bash_profile to make the configuration take effect.
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi
# User specific environment and startup programs
JAVA_HOME=/usr/jdk-11.0.18.0.1
export JAVA_HOME
# tools.jar was removed in JDK 9+, so it is no longer added to CLASSPATH
CLASSPATH=.:$JAVA_HOME/lib
export CLASSPATH
PATH=$JAVA_HOME/bin:$PATH:/usr/local/bin:$HOME/bin
export PATH
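After sourcing the profile, a quick sanity check (a sketch; the path matches the JDK directory from section 2.1) confirms that the JDK's bin directory is now first on PATH:

```shell
# Quick sanity check after "source ~/.bash_profile": JAVA_HOME should be
# set and its bin directory should be the first entry on PATH.
JAVA_HOME=/usr/jdk-11.0.18.0.1
PATH=$JAVA_HOME/bin:$PATH
printf '%s\n' "$PATH" | cut -d: -f1
# prints "/usr/jdk-11.0.18.0.1/bin"
```

On the real host you would also run java -version to confirm the JDK itself works.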
2.4 Installing the ZooKeeper Cluster
First, upload the ZooKeeper package to the app user's home directory (for example with the rz command), then unpack it and edit the configuration file as shown below.
cd /app
tar -xzf apache-zookeeper-3.7.1-bin.tar.gz
rm -f ./apache-zookeeper-3.7.1-bin.tar.gz
mv apache-zookeeper-3.7.1-bin/ zookeeper
cd /app/zookeeper/conf
vi zoo.cfg
The zoo.cfg configuration is identical on all three servers (192.168.223.101/103/105). Note that dataDir is a custom directory and must be created in advance.
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial synchronization phase can take
initLimit=10
# The number of ticks that can pass between sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored. Do not use /tmp for storage; /tmp here is just an example.
dataDir=/app/zookeeper/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections. Increase this if you need to handle more clients
maxClientCnxns=60
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
## Metrics Providers
# https://prometheus.io Metrics Exporter
metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
metricsProvider.httpPort=17000
metricsProvider.exportJvmInfo=true
server.1=192.168.223.101:2888:3888
server.2=192.168.223.103:2888:3888
server.3=192.168.223.105:2888:3888
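To make the tick-based settings concrete, here is how they translate into wall-clock time (plain shell arithmetic; the values match the zoo.cfg above):

```shell
# tickTime is the basic time unit in milliseconds; initLimit and
# syncLimit are expressed in ticks, so the effective timeouts are:
tickTime=2000
initLimit=10
syncLimit=5
echo "initial sync timeout: $(( tickTime * initLimit )) ms"
echo "request/ack timeout: $(( tickTime * syncLimit )) ms"
# prints "initial sync timeout: 20000 ms"
# prints "request/ack timeout: 10000 ms"
```

So a follower has 20 seconds to complete its initial sync with the leader, and 10 seconds to acknowledge a request before it is considered out of sync.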
The only per-node difference is the myid file, configured as follows:
# 192.168.223.101
cd /app/zookeeper/data
echo 1 > myid
# 192.168.223.103
cd /app/zookeeper/data
echo 2 > myid
# 192.168.223.105
cd /app/zookeeper/data
echo 3 > myid
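Setting myid by hand on each host can be replaced with a small helper that derives the id from the node's IP. zk_myid is a hypothetical helper written for this article, not part of ZooKeeper; its mapping mirrors the server.N entries in zoo.cfg.

```shell
# Derive a node's myid from its IP address. The mapping must agree
# with the server.1/2/3 entries in zoo.cfg.
zk_myid() {
  case "$1" in
    192.168.223.101) echo 1 ;;
    192.168.223.103) echo 2 ;;
    192.168.223.105) echo 3 ;;
    *) echo "unknown host: $1" >&2; return 1 ;;
  esac
}

# On each node, using its own address:
#   mkdir -p /app/zookeeper/data
#   zk_myid "$(hostname -I | awk '{print $1}')" > /app/zookeeper/data/myid
zk_myid 192.168.223.103
# prints "2"
```

This also guards against the common mistake of copying a disk image between nodes and forgetting to change myid.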
Finally, start ZooKeeper on each of the three servers in turn and check each node's status: of the three nodes, two should be followers and one the leader.
cd /app/zookeeper/bin
./zkServer.sh start
./zkServer.sh status
That completes the installation and configuration of the ZooKeeper cluster; next we install and configure the Kafka cluster.
2.5 Installing the Kafka Cluster
First, upload the Kafka binary package to the app user's home directory, then unpack it, configure it, and start the service.
cd /app
tar -xzf kafka_2.13-2.8.2.tgz
rm -f kafka_2.13-2.8.2.tgz
mv kafka_2.13-2.8.2 kafka
cd kafka/config
mv server.properties server.properties.bak
vi server.properties
The server.properties files on the three servers (192.168.223.101/103/105) are identical except for broker.id and listeners. The full configuration for 192.168.223.101 is shown below; for the other two nodes, change only the two lines noted afterwards.
# Server - 192.168.223.101
############################# Server Basics #############################
# broker.id must be unique on each node
broker.id=1
############################# Socket Server Settings #############################
# listeners must use each node's own IP
listeners=PLAINTEXT://192.168.223.101:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
# custom data log directory
log.dirs=/app/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=192.168.223.101:2181,192.168.223.103:2181,192.168.223.105:2181
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
On 192.168.223.103, the only lines that change are:
broker.id=2
listeners=PLAINTEXT://192.168.223.103:9092
On 192.168.223.105:
broker.id=3
listeners=PLAINTEXT://192.168.223.105:9092
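Since the per-node files differ only in broker.id and the listener address, they can also be generated from a single template rather than edited by hand on each host. The template file and the gen_props function below are illustrative names invented for this sketch, not Kafka tooling; only the two node-specific lines are templated for brevity.

```shell
# Write a minimal template containing just the node-specific settings.
cat > /tmp/server.properties.template <<'EOF'
broker.id=@ID@
listeners=PLAINTEXT://@IP@:9092
EOF

# Substitute the placeholders for a given node.
gen_props() {  # usage: gen_props <broker-id> <node-ip>
  sed -e "s/@ID@/$1/" -e "s/@IP@/$2/" /tmp/server.properties.template
}

gen_props 2 192.168.223.103
# prints "broker.id=2"
# prints "listeners=PLAINTEXT://192.168.223.103:9092"
```

In practice the template would hold the full server.properties shown above, with the common settings fixed and only these two lines parameterized.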
Then start Kafka on each of the three servers in turn:
cd /app/kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties
Note: the -daemon flag starts the broker in the background, so there is no need to wrap the start command in nohup ... &.
3. Kafka Usage Test
At this point the three-node Kafka cluster is fully deployed. Some of the configuration parameters still need tuning, which is not covered here; for a production environment, use settings appropriate to your own deployment (for example, on a three-node cluster the internal-topic replication factors above would normally be raised from 1 to 3). The commands below give the cluster a quick functional test.
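As a quick sanity check on the retention settings used above (plain shell arithmetic, not a tuning recommendation):

```shell
# Translate the retention-related settings into human units.
log_retention_hours=168
log_retention_bytes=1073741824   # per-partition size cap
log_segment_bytes=1073741824
echo "retention window: $(( log_retention_hours / 24 )) days"
echo "size cap per partition: $(( log_retention_bytes / 1024 / 1024 / 1024 )) GiB"
echo "segment size: $(( log_segment_bytes / 1024 / 1024 )) MiB"
# prints "retention window: 7 days"
# prints "size cap per partition: 1 GiB"
# prints "segment size: 1024 MiB"
```

So messages are kept for 7 days or until a partition reaches 1 GiB, whichever limit is hit first.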
$ cd /app/kafka
# Create a topic
$ ./bin/kafka-topics.sh --create --zookeeper 192.168.223.103:2181 --replication-factor 1 --partitions 1 --topic my-topic
Created topic my-topic.
# List all topics
$ ./bin/kafka-topics.sh --list --zookeeper 192.168.223.103:2181
my-topic
$ ./bin/kafka-topics.sh --list --zookeeper 192.168.223.101:2181
my-topic
$ ./bin/kafka-topics.sh --list --zookeeper 192.168.223.105:2181
my-topic
# Describe a specific topic
$ ./bin/kafka-topics.sh --describe --zookeeper 192.168.223.103:2181 --topic my-topic
Topic: my-topic TopicId: wfI9VvK1QAyCP9ReZrDlIQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
# Produce console messages
$ ./bin/kafka-console-producer.sh --broker-list 192.168.223.103:9092 --topic my-topic
# Consume console messages:
$ ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.223.103:9092 --topic my-topic --from-beginning
$ cd /app/zookeeper/bin
$ ./zkCli.sh -server 192.168.223.103:2181
[zk: 192.168.223.103:2181(CONNECTED) 6] ls /brokers/topics
[__consumer_offsets, my-topic]
Copyright belongs to the original author, cnskylee.