👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO 专家博主
⛪️ 个人社区:个人社区
💞 个人主页:个人主页
🙉 专栏地址: ✅ Java 中级
🙉八股文专题:剑指大厂,手撕 Java 八股文
文章目录
1. Kafka分区的基本概念
Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流应用。Kafka 的核心概念之一是分区(Partition),它在 Kafka 中扮演着至关重要的角色。以下是关于 Kafka 分区的基本概念:
Topic 和 Partition
- Topic:在 Kafka 中,消息被发布到 Topic 中。Topic 可以被视为消息的类别或主题。
- Partition:每个 Topic 可以分为多个 Partition。Partition 是 Topic 的物理分割,每个 Partition 都是一个有序的、不可变的消息序列。每个 Partition 在一个 Broker 上有副本(Replica),以实现容错和高可用性。
Partition 的特性
- 顺序性:在一个 Partition 内,消息是按顺序存储的。这意味着如果一个消费者从一个 Partition 中消费消息,它可以保证这些消息是按照它们被写入的顺序来接收的。
- 不可变性:一旦消息被写入 Partition,就不能被修改或删除(除非根据配置策略进行日志清理)。
- 独立性:不同的 Partition 之间是独立的,可以分布在不同的 Broker 上。这使得 Kafka 能够水平扩展,增加更多的 Broker 来处理更多的 Partition。
Partition 的作用
- 负载均衡:通过将 Topic 分割成多个 Partition,并将这些 Partition 分布在不同的 Broker 上,Kafka 能够实现负载均衡,提高系统的吞吐量。
- 容错性:每个 Partition 可以有多个副本(Replicas)。其中一个副本是 Leader,负责处理所有的读写请求;其他副本是 Follower,只负责同步 Leader 的数据。如果 Leader 宕机,Kafka 会自动从 Follower 中选举一个新的 Leader。
- 并行处理:多个消费者可以同时从同一个 Topic 的不同 Partition 中消费消息,从而实现并行处理。每个消费者组中的消费者可以独立地消费不同的 Partition,提高处理速度。
Partition 的管理
- 创建 Topic 时指定 Partition 数量:可以在创建 Topic 时指定 Partition 的数量。例如,使用
kafka-topics.sh
命令:kafka-topics.sh --create --topic my-topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
- 动态调整 Partition 数量:Kafka 允许在运行时增加 Partition 的数量,但不能减少。增加 Partition 数量通常需要重新分配 Partition,这可能会导致短暂的服务中断。
- Leader 选举:当 Leader 宕机时,Kafka 会从 ISR(In-Sync Replicas)列表中选择一个新的 Leader。ISR 列表包含与 Leader 保持同步的所有副本。
Partition 的配置
- Replication Factor:每个 Partition 可以有多个副本,以提高容错性。复制因子(Replication Factor)指定了每个 Partition 应该有多少个副本。
- **ISR (In-Sync Replicas)**:ISR 列表包含了所有与 Leader 保持同步的副本。只有 ISR 列表中的副本才有资格被选为新的 Leader。
- **HW (High Watermark)**:HW 是 Partition 中最小的 LEO(Log End Offset),即 ISR 中所有副本都已同步的数据位置。消费者只能读取 HW 之前的消息。
Partition 的使用场景
- 日志聚合:Kafka 可以用来收集和处理大规模的日志数据。
- 消息队列:Kafka 可以用作可靠的消息队列,支持发布/订阅模式。
- 事件驱动架构:Kafka 可以作为事件驱动架构的核心组件,用于解耦系统和服务之间的通信。
- 流处理:Kafka 可以与流处理框架(如 Apache Flink、Apache Spark Streaming)集成,实现实时数据处理。
Kafka 的 Partition 机制是其高性能、可扩展性和容错性的关键。通过将 Topic 分割成多个 Partition,并将这些 Partition 分布在不同的 Broker 上,Kafka 能够实现负载均衡和高吞吐量。此外,通过多副本机制,Kafka 提供了高可用性和容错性,确保即使部分 Broker 故障,系统仍能继续运行。理解 Partition 的基本概念对于设计和优化 Kafka 应用至关重要。
2.分区的高可用性设计
Kafka 的分区(Partition)高可用性设计是确保数据可靠性和系统稳定性的关键。通过多副本机制和 Leader 选举,Kafka 能够在单个 Broker 故障时继续提供服务,并且保证数据不丢失。以下是 Kafka 分区高可用性的主要设计要点:
多副本机制
- Replication Factor:每个 Partition 可以有多个副本(Replicas),通常配置为 2 或 3 个副本。其中一个副本是 Leader,负责处理所有的读写请求;其他副本是 Follower,只负责同步 Leader 的数据。
- **ISR (In-Sync Replicas)**:ISR 列表包含了所有与 Leader 保持同步的副本。只有 ISR 列表中的副本才有资格被选为新的 Leader。如果某个 Follower 无法跟上 Leader 的速度,它会被从 ISR 列表中移除。
Leader 选举
- 自动选举:当 Leader 宕机或不可用时,Kafka 会自动从 ISR 列表中选择一个新的 Leader。这个过程由 Controller 负责管理。
- 最小 LEO 选择:通常情况下,会选择 ISR 列表中 Log End Offset (LEO) 最小的副本作为新的 Leader,以确保数据的一致性。
数据复制
- 同步复制:Follower 会定期从 Leader 拉取数据并进行同步。一旦 Follower 同步了 Leader 的数据,它就会向 Leader 发送确认消息。
- 异步复制:为了提高性能,Follower 的同步可以是异步的。但是,这可能会导致短暂的数据不一致。
容错性
- 故障检测:Controller 通过心跳机制或其他方式检测到某个 Broker 宕机或变得不可用。
- 重新分配:一旦检测到 Leader 故障,Controller 会从 ISR 列表中选择一个新的 Leader,并更新集群的元数据。
日志清理和保留策略
- 日志清理:Kafka 支持基于时间或大小的日志清理策略。过期的日志会被删除,但至少会保留一个最小的时间段,以确保数据的持久性。
- 日志压缩:对于某些需要长期保存的关键数据,Kafka 支持日志压缩功能,只保留每条消息的最新版本。
网络隔离和脑裂
- 网络分区:在网络分区的情况下,可能会出现多个子网中的 Broker 都认为自己是 Leader 的情况。Kafka 通过 Zookeeper 来协调这种情况,确保只有一个有效的 Leader。
- 多数原则:在选举新的 Leader 时,Kafka 会遵循“多数原则”,即新的 Leader 必须获得大多数 ISR 副本的支持。
配置参数
- **
min.insync.replicas
**:设置 ISR 列表中必须存在的最小副本数。如果 ISR 中的副本数少于这个值,Broker 将拒绝生产者的写入请求。 - **
replica.lag.time.max.ms
**:设置 Follower 与 Leader 之间允许的最大滞后时间。超过这个时间,Follower 会被从 ISR 列表中移除。 - **
unclean.leader.election.enable
**:控制是否允许从不在 ISR 列表中的副本中选举新的 Leader。默认情况下,这是禁用的,以避免数据丢失。
监控和告警
- 监控工具:使用监控工具(如 Prometheus、Grafana 等)来监控 Kafka 集群的状态,包括 Broker 的健康状况、ISR 列表的变化、Leader 选举事件等。
- 告警机制:设置告警规则,当检测到异常情况(如 ISR 列表为空、Leader 选举频繁等)时,及时通知运维人员。
Kafka 的分区高可用性设计通过多副本机制、Leader 选举、数据复制和日志管理等措施,确保了系统的可靠性和稳定性。合理的配置和监控可以帮助你更好地管理和维护 Kafka 集群,确保在故障发生时能够快速恢复服务。理解这些设计要点对于构建高可用的 Kafka 应用至关重要。
3. 副本同步与 ISR
在 Apache Kafka 中,副本同步和 ISR(In-Sync Replicas)是确保数据一致性和高可用性的关键机制。下面是关于副本同步与 ISR 的详细解释:
副本同步
Kafka 通过多副本机制来提高数据的可靠性和容错性。每个 Partition 可以有多个副本,其中一个副本是 Leader,负责处理所有的读写请求;其他副本是 Follower,只负责同步 Leader 的数据。
同步过程
- Follower 从 Leader 拉取数据:- Follower 会定期向 Leader 发送 Fetch 请求,请求最新的消息。- Leader 收到 Fetch 请求后,将最新的消息发送给 Follower。
- Follower 写入本地日志:- Follower 将从 Leader 获取的消息写入自己的本地日志文件。- Follower 会记录每条消息的偏移量(Offset),并更新其 Log End Offset (LEO)。
- Follower 发送确认:- Follower 在成功写入消息后,会向 Leader 发送一个确认消息(Ack),表示已经同步了这些消息。- Leader 会记录哪些 Follower 已经同步了特定的消息。
- **Leader 更新 HW (High Watermark)**:- Leader 会根据所有 Follower 的确认情况,更新 High Watermark (HW)。- HW 是所有 ISR 中最小的 LEO,表示所有 ISR 中都已同步的数据位置。
- 生产者确认:- 生产者可以选择等待所有 ISR 中的副本确认(
acks=all
)或仅等待 Leader 确认(acks=1
)。- 如果生产者设置acks=all
,则必须等到所有 ISR 中的副本都确认后才会收到成功响应。
ISR (In-Sync Replicas)
ISR 是一组与 Leader 保持同步的副本集合。只有 ISR 中的副本才有资格被选为新的 Leader。如果某个 Follower 无法跟上 Leader 的速度,它会被从 ISR 列表中移除。
ISR 的管理
- 初始 ISR:- 当 Topic 创建时,所有副本都被认为是 ISR 的一部分。
- Follower 落后:- 如果 Follower 无法及时同步 Leader 的数据,超过了配置的滞后时间(
replica.lag.time.max.ms
),它会被从 ISR 中移除。- 这个配置参数决定了 Follower 与 Leader 之间允许的最大滞后时间。 - Follower 重新加入 ISR:- 如果 Follower 能够赶上 Leader 的进度,并且满足一定的条件(如滞后时间小于配置值),它可以重新加入 ISR。
- Leader 选举:- 当 Leader 宕机或不可用时,Controller 会从 ISR 列表中选择一个新的 Leader。- 通常会选择 ISR 列表中 LEO 最小的副本作为新的 Leader,以确保数据的一致性。
配置参数
- **
min.insync.replicas
**:设置 ISR 列表中必须存在的最小副本数。如果 ISR 中的副本数少于这个值,Broker 将拒绝生产者的写入请求。 - **
replica.lag.time.max.ms
**:设置 Follower 与 Leader 之间允许的最大滞后时间。超过这个时间,Follower 会被从 ISR 列表中移除。 - **
unclean.leader.election.enable
**:控制是否允许从不在 ISR 列表中的副本中选举新的 Leader。默认情况下,这是禁用的,以避免数据丢失。 - 副本同步:Follower 通过定期从 Leader 拉取数据并写入本地日志来同步数据。Leader 会根据 Follower 的确认情况更新 High Watermark (HW)。
- ISR:ISR 是一组与 Leader 保持同步的副本集合。只有 ISR 中的副本才有资格被选为新的 Leader。Follower 会根据其同步状态动态地加入或离开 ISR。
- 配置参数:通过配置
min.insync.replicas
和replica.lag.time.max.ms
来控制 ISR 的行为,确保数据的一致性和可靠性。
理解副本同步和 ISR 的工作机制对于设计高可用的 Kafka 应用至关重要。通过合理的配置和监控,可以确保 Kafka 集群在故障发生时能够快速恢复服务,同时保证数据不丢失。
4. 数据一致性策略
在 Apache Kafka 中,数据一致性是通过多种机制来保证的,包括多副本机制、ISR(In-Sync Replicas)、Leader 选举以及生产者的确认策略。以下是一些关键的数据一致性策略:
多副本机制
- Replication Factor:每个 Partition 可以有多个副本(通常配置为 2 或 3 个)。其中一个副本是 Leader,负责处理所有的读写请求;其他副本是 Follower,只负责同步 Leader 的数据。
- 高可用性:通过多副本机制,即使某个 Broker 宕机,Kafka 也能从其他副本中选举一个新的 Leader 来继续提供服务。
ISR (In-Sync Replicas)
- 定义:ISR 是一组与 Leader 保持同步的副本集合。只有 ISR 中的副本才有资格被选为新的 Leader。
- 同步条件:Follower 必须能够及时同步 Leader 的数据,否则会被从 ISR 中移除。同步的条件由
replica.lag.time.max.ms
参数控制。 - Leader 选举:当 Leader 宕机时,Controller 会从 ISR 列表中选择一个新的 Leader,确保数据的一致性。
生产者的确认策略
acks
参数: -acks=0
:生产者发送消息后不等待任何确认,这种方式最快但最不可靠。-acks=1
:生产者发送消息后等待 Leader 的确认,只要 Leader 写入成功就认为消息已提交。这种方式平衡了性能和可靠性。-acks=all
(或-1
):生产者发送消息后等待所有 ISR 中的副本确认。这是最可靠的方式,但性能较低。
最小同步副本数
- **
min.insync.replicas
**:这个参数设置了 ISR 列表中必须存在的最小副本数。如果 ISR 中的副本数少于这个值,Broker 将拒绝生产者的写入请求,以避免数据丢失。
不干净的 Leader 选举
- **
unclean.leader.election.enable
**:默认情况下,这个参数是禁用的。如果启用了这个参数,Kafka 允许从不在 ISR 列表中的副本中选举新的 Leader。这可能会导致数据丢失,但在某些情况下可以提高系统的可用性。
High Watermark (HW)
- 定义:HW 是 ISR 中所有副本都已同步的数据位置。消费者只能读取 HW 之前的消息。
- 更新:Leader 会根据所有 Follower 的确认情况更新 HW。只有当所有 ISR 中的副本都确认了某条消息,该消息才会被标记为已提交。
日志清理和保留策略
- 日志清理:Kafka 支持基于时间或大小的日志清理策略。过期的日志会被删除,但至少会保留一个最小的时间段,以确保数据的持久性。
- 日志压缩:对于某些需要长期保存的关键数据,Kafka 支持日志压缩功能,只保留每条消息的最新版本。
事务支持
- 幂等性生产者:启用幂等性生产者(
enable.idempotence=true
)可以防止重复消息,确保每条消息只会被写入一次。 - 事务:Kafka 支持事务,允许生产者将多个消息作为一个原子操作进行提交。这样可以确保要么所有消息都被提交,要么都不提交。
Kafka 通过多副本机制、ISR、Leader 选举、生产者的确认策略等多种机制来保证数据的一致性和可靠性。合理配置这些参数和策略,可以确保在故障发生时数据不会丢失,并且系统能够快速恢复服务。以下是几个关键点:
- 多副本:确保数据的高可用性。
- ISR:确保只有同步的副本才能成为新的 Leader。
- 确认策略:控制生产者的写入行为,平衡性能和可靠性。
- 最小同步副本数:确保 ISR 中有足够的副本。
- 不干净的 Leader 选举:权衡数据一致性和系统可用性。
- High Watermark:确保消费者只能读取已提交的数据。
- 日志清理和保留策略:管理数据的生命周期。
- 事务支持:确保消息的幂等性和原子性。
通过这些机制,Kafka 能够在大规模分布式环境中提供可靠的数据传输和存储。理解并正确配置这些参数对于构建高一致性的 Kafka 应用至关重要。
5. 分区再均衡
在 Apache Kafka 中,分区再均衡(Rebalance)是指消费者组(Consumer Group)中的消费者重新分配 Topic 的 Partition 的过程。这个过程通常发生在以下几种情况下:
- 新消费者加入:当一个新的消费者加入到现有的消费者组时。
- 消费者离开:当一个消费者离开消费者组时,例如消费者宕机或主动退出。
- 订阅变更:当消费者组订阅的 Topic 或者 Topic 的 Partition 数量发生变化时。
分区再均衡的过程
- 触发条件
- 新消费者加入:新的消费者向 Broker 发送心跳,Broker 检测到新的消费者并触发再均衡。
- 消费者离开:Broker 在一段时间内没有收到某个消费者的心跳,认为该消费者已经离开,并触发再均衡。
- 订阅变更:消费者组订阅的 Topic 或 Partition 数量发生变化时,Broker 会触发再均衡。
- 协调器(Coordinator)的作用
- 检测变化:Kafka 集群中的每个 Broker 都可以作为消费者组的协调器。协调器负责检测消费者组的变化,并触发再均衡。
- 发送 Rebalance 请求:当检测到上述变化之一时,协调器会向所有消费者发送 Rebalance 请求。
- 消费者的行为
- 停止消费:收到 Rebalance 请求后,消费者会停止从当前分配的 Partition 中消费消息。
- 提交偏移量:消费者会提交当前已处理的消息偏移量,以确保在再均衡后可以从正确的位置继续消费。
- 等待再均衡完成:消费者进入
REBALANCING
状态,等待再均衡完成。
- 再均衡策略
- Range 策略(默认策略): - 将 Partition 划分为连续的范围,并将这些范围分配给消费者。- 适用于消费者数量少于 Partition 数量的情况。
- RoundRobin 策略: - 将 Partition 均匀地分配给消费者,不考虑 Partition 的顺序。- 适用于消费者数量多于或等于 Partition 数量的情况。
- 自定义策略: - 用户可以实现自定义的再均衡策略,以满足特定的需求。
- 再均衡完成
- 分配 Partition:协调器根据再均衡策略重新分配 Partition 给消费者。
- 恢复消费:消费者接收到新的 Partition 分配后,从新的位置开始消费消息。
再均衡的影响
- 短暂的停顿:在再均衡期间,消费者会暂停消费消息,这会导致短暂的服务中断。
- 重复消费:如果消费者在提交偏移量之前发生再均衡,可能会导致消息被重复消费。
- 数据丢失:如果消费者在再均衡前没有提交偏移量,可能会导致部分消息未被处理而丢失。
优化再均衡
- 减少再均衡频率:通过增加消费者的 Session Timeout 和 Heartbeat Interval,可以减少不必要的再均衡。
- 使用幂等性生产者和事务:确保消息不会被重复处理,提高数据一致性。
- 选择合适的再均衡策略:根据具体场景选择 Range 或 RoundRobin 策略,或者实现自定义策略。
- 预热时间:在消费者开始消费之前,可以设置一个预热时间,让消费者有时间进行必要的初始化操作。
分区再均衡是 Kafka 消费者组中非常重要的机制,它确保了消费者能够均匀地分配 Topic 的 Partition,从而实现负载均衡。理解再均衡的过程和影响,以及如何优化再均衡,对于构建高可用和高性能的 Kafka 应用至关重要。通过合理的配置和策略,可以减少再均衡带来的负面影响,提高系统的稳定性和可靠性。
6. 故障恢复机制
Apache Kafka 的故障恢复机制是确保系统高可用性和数据一致性的关键。Kafka 通过多种机制来处理 Broker 和消费者组的故障,确保在发生故障时能够快速恢复服务。以下是 Kafka 故障恢复的主要机制:
- Broker 故障恢复
a. 多副本机制
- Replication Factor:每个 Partition 可以有多个副本(通常配置为 2 或 3 个)。其中一个副本是 Leader,负责处理所有的读写请求;其他副本是 Follower,只负责同步 Leader 的数据。
- **ISR (In-Sync Replicas)**:ISR 是一组与 Leader 保持同步的副本集合。只有 ISR 中的副本才有资格被选为新的 Leader。
b. Leader 选举
- 自动选举:当 Leader 宕机或不可用时,Controller 会从 ISR 列表中选择一个新的 Leader。这个过程是自动的,无需人工干预。
- 最小 LEO 选择:通常情况下,会选择 ISR 列表中 Log End Offset (LEO) 最小的副本作为新的 Leader,以确保数据的一致性。
c. Zookeeper 协调
- 心跳机制:每个 Broker 会定期向 Zookeeper 发送心跳。如果 Controller 在一段时间内没有收到某个 Broker 的心跳,它会认为该 Broker 已经宕机,并触发 Leader 选举。
- 临时节点:Broker 会在 Zookeeper 中创建一个临时节点
/controller
,用于标识当前的 Controller。如果该 Broker 宕机,临时节点会被自动删除,触发新的 Controller 选举。
- 消费者组故障恢复
a. 再均衡(Rebalance)
- 新消费者加入:当一个新的消费者加入到现有的消费者组时,会触发再均衡。
- 消费者离开:当一个消费者离开消费者组时(例如消费者宕机或主动退出),会触发再均衡。
- 订阅变更:当消费者组订阅的 Topic 或者 Topic 的 Partition 数量发生变化时,会触发再均衡。
b. 协调器(Coordinator)的作用
- 检测变化:Kafka 集群中的每个 Broker 都可以作为消费者组的协调器。协调器负责检测消费者组的变化,并触发再均衡。
- 发送 Rebalance 请求:当检测到上述变化之一时,协调器会向所有消费者发送 Rebalance 请求。
c. 消费者的行为
- 停止消费:收到 Rebalance 请求后,消费者会停止从当前分配的 Partition 中消费消息。
- 提交偏移量:消费者会提交当前已处理的消息偏移量,以确保在再均衡后可以从正确的位置继续消费。
- 等待再均衡完成:消费者进入
REBALANCING
状态,等待再均衡完成。 - 恢复消费:消费者接收到新的 Partition 分配后,从新的位置开始消费消息。
- 日志清理和保留策略
- 日志清理:Kafka 支持基于时间或大小的日志清理策略。过期的日志会被删除,但至少会保留一个最小的时间段,以确保数据的持久性。
- 日志压缩:对于某些需要长期保存的关键数据,Kafka 支持日志压缩功能,只保留每条消息的最新版本。
- 事务支持
- 幂等性生产者:启用幂等性生产者(
enable.idempotence=true
)可以防止重复消息,确保每条消息只会被写入一次。 - 事务:Kafka 支持事务,允许生产者将多个消息作为一个原子操作进行提交。这样可以确保要么所有消息都被提交,要么都不提交。
- 监控和告警
- 监控工具:使用监控工具(如 Prometheus、Grafana 等)来监控 Kafka 集群的状态,包括 Broker 的健康状况、ISR 列表的变化、Leader 选举事件等。
- 告警机制:设置告警规则,当检测到异常情况(如 ISR 列表为空、Leader 选举频繁等)时,及时通知运维人员。
Kafka 通过以下几种主要机制来实现故障恢复:
- 多副本机制:通过多个副本确保数据的高可用性。
- ISR 和 Leader 选举:确保只有同步的副本才能成为新的 Leader。
- 再均衡:在消费者组发生变化时重新分配 Partition,确保负载均衡。
- 日志管理和保留策略:管理数据的生命周期,确保数据的持久性。
- 事务支持:确保消息的幂等性和原子性。
- 监控和告警:实时监控集群状态,及时发现和处理问题。
理解这些机制并合理配置相关参数,可以帮助你构建高可用和可靠的 Kafka 应用。通过合理的配置和监控,可以确保 Kafka 集群在故障发生时能够快速恢复服务,同时保证数据不丢失。
7. 用 java 模拟实现 kafka 的分区设计
为了模拟 Kafka 的分区设计,我们可以创建一个简单的 Java 程序来展示如何实现 Topic、Partition 和消息的发布与消费。这个示例将包括以下组件:
- Topic:包含多个 Partition。
- Partition:存储消息并支持顺序读写。
- Broker:管理 Topic 和 Partition。
- Producer:向指定的 Partition 发送消息。
- Consumer:从指定的 Partition 消费消息。
项目结构
kafka-simulator
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── example
│ │ │ ├── Broker.java
│ │ │ ├── Consumer.java
│ │ │ ├── Message.java
│ │ │ ├── Partition.java
│ │ │ ├── Producer.java
│ │ │ ├── Topic.java
│ │ │ └── Main.java
代码实现
Message.java
定义消息类。
packagecom.example;publicclassMessage{privatefinalString key;privatefinalString value;publicMessage(String key,String value){this.key = key;this.value = value;}publicStringgetKey(){return key;}publicStringgetValue(){return value;}@OverridepublicStringtoString(){return"Message{"+"key='"+ key +'\''+", value='"+ value +'\''+'}';}}
Partition.java
定义 Partition 类,用于存储消息。
packagecom.example;importjava.util.ArrayList;importjava.util.List;publicclassPartition{privatefinalint id;privatefinalList<Message> messages;publicPartition(int id){this.id = id;this.messages =newArrayList<>();}publicintgetId(){return id;}publicsynchronizedvoidaddMessage(Message message){
messages.add(message);}publicsynchronizedMessageconsumeMessage(){if(messages.isEmpty()){returnnull;}return messages.remove(0);}publicsynchronizedintgetMessageCount(){return messages.size();}}
Topic.java
定义 Topic 类,包含多个 Partition。
packagecom.example;importjava.util.ArrayList;importjava.util.List;publicclassTopic{privatefinalString name;privatefinalList<Partition> partitions;publicTopic(String name,int numPartitions){this.name = name;this.partitions =newArrayList<>(numPartitions);for(int i =0; i < numPartitions; i++){
partitions.add(newPartition(i));}}publicStringgetName(){return name;}publicPartitiongetPartition(int index){if(index <0|| index >= partitions.size()){thrownewIllegalArgumentException("Invalid partition index: "+ index);}return partitions.get(index);}publicintgetNumPartitions(){return partitions.size();}}
Broker.java
定义 Broker 类,管理 Topic 和 Partition。
packagecom.example;importjava.util.HashMap;importjava.util.Map;publicclassBroker{privatefinalMap<String,Topic> topics;publicBroker(){this.topics =newHashMap<>();}publicvoidcreateTopic(String name,int numPartitions){if(topics.containsKey(name)){thrownewIllegalArgumentException("Topic already exists: "+ name);}
topics.put(name,newTopic(name, numPartitions));}publicTopicgetTopic(String name){return topics.get(name);}publicbooleantopicExists(String name){return topics.containsKey(name);}}
Producer.java
定义 Producer 类,用于发送消息到指定的 Partition。
packagecom.example;publicclassProducer{privatefinalBroker broker;publicProducer(Broker broker){this.broker = broker;}publicvoidsend(String topicName,int partitionIndex,Message message){Topic topic = broker.getTopic(topicName);if(topic ==null){thrownewIllegalArgumentException("Topic does not exist: "+ topicName);}Partition partition = topic.getPartition(partitionIndex);
partition.addMessage(message);System.out.println("Sent to "+ topicName +" - Partition "+ partitionIndex +": "+ message);}}
Consumer.java
定义 Consumer 类,用于从指定的 Partition 消费消息。
packagecom.example;publicclassConsumer{privatefinalBroker broker;publicConsumer(Broker broker){this.broker = broker;}publicMessageconsume(String topicName,int partitionIndex){Topic topic = broker.getTopic(topicName);if(topic ==null){thrownewIllegalArgumentException("Topic does not exist: "+ topicName);}Partition partition = topic.getPartition(partitionIndex);Message message = partition.consumeMessage();if(message !=null){System.out.println("Consumed from "+ topicName +" - Partition "+ partitionIndex +": "+ message);}return message;}}
Main.java
主程序,演示如何使用上述类。
packagecom.example;publicclassMain{publicstaticvoidmain(String[] args){// 创建 BrokerBroker broker =newBroker();// 创建 Topic
broker.createTopic("my-topic",3);// 创建 ProducerProducer producer =newProducer(broker);// 发送消息
producer.send("my-topic",0,newMessage("key1","value1"));
producer.send("my-topic",1,newMessage("key2","value2"));
producer.send("my-topic",2,newMessage("key3","value3"));// 创建 ConsumerConsumer consumer =newConsumer(broker);// 消费消息
consumer.consume("my-topic",0);
consumer.consume("my-topic",1);
consumer.consume("my-topic",2);}}
运行结果
运行
Main.java
后,你会看到类似如下的输出:
Sent to my-topic - Partition 0: Message{key='key1', value='value1'}
Sent to my-topic - Partition 1: Message{key='key2', value='value2'}
Sent to my-topic - Partition 2: Message{key='key3', value='value3'}
Consumed from my-topic - Partition 0: Message{key='key1', value='value1'}
Consumed from my-topic - Partition 1: Message{key='key2', value='value2'}
Consumed from my-topic - Partition 2: Message{key='key3', value='value3'}
通过
Broker
、
Topic
、
Partition
、
Producer
和
Consumer
类,我们实现了基本的消息发布和消费功能。这个示例可以帮助你理解 Kafka 分区的基本原理,并为更复杂的应用提供基础。实际的 Kafka 集群会涉及更多的细节,如多副本机制、Leader 选举、ISR 管理等。
精彩专栏推荐订阅:在下方专栏👇🏻
✅ 2023年华为OD机试真题(A卷&B卷)+ 面试指导
✅ 精选100套 Java 项目案例
✅ 面试需要避开的坑(活动)
✅ 你找不到的核心代码
✅ 带你手撕 Spring
✅ Java 初阶
版权归原作者 激流丶 所有, 如有侵权,请联系我们删除。