Kafka系列整体栏目
内容链接地址【一】afka安装和基本核心概念https://zhenghuisheng.blog.csdn.net/article/details/142213307【二】kafka集群搭建https://zhenghuisheng.blog.csdn.net/article/details/142253288
kafka集群搭建
一,kafka集群搭建
1,kafka下载和安装
在集群搭建之前,需要先在官网下载一个压缩包,下载地址如下,目前下载的版本是 Scala 2.12 - kafka_2.12-3.6.2.tgz (asc, sha512),一定要下载二进制文件,不要下载源码
https://kafka.apache.org/downloads
随后将压缩包解压,这里把压缩包上传到 /usr/local/software/kafka 下面
tar -zxvf kafka_2.12-3.6.2.tgz
2,基础配置
由于我这边只有一台机器,因此搭建的是一台机器三个broker的集群,不管是在同一台机器上还是在不同的机器上,以下需要执行的命令都是相同的。
在解压完成之后,先复制安装目录下的config目录下的 server.properties 文件,先复制三份
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties
复制完成之后,分别给每个配置设置对应的数据存储目录,在 /usr/local/software/kafka/temp 目录下面,再创建三个目录
mkdir logs1
mkdir logs2
mkdir logs3
3,三台机器服务配置
修改 server-1.properties 文件的参数如下,端口号用9093,日志目录设置为logs1文件目录下
broker.id=1// 机器内部的唯一标识
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://localhost:9093//ip用localhost或者服务器ip
log.dirs=/usr/local/software/kafka/temp/logs1 //设置kafka的日志目录
zookeeper.connect=localhost:2181//连接zookeeper,默认端口是2181
修改 server-2.properties 文件的参数如下,端口号用9094,日志目录设置为logs2文件目录下
broker.id=2// 机器内部的唯一标识
listeners=PLAINTEXT://0.0.0.0:9094
advertised.listeners=PLAINTEXT://localhost:9094//ip用localhost或者服务器ip
log.dirs=/usr/local/software/kafka/temp/logs2 //设置kafka的日志目录
zookeeper.connect=localhost:2181//连接zookeeper,默认端口是2181
修改 server-3.properties 文件的参数如下,端口号用9095,日志目录设置为logs3文件目录下
broker.id=3// 机器内部的唯一标识
listeners=PLAINTEXT://0.0.0.0:9095
advertised.listeners=PLAINTEXT://localhost:9095//ip用localhost或者服务器ip
log.dirs=/usr/local/software/kafka/temp/logs3 //设置kafka的日志目录
zookeeper.connect=localhost:2181//连接zookeeper,默认端口是2181
4,启动zookeeper
在设置了三个机器的集群之后,先来启动这个zookeeper,zookeeper此时是只有一个节点,用的kaka下载内部自带的,因此需要在zk的配置文件中做相关配置
zookeeper.connect=localhost:2181//连接zookeeper,默认端口是2181
因此也不需要去额外的安装kafka。zookeeper的启动命令如下,通过安装目录下的bin中的脚本配合config目录下的配置结合使用。
bin/zookeeper-server-start.sh config/zookeeper.properties
5,启动三个kafka结点
启动三个节点的命令如下,启动时需要设置不同的结点的配置,并且通过
-daemon
后台的方式运行,也可以不加此命令,直接开三个端口分别执行
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
bin/kafka-server-start.sh -daemon config/server-3.properties
随后输入jps命令,可以查看到此时有三个kakfa的进程在启动
随后查看zookeeper上面是否已经有了这三个broker节点,通过下图可知此时三个节点都已经注册到了zookeeper上面
//执行进入zookeeper命令
bin/zookeeper-shell.sh localhost:2181//查看broker结点情况
ls /brokers/ids
6,创建主题
创建一个3个副本,1个分区的Order订单主题,在9093的端口上。在执行这条命令的时候遇到报错的话,查看服务器端口9093,-9095是否开放
// partitions 表示设置分区数,replication-factor表示设置副本数
bin/kafka-topics.sh --create --topic 'order'--bootstrap-server localhost:9093--partitions 1--replication-factor 3
打印的结果如下,表示主题创建成功
Created topic order.
此时在9093的broker中已经有了order这个主题
//查看9093对应broker下的全部主题
bin/kafka-topics.sh --list --bootstrap-server localhost:9093
查看order主题的详情如下
bin/kafka-topics.sh --describe --topic order --bootstrap-server localhost:9093
接下来对返回的参数做一个详细的解释
- Topic: order,指的是创建主题的名称为order
- TopicId: Tf1fRjPDTBiL0WzNwgdtZQ,指的是topic的唯一id
- PartitionCount: 1:指的是主题的分区数量,上面创建主题时设置的分区就是1个
- ReplicationFactor: 3:指的是主题的副本,上面创建主题时设置的副本为3个
- Partition: 0:指的是分区的详细信息,kafka从编号0开始
- Leader: 2:表示在多副本中,broker2上面的副本为leader节点,负责处理生产者和消费者的请求
- Replicas: 2,1,3:表示该分区所有副本所在的broker列表,只有一个副本是leader,其他的是follower
- Isr: 2,1,3:表示能与leader同步的集合,可参与选举的副本
7,发送和消费数据
生产者发送消息如下,先发送到9093所在的broker上面
bin/kafka-console-producer.sh --topic order --bootstrap-server localhost:9093
再切换到 /usr/local/software/kafka/temp/logs1 上面设置的log目录,然后切换到 order-0 目录下面,可以发现数据已经成功的发送到了broker中的分区上面
消费者消费最新的消息,也可以全量消费,因为kakfa会将这些日志默认的保留7天
bin/kafka-console-consumer.sh --topic order --bootstrap-server localhost:9093
bin/kafka-console-consumer.sh --topic order --from-beginning --bootstrap-server localhost:9093
也可以设置消费者组消费,一个分区中的消息只能被消费者组中的一个消费者消费
bin/kafka-console-consumer.sh --consumer-property group.id=group111 --topic order --bootstrap-server localhost:9093
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093--describe --group group111
大致的集群架构如下图,只不过这里是只有一个主题,并且分区只设置了一个
版权归原作者 huisheng_qaq 所有, 如有侵权,请联系我们删除。