0


Kafka 物理存储机制

优质博文:IT-BLOG-CN

一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从

Kafka

文件存储机制和物理结构角度,分析

Kafka

是如何实现高效文件存储,及实际应用效果。

Kafka

的基本存储单位是分区。在配置

Kafka

的时候,管理员指定了一个用于存储分区的目录清单

log.dirs

参数的值。

一、分区分配

创建主题时,

Kafka

首先决定如何在

broker

之间分配分区。假设有

6

broker

,打算创建一个包含

10

个分区的主题。并且复制系数是

3

,相当于

30

个分区副本。在被分配到

6

broker

上时,要达到如下的目标:
【1】在

broker

间平均分配分区副本。对于上述例子来说,就是要保证每个

broker

可以分到

5

个副本。
【2】确保每个分区的每个副本分布在不同的

broker

上。
【3】如果为

broker

指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的

broker

上。这样做是为了保证一个机架不可用不会导致整个分区不可用。

为了实现这个目标,我们先随机选择一个

broker

(假设是

2

),然后通过轮询给每个

broker

分配分区来确定首领的位置。如果分区

0

的首领在

broker2

上,那么分区

1

的首领就在

broker3

上,以此类推。然后,从分区首领开始,以此分配跟随者副本。如分区

0

首领在

broker2

上,那么它的第一个副本会出现在

broker3

上,第二个出现在

broker4

上。如果配置了机架信息,那么就不是按照数字顺序来选择

broker

了,而是按照交替机架的方式来选择

broker

。假设

broker0

broker1

broker2

放在同一个机架,

broker3

broker4

broker5

放在其他不同的机架。此时就不是按照

0

5

的顺序来选择

broker

,而是按照

0

,

3

,

1

,

4

,

2

,

5

的顺序进行选择的。

二、文件管理

保留数据时

Kafka

的一个基本特性,

Kafka

不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反,

Kafka

管理员为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。 因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以我们把分区分成若干个片段。默认情况下,

index

大小为

10M

,每个片段

log

包含

1GB

或一周数据,以较小的那个为准。当前正在写入数据的片段叫做活跃片段,活跃片段永远不会被删除。

三、文件格式

我们把

Kafka

的消息和偏移量保存在文件里。保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的。因为使用相同的消息格式进行磁盘存储和网络传输,

Kafka

可以使用零复制技术给消费者发送消息,同时避免了对生产者已经压缩过的消息进行解压缩。除了键、值和偏移量外,消息里还包含了消息大小、校验和、消息格式版本号、压缩算法和时间戳。时间戳可以是生产者发送消息的时间,也可以是消息到达

broker

的时间,这个是可配置的。如果生产者发送的是压缩过的消息,那么同一个批次的消息会被再压缩一次,被当做包装消息进行发送。下面是普通消息和包装消息图:

四、文件存储机制

**【1】

Broker

:** 消息中间件处理结点,一个

Kafka

节点就是一个

Broker

,多个

Broker

可以组成一个

Kafka

集群。
**【2】

Topic

:** 主题,如

page view

日志、

click

日志等都可以以

Topic

的形式存在,

Kafka

集群能够同时负责多个

Topic

的分发。
**【3】

Partition

:**

Topic

物理上的分组,一个

Topic

可以分为多个

Partition

,每个

Partition

是一个有序的队列。
**【4】

Segment

:**

Partition

物理上由多个

Segment

组成。
**【5】

offset

:** 每个

Partition

都由一系列有序的、不可变的消息组成,这些消息被连续的追加到

Partition

中。

Partition

中的每个消息都有一个连续的序列号叫做

offset

,用于

Partition

唯一标识一条消息。

**分析过程分为以下

4

个步骤:**

**【1】

Topic

Partition

存储分布:** 假设

Kafka

集群只有一个

Broker

xxx/message-folder

为数据文件存储根目录,在

Kafka Broker

server.properties

文件配置(参数

log.dirs=xxx/message-folder

),例如创建

2

Topic

名称分别为

report_push

launch_info

Partitions

数量都为

partitions=4

(将一个

Topic

分为

4

个部分存储)存储路径和目录规则为:

xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3

**【2】

Partiton

中文件存储方式:** 每个

Partion

(目录)相当于一个巨型文件被平均分配到多个大小相等

Segment

(段)数据文件中。但每个段

Segment file

消息数量不一定相等,这种特性方便

old segment file

快速被删除。每个

Partiton

只需要支持顺序读写就行了,

Segment

文件生命周期由服务端配置参数决定。这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

【3】

partiton

segment

文件存储结构:

segment file

2

大部分组成,分别为

index file

data file

,此

2

个文件成对出现,后缀

".index"

“.log”

分别表示为

segment

索引文件、数据文件。

segment

文件命名规则:

partion

全局的第一个

segment

0

开始,后续每个

segment

文件名为上一个

segment

文件最后一条消息的

offset

值。数值最大为

64

long

大小,

19

位数字字符长度,没有数字用0填充。下面文件列表是笔者在

Kafka broker

上做的一个实验,创建一个

topicXXX

包含

1 partition

,设置每个

segment

大小为

500MB

,并启动

producer

Kafka broker

写入大量数据,如下图所示

segment

文件列表形象说明了上述

2

个规则以及

segment

index<—->data file

对应关系物理结构如下:

索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中

message

的物理偏移地址。 其中以索引文件中元数据

3,497

为例,依次在数据文件中表示第

3

message

(在全局

partiton

表示第

368772

message

)、以及该消息的物理偏移地址为

497

。从上述图

3

了解到

segment data file

由许多

message

组成,下面详细说明

message

物理结构如下:

【参数说明】:

8 byte offset

:在

Parition

(分区)内的每条消息都有一个有序的

id

号,这个

id

号被称为偏移

offset

,它可以唯一确定每条消息在

Parition

内的位置。即

offset

表示

Partiion

的第多少

message

4 byte message size

message

大小;

4 byte CRC32

:用

crc32

校验

message

1 byte “magic"

:表示本次发布

Kafka

服务程序协议版本号;

1 byte “attributes"

:表示为独立版本、或标识压缩类型、或编码类型;

4 byte key length

:表示

key

的长度,当

key

-1

时,

K byte key

字段不填;

value bytes payload

:表示实际消息数据;

`index文件结构:

offset: 783932 position: 69483992
offset: 784323 position: 69543233
offset: 784565 position: 69589443
offset: 784932 position: 69623433
offset: 785355 position: 69658994
offset: 785894 position: 69704355
offset: 786389 position: 69738993
offset: 786584 position: 69784345

**

log

文件结构:** 有个眼缘即可

offset: 784932 CreateTime:1598161852389 keysize: -1 valuesize: 15 sequence: 9884 baseOffset: 7043213 lastOffset: 784932 count: 1 baseSequence: 907

**【4】在

partition

中如何通过

offset

查找

message

:** 例如读取

offset=368776

Message

,需要通过下面

2

个步骤查找。
**【第一步】查找

segment file

:** 上图为例,其中

00000000000000000000.index

表示最开始的文件,起始偏移量

offset

为0。第二个文件

00000000000000368769.index

的消息量起始偏移量为

368770 = 368769 + 1

。同样,第三个文件

00000000000000737337.index

的起始偏移量为

737338=737337 + 1

,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据

offset

二分查找文件列表,就可以快速定位到具体文件。当

offset=368776

时定位到

00000000000000368769.index|log


**【第二步】通过

segment file

查找

message

:** 通过第一步定位到

segment file

,当

offset=368776

时,依次定位到

00000000000000368769.index

的元数据物理位置和

00000000000000368769.log

的物理偏移地址,然后再通过

00000000000000368769.log

顺序查找直到

offset=368776

为止。从上述图可知这样做的优点,

segment index file

采取稀疏索引存储方式,它减少索引文件大小,通过

mmap

可以直接内存操作,稀疏索引为数据文件的每个对应

message

设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。通过上述过程详细分析,我们就可以清楚认识到

kafka

文件存储机制的奥秘。

五、Kafka文件实际运行效果

【实验环境】:

Kafka

集群 = 由

2

台虚拟机组成;

CPU = 4

核;物理内存

= 8GB

;网卡 = 千兆网卡;

JVM HEAP = 4GB

;详细

Kafka

服务端配置及其优化请参考:

Kafka server.properties

配置详解:

从上述图可以看出,

Kafka

运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效。这跟

Kafka

文件存储中读写

message

的设计是息息相关的。

Kafka

中读写

message

有如下特点:写

message

,消息从

java

堆转入

page cache

即物理内存。由异步线程刷盘,消息从

page cache

刷入磁盘。读

message

消息直接从

page cache

转入

socket

发送出去。当从

page cache

没有找到相应数据时,此时会产生磁盘

IO

,从磁盘

Load

消息到

page cache

,然后直接从

socket

发出去。

六、Kafka 中的 Partition 和 Offset

**【1】

Log

机制:** 说到分区,就要说

Kafka

对消息的存储,首先,

kafka

是通过

log

(日志)来记录消息发布的。每当产生一个消息,

Kafka

会记录到本地的

log

文件中,这个

log

和我们平时的

log

有一定的区别。这里可以参考一下

The Log

,不多解释。这个

log

文件默认的位置在

config/server.properties

中指定的,默认的位置是

log.dirs=/tmp/kafka-logs

Linux

不用说,

windows

的话就在你对应磁盘的根目录下。

**分区

Partition

:**

Kafka

是为分布式环境设计的,因此如果日志文件,其实也可以理解成消息数据库,放在同一个地方,那么必然会带来可用性的下降,一挂全挂,如果全量拷贝到所有的机器上,那么数据又存在过多的冗余,而且由于每台机器的磁盘大小是有限的,所以即使有再多的机器,可处理的消息还是被磁盘所限制,无法超越当前磁盘大小。因此有了

Partition

的概念。

Kafka

对消息进行一定的计算,通过

hash

来进行分区。这样,就把一份

log

文件分成了多份。如上面的分区读写日志图,分成多份以后,在单台

Broker

上,比如快速上手中,如果新建

Topic

的时候,我们选择

replication-factor 1 partitions 2

,那么在

log

目录里,我们会看到

test-0

目录和

test-1

目录就是两个分区了。你可能会想,这没啥区别呀。注意,当有了多个

broker

之后,这个意义就存在了。这里上一张图:

**【2】

Kafka

分布式分区存储:** 这是一个

Topic

包含

4

Partition

2 Replication

(拷贝),也就是说全部的消息被放在了

4

个分区存储,为了高可用,将

4

个分区做了2份冗余,然后根据分配算法。将总共

8

份数据,分配到

Broker

集群上。结果就是每个

Broker

上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用。比如图中的

Broker1

,宕机了那么剩下的三台

Broker

依然保留了全量的分区数据。所以还能使用,如果再宕机一台,那么数据不完整了。当然你可以设置更多的冗余,比如设置了冗余是

4

,那么每台机器就有了

0123

完整的数据,宕机几台都行。需要在存储占用和高可用之间做衡量。至于宕机后,

zookeeper

会选出新的

partition leader

**偏移

offset

:** 上一段说了分区,分区就是一个有序的,不可变的消息队列。新来的

commit log

持续往后面加数据。这些消息被分配了一个下标(或者偏移),就是

offset

,用来定位这一条消息。消费者消费到了哪条消息,是保持在消费者这一端的。消息者也可以控制,消费者可以在本地保存最后消息的

offset

,并间歇性的向

zookeeper

注册

offset

。也可以重置

offset

如何通过

offset

算出分区:其实

Partition

存储的时候,又分成了多个

segment

(段),然后通过一个

index

索引,来标识第几段。这里先可以去看一下本地

log

目录的分区文件夹。在我这里,

test-0

这个分区里面,会有一个

index

文件和一个

log

文件,对于某个指定的分区,假设每

5

个消息作为一个段大小,当产生了

10

条消息的情况下,目前有会分段。

0.index

(表示这里

index

是对

0-4

做的索引)、

5.index

(表示这里

index

是对

5-9

做的索引)、

10.index

(表示这里

index

是对

10-15

做的索引,目前还没满) 和

0.log

5.log

10.log

。当消费者需要读取

offset=8

的时候,首先

kafka

index

文件列表进行二分查找,可以算出应该是在

5.index

对应的

log

文件中,然后对对应的

5.log

文件,进行顺序查找,

5->6->7->8

,直到顺序找到

8

就好了。

七、索引

消费者可以从

Kafka

的任意可用偏移量位置开始读取消息,假设消费者要读取从偏移量

100

开始的

1MB

消息,那么

Broker

必须立即定位到偏移量

100

,为了帮组

broker

更快地定位到指定的偏移量,

Kafka

为每个分区维护一个索引。索引把偏移量映射到片段文件和偏移量在文件里的位置。索引也被分成片段,所以再删除消息时,也可以删除相应的索引。

Kafka

不维护索引的校验和。如果索引出现损坏,

Kafka

会通过重新读取消息并录制偏移量和位置来重新生成索引。如果有必要,管理员是可以删除索引的,这样做是绝对安全的,

Kafka

会自动重新生成这些索引。

八、Kafka高效文件存储设计特点

【1】

Kafka

topic

中一个

parition

大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
【2】通过索引信息可以快速定位

message

和确定

response

的最大大小。
【3】通过

index

元数据全部映射到

memory

,可以避免

segment file

IO

磁盘操作。
【4】通过索引文件稀疏存储,可以大幅降低

index

文件元数据占用空间大小。


本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/143245927
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。

“Kafka 物理存储机制”的评论:

还没有评论