0


「Kafka」Broker篇

「Kafka」Broker篇

主要讲解的是在 Kafka 中是怎么存储数据的,以及 Kafka 和 Zookeeper 之间如何进行数据沟通的。

Kafka Broker 总体工作流程

Zookeeper 存储的 Kafka 信息

  • 启动 Zookeeper 客户端:[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
  • 通过 ls 命令可以查看 kafka 相关信息:[zk: localhost:2181(CONNECTED)2]ls /kafkaimage-20240110143832837

image-20231229163857940

Kafka Broker 总体工作流程

image-20231229163930666

模拟 Kafka 上下线,Zookeeper 中数据变化:

  1. 查看 /kafka/brokers/ids 路径上的节点:image-20231229164453868
  2. 查看 /kafka/controller 路径上的数据:image-20231229164440799
  3. 查看 /kafka/brokers/topics/first/partitions/0/state 路径上的数据:image-20231229164521363
  4. 停止 hadoop104 上的 kafka: image-20240110142636596
  5. 再次查看 /kafka/brokers/ids 路径上的节点image-20240110142623644
  6. 再次查看 /kafka/controller 路径上的数据image-20240110142702080
  7. 再次查看 /kafka/brokers/topics/first/partitions/0/state 路径上的数据image-20240110142724094
  8. 启动 hadoop104 上的 kafkaimage-20240110142742279
  9. 再次观察 1、2、3 步骤中的内容。

Broker 重要参数

image-20231229164041782

image-20231229164056634

image-20231229164110883

生产经验—节点服役和退役

服役新节点

新节点准备

  1. 关闭 hadoop104,并右键执行克隆操作
  2. 开启 hadoop105,并修改 IP 地址image-20240110150510473
  3. 在 hadoop105 上,修改主机名称为 hadoop105image-20240110150536171
  4. 重新启动 hadoop104、hadoop105
  5. 修改 haodoop105 中 kafka 的 broker.id 为 3保证唯一[atguigu@hadoop105 config]$ vim server.propertiesimage-20240110155843127
  6. 删除 hadoop105 中 kafka 下的 datas 和 logs[atguigu@hadoop105 kafka]$ rm-rf datas/* logs/*
  7. 启动 hadoop102、hadoop103、hadoop104 上的 kafka 集群[atguigu@hadoop102 ~]$ zk.sh start[atguigu@hadoop102 ~]$ kf.sh start
  8. 单独启动 hadoop105 中的 kafka[atguigu@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties

我们先来看一下 first 主题的信息:

image-20240110160935007

目前 first 主题的信息仍然只存在 broker0、1、2上,但 broker3 并没有帮助分担历史数据,所以我们需要负载均衡的操作。

执行负载均衡操作

  1. 创建一个要均衡的主题:[atguigu@hadoop102 kafka]$ vim topics-to-move.json{"topics":[{"topic":"first"}], "version":1}
  2. 生成一个负载均衡的计划image-20240110160653990
  3. 创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json输入以下内容(刚生成的计划):{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
  4. 执行副本存储计划:image-20240110161449511
  5. 验证副本存储计划:image-20240110161658340image-20240110161609204

退役旧节点

执行负载均衡操作

先按照退役一台节点,生成

执行计划

,然后按照服役时操作流程

执行负载均衡

把要退役节点的数据导入到其他节点上。

  1. 创建一个要均衡的主题[atguigu@hadoop102 kafka]$ vim topics-to-move.json{"topics":[{"topic":"first"}], "version":1}
  2. 创建执行计划image-20240110162052104
  3. 创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
  4. 执行副本存储计划[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  5. 验证副本存储计划[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verifyStatus of partition reassignment:Reassignment of partition first-0 is complete.Reassignment of partition first-1 is complete.Reassignment of partition first-2 is complete.Clearing broker-level throttles on brokers 0,1,2,3Clearing topic-level throttles on topic firstimage-20240110162329053

执行停止命令

在 hadoop105 上执行停止命令即可:

[atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh

Kafka 副本

副本基本信息

  • Kafka 副本作用:提高数据可靠性。

  • Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;- 太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。

  • Kafka 中副本分为:Leader 和 Follower。- Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。

  • Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

        A 
       
      
        R 
       
      
        = 
       
      
        I 
       
      
        S 
       
      
        R 
       
      
        + 
       
      
        O 
       
      
        S 
       
      
        R 
       
      
     
       AR = ISR + OSR 
      
     
    

    AR=ISR+OSR

       I 
      
     
       S 
      
     
       R 
      
     
    
      ISR 
     
    

    ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 **

replica.lag.time.max.ms

** 参数设定,默认

30s

。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。

     O 
    
   
     S 
    
   
     R 
    
   
  
    OSR 
   
  
OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。

Leader 选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。

Controller 的信息同步工作是依赖于 Zookeeper 的。

image-20240110153554112

Leader 选举会按照 AR 的顺序进行选取,就是下图中的

Replicas

顺序:

image-20240110153908376

image-20240110153923789

image-20240110153939706

Leader 和 Follower 故障处理细节

Follower 故障处理细节

消费者可见的数据最大 offset 就是 4,

      H 
     
    
      W 
     
    
      − 
     
    
      1 
     
    
   
     HW - 1 
    
   
 HW−1。

该 Follower 先被踢出 ISR 队列,然后其余的 Leader、Follower继续接受数据。如果该 Follower 恢复了,会读取本地磁盘上次记录的 HW,并裁剪掉 高于 HW 的数据,从 HW 开始向 Leader 进行同步数据。

image-20240111145337546

待该 Follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上了 Leader,就可以重新加入 ISR 了。

image-20240111145207846

Leader 故障处理细节

broker0 一开始是 Leader,然后挂掉了,选举 broker1 为新的 Leader,然后其余的 Follower 会把各自 log 文件高于 HW 的部分裁剪掉,然后从新的 Leader 同步数据。

image-20240110154045978

分区副本分配

如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka 底层如何分配存储副本呢?

创建 16 分区,3 个副本

  1. 创建一个新的 topic,名称为 secondimage-20240110154238901
  2. 查看分区和副本情况:image-20240110154302249

依次错开,让每一个副本负载均衡,均匀分配,也可以保证数据的可靠性。

image-20240110154314929

生产经验—手动调整分区副本存储

在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。

需求:创建一个新的topic,4个分区,两个副本,名称为 three。将该 topic 的所有副本都存储到 broker0 和 broker1 两台服务器上。

image-20240110154440464

手动调整分区副本存储的步骤如下:

image-20240110154459796

image-20240110154534569

image-20240111164010567

生产经验—Leader Partition 负载平衡

image-20240110154628926

image-20240110154640522

真正生产环境建议关闭,或设置 percentage 为 20%、30%,不要频繁的触发自平衡,浪费集群大量性能。

生产经验—增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

  1. 创建 topic[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create--partitions3 --replication-factor 1--topic four
  2. 手动增加副本存储1. 创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json输入如下内容:{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}2. 执行副本存储计划[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

文件存储

文件存储机制

Topic 数据的存储机制

image-20240116153508193

kafka 中默认数据保存 7 天,通过

.timeindex

文件判断日志保存多久,过期会定时清理对应的数据,详情参考下方的 - 文件清理策略。

思考:Topic 数据到底存储在什么位置?

image-20240111204450098
在这里插入图片描述
image-20240111204513026

index 文件和 log 文件详解

.index 文件面试考点:存储的索引是对每一条数据都进行存储吗?
**答:这里用的是

稀疏索引

,log 文件每存储

4kb

的数据,就会往 index 文件中写入一条索引。**

image-20240111204551632

说明:日志存储参数配置

image-20240111204615225

文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间:

  • log.retention.hours,最低优先级,小时,默认 7 天。
  • log.retention.minutes,分钟。
  • log.retention.ms,最高优先级,毫秒。
  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

那么日志一旦超过了设置的时间,怎么处理呢?

Kafka 中提供的日志清理策略有 delete 和 compact 两种。

1)delete 日志删除:将过期数据删除
  • log.cleanup.policy = delete 所有数据启用删除策略(默认)1. 基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。2. 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。log.retention.bytes,默认等于 -1,表示无穷大,其实就是关闭掉了。

思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?

image-20240116154803754

以 segment 中所有记录中的最大时间戳作为该文件时间戳,进行删除。

也就是只要这个 segment 中有数据还未过期,就不进行删除操作。

2)compact 日志压缩

image-20240116154918725

高效读写数据

分布式集群

Kafka 本身是分布式集群,可以采用分区技术,并行度高。

稀疏索引

读数据采用稀疏索引,可以快速定位要消费的数据。

顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

image-20240116160711136

页缓存 + 零拷贝技术

image-20240116160741389

image-20240116160751069

笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)

标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/weixin_53407527/article/details/135628714
版权归原作者 小成同学_ 所有, 如有侵权,请联系我们删除。

“「Kafka」Broker篇”的评论:

还没有评论