SparkSQL调优
文章目录
Explain 查看执行计划
语法
sparkSession.sql("xxx").explain()
- explain(mode=“simple”):只展示物理执行计划。
- explain(mode=“extended”):展示物理执行计划和逻辑执行计划。
- explain(mode=“codegen”) :展示要 Codegen 生成的可执行 Java 代码。
- explain(mode=“cost”):展示优化后的逻辑执行计划以及相关的统计。
- explain(mode=“formatted”):以分隔的方式输出,它会输出更易读的物理执行计划,并展示每个节点的详细信息。
执行计划处理流程
分析 – 逻辑优化 – 生成物理执行计划 – 评估模型分析 – 代码生成
- == Parsed Logical Plan == :Unresolved 逻辑执行计划 - Parser 组件检查 SQL 语法上是否有问题,然后生成 Unresolved(未决断)的逻辑计划,不检查表名、不检查列名
- == Analyzed Logical Plan == :Resolved 逻辑执行计划 - 通过访问 Spark 中的 Catalog 存储库来解析验证语义、列名、类型、表名等
- == Optimized Logical Plan == :优化后的逻辑执行计划 - Catalyst 优化器根据各种规则进行优化
- == Physical Plan == :物理执行计划 - HashAggregate 运算符表示数据聚合,一般 HashAggregate 是成对出现,第一个HashAggregate 是将执行节点本地的数据进行局部聚合,另一个 HashAggregate 是将各个分区的数据进一步进行聚合计算- Exchange 运算符其实就是 shuffle,表示需要在集群上移动数据。很多时候HashAggregate 会以 Exchange 分隔开来- Project 运算符是 SQL 中的投影操作,就是选择列(例如:select name, age…)- BroadcastHashJoin 运算符表示通过基于广播方式进行 HashJoin- LocalTableScan 运算符就是全表扫描本地的表
资源调优
内存说明
spark.memory.fraction=(估算 storage 内存+估算 Execution 内存)/(估算 storage 内存+估算 Execution 内存+估算 Other 内存)
spark.memory.storageFraction =(估算 storage 内存)/(估算 storage 内存+估算Execution 内存)
Storage 堆内内存=(spark.executor.memory–300MB)spark.memory.fractionspark.memory.storageFraction
Execution 堆内内存=(spark.executor.memory–300MB)spark.memory.fraction(1-spark.memory.storageFraction)
spark任务提交到yarn上运行命令
${SPARK_DIR}/bin/spark-submit \--masteryarn\
--deploy-mode cluster \--queue root.default \
--driver-memory 4g \
--executor-memory 8g \
--num-executors 6\
--executor-cores 2\--confspark.sql.shuffle.partitions=2000\--confspark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER \--confspark.executor.extraJavaOptions='-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'\--class com.test.DataETLMain \
/opt/spark_sql_test.jar
参数说明–queue任务提交到yarn的队列–driver-memory每个 driver 的内存–executor-cores每个 executor 的最大核数(3~6 之间比较合理 )–num-executorsexecutor 的个数–executor-memory每个 executor 的内存–conf任务运行配置
CPU优化
修改并行度(分区个数)
- rdd:
spark.default.parallelism
- 设置 RDD 的默认并行度,没有设置时,由 join、reduceByKey 和 parallelize 等转换决定 - sparksql:
spark.sql.shuffle.partitions
- 适用 SparkSQL 时,Shuffle Reduce 阶段默认的并行度,默认 200。此参数只能控制Spark sql、DataFrame、DataSet 分区个数。不能控制 RDD 分区个数
SparkSQL语法优化
基于RBO优化
- 谓词下推:将过滤条件的谓词逻辑都尽可能提前执行,减少下游处理的数据量
- 列裁剪:列剪裁就是扫描数据源的时候,只读取那些与查询相关的字段
- 常量替换
基于CBO优化
CBO 优化主要在物理计划层面,原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划
通过
spark.sql.cbo.enabled
来开启,默认是 false
参数描述默认值spark.sql.cbo.enabledCBO 总开关
true 表示打开,false 表示关闭
要使用该功能,需确保相关表和列的统计信息已经生成falsespark.sql.cbo.joinReorder.enabled使用 CBO 来自动调整连续的 inner join 的顺序
true:表示打开
false:表示关闭要使用该功能,需确保相关表和列的统计信息已经生成,且CBO 总开关打开falsespark.sql.cbo.joinReorder.dp.threshold使用 CBO 来自动调整连续 inner join 的表的个数阈值
如果超出该阈值,则不会调整 join 顺序12
广播join
Spark join 策略中,如果当一张小表足够小并且可以先缓存到内存中,那么可以使用Broadcast Hash Join,其原理就是先将小表聚合到 driver 端,再广播到各个大表分区中,那么再次进行 join 的时候,就相当于大表的各自分区的数据与小表进行本地 join,从而规避了shuffle
方式一:通过参数指定自动广播
spark.sql.autoBroadcastJoinThreshold
:广播join默认值 10M
可更改参数值:
- 方式一:在程序里面添加参数值
sparkConf.set("spark.sql.autoBroadcastJoinThreshold","20m")
- 方式二:在执行命令配置中添加参数值
--confspark.sql.autoBroadcastJoinThreshold=20m
方式二:强行广播
使用Hint注解方式
//TODO SQL Hint方式
val sqlstr1 ="""
|select /*+ BROADCASTJOIN(sc) */
| sc.courseid,
| csc.courseid
|from sale_course sc join course_shopping_cart csc
|on sc.courseid=csc.courseid
""".stripMargin
val sqlstr2 ="""
|select /*+ BROADCAST(sc) */
| sc.courseid,
| csc.courseid
|from sale_course sc join course_shopping_cart csc
|on sc.courseid=csc.courseid
""".stripMargin
val sqlstr3 ="""
|select /*+ MAPJOIN(sc) */
| sc.courseid,
| csc.courseid
|from sale_course sc join course_shopping_cart csc
|on sc.courseid=csc.courseid
""".stripMargin
SMB Join
SMB JOIN 是 sort merge bucket 操作,需要进行分桶,首先会进行排序,然后根据 key值合并,把相同 key 的数据放到同一个 bucket 中(按照 key 进行 hash)。分桶的目的其实就是把大表化成小表。相同 key 的数据都在同一个桶中之后,再进行 join 操作,那么在联合的时候就会大幅度的减小无关项的扫描
使用条件:
- 两表进行分桶,桶的个数必须相等
- 两边进行 join 时,join 列=排序列=分桶列
数据倾斜
Join 数据倾斜优化
广播join
适用于小表 join 大表。小表足够小,可被加载进 Driver 并通过 Broadcast 方法广播到各个 Executor 中
解决逻辑:在小表 join 大表时如果产生数据倾斜,那么广播 join 可以直接规避掉此 shuffle 阶段。直接优化掉 stage。并且广播 join 也是 Spark Sql 中最常用的优化方案
拆分大 key 打散大表 扩容小表
适用于 join 时出现数据倾斜
解决逻辑:
- 将存在倾斜的表,根据抽样结果,拆分为倾斜 key(skew 表)和没有倾斜 key(common)的两个数据集
- 将 skew 表的 key 全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集(old 表)整体与随机前缀集作笛卡尔乘积(即将数据量扩大 N 倍,得到 new 表)
- 打散的 skew 表 join 扩容的 new 表
- 将 skew 表的 key 去掉前缀
Job 优化
Map 端优化
Map 端预聚合
map-side 预聚合,就是在每个节点本地对相同的 key 进行一次聚合操作
读取小文件优化
读取的数据源有很多小文件,会造成查询性能的损耗,大量的数据分片信息以及对应产生的 Task 元信息也会给 Spark Driver 的内存造成压力,带来单点问题
设置参数:
# 一个分区最大字节数,默认 128mspark.sql.files.maxPartitionBytes=128MB
# 打开一个文件的开销,默认 4mspark.files.openCostInBytes=4194304
增大 map 溢写时输出流 buffer
- map 端 Shuffle Write 有一个缓冲区,初始阈值 5m,超过会尝试增加到 2当前使用内存。如果申请不到内存,则进行溢写*(这个参数是 internal,指定无效,资源足够会自动扩容,所以不需要我们去设置)
spark.shuffle.spill.initialMemoryThreshold
:5242880 - Shuffle 文件涉及到序列化,是采取批的方式读写,默认按照每批次 1 万条去读写(这个参数是 internal,指定无效)
spark.shuffle.spill.batchSize
:10000 - 溢写时使用输出流缓冲区默认 32k,这些缓冲区减少了磁盘搜索和系统调用次数,适当提高可以提升溢写效率
spark.shuffle.file.buffer
:32
Reduce 端优化
增大 reduce 缓冲区,减少拉取次数
Spark Shuffle 过程中,shuffle reduce task 的 buffer 缓冲区大小决定了 reduce task 每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能
spark.reducer.maxSizeInFlight
reduce 端数据拉取缓冲区的大小设置,默认为 48MB
调节 reduce 端拉取数据重试次数
Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试
spark.shuffle.io.maxRetrie
reduce 端拉取数据重试次数设置,该参数就代表了可以重试的最大次数。默认为 3
调节 reduce 端拉取数据等待间隔
Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如 60s),以增加 shuffle 操作的稳定性
spark.shuffle.io.retryWait
reduce 端拉取数据等待间隔设置,默认值为 5s
Spark AQE
Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution),即自适应查询执行。AQE 是Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化
动态合并分区
动态切换 Join 策略
动态优化 Join 倾斜
版权归原作者 code@fzk 所有, 如有侵权,请联系我们删除。