0


Kafka详解

一、kafka的应用场景

1、解耦(A服务可以直接生产消息,然后不等B的结果,去做别的事。B异步的去消费)

2、异步

3、削峰

二、消息队列的通信模式

1、推拉模式

推拉模式的时候指的是 Comsumer 和 Broker 之间的交互

** 推模式**:消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送。

优点:

  • 消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer。
  • 对于消费者使用来说更简单

缺点:

  • 推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,消费者可能消费不过来。

** 拉模式**:Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer。

优点

  • 消费者可以根据自身的情况来发起拉取消息的请求。
  • 拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息。
  • 拉模式可以更合适的进行消息的批量发送

缺点

  • 消息延迟,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢。所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。
  • 消息忙请求,比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。

2、Kafka拉模式实现原理

    RocketMQ 和 Kafka 都是利用“**长轮询**”来实现**拉模式**。Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。简单的说就是消费者去 Broker 拉消息,定义了一个超时时间。拉消息时,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。

    并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回。

参考:消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?_kafka_yes_InfoQ写作社区

2.1、消费端实现逻辑

client.poll() 方法如下。实际调用的是kafka的包装过的selector。最终会调用到 Java nio的select(timeout)

2.2、Kafka实现逻辑

KafkaApis.scala 文件的 handle方法:

handleFetchRequest的重要部分源码如下:

fetchMessages实现如下:

3、点对点和发布订阅

    在**点对点系统**中,消息保留在队列中。一个或者多个消费者可以消费队列中的消息,但是**特定消息只能由一个消费者消费**,一旦该消息被一个消费者消费,他就会从队列中消失[不会将消息落磁盘]。

    在**发布订阅系统**中,**消息被保存在topic中**。与点对点的不同的是,消费者可以订阅多个topic,**一个topic中的消息可供多个消费组消费**。

    Kafka是一种高吞吐量的分布式发布订阅消息系统。具有高性能、持久化、多副本备份、横向扩展能力。

QA:kafka怎么实现点对点?

参考:Kafka如何实现点对点消息和发布订阅消息?-阿里云开发者社区

实现点对点:通过不同的消费者消费不同的partiton实现。

三、kafka的工作原理

1、基本概念及结构

kafka集群结构图

producer:消息的生产者

consumer:消息的消费者

kafka cluster:

** Broker:**是kafka的实例,每个服务器上有一个或者多个Broker。假设一台机器一个Broker实例,每个Broker都有一个不重复的编号

** Topic:**消息的主题,可以理解为消息的分类。kafka的数据就保存在topic中。在每个Broker上都可以创建多个Topic。

** Partition:**Topic的分区。每个Topic可以有多个分区。分区的作用是做负载,提高kafka的吞吐量。同一个Topic的每个分区中的数据是不重复的,partition的表现形式是一个一个文件夹。每个partition只能被一个consumer消费。

** Replication:**每一个分区有多个副本,当Leader故障时,会选择一个follower成为leader。kafka的最大默认副本数是10,且副本的数量不能 > broker的数量。每个partition的副本和leader绝对不在同一台机器上。

2、kafka生产消息的流程

Producer在写入数据的时候永远的找leader,不会直接将数据写入follower!

kafka发送消息流程图

    1、producer先从集群获取分区的leader

    2、producer将消息发送给分区的leader

    3、leader将消息写入本地文件

    4、follower从leader poll拉取消息

    5、follower将消息写入本地后,向leader发送ack

    6、leader收到所有副本的ack后向producer发送ack

注意点:

注意1: 消息写入leader后,follower是主动去leader进行同步的。

注意2: producer采用push模式将消息发布到broker,每条消息追加到分区中,顺序写入磁盘。所以每个分区内的数据是有序的。

问题1: 如果某个topic有多个partition,怎么按照partition写入数据的?

1、producer发送消息的时候指定partition

2、如果producer发送消息的时候没有指定partition。但是设置了key,会通过key值hash出一个partition

3、如果producer发送消息时没指定partition,也没有设置key,会轮询选出一个partition

问题2: kafka怎么保证消息不丢失的?

    通过上面生产消息的流程可知,producer向leader发送消息,leader将消息写入本地后会发送ACK。所以保证消息不丢的关键是**ACK应答机制**。生产者向队列写入消息的时候可以设置参数来确定kafka是否收到消息。这个参数有三个值。

0: 表示producer发送给leader就返回,不等待leader写入文件成功就返回ACK。安全性最低但是效率最高。

1: 表示producer发送给leader后,leader写入文件成功就返回ACK。producer可以接着写入下一条。

**all: **表示producer发送给leader后,leader写入文件成功并且所有follower都写入文件成功返回ACK之后,leader再给producer返回ACK。安全性最高,但是效率最低。

问题3: 如果往不存在的topic写入消息会咋样?

    kafka会自动新建一个topic,默认partition为1,默认副本数为1.

四、kafka的消息数据是怎么存储的

1、消息存储时机:

    producer将消息写入kafka之后,集群就需要对数据进行保存了,kafka会将数据保存在磁盘中。可能在我们的认知中,写入磁盘是一个很耗时的操作,不应该在这种高并发的组件中使用。但是kafka初始会单独开辟出一块磁盘空间,顺序的写入数据(顺序写入比随机写入的效率更高。为啥?)

2、Partition结构

    每个topic可以有多个partition,Partition在服务器上的表现形式是**一个一个的文件夹**。每个partition文件夹下面有**很多组的segment文件**。一组segment文件里面包含**.index文件、.log文件、.timeindex文件**。其中 **.log文件** 就是存储message的地方,.index文件和.timeindex文件是索引文件,主要用于检索消息。

Partition文件示意图

    如上图所示,这个partition有**三组segment**文件,每个 .log文件 的大小是一样的,但是里面存储的消息条数可能不相同(可能消息的大小不一致)。.log文件 命名前缀是以文件中 message **最小的 offset 命名**。这种方式其实类似二分查找,通过**segment分段 + 索引**的方式来优化查询消息的效率。

3、Message结构

    message是存在 .log 文件中的。里面主要存了

offset:占位 8 byte,是一个有序ID,他可以确定一条消息在 partition中的具体位置

消息大小:占位4 byte,用于描述消息大小

消息体:消息体存放的实际是消息数据,被压缩过。占用的空间根据具体的消息大小不一样。

4、存储策略

1、基于时间,默认保存7天。7天之后会删除

2、基于大小,默认保存配置是 1073741824(1G)

** 注意:kafka读取特定消息的时间复杂度是 o(1), 所以删除过期文件并不会提升kafka 的性能。**

5、消费数据

    消息存储在 .log 文件 之后就可以开始消费了,kafka采用的是**点对点**通信模式,即消费者**主动**去集群拉取消息消费,与生产消息类似,**消费者也是从leader拉取数据消费**。同一个topic的消息者会组成一个消费组,消费组内的每个消费者可以消费一个或者多个partition,但是,**同一个topic下的每个partition只会被消费组中的一个消费者消费**。所以性能最强的方案是消费组内的消费者个数 == partition 个数(即一台机器消费一个partition)。

consumer消费消息示意图

问题1: kafka怎么快速查找到指定的一条message?(这一切都是建立在 offset 有序的基础上)

segment文件内容展示图

1、通过 offset 找到在那个segment文件组。可以通过 .log文件 的前缀,用二分查找确定。

2、在segment文件组中,打开 .index 文件,查找 <= 相对索引 的那条数据(用的是稀疏索引)在 .log 文件中的位置。

3、在 .log文件 中按照顺序往后查找,找到offset那条数据即可。

如查找368801的offset消息。先通过二分查找找到segment2,368801 - 368796 = 5,即368801在 .log 文件中的相对offset为5。 从 .index 的稀疏索引中找出 <= 5 的最大值,得到 4.256 定位到368800 offset处。然后通过顺序查找,找到368801.

问题2:稀疏索引和稠密索引的区别?

    首先对密集索引和稀疏索引的区分在于**是否为每个索引键的值都建立索引**,简单来说就是比如有一列**有序**的值如下:1、2、3、4、5、6、7

    密集索引的做法是为这7个值建立索引记录,那么就有7条索引记录,抽象索引记录如下:
    1:到1的指针

    2:到2的指针

    ....

    7:到7的指针
    稀疏索引的做法是将这个6个值分组,1、2、3和4、5、6和7分为不同的3组,取这三组中最小的索引键值作为索引记录中的索引值,抽象索引记录如下:
    1:到顺序存储1、2、3的起始位置的指针

    4:到顺序存在4、5、6的起始位置的指针

    7:到顺序存储7的起始位置的指针
    这两种索引都要通过折半查找或者叫做二分查找来确定数据位置,不同的是密集索引,只需要通过二分查找到搜索值=索引的索引记录就能确定准确的数据位置,而稀疏索引则需要先定位到搜索值>索引值的最小的那个,然后在通过起始位置去定位具体的偏移量。

    这是两种不同的索引实现,**一种建立了索引值与数据位置的1:1的关系,一种建立了索引值与数据位置1:n的关系**。在大多数场景密集索引查询效率更高,在大多数场景稀疏索引占用空间更小。

五、offset管理

1、offset分类

    offset主要分为 **Current Offse**t 和 **Committed Offset**。Current Offset 主要是consumer用来poll()消息的,**保证不拉取到重复的消息**。Committed Offset 主要是用于 consumer rebalance。如果一个partition被重新rebalance,那么新的consumer就应该从Committed Offset开始消费新消息, **避免了重复消费**。

current offset: 是由consumer维护的,表示下一条希望收到的消息序号,仅仅在 poll() 方法中使用。例如第一次poll() 了20条消息,那么current offset就被设置成了20,下一次poll希望收到序号为20的消息。

commited offset: 保存在broker的,_consumer_offset 的topic里面。表示consumer确认已经消费过的消息序号。主要通过commitSync 和 commitAsync API 来操作。

2、Group Coordinator

    Group Coordinator是运行在Kafka集群中每一个Broker内的一个进程。它主要负责Consumer Group的管理,Offset位移管理以及Consumer Rebalance。

对于每一个Consumer Group,Group Coordinator都会存储以下信息:

1、订阅的topics列表

2、Consumer Group配置信息,包括session timeout等

3、组中每个Consumer的元数据。包括主机名,consumer id

4、每个Group正在消费的topic partition的当前offsets

5、Partition的ownership元数据,包括consumer消费的partitions映射关系

问题1: Consumer Group如何确定自己的coordinator是谁呢?

1、确定Consumer Group offset信息将要写入__consumers_offsets topic的哪个分区。具体计算公式:partition = Math.abs(groupId.hashCode() % offsets.topic.num.partitions) 。默认50个分区

2、该分区leader所在的broker就是被选定的coordinator

3、offset 存储模型

    由于一个partition只能被一个consumerGroup 中的一个consumer消费。所以 kafka保存offset时并不直接为每个消费保存。而是以 group_id-topic-partition(key)  -> offset(value) 的方式将 consumerGroup 和 partition 对应的offset 以消息的形式保存在 _consumer_offset 的topic里面。

offset存储模型

4、offset 查询

    通过 offset Cache。consumer 提交offset的时候,kafka Offset manager会首先追加一条新的conmit消息到 _consumer_offset topic中,然后更新对应的缓存,读offset时从缓存中读取,而不是直接读取 _consumer_offset topic。

Offset Cache

问题1:如果kafka中没有对应的offset信息(有可能offset被删除了),消费者应该从何处开始消费?

    读取 auto.offset.reset 参数
    1、earliest: 从最早的offset开始消费

    2、latest: 从最近的offset开始消费

    3、none: 报错

问题2: 如果consumer消费了5条消息,然后宕机,重新连接后从哪个位置开始消费?

    从第6条消息开始消费,这是因为重启后读取 commited offset 为5,防止重复消费从6开始

问题3: 如果consumer第一次接入一个已经存在的topic,此时kafka中不存在这个consumer的消费offset信息,从哪开始消费?

    看消费的哪个partition,按照groupid+topic+partation 去 consumer_offset 查找commited_offset  存在就继续消费,不存在按照 auto.offset.reset 参数配置的开始消费。

六、Rebalance

在新建一个Consumer时,我们可以通过指定groupId来将其添加进一个Consumer Group中。Consumer Group是为了实现多个Consumer能够并行的消费一个Topic,并且一个partition只能被一个Consumer Group里的一个固定的Consumer消费。

1、触发场景

1、有新的消费者加入Consumer Group。

2、有消费者主动退出Consumer Group。

3、Consumer Group订阅的任何一个Topic出现分区数量的变化

2、Rebalance策略

默认情况下,Kafka提供了两种分配策略:Range和RoundRobin且Rebalance过程中,所有consumer都不可以消费。举个例子,比如有两个消费者C0和C1,两个topic(t0,t1),每个topic有三个分区p(0-2)。那么采用Range策略,分配出的结果为:

C0: [t0p0, t0p1, t1p0, t1p1]

C1: [t0p2, t1p2]

采用RoundRobin策略(所有topic一起排序),分配出的结果为:

C0: [t0p0, t0p2, t1p1]

C1: [t0p1, t1p0, t1p2]

QA:用db设计实现Kafka,表结构应该是咋样的?需要加什么索引?

对kafka来说,最主要的有两点:

  1. 按照topic + consumerGroupId + partition: 唯一键获取当前的commit offset。
  2. 按照topic+ partition + offset: 查到对应的消息。
CREATE TABLE `commit_offset_manage_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '数据库自增主键',
  `topic` varchar(32) NOT NULL DEFAULT '' COMMENT '消息Topic',
  `consumer_group_id` varchar(32) NOT NULL COMMENT '消费组ID',
  `partition` int(10) NOT NULL COMMENT 'topic分区',
  `commit_offset` bigint(10) NOT NULL COMMENT '消费组已提交的offset',
  `consumer_ip` varchar(32) NOT NULL COMMENT '消费组中消费者ip',
  `created_at` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `updated_at` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_topic_group_id_partition` (`topic`,`consumer_group_id`,`partition`),
  KEY `idx_created_at` (`created_at`),
  KEY `idx_updated_at` (`updated_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消费组管理明细表';


-- 可以按照 topic分库,partition分表进行水平扩展
CREATE TABLE `manage_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '数据库自增主键',
  `topic` varchar(32) NOT NULL DEFAULT '' COMMENT '消息Topic',
  `partition` int(10) NOT NULL COMMENT 'topic分区',
  `offset` bigint(10) NOT NULL COMMENT 'offset',
  `message_size` int(10) NOT NULL COMMENT '消息大小',
  `message` varchar(526) NOT NULL DEFAULT '' COMMENT '消息体',
  `created_at` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `updated_at` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_topic_partition_offset` (`topic`,`partition`,`offset`),
  KEY `idx_created_at` (`created_at`),
  KEY `idx_updated_at` (`updated_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消费明细表';


Kafka Consumer Group和Consumer Rebalance机制

Kafka 读写原理与存储结构

Kafka消息送达语义详解

Kafka事务特性详解

Kafka常见问题精选

Kafka架构设计

标签: kafka 分布式

本文转载自: https://blog.csdn.net/qq_41644412/article/details/139202241
版权归原作者 玛卡巴卡历险记 所有, 如有侵权,请联系我们删除。

“Kafka详解”的评论:

还没有评论