0


Kafka

学习记录来自:尚硅谷大数据技术之 Kafka

消息队列(MQ)

消息队列(MQ)在分布式系统中有着广泛的应用。以下是一些常见的应用场景:

  1. 解耦系统:消息队列可以用来解耦不同系统或服务之间的依赖。生产者只需将消息发送到队列,不需要关心消费者的处理速度或处理逻辑,消费者可以根据自己的节奏从队列中读取和处理消息。
  2. 异步处理:在需要异步处理的场景中,消息队列非常有用。生产者可以迅速地将任务提交到队列,而不必等待任务完成。消费者可以在后台异步地处理这些任务。例如,订单处理系统可以立即响应用户的订单请求,并将订单处理任务放入队列进行异步处理。
  3. 异步调用的优势包括:降低耦合度、提高性能和拓展性方面。缺点,比如依赖于中间件(Broker)的可靠性以及架构复杂性带来的维护挑战。
  4. 负载调节:消息队列可以帮助调节系统负载。在高峰期,生产者可以快速地将大量请求放入队列,而消费者可以根据自己的处理能力逐步处理这些请求,从而避免系统过载。
  5. 日志和监控:许多系统使用消息队列来收集和传输日志或监控数据。日志数据可以通过消息队列传输到集中式的日志管理系统,进行统一的存储和分析。
  6. 数据流处理:消息队列在实时数据流处理场景中非常有用。例如,金融系统中的交易数据、物联网系统中的传感器数据,都可以通过消息队列进行实时传输和处理。
  7. 事件驱动架构:在事件驱动架构中,消息队列用于传递事件。例如,电商平台可以使用消息队列来传递订单创建、支付成功、发货等事件,相关的服务可以订阅这些事件并作出相应的处理。
  8. 分布式事务:在分布式系统中实现事务一致性非常复杂。通过消息队列,可以实现某种形式的事务管理,如最终一致性。生产者发送消息表示某个事务的状态变化,消费者根据消息的状态变化来处理相应的事务逻辑。

两种模式

消息队列主要有两种模式:点对点(Point-to-Point)模式和发布/订阅(Publish/Subscribe)模式。

点对点模式

  • 基本概念:在点对点模式中,消息生产者将消息发送到队列,消息消费者从队列中读取消息。每条消息只会被一个消费者消费。
  • 特点: - 消息确认:消费者读取到消息后,需要向消息队列发送确认,表示该消息已被成功处理。未被确认的消息会重新投递。- 消息持久化:消息在队列中可以持久化保存,直到被某个消费者成功消费。- 负载均衡:多个消费者可以共同消费一个队列中的消息,实现负载均衡。
  • 适用场景:适用于任务处理、工作队列等场景。例如,一个订单处理系统,订单生成后放入队列,不同的消费者从队列中读取并处理订单。

发布/订阅模式

  • 基本概念:在发布/订阅模式中,消息生产者将消息发布到一个主题(Topic),消息消费者订阅该主题,并接收所有发布到该主题的消息。每条消息可以被多个消费者消费。
  • 特点: - 多播机制:同一条消息可以被所有订阅该主题的消费者接收,适用于广播通知。- 灵活订阅:消费者可以灵活订阅一个或多个主题,根据需要接收不同类型的消息。- 消息持久化:消息可以持久化保存,允许消费者在重启后继续接收之前未消费的消息。
  • 适用场景:适用于事件通知、日志广播、实时数据流等场景。例如,新闻发布系统,新闻发布后,所有订阅该主题的用户都会接收到新闻推送。

RabbitMQ

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个应用层协议,用于消息中间件系统之间的通信。它定义了消息传递的规则和格式,确保不同的系统和应用能够通过消息队列进行可靠的、异步的消息传递。

Spring AMQP(Asynchronous Messaging with RabbitMQ)是Spring框架中用于处理消息队列的一个模块。它提供了一套简化与RabbitMQ进行集成和操作的工具,使得开发人员能够方便地使用消息队列实现异步通信、消息传递、发布/订阅等功能。

Kafka

Kafka是一个分布式的消息系统,用于处理和传输大量的实时数据。它的架构设计使其能够提供高吞吐量和低延迟的消息传递。

首先,Kafka由多个服务器(称为代理)组成,这些代理共同工作来处理数据。这些数据被组织成主题,主题是消息的类别或分组,每个主题又分为多个分区,以实现并行处理和高效存储。

在使用Kafka时,生产者将数据发送到Kafka集群中的特定主题。每个消息都会被写入一个特定的分区。生产者可以根据一些策略来选择将消息发送到哪个分区,如循环选择或基于消息内容的哈希选择。

消费者从Kafka集群中读取数据,它们可以订阅一个或多个主题并处理从这些主题接收到的消息。消费者组允许多个消费者共同处理一个主题中的消息,每个消费者处理不同的分区,确保消息被有效处理而不会重复。

Kafka还具有数据持久化的特点,所有消息都会被持久化到磁盘中,并且可以配置消息的保留策略,确定消息在磁盘中保留的时间。

Kafka的另一个重要特性是它的高可用性和容错性。通过复制,每个分区的数据都会被复制到多个代理中,确保即使某个代理发生故障,数据仍然可用。

架构

Kafka 的基础架构主要由几个关键组件构成,包括主题、分区、代理、生产者、消费者和消费者组。以下是对这些组件的详细介绍:

主题(Topic):主题是消息的分类或分组。生产者将消息发送到某个主题,消费者从主题中读取消息。每个主题可以有多个分区。

分区(Partition):每个主题被划分为多个分区,每个分区是一个有序的、不变的消息序列,分区中的每条消息都有一个唯一的偏移量。分区的设计有助于实现并行处理和高可用性。每个分区在物理上对应一个文件夹,该文件夹里面存储了这个分区的所有消息和索引文件。在创建topic时可指定parition数量,生产者将消息发送到topic时,消息会根据 分区策略 追加到分区文件的末尾,属于顺序写磁盘。

代理(Broker):Kafka 运行在一个或多个服务器(代理)上,这些服务器共同组成一个 Kafka 集群。每个代理负责存储分配给它的分区数据。代理之间通过复制来保证数据的高可用性和容错性。一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。

生产者(Producer):生产者是负责将消息发送到 Kafka 主题的应用程序。生产者可以选择将消息发送到特定的分区,或使用轮询等策略将消息分配到不同的分区。

消费者(Consumer):消费者是负责从 Kafka 主题读取消息的应用程序。消费者可以订阅一个或多个主题,并从这些主题的分区中读取消息。

消费者组(Consumer Group):消费者组是一组协同工作的消费者,它们共同消费一个或多个主题的消息。每个分区的消息只能被同一个消费者组中的一个消费者消费,从而实现了负载均衡。多个消费者组可以独立消费同一个主题的消息。

Kafka Controller:控制器是 Kafka 集群中的一个代理,它负责管理分区的领导者选举和副本分配。当某个代理失败时,控制器会选择新的分区领导者,确保数据的高可用性。

ZooKeeper:Kafka 使用 ZooKeeper 来管理和协调 Kafka 集群。它负责存储集群的元数据,如代理列表、主题和分区信息,以及消费者组的偏移量信息。ZooKeeper 确保 Kafka 集群的高可用性和一致性。

消息存储:Kafka 使用日志文件来存储消息。每个分区对应一个日志文件,消息按顺序追加到日志文件中。Kafka 提供了多种配置选项来管理日志文件的存储和清理,如日志保留策略、日志压缩等。

副本: Kafka 中用来实现数据高可用性的一种机制。每个分区的数据都会被复制到多个 Kafka Broker 上,这些副本组成了一个副本集(Replica Set)。

Kafka 的架构设计旨在提供高吞吐量、低延迟、可扩展性和高可用性,使其成为构建实时数据流处理系统的理想选择。通过理解 Kafka 的基础架构,可以更好地设计和优化基于 Kafka 的应用程序。

大白话 kafka 架构原理 (qq.com)

kafka吞吐量为什么这么高

Kafka 的高吞吐量来自于其独特的架构设计和多种优化技术。以下是 Kafka 能够实现高吞吐量的主要原因:

1. 顺序读写和分区设计

  • 顺序读写:Kafka 的设计使得消息的读写都是顺序进行的。这种顺序操作非常适合磁盘的特性,因为顺序写入的速度要远远快于随机写入。这种操作减少了磁盘寻道时间,从而提升了吞吐量。
  • 分区(Partitioning):Kafka 将每个 Topic 分为多个分区,每个分区可以独立地进行读写操作。这使得 Kafka 能够通过水平扩展分区的数量来处理更高的负载,从而提高整体吞吐量。

2. 零拷贝机制(Zero-Copy)

Kafka 使用 Linux 的零拷贝(Zero-Copy)机制,尤其是在消息的传输过程中。通过系统调用如

sendfile()

,Kafka 可以直接将数据从磁盘读取并发送到网络,而不需要经过用户空间的缓冲区,从而减少了 CPU 使用和内存带宽消耗。这种方式大幅提升了消息传输的效率。

Linux 操作系统的 "zero-copy" 机制是一种优化数据传输的技术,它能够在应用程序和内核之间传递数据时,避免不必要的数据拷贝,从而提高系统的性能和效率。它在网络编程和文件操作中非常有用,特别是在需要大规模数据传输的场景中。

1."Zero-copy" 的工作原理

在传统的数据传输流程中,比如从文件读取数据并通过网络发送到另一个系统,通常会经历多个数据拷贝过程:

  1. 从磁盘读取到内核缓冲区: 数据首先从磁盘读取到操作系统内核空间的缓冲区。
  2. 从内核缓冲区复制到用户空间: 然后数据从内核缓冲区复制到应用程序的用户空间缓冲区。
  3. 从用户空间复制回内核空间: 应用程序处理完数据后,再将其从用户空间复制回内核空间,准备通过网络发送。
  4. 发送数据到网络缓冲区: 最后,数据从内核空间被复制到网络缓冲区,通过网络接口发送出去。

在这个过程中,数据需要多次在用户空间和内核空间之间拷贝,这会带来额外的 CPU 开销和内存带宽消耗。而 "zero-copy" 机制减少甚至避免了这些不必要的拷贝过程。

2.Linux 中的 "zero-copy" 实现方式

Linux 提供了几种 "zero-copy" 的实现方式,主要包括:

  1. sendfile() 系统调用:sendfile() 是 Linux 中常用的 "zero-copy" 系统调用。它直接将文件内容从内核的 Page Cache 发送到网络接口,而不需要将数据拷贝到用户空间。它大大减少了数据拷贝的次数。
  2. splice()vmsplice() 系统调用: 这两个系统调用允许在文件描述符之间传递数据,而无需将数据复制到用户空间。splice() 可以在两个文件描述符之间移动数据,vmsplice() 则将用户空间的内存映射到管道(pipe)中,从而避免数据拷贝。
  3. 内存映射(mmap(): 虽然严格意义上不是 "zero-copy",但是通过 mmap() 可以将文件映射到进程的地址空间中,允许应用程序直接访问文件内容,而无需进行额外的数据拷贝。

优势

  • 减少 CPU 负载:通过减少数据在用户空间和内核空间之间的拷贝次数,"zero-copy" 机制显著降低了 CPU 的负载。
  • 提高吞吐量:避免不必要的内存拷贝,提高了数据传输的效率,适合高吞吐量场景。
  • 降低延迟:减少数据拷贝的时间,有助于降低传输延迟。

场景

  • 网络服务器:如 Web 服务器,使用 sendfile() 从磁盘读取文件并直接发送到网络,适合文件传输、流媒体等应用场景。
  • 数据流处理:如 Kafka 等消息系统,在处理大规模数据时使用 "zero-copy" 技术提高性能。
  • 高性能计算:需要大量数据传输的场景,如数据库系统、科学计算等。

3. Page Cache 优化

Kafka 充分利用了操作系统的 Page Cache,避免频繁的磁盘 I/O 操作。当消息被消费时,如果数据仍然保存在 Page Cache 中,Kafka 可以直接从内存中读取数据,而不需要访问磁盘。这种做法不仅提高了数据读取速度,还减少了磁盘的负载。

Page Cache 是操作系统管理的一块内存区域,用于缓存从磁盘读取的数据。它允许操作系统在内存中保存最近访问的数据块,这样在再次访问这些数据时,不必再次从磁盘读取,从而加速 I/O 操作。

  • 减少磁盘 I/O: 通过将数据缓存到内存中,减少了从磁盘读取数据的需求,降低了磁盘 I/O 的开销。
  • 提高吞吐量和响应速度: 内存访问的速度远远快于磁盘访问,因此利用 Page Cache 可以显著提高 Kafka 的吞吐量和响应速度。
  • 操作系统管理: Kafka 将 Page Cache 的管理交给操作系统,使得 Kafka 不必关心缓存的具体实现和管理,可以更专注于数据流处理和消息的高效传输。

4. 异步 I/O 操作

Kafka 的生产者和消费者客户端都采用了异步 I/O 操作,这意味着消息的发送和接收操作不会阻塞主线程。生产者可以批量发送消息,消费者也可以批量拉取消息。这种批量操作降低了网络和磁盘的开销,提高了消息处理的吞吐量。

5. 高效的存储格式

Kafka 的消息存储格式是简单的二进制日志文件,这些文件以连续的方式存储消息,没有复杂的数据结构和索引。这种设计减少了写入开销,并且在读取时也能高效地顺序读取。

6. 可伸缩的集群架构

Kafka 的集群架构支持水平扩展,可以通过增加更多的 broker 来分担负载。当流量增加时,可以通过增加更多的分区和 broker 来均衡负载,从而提高整个集群的吞吐量。

7. 消息压缩

Kafka 支持消息的批量压缩和解压缩,这减少了传输的数据量,从而提高了网络的利用效率。常用的压缩算法如 gzip、snappy 和 lz4 等,都能在保持较高压缩率的同时,提供良好的压缩和解压缩速度。

8. 数据复制

Kafka 支持跨 broker 的数据复制(Replication),使得即使在某些 broker 出现故障时,其他 broker 仍然可以继续提供服务。这种设计提高了系统的可靠性,同时也在一定程度上提高了读操作的吞吐量。

9. 日志压缩(Log Compaction)

对于那些只关心最新状态的应用,Kafka 通过日志压缩的方式,只保留每个 Key 的最新值,这种方式不仅减少了存储空间,也能提高读取的效率。

“按消息Key保存策略”或“Key-ordering 策略” 是指在Kafka中按照消息的Key进行日志压缩(Log Compaction),以确保对同一个Key的消息,Kafka只保留最新的版本。这种策略主要用于需要保存最新状态的场景,比如保存用户的最新信息、缓存数据的最新版本等。

工作原理:

  1. 消息Key的重要性:在这种策略下,每条消息都必须有一个Key。Kafka会对这些消息进行分区(partition),并在分区内部按照消息的Key进行排序。
  2. 日志压缩:Kafka在后台会不断地进行日志压缩。压缩的过程是,Kafka遍历日志文件中的消息,如果发现同一个Key有多个版本,那么只保留最新的那个版本,其他的旧版本将被删除。
  3. 删除记录(Tombstone):Kafka支持通过发送空值的消息来删除某个Key对应的记录。这种空值的消息被称为“墓碑”(Tombstone),当Kafka发现一个Key的最新值是空值时,最终会将这个Key对应的所有记录删除。

使用场景:

  1. 状态存储:需要保存每个Key的最新状态,且不需要保留历史版本的场景。
  2. 数据去重:在有可能产生重复数据的场景下,可以通过这种策略保留最新的数据,去除重复数据。
  3. 缓存刷新:用来保存最新的缓存数据,当数据更新时只需要更新对应Key的值。

优点:

  • 减少存储空间:通过删除旧版本的数据,节省了存储空间。
  • 维持数据的最新状态:对于状态存储场景,能够确保消费端获取的都是最新的数据。

缺点:

  • 可能延迟数据删除:日志压缩是异步进行的,可能会有一段时间,旧版本的数据仍然保留在日志中。
  • 依赖Key的一致性:如果消息的Key不一致(如没有合理地选择Key),可能导致日志压缩无法生效。

10. 简单的消费者模型

Kafka 使用一种简单的消费者模型,每个消费者读取各自分配到的分区的数据。这种设计避免了传统消息队列中消费者之间的锁竞争问题,进一步提升了消息消费的吞吐量。

生产者

分区

Kafka 中的分区(Partition)是其实现高可用性、高吞吐量和可扩展性的核心设计之一。

分区的好处主要包括以下几个方面:

1. 提高并发和吞吐量
  • 并行处理:每个分区是独立的,可以在不同的 broker 上进行处理。生产者和消费者可以并行地向多个分区写入和读取数据,从而提高 Kafka 集群的整体吞吐量。
  • 负载均衡:通过将数据分布在多个分区和 broker 上,可以有效地分散负载,避免单个 broker 成为性能瓶颈。
2. 扩展性
  • 水平扩展:Kafka 可以通过增加分区的数量来扩展系统的处理能力。当数据量增大时,可以增加分区和 broker 的数量,系统能够横向扩展而不影响现有的架构。
  • 灵活性:新的分区可以随时添加,而无需中断现有的 Kafka 集群运行,这使得 Kafka 的扩展非常灵活。
3. 提高可靠性
  • 分区副本机制:每个分区可以配置多个副本(Replica),这些副本分布在不同的 broker 上。当某个 broker 失效时,Kafka 可以自动从其他副本中选出新的 leader,继续提供服务,从而提高了系统的容错性和可靠性。
  • 数据恢复和再平衡:当某个分区的 leader 出现故障时,Kafka 能够快速地从其他 ISR(In-Sync Replica)中选出新的 leader,确保数据不会丢失。
4. 消息顺序保证
  • 分区内的消息顺序:Kafka 保证同一个分区内的消息是有序的。这对于需要顺序处理消息的应用非常重要,如事件溯源、日志分析等场景。
  • 基于键的分区:生产者可以通过消息键(Key)将相关的消息发送到同一个分区,确保这些消息的顺序性。
5. 消费者组和负载均衡
  • 分区分配:Kafka 的消费者组会自动将分区分配给组内的消费者。每个分区只能被一个消费者处理,从而实现负载均衡。这样,当消费者数量增加时,可以自动分配更多的分区给新加入的消费者,从而实现消费者的动态扩展。
  • 容错性:当某个消费者失效时,其处理的分区可以自动被其他消费者接管,确保消费过程的连续性。
6. 优化资源利用
  • 节省资源:Kafka 分区允许将数据分布在多个 broker 上,利用各个 broker 的存储和计算资源,从而提高了集群的整体资源利用率。
  • 局部性优化:在分布式部署中,分区可以根据数据的地理位置或其他逻辑分区进行划分,优化数据的局部性,提高访问效率

Kafka 生产者可以通过多种方式来提高吞吐量。以下是一些关键的优化策略:

  • 批量大小(batch.size:生产者可以将多个消息聚合成一个批次,批量发送到 Kafka broker。通过设置 batch.size 参数,生产者可以控制批次的大小,批量发送能减少网络请求的频率,从而提高吞吐量。
  • 等待时间(linger.ms:生产者可以通过设置 linger.ms 参数来指定等待一段时间后再发送消息,以便更多的消息可以被聚合到一个批次中。这种方法在负载较低时特别有用,可以增加批量的大小,从而提高吞吐量。
  • 消息压缩(compression.type:生产者可以对消息进行压缩(支持 gzip、snappy、lz4、zstd 等压缩算法),从而减少消息传输的大小,提高网络利用率。压缩后的消息更容易打包成较大的批次,进一步提高吞吐量。
  • 最大请求大小(max.request.size:通过调整 max.request.size 参数,可以控制单个请求中可以发送的最大消息量,以便在网络条件允许的情况下发送更多数据
  • 重试次数(retries:增加重试次数可以提高生产者在网络不稳定情况下的鲁棒性,从而减少消息发送失败的可能性。
  • 请求超时时间(request.timeout.ms:设置合理的请求超时时间,以便在网络延迟时,生产者不会过早地放弃等待,导致重试或失败。
  • 缓冲区大小(buffer.memory:增加生产者的缓冲区大小,使其能够容纳更多的待发送消息,减少因缓冲区满而阻塞的情况。
  • 线程池优化:调节生产者内部的线程池大小,以适应高负载环境下的并发处理需求。
  • 幂等性(enable.idempotence:开启幂等性可以确保消息的唯一性,但也可能稍微降低吞吐量。在追求极致吞吐量时,可以根据需求选择是否开启。
  • 事务支持:对于需要事务支持的场景,可以配置生产者以支持事务,但需要权衡吞吐量和数据一致性。

acks

Kafka 生产者在发送消息时,可以设置不同的应答级别(Acknowledgment level,简称

acks

),以平衡数据可靠性和吞吐量。常见的应答级别有以下几种:

1. **
acks=0

**

  • 无应答:生产者在发送消息后不会等待任何 broker 的确认。这样可以获得最高的吞吐量,因为生产者不会因为等待确认而被阻塞。
  • 缺点:消息可能会在传输过程中丢失,因为生产者不会知道消息是否成功到达 broker。这种设置适用于对数据丢失不敏感的场景。
2. **
acks=1

**

  • Leader 确认:生产者在发送消息后只等待分区的 leader broker 确认消息已经写入到本地日志中,然后就可以继续发送下一条消息。
  • 优点:相比 acks=0,这种模式提供了更高的数据可靠性,因为生产者至少得到了消息已经到达 broker 的确认。
  • 缺点:如果 leader 在消息被 follower 同步之前崩溃,消息可能会丢失。该设置适合需要一定可靠性但不需要完全保证的场景。
3. **
acks=all

(或

acks=-1

)**

  • 全副本确认:生产者在发送消息后,会等待所有 ISR(In-Sync Replicas,同步副本)对消息的确认。这意味着消息已经被写入到所有同步副本中。
  • 优点:这种设置提供了最高的数据可靠性,确保消息即使在 leader 崩溃的情况下也不会丢失。
  • 缺点:由于需要等待所有同步副本的确认,消息的发送延迟会增加,吞吐量可能会受到影响。适合需要强数据一致性的场景。
选择应答级别的权衡
  • **acks=0**:适合需要最大化吞吐量且对数据丢失不敏感的场景。
  • **acks=1**:适合需要较高吞吐量,同时对数据可靠性要求中等的场景。
  • **acks=all**:适合需要确保数据不丢失,优先保证可靠性而非吞吐量的场景。

幂等性

Kafka 的幂等性(Idempotence)是生产者的一项特性,旨在确保消息不会被重复写入。启用幂等性后,即使由于网络故障或重试机制导致生产者多次发送同一条消息,也能保证消息只会被写入一次。幂等性主要解决重复消息问题,是在高可靠性场景下常用的特性。

Kafka 生产者在开启幂等性后,会为每个生产者实例分配一个唯一的

PID

(Producer ID),并为每个分区的每个消息维护一个序列号。当生产者向某个分区发送消息时,消息会携带生产者的

PID

和序列号,Kafka broker 会根据这些信息判断消息是否已经接收过。

  • 当启用幂等性时,Kafka 自动将 acks 设置为 all,以确保所有同步副本都接收到消息,从而保证数据的可靠性。
  • 此外,Kafka 还会自动将 retries 参数设置为一个较大的值(通常是 Integer.MAX_VALUE),以便在失败时无限次重试,确保消息最终能够成功写入。

幂等性的限制
  • 单个分区内的幂等性:Kafka 的幂等性仅对同一生产者发送到同一分区内的消息有效,无法跨分区保证幂等性。
  • 事务:对于需要跨分区、跨 Topic 保证原子性的操作,可以结合 Kafka 的事务功能使用,事务功能通过 transactional.id 参数来标识一个事务性生产者。
适用场景
  • 避免消息重复:在需要严格保证消息不重复写入的场景下,幂等性是必需的。
  • 金融交易系统:例如,处理金融交易、订单系统等,对数据一致性有很高要求的场景。

事务

Kafka 的事务功能允许生产者在多个分区和多个主题(Topic)上进行原子性消息写入。这意味着要么所有消息都成功写入,确保数据一致性,要么所有消息都不会写入,避免部分提交导致的数据不一致问题。

事务的主要功能
  1. 跨分区、跨主题的原子性写入:Kafka 事务可以确保一组消息要么全部成功写入,要么全部失败。即使涉及多个分区和多个主题,也能保证操作的原子性。
  2. 处理“读已提交”数据:Kafka 消费者可以配置为只读取事务已经提交的数据,从而避免消费到未提交的事务消息。
  3. 事务消息与非事务消息隔离:Kafka 通过事务 ID (transactional.id) 将事务消息与非事务消息区分开来,确保事务消息的可靠性。
Kafka 事务的工作原理

Kafka 事务基于两阶段提交(Two-Phase Commit Protocol,2PC)实现:

  1. 初始化事务:生产者通过配置 transactional.id 来启动事务,该 ID 用于唯一标识一个事务性生产者。
  2. 发送消息:生产者开始将消息发送到指定的分区和主题。
  3. 预提交(Prepare Phase):当生产者准备提交事务时,Kafka 会记录所有相关分区的消息偏移量(offset),并将这些消息标记为“准备提交”。
  4. 提交事务(Commit Phase):如果所有相关消息都成功写入,生产者提交事务,这些消息就会对消费者可见。
  5. 回滚事务:如果在事务中发生错误或需要中止操作,生产者可以回滚事务,Kafka 将丢弃该事务中的所有消息,这些消息不会被消费者看到。 Properties props = new Properties(); // 设置事务 id(必须),事务 id 任意起名 properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 初始化事务 kafkaProducer.initTransactions(); // 开启事务 kafkaProducer.beginTransaction(); try { // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { // 发送消息 kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i)); } // int i = 1 / 0; // 提交事务 kafkaProducer.commitTransaction(); } catch (Exception e) { // 终止事务 kafkaProducer.abortTransaction(); } finally { // 5. 关闭资源 kafkaProducer.close(); } }
使用场景

Kafka 事务主要适用于需要严格数据一致性的场景,例如:

  • 金融系统:例如支付处理系统,确保交易记录和资金转移的原子性。
  • 数据库同步:在多个数据库之间同步数据时,确保数据一致性。
  • 订单系统:处理订单和库存更新时,确保订单状态和库存数量的一致性。
注意事项
  • 性能影响:事务机制会增加一些额外的开销,如两阶段提交的通信延迟,因此在性能要求较高的场景下需要权衡。
  • 适用范围:Kafka 事务只适用于 Kafka 内部的消息写入和读取,对于跨系统的分布式事务,需要额外的分布式事务管理器。

数据有序性

Kafka 生产者的数据有序性取决于数据发送到分区的方式和配置。

分区内的顺序性
  • 单分区内的顺序性:在 Kafka 中,消息在同一分区内是有序的。也就是说,如果生产者发送多条消息到同一个分区,Kafka 会按照消息发送的顺序进行写入,并且消费者也会按相同的顺序消费这些消息。
分区间的顺序性
  • 跨分区的无序性:如果消息被发送到不同的分区,Kafka 不保证这些分区之间的消息顺序。由于 Kafka 的分区设计目的是为了实现并行处理和扩展性,不同分区之间的消息可能会并发写入和消费。因此,跨分区的消息顺序在生产和消费时可能会有所不同。
max.in.flight.requests.per.connection

是 Kafka 生产者配置中的一个参数,用于控制在每个连接上可以未完成的请求数量。

  • 如果启用了幂等性(enable.idempotence=true),并且希望保证顺序性,则 Kafka 会要求 max.in.flight.requests.per.connection 不能超过 5,否则顺序性可能会受到影响。
  • 如果这个参数设置为 1,则即使在失败和重试的情况下,也能保证消息的顺序性,但这会降低吞吐量。

Zookeeper

Zookeeper 在 Kafka 集群中扮演着重要角色,负责存储和管理 Kafka 集群的元数据和配置信息。

  • Zookeeper 中存储了当前 Kafka 集群中所有 Broker 的信息,包括每个 Broker 的 ID、IP 地址、端口等。这些信息用于协调 Broker 的加入和退出,以及确保 Broker 之间的协同工作。
  • 对于每个主题,Zookeeper 还记录了分区和副本的分配信息,包括哪些 Broker 是分区的 Leader,以及哪些 Broker 是副本。
  • Zookeeper 记录了当前集群中的 Controller 信息。Controller 是负责管理 Kafka 集群中的分区 Leader 选举和副本分配的特殊 Broker。
  • Zookeeper 记录了每个分区的 ISR 列表,即当前与 Leader 副本保持同步的副本列表。这对 Kafka 的高可用性至关重要,因为 Kafka 只允许从 ISR 中选举新的 Leader。
  • Zookeeper 存储了消费者组的信息,包括消费者组中的成员和每个成员消费的偏移量(对于旧版 Kafka,0.9 及之前版本)。

Kafka 正在逐步抛弃 Zookeeper 的原因主要涉及到以下几个方面:

  • 减少依赖:Zookeeper 是一个独立的分布式协调服务,Kafka 依赖它来管理元数据、协调集群、处理分区的领导者选举等任务。这种依赖增加了 Kafka 部署和运维的复杂性。
  • 统一架构:Kafka 在移除 Zookeeper 之后,采用 Kafka 自身的 Quorum Controller(基于 Raft 协议)来管理元数据和集群状态,使得整个系统架构更为一致。
  • 延迟和吞吐量:使用 Zookeeper 时,Kafka 需要通过网络与 Zookeeper 交互,这增加了元数据更新和 Leader 选举的延迟。
  • 扩展性:Zookeeper 的扩展性和性能在大规模 Kafka 集群中可能成为瓶颈。
  • 一致性保障:Zookeeper 基于 Paxos 协议实现一致性,但由于架构复杂,某些极端情况下可能出现一致性问题。Kafka 新引入的 Quorum Controller 基于 Raft 协议,能提供更简单和可靠的一致性保障。
  • 减少分区脑裂问题:在某些故障场景下,Zookeeper 可能导致 Kafka 集群发生分区脑裂问题,导致不同节点对集群状态有不同的看法。通过使用 Kafka 自身的 Quorum Controller,可以更好地避免这种情况。
  • 动态配置:在新架构中,Kafka 可以更轻松地支持动态配置更新,不需要依赖 Zookeeper 的复杂配置管理。

Kafka Broker

Kafka Broker 是 Kafka 集群中的核心组件,负责处理生产者和消费者的请求,并管理消息的存储和转发。

以下是 Kafka Broker 的总体工作流程:

1.启动并注册
  • 注册到 Zookeeper(旧版本)或 Raft Controller(新版本):Kafka Broker 启动时,会向 Zookeeper 或 Raft Controller 注册自己,并获取集群中的元数据,如其他 Broker 的信息、主题和分区的配置等。
  • 初始化存储:Kafka Broker 会初始化本地存储系统,准备好用于存储消息的日志文件。
2. 处理生产者请求
  • 接收消息:生产者通过 TCP 连接向 Kafka Broker 发送消息。Broker 会根据消息的 Key 和分区器决定消息应存储到哪个分区。
  • 写入日志文件:Broker 将接收到的消息追加到相应分区的日志文件中。Kafka 的日志文件是顺序写入的,这使得写操作非常高效。
  • 同步副本:如果该分区有多个副本,Leader Broker 会将消息同步到其他副本(Follower)。只有当所有副本都确认接收到消息后,Leader 才会向生产者发送 ACK。
3. 管理分区和副本
  • 分区和副本的分配:Kafka Broker 根据集群配置和负载均衡策略,分配主题的分区和副本。在分区的 Leader 失效时,Zookeeper 或 Raft Controller 会选择新的 Leader。
  • ISR 列表管理:Kafka Broker 维护每个分区的 ISR(In-Sync Replicas)列表,记录当前与 Leader 同步的副本列表。
4. 处理消费者请求
  • 消费者订阅:消费者订阅主题后,Broker 会根据消费者组的偏移量记录,找到消费者应消费的分区和偏移量。
  • 消息读取:Broker 从指定的分区和偏移量开始读取消息,并通过 TCP 连接发送给消费者。Kafka 的读取是 O(1) 复杂度,因此读取性能很高。- 顺序写入:Kafka 的消息是以顺序方式追加到分区日志文件中的,每条消息都有一个递增的偏移量。这种顺序写入方式避免了随机写入的磁盘寻道开销。- 顺序读取:消费者读取消息时,通过偏移量直接定位到日志文件中的特定位置,从而实现顺序读取,这样可以充分利用磁盘的顺序读写优势,提高 IO 性能。- 基于偏移量的寻址:Kafka 的每条消息都有唯一的偏移量。消费者通过指定偏移量读取消息时,Kafka 只需根据偏移量直接定位到日志文件中的相应位置,并开始读取消息,而无需遍历文件内容。这种基于偏移量的寻址方式使得读取操作的时间复杂度为 O(1)。- 稀疏索引:Kafka 的日志文件使用稀疏索引,即只有部分偏移量有索引项,对于其他偏移量则通过顺序读取快速定位。稀疏索引既减少了内存消耗,又能保证读取速度。
  • 更新消费偏移量:消费者消费完消息后,Broker 会更新消费者组的偏移量记录,确保消息不会被重复消费。
5. 日志清理
  • 日志压缩和清理:Kafka Broker 定期对日志文件进行清理或压缩,以释放磁盘空间。根据配置,Kafka 可以执行基于时间的清理或基于 Key 的压缩(保留每个 Key 的最新消息)。
6. 处理请求和响应
  • 网络请求处理:Kafka Broker 内部使用线程池来处理网络请求,包括处理生产者的写入请求、消费者的读取请求以及管理请求(如元数据查询)。
  • 响应客户端:Broker 在完成处理后,生成响应并通过网络返回给客户端。
7. 监控和管理
  • 度量和监控:Kafka Broker 会收集各种运行指标,如请求延迟、吞吐量、磁盘使用等。运维人员可以通过这些指标来监控 Kafka 集群的健康状况。
  • 动态配置管理:Broker 支持在运行时动态更新配置,管理员可以在不中断服务的情况下调整 Kafka 的行为。
8. 故障处理
  • 自动故障恢复:当某个 Broker 失效时,Zookeeper 或 Raft Controller 会重新分配该 Broker 管理的分区和副本,确保集群的高可用性。

  • 数据恢复:在 Broker 恢复后,它会自动从其他副本同步缺失的数据,重新加入 ISR 列表。

  • AR(Assigned Replicas)是所有副本的集合。

  • ISR(In-Sync Replicas)是与 Leader 副本保持同步的副本集合。

  • OSR(Out of Sync Replicas)是未与 Leader 副本保持同步的副本集合。

Follower故障处理细节

Leader故障处理细节

自动平衡

kafka文件存储机制

以下是 Kafka 文件存储机制的详细解释:

1. 分区与日志文件
  • 分区(Partition): - 每个 Kafka 主题(Topic)可以有多个分区,每个分区是一个独立的日志文件。这些分区分布在不同的 Kafka Broker 上。
  • 日志文件(Log File): - 每个分区的数据在磁盘上存储为一个日志文件,所有消息按顺序追加到这个日志文件中。Kafka 的日志文件是分段存储的,每个日志段(Log Segment)都是一个物理文件。
2. 日志段与索引
  • 日志段(Log Segment):- 为了管理超大规模的数据,Kafka 会将每个分区的日志文件拆分为多个日志段文件。每个日志段文件都是固定大小(默认为 1GB),每个段内的消息是按顺序存储的。
  • 段文件结构:- 每个日志段由两个文件组成:一个是实际的日志数据文件(以 .log 为后缀),另一个是偏移量索引文件(以 .index 为后缀)。此外,还有一个时间戳索引文件(以 .timeindex 为后缀),用于快速定位基于时间的消息。
  • 索引文件:- Kafka 使用稀疏索引技术。索引文件中只保存一部分消息的偏移量与实际物理位置的映射,这样可以减少索引文件的大小,同时保持良好的查找性能。

文件清理策略

消费者

Kafka 消费者的工作流程涉及从集群中拉取消息、处理消息、提交偏移量等步骤。以下是 Kafka 消费者的总体工作流程的详细说明:

1. 消费者启动
  • 配置初始化:消费者在启动时,需要配置一系列参数,包括 Kafka 集群的 bootstrap.servers 地址、消费的主题(Topic)、消费者组(Consumer Group)等。
  • 连接 Kafka 集群:消费者根据配置的 bootstrap.servers 地址连接到 Kafka 集群。Kafka 集群会返回可用的 Broker 信息,供消费者进行后续的分区分配和消息拉取。
2. 分区分配
  • 消费者组协调(Group Coordinator):每个消费者组都有一个协调者(Group Coordinator),负责管理消费者组内的分区分配。消费者启动后,首先会向协调者注册,表明自己属于哪个消费者组。
  • 分区分配策略: - Range、RoundRobin、Sticky 等:协调者会根据消费者组内的消费者数量和分区数量,按照配置的分区分配策略,将主题中的分区分配给组内的消费者。每个分区只能分配给一个消费者实例。- 分区重新平衡(Rebalance):当消费者组内的消费者数量发生变化(例如有消费者加入或退出组),或者主题的分区数量发生变化时,协调者会触发分区重新平衡,将分区重新分配给消费者。
3. 消息拉取
  • 拉取消息(Pull Model):消费者会根据分配到的分区,从 Kafka Broker 中主动拉取消息。拉取操作是由消费者发起的,消费者决定何时拉取、拉取多少消息。
  • 批量拉取:消费者可以设置参数如 fetch.min.bytesfetch.max.wait.ms 来控制每次拉取的消息量和等待时间,从而优化网络和处理性能。
4. 消息处理
  • 消息消费:消费者从 Broker 拉取消息后,将消息传递给应用程序进行处理。消息处理的逻辑由应用程序自定义,可能涉及数据解析、业务处理、存储等操作。
  • 幂等性处理:在处理消息时,如果需要确保操作的幂等性,应用程序需要设计相应的逻辑,避免消息重复消费带来的问题。
5. 偏移量提交
  • 偏移量管理: - 自动提交(auto.commit.enable):如果开启自动提交,消费者会定期将当前的偏移量提交到 Kafka 中,提交的时间间隔由 auto.commit.interval.ms 参数控制。- 手动提交:如果需要更精细地控制偏移量的提交时机,消费者可以通过手动提交的方式,在消息处理完成后提交偏移量。
  • 提交方式: - 同步提交:消费者调用 commitSync() 方法提交偏移量,该方法会阻塞直到偏移量提交成功为止。- 异步提交:消费者调用 commitAsync() 方法提交偏移量,该方法会异步提交偏移量,不会阻塞当前线程。
6. 故障处理
  • 消费者故障: - 如果消费者在处理过程中发生异常,可能会重新启动或重新平衡分区分配。重新启动的消费者会从上次提交的偏移量处继续消费,确保数据不会丢失。
  • Broker 故障: - 如果消费者无法连接到 Broker,它会不断尝试重连,直到恢复连接。当 Broker 恢复时,消费者会继续消费消息。
7. 消费者停止
  • 优雅关闭:消费者在关闭时,需要优雅地退出消费者组,并提交最后的偏移量。这可以通过 consumer.close() 方法来实现,确保关闭过程中的数据不会丢失或重复消费。

生产者的推模式(Push Model)

  • 定义:生产者负责将消息推送到 Kafka 中。消息发送到特定的主题(Topic),并根据分区策略分配到相应的分区中。
  • 特点: - 控制权在生产者:生产者决定消息的发送时间、发送到的主题和分区。它可以根据消息的 key 或者分区策略,决定将消息发送到哪个分区。- 优化性能:生产者可以批量发送消息,以提高吞吐量。它可以通过设置批量大小(batch.size)和延迟(linger.ms)来控制批量发送的行为。- 灵活性:生产者可以根据网络条件、负载情况调整消息的发送频率和大小,灵活应对各种使用场景。

消费者的拉模式(Pull Model)

  • 定义:消费者从 Kafka 中拉取消息。消费者根据自己的消费能力和需求,主动向 Kafka 请求消息。
  • Kafka的消息获取模式是pull(‌拉)‌模式,‌即消费者主动从broker中拉取数据。‌这种模式的优点在于消费者可以控制消息的获取速率,‌避免因为broker决定消息发送速率而难以适应所有消费者的消费速率的问题。‌然而,‌这种模式的不足之处在于,‌如果没有数据可拉取,‌消费者可能会陷入循环中,‌一直返回空数据,‌这可能会导致资源的浪费。‌为了减轻拉模式的缺点,‌Kafka采用了长轮询机制。‌在这种机制下,‌当消费者请求拉取消息时,‌如果broker中没有满足条件的数据,‌broker不会立即返回空结果,‌而是会等待一段时间,‌如果在这段时间内有新消息到达,‌则将这些新消息返回给消费者。‌这种机制有效地减轻了消费者频繁请求而得不到数据的问题,‌提高了资源的使用效率。‌

消费者组

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

一个分区只能由一个消费者组中的一个消费者进行消费,这是为了确保消息的有序性和数据处理的一致性。

  • 有序性:确保分区内消息按顺序消费,适用于需要严格顺序处理的场景。
  • 一致性:通过分区与消费者的一对一映射,简化偏移量管理,确保数据处理的一致性。
  • 资源利用:避免多消费者竞争资源,提高系统的性能和稳定性。

Rebalance(重新平衡)

是指消费者组内部对主题分区的重新分配过程。Rebalance 是 Kafka 消费者组保证高可用性和负载均衡的关键机制。当消费者组中的消费者或分区发生变化时,Kafka 会自动触发 Rebalance,以确保所有分区都被有效地分配和消费。

Rebalance 触发的场景
  1. 消费者加入组:- 当一个新的消费者加入现有的消费者组时,Kafka 会触发 Rebalance。此时,组内的分区会被重新分配,以便新加入的消费者能够消费一部分分区的数据。
  2. 消费者离开组:- 当某个消费者实例意外退出(如崩溃、网络中断等)或正常关闭时,Kafka 会触发 Rebalance,将原本分配给该消费者的分区重新分配给组内的其他消费者,以确保分区不会出现空闲无人消费的情况。
  3. 主题的分区数发生变化:- 如果主题的分区数量增加或减少,Kafka 也会触发 Rebalance,以便重新分配分区,确保消费者组能够处理所有分区的数据。
  4. 消费者组协调者(Group Coordinator)变更:- 在某些情况下,消费者组的协调者(Group Coordinator)发生变更时,Kafka 也会触发 Rebalance,以重新分配分区。
  5. 消费者消费超时
Rebalance 的过程
  1. 暂停消息消费:- 在 Rebalance 期间,消费者会暂停从分区中拉取消息,以避免在分区重新分配过程中发生消息丢失或重复消费的情况。
  2. 分区重新分配:- 组协调者(Group Coordinator)负责将主题的分区重新分配给消费者组中的各个消费者。分配的策略可以是 Range、RoundRobin、Sticky 等。- 分区重新分配后,每个消费者将收到新的分区列表,明确自己负责消费哪些分区的数据。
  3. 提交偏移量:- Rebalance 之前,消费者通常会提交当前处理的偏移量,以确保 Rebalance 之后能从正确的偏移量处继续消费,避免消息丢失或重复消费。
  4. 恢复消息消费:- 分区分配完成后,消费者恢复从新的分区中拉取消息并处理,消费者组回到正常的工作状态。
Rebalance 的影响
  • 延迟:Rebalance 会导致消费者组中的消息消费暂时中断,直到重新分配完成。这可能会导致短暂的消费延迟。
  • 负载均衡:Rebalance 的主要目的是为了实现分区的负载均衡,确保消费者组内的消费者能够公平地分配分区,避免资源浪费。
  • 容错性:Rebalance 还提升了消费者组的容错性。当消费者实例意外退出时,其他消费者可以接管其分区,保证消息不丢失。
Rebalance 的优化
  • Sticky 分配策略:Sticky 分配策略会尽量保持分区分配的一致性,避免在 Rebalance 时频繁变动分区的归属,减少 Rebalance 的开销。
  • 心跳机制(Heartbeat):消费者定期向协调者发送心跳,如果协调者在规定时间内没有收到心跳信号,就认为消费者失联,从而触发 Rebalance。通过调节心跳间隔,可以影响 Rebalance 的触发频率。
  • Session Timeout 和 Rebalance Timeout:通过调整消费者的 session.timeout.msmax.poll.interval.ms 等配置,可以控制 Rebalance 的频率和持续时间,以减少对消费性能的影响。

分区再分配策略

RangeAssignor

RangeAssignor 会根据消费者组中的消费者数量和主题的分区数量,将分区按顺序均匀分配给每个消费者。每个消费者可能会分配到多个连续的分区。

如果分区数量不能整除消费者数量,部分消费者可能会分配到比其他消费者更多的分区,导致负载不均衡。如果一个消费者故障,此时该消费者负责的分区会分配到另一个消费者上,负载过大。

RoundRobinAssignor

RoundRobinAssignor 会将所有分区和消费者进行排序,然后逐个将分区分配给消费者,确保每个消费者尽可能均匀地分配到分区。

StickyAssignor

StickyAssignor 通过一种粘性(Sticky)的方式分配分区,尽量保持每个消费者之前的分区分配不变,同时尽量均匀地分配新的分区。当消费者数量或分区数量变化时,也尽量减少重新分配的次数。

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,

考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区

到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分

区不变化。

offset(偏移量)

在 Kafka 中,消费者的 offset(偏移量)可以用于跟踪消费者在主题分区中读取到的最后一条消息的位置。

从 Kafka 0.9 版本开始,消费者的 offset 默认保存在 Kafka 自己的内部主题

__consumer_offsets

中。这个主题是一个特殊的 Kafka 主题,Kafka 会自动创建并管理它。

  • 可以在多个消费者组之间共享偏移量。
  • 消费者组重新启动时可以从上次的偏移量继续消费。
  • 通过 Kafka 的复制机制保障了 offset 的高可用性。

在 Kafka 0.9 之前的版本中,消费者的 offset 是保存在 Zookeeper 中的。消费者会将它读取的偏移量定期写入到 Zookeeper 中,这种方式在性能和可靠性方面有所不足,因此在之后的版本中被弃用。

  • 默认情况下,Kafka 使用内部主题 __consumer_offsets 来维护消费者的 offset,这种方式兼顾了性能和可靠性,适用于大多数使用场景。
  • 对于特定需求或早期版本的 Kafka,也可以选择将 offset 保存在 Zookeeper 或其他自定义存储位置。

fetch.max.bytes默认 Default: 52428800(50m)。消费者获取服务器端一批消息最大的字节数。

max.poll.records一次 pol 拉取数据返回消息的最大条数,默认是 500 条

KRaft 的核心概念

KRaft 将 Kafka 的集群管理功能从 Zookeeper 移至 Kafka 自身,通过内置的 Raft 共识算法管理集群的元数据和控制器选举。

  1. Raft 共识算法: - KRaft 使用 Raft 共识算法来确保集群元数据的一致性。Raft 是一种分布式共识算法,能够在多副本之间达成一致性决策。
  2. 单节点控制器: - 在 KRaft 模式下,Kafka 的控制器(Controller)角色由集群中的某一个节点扮演。控制器负责管理分区的 Leader 选举、分区的状态变更等。
  3. 元数据日志(Metadata Log): - KRaft 引入了元数据日志的概念,将集群的元数据变更记录到一个特殊的日志中。所有 Kafka Broker 通过 Raft 协议保持对元数据日志的副本的一致性。

KRaft 的优势

  1. 消除了对 Zookeeper 的依赖:- 通过 KRaft,Kafka 可以完全独立于 Zookeeper 运行,减少了运维复杂度。
  2. 一致性增强:- 由于 KRaft 使用了 Raft 共识算法,Kafka 的元数据一致性和可靠性得到了增强。
  3. 简化架构:- KRaft 将集群管理和元数据维护直接集成到 Kafka Broker 中,简化了系统架构。
  4. 可扩展性增强:- 通过移除 Zookeeper,Kafka 在大规模集群中的可扩展性得到了进一步提升。

Kafka不再依赖外部框架,而是能够独立运行;

controller管理集群时,不再需要从zookeeper中先读取数据,集群性能上升;

由于不依赖zookeeper,集群扩展时不再受到zookeeper读写能力限制;

controller不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强

controller节点的配置,而不是像以前一样对随机controller节点的高负载束手无策。

标签: kafka 分布式

本文转载自: https://blog.csdn.net/m0_61924236/article/details/140372204
版权归原作者 大专er 所有, 如有侵权,请联系我们删除。

“Kafka”的评论:

还没有评论