0


【Spark的五种Join策略解析】

join基本流程

Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表,我们不用担心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
img
对于每条来自streamIter的记录,都要去buildIter中查找匹配的记录,所以buildIter一定要是查找性能较优的数据结构。spark提供了三种join实现:sort merge join、broadcast join以及hash join。

五种join策略

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

大表join小表

Shuffle Hash Join

image-20220929103146591
Join 步骤:把大表和小表按照相同的分区算法和分区数进行分区(Join 的 keys 进行分区),保证了 hash 值一样(相同key)的数据都分发到同一个分区中(分区内不排序),然后在同一个 Executor 中两张表 hash 值一样的分区就可以在本地进行 hash Join 。在进行 Join 之前,还会对小表的分区构建 Hash 桶(这就要求每个分区都不能太大),便于查找。

注意,和broadcast hash join的区别,这里并没有广播小表,在双方shuffle后的分区内,小表转成Hash桶与大表进行hash join。

苛刻的条件:

  • buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件
  • 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false
  • 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中
  • streamIter的大小是buildIter三倍以上

特点:

  • 仅支持等值连接,join key不需要排序
  • 支持除了全外连接(full outer joins)之外的所有join类型
  • 需要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比较大,可能会造成OOM,不适合严重倾斜的join
  • 对于FullOuter Join,需要建立双向hash表,代价太大。因此FullOuterJoin默认都是基于SortJoin来实现

Broadcast Hash Join

image-20220929103203808
将小表的数据广播到 Spark 所有的 Executor 端,只能用于等值连接。避免了 Shuffle 操作。一般而言,Broadcast Hash Join 会比其他 Join 策略执行的要快。因为他直接在一个map中完成了,也称之为map join

Join 步骤:

  • 利用 collect 算子将小表的数据从 Executor 端拉到 Driver 端
  • 在 Driver 端调用 sparkContext.broadcast 广播到所有 Executor 端
  • 在 Executor 端使用广播的数据与大表进行 Join 操作

使用条件:

  • 必须为等值连接,不要求 Join 的 keys 可排序
  • 小表大小小于 spark.sql.autoBroadcastJoinThreshold(default 10M)设定的值

Broadcast Nested Loop Join

该方式是在没有合适的JOIN机制可供选择时,最终会选择该种join策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.

最小的数据集被广播到另一个数据集的每个分区上,执行一个嵌套循环来执行join, 也就是说数据集1的每条记录都尝试join数据集2的每条记录(最笨的方法),效率比较低。既可以做等值join也可以做非等值join,而且是非等值join的默认策略。

没有排序,就是广播小表到每个分区上,尝试join每条记录,效率低!

大表之间join

Sort Merge Join

image-20220929103221072
先hash到同一个分区且排好序,然后再在分区内顺序查找比对

对表的大小没有条件,不管分区多大,SortMergeJoin 都不用把一侧的数据全部加载到内存中,而是即用即丢;两个序列都有序。从头遍历,碰到 key 相同的就输出,如果不同,左边小就继续取左边,反之取右边,由于排序的特性,每次处理完一条记录后只需要从上一次结束的位置开始查找,SortMergeJoinExec执行时就能够避免大量无用的操作,提高了大数据量下sql join 的稳定性。

Join 步骤:

  • shuffle: 将两张表按照 join key 进行shuffle,保证join key值相同的记录会被分在相应的分区
  • sort: 对每个分区内的数据进行排序
  • merge: 排序后再对相应的分区内的记录进行连接

使用条件:

  • 等值连接
  • 参与 join 的 key 可排序

Cartesian Join

笛卡尔积

如果左表有n个分区,右表有m个分区,那么笛卡尔积后的分区数是K=n * m个;并且这K个分区中,第K(i)个分区获取的左表分区为 kn=i / m,获取的右表分区为 km=i % m,然后kn和km这两个分区做笛卡尔积;由于是以分区为单位,所以不会触发shuffle;

join策略选择

等值连接的情况

有join提示(hints)的情况,按照下面的顺序
  • Broadcast Hint:如果join类型支持,则选择broadcast hash join
  • Sort merge hint:如果join key是排序的,则选择 sort-merge join
  • shuffle hash hint:如果join类型支持, 选择 shuffle hash join
  • shuffle replicate NL hint: 如果是内连接,选择笛卡尔积方式
没有join提示(hints)的情况,则逐个对照下面的规则

image-20220929194248135

  • 如果join类型支持,并且其中一张表能够被广播(spark.sql.autoBroadcastJoinThreshold值,默认是10MB),则选择 broadcast hash join
  • 如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(可以构建一个hash map) ,则选择shuffle hash join
  • 如果join keys 是排序的,则选择sort-merge join
  • 如果是内连接,选择 cartesian join
  • 没有可以选择的执行策略,则最终选择broadcast nested loop join,即使可能会发生OOM

非等值连接情况

有join提示(hints),按照下面的顺序
  • broadcast hint:选择broadcast nested loop join.
  • shuffle replicate NL hint: 如果是内连接,则选择cartesian product join
没有join提示(hints),则逐个对照下面的规则
  • 如果一张表足够小(可以被广播),则选择 broadcast nested loop join
  • 如果是内连接,则选择cartesian product join
  • 如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join

实验

非等值连接默认是BroadcastNestedLoopJoin

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1:String=true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2:String=10485760

scala>val data1 = Seq(10,20,20,30,40,10,40,20,20,20,20,50)
data1: Seq[Int]= List(10,20,20,30,40,10,40,20,20,20,20,50)

scala>val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame =[id1: int]

scala>val data2 = Seq(30,20,40,50)
data2: Seq[Int]= List(30,20,40,50)

scala>val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame =[id2: int]

scala>val dfJoined = df1.join(df2, $"id1">= $"id2")//非等值连接
dfJoined: org.apache.spark.sql.DataFrame =[id1: int, id2: int]// 注意查看执行计划是BroadcastNestedLoopJoin
scala> dfJoined.queryExecution.executedPlan
res3: org.apache.spark.sql.execution.SparkPlan =
BroadcastNestedLoopJoin BuildRight, Inner,(id1#3>= id2#8):- LocalTableScan [id1#3]+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [id2#8]

image-20220929190714574

shuffle hash join

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold",2)

scala> spark.conf.set("spark.sql.join.preferSortMergeJoin","false")

scala>val dfhashJoined = df1.join(df2, $"id1"=== $"id2")//等值连接
dfhashJoined: org.apache.spark.sql.DataFrame =[id1: int, id2: int]

scala> dfhashJoined.queryExecution.executedPlan
res7: org.apache.spark.sql.execution.SparkPlan =
ShuffledHashJoin [id1#3],[id2#8], Inner, BuildRight
:- Exchange hashpartitioning(id1#3,200):+- LocalTableScan [id1#3]+- Exchange hashpartitioning(id2#8,200)+- LocalTableScan [id2#8]

image-20220929191340105

sort MergeJoin

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)

scala> spark.conf.set("spark.sql.join.preferSortMergeJoin","true")

scala>val sortJoined = df1.join(df2, $"id1"=== $"id2")
sortJoined: org.apache.spark.sql.DataFrame =[id1: int, id2: int]

scala> sortJoined.queryExecution.executedPlan
res11: org.apache.spark.sql.execution.SparkPlan =*(3) SortMergeJoin [id1#3],[id2#8], Inner
:-*(1) Sort [id1#3 ASC NULLS FIRST],false,0:+- Exchange hashpartitioning(id1#3,200):+- LocalTableScan [id1#3]+-*(2) Sort [id2#8 ASC NULLS FIRST],false,0+- Exchange hashpartitioning(id2#8,200)+- LocalTableScan [id2#8]

scala> sortJoined.show

image-20220929193014333

spark3 join策略提示

  1. Broadcast HashJoin
有三种方式
SELECT/*+ BROADCAST(t1) */*FROM t1 INNERJOIN t2 ON t1.key= t2.key;SELECT/*+ BROADCASTJOIN (t1) */*FROM t1 leftJOIN t2 ON t1.key= t2.key;SELECT/*+ MAPJOIN(t2) */*FROM t1 rightJOIN t2 ON t1.key= t2.key;
  1. shuffle sort merge Join
SELECT/*+ SHUFFLE_MERGE(t1) */*FROM t1 INNERJOIN t2 ON t1.key= t2.key;SELECT/*+ MERGEJOIN(t2) */*FROM t1 INNERJOIN t2 ON t1.key= t2.key;SELECT/*+ MERGE(t1) */*FROM t1 INNERJOIN t2 ON t1.key= t2.key;
  1. shuffle Hash Join
SELECT/*+ SHUFFLE_HASH(t1) */*FROM t1 INNERJOIN t2 ON t1.key= t2.key;
标签: spark ajax

本文转载自: https://blog.csdn.net/u012432611/article/details/132824637
版权归原作者 岸芷汀兰whu 所有, 如有侵权,请联系我们删除。

“【Spark的五种Join策略解析】”的评论:

还没有评论