Kafka Basics
Although Kafka borrows ideas from the JMS specification, its design does not follow JMS exactly. Internally, Kafka consists of a number of cooperating components that together implement efficient data transfer. Below we introduce Kafka's basic concepts and core components, and show how to set up a simple Kafka cluster on Windows for learning and practice.
Cluster Deployment
Although production clusters are usually built on Linux servers, for ease of understanding and hands-on practice this chapter sets up a simple cluster on Windows. Building a Linux cluster is covered in a later chapter.
Extract the Files
- Create a folder named `cluster` in the root directory of a drive; keep the folder name short.
- Extract the Kafka installation package `kafka_2.12-3.6.1.tgz` into the `kafka` subfolder under the `cluster` folder.
Install ZooKeeper
- Rename the extracted folder to `kafka-zookeeper`.
- Edit the `config/zookeeper.properties` file:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
# Note: if this directory does not exist, it will be created automatically
dataDir=E:/cluster/kafka-zookeeper/data
# the port at which the clients will connect
# The default ZooKeeper port is 2181
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
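As a quick sanity check on the file above, a few lines of Python can parse the key/value settings and confirm the values you edited. This is a minimal illustrative sketch (the `parse_properties` helper is ours, not part of Kafka or ZooKeeper):

```python
def parse_properties(text):
    """Parse Java-style .properties text into a dict, skipping comments and blank lines."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

# The settings edited in this chapter's zookeeper.properties
sample = """\
# ZooKeeper settings used in this chapter
dataDir=E:/cluster/kafka-zookeeper/data
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
"""
print(parse_properties(sample)["clientPort"])  # -> 2181
```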
Install Kafka
- Copy the `kafka-zookeeper` folder and rename the copy to `kafka-node-1`.
- Edit the `config/server.properties` configuration file:
# The id of the broker. This must be set to a unique integer for each broker.
# Numeric identifier of this Kafka node; must be unique within the cluster
broker.id=1
# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# Listener: 9091 is a local port; if it conflicts, specify a different one
listeners=PLAINTEXT://:9091
# A comma separated list of directories under which to store log files
# Data file path; the directory is created automatically if it does not exist
log.dirs=E:/cluster/kafka-node-1/data
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# ZooKeeper connection address; 2181 is the default ZK port, and /kafka is the root znode for Kafka's metadata
zookeeper.connect=localhost:2181/kafka
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# Note: these values are far smaller than the defaults (e.g. 1 GB segments);
# they are deliberately tiny here so that segment rolling and index entries
# can be observed easily while learning. Do not use them in production.
log.segment.bytes=190
log.flush.interval.messages=2
log.index.interval.bytes=17
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
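To see why the unusually small `log.segment.bytes=190` matters: Kafka rolls to a new segment file once the active segment reaches the configured size. A rough, illustrative simulation of that size-based rolling (this is not Kafka's actual code, just a sketch of the idea):

```python
def assign_segments(message_sizes, segment_bytes=190):
    """Group message sizes into segments, starting a new segment whenever
    appending a message would push the active segment past segment_bytes."""
    segments = [[]]
    used = 0
    for size in message_sizes:
        if segments[-1] and used + size > segment_bytes:
            segments.append([])  # roll: start a new segment file
            used = 0
        segments[-1].append(size)
        used += size
    return segments

# With 190-byte segments, even 100-byte messages roll a new file every message:
print(assign_segments([100, 100, 100]))  # -> [[100], [100], [100]]
```

With the default 1 GB segment size, the same three messages would all land in one segment; the tiny value makes rolling visible in the `data` directory almost immediately.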
- Copy the `kafka-node-1` folder twice and rename the copies to `kafka-node-2` and `kafka-node-3`.
- In the `server.properties` files of `kafka-node-2` and `kafka-node-3`:
  - change `broker.id=1` to `broker.id=2` and `broker.id=3` respectively;
  - change `9091` to `9092` and `9093` (if these ports conflict, choose other ports);
  - change `kafka-node-1` to `kafka-node-2` and `kafka-node-3`.
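The per-node edits above boil down to three settings that differ between the node folders. A small sketch that renders them (the `node_properties` helper and the hard-coded `E:/cluster` paths are ours, mirroring this chapter's layout):

```python
def node_properties(node_id, base_port=9090):
    """Render the server.properties settings that differ per node:
    broker.id, the listener port (9091/9092/9093), and log.dirs."""
    return "\n".join([
        f"broker.id={node_id}",
        f"listeners=PLAINTEXT://:{base_port + node_id}",
        f"log.dirs=E:/cluster/kafka-node-{node_id}/data",
        "zookeeper.connect=localhost:2181/kafka",  # shared by all nodes
    ])

# Print the distinguishing settings for all three nodes
for n in (1, 2, 3):
    print(node_properties(n), end="\n\n")
```

Everything else in `server.properties` stays identical across the three copies, which is why copying the folder and editing these lines is sufficient.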
Creating Startup Scripts
Because ZooKeeper must be started before the Kafka cluster, and the cluster contains several nodes, starting everything by hand is tedious. To simplify this, we wrap the startup commands in batch files:
- Create a `zk.cmd` batch file in the `kafka-zookeeper` folder.
- Add the startup command to `zk.cmd`:

      call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

- Create a `kfk.cmd` batch file in each of the `kafka-node-1`, `kafka-node-2`, and `kafka-node-3` folders.
- Add the startup command to `kfk.cmd`:

      call bin/windows/kafka-server-start.bat config/server.properties

- Create a `cluster.cmd` batch file in the `cluster` folder for starting the Kafka cluster.
- Add the startup commands to `cluster.cmd` (the `ping` line serves as a rough ten-second delay so that ZooKeeper has time to start before the brokers):

      cd kafka-zookeeper
      start zk.cmd
      ping 127.0.0.1 -n 10 >nul
      cd ../kafka-node-1
      start kfk.cmd
      cd ../kafka-node-2
      start kfk.cmd
      cd ../kafka-node-3
      start kfk.cmd

- Create a `cluster-clear.cmd` batch file in the `cluster` folder for cleaning up and resetting the Kafka data.
- Add the cleanup commands to `cluster-clear.cmd`:

      cd kafka-zookeeper
      rd /s /q data
      cd ../kafka-node-1
      rd /s /q data
      cd ../kafka-node-2
      rd /s /q data
      cd ../kafka-node-3
      rd /s /q data

- Double-click `cluster.cmd` to start the Kafka cluster.
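The fixed `ping`-based delay in `cluster.cmd` works, but it only guesses how long ZooKeeper needs. A more robust alternative is to poll until ZooKeeper's client port actually accepts connections before starting the brokers. A sketch in Python (the `wait_for_port` helper is illustrative, not part of Kafka's tooling):

```python
import socket
import time

def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Poll until a TCP port (e.g. ZooKeeper's 2181) accepts connections.
    Returns True once a connection succeeds, False if the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1):
                return True
        except OSError:
            time.sleep(interval)
    return False

# Example: wait for ZooKeeper before launching the broker nodes
# if wait_for_port("127.0.0.1", 2181):
#     ...start kafka-node-1, kafka-node-2, kafka-node-3...
```

This waits exactly as long as needed and fails fast with a clear signal if ZooKeeper never comes up, instead of blindly sleeping and hoping.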
After running the cluster startup script, several console windows will open, one per service. Do not close these windows, or the corresponding service will stop. If an error occurs during startup, it is usually caused by stale state left over from a previous run getting ZooKeeper and Kafka out of sync; run `cluster-clear.cmd` first, then run `cluster.cmd` again.
Copyright belongs to the original author, 大数据深度洞察.