1. 为什么要使用kafka集群
单机服务下,Kafka已经具备了非常高的性能。TPS能够达到百万级别。但是,在实际工作中使用时,单机搭建的Kafka会有很大的局限性。
**消息太多,需要分开保存。**Kafka是面向海量消息设计的,一个Topic下的消息会非常多,单机服务很难存得下来。这些消息就需要分成不同的Partition,分布到多个不同的Broker上。这样每个Broker就只需要保存一部分数据。这些分区的个数就称为分区数。
- 服务不稳定,数据容易丢失。单机服务下,如果服务崩溃,数据就丢失了。为了保证数据安全,就需要给每个Partition配置一个或多个备份,保证数据不丢失。Kafka的集群模式下,每个Partition都有一个或多个备份。Kafka会通过一个统一的Zookeeper集群作为选举中心,给每个Partition选举出一个主节点Leader,其他节点就是从节点Follower。主节点负责响应客户端的具体业务请求,并保存消息。而从节点则负责同步主节点的数据。当主节点发生故障时,Kafka会选举出一个从节点成为新的主节点。
- ** Kafka集群中的这些Broker信息,包括Partition的选举信息,都会保存在额外部署的Zookeeper集群当中**,这样,kafka集群就不会因为某一些Broker服务崩溃而中断。
2. kafka的集群架构
由章节1中对kafka集群特点的描述,我们可以大致画出kafka的集群架构图大致如下:
3. kafka集群搭建
接下来我们就动手部署一个Kafka集群,来体验一下Kafka是如何面向海量数据进行横向扩展的。
3.1 搭建kafka集群
搭建环境:
准备3台虚拟机
安装jdk
jdk安装可参考Linux环境下安装JDK-CSDN博客
关闭防火墙(实验版本关闭防火墙,生产环境开启对应端口即可)
firewall-cmd --state 查看防火墙状态 systemctl stop firewalld.service 关闭防火墙
** 第一步: zookeeper 集群搭建**
kafka依赖于zookeeper,虽然kafka内部自带了一个zookeeper,为了保证服务之间的独立性,不建议使用其内部的zookeeper,所以我们先搭建一个独立的zookeeper集群。Zookeeper是一种多数同意的选举机制,允许集群中少数节点出现故障。因此,在搭建集群时,通常都是**采用3,5,7这样的奇数节点**,这样可以最大化集群的高可用特性。 zk集群搭建参考:zookeeper之集群搭建-CSDN博客
根据上述文章搭建完成之后,启动zookeeper集群
第二步: 部署kafka集群
** kafka服务并不需要进行选举,因此也没有奇数台服务的建议。 **
**首先,三台机器都根据 **
初识Kafka-CSDN博客 中的单机服务体验,上传、解压kafka到
解压到/app/kafka目录下。接下来,进入config目录,修改server.properties。这个配置文件里面的配置项非常多,下面列出几个要重点关注的配置
#broker 的全局唯一编号,不能重复,只能是数字。 broker.id=0 #数据文件地址。同样默认是给的/tmp目录。 log.dirs=/app/kafka/logs #默认的每个Topic的分区数 num.partitions=1 #zookeeper的服务地址 zookeeper.connect=192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181
broker.id需要每个服务器上不一样,分发到其他服务器上时,要注意修改一下,比如第一台是0,第二台就是1,第三台的配置就是2。
当多个Kafka服务注册到同一个zookeeper集群上的节点,会自动组成集群。
**server.properties文件中比较重要的核心配置 **
** 第三步: 启动kafka集群**
bin/kafka-server-start.sh -daemon config/server.properties
-daemon表示后台启动kafka服务,这样就不会占用当前命令窗口。
jps看一下启动情况:
3.2 搭建中遇到的问题
3.2.1 启动时报错
问题1描述: 启动kafka时,报没有与主机关联的地址
** 问题解决**:在hosts文件中追加 127.0.0.1 主机名 localhost
示例如下:
问题2描述: node already exists
问题解决: 检查kafka集群各个节点的server.properties,看broker.id有没有一样的,各个节点的broker.id不能重复。
3.2.1 启动成功后创建topic报错
问题描述:执行以下命令报连接超时
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 2 --partitions 4 --topic disTopic
** 问题解决:**命令中的localhost修改为本机ip,还是报超时;
最终解决,修改每个节点的server.properties,配置监听
然后执行命令时使用主机ip
bin/kafka-topics.sh --bootstrap-server 192.168.31.232:9092 --create --replication-factor 2 --partitions 4 --topic disTopic
创建成功
4. 理解Kafka集群当中核心的Topic、Partition、Broker
接下来,我们创建几个topic,对比单机服务下,理解以下topic、partition和broker。
创建几个topic:
bin/kafka-topics.sh --bootstrap-server 192.168.31.5:9092 --create --replication-factor 2 --partitions 4 --topic disTopic
列出所有topic:
bin/kafka-topics.sh --bootstrap-server 192.168.31.5:9092 --list __consumer_offsets
查看topic列表情况:
bin/kafka-topics.sh --bootstrap-server 192.168.31.5:9092 --describe -- topic disTopic
1、--create创建,可以指定一些补充的参数。大部分的参数都可以在配置文件中指定默认值。
- partitons参数表示分区数,这个Topic下的消息会分别存入这些不同的分区中。示例中创建的disTopic,指定了四个分区,也就是说这个Topic下的消息会划分为四个部分。
- replication-factor表示每个分区有几个备份。示例中创建的disTopic,指定了每个partition有两个备份。
2、--describe查看Topic信息。
partiton参数列出了四个partition,后面带有分区编号,用来标识这些分区。
Leader表示这一组partiton中的Leader节点是哪一个。这个Leader节点就是负责响应客户端请求的主节点。从这里可以看到,Kafka中的每一个Partition都会分配Leader,也就是说每个Partition都有不同的节点来负责响应客户端的请求。这样就可以将客户端的请求做到尽量的分散。
Replicas参数表示这个partition的多个备份是分配在哪些Broker上的。也称为AR。这里的0,1,2就对应配置集群时指定的broker.id。但是,Replicas列出的只是一个逻辑上的分配情况,并不关心数据实际是不是按照这个分配。甚至有些节点服务挂了之后,Replicas中也依然会列出节点的ID。
ISR参数表示partition的实际分配情况。他是AR的一个子集,只列出那些当前还存活,能够正常同步数据的那些Broker节点。
接下来,我们还可以查看**Topic下的Partition分布情况**。在Broker上,与消息,联系最为紧密的,其实就是Partition了。之前在配置Kafka集群时,指定了一个log.dirs属性,指向了一个服务器上的日志目录。进入这个目录,就能看到每个Broker的实际数据承载情况。
从这里可以看到,Broker上的一个Partition对应了日志目录中的一个目录。而这个Partition上的所有消息,就保存在这个对应的目录当中。
从整个过程可以看到,Kafka当中,**Topic是一个数据集合的逻辑单元**。同一个Topic下的数据,实际上是存储在Partition分区中的,**Partition就是数据存储的物理单元**。而**Broker是Partition的物理载体**,这些Partition分区会尽量均匀的分配到不同的Broker机器上。而之前接触到的offset,就是每个消息在partition上的偏移量。
Kafka为何要这样来设计Topic、Partition和Broker的关系呢?
1、Kafka设计需要支持海量的数据,而这样庞大的数据量,一个Broker是存不下的。那就拆分成多个Partition,每个Broker只存一部分数据。这样极大的扩展了集群的吞吐量。
2、每个Partition保留了一部分的消息副本,如果放到一个Broker上,就容易出现单点故障。所以就给每个Partition设计Follower节点,进行数据备份,从而保证数据安全。另外,多备份的Partition设计也提高了读取消息时的并发度。
3、在同一个Topic的多个Partition中,会产生一个Partition作为Leader。这个Leader Partition会负责响应客户端的请求,并将数据往其他Partition分发。
5.总结
经过上面的实验,我们接触到了很多Kafka中的概念。将这些基础概念整合起来,就形成了Kafka集群的整体结构。这次我们先把这个整体结构梳理清楚,后续再一点点去了解其中的细节。
![](https://img-blog.csdnimg.cn/direct/1580f26e63264429b38a3f48ebdbdfbe.png)
1、Topic是一个逻辑概念,Producer和Consumer通过Topic进行业务沟通。
2、Topic并不存储数据,Topic下的数据分为多组Partition,尽量平均的分散到各个Broker上。每组Partition包含Topic下一部分的消息。每组Partition包含一个Leader Partition以及若干个Follower Partition进行备份,每组Partition的个数称为备份因子 replica factor。
3、Producer将消息发送到对应的Partition上,然后Consumer通过Partition上的Offset偏移量,记录自己所属消费者组Group在当前Partition上消费消息的进度。
4、Producer发送给一个Topic的消息,会由Kafka推送给所有订阅了这个Topic的消费者组进行处理。但是在每个消费者组内部,只会有一个消费者实例处理这一条消息。
5、最后,Kafka的Broker通过Zookeeper组成集群。然后在这些Broker中,需要选举产生一个担任Controller角色的Broker。这个Controller的主要任务就是负责Topic的分配以及后续管理工作。在我们实验的集群中,这个Controller实际上是通过ZooKeeper产生的。
版权归原作者 瑜伽娃娃 所有, 如有侵权,请联系我们删除。