0


Netty 的 内存池 是如何实现的

对于今天的源码剖析,你可以带着下面这么几个问题:

1 PoolArena中的PoolSubpage数组和PoolChunk中的PoolSubpage数组有什么关联?

2 PoolThreadCache中的MemoryRegionCache数组与PoolSubpage是否有联系?

3 Netty内存池的整体结构是什么样的?

4 如果让你来介绍Netty内存池,你如何来描述?

内存池源码剖析

调试用例

上一节,我们说了Netty根据请求的大小将其分成四类:Tiny、Small、Normal、Huge,这四类请求的分界线分别为512B、8KB、16MB,针对这四类请求,Netty的处理逻辑并不一样,但是,本节,我们并不打算把这四种全部讲解一遍,我们只讲解其中的一个——Tiny。

另外,为了加快分配内存的速度,Netty还使用了线程缓存,而线程缓存实际上是在有回收内存的情况下才有效,所以,我们设计的调试用例里面还应该包括回收内存的部分,即ReferenceCountUtil.release(byteBuf)。

publicclassByteBufTest{publicstaticvoidmain(String[] args){// 1. 创建池化的分配器ByteBufAllocator allocator =newPooledByteBufAllocator(false);// 2. 分配一个40B的ByteBufByteBuf byteBuf = allocator.heapBuffer(40);// 3. 写入数据
        byteBuf.writeInt(4);// 4. 读取数据System.out.println(byteBuf.readInt());// 5. 回收内存ReferenceCountUtil.release(byteBuf);// 6. 分配一个30B的ByteBufByteBuf byteBuf2 = allocator.heapBuffer(30);// 7. 再次分配一个40B的ByteBufByteBuf byteBuf3 = allocator.heapBuffer(40);}}

我们先创建一个池化的分配器,使用其分配一个40B的ByteBuf,接着写入数据并读取数据,然后回收,理论上来说,这个40B的ByteBuf会进入线程缓存,接着再分配一个30B的ByteBuf,观察其是否会从线程缓存中获取,最后再分配一个40B的ByteBuf,观察其是否会从线程缓存中获取。

源码调试

  1. 创建池化的分配器
  2. 让我们先在创建分配器那行打个断点,跟踪进去:
publicPooledByteBufAllocator(boolean preferDirect,int nHeapArena,int nDirectArena,int pageSize,int maxOrder,int tinyCacheSize,int smallCacheSize,int normalCacheSize,boolean useCacheForAllThreads,int directMemoryCacheAlignment){super(preferDirect);// 这是一个ThreadLocal,用于存放线程缓存PoolThreadCache对象
    threadCache =newPoolThreadLocalCache(useCacheForAllThreads);// Tiny的缓存个数,默认512this.tinyCacheSize = tinyCacheSize;// Small的缓存个数,默认256this.smallCacheSize = smallCacheSize;// Normal的缓存个数,默认64this.normalCacheSize = normalCacheSize;// pageSize默认为8KB// maxOrder表示树的最大高度,默认为11// 计算每个Chunk的大小,默认为16MB
    chunkSize =validateAndCalculateChunkSize(pageSize, maxOrder);// ...// 计算每页大小的左移位数// pageSize=8KB=10 0000 0000 0000=1<<13// pageShifts表示pageSize是1左移13位得来的// 所以pageShifts为13int pageShifts =validateAndCalculatePageShifts(pageSize);// 初始化堆内存池heapArenas// 默认为CPU核数*2,同时根据堆内存大小调整if(nHeapArena >0){
        heapArenas =newArenaArray(nHeapArena);List<PoolArenaMetric> metrics =newArrayList<PoolArenaMetric>(heapArenas.length);for(int i =0; i < heapArenas.length; i ++){// 创建HeapArena并加入到数组中PoolArena.HeapArena arena =newPoolArena.HeapArena(this,
                                                                pageSize, maxOrder, pageShifts, chunkSize,
                                                                directMemoryCacheAlignment);
            heapArenas[i]= arena;// 同时加入监控
            metrics.add(arena);}
        heapArenaMetrics =Collections.unmodifiableList(metrics);}else{
        heapArenas =null;
        heapArenaMetrics =Collections.emptyList();}// 初始化直接内存池directArenas// 默认为CPU核数*2,同时根据直接内存大小调整if(nDirectArena >0){
        directArenas =newArenaArray(nDirectArena);List<PoolArenaMetric> metrics =newArrayList<PoolArenaMetric>(directArenas.length);for(int i =0; i < directArenas.length; i ++){// 创建DirectArena并加入到数组中PoolArena.DirectArena arena =newPoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
            directArenas[i]= arena;// 同时加入监控
            metrics.add(arena);}
        directArenaMetrics =Collections.unmodifiableList(metrics);}else{
        directArenas =null;
        directArenaMetrics =Collections.emptyList();}// 监控
    metric =newPooledByteBufAllocatorMetric(this);}

在PooledByteBufAllocator的构造方法中,主要是初始化一些属性,比如chunkSize、heapArenas和directArenas数组等,默认地,chunkSize为16M,heapArenas和directArenas数组大小为2倍的CPU核数,同时根据内存大小动态调整。

2. 分配一个40B的ByteBuf

// PooledByteBufAllocator#newHeapBuffer@OverrideprotectedByteBufnewHeapBuffer(int initialCapacity,int maxCapacity){// 先从ThreadLocal中获取一个PoolThreadCache线程缓存对象PoolThreadCache cache = threadCache.get();// 因为我们创建的是基于堆内存的ByteBuf,所以使用heapArena// 另外,在PoolThreadCache初始化的时候会绑定一个最少使用的heapArena// 所以这里是可以获取到的// 具体可参考PooledByteBufAllocator.PoolThreadLocalCache#initialValue()方法PoolArena<byte[]> heapArena = cache.heapArena;finalByteBuf buf;if(heapArena !=null){// 使用heapArena来分配
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);}else{
        buf =PlatformDependent.hasUnsafe()?newUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity):newUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);}returntoLeakAwareBuffer(buf);}

在这段代码中,首先从threadCache中获取了一个PoolThreadCache对象cache,再从这个cache中获取一个heapArena,这个heapArena是从allocator的heapArenas数组中取的一个最少使用的,在cache初始化的时候绑定进去的,为什么绑定的是最少使用的呢?

我们知道,heapArenas数组的大小是固定的,而线程是可能无限多的,每个线程都要绑定一个heapArena,那么,怎么绑定才能减少线程之间的竞争呢?答案很明显,绑定最少使用的那个heapArena。比如,heapArenas数组大小为16,而线程也正好为16,这样的话,线程之间完全没竞争关系,如果按照数组的顺序绑定,最后这16个线程都会绑定到同一个heapArena,竞争非常剧烈。

OK,我们继续跟踪到heapArena.allocate()中:

// PoolArena#allocate(PoolThreadCache, int, int)PooledByteBuf<T>allocate(PoolThreadCache cache,int reqCapacity,int maxCapacity){// 使用对象池创建一个池化的ByteBuf,先不展开,跳过// PooledByteBuf中有一个memory字段用来存放数据// 从对象池中返回的这个buf,它的memory大小为0// 内存池的作用就是为了给这个memory分配空间PooledByteBuf<T> buf =newByteBuf(maxCapacity);// 继续深入allocate(cache, buf, reqCapacity);return buf;}// PoolArena#allocate(PoolThreadCache, PooledByteBuf<T>, int)privatevoidallocate(PoolThreadCache cache,PooledByteBuf<T> buf,finalint reqCapacity){// 将请求的大小规范化,这里40B会被规范化到48Bfinalint normCapacity =normalizeCapacity(reqCapacity);// 判断是否小于8KBif(isTinyOrSmall(normCapacity)){// capacity < pageSizeint tableIdx;PoolSubpage<T>[] table;// 是否为Tiny,即小于512Bboolean tiny =isTiny(normCapacity);if(tiny){// < 512// 先尝试从线程缓存中分配内存if(cache.allocateTiny(this, buf, reqCapacity, normCapacity)){// 分配成功则返回return;}// 计算48B在tinySubpagePools数组中的索引// 16、32、48,所以这里的索引是3
            tableIdx =tinyIdx(normCapacity);// PoolArena的tinySubpagePools数组,大小为32
            table = tinySubpagePools;}else{// 如果为Small,从这里走if(cache.allocateSmall(this, buf, reqCapacity, normCapacity)){return;}
            tableIdx =smallIdx(normCapacity);
            table = smallSubpagePools;}// 获取头节点,head中不存放任何数据,仅用来加锁使用finalPoolSubpage<T> head = table[tableIdx];// 分段锁的用法,减少锁竞争synchronized(head){// 因为还没有分配过任何内存,所以next为空,先跳过这一段finalPoolSubpage<T> s = head.next;// ...// 分配成功会返回}// 按PoolArena加锁,又是分段锁的用法// 这里就体会到了上面按最少使用去绑定PoolArena的魅力了吧// 减少了锁竞争synchronized(this){// 这个方法名有点歧义// 并不是说按Normal的请求去处理allocateNormal(buf, reqCapacity, normCapacity);}incTinySmallAllocation(tiny);return;}// 如果是Normal请求if(normCapacity <= chunkSize){// ...}else{// 如果是Huge请求allocateHuge(buf, reqCapacity);}}

这段代码中运用了大量的分段锁技巧:

1 PoolArena,每个线程绑定一个最少使用的PoolArena,充分减少锁竞争。
2 tinySubpagePools,按16B分成32段,只有分配相同大小(规范后的大小)的请求时且还是两个线程绑定到同一个PoolArena的情况下才有锁竞争。

结合ConcurrentHashMap中分段锁的用法,我们归纳出来一个规律:
分段锁一般都是通过数组来实现的
通过某种规则将多个线程的操作分离到数组中不同的位置上,
以便降低线程之间修改同一段数组的竞争,所以,有时候也叫作锁分离

比如,在ConcurrentHashMap中,是根据key值hash到不同的桶中进行处理。在PoolArena中,是通过线程绑定最小使用PoolArena来达到锁分离的目的。在tinySubpagePools中,是通过将请求分割成不同的段进行处理减少锁竞争。

OK,让我们继续跟踪到allocateNormal()内部:

// PoolArena#allocateNormalprivatevoidallocateNormal(PooledByteBuf<T> buf,int reqCapacity,int normCapacity){// 先尝试从已有的PoolChunk中分配内存// 此时是第一次请求,所以,没有任何可用的PoolChunkif(q050.allocate(buf, reqCapacity, normCapacity)|| q025.allocate(buf, reqCapacity, normCapacity)|| q000.allocate(buf, reqCapacity, normCapacity)|| qInit.allocate(buf, reqCapacity, normCapacity)|| q075.allocate(buf, reqCapacity, normCapacity)){return;}// key1, 创建一个新的PoolChunkPoolChunk<T> c =newChunk(pageSize, maxOrder, pageShifts, chunkSize);// key2, 使用这个PoolChunk分配内存boolean success = c.allocate(buf, reqCapacity, normCapacity);assert success;// 这新的PoolChunk加入到qInit中
    qInit.add(c);}

这个方法中,有两个关键的地方:

1 创建PoolChunk,通过上一节的分析,我们知道,数据全部都是存储在PoolChunk的memory字段中的,那么,这个memory是如何创建的呢?

2 使用PoolChunk分配内存,如何分配?

// PoolArena.HeapArena#newChunk@OverrideprotectedPoolChunk<byte[]>newChunk(int pageSize,int maxOrder,int pageShifts,int chunkSize){returnnewPoolChunk<byte[]>(this,newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize,0);}// PoolArena.HeapArena#newByteArrayprivatestaticbyte[]newByteArray(int size){// 调用PlatformDependentreturnPlatformDependent.allocateUninitializedArray(size);}publicstaticbyte[]allocateUninitializedArray(int size){// 默认地,UNINITIALIZED_ARRAY_ALLOCATION_THRESHOLD=-1// 创建了一个大小为16MB的byte[]数组,在堆内存中return UNINITIALIZED_ARRAY_ALLOCATION_THRESHOLD <0|| UNINITIALIZED_ARRAY_ALLOCATION_THRESHOLD > size ?newbyte[size]:PlatformDependent0.allocateUninitializedArray(size);}// PoolChunk的构造方法PoolChunk(PoolArena<T> arena,T memory,int pageSize,int maxOrder,int pageShifts,int chunkSize,int offset){
    unpooled =false;this.arena = arena;// memory就是上面创建的byte[]数组this.memory = memory;// 8KBthis.pageSize = pageSize;// 13this.pageShifts = pageShifts;// 11this.maxOrder = maxOrder;// 16MBthis.chunkSize = chunkSize;// 0this.offset = offset;// 12,表示如果满二叉树中的节点更新到了12,则表示此节点已完全分配了
    unusable =(byte)(maxOrder +1);// 24,用于计算满二叉树中的节点占用的空间大小,比如2048占用8K,而1024占用16K
    log2ChunkSize =log2(chunkSize);// -8192,表示subpage溢出的掩码
    subpageOverflowMask =~(pageSize -1);// 初始时可使用的内存等于PoolChunk的整个空间,即16MB
    freeBytes = chunkSize;assert maxOrder <30:"maxOrder should be < 30, but is: "+ maxOrder;// 一个PoolChunk可以被分成多少页,1<<11=2048
    maxSubpageAllocs =1<< maxOrder;// 创建memoryMap和depthMap,满二叉树,最后一层的节点数正好等于上面所有层的节点数// 所以它们的大小为2048*2=4096
    memoryMap =newbyte[maxSubpageAllocs <<1];
    depthMap =newbyte[memoryMap.length];int memoryMapIndex =1;// 初始化memoryMap和depthMap中的元素为每个节点的高度for(int d =0; d <= maxOrder;++ d){int depth =1<< d;for(int p =0; p < depth;++ p){
            memoryMap[memoryMapIndex]=(byte) d;
            depthMap[memoryMapIndex]=(byte) d;
            memoryMapIndex ++;}}// 一个PoolChunk可以被分割成2048个Page// Page在Netty中同样使用PoolSubpage来表示
    subpages =newSubpageArray(maxSubpageAllocs);
    cachedNioBuffers =newArrayDeque<ByteBuffer>(8);}
标签:

本文转载自: https://blog.csdn.net/dgutliangxuan/article/details/119385413
版权归原作者 人月IT 所有, 如有侵权,请联系我们删除。

“Netty 的 内存池 是如何实现的”的评论:

还没有评论