0


KafKa存储机制

存储机制

Kafka 是为了解决大数据的实时日志流而生的, 每天要处理的日志量级在千亿规模。对于日志流的特点主要包括 :

  1. 数据实时产生

  2. 海量数据存储与处理

所以它必然要面临分布式系统遇到的高并发、高可用、高性能等三高问题。

对于 Kafka 的存储需要保证以下几点:

  1. 存储的主要是消息流(可以是简单的文本格式也可以是其他格式,对于 Broker 存储来说,它并不关心数据本身)

  2. 要支持海量数据的高效存储、高持久化(保证重启后数据不丢失)

  3. 要支持海量数据的高效检索(消费的时候可以通过offset或者时间戳高效查询并处理)

  4. 要保证数据的安全性和稳定性、故障转移容错性

kafka 存储选型

从上图性能测试的结果看出普通机械磁盘的顺序I/O性能指标是53.2M values/s,而内存的随机I/O性能 指标是36.7M values/s。由此似乎可以得出结论:磁盘的顺序I/O性能要强于内存的随机I/O性能。

另外,如果需要较高的存储性能,必然是提高读速度和写速度:

  1. 提高读速度:利用索引,来提高查询速度,但是有了索引,大量写操作都会维护索引,那么会降低写入效率。常见的如关系型数据库:mysql等

  2. 提高写速度:这种一般是采用日志存储, 通过顺序追加写的方式来提高写入速度,因为没有索引,无法快速查询,最严重的只能一行行遍历读取。常见的如大数据相关领域的基本都基于此方式来实现。

Kafka 存储方案剖析

对于 Kafka 来说, 它主要用来处理海量数据流,这个场景的特点主要包括:

  1. 写操作:写并发要求非常高,基本得达到百万级 TPS,顺序追加写日志即可,无需考虑更新操作

  2. 读操作:相对写操作来说,比较简单,只要能按照一定规则高效查询即可(offset或者时间戳)

对于写操作来说,直接采用顺序追加写日志的方式就可以满足 Kafka 对于百万TPS写入效率要求。所以我们重点放在如何解决高效查询这些日志。Kafka采用了稀疏哈希索引(底层基于Hash Table 实现)的方式

把消息的 Offset 设计成一个有序的字段,这样消息在日志文件中也就有序存放了,也不需要额外引入哈 希表结构, 可以直接将消息划分成若干个块,对于每个块,我们只需要索引当前块的第一条消息的 Offset (类似二分查找算法的原理),即先根据 Offset 大小找到对应的块, 然后再从块中顺序查找, 这样就可以快速定位到要查找的消息。

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制。 它将每个 Partition 分为多个 Segment,每个 Segment 对应两个文件:“.index” 索引文件和 “.log” 数据文件。这些文件位于同一文件下,该文件夹的命名规则为:topic 名-分区号。例如,test这个 topic 有三分分 区,则其对应的文件夹为 test-0,test-1,test-2。

index 和 log 文件以当前 Segment 的第一条消息的 Offset 命名。下图为 index 文件和 log 文件的结构示意图:

“.index” 文件存储大量的索引信息 “.log” 文件存储大量的数据,

索引文件中的元数据指向对应数据文件中 Message 的物理偏移量。

查看索引:./kafka-dump-log.sh --files /tmp/kafka-logs/test-1/00000000000000000000.index

kafka 存储架构设计

从上分析我们可以知道: Kafka 最终的存储实现方案, 即基于顺序追加写日志 + 稀疏哈希索引。

Kafka 日志存储结构:

从上图可以看出Kafka 是基于「主题 + 分区 + 副本 + 分段 + 索引」的结构:

  1. kafka 中消息是以主题 Topic 为基本单位进行归类的,这里的 Topic 是逻辑上的概念,实际上在磁盘存储是根据分区 Partition 存储的, 即每个 Topic 被分成多个 Partition,分区 Partition 的数量可以在主题 Topic 创建的时候进行指定。

  2. Partition 分区主要是为了解决 Kafka 存储的水平扩展问题而设计的, 如果一个 Topic 的所有消息都只存储到一个 Kafka Broker上的话, 对于 Kafka 每秒写入几百万消息的高并发系统来说,这个 Broker 肯定会出现瓶颈, 故障时候不好进行恢复,所以 Kafka 将 Topic 的消息划分成多个 Partition, 然后均衡的分布到整个 Kafka Broker 集群中

  3. Partition 分区内每条消息都会被分配一个唯一的消息 id,即我们通常所说的偏移量 Offset, 因此 kafka 只能保证每个分区内部有序性,并不能保证全局有序性。

  4. 然后每个 Partition 分区又被划分成了多个 LogSegment,这是为了防止 Log 日志过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegement,相当于一个巨型文件被平均分割为一些相对较小的文件,这样也便于消息的查找、维护和清理。这样在做历史数据清理的时候,直接删除旧的 LogSegement 文件就可以了。

  5. Log 日志在物理上只是以文件夹的形式存储,而每个 LogSegement 对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以".snapshot"为后缀的快照索引文件等)

kafka 日志系统架构设计

再来研究topic->partition的关系

kafka 消息是按主题 Topic 为基础单位归类的,各个 Topic 在逻辑上是独立的,每个 Topic 又可以分为一个或者多个 Partition,每条消息在发送的时候会根据分区规则被追加到指定的分区中,如下图所示:

4个分区的主题逻辑结构图

日志目录布局

Kafka 消息写入到磁盘的日志目录布局是怎样的?Log 对应了一个命名为-的文件夹。举个例子,假设现在有一个名为“topic-order”的 Topic,该 Topic 中 有4个 Partition,那么在实际物理存储上表现为“topic-order-0”、“topic-order-1”、“topic-order-2”、 “topic-order-3” 这4个文件夹。

Log 中写入消息是顺序写入的。但是只有最后一个 LogSegement 才能执行写入操作,之前的所有 LogSegement 都不能执行写入操作。为了更好理解这个概念,我们将最后一个 LogSegement 称 为"activeSegement",即表示当前活跃的日志分段。随着消息的不断写入,当 activeSegement 满足一定的条件时,就需要创建新的 activeSegement,之后再追加的消息会写入新的 activeSegement。

为了更高效的进行消息检索,每个 LogSegment 中的日志文件(以“.log”为文件后缀)都有对应的几个索引文件:偏移量索引文件(以“.index”为文件后缀)、时间戳索引文件(以“.timeindex”为文件后缀)、快照索引文件 (以“.snapshot”为文件后缀)。其中每个 LogSegment 都有一个 Offset 来作为 基准偏移量(baseOffset),用来表示当前 LogSegment 中第一条消息的 Offset。偏移量是一个64位的 Long 长整型数,日志文件和这几个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为 20位数字,没有达到的位数前面用0填充。比如第一个 LogSegment 的基准偏移量为0,对应的日志文件 为00000000000000000000.log。

上面例子中 LogSegment 对应的基准位移是12768089,也说明了当前 LogSegment 中的第一条消息的偏移量为12768089,同时可以说明前一个 LogSegment 中共有12768089条消息(偏移量从0至 12768089的消息)。再如:

注意每个 LogSegment 中不只包含“.log”、“.index”、“.timeindex”这几种文件,还可能包含 “.snapshot”、“.txnindex”、“leader-epoch-checkpoint”等文件, 以及 “.deleted”、“.cleaned”、 “.swap”等临时文件。

消费者消费的时候,会将提交的位移保存在 Kafka 内部的主题__consumer_offsets中,下面来看一个整体的日志目录结构图:

磁盘数据存储

我们知道 Kafka 是依赖文件系统来存储和缓存消息,以及典型的顺序追加写日志操作,另外它使用操作系统的 PageCache 来减少对磁盘 I/O 操作,即将磁盘的数据缓存到内存中,把对磁盘的访问转变为对内存的访问。

在 Kafka 中,大量使用了 PageCache, 这也是 Kafka 能实现高吞吐的重要因素之一, 当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据页是否在 PageCache 中,如果命中则直接返回数据,从而避免了对磁盘的 I/O 操作;如果没有命中,操作系统则会向磁盘发起读取请求并将读取的数据页存入 PageCache 中,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检查数据页是否在页缓存中,如果不存在,则 PageCache 中添加相应的数据页,最后将数据写入对应的数据页。被修改过后的数据页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。

除了消息顺序追加写日志、PageCache以外, kafka 还使用了零拷贝(Zero-Copy)技术来进一步提 升系统性能, 如下图所示:

消息从生产到写入磁盘的整体过程如下图所示:

可靠性

可靠性相关的问题:

我发消息的时候,需要等 ack 嘛?

我发了消息之后,消费者一定会收到嘛?

遇到各种故障时,我的消息会不会丢?

消费者侧会收到多条消息嘛?消费者 svr 重启后消息会丢失嘛?

Kafka 从拓扑上分有如下角色:

Consumer: 消费者,一般以 API 形式存在于各个业务 svr 中

Producer: 生产者,一般以 API 形式存在于各个业务 svr 中 Kafka

broker: kafka 集群中的服务器,topic 里的消息数据存在上面

Producer 采用发送 push 的方式将消息发到 broker 上,broker 存储后。由 consumer 采用 pull 模式订阅并消费消息

Producer的可靠性保证

回答生产者的可靠性保证,即回答:

  1. 发消息之后有没有 ack

  2. 发消息收到 ack 后,是不是消息就不会丢失了而 Kafka 通过配置来指定 producer 生产者在发送消息时的 ack 策略:

# -1(全量同步确认,强可靠性保证)
Request.required.acks= -1
# 1(leader 确认收到, 默认)
Request.required.acks = 1
# 0(不确认,但是吞吐量大)
Request.required.acks = 0

kafka 配置为 CP系统

如果想实现 kafka 配置为 CP(Consistency & Partition tolerance) 系统, 配置需要如下:

request.required.acks=-1
min.insync.replicas = ${N/2 + 1}     N: follower的个数
unclean.leader.election.enable = false

如图所示,在 acks=-1 的情况下,新消息只有被 ISR 中的所有 follower(f1 和 f2, f3) 都从 leader 复制过 去才会回 ack, ack 后,无论那种机器故障情况(全部或部分), 写入的 msg4,都不会丢失, 消息状态满足 一致性 C 要求。

正常情况下,所有 follower 复制完成后,leader 回 producer ack。

异常情况下,如果当数据发送到 leader 后部分副本(f1 和 f2 同步), leader 挂了?此时任何 follower 都有可能变成新的 leader, producer 端会得到返回异常,producer 端会重新发送数据,但这样数据可能会重复(但不会丢失), 暂不考虑数据重复的情况。

min.insync.replicas 参数用于保证当前集群中处于正常同步状态的副本 follower 数量,当实际值小于配置值时,集群停止服务。如果配置为 N/2+1, 即多一半的数量,则在满足此条件下,通过算法保证强一致性。当不满足配置数时,牺牲可用性即停服。

如果选举 f3 为新 leader, 则可能会发生消息截断,因为 f3 还未同步 msg4 的数据。Kafka 的通 unclean.leader.election.enable 来控制在这种情况下,是否可以选举 f3 为 leader。旧版本中默认为 true,在某个版本下已默认为 false,避免这种情况下消息截断的出现。

通过 ack 和 min.insync.replicas 和 unclean.leader.election.enable 的配合,保证在 kafka 配置为 CP 系统时,要么不工作,要么得到 ack 后,消息不会丢失且消息状态一致。

kafka 配置为 AP系统

如果想实现 kafka 配置为 AP(Availability & Partition tolerance)系统:

request.required.acks=1

min.insync.replicas = 1

unclean.leader.election.enable = false

当配置为 acks=1 时,即 leader 接收消息后回 ack,这时会出现消息丢失的问题:如果 leader 接受到了 第 4 条消息,此时还没有同步到 follower 中,leader 机器挂了,其中一个 follower 被选为 leader, 则第 4 条消息丢失了。当然这个也需要 unclean.leader.election.enable 参数配置为 false 来配合。但是 leader 回 ack 的情况下,follower 未同步的概率会大大提升。

通过 producer 策略的配置和 kafka 集群通用参数的配置,可以针对自己的业务系统特点来进行合理的参数配置,在通讯性能和消息可靠性下寻得某种平衡。

Broker 的可靠性保证

消息通过 producer 发送到 broker 之后,还会遇到很多问题:

Partition leader 写入成功, follower 什么时候同步?

Leader 写入成功,消费者什么时候能读到这条消息?

Leader 写入成功后,leader 重启,重启后消息状态还正常嘛?

Leader 重启,如何选举新的 leader?

这些问题集中在, 消息落到 broker 后,集群通过何种机制来保证不同副本建的消息状态一致性。

LEO和HW简单介绍

LEO:LogEndOffset的缩写,表示每个partition的log最后一条Message的位置。

HW: HighWaterMark的缩写,是指consumer能够看到的此partition的位置。 取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。

下面具体分析一下 ISR 集合和 HW、LEO的关系。

假设某分区的 ISR 集合中有 3 个副本,即一个 leader 副本和 2 个 follower 副本,此时分区的 LEO 和 HW 都分别为 3 。消息3和消息4从生产者出发之后先被存入leader副本。

在消息被写入leader副本之后,follower副本会发送拉取请求来拉取消息3和消息4进行消息同步。

在同步过程中不同的副本同步的效率不尽相同,在某一时刻follower1完全跟上了leader副本而 follower2只同步了消息3,如此leader副本的LEO为5,follower1的LEO为5,follower2的LEO 为4,那么当前分区的HW取最小值4,此时消费者可以消费到offset0至3之间的消息。

当所有副本都成功写入消息3和消息4之后,整个分区的HW和LEO都变为5,因此消费者可以消费到 offset为4的消息了。

由此可见,HW用于标识消费者可以读取的最大消息位置,LEO用于标识消息追加到文件的最后位置。 如果消息发送成功,不代表消费者可以消费这条消息。

Consumer 的可靠性策略

Consumer 的可靠性策略集中在 consumer 的投递语义上,即:

何时消费,消费到什么?

按消费是否会丢?

消费是否会重复?

这些语义场景,可以通过 kafka 消费者的而部分参数进行配置,简单来说有以下 3 中场景:

AutoCommit(at most once, commit 后挂,实际会丢)

enable.auto.commit = true

auto.commit.interval.ms

配置如上的 consumer 收到消息就返回正确给 brocker, 但是如果业务逻辑没有走完中断了,实际上这个消息没有消费成功。这种场景适用于可靠性要求不高的业务。其中 auto.commit.interval.ms 代表了自动提交的间隔。比如设置为 1s 提交 1 次,那么在 1s 内的故障重启,会从当前消费 offset 进行重新消费时,1s 内未提交但是已经消费的 msg, 会被重新消费到。

手动 Commit(at least once, commit 前挂,就会重复, 重启还会丢)

enable.auto.commit = false

配置为手动提交的场景下,业务开发者需要在消费消息到消息业务逻辑处理整个流程完成后进行手动提交。如果在流程未处理结束时发生重启,则之前消费到未提交的消息会重新消费到,即消息显然会投递多次。此处应用与业务逻辑明显实现了幂等的场景下使用。

特别应关注到在 golang 中 sarama 库的几个参数的配置:

sarama.offset.initial (oldest, newest)

offsets.retention.minutes

intitial = oldest 代表消费可以访问到的 topic 里的最早的消息,大于 commit 的位置,但是小于 HW。 同时也受到 broker 上消息保留时间的影响和位移保留时间的影响。不能保证一定能消费到 topic 起始位置的消息。

如果设置为 newest 则代表访问 commit 位置的下一条消息。如果发生 consumer 重启且 autocommit 没有设置为 false, 则之前的消息会发生丢失,再也消费不到了。在业务环境特别不稳定或非持久化 consumer 实例的场景下,应特别注意。 一般情况下, offsets.retention.minutes 为 1440s。

Exactly once, 很难,需要 msg 持久化和 commit 是原子的

消息投递且仅投递一次的语义是很难实现的。首先要消费消息并且提交保证不会重复投递,其次提交前 要完成整体的业务逻辑关于消息的处理。在 kafka 本身没有提供此场景语义接口的情况下,这几乎是不 可能有效实现的。一般的解决方案,也是进行原子性的消息存储,业务逻辑异步慢慢的从存储中取出消息进行处理。

消费组Reblance

消费者组

消费组指的是多个消费者组成起来的一个组,它们共同消费 topic 的所有消息,并且一个 topic 的一个 partition 只能被一个 consumer 消费。其实reblance就是为了kafka对提升消费效率做的优化,规定了 一个ConsumerGroup下的所有consumer均匀分配订阅 Topic 的每个分区。

例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic 。正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance。

rebalance的影响

每次reblance会把所有的消费者重新分配监听topic,会产生一定影响

首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。如果你了解 JVM 的垃圾回收 机制,你一定听过万物静止的收集方式,即著名的 stop the world,简称 STW。在 STW 期间,所 有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。

其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更 高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配 其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。

最后,Rebalance 实在是太慢了。曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成 功 Rebalance 一次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至 少现在还没有特别好的解决方案。所谓“本事大不如不摊上”,也许最好的解决方案就是避免 Rebalance 的发生吧。

标签: 大数据

本文转载自: https://blog.csdn.net/jianfeng123123/article/details/128075733
版权归原作者 Mr.简锋 所有, 如有侵权,请联系我们删除。

“KafKa存储机制”的评论:

还没有评论