文章目录
在Flink内存模型中我们已经知道,Flink会将内存按照使用方式、内存类型分为不同的内存区域,底层会借助MemorySegment对内存块进行管理和访问,MemorySegment的使用场景有很多,本文我们主要看下ManagedMemory和NetworkBuffer是如何申请和使用MemorySegment内存块的。
一.ManagedMemory(算子)内存的申请与使用
1. tm内存申请与使用大致流程
Task使用的物理计算资源主要是TaskSlot提供的,TaskSlot由TaskManager中TaskSlotTable组件创建和管理。
- 创建MemoryManager:JobManager申请到足够的Slot计算资源后,会在TaskSlotTable中创建相应的TaskSlot,然后对TaskSlot基本环境进行初始化,包括在TaskSlot内部创建MemoryManager组件。最终使用MemoryManager管理当前TaskSlot的内存计算资源。
- task线程使用内存:当Task线程启动时,会直接从TaskSlot中获取MemoryManager组件申请内存空间。通过MemoryManager对MemorySegment内存空间进行管理,这一步对应内存模型中的ManagedMemory,也被称为托管内存。
2. 创建MemoryManager实例
在TaskSlot的构造器中调用createMemoryManager()方法创建MemoryManager实例,管理当前TaskSlot(代表一个线程的资源) 中的内存空间
/**
创建具有**给定**容量和给定页面大小的内存管理器。
这是 MemoryManager 的生产版本,一旦 MemoryManager
的所有者准备好处置,它就会检查内存泄漏 ( verifyEmpty() )。
参数:
memorySize – 该内存管理器管理的堆外内存的总大小。
pageSize – 内存管理器分配的页面大小。
**/privatestaticMemoryManagercreateMemoryManager(ResourceProfile resourceProfile,int pageSize){returnMemoryManager.create(resourceProfile.getManagedMemory().getBytes(), pageSize);}
在TaskSlot.createMemoryManager()方法中,会根据ResourceProfile参数获取内存空间大小,默认设置为非堆ing。其中pageSize参数就是MemorySegment的大小,如下代码默认为32kb。
[email protected](Documentation.Sections.ALL_TASK_MANAGER)publicstaticfinalConfigOption<MemorySize>MEMORY_SEGMENT_SIZE=key("taskmanager.memory.segment-size").memoryType().defaultValue(MemorySize.parse("32kb")).withDescription("Size of memory buffers used by the network stack and the memory manager.");
3. 算子使用通过MemoryManager使用内存
MemoryManager创建完毕后,会通过TaskSlot将MemoryManager对象传递给Task,此时Task会通过将MemoryManager封装在Environment变量中,然后传递给算子。
算子接收到MemoryManager对象后,通过MemoryManager动态申请内存空间,最终用于算子的具体计算过程。
需要注意的是:并不是所有的算子都会使用MemoryManager申请内存空间,这个步骤主要针对批计算类型的算子,例如HashJoinOperator、SortMergeJoinOperator和SortOperator等,这些算子往往需要借助非常大的内存空间进行数据的排序等操作。
4. ManagedMemory内存空间申请流程
申请ManagedMemory内存空间,是调用MemoryManager.allocatePages()方法执行的,见如下逻辑。
- 1)从AllocationRequest参数中获取MemorySegment的空集合、申请Pages总数量以及资源Owner(与内存关联的所有者:slot?还是算子?)等参数,并对参数进行非空和状态检查;
- 2)计算申请内存大小,并预留出内存空间;
- 3)根据page数、pageCleanup、owner等,开始分配内存,将内存以MemorySegment为单位,并维护一个set集合,最终返回给算子使用。
/**
从此内存管理器分配一组内存段。
分配的总内存不会超过构造函数中声明的大小限制。
参数:
owner – 与内存段关联的所有者,用于后备释放。
target – 将分配的内存页放入其中的列表。 numberOfPages – 要分配的页数。
**/publicvoidallocatePages(Object owner,Collection<MemorySegment> target,int numberOfPages)throwsMemoryAllocationException{// sanity check Preconditions.checkNotNull(owner,"The memory owner must not be null.");Preconditions.checkState(!isShutDown,"Memory manager has been shut down.");Preconditions.checkArgument(
numberOfPages <= totalNumberOfPages,"Cannot allocate more segments %s than the max number %s",
numberOfPages,
totalNumberOfPages);// reserve array space, if applicable if(target instanceofArrayList){((ArrayList<MemorySegment>) target).ensureCapacity(numberOfPages);}//计算申请内存大小,并预留空间(以免申请过程中被用掉)long memoryToReserve = numberOfPages * pageSize;try{
memoryBudget.reserveMemory(memoryToReserve);}catch(MemoryReservationException e){thrownewMemoryAllocationException(String.format("Could not allocate %d pages", numberOfPages), e);}//创建pageCleanup方法用于清理unsafe内存Runnable pageCleanup =this::releasePage;
allocatedSegments.compute(
owner,(o, currentSegmentsForOwner)->{Set<MemorySegment> segmentsForOwner =
currentSegmentsForOwner ==null?newHashSet<>(numberOfPages): currentSegmentsForOwner;for(long i = numberOfPages; i >0; i--){//分配内存MemorySegment segment =allocateOffHeapUnsafeMemory(getPageSize(), owner, pageCleanup);
target.add(segment);
segmentsForOwner.add(segment);}return segmentsForOwner;});Preconditions.checkState(!isShutDown,"Memory manager has been concurrently shut down.");}
如下如下算子会申请内存使用:
二.NetworkBuffer内存申请与使用
在Flink内存模型中,另外一个非常重要的堆外内存使用区域就是Network内存。Network内存主要用于网络传输中Buffer数据的缓冲区。
1. NetworkBuffer构造器
在NetworkBufferPool的构造器中可以看出,创建NetworkBufferPool时会根据用户配置的NetworkBuffer数量,调用MemorySegmentFactory创建相应的MemorySegment内存空间,再通过LocalBufferPool应用到ResultSubPartition或InputChannel组件中。
publicNetworkBufferPool(int numberOfSegmentsToAllocate,int segmentSize,Duration requestSegmentsTimeout){this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;this.memorySegmentSize = segmentSize;Preconditions.checkNotNull(requestSegmentsTimeout);checkArgument(
requestSegmentsTimeout.toMillis()>0,"The timeout for requesting exclusive buffers should be positive.");this.requestSegmentsTimeout = requestSegmentsTimeout;finallong sizeInLong =(long) segmentSize;try{this.availableMemorySegments =newArrayDeque<>(numberOfSegmentsToAllocate);}catch(OutOfMemoryError err){thrownewOutOfMemoryError("Could not allocate buffer queue of length "+ numberOfSegmentsToAllocate
+" - "+ err.getMessage());}try{//申请segment内存,并放到availableMemorySegments中。for(int i =0; i < numberOfSegmentsToAllocate; i++){
availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize,null));}}catch(OutOfMemoryError err){//如果申请过程中失败,则释放已申请的内存,算出缺少多少内存int allocated = availableMemorySegments.size();// free some memory
availableMemorySegments.clear();long requiredMb =(sizeInLong * numberOfSegmentsToAllocate)>>20;long allocatedMb =(sizeInLong * allocated)>>20;long missingMb = requiredMb - allocatedMb;thrownewOutOfMemoryError("Could not allocate enough memory segments for NetworkBufferPool "+"(required (MB): "+ requiredMb
+", allocated (MB): "+ allocatedMb
+", missing (MB): "+ missingMb
+"). Cause: "+ err.getMessage());}
availabilityHelper.resetAvailable();//计算共申请了多少mb:20:为2的20次方long allocatedMb =(sizeInLong * availableMemorySegments.size())>>20;LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
allocatedMb,
availableMemorySegments.size(),
segmentSize);}
参考:《Flink设计与实现:核心原理与源码解析》- 张利兵
版权归原作者 roman_日积跬步-终至千里 所有, 如有侵权,请联系我们删除。