Skew Join
是 Spark 中为了解决数据倾斜问题而设计的一种优化机制。数据倾斜是指在分布式计算中,由于某些
key
具有大量数据,而其他
key
数据较少,导致某些分区的数据量特别大,造成计算负载不均衡。数据倾斜会导致个别节点出现性能瓶颈,影响整个任务的完成时间。
Skew Join
的优化机制在 Spark 中主要解决了
JOIN
操作中的数据倾斜问题。为了更好地理解
Skew Join
的原理和实现,我们需要从数据倾斜产生的原因、Spark 如何识别数据倾斜、以及
Skew Join
的优化策略和底层实现等方面来进行详细解释。
一、什么是数据倾斜
数据倾斜指的是当某些
key
关联了异常大量的数据,而其他
key
关联的数据量较少时,数据分布的不均衡会导致计算瓶颈。例如,在
JOIN
操作中,如果表 A 中某个
key
具有大量的数据,而表 B 中同样的
key
也有大量数据,当这两个表基于这个
key
进行
JOIN
时,由于该
key
被分配到一个或少数几个分区,相关的任务会处理大量的数据,而其他分区的任务数据量却较少。这会导致部分任务比其他任务运行时间长,从而影响整个任务的执行时间。
二、Spark 中如何识别数据倾斜
在执行
JOIN
操作时,Spark 会通过数据采样和统计信息来检测是否存在数据倾斜。Spark SQL 可以通过分析数据分布,计算每个
key
的数据量,当发现某些
key
占据了大量的行时,Spark 会将其标记为 "倾斜的 key"。对于这些倾斜的
key
,Spark 会进行特殊处理,避免过度集中在某些分区中。
Spark 的
Skew Join
优化主要依赖于配置参数和数据采样来检测并处理这些倾斜的
key
。
检测数据倾斜的主要参数:
- spark.sql.autoSkewJoin.enabled: 默认是
false
,如果设置为true
,Spark 会自动检测和处理数据倾斜的JOIN
操作。 - spark.sql.skewJoin.threshold: 用来设定 Spark 如何判断某个分区是否倾斜。该参数设置的值是数据倾斜的阈值,通常是一个比例值,如果某个分区的数据量超过该比例值,则会被视为倾斜的分区。
三、Skew Join 的底层原理
当 Spark 识别出
JOIN
中存在数据倾斜时,
Skew Join
会将倾斜的
key
拆分成多个子任务分别处理。具体而言,
Skew Join
的主要思想是将倾斜的
key
拆分到多个不同的分区,从而将任务的计算负载均匀分布,避免单个分区处理过多数据。
以下是
Skew Join
的执行流程:
普通的非倾斜
key
处理: 对于普通的非倾斜key
,Skew Join
没有特别的处理方式,Spark 直接按照key
进行Shuffle
,将数据发送到相应的分区,并进行JOIN
操作。倾斜的
key
处理:对于检测到的倾斜
key
,Spark 会进行特殊处理,具体步骤如下:
- Spark 会将倾斜的
key
的数据进行重新分片,将大数据量的倾斜key
拆分成多个子分区。 - 然后对于每一个子分区,分别与另一个表中的对应数据进行
JOIN
。 - 通过多次
JOIN
操作,将这些子分区结果合并为最终的JOIN
输出结果。
** 3. Hash Salt(哈希加盐)**:
为了避免倾斜的
key
被集中到同一个分区,Spark 会通过对倾斜的
key
添加一个随机的
salt
(盐值)来打散数据。具体来说,Spark 会将倾斜的
key
拆分成多个子
key
,通过附加随机数(
salt
),使得这些子
key
被分布到不同的分区。
伪代码展示:
// 倾斜 key 的原始 join
tableA.join(tableB, "key")
// Skew Join 处理
val skewKeys = getSkewKeys()
for (skewKey <- skewKeys) {
val saltedTableA = tableA.filter($"key" === skewKey).withColumn("salt", rand())
val saltedTableB = tableB.filter($"key" === skewKey).withColumn("salt", rand())
saltedTableA.join(saltedTableB, Seq("key", "salt"))
}
通过引入
salt
,可以有效地将数据均匀分布到不同的分区,减少单个分区处理的数据量。
四、Skew Join 的源代码实现
在 Spark SQL 中,
Skew Join
是作为
PhysicalPlan
中
Join
的一个优化执行计划。关键类为
EnsureRequirements
,其主要职责是对
Join
的物理计划执行前进行必要的调整,包括处理数据倾斜的
Skew Join
优化。
以下是
EnsureRequirements
中处理数据倾斜的相关部分源代码:
private def applySkewJoin(plan: SparkPlan): SparkPlan = plan match {
case join @ ShuffledHashJoinExec(_, _, _, _, left, right) =>
// 检查是否有数据倾斜
if (isSkewed(join)) {
// 处理 skew join,使用 hash salt 拆分倾斜的 key
val skewJoin = handleSkewJoin(join)
skewJoin
} else {
join
}
case other => other
}
在
EnsureRequirements
中,
applySkewJoin
函数会检测当前的
JOIN
是否存在数据倾斜问题。如果检测到数据倾斜,
handleSkewJoin
函数会对数据进行处理,创建一个带有
salt
的
Skew Join
执行计划。
具体实现步骤:
- 检测数据倾斜:
isSkewed(join)
函数负责检测JOIN
中的分区是否有数据倾斜。通常,通过采样和统计每个分区的数据量,来判断某个分区的数据量是否超出设定的阈值(spark.sql.skewJoin.threshold
)。 - 处理倾斜数据:
handleSkewJoin(join)
函数是Skew Join
的核心实现。它会通过对倾斜的key
添加salt
进行打散,使得数据均匀分布到多个子分区。
private def handleSkewJoin(join: ShuffledHashJoinExec): SparkPlan = {
val skewKeys = getSkewKeys(join)
val saltedLeft = splitAndSalt(join.left, skewKeys)
val saltedRight = splitAndSalt(join.right, skewKeys)
saltedLeft.join(saltedRight)
}
private def splitAndSalt(plan: SparkPlan, skewKeys: Seq[KeyType]): SparkPlan = {
// 对每个倾斜 key 进行拆分并添加 salt
plan.transform {
case rdd: RDD[_] =>
rdd.mapPartitionsInternal { iter =>
iter.flatMap { row =>
val key = getJoinKey(row)
if (skewKeys.contains(key)) {
val salt = Random.nextInt(numSplits) // 随机生成 salt
Some((key, salt, row))
} else {
Some((key, row))
}
}
}
}
}
在上面的代码中,
splitAndSalt
函数将每个倾斜的
key
拆分成多个子
key
,并为它们添加随机
salt
,从而打散数据,均匀分布到不同的分区。
五、Skew Join 的优化策略
Spark 中
Skew Join
的优化需要考虑以下几个方面:
- 自动启用 Skew Join:通过设置
spark.sql.autoSkewJoin.enabled
为true
,Spark 会自动检测并处理倾斜的JOIN
操作。对于那些倾斜的分区,Spark 会自动进行Skew Join
优化。 - 调优 salt 值:
salt
的值影响了倾斜数据被打散的粒度。通过调节salt
的随机范围,可以控制数据的打散程度。如果salt
的范围太小,数据可能仍然集中在某些分区;如果范围太大,则可能会产生过多的小分区,导致计算开销增加。 - 采样优化:通过调整采样参数,Spark 可以更好地识别出数据倾斜的
key
,从而提高Skew Join
的处理效率。spark.sql.skewJoin.threshold
参数允许用户设定数据倾斜的阈值。 - 数据预处理:在某些场景中,用户可以通过在数据加载和预处理阶段手动解决数据倾斜问题。例如,用户可以通过聚合或者过滤数据的方式,减少倾斜
key
的数据量。
六、总结
Skew Join
是 Spark 中为了解决数据倾斜问题而提供的一种重要优化机制。其核心思想是通过检测数据倾斜的
key
,并对这些
key
进行分片和哈希加盐处理,使得倾斜的数据被均匀分布到不同的分区,从而避免计算负载的不均衡。通过
Skew Join
,Spark 可以显著提高
JOIN
操作的性能,尤其是在数据倾斜严重的场景下。
合理的参数调优和数据预处理是确保
Skew Join
有效的关键。
版权归原作者 goTsHgo 所有, 如有侵权,请联系我们删除。