Flink 调度源码分析1:拓扑图创建与提交过程
Flink 调度源码分析2:调度过程
Flink 调度源码分析3:Slot 分配策略
Flink 调度源码分析4:Physical Slot 分配过程
系统已经实现的分配策略有:SimpleExecutionSlotAllocator、SlotSharingExecutionSlotAllocator。流计算使用的是 SlotSharingExecutionSlotAllocator,故本文讲述该分配器。
关键概念
- subtask 在 Flink 中,Subtask 用于将作业的逻辑划分为并行的单元进行执行。每个作业任务(Job)会包含一些算子,部分算子进行链接后,形成一个个 task,task 根据其并行度,会生成若干 subtask。每个Subtask都是作业的一个独立执行单元,可以在不同的任务槽(Task Slot)上并行执行。
- ExecutionSlotSharingGroup ExecutionSlotSharingGroup 是用于控制任务槽(Task Slot)之间资源共享的概念。ExecutionSlotSharingGroup 可以将一组 task 划分为一个共享组,这些任务在同一个共享组中的任务槽上运行时会共享资源。不在同一共享组的 task一定不能分到同一 slot 中。
- CoLocationGroup CoLocationGroup 是用于控制任务之间部署位置的概念。通过将 task 划分到同一共位组,可以确保这些 task 在同一个 slot 上执行,从而减少任务之间的网络通信开销,提高作业的执行效率和性能。
整体过程
从物理共享 Slot 分配逻辑 Slot 。该分配器为每个 ExecutionSlotSharingGroup 维护一个共享 Slot。它为共享 Slot 分配一个物理 Slot,然后从中为计划任务分配逻辑 Slot。在任何托管子任务请求共享 Slot 时,物理 Slot 会被分配给共享 Slot。随后的每个共享子任务都会从现有的共享 Slot 中分配一个逻辑 Slot。只有在所有请求的逻辑 Slot 都被释放或取消后,共享/物理 Slot 才能被释放。
Slot 分配过程:
以下是代码编写逻辑,为了方便理解,讲述的过程的划分与下面不完全相符,但内容是一样的。
- 使用 SlotSharingStrategy 将 exection 映射到 ExecutionSlotSharingGroups。
- 检查哪些 ExecutionSlotSharingGroup 已拥有共享 Slot
- 对于所有尚未拥有共享插槽的相关 ExecutionSlotSharingGroup: 1. 使用 SharedSlotProfileRetriever 创建一个 Slot 配置文件;2. 从物理 PhysicalSlotProvider 分配一个物理 Slot;3. 根据返回的物理 Slot 创建共享 Slot。
- 为所有相应共享 Slot 的执行分配逻辑 Slot。
- 如果物理 Slot 请求失败,共享 Slot 中的相关逻辑 Slot 请求将被取消。流程图:
代码:SlotSharingExecutionSlotAllocator.allocateSlotsForVertices():
privateList<SlotExecutionVertexAssignment>allocateSlotsForVertices(List<ExecutionVertexID> executionVertexIds){SharedSlotProfileRetriever sharedSlotProfileRetriever =
sharedSlotProfileRetrieverFactory.createFromBulk(newHashSet<>(executionVertexIds));// 获取每个 ExecutionVertex 的共享组 Map<ExecutionSlotSharingGroup,List<ExecutionVertexID>> executionsByGroup =
executionVertexIds.stream().collect(Collectors.groupingBy(
slotSharingStrategy::getExecutionSlotSharingGroup));Map<ExecutionSlotSharingGroup,SharedSlot> slots =newHashMap<>(executionsByGroup.size());Set<ExecutionSlotSharingGroup> groupsToAssign =newHashSet<>(executionsByGroup.keySet());// 检查共享组是否有 slot Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots = tryAssignExistingSharedSlots(groupsToAssign);
slots.putAll(assignedSlots);
groupsToAssign.removeAll(assignedSlots.keySet());// 对没有 slot 的共享组分配 slot if (!groupsToAssign.isEmpty()) { Map<ExecutionSlotSharingGroup,SharedSlot> allocatedSlots =allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever);
slots.putAll(allocatedSlots);
groupsToAssign.removeAll(allocatedSlots.keySet());// 所有的共享组一定有共享 slot Preconditions.checkState(groupsToAssign.isEmpty()); }// 分配逻辑slot Map<ExecutionVertexID,SlotExecutionVertexAssignment> assignments =allocateLogicalSlotsFromSharedSlots(slots, executionsByGroup);// we need to pass the slots map to the createBulk method instead of using the allocator's // 'sharedSlots' // because if any physical slots have already failed, their shared slots have been removed // from the allocator's 'sharedSlots' by failed logical slots. // 创建批量请求,并检查是否成功注册 SharingPhysicalSlotRequestBulk bulk =createBulk(slots, executionsByGroup);
bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, allocationTimeout);return executionVertexIds.stream().map(assignments::get).collect(Collectors.toList());}
过程1:为节点分配共享组
(分配过程1. 使用 SlotSharingStrategy 将 exection 映射到 ExecutionSlotSharingGroups。)
这部分功能需要使用到 SlotSharingStrategy,其创建代码如下所示:
SlotSharingStrategy slotSharingStrategy =
slotSharingStrategyFactory.create(// 这里的 context 为 DefaultScheduler.DefaultExecutionSlotAllocationContext。
context.getSchedulingTopology(),
context.getLogicalSlotSharingGroups(),
context.getCoLocationGroups());
流程图:
代码:
privateMap<ExecutionVertexID,ExecutionSlotSharingGroup>build(){// 按照拓扑顺序,记录每个 JobVertes 的 ExecutionVertes finalLinkedHashMap<JobVertexID,List<SchedulingExecutionVertex>> allVertices =getExecutionVertices();// loop on job vertices so that an execution vertex will not be added into a group // if that group better fits another execution vertex for(List<SchedulingExecutionVertex> executionVertices : allVertices.values()){// 尝试根据 co-location 限制和上游节点找到共享组 finalList<SchedulingExecutionVertex> remaining =tryFindOptimalAvailableExecutionSlotSharingGroupFor(executionVertices);// 对上一步没找到共享组的节点,找到可用的或创建新的共享组 findAvailableOrCreateNewExecutionSlotSharingGroupFor(remaining);// 更新 constraintToExecutionSlotSharingGroupMap// 以便与该节点在同一 CoLocationGroup 的节点可以选择同一 slotupdateConstraintToExecutionSlotSharingGroupMap(executionVertices);}return executionSlotSharingGroupMap;}
说明:
- 根据 CoLocationGroup 寻找共享组: 每个节点的共享组信息会存储在 constraintToExecutionSlotSharingGroupMap 中,如果当前 CoLocationGroup 中已经有节点选择过共享组,则可从该映射中查找到这个共享组,进而将这个共享组分配给当前的节点。如果当前 CoLocationGroup 中没有节点选择过共享组,这一步就不会生效,之后当前节点分配到 slot 后,便会更新 constraintToExecutionSlotSharingGroupMap。
- 根据上游节点查找共享组: 首先获取与当前节点连接的上游的 ConsumedPartitionGroup。然后根据 ConsumedPartitionGroup,从名为 candidateGroupsForConsumedPartitionGroup 的数据结构中获取到所有上游节点使用的共享组,同时在该数据结构中维护当前节点的 ConsumedPartitionGroup 对应的共享组。从这些共享组中挑选一个可用的即可。怎么算可用,当前共享组中没有同一 JobVertex 的其它 ExecutionVertex 即可用。
- 从可用的共享组中选择一个: 会维护一个 availableGroupsForJobVertex,里面存储了所有可以用的共享组。一个 ExecutionVertex 获取一个共享组后,该共享组会从 availableGroupsForJobVertex 中相应的 JobVertex 对应的可用共享组集合中剔除。新建一个共享组后,该共享组会添加至 availableGroupsForJobVertex 中。
- 创建新的共享组: 根据该节点的资源描述文件创建一个 ExecutionSlotSharingGroup 类。
过程2:为共享组分配 slot
过程2.1:检查共享组是否有对应的 slot
这部分比较简单,不做过多讲述。
代码:
privateMap<ExecutionSlotSharingGroup,SharedSlot>tryAssignExistingSharedSlots(Set<ExecutionSlotSharingGroup> executionSlotSharingGroups){Map<ExecutionSlotSharingGroup,SharedSlot> assignedSlots =newHashMap<>(executionSlotSharingGroups.size());for(ExecutionSlotSharingGroup group : executionSlotSharingGroups){// 检查是否有对应的 slotSharedSlot sharedSlot = sharedSlots.get(group);if(sharedSlot !=null){
assignedSlots.put(group, sharedSlot);}}return assignedSlots;}
说明:
- 如果已经为共享组分配过 slot,相关信息会记录在 sharedSlots 中。那么,如果 sharedSlots 记录了相关信息,就说明共享组已经有对应的 slot,否则会在后面为其分配 slot,并把信息存入 sharedSlots,以避免重复分配。
过程2.2:对没有 slot 的共享组分配 slot
流程图:
代码:
privateMap<ExecutionSlotSharingGroup,SharedSlot>allocateSharedSlots(Set<ExecutionSlotSharingGroup> executionSlotSharingGroups,SharedSlotProfileRetriever sharedSlotProfileRetriever){List<PhysicalSlotRequest> slotRequests =newArrayList<>();Map<ExecutionSlotSharingGroup,SharedSlot> allocatedSlots =newHashMap<>();Map<SlotRequestId,ExecutionSlotSharingGroup> requestToGroup =newHashMap<>();Map<SlotRequestId,ResourceProfile> requestToPhysicalResources =newHashMap<>();// 对每个共享组创建 slot 请求 for(ExecutionSlotSharingGroup group : executionSlotSharingGroups){SlotRequestId physicalSlotRequestId =newSlotRequestId();// 使用 SharedSlotProfileRetriever 创建 slot 配置文件 ResourceProfile physicalSlotResourceProfile =getPhysicalSlotResourceProfile(group);SlotProfile slotProfile =
sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);// 创建 PhysicalSlot 请求 PhysicalSlotRequest request =newPhysicalSlotRequest(
physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);// 存储 slot 请求,以及该请求与共享组和物理资源配置的对应
slotRequests.add(request);
requestToGroup.put(physicalSlotRequestId, group);
requestToPhysicalResources.put(physicalSlotRequestId, physicalSlotResourceProfile);}// 对于上一步创建的所有请求 slotRequests,从 PhysicalSlotProvider 分配一个 PhysicalSlot Map<SlotRequestId,CompletableFuture<PhysicalSlotRequest.Result>> allocateResult =
slotProvider.allocatePhysicalSlots(slotRequests);// 根据返回的 PhysicalSlot 创建 SharedSlot
allocateResult.forEach((slotRequestId, resultCompletableFuture)->{ExecutionSlotSharingGroup group = requestToGroup.get(slotRequestId);// 获取新创建的 PhysicalSlot CompletableFuture<PhysicalSlot> physicalSlotFuture =
resultCompletableFuture.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);// 根据创建的新的 PhysicalSlot 创建共享组,并维护相关信息 SharedSlot slot =newSharedSlot(
slotRequestId,
requestToPhysicalResources.get(slotRequestId),
group,
physicalSlotFuture,
slotWillBeOccupiedIndefinitely,this::releaseSharedSlot);
allocatedSlots.put(group, slot);Preconditions.checkState(!sharedSlots.containsKey(group));
sharedSlots.put(group, slot);});return allocatedSlots;}
说明:
- 注意获取 PhysicalSlot 时,获取的是 physicalSlotFuture,这是一个 CompletableFuture,这个值会在创建 LogicalSlot 时获取到,以保证在 PhysicalSlot 请求完毕后再分配 LogicalSlot。
过程3:为节点分配 LogicalSlot
代码:
CompletableFuture<LogicalSlot>allocateLogicalSlot(ExecutionVertexID executionVertexId){Preconditions.checkArgument(
executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId),"Trying to allocate a logical slot for execution %s which is not in the ExecutionSlotSharingGroup",
executionVertexId);// 检查当前 executionVertex 是否已经分配过 LogicalSlot CompletableFuture<SingleLogicalSlot> logicalSlotFuture =
requestedLogicalSlots.getValueByKeyA(executionVertexId);if(logicalSlotFuture !=null){LOG.debug("Request for {} already exists",getLogicalSlotString(executionVertexId));}else{// 如果没分配过 LogicalSlot,则为其分配
logicalSlotFuture =allocateNonExistentLogicalSlot(executionVertexId);}return logicalSlotFuture.thenApply(Function.identity());}// 分配 slot 需要调用下面的函数privateCompletableFuture<SingleLogicalSlot>allocateNonExistentLogicalSlot(ExecutionVertexID executionVertexId){CompletableFuture<SingleLogicalSlot> logicalSlotFuture;SlotRequestId logicalSlotRequestId =newSlotRequestId();String logMessageBase =getLogicalSlotString(logicalSlotRequestId, executionVertexId);LOG.debug("Request a {}", logMessageBase);// 分配逻辑 slot // 等待 PhysicalSlot 申请完成,便创建 LogicalSlot 并返回
logicalSlotFuture =
slotContextFuture.thenApply(
physicalSlot ->{LOG.debug("Allocated {}", logMessageBase);returncreateLogicalSlot(physicalSlot, logicalSlotRequestId);});// 记录已分配的逻辑 slot
requestedLogicalSlots.put(executionVertexId, logicalSlotRequestId, logicalSlotFuture);// If the physical slot request fails (slotContextFuture), it will also fail the logicalSlotFuture. // Therefore, the next `exceptionally` callback will call removeLogicalSlotRequest and do // the cleanup in requestedLogicalSlots and eventually in sharedSlots // 逻辑 slot 分配异常时执行
logicalSlotFuture.exceptionally(
cause ->{LOG.debug("Failed {}", logMessageBase, cause);removeLogicalSlotRequest(logicalSlotRequestId);returnnull;});return logicalSlotFuture;}
说明:
- requestedLogicalSlots 是一个具有两个键的映射结构,键为 ExecutionVertexID 和 SlotRequestId,值为 LogicalSlot。已分配的 LogicalSlot 都会记录在这个数据结构中。
- 这一步就是为每一个 ExecutionVertex 创建一个 SingleLogicalSlot 对象。
版权归原作者 北_鱼 所有, 如有侵权,请联系我们删除。