如何初始化的bufferPool的
在初始化的时候 初始化BufferPool对象
// 设置缓冲区this.accumulator =newRecordAccumulator(xxxxx,其他参数,newBufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));this.free = bufferPool;
发送消息时
RecordAccumulator.RecordAppendResult result = accumulator.append(xxx);
buffer = free.allocate(size, maxTimeToBlock);// 内存分配
总体架构
在KafkaProudcer初始化的时候,会创建一个32MB的缓冲池,buffer.memory参数可以自定义,同事缓冲池被分成多个块,一个块就是batch.size 默认就是16KB。
我们来分析下,在一个Kafka集群中 如果有3个Broker。那么当一个topic创建的时候,就是三个分区。
分区A: 分区B: 分区C: 三个分区分别存储消息 发送消息。所以在申请的时候,也是按照分区级别进行申请Batch内存块。
但是如果频繁的申请、发送完毕消息,被GC回收,其实是比较消耗资源的方式,所以更好的方式就是通过池化技术,
总体流程
1.申请之后发送完毕消息后,自动归还给BufferPool,避免内存块被频繁回收的问题。
基本属性
// 总内存大小 32MBprivatefinallong totalMemory;// 每个内存块大小 batchSize 默认16Kprivatefinalint poolableSize;// 申请、归还内存的方法的同步锁privatefinalReentrantLock lock;// 空闲内存块privatefinalDeque<ByteBuffer> free;// 需要等待空闲内存块的事件privatefinalDeque<Condition> waiters;/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */// 缓冲池还未分配的空闲内存,新申请的内存块就是从这里获取内存值privatelong nonPooledAvailableMemory;
内存分配
申请内存
org.apache.kafka.clients.producer.internals.BufferPool#allocate
1.判断申请内存大小超过总内存大小 抛出异常
2.申请加锁,如果缓冲区已经关闭,直接释放锁,抛出异常
3.内存够的情况下,如果申请内存等于16K,并且缓冲区内存不为空
4.如果申请内存超过一个batch.size的大小,当前空闲内存总空间 以及回收的内存空间是否足够申请的内存大小
5.内存不够的情况下,申请一个condition 添加到waiter,不断收集空闲的内存,直到大于申请的内存,退出。在申请过程中,await进行阻塞等待。
publicByteBufferallocate(int size,long maxTimeToBlockMs)throwsInterruptedException{if(size >this.totalMemory)thrownewIllegalArgumentException("");ByteBuffer buffer =null;this.lock.lock();if(this.closed){this.lock.unlock();thrownewKafkaException("Producer closed while allocating memory");}try{// size大小等于batchSIze 并且free不为空 直接获取空闲内存块// 这里为什么必须是batchSize 因为如果大于batchSize的话,就无法满足,// 因为batchSize是固定值,不能超过batchSizeif(size == poolableSize &&!this.free.isEmpty())returnthis.free.pollFirst();// 已经回收的内存总大小 = 当前回收内存的个数 * batchSizeint freeListSize =freeSize()*this.poolableSize;// 总空闲内存 大于等于 申请的内存if(this.nonPooledAvailableMemory + freeListSize >= size){// we have enough unallocated or pooled memory to immediately// satisfy the request, but need to allocate the bufferfreeUp(size);// 空闲内存 减去申请的内存大小this.nonPooledAvailableMemory -= size;// 内存足够的情况}else{// 内存不够的情况// we are out of memory and will have to blockint accumulated =0;// 创建本次等待的conditionCondition moreMemory =this.lock.newCondition();try{long remainingTimeToBlockNs =TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);// 添加到类型Deque的waiter中 -- 之后会唤醒this.waiters.addLast(moreMemory);//只有当超过申请内存大小 退出while(accumulated < size){long startWaitNs = time.nanoseconds();long timeNs;boolean waitingTimeElapsed;try{// 阻塞等待
waitingTimeElapsed =!moreMemory.await(remainingTimeToBlockNs,TimeUnit.NANOSECONDS);}finally{long endWaitNs = time.nanoseconds();
timeNs =Math.max(0L, endWaitNs - startWaitNs);recordWaitTime(timeNs);}if(this.closed)thrownewKafkaException("Producer closed while allocating memory");if(waitingTimeElapsed){this.metrics.sensor("buffer-exhausted-records").record();thrownewBufferExhaustedException("xx")}
remainingTimeToBlockNs -= timeNs;if(accumulated ==0&& size ==this.poolableSize &&!this.free.isEmpty()){
buffer =this.free.pollFirst();
accumulated = size;}else{freeUp(size - accumulated);int got =(int)Math.min(size - accumulated,this.nonPooledAvailableMemory);this.nonPooledAvailableMemory -= got;
accumulated += got;}}
accumulated =0;}finally{this.nonPooledAvailableMemory += accumulated;this.waiters.remove(moreMemory);}}}finally{try{if(!(this.nonPooledAvailableMemory ==0&&this.free.isEmpty())&&!this.waiters.isEmpty())this.waiters.peekFirst().signal();}finally{
lock.unlock();}}if(buffer ==null)returnsafeAllocateByteBuffer(size);elsereturn buffer;}
内存回收
内存释放的时候,加锁处理。然后判断规范内存等于batch.size 直接回收给free。
publicvoiddeallocate(ByteBuffer buffer,int size){
lock.lock();try{// 如果归还的内存块大小等于batchSizeif(size ==this.poolableSize && size == buffer.capacity()){// 清空添加到缓冲池中,归还给缓冲池
buffer.clear();this.free.add(buffer);}else{// 直接加在内存未分配的地址,等待JVM GC回收this.nonPooledAvailableMemory += size;}Condition moreMem =this.waiters.peekFirst();if(moreMem !=null)// 唤醒第一个待分配的
moreMem.signal();}finally{
lock.unlock();}}
品一品其中的设计
1.恰到好处的避免频繁的不断的JVM GC,使用内存池的方式,到达资源的复用。
2.结合业务设计batch.size 不能无脑设置消息体大小。如果太大则会导致不断创建新的ByteBuffer 并且不会归还到缓冲池中。
3.配合多线程的等待/唤醒机制来实现同步。
参考文档
https://www.cnblogs.com/rwxwsblog/p/14754810.html
https://greedypirate.github.io/2020/05/02/kafka%E7%BC%93%E5%86%B2%E6%B1%A0-BufferPool-%E5%8E%9F%E7%90%86%E5%89%96%E6%9E%90/#%E5%89%8D%E8%A8%80
https://blog.csdn.net/huaxiawangyong/article/details/132389908
版权归原作者 qxlxi 所有, 如有侵权,请联系我们删除。