最近在生产加工时,遇到了一种情况:
在postgresql中可以正常运行的代码,放到spark中却怎么也跑不动。
造成这种结果的问题sql如下,可以先看一看:
select t1.dateas partition_date,
t2.user_id as user_id
from(selectdatefrom
table_date
wheredateBETWEEN DATE_FORMAT('$now.date','yyyy-MM-01')and'$now.date') t1
leftjoin(select partition_date,
user_id
from wallet_sign_ss
where partition_date='$now.date'groupby1,2) t2
on date_format(t1.date,'yyyy-MM')= date_format(t2.partition_date,'yyyy-MM')
排查原因后发现,是由于小表join大表,导致spark的数据分发模式选择了shuffle join大类。
接下来我将讲解spark sql在各种情况下的的join策略选择,原理及其原因,对于生产实践中处理方式的启发:
Spark join支持的3种关联实现机制
Spark join支持3种关联实现机制,分别是hash join、sort merge join、nested loop join
hash join
其中hash join执行效率最高,因为哈希表具有o(1)的查找效率,不过,需要首先在build阶段构建出哈希表才行,因此hash join对内存的要求比较高,适用于内存能够容纳基表数据的计算场景。
sort merge join
相比之下,sort merge join就没有内存方面的限制,排序、合并,smj都可以利用磁盘来完成计算。所以,在稳定性方面,smj更胜一筹。
hash join的执行效率为o(M),smj的执行效率是o(m+n),并没有相差太多。这主要得益于SMJ的排序阶段。因此,如果准备参与Join的两张表是有序表,采用SMJ来实现关联是最优的。
Nested loop join虽然看上去多余,嵌套的双层for循环带来的计算复杂度为O(MN),不过,尺有所短寸有所长,执行高效的HJ和SMJ只能用于等值关联,也就是说关联条件必须是等式
nested loop join
像on wallet_users.partition_date> trade_users.partition_date 这样的关联条件,HJ和SMJ是无能为力的。相反,NLJ既可以处理等值关联,也可以应付不等值关联,可以说是数据关联在实现机制上的最后一道防线。
spark的两种分发机制
与此同时,spark还有两种分发机制,分别为shuffle和广播。
在分布式环境下,两张表的数据各自散落在不同的计算节点与executors进程。因此,要想完成数据关联,spark sql必须先要把join keys相同的数据,分发到同一个executors中去才行。比如我们打算对两张表按照partition_date做关联,那么对于日期值相同的数据,必须要保证它们坐落在同样的executors进程里,spark sql才能利用刚刚说的hj/smj/nlj,以executors(进程)为粒度并行地完成数据关联。换句话说,以join keys为基准,两张表的数据分布保持一致,是spark sql执行分布式数据关联的前提。而能满足这个前提的途径只有两个:shuffle和广播。
因此,按照分发模式划分,数据关联可以分为shuffle join和broadcast join两大类。通常来说,在性能方面,broadcast会比shuffle更胜一筹。
Shuffle join
在没有开发者干预的情况下,spark sql通常采用shuffle join来完成分布式环境下的数据关联。对于参与join的两张表,spark sql按照如下规则,决定不同数据应当分发到哪个executors中去:
1、根据join keys计算哈希值 2、将哈希值对并行度取模
如果左边和右表在并行度(分区数)上是一致的,因此,按照规则分发数据之后,一定能够保证id字段值相同的薪资数据与员工数据坐落在同样的executors中。
在map阶段,数据分散在不同的executors中,经过shuffle过后,join keys相同的记录被分发到了同样的executors中去。接下来,在reduce阶段,reduce task就可以使用Hj/smj/nlj算法在executors内部完成数据关联的计算。
Spark sql之所以通常会采用shuffle join,原因在于在任何情况下,不论数据的体量是大是小,不管内存是否足够,shuffle join都能够完成数据关联的计算。但是性能上的损耗会非常大。Shuffle Join需要将两张表的数据在全网范围内进行分发,这会导致大量的网络传输和数据洗牌操作,消耗大量的网络带宽和计算资源。
Broadcast join
在Broadcast Join的执行过程中,Spark SQL首先会从各个Executors收集小表(例如Date表)的所有数据分片,这些数据分片被聚集到Driver端,构建一个广播变量(例如bcDate),Driver端将收集到的小表数据分片整合成一个完整的广播变量,这个广播变量包含了小表的所有数据,构建好的广播变量(例如bcDate)被分发到集群中所有的Executors。每个Executor都会收到这个广播变量,并将其保存在内存中。在进行Join操作时,大表数据保持不动,每个Executor通过广播变量直接关联小表数据,从而完成Join操作。
尽管广播变量的创建与分发同样需要消耗网络带宽,但相比Shuffle Join中两张表的全网分发,因为仅仅通过分发体量较小的数据表来完成数据关联,Spark SQL的执行性能显然要高效得多。这种小投入、大产出,用极小的成本去博取高额的性能收益,可以说是“四两拨千斤”!
Spark sql支持的join策略总结
不论是shuffle join还是broadcast join,一旦数据分发完毕,理论上可以采用HJ、SMJ、NLJ这3种实现机制中的任意一种,完成Executors内部的数据关联。因此,两种分发模式,与三种实现机制,组合起来,共有6种分布式join策略。
对于等值关联,spark sql优先考虑采用broadcast hj策略,其次是shuffle smj,最后是shuffle hj,对于不等值关联,spark sql优先级为 broadcast nlj>shuffle nlj。
可以看到,不论是等值或者不等值关联,只要广播join的前提成立,spark sql一定会优先选择广播join,而根据上述的原理,广播join实现的前提条件便是:被广播表的全量数据能够完全放入driver的内存、以及各个executors的内存。
同时,为了避免因广播表过大而引入新的性能隐患,spark sql要求被广播表的内存大小不能超过8GB。
在实际生产中,尽量在join时将小表放在大表的后面,便可以在spark sql中触发广播join,从而大幅减少需要分发的数据量,提升spark sql的执行性能。修改后的代码如下:
SELECT
t1.partition_date AS partition_date,
t2.user_id AS user_id
FROM(SELECT
partition_date,
user_id
FROM
wallet_sign_ss
WHERE partition_date ='$now.date'GROUPBY
partition_date, user_id
) t2
JOIN(SELECTdateAS partition_date
FROM
table_date
WHEREdateBETWEEN DATE_FORMAT('$now.date','yyyy-MM-01')AND'$now.date')t1
ON DATE_FORMAT(t1.partition_date,'yyyy-MM')= DATE_FORMAT(t2.partition_date,'yyyy-MM')) month_wallet_users
总结下来,spark sql执行broadcast join的条件如下:
1、被广播表的全量数据能够完全放入driver的内存、以及各个executors的内存。
2、spark sql要求被广播表的内存大小不能超过8GB。
3、在join时将小表放在大表的后面,才会被spark sql解析后执行broadcast join。
而以上条件任一不满足时,spark sql就会退化到shuffle join的策略。
因此在不等值关联中,就只剩shuffle nlj这一种策略。
而在等值关联的场景中,仍有shuffle smj和shuffle hj这两种选择。虽然这两种方法各有千秋,但是shuffle在map阶段往往会对数据做排序,这恰恰符合SMJ对于排序的需求,复杂度为O(M+N),这样的执行效率与HJ的O(M)可以说是不相上下。另外SMJ在执行稳定性上,远远优于HJ,因此在内存受限的情况下,SMJ可以充分利用磁盘来顺利完成关联计算。总的来说,相较于shuffle SMJ, spark sql向来是对shuffle HJ视而不见的。
Spark sql在各自情况下的策略选择总结如下:
满足广播条件的等值连接:broadcast HJ
满足广播条件的非等值连接:bradcast NLJ
不满足广播条件的等值连接:shuffle SMJ
不满足广播条件的非等值连接:shuffle NLJ
今天的分享到这里,希望大家多多批评指正,知识为开源内容,代码已做脱敏处理,谢谢!
版权归原作者 共勉zzz 所有, 如有侵权,请联系我们删除。