

Dynamic Partition Pruning (DPP) in Spark 3.3.x: Implementation and Application


What Dynamic Partition Pruning (DPP) Does

A generic description: DPP filters data at the partition level. Note that it is different from a Partition Filter. DPP takes effect in the execute phase, further filtering the InputPartitions (a data type Spark defines internally for computation) loaded from the data source, which reduces the amount of data passed to downstream operators. A Partition Filter, by contrast, already takes effect in the planning phase, filtering the Catalog Partitions that are to be loaded. The two kinds of filter therefore apply in a fixed order: the Partition Filter first loads the visible partitions, and DPP then filters the partitions that were loaded.

Hopefully this article helps you work out DPP's complete processing flow by hand.

*Whether the objects DPP filters are InputPartitions or some similar data structure depends on the concrete implementation; what follows describes one typical flow.*

Note the distinction between the following four concepts:

Partition Filter: predicates involving only partition columns; the right-hand value is already known in the planning phase.
Data Filter: predicates involving only non-partition columns; the right-hand value is not fixed at planning time.
Runtime Filter: predicates over partition or non-partition columns whose right-hand value can only be determined in the execute phase (e.g. a bloom filter).
Source Filter: predicates over non-partition columns whose right-hand value is a literal; for ODPS tables these are passed down to the ODPS service.
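To make the distinction concrete, here is a minimal sketch; the table `t`, its partition column `dt`, and data column `uid` are hypothetical:

// dt = '2024-01-01' is a Partition Filter: partition column, literal RHS, applied at planning time.
// uid > 100 is a Data Filter; if the source supports pushdown it also travels as a Source Filter.
spark.sql("""
  SELECT * FROM t
  WHERE dt = '2024-01-01' AND uid > 100
""").explain()
// A Runtime Filter, by contrast, has an RHS known only while the query runs,
// e.g. dt IN (<values produced by the other side of a join>), which is what DPP injects.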

Runtime Filter deserves a deeper look of its own: Spark also uses a subplan built from Subquery + Aggregation to optimize JOIN plans (similar in spirit to an indexed join in MySQL). Based on equi-join conditions, it builds a BloomFilter data structure and inserts it as a Filter on the application side of the JOIN (for a LEFT JOIN, the left side).
More on Runtime Filter in a later chapter; a pointer follows below.
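As a pointer for that later discussion, a minimal sketch: to the best of my knowledge the Bloom-filter runtime filter in Spark 3.3.x sits behind the config below (disabled by default); verify the name against your build's SQLConf.

// Assumption: Spark 3.3.x config name.
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
// After this, an eligible equi-join may get a BloomFilter (built from the filtering side)
// injected as a predicate on the application side of the join.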

Key Points for DPP to Take Effect

  1. Correlated subqueries, IN expressions, and EXISTS expressions are converted into LEFT SEMI JOIN subqueries and wrapped in a DynamicPruningExpression.
  2. The DynamicPruningExpression is pushed down to the pruning plan; e.g. for a LEFT JOIN b, a is the pruning plan and b the filtering plan.
  3. Under the default configuration, the filtering plan is turned into a broadcastable subquery whose output columns are the JOIN keys.
  4. By default, DPP only works by reusing an existing broadcast stage; set spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=false to allow cost-model-based DPP.
  5. The filtering plan's subplan must have a filter condition of its own. Bad: WHERE a.id IN (SELECT id FROM b WHERE a.id = b.id). Good: WHERE a.id IN (SELECT id FROM b WHERE a.id = b.id AND b.id IS NOT NULL).
  6. When DPP is not restricted to broadcast-only, the following parameters can be tuned (see the sketch after this list). spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio (default 0.5) feeds the cost estimate of how much data DPP would remove on the pruning (filtered) side; DPP is applied only when the amount removed exceeds the amount read on the filtering side. spark.sql.optimizer.dynamicPartitionPruning.useStats (default true) makes the estimate of DPP's filtering effect more accurate and avoids performance regressions; for queries over ODPS tables it additionally depends on two special parameters: spark.sql.odps.prunedPartitions.statistic.enable=true (statistics collection allowed by default) and spark.sql.odps.prunedPartitions.statistic.countThreshold=512 (statistics collected only when the partition count is below this value).
  7. Avoid non-equi correlated filters. For a query like WHERE a.id IN (SELECT b.id FROM b WHERE a.id > b.id), consider rewriting it as WHERE a.id IN (SELECT b.id FROM b WHERE b.id > 0) combined with a JOIN.
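A session-level sketch of the knobs from points 4 and 6 (the ODPS-specific ones apply only to the Aliyun ODPS data source):

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
// Allow cost-based DPP even without a reusable broadcast stage (point 4).
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")
// Fallback pruning ratio used when column stats are missing (point 6).
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio", "0.5")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.useStats", "true")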

A Simple SQL Example Where DPP Applies

Assume tables a and b have identical column definitions, and id is a partition column. The following SQL returns the rows of a whose id appears in the subquery's result set.

-- id is a partition column in both tables a and b
SELECT *
FROM a
WHERE a.id IN (SELECT id FROM b WHERE id = 1)

In the SQL above, the join between a and b carries the filter a.id IN (SELECT id FROM b WHERE id = 1), so the DPP optimization rule can be applied: the condition is pushed down to the scan of a and used to prune partitions on the partition column id.
With DPP enabled, the query reads only those partitions of a whose id satisfies SELECT id FROM b WHERE id = 1 and then joins with b; without DPP, it reads all of a and then joins with b.

After DPP optimization, the example above is ultimately rewritten into an equivalent LEFT SEMI JOIN form:

SELECT * FROM a
LEFT SEMI JOIN (SELECT id FROM b WHERE id = 1) b ON a.id = b.id
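One way to check that the rewrite actually happened is to look for a dynamicpruning# marker in the plan; a sketch, assuming tables a and b exist as described:

spark.sql("""
  SELECT * FROM a
  WHERE a.id IN (SELECT id FROM b WHERE id = 1)
""").explain()
// With DPP applied, the scan of `a` should carry something like:
//   BatchScan ... RuntimeFilters: [dynamicpruning#NN ...]
// or, for file sources: PartitionFilters: [... dynamicpruning#NN ...]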

Parsing Walkthrough of a DPP-Eligible Query

SELECT * FROM a
WHERE a.id IN (SELECT id
               FROM b
               WHERE b.id = a.id AND b.id > 0)

After parsing, the initial logical plan tree looks roughly as follows. Because evaluating the IN condition depends on a column of the outer table, namely a.id, the subquery cannot be materialized and executed directly; such correlated/dependent subqueries must first be rewritten.

Project [*]
  Filter [a.id] In (ListQuery []:
      Project [b.id]
        Filter [b.id = a.id, b.id > 0]
          Relation [b.id])
    Relation [a.*]

The logical plan annotated with DPP information:

Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Filter [DynamicPruningSubquery(Project [b.id]
                                     Filter [b.id > 0]
                                       Relation [b.id])]
      Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]

The final physical plan:

-- Physical Plan
-- When DataSourceV2Strategy materializes the logical plan tree, it pushes DynamicPruning-typed
-- filter expressions down into BatchScanExec -- here, the DynamicPruningExpression. Since the
-- FilterExec held only this single expression, the Filter node is eliminated entirely.
ProjectExec [a.*]
  BroadcastJoinExec [a.*][b.id = a.id, b.id = a.id]
    BatchScanExec [a.*][runtimeFilters = DynamicPruningExpression(
        InSubqueryExec(a.id,
          SubqueryBroadcastExec(Project [b.id]
                                  Filter [b.id > 0]
                                    BatchScanExec [b.id])))]
    BroadcastExchangeExec
      ProjectExec [b.id]
        FilterExec [b.id > 0]
          BatchScanExec [b.id]

Deduplicate Correlated Subquery

Rule Name: PullupCorrelatedPredicates

For the example SQL above, this rule produces the logical plan below (the predicate b.id = a.id is pulled out separately for later processing). With the outer correlation removed, the ListQuery subplan can now be executed independently.

Project [*]
  Filter [a.id] In (ListQuery [b.id = a.id]:
      Project [b.id]
        Filter [b.id > 0]
          Relation [b.id])
    Relation [a.*]

The implementation of PullupCorrelatedPredicates, with commentary:

object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper {

  /**
   * Returns the correlated predicates and an updated plan that removes the outer references.
   */
  private def pullOutCorrelatedPredicates(
      sub: LogicalPlan,
      outer: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
    // Maps each sub-plan node to the correlated predicates pulled out of it. Since those
    // predicates cannot be evaluated by the node itself, they are extracted here so they
    // can later be pulled up into the enclosing join as new join conditions.
    val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]

    /** Determine which correlated predicate references are missing from this plan. */
    def missingReferences(p: LogicalPlan): AttributeSet = {
      val localPredicateReferences = p.collect(predicateMap).flatten
        .map(_.references).reduceOption(_ ++ _).getOrElse(AttributeSet.empty)
      localPredicateReferences -- p.outputSet
    }

    // Simplify the predicates before pulling them out; then, bottom-up, extract correlated
    // predicates and append to the relevant nodes any attributes a subtree now has to output.
    val transformed = BooleanSimplification(sub) transformUp {
      case f @ Filter(cond, child) =>
        // Split into correlated and local (non-correlated) predicates, e.g. for
        //   SELECT * FROM t1 a
        //   WHERE NOT EXISTS (SELECT * FROM t1 b WHERE a.i = b.i AND b.i > 0)
        // the EXISTS subquery yields (Seq(a.i = b.i), Seq(b.i > 0)).
        val (correlated, local) =
          splitConjunctivePredicates(cond).partition(containsOuter)
        // Rewrite the filter without the correlated predicates if any.
        correlated match {
          case Nil => f
          case xs if local.nonEmpty =>
            // If the subquery also has non-correlated predicates, they become a new
            // Filter node that replaces the original child plan.
            val newFilter = Filter(local.reduce(And), child)
            predicateMap += newFilter -> xs
            newFilter
          case xs =>
            // Only correlated predicates: keep the original child plan.
            predicateMap += child -> xs
            child
        }
      case p @ Project(expressions, child) =>
        // Predicates pulled out of a child subtree may reference attributes that this
        // Project does not output; append those missing attributes so the pulled-up
        // filters can still read the columns they need.
        val referencesToAdd = missingReferences(p)
        if (referencesToAdd.nonEmpty) {
          Project(expressions ++ referencesToAdd, child)
        } else {
          p
        }
      case a @ Aggregate(grouping, expressions, child) =>
        // Same as the Project case, except the missing attributes must also be appended
        // to the grouping expressions so they pass through without being aggregated.
        val referencesToAdd = missingReferences(a)
        if (referencesToAdd.nonEmpty) {
          Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child)
        } else {
          a
        }
      case p =>
        p
    }

    // Make sure the inner and the outer query attributes do not collide. In case of a
    // collision, change the subquery plan's output to use different attributes via aliases.
    val baseConditions = predicateMap.values.flatten.toSeq
    val outerPlanInputAttrs = outer.inputSet
    val (newPlan, newCond) = if (outerPlanInputAttrs.nonEmpty) {
      // The subquery's output attributes may duplicate the outer query's input attributes,
      // and so may the pulled-up predicates. Without deduplication, a pulled-up condition
      // could degenerate into `a = a` (left side from the sub plan, right side from the
      // outer plan), which as a join condition would fold to true and break the expected
      // join structure. Renaming the sub-plan attributes guarantees the two sides of the
      // new join condition come from "logically different tables".
      val (plan, deDuplicatedConditions) =
        DecorrelateInnerQuery.deduplicate(transformed, baseConditions, outerPlanInputAttrs)
      // Return the decorrelated subquery together with the new join conditions.
      (plan, stripOuterReferences(deDuplicatedConditions))
    } else {
      // outerPlanInputAttrs is empty; a concrete use case is hard to come by, but one
      // possibility is an outer plan that is a LocalRelation() with no output, i.e. an
      // empty collection.
      (transformed, stripOuterReferences(baseConditions))
    }
    (newPlan, newCond)
  }

  private def rewriteSubQueries(plan: LogicalPlan): LogicalPlan = {
    /**
     * This function is used as an aid to enforce idempotency of the pullUpCorrelatedPredicate
     * rule. In the first call to rewriteSubqueries, all the outer references from the subplan
     * are pulled up and join predicates are recorded as children of the enclosing subquery
     * expression. A subsequent call simply re-records the `children`, which contain the
     * pulled-up correlated predicates (from the previous call), in the enclosing subquery
     * expression.
     */
    def getJoinCondition(newCond: Seq[Expression], oldCond: Seq[Expression]): Seq[Expression] = {
      if (newCond.isEmpty) oldCond else newCond
    }

    def decorrelate(
        sub: LogicalPlan,
        outer: LogicalPlan,
        handleCountBug: Boolean = false): (LogicalPlan, Seq[Expression]) = {
      if (SQLConf.get.decorrelateInnerQueryEnabled) {
        DecorrelateInnerQuery(sub, outer, handleCountBug)
      } else {
        pullOutCorrelatedPredicates(sub, outer)
      }
    }

    plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
      case ScalarSubquery(sub, children, exprId, conditions) if children.nonEmpty =>
        val (newPlan, newCond) = decorrelate(sub, plan)
        ScalarSubquery(newPlan, children, exprId, getJoinCondition(newCond, conditions))
      case Exists(sub, children, exprId, conditions) if children.nonEmpty =>
        val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, plan)
        Exists(newPlan, children, exprId, getJoinCondition(newCond, conditions))
      case ListQuery(sub, children, exprId, childOutputs, conditions) if children.nonEmpty =>
        // Matches the example's pattern: WHERE a IN (subquery)
        val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, plan)
        ListQuery(newPlan, children, exprId, childOutputs, getJoinCondition(newCond, conditions))
      case LateralSubquery(sub, children, exprId, conditions) if children.nonEmpty =>
        val (newPlan, newCond) = decorrelate(sub, plan, handleCountBug = true)
        LateralSubquery(newPlan, children, exprId, getJoinCondition(newCond, conditions))
    }
  }

  /**
   * Pull up the correlated predicates and rewrite all subqueries in an operator tree.
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
    _.containsPattern(PLAN_EXPRESSION)) {
    case j: LateralJoin =>
      val newPlan = rewriteSubQueries(j)
      // Since a lateral join's output depends on its left child output and its lateral
      // subquery's plan output, we need to trim the domain attributes added to the
      // subquery's plan output to preserve the original output of the join.
      if (!j.sameOutput(newPlan)) {
        Project(j.output, newPlan)
      } else {
        newPlan
      }
    // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
    case q: UnaryNode =>
      rewriteSubQueries(q)
    case s: SupportsSubquery =>
      rewriteSubQueries(s)
  }
}
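To watch this rule (together with the later rewrite to a join) in action, comparing the analyzed and optimized plans is enough; a sketch assuming the tables from the running example:

val df = spark.sql("""
  SELECT * FROM a
  WHERE a.id IN (SELECT id FROM b WHERE b.id = a.id AND b.id > 0)
""")
// The analyzed plan still contains the correlated ListQuery; the optimized plan should
// instead show a LeftSemi join whose condition carries the pulled-up predicate.
println(df.queryExecution.analyzed.treeString)
println(df.queryExecution.optimizedPlan.treeString)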

Rewrite Predicates as Join

Rule Name: RewritePredicateSubquery
After the PullupCorrelatedPredicates rule has been applied, the original correlated predicates have been pulled out of the subquery, producing a new ListQuery node. The next step is the RewritePredicateSubquery rule, which rewrites the plan once more into a proper JOIN node.

-- Before
Project [a.*]
  Filter [a.id] In (ListQuery [b.id = a.id]:
      Project [b.id]
        Filter [b.id > 0]
          Relation [b.id])
    Relation [a.*]

-- After
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]
The implementation of RewritePredicateSubquery, with commentary:

object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {

  private def buildJoin(
      outerPlan: LogicalPlan,
      subplan: LogicalPlan,
      joinType: JoinType,
      condition: Option[Expression]): Join = {
    // Deduplicate conflicting attributes if any. dedupSubplan is the subplan of the
    // ListQuery node; condition holds the pulled-out correlated predicates.
    val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan, None, condition)
    // Build a new JOIN over the outer plan and the deduplicated subplan.
    Join(outerPlan, dedupSubplan, joinType, condition, JoinHint.NONE)
  }

  /**
   * Deduplicates self-join subqueries of the form:
   *   SELECT * FROM t1 a
   *   WHERE EXISTS (SELECT i FROM t1 b WHERE a.i = b.i)
   * which is logically rewritten to:
   *   SELECT a.* FROM t1 a
   *   LEFT SEMI JOIN (SELECT i FROM t1) b ON a.i = b.i
   * Since a and b refer to the same physical table, a.i = b.i could resolve to a true
   * literal and break resolution. (In Spark 3.3.x tests, the SPARK-21835 / SPARK-26078
   * examples no longer produce identical attributes, so this may only have been an issue
   * in some historical version.)
   */
  private def dedupSubqueryOnSelfJoin(
      outerPlan: LogicalPlan,
      subplan: LogicalPlan,
      valuesOpt: Option[Seq[Expression]],
      condition: Option[Expression] = None): LogicalPlan = {
    // SPARK-21835: it is possible that the two sides of the join have conflicting
    // attributes; the produced join then becomes unresolved and breaks structural
    // integrity. We should de-duplicate conflicting attributes.
    // SPARK-26078: the subquery may also have conflicting attributes with the outer
    // values. In this case, the resulting join would contain trivially true conditions
    // (e.g. id#3 = id#3) which cannot be de-duplicated afterwards. If there are conflicting
    // attributes in the join condition, the subquery's conflicting attributes are changed
    // using a projection which aliases them, which resolves the problem.
    val outerReferences = valuesOpt.map(values =>
      AttributeSet.fromAttributeSets(values.map(_.references))).getOrElse(AttributeSet.empty)
    val outerRefs = outerPlan.outputSet ++ outerReferences
    val duplicates = outerRefs.intersect(subplan.outputSet)
    if (duplicates.nonEmpty) {
      condition.foreach { e =>
        val conflictingAttrs = e.references.intersect(duplicates)
        if (conflictingAttrs.nonEmpty) {
          throw QueryCompilationErrors.conflictingAttributesInJoinConditionError(
            conflictingAttrs, outerPlan, subplan)
        }
      }
      val rewrites = AttributeMap(duplicates.map { dup =>
        dup -> Alias(dup, dup.toString)()
      }.toSeq)
      val aliasedExpressions = subplan.output.map { ref =>
        rewrites.getOrElse(ref, ref)
      }
      Project(aliasedExpressions, subplan)
    } else {
      subplan
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
    _.containsAnyPattern(EXISTS_SUBQUERY, LIST_SUBQUERY)) {
    // Matches patterns such as: WHERE a.id IN (SELECT id FROM b WHERE id = 1)
    case Filter(condition, child)
        if SubqueryExpression.hasInOrCorrelatedExistsSubquery(condition) =>
      val (withSubquery, withoutSubquery) =
        splitConjunctivePredicates(condition)
          .partition(SubqueryExpression.hasInOrCorrelatedExistsSubquery)

      // Construct the pruned filter condition: only predicates without an exists/in
      // (subquery) pattern remain in the Filter.
      val newFilter: LogicalPlan = withoutSubquery match {
        case Nil => child
        case conditions => Filter(conditions.reduce(And), child)
      }

      // Filter the plan by applying left semi and left anti joins.
      withSubquery.foldLeft(newFilter) {
        case (p, Exists(sub, _, _, conditions)) =>
          val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
          buildJoin(outerPlan, sub, LeftSemi, joinCond)
        case (p, InSubquery(values, ListQuery(sub, _, _, _, conditions))) =>
          // At this point `conditions` already contains the correlated predicates pulled
          // out of `sub`. Self-join deduplication is attempted once more here, although in
          // the related unit tests no self-join conflict actually occurs, so newSub always
          // equals sub.
          val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values))
          // Build one equality condition per JOIN key: left side from the outer plan,
          // right side from the sub plan. If `conditions` already has an equality for some
          // key this is redundant, but the duplicate is optimized away later.
          val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
          // Recursively rewrite the join condition, replacing embedded exists/in
          // expressions with ExistenceJoin. For example, a plan such as:
          //   Filter i#254 IN (list#253 [i#254 && (i#254 = i#257)])
          //   :  +- Project [i#257]
          //   :     +- Relation default.t1[i#257,j#258] parquet
          //   +- Relation default.t1[i#254,j#255] parquet
          // is converted into:
          //   Join LeftSemi, ((i#254 = i#257) AND (i#254 = i#257))
          //   :- Relation default.t1[i#254,j#255] parquet
          //   +- Project [i#257]
          //      +- Relation default.t1[i#257,j#258] parquet
          val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
          Join(outerPlan, newSub, LeftSemi, joinCond, JoinHint.NONE)
        case (p, _) =>
          // The remaining match cases (the rule handles more than the two above) are
          // elided here by the author.
          p
      }

    // Matches patterns such as: SELECT a.id IN (SELECT id FROM b WHERE id = 1)
    case u: UnaryNode if u.expressions.exists(
        SubqueryExpression.hasInOrCorrelatedExistsSubquery) =>
      var newChild = u.child
      u.mapExpressions(expr => {
        val (newExpr, p) = rewriteExistentialExpr(Seq(expr), newChild)
        newChild = p
        // The newExpr can not be None
        newExpr.get
      }).withNewChildren(Seq(newChild))
  }

  /**
   * Given a predicate expression and an input plan, rewrites any embedded existential
   * sub-query into an existential join, returning the rewritten expression together with
   * the updated plan. Currently it does not support NOT IN nested inside a NOT expression;
   * that case is blocked in the Analyzer.
   */
  private def rewriteExistentialExpr(
      exprs: Seq[Expression],
      plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
    var newPlan = plan
    val newExprs = exprs.map { e =>
      e.transformDownWithPruning(_.containsAnyPattern(EXISTS_SUBQUERY, IN_SUBQUERY)) {
        case Exists(sub, _, _, conditions) =>
          val exists = AttributeReference("exists", BooleanType, nullable = false)()
          newPlan =
            buildJoin(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
          exists
        case Not(InSubquery(values, ListQuery(sub, _, _, _, conditions))) =>
          val exists = AttributeReference("exists", BooleanType, nullable = false)()
          // Deduplicate conflicting attributes if any.
          val newSub = dedupSubqueryOnSelfJoin(newPlan, sub, Some(values))
          val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
          // To handle a null-aware predicate not-in-subquery in nested conditions
          // (e.g. `v > 0 OR t1.id NOT IN (SELECT id FROM t2)`), transform `inCondition`
          // (t1.id = t2.id) into `(inCondition) OR ISNULL(inCondition)`. For example,
          // `SELECT * FROM t1 WHERE v > 0 OR t1.id NOT IN (SELECT id FROM t2)` becomes:
          //   == Optimized Logical Plan ==
          //   Project [id#78, v#79]
          //   +- Filter ((v#79 > 0) OR NOT exists#83)
          //      +- Join ExistenceJoin(exists#83), ((id#78 = id#80) OR isnull((id#78 = id#80)))
          //         :- Relation[id#78,v#79] parquet
          //         +- Relation[id#80] parquet
          val nullAwareJoinConds = inConditions.map(c => Or(c, IsNull(c)))
          val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And)
          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), Some(finalJoinCond), JoinHint.NONE)
          Not(exists)
        case InSubquery(values, ListQuery(sub, _, _, _, conditions)) =>
          val exists = AttributeReference("exists", BooleanType, nullable = false)()
          // Deduplicate conflicting attributes if any.
          val newSub = dedupSubqueryOnSelfJoin(newPlan, sub, Some(values))
          val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
          val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
          newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions, JoinHint.NONE)
          exists
      }
    }
    (newExprs.reduceOption(And), newPlan)
  }
}
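The null-aware NOT IN branch above is easiest to see from a concrete query; a sketch (t1 and t2 are hypothetical tables):

// NOT IN must treat NULLs as "unknown", so the join condition becomes
// (t1.id = t2.id) OR isnull(t1.id = t2.id) under an ExistenceJoin, exactly as in the
// comment inside rewriteExistentialExpr.
spark.sql("SELECT * FROM t1 WHERE v > 0 OR t1.id NOT IN (SELECT id FROM t2)")
  .explain("extended")
// Expect in the optimized plan something like:
//   Join ExistenceJoin(exists#N), ((id = id) OR isnull((id = id)))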

Rewrite Join With Dynamic Subquery

Rule Name: PartitionPruning

-- Before
-- id is a partition column
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]

-- After
-- Cond1: LEFT SEMI JOIN, so the left side can be dynamically filtered
-- Cond2: id is a partition column, so the filter can be pushed down to the scan
-- Cond3: the right-side (filtering) plan tree contains a Filter condition, b.id > 0
-- Cond4: from the JOIN key / pruning key a.id / b.id, estimate the DPP filter ratio from
--   column stats: when distinct(a.id) > distinct(b.id),
--   filterRatio = 1 - distinct_b_id / distinct_a_id; otherwise the fallback
--   spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio = 0.5 is used.
--   Pruning has a benefit when filterRatio * stats_size_a > stats_size_b, so the data set
--   of b's id column can be broadcast.
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Filter [DynamicPruningSubquery(Project [b.id]
                                     Filter [b.id > 0]
                                       Relation [b.id])]
      Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]
The implementation logic of PartitionPruning, with commentary:

object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {

  /**
   * Insert a dynamic partition pruning predicate on one side of the join using the filter
   * on the other side of the join.
   * - to be able to identify this filter during query planning, we use a custom
   *   DynamicPruning expression that wraps a regular In expression
   * - we also insert a flag that indicates if the subquery duplication is worthwhile and it
   *   should run regardless of the join strategy, or is too expensive and it should be run
   *   only if we can reuse the results of a broadcast
   */
  private def insertPredicate(
      pruningKey: Expression,
      pruningPlan: LogicalPlan,
      filteringKey: Expression,
      filteringPlan: LogicalPlan,
      joinKeys: Seq[Expression],
      partScan: LogicalPlan): LogicalPlan = {
    val reuseEnabled = conf.exchangeReuseEnabled
    val index = joinKeys.indexOf(filteringKey)
    // Pruning pays off only when the data removed from the pruning side outweighs the
    // cost of reading the other (filtering) side.
    lazy val hasBenefit = pruningHasBenefit(pruningKey, partScan, filteringKey, filteringPlan)
    if (reuseEnabled || hasBenefit) {
      // DPP is inserted only when stage reuse is enabled (which in practice means reusing
      // a broadcast stage), or when pruning has a benefit.
      // Insert a DynamicPruning wrapper to identify the subquery during query planning.
      Filter(
        DynamicPruningSubquery(
          pruningKey,
          filteringPlan,
          joinKeys,
          index,
          conf.dynamicPartitionPruningReuseBroadcastOnly || !hasBenefit),
        pruningPlan)
    } else {
      // Abort dynamic partition pruning.
      pruningPlan
    }
  }

  /**
   * Given an estimated filtering ratio, we assume the partition pruning has benefit if
   * the size in bytes of the partitioned plan after filtering is greater than the size
   * in bytes of the plan on the other side of the join. We estimate the filtering ratio
   * using column statistics if they are available, otherwise we use the config value of
   * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
   */
  private def pruningHasBenefit(
      partExpr: Expression,
      partPlan: LogicalPlan,
      otherExpr: Expression,
      otherPlan: LogicalPlan): Boolean = {
    // Get the distinct counts of an attribute for a given table.
    def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
      plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
    }

    // The default filtering ratio when CBO stats are missing, but there is a predicate
    // that is likely to be selective.
    val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio

    // The filtering ratio based on the type of the join condition and on column statistics.
    val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match {
      // Filter out expressions with more than one attribute on any side of the operator.
      case (leftAttr :: Nil, rightAttr :: Nil) if conf.dynamicPartitionPruningUseStats =>
        // Get the CBO stats for each attribute in the join condition.
        val partDistinctCount = distinctCounts(leftAttr, partPlan)
        val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
        val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
          otherDistinctCount.isDefined
        if (!availableStats) {
          fallbackRatio
        } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
          // There is likely an estimation error, so we fall back.
          fallbackRatio
        } else {
          1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble
        }
      case _ => fallbackRatio
    }

    val estimatePruningSideSize = filterRatio * partPlan.stats.sizeInBytes.toFloat
    val overhead = calculatePlanOverhead(otherPlan)
    estimatePruningSideSize > overhead
  }

  /**
   * Calculates a heuristic overhead of a logical plan. Normally it returns the total size
   * in bytes of all scan relations. We don't count in-memory relations which use only memory.
   */
  private def calculatePlanOverhead(plan: LogicalPlan): Float = {
    val (cached, notCached) = plan.collectLeaves().partition(p => p match {
      case _: InMemoryRelation => true
      case _ => false
    })
    val scanOverhead = notCached.map(_.stats.sizeInBytes).sum.toFloat
    val cachedOverhead = cached.map {
      case m: InMemoryRelation
          if m.cacheBuilder.storageLevel.useDisk && !m.cacheBuilder.storageLevel.useMemory =>
        m.stats.sizeInBytes.toFloat
      case m: InMemoryRelation if m.cacheBuilder.storageLevel.useDisk =>
        m.stats.sizeInBytes.toFloat * 0.2
      case m: InMemoryRelation if m.cacheBuilder.storageLevel.useMemory =>
        0.0
    }.sum.toFloat
    scanOverhead + cachedOverhead
  }

  /**
   * Search a filtering predicate in a given logical plan.
   */
  private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
    plan.exists {
      case f: Filter => isLikelySelective(f.condition)
      case _ => false
    }
  }

  /**
   * To be able to prune partitions on a join key, the filtering side needs to
   * meet the following requirements:
   *   (1) it can not be a stream
   *   (2) it needs to contain a selective predicate used for filtering
   */
  private def hasPartitionPruningFilter(plan: LogicalPlan): Boolean = {
    !plan.isStreaming && hasSelectivePredicate(plan)
  }

  private def prune(plan: LogicalPlan): LogicalPlan = {
    plan transformUp {
      // Skip this rule if there's already a DPP subquery on either side of a join.
      case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j
      case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j
      case j @ Join(left, right, joinType, Some(condition), hint) =>
        // The rule only fires on JOIN nodes.
        var newLeft = left
        var newRight = right

        // Extract the left and right keys of the join condition.
        val (leftKeys, rightKeys) = j match {
          case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, _, _, _) => (lkeys, rkeys)
          case _ => (Nil, Nil)
        }

        // Checks if two expressions are on opposite sides of the join.
        def fromDifferentSides(x: Expression, y: Expression): Boolean = {
          def fromLeftRight(x: Expression, y: Expression) =
            !x.references.isEmpty && x.references.subsetOf(left.outputSet) &&
              !y.references.isEmpty && y.references.subsetOf(right.outputSet)
          fromLeftRight(x, y) || fromLeftRight(y, x)
        }

        splitConjunctivePredicates(condition).foreach {
          case EqualTo(a: Expression, b: Expression) if fromDifferentSides(a, b) =>
            val (l, r) = if (a.references.subsetOf(left.outputSet) &&
                b.references.subsetOf(right.outputSet)) {
              a -> b
            } else {
              b -> a
            }
            // There should be a partitioned table and a filter on the dimension table,
            // otherwise the pruning will not trigger.
            var filterableScan = getFilterableTableScan(l, left)
            if (filterableScan.isDefined && canPruneLeft(joinType) &&
                hasPartitionPruningFilter(right)) {
              // The left side is the pruning plan and the right side the filtering plan;
              // the DPP predicate is inserted on the left only if the right has a filter.
              newLeft = insertPredicate(l, newLeft, r, right, rightKeys, filterableScan.get)
            } else {
              filterableScan = getFilterableTableScan(r, right)
              if (filterableScan.isDefined && canPruneRight(joinType) &&
                  hasPartitionPruningFilter(left)) {
                newRight = insertPredicate(r, newRight, l, left, leftKeys, filterableScan.get)
              }
            }
          case _ =>
        }
        // Return a new plan node.
        Join(newLeft, newRight, joinType, Some(condition), hint)
    }
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // Do not rewrite subqueries.
    case s: Subquery if s.correlated => plan
    case _ if !conf.dynamicPartitionPruningEnabled => plan
    case _ => prune(plan)
  }
}
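A worked example of pruningHasBenefit with made-up statistics:

// Assume column stats: distinct(a.id) = 1000, distinct(b.id) = 10,
// size(a) = 100 GB, and the filtering side's scan overhead = 1 GB.
val filterRatio = 1.0 - 10.0 / 1000.0               // 0.99: ~99% of a is expected to be pruned
val estimatePruningSideSize = filterRatio * 100e9   // ~99 GB pruned from a
val overhead = 1e9                                  // bytes read on the filtering side
val hasBenefit = estimatePruningSideSize > overhead // true -> insert the DPP predicate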

Rewrite Dynamic Subquery as Dynamic Expression

Rule Name: PlanDynamicPruningFilters

-- Before
-- id is a partition column
-- Cond1: LEFT SEMI JOIN, so the left side can be dynamically filtered
-- Cond2: id is a partition column, so the filter can be pushed down to the scan
-- Cond3: the right-side (filtering) plan tree contains a Filter condition, b.id > 0
-- Cond4: from the JOIN key / pruning key a.id / b.id, the DPP filter ratio is estimated
--   from column stats, as in the previous section; pruning has a benefit when
--   filterRatio * stats_size_a > stats_size_b, so b can be broadcast.
-- Assume benefits = true
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Filter [DynamicPruningSubquery(Project [b.id]
                                     Filter [b.id > 0]
                                       Relation [b.id])]
      Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]

-- After
-- case 1: exchange reuse is supported and the plan tree contains a broadcast plan
--   identical to the filtering plan (i.e. reusable for DPP):
--   DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
-- case 2: the filtering plan could only be broadcast (no reusable stage exists):
--   DynamicPruningExpression(Literal.TrueLiteral)
-- case 3: broadcast cannot be used, but pruning still has a benefit:
--   DynamicPruningExpression(expressions.InSubquery(
--     Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Filter [DynamicPruningExpression(
        InSubqueryExec(a.id,
          SubqueryBroadcastExec(Project [b.id]
                                  Filter [b.id > 0]
                                    Relation [b.id])))]
      Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]
The implementation logic of the PlanDynamicPruningFilters rule, with commentary:

case class PlanDynamicPruningFilters(sparkSession: SparkSession)
  extends Rule[SparkPlan] with PredicateHelper {

  /**
   * Identify the shape in which keys of a given plan are broadcast.
   */
  private def broadcastMode(keys: Seq[Expression], output: AttributeSeq): BroadcastMode = {
    val packedKeys = BindReferences.bindReferences(HashJoin.rewriteKeyExpr(keys), output)
    HashedRelationBroadcastMode(packedKeys)
  }

  override def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.dynamicPartitionPruningEnabled) {
      return plan
    }

    plan.transformAllExpressionsWithPruning(_.containsPattern(DYNAMIC_PRUNING_SUBQUERY)) {
      case DynamicPruningSubquery(
          value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
        val sparkPlan = QueryExecution.createSparkPlan(
          sparkSession, sparkSession.sessionState.planner, buildPlan)
        // Using `sparkPlan` is a little hacky as it is based on the assumption that this
        // rule is the first to be applied (apart from `InsertAdaptiveSparkPlan`).
        val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
          plan.exists {
            case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
              left.sameResult(sparkPlan)
            case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
              right.sameResult(sparkPlan)
            case _ => false
          }

        if (canReuseExchange) {
          // DPP can ride on a reusable broadcast stage: fetch the filtering plan's result
          // set via the broadcast, so the pruning plan's partitions can be trimmed further
          // at runtime.
          val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
          val mode = broadcastMode(buildKeys, executedPlan.output)
          // Plan a broadcast exchange of the build side of the join.
          val exchange = BroadcastExchangeExec(mode, executedPlan)
          val name = s"dynamicpruning#${exprId.id}"
          // Place the broadcast adaptor for reusing the broadcast results on the probe side.
          val broadcastValues =
            SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
          DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
        } else if (onlyInBroadcast) {
          // It is not worthwhile to execute the query, so we fall back to a true literal:
          // DPP was explicitly restricted to broadcast reuse only, and no broadcast stage
          // identical to the filtering plan exists in the plan tree, so DPP is disabled.
          DynamicPruningExpression(Literal.TrueLiteral)
        } else {
          // DPP is not forced to rely on the broadcast mechanism and pruning has a benefit:
          // rewrite the query with a subquery that collects the distinct join-key values
          // of the filtering plan, to prune the pruning plan at runtime.
          // We need to apply an aggregate on the buildPlan in order to be column pruned.
          val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)()
          val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
          DynamicPruningExpression(expressions.InSubquery(
            Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
        }
    }
  }
}
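A quick way to exercise case 3 (no reusable broadcast, pruning still beneficial) is to disable broadcast joins while allowing cost-based DPP; a sketch, assuming the example tables:

// With no broadcast hash join in the plan, canReuseExchange is false; since
// onlyInBroadcast is also false (reuseBroadcastOnly=false and pruning has benefit),
// the rule falls through to the aggregate-based InSubquery.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.sql("SELECT * FROM a JOIN b ON a.id = b.id WHERE b.id > 0").explain()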

Push Down Dynamic Expression And Materialization

Strategy Name: DataSourceV2Strategy

When the logical plan tree is converted into a physical plan tree, the DPP filter conditions are pushed down into the BatchScanExec operator, so that they can be applied to prune partitions when the RDD is generated (the execution phase).

-- Before
-- case 1: exchange reuse is supported and the plan tree contains a broadcast plan
--   identical to the filtering plan (i.e. reusable for DPP):
--   DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
-- case 2: the filtering plan could only be broadcast (no reusable stage exists):
--   DynamicPruningExpression(Literal.TrueLiteral)
-- case 3: broadcast cannot be used, but pruning still has a benefit:
--   DynamicPruningExpression(expressions.InSubquery(
--     Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
Project [a.*]
  LeftSemiJoin [b.id = a.id, b.id = a.id]
    Filter [DynamicPruningExpression(
        InSubqueryExec(a.id,
          SubqueryBroadcastExec(Project [b.id]
                                  Filter [b.id > 0]
                                    Relation [b.id])))]
      Relation [a.*]
    Project [b.id]
      Filter [b.id > 0]
        Relation [b.id]

-- Physical Plan
-- When DataSourceV2Strategy materializes the logical plan tree, it pushes DynamicPruning-typed
-- filter expressions down into BatchScanExec; the FilterExec held only this single expression,
-- so the Filter operator is eliminated entirely.
ProjectExec [a.*]
  BroadcastJoinExec [a.*][b.id = a.id, b.id = a.id]  -- the Filter operator is gone
    BatchScanExec [a.*][runtimeFilters = DynamicPruningExpression(
        InSubqueryExec(a.id,
          SubqueryBroadcastExec(Project [b.id]
                                  Filter [b.id > 0]
                                    Relation [b.id])))]
    BroadcastExchangeExec
      ProjectExec [b.id]
        FilterExec [b.id > 0]
          BatchScanExec [b.id]

Pruning Partitions at Runtime

The DPP filters are applied when BatchScanExec builds its RDD, i.e. when its inputRDD (and with it filteredPartitions) is lazily evaluated.

/**
 * Physical plan node for scanning a batch of data from a data source v2.
 */
case class BatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan,
    runtimeFilters: Seq[Expression],
    keyGroupedPartitioning: Option[Seq[Expression]] = None) extends DataSourceV2ScanExecBase {

  @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions()

  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
    // Translate the DPP expressions into Spark's unified source filters.
    val dataSourceFilters = runtimeFilters.flatMap {
      case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e)
      case _ => None
    }

    if (dataSourceFilters.nonEmpty) {
      val originalPartitioning = outputPartitioning

      // The cast is safe as runtime filters are only assigned if the scan can be filtered.
      // If the Scan instance does support runtime filtering, the DynamicPruningExpression
      // is pushed down to the data source at runtime. The Scan::filter interface also
      // gives implementations a hook to translate the source filters to their own needs,
      // e.g. into Parquet filters.
      val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
      filterableScan.filter(dataSourceFilters.toArray)

      // Call toBatch again to get the filtered partitions: the final set of
      // `InputPartition`s, after the dataSourceFilters have been applied.
      val newPartitions = scan.toBatch.planInputPartitions()

      originalPartitioning match {
        case p: KeyGroupedPartitioning =>
          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
            throw new SparkException("Data source must have preserved the original partitioning " +
              "during runtime filtering: not all partitions implement HasPartitionKey after " +
              "filtering")
          }
          val newRows = new InternalRowSet(p.expressions.map(_.dataType))
          newRows ++= newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
          val oldRows = p.partitionValuesOpt.get
          if (oldRows.size != newRows.size) {
            throw new SparkException("Data source must have preserved the original partitioning " +
              "during runtime filtering: the number of unique partition values obtained " +
              s"through HasPartitionKey changed: before ${oldRows.size}, after ${newRows.size}")
          }
          if (!oldRows.forall(newRows.contains)) {
            throw new SparkException("Data source must have preserved the original partitioning " +
              "during runtime filtering: the number of unique partition values obtained " +
              s"through HasPartitionKey remain the same but do not exactly match")
          }
          groupPartitions(newPartitions).get.map(_._2)
        case _ =>
          // No validation is needed as the data source did not report any specific partitioning.
          newPartitions.map(Seq(_))
      }
    } else {
      partitions
    }
  }

  override lazy val inputRDD: RDD[InternalRow] = {
    if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
      // Return an empty RDD with 1 partition if dynamic filtering removed the only split.
      sparkContext.parallelize(Array.empty[InternalRow], 1)
    } else {
      new DataSourceRDD(
        sparkContext, filteredPartitions, readerFactory, supportsColumnar, customMetrics)
    }
  }

  override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    inputRDD.map { r =>
      numOutputRows += 1
      r
    }
  }
}
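For the data-source side of this contract, a custom DSv2 Scan opts into DPP by implementing SupportsRuntimeFiltering. Below is a hedged sketch; MyScan, MyPartition, the `id` column, and the factory plumbing are all hypothetical, but filterAttributes/filter/toBatch are the real interface methods.

import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.sources.{Filter, In}
import org.apache.spark.sql.types.StructType

// Hypothetical partition type that knows its partition-column value.
case class MyPartition(id: Int) extends InputPartition

class MyScan(
    schema: StructType,
    allPartitions: Seq[MyPartition],
    factory: PartitionReaderFactory) extends Scan with SupportsRuntimeFiltering {

  @volatile private var pushed: Array[Filter] = Array.empty

  override def readSchema(): StructType = schema

  // Advertise the columns Spark may build runtime filters on (the partition column).
  override def filterAttributes(): Array[NamedReference] = Array(Expressions.column("id"))

  // BatchScanExec calls this before invoking toBatch/planInputPartitions again.
  override def filter(filters: Array[Filter]): Unit = { pushed = filters }

  override def toBatch: Batch = new Batch {
    // Keep only partitions whose `id` survives the pushed IN-set, if one arrived.
    override def planInputPartitions(): Array[InputPartition] = {
      val keep = pushed.collectFirst { case In("id", values) => values.toSet }
      allPartitions.filter(p => keep.forall(_.contains(p.id))).toArray
    }
    override def createReaderFactory(): PartitionReaderFactory = factory
  }
}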

Further Background

The Design and Implementation of DPP

Class Structure

Among the DPP-related classes Spark defines, DynamicPruning is an interface that marks whether a logical plan node is DPP-related.
To implement the DPP functionality, Spark provides two concrete expression (Expression) classes: DynamicPruningSubquery and DynamicPruningExpression.

DynamicPruningSubquery

DynamicPruningSubquery holds the details of a filter condition that can drive DPP, such as the join filter a.id IN (SELECT id FROM b WHERE id = 1) from the earlier SQL example; it therefore contains a subquery.

case class DynamicPruningSubquery(
    pruningKey: Expression,     // the column on the pruned join side, e.g. a.id in the earlier SQL example
    buildQuery: LogicalPlan,    // the subquery to be broadcast
    buildKeys: Seq[Expression], // all JOIN keys of the broadcast subquery, e.g. b.id in the earlier example
    broadcastKeyIndex: Int,     // index of the broadcast subquery's output column; for the join condition
                                // a.id = b.id, a.id is the pruningKey and b.id the broadcast key
    onlyInBroadcast: Boolean,   // whether the filtering subquery's result may only reach the other
                                // (pruned) side of the JOIN via broadcast
    exprId: ExprId = NamedExpression.newExprId,
    hint: Option[HintInfo] = None)
  extends SubqueryExpression(buildQuery, Seq(pruningKey), exprId, Seq.empty, hint)
  with DynamicPruning
  with Unevaluable
  with UnaryLike[Expression]

DynamicPruningExpression

DynamicPruningExpression wraps and ultimately replaces DynamicPruningSubquery; logically, the information it carries is the subquery's result set, so the two classes stand in a before/after relationship.

// The child member corresponds to the result set returned by the DynamicPruningSubquery.
case class DynamicPruningExpression(child: Expression) extends UnaryExpression
  with DynamicPruning

In short, during the planning phase Spark first collects the information needed for DPP and generates a DynamicPruningSubquery node; it then analyzes that node and, following a set of rules, rewrites the logical plan into one that can actually perform DPP.

Generating the DynamicPruningSubquery

The DynamicPruningSubquery node is inserted into the logical plan tree by the PartitionPruning optimizer rule, which is registered in SparkOptimizer's defaultBatches, so every query attempts to apply it. A quick way to observe its effect is sketched below, followed by the rule's entry points.
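The rule short-circuits when DPP is disabled (see apply() below), so toggling the flag and diffing the two explain outputs isolates exactly what it contributes; a sketch, assuming the example tables:

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")
spark.sql("SELECT * FROM a JOIN b ON a.id = b.id WHERE b.id > 0").explain()
// Re-enable and compare: the second plan should gain a dynamicpruning# filter on `a`.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.sql("SELECT * FROM a JOIN b ON a.id = b.id WHERE b.id > 0").explain()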

object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {

  private def prune(plan: LogicalPlan): LogicalPlan = {
    plan transformUp {
      // Skip this rule if there's already a DPP subquery on either side of a join.
      case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j
      case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j
      case j @ Join(left, right, joinType, Some(condition), hint) =>
        // As in the fully commented listing in the "Rewrite Join With Dynamic Subquery"
        // section above: extract the equi-join keys, decide which side is prunable
        // (getFilterableTableScan + canPruneLeft/canPruneRight) and whether the other side
        // has a selective filter (hasPartitionPruningFilter), then wrap the prunable side
        // with insertPredicate and rebuild the Join from the new children.
        ...
    }
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // Do not rewrite subqueries.
    case s: Subquery if s.correlated => plan
    case _ if !conf.dynamicPartitionPruningEnabled => plan
    case _ => prune(plan)
  }
}

Definitions and Classification of Subqueries

Correlated Subqueries

Because the subquery over table b references a column of the outer query (here, table a), Spark does not apply the dynamic pruning optimization rule to this class of subqueries.
In the example below, a.id is an attribute of type OuterReference, i.e. it has already been resolved in the outer query scope, whereas b.id is an attribute of type AttributeReference. A query with this kind of inner-outer dependency is called a correlated (dependent) subquery.

SELECT * FROM a
WHERE EXISTS (SELECT * FROM b
              WHERE b.id > a.id)

Non-correlated Subqueries

The inner query (over table b) has no correlation with the outer query (over table a), so Spark is free to rewrite the plan and apply the dynamic pruning optimization rule.

SELECT * FROM a
WHERE EXISTS (SELECT * FROM b
              WHERE b.id > 10)

Common Kinds of Subqueries

Lateral

SELECT * FROM t, LATERAL (SELECT * FROM u) uu

Exists

SELECT * FROM a
WHERE EXISTS (SELECT * FROM b
              WHERE b.id > 10)

IN

SELECT * FROM a
WHERE a.id IN (SELECT id
               FROM b)

Scalar

SELECT (SELECT CURRENT_DATE())

Table Valued Function

SELECT * FROM my_tvf(TABLE(v1), TABLE(SELECT 1))

Reposted from: https://blog.csdn.net/u014445499/article/details/142418070
Copyright belongs to the original author, Dreammmming Time. In case of infringement, please contact us for removal.
