0


spark sql解析过程详解

spark sql解析

spark-sql解析图
spark sql解析过程这里直接引用论文

Spark SQL: Relational Data Processing in Spark

中的流程图,整体流程非常的清晰。下面将按顺序进去讲解。
从Analysis这个阶段开始,主要流程都是在QueryExecution类中进行处理的。

// Analysis阶段
lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {
  // We can't clone `logical` here, which will reset the `_analyzed` flag.
  sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}

// 优化阶段
lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
  // clone the plan to avoid sharing the plan instance between different stages like analyzing,
  // optimizing and planning.
  val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
  // We do not want optimized plans to be re-analyzed as literals that have been constant folded
  // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
  // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
  plan.setAnalyzed()
  plan
}

// 转换成物理计划
lazy val sparkPlan: SparkPlan = {
  // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
  // the planning phase
  assertOptimized()
  executePhase(QueryPlanningTracker.PLANNING) {
    // Clone the logical plan here, in case the planner rules change the states of the logical
    // plan.
    QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
  }
}
 
// 转换成可执行计划
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = {
  // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
  // that the optimization time is not counted as part of the planning phase.
  assertOptimized()
  executePhase(QueryPlanningTracker.PLANNING) {
    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
    // optimizing and planning.
    QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
  }
}

// 转换成rdd
lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
executedPlan.execute(), sparkSession.sessionState.conf)

antlr4的解析

依靠antlr4的语法分析能力,将定义的sql语法解析成对应的LogicalPlan。这一块内容比较多,以后专门开一篇来讲解。这里只要记住将sql解析成了一颗LogicalPlan树,树的每个节点都是TreeNode。
拿一个简单的例子来看看:

val sql =
    """
      |select a.name, b.age
      |from local.ods.member1 a join local.ods.member1 b
      |on a.name = b.name
      |""".stripMargin

  val logicalPlan: LogicalPlan = spark.sessionState.sqlParser.parsePlan(sql)

这里得到未解析的LogicalPlan

'Project ['a.name, 'b.age]
+- 'Join Inner, ('a.name = 'b.name)
   :- 'SubqueryAlias a
   :  +- 'UnresolvedRelation [local, ods, member1], [], false
   +- 'SubqueryAlias b
      +- 'UnresolvedRelation [local, ods, member1], [], false
  1. 顶层节点是Project节点里面包含了两个UnresolvedAttribute,一个是a.name,一个是b.age
  2. Project节点的child节点是Join节点,Join节点里面有一个left节点和一个right节点, 它们的类为SubqueryAlias。Join节点的joinType为Inner类型, condition为Some(('a.name = 'b.name))。
  3. 左节点和右节点类似,看一个就行。左节点的child节点为UnresolveRelation节点,里面有一个multipartIdentifier属性,它是一个ArrayBuffer类型,里面有三个元素:local, ods,member1。这个UnresolveRelation节点是LeafNode,所以它没有子节点了。

Analysis阶段

Analysis阶段依次调用了Analyzer的execute方法–>RuleExecutor的execute方法。后面的这个execute方法里面接受一个LogicalPlan。
这里面主要有一个batches成员,定义了一系列的规则。

 /** A batch of rules. */
 protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

 /** Defines a sequence of rule batches, to be overridden by the implementation. */
 protected def batches: Seq[Batch]

Analyzer里面重写的batches:

override def batches: Seq[Batch] = Seq(
   Batch("Substitution", fixedPoint,
     // This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
     // However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
     // very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
     // at the beginning of analysis.
     OptimizeUpdateFields,
     CTESubstitution,
     WindowsSubstitution,
     EliminateUnions,
     SubstituteUnresolvedOrdinals),
   Batch("Disable Hints", Once,
     new ResolveHints.DisableHints),
   Batch("Hints", fixedPoint,
     ResolveHints.ResolveJoinStrategyHints,
     ResolveHints.ResolveCoalesceHints),
   Batch("Simple Sanity Check", Once,
     LookupFunctions),
   Batch("Resolution", fixedPoint,
     ResolveTableValuedFunctions ::
     ResolveNamespace(catalogManager) ::
     new ResolveCatalogs(catalogManager) ::
     ResolveUserSpecifiedColumns ::
     ResolveInsertInto ::
     ResolveRelations ::
     ResolveTables ::
     ResolvePartitionSpec ::
     AddMetadataColumns ::
     ResolveReferences ::
     ResolveCreateNamedStruct ::
     ResolveDeserializer ::
     ResolveNewInstance ::
     ResolveUpCast ::
     ResolveGroupingAnalytics ::
     ResolvePivot ::
     ResolveOrdinalInOrderByAndGroupBy ::
     ResolveAggAliasInGroupBy ::
     ResolveMissingReferences ::
     ExtractGenerator ::
     ResolveGenerate ::
     ResolveFunctions ::
     ResolveAliases ::
     ResolveSubquery ::
     ResolveSubqueryColumnAliases ::
     ResolveWindowOrder ::
     ResolveWindowFrame ::
     ResolveNaturalAndUsingJoin ::
     ResolveOutputRelation ::
     ExtractWindowExpressions ::
     GlobalAggregates ::
     ResolveAggregateFunctions ::
     TimeWindowing ::
     ResolveInlineTables ::
     ResolveHigherOrderFunctions(v1SessionCatalog) ::
     ResolveLambdaVariables ::
     ResolveTimeZone ::
     ResolveRandomSeed ::
     ResolveBinaryArithmetic ::
     ResolveUnion ::
     TypeCoercion.typeCoercionRules ++
     extendedResolutionRules : _*),
   Batch("Apply Char Padding", Once,
     ApplyCharTypePadding),
   Batch("Post-Hoc Resolution", Once,
     Seq(ResolveNoopDropTable) ++
     postHocResolutionRules: _*),
   Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
   Batch("Remove Unresolved Hints", Once,
     new ResolveHints.RemoveAllHints),
   Batch("Nondeterministic", Once,
     PullOutNondeterministic),
   Batch("UDF", Once,
     HandleNullInputsForUDF,
     ResolveEncodersInUDF),
   Batch("UpdateNullability", Once,
     UpdateAttributeNullability),
   Batch("Subquery", Once,
     UpdateOuterReferences),
   Batch("Cleanup", fixedPoint,
     CleanupAliases)
 )

这里是一些解析规则,继承于

Plan

类。

batches.foreach { batch =>
  val batchStartPlan = curPlan
  var iteration = 1
  var lastPlan = curPlan
  var continue = true
  
  // Run until fix point (or the max number of iterations as specified in the strategy.
  while (continue) {
    curPlan = batch.rules.foldLeft(curPlan) {
      case (plan, rule) =>
        val result = rule(plan)
        ...

核心部分就是这个循环,将上面的解析规则一个个应用到plan上面。

rule(plan)

这里调用的是rule的apply方法。上面的例子中有一个

UnresolvedRelation 

,所以这里就选用这个plan对应的规则

ResolveRelations

跟进去看看。

ResolveRelations

的apply方法首先调用了一个

ResolveTempViews

的规则。

object ResolveTempViews extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case u @ UnresolvedRelation(ident, _, isStreaming) =>
      lookupAndResolveTempView(ident, isStreaming).getOrElse(u)
    case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _, _) =>
      lookupAndResolveTempView(ident)
        .map(view => i.copy(table = view))
        .getOrElse(i)
    // TODO (SPARK-27484): handle streaming write commands when we have them.
    ...

这里对plan调用

resolveOperatorsUp

的方法。

 /**
  * Returns a copy of this node where `rule` has been recursively applied first to all of its
  * children and then itself (post-order, bottom-up). When `rule` does not apply to a given node,
  * it is left unchanged.  This function is similar to `transformUp`, but skips sub-trees that
  * have already been marked as analyzed.
  *
  * @param rule the function use to transform this nodes children
  */
 def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
   if (!analyzed) {
     AnalysisHelper.allowInvokingTransformsInAnalyzer {
       val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
       if (self fastEquals afterRuleOnChildren) {
         CurrentOrigin.withOrigin(origin) {
           rule.applyOrElse(self, identity[LogicalPlan])
         }
       } else {
         CurrentOrigin.withOrigin(origin) {
           rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
         }
       }
     }
   } else {
     self
   }
 }

这个方法说的是返回此节点的副本,将rule递归应用于自己的孩子节点,然后是自己(后序,自下而上)。 当rule不适用于给定节点时,保持plan不变。 这个函数类似于 transformUp ,但是跳过已经分析过的节点。这个方法接受的是一个偏函数,在这个例子里面,会自下而上的匹配plan。最下面的节点是

UnresolvedRelation

,所以会调用下面的

lookupAndResolveTempView

方法。

case u @ UnresolvedRelation(ident, _, isStreaming) =>
  lookupAndResolveTempView(ident, isStreaming).getOrElse(u)

这个方法的核心是在当前的catalog里面寻找注册的view:

val tmpView = identifier match {
   case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
   case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
   case _ => None
 }

不是view就返回None。

ResolveRelations

这个rule调用ResolveTempViews(plan)这个方法后紧接着又调用了一次

resolveOperatorsUp

方法;

case u: UnresolvedRelation =>
  lookupRelation(u.multipartIdentifier, u.options, u.isStreaming)
    .map(resolveViews).getOrElse(u)

这次匹配到

UnresolvedRelation

,会调用

lookupRelation

方法。这个方法的逻辑是:
1)如果被解析的catalog不是当前会话(session)的catalog,返回None
2)如果relation不在catalog里面,返回None
3)如果发现的是v1 table,返回v1 relation,其他的返回v2 relation。

private def lookupRelation(
    identifier: Seq[String],
    options: CaseInsensitiveStringMap,
    isStreaming: Boolean): Option[LogicalPlan] = {
  expandRelationName(identifier) match {
    case SessionCatalogAndIdentifier(catalog, ident) =>
    ...
    case _ => None

因为是iceberg的表,这里的没有匹配上,返回了None。这里很奇怪,明明是

UnresolvedRelation

的节点,为什么在

ResolveRelations

里面没有做任何处理,但是在Analysis的过程中确实被解析过了。debug发现,iceberg的

UnresolvedRelation

是在

ResolveTables

规则中处理的。

object ResolveTables extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
    case u: UnresolvedRelation =>
      lookupV2Relation(u.multipartIdentifier, u.options, u.isStreaming)

这里面调用了一个

lookupV2Relation

的方法。

private def lookupV2Relation(
    identifier: Seq[String],
    options: CaseInsensitiveStringMap,
    isStreaming: Boolean): Option[LogicalPlan] =
  expandRelationName(identifier) match {
    case NonSessionCatalogAndIdentifier(catalog, ident) =>
      CatalogV2Util.loadTable(catalog, ident) match {
        case Some(table) =>
          if (isStreaming) {
            Some(StreamingRelationV2(None, table.name, table, options,
              table.schema.toAttributes, Some(catalog), Some(ident), None))
          } else {
            Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
          }
        case None => None
      }
    case _ => None
  }

这个跟上面的不同在于这里是

NonSessionCatalogAndIdentifier

上面匹配的是

SessionCatalogAndIdentifier

。根据注释来看

lookupV2Relation

这个函数是从v2 catalog里面查找DataSourceV2。Iceberg的实现是基于DataSourceV2,因此匹配到了这里的规则。然后将

UnresolvedRelation

转换成了

RelationV2

。结果如下:

Project [name#6, age#11]
+- Join Inner, (name#6 = name#9)
   :- SubqueryAlias a
   :  +- SubqueryAlias local.ods.member1
   :     +- RelationV2[name#6, sex#7, age#8] local.ods.member1
   +- SubqueryAlias b
      +- SubqueryAlias local.ods.member1
         +- RelationV2[name#9, sex#10, age#11] local.ods.member1

这里只给出了数据源解析部分的分析,其他部分感兴趣的可以自己debug看看。

Optimizer阶段

题外

这题我看到了scala一个神奇的操作,抽象类里面定义的是

protected def batches: Seq[Batch]

一个方法,在子类里面用一个成员去重写。

object Optimize extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Subqueries", Once,
      EliminateSubqueryAliases) ::
    // this batch must reach expected state in one pass
    Batch("Filter Pushdown One Pass", Once,
      ReorderJoin,
      PushDownPredicates
    ) :: Nil
}

正题

Optimizer优化整体的流程和Analysis使用的是相同的流程,Optimizer对batches也进行了重写。

final override def batches: Seq[Batch] = {
  val excludedRulesConf =
    SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq)
  val excludedRules = excludedRulesConf.filter { ruleName =>
    val nonExcludable = nonExcludableRules.contains(ruleName)
    if (nonExcludable) {
      logWarning(s"Optimization rule '${ruleName}' was not excluded from the optimizer " +
        s"because this rule is a non-excludable rule.")
    }
    !nonExcludable
  }
  if (excludedRules.isEmpty) {
    defaultBatches
  } else {
    defaultBatches.flatMap { batch =>
      val filteredRules = batch.rules.filter { rule =>
        val exclude = excludedRules.contains(rule.ruleName)
        if (exclude) {
          logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.")
        }
        !exclude
      }
      if (batch.rules == filteredRules) {
        Some(batch)
      } else if (filteredRules.nonEmpty) {
        Some(Batch(batch.name, batch.strategy, filteredRules: _*))
      } else {
        logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " +
          s"as all enclosed rules have been excluded.")
        None
      }
    }
  }
}

去掉排除的规则,如果配置中排除规则为空,则使用默认的规则defaultBatches。默认的规则非常的多,这里就不一一列出了。

def defaultBatches: Seq[Batch] = {
  val operatorOptimizationRuleSet =
    Seq(
      // Operator push down
      PushProjectionThroughUnion,
      ReorderJoin,
      EliminateOuterJoin,
      PushDownPredicates,
      PushDownLeftSemiAntiJoin,
      PushLeftSemiLeftAntiThroughJoin,
      LimitPushDown,
      ColumnPruning,
      // Operator combine
      ...

大概是这个样子,粗略地可以看到有谓语下压,列裁剪之类的优化操作,这些是在Optimizer里面做的。这里具体的过程等有时间的时候再详细看看,留个坑。得到的优化后的逻辑计划如下:

Project [name#6, age#11]
+- Join Inner, (name#6 = name#9)
   :- Filter isnotnull(name#6)
   :  +- RelationV2[name#6] local.ods.member1
   +- Filter isnotnull(name#9)
      +- RelationV2[name#9, age#11] local.ods.member1

Physical Plan阶段

将optimazedPlan转换成物理执行计划SparkPlan。

lazyval sparkPlan: SparkPlan = {
  // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
  // the planning phase
  assertOptimized()
  executePhase(QueryPlanningTracker.PLANNING) {
    // Clone the logical plan here, in case the planner rules change the states of the logical
    // plan.
    QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
  }
}
def createSparkPlan(
    sparkSession: SparkSession,
    planner: SparkPlanner,
    plan: LogicalPlan): SparkPlan = {
  // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
  //       but we will implement to choose the best plan.
  planner.plan(ReturnAnswer(plan)).next()
}

这里对优化后的plan做了一层包装,这个ReturnAnswer节点的注释是规则的模式匹配只作用于最顶层的逻辑计划。然后将其丢入了SparkPlan中,这里有一个next是因为plan方法返回的是一个迭代器。

def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // Obviously a lot to do here still...

  // Collect physical plan candidates.
  val candidates = strategies.iterator.flatMap(_(plan))

  // The candidates may contain placeholders marked as [[planLater]],
  // so try to replace them by their child plans.
  val plans = candidates.flatMap { candidate =>
    val placeholders = collectPlaceholders(candidate)

    if (placeholders.isEmpty) {
      // Take the candidate as is because it does not contain placeholders.
      Iterator(candidate)
    } else {
      // Plan the logical plan marked as [[planLater]] and replace the placeholders.
      placeholders.iterator.foldLeft(Iterator(candidate)) {
        case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
          // Plan the logical plan for the placeholder.
          val childPlans = this.plan(logicalPlan)

          candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
            childPlans.map { childPlan =>
              // Replace the placeholder by the child plan
              candidateWithPlaceholders.transformUp {
                case p if p.eq(placeholder) => childPlan
              }
            }
          }
      }
    }
  }

这个plan方法调用的是父类QueryPlanner的plan方法,也是物理执行计划的核心部分。我们顺着看第一个部分

strategies.iterator.flatMap(_(plan))

,SparkPlanner重写了策略,下面是实际执行的策略:

override def strategies: Seq[Strategy] =
  experimentalMethods.extraStrategies ++
    extraPlanningStrategies ++ (
    LogicalQueryStageStrategy ::
    PythonEvals ::
    new DataSourceV2Strategy(session) ::
    FileSourceStrategy ::
    DataSourceStrategy ::
    SpecialLimits ::
    Aggregation ::
    Window ::
    JoinSelection ::
    InMemoryScans ::
    BasicOperators :: Nil)

可以看到有数据源,聚合,窗口,Join等。将上面的策略依次执行到当前的节点,注意这里是不处理子节点的,子节点会在后面递归调用到这里时处理。优化后的逻辑计划顶层节点是Project节点,外面包装了ReturnAnswer节点,这个节点会被

SpecialLimits

策略转换成一个

PlanLater

节点,然后被扔进

collectPlaceholders

方法中。

override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
  plan.collect {
    case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan
  }
}

这个collect方法会对当前节点和children节点作用collect后面的偏函数,这个偏函数处理的是PlanLater节点,当前只有根节点是PlanLater节点,返回的是一个元祖,第一个元素是当前PlanLater节点的本身,第二个元素是当前节点被转换前的节点,也就是Project节点。这个PlanLater节点相当于是一个占位符,这个占位符对应的是没有处理的节点。后面会用处理后的节点来替换这个占位符。接着,Project节点被递归调用

val childPlans = this.plan(logicalPlan)

,重新扔进了plan方法中。这一次strategies中能够处理Project节点的策略就会起作用对其进行转换。在这个递归调用的作用下,从上至下所有的节点都会被对应的策略转换。Project->Join->Filter->RelationV2。

candidateWithPlaceholders.transformUp {
     case p if p.eq(placeholder) => childPlan
}

然后从下往上对

PlanLater

进行替换。最终用

Project

的物理计划

ProjectExec

PlanLater(Project)

替换并返回。最终的转换结果如下:

Project [name#6, age#11]
+- BroadcastHashJoin [name#6], [name#9], Inner, BuildLeft, false
   :- Project [name#6]
   :  +- Filter isnotnull(name#6)
   :     +- BatchScan[name#6] local.ods.member1 [filters=name IS NOT NULL]
   +- Project [name#9, age#11]
      +- Filter isnotnull(name#9)
         +- BatchScan[name#9, age#11] local.ods.member1 [filters=name IS NOT NULL]

中间具体的转换策略等有时间再写,留个坑。

Prepare Executed Plan

执行准备阶段,构建一系列的规则用作执行的准备。这些规则保证子查询执行计划的构建,确保数据分区和排序的正确性,

insert whole stage

代码的生成,通过复用

exchange

subqueries

来减少工作。一下是具体的规则:

private[execution] def preparations(
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
  // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
  // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
  adaptiveExecutionRule.toSeq ++
  Seq(
    CoalesceBucketsInJoin,
    PlanDynamicPruningFilters(sparkSession),
    PlanSubqueries(sparkSession),
    RemoveRedundantProjects,
    EnsureRequirements,
    // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
    // number of partitions when instantiating PartitioningCollection.
    RemoveRedundantSorts,
    DisableUnnecessaryBucketedScan,
    ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules),
    CollapseCodegenStages(),
    ReuseExchange,
    ReuseSubquery
  )

这里看一个例子:

object ReuseExchange extends Rule[SparkPlan] {

  def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.exchangeReuseEnabled) {
      return plan
    }
    // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
    val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()

    // Replace a Exchange duplicate with a ReusedExchange
    def reuse: PartialFunction[Exchange, SparkPlan] = {
      case exchange: Exchange =>
        val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
        val samePlan = sameSchema.find { e =>
          exchange.sameResult(e)
        }
        if (samePlan.isDefined) {
          // Keep the output of this exchange, the following plans require that to resolve
          // attributes.
          ReusedExchangeExec(exchange.output, samePlan.get)
        } else {
          sameSchema += exchange
          exchange
        }
    }

    plan transformUp {
      case exchange: Exchange => reuse(exchange)
    } transformAllExpressions {
      // Lookup inside subqueries for duplicate exchanges
      case in: InSubqueryExec =>
        val newIn = in.plan.transformUp {
          case exchange: Exchange => reuse(exchange)
        }
        in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
    }
  }
}

可以看到,有一个

exchanges

的HashMap存储了当前的Exchange信息,key为一个StructType对象,StructType存储了Join的字段信息,value是存储了Exchange的一个ArrayBuffer。这里先是在map中找到Join字段的Exchange的ArrayBuffer,如果找到了就与ArrayBuffer中的Exchange进行匹配,匹配上了就返回

ReusedExchangeExec

复用的Exchage对象。如果没有匹配上就向map中进行添加。上面的执行计划中有一个

BroadcastExchangeExec

节点,实现了

BroadcastExchangeLike

特质,这个特质又继承了

Exchange

的抽象类。例子中的exchanges内容如下所示:

(StructType(StructField(name,StringType,false)),

ArrayBuffer(BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#38]
+- *(1) Filter isnotnull(name#6)
   +- BatchScan[name#6] local.ods.member1 [filters=name IS NOT NULL]))

标签: 大数据 spark hadoop

本文转载自: https://blog.csdn.net/u012794781/article/details/127842905
版权归原作者 Chrollo 所有, 如有侵权,请联系我们删除。

“spark sql解析过程详解”的评论:

还没有评论