0


【Kafka】Zookeeper和Kafka集群的安装和配置

一、集群环境说明

  1. 虚拟机:192.168.223.101/103/105

  2. 系统版本:CentOS 7.9

  3. JDK版本:11.0.18.0.1

  4. Zookeeper版本:3.7.1

  5. Kafka版本:2.13-2.8.2

备注:无论是ZK,还是Kafka的安装,都需要用到JDK,上面给出的ZK和Kafka版本,都已经支持JDK11(JDK 11 Supported)。这三者之间的兼容关系,感兴趣的可以去对应的官网上查询官方Docs,这里就不做赘述了。

二、集群组件部署

2.1 安装JDK

使用root用户安装JDK11,JDK目录为:/usr/jdk-11.0.18.0.1

cd /usr
tar -xzf jdk-11.0.18.0.1_linux-x64_bin.tar.gz
rm -f jdk-11.0.18.0.1_linux-x64_bin.tar.gz
# 目录授权,供其他用户和组调用
chmod -R 755 jdk-11.0.18.0.1

2.2 创建用户和组

由于Zookeeper和Kafka的安装和运行无需root用户,因此从安全角度考虑,我们为其安装和运行创建普通用户和组(app:apps)。

groupadd apps
useradd -d /app -g apps app
chmod -R 755 /app
chown -R app:apps /app

2.3配置Java环境变量

首先,我们切换到app用户下,然后vi .bash_profile这个文件(如果不存在可以直接vi创建即可)。然后将下面的内容黏贴到文件中,并保存退出。最后使用source .bash_profile使配置生效即可。

# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
        . ~/.bashrc
fi

# User specific environment and startup programs
JAVA_HOME=/usr/jdk-11.0.18.0.1
export JAVA_HOME

CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar
export CLASSPATH

PATH=$JAVA_HOME/bin:$PATH:/usr/local/bin:$HOME/bin
export PATH

2.4 安装Zookeeper集群

首先,我们将zk的安装包使用rz命令上传到app用户目录下,然后解压,按照下面的配置编辑配置文件即可。

cd /app
tar -xzf apache-zookeeper-3.7.1-bin.tar.gz
rm -f ./apache-zookeeper-3.7.1-bin.tar.gz
mv apache-zookeeper-3.7.1-bin/ zookeeper
cd /app/zookeeper/conf 
vi zoo.cfg

192.168.223.101/103/105三台服务器上的zoo.cfg配置一致,这里需要注意dataDir,需要自定义目录,目录需要提前创建好。

# The number of milliseconds of each tick
tickTime=2000

# The number of ticks that the initial synchronization phase can take
initLimit=10

# The number of ticks that can pass between sending a request and getting an acknowledgement
syncLimit=5

# the directory where the snapshot is stored.do not use /tmp for storage, /tmp here is just example sakes.
dataDir=/app/zookeeper/data

# the port at which the clients will connect
clientPort=2181

# the maximum number of client connections.increase this if you need to handle more clients
maxClientCnxns=60

# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.

# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3

# Purge task interval in hours Set to "0" to disable auto purge feature
autopurge.purgeInterval=1

## Metrics Providers
# https://prometheus.io Metrics Exporter
metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
metricsProvider.httpPort=17000
metricsProvider.exportJvmInfo=true

server.1=192.168.223.101:2888:3888
server.2=192.168.223.103:2888:3888
server.3=192.168.223.105:2888:3888

这里不同的配置为myid,配置如下:

# 192.168.223.101
cd /app/zookeeper/data
echo 1 > myid

# 192.168.223.103
cd /app/zookeeper/data
echo 2 > myid

# 192.168.223.105
cd /app/zookeeper/data
echo 3 > myid

最后依次启动三台服务器上的zookeeper即可,并检查当前节点的状态,三个节点中,2个为follower,1个为leader。

cd /app/zookeeper/bin
./zkServer.sh start
./zkServer.sh status

到这里,zookeeper集群的安装配置就完成了,下面我们来安装配置Kafka集群。

2.5 安装Kafka集群

首先,还是先将Kafka的二进制包上传的app用户目录下,然后执行解压、配置和服务启动。

cd /app
tar -xzf kafka_2.13-2.8.2.tgz
rm -f kafka_2.13-2.8.2.tgz
mv kafka_2.13-2.8.2 kafka
cd  kafka/config
mv server.properties server.properties.bak
vi server.properties

三台服务器(192.168.223.101/103/105)的server.properties配置分别如下:

# Server - 192.168.223.101
############################# Server Basics #############################
# 每个节点的broker-id不能一样,需要修改
broker.id=1

############################# Socket Server Settings #############################
# 每个节点的listeners,需要修改IP
listeners=PLAINTEXT://192.168.223.101:9092

num.network.threads=3
num.io.threads=8

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################
# 自定义log目录路径
log.dirs=/app/kafka/logs

num.partitions=3
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=192.168.223.101:2181,192.168.223.103:2181,192.168.223.105:2181
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
# Server - 192.168.223.103
############################# Server Basics #############################
# 每个节点的broker-id不能一样,需要修改
broker.id=2

############################# Socket Server Settings #############################
# 每个节点的listeners,需要修改IP
listeners=PLAINTEXT://192.168.223.103:9092

num.network.threads=3
num.io.threads=8

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################
# 自定义log目录路径
log.dirs=/app/kafka/logs

num.partitions=3
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=192.168.223.101:2181,192.168.223.103:2181,192.168.223.105:2181
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
# Server - 192.168.223.105
############################# Server Basics #############################
# 每个节点的broker-id不能一样,需要修改
broker.id=3

############################# Socket Server Settings #############################
# 每个节点的listeners,需要修改IP
listeners=PLAINTEXT://192.168.223.105:9092

num.network.threads=3
num.io.threads=8

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################
# 自定义log目录路径
log.dirs=/app/kafka/logs

num.partitions=3
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=192.168.223.101:2181,192.168.223.103:2181,192.168.223.105:2181
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

然后,我们依次启动三台服务器上的Kafka即可

cd /app/kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties

备注:-daemon为后台启动,这样就无需在启动命令中写nohup …… &这样的字符了。

三、Kafka使用测试

到此为止,三节点的Kafka集群就已经部署完毕,部分配置参数还有待调优,这里就不做扩展说明了,生产环境上的配置,以本地的配置为准。

$ cd /app/kafka

# 创建topic
$ sh ./bin/kafka-topics.sh --create --zookeeper 192.168.223.103:2181 --replication-factor 1 --partitions 1 --topic my-topic
Created topic my-topic.

# 浏览所有topic
$ ./bin/kafka-topics.sh --list --zookeeper 192.168.223.103:2181
my-topic

$ ./bin/kafka-topics.sh --list --zookeeper 192.168.223.101:2181
my-topic

$ ./bin/kafka-topics.sh --list --zookeeper 192.168.223.105:2181
my-topic

# 浏览指定topic
$ ./bin/kafka-topics.sh --describe --zookeeper 192.168.223.103:2181 --topic my-topic
Topic: my-topic TopicId: wfI9VvK1QAyCP9ReZrDlIQ PartitionCount: 1       ReplicationFactor: 1    Configs: 
        Topic: my-topic Partition: 0    Leader: 2       Replicas: 2     Isr: 2

# 生产console信息
$ ./bin/kafka-console-producer.sh --broker-list 192.168.223.103:9092 --topic my-topic

# 消费Console消息:
$ ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.223.103:9092 --topic my-topic --from-beginning

$ cd /app/zookeeper/bin
$ ./zkCli.sh -server 192.168.223.103:2181
[zk: 192.168.223.103:2181(CONNECTED) 6] ls /brokers/topics
[__consumer_offsets, my-topic]
标签: JDK zookeeper kafka

本文转载自: https://blog.csdn.net/cnskylee/article/details/128990198
版权归原作者 cnskylee 所有, 如有侵权,请联系我们删除。

“【Kafka】Zookeeper和Kafka集群的安装和配置”的评论:

还没有评论