0


Spark中的JOIN机制

Spark中的JOIN机制

1、Hash Join概述

Apache Spark共提供了五种JOIN机制,其中常用的有三种:Shuffle Hash Join、Broadcast Hash Join及Sort Merge Join,它们都基于Hash Join,只不过需要在Hash Join前后进行Shuffle、Broadcast或Sort

实际上,Hash Join算法来自于传统数据库,而Shuffle、Broadcast和Sort是大数据(数据仓库)在分布式场景下两者结合的产物。因此,我们也说大数据(数据仓库)是由传统数据库发展而来的

通常情况下,Hash Join使用两个表中较小的表在内存中建立以Join Key为Key的哈希/散列表(Hash Table),然后扫描较大的表,同样对大表Join Key进行Hash后探测哈希/散列表,找出与哈希/散列表匹配的行

Hash Join主要分为两个阶段:建立阶段(Build Phase)和探测阶段(Probe Phase)

  • Bulid Phase:较小的表被构建成以Join Key为Key的Hash Table,较小的表也称Build Table
  • Probe Phase:扫描较大表的行并计算Join Key的哈希值,与Build Table哈希表比对,若相同则进行JOIN,较大的表也称Probe Table

值得注意的是,Hash Join适用于较小的表完全可以放于内存的情况,如果表较大,无法构造在内存中,则优化器会将它分成若干个Partition,将不能放入内存的部分写入磁盘,此时会多一个写的代价,I/O性能差

Apache Spark将参与JOIN的两张表抽象为流式遍历表(StreamIter)和查找表(BuildIter),通常StreamIter为大表,BuildIter为小表,这是由Spark根据JOIN策略自动决定的。对于每条来自StreamIter的记录,都要去BuildIter中查找匹配的记录

2、影响JOIN的因素

影响JOIN性能和JOIN结果的因素主要包括:数据集的大小、JOIN的连接条件以及JOIN的连接类型

1)数据集的大小

参与JOIN的数据集大小会直接影响JOIN操作的执行效率。同样,也会影响Spark JOIN机制的选择

2)JOIN的连接条件

JOIN的条件涉及字段之间的逻辑比较关系。根据JOIN的条件,JOIN可分为两大类:等值连接和非等值连接

  • 等值连接:涉及字段之间一个或多个必须同时满足的相等条件(运算连接符为=
  • 非等值连接:涉及字段之间一个或多个必须同时满足的非相等条件(运算连接符不为=

3)JOIN的连接类型

在输入数据集的记录之间应用连接条件之后,JOIN类型会影响JOIN操作的结果。Spark主要有以下几种JOIN类型:

  • 内连接(Inner Join:返回连接条件都匹配的记录
  • 外连接(Outer Join:分为左外连接、右外链接和全外连接
  • 交叉连接(Cross Join:交叉连接返回两个表的笛卡尔乘积
  • 半/反连接(Semi/Anti Join:返回匹配/不匹配右表的左表数据

3、Spark中的JOIN机制

Spark共提供了五种JOIN机制来执行具体的JOIN操作:

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Product Join
  • Broadcast Nested Loop Join

3.1、Shuffle Hash Join

当要JOIN的表数据量较大时,一般选择Shuffle Hash Join。这样可以将大表按照Join Key进行重分区,保证每个相同的Join Key都发送到同一个分区中

在这里插入图片描述
Shuffle Hash Join的基本步骤如下:

  • Shuffle阶段:对于两张参与JOIN的表,分别根据Join Key进行重分区,该过程会涉及Shuffle,保证Join Key值相同的记录被分到同一个分区
  • Hash Join阶段:对于每个Shuffle Read分区,会将分区中较小表(BuildIter)构建成一个Hash Table放入内存,然后根据Join Key与较大的表记录进行匹配

Shuffle Hash Join的条件与特点如下:

  • BuildIter的总体估计大小超过spark.sql.autoBroadcastJoinThreshold(default 10MB)设置的值,即不满足Broadcast Hash Join条件
  • 仅支持等值连接,且参数spark.sql.join.preferSortMergeJoin(default true)需要设置为false,即不满足Sort Merge Join条件(Join Key不需要排序)
  • 支持除了全外连接(Full Outer Join)外的所有JOIN类型,对于Full Outer Join,需要建立双向Hash Table,代价太大,因此Full Outer Join默认都是基于Sort Merge Join来实现
  • 需要对小表构建Hash Table,属于内存密集型操作,如果构建Hash Table的一侧数据较大,可能会造成数据倾斜和OOM
  • StreamIter的大小是BuildIter的三倍以上

Shuffle Hash Join:Shuffle会带来较多的网络I/O开销,因此性能较差。同时,在Executor的内存使用方面,如果Executor的数量足够多,每个分区处理的数据量可以控制到比较小

3.2、Broadcast H

标签: spark sql

本文转载自: https://blog.csdn.net/weixin_55629186/article/details/140479171
版权归原作者 对许 所有, 如有侵权,请联系我们删除。

“Spark中的JOIN机制”的评论:

还没有评论