

Spark SQL Parsing Process Explained

Spark SQL Parsing

Figure: Spark SQL parsing flow diagram
The parsing flow discussed here follows the flow diagram in the paper Spark SQL: Relational Data Processing in Spark, which lays out the overall process very clearly. The stages below are explained in order.
From the Analysis phase onward, the main flow is handled in the QueryExecution class.

    // Analysis phase
    lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {
      // We can't clone `logical` here, which will reset the `_analyzed` flag.
      sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
    }

    // Optimization phase
    lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
      // clone the plan to avoid sharing the plan instance between different stages like analyzing,
      // optimizing and planning.
      val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
      // We do not want optimized plans to be re-analyzed as literals that have been constant folded
      // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
      // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
      plan.setAnalyzed()
      plan
    }

    // Convert to a physical plan
    lazy val sparkPlan: SparkPlan = {
      // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
      // the planning phase
      assertOptimized()
      executePhase(QueryPlanningTracker.PLANNING) {
        // Clone the logical plan here, in case the planner rules change the states of the logical
        // plan.
        QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
      }
    }

    // Convert to an executable plan
    // executedPlan should not be used to initialize any SparkPlan. It should be
    // only used for execution.
    lazy val executedPlan: SparkPlan = {
      // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
      // that the optimization time is not counted as part of the planning phase.
      assertOptimized()
      executePhase(QueryPlanningTracker.PLANNING) {
        // clone the plan to avoid sharing the plan instance between different stages like analyzing,
        // optimizing and planning.
        QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
      }
    }

    // Convert to an RDD
    lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
      executedPlan.execute(), sparkSession.sessionState.conf)
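These four lazy vals can be inspected directly from user code through the QueryExecution object attached to every Dataset. A minimal, self-contained sketch (the local-mode session and the temp view name t are made up for illustration):

    import org.apache.spark.sql.SparkSession

    // Hypothetical local session and temp view, just to have something to inspect.
    val spark = SparkSession.builder().master("local[*]").appName("qe-demo").getOrCreate()
    spark.range(0, 10).toDF("id").createOrReplaceTempView("t")

    val qe = spark.sql("select id from t where id > 5").queryExecution
    println(qe.logical)       // unresolved plan produced by the parser
    println(qe.analyzed)      // after the Analysis phase
    println(qe.optimizedPlan) // after the Optimizer
    println(qe.sparkPlan)     // physical plan chosen by the planner
    println(qe.executedPlan)  // after the execution preparation rules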

Parsing with ANTLR4

Relying on ANTLR4's grammar-parsing capability, SQL text is parsed according to the defined grammar into a corresponding LogicalPlan. There is a lot of material here, so it will get a dedicated post later. For now, just remember that the SQL is parsed into a LogicalPlan tree whose every node is a TreeNode.
Let's look at a simple example:

    val sql =
      """
        |select a.name, b.age
        |from local.ods.member1 a join local.ods.member1 b
        |on a.name = b.name
        |""".stripMargin
    val logicalPlan: LogicalPlan = spark.sessionState.sqlParser.parsePlan(sql)

This gives the unresolved LogicalPlan:

    'Project ['a.name, 'b.age]
    +- 'Join Inner, ('a.name = 'b.name)
       :- 'SubqueryAlias a
       :  +- 'UnresolvedRelation [local, ods, member1], [], false
       +- 'SubqueryAlias b
          +- 'UnresolvedRelation [local, ods, member1], [], false
  1. The top-level node is a Project node containing two UnresolvedAttributes, a.name and b.age.
  2. The Project node's child is a Join node, which has a left child and a right child, both of class SubqueryAlias. The Join node's joinType is Inner and its condition is Some(('a.name = 'b.name)).
  3. The left and right subtrees are similar, so looking at one is enough. The left child's own child is an UnresolvedRelation node with a multipartIdentifier attribute, an ArrayBuffer holding three elements: local, ods, member1. UnresolvedRelation is a LeafNode, so it has no children. A short sketch of traversing this tree follows below.
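Because every node is a TreeNode, the generic tree helpers already work on this unresolved plan. A small sketch that reuses the logicalPlan parsed above and collects the UnresolvedRelation leaves:

    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

    // Walk the tree and gather the multi-part names of all unresolved relations.
    val relationNames = logicalPlan.collect {
      case r: UnresolvedRelation => r.multipartIdentifier.mkString(".")
    }
    relationNames.foreach(println) // prints local.ods.member1 twice, once per join side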

Analysis Phase

The Analysis phase calls the Analyzer's execute method, which goes through RuleExecutor's execute method; that execute method takes a LogicalPlan.
The key member here is batches, which defines a sequence of rule batches.

    /** A batch of rules. */
    protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

    /** Defines a sequence of rule batches, to be overridden by the implementation. */
    protected def batches: Seq[Batch]

The batches overridden in Analyzer:

    override def batches: Seq[Batch] = Seq(
      Batch("Substitution", fixedPoint,
        // This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
        // However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
        // very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
        // at the beginning of analysis.
        OptimizeUpdateFields,
        CTESubstitution,
        WindowsSubstitution,
        EliminateUnions,
        SubstituteUnresolvedOrdinals),
      Batch("Disable Hints", Once,
        new ResolveHints.DisableHints),
      Batch("Hints", fixedPoint,
        ResolveHints.ResolveJoinStrategyHints,
        ResolveHints.ResolveCoalesceHints),
      Batch("Simple Sanity Check", Once,
        LookupFunctions),
      Batch("Resolution", fixedPoint,
        ResolveTableValuedFunctions ::
        ResolveNamespace(catalogManager) ::
        new ResolveCatalogs(catalogManager) ::
        ResolveUserSpecifiedColumns ::
        ResolveInsertInto ::
        ResolveRelations ::
        ResolveTables ::
        ResolvePartitionSpec ::
        AddMetadataColumns ::
        ResolveReferences ::
        ResolveCreateNamedStruct ::
        ResolveDeserializer ::
        ResolveNewInstance ::
        ResolveUpCast ::
        ResolveGroupingAnalytics ::
        ResolvePivot ::
        ResolveOrdinalInOrderByAndGroupBy ::
        ResolveAggAliasInGroupBy ::
        ResolveMissingReferences ::
        ExtractGenerator ::
        ResolveGenerate ::
        ResolveFunctions ::
        ResolveAliases ::
        ResolveSubquery ::
        ResolveSubqueryColumnAliases ::
        ResolveWindowOrder ::
        ResolveWindowFrame ::
        ResolveNaturalAndUsingJoin ::
        ResolveOutputRelation ::
        ExtractWindowExpressions ::
        GlobalAggregates ::
        ResolveAggregateFunctions ::
        TimeWindowing ::
        ResolveInlineTables ::
        ResolveHigherOrderFunctions(v1SessionCatalog) ::
        ResolveLambdaVariables ::
        ResolveTimeZone ::
        ResolveRandomSeed ::
        ResolveBinaryArithmetic ::
        ResolveUnion ::
        TypeCoercion.typeCoercionRules ++
        extendedResolutionRules : _*),
      Batch("Apply Char Padding", Once,
        ApplyCharTypePadding),
      Batch("Post-Hoc Resolution", Once,
        Seq(ResolveNoopDropTable) ++
        postHocResolutionRules: _*),
      Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
      Batch("Remove Unresolved Hints", Once,
        new ResolveHints.RemoveAllHints),
      Batch("Nondeterministic", Once,
        PullOutNondeterministic),
      Batch("UDF", Once,
        HandleNullInputsForUDF,
        ResolveEncodersInUDF),
      Batch("UpdateNullability", Once,
        UpdateAttributeNullability),
      Batch("Subquery", Once,
        UpdateOuterReferences),
      Batch("Cleanup", fixedPoint,
        CleanupAliases)
    )

These are resolution rules; each one extends the Rule[LogicalPlan] class.
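To make the shape of such a rule concrete, here is a toy Rule[LogicalPlan] of my own (it is not part of Spark): it removes Filter nodes whose condition is the literal true. It is only a sketch of the contract a rule fulfills.

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.Rule

    // Hypothetical rule: drop filters whose condition is a constant `true`.
    object RemoveTrueFilter extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
        case Filter(Literal(true, _), child) => child
      }
    }

    // Applying it looks exactly like the rule(plan) call in the executor loop below;
    // on our example plan it is a no-op, since there is no trivially-true filter.
    val unchanged = RemoveTrueFilter(logicalPlan)

RuleExecutor's execute method drives the batches with the following loop: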

    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val result = rule(plan)
            ...

The core is this loop, which applies the resolution rules above to the plan one by one.

rule(plan) invokes the rule's apply method. The example above contains an UnresolvedRelation node, so let's pick the rule that handles it, ResolveRelations, and step into it. The apply method of ResolveRelations first applies the ResolveTempViews rule.

    object ResolveTempViews extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
        case u @ UnresolvedRelation(ident, _, isStreaming) =>
          lookupAndResolveTempView(ident, isStreaming).getOrElse(u)
        case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _, _) =>
          lookupAndResolveTempView(ident)
            .map(view => i.copy(table = view))
            .getOrElse(i)
        // TODO (SPARK-27484): handle streaming write commands when we have them.
        ...

Here resolveOperatorsUp is called on the plan.

    /**
     * Returns a copy of this node where `rule` has been recursively applied first to all of its
     * children and then itself (post-order, bottom-up). When `rule` does not apply to a given node,
     * it is left unchanged. This function is similar to `transformUp`, but skips sub-trees that
     * have already been marked as analyzed.
     *
     * @param rule the function use to transform this nodes children
     */
    def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
      if (!analyzed) {
        AnalysisHelper.allowInvokingTransformsInAnalyzer {
          val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
          if (self fastEquals afterRuleOnChildren) {
            CurrentOrigin.withOrigin(origin) {
              rule.applyOrElse(self, identity[LogicalPlan])
            }
          } else {
            CurrentOrigin.withOrigin(origin) {
              rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
            }
          }
        }
      } else {
        self
      }
    }

This method returns a copy of the node with the rule applied recursively, first to all of its children and then to the node itself (post-order, bottom-up). When the rule does not apply to a given node, the plan is left unchanged. It is similar to transformUp, but skips sub-trees that have already been marked as analyzed. The method takes a partial function, and in this example it matches the plan bottom-up. The bottom-most node is an UnresolvedRelation, so the lookupAndResolveTempView method below is invoked.
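resolveOperatorsUp itself is internal to the analyzer, but the public transformUp on TreeNode does the same post-order walk (minus the already-analyzed check), which makes the bottom-up order easy to see. A small sketch on the parsed logicalPlan from earlier:

    // Children are visited before their parents, so the two UnresolvedRelation leaves
    // print before the SubqueryAlias, Join and Project nodes above them.
    logicalPlan.transformUp {
      case node =>
        println(s"visiting ${node.nodeName}")
        node // return each node unchanged; a real rule would rewrite it here
    }

Back in ResolveTempViews, the branch that matches our plan is: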

    case u @ UnresolvedRelation(ident, _, isStreaming) =>
      lookupAndResolveTempView(ident, isStreaming).getOrElse(u)

The core of lookupAndResolveTempView is looking up a registered view in the current catalog:

    val tmpView = identifier match {
      case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
      case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
      case _ => None
    }

If the identifier is not a view, None is returned.
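For contrast, the temp-view branch is easy to hit from the public API: once a temporary view is registered, a one-part identifier resolves through lookupTempView. A small sketch (the view name member_view is made up):

    // A session-local temp view resolves through v1SessionCatalog.lookupTempView.
    spark.sql("select 'tom' as name, 20 as age").createOrReplaceTempView("member_view")
    val viewPlan = spark.sessionState.sqlParser.parsePlan("select name from member_view")
    println(spark.sessionState.executePlan(viewPlan).analyzed.treeString)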

After calling ResolveTempViews(plan), the ResolveRelations rule immediately calls resolveOperatorsUp again:

    case u: UnresolvedRelation =>
      lookupRelation(u.multipartIdentifier, u.options, u.isStreaming)
        .map(resolveViews).getOrElse(u)

This time the UnresolvedRelation is matched and lookupRelation is called. The logic of this method is:
1) If the catalog being resolved is not the current session's catalog, return None.
2) If the relation is not in the catalog, return None.
3) If a v1 table is found, return a v1 relation; otherwise return a v2 relation.

    private def lookupRelation(
        identifier: Seq[String],
        options: CaseInsensitiveStringMap,
        isStreaming: Boolean): Option[LogicalPlan] = {
      expandRelationName(identifier) match {
        case SessionCatalogAndIdentifier(catalog, ident) =>
          ...
        case _ => None

Because the table is an Iceberg table, nothing matches here and None is returned. This looks odd at first: the node is clearly an UnresolvedRelation, yet ResolveRelations does nothing with it, even though it does get resolved during Analysis. Debugging shows that Iceberg's UnresolvedRelation is actually handled by the ResolveTables rule.

    object ResolveTables extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
        case u: UnresolvedRelation =>
          lookupV2Relation(u.multipartIdentifier, u.options, u.isStreaming)

This in turn calls a method named lookupV2Relation.

    private def lookupV2Relation(
        identifier: Seq[String],
        options: CaseInsensitiveStringMap,
        isStreaming: Boolean): Option[LogicalPlan] =
      expandRelationName(identifier) match {
        case NonSessionCatalogAndIdentifier(catalog, ident) =>
          CatalogV2Util.loadTable(catalog, ident) match {
            case Some(table) =>
              if (isStreaming) {
                Some(StreamingRelationV2(None, table.name, table, options,
                  table.schema.toAttributes, Some(catalog), Some(ident), None))
              } else {
                Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
              }
            case None => None
          }
        case _ => None
      }

The difference from the method above is that this one matches NonSessionCatalogAndIdentifier, whereas the previous one matched SessionCatalogAndIdentifier. According to its comment, lookupV2Relation looks up a DataSourceV2 table from a v2 catalog. Iceberg's implementation is based on DataSourceV2, so it matches this rule, and the UnresolvedRelation is converted into a RelationV2. The result is as follows:

    Project [name#6, age#11]
    +- Join Inner, (name#6 = name#9)
       :- SubqueryAlias a
       :  +- SubqueryAlias local.ods.member1
       :     +- RelationV2[name#6, sex#7, age#8] local.ods.member1
       +- SubqueryAlias b
          +- SubqueryAlias local.ods.member1
             +- RelationV2[name#9, sex#10, age#11] local.ods.member1

Only the data source resolution part has been analyzed here; readers interested in the other parts can debug through them on their own.
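If you want to reproduce the analyzed plan from the parsed logicalPlan yourself, you can hand it back to the session. This assumes the Iceberg catalog named local from the example is configured in the session:

    // executePlan wraps an arbitrary logical plan in a QueryExecution;
    // accessing .analyzed runs the Analyzer batches shown above.
    val analyzedPlan = spark.sessionState.executePlan(logicalPlan).analyzed
    println(analyzedPlan.treeString)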

Optimizer Phase

An aside

Here I noticed a neat Scala trick: the abstract class declares a method, protected def batches: Seq[Batch], and a subclass overrides it with a val member.

    object Optimize extends RuleExecutor[LogicalPlan] {
      val batches =
        Batch("Subqueries", Once,
          EliminateSubqueryAliases) ::
        // this batch must reach expected state in one pass
        Batch("Filter Pushdown One Pass", Once,
          ReorderJoin,
          PushDownPredicates
        ) :: Nil
    }

Back to the main topic

The Optimizer follows the same overall flow as the Analysis phase, and it also overrides batches.

    final override def batches: Seq[Batch] = {
      val excludedRulesConf =
        SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq)
      val excludedRules = excludedRulesConf.filter { ruleName =>
        val nonExcludable = nonExcludableRules.contains(ruleName)
        if (nonExcludable) {
          logWarning(s"Optimization rule '${ruleName}' was not excluded from the optimizer " +
            s"because this rule is a non-excludable rule.")
        }
        !nonExcludable
      }
      if (excludedRules.isEmpty) {
        defaultBatches
      } else {
        defaultBatches.flatMap { batch =>
          val filteredRules = batch.rules.filter { rule =>
            val exclude = excludedRules.contains(rule.ruleName)
            if (exclude) {
              logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.")
            }
            !exclude
          }
          if (batch.rules == filteredRules) {
            Some(batch)
          } else if (filteredRules.nonEmpty) {
            Some(Batch(batch.name, batch.strategy, filteredRules: _*))
          } else {
            logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " +
              s"as all enclosed rules have been excluded.")
            None
          }
        }
      }
    }

Excluded rules are filtered out; if the exclusion list in the configuration is empty, the default defaultBatches are used. There are far too many default rules to list them all, so only the beginning is shown below, after a short aside on excluding rules.
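The excludedRulesConf value read above comes from the spark.sql.optimizer.excludedRules setting, so individual optimizer rules can be switched off at runtime. A hedged sketch (whether a given rule can actually be excluded depends on nonExcludableRules):

    // Exclude an optimizer rule by its fully qualified rule name.
    spark.conf.set(
      "spark.sql.optimizer.excludedRules",
      "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")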

    def defaultBatches: Seq[Batch] = {
      val operatorOptimizationRuleSet =
        Seq(
          // Operator push down
          PushProjectionThroughUnion,
          ReorderJoin,
          EliminateOuterJoin,
          PushDownPredicates,
          PushDownLeftSemiAntiJoin,
          PushLeftSemiLeftAntiThroughJoin,
          LimitPushDown,
          ColumnPruning,
          // Operator combine
          ...

That is the rough shape; at a glance you can see optimizations such as predicate pushdown and column pruning, which are performed in the Optimizer. The detailed process will have to wait for a later post. The optimized logical plan is as follows:

    Project [name#6, age#11]
    +- Join Inner, (name#6 = name#9)
       :- Filter isnotnull(name#6)
       :  +- RelationV2[name#6] local.ods.member1
       +- Filter isnotnull(name#9)
          +- RelationV2[name#9, age#11] local.ods.member1

Physical Plan Phase

The optimizedPlan is converted into the physical execution plan, a SparkPlan.

    lazy val sparkPlan: SparkPlan = {
      // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
      // the planning phase
      assertOptimized()
      executePhase(QueryPlanningTracker.PLANNING) {
        // Clone the logical plan here, in case the planner rules change the states of the logical
        // plan.
        QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
      }
    }

    def createSparkPlan(
        sparkSession: SparkSession,
        planner: SparkPlanner,
        plan: LogicalPlan): SparkPlan = {
      // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
      // but we will implement to choose the best plan.
      planner.plan(ReturnAnswer(plan)).next()
    }

Here the optimized plan gets one extra layer of wrapping: according to its comment, the ReturnAnswer node marks the top of the logical plan so that rules which pattern-match only the topmost operator can apply. The wrapped plan is then handed to the planner; next() is needed because the plan method returns an iterator.
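The same call can be made by hand to watch the planner produce a raw SparkPlan before any preparation rules run (again assuming the session can resolve the Iceberg catalog from the example):

    import org.apache.spark.sql.catalyst.plans.logical.ReturnAnswer

    // Mirror createSparkPlan: wrap the optimized plan in ReturnAnswer, run the planner,
    // and take the first candidate from the returned iterator.
    val optimized = spark.sessionState.executePlan(logicalPlan).optimizedPlan
    val physical = spark.sessionState.planner.plan(ReturnAnswer(optimized)).next()
    println(physical.treeString)

The plan method itself: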

    def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
      // Obviously a lot to do here still...

      // Collect physical plan candidates.
      val candidates = strategies.iterator.flatMap(_(plan))

      // The candidates may contain placeholders marked as [[planLater]],
      // so try to replace them by their child plans.
      val plans = candidates.flatMap { candidate =>
        val placeholders = collectPlaceholders(candidate)

        if (placeholders.isEmpty) {
          // Take the candidate as is because it does not contain placeholders.
          Iterator(candidate)
        } else {
          // Plan the logical plan marked as [[planLater]] and replace the placeholders.
          placeholders.iterator.foldLeft(Iterator(candidate)) {
            case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
              // Plan the logical plan for the placeholder.
              val childPlans = this.plan(logicalPlan)

              candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
                childPlans.map { childPlan =>
                  // Replace the placeholder by the child plan
                  candidateWithPlaceholders.transformUp {
                    case p if p.eq(placeholder) => childPlan
                  }
                }
              }
          }
        }
      }

This plan method is the parent class QueryPlanner's plan method, and it is the core of physical planning. Start with the first part, strategies.iterator.flatMap(_(plan)): SparkPlanner overrides strategies, and the strategies actually executed are listed below:

    override def strategies: Seq[Strategy] =
      experimentalMethods.extraStrategies ++
        extraPlanningStrategies ++ (
        LogicalQueryStageStrategy ::
        PythonEvals ::
        new DataSourceV2Strategy(session) ::
        FileSourceStrategy ::
        DataSourceStrategy ::
        SpecialLimits ::
        Aggregation ::
        Window ::
        JoinSelection ::
        InMemoryScans ::
        BasicOperators :: Nil)

You can see strategies for data sources, aggregation, window, join, and so on. These strategies are applied in turn to the current node only; child nodes are not handled here, they are processed later when the recursion reaches them. The top node of the optimized logical plan is a Project wrapped in a ReturnAnswer node; the SpecialLimits strategy converts it into a PlanLater node, which is then fed to the collectPlaceholders method.

    override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
      plan.collect {
        case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan
      }
    }

The collect method applies the partial function to the current node and to its children. The partial function handles PlanLater nodes, and at this point only the root is a PlanLater node, so a single tuple is returned: its first element is the PlanLater node itself, and its second element is the node it stands in for, i.e. the Project node. The PlanLater node is essentially a placeholder for a node that has not been planned yet; it will later be replaced by the planned node. Next, the Project node is fed back into the plan method through the recursive call val childPlans = this.plan(logicalPlan). This time the strategies that can handle a Project node kick in and convert it. Driven by this recursion, every node from top to bottom is converted by its corresponding strategy: Project -> Join -> Filter -> RelationV2.

    candidateWithPlaceholders.transformUp {
      case p if p.eq(placeholder) => childPlan
    }

The PlanLater placeholders are then replaced bottom-up. In the end, Project's physical plan, ProjectExec, replaces PlanLater(Project) and the result is returned. The final conversion result is as follows:

    Project [name#6, age#11]
    +- BroadcastHashJoin [name#6], [name#9], Inner, BuildLeft, false
       :- Project [name#6]
       :  +- Filter isnotnull(name#6)
       :     +- BatchScan[name#6] local.ods.member1 [filters=name IS NOT NULL]
       +- Project [name#9, age#11]
          +- Filter isnotnull(name#9)
             +- BatchScan[name#9, age#11] local.ods.member1 [filters=name IS NOT NULL]

The individual conversion strategies in between will be covered in a later post.
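Note that experimentalMethods.extraStrategies sits at the head of the strategy list, which is the hook for plugging in your own planning strategy. A minimal sketch of a do-nothing strategy (the object name is made up; returning Nil simply defers to the built-in strategies):

    import org.apache.spark.sql.Strategy
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // A Strategy maps one logical operator to zero or more candidate physical plans.
    object MyNoopStrategy extends Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case _ => Nil // no candidate produced, so the built-in strategies handle the node
      }
    }

    // Registered strategies are picked up through experimentalMethods.extraStrategies.
    spark.experimental.extraStrategies = Seq(MyNoopStrategy)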

Prepare Executed Plan

The execution preparation phase builds a series of rules used to get the plan ready for execution. These rules make sure subquery plans are planned, that data partitioning and ordering are correct, that whole-stage codegen is inserted, and that work is reduced by reusing exchanges and subqueries. The concrete rules are as follows:

    private[execution] def preparations(
        sparkSession: SparkSession,
        adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
      // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
      // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
      adaptiveExecutionRule.toSeq ++
        Seq(
          CoalesceBucketsInJoin,
          PlanDynamicPruningFilters(sparkSession),
          PlanSubqueries(sparkSession),
          RemoveRedundantProjects,
          EnsureRequirements,
          // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
          // number of partitions when instantiating PartitioningCollection.
          RemoveRedundantSorts,
          DisableUnnecessaryBucketedScan,
          ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules),
          CollapseCodegenStages(),
          ReuseExchange,
          ReuseSubquery
        )

Let's look at one of them as an example:

    object ReuseExchange extends Rule[SparkPlan] {
      def apply(plan: SparkPlan): SparkPlan = {
        if (!conf.exchangeReuseEnabled) {
          return plan
        }
        // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
        val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()

        // Replace a Exchange duplicate with a ReusedExchange
        def reuse: PartialFunction[Exchange, SparkPlan] = {
          case exchange: Exchange =>
            val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
            val samePlan = sameSchema.find { e =>
              exchange.sameResult(e)
            }
            if (samePlan.isDefined) {
              // Keep the output of this exchange, the following plans require that to resolve
              // attributes.
              ReusedExchangeExec(exchange.output, samePlan.get)
            } else {
              sameSchema += exchange
              exchange
            }
        }

        plan transformUp {
          case exchange: Exchange => reuse(exchange)
        } transformAllExpressions {
          // Lookup inside subqueries for duplicate exchanges
          case in: InSubqueryExec =>
            val newIn = in.plan.transformUp {
              case exchange: Exchange => reuse(exchange)
            }
            in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
        }
      }
    }

As you can see, an exchanges HashMap stores the Exchange information seen so far: the key is a StructType object holding the schema of the exchange (here the join key fields), and the value is an ArrayBuffer of Exchanges. The rule first looks up the ArrayBuffer for the exchange's schema in the map; if one is found, the current Exchange is compared against the Exchanges in the buffer, and on a match a ReusedExchangeExec wrapping the existing Exchange is returned. If there is no match, the Exchange is added to the map. The execution plan above contains a BroadcastExchangeExec node, which implements the BroadcastExchangeLike trait, which in turn extends the abstract Exchange class. The contents of exchanges in this example look like this:

    (StructType(StructField(name,StringType,false)),
     ArrayBuffer(BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#38]
     +- *(1) Filter isnotnull(name#6)
        +- BatchScan[name#6] local.ods.member1 [filters=name IS NOT NULL]))
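To see the fully prepared plan for your own query, including whole-stage codegen markers and any ReusedExchange nodes, the formatted explain output is the quickest entry point. A small sketch that re-runs the self-join SQL from the beginning of the post:

    // "formatted" prints the physical plan produced after the preparation rules above.
    spark.sql(sql).explain("formatted")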


Reposted from: https://blog.csdn.net/u012794781/article/details/127842905
Copyright belongs to the original author, Chrollo. If there is any infringement, please contact us for removal.
