前言
从Kafka的角度来看:
- Kafka是一个分布式的、基于发布-订阅模式的消息队列系统,用于构建实时数据流管道和流应用程序。
- Kafka使用主题(Topic)来组织和存储消息,生产者(Producer)向主题发布消息,消费者(Consumer)从主题订阅并消费消息。
- Kafka具有高吞吐量、低延迟、高可扩展性和高可用性等特点,能够处理大规模的实时数据流。
- Kafka采用分区(Partition)机制,将主题划分为多个分区,以实现并行处理和负载均衡。
- Kafka使用ZooKeeper进行分布式协调和元数据管理,保证系统的一致性和可靠性。
- Kafka提供了丰富的客户端API,支持多种编程语言,方便应用程序与Kafka集群进行交互。
从源码的角度来看:
- Kafka源码采用Scala和Java编程语言编写,遵循面向对象和函数式编程的设计原则。
- 源码的核心组件包括: - Broker:Kafka集群中的服务器节点,负责接收、存储和分发消息。- Producer:消息生产者,负责将消息发布到Kafka集群中的主题。- Consumer:消息消费者,负责从Kafka集群中订阅和消费消息。- Controller:Kafka集群中的控制器,负责管理和协调集群的状态和元数据。- ZooKeeper:分布式协调服务,用于管理Kafka集群的元数据和协调节点之间的通信。
- 源码采用模块化设计,将系统功能划分为多个独立的模块,如网络通信、消息存储、消息分发、副本管理等。
- 源码广泛使用并发编程技术,如多线程、异步I/O、锁和同步机制等,以实现高性能和高并发处理。
- 源码重视可扩展性和可插拔性,提供了多个可配置的组件和插件机制,允许用户根据需求进行定制和扩展。
- 源码注重性能优化,采用零拷贝、批量处理、内存映射等技术,以提高系统的吞吐量和降低延迟。
- 源码遵循严格的编码规范和设计原则,如SOLID原则、设计模式等,保证代码的可读性、可维护性和可测试性。
副本管理(Replica Management)
核心类:
ReplicaManager.scala
职责:
- 管理分区的主副本和从副本:在Kafka中,每个分区有一个领导者(Leader)和多个跟随者(Follower)。
ReplicaManager
负责维护这些副本的状态和关系。 - 处理副本同步和故障转移:确保所有副本的数据一致性,并在领导者发生故障时快速选举新的领导者,以保持系统的高可用性。
关键方法:
becomeLeaderOrFollower(correlationId: Int, ...)
: 这个方法用于将某个副本角色转变为领导者或跟随者。它返回当前分区的领导者和跟随者的状态列表。fetchMessages(timeout: Long, ...)
: 该方法处理从副本中拉取消息的请求,返回指定超时时间内的消息数据。
关键概念:
- 领导者选举:当现有领导者不可用时,
ReplicaManager
会触发领导者选举,确保分区始终有一个有效的领导者。 - 数据同步:
ReplicaManager
确保所有从副本与领导者的数据保持同步,通常通过ISR(In-Sync Replica)机制实现。
控制器(Controller)
核心类:
KafkaController.scala
职责:
- 集群元数据管理:负责维护和更新整个Kafka集群的元数据,包括主题、分区、副本等信息。
- 处理Broker的上下线:监控Broker的状态,当有Broker上线或下线时,更新集群的元数据,并执行相应的操作(如重新分配分区)。
- 管理分区的领导者选举:在需要时(如Broker故障),负责触发并管理分区领导者的重新选举。
关键方法:
onBrokerStartup(id: Int): Unit
: 处理Broker启动时的逻辑,包括将其加入集群、重新分配分区等。onBrokerFailure(id: Int): Unit
: 处理Broker故障时的逻辑,重新选举分区的领导者,并将受影响的分区迁移到其他Broker。
关键概念:
- Zookeeper集成:早期Kafka使用Zookeeper进行控制器的选举和元数据管理,尽管在最新版本中,Kafka已经逐步移除了对Zookeeper的依赖。
- 高可用性:控制器本身需要具备高可用性,通常通过选举机制确保集群中只有一个活跃的控制器。
网络层(Network Layer)
核心类:
SocketServer.scala
职责:
- 处理客户端连接和请求:负责接收来自生产者、消费者以及其他客户端的网络连接和请求。
- 实现高效的非阻塞I/O:通过NIO(Non-blocking I/O)实现高效的网络通信,支持大规模的并发连接。
关键方法:
acceptNewConnections(): Unit
: 接受新的网络连接请求,并初始化相应的处理器。processNewResponses(): Unit
: 处理来自服务器的响应,将其发送回客户端。
关键概念:
- 协议通信:支持Kafka自定义的二进制协议,确保高效的数据传输。
- 性能优化:通过非阻塞I/O、批处理等技术优化网络性能,减少延迟和提升吞吐量。
消息协议(Message Protocol)
核心包:
org.apache.kafka.common.protocol
职责:
- 定义客户端和服务器之间的通信协议:包括请求和响应的格式、序列化和反序列化方式。
- 确保数据的一致性和兼容性:通过版本控制和协议升级机制,保证不同版本的客户端和服务器之间可以顺利通信。
关键概念:
- 请求/响应模型:Kafka采用请求/响应模式进行客户端和服务器之间的通信,支持多种类型的请求(如生产、消费、元数据查询等)。
- 序列化机制:使用高效的序列化协议(如Kafka自己的二进制协议)减少网络传输的开销。
生产者(Producer)
核心类:
KafkaProducer.java
职责:
- 实现消息发送逻辑:负责将生产者发送的消息传递给Kafka集群。
- 优化机制:包括分区选择、批处理、压缩等,以提升发送效率和减少网络开销。
关键方法:
public Future<RecordMetadata> send(ProducerRecord<K, V> record)
: 发送一条消息到指定的主题和分区,并返回一个Future
对象,可以用于跟踪发送结果。
关键概念:
- 分区策略:生产者可以根据键值、轮询或自定义策略选择消息要发送的分区。
- 批处理和压缩:将多条消息批量发送,并对批量数据进行压缩,减少网络传输次数和带宽使用。
消费者(Consumer)
核心类:
KafkaConsumer.java
职责:
- 实现消息消费逻辑:负责从Kafka集群中拉取并处理消息。
- 管理机制:包括分区分配、偏移量管理、心跳机制等,确保消费的可靠性和高效性。
关键方法:
public ConsumerRecords<K, V> poll(Duration timeout)
: 从Kafka拉取指定超时时间内的消息记录。
关键概念:
- 消费者组:多个消费者可以组成一个消费者组,协作消费不同的分区,实现横向扩展。
- 自动和手动提交偏移量:消费者可以选择自动提交已消费的偏移量,或手动控制偏移量的提交,以实现可靠性。
- 心跳机制:保持与Kafka集群的连接状态,防止被误认为是故障并触发重新分配分区。
流处理(Stream Processing)
核心包:
org.apache.kafka.streams
职责:
- 实现流处理DSL(Domain Specific Language):提供用于定义流处理拓扑的高级API,简化流处理应用的开发。
- 高级功能:包括状态存储、窗口操作、时间处理等,支持复杂的流处理逻辑。
关键概念:
- 状态ful处理:支持有状态的操作,如聚合、连接等,通过内置的状态存储机制(如RocksDB)保持中间状态。
- 时间语义:支持事件时间和处理时间,允许基于时间窗口进行数据分组和聚合。
连接器框架(Connectors Framework)
核心包:
org.apache.kafka.connect
职责:
- 提供数据导入导出的框架:简化与外部系统(如数据库、文件系统、消息队列等)的集成,支持数据的批量导入和导出。
- 支持自定义Source和Sink连接器:允许用户根据具体需求开发自定义的连接器,扩展Kafka Connect的功能。
关键概念:
- Source连接器:从外部系统读取数据并写入Kafka主题。
- Sink连接器:从Kafka主题读取数据并写入到外部系统。
- 分布式和独立模式:支持运行在分布式集群或单节点的独立模式,根据需要选择合适的部署方式。
安全模块(Security Module)
核心包:
org.apache.kafka.common.security
职责:
- 实现认证和授权机制:确保只有经过认证的客户端和用户可以访问Kafka集群,并根据权限控制其操作。
- 支持多种安全协议:包括SSL(Secure Sockets Layer)、SASL(Simple Authentication and Security Layer)等,提供多层次的安全保障。
关键概念:
- 认证(Authentication):验证客户端和服务器的身份,常用方法包括SSL证书认证和SASL机制(如GSSAPI、PLAIN等)。
- 授权(Authorization):基于ACL(Access Control Lists)控制用户对主题、集群和其他资源的访问权限。
- 加密传输:通过SSL/TLS对客户端和服务器之间的通信进行加密,防止数据被窃听和篡改。
下面挑我熟悉的几个讲讲
Kafka的分区日志的存储机制:
- 分区日志基本概念
特点:
- 顺序写入
- 不可变性
- 分段存储
- 偏移量索引
- 消息压缩
类似:
- 类似于写日记
- 每天一页(segment)
- 有目录(index)
- 按日期查找(offset)
- 分段存储(Segment)
结构:
segment1: [0-999] # 已封存
segment2: [1000-1999] # 已封存
segment3: [2000-...] # 活跃段
文件组成:
- .log (数据文件)
- .index (偏移量索引)
- .timeindex (时间索引)
示例:
/topic-0/
00000000000000000000.log
00000000000000000000.index
00000000000001000000.log
00000000000001000000.index
- 索引机制
稀疏索引:
offset: position
0: 0
100: 4096
200: 8192
查找过程:
1. 定位segment
2. 找最近索引项
3. 顺序扫描
示例代码:
class Index {
// 物理位置
private long position;
// 消息大小
private int size;
// 时间戳
private long timestamp;
}
- 写入流程
classLogSegment{privateFileChannel fileChannel;privateIndex index;publicvoidappend(Message message){// 写入消息long position = fileChannel.position();
fileChannel.write(message.toByteBuffer());// 更新索引if(shouldIndex(position)){
index.add(message.offset(), position);}}}
- 读取流程
classLogSegment{publicMessageread(long offset){// 查找索引Index.Entry entry = index.find(offset);// 定位文件位置
fileChannel.position(entry.position);// 读取消息ByteBuffer buffer =ByteBuffer.allocate(entry.size);
fileChannel.read(buffer);returnMessage.parse(buffer);}}
- 压缩策略
日志压缩类型:
1. 删除压缩
- 删除过期消息
- 保留最新状态
2. 合并压缩
- 合并相同key消息
- 只保留最新值
示例:
原始日志:
key1:v1, key2:v1, key1:v2, key2:v2
压缩后:
key1:v2, key2:v2
- 清理策略
classLogCleaner{publicvoidclean(){// 基于时间清理deleteSegmentsBefore(System.currentTimeMillis()- retention);// 基于大小清理while(size()> maxSize){deleteOldestSegment();}}}
- 文件格式
消息格式:
+----------------+----------------+----------------+
| Length | Header | Payload |
+----------------+----------------+----------------+
| 4 bytes | variable | variable |
索引格式:
+----------------+----------------+----------------+
| Offset | Position | Size |
+----------------+----------------+----------------+
| 8 bytes | 8 bytes | 4 bytes |
- 性能优化
classLogWriter{// 内存映射privateMappedByteBuffer buffer;// 批量写入publicvoidappendBatch(List<Message> messages){ByteBuffer batch =prepareBatch(messages);
buffer.put(batch);}// 预分配空间privatevoidpreallocate(long size){
fileChannel.truncate(size);}}
- 容错机制
classLogRecovery{publicvoidrecover(){// 检查文件完整性verifyChecksum();// 重建索引rebuildIndex();// 截断损坏部分truncateCorrupted();}}
- 并发控制
classLogManager{privatefinalLock writeLock =newReentrantLock();privatefinalMap<Integer,LogSegment> segments;publicvoidwrite(Message msg){
writeLock.lock();try{activeSegment().append(msg);}finally{
writeLock.unlock();}}}
- 监控指标
classLogMetrics{// 大小监控publiclongtotalSize();// 消息数量publiclongmessageCount();// 段文件数量publicintsegmentCount();// 写入延迟publicvoidrecordAppendLatency(long latency);}
关键设计考虑:
- 顺序写入提高性能
- 分段管理便于清理
- 索引加速查找
- 压缩节省空间
- 不变性保证一致性
Kafka Streams
- 基本定位
Kafka:
- 分布式消息队列
- 存储层和传输层
- 消息持久化
Kafka Streams:
- 流处理计算框架
- 计算层
- 实时数据处理
- 架构对比
Kafka架构:
Producer -> Broker(Topic/Partition) -> Consumer
Kafka Streams架构:
Source Topic -> Stream Processing -> Sink Topic
↓
State Store(状态存储)
- 核心概念
// Kafka核心概念-Topic(主题)-Partition(分区)-Producer(生产者)-Consumer(消费者)// Kafka Streams核心概念-KStream(记录流)-KTable(变更日志)-GlobalKTable(全局表)-Processor(处理器)
- 代码示例
// Kafka ProducerKafkaProducer<String,String> producer =newKafkaProducer<>(props);
producer.send(newProducerRecord<>("topic","key","value"));// Kafka ConsumerKafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));// Kafka StreamsStreamsBuilder builder =newStreamsBuilder();KStream<String,String> stream = builder.stream("input-topic");
stream.filter((key, value)-> value.length()>5).mapValues(value -> value.toUpperCase()).to("output-topic");
- 状态管理
// Kafka: 无状态Consumer.poll()-> process ->Consumer.commit()// Kafka Streams: 有状态
builder.stream("input").groupByKey().count()// 状态存储.toStream().to("output");
- 处理语义
Kafka:
- At least once
- At most once
- Exactly once (事务)
Kafka Streams:
- 自动exactly-once语义
- 状态存储容错
- 自动重平衡
- 应用场景
Kafka适用:
- 消息队列
- 日志收集
- 事件总线
- 数据管道
Kafka Streams适用:
- 实时计算
- 数据清洗
- 实时ETL
- 流式统计
- 配置示例
# Kafka配置
bootstrap.servers=localhost:9092
acks=all
retries=3
# Kafka Streams配置
application.id=my-stream-app
bootstrap.servers=localhost:9092
state.dir=/tmp/kafka-streams
processing.guarantee=exactly_once
- 处理示例
// 复杂流处理StreamsBuilder builder =newStreamsBuilder();// 读取输入流KStream<String,Order> orders = builder.stream("orders",Consumed.with(Serdes.String(), orderSerde));// 处理逻辑
orders.groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5))).aggregate(()->0L,(key, order, total)-> total + order.getAmount(),Materialized.as("order-store")).toStream().to("order-totals");
- 容错和扩展
Kafka:
- 副本机制
- 分区分配
- Leader选举
Kafka Streams:
- 状态存储备份
- 任务分配
- Standby副本
- 监控指标
// Kafka指标- 生产者延迟
- 消费者延迟
- 分区数量
- 副本同步状态
// Kafka Streams指标- 处理速率
- 状态存储大小
- 重平衡次数
- 处理延迟
- 集成方式
// Spring Boot集成@EnableKafkaStreams@ConfigurationpublicclassKafkaStreamsConfig{@BeanpublicKStream<String,String>kStream(StreamsBuilder streamsBuilder){KStream<String,String> stream = streamsBuilder.stream("input");
stream.mapValues(String::toUpperCase).to("output");return stream;}}
主要区别:
- Kafka是基础设施,Streams是计算框架
- Kafka关注存储,Streams关注计算
- Kafka无状态,Streams有状态
- Kafka通用性强,Streams专注流处理
Kafka分区器(Partitioner)
Kafka分区器负责决定消息发送到哪个分区
- 分区器接口
publicinterfacePartitionerextendsConfigurable,Closeable{// 计算分区publicintpartition(String topic,// 主题Object key,// 消息keybyte[] keyBytes,// 序列化后的keyObject value,// 消息值byte[] valueBytes,// 序列化后的valueCluster cluster // 集群元数据);// 关闭资源voidclose();}
- 默认分区策略(DefaultPartitioner)
publicclassDefaultPartitionerimplementsPartitioner{publicintpartition(String topic,Object key,byte[] keyBytes){// 有key: 对key的hash值取模if(keyBytes !=null){returnUtils.toPositive(Utils.murmur2(keyBytes))% numPartitions;}// 无key: 轮询选择分区int nextValue =nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);return availablePartitions.get(nextValue % availablePartitions.size()).partition();}}
- 自定义分区器示例
publicclassCustomPartitionerimplementsPartitioner{@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){// 获取分区数List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 自定义分区逻辑if(key instanceofString){String strKey =(String) key;// 根据key的首字母分区char firstChar = strKey.charAt(0);returnMath.abs(firstChar % numPartitions);}// 默认分区returnnewRandom().nextInt(numPartitions);}}
- 常见分区策略
// 1. 轮询策略publicintroundRobin(String topic,int numPartitions){return nextCounter.getAndIncrement()% numPartitions;}// 2. 随机策略publicintrandom(int numPartitions){returnnewRandom().nextInt(numPartitions);}// 3. 按key范围分区publicintrangePartition(String key,int numPartitions){int hash =Math.abs(key.hashCode());int partitionSize =Integer.MAX_VALUE / numPartitions;return hash / partitionSize;}// 4. 地理位置分区publicintlocationBasedPartition(String location){switch(location){case"NORTH":return0;case"SOUTH":return1;case"EAST":return2;case"WEST":return3;default:returnnewRandom().nextInt(4);}}
- 生产者配置
Properties props =newProperties();
props.put("bootstrap.servers","localhost:9092");
props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer",StringSerializer.class.getName());
props.put("partitioner.class",CustomPartitioner.class.getName());KafkaProducer<String,String> producer =newKafkaProducer<>(props);
- 分区器使用场景
// 1. 按业务类型分区if(order.getType().equals("VIP")){return0;// VIP订单专用分区}else{return1;// 普通订单分区}// 2. 按地理位置分区returnMath.abs(order.getCity().hashCode())% numPartitions;// 3. 按时间分区return(int)(System.currentTimeMillis()% numPartitions);
- 分区均衡性考虑
publicclassBalancedPartitionerimplementsPartitioner{privateCounter[] partitionCounters;@Overridepublicintpartition(...){// 选择负载最小的分区int minCount =Integer.MAX_VALUE;int selectedPartition =0;for(int i =0; i < partitionCounters.length; i++){if(partitionCounters[i].get()< minCount){
minCount = partitionCounters[i].get();
selectedPartition = i;}}
partitionCounters[selectedPartition].increment();return selectedPartition;}}
- 性能优化
publicclassCachedPartitionerimplementsPartitioner{privatefinalLoadingCache<String,Integer> partitionCache;publicCachedPartitioner(){
partitionCache =CacheBuilder.newBuilder().maximumSize(10000).expireAfterWrite(1,TimeUnit.HOURS).build(newCacheLoader<String,Integer>(){@OverridepublicIntegerload(String key){returncalculatePartition(key);}});}@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){return partitionCache.getUnchecked((String)key);}}
- 监控和统计
publicclassMonitoredPartitionerimplementsPartitioner{privatefinalMap<Integer,AtomicLong> partitionStats =newConcurrentHashMap<>();privatefinalTimer partitionTimer =newTimer();@Overridepublicintpartition(...){int partition =calculatePartition();// 记录分区使用统计
partitionStats.computeIfAbsent(partition,
k ->newAtomicLong()).incrementAndGet();return partition;}publicMap<Integer,Long>getPartitionStats(){return partitionStats.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
e -> e.getValue().get()));}}
- 错误处理
publicclassSafePartitionerimplementsPartitioner{@Overridepublicintpartition(...){try{returncalculatePartition();}catch(Exception e){
log.error("Partition calculation failed", e);// 发生错误时的后备策略returnnewRandom().nextInt(numPartitions);}}}
kafka可移植的特性
- 消息批量写入机制
publicclassBatchWriter{privatefinalint batchSize;privatefinalLinkedList<ProducerRecord<K,V>> recordBatch;privatefinalint compressionType;publicvoidaddToBatch(ProducerRecord<K,V>record){
recordBatch.add(record);if(recordBatch.size()>= batchSize){flush();}}privatevoidflush(){if(recordBatch.isEmpty())return;// 1. 合并消息ByteBuffer buffer =ByteBuffer.allocate(calculateBatchSize());for(ProducerRecord<K,V>record: recordBatch){writeRecord(buffer,record);}// 2. 压缩ByteBuffer compressed =compress(buffer);// 3. 写入磁盘
channel.write(compressed);
recordBatch.clear();}}
- 零拷贝实现
publicclassZeroCopyTransfer{privatefinalFileChannel fileChannel;publiclongtransferTo(SocketChannel socketChannel,long position,long count){// 直接从文件传输到socket,避免用户空间拷贝return fileChannel.transferTo(position, count, socketChannel);}// 使用mmap实现零拷贝publicMappedByteBuffermmap(long position,long size){return fileChannel.map(FileChannel.MapMode.READ_WRITE,
position,
size
);}}
- 日志恢复机制
publicclassLogRecovery{privatefinalFile logFile;privatefinalFile checkpointFile;publicvoidrecover(){// 1. 读取检查点信息long checkpointOffset =readCheckpoint();// 2. 验证日志完整性validateLog(checkpointOffset);// 3. 截断损坏的部分truncateCorrupted();// 4. 重建索引rebuildIndex();}privatevoidrebuildIndex(){// 扫描日志文件重建索引try(FileChannel channel =newFileInputStream(logFile).getChannel()){ByteBuffer buffer =ByteBuffer.allocate(BUFFER_SIZE);long position =0;while(channel.read(buffer)!=-1){
buffer.flip();while(buffer.hasRemaining()){// 解析消息记录Recordrecord=Record.readFrom(buffer);// 更新索引updateIndex(record.offset(), position);
position +=record.size();}
buffer.clear();}}}}
- 分段合并机制
publicclassSegmentCompactor{privatefinalList<LogSegment> segments;publicvoidcompact(){// 1. 选择需要合并的段List<LogSegment> toCompact =selectSegmentsToCompact();// 2. 创建临时合并段LogSegment mergedSegment =createTemporarySegment();// 3. 合并过程Map<String,Record> latestRecords =newHashMap<>();for(LogSegment segment : toCompact){for(Recordrecord: segment.readRecords()){// 只保留每个key的最新值
latestRecords.put(record.key(),record);}}// 4. 写入合并后的记录for(Recordrecord: latestRecords.values()){
mergedSegment.append(record);}// 5. 替换原始段replaceSegments(toCompact, mergedSegment);}}
- 热点数据缓存
publicclassMessageCache{privatefinalLRUCache<Long,ByteBuffer> cache;publicByteBufferreadMessage(long offset){// 1. 尝试从缓存读取ByteBuffer cached = cache.get(offset);if(cached !=null){return cached;}// 2. 从磁盘读取ByteBuffer message =readFromDisk(offset);// 3. 更新缓存
cache.put(offset, message);return message;}privateclassLRUCache<K,V>extendsLinkedHashMap<K,V>{privatefinalint maxSize;@OverrideprotectedbooleanremoveEldestEntry(Map.Entry<K,V> eldest){returnsize()> maxSize;}}}
- 异步刷盘机制
publicclassAsyncFlusher{privatefinalScheduledExecutorService scheduler;privatefinalQueue<ByteBuffer> flushQueue;publicvoidscheduleFlush(){
scheduler.scheduleAtFixedRate(()->{if(!flushQueue.isEmpty()){flush();}},0, flushInterval,TimeUnit.MILLISECONDS);}privatevoidflush(){List<ByteBuffer> batch =newArrayList<>();
flushQueue.drainTo(batch, maxBatchSize);// 批量刷盘for(ByteBuffer buffer : batch){
channel.write(buffer);}// 强制刷盘if(forceFlush){
channel.force(false);}}}
- 存储空间预分配
publicclassPreAllocator{privatestaticfinalint BLOCK_SIZE =1024*1024;// 1MBpublicvoidpreAllocate(File file,long size){try(RandomAccessFile raf =newRandomAccessFile(file,"rw")){// 预分配文件空间
raf.setLength(size);// 写入空字节ByteBuffer zeros =ByteBuffer.allocate(BLOCK_SIZE);FileChannel channel = raf.getChannel();for(long written =0; written < size; written += BLOCK_SIZE){
channel.write(zeros);
zeros.clear();}}}}
- 存储监控和统计
publicclassStorageStats{privatefinalMap<String,Meter> writeMeters;privatefinalMap<String,Meter> readMeters;publicvoidrecordWrite(String topic,int bytes){
writeMeters.get(topic).mark(bytes);}publicvoidrecordRead(String topic,int bytes){
readMeters.get(topic).mark(bytes);}publicStorageMetricsgetMetrics(){returnnewStorageMetrics(calculateTotalBytes(),calculateWriteRate(),calculateReadRate(),calculateSegmentCount(),calculateDiskUsage());}}
这些机制共同保证了 Kafka 存储系统的:
- 高性能(批量写入、零拷贝)
- 可靠性(日志恢复、刷盘机制)
- 空间效率(压缩、合并)
- 监控能力(统计指标)
Kafka的消息存储
- 文件系统存储结构
${kafka_home}/data/ # Kafka数据目录
├── topic-0-0/ # topic名称-分区号
│ ├── 00000000000000000000.index # 偏移量索引文件
│ ├── 00000000000000000000.log # 实际数据文件
│ ├── 00000000000000000000.timeindex # 时间索引文件
│ ├── 00000000000367254.index
│ ├── 00000000000367254.log
│ └── 00000000000367254.timeindex
├── topic-0-1/
└── topic-0-2/
- 日志文件结构
publicclassLogSegment{// .log文件结构// 消息集合: 多条消息记录顺序存储// 消息记录: // offset: 8 bytes// messageSize: 4 bytes// crc: 4 bytes// magic: 1 byte// attributes: 1 byte// timestamp: 8 bytes// key length: 4 bytes// key: N bytes// value length: 4 bytes// value: N bytesprivatefinalFileChannel fileChannel;privatefinalFile file;privatevolatilelong size;publicvoidappend(ByteBuffer buffer){int written = fileChannel.write(buffer);
size += written;}}
- 内存映射文件
publicclassMappedFileQueue{privatefinalString storePath;privatefinalint mapSize;// 映射的文件大小privatefinalList<MappedFile> mappedFiles;publicMappedFilecreateMappedFile(){File file =newFile(storePath);FileChannel channel =newRandomAccessFile(file,"rw").getChannel();// 创建内存映射MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE,0, mapSize);returnnewMappedFile(file, mappedByteBuffer);}}
- 页缓存使用
publicclassPageCache{// 利用操作系统的页缓存publicvoidwriteMessage(byte[] message){// 直接写入文件系统// 操作系统会自动将热数据缓存到页缓存中FileOutputStream fos =newFileOutputStream(logFile);
fos.write(message);// 强制刷盘(取决于配置)if(flushOnWrite){
fos.getFD().sync();}}}
- 消息索引
publicclassIndexFile{// .index文件结构// 每个索引项固定20字节// relativeOffset: 4 bytes (相对baseOffset的偏移量)// position: 4 bytes (消息在日志文件中的物理位置)// timestamp: 8 bytes (消息时间戳)// size: 4 bytes (消息大小)privatefinalMappedByteBuffer mmap;privatefinallong baseOffset;publicvoidappend(long offset,int position){// 写入索引项
mmap.putInt((int)(offset - baseOffset));
mmap.putInt(position);}}
- 消息在磁盘上的分布
publicclassMessageDistribution{// 消息分布策略// 1. 分区分布publicintassignPartition(String key,int numPartitions){if(key ==null){// 轮询分配return nextPartition++% numPartitions;}// 根据key哈希分配returnMath.abs(key.hashCode())% numPartitions;}// 2. 日志段管理publicLogSegmentrollNewSegment(long baseOffset){// 当前日志段满足滚动条件时创建新的日志段String fileName =String.format("%020d.log", baseOffset);returnnewLogSegment(newFile(dir, fileName), baseOffset);}}
- 消息清理机制
publicclassLogCleaner{// 删除策略publicvoiddeleteByRetention(){long retention =System.currentTimeMillis()- retentionMs;
segments.forEach(segment ->{if(segment.lastModified()< retention){
segment.delete();}});}// 压缩策略publicvoidcompactLog(){Map<String,Message> messageMap =newHashMap<>();// 只保留每个key的最新值
segments.forEach(segment ->{
segment.read().forEach(message ->{
messageMap.put(message.key(), message);});});}}
- 副本存储
publicclassReplicaManager{privatefinalMap<TopicPartition,Replica> allReplicas;// 副本同步publicvoidreplicateMessages(TopicPartition tp,long offset){Replica replica = allReplicas.get(tp);// 从leader获取消息ByteBuffer messages =fetchFromLeader(tp, offset);// 写入本地存储
replica.append(messages);}}
- 存储配置
# 存储相关配置
log.dirs=/path/to/kafka/data # 数据目录
log.segment.bytes=1073741824 # 日志段大小,默认1GB
log.retention.hours=168 # 消息保留时间,默认7天
log.retention.bytes=-1 # 消息保留大小,默认无限制
log.flush.interval.messages=10000 # 刷盘间隔消息数
log.flush.interval.ms=1000 # 刷盘间隔时间
- 存储监控
publicclassStorageMonitor{publicvoidmonitor(){// 监控指标long diskUsage =calculateDiskUsage();long segmentCount =countSegments();double writeRate =calculateWriteRate();double readRate =calculateReadRate();// 告警阈值检查if(diskUsage > diskUsageThreshold){alertDiskUsage();}}}
SpringBoot 集成 Kafka
- 基础配置
# application.ymlspring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
batch-size:16384buffer-memory:33554432consumer:group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit:false
- 生产者配置和使用
@ConfigurationpublicclassKafkaProducerConfig{@BeanpublicProducerFactory<String,String>producerFactory(){returnnewDefaultKafkaProducerFactory<>(producerConfigs());}@BeanpublicMap<String,Object>producerConfigs(){Map<String,Object> props =newHashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG,3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);return props;}@BeanpublicKafkaTemplate<String,String>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}}// 使用示例@ServicepublicclassKafkaProducerService{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;publicvoidsendMessage(String topic,String message){
kafkaTemplate.send(topic, message).addCallback(
result -> log.info("消息发送成功: {}", message),
ex -> log.error("消息发送失败: {}", ex.getMessage()));}}
- 消费者配置和使用
@ConfigurationpublicclassKafkaConsumerConfig{@BeanpublicConsumerFactory<String,String>consumerFactory(){Map<String,Object> props =newHashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"my-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");returnnewDefaultKafkaConsumerFactory<>(props);}@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);// 设置并发消费者数量
factory.setBatchListener(true);// 启用批量监听return factory;}}// 使用示例@ServicepublicclassKafkaConsumerService{@KafkaListener(topics ="my-topic", groupId ="my-group")publicvoidlisten(ConsumerRecord<String,String>record){
log.info("收到消息: {}",record.value());}// 批量消费@KafkaListener(topics ="my-topic", batch ="true")publicvoidlistenBatch(List<ConsumerRecord<String,String>> records){
records.forEach(record-> log.info("批量消费消息: {}",record.value()));}}
- 异常处理和重试机制
@ConfigurationpublicclassKafkaErrorHandlingConfig{@BeanpublicConsumerFactory<String,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setErrorHandler(newSeekToCurrentErrorHandler(newDeadLetterPublishingRecoverer(kafkaTemplate()),newFixedBackOff(1000L,3)));// 重试3次,间隔1秒return factory;}}// 使用示例@ServicepublicclassKafkaConsumerService{@KafkaListener(topics ="my-topic")@RetryableTopic(
attempts ="3",
backoff =@Backoff(delay =1000, multiplier =2.0),
autoCreateTopics ="false")publicvoidlisten(ConsumerRecord<String,String>record){// 处理消息}}
- 事务支持
@ConfigurationpublicclassKafkaTransactionConfig{@BeanpublicProducerFactory<String,String>producerFactory(){DefaultKafkaProducerFactory<String,String> factory =newDefaultKafkaProducerFactory<>(producerConfigs());
factory.setTransactionIdPrefix("tx-");return factory;}@BeanpublicKafkaTemplate<String,String>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}}// 使用示例@ServicepublicclassKafkaTransactionService{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;@TransactionalpublicvoidsendMessagesInTransaction(){
kafkaTemplate.executeInTransaction(operations ->{
operations.send("topic1","message1");
operations.send("topic2","message2");returnnull;});}}
- 自定义序列化/反序列化
// 自定义对象@DatapublicclassUser{privateString id;privateString name;}// 配置@ConfigurationpublicclassKafkaConfig{@BeanpublicProducerFactory<String,User>userProducerFactory(){Map<String,Object> config =newHashMap<>();
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);returnnewDefaultKafkaProducerFactory<>(config);}@BeanpublicKafkaTemplate<String,User>userKafkaTemplate(){returnnewKafkaTemplate<>(userProducerFactory());}}
- 监控和指标
packagecom.yourcompany.yourproject.config;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.config.KafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importio.micrometer.core.instrument.MeterRegistry;importio.micrometer.core.instrument.simple.SimpleMeterRegistry;importjava.util.Collections;@ConfigurationpublicclassKafkaMonitoringConfig{@BeanpublicMeterRegistrymeterRegistry(){returnnewSimpleMeterRegistry();}@BeanpublicKafkaListenerContainerFactory<?>kafkaListenerContainerFactory(ConsumerFactory<String,String> consumerFactory,MeterRegistry meterRegistry){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setMicrometerTags(Collections.singletonMap("myTag","myValue"));return factory;}}
本文转载自: https://blog.csdn.net/jsjbrdzhh/article/details/143610825
版权归原作者 youyouiknow 所有, 如有侵权,请联系我们删除。
版权归原作者 youyouiknow 所有, 如有侵权,请联系我们删除。