This article focuses on the RecordAccumulator inside the Kafka producer. The RecordAccumulator groups and caches the messages the producer is about to send: for each topic it maintains a TopicInfo that stores the cached messages, and inside each TopicInfo there is one queue per partition of that topic, where messages are aggregated into ProducerBatch objects. The sender thread sends the aggregated ProducerBatches from these queues, which cuts down on network round trips and improves throughput.
1. Computing the partition
Before a record is put into the RecordAccumulator, its partition must be computed. In doSend(), the partition is determined by calling partition().
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // ...
    // Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION,
    // which means that the RecordAccumulator would pick a partition using built-in logic (which may
    // take into account broker load, the amount of data produced to each partition, etc.).
    int partition = partition(record, serializedKey, serializedValue, cluster);
    // ...
}
The implementation of partition():
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    // If the record specifies a partition, return it directly
    if (record.partition() != null)
        return record.partition();

    // If a custom partitioner is configured, use its partition() logic to compute the partition
    if (partitioner != null) {
        int customPartition = partitioner.partition(
                record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        if (customPartition < 0) {
            throw new IllegalArgumentException(String.format(
                    "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
        }
        return customPartition;
    }

    // If the record has a key and partitionerIgnoreKeys is not set
    if (serializedKey != null && !partitionerIgnoreKeys) {
        // **Call BuiltInPartitioner.partitionForKey to hash the key and choose a partition**
        return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
    } else {
        // If the record has no key, or partitionerIgnoreKeys is set,
        // return RecordMetadata.UNKNOWN_PARTITION to indicate that any partition can be used;
        // the RecordAccumulator will pick the partition later
        return RecordMetadata.UNKNOWN_PARTITION;
    }
}
partitionForKey() uses the murmur2 algorithm to hash the serialized key bytes, then takes the result modulo the topic's partition count to choose a partition.
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
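As a quick illustration, the standalone snippet below (a demo of my own, not part of the producer code; it relies on the internal but public BuiltInPartitioner helper and an assumed key string) shows that a key always maps to the same partition while the partition count stays fixed, and that changing the partition count changes the mapping:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;

public class KeyPartitionDemo {
    public static void main(String[] args) {
        // "order-42" is just an example key
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);

        // Same key and same partition count -> same partition on every call
        System.out.println(BuiltInPartitioner.partitionForKey(key, 6));
        System.out.println(BuiltInPartitioner.partitionForKey(key, 6));

        // A different partition count changes the mapping, which is why adding
        // partitions to a topic breaks key-based ordering for existing keys
        System.out.println(BuiltInPartitioner.partitionForKey(key, 12));
    }
}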
2. The buffer pool
The RecordAccumulator uses a buffer pool to allocate and recycle memory. The pool's total size defaults to 32 MB and consists of a queue named free holding ByteBuffers of a fixed size, plus a block of nonPooledAvailableMemory. When an allocation request is exactly the pooled ByteBuffer size, a pooled ByteBuffer can be handed out directly.
// Total size of the buffer pool: nonPooledAvailableMemory plus the ByteBuffers in the free queue
private final long totalMemory;
// Size of a single pooled ByteBuffer
private final int poolableSize;
private final ReentrantLock lock;
// The free queue of pooled ByteBuffers
private final Deque<ByteBuffer> free;
// Threads waiting for memory to become available
private final Deque<Condition> waiters;
// Size of nonPooledAvailableMemory
private long nonPooledAvailableMemory;
private final Metrics metrics;
private final Time time;
private final Sensor waitTime;
private boolean closed;
doSend() calls allocate() to obtain memory for the message before it is put into the RecordAccumulator.
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    // Requests larger than the total pool size fail immediately
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of " + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    // Lock while allocating
    this.lock.lock();

    if (this.closed) {
        this.lock.unlock();
        throw new KafkaException("Producer closed while allocating memory");
    }

    try {
        // If the request is exactly one poolable buffer and the free list is non-empty,
        // hand out the first pooled ByteBuffer directly
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // Total unallocated space held in the free list
        int freeListSize = freeSize() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // There is enough memory overall; move buffers from the free list into
            // nonPooledAvailableMemory to satisfy this allocation
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        } else {
            // Not enough memory left for this allocation: block the current thread
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                // Maximum time this thread is allowed to block
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
                // Loop until enough memory has been accumulated for this allocation
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        // Block and track whether the wait timed out.
                        // Condition.await() releases the lock while waiting for other threads to free memory
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        recordWaitTime(timeNs);
                    }

                    if (this.closed)
                        throw new KafkaException("Producer closed while allocating memory");

                    // Waiting for memory timed out: throw an exception
                    if (waitingTimeElapsed) {
                        this.metrics.sensor("buffer-exhausted-records").record();
                        throw new BufferExhaustedException("Failed to allocate " + size + " bytes within the configured max blocking time "
                            + maxTimeToBlockMs + " ms. Total memory: " + totalMemory()
                            + " bytes. Available memory: " + availableMemory()
                            + " bytes. Poolable size: " + poolableSize() + " bytes");
                    }

                    remainingTimeToBlockNs -= timeNs;

                    // If nothing has been accumulated yet, the request is exactly one poolable buffer,
                    // and the free list now has one available, hand that ByteBuffer out directly
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        // Otherwise accumulate memory piece by piece until the request is satisfied
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                        this.nonPooledAvailableMemory -= got;
                        accumulated += got;
                    }
                }
                // Don't reclaim memory on throwable since nothing was thrown
                accumulated = 0;
            } finally {
                // When this loop was not able to successfully terminate don't lose available memory
                this.nonPooledAvailableMemory += accumulated;
                this.waiters.remove(moreMemory);
            }
        }
    } finally {
        // If memory is still available and other threads are waiting for it, wake one of them up
        try {
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            // Another finally... otherwise find bugs complains
            lock.unlock();
        }
    }

    if (buffer == null)
        // Memory was accumulated; allocate a fresh non-pooled buffer
        return safeAllocateByteBuffer(size);
    else
        // A pooled ByteBuffer was handed out directly
        return buffer;
}
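For reference, the producer configuration maps onto these fields roughly as follows: buffer.memory becomes totalMemory, batch.size becomes poolableSize, and max.block.ms is the maxTimeToBlockMs passed into allocate(). A minimal sketch (the values shown are the defaults; the broker address is an assumption):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class BufferPoolConfigDemo {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        // buffer.memory -> BufferPool.totalMemory (default 32 MB)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L);
        // batch.size -> BufferPool.poolableSize, the pooled ByteBuffer size (default 16 KB)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        // max.block.ms -> maxTimeToBlockMs passed to allocate(); if memory cannot be
        // obtained within this time, send() fails with BufferExhaustedException
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000L);
        return props;
    }
}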
The deallocate() method releases previously allocated memory back to the pool.
public void deallocate(ByteBuffer buffer) {
    if (buffer != null)
        deallocate(buffer, buffer.capacity());
}

public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        // If the buffer is exactly poolableSize, put it straight back into the free queue
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            // Otherwise let GC reclaim the buffer and add its size back to nonPooledAvailableMemory
            this.nonPooledAvailableMemory += size;
        }
        // Wake up the first thread waiting for memory
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
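The recycle-or-release policy boils down to: buffers of exactly poolableSize are reused, everything else is dropped for GC. A greatly simplified, single-threaded sketch of that idea (hypothetical illustration only; it omits the locking, blocking, and memory accounting of the real BufferPool):

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class TinyBufferPool {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    public TinyBufferPool(int poolableSize) {
        this.poolableSize = poolableSize;
    }

    public ByteBuffer allocate(int size) {
        // Reuse a pooled buffer when the request matches the pooled size
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        // Otherwise allocate a fresh buffer
        return ByteBuffer.allocate(size);
    }

    public void deallocate(ByteBuffer buffer) {
        // Only standard-sized buffers are recycled; odd-sized ones are left to GC
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.add(buffer);
        }
    }

    public static void main(String[] args) {
        TinyBufferPool pool = new TinyBufferPool(16 * 1024);
        ByteBuffer b = pool.allocate(16 * 1024);
        pool.deallocate(b);
        // The same buffer instance is handed out again
        System.out.println(pool.allocate(16 * 1024) == b); // true
    }
}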
3. RecordAccumulator internals and appending messages to the cache
For each topic, the RecordAccumulator maintains a TopicInfo, which stores the ProducerBatch deques for every partition of that topic.
private static class TopicInfo {
    // ProducerBatch deques for every partition of this topic
    public final ConcurrentMap<Integer /*partition*/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
    public final BuiltInPartitioner builtInPartitioner;

    public TopicInfo(BuiltInPartitioner builtInPartitioner) {
        this.builtInPartitioner = builtInPartitioner;
    }
}
When the client calls send(), it delegates to doSend(). Before the message is actually transmitted, doSend() calls accumulator.append() to put it into the RecordAccumulator. The part of doSend() that appends the message is shown below.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // ...
    // **Call append() to add the record to the RecordAccumulator.**
    // For records whose partition is still RecordMetadata.UNKNOWN_PARTITION at this point,
    // append() computes the partition internally and writes it into appendCallbacks.topicPartition.
    RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
            serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
    assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;

    // If abortForNewBatch is true, go around again so the custom onNewBatch() hook can be invoked
    if (result.abortForNewBatch) {
        int prevPartition = partition;
        // Invoke the user-defined onNewBatch() hook
        onNewBatch(record.topic(), cluster, prevPartition);
        partition = partition(record, serializedKey, serializedValue, cluster);
        if (log.isTraceEnabled()) {
            log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
        }
        // **Call append() again, this time with abortOnNewBatch = false**
        result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
            serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
    }

    if (transactionManager != null) {
        transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
    }

    // Wake the sender thread when a batch is full or a new batch was created
    if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
        this.sender.wakeup();
    }
    return result.future;
}
The append() method adds the record to the RecordAccumulator.
public RecordAppendResult append(String topic, int partition, long timestamp, byte[] key, byte[] value,
                                 Header[] headers, AppendCallbacks callbacks, long maxTimeToBlock,
                                 boolean abortOnNewBatch, long nowMs, Cluster cluster) throws InterruptedException {
    // Look up (or create) the TopicInfo for this topic, i.e. the ProducerBatch deques of all its partitions
    TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));

    // Atomically increment the number of appends in progress;
    // appendsInProgress tracks in-flight appends so that abortIncompleteBatches() can work correctly
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        while (true) {
            final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
            final int effectivePartition;
            // If no partition was computed earlier, pick one using the built-in weighted-random logic,
            // which takes broker load into account to keep partitions balanced
            if (partition == RecordMetadata.UNKNOWN_PARTITION) {
                partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
                effectivePartition = partitionInfo.partition();
            } else {
                partitionInfo = null;
                effectivePartition = partition;
            }

            // Record the chosen partition in the appendCallbacks object
            setPartition(callbacks, effectivePartition);

            // Look up (or create) the deque for this partition
            Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
            synchronized (dq) {
                // If the partition information changed concurrently, retry the loop
                if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                    continue;

                // **Call tryAppend() to try writing the record into this deque.**
                // The first time through, the deque is empty, so tryAppend() fails.
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
                if (appendResult != null) {
                    // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                    boolean enableSwitch = allBatchesFull(dq);
                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                    return appendResult;
                }
            }

            // Reaching this point means tryAppend() returned null: the last ProducerBatch is out of space
            // or the deque has no usable ProducerBatch.
            // If abortOnNewBatch is true, return an empty RecordAppendResult;
            // doSend() will then call append() again with abortOnNewBatch set to false.
            // This detour exists so that doSend() can invoke the user-defined onNewBatch() hook first.
            if (abortOnNewBatch) {
                // Return a result that will cause another call to append.
                return new RecordAppendResult(null, false, false, true, 0);
            }

            if (buffer == null) {
                byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                // Allocate the larger of batch.size and the estimated record size
                int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
                log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
                // Allocate size bytes from the buffer pool; this may block if memory is short
                buffer = free.allocate(size, maxTimeToBlock);
                nowMs = time.milliseconds();
            }

            synchronized (dq) {
                // If the partition information changed concurrently, retry the loop
                if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                    continue;

                // **Call appendNewBatch() to build a new ProducerBatch**
                RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                // Set buffer to null so the finally block does not release it:
                // when the record was written successfully the buffer is now owned by the batch,
                // whereas on failure the allocated buffer must be returned to the pool.
                if (appendResult.newBatchCreated)
                    buffer = null;
                // If some batches in the queue are not yet full, disable partition switching
                boolean enableSwitch = allBatchesFull(dq);
                topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                return appendResult;
            }
        }
    } finally {
        // Return any unused buffer to the pool
        free.deallocate(buffer);
        // Atomically decrement the number of appends in progress
        appendsInProgress.decrementAndGet();
    }
}
tryAppend() tries to write the record into the last ProducerBatch of the deque.
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque, long nowMs) {
    if (closed)
        throw new KafkaException("Producer closed while send in progress");
    // Get the last ProducerBatch in the deque
    ProducerBatch last = deque.peekLast();
    // If such a ProducerBatch exists
    if (last != null) {
        // Estimated size of the last batch before this append
        int initialBytes = last.estimatedSizeInBytes();
        // Try writing the record into the last ProducerBatch
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
        // A null return means the append failed, so close the batch for further appends
        if (future == null) {
            last.closeForRecordAppends();
        } else {
            // Append succeeded
            int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes);
        }
    }
    // Reaching this point means the deque has no usable ProducerBatch, so return null
    return null;
}
appendNewBatch() creates a new ProducerBatch and adds it to the deque.
private RecordAppendResult appendNewBatch(String topic, int partition, Deque<ProducerBatch> dq, long timestamp,
                                          byte[] key, byte[] value, Header[] headers, AppendCallbacks callbacks,
                                          ByteBuffer buffer, long nowMs) {
    assert partition != RecordMetadata.UNKNOWN_PARTITION;

    // Try tryAppend() once more: if another thread has already created a ProducerBatch, this append
    // succeeds and returns immediately. This prevents several threads from each creating a batch while
    // only the tail of the deque is written to, which would waste the space of the earlier batches.
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
    if (appendResult != null) {
        return appendResult;
    }

    // Build the new ProducerBatch
    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());
    ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
    FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
            callbacks, nowMs));

    // Add the ProducerBatch to the deque
    dq.addLast(batch);
    // Track the batch in the incomplete set until it has been fully sent
    incomplete.add(batch);

    return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes());
}
4. The sender thread
In runOnce(), the sender thread calls sendProducerData() to send the messages cached in the RecordAccumulator.
private long sendProducerData(long now) {
    // Fetch the metadata snapshot
    MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
    // Determine which partitions are ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadataSnapshot, now);

    // If there are partitions whose leader is unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);
        this.metadata.requestUpdate(false);
    }

    // Filter the result further: remove nodes whose connection is not ready
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            // Update just the readyTimeMs of the latency stats, so that it moves forward
            // every time the batch is ready (then the difference between readyTimeMs and
            // drainTimeMs would represent how long data is waiting for the node).
            this.accumulator.updateNodeLatencyStats(node.id(), now, false);
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        } else {
            // Update both readyTimeMs and drainTimeMs, this would "reset" the node
            // latency.
            this.accumulator.updateNodeLatencyStats(node.id(), now, true);
        }
    }

    // Drain data from the RecordAccumulator, mapping ProducerBatches to their target node,
    // so the network layer can send them to the corresponding brokers
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadataSnapshot, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);

    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // Handle expired batches
    accumulator.resetNextBatchExpiryTime();
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);

    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
            + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch);
        }
    }

    // Update metrics
    sensors.updateProduceRequestMetrics(batches);

    // Set pollTimeout to the smaller of the next ready-check delay and the next batch expiry time
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    // If some nodes are ready, set pollTimeout to 0 so the messages are sent immediately
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }

    // Send the produce requests via the NetworkClient
    sendProduceRequests(batches, now);
    return pollTimeout;
}
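Whether accumulator.ready() considers a batch ready depends, among other things, on the batch being full or having waited at least linger.ms, so batch.size and linger.ms most directly shape the batching/latency trade-off. A short sketch of those two knobs (the values are illustrative assumptions, not recommendations):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfigDemo {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        // A batch becomes ready as soon as it is full (batch.size reached) ...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
        // ... or once it has waited linger.ms; larger values trade latency for bigger batches
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        return props;
    }
}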