学习记录来自:尚硅谷大数据技术之 Kafka
消息队列(MQ)
消息队列(MQ)在分布式系统中有着广泛的应用。以下是一些常见的应用场景:
- 解耦系统:消息队列可以用来解耦不同系统或服务之间的依赖。生产者只需将消息发送到队列,不需要关心消费者的处理速度或处理逻辑,消费者可以根据自己的节奏从队列中读取和处理消息。
- 异步处理:在需要异步处理的场景中,消息队列非常有用。生产者可以迅速地将任务提交到队列,而不必等待任务完成。消费者可以在后台异步地处理这些任务。例如,订单处理系统可以立即响应用户的订单请求,并将订单处理任务放入队列进行异步处理。
- 异步调用的优势包括:降低耦合度、提高性能和拓展性方面。缺点,比如依赖于中间件(Broker)的可靠性以及架构复杂性带来的维护挑战。
- 负载调节:消息队列可以帮助调节系统负载。在高峰期,生产者可以快速地将大量请求放入队列,而消费者可以根据自己的处理能力逐步处理这些请求,从而避免系统过载。
- 日志和监控:许多系统使用消息队列来收集和传输日志或监控数据。日志数据可以通过消息队列传输到集中式的日志管理系统,进行统一的存储和分析。
- 数据流处理:消息队列在实时数据流处理场景中非常有用。例如,金融系统中的交易数据、物联网系统中的传感器数据,都可以通过消息队列进行实时传输和处理。
- 事件驱动架构:在事件驱动架构中,消息队列用于传递事件。例如,电商平台可以使用消息队列来传递订单创建、支付成功、发货等事件,相关的服务可以订阅这些事件并作出相应的处理。
- 分布式事务:在分布式系统中实现事务一致性非常复杂。通过消息队列,可以实现某种形式的事务管理,如最终一致性。生产者发送消息表示某个事务的状态变化,消费者根据消息的状态变化来处理相应的事务逻辑。
两种模式
消息队列主要有两种模式:点对点(Point-to-Point)模式和发布/订阅(Publish/Subscribe)模式。
点对点模式:
- 基本概念:在点对点模式中,消息生产者将消息发送到队列,消息消费者从队列中读取消息。每条消息只会被一个消费者消费。
- 特点: - 消息确认:消费者读取到消息后,需要向消息队列发送确认,表示该消息已被成功处理。未被确认的消息会重新投递。- 消息持久化:消息在队列中可以持久化保存,直到被某个消费者成功消费。- 负载均衡:多个消费者可以共同消费一个队列中的消息,实现负载均衡。
- 适用场景:适用于任务处理、工作队列等场景。例如,一个订单处理系统,订单生成后放入队列,不同的消费者从队列中读取并处理订单。
发布/订阅模式:
- 基本概念:在发布/订阅模式中,消息生产者将消息发布到一个主题(Topic),消息消费者订阅该主题,并接收所有发布到该主题的消息。每条消息可以被多个消费者消费。
- 特点: - 多播机制:同一条消息可以被所有订阅该主题的消费者接收,适用于广播通知。- 灵活订阅:消费者可以灵活订阅一个或多个主题,根据需要接收不同类型的消息。- 消息持久化:消息可以持久化保存,允许消费者在重启后继续接收之前未消费的消息。
- 适用场景:适用于事件通知、日志广播、实时数据流等场景。例如,新闻发布系统,新闻发布后,所有订阅该主题的用户都会接收到新闻推送。
RabbitMQ
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个应用层协议,用于消息中间件系统之间的通信。它定义了消息传递的规则和格式,确保不同的系统和应用能够通过消息队列进行可靠的、异步的消息传递。
Spring AMQP(Asynchronous Messaging with RabbitMQ)是Spring框架中用于处理消息队列的一个模块。它提供了一套简化与RabbitMQ进行集成和操作的工具,使得开发人员能够方便地使用消息队列实现异步通信、消息传递、发布/订阅等功能。
Kafka
Kafka是一个分布式的消息系统,用于处理和传输大量的实时数据。它的架构设计使其能够提供高吞吐量和低延迟的消息传递。
首先,Kafka由多个服务器(称为代理)组成,这些代理共同工作来处理数据。这些数据被组织成主题,主题是消息的类别或分组,每个主题又分为多个分区,以实现并行处理和高效存储。
在使用Kafka时,生产者将数据发送到Kafka集群中的特定主题。每个消息都会被写入一个特定的分区。生产者可以根据一些策略来选择将消息发送到哪个分区,如循环选择或基于消息内容的哈希选择。
消费者从Kafka集群中读取数据,它们可以订阅一个或多个主题并处理从这些主题接收到的消息。消费者组允许多个消费者共同处理一个主题中的消息,每个消费者处理不同的分区,确保消息被有效处理而不会重复。
Kafka还具有数据持久化的特点,所有消息都会被持久化到磁盘中,并且可以配置消息的保留策略,确定消息在磁盘中保留的时间。
Kafka的另一个重要特性是它的高可用性和容错性。通过复制,每个分区的数据都会被复制到多个代理中,确保即使某个代理发生故障,数据仍然可用。
架构
Kafka 的基础架构主要由几个关键组件构成,包括主题、分区、代理、生产者、消费者和消费者组。以下是对这些组件的详细介绍:
主题(Topic):主题是消息的分类或分组。生产者将消息发送到某个主题,消费者从主题中读取消息。每个主题可以有多个分区。
分区(Partition):每个主题被划分为多个分区,每个分区是一个有序的、不变的消息序列,分区中的每条消息都有一个唯一的偏移量。分区的设计有助于实现并行处理和高可用性。每个分区在物理上对应一个文件夹,该文件夹里面存储了这个分区的所有消息和索引文件。在创建topic时可指定parition数量,生产者将消息发送到topic时,消息会根据 分区策略 追加到分区文件的末尾,属于顺序写磁盘。
代理(Broker):Kafka 运行在一个或多个服务器(代理)上,这些服务器共同组成一个 Kafka 集群。每个代理负责存储分配给它的分区数据。代理之间通过复制来保证数据的高可用性和容错性。一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。
生产者(Producer):生产者是负责将消息发送到 Kafka 主题的应用程序。生产者可以选择将消息发送到特定的分区,或使用轮询等策略将消息分配到不同的分区。
消费者(Consumer):消费者是负责从 Kafka 主题读取消息的应用程序。消费者可以订阅一个或多个主题,并从这些主题的分区中读取消息。
消费者组(Consumer Group):消费者组是一组协同工作的消费者,它们共同消费一个或多个主题的消息。每个分区的消息只能被同一个消费者组中的一个消费者消费,从而实现了负载均衡。多个消费者组可以独立消费同一个主题的消息。
Kafka Controller:控制器是 Kafka 集群中的一个代理,它负责管理分区的领导者选举和副本分配。当某个代理失败时,控制器会选择新的分区领导者,确保数据的高可用性。
ZooKeeper:Kafka 使用 ZooKeeper 来管理和协调 Kafka 集群。它负责存储集群的元数据,如代理列表、主题和分区信息,以及消费者组的偏移量信息。ZooKeeper 确保 Kafka 集群的高可用性和一致性。
消息存储:Kafka 使用日志文件来存储消息。每个分区对应一个日志文件,消息按顺序追加到日志文件中。Kafka 提供了多种配置选项来管理日志文件的存储和清理,如日志保留策略、日志压缩等。
副本: Kafka 中用来实现数据高可用性的一种机制。每个分区的数据都会被复制到多个 Kafka Broker 上,这些副本组成了一个副本集(Replica Set)。
Kafka 的架构设计旨在提供高吞吐量、低延迟、可扩展性和高可用性,使其成为构建实时数据流处理系统的理想选择。通过理解 Kafka 的基础架构,可以更好地设计和优化基于 Kafka 的应用程序。
大白话 kafka 架构原理 (qq.com)
kafka吞吐量为什么这么高
Kafka 的高吞吐量来自于其独特的架构设计和多种优化技术。以下是 Kafka 能够实现高吞吐量的主要原因:
1. 顺序读写和分区设计
- 顺序读写:Kafka 的设计使得消息的读写都是顺序进行的。这种顺序操作非常适合磁盘的特性,因为顺序写入的速度要远远快于随机写入。这种操作减少了磁盘寻道时间,从而提升了吞吐量。
- 分区(Partitioning):Kafka 将每个 Topic 分为多个分区,每个分区可以独立地进行读写操作。这使得 Kafka 能够通过水平扩展分区的数量来处理更高的负载,从而提高整体吞吐量。
2. 零拷贝机制(Zero-Copy)
Kafka 使用 Linux 的零拷贝(Zero-Copy)机制,尤其是在消息的传输过程中。通过系统调用如
sendfile()
,Kafka 可以直接将数据从磁盘读取并发送到网络,而不需要经过用户空间的缓冲区,从而减少了 CPU 使用和内存带宽消耗。这种方式大幅提升了消息传输的效率。
Linux 操作系统的 "zero-copy" 机制是一种优化数据传输的技术,它能够在应用程序和内核之间传递数据时,避免不必要的数据拷贝,从而提高系统的性能和效率。它在网络编程和文件操作中非常有用,特别是在需要大规模数据传输的场景中。
1."Zero-copy" 的工作原理
在传统的数据传输流程中,比如从文件读取数据并通过网络发送到另一个系统,通常会经历多个数据拷贝过程:
- 从磁盘读取到内核缓冲区: 数据首先从磁盘读取到操作系统内核空间的缓冲区。
- 从内核缓冲区复制到用户空间: 然后数据从内核缓冲区复制到应用程序的用户空间缓冲区。
- 从用户空间复制回内核空间: 应用程序处理完数据后,再将其从用户空间复制回内核空间,准备通过网络发送。
- 发送数据到网络缓冲区: 最后,数据从内核空间被复制到网络缓冲区,通过网络接口发送出去。
在这个过程中,数据需要多次在用户空间和内核空间之间拷贝,这会带来额外的 CPU 开销和内存带宽消耗。而 "zero-copy" 机制减少甚至避免了这些不必要的拷贝过程。
2.Linux 中的 "zero-copy" 实现方式
Linux 提供了几种 "zero-copy" 的实现方式,主要包括:
sendfile()
系统调用:sendfile()
是 Linux 中常用的 "zero-copy" 系统调用。它直接将文件内容从内核的 Page Cache 发送到网络接口,而不需要将数据拷贝到用户空间。它大大减少了数据拷贝的次数。splice()
和vmsplice()
系统调用: 这两个系统调用允许在文件描述符之间传递数据,而无需将数据复制到用户空间。splice()
可以在两个文件描述符之间移动数据,vmsplice()
则将用户空间的内存映射到管道(pipe)中,从而避免数据拷贝。- 内存映射(
mmap()
): 虽然严格意义上不是 "zero-copy",但是通过mmap()
可以将文件映射到进程的地址空间中,允许应用程序直接访问文件内容,而无需进行额外的数据拷贝。
优势
- 减少 CPU 负载:通过减少数据在用户空间和内核空间之间的拷贝次数,"zero-copy" 机制显著降低了 CPU 的负载。
- 提高吞吐量:避免不必要的内存拷贝,提高了数据传输的效率,适合高吞吐量场景。
- 降低延迟:减少数据拷贝的时间,有助于降低传输延迟。
场景
- 网络服务器:如 Web 服务器,使用
sendfile()
从磁盘读取文件并直接发送到网络,适合文件传输、流媒体等应用场景。 - 数据流处理:如 Kafka 等消息系统,在处理大规模数据时使用 "zero-copy" 技术提高性能。
- 高性能计算:需要大量数据传输的场景,如数据库系统、科学计算等。
3. Page Cache 优化
Kafka 充分利用了操作系统的 Page Cache,避免频繁的磁盘 I/O 操作。当消息被消费时,如果数据仍然保存在 Page Cache 中,Kafka 可以直接从内存中读取数据,而不需要访问磁盘。这种做法不仅提高了数据读取速度,还减少了磁盘的负载。
Page Cache 是操作系统管理的一块内存区域,用于缓存从磁盘读取的数据。它允许操作系统在内存中保存最近访问的数据块,这样在再次访问这些数据时,不必再次从磁盘读取,从而加速 I/O 操作。
- 减少磁盘 I/O: 通过将数据缓存到内存中,减少了从磁盘读取数据的需求,降低了磁盘 I/O 的开销。
- 提高吞吐量和响应速度: 内存访问的速度远远快于磁盘访问,因此利用 Page Cache 可以显著提高 Kafka 的吞吐量和响应速度。
- 操作系统管理: Kafka 将 Page Cache 的管理交给操作系统,使得 Kafka 不必关心缓存的具体实现和管理,可以更专注于数据流处理和消息的高效传输。
4. 异步 I/O 操作
Kafka 的生产者和消费者客户端都采用了异步 I/O 操作,这意味着消息的发送和接收操作不会阻塞主线程。生产者可以批量发送消息,消费者也可以批量拉取消息。这种批量操作降低了网络和磁盘的开销,提高了消息处理的吞吐量。
5. 高效的存储格式
Kafka 的消息存储格式是简单的二进制日志文件,这些文件以连续的方式存储消息,没有复杂的数据结构和索引。这种设计减少了写入开销,并且在读取时也能高效地顺序读取。
6. 可伸缩的集群架构
Kafka 的集群架构支持水平扩展,可以通过增加更多的 broker 来分担负载。当流量增加时,可以通过增加更多的分区和 broker 来均衡负载,从而提高整个集群的吞吐量。
7. 消息压缩
Kafka 支持消息的批量压缩和解压缩,这减少了传输的数据量,从而提高了网络的利用效率。常用的压缩算法如 gzip、snappy 和 lz4 等,都能在保持较高压缩率的同时,提供良好的压缩和解压缩速度。
8. 数据复制
Kafka 支持跨 broker 的数据复制(Replication),使得即使在某些 broker 出现故障时,其他 broker 仍然可以继续提供服务。这种设计提高了系统的可靠性,同时也在一定程度上提高了读操作的吞吐量。
9. 日志压缩(Log Compaction)
对于那些只关心最新状态的应用,Kafka 通过日志压缩的方式,只保留每个 Key 的最新值,这种方式不仅减少了存储空间,也能提高读取的效率。
“按消息Key保存策略”或“Key-ordering 策略” 是指在Kafka中按照消息的Key进行日志压缩(Log Compaction),以确保对同一个Key的消息,Kafka只保留最新的版本。这种策略主要用于需要保存最新状态的场景,比如保存用户的最新信息、缓存数据的最新版本等。
工作原理:
- 消息Key的重要性:在这种策略下,每条消息都必须有一个Key。Kafka会对这些消息进行分区(partition),并在分区内部按照消息的Key进行排序。
- 日志压缩:Kafka在后台会不断地进行日志压缩。压缩的过程是,Kafka遍历日志文件中的消息,如果发现同一个Key有多个版本,那么只保留最新的那个版本,其他的旧版本将被删除。
- 删除记录(Tombstone):Kafka支持通过发送空值的消息来删除某个Key对应的记录。这种空值的消息被称为“墓碑”(Tombstone),当Kafka发现一个Key的最新值是空值时,最终会将这个Key对应的所有记录删除。
使用场景:
- 状态存储:需要保存每个Key的最新状态,且不需要保留历史版本的场景。
- 数据去重:在有可能产生重复数据的场景下,可以通过这种策略保留最新的数据,去除重复数据。
- 缓存刷新:用来保存最新的缓存数据,当数据更新时只需要更新对应Key的值。
优点:
- 减少存储空间:通过删除旧版本的数据,节省了存储空间。
- 维持数据的最新状态:对于状态存储场景,能够确保消费端获取的都是最新的数据。
缺点:
- 可能延迟数据删除:日志压缩是异步进行的,可能会有一段时间,旧版本的数据仍然保留在日志中。
- 依赖Key的一致性:如果消息的Key不一致(如没有合理地选择Key),可能导致日志压缩无法生效。
10. 简单的消费者模型
Kafka 使用一种简单的消费者模型,每个消费者读取各自分配到的分区的数据。这种设计避免了传统消息队列中消费者之间的锁竞争问题,进一步提升了消息消费的吞吐量。
生产者
分区
Kafka 中的分区(Partition)是其实现高可用性、高吞吐量和可扩展性的核心设计之一。
分区的好处主要包括以下几个方面:
1. 提高并发和吞吐量
- 并行处理:每个分区是独立的,可以在不同的 broker 上进行处理。生产者和消费者可以并行地向多个分区写入和读取数据,从而提高 Kafka 集群的整体吞吐量。
- 负载均衡:通过将数据分布在多个分区和 broker 上,可以有效地分散负载,避免单个 broker 成为性能瓶颈。
2. 扩展性
- 水平扩展:Kafka 可以通过增加分区的数量来扩展系统的处理能力。当数据量增大时,可以增加分区和 broker 的数量,系统能够横向扩展而不影响现有的架构。
- 灵活性:新的分区可以随时添加,而无需中断现有的 Kafka 集群运行,这使得 Kafka 的扩展非常灵活。
3. 提高可靠性
- 分区副本机制:每个分区可以配置多个副本(Replica),这些副本分布在不同的 broker 上。当某个 broker 失效时,Kafka 可以自动从其他副本中选出新的 leader,继续提供服务,从而提高了系统的容错性和可靠性。
- 数据恢复和再平衡:当某个分区的 leader 出现故障时,Kafka 能够快速地从其他 ISR(In-Sync Replica)中选出新的 leader,确保数据不会丢失。
4. 消息顺序保证
- 分区内的消息顺序:Kafka 保证同一个分区内的消息是有序的。这对于需要顺序处理消息的应用非常重要,如事件溯源、日志分析等场景。
- 基于键的分区:生产者可以通过消息键(Key)将相关的消息发送到同一个分区,确保这些消息的顺序性。
5. 消费者组和负载均衡
- 分区分配:Kafka 的消费者组会自动将分区分配给组内的消费者。每个分区只能被一个消费者处理,从而实现负载均衡。这样,当消费者数量增加时,可以自动分配更多的分区给新加入的消费者,从而实现消费者的动态扩展。
- 容错性:当某个消费者失效时,其处理的分区可以自动被其他消费者接管,确保消费过程的连续性。
6. 优化资源利用
- 节省资源:Kafka 分区允许将数据分布在多个 broker 上,利用各个 broker 的存储和计算资源,从而提高了集群的整体资源利用率。
- 局部性优化:在分布式部署中,分区可以根据数据的地理位置或其他逻辑分区进行划分,优化数据的局部性,提高访问效率
Kafka 生产者可以通过多种方式来提高吞吐量。以下是一些关键的优化策略:
- 批量大小(
batch.size
):生产者可以将多个消息聚合成一个批次,批量发送到 Kafka broker。通过设置batch.size
参数,生产者可以控制批次的大小,批量发送能减少网络请求的频率,从而提高吞吐量。 - 等待时间(
linger.ms
):生产者可以通过设置linger.ms
参数来指定等待一段时间后再发送消息,以便更多的消息可以被聚合到一个批次中。这种方法在负载较低时特别有用,可以增加批量的大小,从而提高吞吐量。 - 消息压缩(
compression.type
):生产者可以对消息进行压缩(支持 gzip、snappy、lz4、zstd 等压缩算法),从而减少消息传输的大小,提高网络利用率。压缩后的消息更容易打包成较大的批次,进一步提高吞吐量。 - 最大请求大小(
max.request.size
):通过调整max.request.size
参数,可以控制单个请求中可以发送的最大消息量,以便在网络条件允许的情况下发送更多数据 - 重试次数(
retries
):增加重试次数可以提高生产者在网络不稳定情况下的鲁棒性,从而减少消息发送失败的可能性。 - 请求超时时间(
request.timeout.ms
):设置合理的请求超时时间,以便在网络延迟时,生产者不会过早地放弃等待,导致重试或失败。 - 缓冲区大小(
buffer.memory
):增加生产者的缓冲区大小,使其能够容纳更多的待发送消息,减少因缓冲区满而阻塞的情况。 - 线程池优化:调节生产者内部的线程池大小,以适应高负载环境下的并发处理需求。
- 幂等性(
enable.idempotence
):开启幂等性可以确保消息的唯一性,但也可能稍微降低吞吐量。在追求极致吞吐量时,可以根据需求选择是否开启。 - 事务支持:对于需要事务支持的场景,可以配置生产者以支持事务,但需要权衡吞吐量和数据一致性。
acks
Kafka 生产者在发送消息时,可以设置不同的应答级别(Acknowledgment level,简称
acks
),以平衡数据可靠性和吞吐量。常见的应答级别有以下几种:
1. **
acks=0
**
- 无应答:生产者在发送消息后不会等待任何 broker 的确认。这样可以获得最高的吞吐量,因为生产者不会因为等待确认而被阻塞。
- 缺点:消息可能会在传输过程中丢失,因为生产者不会知道消息是否成功到达 broker。这种设置适用于对数据丢失不敏感的场景。
2. **
acks=1
**
- Leader 确认:生产者在发送消息后只等待分区的 leader broker 确认消息已经写入到本地日志中,然后就可以继续发送下一条消息。
- 优点:相比
acks=0
,这种模式提供了更高的数据可靠性,因为生产者至少得到了消息已经到达 broker 的确认。 - 缺点:如果 leader 在消息被 follower 同步之前崩溃,消息可能会丢失。该设置适合需要一定可靠性但不需要完全保证的场景。
3. **
acks=all
(或
acks=-1
)**
- 全副本确认:生产者在发送消息后,会等待所有 ISR(In-Sync Replicas,同步副本)对消息的确认。这意味着消息已经被写入到所有同步副本中。
- 优点:这种设置提供了最高的数据可靠性,确保消息即使在 leader 崩溃的情况下也不会丢失。
- 缺点:由于需要等待所有同步副本的确认,消息的发送延迟会增加,吞吐量可能会受到影响。适合需要强数据一致性的场景。
选择应答级别的权衡
- **
acks=0
**:适合需要最大化吞吐量且对数据丢失不敏感的场景。 - **
acks=1
**:适合需要较高吞吐量,同时对数据可靠性要求中等的场景。 - **
acks=all
**:适合需要确保数据不丢失,优先保证可靠性而非吞吐量的场景。
幂等性
Kafka 的幂等性(Idempotence)是生产者的一项特性,旨在确保消息不会被重复写入。启用幂等性后,即使由于网络故障或重试机制导致生产者多次发送同一条消息,也能保证消息只会被写入一次。幂等性主要解决重复消息问题,是在高可靠性场景下常用的特性。
Kafka 生产者在开启幂等性后,会为每个生产者实例分配一个唯一的
PID
(Producer ID),并为每个分区的每个消息维护一个序列号。当生产者向某个分区发送消息时,消息会携带生产者的
PID
和序列号,Kafka broker 会根据这些信息判断消息是否已经接收过。
- 当启用幂等性时,Kafka 自动将
acks
设置为all
,以确保所有同步副本都接收到消息,从而保证数据的可靠性。 - 此外,Kafka 还会自动将
retries
参数设置为一个较大的值(通常是Integer.MAX_VALUE
),以便在失败时无限次重试,确保消息最终能够成功写入。
幂等性的限制
- 单个分区内的幂等性:Kafka 的幂等性仅对同一生产者发送到同一分区内的消息有效,无法跨分区保证幂等性。
- 事务:对于需要跨分区、跨 Topic 保证原子性的操作,可以结合 Kafka 的事务功能使用,事务功能通过
transactional.id
参数来标识一个事务性生产者。
适用场景
- 避免消息重复:在需要严格保证消息不重复写入的场景下,幂等性是必需的。
- 金融交易系统:例如,处理金融交易、订单系统等,对数据一致性有很高要求的场景。
事务
Kafka 的事务功能允许生产者在多个分区和多个主题(Topic)上进行原子性消息写入。这意味着要么所有消息都成功写入,确保数据一致性,要么所有消息都不会写入,避免部分提交导致的数据不一致问题。
事务的主要功能
- 跨分区、跨主题的原子性写入:Kafka 事务可以确保一组消息要么全部成功写入,要么全部失败。即使涉及多个分区和多个主题,也能保证操作的原子性。
- 处理“读已提交”数据:Kafka 消费者可以配置为只读取事务已经提交的数据,从而避免消费到未提交的事务消息。
- 事务消息与非事务消息隔离:Kafka 通过事务 ID (
transactional.id
) 将事务消息与非事务消息区分开来,确保事务消息的可靠性。
Kafka 事务的工作原理
Kafka 事务基于两阶段提交(Two-Phase Commit Protocol,2PC)实现:
- 初始化事务:生产者通过配置
transactional.id
来启动事务,该 ID 用于唯一标识一个事务性生产者。 - 发送消息:生产者开始将消息发送到指定的分区和主题。
- 预提交(Prepare Phase):当生产者准备提交事务时,Kafka 会记录所有相关分区的消息偏移量(offset),并将这些消息标记为“准备提交”。
- 提交事务(Commit Phase):如果所有相关消息都成功写入,生产者提交事务,这些消息就会对消费者可见。
- 回滚事务:如果在事务中发生错误或需要中止操作,生产者可以回滚事务,Kafka 将丢弃该事务中的所有消息,这些消息不会被消费者看到。
Properties props = new Properties(); // 设置事务 id(必须),事务 id 任意起名 properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 初始化事务 kafkaProducer.initTransactions(); // 开启事务 kafkaProducer.beginTransaction(); try { // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { // 发送消息 kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i)); } // int i = 1 / 0; // 提交事务 kafkaProducer.commitTransaction(); } catch (Exception e) { // 终止事务 kafkaProducer.abortTransaction(); } finally { // 5. 关闭资源 kafkaProducer.close(); } }
使用场景
Kafka 事务主要适用于需要严格数据一致性的场景,例如:
- 金融系统:例如支付处理系统,确保交易记录和资金转移的原子性。
- 数据库同步:在多个数据库之间同步数据时,确保数据一致性。
- 订单系统:处理订单和库存更新时,确保订单状态和库存数量的一致性。
注意事项
- 性能影响:事务机制会增加一些额外的开销,如两阶段提交的通信延迟,因此在性能要求较高的场景下需要权衡。
- 适用范围:Kafka 事务只适用于 Kafka 内部的消息写入和读取,对于跨系统的分布式事务,需要额外的分布式事务管理器。
数据有序性
Kafka 生产者的数据有序性取决于数据发送到分区的方式和配置。
分区内的顺序性
- 单分区内的顺序性:在 Kafka 中,消息在同一分区内是有序的。也就是说,如果生产者发送多条消息到同一个分区,Kafka 会按照消息发送的顺序进行写入,并且消费者也会按相同的顺序消费这些消息。
分区间的顺序性
- 跨分区的无序性:如果消息被发送到不同的分区,Kafka 不保证这些分区之间的消息顺序。由于 Kafka 的分区设计目的是为了实现并行处理和扩展性,不同分区之间的消息可能会并发写入和消费。因此,跨分区的消息顺序在生产和消费时可能会有所不同。
max.in.flight.requests.per.connection
是 Kafka 生产者配置中的一个参数,用于控制在每个连接上可以未完成的请求数量。
- 如果启用了幂等性(
enable.idempotence=true
),并且希望保证顺序性,则 Kafka 会要求max.in.flight.requests.per.connection
不能超过 5,否则顺序性可能会受到影响。 - 如果这个参数设置为 1,则即使在失败和重试的情况下,也能保证消息的顺序性,但这会降低吞吐量。
Zookeeper
Zookeeper 在 Kafka 集群中扮演着重要角色,负责存储和管理 Kafka 集群的元数据和配置信息。
- Zookeeper 中存储了当前 Kafka 集群中所有 Broker 的信息,包括每个 Broker 的 ID、IP 地址、端口等。这些信息用于协调 Broker 的加入和退出,以及确保 Broker 之间的协同工作。
- 对于每个主题,Zookeeper 还记录了分区和副本的分配信息,包括哪些 Broker 是分区的 Leader,以及哪些 Broker 是副本。
- Zookeeper 记录了当前集群中的 Controller 信息。Controller 是负责管理 Kafka 集群中的分区 Leader 选举和副本分配的特殊 Broker。
- Zookeeper 记录了每个分区的 ISR 列表,即当前与 Leader 副本保持同步的副本列表。这对 Kafka 的高可用性至关重要,因为 Kafka 只允许从 ISR 中选举新的 Leader。
- Zookeeper 存储了消费者组的信息,包括消费者组中的成员和每个成员消费的偏移量(对于旧版 Kafka,0.9 及之前版本)。
Kafka 正在逐步抛弃 Zookeeper 的原因主要涉及到以下几个方面:
- 减少依赖:Zookeeper 是一个独立的分布式协调服务,Kafka 依赖它来管理元数据、协调集群、处理分区的领导者选举等任务。这种依赖增加了 Kafka 部署和运维的复杂性。
- 统一架构:Kafka 在移除 Zookeeper 之后,采用 Kafka 自身的 Quorum Controller(基于 Raft 协议)来管理元数据和集群状态,使得整个系统架构更为一致。
- 延迟和吞吐量:使用 Zookeeper 时,Kafka 需要通过网络与 Zookeeper 交互,这增加了元数据更新和 Leader 选举的延迟。
- 扩展性:Zookeeper 的扩展性和性能在大规模 Kafka 集群中可能成为瓶颈。
- 一致性保障:Zookeeper 基于 Paxos 协议实现一致性,但由于架构复杂,某些极端情况下可能出现一致性问题。Kafka 新引入的 Quorum Controller 基于 Raft 协议,能提供更简单和可靠的一致性保障。
- 减少分区脑裂问题:在某些故障场景下,Zookeeper 可能导致 Kafka 集群发生分区脑裂问题,导致不同节点对集群状态有不同的看法。通过使用 Kafka 自身的 Quorum Controller,可以更好地避免这种情况。
- 动态配置:在新架构中,Kafka 可以更轻松地支持动态配置更新,不需要依赖 Zookeeper 的复杂配置管理。
Kafka Broker
Kafka Broker 是 Kafka 集群中的核心组件,负责处理生产者和消费者的请求,并管理消息的存储和转发。
以下是 Kafka Broker 的总体工作流程:
1.启动并注册
- 注册到 Zookeeper(旧版本)或 Raft Controller(新版本):Kafka Broker 启动时,会向 Zookeeper 或 Raft Controller 注册自己,并获取集群中的元数据,如其他 Broker 的信息、主题和分区的配置等。
- 初始化存储:Kafka Broker 会初始化本地存储系统,准备好用于存储消息的日志文件。
2. 处理生产者请求
- 接收消息:生产者通过 TCP 连接向 Kafka Broker 发送消息。Broker 会根据消息的 Key 和分区器决定消息应存储到哪个分区。
- 写入日志文件:Broker 将接收到的消息追加到相应分区的日志文件中。Kafka 的日志文件是顺序写入的,这使得写操作非常高效。
- 同步副本:如果该分区有多个副本,Leader Broker 会将消息同步到其他副本(Follower)。只有当所有副本都确认接收到消息后,Leader 才会向生产者发送 ACK。
3. 管理分区和副本
- 分区和副本的分配:Kafka Broker 根据集群配置和负载均衡策略,分配主题的分区和副本。在分区的 Leader 失效时,Zookeeper 或 Raft Controller 会选择新的 Leader。
- ISR 列表管理:Kafka Broker 维护每个分区的 ISR(In-Sync Replicas)列表,记录当前与 Leader 同步的副本列表。
4. 处理消费者请求
- 消费者订阅:消费者订阅主题后,Broker 会根据消费者组的偏移量记录,找到消费者应消费的分区和偏移量。
- 消息读取:Broker 从指定的分区和偏移量开始读取消息,并通过 TCP 连接发送给消费者。Kafka 的读取是 O(1) 复杂度,因此读取性能很高。- 顺序写入:Kafka 的消息是以顺序方式追加到分区日志文件中的,每条消息都有一个递增的偏移量。这种顺序写入方式避免了随机写入的磁盘寻道开销。- 顺序读取:消费者读取消息时,通过偏移量直接定位到日志文件中的特定位置,从而实现顺序读取,这样可以充分利用磁盘的顺序读写优势,提高 IO 性能。- 基于偏移量的寻址:Kafka 的每条消息都有唯一的偏移量。消费者通过指定偏移量读取消息时,Kafka 只需根据偏移量直接定位到日志文件中的相应位置,并开始读取消息,而无需遍历文件内容。这种基于偏移量的寻址方式使得读取操作的时间复杂度为 O(1)。- 稀疏索引:Kafka 的日志文件使用稀疏索引,即只有部分偏移量有索引项,对于其他偏移量则通过顺序读取快速定位。稀疏索引既减少了内存消耗,又能保证读取速度。
- 更新消费偏移量:消费者消费完消息后,Broker 会更新消费者组的偏移量记录,确保消息不会被重复消费。
5. 日志清理
- 日志压缩和清理:Kafka Broker 定期对日志文件进行清理或压缩,以释放磁盘空间。根据配置,Kafka 可以执行基于时间的清理或基于 Key 的压缩(保留每个 Key 的最新消息)。
6. 处理请求和响应
- 网络请求处理:Kafka Broker 内部使用线程池来处理网络请求,包括处理生产者的写入请求、消费者的读取请求以及管理请求(如元数据查询)。
- 响应客户端:Broker 在完成处理后,生成响应并通过网络返回给客户端。
7. 监控和管理
- 度量和监控:Kafka Broker 会收集各种运行指标,如请求延迟、吞吐量、磁盘使用等。运维人员可以通过这些指标来监控 Kafka 集群的健康状况。
- 动态配置管理:Broker 支持在运行时动态更新配置,管理员可以在不中断服务的情况下调整 Kafka 的行为。
8. 故障处理
自动故障恢复:当某个 Broker 失效时,Zookeeper 或 Raft Controller 会重新分配该 Broker 管理的分区和副本,确保集群的高可用性。
数据恢复:在 Broker 恢复后,它会自动从其他副本同步缺失的数据,重新加入 ISR 列表。
AR(Assigned Replicas)是所有副本的集合。
ISR(In-Sync Replicas)是与 Leader 副本保持同步的副本集合。
OSR(Out of Sync Replicas)是未与 Leader 副本保持同步的副本集合。
Follower故障处理细节
Leader故障处理细节
自动平衡
kafka文件存储机制
以下是 Kafka 文件存储机制的详细解释:
1. 分区与日志文件
- 分区(Partition): - 每个 Kafka 主题(Topic)可以有多个分区,每个分区是一个独立的日志文件。这些分区分布在不同的 Kafka Broker 上。
- 日志文件(Log File): - 每个分区的数据在磁盘上存储为一个日志文件,所有消息按顺序追加到这个日志文件中。Kafka 的日志文件是分段存储的,每个日志段(Log Segment)都是一个物理文件。
2. 日志段与索引
- 日志段(Log Segment):- 为了管理超大规模的数据,Kafka 会将每个分区的日志文件拆分为多个日志段文件。每个日志段文件都是固定大小(默认为 1GB),每个段内的消息是按顺序存储的。
- 段文件结构:- 每个日志段由两个文件组成:一个是实际的日志数据文件(以
.log
为后缀),另一个是偏移量索引文件(以.index
为后缀)。此外,还有一个时间戳索引文件(以.timeindex
为后缀),用于快速定位基于时间的消息。 - 索引文件:- Kafka 使用稀疏索引技术。索引文件中只保存一部分消息的偏移量与实际物理位置的映射,这样可以减少索引文件的大小,同时保持良好的查找性能。
文件清理策略
消费者
Kafka 消费者的工作流程涉及从集群中拉取消息、处理消息、提交偏移量等步骤。以下是 Kafka 消费者的总体工作流程的详细说明:
1. 消费者启动
- 配置初始化:消费者在启动时,需要配置一系列参数,包括 Kafka 集群的
bootstrap.servers
地址、消费的主题(Topic)、消费者组(Consumer Group)等。 - 连接 Kafka 集群:消费者根据配置的
bootstrap.servers
地址连接到 Kafka 集群。Kafka 集群会返回可用的 Broker 信息,供消费者进行后续的分区分配和消息拉取。
2. 分区分配
- 消费者组协调(Group Coordinator):每个消费者组都有一个协调者(Group Coordinator),负责管理消费者组内的分区分配。消费者启动后,首先会向协调者注册,表明自己属于哪个消费者组。
- 分区分配策略: - Range、RoundRobin、Sticky 等:协调者会根据消费者组内的消费者数量和分区数量,按照配置的分区分配策略,将主题中的分区分配给组内的消费者。每个分区只能分配给一个消费者实例。- 分区重新平衡(Rebalance):当消费者组内的消费者数量发生变化(例如有消费者加入或退出组),或者主题的分区数量发生变化时,协调者会触发分区重新平衡,将分区重新分配给消费者。
3. 消息拉取
- 拉取消息(Pull Model):消费者会根据分配到的分区,从 Kafka Broker 中主动拉取消息。拉取操作是由消费者发起的,消费者决定何时拉取、拉取多少消息。
- 批量拉取:消费者可以设置参数如
fetch.min.bytes
和fetch.max.wait.ms
来控制每次拉取的消息量和等待时间,从而优化网络和处理性能。
4. 消息处理
- 消息消费:消费者从 Broker 拉取消息后,将消息传递给应用程序进行处理。消息处理的逻辑由应用程序自定义,可能涉及数据解析、业务处理、存储等操作。
- 幂等性处理:在处理消息时,如果需要确保操作的幂等性,应用程序需要设计相应的逻辑,避免消息重复消费带来的问题。
5. 偏移量提交
- 偏移量管理: - 自动提交(auto.commit.enable):如果开启自动提交,消费者会定期将当前的偏移量提交到 Kafka 中,提交的时间间隔由
auto.commit.interval.ms
参数控制。- 手动提交:如果需要更精细地控制偏移量的提交时机,消费者可以通过手动提交的方式,在消息处理完成后提交偏移量。 - 提交方式: - 同步提交:消费者调用
commitSync()
方法提交偏移量,该方法会阻塞直到偏移量提交成功为止。- 异步提交:消费者调用commitAsync()
方法提交偏移量,该方法会异步提交偏移量,不会阻塞当前线程。
6. 故障处理
- 消费者故障: - 如果消费者在处理过程中发生异常,可能会重新启动或重新平衡分区分配。重新启动的消费者会从上次提交的偏移量处继续消费,确保数据不会丢失。
- Broker 故障: - 如果消费者无法连接到 Broker,它会不断尝试重连,直到恢复连接。当 Broker 恢复时,消费者会继续消费消息。
7. 消费者停止
- 优雅关闭:消费者在关闭时,需要优雅地退出消费者组,并提交最后的偏移量。这可以通过
consumer.close()
方法来实现,确保关闭过程中的数据不会丢失或重复消费。
生产者的推模式(Push Model)
- 定义:生产者负责将消息推送到 Kafka 中。消息发送到特定的主题(Topic),并根据分区策略分配到相应的分区中。
- 特点: - 控制权在生产者:生产者决定消息的发送时间、发送到的主题和分区。它可以根据消息的
key
或者分区策略,决定将消息发送到哪个分区。- 优化性能:生产者可以批量发送消息,以提高吞吐量。它可以通过设置批量大小(batch.size
)和延迟(linger.ms
)来控制批量发送的行为。- 灵活性:生产者可以根据网络条件、负载情况调整消息的发送频率和大小,灵活应对各种使用场景。
消费者的拉模式(Pull Model)
- 定义:消费者从 Kafka 中拉取消息。消费者根据自己的消费能力和需求,主动向 Kafka 请求消息。
- Kafka的消息获取模式是pull(拉)模式,即消费者主动从broker中拉取数据。这种模式的优点在于消费者可以控制消息的获取速率,避免因为broker决定消息发送速率而难以适应所有消费者的消费速率的问题。然而,这种模式的不足之处在于,如果没有数据可拉取,消费者可能会陷入循环中,一直返回空数据,这可能会导致资源的浪费。为了减轻拉模式的缺点,Kafka采用了长轮询机制。在这种机制下,当消费者请求拉取消息时,如果broker中没有满足条件的数据,broker不会立即返回空结果,而是会等待一段时间,如果在这段时间内有新消息到达,则将这些新消息返回给消费者。这种机制有效地减轻了消费者频繁请求而得不到数据的问题,提高了资源的使用效率。
消费者组
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
一个分区只能由一个消费者组中的一个消费者进行消费,这是为了确保消息的有序性和数据处理的一致性。
- 有序性:确保分区内消息按顺序消费,适用于需要严格顺序处理的场景。
- 一致性:通过分区与消费者的一对一映射,简化偏移量管理,确保数据处理的一致性。
- 资源利用:避免多消费者竞争资源,提高系统的性能和稳定性。
Rebalance(重新平衡)
是指消费者组内部对主题分区的重新分配过程。Rebalance 是 Kafka 消费者组保证高可用性和负载均衡的关键机制。当消费者组中的消费者或分区发生变化时,Kafka 会自动触发 Rebalance,以确保所有分区都被有效地分配和消费。
Rebalance 触发的场景
- 消费者加入组:- 当一个新的消费者加入现有的消费者组时,Kafka 会触发 Rebalance。此时,组内的分区会被重新分配,以便新加入的消费者能够消费一部分分区的数据。
- 消费者离开组:- 当某个消费者实例意外退出(如崩溃、网络中断等)或正常关闭时,Kafka 会触发 Rebalance,将原本分配给该消费者的分区重新分配给组内的其他消费者,以确保分区不会出现空闲无人消费的情况。
- 主题的分区数发生变化:- 如果主题的分区数量增加或减少,Kafka 也会触发 Rebalance,以便重新分配分区,确保消费者组能够处理所有分区的数据。
- 消费者组协调者(Group Coordinator)变更:- 在某些情况下,消费者组的协调者(Group Coordinator)发生变更时,Kafka 也会触发 Rebalance,以重新分配分区。
- 消费者消费超时
Rebalance 的过程
- 暂停消息消费:- 在 Rebalance 期间,消费者会暂停从分区中拉取消息,以避免在分区重新分配过程中发生消息丢失或重复消费的情况。
- 分区重新分配:- 组协调者(Group Coordinator)负责将主题的分区重新分配给消费者组中的各个消费者。分配的策略可以是 Range、RoundRobin、Sticky 等。- 分区重新分配后,每个消费者将收到新的分区列表,明确自己负责消费哪些分区的数据。
- 提交偏移量:- Rebalance 之前,消费者通常会提交当前处理的偏移量,以确保 Rebalance 之后能从正确的偏移量处继续消费,避免消息丢失或重复消费。
- 恢复消息消费:- 分区分配完成后,消费者恢复从新的分区中拉取消息并处理,消费者组回到正常的工作状态。
Rebalance 的影响
- 延迟:Rebalance 会导致消费者组中的消息消费暂时中断,直到重新分配完成。这可能会导致短暂的消费延迟。
- 负载均衡:Rebalance 的主要目的是为了实现分区的负载均衡,确保消费者组内的消费者能够公平地分配分区,避免资源浪费。
- 容错性:Rebalance 还提升了消费者组的容错性。当消费者实例意外退出时,其他消费者可以接管其分区,保证消息不丢失。
Rebalance 的优化
- Sticky 分配策略:Sticky 分配策略会尽量保持分区分配的一致性,避免在 Rebalance 时频繁变动分区的归属,减少 Rebalance 的开销。
- 心跳机制(Heartbeat):消费者定期向协调者发送心跳,如果协调者在规定时间内没有收到心跳信号,就认为消费者失联,从而触发 Rebalance。通过调节心跳间隔,可以影响 Rebalance 的触发频率。
- Session Timeout 和 Rebalance Timeout:通过调整消费者的
session.timeout.ms
和max.poll.interval.ms
等配置,可以控制 Rebalance 的频率和持续时间,以减少对消费性能的影响。
分区再分配策略
RangeAssignor
RangeAssignor 会根据消费者组中的消费者数量和主题的分区数量,将分区按顺序均匀分配给每个消费者。每个消费者可能会分配到多个连续的分区。
如果分区数量不能整除消费者数量,部分消费者可能会分配到比其他消费者更多的分区,导致负载不均衡。如果一个消费者故障,此时该消费者负责的分区会分配到另一个消费者上,负载过大。
RoundRobinAssignor
RoundRobinAssignor 会将所有分区和消费者进行排序,然后逐个将分区分配给消费者,确保每个消费者尽可能均匀地分配到分区。
StickyAssignor
StickyAssignor 通过一种粘性(Sticky)的方式分配分区,尽量保持每个消费者之前的分区分配不变,同时尽量均匀地分配新的分区。当消费者数量或分区数量变化时,也尽量减少重新分配的次数。
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,
考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区
到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分
区不变化。
offset(偏移量)
在 Kafka 中,消费者的 offset(偏移量)可以用于跟踪消费者在主题分区中读取到的最后一条消息的位置。
从 Kafka 0.9 版本开始,消费者的 offset 默认保存在 Kafka 自己的内部主题
__consumer_offsets
中。这个主题是一个特殊的 Kafka 主题,Kafka 会自动创建并管理它。
- 可以在多个消费者组之间共享偏移量。
- 消费者组重新启动时可以从上次的偏移量继续消费。
- 通过 Kafka 的复制机制保障了 offset 的高可用性。
在 Kafka 0.9 之前的版本中,消费者的 offset 是保存在 Zookeeper 中的。消费者会将它读取的偏移量定期写入到 Zookeeper 中,这种方式在性能和可靠性方面有所不足,因此在之后的版本中被弃用。
- 默认情况下,Kafka 使用内部主题
__consumer_offsets
来维护消费者的 offset,这种方式兼顾了性能和可靠性,适用于大多数使用场景。 - 对于特定需求或早期版本的 Kafka,也可以选择将 offset 保存在 Zookeeper 或其他自定义存储位置。
fetch.max.bytes默认 Default: 52428800(50m)。消费者获取服务器端一批消息最大的字节数。
max.poll.records一次 pol 拉取数据返回消息的最大条数,默认是 500 条
KRaft 的核心概念
KRaft 将 Kafka 的集群管理功能从 Zookeeper 移至 Kafka 自身,通过内置的 Raft 共识算法管理集群的元数据和控制器选举。
- Raft 共识算法: - KRaft 使用 Raft 共识算法来确保集群元数据的一致性。Raft 是一种分布式共识算法,能够在多副本之间达成一致性决策。
- 单节点控制器: - 在 KRaft 模式下,Kafka 的控制器(Controller)角色由集群中的某一个节点扮演。控制器负责管理分区的 Leader 选举、分区的状态变更等。
- 元数据日志(Metadata Log): - KRaft 引入了元数据日志的概念,将集群的元数据变更记录到一个特殊的日志中。所有 Kafka Broker 通过 Raft 协议保持对元数据日志的副本的一致性。
KRaft 的优势
- 消除了对 Zookeeper 的依赖:- 通过 KRaft,Kafka 可以完全独立于 Zookeeper 运行,减少了运维复杂度。
- 一致性增强:- 由于 KRaft 使用了 Raft 共识算法,Kafka 的元数据一致性和可靠性得到了增强。
- 简化架构:- KRaft 将集群管理和元数据维护直接集成到 Kafka Broker 中,简化了系统架构。
- 可扩展性增强:- 通过移除 Zookeeper,Kafka 在大规模集群中的可扩展性得到了进一步提升。
Kafka不再依赖外部框架,而是能够独立运行;
controller管理集群时,不再需要从zookeeper中先读取数据,集群性能上升;
由于不依赖zookeeper,集群扩展时不再受到zookeeper读写能力限制;
controller不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强
controller节点的配置,而不是像以前一样对随机controller节点的高负载束手无策。
版权归原作者 大专er 所有, 如有侵权,请联系我们删除。