0


jdk+zookeeper+kafka 搭建kafka集群

环境准备

环境资源包:
jdk-8u341-linux-x64.tar.gz
kafka_2.12-2.2.0.tgz
zookeeper-3.4.14.tar.gz
server-idip状态server110.206.120.10leaderserver210.206.120.2followerserver310.206.120.3follower

一、安装jdk

因为kafka需要Java环境,所以优先配置jdk环境,若已经配置了java环境,此步骤可以忽略

[root@VM-120-2-centos ~]# tar -xvf jdk-8u341-linux-x64.tar.gz [root@VM-120-2-centos ~]# mv jdk1.8.0_341/ /usr/local/#在文件末尾加入以下语句[root@VM-120-2-centos ~]# vim /etc/profileexportJAVA_HOME=/usr/local/jdk1.8.0_341
exportPATH=$JAVA_HOME/bin:$PATHexportCLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib

[root@VM-120-2-centos ~]# source /etc/profile[root@VM-120-2-centos ~]# java -versionjava version "1.8.0_341"
Java(TM) SE Runtime Environment (build 1.8.0_341-b10)
Java HotSpot(TM)64-Bit Server VM (build 25.341-b10, mixed mode)
至此jdk环境配置完成

二、zookeeper集群安装

[root@VM-120-2-centos ~]#  cd [root@VM-120-2-centos ~]# tar -xvf zookeeper-3.4.14.tar.gz [root@VM-120-2-centos ~]# mv zookeeper-3.4.14 /usr/local/zookeeper[root@VM-120-2-centos ~]# cd /usr/local/zookeeper/conf/[root@VM-120-2-centos ~]# cp zoo_sample.cfg zoo.cfg[root@VM-120-2-centos ~]# vim zoo.cfgtickTime=2000initLimit=10syncLimit=5dataLogDir=/usr/local/zookeeper/logs
dataDir=/usr/local/zookeeper/data
clientPort=2181autopurge.snapRetainCount=500autopurge.purgeInterval=24server.1=10.206.120.10:2888:3888
server.2=10.206.120.2:2888:3888
server.3=10.206.120.2:2888:3888
[root@VM-120-2-centos ~]# mkdir /usr/local/zookeeper/data#10.206.120.10服务器上执行[root@VM-120-2-centos ~]# echo "1" > /usr/local/zookeeper/data/myid #10.206.120.2服务器上执行       [root@VM-120-2-centos ~]# echo "2" > /usr/local/zookeeper/data/myid#10.206.120.3服务器上执行       [root@VM-120-2-centos ~]# echo "3" > /usr/local/zookeeper/data/myid[root@VM-120-2-centos ~]# cd ../bin/[root@VM-120-2-centos ~]# ./zkServer.sh start[root@VM-120-2-centos ~]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
[root@VM-120-2-centos ~]# netstat -ntlp
看看端口是否正常启动

二、kafka集群安装

[root@VM-120-2-centos ~]# cd [root@VM-120-2-centos ~]# tar -xvf kafka_2.12-2.2.0.tgz [root@VM-120-2-centos ~]# mv kafka_2.12-2.2.0 /usr/local/kafka[root@VM-120-2-centos ~]# cd /usr/local/kafka/config/[root@VM-120-2-centos ~]# cp server.properties server.properties.bak#10.206.120.2服务器上将以下内容写入文件[root@VM-120-2-centos ~]# vim server.propertiesbroker.id=2listeners=PLAINTEXT://10.206.120.2:9092
advertised.listeners=PLAINTEXT://43.137.8.225:9092
num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=204857600log.dirs=/usr/local/kafka/logs
num.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=1680log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=10.206.120.2:2181,10.206.120.10:2181
zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=10max.poll.interval.ms=800000max.poll.records=50#10.206.120.10服务器上将以下内容写入文件[root@VM-120-2-centos ~]# vim server.propertiesbroker.id=1listeners=PLAINTEXT://10.206.120.10:9092
advertised.listeners=PLAINTEXT://118.195.137.101:9092
num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=204857600log.dirs=/usr/local/kafka/logs
num.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=1680log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=10.206.120.2:2181,10.206.120.10:2181
zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=10max.poll.interval.ms=800000max.poll.records=50#10.206.120.3服务器上将以下内容写入文件[root@VM-120-2-centos ~]# vim server.propertiesbroker.id=3listeners=PLAINTEXT://10.206.120.3:9092
advertised.listeners=PLAINTEXT://175.27.146.204:9092
num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=204857600log.dirs=/usr/local/kafka/logs
num.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=1680log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=10.206.120.2:2181,10.206.120.10:2181
zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=10max.poll.interval.ms=800000max.poll.records=50[root@VM-120-2-centos ~]# cd ../bin/[root@VM-120-2-centos ~]# nohup ./kafka-server-start.sh ../config/server.properties &#查看9092端口是否启动,启动即为正常[root@VM-120-10-centos bin]# netstat -ntlp  
#下面所有命令在kafka的bin目录下执行(笔者是/usr/local/kafka/bin)#创建topic话题
./kafka-topics.sh --create --bootstrap-server 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --replication-factor 3--topictest--partitions3#开启一个生产者
./kafka-console-producer.sh --broker-list 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --topictest#开启一个消费者
./kafka-console-consumer.sh --bootstrap-server 10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --topictest --from-beginning
#列出所有topic
./kafka-topics.sh --zookeeper10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 -list#输出某个topic
./kafka-topics.sh --zookeeper10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --delete--topictest#详细描述某个topic
./kafka-topics.sh --zookeeper10.206.120.10:2181,10.206.120.2:2181,10.206.120.3:2181 --describe--topictest

在这里插入图片描述
在这里插入图片描述
至此集群搭建成功,可以正常使用了

四、测试

外网测试

#1、上传测试压缩包并解压如下图1#2、更改application.yml将ip更改为集群主机的外网ip
spring:
  application:
    name: kafka-tester
  kafka:
    bootstrap-servers:
      - 10.206.120.3:9092
      - 10.206.120.2:9092
      - 10.206.120.10:9092
    producer:
      topic: test
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      topic: test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

#3、 后台启动kafka测试包nohup ./test.sh &#4、开启一个消费者,查看jar包生产结果图2(说明kafka-test.jar包启动正常,kafka集群工作正常)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 175.27.146.204:9092,118.195.137.101:9092,43.137.8.225:9092 --topictest --from-beginning

#5、查看consumer_offsets只有一个副本数图三
/usr/local/kafka/bin/kafka-topics.sh --zookeeper175.27.146.204:2181,118.195.137.101:2181,43.137.8.225:2181 --describe--topic __consumer_offsets

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

将集群中的一个follower关机,模拟服务器宕机
结果发现消息队列出现问题(kafka集群不可用)
在这里插入图片描述
将服务器启动,并将zookeeper和kafka集群启动,此时集群才正常
在这里插入图片描述

故当__consumer_offsets副本数设置为1时,若服务器宕机了,则kafka集群无法正常使用,服务器启动,并将zookeeper和kafka集群启动,此时集群才正常

解决方案

1.修改系统_offsets副本数为3
修改kafka的核心配置文件server.properties

num.partitions参数(默认为1)

修改为3,

offsets.topic.replication.factor=3(默认为1)

另外需要添加

auto.create.topics.enable=true

在这里插入图片描述

由于__consumer_offsets是kafka默认的主题,无法删除,我们可以删除zookeeper中的__consumer_offsets。
进入zookeeper/bin目录执行./zkCli.sh
ls /broksers/topics
rmr /brokers/topics/__consumer_offsets
ls /broksers/topics
在这里插入图片描述

先将集群停掉
在重新启动zookeeper和kafka
再次查看__consumer_offsets。发现副本数已经是3
在这里插入图片描述
将集群中的一个follower关机,模拟服务器宕机
消费队列正常
在这里插入图片描述

cd /usr/local/zookeeper/bin/
./zkServer.sh start

cd /usr/local/kafka/bin/
nohup ./kafka-server-start.sh …/config/server.properties &

./kafka-topics.sh --zookeeper 175.27.146.204:2181,118.195.137.101:2181,43.137.8.225:2181 --describe --topic __consumer_offsets

在这里插入图片描述


本文转载自: https://blog.csdn.net/givenchy_yzl/article/details/134973051
版权归原作者 givenchy_yzl 所有, 如有侵权,请联系我们删除。

“jdk+zookeeper+kafka 搭建kafka集群”的评论:

还没有评论