0


【Apache Spark 】第 7 章优化和调优 Spark 应用程序

🔎大家好,我是Sonhhxg_柒,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流🔎

📝个人主页-Sonhhxg_柒的博客_CSDN博客 📃

🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝​

📣系列专栏 - 机器学习【ML】 自然语言处理【NLP】 深度学习【DL】

🖍foreword

✔说明⇢本人讲解主要包括Python、机器学习(ML)、深度学习(DL)、自然语言处理(NLP)等内容。

如果你对这个系列感兴趣的话,可以关注订阅哟👋

在上一章中,我们详细介绍了如何在 Java 和 Scala 中使用数据集。我们探索了 Spark 如何管理内存以适应 Dataset 构造,并将其作为其统一和高级 API 的一部分,并且我们考虑了与使用 Datasets 相关的成本以及如何降低这些成本。

除了降低成本,我们还想考虑如何优化和调整 Spark。在本章中,我们将讨论一组启用优化的 Spark 配置,查看 Spark 的连接策略系列,并检查 Spark UI,寻找不良行为的线索。

优化和调整 Spark 以提高效率

虽然 Spark 有许多用于调优的配置,但本书只会介绍少数最重要和最常用的调优配置。有关按功能主题分组的综合列表,您可以仔细阅读文档。

查看和设置 Apache Spark 配置

您可以通过三种方式获取和设置 Spark 属性。第一种是通过一组配置文件。在您的部署

  1. $SPARK_HOME

目录(安装 Spark 的位置)中,有许多配置文件:conf/spark-defaults.conf.templateconf/log4j.properties.templateconf/spark-env.sh.template。更改这些文件中的默认值并在不使用 . 模板后缀指示 Spark 使用这些新值。

笔记

conf/spark-defaults.conf文件中的配置更改适用于 Spark 集群以及提交到集群的所有 Spark 应用程序。

第二种方法是直接在您的 Spark 应用程序中或在提交应用程序时在命令行中指定 Spark 配置

  1. spark-submit

,使用以下

  1. --conf

标志:

  1. spark-submit --conf spark.sql.shuffle.partitions=5 --conf
  2. "spark.executor.memory=2g" --class main.scala.chapter7.SparkConfig_7_1 jars/main-
  3. scala-chapter7_2.12-1.0.jar

以下是在 Spark 应用程序本身中执行此操作的方法:

  1. // In Scala
  2. import org.apache.spark.sql.SparkSession
  3. def printConfigs(session: SparkSession) = {
  4. // Get conf
  5. val mconf = session.conf.getAll
  6. // Print them
  7. for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }
  8. }
  9. def main(args: Array[String]) {
  10. // Create a session
  11. val spark = SparkSession.builder
  12. .config("spark.sql.shuffle.partitions", 5)
  13. .config("spark.executor.memory", "2g")
  14. .master("local[*]")
  15. .appName("SparkConfig")
  16. .getOrCreate()
  17. printConfigs(spark)
  18. spark.conf.set("spark.sql.shuffle.partitions",
  19. spark.sparkContext.defaultParallelism)
  20. println(" ****** Setting Shuffle Partitions to Default Parallelism")
  21. printConfigs(spark)
  22. }
  23. spark.driver.host -> 10.8.154.34
  24. spark.driver.port -> 55243
  25. spark.app.name -> SparkConfig
  26. spark.executor.id -> driver
  27. spark.master -> local[*]
  28. spark.executor.memory -> 2g
  29. spark.app.id -> local-1580162894307
  30. spark.sql.shuffle.partitions -> 5

第三种选择是通过 Spark shell 的编程接口。与 Spark 中的其他一切一样,API 是主要的交互方法。通过该

  1. SparkSession

对象,您可以访问大多数 Spark 配置设置。

例如,在 Spark REPL 中,这段 Scala 代码显示了 Spark 以本地模式启动的本地主机上的 Spark 配置(有关可用不同模式的详细信息,请参阅第 1 章中的“部署模式”):

  1. // In Scala
  2. // mconf is a Map[String, String]
  3. scala> val mconf = spark.conf.getAll
  4. ...
  5. scala> for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }
  6. spark.driver.host -> 10.13.200.101
  7. spark.driver.port -> 65204
  8. spark.repl.class.uri -> spark://10.13.200.101:65204/classes
  9. spark.jars ->
  10. spark.repl.class.outputDir -> /private/var/folders/jz/qg062ynx5v39wwmfxmph5nn...
  11. spark.app.name -> Spark shell
  12. spark.submit.pyFiles ->
  13. spark.ui.showConsoleProgress -> true
  14. spark.executor.id -> driver
  15. spark.submit.deployMode -> client
  16. spark.master -> local[*]
  17. spark.home -> /Users/julesdamji/spark/spark-3.0.0-preview2-bin-hadoop2.7
  18. spark.sql.catalogImplementation -> hive
  19. spark.app.id -> local-1580144503745

您还可以仅查看 Spark SQL 特定的 Spark 配置:

  1. // In Scala
  2. spark.sql("SET -v").select("key", "value").show(5, false)
  1. # In Python
  2. spark.sql("SET -v").select("key", "value").show(n=5, truncate=False)
  3. +------------------------------------------------------------+-----------+
  4. |key |value |
  5. +------------------------------------------------------------+-----------+
  6. |spark.sql.adaptive.enabled |false |
  7. |spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin |0.2 |
  8. |spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled|true |
  9. |spark.sql.adaptive.shuffle.localShuffleReader.enabled |true |
  10. |spark.sql.adaptive.shuffle.maxNumPostShufflePartitions |<undefined>|
  11. +------------------------------------------------------------+-----------+
  12. only showing top 5 rows

或者,您可以通过 Spark UI 的 Environment 选项卡(我们将在本章稍后讨论)以只读值访问 Spark 的当前配置,如图 7-1所示。

图 7-1。Spark 3.0 UI 的环境选项卡

要以编程方式设置或修改现有配置,首先检查该属性是否可修改。将返回或。所有可修改的配置都可以使用 API 设置为新值:

  1. spark.conf.isModifiable("*<config_name>*")
  1. true
  1. false
  1. // In Scala
  2. scala> spark.conf.get("spark.sql.shuffle.partitions")
  3. res26: String = 200
  4. scala> spark.conf.set("spark.sql.shuffle.partitions", 5)
  5. scala> spark.conf.get("spark.sql.shuffle.partitions")
  6. res28: String = 5
  1. # In Python
  2. >>> spark.conf.get("spark.sql.shuffle.partitions")
  3. '200'
  4. >>> spark.conf.set("spark.sql.shuffle.partitions", 5)
  5. >>> spark.conf.get("spark.sql.shuffle.partitions")
  6. '5'

在您可以设置 Spark 属性的所有方式中,优先顺序决定了哪些值被接受。在spark-defaults.conf中定义的任何值或标志将首先被读取,然后是在命令行中提供的

  1. spark-submit

那些,最后是

  1. SparkSession

在 Spark 应用程序中设置的那些。所有这些属性将被合并,在 Spark 应用程序中重置的任何重复属性优先。同样,命令行上提供的值将取代配置文件中的设置,前提是它们不会在应用程序本身中被覆盖。

正如您将在下一节中看到的那样,调整或提供正确的配置有助于提高性能。这里的建议来源于社区从业者的观察,重点是如何最大化 Spark 的集群资源利用率,以适应大规模的工作负载.

为大型工作负载扩展 Spark

大型 Spark 工作负载通常是批处理作业——有些在夜间运行,而有些则在白天定期安排。在任何一种情况下,这些作业都可能处理数十 TB 或更多的数据。为避免因资源不足或性能逐渐下降而导致作业失败,您可以启用或更改一些 Spark 配置。这些配置影响三个 Spark 组件:Spark 驱动程序、执行程序和在执行程序上运行的 shuffle 服务。

Spark driver 的职责是与集群管理器协调以在集群中启动 executor 并在其上安排 Spark 任务。对于大型工作负载,您可能有数百个任务。本节介绍了一些您可以调整或启用的配置,以优化资源利用率、并行化任务并避免大量任务的瓶颈。一些优化想法和见解来自 Facebook 等使用 TB 级规模的 Spark 的大数据公司,他们在 Spark + AI 峰会上与 Spark 社区分享了这些想法和见解。1

静态与动态资源分配

当您将计算资源指定为 的命令行参数时

  1. spark-submit

,就像我们之前所做的那样,您限制了限制。这意味着,如果由于工作量大于预期而导致任务在驱动程序中排队,稍后需要更多资源,Spark 将无法容纳或分配额外的资源。

相反,如果您使用 Spark 的动态资源分配配置,Spark 驱动程序可以随着大型工作负载的需求起伏而请求更多或更少的计算资源。在您的工作负载是动态的(即它们对计算容量的需求不同)的场景中,使用动态分配有助于适应突然的峰值。

一个有用的用例是流式传输,其中数据流量可能不均匀。另一个是按需数据分析,在高峰时段您可能会有大量的 SQL 查询。启用动态资源分配可以让 Spark 更好地利用资源,在不使用的时候释放执行者,并在需要时获取新的执行者。

笔记

除了处理大型或变化的工作负载时,动态分配在多租户环境中也很有用,其中 Spark 可以与 YARN、Mesos 或 Kubernetes 中的其他应用程序或服务一起部署。但是请注意,Spark 不断变化的资源需求可能会同时影响其他需要资源的应用程序。

要启用和配置动态分配,您可以使用如下设置。请注意,这里的数字是任意的;适当的设置将取决于您的工作负载的性质,并且应该相应地进行调整。其中一些配置无法在 Spark REPL 中设置,因此您必须以编程方式设置它们:

  1. spark.dynamicAllocation.enabled true
  2. spark.dynamicAllocation.minExecutors 2
  3. spark.dynamicAllocation.schedulerBacklogTimeout 1m
  4. spark.dynamicAllocation.maxExecutors 20
  5. spark.dynamicAllocation.executorIdleTimeout 2min

默认情况下

  1. spark.dynamicAllocation.enabled

设置为

  1. false

。当使用此处显示的设置启用时,Spark 驱动程序将要求集群管理器创建两个执行程序以启动,至少 (

  1. spark.dynamicAllocation.minExecutors

)。随着任务队列积压的增加,每次超过积压超时( )时都会请求新的执行

  1. spark.dynamicAllocation.schedulerBacklogTimeout

器。在这种情况下,只要有超过 1 分钟未调度的待处理任务,驱动程序就会请求启动一个新的执行器来调度积压的任务,最多 20 个(

  1. spark.dynamicAllocation.maxExecutors

)。相反,如果 executor 完成任务并空闲 2 分钟 (

  1. spark.dynamicAllocation.executorIdleTimeout

),Spark 驱动程序将终止它。

配置 Spark 执行器的内存和 shuffle 服务

仅仅启用动态资源分配是不够的。您还必须了解 Spark 如何布局和使用执行程序内存,以免执行程序内存不足或受到 JVM 垃圾收集的困扰。

每个执行程序可用的内存量由 控制

  1. spark.executor.memory

。这分为三个部分,如图 7-2 所示:执行内存、存储内存和保留内存。在保留 300 MB 内存后,默认划分为 60% 用于执行内存,40% 用于存储,以防止出现 OOM 错误。Spark文档建议这适用于大多数情况,但您可以调整

  1. spark.executor.memory

希望任一部分用作基线的比例。当存储内存不被使用时,Spark 可以获取它以在执行内存中使用以用于执行目的,反之亦然。

图 7-2。执行器内存布局

执行内存用于 Spark 洗牌、连接、排序和聚合。由于不同的查询可能需要不同的内存量,因此专用于此的可用内存的一部分(默认情况下)可能很难调整,但很容易调整

  1. spark.memory.fraction

  1. 0.6

相比之下,存储内存主要用于缓存用户数据结构和从 DataFrame 派生的分区。

在 map 和 shuffle 操作期间,Spark 写入和读取本地磁盘的 shuffle 文件,因此存在大量 I/O 活动。这可能会导致瓶颈,因为默认配置对于大规模 Spark 作业来说不是最理想的。在 Spark 作业的这个阶段,知道要调整哪些配置可以降低这种风险。

在表 7-1中,我们捕获了一些建议的配置进行调整,以便这些操作期间的映射、溢出和合并过程不会受到低效 I/O 的阻碍,并使这些操作能够在写入最终的 shuffle 分区之前使用缓冲内存到磁盘。调整每个执行器上运行的 shuffle 服务也有助于提高整体性能适用于大型 Spark工作负载。
表 7-1。在 map 和 shuffle 操作期间调整 I/O 的 Spark 配置配置默认值、推荐和描述

  1. spark.driver.memory

默认值为

  1. 1g

(1 GB)。这是分配给 Spark 驱动程序以从执行程序接收数据的内存量。这通常在

  1. spark-submit

with期间更改

  1. --driver-memory


仅当您希望驱动程序从诸如 之类的操作中接收到大量数据时

  1. collect()

,或者如果您的驱动程序内存不足时,才更改此设置。

  1. spark.shuffle.file.buffer

默认值为 32 KB。推荐为 1 MB。这允许 Spark 在将最终映射结果写入磁盘之前进行更多缓冲。

  1. spark.file.transferTo

默认为

  1. true

。将其设置为

  1. false

将强制 Spark 在最终写入磁盘之前使用文件缓冲区传输文件;这将减少 I/O 活动。

  1. spark.shuffle.unsafe.file.output.buffer

默认值为 32 KB。这控制了在 shuffle 操作期间合并文件时可能的缓冲量。通常,较大的值(例如 1 MB)更适合较大的工作负载,而默认值适用于较小的工作负载。

  1. spark.io.compression.lz4.blockSize

默认值为 32 KB。增加到 512 KB。您可以通过增加块的压缩大小来减小 shuffle 文件的大小。

  1. spark.shuffle.service.​index.cache.size

默认为 100m。高速缓存条目仅限于以字节为单位的指定内存占用量。

  1. spark.shuffle.registration.​timeout

默认值为 5000 毫秒。增加到 120000 毫秒。

  1. spark.shuffle.registration.maxAttempts

默认值为 3。如果需要,增加到 5。

笔记

此表中的建议不适用于所有情况,但它们应该让您了解如何根据您的工作负载调整这些配置。与性能调整中的其他一切一样,您必须进行试验,直到找到正确的平衡点。

最大化 Spark 并行性

Spark 的大部分效率是由于它能够大规模并行运行多个任务。要了解如何最大限度地提高并行性(即尽可能多地并行读取和处理数据),您必须了解 Spark 如何将数据从存储中读取到内存中,以及分区对 Spark 的意义。

在数据管理用语中,分区是一种将数据排列到磁盘上的可配置和可读块或连续数据块的子集的方法。如有必要,这些数据子集可以由进程中的多个线程独立并行地读取或处理。这种独立性很重要,因为它允许数据处理的大规模并行。

Spark 在并行处理其任务方面的效率令人尴尬。正如您在第 2 章中所了解的,对于大规模工作负载,Spark 作业将有许多阶段,并且在每个阶段中都会有许多任务。Spark 最多会为每个内核的每个任务安排一个线程,并且每个任务将处理一个不同的分区。为了优化资源利用率和最大化并行性,理想的分区至少与执行器上的核心数一样多,如图 7-3 所示。如果分区数多于每个执行程序上的核心数,则所有核心都保持忙碌状态。您可以将分区视为并行性的原子单元:在单个内核上运行的单个线程可以在单个分区上工作。

图 7-3。Spark 任务、核心、分区和并行度的关系

如何创建分区

如前所述,Spark 的任务将数据处理为从磁盘读取到内存的分区。磁盘上的数据以块或连续文件块的形式排列,具体取决于存储。默认情况下,数据存储上的文件块大小范围为 64 MB 到 128 MB。例如,在 HDFS 和 S3 上,默认大小为 128 MB(这是可配置的)。这些块的连续集合构成一个分区。

Spark 中分区的大小由

  1. spark.sql.files.maxPartitionBytes

. 默认值为 128 MB。您可以减小大小,但这可能会导致所谓的“小文件问题”——许多小分区文件,由于文件系统操作(如打开、关闭和列出)而引入过多的磁盘 I/O 和性能下降目录,在分布式文件系统上可能很慢。

当您显式使用 DataFrame API 的某些方法时,也会创建分区。例如,在创建大型 DataFrame 或从磁盘读取大型文件时,您可以显式指示 Spark 创建一定数量的分区:

  1. // In Scala
  2. val ds = spark.read.textFile("../README.md").repartition(16)
  3. ds: org.apache.spark.sql.Dataset[String] = [value: string]
  4. ds.rdd.getNumPartitions
  5. res5: Int = 16
  6. val numDF = spark.range(1000L * 1000 * 1000).repartition(16)
  7. numDF.rdd.getNumPartitions
  8. numDF: org.apache.spark.sql.Dataset[Long] = [id: bigint]
  9. res12: Int = 16

最后,在洗牌阶段创建洗牌分区。默认情况下,shuffle 分区的数量设置为 200

  1. spark.sql.shuffle.partitions

。您可以根据您拥有的数据集的大小调整此数字,以减少通过网络发送给执行者任务的小分区数量。

笔记

对于较小的或流式工作负载,默认值

  1. spark.sql.shuffle.partitions

太高;您可能希望将其减少到较低的值,例如执行器上的核心数或更少。

  1. groupBy()

在or之类的操作

  1. join()

(也称为宽转换)期间创建的随机分区会消耗网络和磁盘 I/O 资源。在这些操作期间,shuffle 会将结果溢出到 executor 中指定位置的本地磁盘

  1. spark.local.directory

。拥有用于此操作的高性能 SSD 磁盘将提高性能。

为 shuffle 阶段设置的 shuffle 分区数量没有神奇的公式;该数字可能会因您的用例、数据集、内核数量和可用的执行程序内存量而异——这是一种反复试验的方法。2

除了为大型工作负载扩展 Spark 之外,为了提高性能,您还需要考虑缓存或持久化您经常访问的 DataFrame 或表。我们将在下一节探讨各种缓存和持久性选项.

数据的缓存和持久性

缓存和持久性有什么区别?在 Spark 中,它们是同义词。两个 API 调用

  1. cache()

  1. persist()

提供这些功能。后者提供了对数据存储方式和位置的更多控制——在内存和磁盘上,序列化和非序列化。两者都有助于提高经常访问的 DataFrame 或表的性能。

DataFrame.cache()

  1. cache()

将存储尽可能多的跨 Spark 执行器在内存中读取的分区(参见图 7-2)。虽然 DataFrame 可能会被部分缓存,但分区不能被部分缓存(例如,如果您有 8 个分区,但内存中只能容纳 4.5 个分区,则只会缓存 4 个)。但是,如果不是所有分区都被缓存,当您想再次访问数据时,必须重新计算未缓存的分区,从而减慢您的 Spark 作业。

让我们看一个示例,说明在访问 DataFrame 时缓存大型 DataFrame 如何提高性能:

  1. // In Scala
  2. // Create a DataFrame with 10M records
  3. val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
  4. df.cache() // Cache the data
  5. df.count() // Materialize the cache
  6. res3: Long = 10000000
  7. Command took 5.11 seconds
  8. df.count() // Now get it from the cache
  9. res4: Long = 10000000
  10. Command took 0.44 seconds

第一个

  1. count()

实现缓存,而第二个访问缓存,从而使该数据集的访问时间快近 12 倍。

笔记

当您使用

  1. cache()

or

  1. persist()

时,DataFrame 不会完全缓存,直到您调用一个遍历每条记录的操作(例如,

  1. count()

)。如果您使用类似 的操作

  1. take(1)

,则只会缓存一个分区,因为 Catalyst 意识到您不需要计算所有分区来检索一条记录。

观察 DataFrame 如何在本地主机上的一个执行程序中存储,如图 7-4 所示,我们可以看到它们都适合内存(回想一下,在低级别 DataFrames 由 RDD 支持)。

图 7-4。缓存分布在执行器内存中的 12 个分区中

DataFrame.persist()

  1. persist(StorageLevel.*LEVEL*)

细致入微,提供对如何通过StorageLevel. 表 7-2总结了不同的存储级别。磁盘上的数据始终使用Java 或 Kryo 序列化进行序列化.
表 7-2。存储级别存储级别描述

  1. MEMORY_ONLY

数据直接作为对象存储,仅存储在内存中。

  1. MEMORY_ONLY_SER

数据被序列化为紧凑的字节数组表示并仅存储在内存中。要使用它,必须以一定的代价对其进行反序列化。

  1. MEMORY_AND_DISK

数据直接作为对象存储在内存中,但如果内存不足,则将其余部分序列化并存储在磁盘上。

  1. DISK_ONLY

数据被序列化并存储在磁盘上。

  1. OFF_HEAP

数据存储在堆外。Spark 使用堆外内存进行存储和查询执行;请参阅“配置 Spark 执行器的内存和 shuffle 服务”。

  1. MEMORY_AND_DISK_SER

类似

  1. MEMORY_AND_DISK

,但数据在存储在内存中时会被序列化。(数据存储在磁盘上时总是序列化的。)

笔记

每个

  1. StorageLevel

(除了

  1. OFF_HEAP

)都有一个等价的

  1. LEVEL_NAME_2

这意味着在两个不同的 Spark 执行器上复制两次:

  1. MEMORY_ONLY_2

  1. MEMORY_AND_DISK_SER_2

等。虽然这个选项很昂贵,但它允许在两个地方进行数据本地化,提供容错性并让 Spark 可以选择将任务安排到本地数据的副本。

让我们看一下与上一节相同的示例,但使用

  1. persist()

方法:

  1. // In Scala
  2. import org.apache.spark.storage.StorageLevel
  3. // Create a DataFrame with 10M records
  4. val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
  5. df.persist(StorageLevel.DISK_ONLY) // Serialize the data and cache it on disk
  6. df.count() // Materialize the cache
  7. res2: Long = 10000000
  8. Command took 2.08 seconds
  9. df.count() // Now get it from the cache
  10. res3: Long = 10000000
  11. Command took 0.38 seconds

从图 7-5可以看出,数据保存在磁盘上,而不是内存中。要取消缓存数据,只需调用

  1. DataFrame.unpersist()

.


图 7-5。缓存分布在执行器磁盘中的 12 个分区中

最后,不仅可以缓存 DataFrame,还可以缓存从 DataFrame 派生的表或视图。这使它们在 Spark UI 中的名称更具可读性。例如:

  1. // In Scala
  2. df.createOrReplaceTempView("dfTable")
  3. spark.sql("CACHE TABLE dfTable")
  4. spark.sql("SELECT count(*) FROM dfTable").show()
  5. +--------+
  6. |count(1)|
  7. +--------+
  8. |10000000|
  9. +--------+
  10. Command took 0.56 seconds

何时缓存和持久化

缓存的常见用例是您希望重复访问大型数据集以进行查询或转换的场景。一些例子包括:

  • 迭代机器学习训练中常用的DataFrames
  • DataFrames 通常用于在 ETL 期间进行频繁的转换或构建数据管道

何时不缓存和持久化

并非所有用例都要求缓存。一些可能不需要缓存 DataFrame 的场景包括:

  • 太大而无法放入内存的 DataFrame
  • 不需要频繁使用的 DataFrame 上的廉价转换,无论大小如何

作为一般规则,您应该明智地使用内存缓存,因为它可能会在序列化和反序列化时产生资源成本,具体取决于所

  1. StorageLevel

使用的。

接下来,我们将重点讨论一些常见的 Spark 连接操作,这些操作会触发昂贵的数据移动,需要集群的计算和网络资源,以及我们如何通过组织数据来缓解这种移动.

Spark 家族加入

连接操作是大数据分析中的一种常见转换类型,其中两个数据集以表或 DataFrame 的形式通过一个共同的匹配键合并。与关系型数据库类似,Spark DataFrame 和 Dataset API 以及 Spark SQL 提供了一系列连接转换:内连接、外连接、左连接、右连接等。所有这些操作都会触发跨 Spark 执行器的大量数据移动。

这些转换的核心是 Spark 如何计算要生成的数据、要写入磁盘的键和关联数据,以及如何将这些键和数据传输到节点,作为

  1. groupBy()

  1. join()

  1. agg()

  1. sortBy()

和等操作的一部分

  1. reduceByKey()

。这种运动通常被称为shuffle

Spark 有五种不同的连接策略,通过它可以在执行器之间交换移动、排序、分组和合并数据:广播哈希连接 (BHJ)、随机哈希连接 (SHJ)、随机排序合并连接 (SMJ)、广播嵌套循环连接(BNLJ)和随机复制嵌套循环连接(又名笛卡尔积连接)。我们将在这里只关注其中两个(BHJ 和 SMJ),因为它们是您会遇到的最常见的。

广播哈希连接

也称为map-side-only join,广播散列连接用于两个数据集,一个小(适合驱动程序和执行程序的内存)和另一个大到可以理想地避免移动,需要在某些情况下连接条件或列。使用 Spark广播变量,驱动程序将较小的数据集广播到所有 Spark 执行器,如图 7-6所示,然后在每个执行器上与较大的数据集连接。这种策略避免了大交换。


图 7-6。BHJ:将较小的数据集广播给所有执行者

默认情况下,如果较小的数据集小于 10 MB,Spark 将使用广播连接。此配置设置在

  1. spark.sql.autoBroadcastJoinThreshold

; 您可以根据每个执行程序和驱动程序中的内存量来减少或增加大小。如果您确信您有足够的内存,您可以使用大于 10 MB(甚至高达 100 MB)的 DataFrame 的广播连接。

一个常见的用例是当您在两个 DataFrame 之间有一组公共键时,一个比另一个拥有更少的信息,并且您需要两者的合并视图。例如,考虑一个简单的案例,您有一个包含世界各地足球运动员的大型数据集

  1. playersDF

和他们效力的足球俱乐部的较小数据集

  1. clubsDF

,并且您希望通过一个公共键加入他们:

  1. // In Scala
  2. import org.apache.spark.sql.functions.broadcast
  3. val joinedDF = playersDF.join(broadcast(clubsDF), "key1 === key2")

笔记

在这段代码中,我们强制 Spark 进行广播连接,但如果较小数据集的大小低于

  1. spark.sql.autoBroadcastJoinThreshold

.

BHJ 是 Spark 提供的最简单和最快的连接,因为它不涉及数据集的任何洗牌;广播后,执行者可以在本地使用所有数据。您只需要确保在 Spark 驱动程序和执行程序方面都有足够的内存来将较小的数据集保存在内存中。

在操作之后的任何时候,您都可以在物理计划中看到通过执行以下操作执行了哪些连接操作:

  1. joinedDF.explain(mode)

在 Spark 3.0 中,您可以使用它来显示可读且易于理解的输出。模式包括、、、和。

  1. joinedDF.explain('*mode*')
  1. 'simple'
  1. 'extended'
  1. 'codegen'
  1. 'cost'
  1. 'formatted'

何时使用广播哈希连接

在以下条件下使用这种类型的联接以获得最大收益:

  • 当更小和更大的数据集中的每个键被 Spark 散列到同一个分区时
  • 当一个数据集比另一个小得多时(并且在 10 MB 的默认配置内,或者如果您有足够的内存则更多)
  • 当您只想执行 equi-join 时,根据匹配的未排序键组合两个数据集
  • 当您不担心过度使用网络带宽或 OOM 错误时,因为较小的数据集将被广播到所有 Spark 执行器

指定

  1. -1

in值

  1. spark.sql.autoBroadcastJoinThreshold

将导致 Spark 总是诉诸随机排序合并连接,我们将在下一节中讨论。

随机排序合并加入

排序合并算法是一种有效的方法,可以通过一个可排序的、唯一的、可以分配或存储在同一分区中的公共键来合并两个大型数据集——即,两个具有公共哈希键的数据集结束up 在同一个分区上。从 Spark 的角度来看,这意味着每个数据集中具有相同键的所有行都在同一执行程序的同一分区上进行哈希处理。显然,这意味着数据必须在执行者之间托管或交换。

顾名思义,这个连接方案有两个阶段:排序阶段和合并阶段。排序阶段通过所需的连接键对每个数据集进行排序;合并阶段迭代每个数据集的行中的每个键,如果两个键匹配,则合并行。

默认情况下,通过

  1. SortMergeJoin

启用

  1. spark.sql.join.preferSortMergeJoin

。以下是本书GitHub 存储库中可用于本章的独立应用程序笔记本的代码片段。主要思想是采用两个具有一百万条记录的大型 DataFrame,并将它们连接到两个公共键上,

  1. uid == users_id

.

这些数据是合成的,但说明了这一点:

  1. // In Scala
  2. import scala.util.Random
  3. // Show preference over other joins for large data sets
  4. // Disable broadcast join
  5. // Generate data
  6. ...
  7. spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
  8. // Generate some sample data for two data sets
  9. var states = scala.collection.mutable.Map[Int, String]()
  10. var items = scala.collection.mutable.Map[Int, String]()
  11. val rnd = new scala.util.Random(42)
  12. // Initialize states and items purchased
  13. states += (0 -> "AZ", 1 -> "CO", 2-> "CA", 3-> "TX", 4 -> "NY", 5-> "MI")
  14. items += (0 -> "SKU-0", 1 -> "SKU-1", 2-> "SKU-2", 3-> "SKU-3", 4 -> "SKU-4",
  15. 5-> "SKU-5")
  16. // Create DataFrames
  17. val usersDF = (0 to 1000000).map(id => (id, s"user_${id}",
  18. s"user_${id}@databricks.com", states(rnd.nextInt(5))))
  19. .toDF("uid", "login", "email", "user_state")
  20. val ordersDF = (0 to 1000000)
  21. .map(r => (r, r, rnd.nextInt(10000), 10 * r* 0.2d,
  22. states(rnd.nextInt(5)), items(rnd.nextInt(5))))
  23. .toDF("transaction_id", "quantity", "users_id", "amount", "state", "items")
  24. // Do the join
  25. val usersOrdersDF = ordersDF.join(usersDF, $"users_id" === $"uid")
  26. // Show the joined results
  27. usersOrdersDF.show(false)
  28. +--------------+--------+--------+--------+-----+-----+---+---+----------+
  29. |transaction_id|quantity|users_id|amount |state|items|uid|...|user_state|
  30. +--------------+--------+--------+--------+-----+-----+---+---+----------+
  31. |3916 |3916 |148 |7832.0 |CA |SKU-1|148|...|CO |
  32. |36384 |36384 |148 |72768.0 |NY |SKU-2|148|...|CO |
  33. |41839 |41839 |148 |83678.0 |CA |SKU-3|148|...|CO |
  34. |48212 |48212 |148 |96424.0 |CA |SKU-4|148|...|CO |
  35. |48484 |48484 |148 |96968.0 |TX |SKU-3|148|...|CO |
  36. |50514 |50514 |148 |101028.0|CO |SKU-0|148|...|CO |
  37. |65694 |65694 |148 |131388.0|TX |SKU-4|148|...|CO |
  38. |65723 |65723 |148 |131446.0|CA |SKU-1|148|...|CO |
  39. |93125 |93125 |148 |186250.0|NY |SKU-3|148|...|CO |
  40. |107097 |107097 |148 |214194.0|TX |SKU-2|148|...|CO |
  41. |111297 |111297 |148 |222594.0|AZ |SKU-3|148|...|CO |
  42. |117195 |117195 |148 |234390.0|TX |SKU-4|148|...|CO |
  43. |253407 |253407 |148 |506814.0|NY |SKU-4|148|...|CO |
  44. |267180 |267180 |148 |534360.0|AZ |SKU-0|148|...|CO |
  45. |283187 |283187 |148 |566374.0|AZ |SKU-3|148|...|CO |
  46. |289245 |289245 |148 |578490.0|AZ |SKU-0|148|...|CO |
  47. |314077 |314077 |148 |628154.0|CO |SKU-3|148|...|CO |
  48. |322170 |322170 |148 |644340.0|TX |SKU-3|148|...|CO |
  49. |344627 |344627 |148 |689254.0|NY |SKU-3|148|...|CO |
  50. |345611 |345611 |148 |691222.0|TX |SKU-3|148|...|CO |
  51. +--------------+--------+--------+--------+-----+-----+---+---+----------+
  52. only showing top 20 rows

检查我们的最终执行计划,我们注意到 Spark 使用了 a

  1. SortMergeJoin

,正如预期的那样,加入两个 DataFrame。

  1. Exchange

操作是map操作的结果在每个executor上的shuffle :

  1. usersOrdersDF.explain()
  2. == Physical Plan ==
  3. InMemoryTableScan [transaction_id#40, quantity#41, users_id#42, amount#43,
  4. state#44, items#45, uid#13, login#14, email#15, user_state#16]
  5. +- InMemoryRelation [transaction_id#40, quantity#41, users_id#42, amount#43,
  6. state#44, items#45, uid#13, login#14, email#15, user_state#16],
  7. StorageLevel(disk, memory, deserialized, 1 replicas)
  8. +- *(3) SortMergeJoin [users_id#42], [uid#13], Inner
  9. :- *(1) Sort [users_id#42 ASC NULLS FIRST], false, 0
  10. : +- Exchange hashpartitioning(users_id#42, 16), true, [id=#56]
  11. : +- LocalTableScan [transaction_id#40, quantity#41, users_id#42,
  12. amount#43, state#44, items#45]
  13. +- *(2) Sort [uid#13 ASC NULLS FIRST], false, 0
  14. +- Exchange hashpartitioning(uid#13, 16), true, [id=#57]
  15. +- LocalTableScan [uid#13, login#14, email#15, user_state#16]

此外,Spark UI(我们将在下一节讨论)显示了整个作业的三个阶段:

  1. Exchange

and

  1. Sort

操作发生在最后阶段,然后是结果的合并,如图7-7和7-8所示. 这是昂贵的,并且需要在执行者

  1. Exchange

之间的网络中对分区进行洗牌。


图 7-7。分桶前:Spark 的各个阶段

图 7-8。分桶前:需要交换

优化 shuffle 排序合并连接

  1. Exchange

如果我们为想要执行频繁等连接的公共排序键或列创建分区桶,我们可以从这个方案中消除这一步。也就是说,我们可以创建明确数量的存储桶来存储特定的排序列(每个存储桶一个键)。以这种方式对数据进行预排序和重组可以提高性能,因为它允许我们跳过昂贵的

  1. Exchange

操作并直接进入

  1. WholeStageCodegen

.

在本章笔记本的以下代码片段(可在本书的GitHub

  1. users_id

存储库中获得)中,我们按将加入的和

  1. uid

列进行排序和存储桶,并将存储桶保存为 Parquet 格式的 Spark 托管表:

  1. // In Scala
  2. import org.apache.spark.sql.functions._
  3. import org.apache.spark.sql.SaveMode
  4. // Save as managed tables by bucketing them in Parquet format
  5. usersDF.orderBy(asc("uid"))
  6. .write.format("parquet")
  7. .bucketBy(8, "uid")
  8. .mode(SaveMode.OverWrite)
  9. .saveAsTable("UsersTbl")
  10. ordersDF.orderBy(asc("users_id"))
  11. .write.format("parquet")
  12. .bucketBy(8, "users_id")
  13. .mode(SaveMode.OverWrite)
  14. .saveAsTable("OrdersTbl")
  15. // Cache the tables
  16. spark.sql("CACHE TABLE UsersTbl")
  17. spark.sql("CACHE TABLE OrdersTbl")
  18. // Read them back in
  19. val usersBucketDF = spark.table("UsersTbl")
  20. val ordersBucketDF = spark.table("OrdersTbl")
  21. // Do the join and show the results
  22. val joinUsersOrdersBucketDF = ordersBucketDF
  23. .join(usersBucketDF, $"users_id" === $"uid")
  24. joinUsersOrdersBucketDF.show(false)
  25. +--------------+--------+--------+---------+-----+-----+---+---+----------+
  26. |transaction_id|quantity|users_id|amount |state|items|uid|...|user_state|
  27. +--------------+--------+--------+---------+-----+-----+---+---+----------+
  28. |144179 |144179 |22 |288358.0 |TX |SKU-4|22 |...|CO |
  29. |145352 |145352 |22 |290704.0 |NY |SKU-0|22 |...|CO |
  30. |168648 |168648 |22 |337296.0 |TX |SKU-2|22 |...|CO |
  31. |173682 |173682 |22 |347364.0 |NY |SKU-2|22 |...|CO |
  32. |397577 |397577 |22 |795154.0 |CA |SKU-3|22 |...|CO |
  33. |403974 |403974 |22 |807948.0 |CO |SKU-2|22 |...|CO |
  34. |405438 |405438 |22 |810876.0 |NY |SKU-1|22 |...|CO |
  35. |417886 |417886 |22 |835772.0 |CA |SKU-3|22 |...|CO |
  36. |420809 |420809 |22 |841618.0 |NY |SKU-4|22 |...|CO |
  37. |659905 |659905 |22 |1319810.0|AZ |SKU-1|22 |...|CO |
  38. |899422 |899422 |22 |1798844.0|TX |SKU-4|22 |...|CO |
  39. |906616 |906616 |22 |1813232.0|CO |SKU-2|22 |...|CO |
  40. |916292 |916292 |22 |1832584.0|TX |SKU-0|22 |...|CO |
  41. |916827 |916827 |22 |1833654.0|TX |SKU-1|22 |...|CO |
  42. |919106 |919106 |22 |1838212.0|TX |SKU-1|22 |...|CO |
  43. |921921 |921921 |22 |1843842.0|AZ |SKU-4|22 |...|CO |
  44. |926777 |926777 |22 |1853554.0|CO |SKU-2|22 |...|CO |
  45. |124630 |124630 |22 |249260.0 |CO |SKU-0|22 |...|CO |
  46. |129823 |129823 |22 |259646.0 |NY |SKU-4|22 |...|CO |
  47. |132756 |132756 |22 |265512.0 |AZ |SKU-2|22 |...|CO |
  48. +--------------+--------+--------+---------+-----+-----+---+---+----------+
  49. only showing top 20 rows

连接的输出按

  1. uid

和排序

  1. users_id

,因为我们保存了按升序排序的表。因此,在

  1. SortMergeJoin

. 查看 Spark UI(图 7-9),我们可以看到我们跳过

  1. Exchange

  1. WholeStageCodegen

.

  1. Exchange

与分桶前的物理计划相比,物理计划还显示没有执行:

  1. joinUsersOrdersBucketDF.explain()
  2. == Physical Plan ==
  3. *(3) SortMergeJoin [users_id#165], [uid#62], Inner
  4. :- *(1) Sort [users_id#165 ASC NULLS FIRST], false, 0
  5. : +- *(1) Filter isnotnull(users_id#165)
  6. : +- Scan In-memory table `OrdersTbl` [transaction_id#163, quantity#164,
  7. users_id#165, amount#166, state#167, items#168], [isnotnull(users_id#165)]
  8. : +- InMemoryRelation [transaction_id#163, quantity#164, users_id#165,
  9. amount#166, state#167, items#168], StorageLevel(disk, memory, deserialized, 1
  10. replicas)
  11. : +- *(1) ColumnarToRow
  12. : +- FileScan parquet
  13. ...


图 7-9。分桶后:不需要交换

何时使用随机排序合并连接

在以下条件下使用这种类型的联接以获得最大收益:

  • 当两个大数据集中的每个key都可以被Spark排序并散列到同一个分区时
  • 当您只想执行 equi-joins 以根据匹配的排序键组合两个数据集时
  • 当您想防止ExchangeSort操作以保存跨网络的大洗牌时

到目前为止,我们已经涵盖了与调整和优化 Spark 相关的操作方面,以及 Spark 如何在两个常见的连接操作期间交换数据。我们还演示了如何通过使用分桶避免大量数据交换来提高随机排序合并连接操作的性能。

正如您在前面的图中所见,Spark UI 是一种可视化这些操作的有用方式。它显示了收集的指标和程序的状态,揭示了有关可能的性能瓶颈的大量信息和线索。在本章的最后一节,我们将讨论在 Spark UI 中要查找的内容.

检查 Spark UI

Spark 提供了一个精致的 Web UI,允许我们检查应用程序的各种组件。它提供有关内存使用、作业、阶段和任务的详细信息,以及事件时间线、日志和各种指标和统计信息,可以让您深入了解 Spark 应用程序中发生的事情,包括 Spark 驱动程序级别和单个执行程序.

作业将

  1. spark-submit

启动 Spark UI,您可以在本地主机(在本地模式下)或通过 Spark 驱动程序(在其他模式下)在默认端口 4040 上连接到它。

Spark UI 选项卡之旅

Spark UI 有六个选项卡,如图 7-10所示,每个选项卡都提供了探索的机会。让我们来看看每个选项卡向我们揭示了什么。

图 7-10。Spark UI 选项卡

此讨论适用于 Spark 2.x 和 Spark 3.0。虽然 Spark 3.0 中的大部分 UI 都相同,但它还添加了第七个选项卡,即结构化流。第 12 章对此进行了预览。

工作和阶段

正如您在第 2 章中所了解的,Spark 将应用程序分解为作业、阶段和任务。Jobs 和 Stages 选项卡允许您浏览这些并深入到细粒度级别以检查各个任务的详细信息。您可以查看它们的完成状态并查看与 I/O、内存消耗、执行持续时间等相关的指标。

图 7-11显示了带有展开的 Event Timeline 的 Jobs 选项卡,显示了何时将执行程序添加到集群或从集群中删除。它还提供集群中所有已完成作业的表格列表。持续时间列指示完成每个作业(由第一列中的作业 ID 标识)完成的时间。如果此时间很长,则表明您可能希望调查该作业的各个阶段,以查看哪些任务可能导致延迟。在此摘要页面中,您还可以访问每个作业的详细信息页面,包括 DAG 可视化和已完成阶段的列表。


图 7-11。“作业”选项卡提供事件时间线视图和所有已完成作业的列表

Stages 选项卡提供应用程序中所有作业的所有阶段的当前状态摘要。您还可以访问每个阶段的详细信息页面,提供 DAG 及其任务的指标(图 7-12)。除了一些其他可选的统计信息,您还可以查看每个任务的平均持续时间、垃圾收集 (GC) 所花费的时间以及读取的 shuffle 字节/记录数。如果正在从远程执行程序读取 shuffle 数据,则较高的 Shuffle Read Blocked Time 可能表示 I/O 问题。高 GC 时间表示堆上的对象过多(您的执行程序可能内存不足)。如果一个阶段的最大任务时间远大于中位数,那么您的分区中的数据分布不均匀可能会导致数据倾斜。寻找这些明显的迹象。


图 7-12。“阶段”选项卡提供有关阶段及其任务的详细信息

您还可以在此页面上查看每个执行者的汇总指标以及各个任务的细分。

Executors(执行者)

Executors 选项卡提供有关为应用程序创建的执行程序的信息。如图7-13 所示,您可以深入了解有关资源使用(磁盘、内存、内核)、GC 花费的时间、shuffle 期间写入和读取的数据量等细节。


图 7-13。Executors 选项卡显示 Spark 应用程序使用的执行程序的详细统计信息和指标

除了汇总统计信息之外,您还可以查看每个单独的执行程序如何使用内存以及用于什么目的。当您在 DataFrame 或托管表上使用

  1. cache()

or方法时,这也有助于检查资源使用情况,我们将在下面讨论。

  1. persist()

Storage(贮存)

在“Shuffle Sort Merge Join”的 Spark 代码中,我们在分桶后缓存了两个托管表。Storage 选项卡,如图 7-14所示,提供有关应用程序由于

  1. cache()

or

  1. persist()

方法而缓存的任何表或 DataFrame 的信息。

图 7-14。存储选项卡显示有关内存使用情况的详细信息

点击图 7-14中的“In-memory table UsersTbl”链接更进一步,显示了该表是如何跨 1 个执行程序和 8 个分区缓存在内存和磁盘上的——这个数字对应于我们的桶数为该表创建(见图 7-15)。


图 7-15。Spark UI 显示跨执行器内存的缓存表分布

SQL

作为 Spark 应用程序的一部分执行的 Spark SQL 查询的效果可通过 SQL 选项卡进行跟踪和查看。您可以查看查询的执行时间、执行的作业及其持续时间。例如,在我们的

  1. SortMergeJoin

示例中,我们执行了一些查询;所有这些都显示在图 7-16中,并带有进一步向下钻取的链接。


图 7-16。SQL 选项卡显示有关已完成 SQL 查询的详细信息

单击查询的描述会显示包含所有物理运算符的执行计划的详细信息,如图 7-17所示。在计划的每个物理运算符下(此处为

  1. Scan In-memory table

  1. HashAggregate

和)是

  1. Exchange

SQL指标。

当我们想要检查物理运算符的详细信息并发现发生了什么时,这些指标很有用:扫描了多少行,写入了多少 shuffle 字节等。


图 7-17。Spark UI 显示 SQL 查询的详细统计信息

Environment(环境)

图 7-18所示的 Environment 选项卡与其他选项卡一样重要。了解 Spark 应用程序运行的环境会发现许多有助于故障排除的线索。事实上,必须知道设置了哪些环境变量、包含了哪些 jar、设置了哪些 Spark 属性(以及它们各自的值,特别是如果您调整了“优化和调整 Spark 以提高效率”中提到的一些配置),什么系统属性已设置,使用的运行时环境(如 JVM 或 Java 版本)等。所有这些只读详细信息都是信息的金矿,可在您发现 Spark 应用程序中的任何异常行为时补充您的调查工作。


图 7-18。Environment 选项卡显示 Spark 集群的运行时属性

调试 Spark 应用程序

在本节中,我们浏览了 Spark UI 中的各个选项卡。如您所见,UI 提供了大量信息,可用于调试和解决 Spark 应用程序的问题。除了我们在这里介绍的内容之外,它还提供对驱动程序和执行程序 stdout/stderr 日志的访问,您可能在其中记录了调试信息。

通过 UI 进行调试与在您最喜欢的 IDE 中逐步调试应用程序是一个不同的过程——更像是侦查,跟踪面包屑的踪迹——尽管如果您更喜欢这种方法,您也可以在IntelliJ IDEA等IDE 中调试 Spark 应用程序本地主机。

Spark 3.0 UI 选项卡揭示了有关所发生事情的深刻见解,以及对驱动程序和执行程序 stdout/stderr 日志的访问,您可能在其中记录了调试信息。

最初,这些过多的信息可能会让新手不知所措。但随着时间的推移,您将了解在每个选项卡中要查找的内容,并且您将开始能够更快地检测和诊断异常。模式将变得清晰,通过经常访问这些选项卡并在运行一些 Spark 示例后熟悉它们,您将习惯于通过 UI 调整和检查 Spark 应用程序.

概括

在本章中,我们讨论了一些优化 Spark 应用程序的优化技术。如您所见,通过调整一些默认的 Spark 配置,您可以改进大型工作负载的扩展、增强并行性并最大限度地减少 Spark 执行器之间的内存不足。您还了解了如何使用具有适当级别的缓存和持久化策略来加快对常用数据集的访问,我们检查了 Spark 在复杂聚合过程中使用的两个常用连接,并演示了如何通过排序键对 DataFrame 进行分桶,您可以跳过昂贵的洗牌操作。

最后,为了从视觉上了解性能,Spark UI 完成了这张图片。尽管 UI 内容丰富且详细,但它并不等同于 IDE 中的逐步调试;然而,我们通过检查和收集来自六个 Spark UI 选项卡上可用的指标和统计数据、计算和内存使用数据以及 SQL 查询执行跟踪的见解,展示了如何成为 Spark 侦探。

标签: spark apache scala

本文转载自: https://blog.csdn.net/sikh_0529/article/details/127409012
版权归原作者 Sonhhxg_柒 所有, 如有侵权,请联系我们删除。

“【Apache Spark 】第 7 章优化和调优 Spark 应用程序”的评论:

还没有评论