Trino: A Brief Analysis of SQL Submission & Query Flow on Partitioned Tables

Key Characteristics of Trino SQL Execution

  1. Communication between Client, Coordinator, and Worker is based on the HTTP protocol.
  2. SQL submission, parsing, scheduling, and execution are fully asynchronous, maximizing runtime efficiency.
  3. The logical plan tree is split into PlanFragments on the Coordinator side; a PlanFragment corresponds roughly to the Stage concept in Spark and is scheduled through a StageScheduler.
  4. One PlanFragment corresponds to one Stage, and one Stage corresponds to one or more data partitions; one data partition corresponds to one SqlTask, which is the entity (container) that processes data on a Worker node.
  5. A partition is split into one or more Splits, the minimum unit of task scheduling; Splits are generated by the Coordinator, and their number is fixed.
  6. A Split is processed by a Worker and is further divided into one or more Pages, the minimum unit of data processing.
  7. A Worker processes Splits in parallel, with scheduling threads working on Splits in time slices; if an execution thread exceeds its time-slice quota or is blocked for some other reason, it voluntarily gives up the current Split. (The details will be analyzed in a separate article.)
  8. A Block is the minimum unit of data transfer within a Trino cluster; it is stored and compressed in columnar format. (The full hierarchy is sketched right after this list.)
  9. By default, if resources are sufficient, all Stages of a SQL job are scheduled and created simultaneously. The processing model then resembles Flink: it is a streaming process, where as soon as one source Split is generated it can flow through all Stages and produce the final output.
  10. By default, all SQL jobs share a single Resource Group; as soon as one Worker is available, a job is scheduled for execution.
  11. During SQL execution, every SqlTask runs entirely in memory, which means fault recovery is impossible; once any task fails, the whole job eventually fails.
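
Putting items 3-8 together, the runtime hierarchy of a single SQL job can be sketched as follows:

Query
 └── Stage (one per PlanFragment, driven by a StageScheduler)
      └── SqlTask (one per data partition, hosted on a Worker node)
           └── Split (minimum unit of scheduling, generated by the Coordinator)
                └── Page (minimum unit of data processing)
                     └── Block (columnar data, minimum unit of transfer)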

SQL Submission & Registration

A new Query is submitted to the coordinator through the

/v1/statement/queued

API. The query is first put into the QueryManager's cache pool, and the address the client should visit next is returned.
After successfully submitting the SQL, the client immediately calls the

queued/{queryId}/{slug}/{token}

REST API to poll the execution status of the SQL.
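
To make the interaction concrete, below is a minimal polling-client sketch in plain Java (java.net.http). It assumes a coordinator at localhost:8080 and uses the public /v1/statement entry point, which routes through the queued endpoints above; extractNextUri is a hypothetical helper standing in for a real JSON parser.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TrinoProtocolSketch
{
    public static void main(String[] args) throws Exception
    {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest post = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/v1/statement"))
                .header("X-Trino-User", "demo")
                .POST(HttpRequest.BodyPublishers.ofString("SELECT 1"))
                .build();
        String body = client.send(post, HttpResponse.BodyHandlers.ofString()).body();

        // Every QueryResults response carries a nextUri until the query reaches a terminal
        // state; polling nextUri is what drives Query::waitForDispatched() on the server side
        String nextUri = extractNextUri(body);
        while (nextUri != null) {
            HttpRequest get = HttpRequest.newBuilder().uri(URI.create(nextUri)).build();
            body = client.send(get, HttpResponse.BodyHandlers.ofString()).body();
            nextUri = extractNextUri(body);
        }
    }

    // Hypothetical helper: a real client would use a JSON library instead
    private static String extractNextUri(String json)
    {
        int index = json.indexOf("\"nextUri\":\"");
        if (index < 0) {
            return null;
        }
        int start = index + "\"nextUri\":\"".length();
        return json.substring(start, json.indexOf('"', start));
    }
}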

public class QueuedStatementResource
{
    @ResourceSecurity(AUTHENTICATED_USER)
    @POST
    @Produces(APPLICATION_JSON)
    public Response postStatement(
            String statement,
            @Context HttpServletRequest servletRequest,
            @Context HttpHeaders httpHeaders,
            @Context UriInfo uriInfo)
    {
        if (isNullOrEmpty(statement)) {
            throw badRequest(BAD_REQUEST, "SQL statement is empty");
        }
        // Register the new query. This only creates a Query instance and adds it to the
        // QueryManager's cache pool
        Query query = registerQuery(statement, servletRequest, httpHeaders);
        return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo));
    }

    private Query registerQuery(String statement, HttpServletRequest servletRequest, HttpHeaders httpHeaders)
    {
        Optional<String> remoteAddress = Optional.ofNullable(servletRequest.getRemoteAddr());
        Optional<Identity> identity = Optional.ofNullable((Identity) servletRequest.getAttribute(AUTHENTICATED_IDENTITY));
        MultivaluedMap<String, String> headers = httpHeaders.getRequestHeaders();

        SessionContext sessionContext = sessionContextFactory.createSessionContext(headers, alternateHeaderName, remoteAddress, identity);
        // Create a Query instance that maintains all state over this SQL statement's lifecycle
        Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory);
        // Register the Query instance with the QueryManager
        queryManager.registerQuery(query);

        // let authentication filter know that identity lifecycle has been handed off
        servletRequest.setAttribute(AUTHENTICATED_IDENTITY, null);
        return query;
    }
}

The Query Class

Maintains the runtime state of the SQL statement; state information during execution can be obtained through this class. It is also responsible for interacting with the client and provides management capabilities over the SQL task.

private static final class Query
{
    private final String query;
    private final SessionContext sessionContext;
    private final DispatchManager dispatchManager;
    private final QueryId queryId;
    private final Optional<URI> queryInfoUrl;
    private final Slug slug = Slug.createNew();
    private final AtomicLong lastToken = new AtomicLong();
    private final long initTime = System.nanoTime();
    private final AtomicReference<Boolean> submissionGate = new AtomicReference<>();
    private final SettableFuture<Void> creationFuture = SettableFuture.create();

    public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, QueryInfoUrlFactory queryInfoUrlFactory)
    {
        this.query = requireNonNull(query, "query is null");
        this.sessionContext = requireNonNull(sessionContext, "sessionContext is null");
        this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
        this.queryId = dispatchManager.createQueryId();
        requireNonNull(queryInfoUrlFactory, "queryInfoUrlFactory is null");
        this.queryInfoUrl = queryInfoUrlFactory.getQueryInfoUrl(queryId);
    }

    public boolean isCreated()
    {
        return creationFuture.isDone();
    }

    private ListenableFuture<Void> waitForDispatched()
    {
        // This method is only invoked when the `queued/{queryId}/{slug}/{token}` REST API is called
        // to fetch the task's status; that is what triggers the actual submission of the SQL task
        submitIfNeeded();
        if (!creationFuture.isDone()) {
            return nonCancellationPropagating(creationFuture);
        }
        // otherwise, wait for the query to finish
        return dispatchManager.waitForDispatched(queryId);
    }

    private void submitIfNeeded()
    {
        if (submissionGate.compareAndSet(null, true)) {
            // Try to submit the SQL task to the DispatchManager
            creationFuture.setFuture(dispatchManager.createQuery(queryId, slug, sessionContext, query));
        }
    }

    public QueryResults getQueryResults(long token, UriInfo uriInfo)
    {
        // the client fetches results here
    }

    public void cancel()
    {
        creationFuture.addListener(() -> dispatchManager.cancelQuery(queryId), directExecutor());
    }

    public void destroy()
    {
        sessionContext.getIdentity().destroy();
    }
}

The QueuedStatementResource.QueryManager Class

Maintains all live Query instances and provides fast Query lookup for the REST API; it also checks the client submission timeout logic, see the check conditions in tryAbandonSubmissionWithTimeout(clientTimeout).
From the server's point of view, a task's state is only set to

submitted

after the

Query::waitForDispatched()

method has been triggered. If a client submits a SQL statement and then goes silent, it will certainly never call the REST API again to fetch the SQL's execution status, so this method can never be triggered; that whole interval is counted as submission time.
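
The gate itself is just a compare-and-set on submissionGate: submit and abandon race for the CAS, and whichever wins decides the query's fate. A plausible sketch of the abandon side (using java.time.Duration; the real code uses Airlift types):

private boolean tryAbandonSubmissionWithTimeout(java.time.Duration querySubmissionTimeout)
{
    // The client never polled within the timeout: try to win the CAS race against submitIfNeeded().
    // false marks the submission as abandoned; true (set by submitIfNeeded) marks it as submitted.
    return java.time.Duration.ofNanos(System.nanoTime() - initTime).compareTo(querySubmissionTimeout) > 0
            && submissionGate.compareAndSet(null, false);
}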

@ThreadSafe
private static class QueryManager
{
    private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor(daemonThreadsNamed("drain-state-query-manager"));
    private final Duration querySubmissionTimeout;

    public QueryManager(Duration querySubmissionTimeout)
    {
        this.querySubmissionTimeout = requireNonNull(querySubmissionTimeout, "querySubmissionTimeout is null");
    }

    public void initialize(DispatchManager dispatchManager)
    {
        scheduledExecutorService.scheduleWithFixedDelay(() -> syncWith(dispatchManager), 200, 200, MILLISECONDS);
    }

    private void syncWith(DispatchManager dispatchManager)
    {
        queries.forEach((queryId, query) -> {
            if (shouldBePurged(dispatchManager, query)) {
                removeQuery(queryId);
            }
        });
    }

    private boolean shouldBePurged(DispatchManager dispatchManager, Query query)
    {
        if (query.isSubmissionAbandoned()) {
            // Query submission was explicitly abandoned
            return true;
        }
        if (query.tryAbandonSubmissionWithTimeout(querySubmissionTimeout)) {
            // Query took too long to be submitted by the client
            return true;
        }
        if (query.isCreated() && !dispatchManager.isQueryRegistered(query.getQueryId())) {
            // Query was created in the DispatchManager, and DispatchManager has already purged the query
            return true;
        }
        return false;
    }

    private void removeQuery(QueryId queryId)
    {
        Optional.ofNullable(queries.remove(queryId))
                .ifPresent(QueryManager::destroyQuietly);
    }

    public void registerQuery(Query query)
    {
        Query existingQuery = queries.putIfAbsent(query.getQueryId(), query);
        checkState(existingQuery == null, "Query already registered");
    }

    @Nullable
    public Query getQuery(QueryId queryId)
    {
        return queries.get(queryId);
    }
}

Scheduling of the Query Instance

The submission of the SQL task is only triggered when the client tries to fetch the SQL's execution status; the task is then submitted to the DispatchManager.

public class DispatchManager
{
    public ListenableFuture<Void> createQuery(QueryId queryId, Slug slug, SessionContext sessionContext, String query)
    {
        requireNonNull(queryId, "queryId is null");
        requireNonNull(sessionContext, "sessionContext is null");
        requireNonNull(query, "query is null");
        checkArgument(!query.isEmpty(), "query must not be empty string");
        checkArgument(queryTracker.tryGetQuery(queryId).isEmpty(), "query %s already exists", queryId);

        // It is important to return a future implementation which ignores cancellation request.
        // Using NonCancellationPropagatingFuture is not enough; it does not propagate cancel to wrapped future
        // but it would still return true on call to isCancelled() after cancel() is called on it.
        DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
        // Create the query asynchronously
        dispatchExecutor.execute(() -> {
            try {
                createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
            }
            finally {
                queryCreationFuture.set(null);
            }
        });
        return queryCreationFuture;
    }

    private <C> void createQueryInternal(QueryId queryId, Slug slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
    {
        Session session = null;
        PreparedQuery preparedQuery = null;
        try {
            if (query.length() > maxQueryLength) {
                int queryLength = query.length();
                query = query.substring(0, maxQueryLength);
                throw new TrinoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
            }

            // decode session
            session = sessionSupplier.createSession(queryId, sessionContext);

            // check query execute permissions
            accessControl.checkCanExecuteQuery(sessionContext.getIdentity());

            // prepare query: parse the user SQL and produce the AST
            preparedQuery = queryPreparer.prepareQuery(session, query);

            // select resource group
            Optional<String> queryType = getQueryType(preparedQuery.getStatement()).map(Enum::name);
            // If no ResourceGroup assignment policy is configured, the SQL is assigned to the global
            // queue by default, and all SQL statements share the cluster
            SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                    sessionContext.getIdentity().getPrincipal().isPresent(),
                    sessionContext.getIdentity().getUser(),
                    sessionContext.getIdentity().getGroups(),
                    sessionContext.getSource(),
                    sessionContext.getClientTags(),
                    sessionContext.getResourceEstimates(),
                    queryType));

            // apply system default session properties (does not override user set properties)
            session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, queryType, selectionContext.getResourceGroupId());

            // mark existing transaction as active
            transactionManager.activateTransaction(session, isTransactionControlStatement(preparedQuery.getStatement()), accessControl);

            // Wrap query and preparedQuery into a DispatchQuery instance (actually a LocalDispatchQuery);
            // this process is asynchronous. It provides the following methods so upper layers can
            // observe the task's dispatch state:
            //     ListenableFuture<Void> getDispatchedFuture();
            //     DispatchInfo getDispatchInfo();
            //
            // Almost every phase of SQL execution in Trino is asynchronous. To manage the Query
            // lifecycle correctly under asynchrony, each phase creates a corresponding instance,
            // such as the DispatchQuery here.
            DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
                    session,
                    query,
                    preparedQuery,
                    slug,
                    selectionContext.getResourceGroupId());

            // Once the DispatchQuery has been created, it is added to the QueryTracker, which
            // manages the SQL execution lifecycle
            boolean queryAdded = queryCreated(dispatchQuery);
            if (queryAdded && !dispatchQuery.isDone()) {
                // The SQL was added to the QueryTracker, but the dispatchQuery has not finished being
                // created, so submit it to the resource group first and wait for it to be scheduled
                try {
                    resourceGroupManager.submit(dispatchQuery, selectionContext, dispatchExecutor);
                }
                catch (Throwable e) {
                    // dispatch query has already been registered, so just fail it directly
                    dispatchQuery.fail(e);
                }
            }
        }
        catch (Throwable throwable) {
            // creation must never fail, so register a failed query in this case
            if (session == null) {
                session = Session.builder(sessionPropertyManager)
                        .setQueryId(queryId)
                        .setIdentity(sessionContext.getIdentity())
                        .setSource(sessionContext.getSource().orElse(null))
                        .build();
            }
            // If an exception occurred, create a FailedDispatchQuery instance recording the failure details
            Optional<String> preparedSql = Optional.ofNullable(preparedQuery).flatMap(PreparedQuery::getPrepareSql);
            DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, preparedSql, Optional.empty(), throwable);
            queryCreated(failedDispatchQuery);
        }
    }
}

Creating the LocalDispatchQuery Instance

public class LocalDispatchQueryFactory
        implements DispatchQueryFactory
{
    @Override
    public DispatchQuery createDispatchQuery(Session session, String query, PreparedQuery preparedQuery, Slug slug, ResourceGroupId resourceGroup)
    {
        WarningCollector warningCollector = warningCollectorFactory.create();
        // Create a new state machine for the newly submitted Query instance
        QueryStateMachine stateMachine = QueryStateMachine.begin(
                query,
                preparedQuery.getPrepareSql(),
                session,
                locationFactory.createQueryLocation(session.getQueryId()),
                resourceGroup,
                isTransactionControlStatement(preparedQuery.getStatement()),
                transactionManager,
                accessControl,
                executor,
                metadata,
                warningCollector,
                getQueryType(preparedQuery.getStatement()));

        // It is important that `queryCreatedEvent` is called here. Moving it past the `executor.submit` below
        // can result in delivering query-created event after query analysis has already started.
        // That can result in misbehaviour of plugins called during analysis phase (e.g. access control auditing)
        // which depend on the contract that event was already delivered.
        //
        // Note that for immediate and in-order delivery of query events we depend on synchronous nature of
        // QueryMonitor and EventListenerManager.
        queryMonitor.queryCreatedEvent(stateMachine.getBasicQueryInfo(Optional.empty()));

        // Asynchronously create the QueryExecution instance, which is actually a SqlQueryExecution
        ListenableFuture<QueryExecution> queryExecutionFuture = executor.submit(() -> {
            QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(preparedQuery.getStatement().getClass());
            if (queryExecutionFactory == null) {
                throw new TrinoException(NOT_SUPPORTED, "Unsupported statement type: " + preparedQuery.getStatement().getClass().getSimpleName());
            }

            try {
                // Create the execution
                return queryExecutionFactory.createQueryExecution(preparedQuery, stateMachine, slug, warningCollector);
            }
            catch (Throwable e) {
                if (e instanceof Error) {
                    if (e instanceof StackOverflowError) {
                        log.error(e, "Unhandled StackOverFlowError; should be handled earlier; to investigate full stacktrace you may need to enable -XX:MaxJavaStackTraceDepth=0 JVM flag");
                    }
                    else {
                        log.error(e, "Unhandled Error");
                    }
                    // wrapping as RuntimeException to guard us from problem that code downstream which investigates queryExecutionFuture may not necessarily handle
                    // Error subclass of Throwable well.
                    RuntimeException wrappedError = new RuntimeException(e);
                    stateMachine.transitionToFailed(wrappedError);
                    throw wrappedError;
                }
                stateMachine.transitionToFailed(e);
                throw e;
            }
        });

        // Return a LocalDispatchQuery instance. Note that it receives the queryExecutionFuture,
        // meaning the instance is only fully created once queryExecutionFuture.isDone().
        return new LocalDispatchQuery(
                stateMachine,
                queryExecutionFuture,
                queryMonitor,
                clusterSizeMonitor,
                executor,
                // queryManager is a SqlQueryManager instance holding a reference to the QueryTracker,
                // so it can manage the SQL task lifecycle at a higher level
                queryManager::createQuery);
    }
}

Creating the SqlQueryExecution Instance

The object is created through SqlQueryExecutionFactory.createQueryExecution().

@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    private SqlQueryExecution(
            PreparedQuery preparedQuery,
            QueryStateMachine stateMachine,
            Slug slug,
            PlannerContext plannerContext,
            AnalyzerFactory analyzerFactory,
            SplitSourceFactory splitSourceFactory,
            NodePartitioningManager nodePartitioningManager,
            NodeScheduler nodeScheduler,
            List<PlanOptimizer> planOptimizers,
            PlanFragmenter planFragmenter,
            RemoteTaskFactory remoteTaskFactory,
            int scheduleSplitBatchSize,
            ExecutorService queryExecutor,
            ScheduledExecutorService schedulerExecutor,
            FailureDetector failureDetector,
            NodeTaskMap nodeTaskMap,
            ExecutionPolicy executionPolicy,
            SplitSchedulerStats schedulerStats,
            StatsCalculator statsCalculator,
            CostCalculator costCalculator,
            DynamicFilterService dynamicFilterService,
            WarningCollector warningCollector,
            TableExecuteContextManager tableExecuteContextManager,
            TypeAnalyzer typeAnalyzer,
            TaskManager coordinatorTaskManager)
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            this.slug = requireNonNull(slug, "slug is null");
            this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
            this.splitSourceFactory = requireNonNull(splitSourceFactory, "splitSourceFactory is null");
            this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
            this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
            this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null");
            this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null");
            this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
            this.schedulerExecutor = requireNonNull(schedulerExecutor, "schedulerExecutor is null");
            this.failureDetector = requireNonNull(failureDetector, "failureDetector is null");
            this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
            this.executionPolicy = requireNonNull(executionPolicy, "executionPolicy is null");
            this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
            this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null");
            this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
            this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
            this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");

            checkArgument(scheduleSplitBatchSize > 0, "scheduleSplitBatchSize must be greater than 0");
            this.scheduleSplitBatchSize = scheduleSplitBatchSize;

            // Keep a reference to the state machine
            this.stateMachine = requireNonNull(stateMachine, "stateMachine is null");

            // analyze query
            // preparedQuery holds the Statement (AST) produced by parsing the SQL text,
            // so the AST is analyzed based on that object
            this.analysis = analyze(preparedQuery, stateMachine, warningCollector, analyzerFactory);

            // Register a listener with the state machine: once it reaches a terminal state,
            // unregister the dynamicFilterService (this service is covered in another article)
            stateMachine.addStateChangeListener(state -> {
                if (!state.isDone()) {
                    return;
                }
                unregisterDynamicFilteringQuery(
                        dynamicFilterService.getDynamicFilteringStats(stateMachine.getQueryId(), stateMachine.getSession()));

                tableExecuteContextManager.unregisterTableExecuteContextForQuery(stateMachine.getQueryId());
            });

            // when the query finishes cache the final query info, and clear the reference to the output stage
            AtomicReference<SqlQueryScheduler> queryScheduler = this.queryScheduler;
            stateMachine.addStateChangeListener(state -> {
                if (!state.isDone()) {
                    return;
                }
                // query is now done, so abort any work that is still running
                // (failure is one kind of terminal state)
                SqlQueryScheduler scheduler = queryScheduler.get();
                if (scheduler != null) {
                    scheduler.abort();
                }
            });

            this.remoteTaskFactory = new MemoryTrackingRemoteTaskFactory(requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"), stateMachine);
            this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
            this.coordinatorTaskManager = requireNonNull(coordinatorTaskManager, "coordinatorTaskManager is null");
        }
    }
}

Creation and Execution of LocalDispatchQuery

This is effectively the execution of the QueryExecution instance. Before reaching this point, the query still has to go through ResourceGroup selection. The selection details are not the focus here, so they are skipped; it is enough to know that the ResourceGroup eventually calls the

LocalDispatchQuery::startWaitingForResources

method.
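
A condensed sketch of that hand-off (simplified; in Trino the flow is spread across InternalResourceGroup's internal scheduling methods, and the method name below is illustrative): once the group has free capacity, a queued query is dequeued and started, which is exactly the call that lands in startWaitingForResources().

// Simplified resource-group sketch: queries wait in a queue until the group can run more of them
private void scheduleNextQueries()
{
    while (canRunMore()) {
        ManagedQueryExecution query = queuedQueries.poll();
        if (query == null) {
            break;
        }
        // for a LocalDispatchQuery this is LocalDispatchQuery::startWaitingForResources
        query.startWaitingForResources();
    }
}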

Resource Check

public class LocalDispatchQuery
        implements DispatchQuery
{
    @Override
    public void startWaitingForResources()
    {
        // Transition the state machine to WAITING_RESOURCES
        if (stateMachine.transitionToWaitingForResources()) {
            waitForMinimumWorkers();
        }
    }

    private void waitForMinimumWorkers()
    {
        // The queryExecution instance only starts once there are enough Worker nodes. Since we have
        // not changed the default configuration, the requirement here is that as soon as one Worker
        // is available, startExecution(queryExecution) is triggered.
        // wait for query execution to finish construction
        addSuccessCallback(queryExecutionFuture, queryExecution -> {
            Session session = stateMachine.getSession();
            int executionMinCount = 1; // always wait for 1 node to be up
            if (queryExecution.shouldWaitForMinWorkers()) {
                executionMinCount = getRequiredWorkers(session);
            }
            ListenableFuture<Void> minimumWorkerFuture = clusterSizeMonitor.waitForMinimumWorkers(executionMinCount, getRequiredWorkersMaxWait(session));
            // when worker requirement is met, start the execution
            addSuccessCallback(minimumWorkerFuture, () -> startExecution(queryExecution));
            addExceptionCallback(minimumWorkerFuture, throwable -> queryExecutor.execute(() -> stateMachine.transitionToFailed(throwable)));

            // cancel minimumWorkerFuture if query fails for some reason or is cancelled by user
            stateMachine.addStateChangeListener(state -> {
                if (state.isDone()) {
                    minimumWorkerFuture.cancel(true);
                }
            });
        });
    }

    private void startExecution(QueryExecution queryExecution)
    {
        queryExecutor.execute(() -> {
            // Transition the state machine to DISPATCHING
            if (stateMachine.transitionToDispatching()) {
                try {
                    // Hand off to querySubmitter, i.e. the queryManager::createQuery method mentioned
                    // earlier, which eventually routes to SqlQueryExecution::start
                    querySubmitter.accept(queryExecution);
                    if (notificationSentOrGuaranteed.compareAndSet(false, true)) {
                        queryExecution.addFinalQueryInfoListener(queryMonitor::queryCompletedEvent);
                    }
                }
                catch (Throwable t) {
                    // this should never happen but be safe
                    stateMachine.transitionToFailed(t);
                    log.error(t, "query submitter threw exception");
                    throw t;
                }
                finally {
                    submitted.set(null);
                }
            }
        });
    }
}

SqlQueryExecution::start()

@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    @Override
    public void start()
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            try {
                // Transition the state machine to PLANNING
                if (!stateMachine.transitionToPlanning()) {
                    // query already started or finished
                    return;
                }

                // Register a listener: if the state machine transitions to FAILED while planning,
                // forcibly interrupt the planning thread
                AtomicReference<Thread> planningThread = new AtomicReference<>(currentThread());
                stateMachine.getStateChange(PLANNING).addListener(() -> {
                    if (stateMachine.getQueryState() == FAILED) {
                        synchronized (this) {
                            Thread thread = planningThread.get();
                            if (thread != null) {
                                thread.interrupt();
                            }
                        }
                    }
                }, directExecutor());

                try {
                    // Optimize the logical plan tree and split it into PlanFragments, so that the
                    // plan fragments can be scheduled for execution
                    PlanRoot plan = planQuery();
                    // DynamicFilterService needs plan for query to be registered.
                    // Query should be registered before dynamic filter suppliers are requested in distribution planning.
                    registerDynamicFilteringQuery(plan);
                    // Schedule the plan for execution. Internally this creates the SqlQueryScheduler,
                    // which distributes the PlanFragments and manages their state; this process is
                    // asynchronous
                    planDistribution(plan);
                }
                finally {
                    synchronized (this) {
                        planningThread.set(null);
                        // Clear the interrupted flag in case there was a race condition where
                        // the planning thread was interrupted right after planning completes above
                        Thread.interrupted();
                    }
                }

                tableExecuteContextManager.registerTableExecuteContextForQuery(getQueryId());

                // Transition the state machine to STARTING
                if (!stateMachine.transitionToStarting()) {
                    // query already started or finished
                    return;
                }

                // if query is not finished, start the scheduler, otherwise cancel it
                SqlQueryScheduler scheduler = queryScheduler.get();
                if (!stateMachine.isDone()) {
                    // Call SqlQueryScheduler::start() to begin scheduling
                    scheduler.start();
                }
            }
            catch (Throwable e) {
                fail(e);
                throwIfInstanceOf(e, Error.class);
            }
        }
    }
}

SqlQueryExecution::planDistribution

@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    private void planDistribution(PlanRoot plan)
    {
        // if query was canceled, skip creating scheduler
        if (stateMachine.isDone()) {
            return;
        }

        // record output field
        PlanFragment rootFragment = plan.getRoot().getFragment();
        stateMachine.setColumns(
                ((OutputNode) rootFragment.getRoot()).getColumnNames(),
                rootFragment.getTypes());

        // build the stage execution objects (this doesn't schedule execution)
        SqlQueryScheduler scheduler = new SqlQueryScheduler(
                stateMachine,
                plan.getRoot(),
                nodePartitioningManager,
                nodeScheduler,
                remoteTaskFactory,
                plan.isSummarizeTaskInfos(),
                scheduleSplitBatchSize,
                queryExecutor,
                schedulerExecutor,
                failureDetector,
                nodeTaskMap,
                executionPolicy,
                schedulerStats,
                dynamicFilterService,
                tableExecuteContextManager,
                plannerContext.getMetadata(),
                splitSourceFactory,
                coordinatorTaskManager);

        queryScheduler.set(scheduler);

        // if query was canceled during scheduler creation, abort the scheduler
        // directly since the callback may have already fired
        if (stateMachine.isDone()) {
            scheduler.abort();
            queryScheduler.set(null);
        }
    }
}

Creating the StageManager Instance, and a SqlStage Instance for Every PlanFragment

SqlStage is responsible for tracking the lifecycle and maintaining the state of all tasks that belong to it.

public class SqlQueryScheduler
{
    private static class StageManager
    {
        private static StageManager create(
                QueryStateMachine queryStateMachine,
                Session session,
                Metadata metadata,
                RemoteTaskFactory taskFactory,
                NodeTaskMap nodeTaskMap,
                ExecutorService executor,
                SplitSchedulerStats schedulerStats,
                SubPlan planTree,
                boolean summarizeTaskInfo)
        {
            ImmutableMap.Builder<StageId, SqlStage> stages = ImmutableMap.builder();
            ImmutableList.Builder<SqlStage> coordinatorStagesInTopologicalOrder = ImmutableList.builder();
            ImmutableList.Builder<SqlStage> distributedStagesInTopologicalOrder = ImmutableList.builder();
            StageId rootStageId = null;
            ImmutableMap.Builder<StageId, Set<StageId>> children = ImmutableMap.builder();
            ImmutableMap.Builder<StageId, StageId> parents = ImmutableMap.builder();

            // Traverse all SubPlans top-down, breadth-first, starting from the root plan
            for (SubPlan planNode : Traverser.forTree(SubPlan::getChildren).breadthFirst(planTree)) {
                PlanFragment fragment = planNode.getFragment();
                // A SubPlan / PlanFragment is a Stage (similar to the concept in Spark);
                // the StageId has the form {queryId}-{fragmentId}
                SqlStage stage = createSqlStage(
                        getStageId(session.getQueryId(), fragment.getId()),
                        fragment,
                        extractTableInfo(session, metadata, fragment),
                        taskFactory,
                        session,
                        summarizeTaskInfo,
                        nodeTaskMap,
                        executor,
                        schedulerStats);
                StageId stageId = stage.getStageId();
                stages.put(stageId, stage);

                // Keep all stages in topological order
                if (fragment.getPartitioning().isCoordinatorOnly()) {
                    coordinatorStagesInTopologicalOrder.add(stage);
                }
                else {
                    distributedStagesInTopologicalOrder.add(stage);
                }

                // Since the traversal is top-down, the first Stage encountered is the most
                // upstream one, i.e. the root stage
                if (rootStageId == null) {
                    rootStageId = stageId;
                }

                // Maintain the dependency relationships between stages
                Set<StageId> childStageIds = planNode.getChildren().stream()
                        .map(childStage -> getStageId(session.getQueryId(), childStage.getFragment().getId()))
                        .collect(toImmutableSet());
                children.put(stageId, childStageIds);
                childStageIds.forEach(child -> parents.put(child, stageId));
            }

            StageManager stageManager = new StageManager(
                    queryStateMachine,
                    stages.build(),
                    coordinatorStagesInTopologicalOrder.build(),
                    distributedStagesInTopologicalOrder.build(),
                    rootStageId,
                    children.build(),
                    parents.build());
            stageManager.initialize();
            return stageManager;
        }
    }
}

SqlQueryScheduler::start()

public class SqlQueryScheduler
{
    public synchronized void start()
    {
        if (started) {
            return;
        }
        started = true;

        if (queryStateMachine.isDone()) {
            return;
        }

        // when query is done or any time a stage completes, attempt to transition query to "final query info ready"
        queryStateMachine.addStateChangeListener(state -> {
            if (!state.isDone()) {
                return;
            }

            DistributedStagesScheduler distributedStagesScheduler;
            // synchronize to wait on distributed scheduler creation if it is currently in process
            synchronized (this) {
                distributedStagesScheduler = this.distributedStagesScheduler.get();
            }

            if (state == QueryState.FINISHED) {
                // If the state machine reaches FINISHED, cancel all stages still being scheduled
                coordinatorStagesScheduler.cancel();
                if (distributedStagesScheduler != null) {
                    distributedStagesScheduler.cancel();
                }
                // Finish up through the StageManager
                stageManager.finish();
            }
            else if (state == QueryState.FAILED) {
                coordinatorStagesScheduler.abort();
                if (distributedStagesScheduler != null) {
                    distributedStagesScheduler.abort();
                }
                stageManager.abort();
            }

            queryStateMachine.updateQueryInfo(Optional.ofNullable(getStageInfo()));
        });

        // Schedule the stages
        coordinatorStagesScheduler.schedule();
        Optional<DistributedStagesScheduler> distributedStagesScheduler = createDistributedStagesScheduler(currentAttempt.get());
        distributedStagesScheduler.ifPresent(scheduler -> distributedStagesSchedulingTask = executor.submit(scheduler::schedule, null));
    }
}

DistributedStagesScheduler::schedule()

Creating the DistributedStagesScheduler

Responsible for scheduling the execution of all SqlStages.

public static PipelinedDistributedStagesScheduler create(
        QueryStateMachine queryStateMachine,                    // query-level state machine
        SplitSchedulerStats schedulerStats,                     // records split scheduling statistics
        NodeScheduler nodeScheduler,                            // assigns a suitable worker node to each split
        NodePartitioningManager nodePartitioningManager,        // provides the information needed to partition data pages
        StageManager stageManager,                              // manages all stages
        CoordinatorStagesScheduler coordinatorStagesScheduler,  // schedules coordinator-only stages
        ExecutionPolicy executionPolicy,                        // execution policy: AllAtOnceExecutionPolicy or PhasedExecutionPolicy
        FailureDetector failureDetector,
        ScheduledExecutorService executor,                      // thread pool used for stage scheduling
        SplitSourceFactory splitSourceFactory,                  // factory for creating source splits
        int splitBatchSize,                                     // maximum number of splits scheduled in one batch
        DynamicFilterService dynamicFilterService,
        TableExecuteContextManager tableExecuteContextManager,
        RetryPolicy retryPolicy,
        int attempt)
{
    // The DistributedStagesScheduler schedules the stages, and its state is distinct from the
    // QueryStateMachine, so a separate state machine is created here to track the state of the
    // PipelinedDistributedStagesScheduler
    DistributedStagesSchedulerStateMachine stateMachine = new DistributedStagesSchedulerStateMachine(queryStateMachine.getQueryId(), executor);

    // Cache NodePartitionMap instances in a map keyed by PartitioningHandle. PlanFragments sharing
    // the same PartitioningHandle map to the same instance, so the NodePartitionMap is not
    // regenerated many times
    Map<PartitioningHandle, NodePartitionMap> partitioningCacheMap = new HashMap<>();
    // Based on the PartitioningHandle supplied by the concrete connector, generate the NodePartitionMap:
    //     NodePartitionMap records the WorkerNode -> PartitionId mapping. It can be produced by the
    //     connector, e.g. IcebergPartitioningHandle, or fall back to the system default
    //     SystemPartitioningHandle.
    // How the NodePartitionMap instance is generated is covered in a later subsection.
    Function<PartitioningHandle, NodePartitionMap> partitioningCache = partitioningHandle ->
            partitioningCacheMap.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(queryStateMachine.getSession(), handle));

    // Create the Bucket -> PartitionId mapping for every PlanFragment.
    // A bucket (similar to the bucket concept in Hive) further subdivides the data of a partition,
    // so one partition contains multiple buckets
    Map<PlanFragmentId, Optional<int[]>> bucketToPartitionMap = createBucketToPartitionMap(
            coordinatorStagesScheduler.getBucketToPartitionForStagesConsumedByCoordinator(),
            stageManager,
            partitioningCache);

    // Create an OutputBufferManager for every PlanFragment, which creates and maintains the
    // fragment's output buffers. Depending on the type of the PartitioningHandle, different
    // OutputBuffers are created; there are three kinds:
    // BufferType type =
    //   if partitioningHandle.equals(FIXED_BROADCAST_DISTRIBUTION) then BROADCAST;
    //   else if (partitioningHandle.equals(FIXED_ARBITRARY_DISTRIBUTION)) then ARBITRARY;
    //   else PARTITIONED;
    Map<PlanFragmentId, OutputBufferManager> outputBufferManagers = createOutputBufferManagers(
            coordinatorStagesScheduler.getOutputBuffersForStagesConsumedByCoordinator(),
            stageManager,
            bucketToPartitionMap);

    TaskLifecycleListener coordinatorTaskLifecycleListener = coordinatorStagesScheduler.getTaskLifecycleListener();
    if (retryPolicy != RetryPolicy.NONE) {
        // when retries are enabled only close exchange clients on coordinator when the query is finished
        TaskLifecycleListenerBridge taskLifecycleListenerBridge = new TaskLifecycleListenerBridge(coordinatorTaskLifecycleListener);
        coordinatorTaskLifecycleListener = taskLifecycleListenerBridge;
        stateMachine.addStateChangeListener(state -> {
            if (state == DistributedStagesSchedulerState.FINISHED) {
                taskLifecycleListenerBridge.notifyNoMoreSourceTasks();
            }
        });
    }

    // Create a PipelinedStageExecution for every stage to be scheduled; each instance manages the
    // lifecycle of its own stage
    Map<StageId, PipelinedStageExecution> stageExecutions = new HashMap<>();
    for (SqlStage stage : stageManager.getDistributedStagesInTopologicalOrder()) {
        Optional<SqlStage> parentStage = stageManager.getParent(stage.getStageId());
        // TaskLifecycleListener provides the interface for creating tasks for a stage
        TaskLifecycleListener taskLifecycleListener;
        if (parentStage.isEmpty() || parentStage.get().getFragment().getPartitioning().isCoordinatorOnly()) {
            // output will be consumed by coordinator
            // When the parent stage is the root, or the PlanFragment's partitioning is
            // coordinator-only, bind this stage's task lifecycle to the coordinator listener
            taskLifecycleListener = coordinatorTaskLifecycleListener;
        }
        else {
            // For non-root stages, use the listener already bound to the parent's stage execution
            StageId parentStageId = parentStage.get().getStageId();
            PipelinedStageExecution parentStageExecution = requireNonNull(stageExecutions.get(parentStageId), () -> "execution is null for stage: " + parentStageId);
            taskLifecycleListener = parentStageExecution.getTaskLifecycleListener();
        }

        PlanFragment fragment = stage.getFragment();
        // Create the PipelinedStageExecution, which schedules & executes the current stage,
        // creating a RemoteTask for every partition and dispatching it to the corresponding worker node
        PipelinedStageExecution stageExecution = createPipelinedStageExecution(
                stageManager.get(fragment.getId()),
                outputBufferManagers,
                taskLifecycleListener,
                failureDetector,
                executor,
                bucketToPartitionMap.get(fragment.getId()),
                attempt);
        stageExecutions.put(stage.getStageId(), stageExecution);
    }

    ImmutableMap.Builder<StageId, StageScheduler> stageSchedulers = ImmutableMap.builder();
    for (PipelinedStageExecution stageExecution : stageExecutions.values()) {
        List<PipelinedStageExecution> children = stageManager.getChildren(stageExecution.getStageId()).stream()
                .map(stage -> requireNonNull(stageExecutions.get(stage.getStageId()), () -> "stage execution not found for stage: " + stage))
                .collect(toImmutableList());
        // For every StageExecution, create the StageScheduler responsible for scheduling that stage.
        // Trino provides several implementations:
        //   FixedSourcePartitionedScheduler
        //   FixedCountScheduler
        //   () -> SourcePartitionedScheduler
        StageScheduler scheduler = createStageScheduler(
                queryStateMachine,
                stageExecution,
                splitSourceFactory,
                children,
                partitioningCache,
                nodeScheduler,
                nodePartitioningManager,
                splitBatchSize,
                dynamicFilterService,
                executor,
                tableExecuteContextManager);
        stageSchedulers.put(stageExecution.getStageId(), scheduler);
    }

    // Create the PipelinedDistributedStagesScheduler instance, which schedules all stages
    PipelinedDistributedStagesScheduler distributedStagesScheduler = new PipelinedDistributedStagesScheduler(
            stateMachine,
            queryStateMachine,
            schedulerStats,
            stageManager,
            executionPolicy.createExecutionSchedule(stageExecutions.values()),
            stageSchedulers.build(),
            ImmutableMap.copyOf(stageExecutions),
            dynamicFilterService);
    distributedStagesScheduler.initialize();
    return distributedStagesScheduler;
}

/**
 * Computes the bucketToPartition mapping for every PlanFragment.
 */
private static Map<PlanFragmentId, Optional<int[]>> createBucketToPartitionMap(
        Map<PlanFragmentId, Optional<int[]>> bucketToPartitionForStagesConsumedByCoordinator,
        StageManager stageManager,
        Function<PartitioningHandle, NodePartitionMap> partitioningCache)
{
    ImmutableMap.Builder<PlanFragmentId, Optional<int[]>> result = ImmutableMap.builder();
    // Only populated when scheduling on the coordinator; can be ignored here
    result.putAll(bucketToPartitionForStagesConsumedByCoordinator);
    for (SqlStage stage : stageManager.getDistributedStagesInTopologicalOrder()) {
        PlanFragment fragment = stage.getFragment();
        Optional<int[]> bucketToPartition = getBucketToPartition(fragment.getPartitioning(), partitioningCache, fragment.getRoot(), fragment.getRemoteSourceNodes());
        for (SqlStage childStage : stageManager.getChildren(stage.getStageId())) {
            result.put(childStage.getFragment().getId(), bucketToPartition);
        }
    }
    return result.build();
}

private static Optional<int[]> getBucketToPartition(
        PartitioningHandle partitioningHandle,
        Function<PartitioningHandle, NodePartitionMap> partitioningCache,
        PlanNode fragmentRoot,
        List<RemoteSourceNode> remoteSourceNodes)
{
    if (partitioningHandle.equals(SOURCE_DISTRIBUTION) || partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
        // SOURCE_DISTRIBUTION indicates a table scan operator, while SCALED_WRITER_DISTRIBUTION
        // indicates a table write operator, so a PlanFragment of this kind has exactly one bucket
        return Optional.of(new int[1]);
    }
    else if (searchFrom(fragmentRoot).where(node -> node instanceof TableScanNode).findFirst().isPresent()) {
        if (remoteSourceNodes.stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
            return Optional.empty();
        }
        else {
            // remote source requires nodePartitionMap
            // A remote source operator reads partitioned data from the upstream PlanFragment, so the
            // bucket -> partition mapping is derived from the bound partitioningHandle;
            // partitioningCache was initialized earlier
            NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
            return Optional.of(nodePartitionMap.getBucketToPartition());
        }
    }
    else {
        // Other types, e.g. ARBITRARY_DISTRIBUTION or FIXED_HASH_DISTRIBUTION, are computed
        // similarly to the remote source case
        NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
        List<InternalNode> partitionToNode = nodePartitionMap.getPartitionToNode();
        // todo this should asynchronously wait a standard timeout period before failing
        checkCondition(!partitionToNode.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");
        return Optional.of(nodePartitionMap.getBucketToPartition());
    }
}

Creating the NodePartitionMap Instance, Which Maintains the Bucket -> Partition -> Node Mapping

This instance holds two important data structures:
partitionToNode: the mapping from data partition -> worker node
bucketToPartition: the mapping from data bucket -> data partition
With these two structures, the bucket ID of each data row can be computed from the partitioning key; that determines which partition the row belongs to, and in turn which worker node it should be distributed to.
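
Concretely, routing a single row with these two structures looks roughly like this (row and hashOfPartitioningColumns are hypothetical placeholders for the partitioning-key hash):

// Sketch: bucket -> partition -> node lookup for one row
int bucket = Math.floorMod(hashOfPartitioningColumns(row), bucketToPartition.length);
int partition = bucketToPartition[bucket];          // Data Bucket -> Data Partition
InternalNode node = partitionToNode.get(partition); // Data Partition -> Worker Node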

public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHandle partitioningHandle)
{
    requireNonNull(session, "session is null");
    requireNonNull(partitioningHandle, "partitioningHandle is null");

    if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle) {
        // Return the system default mapping
        return ((SystemPartitioningHandle) partitioningHandle.getConnectorHandle()).getNodePartitionMap(session, nodeScheduler);
    }

    // Obtain the connector's own bucket -> node mapping. A connector can implement the interface to
    // define the number of buckets and build the bucket-to-worker-node mapping.
    // Since we are discussing the Iceberg connector, createArbitraryBucketToNode(...) will be used
    // to produce the instance
    ConnectorBucketNodeMap connectorBucketNodeMap = getConnectorBucketNodeMap(session, partitioningHandle);

    // safety check for crazy partitioning
    checkArgument(connectorBucketNodeMap.getBucketCount() < 1_000_000, "Too many buckets in partitioning: %s", connectorBucketNodeMap.getBucketCount());

    List<InternalNode> bucketToNode;
    if (connectorBucketNodeMap.hasFixedMapping()) {
        bucketToNode = getFixedMapping(connectorBucketNodeMap);
    }
    else {
        CatalogName catalogName = partitioningHandle.getConnectorId()
                .orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
        // Create a bucket to node mapping. Consecutive buckets are assigned
        // to shuffled nodes (e.g "1 -> node2, 2 -> node1, 3 -> node2, 4 -> node1, ...").
        // Here the invariant "number of buckets >= number of available workers" necessarily holds.
        // The Iceberg connector only defines the number of buckets, not a bucket-to-node mapping,
        // and its bucket count equals the number of active workers
        bucketToNode = createArbitraryBucketToNode(
                nodeScheduler.createNodeSelector(session, Optional.of(catalogName)).allNodes(),
                connectorBucketNodeMap.getBucketCount());
    }

    // The bucket -> worker mapping was created above; now build the bucket -> partition relationship.
    // Create an array sized to the number of buckets, where bucketToPartition[i] holds the
    // corresponding partition id
    int[] bucketToPartition = new int[connectorBucketNodeMap.getBucketCount()];
    // A BiMap guarantees that both keys and values are unique, meaning each worker node corresponds
    // to exactly one partition
    BiMap<InternalNode, Integer> nodeToPartition = HashBiMap.create();
    int nextPartitionId = 0; // initial value
    for (int bucket = 0; bucket < bucketToNode.size(); bucket++) {
        InternalNode node = bucketToNode.get(bucket);
        // bucketToNode may contain duplicate values, i.e. multiple buckets mapping to the same worker node
        Integer partitionId = nodeToPartition.get(node);
        if (partitionId == null) {
            // If no partitionId exists yet, i.e. a new worker was found, increment the partition id.
            // It is easy to see that inside Trino, one worker node is one partition
            partitionId = nextPartitionId++;
            nodeToPartition.put(node, partitionId);
        }
        // Record the bucketId -> partitionId mapping
        bucketToPartition[bucket] = partitionId;
    }

    // Collect all worker nodes, ordered by partition id
    List<InternalNode> partitionToNode = IntStream.range(0, nodeToPartition.size())
            .mapToObj(partitionId -> nodeToPartition.inverse().get(partitionId))
            .collect(toImmutableList());

    // Return the instance
    return new NodePartitionMap(partitionToNode, bucketToPartition, getSplitToBucket(session, partitioningHandle));
}
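
As a small worked example of the loop above: with two active workers shuffled to [node2, node1] and four buckets, createArbitraryBucketToNode yields bucketToNode = [node2, node1, node2, node1] (round-robin); walking the array assigns partition 0 to node2 and partition 1 to node1, so bucketToPartition = [0, 1, 0, 1] and partitionToNode = [node2, node1].
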
Creating the StageScheduler Instance, Responsible for Scheduling a Single Stage

private static class PipelinedDistributedStagesScheduler
        implements DistributedStagesScheduler
{
    private static StageScheduler createStageScheduler(
            QueryStateMachine queryStateMachine,
            PipelinedStageExecution stageExecution,
            SplitSourceFactory splitSourceFactory,
            List<PipelinedStageExecution> childStageExecutions,
            Function<PartitioningHandle, NodePartitionMap> partitioningCache,
            NodeScheduler nodeScheduler,
            NodePartitioningManager nodePartitioningManager,
            int splitBatchSize,
            DynamicFilterService dynamicFilterService,
            ScheduledExecutorService executor,
            TableExecuteContextManager tableExecuteContextManager)
    {
        Session session = queryStateMachine.getSession();
        PlanFragment fragment = stageExecution.getFragment();
        PartitioningHandle partitioningHandle = fragment.getPartitioning();

        // For the current fragment, try to create one SplitSource per TableScanNode; it slices the
        // source data into a series of data fragments.
        // For the Iceberg connector this means slicing DataFiles and returning a batch of IcebergSplits.
        // Calls on the SplitSource interface are ultimately delegated to IcebergSplitSource.
        // The SplitManager object is also involved during creation, but is not analyzed here.
        Map<PlanNodeId, SplitSource> splitSources = splitSourceFactory.createSplitSources(session, fragment);
        if (!splitSources.isEmpty()) {
            queryStateMachine.addStateChangeListener(new StateChangeListener<>()
            {
                private final AtomicReference<Collection<SplitSource>> splitSourcesReference = new AtomicReference<>(splitSources.values());

                @Override
                public void stateChanged(QueryState newState)
                {
                    if (newState.isDone()) {
                        // ensure split sources are closed and release memory
                        Collection<SplitSource> sources = splitSourcesReference.getAndSet(null);
                        if (sources != null) {
                            closeSplitSources(sources);
                        }
                    }
                }
            });
        }

        if (partitioningHandle.equals(SOURCE_DISTRIBUTION)) {
            // If the fragment's partitioning type is SOURCE_DISTRIBUTION, it is the most upstream
            // SubPlan, responsible for loading data from the source.
            // nodes are selected dynamically based on the constraints of the splits and the system load
            Entry<PlanNodeId, SplitSource> entry = getOnlyElement(splitSources.entrySet());
            PlanNodeId planNodeId = entry.getKey();
            SplitSource splitSource = entry.getValue();
            Optional<CatalogName> catalogName = Optional.of(splitSource.getCatalogName())
                    .filter(catalog -> !isInternalSystemConnector(catalog));
            NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, catalogName);
            // The placement policy assigns a suitable worker node to each split, based on the nodeSelector
            SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stageExecution::getAllTasks);

            checkArgument(!fragment.getStageExecutionDescriptor().isStageGroupedExecution());
            // Return an object wrapping a SourcePartitionedScheduler instance
            return newSourcePartitionedSchedulerAsStageScheduler(
                    stageExecution,
                    planNodeId,
                    splitSource,
                    placementPolicy,
                    splitBatchSize,
                    dynamicFilterService,
                    tableExecuteContextManager,
                    () -> childStageExecutions.stream().anyMatch(PipelinedStageExecution::isAnyTaskBlocked));
        }
        else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
            // ...
            return scheduler;
        }
        else {
            // If this is not a PlanFragment containing a TableScan, e.g. a JOIN fragment, there are
            // three possible shapes:
            //    left is Source, right is RemoteSource
            //    left is RemoteSource, right is RemoteSource
            //    left is Source, right is Source
            if (!splitSources.isEmpty()) {
                // contains local source
                List<PlanNodeId> schedulingOrder = fragment.getPartitionedSources();
                Optional<CatalogName> catalogName = partitioningHandle.getConnectorId();
                checkArgument(catalogName.isPresent(), "No connector ID for partitioning handle: %s", partitioningHandle);

                List<ConnectorPartitionHandle> connectorPartitionHandles;
                boolean groupedExecutionForStage = fragment.getStageExecutionDescriptor().isStageGroupedExecution();
                // If a stage is marked as grouped, it must be partitioned, so a group can be treated
                // as equivalent to a bucket: all splits in the group belong to the same partition and
                // hence to the same worker node
                if (groupedExecutionForStage) {
                    connectorPartitionHandles = nodePartitioningManager.listPartitionHandles(session, partitioningHandle);
                    checkState(!ImmutableList.of(NOT_PARTITIONED).equals(connectorPartitionHandles));
                }
                else {
                    // Not grouped
                    connectorPartitionHandles = ImmutableList.of(NOT_PARTITIONED);
                }

                BucketNodeMap bucketNodeMap;
                List<InternalNode> stageNodeList;
                if (fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
                    // no remote source
                    boolean dynamicLifespanSchedule = fragment.getStageExecutionDescriptor().isDynamicLifespanSchedule();
                    bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, dynamicLifespanSchedule);

                    // verify execution is consistent with planner's decision on dynamic lifespan schedule
                    verify(bucketNodeMap.isDynamic() == dynamicLifespanSchedule);

                    // If the fragment only contains local TableScanNodes, every available worker node
                    // is a candidate node on which this stage may be scheduled
                    stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session, catalogName).allNodes());
                    Collections.shuffle(stageNodeList);
                }
                else {
                    // cannot use dynamic lifespan schedule
                    verify(!fragment.getStageExecutionDescriptor().isDynamicLifespanSchedule());

                    // remote source requires nodePartitionMap
                    NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
                    if (groupedExecutionForStage) {
                        // For a grouped stage, M distinct ConnectorPartitionHandle instances are
                        // needed to compute bucket ids, and M must equal the number of buckets so
                        // that each bucket id corresponds to a distinct partition
                        checkState(connectorPartitionHandles.size() == nodePartitionMap.getBucketToPartition().length);
                    }
                    stageNodeList = nodePartitionMap.getPartitionToNode();
                    bucketNodeMap = nodePartitionMap.asBucketNodeMap();
                }

                // In this case the number of buckets is fixed, so the number of source partitions is
                // fixed too; create a FixedSourcePartitionedScheduler
                return new FixedSourcePartitionedScheduler(
                        stageExecution,
                        splitSources,
                        fragment.getStageExecutionDescriptor(),
                        schedulingOrder,
                        stageNodeList,
                        bucketNodeMap,
                        splitBatchSize,
                        getConcurrentLifespansPerNode(session),
                        nodeScheduler.createNodeSelector(session, catalogName),
                        connectorPartitionHandles,
                        dynamicFilterService,
                        tableExecuteContextManager);
            }
            else {
                // all sources are remote
                // If all sources are RemoteSource plan nodes, the data to read comes from upstream
                // OutputBuffers, so the number of partitions is decided upstream, and the number of
                // partitioned tasks created for this stage is also fixed.
                // For example, when #buckets == #partitions == #nodes, the tasks of this stage's
                // different partitions are sent to different worker nodes; but if there are more
                // buckets than nodes, several partitions of one stage may run on the same node
                NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
                List<InternalNode> partitionToNode = nodePartitionMap.getPartitionToNode();
                // todo this should asynchronously wait a standard timeout period before failing
                checkCondition(!partitionToNode.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");
                return new FixedCountScheduler(stageExecution, partitionToNode);
            }
        }
    }
}

Scheduling by the DistributedStagesScheduler

private static class PipelinedDistributedStagesScheduler
        implements DistributedStagesScheduler
{
    @Override
    public void schedule()
    {
        // Scheduling begins
        checkState(started.compareAndSet(false, true), "already started");

        try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
            while (!executionSchedule.isFinished()) {
                List<ListenableFuture<Void>> blockedStages = new ArrayList<>();
                // Fetch the stages to schedule. With the default configuration, all stages are
                // scheduled at once, regardless of the dependencies between them
                for (PipelinedStageExecution stageExecution : executionSchedule.getStagesToSchedule()) {
                    // The StageExecution instance drives the scheduling of its bound stage
                    stageExecution.beginScheduling();

                    // perform some scheduling work (asynchronous)
                    ScheduleResult result = stageSchedulers.get(stageExecution.getStageId())
                            .schedule();

                    // modify parent and children based on the results of the scheduling
                    if (result.isFinished()) {
                        // The stage finished scheduling, so mark it complete
                        stageExecution.schedulingComplete();
                    }
                    else if (!result.getBlocked().isDone()) {
                        // The stage is BLOCKED, possibly because an upstream stage has produced no output yet
                        blockedStages.add(result.getBlocked());
                    }
                    schedulerStats.getSplitsScheduledPerIteration().add(result.getSplitsScheduled());
                    if (result.getBlockedReason().isPresent()) {
                        switch (result.getBlockedReason().get()) {
                            case WRITER_SCALING:
                                // no-op
                                break;
                            case WAITING_FOR_SOURCE:
                                schedulerStats.getWaitingForSource().update(1);
                                break;
                            case SPLIT_QUEUES_FULL:
                                schedulerStats.getSplitQueuesFull().update(1);
                                break;
                            case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
                            case NO_ACTIVE_DRIVER_GROUP:
                                break;
                            default:
                                throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
                        }
                    }
                }

                // wait for a state change and then schedule again; if stages are still BLOCKED,
                // bound the wait with a timeout
                if (!blockedStages.isEmpty()) {
                    try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
                        tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
                    }
                    for (ListenableFuture<Void> blockedStage : blockedStages) {
                        blockedStage.cancel(true);
                    }
                }
            }

            for (PipelinedStageExecution stageExecution : stageExecutions.values()) {
                PipelinedStageExecution.State state = stageExecution.getState();
                if (state != SCHEDULED && state != RUNNING && state != FLUSHING && !state.isDone()) {
                    throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Scheduling is complete, but stage %s is in state %s", stageExecution.getStageId(), state));
                }
            }
        }
        catch (Throwable t) {
            fail(t, Optional.empty());
        }
        finally {
            RuntimeException closeError = new RuntimeException();
            for (StageScheduler scheduler : stageSchedulers.values()) {
                try {
                    scheduler.close();
                }
                catch (Throwable t) {
                    fail(t, Optional.empty());
                    // Self-suppression not permitted
                    if (closeError != t) {
                        closeError.addSuppressed(t);
                    }
                }
            }
        }
    }
}

FixedSourcePartitionedScheduler::schedule()

GROUP_WIDE: the Split's lifecycle is scoped to a task group. It only arises for intermediate Stages/PlanFragments, whose Splits read upstream data that has already been partitioned (so a group can loosely be regarded as one data partition). When multiple SourceSchedulers schedule Splits, the policy on a single source is to schedule groups in GroupId order, and only one group's Splits can be scheduled at any moment; other SourceSchedulers may schedule different groups in parallel.

TASK_WIDE: the Split's lifecycle is scoped to a task; for now it can simply be thought of as the Split lifecycle during Table Scan Stage execution. SourceSchedulers do not affect one another; each decides whether to schedule new Splits based only on the remaining resources of the current SqlTask.
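
In code, these two lifecycles correspond to the two kinds of Lifespan values passed between the schedulers (Lifespan is the actual Trino class; the concrete values below are only illustrative):

Lifespan taskWide = Lifespan.taskWide();        // TASK_WIDE: spans every split the task processes
Lifespan driverGroup = Lifespan.driverGroup(7); // GROUP_WIDE: scoped to driver group (bucket) 7
// FixedSourcePartitionedScheduler starts a group on each SourceScheduler with:
//     sourceScheduler.startLifespan(driverGroup, partitionHandleFor(driverGroup));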

public class FixedSourcePartitionedScheduler
        implements StageScheduler
{
    @Override
    public ScheduleResult schedule()
    {
        // schedule a task on every node in the distribution
        List<RemoteTask> newTasks = ImmutableList.of();
        if (scheduledTasks.isEmpty()) {
            // If this stage has not scheduled any tasks yet, create a RemoteTask for every partition
            ImmutableList.Builder<RemoteTask> newTasksBuilder = ImmutableList.builder();
            for (InternalNode node : nodes) {
                // Iterate over all worker nodes available to this stage;
                // one node corresponds to exactly one partition
                Optional<RemoteTask> task = stageExecution.scheduleTask(node, partitionIdAllocator.getNextId(), ImmutableMultimap.of(), ImmutableMultimap.of());
                if (task.isPresent()) {
                    scheduledTasks.put(node, task.get());
                    newTasksBuilder.add(task.get());
                }
            }
            newTasks = newTasksBuilder.build();
        }

        boolean allBlocked = true;
        List<ListenableFuture<Void>> blocked = new ArrayList<>();
        BlockedReason blockedReason = BlockedReason.NO_ACTIVE_DRIVER_GROUP;

        if (groupedLifespanScheduler.isPresent()) {
            // Start new driver groups on the first scheduler if necessary,
            // i.e. when previous ones have finished execution (not finished scheduling).
            //
            // Invoke schedule method to get a new SettableFuture every time.
            // Reusing previously returned SettableFuture could lead to the ListenableFuture retaining too many listeners.
            blocked.add(groupedLifespanScheduler.get().schedule(sourceSchedulers.get(0)));
        }

        int splitsScheduled = 0;
        // sourceSchedulers holds one scheduler instance per source, i.e. SourcePartitionedScheduler
        // instances, each responsible for scheduling its own splits
        Iterator<SourceScheduler> schedulerIterator = sourceSchedulers.iterator();
        List<Lifespan> driverGroupsToStart = ImmutableList.of();
        boolean shouldInvokeNoMoreDriverGroups = false;
        while (schedulerIterator.hasNext()) {
            SourceScheduler sourceScheduler = schedulerIterator.next();

            // Under grouped scheduling, splits are scheduled group by group: only after one
            // SourceScheduler has finished scheduling the splits of a group can the next
            // SourceScheduler schedule the splits of the corresponding group, while the other groups
            // stay BLOCKED until the first SourceScheduler finishes scheduling its other groups
            for (Lifespan lifespan : driverGroupsToStart) {
                sourceScheduler.startLifespan(lifespan, partitionHandleFor(lifespan));
            }
            if (shouldInvokeNoMoreDriverGroups) {
                sourceScheduler.noMoreLifespans();
            }

            // Invoke SourcePartitionedScheduler::schedule()
            ScheduleResult schedule = sourceScheduler.schedule();
            // Accumulate the total number of splits scheduled by this stage
            splitsScheduled += schedule.getSplitsScheduled();
            if (schedule.getBlockedReason().isPresent()) {
                blocked.add(schedule.getBlocked());
                blockedReason = blockedReason.combineWith(schedule.getBlockedReason().get());
            }
            else {
                verify(schedule.getBlocked().isDone(), "blockedReason not provided when scheduler is blocked");
                allBlocked = false;
            }

            // If the SourceScheduler is an AsGroupedSourceScheduler, drainCompletedLifespans()
            // always returns the corresponding Lifespan objects
            driverGroupsToStart = sourceScheduler.drainCompletedLifespans();

            if (schedule.isFinished()) {
                stageExecution.schedulingComplete(sourceScheduler.getPlanNodeId());
                schedulerIterator.remove();
                sourceScheduler.close();
                shouldInvokeNoMoreDriverGroups = true;
            }
            else {
                shouldInvokeNoMoreDriverGroups = false;
            }
        }

        if (allBlocked) {
            // If all SourcePartitionedSchedulers are BLOCKED, return the blocked information
            return new ScheduleResult(sourceSchedulers.isEmpty(), newTasks, whenAnyComplete(blocked), blockedReason, splitsScheduled);
        }
        else {
            // Some SourcePartitionedSchedulers are still running; return the scheduled splits information
            return new ScheduleResult(sourceSchedulers.isEmpty(), newTasks, splitsScheduled);
        }
    }
}

SourcePartitionedScheduler::schedule()

The lowest-level split scheduler, responsible for scheduling and executing SOURCE-type stages.

public class SourcePartitionedScheduler
        implements SourceScheduler
{
    @Override
    public synchronized ScheduleResult schedule()
    {
        dropListenersFromWhenFinishedOrNewLifespansAdded();

        int overallSplitAssignmentCount = 0;
        ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();
        List<ListenableFuture<?>> overallBlockedFutures = new ArrayList<>();
        boolean anyBlockedOnPlacements = false;
        boolean anyBlockedOnNextSplitBatch = false;
        boolean anyNotBlocked = false;

        // Iterate over every ScheduleGroup instance; each ScheduleGroup corresponds to one Lifespan (driver group)
        for (Entry<Lifespan, ScheduleGroup> entry : scheduleGroups.entrySet()) {
            Lifespan lifespan = entry.getKey();
            ScheduleGroup scheduleGroup = entry.getValue();
            Set<Split> pendingSplits = scheduleGroup.pendingSplits;

            if (scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS || scheduleGroup.state == ScheduleGroupState.DONE) {
                verify(scheduleGroup.nextSplitBatchFuture == null);
            }
            else if (pendingSplits.isEmpty()) {
                // try to get the next batch
                if (scheduleGroup.nextSplitBatchFuture == null) {
                    // The next batch of splits to process is in fact fetched through the IcebergConnectorSplitSource;
                    // note that splitBatchSize - pendingSplits.size() caps how many splits may be scheduled at once
                    scheduleGroup.nextSplitBatchFuture = splitSource.getNextBatch(scheduleGroup.partitionHandle, lifespan, splitBatchSize - pendingSplits.size());

                    long start = System.nanoTime();
                    addSuccessCallback(scheduleGroup.nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start));
                }

                if (scheduleGroup.nextSplitBatchFuture.isDone()) {
                    // The future completing means a batch of Splits is available, so they can be scheduled immediately
                    SplitBatch nextSplits = getFutureValue(scheduleGroup.nextSplitBatchFuture);
                    scheduleGroup.nextSplitBatchFuture = null;
                    // Add all fetched Splits to the pending queue
                    pendingSplits.addAll(nextSplits.getSplits());
                    if (nextSplits.isLastBatch()) {
                        // This is the last batch of Splits to schedule for this group
                        if (scheduleGroup.state == ScheduleGroupState.INITIALIZED && pendingSplits.isEmpty()) {
                            // Add an empty split in case no splits have been produced for the source.
                            // For source operators, they never take input, but they may produce output.
                            // This is well handled by the execution engine.
                            // However, there are certain non-source operators that may produce output without any input,
                            // for example, 1) an AggregationOperator, 2) a HashAggregationOperator where one of the grouping sets is ().
                            // Scheduling an empty split kicks off necessary driver instantiation to make this work.
                            pendingSplits.add(new Split(
                                    splitSource.getCatalogName(),
                                    new EmptySplit(splitSource.getCatalogName()),
                                    lifespan));
                        }
                        // Tell this SourceScheduler that nothing more needs to be scheduled
                        scheduleGroup.state = ScheduleGroupState.NO_MORE_SPLITS;
                    }
                }
                else {
                    overallBlockedFutures.add(scheduleGroup.nextSplitBatchFuture);
                    anyBlockedOnNextSplitBatch = true;
                    continue;
                }
            }

            Multimap<InternalNode, Split> splitAssignment = ImmutableMultimap.of();
            if (!pendingSplits.isEmpty()) {
                if (!scheduleGroup.placementFuture.isDone()) {
                    anyBlockedOnPlacements = true;
                    continue;
                }

                if (scheduleGroup.state == ScheduleGroupState.INITIALIZED) {
                    scheduleGroup.state = ScheduleGroupState.SPLITS_ADDED;
                }
                if (state == State.INITIALIZED) {
                    state = State.SPLITS_ADDED;
                }

                // calculate placements for splits: decide, for every Split, which worker node it should be sent to
                SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
                splitAssignment = splitPlacementResult.getAssignments();

                // remove splits with successful placements
                splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
                overallSplitAssignmentCount += splitAssignment.size();

                // if not completely placed, mark scheduleGroup as blocked on placement
                if (!pendingSplits.isEmpty()) {
                    scheduleGroup.placementFuture = splitPlacementResult.getBlocked();
                    overallBlockedFutures.add(scheduleGroup.placementFuture);
                    anyBlockedOnPlacements = true;
                }
            }

            // if no new splits will be assigned, update state and attach completion event
            Multimap<InternalNode, Lifespan> noMoreSplitsNotification = ImmutableMultimap.of();
            if (pendingSplits.isEmpty() && scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS) {
                scheduleGroup.state = ScheduleGroupState.DONE;
                if (!lifespan.isTaskWide()) {
                    InternalNode node = ((BucketedSplitPlacementPolicy) splitPlacementPolicy).getNodeForBucket(lifespan.getId());
                    noMoreSplitsNotification = ImmutableMultimap.of(node, lifespan);
                }
            }

            // assign the splits with successful placements
            overallNewTasks.addAll(assignSplits(splitAssignment, noMoreSplitsNotification));

            // Assert that "placement future is not done" implies "pendingSplits is not empty".
            // The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line.
            // However, there are other reasons that could lead to this.
            // Note that `computeAssignments` is quite broken:
            // 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked.
            // 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion.
            // As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here.
            if (scheduleGroup.nextSplitBatchFuture == null && scheduleGroup.pendingSplits.isEmpty() && scheduleGroup.state != ScheduleGroupState.DONE) {
                anyNotBlocked = true;
            }
        }

        // * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked.
        //   If state is NO_MORE_SPLITS/FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now.
        // * Even if `splitSource.isFinished()` return true, it is not necessarily safe to tear down the split source.
        //   * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures,
        //     which may contain recently published splits. We must not ignore those.
        //   * If any scheduleGroup is still in DISCOVERING_SPLITS state, it means it hasn't realized that there will be no more splits.
        //     Next time it invokes getNextBatch, it will realize that. However, the invocation will fail if we tear down splitSource now.
        if ((state == State.NO_MORE_SPLITS || state == State.FINISHED) || (noMoreScheduleGroups && scheduleGroups.isEmpty() && splitSource.isFinished())) {
            switch (state) {
                case INITIALIZED:
                    // We have not scheduled a single split so far.
                    // But this shouldn't be possible. See usage of EmptySplit in this method.
                    throw new IllegalStateException("At least 1 split should have been scheduled for this plan node");
                case SPLITS_ADDED:
                    state = State.NO_MORE_SPLITS;
                    Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
                    // Here we assume that we can get non-empty tableExecuteSplitsInfo only for queries which facilitate single split source.
                    // TODO support grouped execution
                    tableExecuteSplitsInfo.ifPresent(info -> {
                        TableExecuteContext tableExecuteContext = tableExecuteContextManager.getTableExecuteContextForQuery(stageExecution.getStageId().getQueryId());
                        tableExecuteContext.setSplitsInfo(info);
                    });
                    splitSource.close();
                    // fall through
                case NO_MORE_SPLITS:
                    state = State.FINISHED;
                    whenFinishedOrNewLifespanAdded.set(null);
                    // fall through
                case FINISHED:
                    splitSource.getMetrics().ifPresent(stageExecution::updateConnectorMetrics);
                    return new ScheduleResult(
                            true,
                            overallNewTasks.build(),
                            overallSplitAssignmentCount);
            }
            throw new IllegalStateException("Unknown state");
        }

        if (anyNotBlocked) {
            return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
        }

        if (anyBlockedOnNextSplitBatch
                && scheduledTasks.isEmpty()
                && dynamicFilterService.isCollectingTaskNeeded(stageExecution.getStageId().getQueryId(), stageExecution.getFragment())) {
            // schedule a task for collecting dynamic filters in case probe split generator is waiting for them
            createTaskOnRandomNode().ifPresent(overallNewTasks::add);
        }

        boolean anySourceTaskBlocked = this.anySourceTaskBlocked.getAsBoolean();
        if (anySourceTaskBlocked) {
            // Dynamic filters might not be collected due to build side source tasks being blocked on full buffer.
            // In such case probe split generation that is waiting for dynamic filters should be unblocked to prevent deadlock.
            dynamicFilterService.unblockStageDynamicFilters(stageExecution.getStageId().getQueryId(), stageExecution.getAttemptId(), stageExecution.getFragment());
        }

        if (groupedExecution) {
            overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
        }
        else if (anyBlockedOnPlacements && anySourceTaskBlocked) {
            // In a broadcast join, output buffers of the tasks in build source stage have to
            // hold onto all data produced before probe side task scheduling finishes,
            // even if the data is acknowledged by all known consumers. This is because
            // new consumers may be added until the probe side task scheduling finishes.
            //
            // As a result, the following line is necessary to prevent deadlock
            // due to neither build nor probe can make any progress.
            // The build side blocks due to a full output buffer.
            // In the meantime the probe side split cannot be consumed since
            // builder side hash table construction has not finished.
            overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
        }

        ScheduleResult.BlockedReason blockedReason;
        if (anyBlockedOnNextSplitBatch) {
            blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE;
        }
        else {
            blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP;
        }

        overallBlockedFutures.add(whenFinishedOrNewLifespanAdded);
        return new ScheduleResult(
                false,
                overallNewTasks.build(),
                nonCancellationPropagating(asVoid(whenAnyComplete(overallBlockedFutures))),
                blockedReason,
                overallSplitAssignmentCount);
    }
}
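
Viewed from the caller's side, schedule() is polled in a loop: the stage scheduler keeps invoking it, and whenever a blocked ScheduleResult comes back it waits on the blocked future before retrying. The control flow, reduced to a self-contained sketch (ScheduleResult and SourceScheduler here are minimal stand-ins, not the real Trino types):

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class SchedulerLoopExample
{
    // Minimal stand-ins for the real types, for illustration only
    record ScheduleResult(boolean finished, Optional<CompletableFuture<Void>> blocked) {}

    interface SourceScheduler
    {
        ScheduleResult schedule();
    }

    static void drive(SourceScheduler scheduler) throws Exception
    {
        while (true) {
            ScheduleResult result = scheduler.schedule();
            if (result.finished()) {
                return; // every split of this source has been assigned to a worker
            }
            // Block until the reported reason clears, e.g. a new split batch arrives
            // (WAITING_FOR_SOURCE) or a worker's split queue drains (SPLIT_QUEUES_FULL)
            if (result.blocked().isPresent()) {
                result.blocked().get().get();
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        // One-shot scheduler that finishes immediately
        drive(() -> new ScheduleResult(true, Optional.empty()));
    }
}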

Creating the SqlTask

A SqlTask runs on a Worker node; each SqlTask corresponds to one partition of a Stage and is responsible for processing all of that partition's Splits.
The client (here, the Coordinator) asks the corresponding Worker node to create the task instance through the

/v1/task/{taskId}

REST API.

@ResourceSecurity(INTERNAL_ONLY)
@POST
@Path("{taskId}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public void createOrUpdateTask(
        @PathParam("taskId") TaskId taskId,
        TaskUpdateRequest taskUpdateRequest,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");

    Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager, taskUpdateRequest.getExtraCredentials());

    if (injectFailure(session.getTraceToken(), taskId, RequestType.CREATE_OR_UPDATE_TASK, asyncResponse)) {
        return;
    }

    // Create (or update) the task
    TaskInfo taskInfo = taskManager.updateTask(session,
            taskId,
            taskUpdateRequest.getFragment(),
            taskUpdateRequest.getSources(),
            taskUpdateRequest.getOutputIds(),
            taskUpdateRequest.getDynamicFilterDomains());

    if (shouldSummarize(uriInfo)) {
        taskInfo = taskInfo.summarize();
    }

    asyncResponse.resume(Response.ok().entity(taskInfo).build());
}
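
For illustration, the request the Coordinator sends to this endpoint can be mimicked with the JDK's built-in HTTP client. The endpoint path is the real one shown above; the worker address, task id, and JSON body below are placeholder assumptions, since a real TaskUpdateRequest is serialized by Trino's internal JSON codecs:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateTaskExample
{
    public static void main(String[] args) throws Exception
    {
        // Hypothetical worker address and task id; in Trino this request is issued internally by the Coordinator
        String workerUri = "http://worker-1:8080/v1/task/20240101_000000_00000_abcde.1.0";
        // Placeholder body; a real TaskUpdateRequest carries session, fragment, sources, outputIds and dynamicFilterDomains
        String taskUpdateRequestJson = "{}";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(workerUri))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(taskUpdateRequestJson))
                .build();

        // The worker answers with the serialized TaskInfo of the created/updated task
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}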

SqlTaskManager::updateTask

public class SqlTaskManager
        implements TaskManager, Closeable
{
    private final LoadingCache<TaskId, SqlTask> tasks = CacheBuilder.newBuilder().build(CacheLoader.from(
            taskId -> createSqlTask(
                    taskId,
                    locationFactory.createLocalTaskLocation(taskId),
                    nodeInfo.getNodeId(),
                    queryContexts.getUnchecked(taskId.getQueryId()),
                    sqlTaskExecutionFactory,
                    taskNotificationExecutor,
                    sqlTask -> finishedTaskStats.merge(sqlTask.getIoStats()),
                    maxBufferSize,
                    maxBroadcastBufferSize,
                    failedTasks)));

    @Override
    public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers, Map<DynamicFilterId, Domain> dynamicFilterDomains)
    {
        try {
            return versionEmbedder.embedVersion(() -> doUpdateTask(session, taskId, fragment, sources, outputBuffers, dynamicFilterDomains)).call();
        }
        catch (Exception e) {
            throwIfUnchecked(e);
            // impossible, doUpdateTask does not throw checked exceptions
            throw new RuntimeException(e);
        }
    }

    private TaskInfo doUpdateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers, Map<DynamicFilterId, Domain> dynamicFilterDomains)
    {
        requireNonNull(session, "session is null");
        requireNonNull(taskId, "taskId is null");
        requireNonNull(fragment, "fragment is null");
        requireNonNull(sources, "sources is null");
        requireNonNull(outputBuffers, "outputBuffers is null");

        // Look up the SqlTask in the cache; if absent, the CacheLoader above creates a new instance
        SqlTask sqlTask = tasks.getUnchecked(taskId);
        QueryContext queryContext = sqlTask.getQueryContext();
        if (!queryContext.isMemoryLimitsInitialized()) {
            // If the query's per-node memory is limited, initialize the relevant properties from the session
            long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();
            long sessionQueryTotalMaxMemoryPerNode = getQueryMaxTotalMemoryPerNode(session).toBytes();

            // Session properties are only allowed to decrease memory limits, not increase them
            queryContext.initializeMemoryLimits(
                    resourceOvercommit(session),
                    min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode),
                    min(sessionQueryTotalMaxMemoryPerNode, queryMaxTotalMemoryPerNode));
        }

        // Record a heartbeat for this SqlTask, which is simply the current system time.
        // The heartbeat is refreshed on every lookup or update, so the last heartbeat time can be used
        // to decide whether the task has gone missing.
        sqlTask.recordHeartbeat();
        // Update the runtime parameters of the SqlTask instance
        return sqlTask.updateTask(session, fragment, sources, outputBuffers, dynamicFilterDomains);
    }
}
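
The tasks field above leans on Guava's LoadingCache for its get-or-create semantics: getUnchecked(taskId) returns the cached SqlTask, or runs the CacheLoader to build one on first access. A minimal standalone sketch of the same pattern (the Task record here is a stand-in for SqlTask):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class LoadingCacheExample
{
    // Stand-in for SqlTask, keyed by a String task id
    record Task(String taskId) {}

    public static void main(String[] args)
    {
        LoadingCache<String, Task> tasks = CacheBuilder.newBuilder()
                .build(CacheLoader.from(taskId -> {
                    System.out.println("creating task " + taskId);
                    return new Task(taskId);
                }));

        // The first access runs the loader; the second returns the cached instance
        Task first = tasks.getUnchecked("query_1.0.0");
        Task again = tasks.getUnchecked("query_1.0.0");
        System.out.println(first == again); // true
    }
}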

Updating the SqlTask

The update path runs when the SqlTask instance is first created, and again whenever the Coordinator dispatches new Splits to it.

public class SqlTask
{
    /**
     * All parameters of this method come from the TaskUpdateRequest object sent by the client, so by the time the
     * worker-side execution task is built, both the inputs (Splits) and the outputs (outputBuffers) of this fragment
     * are already fixed.
     * session: client-side session information for this SQL execution
     * fragment: the logical plan fragment this SqlTask is to execute
     * sources: Split descriptors for the data sources this SqlTask reads; each Split reads either a remote source or a table
     * outputBuffers: the SqlTask's output buffer queues, fixed when the PipelinedStageExecution instance is created
     *     on the Coordinator. There are three kinds:
     *     BroadcastOutputBufferManager: broadcasts the output data; there is a single partition and hence a single buffer
     *     ScaledOutputBufferManager: scales the number of output buffers dynamically, so the buffer count is not fixed;
     *         used by tasks that write data out
     *     PartitionedOutputBufferManager: creates one output buffer per partition, so each buffer is bound to a
     *         partition ID and consumed by the downstream Stage. This is the kind used in the execution flow analyzed here.
     */
    public TaskInfo updateTask(Session session, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers, Map<DynamicFilterId, Domain> dynamicFilterDomains)
    {
        try {
            // trace token must be set first to make sure failure injection for getTaskResults requests works as expected
            session.getTraceToken().ifPresent(traceToken::set);

            // The LazyOutput buffer does not support write methods, so the actual
            // output buffer must be established before drivers are created (e.g.
            // a VALUES query).
            outputBuffer.setOutputBuffers(outputBuffers);

            // assure the task execution is only created once
            SqlTaskExecution taskExecution;
            synchronized (this) {
                // is task already complete?
                TaskHolder taskHolder = taskHolderReference.get();
                if (taskHolder.isFinished()) {
                    return taskHolder.getFinalTaskInfo();
                }
                taskExecution = taskHolder.getTaskExecution();
                if (taskExecution == null) {
                    checkState(fragment.isPresent(), "fragment must be present");
                    // Create the SqlTaskExecution instance that plans and executes the fragment on this worker node
                    taskExecution = sqlTaskExecutionFactory.create(
                            session,
                            queryContext,
                            taskStateMachine,
                            outputBuffer,
                            fragment.get(),
                            this::notifyStatusChanged);
                    taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
                    needsPlan.set(false);
                }
            }

            if (taskExecution != null) {
                // Once a taskExecution instance exists, enqueue the source Splits waiting to be processed
                taskExecution.addSources(sources);
                // Also merge in the dynamic-filter results (value sets usable to prune data while processing Splits)
                taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains);
            }
        }
        catch (Error e) {
            failed(e);
            throw e;
        }
        catch (RuntimeException e) {
            failed(e);
        }

        return getTaskInfo();
    }
}
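
The synchronized block together with taskHolderReference.compareAndSet is what makes the SqlTaskExecution creation idempotent: concurrent updateTask calls all observe either the finished holder or the same execution instance. The pattern in isolation, as a hedged sketch with illustrative names:

import java.util.concurrent.atomic.AtomicReference;

public class CreateOnceExample
{
    private final AtomicReference<Object> executionRef = new AtomicReference<>();

    public Object getOrCreate()
    {
        Object execution = executionRef.get();
        if (execution == null) {
            synchronized (this) {
                // Re-check under the lock: another thread may have created it in the meantime
                execution = executionRef.get();
                if (execution == null) {
                    execution = new Object(); // the expensive creation happens at most once
                    executionRef.set(execution);
                }
            }
        }
        return execution;
    }
}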
Creating the SqlTaskExecution

While this instance is being constructed, the truly executable physical plan, a LocalExecutionPlan, is generated.

public class SqlTaskExecutionFactory
{
    public SqlTaskExecution create(Session session, QueryContext queryContext, TaskStateMachine taskStateMachine, OutputBuffer outputBuffer, PlanFragment fragment, Runnable notifyStatusChanged)
    {
        // Create the TaskContext, which carries the runtime information (e.g. metrics) of this SqlTask
        TaskContext taskContext = queryContext.addTaskContext(
                taskStateMachine,
                session,
                notifyStatusChanged,
                perOperatorCpuTimerEnabled,
                cpuTimerEnabled);

        LocalExecutionPlan localExecutionPlan;
        try (SetThreadName ignored = new SetThreadName("Task-%s", taskStateMachine.getTaskId())) {
            try {
                // planner is a LocalExecutionPlanner instance; it converts the logical PlanFragment into the
                // locally executable physical plan, a LocalExecutionPlan
                localExecutionPlan = planner.plan(
                        taskContext,
                        fragment.getRoot(),
                        TypeProvider.copyOf(fragment.getSymbols()),
                        fragment.getPartitioningScheme(),
                        fragment.getStageExecutionDescriptor(),
                        fragment.getPartitionedSources(),
                        outputBuffer);
            }
            catch (Throwable e) {
                // planning failed
                taskStateMachine.failed(e);
                throwIfUnchecked(e);
                throw new RuntimeException(e);
            }
        }
        return createSqlTaskExecution(
                taskStateMachine,
                taskContext,
                outputBuffer,
                localExecutionPlan,
                taskExecutor,
                taskNotificationExecutor,
                splitMonitor);
    }
}
LocalExecutionPlanner::plan

Converts a PlanFragment into the locally executable physical plan, a LocalExecutionPlan.

public class LocalExecutionPlanner
{
    public LocalExecutionPlan plan(
            TaskContext taskContext,
            PlanNode plan,
            TypeProvider types,
            PartitioningScheme partitioningScheme,
            StageExecutionDescriptor stageExecutionDescriptor,
            List<PlanNodeId> partitionedSourceOrder,
            OutputBuffer outputBuffer)
    {
        // Get the output layout of this Fragment
        List<Symbol> outputLayout = partitioningScheme.getOutputLayout();

        if (partitioningScheme.getPartitioning().getHandle().equals(FIXED_BROADCAST_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(SCALED_WRITER_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(SINGLE_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(COORDINATOR_DISTRIBUTION)) {
            // The flow analyzed here is partition-based, so these distributions are skipped
            return plan(taskContext, stageExecutionDescriptor, plan, outputLayout, types, partitionedSourceOrder, new TaskOutputFactory(outputBuffer));
        }

        // We can convert the symbols directly into channels, because the root must be a sink and therefore the layout is fixed
        List<Integer> partitionChannels;
        List<Optional<NullableValue>> partitionConstants;
        List<Type> partitionChannelTypes;
        if (partitioningScheme.getHashColumn().isPresent()) {
            partitionChannels = ImmutableList.of(outputLayout.indexOf(partitioningScheme.getHashColumn().get()));
            partitionConstants = ImmutableList.of(Optional.empty());
            partitionChannelTypes = ImmutableList.of(BIGINT);
        }
        else {
            // Collect the channel index of each partitioning column; constant partition values get -1
            partitionChannels = partitioningScheme.getPartitioning().getArguments().stream()
                    .map(argument -> {
                        if (argument.isConstant()) {
                            return -1;
                        }
                        return outputLayout.indexOf(argument.getColumn());
                    })
                    .collect(toImmutableList());
            // Collect the constant partition values
            partitionConstants = partitioningScheme.getPartitioning().getArguments().stream()
                    .map(argument -> {
                        if (argument.isConstant()) {
                            return Optional.of(argument.getConstant());
                        }
                        return Optional.<NullableValue>empty();
                    })
                    .collect(toImmutableList());
            // Collect the types of the partitioning columns
            partitionChannelTypes = partitioningScheme.getPartitioning().getArguments().stream()
                    .map(argument -> {
                        if (argument.isConstant()) {
                            return argument.getConstant().getType();
                        }
                        return types.get(argument.getColumn());
                    })
                    .collect(toImmutableList());
        }

        // Obtain the function that computes partition IDs; when executing a read SQL this is typically a
        // BucketPartitionFunction instance. PartitionFunction exposes getPartition(Page page, int position), i.e.
        // it maps one row of a data page to a partition ID. The built-in schemes are:
        //     SINGLE: a single partition, which can only have one bucket
        //     HASH: HashBucketFunction; hashes the partitioning columns, then takes the hash modulo the partition
        //           count to get the row's partition ID
        //     ROUND_ROBIN: assigns rows to partition IDs in round-robin order
        PartitionFunction partitionFunction = nodePartitioningManager.getPartitionFunction(taskContext.getSession(), partitioningScheme, partitionChannelTypes);

        OptionalInt nullChannel = OptionalInt.empty();
        Set<Symbol> partitioningColumns = partitioningScheme.getPartitioning().getColumns();

        // partitioningColumns expected to have one column in the normal case, and zero columns when partitioning on a constant.
        // When partitioning on a constant, no extra column is needed; when partitioning columns are specified, one
        // extra channel carries the per-row partition value (e.g. the hash value under the HASH scheme).
        checkArgument(!partitioningScheme.isReplicateNullsAndAny() || partitioningColumns.size() <= 1);
        if (partitioningScheme.isReplicateNullsAndAny() && partitioningColumns.size() == 1) {
            nullChannel = OptionalInt.of(outputLayout.indexOf(getOnlyElement(partitioningColumns)));
        }

        return plan(
                taskContext,
                stageExecutionDescriptor,
                plan,
                outputLayout,
                types,
                partitionedSourceOrder,
                // Create a PartitionedOutputFactory, which builds PartitionedOutputOperator instances.
                // The PartitionedOutputOperator is the last of the operators executing this PlanFragment: it computes
                // a partition ID for each data page row from the partitioning columns and pushes the data into the OutputBuffer.
                operatorFactories.partitionedOutput(
                        taskContext,
                        partitionFunction,
                        partitionChannels,
                        partitionConstants,
                        partitioningScheme.isReplicateNullsAndAny(),
                        nullChannel,
                        outputBuffer,
                        maxPagePartitioningBufferSize));
    }

    public LocalExecutionPlan plan(
            TaskContext taskContext,
            StageExecutionDescriptor stageExecutionDescriptor,
            PlanNode plan, // root node of the PlanFragment, i.e. the topmost node of this logical sub-plan
            List<Symbol> outputLayout, // layout of this PlanFragment's result, i.e. the output column symbols
            TypeProvider types, // describes the type of each Symbol
            List<PlanNodeId> partitionedSourceOrder, // PlanNodeIds of all source nodes to output; e.g. a JOIN has a left and a right source
            OutputFactory outputOperatorFactory)
    {
        Session session = taskContext.getSession();
        // Holds the runtime context of the local physical execution plan
        LocalExecutionPlanContext context = new LocalExecutionPlanContext(taskContext, types);

        // Visit the fragment starting from its root logical plan node and build the physical plan tree, which is in
        // effect a set of Operators (a Driver) applied on top of a Split
        PhysicalOperation physicalOperation = plan.accept(new Visitor(session, stageExecutionDescriptor), context);

        // Align the logical plan's outputLayout with the physical plan's output layout
        Function<Page, Page> pagePreprocessor = enforceLoadedLayoutProcessor(outputLayout, physicalOperation.getLayout());

        // Collect the types of the logical plan's output columns
        List<Type> outputTypes = outputLayout.stream()
                .map(types::get)
                .collect(toImmutableList());

        // Create a new Driver. The OutputOperator and physicalOperation are chained into one physical pipeline with a
        // newly assigned PipelineId: physicalOperation acts as the pipeline's source operator and the OutputOperator
        // as its output operator.
        context.addDriverFactory(
                context.isInputDriver(),
                true, // marks the new Driver's type as Output
                new PhysicalOperation(
                        outputOperatorFactory.createOutputOperator(
                                context.getNextOperatorId(),
                                plan.getId(),
                                outputTypes,
                                pagePreprocessor,
                                new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), isExchangeCompressionEnabled(session))),
                        physicalOperation),
                context.getDriverInstanceCount());

        // notify operator factories that planning has completed
        context.getDriverFactories().stream()
                .map(DriverFactory::getOperatorFactories)
                .flatMap(List::stream)
                .filter(LocalPlannerAware.class::isInstance)
                .map(LocalPlannerAware.class::cast)
                .forEach(LocalPlannerAware::localPlannerComplete);

        return new LocalExecutionPlan(context.getDriverFactories(), partitionedSourceOrder, stageExecutionDescriptor);
    }
}
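
To make the HASH scheme above concrete: hash the partitioning-column values of a row, then take the hash modulo the partition count. A toy illustration (not Trino's HashBucketFunction, which hashes typed Blocks rather than plain Java objects):

import java.util.List;
import java.util.Objects;

public class HashPartitionExample
{
    // Map one row's partitioning-column values to a partition ID in [0, partitionCount)
    static int getPartition(List<Object> partitioningValues, int partitionCount)
    {
        int hash = Objects.hash(partitioningValues.toArray());
        return Math.floorMod(hash, partitionCount); // floorMod keeps the result non-negative
    }

    public static void main(String[] args)
    {
        // Rows with equal partitioning values always land in the same partition
        System.out.println(getPartition(List.of("us-east", 2024), 8));
        System.out.println(getPartition(List.of("us-west", 2024), 8));
    }
}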
Constructing the SqlTaskExecution

private SqlTaskExecution(
        TaskStateMachine taskStateMachine,
        TaskContext taskContext,
        OutputBuffer outputBuffer,
        LocalExecutionPlan localExecutionPlan,
        TaskExecutor taskExecutor,
        SplitMonitor splitMonitor,
        Executor notificationExecutor)
{
    this.taskStateMachine = requireNonNull(taskStateMachine, "taskStateMachine is null");
    this.taskId = taskStateMachine.getTaskId();
    this.taskContext = requireNonNull(taskContext, "taskContext is null");
    this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null");
    this.taskExecutor = requireNonNull(taskExecutor, "taskExecutor is null");
    this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
    this.splitMonitor = requireNonNull(splitMonitor, "splitMonitor is null");

    try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
        // index driver factories
        // Collect the source node IDs from the execution plan
        Set<PlanNodeId> partitionedSources = ImmutableSet.copyOf(localExecutionPlan.getPartitionedSourceOrder());
        // Holds all DriverSplitRunnerFactory instances whose lifecycle is Split-scoped
        ImmutableMap.Builder<PlanNodeId, DriverSplitRunnerFactory> driverRunnerFactoriesWithSplitLifeCycle = ImmutableMap.builder();
        // Holds all DriverSplitRunnerFactory instances whose lifecycle is Task-scoped
        ImmutableList.Builder<DriverSplitRunnerFactory> driverRunnerFactoriesWithTaskLifeCycle = ImmutableList.builder();
        // Holds all DriverSplitRunnerFactory instances whose lifecycle is Group-scoped
        ImmutableList.Builder<DriverSplitRunnerFactory> driverRunnerFactoriesWithDriverGroupLifeCycle = ImmutableList.builder();
        for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
            // Get the PlanNodeId of this Driver's most upstream node
            Optional<PlanNodeId> sourceId = driverFactory.getSourceId();
            if (sourceId.isPresent() && partitionedSources.contains(sourceId.get())) {
                // If this Driver has an input and it is a partitioned source node, the Driver's lifecycle is bound to
                // its Split: once the bound Split has been fully processed, the Driver is of no further use.
                driverRunnerFactoriesWithSplitLifeCycle.put(sourceId.get(), new DriverSplitRunnerFactory(driverFactory, true));
            }
            else {
                // Otherwise this Driver is a downstream Driver instance
                switch (driverFactory.getPipelineExecutionStrategy()) {
                    case GROUPED_EXECUTION:
                        // Group lifespan: create one runner factory per Driver and add it to the group-lifecycle queue
                        driverRunnerFactoriesWithDriverGroupLifeCycle.add(new DriverSplitRunnerFactory(driverFactory, false));
                        break;
                    case UNGROUPED_EXECUTION:
                        // Task-wide lifespan: create one runner factory per Driver and add it to the task-lifecycle queue
                        driverRunnerFactoriesWithTaskLifeCycle.add(new DriverSplitRunnerFactory(driverFactory, false));
                        break;
                    default:
                        throw new UnsupportedOperationException();
                }
            }
        }
        this.driverRunnerFactoriesWithSplitLifeCycle = driverRunnerFactoriesWithSplitLifeCycle.build();
        this.driverRunnerFactoriesWithDriverGroupLifeCycle = driverRunnerFactoriesWithDriverGroupLifeCycle.build();
        this.driverRunnerFactoriesWithTaskLifeCycle = driverRunnerFactoriesWithTaskLifeCycle.build();

        this.pendingSplitsByPlanNode = this.driverRunnerFactoriesWithSplitLifeCycle.keySet().stream()
                .collect(toImmutableMap(identity(), ignore -> new PendingSplitsForPlanNode()));
        this.status = new Status(
                taskContext,
                localExecutionPlan.getDriverFactories().stream()
                        .collect(toImmutableMap(DriverFactory::getPipelineId, DriverFactory::getPipelineExecutionStrategy)));
        this.schedulingLifespanManager = new SchedulingLifespanManager(localExecutionPlan.getPartitionedSourceOrder(), localExecutionPlan.getStageExecutionDescriptor(), this.status);
        checkArgument(this.driverRunnerFactoriesWithSplitLifeCycle.keySet().equals(partitionedSources),
                "Fragment is partitioned, but not all partitioned drivers were found");

        // Pre-register Lifespans for ungrouped partitioned drivers in case they end up get no splits.
        for (Entry<PlanNodeId, DriverSplitRunnerFactory> entry : this.driverRunnerFactoriesWithSplitLifeCycle.entrySet()) {
            PlanNodeId planNodeId = entry.getKey();
            DriverSplitRunnerFactory driverSplitRunnerFactory = entry.getValue();
            if (driverSplitRunnerFactory.getPipelineExecutionStrategy() == UNGROUPED_EXECUTION) {
                this.schedulingLifespanManager.addLifespanIfAbsent(Lifespan.taskWide());
                this.pendingSplitsByPlanNode.get(planNodeId).getLifespan(Lifespan.taskWide());
            }
        }

        // don't register the task if it is already completed (most likely failed during planning above)
        if (!taskStateMachine.getState().isDone()) {
            taskHandle = createTaskHandle(taskStateMachine, taskContext, outputBuffer, localExecutionPlan, taskExecutor);
        }
        else {
            taskHandle = null;
        }

        // Attach a listener that checks whether this SqlTaskExecution has completed once the output buffer reaches the FINISHED state
        outputBuffer.addStateChangeListener(new CheckTaskCompletionOnBufferFinish(SqlTaskExecution.this));
    }
}
SqlTaskExecution::addSources

The addSources() method files the new Splits sent by the client (the Coordinator) into the appropriate scheduling queues by type, and then tries to schedule them.

public class SqlTaskExecution
{
    public void addSources(List<TaskSource> sources)
    {
        requireNonNull(sources, "sources is null");
        checkState(!Thread.holdsLock(this), "Cannot add sources while holding a lock on the %s", getClass().getSimpleName());

        try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
            // update our record of sources and schedule drivers for new partitioned splits.
            // The returned updatedUnpartitionedSources map contains all unpartitioned, not-yet-finished Splits;
            // partitioned Splits are scheduled through SqlTaskExecution::schedulePartitionedSource(..)
            Map<PlanNodeId, TaskSource> updatedUnpartitionedSources = updateSources(sources);

            // Schedule all the unpartitioned Splits.
            // tell existing drivers about the new splits; it is safe to update drivers
            // multiple times and out of order because sources contain full record of
            // the unpartitioned splits
            for (WeakReference<Driver> driverReference : drivers) {
                Driver driver = driverReference.get();
                // the driver can be GCed due to a failure or a limit
                if (driver == null) {
                    // remove the weak reference from the list to avoid a memory leak
                    // NOTE: this is a concurrent safe operation on a CopyOnWriteArrayList
                    drivers.remove(driverReference);
                    continue;
                }
                Optional<PlanNodeId> sourceId = driver.getSourceId();
                if (sourceId.isEmpty()) {
                    continue;
                }
                TaskSource sourceUpdate = updatedUnpartitionedSources.get(sourceId.get());
                if (sourceUpdate == null) {
                    continue;
                }
                driver.updateSource(sourceUpdate);
            }

            // we may have transitioned to no more splits, so check for completion
            checkTaskCompletion();
        }
    }
}

Scheduling & executing the SqlTask

SqlTaskExecution::schedulePartitionedSource

Whenever the SqlTask receives new Splits, it calls

schedulePartitionedSource(TaskSource)

to schedule them.

private synchronized void schedulePartitionedSource(TaskSource sourceUpdate)
{
    mergeIntoPendingSplits(sourceUpdate.getPlanNodeId(), sourceUpdate.getSplits(), sourceUpdate.getNoMoreSplitsForLifespan(), sourceUpdate.isNoMoreSplits());

    while (true) {
        // SchedulingLifespanManager tracks how far each Lifespan has been scheduled. Here is an example.
        // Let's say there are 4 source pipelines/nodes: A, B, C, and D, in scheduling order.
        // And we're processing 3 concurrent lifespans at a time. In this case, we could have
        //
        // * Lifespan 10:  A   B  [C]  D; i.e. Pipeline A and B has finished scheduling (but not necessarily finished running).
        // * Lifespan 20: [A]  B   C   D
        // * Lifespan 30:  A  [B]  C   D
        //
        // To recap, SchedulingLifespanManager records the next scheduling source node for each lifespan.
        //
        // schedulingLifespanManager maintains two kinds of Lifespan:
        //   Task wide: the Split's scope corresponds to the split/task lifecycle, mutually exclusive with grouped
        //       source pipelines; at any moment only one task-wide pipeline and one group-wide pipeline can be
        //       scheduled and executed in parallel.
        //   Driver group wide: the Split's scope corresponds to the driver-group lifecycle; with multiple source
        //       pipelines, at most one source pipeline per group (i.e. per partition) may be scheduling & executing
        //       at a time, while different groups can be scheduled in parallel.
        //
        // Get the Lifespans that still need scheduling, and schedule the Splits belonging to them.
        Iterator<SchedulingLifespan> activeLifespans = schedulingLifespanManager.getActiveLifespans();

        boolean madeProgress = false;

        while (activeLifespans.hasNext()) {
            SchedulingLifespan schedulingLifespan = activeLifespans.next();
            Lifespan lifespan = schedulingLifespan.getLifespan();

            // Continue using the example from above. Let's say the sourceUpdate adds some new splits for source node B.
            //
            // For lifespan 30, it could start new drivers and assign a pending split to each.
            // Pending splits could include both pre-existing pending splits, and the new ones from sourceUpdate.
            // If there is enough driver slots to deplete pending splits, one of the below would happen.
            // * If it is marked that all splits for node B in lifespan 30 has been received, SchedulingLifespanManager
            //   will be updated so that lifespan 30 now processes source node C. It will immediately start processing them.
            // * Otherwise, processing of lifespan 30 will be shelved for now.
            //
            // It is possible that the following loop would be a no-op for a particular lifespan.
            // It is also possible that a single lifespan can proceed through multiple source nodes in one run.
            //
            // When different drivers in the task has different pipelineExecutionStrategy, it adds additional complexity.
            // For example, when driver B is ungrouped and driver A, C, D is grouped, you could have something like this:
            //     TaskWide   :     [B]
            //     Lifespan 10:  A  [ ]  C   D
            //     Lifespan 20: [A]      C   D
            //     Lifespan 30:  A  [ ]  C   D
            // In this example, Lifespan 30 cannot start executing drivers in pipeline C because pipeline B
            // hasn't finished scheduling yet (albeit in a different lifespan).
            // Similarly, it wouldn't make sense for TaskWide to start executing drivers in pipeline B until at least
            // one lifespan has finished scheduling pipeline A.
            // This is why getSchedulingPlanNode returns an Optional.
            while (true) {
                Optional<PlanNodeId> optionalSchedulingPlanNode = schedulingLifespan.getSchedulingPlanNode();
                if (optionalSchedulingPlanNode.isEmpty()) {
                    break;
                }
                PlanNodeId schedulingPlanNode = optionalSchedulingPlanNode.get();

                // The PlanNodes stored in driverRunnerFactoriesWithSplitLifeCycle are in fact source nodes, so their
                // Splits carry source-table data and need repartitioning, allowing Trino to re-bucket the data and
                // satisfy the PartitionId -> WorkerNode distribution strategy described in earlier sections.
                DriverSplitRunnerFactory partitionedDriverRunnerFactory = driverRunnerFactoriesWithSplitLifeCycle.get(schedulingPlanNode);

                PendingSplits pendingSplits = pendingSplitsByPlanNode.get(schedulingPlanNode).getLifespan(lifespan);

                // Enqueue driver runners with driver group lifecycle for this driver life cycle, if not already enqueued.
                if (!lifespan.isTaskWide() && !schedulingLifespan.getAndSetDriversForDriverGroupLifeCycleScheduled()) {
                    // Reached when the Lifespan being scheduled is grouped and has not been scheduled yet: create a
                    // DriverRunner for every pipeline of this SqlTask.
                    // In this case the SqlTask is the task instance for one partition of an intermediate
                    // Stage/PlanFragment, so the Splits belonging to this Lifespan are already known and already
                    // partitioned; internally this calls enqueueDriverSplitRunner(true, runners), which starts every
                    // DriverRunner right away.
                    scheduleDriversForDriverGroupLifeCycle(lifespan);
                }

                // Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
                // If the lifespan is task wide, these are leaf Splits, i.e. TableScan Splits; their runners cannot be
                // started directly but must be scheduled according to the current worker node's load.
                ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builder();
                for (ScheduledSplit scheduledSplit : pendingSplits.removeAllSplits()) {
                    // create a new driver for the split
                    runners.add(partitionedDriverRunnerFactory.createDriverRunner(scheduledSplit, lifespan));
                }
                enqueueDriverSplitRunner(false, runners.build());

                // If all driver runners have been enqueued for this plan node and driver life cycle combination,
                // move on to the next plan node.
                if (pendingSplits.getState() != NO_MORE_SPLITS) {
                    break;
                }
                // At this point no more Splits will arrive, so clean up within the current SqlTaskExecution instance
                partitionedDriverRunnerFactory.noMoreDriverRunner(ImmutableList.of(lifespan));
                pendingSplits.markAsCleanedUp();

                schedulingLifespan.nextPlanNode();
                madeProgress = true;
                if (schedulingLifespan.isDone()) {
                    break;
                }
            }
        }

        if (!madeProgress) {
            break;
        }
    }

    if (sourceUpdate.isNoMoreSplits()) {
        // Tell the SchedulingLifespanManager that the PlanNode behind this TaskSource has finished its work
        schedulingLifespanManager.noMoreSplits(sourceUpdate.getPlanNodeId());
    }
}

SqlTaskExecution::scheduleDriversForTaskLifeCycle

When the SqlTask receives its creation request, it builds a SqlTaskExecution as the task's execution entity and, once the instance is created, calls
scheduleDriversForTaskLifeCycle() to start scheduling.

// scheduleDriversForTaskLifeCycle and scheduleDriversForDriverGroupLifeCycle are similar.
// They are invoked under different circumstances, and schedules a disjoint set of drivers, as suggested by their names.
// They also have a few differences, making it more convenient to keep the two methods separate.
private void scheduleDriversForTaskLifeCycle()
{
    // This method is called at the beginning of the task.
    // It schedules drivers for all the pipelines that have task life cycle.
    List<DriverSplitRunner> runners = new ArrayList<>();
    for (DriverSplitRunnerFactory driverRunnerFactory : driverRunnerFactoriesWithTaskLifeCycle) {
        for (int i = 0; i < driverRunnerFactory.getDriverInstances().orElse(1); i++) {
            runners.add(driverRunnerFactory.createDriverRunner(null, Lifespan.taskWide()));
        }
    }
    // driverRunnerFactoriesWithTaskLifeCycle holds the DriverSplitRunners for a partition of an intermediate
    // Stage/PlanFragment, so they can be started immediately
    enqueueDriverSplitRunner(true, runners);
    for (DriverSplitRunnerFactory driverRunnerFactory : driverRunnerFactoriesWithTaskLifeCycle) {
        driverRunnerFactory.noMoreDriverRunner(ImmutableList.of(Lifespan.taskWide()));
        verify(driverRunnerFactory.isNoMoreDriverRunner());
    }
}

SqlTaskExecution::scheduleDriversForDriverGroupLifeCycle

private void scheduleDriversForDriverGroupLifeCycle(Lifespan lifespan)
{
    // This method is called when a split that belongs to a previously unseen driver group is scheduled.
    // It schedules drivers for all the pipelines that have driver group life cycle.
    if (lifespan.isTaskWide()) {
        checkArgument(driverRunnerFactoriesWithDriverGroupLifeCycle.isEmpty(), "Instantiating pipeline of driver group lifecycle at task level is not allowed");
        return;
    }

    List<DriverSplitRunner> runners = new ArrayList<>();
    for (DriverSplitRunnerFactory driverSplitRunnerFactory : driverRunnerFactoriesWithDriverGroupLifeCycle) {
        for (int i = 0; i < driverSplitRunnerFactory.getDriverInstances().orElse(1); i++) {
            runners.add(driverSplitRunnerFactory.createDriverRunner(null, lifespan));
        }
    }
    // As in scheduleDriversForTaskLifeCycle, these DriverSplitRunners belong to an intermediate Stage/PlanFragment
    // and can be started immediately
    enqueueDriverSplitRunner(true, runners);
    for (DriverSplitRunnerFactory driverRunnerFactory : driverRunnerFactoriesWithDriverGroupLifeCycle) {
        driverRunnerFactory.noMoreDriverRunner(ImmutableList.of(lifespan));
    }
}

Executing the DriverSplitRunner: time-slice based Split scheduling

A DriverSplitRunner applies a set of physical Operators to one Split, representing a complete data-processing pass whose smallest unit of processing is the Page. A detailed walk-through is deferred to a follow-up article, as covering it here would make this one far too long.

A Pipeline is the description of a group of Operators that can run independently.

A Driver is an execution instance of a Pipeline, so one Pipeline can be executed by multiple Drivers in parallel.

A time slice is the execution quantum a Driver runs for before it must yield its thread; this is how a Worker multiplexes many Splits over a fixed pool of execution threads.
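
As a rough picture of that time-sliced execution, the sketch below runs each split for at most one quantum and requeues unfinished splits. The SplitRunner interface here is modeled loosely on Trino's (whose processFor returns a blocked-future rather than a boolean), so treat this as an assumption-laden illustration rather than the real TaskExecutor:

import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TimeSliceExample
{
    // Loosely modeled on Trino's SplitRunner: process for at most one quantum, then report whether work remains
    interface SplitRunner
    {
        boolean processFor(Duration quantum); // returns true when the split is fully processed
    }

    public static void main(String[] args) throws InterruptedException
    {
        Duration quantum = Duration.ofSeconds(1); // the time slice
        BlockingQueue<SplitRunner> runQueue = new ArrayBlockingQueue<>(1024);
        runQueue.put(q -> true); // trivial runner that finishes within one slice

        // A single worker thread's loop: run each split for one quantum; if it is not done,
        // requeue it so other splits get a fair share of CPU time
        while (!runQueue.isEmpty()) {
            SplitRunner split = runQueue.take();
            boolean finished = split.processFor(quantum);
            if (!finished) {
                runQueue.put(split);
            }
        }
    }
}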


Reposted from: https://blog.csdn.net/u014445499/article/details/135206884
Copyright belongs to the original author, Dreammmming Time. If there is any infringement, please contact us for removal.
