0


Spark原理及调优

spark官档

  • hints:https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
  • 调优参数:https://spark.apache.org/docs/latest/sql-performance-tuning.html#join-strategy-hints-for-sql-queries
  • 作者几乎把所有的RDD API查了个遍,仔细研究每一个算子的含义和运行原理,汇总不同算子的适用场景,总结哪些算子会引入Shuffle,对比同类功能算子的差异与优劣势,比如map和mapPartitions,再比如groupByKey、reduceByKey和aggregateByKey。 除此之外,Spark官网的Configuration页面作者也查阅了无数次,汇总与性能有关的配置项,不停地做对比实验,比较不同参数配置下的执行性能。遇到与认知不符的实验结果,就再回去反复咀嚼Spark的核心原理,从RDD和调度系统,到内存管理和存储系统,再到内存计算和Shuffle,如此往复,乐此不疲。 RDD、DAG、调度系统、存储系统和内存管理

01性能调优的必要性

Spark天生的执行效率再高,也需要你针对具体的应用场景和运行环境进行性能调优。仅仅实现业务逻辑常常是不够的。
第一个案例讲的是,在函数被频繁调用的情况下,函数里面一个简单变量所引入的性能开销被成倍地放大。第二个例子讲的是,不恰当的实现方式导致海量数据被反复地扫描成百上千次(foreach collect 大表)。
**

性能调优的收益显而易见

**:一来可以节约成本,尤其是按需付费的云上成本,更短的执行时间意味着更少的花销;二来可以提升开发的迭代效率,尤其是对于从事数据分析、数据科学、机器学习的同学来说,更高的执行效率可以更快地获取数据洞察,更快地找到模型收敛的最优解。因此你看,性能调优不是一件锦上添花的事情,而是开发者必须要掌握的一项傍身技能

02 性能调优的本质

从硬件资源消耗的角度切入,往往是个不错的选择。我们都知道,从硬件的角度出发,计算负载划分为计算密集型、内存密集型和IO密集型。如果我们能够明确手中的应用属于哪种类型,自然能够缩小搜索范围,从而更容易锁定性能瓶颈。
**

性能调优的本质我们可以归纳为4点。

**

  1. 性能调优不是一锤子买卖,补齐一个短板,其他板子可能会成为新的短板。因此,它是一个动态、持续不断的过程。
  2. 能调优的手段和方法是否高效,取决于它针对的是木桶的长板还是瓶颈。针对瓶颈,事半功倍;针对长板,事倍功半。
  3. 性能调优的方法和技巧,没有一定之规,也不是一成不变,随着木桶短板的此消彼长需要相应的动态切换。
  4. 性能调优的过程收敛于一种所有木板齐平、没有瓶颈的状态。

性能调优的方法与手段

  1. 应用代码 开发阶段都有哪些常规操作、常见误区,从而尽量避免在代码中留下性能隐患。
  2. Spark配置项 Spark官网上罗列了近百个配置项,看得人眼花缭乱,但并不是所有的配置项都和性能调优息息相关,因此我们需要对它们进行甄别、归类。

系统化的性能调优方法论,归纳为4条:

  1. 通过不同的途径如专家经验或运行时诊断来定位性能瓶颈;
  2. 从不同场景(典型场景)、不同视角(硬件资源)出发,综合运用不同层面(应用代码、Spark配置项)的调优手段和方法;
  3. 随着性能瓶颈的此消彼长,动态灵活地在不同层面之间切换调优方法;
  4. 让性能调优的过程收敛于不同硬件资源在运行时达到一种平衡、无瓶颈的状态。性能调优的最终目的,是在所有参与计算的硬件资源之间寻求协同与平衡,让硬件资源达到一种平衡、无瓶颈的状态。

03 RDD 弹性分布式数据集

一 RDD为何如此重要

首先,RDD作为Spark对于分布式数据模型的抽象,是构建Spark分布式内存计算引擎的基石。很多Spark核心概念与核心组件,如DAG和调度系统都衍生自RDD。因此,深入理解RDD有利于你更全面、系统地学习Spark的工作原理。
其次,尽管RDD API使用频率越来越低,绝大多数人也都已经习惯于DataFrame和Dataset API,但是,无论采用哪种API或是哪种开发语言,你的应用在Spark内部最终都会转化为RDD之上的分布式计算。换句话说,如果你想要在运行时判断应用的性能瓶颈,前提是你要对RDD足够了解。还记得吗?定位性能瓶颈是Spark性能调优的第一步。
深入理解RDD有利于你跳出单机思维模式,避免在应用代码中留下性能隐患。

二 深入理解RDD

从薯片的加工流程看RDD:

  1. 为了充分利用每一颗土豆、降低生产成本,工坊使用 3 条流水线来同时生产 3 种不同尺寸的桶装薯片。3 条流水线可以同时加工 3 颗土豆,每条流水线的作业流程都是一样的,分别是清洗、切片、烘焙、分发和装桶。
  2. 土豆工坊的每条流水线就像是分布式环境中的计算节点;
  3. 不同的食材形态,如带泥的土豆、土豆切片、烘烤的土豆片等等,对应的就是RDD;partitions
  4. 每一种食材形态都会依赖上一种形态,如烤熟的土豆片依赖上一个步骤的生土豆切片。这种依赖关系对应的就是RDD中的dependencies属性;生成该RDD所依赖的父RDD不同环节的加工方法对应RDD的compute属性; 生成该RDD的计算接口
  5. 同一种食材形态在不同流水线上的具体实物,就是RDD的partitions属性; RDD的所有数据分片实体
  6. 食材按照什么规则被分配到哪条流水线,对应的就是RDD的partitioner属性。 7.划分数据分片的规则

三 RDD的核心特征和属性

横向属性:分布式

在分布式运行环境中,RDD封装的数据在物理上散落在不同计算节点的内存或是磁盘中,这些散落的数据被称“数据分片”,RDD的分区规则决定了哪些数据分片应该散落到哪些节点中去。RDD的partitions属性对应着RDD分布式数据实体中所有的数据分片,而partitioner属性则定义了划分数据分片的分区规则,如按哈希取模或是按区间划分等。

纵向属性:容错性

在Spark中,任何一个 RDD 都不是凭空产生的,每个 RDD 都是基于某种计算逻辑从某个“数据源”转换而来。RDD的dependencies属性记录了生成RDD 所需的“数据源”,术语叫做父依赖(或父RDD),compute方法则封装了从父 RDD到当前RDD转换的计算逻辑。

04 到底啥叫“内存计算”

第一层含义:分布式数据缓存

只有需要频繁访问的数据集才有必要cache,对于一次性访问的数据集,cache不但不能提升执行效率,反而会产生额外的性能开销,让结果适得其反。

第二层含义:Stage内的流水线式计算模式

什么是DAG

  1. 在Spark的DAG中,顶点是一个个RDD,边则是RDD之间通过dependencies属性构成的父子关系。
  2. 从开发者的视角出发,DAG的构建是通过在分布式数据集上不停地调用算子来完成的。

DAG的划分

  1. 如果用一句话来概括从DAG到Stages的转化过程,那应该是:以Actions算子为起点,从后向前回溯DAG,以Shuffle操作为边界去划分Stages。
  2. DAG->在分布式环境中执行的4个环节 回溯DAG并划分Stages 在Stages中创建分布式任务 分布式任务的分发 分布式任务的执行

Stage中的内存计算

  1. 常规计算:缓存中间数据 MapReduce提供两类计算抽象,分别是Map和Reduce:Map抽象允许开发者通过实现map 接口来定义数据处理逻辑;Reduce抽象则用于封装数据聚合逻辑。MapReduce计算模型最大的问题在于,所有操作之间的数据交换都以磁盘为媒介。例如,两个Map操作之间的计算,以及Map与Reduce操作之间的计算都是利用本地磁盘来交换数据的。不难想象,这种频繁的磁盘I/O必定会拖累用户应用端到端的执行性能。
  2. 同一Stage内所有算子融合为一个函数,Stage的输出结果由这个函数一次性作用在输入数据集而产生。

05 调度系统:数据不动代码动到底是什么意思

1. 案例:对用户兴趣特征做Label Encoding

在第1种实现方式中,函数是一个接收两个形参的普通标量函数,Dataset调用这个函数在千亿级样本上做Label encoding。
第2种实现方式,2个计算步骤被封装到一个高阶函数中。用户代码先在Driver端用模板文件调用这个高阶函数,完成第一步计算建立字典的过程,同时输出一个只带一个形参的标量函数,这个标量函数内携带了刚刚建好的映射字典。最后,Dataset将这个标量函数作用于千亿样本之上做Label encoding。

2. Spark调度系统的工作流程包含如下5个步骤:

将DAG拆分为不同的运行阶段Stages;
创建分布式任务Tasks和任务组TaskSet;
获取集群内可用的硬件资源情况;
按照调度规则决定优先调度哪些任务/组;
依序将分布式任务分发到执行器Executor。

3. 调度系统中的核心组件有哪些?

需求端 - DAGScheduler

把用户DAG拆分为Stages
在Stage内创建计算任务Tasks,这些任务囊括了用户通过组合不同算子实现的数据转换逻辑。然后,执行器Executors接收到Tasks,会将其中封装的计算函数应用于分布式数据分片,去执行分布式的计算过程。

供给端 - SchedulerBackend

对内,SchedulerBackend用ExecutorData对Executor进行资源画像;
对外,SchedulerBackend以WorkerOffer为粒度提供计算资源,WorkerOffer封装了Executor ID、主机地址和CPU核数,用来表示一份可用于调度任务的空闲资源

中介平台 - TaskScheduler

TaskScheduler的职责是,基于既定的规则与策略达成供需双方的匹配与撮合。
一个是

不同Stages之间的调度

优先级: 对于这种Stages之间的任务调度,TaskScheduler提供了2种调度模式,分别是FIFO(先到先得)和FAIR(公平调度)
一个是

Stages内不同任务之间

的调度优先级。
任务自带调度意愿,它通过本地性级别告诉TaskScheduler自己更乐意被调度到哪里去。
Spark调度系统的原则是尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销。

06 存储系统:是时间换空间还是空间换时间?

1 Spark存储系统是为谁服务的?

RDD缓存:将RDD以缓存的形式物化到内存或磁盘的过程。

对于一些计算成本和访问频率都比较高的

RDD

来说,缓存有两个好处:一是通过截断DAG,可以降低失败重试的计算开销;二是通过对缓存内容的访问,可以有效减少从头计算的次数,从整体上提升作业端到端的执行性能。

Shuffle中间文件:Shuffle Map阶段的输出结果,这些结果会以文件的形式暂存于本地磁盘

在集群范围内,Reducer想要拉取属于自己的那部分中间数据,就必须要知道这些数据都存储在哪些节点,以及什么位置。而这些关键的元信息,正是由Spark存储系统保存并维护的。因此你看,没有存储系统,

Shuffle

是玩不转的。

广播变量:往往用于在集群范围内分发访问频率较高的小数据

利用存储系统,

广播变量

可以在Executors进程范畴内保存全量数据。

2 存储系统的基本组件有哪些?

BlockManager:Executors端负责统一管理和协调数据的本地存取与跨节点传输

对外,

BlockManager与Driver端的BlockManagerMaster通信

,不仅定期向BlockManagerMaster汇报本地数据元信息,还会不定时按需拉取全局数据存储状态。另外,不同Executors的BlockManager之间也会以Server/Client模式跨节点推送和拉取数据块。
对内,

BlockManager通过组合存储系统内部组件的功能来实现数据的存与取、收与发


BlockManager正是利用MemoryStore和DiskStore来分别管理数据在内存和磁盘中的存取。

  1. 广播变量的全量数据存储在Executors进程中,因此它由MemoryStore管理。
  2. Shuffle中间文件往往会落盘到本地节点,所以这些文件的落盘和访问就要经由DiskStore。
  3. RDD缓存会稍微复杂一些,由于RDD缓存支持内存缓存和磁盘缓存两种模式,因此我们要视情况而定,缓存在内存中的数据会封装到MemoryStore,缓存在磁盘上的数据则交由DiskStore管理。

数据的存储形式

  1. 对象值(Object Values)和字节数组(Byte Array)
  2. 序列化和反序列化
  3. 对象值和字节数组二者之间存在着一种博弈关系,也就是所谓的“以空间换时间”和“以时间换空间”,两者之间该如何取舍,我们还是要看具体的应用场景。核心原则就是:如果想省地儿,你可以优先考虑字节数组;如果想以最快的速度访问对象,还是对象值更直接一些。
  4. DiskStore只能存储序列化后的字节数组,凡是落盘的东西,都需要先进行序列化。

透过RDD缓存看MemoryStore

  1. MemoryStore同时支持存储对象值和字节数组这两种不同的数据形式
  2. 统一采用MemoryEntry数据抽象对它们进行封装,实现类有两个 :DeserializedMemoryEntry 封装原始对象值 Array[T];SerializedMemoryEntry 封装序列化之后的字节数组 ByteBuffe
  3. LinkedHashMap[BlockId, MemoryEntry]:得益于MemoryEntry对于对象值和字节数组的统一封装,MemoryStore能够借助一种高效的数据结构来统一存储与访问数据块:LinkedHashMap[BlockId, MemoryEntry],即 Key 为BlockId,Value 是MemoryEntry的链式哈希字典。 在这个字典中,一个Block对应一个MemoryEntry。 在逻辑关系上,RDD的数据分片与存储系统的Block一一对应,也就是说一个RDD数据分片会被物化成一个内存或磁盘上的Block。
  4. RDD缓存步骤 第一步:Unroll Memory,把RDD迭代器展开为数据值,然后把这些数据值暂存到一个叫做ValuesHolder的数据结构里。 第二步:Transfer,为了节约存储空间,我们需要调用toArray或者toBuffer函数,将RDD迭代器展开的数据值转换成MemoryEntry数据结构 第三步:存储数据的元数据信息。这些包含RDD数值的MemoryEntry以及其对应的blockId会被存入LinkedHashMap<key=blockId,value=MemoryEntry引用的>的链式数据字典中 📝1. “如果内存空间不足以容纳整个RDD怎么办?”很简单,强行把大RDD塞进有限的内存空间肯定不是明智之举,所以Spark会按照LRU策略逐一清除字典中最近、最久未使用的Block,以及其对应的MemoryEntry。相比频繁的展开、物化、换页所带来的性能开销,缓存下来的部分数据对于RDD高效访问的贡献可以说微乎其微。 📝2. 当所有RDD都被转换成MemoryEntry并且将元数据信息存储到LinkedHashMap中便完成了RDD的数据缓存到内存中的过程。

3 透过Shuffle看DiskStore

DiskStore存取的本质在字节序列和磁盘文件之间的转换
DiskBlockManager用于逻辑数据块Block与磁盘文件系统中物理文件的对应关系,每个Block都对应一个磁盘文件
**

Shuffle过程

**

  1. Shuffle write 有3类结果文件:temp_shuffle_XXX、shuffle_XXX.data和shuffle_XXX.index 在Shuffle write的不同阶段,Shuffle manager通过BlockManager调用DiskStore的putBytes方法将数据块写入文件。文件由DiskBlockManager创建,文件名就是putBytes方法中的Block ID,这些文件会以“temp_shuffle”或“shuffle”开头,保存在spark.local.dir目录下的子目录里。
  2. Shuffle read 在Shuffle read阶段,Shuffle manager再次通过BlockManager调用DiskStore的getBytes方法,读取data文件和index文件,将文件内容转化为数据块,最终这些数据块会通过网络分发到Reducer端进行聚合计算。

4. 思考

结合RDD数据存储到MemoryStore的过程,你能推演出通过MemoryStore通过getValues/getBytes方法去访问RDD缓存内容的过程吗?
RDD缓存的逆过程,细节待考证
参考RDD缓存存储的过程,你能推演出广播变量存入MemoryStore的流程吗?
我的回答:driver->Executor,变量->MemoryEntry->LinkedHashMap,待考证

07 内存管理基础:Spark如何高效利用有限的存储空间?

1 内存的管理模式

堆内内存:堆内内存的申请与释放统一由JVM代劳
堆外内存:Spark通过调用Unsafe的allocateMemory和freeMemory方法直接在操作系统内存中申请、释放内存空间。这样的内存管理方式自然不再需要垃圾回收机制,也就免去了它带来的频繁扫描和回收引入的性能开销。更重要的是,空间的申请与释放可以精确计算,因此Spark对堆外可用内存的估算会更精确,对内存的利用率也更有把握。

2 内存区域的划分

  1. 堆内内存Spark.executor.memory 执行和缓存的内存空间,包括execution memory和storage memory User Memory的内存空间,它用于存储开发者自定义数据结构。 Reserved Memory,它被用来存储各种Spark内部对象,例如存储系统中的BlockManager、DiskBlockManager等等。
  2. 堆外内存Spark.memory.offHeap.size Execution Memory:一块用于执行分布式任务,如Shuffle、Sort和Aggregate等操作 Storage Memory:一块用于缓存RDD和广播变量等数据
  3. 执行与缓存内存 数据集缓存和Stage内的流水线计算 Spark1.6之后推出了统一内存管理模式:统一内存管理模式指的是Execution Memory和Storage Memory之间可以相互转化;执行内存要比缓存内存具有更高的优先级执行内存主要有两项任务:Shuffle map 阶段的数据转换、映射、排序、聚合和归并等操作;Shuffle reduce 阶段完成数据的排序和聚合抢占内存的规则:如果对方的内存空间有空闲,双方就都可以抢占;对于RDD缓存任务抢占的执行内存,当执行任务有内存需要时,RDD缓存任务必须立即归还抢占的内存,涉及的RDD缓存数据要么落盘、要么清除;对于分布式计算任务抢占的Storage Memory内存空间,即便RDD缓存任务有收回内存的需要,也要等到任务执行完毕才能释放。

3从代码看内存消耗

内存区域分为Reserved Memory、User Memory、Execution Memory和Storage Memory。
其中,Reserved Memory用于存储Spark内部对象,User Memory用于存储用户自定义的数据结构,Execution Memory用于分布式任务执行,而Storage Memory则用来容纳RDD缓存和广播变量。

08 应用开发三原则:如何拓展自己的开发边界

一 坐享其成

1 如何利用钨丝计划的优势?

它的优势是,可以通过对数据模型与算法的优化,把Spark应用程序的执行性能提升一个数量级。
在数据结构方面,Tungsten自定义了紧凑的二进制格式。存储效率+计算开销
Tungsten利用Java Unsafe API开辟堆外(Off Heap Memory)内存来管理对象。对内存占用的估计更精确,不需要像Java heap 那样反复进行垃圾回收
Tungsten用全阶段代码生成(Whol Stage Code Generation)取代火山迭代模型,提高CPU的使用效率

2 如何利用AQE的优势?

自适应查询执行Adaptive Query Execution
Spark SQL的优化过程可以大致分为语法分析、语义解析、逻辑计划和物理计划这几个环节。Spark3.0之前仅仅在编译时基于规则和策略遍历AST查询语法树,来优化逻辑计划,一旦基于最佳逻辑计划选定最佳物理执行计划,Spark就会严格按照这个物理执行计划机械地执行这个计划。AQE可以让Spark在运行时的各个阶段,周期性地动态的调整前面的逻辑计划,然后根据优化的逻辑执行计划重新选定最优的物理执行计划,从而调整运行时后续的执行计划。

AQE的改进方面

1 自动分区合并
AQE会自动检测过小的数据分区,并对它们自动合并
2 数据倾斜
处理不当则很容易出现OOM,这时候Spark会对数据集进行自动加盐,通过把Key打散来均衡数据在不同节点上的分布。
3 Join策略调整
当两个有序表要进行数据关联的时候,Spark SQL在优化过程中总会选择Sort Merge Join的实现方式。但有一种情况是,其中一个表在排序前需要对数据进行过滤,过滤后的表小到足可以由广播变量容纳。这个时候,Broadcast Join比Sort Merge Join的效率更高。但是,3.0版本之前的优化过程是静态的,做不到动态切换Join方式。
针对这种情况,AQE会根据运行时的统计数据,去动态地调整Join策略,把之前敲定的Sort Merge Join改为Broadcast Join,从而改善应用的执行性能。

参数调整

spark.sql.adaptive.enabled:是否启动Aqe

再比如,使用Parquet、ORC等文件格式,去坐享谓词下推带来的数据读取效率。

二 能省则省、能拖则拖

1 what:省的是数据量,拖的是shuffle操作

2 why:

数据量越少就需要越少的计算负载,越低的计算负载就会有更快的处理速度
shuffle操作执行的越晚,需要落盘和分发的数据就越少,更低的磁盘IO和网络开销就意味着更高的执行效率。

2 how

  1. 尽量把能节省数据扫描量和数据处理量的操作往前推;
  2. 尽力消灭掉Shuffle,省去数据落盘与分发的开销;
  3. 如果不能干掉Shuffle,尽可能地把涉及Shuffle的操作拖到最后去执行。 ‘

3 跳出单机思维

举一个例子
为了生成训练样本,我们需要对两张大表进行关联。根据“能省则省、能拖则拖”原则,我们想把其中一张表变小,把Shuffle Join转换为Broadcast Join,这样一来就可以把Shuffle的环节省掉了。
尽管两张表的尺寸都很大,但右表的Payload只有一列,其他列都是Join keys,所以只要我们把Join keys干掉,右表完全可以放到广播变量里。但是,直接干掉Join keys肯定不行,因为左右表数据关联是刚需。那么,我们能否换个方式把它们关联在一起呢?
受Hash Join工作原理的启发,我们想到可以把所有的Join keys拼接在一起,然后用哈希算法生成一个固定长度的字节序列,把它作为新的Join key。这样一来,右表中原始的Join keys就可以拿掉,右表的尺寸也可以大幅削减,小到可以放到广播变量里。同时,新的Join key还能保证左右表中数据的关联关系保持不变,一举两得。
为了对拼接的Join keys进行哈希运算,我们需要事先准备好各种哈希算法,然后再转换左、右表。接到这样的需求之后,同学小A立马在右表上调用了map算子,并且在map算子内通过实例化Util类获取哈希算法,最后在拼接的Join keys上进行哈希运算完成了转换。

4 归纳这件事的意义和价值。

我们之所以把各种开发技巧归纳为开发原则,一方面是遵循这些原则,你能在不知不觉中避开很多性能上的坑。但更重要的是,从这些原则出发,向外推演,我们往往能发现更多的开发技巧,从而能拓展自己的“常规操作”边界,做到举一反三,真正避免“调优思路枯竭”的窘境。

09-10 调优配置项速查

一 配置项的分类

在Spark分布式计算环境中,负责计算负载的主要是Execution,而driver主要负责分布式调度,调优空间有限

官网全量配置项的分类

  1. 硬件资源类 与CPU、内存、磁盘有关的资源项 平衡不同硬件资源的利用率
  2. Spark shuffle类
  3. Spark SQL类 无论是在Streaming、Mllib、Graph等子框架中,还是在PySpark中,只要你使用DataFrame API,Spark在运行时都会使用Spark SQL做统一优化。因此,我们需要梳理出一类配置项,去充分利用Spark SQL的先天性能优势。

二 硬件资源类调优配置项

1 哪些配置项与CPU设置有关

  1. 集群:spark.cores.max 集群范围内满配CPU核数
  2. Executor:spark.executor.cores 单个Executor内CPU核数
  3. 计算任务:spark.task.cores 单个任务消耗的CPU核数
  4. 并行度 并行度的出发点是数据,它明确了数据划分的粒度。 spark.default.parallelism 未指定分区数时的默认并行度 spark.sql.shuffle.partitions 数据关联、聚合操作中Reducer的并行度

2 哪些配置项与内存设置有关

  1. 内存的基础配置项 spark.executor.memory 单个executor内堆内内内存的大小 spark.memory.offHeap.size 单个executor内堆外内存的大小,只有在spark.memory.offHeap.enabled设置为true时才能生效 spark.memory.fraction 堆内内存中,执行内存和缓存的比例,1-userMemory所占的比例 spark.memory.storageFraction 堆内内存用于缓存RDD的内存占比 spark.rdd.compress RDD缓存是否压缩,默认不压缩
  2. 堆外与堆内的平衡堆内内存 对于需要处理的数据集,如果数据模式比较扁平,而且字段多是定长数据类型,就更多地使用堆外内存。相反地,如果数据模式很复杂,嵌套结构或变长字段很多,就更多采用JVM堆内内存会更加稳妥。User Memory与Spark可用内存如何分配? 当在JVM内平衡Spark可用内存和User Memory时,你需要考虑你的应用中类似的自定义数据结构多不多、占比大不大?然后再相应地调整两块内存区域的相对占比 举例子label EncodingExecution Memory该如何与Storage Memory平衡? 在打算把大面积的内存空间用于RDD cache之前,你需要衡量这么做可能会对执行效率产生的影响。 缓存密集型 兼顾RDD访问和应用的整体执行效率 首先,你可以放弃对象值的缓存方式,改用序列化的缓存方式,序列化会把多个对象转换成一个字节数组。这样,对象个数的问题就得到了初步缓解。 其次,我们可以调节spark.rdd.compress这个参数。RDD缓存默认是不压缩的,启用压缩之后,缓存的存储效率会大幅提升,有效节省缓存内存的占用,从而把更多的内存空间留给分布式任务执行。

3 哪些配置项与磁盘设置有关

spark.local.dir:这个参数允许开发者设置磁盘目录,该目录用于存储RDD cache落盘数据块和Shuffle中间文件。

三 Shuffle类配置项

  1. spark.shuffle.file.buffer:map输出端写入缓冲区大小,我们可以通过spark.shuffle.file.buffer来扩大写缓冲区的大小,缓冲区越大,能够缓存的落盘数据越多,Spark需要刷盘的次数就越少,I/O效率也就能得到整体的提升。
  2. spark.reducer.maxSizeInFlight: reduce输入端的读取缓冲区的大小:我们就可以通过spark.reducer.maxSizeInFlight配置项控制Reduce端缓冲区大小,来调节Shuffle过程中的网络负载。 在Reduce阶段,因为Spark会通过网络从不同节点的磁盘中拉取中间文件,它们又会以数据块的形式暂存到计算节点的读缓冲区(Read Buffer)。缓冲区越大,可以暂存的数据块越多,在数据总量不变的情况下,拉取数据所需的网络请求次数越少,单次请求的网络吞吐越高,网络I/O的效率也就越高。

四 Spark SQL大类配置项

1 开启AQE

AQE功能默认是禁用的,想要使用这些特性,我们需要先通过配置项spark.sql.adaptive.enabled来开启AQE

2 哪些配置项与自动分区合并有关?

AQE事先并不判断哪些分区足够小,而是按照分区编号进行扫描,当扫描量超过“目标尺寸”时,就合并一次。

  1. spark.sql.adaptive.advisoryPartitionSizeInBytes 由开发者指定分区合并后的推荐尺寸
  2. spark.sql.adaptive.coalescePartitions.minPartitionNum 分区合并后,数据值的分区数不能低于该值 这个参数的目的就是避免并行度过低导致CPU资源利用不充分。

3 哪些配置项与自动数据倾斜处理有关?

  1. spark.sql.adaptive.skewJoin.enabled
  2. spark.sql.adaptive.skewJoin.skewedPartitionFactor
  3. spark.sql.adative.skewJoin.skewedPartitionThresholdInBytes 首先,分区尺寸必须要大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes参数的设定值,才有可能被判定为倾斜分区。
  4. spark.sql.adaptive.advisoryPartitionSizeInBytes 以字节为单位,拆分倾斜分区的数据粒度

4 哪些配置项与Join策略调整有关?

  1. broadcast join Broadcast Join的精髓在于“以小博大”,它以广播的方式将小表的全量数据分发到集群中所有的Executors,大表的数据不需要以Join keys为基准去Shuffle,就可以与小表数据原地进行关联操作
  2. AQE动态Join策略调整可以在运行时将Shuffle Joins降级为Broadcast Join,同时,运行时的数据量估算要比编译时准确得多,因此相比静态优化会更可靠。不过,需要我们注意的是,Shuffle过后非空分区占比要小于nonEmptyPartitionRatioForBroadcastJoin才能触发Broadcast Join的降级优化。在这里插入图片描述

11 shuffle的工作原理,为什么shuffle是一时无两的性能杀手

一 用仙女散花的策略类比shuffle的map和reduce阶段

从老师分发花朵,到5个小同学把花朵按照颜色归类,对应的是Shuffle的Map阶段
也可以叫shuffle write
大家把归类的花朵分发到相应的课桌,这个过程类似于Shuffle的Reduce阶段。
也可以叫shuffle read

二 Map阶段是如何输出中间文件的?

1首先以结果为导向,先看一下map阶段输出是什么?

Map阶段最终生产的数据会以中间文件的形式物化到磁盘中,这些中间文件就存储在spark.local.dir设置的文件目录里。
中间文件包含两种类型
一类是后缀为data的数据文件,存储的内容是Map阶段生产的待分发数据;
另一类是后缀为index的索引文件,它记录的是数据文件中不同分区的偏移地址。
这里的分区是指Reduce阶段的分区,因此,分区数量与Reduce阶段的并行度保持一致。

2 Map是怎么输出文件的?

用groupByKey实现“仙女散花”

  1. 对于分片中的数据记录,逐一计算其目标分区,并将其填充到PartitionedPairBuffer; “PartitionedPairBuffer”,它本质上就是一种数组形式的缓存结构。
  2. PartitionedPairBuffer填满后,如果分片中还有未处理的数据记录,就对Buffer中的数据记录按(目标分区ID,Key)进行排序,将所有数据溢出到临时文件,同时清空缓存; Spark需要一种计算机制,来保障在数据总量超出可用内存的情况下,依然能够完成计算。这种机制就是:排序、溢出、归并。
  3. 重复步骤1、2,直到分片中所有的数据记录都被处理;
  4. 对所有临时文件和PartitionedPairBuffer归并排序,最终生成数据文件和索引文件。

3用reduceByKey实现“仙女散花”

在计算的过程中,reduceByKey采用一种叫做PartitionedAppendOnlyMap的数据结构来填充数据记录。
这个数据结构是一种Map,而Map的Value值是可累加、可更新的。
PartitionedAppendOnlyMap非常适合聚合类的计算场景,如计数、求和、均值计算、极值计算等等。
依靠高效的内存数据结构、更少的磁盘文件、更小的文件尺寸,我们就能大幅降低了Shuffle过程中的磁盘和网络开销。

4Reduce阶段是如何进行数据分发的?

Reduce Task通过网络拉取中间文件的过程,实际上就是不同Stages之间数据分发的过程。
Shuffle中数据分发的网络开销,会随着Map Task与Reduce Task的线性增长,呈指数级爆炸。m*n

三 性能杀手

1 对于Shuffle来说,它需要消耗所有的硬件资源

无论是PartitionedPairBuffer、PartitionedAppendOnlyMap这些内存数据结构,还是读写缓冲区,都会消耗宝贵的内存资源;
由于内存空间有限,因此溢出的临时文件会引入大量磁盘I/O,而且,Map阶段输出的中间文件也会消耗磁盘;
呈指数级增长的跨节点数据分发,带来的网络开销更是不容小觑。

2 Shuffle消耗的不同硬件资源之间很难达到平衡。

磁盘和网络的消耗是Shuffle中必需的环节。

12-13 广播变量

1 如何理解广播变量?

分发到task还是executor?
我们需要降低数据结构分发的频次。
广播变量是一种分发机制,它一次性封装目标数据结构,以Executors为粒度去做数据分发
在广播变量的运行机制下,封装成广播变量的数据,由Driver端以Executors为粒度分发,每一个Executors接收到广播变量之后,将其交给BlockManager管理。

由于广播变量携带的数据已经通过专门的途径存储到BlockManager中,因此分发到Executors的Task不需要再携带同样的数据。

1.1 广播普通变量
  1. 任务分发:以task为粒度,driver->task
  2. 创建广播变量:以executor为粒度,driver->executor
  3. 读取广播变量:executor的公共仓库blockmanager->task
1.2 广播分布式数据集

分布式数据集的数据源不在Driver端,而是来自所有的Executors,Executors中的每个分布式任务负责生产全量数据集的一部分,也就是图中不同的数据分区。
因此广播分布式数据集分两步走
步骤1就是Driver从所有的Executors拉取这些数据分区,然后在本地构建全量数据。
步骤2与从普通变量创建广播变量的过程类似。 Driver把汇总好的全量数据分发给各个Executors,Executors将接收到的全量数据缓存到存储系统的BlockManager中。

2 如何用广播变量克制Shuffle?

2.1 什么是 Shuffle Join

在不进行任何调优的情况下,Spark默认采用Shuffle Join的方式来做到这一点。Shuffle Join的过程主要有两步。

第一步就是对参与关联的左右表分别进行Shuffle,Shuffle的分区规则是先对Join keys计算哈希值,再把哈希值对分区数取模。由于左右表的分区数是一致的,因此Shuffle过后,一定能够保证userID相同的交易记录和用户数据坐落在同一个Executors内。
在这里插入图片描述

Shuffle完成之后,第二步就是在同一个Executors内,Reduce task就可以对userID一致的记录进行关联操作。但是,由于交易表是事实表,数据体量异常庞大,对TB级别的数据进行Shuffle,想想都觉得可怕!因此,上面对两个DataFrame直接关联的代码,还有很大的调优空间。我们该怎么做呢?话句话说,对于分布式环境中的数据关联来说,要想确保交易记录和与之对应的用户信息在同一个Executors中,我们有没有其他办法呢?

2.2. 克制Shuffle的方式

Driver从所有Executors收集userDF所属的所有数据分片,在本地汇总用户数据,然后给每一个Executors都发送一份全量数据的拷贝。既然每个Executors都有userDF的全量数据,这个时候,交易表的数据分区待在原地、保持不动,就可以轻松地关联到一致的用户数据。如此一来,我们不需要对数据体量巨大的交易表进行Shuffle,同样可以在分布式环境中,完成两张表的数据关联。
在这里插入图片描述

我们在做数据关联的时候,把Shuffle Joins转换为Broadcast Joins,就可以用小表广播来代替大表的全网分发,真正做到克制Shuffle。
利用广播变量,我们成功地避免了海量数据在集群内的存储、分发,节省了原本由Shuffle引入的磁盘和网络开销,大幅提升运行时执行性能。当然,采用广播变量优化也是有成本的,毕竟广播变量的创建和分发,也是会带来网络开销的。但是,相比大表的全网分发,小表的网络开销几乎可以忽略不计。

3 如何让Spark SQL选择Broadcast Joins?

利用配置项强制广播

spark.sql.autoBroadcastJoinThreshold
对于参与Join的两张表来说,任意一张表的尺寸小于设置的大小,Spark就在运行时采用Broadcast Joins的实现方式去做数据关联。
使用广播阈值配置项让Spark优先选择Broadcast Joins的关键,就是要确保至少有一张表的存储尺寸小于广播阈值。
第一步,把要预估大小的数据表缓存到内存,比如直接在DataFrame或是Dataset上调用cache方法;第二步,读取Spark SQL执行计划的统计数据。

在这里插入图片描述

利用API强制广播

既然数据量的预估这么麻烦,有没有什么办法,不需要配置广播阈值,就可以让Spark SQL选择Broadcast Joins?还真有,而且办法还不止一种。

开发者可以通过Join Hints或是SQL functions中的broadcast函数,来强制Spark SQL在运行时采用Broadcast Joins的方式做数据关联。下面我就来分别讲一讲它们的含义和作用,以及该如何使用。必须要说明的是,这两种方式是等价的,并无优劣之分,只不过有了多样化的选择之后,你就可以根据自己的偏好和习惯来灵活地进行开发。

用Join Hints强制广播
Join Hints中的Hints表示“提示”,它指的是在开发过程中使用特殊的语法,明确告知Spark SQL在运行时采用哪种Join策略。一旦你启用了Join Hints,不管你的数据表是不是满足广播阈值,Spark SQL都会尽可能地尊重你的意愿和选择,使用Broadcast Joins去完成数据关联。

我们来举个例子,假设有两张表,一张表的内存大小在100GB量级,另一张小一些,2GB左右。在广播阈值被设置为2GB的情况下,并没有触发Broadcast Joins,但我们又不想花费时间和精力去精确计算小表的内存占用到底是多大。在这种情况下,我们就可以用Join Hints来帮我们做优化,仅仅几句提示就可以帮我们达到目的。

val table1: DataFrame = spark.read.parquet(path1)
val table2: DataFrame = spark.read.parquet(path2)
table1.createOrReplaceTempView("t1")
table2.createOrReplaceTempView("t2")
 
val query: String = “select/*+ broadcast(t2) */*from t1 innerjoin t2 on t1.key= t2.key”
val queryResutls: DataFrame = spark.sql(query)

你看,在上面的代码示例中,只要在SQL结构化查询语句里面加上一句/*+ broadcast(t2) */提示,我们就可以强制Spark SQL对小表t2进行广播,在运行时选择Broadcast Joins的实现方式。提示语句中的关键字,除了使用broadcast外,我们还可以用broadcastjoin或者mapjoin,它们实现的效果都一样。

如果你不喜欢用SQL结构化查询语句,尤其不想频繁地在Spark SQL上下文中注册数据表,你也可以在DataFrame的DSL语法中使用Join Hints。

table1.join(table2.hint(“broadcast”), Seq(“key”), “inner”)

在上面的DSL语句中,我们只要在table2上调用hint方法,然后指定broadcast关键字,就可以同样达到强制广播表2的效果。

总之,Join Hints让开发者可以灵活地选择运行时的Join策略,对于熟悉业务、了解数据的同学来说,Join Hints允许开发者把专家经验凌驾于Spark SQL的优化引擎之上,更好地服务业务。

不过,Join Hints也有个小缺陷。如果关键字拼写错误,Spark SQL在运行时并不会显示地抛出异常,而是默默地忽略掉拼写错误的hints,假装它压根不存在。因此,在使用Join Hints的时候,需要我们在编译时自行确认Debug和纠错。

用broadcast函数强制广播
如果你不想等到运行时才发现问题,想让编译器帮你检查类似的拼写错误,那么你可以使用强制广播的第二种方式:broadcast函数。这个函数是类库org.apache.spark.sql.functions中的broadcast函数。调用方式非常简单,比Join Hints还要方便,只需要用broadcast函数封装需要广播的数据表即可,如下所示。

import org.apache.spark.sql.functions.broadcast
table1.join(broadcast(table2), Seq(“key”), “inner”)

你可能会问:“既然开发者可以通过Join Hints和broadcast函数强制Spark SQL选择Broadcast Joins,那我是不是就可以不用理会广播阈值的配置项了?”其实还真不是。我认为,以广播阈值配置为主,以强制广播为辅,往往是不错的选择。

广播阈值的设置,更多的是把选择权交给Spark SQL,尤其是在AQE的机制下,动态Join策略调整需要这样的设置在运行时做出选择。强制广播更多的是开发者以专家经验去指导Spark SQL该如何选择运行时策略。二者相辅相成,并不冲突,开发者灵活地运用就能平衡Spark SQL优化策略与专家经验在应用中的比例。

广播变量不是银弹
首先,从性能上来讲,Driver在创建广播变量的过程中,需要拉取分布式数据集所有的数据分片。
其次,从功能上来讲,并不是所有的Joins类型都可以转换为Broadcast Joins。

14 如何高效利用CPU

性能调优的最终目的,是在所有参与计算的硬件资源之间寻求协同与平衡,让硬件资源达到一种平衡、无瓶颈的状态。对于CPU来说,最需要协同和平衡的硬件资源非内存莫属。原因主要有两方面:一方面,在处理延迟方面,只有内存能望其项背;另一方面,在主板上内存通过数据总线直接向CPU寄存器供给数据。因此,理顺它们之间的关系,可以为性能调优奠定更好的基础。

1 CPU与内存的平衡本质上是什么?

Spark中CPU与内存的平衡,其实就是CPU与执行内存之间的协同与配比。
要想平衡CPU与执行内存之间的协同和配比,我们需要使用3类配置参数,它们分别控制着并行度、执行内存大小和集群的并行计算能力。
否则CPU与执行内存之间的平衡就会被打破,要么CPU工作不饱和,要么OOM内存溢出
任务并发过程中多个线程抢占内存资源时需要遵循的基本逻辑。
执行内存抢占规则就是,在同一个Executor中,当有多个(记为N)线程尝试抢占执行内存时,需要遵循2条基本原则:
执行内存总大小(记为M)为两部分之和,一部分是Execution Memory初始大小,另一部分是Storage Memory剩余空间
每个线程分到的可用内存有一定的上下限,下限是M/N/2,上限是M/N,也就是均值

2 三足鼎立:并行度、并发度与执行内存

2.1 3类配置项

并行
并行度指的是为了实现分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。
并行度可以通过两个参数来设置,分别是spark.default.parallelism和spark.sql.shuffle.partitions。
并发度
同一时间内,一个Executor内部可以同时运行的最大任务数量。

  1. Executor的线程池大小由参数spark.executor.cores决定
  2. 每个任务在执行期间需要消耗的线程数由spark.task.cpus配置项给定
  3. 两者相除得到的商就是并发度

执行内存
堆内执行内存的初始值
spark.executor.memory * spark.memory.fraction * (1 - spark.memory.storageFraction)
堆外执行内存
spark.memory.offHeap.size * (1 - spark.memory.storageFraction)
在统一内存管理模式下,在Storage Memory没有被RDD缓存占满的情况下,执行任务可以动态地抢占Storage Memory
可分配的执行内存总量会随着缓存任务和执行任务的此消彼长,而动态变化。但无论怎么变,可用的执行内存总量,都不会低于配置项设定的初始值。

2.2 CPU低效原因之一:线程挂起

在给定执行内存总量M和线程总数N的情况下,为了保证每个线程都有机会拿到适量的内存去处理数据,Spark用HashMap数据结构,以(Key,Value)的方式来记录每个线程消耗的内存大小,并确保所有的Value值都不超过M/N。在一些极端情况下,有些线程申请不到所需的内存空间,能拿到的内存合计还不到M/N/2。这个时候,Spark就会把线程挂起,直到其他线程释放了足够的内存空间为止。
如果分布式数据集的并行度设置得当,因任务调度滞后而导致的线程挂起问题就会得到缓解。

2.2 CPU低效原因之二:调度开销

数据过于分散会带来严重的副作用:调度开销骤增。
当数据过于分散,分布式任务数量会大幅增加,但每个任务需要处理的数据量却少之又少,就CPU消耗来说,相比花在数据处理上的比例,任务调度上的开销几乎与之分庭抗礼。
如何优化CPU利用率?
首先,在一个Executor中,每个CPU线程能够申请到的内存比例是有上下限的,最高不超过1/N,最低不少于1/N/2,其中N代表线程池大小。
其次,在给定线程池大小和执行内存的时候,并行度较低、数据分片较大容易导致CPU线程挂起,线程频繁挂起不利于提升CPU利用率,而并行度过高、数据过于分散会让调度开销更显著,也利于提升CPU利用率。
最后,在给定执行内存M、线程池大小N和数据总量D的时候,想要有效地提升CPU利用率,我们就要计算出最佳并行度P,计算方法是让数据分片的平均大小D/P坐落在(M/N/2, M/N)区间。这样,在运行时,我们的CPU利用率往往不会太差。
在这里插入图片描述

15-17 内存视角

1 从一个实例开始

在Label Encoding的业务场景中,我们需要对用户兴趣特征做Encoding。依据模板中兴趣字符串及其索引位置,我们的任务是把千亿条样本中的用户兴趣转换为对应的索引值。模板文件的内容示例如下所示。

//模板文件//用户兴趣
体育-篮球-NBA-湖人
军事-武器-步枪-AK47
/**
输入参数:模板文件路径,用户兴趣字符串
返回值:用户兴趣字符串对应的索引值
*///函数定义
val findIndex: (String)=>(String)=>Int= {
(filePath)=>
val source = Source.fromFile(filePath,"UTF-8")
val lines= source.getLines().toArray
source.close()
val searchMap =lines.zip(0 until lines.size).toMap
(interest)=> searchMap.getOrElse(interest,-1)
}
val partFunc = findIndex(filePath)//Dataset中的函数调用
partFunc("体育-篮球-NBA-湖人")

下面,咱们先一起回顾一下代码实现思路,再来分析它目前存在的性能隐患,最后去探讨优化它的方法。

首先,findIndex函数的主体逻辑比较简单,就是读取模板文件和构建Map映射,以及查找用户兴趣并返回索引。不过,findIndex函数被定义成了高阶函数。这样一来,当以模板文件为实参调用这个高阶函数的时候,我们会得到一个内置了Map查找字典的标量函数partFunc,最后在千亿样本上调用partFunc完成数据转换。利用高阶函数,我们就避免了让Executor中的每一个Task去读取模板文件,以及从头构建Map字典这种执行低效的做法。

在运行时,这个函数在Driver端会被封装到一个又一个的Task中去,随后Driver把这些Task分发到Executor,Executor接收到任务之后,交由线程池去执行(调度系统的内容可以回顾第5讲)。这个时候,每个Task就像是一架架小飞机,携带着代码“乘客”和数据“行李”,从Driver飞往Executor。Task小飞机在Executor机场着陆之后,代码“乘客”乘坐出租车或是机场大巴,去往JVM stack;数据“行李”则由专人堆放在JVM Heap,也就是我们常说的堆内内存。

回顾Label encoding中的findIndex函数不难发现,其中大部分都是代码“乘客”,唯一的数据“行李”是名为searchMap的Map字典。像这样用户自定义的数据结构,消耗的内存区域就是堆内内存的User Memory(Spark对内存区域的划分内容可以回顾一下第7讲)。

2 User Memory性能隐患

回顾到这里,你觉得findIndex函数有没有性能隐患呢?你可以先自己思考一下,有了答案之后再来看我下面的分析。

答案当然是“有”。首先,每架小飞机都携带这么一份数据“大件行李”,自然需要消耗更多的“燃油”,这里的“燃油”指的是Task分发过程中带来的网络开销。其次,因为每架小飞机着陆之后,都会在Executor的“旅客行李专区”User Memory寄存上这份同样的数据“行李”,所以,User Memory需要确保有足够的空间可以寄存所有旅客的行李,也就是大量的重复数据。

那么,User Memory到底需要准备出多大的内存空间才行呢?我们不妨来算一算。这样的计算并不难,只需要用飞机架次乘以行李大小就可以了。

用户自定义的数据结构往往是用于辅助函数完成计算任务的,所以函数执行完毕之后,它携带的数据结构的生命周期也就告一段落。因此,在Task的数量统计上,我们不必在意一个Executor总共需要处理多少个Task,只需要关注它在同一时间可以并行处理的Task数量,也就是Executor的线程池大小即可。

我们说过,Executor线程池大小由spark.executor.cores和spark.task.cpus这两个参数的商(spark.executor.cores/spark.task.cpus)决定,我们暂且把这个商记作#threads。

接下来是估算数据“行李”大小,由于searchMap并不是分布式数据集,因此我们不必采用先Cache,再提取Spark执行计划统计信息的方式。对于这样的Java数据结构,我们完全可以在REPL中,通过Java的常规方法估算数据存储大小,估算得到的searchMap大小记为#size。

好啦!现在,我们可以算出,User Memory至少需要提供#threads * #size这么大的内存空间,才能支持分布式任务完成计算。但是,对于User Memory内存区域来说,使用#threads * #size的空间去重复存储同样的数据,本身就是降低了内存的利用率。那么,我们该怎么省掉#threads * #size的内存消耗呢?

2.2.1 性能调优

学习过广播变量之后,想必你头脑中已经有了思路。没错,咱们可以尝试使用广播变量,来对示例中的代码进行优化。

仔细观察findIndex函数,我们不难发现,函数的核心计算逻辑有两点。一是读取模板文件、创建Map映射字典;二是以给定字符串对字典进行查找,并返回查找结果。显然,千亿样本转换的核心需求是其中的第二个环节。既然如此,我们完全可以把创建好的Map字典封装成广播变量,然后分发到各个Executors中去。

有了广播变量的帮忙,凡是发往同一个Executor的Task小飞机,都无需亲自携带数据“行李”,这些大件行李会由“联邦广播快递公司”派货机专门发往各个Executors,Driver和每个Executors之间,都有一班这样的货运专线。思路说完了,优化后的代码如下所示。

/**
广播变量实现方式
*///定义广播变量
val source = Source.fromFile(filePath,"UTF-8")
val lines= source.getLines().toArray
source.close()
val searchMap =lines.zip(0 until lines.size).toMap
val bcSearchMap = sparkSession.sparkContext.broadcast(searchMap)//在Dataset中访问广播变量
bcSearchMap.value.getOrElse("体育-篮球-NBA-湖人",-1)

上面代码的实现思路很简单:第一步还是读取模板文件、创建Map字典;第二步,把Map字典封装为广播变量。这样一来,在对千亿样本进行转换时,我们直接通过bcSearchMap.value读取广播变量内容,然后,通过调用Map字典的getOrElse方法来获取用户兴趣对应的索引值。

相比最开始的第一种实现方式,第二种实现方式的代码改动还是比较小的,那这一版代码对内存的消耗情况有什么改进呢?

我们发现,Task小飞机的代码“乘客”换人了!小飞机之前需要携带函数findIndex,现在则换成了一位“匿名的乘客”:一个读取广播变量并调用其getOrElse方法的匿名函数。由于这位“匿名的乘客”将大件行李托运给了“联邦广播快递公司”的专用货机,因此,Task小飞机着陆后,没有任何“行李”需要寄存到User Memory。换句话说,优化后的版本不会对User Memory内存区域进行占用,所以第一种实现方式中#threads * #size的内存消耗就可以省掉了。

2.2.2 Storage Memory规划

这样一来,原来的内存消耗转嫁到了广播变量身上。但是,广播变量也会消耗内存,这会不会带来新的性能隐患呢?那我们就来看看,广播变量消耗的具体是哪块内存区域。

回顾存储系统那一讲,我们说过,Spark存储系统主要有3个服务对象,分别是Shuffle中间文件、RDD缓存和广播变量。它们都由Executor上的BlockManager进行管理,对于数据在内存和磁盘中的存储,BlockManager利用MemoryStore和DiskStore进行抽象和封装。

那么,广播变量所携带的数据内容会物化到MemoryStore中去,以Executor为粒度为所有Task提供唯一的一份数据拷贝MemoryStore产生的内存占用会被记入到Storage Memory的账上。因此,广播变量消耗的就是Storage Memory内存区域。

接下来,我们再来盘算一下,第二种实现方式究竟需要耗费多少内存空间。由于广播变量的分发和存储以Executor为粒度,因此每个Executor消耗的内存空间,就是searchMap一份数据拷贝的大小。searchMap的大小我们刚刚计算过就是#size。

明确了Storage Memory内存区域的具体消耗之后,我们自然可以根据公式:(spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction去有针对性地调节相关的内存配置项。

2.2.3 内存规划两步走

现在,咱们在两份不同的代码实现下,分别定量分析了不同内存区域的消耗与占用。对于这些消耗做到心中有数,我们自然就能够相应地去调整相关的配置项参数。基于这样的思路,想要最大化内存利用率,我们需要遵循两个步骤:

  1. 预估内存占用
  2. 调整内存配置项

我们以堆内内存为例,来讲一讲内存规划的两步走具体该如何操作。我们都知道,堆内内存划分为Reserved Memory、User Memory、Storage Memory和Execution Memory这4个区域。预留内存固定为300MB,不用理会,其他3个区域需要你去规划。

预估内存占用
首先,我们来说内存占用的预估,主要分为三步。

第一步,计算User Memory的内存消耗。我们先汇总应用中包含的自定义数据结构,并估算这些对象的总大小#size,然后用#size乘以Executor的线程池大小,即可得到User Memory区域的内存消耗#User。

第二步,计算Storage Memory的内存消耗。我们先汇总应用中涉及的广播变量和分布式数据集缓存,分别估算这两类对象的总大小,分别记为#bc、#cache。另外,我们把集群中的Executors总数记作#E。这样,每个Executor中Storage Memory区域的内存消耗的公式就是:#Storage = #bc + #cache / #E。

第三步,计算执行内存的消耗。学习上一讲,我们知道执行内存的消耗与多个因素有关。第一个因素是Executor线程池大小#threads,第二个因素是数据分片大小,而数据分片大小取决于数据集尺寸#dataset和并行度#N。因此,每个Executor中执行内存的消耗的计算公式为:#Execution = #threads * #dataset / #N。

调整内存配置项

得到这3个内存区域的预估大小#User、#Storage、#Execution之后,调整相关的内存配置项就是一件水到渠成的事情(由公式(spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction)可知),这里我们也可以分为3步。

首先,根据定义,spark.memory.fraction可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)计算得到。

同理,spark.memory.storageFraction的数值应该参考(#Storage)/(#Storage + #Execution)。

最后,对于Executor堆内内存总大小spark.executor.memory的设置,我们自然要参考4个内存区域的总消耗,也就是300MB + #User + #Storage + #Execution。不过,我们要注意,利用这个公式计算的前提是,不同内存区域的占比与不同类型的数据消耗一致。

总的来说,在内存规划的两步走中,第一步预估不同区域的内存占比尤为关键,因为第二步中参数的调整完全取决于第一步的预估结果。如果你按照这两个步骤去设置相关的内存配置项,相信你的应用在运行时就能够充分利用不同的内存区域,避免出现因参数设置不当而导致的内存浪费现象,从而在整体上提升内存利用率。

小结
合理划分Spark所有的内存区域,是同时提升CPU与内存利用率的基础。因此,掌握内存规划很重要,在今天这一讲,我们把内存规划归纳为两步走。
第一步是预估内存占用。
求出User Memory区域的内存消耗,公式为:#User=#size乘以Executor线程池的大小。
求出每个Executor中Storage Memory区域的内存消耗,公式为:#Storage = #bc + #cache / #E。
求出执行内存区域的内存消耗,公式为:#Execution = #threads * #dataset / #N。
第二步是调整内存配置项:根据公式得到的3个内存区域的预估大小#User、#Storage、#Execution,去调整(spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction公式中涉及的所有配置项。
spark.memory.fraction可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)计算得到。
spark.memory.storageFraction的数值应该参考(#Storage)/(#Storage + #Execution)。
spark.executor.memory的设置,可以通过公式300MB + #User + #Storage + #Execution得到。
这里,我还想多说几句,内存规划两步走终归只是手段,它最终要达到的效果和目的,是确保不同内存区域的占比与不同类型的数据消耗保持一致,从而实现内存利用率的最大化。

3 如何有效避免cache滥用

3.1 Cache的工作原理

缓存的存储级别
存储介质:内存还是磁盘,或是两者都有。
存储形式:对象值还是序列化的字节数组,带SER字样的表示以序列化方式存储,不带SER则表示采用对象值。
副本数量:存储级别名字最后的数字代表拷贝数量,没有数字默认为1份副本。
在这里插入图片描述
最常用的只有两个:MEMORY_ONLY和MEMORY_AND_DISK,它们分别是RDD缓存和DataFrame缓存的默认存储级别

3.2 缓存的计算过程

这两种存储级别都是先尝试把数据缓存到内存在这里插入图片描述

无论是RDD还是DataFrame,它们的数据分片都是以迭代器Iterator的形式存储的。

  1. 要把数据缓存下来,我们先得把迭代器展开成实实在在的数据值,这一步叫做Unroll
  2. 展开的对象值暂时存储在一个叫做ValuesHolder的数据结构里,然后转换为MemoryEntry。转换的实现方式是toArray,因此它不产生额外的内存开销,这一步转换叫做Transfer
  3. MemoryEntry和与之对应的BlockID,以Key、Value的形式存储到哈希字典(LinkedHashMap)中

3.3 缓存的销毁过程

在这里插入图片描述很多情况下,应用中数据缓存的需求会超过Storage Memory区域的空间供给。虽然缓存任务可以抢占Execution Memory区域的空间,但“出来混,迟早是要还的”,随着执行任务的推进,缓存任务抢占的内存空间还是要“吐”出来。这个时候,Spark就要执行缓存的销毁过程。

你不妨把Storage Memory想象成一家火爆的网红餐厅,待缓存的数据分片是一位又一位等待就餐的顾客。当需求大于供给,顾客数量远超餐位数量的时候,Spark自然要制定一些规则,来合理地“驱逐”那些尸位素餐的顾客,把位置腾出来及时服务那些排队等餐的人。

那么问题来了,Spark基于什么规则“驱逐”顾客呢?接下来,我就以同时缓存多个分布式数据集的情况为例,带你去分析一下在内存受限的情况下会发生什么。

我们用一张图来演示这个过程,假设MemoryStore中存有4个RDD/Data Frame的缓存数据,这4个分布式数据集各自缓存了一些数据分片之后,Storage Memory区域就被占满了。当RDD1尝试把第6个分片缓存到MemoryStore时,却发现内存不足,塞不进去了。

这种情况下,Spark就会逐一清除一些“尸位素餐”的MemoryEntry来释放内存,从而获取更多的可用空间来存储新的数据分片。这个过程叫做Eviction,它的中文翻译还是蛮形象的,就叫做驱逐,也就是把MemoryStore中那些倒霉的MemoryEntry驱逐出内存。

回到刚才的问题,Spark是根据什么规则选中的这些倒霉蛋呢?这个规则叫作LRU(Least Recently Used),基于这个算法,最近访问频率最低的那个家伙就是倒霉蛋。因为LRU是比较基础的数据结构算法,笔试、面试的时候经常会考,所以它的概念我就不多说了。

我们要知道的是,Spark是如何实现LRU的。这里,Spark使用了一个巧妙的数据结构:LinkedHashMap,这种数据结构天然地支持LRU算法。

LinkedHashMap使用两个数据结构来维护数据,一个是传统的HashMap,另一个是双向链表。HashMap的用途在于快速访问,根据指定的BlockId,HashMap以O(1)的效率返回MemoryEntry。双向链表则不同,它主要用于维护元素(也就是BlockId和MemoryEntry键值对)的访问顺序。凡是被访问过的元素,无论是插入、读取还是更新都会被放置到链表的尾部。因此,链表头部保存的刚好都是“最近最少访问”的元素

如此一来,当内存不足需要驱逐缓存的数据块时,Spark只利用LinkedHashMap就可以做到按照“最近最少访问”的原则,去依次驱逐缓存中的数据分片了。

除此之外,在存储系统那一讲,有同学问MemoryStore为什么使用LinkedHashMap,而不用普通的Map来存储BlockId和MemoryEntry的键值对。我刚才说的就是答案了。

回到图中的例子,当RDD1试图缓存第6个数据分片,但可用内存空间不足时,Spark 会对LinkedHashMap从头至尾扫描,边扫描边记录MemoryEntry大小,当倒霉蛋的总大小超过第6个数据分片时,Spark停止扫描。

有意思的是,倒霉蛋的选取规则遵循“兔子不吃窝边草”,同属一个RDD的MemoryEntry不会被选中。就像图中的步骤4展示的一样,第一个蓝色的MemoryEntry会被跳过,紧随其后打叉的两个MemoryEntry被选中。

因此,总结下来,在清除缓存的过程中,Spark遵循两个基本原则:

LRU:按照元素的访问顺序,优先清除那些“最近最少访问”的BlockId、MemoryEntry键值对
兔子不吃窝边草:在清除的过程中,同属一个RDD的MemoryEntry拥有“赦免权”

3.4 退化为MapReduce

Cache的用武之地
2条基本原则:如果RDD/DataFrame/Dataset在应用中的引用次数为1,就坚决不使用Cache,如果引用次数大于1,且运行成本占比超过30%,应当考虑启用Cache

运行成本占比 指的是计算某个分布式数据集所消耗的总时间与作业执行时间的比值。

3.5 Cache的注意事项

.cache是惰性操作,因此在调用.cache之后,需要先用Action算子触发缓存的物化过程。
只有count才会触发缓存的完全物化
first、take和show这3个算子只会把涉及的数据物化
用.unpersist来清理弃用的缓存数据,它是.cache的逆操作
异步模式:调用unpersist()或是unpersist(False)
同步模式:调用unpersist(True)

4 OOM问题

4.1 哪里会发生OOM?

发生OOM的LOC(Line Of Code),也就是代码位置在哪?
OOM发生在Driver端,还是在Executor端?
如果是发生在Executor端,OOM到底发生在哪一片内存区域?

4.2 Driver端的OOM

Spark在Driver端的计算任务
创建小规模的数据集合:使用parallelize、createDataFrame等API创建数据集
收集计算结果:通过take、show、collect等算子把结果收集到Driver端
Driver端的两类OOM问题
创建的数据集超过内存上限
收集的结果集超过内存上限
广播变量,collect等原因

4.3 Executor端的OOM

执行内存分为4个区域:Reserved Memory、User Memory、Storage Memory和Execution Memory
在Executors中,与任务执行有关的内存区域才存在OOM的隐患。
User Memory的OOM
spark.executor.memory * ( 1 - spark.memory.fraction)
Execution Memory的OOM
数据量并不是决定OOM与否的关键因素,数据分布与Execution Memory的运行时规划是否匹配才是。
一旦分布式任务的内存请求超出1/N这个上限,Execution Memory就会出现OOM问题。
它不仅仅与内存空间大小、数据分布有关,还与Executor线程池和运行时任务调度有关。

两个常见的实例:

  1. 数据膨胀 表象:磁盘中的数据加载到JVM中会膨胀,数据请求超出1/N上限 调优思路 把数据打散,提高数据分片数量、降低数据粒度,让膨胀之后的数据量降到100MB左右 加大内存配置,结合Executor线程池调整,提高1/N上限到300MB
  2. 数据倾斜 消除数据倾斜,让所有的数据分片尺寸都不大于100MB 调整Executor线程池、内存、并行度等相关配置,提高1/N上限到300MB 维持并发度、并行度不变,增大执行内存设置,提高1/N上限到300MB 维持并发度、执行内存不变,使用相关配置项来提升并行度将数据打散,让所有的数据分片尺寸都缩小到100MB以内

18 磁盘视角

1 磁盘在功能上的作用

溢出临时文件
存储shuffle中间文件
缓存分布式数据集
也就是说,凡是带DISK字样的存储模式,都会把内存中放不下的数据缓存到磁盘。
性能上的价值
磁盘复用,它指的是Shuffle Write阶段产生的中间文件被多次计算重复利用的过程
失败重试中的磁盘复用
磁盘复用的收益之一就是缩短失败重试的路径,在保障作业稳定性的同时提升执行性能。
ReuseExchange机制下的磁盘复用
相同或是相似的物理计划可以共享Shuffle计算的中间结果
reuseExchange的触发条件
多个查询所依赖的分区规则要与Shuffle中间数据的分区规则保持一致
多个查询所涉及的字段(Attributes)要保持一致

19 网络视角:如何降低网络开销?

在平衡不同硬件资源的时候,相比CPU、内存、磁盘,网络开销无疑是最拖后腿的那一个,这一点在处理延迟上表现得非常明显。
在这里插入图片描述

下图就是不同硬件资源的处理延迟对比结果,我们可以看到最小的处理单位是纳秒。你可能对纳秒没什么概念,所以为了方便对比,我把纳秒等比放大到秒。这样,其他硬件资源的处理延迟也会跟着放大。最后一对比我们会发现,网络延迟是以天为单位的!

因此,要想维持硬件资源之间的平衡,尽可能地降低网络开销是我们在性能调优中必须要做的。今天这一讲,我就按照数据进入系统的时间顺序,也就是数据读取、数据处理和数据传输的顺序,带你去分析和总结数据生命周期的不同阶段有效降低网络开销的方法。

1 数据读写

Spark的数据存储格式和数据存储系统五花八门
在这里插入图片描述
但是不管是什么存储系统,也不管是什么存储格式,数据传输的网络开销都是取决于任务与数据的本地性关系,也就是任务的本地性级别

1.1.1 任务的本地性级别

PROCESS_LOCAL:任务与数据同在一个JVM进程中,内存计算
NODE_LOCAL:任务与数据同在一个计算节点,数据可能在磁盘上或是另一个JVM进程中,会产生磁盘IO开销
RACK_LOCAL:任务与数据不在同一节点,但在同一个物理机架上,会产生同机架网络开销
ANY:任务与数据是跨机架、甚至是跨DC(Data Center,数据中心)的关系,会产生跨机架网络开销
根据定义我们很容易判断出,不同本地性级别下的计算任务是否会引入磁盘或网络开销,结果如下表所示。从表格中我们不难发现,从PROCESS_LOCAL到ANY,数据访问效率是逐级变差的。在读取数据源阶段,数据还未加载到内存,任务没有办法调度到PROCESS_LOCAL级别。因此,这个阶段我们能够调度的最佳级别是NODE_LOCAL。
在这里插入图片描述
根据NODE_LOCAL的定义,在这个级别下,调度的目标节点至少在磁盘上存有Spark计算任务所需的数据分片。这也就意味着,在集群部署上,Spark集群与外部存储系统在物理上是紧紧耦合在一起的。相反,如果Spark集群与存储集群在物理上是分开的,那么任务的本地性级别只能退化到RACK_LOCAL,甚至是ANY,来通过网络获取所需的数据分片。

因此,对于Spark加HDFS和Spark加MongoDB来说,是否会引入网络开销完全取决于它们的部署模式。物理上紧耦合,在NODE_LOCAL级别下,Spark用磁盘I/O替代网络开销获取数据;物理上分离,网络开销就无法避免。

除此之外,物理上的隔离与否同样会影响数据的写入效率。当数据处理完毕,需要将处理结果落盘到外部存储的时候,紧耦合模式下的数据写入会把数据分片落盘到本地节点,避免网络开销。

值得一提的是,在企业的私有化DC中更容易定制化集群的部署方式,大家通常采用紧耦合的方式来提升数据访问效率。但是在公有云环境中,计算集群在物理上往往和存储系统隔离,因此数据源的读取只能走网络。

通过上面的分析,对于数据读写占比较高的业务场景,我们就可以通过在集群的部署模式上做规划,从而在最开始部署Spark集群的时候就提前做好准备。

2 数据处理

数据读取完成后,就进入数据处理环节了。那在数据处理的过程中,都有哪些技巧能够帮助减少网络开销呢?

2.1 能省则省

说起数据处理中的网络开销,我猜你最先想到的操作就是Shuffle。Shuffle作为大多数计算场景的“性能瓶颈担当”,确实是网络开销的罪魁祸首。根据“能省则省”的开发原则,我们自然要想尽办法去避免Shuffle。在数据关联的场景中,省去Shuffle最好的办法,就是把Shuffle Joins转化为Broadcast Joins。关于这方面的调优技巧,我们在广播变量那几讲有过详细的讲解,你可以翻回去看一看。尽管广播变量的创建过程也会引入网络传输,但是,两害相权取其轻,相比Shuffle的网络开销,广播变量的开销算是小巫见大巫了。

遵循“能省则省”的原则,把Shuffle消除掉自然是最好的。如果实在没法避免Shuffle,我们要尽可能地在计算中多使用Map端聚合,去减少需要在网络中分发的数据量。这方面的典型做法就是用reduceByKey、aggregateByKey替换groupByKey,不过在RDD API使用频率越来越低的当下,这个调优技巧实际上早就名存实亡了。但是,Map端聚合的思想并不过时。为什么这么说呢?下面,我通过一个小例子来你详细讲一讲。

在绝大多数2C(To Consumer)的业务场景中,我们都需要刻画用户画像。我们的小例子就是“用户画像”中的一环,:给定用户表,按照用户群组统计兴趣列表,要求兴趣列表内容唯一,也就是不存在重复的兴趣项,用户表的Schema如下表所示。
在这里插入图片描述
要获取群组兴趣列表,我们应该先按照groupId分组,收集群组内所有用户的兴趣列表,然后再把列表中的兴趣项展平,最后去重得到内容唯一的兴趣列表。应该说思路还是蛮简单的,我们先来看第一版实现代码。

val filePath: String = _
val df = spark.read.parquent(filePath)
df.groupBy(“groupId”).agg(array_distinct(flatten(collect_list(col(“interestList”)))))

这版实现分别用collect_list、flatten和array_distinct,来做兴趣列表的收集、展平和去重操作,它完全符合业务逻辑。不过,见到“收集”类的操作,比如groupByKey,以及这里的collect_list,我们应该本能地提高警惕。因为这类操作会把最细粒度的全量数据在全网分发。相比其他算子,这类算子引入的网络开销最大。

那我们是不是可以把它们提前到Map端,从而减少Shuffle中需要分发的数据量呢?当然可以。比如,对于案例中的收集操作,我们可以在刚开始收集兴趣列表的时候就在Map端做一次去重,然后去查找DataFrame开发API,看看有没有与collect_list对应的Map端聚合算子。

因此,在数据处理环节,我们要遵循“能省则省”的开发原则,主动削减计算过程中的网络开销。对于数据关联场景,我们要尽可能地把Shuffle Joins转化为Broadcast Joins来消除Shuffle。如果确实没法避免Shuffle,我们可以在计算中多使用Map端聚合,减少需要在网络中分发的数据量。

除了Shuffle之外,还有个操作也会让数据在网络中分发,这个操作很隐蔽,我们经常注意不到它,它就是多副本的RDD缓存

比如说,在实时流处理这样的场景下,对于系统的高可用性,应用的要求比较高,因此你可能会用“_2”甚至是“_3”的存储模式,在内存和磁盘中缓存多份数据拷贝。当数据副本数大于1的时候,本地数据分片就会通过网络被拷贝到其他节点,从而产生网络开销。虽然这看上去只是存储模式字符串的一个微小改动,但在运行时,它会带来很多意想不到的开销。因此,如果你的应用对高可用特性没有严格要求,我建议你尽量不要滥用多副本的RDD缓存,

多副本是为了实现系统的高可用性
可用性的计算=系统的平均无故障时长/(系统的平均无故障时长+平均故障修复时间)
当数据副本数大于1的时候,本地数据分片就会通过网络被拷贝到其他节点,从而产生网络开销。

3 数据传输

3. 1序列化

在落盘或是在网络传输之前,数据都是需要先进行序列化的
SPARK提供的两种序列化方式
Java Serializer和Kyro Serializer
通常来说,Kryo Serializer相比Java serializer,在处理效率和存储效率两个方面都会胜出数倍。
kyro序列话方式
对于一些自定义的数据结构来说,如果你没有明确把这些类型向Kryo Serializer注册的话,虽然它依然会帮你做序列化的工作,但它序列化的每一条数据记录都会带一个类名字,这个类名字是通过反射机制得到的,会非常长。在上亿的样本中,存储开销自然相当可观。

3.2 配置项

在这里插入图片描述

20 RDD和DataFrame:既生瑜,何生亮?

1 RDD之痛:优化空间受限

在这里插入图片描述

表格中高亮显示的就是RDD转换和聚合算子,它们都是高阶函数。高阶函数指的是形参包含函数的函数,或是返回结果包含函数的函数。为了叙述方便,我们把那些本身是高阶函数的RDD算子,简称“高阶算子”。

对于这些高阶算子,开发者需要以Lambda函数的形式自行提供具体的计算逻辑。以map为例,我们需要明确对哪些字段做映射,以什么规则映射。再以filter为例,我们需要指明以什么条件在哪些字段上过滤。

但这样一来,Spark只知道开发者要做map、filter,但并不知道开发者打算怎么做map和filter。也就是说,在RDD的开发模式下,Spark Core只知道“做什么”,而不知道“怎么做”。这会让Spark Core两眼一抹黑,除了把Lambda函数用闭包的形式打发到Executors以外,实在是没有什么额外的优化空间。

对于Spark Core来说,优化空间受限最主要的影响,莫过于让应用的执行性能变得低下。一个典型的例子,就是相比Java或者Scala,PySpark实现的应用在执行性能上相差悬殊。原因在于,在RDD的开发模式下,即便是同一个应用,不同语言实现的版本在运行时也会有着天壤之别。

在这里插入图片描述

当我们使用Java或者Scala语言做开发时,所有的计算都在JVM进程内完成,如图中左侧的Spark计算节点所示。

而当我们在PySpark上做开发的时候,只能把由RDD算子构成的计算代码,一股脑地发送给Python进程。Python进程负责执行具体的脚本代码,完成计算之后,再把结果返回给Executor进程。由于每一个Task都需要一个Python进程,如果RDD的并行度为#N,那么整个集群就需要#N个这样的Python进程与Executors交互。不难发现,其中的任务调度、数据计算和数据通信等开销,正是PySpark性能低下的罪魁祸首。

2 DataFrame应运而生

2.1 DataFrame和RDD之间的区别

首先,用一句话来概括,DataFrame就是携带数据模式(Data Schema)的结构化分布式数据集,而RDD是不带Schema的分布式数据集。因此,从数据表示(Data Representation)的角度来看,是否携带Schema是它们唯一的区别。带Schema的数据表示形式决定了DataFrame只能封装结构化数据,而RDD则没有这个限制,所以除了结构化数据,它还可以封装半结构化和非结构化数据。

其次,从开发API上看,RDD算子多是高阶函数,这些算子允许开发者灵活地实现业务逻辑,表达能力极强。

DataFrame的表达能力却很弱。一来,它定义了一套DSL(Domain Specific Language)算子,如select、filter、agg、groupBy等等。由于DSL语言是为解决某一类任务而专门设计的计算机语言,非图灵完备,因此,语言表达能力非常有限。二来,DataFrame中的绝大多数算子都是标量函数(Scalar Functions),它们的形参往往是结构化的数据列(Columns),表达能力也很弱。

你可能会问:“相比RDD,DataFrame的表示和表达能力都变弱了,那它是怎么解决RDD优化空间受限的核心痛点呢?”

当然,仅凭DataFrame在API上的改动就想解决RDD的核心痛点,比登天还难。DataFrame API最大的意义在于,它为Spark引擎的内核优化打开了全新的空间。

首先,DataFrame中Schema所携带的类型信息,让Spark可以根据明确的字段类型设计定制化的数据结构,从而大幅提升数据的存储和访问效率。其次,DataFrame中标量算子确定的计算逻辑,让Spark可以基于启发式的规则和策略,甚至是动态的运行时信息去优化DataFrame的计算过程。

3 Spark SQL智能大脑

那么问题来了,有了DataFrame API,负责引擎内核优化的那个幕后英雄是谁?为了支持DataFrame开发模式,Spark从1.3版本开始推出Spark SQL。Spark SQL的核心组件有二,其一是Catalyst优化器,其二是Tungsten。关于Catalyst和Tungsten的特性和优化过程,我们在后面的两讲再去展开,今天这一讲,咱们专注在它们和DataFrame的关系。

3.1 Catalyst:执行过程优化

我们先来说说Catalyst的优化过程。当开发者通过Actions算子触发DataFrame的计算请求时,Spark内部会发生一系列有趣的事情。

首先,基于DataFrame确切的计算逻辑,Spark会使用第三方的SQL解析器ANTLR来生成抽象语法树(AST,Abstract Syntax Tree)。既然是树,就会有节点和边这两个基本的构成元素。节点记录的是标量算子(如select、filter)的处理逻辑,边携带的是数据信息:关系表和数据列,如下图所示。这样的语法树描述了从源数据到DataFrame结果数据的转换过程。
在这里插入图片描述
在Spark中,语法树还有个别名叫做“Unresolved Logical Plan”。它正是Catalyst优化过程的起点。之所以取名“Unresolved”,是因为边上记录的关系表和数据列仅仅是一些字符串,还没有和实际数据对应起来。举个例子,Filter之后的那条边,输出的数据列是joinKey和payLoad。这些字符串的来源是DataFrame的DSL查询,Catalyst并不确定这些字段名是不是有效的,更不知道每个字段都是什么类型。

因此,Catalyst做的第一步优化,就是结合DataFrame的Schema信息,确认计划中的表名、字段名、字段类型与实际数据是否一致。这个过程也叫做把“Unresolved Logical Plan”转换成“Analyzed Logical Plan”。
在这里插入图片描述
基于解析过后的“Analyzed Logical Plan”,Catalyst才能继续做优化。利用启发式的规则和执行策略,Catalyst最终把逻辑计划转换为可执行的物理计划。总之,Catalyst的优化空间来源DataFrame的开发模式。

3.2 Tungsten:数据结构优化

说完Catalyst,我接着再来说说Tungsten。在开发原则那一讲,我们提到过Tungsten使用定制化的数据结构Unsafe Row来存储数据,Unsafe Row的优点是存储效率高、GC效率高。Tungsten之所以能够设计这样的数据结构,仰仗的也是DataFrame携带的Schema。Unsafe Row我们之前讲过,这里我再带你简单回顾一下。在这里插入图片描述
Tungsten是用二进制字节序列来存储每一条用户数据的,因此在存储效率上完胜Java Object。比如说,如果我们要存储上表中的数据,用Java Object来存储会消耗100个字节数,而使用Tungsten仅需要不到20个字节,如下图所示。
在这里插入图片描述
但是,要想实现上图中的二进制序列,Tungsten必须要知道数据条目的Schema才行。也就是说,它需要知道每一个字段的数据类型,才能决定在什么位置安放定长字段、安插Offset,以及存放变长字段的数据值。DataFrame刚好能满足这个前提条件。

我们不妨想象一下,如果数据是用RDD封装的,Tungsten还有可能做到这一点吗?当然不可能。这是因为,虽然RDD也带类型,如RDD[Int]、RDD[(Int, String)],但如果RDD中携带的是开发者自定义的数据类型,如RDD[User]或是RDD[Product],Tungsten就会两眼一抹黑,完全不知道你的User和Product抽象到底是什么。成也萧何、败也萧何,RDD的通用性是一柄双刃剑,在提供开发灵活性的同时,也让引擎内核的优化变得无比困难。

总的来说,基于DataFrame简单的标量算子和明确的Schema定义,借助Catalyst优化器和Tungsten,Spark SQL有能力在运行时构建起一套端到端的优化机制。这套机制运用启发式的规则与策略,以及运行时的执行信息,将原本次优、甚至是低效的查询计划转换为高效的执行计划,从而提升端到端的执行性能。因此,在DataFrame的开发框架下,不论你使用哪种开发语言,开发者都能共享Spark SQL带来的性能福利.
在这里插入图片描述
最后,我们再来回顾最开始提到的面试题:“从2.0版本至今,Spark对于其他子框架的完善与优化,相比Spark SQL占比很低。这是否意味着,Spark未来的发展重心是数据分析,其他场景如机器学习、流计算会逐渐边缘化吗?”

最初,Spark SQL确实仅仅是运行SQL和DataFrame应用的子框架,但随着优化机制的日趋完善,Spark SQL逐渐取代Spark Core,演进为新一代的引擎内核。到目前为止,所有子框架的源码实现都已从RDD切换到DataFrame。因此,和PySpark一样,像Streaming、Graph、Mllib这些子框架实际上都是通过DataFrame API运行在Spark SQL之上,它们自然可以共享Spark SQL引入的种种优化机制。

形象地说,Spark SQL就像是Spark的智能大脑,凡是通过DataFrame这双“眼睛”看到的问题,都会经由智能大脑这个指挥中心,统筹地进行分析与优化,优化得到的行动指令,最终再交由Executors这些“四肢”去执行。

21 Catalyst逻辑计划:你的SQL语句是怎么被优化的

逻辑计划解析
Catalyst把“Unresolved Logical Plan”转换为“Analyzed Logical Plan”
确认表名、字段名、字段类型与实际数据是否一致
逻辑计划优化
Catalyst基于一些既定的启发式规则(Heuristics Based Rules),把“Analyzed Logical Plan”转换为“Optimized Logical Plan”。
catalyst优化器可以划分到三个范畴
减少需要扫描和处理的数据量,降低后续计算的负载
谓词下推(Predicate Pushdown)
“谓词”指代的是像用户表上“age < 30”这样的过滤条件,“下推”指代的是把这些谓词沿着执行计划向下,推到离数据源最近的地方,从而在源头就减少数据扫描量。
对于Parquet、ORC这类存储格式,结合文件注脚(Footer)中的统计信息,下推的谓词能够大幅减少数据扫描量,降低磁盘I/O开销。
列剪裁(Column Pruning)
列剪裁就是扫描数据源的时候,只读取那些与查询相关的字段。
常量替换 (Constant Folding)
Catalys的优化过程
自顶向下实行transformDown
从“Analyzed Logical Plan”到“Optimized Logical Plan”的转换,就是从一个TreeNode生成另一个TreeNode的过程。
Cache Manager优化

22 Catalyst物理执行计划:你的SQL语句是怎么被优化的?

逻辑优化的每一步仅仅是从逻辑上表明Spark SQL需要“做什么”,并没有从执行层面说明具体该“怎么做”。
具体怎么做:物理执行计划
为了让查询计划(Query Plan)变得可操作、可执行,Catalyst的物理优化阶段(Physical Planning)可以分为两个环节
优化Spark Plan
在优化Spark Plan的过程中,Catalyst基于既定的优化策略(Strategies),把逻辑计划中的关系操作符一一映射成物理操作符,生成Spark Plan。
生成Physical Plan
优化Spark Plan
所有优化策略在转换方式上都大同小异,都是使用基于模式匹配的偏函数(Partial Functions),把逻辑计划中的操作符平行映射为Spark Plan中的物理算子。
Catalyst都有哪些Join策略?
5种join策略
Broadcast Hash Join(BHJ)
Shuffle Sort Merge Join(SMJ)
Shuffle Hash Join(SHJ)
Broadcast Nested Loop Join(BNLJ)
Shuffle Cartesian Product Join(CPJ)
两种数据分发方式
broadcast
shuffle
三种join实现机制
hash join
sort merge join
nested loop join
JoinSelection 如何选择join策略
条件型信息
Join类型,也就是是否等值、连接形式等,这种信息的来源是查询语句本身
内表尺寸,这些信息的来源就比较广泛了,可以是Hive表之上的ANALYZE TABLE语句,也可以是Spark对于Parquet、ORC、CSV等源文件的尺寸预估,甚至是来自AQE的动态统计信息
指令型信息:join hints
生成Physical Plan
Preparation Rules有6组规则,这些规则作用到Spark Plan之上就会生成Physical Plan,而Physical Plan最终会由Tungsten转化为用于计算RDD的分布式任务。
6种 preparation rules
EnsureRequirements
含义:确保每个操作符的输入要求,必要时添加shuffle、sort等操作
作用:为physical plan补充必要的操作步骤,保证spark plan中的每个步骤能够顺利进行
参考小Q计算过程中EnsureRequirements在project后面添加的Shuffle和sort计算
collapseCodegenStages
含义:tungsten的优化机制-全阶段代码生成,whole stage code generation
作用:在同一个stage内部,尽可能将所有操作和计算步骤捏合成一个函数,提成计算效率
ReuseExchange
含义:内存或磁盘中的存储复用
作用:同一个执行计划可以共享广播变量或者shuffle的中间结果,避免重复计算
ReuseSubquery
含义:自查询复用
作用:复用同样的查询结果,避免重复计算
Plansubquery
含义:生成自查询
作用:对子查询使用preparation rules
ExtractPythonUDFs
含义:提取Python的UDF函数
作用:把Python UDF分发到单独的Python进程

23 Tungsten:钨丝计划

Tungsten又叫钨丝计划,它围绕内核引擎主要有两方面的优化
数据结构设计
全阶段代码生成WSCG:whole stage code generation
Tungsten在数据结构方面的设计
Unsafe Row:二进制数据结构
使用JVM传统的对象方式来存储Schema具有的缺点
首先,存储开销大。
其次,在JVM堆内内存中,对象数越多垃圾回收效率越低。
UnsafeRow
字节数组的存储方式消除存储开销
使用一个数组对象就能够实现对一条数据的封装,显著降低GC压力
基于内存页的内存管理
tungsten地址分为两个部分
前64位预留给Java Object
后64位是偏移地址Offset
对于On Heap空间的Tungsten地址
前64位存储的是JVM堆内对象的引用或者说指针,后64位Offset存储的是数据在该对象内的偏移地址
Off Heap空间
在堆外的空间中,由于Spark是通过Java Unsafe API直接管理操作系统内存,不存在内存对象的概念
前64位存储的是null值,后64位则用于在堆外空间中直接寻址操作系统的内存空间
对比Java标准库(java.util.HashMap)和Tungsten模式下的HashMap
存储开销
首先,Tungsten放弃了链表的实现方式,使用数组加内存页的方式来实现HashMap。数组中存储的元素是Hash code和Tungsten内存地址,也就是Object引用外加Offset的128位地址。Tungsten HashMap使用128位地址来寻址数据元素,相比java.util.HashMap大量的链表指针,在存储开销上更低。
GC效率
其次,Tungsten HashMap的存储单元是内存页,内存页本质上是Java Object,一个内存页可以存储多个数据条目。因此,相比标准库中的HashMap,使用内存页大幅缩减了存储所需的对象数量。比如说,我们需要存储一百万条数据记录,标准库的HashMap至少需要三百万的JVM对象才能存下,而Tungsten HashMap可能只需要几个或是十几个内存页就能存下。对比下来,它们所需的JVM对象数量可以说是天壤之别,显然,Tungsten的实现方式对于GC更加友好。
CPU cache利用率
再者,内存页本质上是JVM对象,其内部使用连续空间来存储数据,内存页加偏移量可以精准地定位到每一个数据元素。因此,在需要扫描HashMap全量数据的时候,得益于内存页中连续存储的方式,内存的访问方式从原来的随机访问变成了顺序读取(Sequential Access)。顺序内存访问会大幅提升CPU cache利用率,减少CPU中断,显著提升CPU利用率。
如何理解WSCG?
内存计算的第二层含义了,它指的是在同一个Stage内部,把多个RDD的compute函数捏合成一个,然后把这一个函数一次性地作用在输入数据上。
WSCG指的是基于同一Stage内操作符之间的调用关系,生成一份“手写代码”,真正把所有计算融合为一个统一的函数。
什么是火山迭代模型?
语法树当中的每个操作符都需要完成如下步骤
从内存中读取父操作符的输出结果作为输入数据
调用hasNext、next方法,以操作符逻辑处理数据,如过滤、投影、聚合等等
将处理后的结果以统一的标准形式输出到内存,供下游算子消费
“手写代码”解决了VI计算模型的两个核心痛点:操作符之间频繁的虚函数调用,以及操作符之间数据交换引入的内存随机访问。手写代码中的每一条指令都是明确的,可以顺序加载到CPU寄存器,源数据也可以顺序地加载到CPU的各级缓存中,因此,CPU的缓存命中率和工作效率都会得到大幅提升。
WSCG是如何在运行时动态生成代码的?
本质上,WSCG机制的工作过程就是基于一份“性能较差的代码”,在运行时动态地(On The Fly)重构出一份“性能更好的代码”。
手写代码的生成过程分为两个步骤
从父节点到子节点,递归调用doProduce,生成代码框架
从子节点到父节点,递归调用doConsume,向框架填充每一个操作符的运算逻辑
以select count(user_id) from citizens where city = 'beijing’为例
首先,在Stage顶端节点也就是Project之上,添加WholeStageCodeGen节点。WholeStageCodeGen节点通过调用doExecute来触发整个代码生成过程的计算。doExecute会递归调用子节点的doProduce函数,直到遇到Shuffle Boundary为止。这里,Shuffle Boundary指的是Shuffle边界,要么是数据源,要么是上一个Stage的输出。在叶子节点(也就是Scan)调用的doProduce函数会先把手写代码的框架生成出来,如图中右侧蓝色部分的代码。
while (table.hasNext()) {internalRow row table.next();blabla}
然后,Scan中的doProduce会反向递归调用每个父节点的doConsume函数。不同操作符在执行doConsume函数的过程中,会把关系表达式转化成Java代码,然后把这份代码像做“完形填空”一样,嵌入到刚刚的代码框架里。比如图中橘黄色的doConsume生成的if语句,其中包含了判断地区是否为北京的条件,以及紫色的doConsume生成了获取必需字段userId的Java代码。
if row.getString(2) == ‘beijing’ {int user_id =row.getString(0);rowWriter.write(0,ser_id; };
ret = rowWriter.getRow();
就这样,Tungsten利用CollapseCodegenStages规则,经过两层递归调用把Catalyst输出的Spark Plan加工成了一份“手写代码”,并把这份手写代码会交付给DAGScheduler。
拿到代码之后,DAGScheduler再去协调自己的两个小弟TaskScheduler和SchedulerBackend,完成分布式任务调度。

24-26 Spark3.0的三个新特性:自适应查询执行(AQE)、动态分区剪裁(DPP)和扩展的 Join Hints

AQE的三个特性怎样才能用好?

1 Spark为什么需要AQE?

启发式的优化又叫RBO(Rule Based Optimization,基于规则的优化)
谓词下推等经验主义的优化策略
CBO(Cost Based Optimization,基于成本的优化)
CBO的特点是“实事求是”,基于数据表的统计信息(如表大小、数据列分布)来选择优化策略。
缺点:窄/慢/静
窄指的是适用面太窄,CBO仅支持注册到Hive Metastore的数据表,但在大量的应用场景中,数据源往往是存储在分布式文件系统的各类文件,如Parquet、ORC、CSV等等。

慢指的是统计信息的搜集效率比较低。对于注册到Hive Metastore的数据表,开发者需要调用ANALYZE TABLE COMPUTE STATISTICS语句收集统计信息,而各类信息的收集会消耗大量时间。
静指的是静态优化,这一点与RBO一样。CBO结合各类统计信息制定执行计划,一旦执行计划交付运行,CBO的使命就算完成了。换句话说,如果在运行时数据分布发生动态变化,CBO先前制定的执行计划并不会跟着调整、适配。

2 AQE自定义查询执行

AQE到底是什么?
AQE是Spark SQL的一种动态优化机制,在运行时,每当Shuffle Map阶段执行完毕,AQE都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。
AQE优化机制触发的时机是Shuffle Map阶段执行完毕。
也就是说,AQE优化的频次与执行计划中Shuffle的次数一致。
AQE的规则以及执行策略
首先,AQE赖以优化的统计信息与CBO不同,这些统计信息并不是关于某张表或是哪个列,而是Shuffle Map阶段输出的中间文件。
其次,结合Spark SQL端到端优化流程图我们可以看到,AQE从运行时获取统计信息,在条件允许的情况下,优化决策会分别作用到逻辑计划和物理计划。
1个逻辑优化规则和3个物理优化策略
join策略调整
自动分区合并
自动倾斜处理

3 如何用好AQE

AQE三大特性

  1. Join策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从Shuffle Sort Merge Join降级(Demote)为执行效率更高的Broadcast Hash Join。
  2. 自动分区合并:在Shuffle过后,Reduce Task数据分布参差不齐,AQE将自动合并过小的数据分区。
  3. 自动倾斜处理:结合配置项,AQE自动拆分Reduce阶段过大的数据分区,降低单个Reduce Task的工作负载。
Join策略调整

DemoteBroadcastHashJoin
DemoteBroadcastHashJoin规则的作用,是把Shuffle Joins降级为Broadcast Joins。需要注意的是,这个规则仅适用于Shuffle Sort Merge Join这种关联机制,其他机制如Shuffle Hash Join、Shuffle Nested Loop Join都不支持。
shuffle map阶段完成后,会对map中间文件进行两条判断
中间文件尺寸总和小于广播阈值spark.sql.autoBroadcastJoinThreshold
空文件占比小于配置项spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin

OptimizeLocalShuffleReader
采取OptimizeLocalShuffleReader策略可以省去Shuffle常规步骤中的网络分发,Reduce Task可以就地读取本地节点(Local)的中间文件,完成与广播小表的关联操作。
OptimizeLocalShuffleReader物理策略的生效与否由一个配置项决定。这个配置项是spark.sql.adaptive.localShuffleReader.enabled

自动分区合并

在Reduce阶段,当Reduce Task从全网把数据分片拉回,AQE按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起。
目标分区尺寸参数
spark.sql.adaptive.advisoryPartitionSizeInBytes,由开发者指定分区合并后的推荐尺寸。
spark.sql.adaptive.coalescePartitions.minPartitionNum,分区合并后,分区数不能低于该值。
会写到物理执行计划中

自动倾斜处理

在Reduce阶段,当Reduce Task所需处理的分区尺寸大于一定阈值时,利用OptimizeSkewedJoin策略,AQE会把大分区拆成多个小分区。
倾斜分区和拆分粒度的配置项
spark.sql.adaptive.skewJoin.skewedPartitionFactor,判定倾斜的膨胀系数
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,判定倾斜的最低阈值
spark.sql.adaptive.advisoryPartitionSizeInBytes,以字节为单位,定义拆分粒度
缺点:*只能够改善单个executor中的数据倾斜,AQE的自动倾斜处理机制只能以Task为粒度来平衡工作负载
总的来说,当应用场景中的数据倾斜比较简单,比如虽然有倾斜但数据分布相对均匀,或是关联计算中只有一边倾斜,我们完全可以依赖AQE的自动倾斜处理机制。但是,当我们的场景中数据倾斜变得复杂,
*比如数据中不同Key的分布悬殊,或是参与关联的两表都存在大量的倾斜,我们就需要衡量AQE的自动化机制与手工处理倾斜之间的利害得失

4 DPP该怎么用

分区剪裁

如果过滤谓词中包含分区键,那么Spark SQL对分区表做扫描的时候,是完全可以跳过(剪掉)不满足谓词条件的分区目录,这就是分区剪裁
DPP指的是在数据关联的场景中,Spark SQL利用维度表提供的过滤信息,减少事实表中数据的扫描量、降低I/O开销,从而提升执行性能。

动态分区剪裁

使用DPP加速事实表的读取和访问的三个条件
事实表必须是分区表,而且分区字段(可以是多个)必须包含Join Key。
DPP仅支持等值Joins,不支持大于、小于这种不等值关联关系。
维度表过滤之后的数据集要小于广播阈值,开发者要注意调整配置项spark.sql.autoBroadcastJoinThreshold
使用广播变量封装过滤之后的维度表数据
在维度表做完过滤之后,Spark SQL在其上构建哈希表(Hash Table),这个哈希表的Key就是用于关联的Join Key
实现的两个作用
第一个作用就是给事实表用来做分区剪裁,如图中的步骤1所示,哈希表中的Key Set刚好可以用来给事实表过滤符合条件的数据分区。
第二个作用就是参与后续的Broadcast Join数据关联,如图中的步骤2所示。这里的哈希表,本质上就是Hash Join中的Build Table,其中的Key、Value,记录着数据关联中所需的所有字段,如users.id、users.name,刚好拿来和事实表做Broadcast Hash Join。

Join Hints 指南:不同场景下,如何选择join策略?

Join的实现方式详解

NLJ的工作原理:嵌套循环
对于外表中的每一条数据记录,内层的for循环会逐条扫描内表的所有记录,依次判断记录的Join Key是否满足关联条件
NLJ算法的计算复杂度是O(M * N)
SMJ的工作原理:先排序、再归并
SMJ刚开始工作的时候,内外表的游标都会先锚定在两张表的第一条记录上,然后再对比游标所在记录的Join Key。
外表Join Key等于内表Join Key,满足关联条件,把两边的数据记录拼接并输出,然后把外表的游标滑动到下一条记录
外表Join Key小于内表Join Key,不满足关联条件,把外表的游标滑动到下一条记录
外表Join Key大于内表Join Key,不满足关联条件,把内表的游标滑动到下一条记录
SMJ算法的计算复杂度为O(M + N)
HJ的工作原理:把内表扫描的计算复杂度降低至O(1)
Build阶段和Probe阶段
在Build阶段,基于内表,算法使用既定的哈希函数构建哈希表。哈希表中的Key是Join Key应用(Apply)哈希函数之后的哈希值,表中的Value同时包含了原始的Join Key和Payload。
在Probe阶段,算法遍历每一条数据记录,先是使用同样的哈希函数,以动态的方式(On The Fly)计算Join Key的哈希值。然后,用计算得到的哈希值去查询刚刚在Build阶段创建好的哈希表。如果查询失败,说明该条记录与维度表中的数据不存在关联关系;如果查询成功,则继续对比两边的Join Key。如果Join Key一致,就把两边的记录进行拼接并输出,从而完成数据关联。

分布式环境下的Join

分布式环境中的数据关联在计算环节依然遵循着NLJ、SMJ和HJ这3种实现方式,只不过是增加了网络分发这一变数。
在Spark的分布式计算环境中,数据在网络中的分发主要有两种方式,分别是Shuffle和广播
如果采用Shuffle的分发方式来完成数据关联,那么外表和内表都需要按照Join Key在集群中做全量的数据分发。因为只有这样,两个数据表中Join Key相同的数据记录才能分配到同一个Executor进程,从而完成关联计算,如下图所示。
如果采用广播机制的话,情况会大有不同。在这种情况下,Spark只需要把内表(基表)封装到广播变量,然后在全网进行分发。由于广播变量中包含了内表的全量数据,因此体量较大的外表只要“待在原地、保持不动”,就能轻松地完成关联计算。

Spark如何选择Join策略?

网络分发和计算开销
等值Join下,Spark如何选择Join策略?
在等值数据关联中,Spark会尝试按照BHJ > SMJ > SHJ的顺序依次选择Join策略
BHJ:一是连接类型不能是全连接(Full Outer Join);二是基表要足够小,可以放到广播变量里面去。
SHJ:首先,外表大小至少是内表的3倍。其次,内表数据分片的平均大小要小于广播变量阈值。
SMJ没有这么多的附加条件,无论是单表排序,还是两表做归并关联,都可以借助磁盘来完成
spark.sql.join.preferSortMergeJoin
不等值Join下,Spark如何选择Join策略?
由于不等值Join只能使用NLJ来实现,因此Spark SQL可选的Join策略只剩下BNLJ和CPJ

27 大表Join小表:广播变量容不下小表怎么办?

案例1:Join Key远大于Payload

  1. 基于现有的Join Keys去生成一个全新的数据列,它可以叫“Hash Key”。生成的方法分两步:
    把所有Join Keys拼接在一起,把性别、年龄、一直到小时拼接成一个字符串,如图中步骤1、3所示
    使用哈希算法(如MD5或SHA256)对拼接后的字符串做哈希运算,得到哈希值即为“Hash Key”,如上图步骤2、4所示
  2. 消除哈希冲突隐患的方法其实很多,比如“二次哈希”,也就是我们用两种哈希算法来生成Hash Key数据列
    Join Keys远大于Payload的数据关联,我们可以使用映射方法(如哈希运算),用较短的字符串来替换超长的Join Keys,从而大幅缩减小表的存储空间。如果缩减后的小表,足以放进广播变量,我们就可以将SMJ转换为BHJ,=来消除繁重的Shuffle计算。需要注意的是,映射方法要能够有效地避免“映射冲突”的问题,避免出现不同的Join Keys被映射成同一个数值。
    案例2:过滤条件的Selectivity较高
    对于两张表都远超广播阈值的关联场景来说,如果我们不做任何调优的,Spark就会选择SMJ策略计算。
    对于案例中的这种星型关联,我们还可以利用DPP机制来减少事实表的扫描量,进一步减少I/O开销、提升性能。
    如果小表携带过滤条件,且过滤条件的选择性很高,我们可以通过开启AQE的Join策略调整特性,在运行时把SMJ转换为BHJ,从而大幅提升执行性能。
    案例3:小表数据分布均匀
    当参与Join的两张表尺寸相差悬殊且小表数据分布均匀的时候,SHJ往往比SMJ的执行效率更高
    使用Join Hints来强制Spark SQL去选择SHJ策略进行关联计算
    如果小表不带过滤条件,且尺寸远超广播阈值。如果小表本身的数据分布比较均匀,我们可以考虑使用Join hints强行要求Spark SQL在运行时选择SHJ关联策略。一般来说,在“大表Join小表”的场景中,相比SMJ,SHJ的执行效率会更好一些。背后的原因在于,小表构建哈希表的开销,要小于两张表排序的开销。

28-29 大表join大表

1 “分而治之”的调优思路

1.1 如何理解“分而治之”

“分而治之”中一个关键的环节就是内表拆分,我们要求每一个子表的尺寸相对均匀,且都小到可以放进广播变量
拆分的关键在于拆分列的选取
如何保证内表拆分的粒度足够细?
为了兼顾执行性能与开发效率,拆分列的基数要足够大,这样才能让子表小到足以放进广播变量,但同时,拆分列的基数也不宜过大,否则实现“分而治之”的开发成本就会陡然上升。通常来说,日期列往往是个不错的选择。

1.2 如何避免外表的重复扫描?

第一种是将外表全量缓存到内存,不过这种方法对于内存空间的要求较高,不具备普适性。
第二种假设外表的分区键包含Join Keys,那么,每一个内表子表都可以通过DPP机制,帮助与之关联的外表减少数据扫描量。
“分而治之”调优思路实战
“负隅顽抗”的调优思路
数据分布均匀
SHJ hint:/*+ shuffle_hash(orders) */
使用SHJ的条件
两张表数据分布均匀。
内表所有数据分片,能够完全放入内存。
先根据并发度与执行内存,计算出可供每个Task消耗的内存上下限,然后结合分布式数据集尺寸与上下限,倒推出与之匹配的并行度。

1.3 数据倾斜

数据倾斜的三种情况

  1. 内表倾斜
  2. 外表倾斜
  3. 双表倾斜

以task为粒度解决数据倾斜
AQE会检测数据倾斜,将倾斜分区拆分为多个数据分区。同时,AQE会复制内表的分区数据,保护两表之间的数据关联

配置项参数
spark.sql.adaptive.skewJoin.skewedPartitionFactor,判定倾斜的膨胀系数
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,判定倾斜的最低阈值
spark.sql.adaptive.advisoryPartitionSizeInBytes,以字节为单位定义拆分粒度
以executor为粒度解决数据倾斜
分而治之:对于内外表中两组不同的数据,我们分别采用不同的方法做关联计算,然后通过Union操作,再把两个关联计算的结果集做合并,最终得到“大表Join大表”的计算结果
“两阶段Shuffle”: 在不破坏原有关联关系的前提下,在集群范围内以Executors为粒度平衡计算负载
“加盐、Shuffle、关联、聚合”: 外表的处理称作“随机加盐”,具体的操作方法是,对于任意一个倾斜的Join Key,我们都给它加上1到#N之间的一个随机后缀。一般将N设置为executor的总数比较合适。
内表的处理称为“复制加盐”,具体的操作方法是,对于任意一个倾斜的Join Key,我们都把原数据复制(#N – 1)份,从而得到#N份数据副本。对于每一份副本,我们为其Join Key追加1到#N之间的固定后缀,让它与打散后的外表数据保持一致。
内外表分别加盐之后,数据倾斜问题就被消除了。这个时候,我们就可以使用常规优化方法,比如,将Shuffle Sort Merge Join转化为Shuffle Hash Join,去继续执行Shuffle、关联和聚合操作。到此为止,“两阶段Shuffle” 的第一阶段执行完毕,我们得到了初步的聚合结果,这些结果是以打散的Join Keys为粒度进行计算得到的。
“去盐化、Shuffle、聚合”
首先,我们把每一个Join Key的后缀去掉,这一步叫做“去盐化”。
然后,我们按照原来的Join Key再做一遍Shuffle和聚合计算,这一步计算得到的结果,就是“分而治之”当中倾斜部分的计算结果。
以Executors为粒度的调优实战

1.4 执行性能与开发成本的博弈

我们要明确的是,分而治之外加两阶段Shuffle的调优技巧的初衷,是为了解决AQE无法以Executors为粒度平衡计算负载的问题。因此,这项技巧取舍的关键就在于,Executors之间的负载倾斜是否构成整个关联计算的性能瓶颈。


本文转载自: https://blog.csdn.net/ALX3li/article/details/130915605
版权归原作者 冬至喵喵 所有, 如有侵权,请联系我们删除。

“Spark原理及调优”的评论:

还没有评论