优质博文:IT-BLOG-CN
一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从
Kafka
文件存储机制和物理结构角度,分析
Kafka
是如何实现高效文件存储,及实际应用效果。
Kafka
的基本存储单位是分区。在配置
Kafka
的时候,管理员指定了一个用于存储分区的目录清单
log.dirs
参数的值。
一、分区分配
创建主题时,
Kafka
首先决定如何在
broker
之间分配分区。假设有
6
个
broker
,打算创建一个包含
10
个分区的主题。并且复制系数是
3
,相当于
30
个分区副本。在被分配到
6
个
broker
上时,要达到如下的目标:
【1】在
broker
间平均分配分区副本。对于上述例子来说,就是要保证每个
broker
可以分到
5
个副本。
【2】确保每个分区的每个副本分布在不同的
broker
上。
【3】如果为
broker
指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的
broker
上。这样做是为了保证一个机架不可用不会导致整个分区不可用。
为了实现这个目标,我们先随机选择一个
broker
(假设是
2
),然后通过轮询给每个
broker
分配分区来确定首领的位置。如果分区
0
的首领在
broker2
上,那么分区
1
的首领就在
broker3
上,以此类推。然后,从分区首领开始,以此分配跟随者副本。如分区
0
首领在
broker2
上,那么它的第一个副本会出现在
broker3
上,第二个出现在
broker4
上。如果配置了机架信息,那么就不是按照数字顺序来选择
broker
了,而是按照交替机架的方式来选择
broker
。假设
broker0
、
broker1
、
broker2
放在同一个机架,
broker3
、
broker4
、
broker5
放在其他不同的机架。此时就不是按照
0
到
5
的顺序来选择
broker
,而是按照
0
,
3
,
1
,
4
,
2
,
5
的顺序进行选择的。
二、文件管理
保留数据时
Kafka
的一个基本特性,
Kafka
不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反,
Kafka
管理员为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。 因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以我们把分区分成若干个片段。默认情况下,
index
大小为
10M
,每个片段
log
包含
1GB
或一周数据,以较小的那个为准。当前正在写入数据的片段叫做活跃片段,活跃片段永远不会被删除。
三、文件格式
我们把
Kafka
的消息和偏移量保存在文件里。保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的。因为使用相同的消息格式进行磁盘存储和网络传输,
Kafka
可以使用零复制技术给消费者发送消息,同时避免了对生产者已经压缩过的消息进行解压缩。除了键、值和偏移量外,消息里还包含了消息大小、校验和、消息格式版本号、压缩算法和时间戳。时间戳可以是生产者发送消息的时间,也可以是消息到达
broker
的时间,这个是可配置的。如果生产者发送的是压缩过的消息,那么同一个批次的消息会被再压缩一次,被当做包装消息进行发送。下面是普通消息和包装消息图:
四、文件存储机制
**【1】
Broker
:** 消息中间件处理结点,一个
Kafka
节点就是一个
Broker
,多个
Broker
可以组成一个
Kafka
集群。
**【2】
Topic
:** 主题,如
page view
日志、
click
日志等都可以以
Topic
的形式存在,
Kafka
集群能够同时负责多个
Topic
的分发。
**【3】
Partition
:**
Topic
物理上的分组,一个
Topic
可以分为多个
Partition
,每个
Partition
是一个有序的队列。
**【4】
Segment
:**
Partition
物理上由多个
Segment
组成。
**【5】
offset
:** 每个
Partition
都由一系列有序的、不可变的消息组成,这些消息被连续的追加到
Partition
中。
Partition
中的每个消息都有一个连续的序列号叫做
offset
,用于
Partition
唯一标识一条消息。
**分析过程分为以下
4
个步骤:**
**【1】
Topic
中
Partition
存储分布:** 假设
Kafka
集群只有一个
Broker
,
xxx/message-folder
为数据文件存储根目录,在
Kafka Broker
中
server.properties
文件配置(参数
log.dirs=xxx/message-folder
),例如创建
2
个
Topic
名称分别为
report_push
、
launch_info
,
Partitions
数量都为
partitions=4
(将一个
Topic
分为
4
个部分存储)存储路径和目录规则为:
xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
**【2】
Partiton
中文件存储方式:** 每个
Partion
(目录)相当于一个巨型文件被平均分配到多个大小相等
Segment
(段)数据文件中。但每个段
Segment file
消息数量不一定相等,这种特性方便
old segment file
快速被删除。每个
Partiton
只需要支持顺序读写就行了,
Segment
文件生命周期由服务端配置参数决定。这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。
【3】
partiton
中
segment
文件存储结构:
segment file
由
2
大部分组成,分别为
index file
和
data file
,此
2
个文件成对出现,后缀
".index"
和
“.log”
分别表示为
segment
索引文件、数据文件。
segment
文件命名规则:
partion
全局的第一个
segment
从
0
开始,后续每个
segment
文件名为上一个
segment
文件最后一条消息的
offset
值。数值最大为
64
位
long
大小,
19
位数字字符长度,没有数字用0填充。下面文件列表是笔者在
Kafka broker
上做的一个实验,创建一个
topicXXX
包含
1 partition
,设置每个
segment
大小为
500MB
,并启动
producer
向
Kafka broker
写入大量数据,如下图所示
segment
文件列表形象说明了上述
2
个规则以及
segment
中
index<—->data file
对应关系物理结构如下:
索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中
message
的物理偏移地址。 其中以索引文件中元数据
3,497
为例,依次在数据文件中表示第
3
个
message
(在全局
partiton
表示第
368772
个
message
)、以及该消息的物理偏移地址为
497
。从上述图
3
了解到
segment data file
由许多
message
组成,下面详细说明
message
物理结构如下:
【参数说明】:
8 byte offset
:在
Parition
(分区)内的每条消息都有一个有序的
id
号,这个
id
号被称为偏移
offset
,它可以唯一确定每条消息在
Parition
内的位置。即
offset
表示
Partiion
的第多少
message
。
4 byte message size
:
message
大小;
4 byte CRC32
:用
crc32
校验
message
;
1 byte “magic"
:表示本次发布
Kafka
服务程序协议版本号;
1 byte “attributes"
:表示为独立版本、或标识压缩类型、或编码类型;
4 byte key length
:表示
key
的长度,当
key
为
-1
时,
K byte key
字段不填;
value bytes payload
:表示实际消息数据;
`index文件结构:
offset: 783932 position: 69483992
offset: 784323 position: 69543233
offset: 784565 position: 69589443
offset: 784932 position: 69623433
offset: 785355 position: 69658994
offset: 785894 position: 69704355
offset: 786389 position: 69738993
offset: 786584 position: 69784345
**
log
文件结构:** 有个眼缘即可
offset: 784932 CreateTime:1598161852389 keysize: -1 valuesize: 15 sequence: 9884 baseOffset: 7043213 lastOffset: 784932 count: 1 baseSequence: 907
**【4】在
partition
中如何通过
offset
查找
message
:** 例如读取
offset=368776
的
Message
,需要通过下面
2
个步骤查找。
**【第一步】查找
segment file
:** 上图为例,其中
00000000000000000000.index
表示最开始的文件,起始偏移量
offset
为0。第二个文件
00000000000000368769.index
的消息量起始偏移量为
368770 = 368769 + 1
。同样,第三个文件
00000000000000737337.index
的起始偏移量为
737338=737337 + 1
,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据
offset
二分查找文件列表,就可以快速定位到具体文件。当
offset=368776
时定位到
00000000000000368769.index|log
;
**【第二步】通过
segment file
查找
message
:** 通过第一步定位到
segment file
,当
offset=368776
时,依次定位到
00000000000000368769.index
的元数据物理位置和
00000000000000368769.log
的物理偏移地址,然后再通过
00000000000000368769.log
顺序查找直到
offset=368776
为止。从上述图可知这样做的优点,
segment index file
采取稀疏索引存储方式,它减少索引文件大小,通过
mmap
可以直接内存操作,稀疏索引为数据文件的每个对应
message
设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。通过上述过程详细分析,我们就可以清楚认识到
kafka
文件存储机制的奥秘。
五、Kafka文件实际运行效果
【实验环境】:
Kafka
集群 = 由
2
台虚拟机组成;
CPU = 4
核;物理内存
= 8GB
;网卡 = 千兆网卡;
JVM HEAP = 4GB
;详细
Kafka
服务端配置及其优化请参考:
Kafka server.properties
配置详解:
从上述图可以看出,
Kafka
运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效。这跟
Kafka
文件存储中读写
message
的设计是息息相关的。
Kafka
中读写
message
有如下特点:写
message
,消息从
java
堆转入
page cache
即物理内存。由异步线程刷盘,消息从
page cache
刷入磁盘。读
message
消息直接从
page cache
转入
socket
发送出去。当从
page cache
没有找到相应数据时,此时会产生磁盘
IO
,从磁盘
Load
消息到
page cache
,然后直接从
socket
发出去。
六、Kafka 中的 Partition 和 Offset
**【1】
Log
机制:** 说到分区,就要说
Kafka
对消息的存储,首先,
kafka
是通过
log
(日志)来记录消息发布的。每当产生一个消息,
Kafka
会记录到本地的
log
文件中,这个
log
和我们平时的
log
有一定的区别。这里可以参考一下
The Log
,不多解释。这个
log
文件默认的位置在
config/server.properties
中指定的,默认的位置是
log.dirs=/tmp/kafka-logs
,
Linux
不用说,
windows
的话就在你对应磁盘的根目录下。
**分区
Partition
:**
Kafka
是为分布式环境设计的,因此如果日志文件,其实也可以理解成消息数据库,放在同一个地方,那么必然会带来可用性的下降,一挂全挂,如果全量拷贝到所有的机器上,那么数据又存在过多的冗余,而且由于每台机器的磁盘大小是有限的,所以即使有再多的机器,可处理的消息还是被磁盘所限制,无法超越当前磁盘大小。因此有了
Partition
的概念。
Kafka
对消息进行一定的计算,通过
hash
来进行分区。这样,就把一份
log
文件分成了多份。如上面的分区读写日志图,分成多份以后,在单台
Broker
上,比如快速上手中,如果新建
Topic
的时候,我们选择
replication-factor 1 partitions 2
,那么在
log
目录里,我们会看到
test-0
目录和
test-1
目录就是两个分区了。你可能会想,这没啥区别呀。注意,当有了多个
broker
之后,这个意义就存在了。这里上一张图:
**【2】
Kafka
分布式分区存储:** 这是一个
Topic
包含
4
个
Partition
,
2 Replication
(拷贝),也就是说全部的消息被放在了
4
个分区存储,为了高可用,将
4
个分区做了2份冗余,然后根据分配算法。将总共
8
份数据,分配到
Broker
集群上。结果就是每个
Broker
上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用。比如图中的
Broker1
,宕机了那么剩下的三台
Broker
依然保留了全量的分区数据。所以还能使用,如果再宕机一台,那么数据不完整了。当然你可以设置更多的冗余,比如设置了冗余是
4
,那么每台机器就有了
0123
完整的数据,宕机几台都行。需要在存储占用和高可用之间做衡量。至于宕机后,
zookeeper
会选出新的
partition leader
。
**偏移
offset
:** 上一段说了分区,分区就是一个有序的,不可变的消息队列。新来的
commit log
持续往后面加数据。这些消息被分配了一个下标(或者偏移),就是
offset
,用来定位这一条消息。消费者消费到了哪条消息,是保持在消费者这一端的。消息者也可以控制,消费者可以在本地保存最后消息的
offset
,并间歇性的向
zookeeper
注册
offset
。也可以重置
offset
。
如何通过
offset
算出分区:其实
Partition
存储的时候,又分成了多个
segment
(段),然后通过一个
index
索引,来标识第几段。这里先可以去看一下本地
log
目录的分区文件夹。在我这里,
test-0
这个分区里面,会有一个
index
文件和一个
log
文件,对于某个指定的分区,假设每
5
个消息作为一个段大小,当产生了
10
条消息的情况下,目前有会分段。
0.index
(表示这里
index
是对
0-4
做的索引)、
5.index
(表示这里
index
是对
5-9
做的索引)、
10.index
(表示这里
index
是对
10-15
做的索引,目前还没满) 和
0.log
、
5.log
、
10.log
。当消费者需要读取
offset=8
的时候,首先
kafka
对
index
文件列表进行二分查找,可以算出应该是在
5.index
对应的
log
文件中,然后对对应的
5.log
文件,进行顺序查找,
5->6->7->8
,直到顺序找到
8
就好了。
七、索引
消费者可以从
Kafka
的任意可用偏移量位置开始读取消息,假设消费者要读取从偏移量
100
开始的
1MB
消息,那么
Broker
必须立即定位到偏移量
100
,为了帮组
broker
更快地定位到指定的偏移量,
Kafka
为每个分区维护一个索引。索引把偏移量映射到片段文件和偏移量在文件里的位置。索引也被分成片段,所以再删除消息时,也可以删除相应的索引。
Kafka
不维护索引的校验和。如果索引出现损坏,
Kafka
会通过重新读取消息并录制偏移量和位置来重新生成索引。如果有必要,管理员是可以删除索引的,这样做是绝对安全的,
Kafka
会自动重新生成这些索引。
八、Kafka高效文件存储设计特点
【1】
Kafka
把
topic
中一个
parition
大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
【2】通过索引信息可以快速定位
message
和确定
response
的最大大小。
【3】通过
index
元数据全部映射到
memory
,可以避免
segment file
的
IO
磁盘操作。
【4】通过索引文件稀疏存储,可以大幅降低
index
文件元数据占用空间大小。
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。