0


Kafka技术详解[4]:构建简易Windows环境下的Kafka集群

Kafka基础

Kafka虽然借鉴了JMS规范的思想,但在设计原理上并未完全遵循JMS规范。因此,Kafka内部包含了许多用于数据传输的组件对象,这些组件相互关联,共同实现了高效的数据传输。下面,我们将详细介绍Kafka中的基础概念及核心组件,并展示如何在Windows环境下搭建一个简单的Kafka集群以供学习和练习之用。

集群部署

尽管生产环境中通常使用Linux系统搭建服务器集群,但为了便于理解和实践,我们将在此章节中搭建一个基于Windows系统的简易集群。关于Linux集群的搭建将在后续章节中详述。

解压文件

  1. 在磁盘根目录创建文件夹cluster,文件夹名称应保持简短。
  2. 将Kafka安装包kafka_2.12-3.6.1.tgz解压缩至cluster文件夹下的kafka子文件夹中。

安装ZooKeeper

  1. 将解压缩后的文件夹重命名为kafka-zookeeper
  2. 修改config/zookeeper.properties文件:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
# 此处注意,如果文件目录不存在,会自动创建
dataDir=E:/cluster/kafka-zookeeper/data
# the port at which the clients will connect
# ZooKeeper默认端口为2181
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

安装Kafka

  1. 将解压缩后的kafka-zookeeper文件夹复制一份,并重命名为kafka-node-1
  2. 修改config/server.properties配置文件:
# The id of the broker. This must be set to a unique integer for each broker.
# kafka节点数字标识,集群内具有唯一性
broker.id=1

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# 监听器 9091为本地端口,如果冲突,请重新指定
listeners=PLAINTEXT://:9091

# A comma separated list of directories under which to store log files
# 数据文件路径,如果不存在,会自动创建
log.dirs=E:/cluster/kafka-node-1/data

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# ZooKeeper软件连接地址,2181为默认的ZK端口号 /kafka 为ZK的管理节点
zookeeper.connect=localhost:2181/kafka

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=190
log.flush.interval.messages=2
log.index.interval.bytes=17

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
  1. kafka-node-1文件夹复制两份,分别重命名为kafka-node-2kafka-node-3
  2. 分别修改kafka-node-2kafka-node-3文件夹中的server.properties配置文件: - 将broker.id=1改为broker.id=2broker.id=3;- 将9091改为90929093(如端口冲突,请重新设置);- 将kafka-node-1改为kafka-node-2kafka-node-3

封装启动脚本

由于启动Kafka集群前需先启动ZooKeeper,并且Kafka集群包含多个节点,因此启动过程较为繁琐。为此,我们将启动指令封装进批处理文件中:

  1. kafka-zookeeper文件夹下创建zk.cmd批处理文件。
  2. zk.cmd文件中添加启动命令: call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
  3. kafka-node-1kafka-node-2kafka-node-3文件夹下分别创建kfk.cmd批处理文件。
  4. kfk.cmd文件中添加启动命令: call bin/windows/kafka-server-start.bat config/server.properties
  5. cluster文件夹下创建cluster.cmd批处理文件,用于启动Kafka集群。
  6. cluster.cmd文件中添加启动命令: cd kafka-zookeeperstart zk.cmdping 127.0.0.1 -n 10 >nulcd ../kafka-node-1start kfk.cmdcd ../kafka-node-2start kfk.cmdcd ../kafka-node-3start kfk.cmd
  7. cluster文件夹下创建cluster-clear.cmd批处理文件,用于清理和重置Kafka数据。
  8. cluster-clear.cmd文件中添加清理命令: cd kafka-zookeeperrd /s /q datacd ../kafka-node-1rd /s /q datacd ../kafka-node-2rd /s /q datacd ../kafka-node-3rd /s /q data
  9. 双击执行cluster.cmd文件以启动Kafka集群。

启动集群命令后,会打开多个黑窗口,每个窗口代表一个Kafka服务,请勿关闭这些窗口,否则对应的Kafka服务将会停止。如果启动过程中出现错误,通常是由于ZooKeeper与Kafka之间的同步问题,请先执行

cluster-clear.cmd

文件,然后再执行

cluster.cmd

文件即可。

标签: kafka windows 分布式

本文转载自: https://blog.csdn.net/qq_45115959/article/details/142497375
版权归原作者 大数据深度洞察 所有, 如有侵权,请联系我们删除。

“Kafka技术详解[4]:构建简易Windows环境下的Kafka集群”的评论:

还没有评论