本地跑SPARK任务一定要关了VPN,不然无法分配地址,会一致报错random IP。
spark为什么比hadoop快?
- 使用了内存缓存技术(并未在内存中持久化,shuffle时要写入磁盘)
- spark task启动时间快。spark启用fork线程方法,hadoop采用创建新进程(主要是MapReduce创建多进程)。
- spark只有在shuffle时将数据写入磁盘,hadoop多个mr作业间都要依赖磁盘交互
- spark的缓存机制比hdfs更高效。
sparkSession:从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext来实现对数据的加载、转换、处理等工作,并且实现了SQLcontext和HiveContext的所有功能。
我们在新版本中并不需要之前那么繁琐的创建很多对象,只需要创建一个SparkSession对象即可。
SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成SQLContext自身中的表。
用户群:互联网大厂,字节几乎所有产品线、百度基于spark推出的bigsql、美团地图
应用场景:海量批处理、实时流计算、图计算、数据分析、机器学习
sparksession是sparksql的入口
核心概念
HDFS:存储数据的集群,主从结构。
结构:1master(namenode,NN)和多个slave(DateNode,DN)
NN:
- 负责客户端请求的响应
- 负责元数据(文件的名称,副本系数,Block存放的DN)的管理
DN:
- 存储用户文件对应的block
- 定期向NN发送心跳信息,汇报本身及其所有的block的信息
client:
- 文件切分,文件上传HDFS时,client将文件切分为一个一个block
- 与datanode(DN)交互,获取文件的存储位置
- 与datanode(DN)交互,读写数据
- client会提供一些命令,管理HDFS,比如格式化namenode
- client会提供一些命令访问hdfs,比如增删改查
MapReduce:计算
hdfs文件的物理存储是分块(block),可以设置,默认大小在 Hadoop2.x/3.x 是128M(134217728),1.x 版本中是 64M。
- HDFS 的块设置太小,会增加寻址时间,程序一直在找块的开始位置;
- 如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间,导致程序在处理这块数据时,会非常慢。
hdfs详细介绍和常用命令:Hadoop生态圈(二)- HDFS操作详解_apache hadoop 如何使用hdfs-CSDN博客
1.1 yarn:资源管理
driver->AM(application master,进行资源调度,为driver分配执行的master)->master
1.2 spark三大数据结构
RDD
弹性分布式数据集。
rdd只有在遇到collect时才会执行逻辑,不保存数据,但是IO可以临时存储一部分数据。
累加器
分布式共享只写变量。
广播变量
分布式共享只读变量。
当需要的数据集不大时,可以通过广播的方式放到每个节点,在每个节点上与大数据集实现本地join,这样可以加快jion的速度。
从Spark Row 到 GenericRowWithSchema
Dataframe.collect () 是常用的将分布式数据载入到Driver的方法,得到的是Array[GenericRowWithSchema]类型,常常需要从GenericRowWithSchema提取数据,具体所以了解GenericRowWithSchema类型是十分有必要的。 而GenericRowWithSchema继承自 org.apache.spark.sql.Row,自己本身并没有定义多少方法。
dataframe的repartition:repartition(numPartitions, *cols) 重新分区(内存中,用于并行度),用于分区数变多,设置变小一般也只是action算子时才开始shuffing;而且当参数numPartitions小于当前分区个数时会保持当前分区个数等于失效。
1.3 shuffle
有些运算需要将各节点上的同一类数据汇集到某一节点进行计算,将分布在不同节点的数据按照一定规则汇集到一起的过程为shuffle。
哪些操作会引起shuffle:
- repartition
- coalesce:有shuffle参数,设置为true则会shuffle,false则只会合并分区
- groupby/groupbyKey
- reduceByKey/foldBykey
- aggregateByKey
- sortBy / sortByKey
- join操作
- distinct:全局shuffle
- cartesian:RDD的笛卡尔积操作
1.4 sparkConf
设置sparkSession的环境,注意一旦获得了sparkSession,最好不要修改sparkSession的参数了
shop设置sparkSession参数的方式:/opt/meituan/spark-2.2/bin/spark-submit
1.5 spark的stage
Exchange:用于两个stage的衔接阶段,处理数据的分区和分布。当需要对数据进行重分区时(例如,当执行join、groupBy等操作时),Spark会创建一个Exchange Stage。
WholeStageCodegen:全阶段代码生成,通过字节码提高CPU效率,减少CPU的执行时间。
在没有WholeStageCodegen的情况下,spark sql在执行物理计划时,每个操作符(filter,map等)都会生成一个迭代器,然后通过迭代器的方法逐行处理数据,这样效率不高,因为每行数据都需要通过虚拟函数调用,这会带来额外的CPU开销。
1.6 spark的操作
join
- Shuffle Join:- 缺点:需要大量网络传输和磁盘I/O,比broadcast join慢- 使用场景:- 数据集都很大- 内存资源有限- 步骤:- 根据join的键,将两个数据集的数据进行分区和排序,所以相同键的数据会发送到同一个节点上- 在每个节点,spark先进行排序,然后进行join操作- 合并各个节点的结果
- Broadcast Join- 缺点:占用节点内存- 使用场景:- 有一个数据集比较小- 内存充足- 提升执行速度- 步骤:- 较小的数据集被广播到每个节点,与大数据集进行本地join,可以避免shuffle操作
创建临时视图
作用:
- 执行SQL查询:一旦你创建了一个临时视图,你就可以像查询数据库表一样对这个视图执行SQL查询。
- 数据重用:如果你有一个DataFrame或者DataSet,你需要在多个地方使用这个数据,你可以创建一个临时视图,然后在需要的地方通过查询这个视图来使用这个数据。
- 数据共享:如果你有一个DataFrame或者DataSet,你需要在多个Spark作业或者Spark应用中共享这个数据,你可以创建一个临时视图,然后在其他作业或者应用中通过查询这个视图来使用这个数据。
要求:
- 视图名由字母、数字和下划线组成,不能包含其他
2. 相比scala新增方法
- reduceByKey:相同key的数据进行聚合。
Spark 读取csv文件操作,option参数解释:Spark 读取csv文件操作,option参数解释_spark option-CSDN博客
3. hive表
3.1 分区
意义:减轻存储数据的压力,分为静态分区和动态分区。
在数据处理的阶段被用到,具体在以下两个阶段:
- 数据加载阶段:- 通过在数据加载时指定分区键来实现,这样数据会根据分区键被分配到不同的分区中。- 后续可以直接对某个分区的数据进行操作,而不是对所有数据集操作。
- 数据处理阶段:- 如果需要对数据进行聚合等操作,可以使用静态分区先放到不同的分区中,避免后续的重新分配和网络传输。- 如果需要对数据进行复杂的转换曹组,可以使用动态分区。
静态分区
create table employees
(
name string,
salary float,
subordinated array<string>,
deductions map<string,float>,
address structstreet:string,city:string,state:string,zip:int
)
partitioned by (country string,state string)
//然后添加分区表
alter table employees add partition (country="china",state="Asia");
动态分区
往hive分区表中插入数据时,如果需要创建的分区很多,比如以表中某个字段进行分区存储,则需要复制粘贴修改很多sql去执行,效率低。因为hive是批处理系统,所以hive提供了一个动态分区功能,其可以基于查询参数的位置去推断分区的名称,从而建立分区。
动态分区默认是没有开启。开启后默认是以严格模式执行的,在这种模式下需要至少一个分区字段是静态的。这有助于阻止因设计错误导致导致查询差生大量的分区。
//动态分区模式时是严格模式,也就是至少有一个静态分区。
hive.exec.dynamici.partition=true; #开启动态分区,默认是false
set hive.exec.dynamic.partition.mode=nonstrict //分区模式,默认strict
然后插入数据
注意:使用,insert...select 往表中导入数据时,查询的字段个数必须和目标的字段个数相同,不能多,也不能少,否则会报错。但是如果字段的类型不一致的话,则会使用null值填充,不会报错。而使用load data形式往hive表中装载数据时,则不会检查。如果字段多了则会丢弃,少了则会null值填充。同样如果字段类型不一致,也是使用null值填充。
自动分区
静态分区+动态分区,静态分区必须在动态分区的前面,动态分区字段一般在最后。
4. spark多线程
每个spark应用程序都需要一个SparkSession(Context)来配置和执行操作。 SparkSession对象是线程安全的,可以根据需要传递给你的Spark应用程序。
4.1 有共享的成员变量
- 工具类使用object。说明工具类是单例的,若对成员变量进行修改会有线程安全问题。在函数内部使用,是在excutor中创建的,每个excutor中有一个工具类,多个task共享一个工具类。
- 工具类使用class。说明工具类是多例的,不会有线程安全问题。每个task有一个工具类的实例。因为工具类是在driver端创建,然后要通过网络发给excutor的,所以必须要实现序列化。序列化可以通过new类的时候序列化或者在calss上实现序列化。
4.2 没有共享的成员变量
- 工具类使用object或者class都没有线程安全问题,但class需要序列化。
5 使用场景
打印日志
println打印的日志可以在driver端显现
但需要注意:
- 如果是在transformation惰性算子(如map,filter,toDf)中,则该打印语句可能没被执行,只有在action操作时才会被打印出来。
统计数量:accumulator
收集报错日志等string类型的信息:accumulatorTool工具
6 spark调优
公司万象资源:https://km.sankuai.com/page/1330779823
spark操作分类:https://km.sankuai.com/page/119088877
6.1 原理
掌握rdd,DAG,调度,存储,内存管理的原理;
调优手段:
- 通用:- 应用开发的基本原则- shuffle、配置项的设置
- 数据分析通用:- spark sql的调优
etl:extract transform load
调优案例1:采用函数式编程、在函数体中减少副作用(如修改变量的状态)
反面案例:将给定数据条目的行数据转变为一个(String, Int)的二元组,读取该数据条目的第二和第4列
val extractFields: Seq[Row] => Seq[(String, Int)] = {
(rows: Seq[Row]) => {
var fields = Seq[(String, Int)]()
rows.map(row => {
fields = fields :+ (row.getString(2), row.getInt(4))
})
fields
}
问题:fields会有追加操作,端到端的执行效率(即开始到结束)低
优化:不用fields承接,直接转换为二元组
示例:
val extractFields: Seq[Row] => Seq[(String, Int)] = {
(rows: Seq[Row]) =>
rows.map(row => (row.getString(2), row.getInt(4))).toSeq
}
结果:效率提升一倍
触发Spark延迟计算的Actions算子主要有两类:一类是将分布式计算结果直接落盘的操作,如DataFrame的write、RDD的saveAsTextFile等;另一类是将分布式结果收集到Driver端的操作,如first、take、collect。
编程时注意:尽量减少数据量大的表的全表扫描!
6.1.1 RDD
RDD是一个弹性数据集,核心特性有partitions,partitioner,dependencies和compute。
dependencies和computer构成了rdd的容错性。
partitions和partitioner构成了rdd的拓展性。
注意:
- 只有key-value的rdd才有分区,非key-value的rdd分区是null
- 每个rdd的分区范围是:0~numPartitions-1
分区的特点:
- RDD的的分区数就是并行度,每个分区会启动一个task。
- 分区体现了数据在集群中的物理分布
- 可调整,可通过repartition和coalescs
- 容错:RDD 的容错性是通过 lineage(血统信息)来实现的。如果某个分区的数据丢失,Spark 可以通过血统信息重新计算丢失的分区数据,而不需要重新计算整个 RDD。
- 分区器:通过不同的分区方式实现分区
分区方式:
- HashPartitioner:可能导致每个分区中数据量的不均匀
- RangePartitioner:将一定范围内的数映射到某一个分区内,可以保证不同分区间的有序性(a分区都大于或小于b分区),不能保证分区内的有序性。
- CustomPartitioner:用户自己实现,可以自定义分区的方式,需要继承org.apache.spark.Partitioner类
不同分区方法采用的分区方式:
6.1.2 内存计算的含义
DAG:有向无环图
DAG的划分:从spark的action动作回溯,每次shuffle操作会划分一个stage,每个stage内部可能有多个stages和多个rdd
spark的内存计算:
- spark可以将数据cache到内存中进行,可以加快数据的存取速度
- spark stage内部的流水线操作:spark会将同一个stage下的多个stages函数(算子)融合为一个函数进行,避免中间变量的产生。注意:这一步可以极大地提升运行效率
综上:spark中减少shuffle的操作。
6.1.3 spark调度系统
调度系统的核心职责:
- 根据DAG划分stage
- 划分task和taskSet
- 获取集群的硬件资源情况,对每个计算节点,构建每一个计算节点的executor的内存、核数的画像。对外,提供wokerOffer,即提供计算能力,由taskScheduler调度使用。
- 按照调度原则决定任务/组的执行顺序
- 依序将任务分发到excutor
调度系统的三个核心组件:上面的1、2由DAGScheduler负责,4由TaskScheduler,3、5由SchedulerBackend
调度的核心原则:数据不动,任务动
调度策略分为stage之间的和stage内部的;
stage之间的有:
- FIFO:先来先调度
- FAIR:用户自定义调度池,每个调度池有不同的优先级,任务关联到不同的调度池机油不同的优先级
stage内部的有:按照任务优先级分发
- Stages内部的任务调度相对来说简单得多。当TaskScheduler接收到来自SchedulerBackend的WorkerOffer后,TaskScheduler会优先挑选那些满足本地性级别要求的任务进行分发。众所周知,本地性级别有4种:Process local < Node local < Rack local < Any。从左至右任务查询数据的效率不断降低
请区分以下两个spark程序的区别:
功能为:Label Encoding(标签编码),将连续的字符串转为离散的数字,供机器学习使用。
//函数定义
def findIndex(templatePath: String, interest: String): Int = {
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
searchMap.getOrElse(interest, -1)
}
//Dataset中的函数调用
findIndex(filePath, "体育-篮球-NBA-湖人")
//函数定义
val findIndex: (String) => (String) => Int = {
(filePath) =>
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
(interest) => searchMap.getOrElse(interest, -1)
}
val partFunc = findIndex(filePath)
//Dataset中的函数调用
partFunc("体育-篮球-NBA-湖人")
结论:代码2比代码1在相同环境下快了15倍!
原因:代码2用了高级函数的特性,将findIndex设置为变量,findIndex中将得到的searchMap的getOrElse方法作为参数返回。执行时driver段只需执行一次读取文件和构建map的操作。
反观代码1,读取文件和构建map有多少分区就执行多少次。
spark的不同本地性级别及含义:
- PROCESS_LOCAL:进程本地化,代表任务所需数据都在一个进程
- NODE_LOCAL:节点本地化,数据需要在不同进程交换/读取文件,速度稍慢;有两种可能:1)在不同进程中。2)task需要的数据在磁盘,或HDFS刚好有block在worker的executor中。
- RACK_LOCAL 机架本地化,数据在同一机架的不同节点上。需要通过网络传输数据以及文件 IO,比 NODE_LOCAL 慢。情况一:task 计算的数据在 worker2 的 EXecutor 中。情况二:task 计算的数据在 work2 的磁盘上。ANY 跨机架,数据在非同一机架的网络上,速度最慢。
- DAGScheduler在创建Tasks的过程中,是如何设置每一个任务的本地性级别?答:DAGScheduler切分stage时,会通过submitStage创建每个stage的task,创建Tasks时,会通过getPreferrdeLocations先获取每个task的preferLocation,task的tpreferLocation和partition的preferLocation一样;taskSchedule会在运行时管理task的locality levels。综上:DAGScheduler和taskSchedule共同设置每个任务的本地性级别。
- 在计算与存储分离的云计算环境中,Node local本地性级别成立吗?你认为哪些情况下成立?哪些情况下不成立?
成立的情况:1)混合部署:有的节点既承担存储,也承担计算。2)缓存机制:有缓存的情况下,就算计算与存储分离,缓存有效的情况下也可以实现Node Local级别。3)数据复制:数据被复制到机器上。
不成立:1)完全分离,没有缓存且不支持数据复制。2)存储服务化:比如使用Amazon s3存储。
6.1.4 存储系统
spark存储系统的服务对象:rdd缓存,广播变量,shuffle的中间文件。
rdd缓存:将rdd缓存到内存或磁盘上,优点:1)通过截断DAG,降低失败重新计算的开销。2)通过缓存,减少每次失败从头计算的开销。
shuffle的中间文件:存储位置,数据大小等信息都由存储系统维护。
map阶段:shuffle write操作,根据reducer的规则将数据写到本地磁盘。
reduce阶段:shuffle read操作,从各节点下载数据分片,并根据需要进行聚合。
- 广播变量:利用存储系统,广播变量可以在executor进程内保存全量数据。
存储系统的核心组件:blockManager,blockManagerMaster,MemoryStore,DiskStore,DiskBlockManager。
blockManager:负责executor端数据的本地存取和跨节点传输,具体有:
- 对外:与Driver的blockManagerMaster通信,会报本地元信息,按需拉取远程数据存取状态。不同executor间的blockManager通过server/client模式彼此交互数据。
- 对内:通过组织存取系统的内部组件实现数据的存取、收发。
blockManager如何存储rdd缓存,shuffle中间件和广播变量呢?
有两种存储抽象,分为memoryStore和DiskStore,blockManager利用它们实现数据的存取。
- 对于广播变量:全部存储在executor进程中,所以由memoryStore管理
- 对于rdd缓存:既有memoryStore管理,也可以由diskStore管理
- 对于shuffle中间变量:通常会落到本地节点,文件的落盘和访问由diskStore管理
这两种存储抽象是以什么形式存储的呢?
spark支持对象值(Object Values)和字节数组(Byte Array)。
rdd中数据以分区作为区分,数据存储中以block(块)作为区分,所以分区和block是一一对应的,即一个分区会被物化为存储中的一个block。
memoryStore同时支持两种:统一采用MemoryEntry抽象对这两种类型进行封装,对于存储数据的管理,通过LinkedHashMap[BlockId, MemoryEntry]进行。在rdd缓存场景中,数据的存储是通过把rdd的迭代器转变为一个个MemoryEntry进行的。具体步骤:
- rdd迭代器->数据值:调用putIteratorAsValues或是putIteratorAsBytes方法,把RDD迭代器展开为数据值,然后把这些数据值暂存到一个叫做ValuesHolder的数据结构里。
- 数据值->Array/byteBuffer:直接调用ValuesHolder的方法将其转换为需要的类型,转为MemoryEntry结构。注意:这一步不涉及内存拷贝,也不耗额外的内存。问题:为什么不涉及内存拷贝?可能原因:第1步转换时,已经存在数组中了,所以直接返回了该数组。
- MemoryEntry->链式哈希字典LinkedHashMap[BlockId, MemoryEntry]:哈希字典存储每个blockId和对应的数据信息,此时rdd算是缓存到内存中了。
- 注意:内存不够时会采用LRU算法定期清理过期的key
diskStore比较简单:
只支持字节数组类型的存储,通过putBytes方法将文件落盘,再通过getBytes方法读取文件为数据块。至于文件的路径,数据块与文件的对应关系由DiskBlockManager负责管理。
以shuffle中间件为例,介绍diskStore的管理方式:
stage之间的数据分发由SortShuffleManager负责,在Shuffle write过程中,有3类结果文件:temp_shuffle_XXX、shuffle_XXX.data和shuffle_XXX.index。Data文件存储分区数据,由temp合并而来.
6.1.5 内存管理基础
spark分为堆内(on heap)内存+堆外(off heap)内存
堆内内存:
- 存储spark的rdd、dataframe、dateset,运行时的类对象
- 由JVM管理,管理成本低,但GC会带来额外的性能消耗
- spark对堆内内存的预估不精确(因为申请和删除会有延迟)
堆外内存:
- 存储shuffle中间变量,广播变量,spark sql的中间结果
- 利用unsafe类+反射直接申请和管理内存,管理成本高
- 用紧凑的二进制数组存储应用数据
- 预估准确
内存划分:
- storageMemory:rdd缓存和广播变量的存放
- executionMemory:shuffle时需要的计算和存储空间(执行时需要的内存空间)
- ReserveMemory:存储spark内部对象,如BlockManager等
- UserMemory:存储用户定义的数据结构
执行任务主要分为两类:一类是Shuffle Map阶段的数据转换、映射、排序、聚合、归并等操作;另一类是Shuffle Reduce阶段的数据排序和聚合操作。它们所涉及的数据结构,都需要消耗执行内存。
动态内存管理:storageMemory和executionMemory的内存可以互相使用。
使用原则:
- 如果对方空间有空闲,可以互相抢占
- executionMemory抢占的空间,如果因rdd缓存等需要,需要使用storageMemory,等executionMemory的任务执行完后再释放
- storageMemory抢占的空间,如果因执行任务需要使用executionMemory,此时缓存的数据需要落盘/清除,给executionMemory腾空间
6.1.6 通用性能调优篇
调优方法:
- 坐享其成
- 能省则省、能拖则拖
- 跳出单机思维
原则一:坐享其成:利用spark开发的特性,如钨丝计划(Tungsten),AQE特性,SQL function
Tungsten特点:
- 存储:存储形式是紧凑的二进制,比JVM存储高效更多
- 内存管理:利用unsafe+反射管理内存,比JVM管理更高效,内存评估也更准确
- 运行:运行时利用全阶段代码生成取代火山迭代,开启后比开启前高效16倍
- 如何使用:废弃rdd,改用dataframe,dateset
AQE(自适应查询引擎):
- 自适应调节执行计划,可以更改逻辑计划,进而改变物理计划
- 会进行自动分区合并,合并小分区
- 解决数据倾斜问题:会自动给key加盐(随机前缀,让数据均匀),还会把大分区拆成多个小分区
- 自动调整join策略:将join变为Sort Merge Join,或者如果join的表能放到广播变量里,就回自动变成Broadcast Join
- 如何使用:spark.sql.adaptive.enabled,默认false,true为打开
spark执行计划:
原则二:能省则省,能拖则拖
原则:
- 尽量将节省数据扫描和处理的操作往前提
- 减少shuffle操作
- 即使需要也尽量往后放
示例:
- filter等减少数据的扫描行数的操作往前放
- select等减少数据量和列的操作往前放
- 引起shuffle的操作往后放
示例:
val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
val rootPath: String = _
//读取日志文件,去重、并展开userInterestList
def createDF(rootPath: String, date: String): DataFrame = {
val path: String = rootPath + date
val df = spark.read.parquet(path)
.distinct
.withColumn("userInterest", explode(col("userInterestList")))
df
}
//提取字段、过滤,再次去重,把多天的结果用union合并
val distinctItems: DataFrame = dates.map{
case date: String =>
val df: DataFrame = createDF(rootPath, date)
.select("userId", "itemId", "userInterest", "accessFreq")
.filter("accessFreq in ('High', 'Medium')")
.distinct
df
}.reduce(_ union _)
val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
val rootPath: String = _
val filePaths: List[String] = dates.map(rootPath + _)
/**
一次性调度所有文件
先进行过滤和列剪枝
然后再展开userInterestList
最后统一去重
*/
val distinctItems = spark.read.parquet(filePaths: _*)
.filter("accessFreq in ('High', 'Medium'))")
.select("userId", "itemId", "userInterestList")
.withColumn("userInterest", explode(col("userInterestList")))
.select("userId", "itemId", "userInterest")
.distinct
原则三:跳出单机思想
单机思想的一个示例:在map中做与转换操作无关的事情,比如实例化一个类,获取该类的方法
6.1.7 spark调优参数
硬件类
并行度:出发点是数据
- 数据分区的数量
并行计算任务数:出发点数计算任务,是CPU
- 可同时执行任务的数量:单个executor可执行的任务数= 单个CPU核数/单个任务的CPU核数;spark集群的任务数=spark的executor数*单个executor可执行的任务数
堆内堆外内存的分配原则:
- 如果数据比较扁平,且数据大多是定长字段,可以更多使用堆外内存(堆外用指针+偏移量的二进制数组存储)
- 如果数据比较复杂,用JVM堆内内存更好
spark与内存相关的配置项:
spark.memory.fraction:spark的可用内存占全部内存的比例,剩余的是user.memory(即存储用户自定义数据结构的空间),比例默认0.6,可以适当调高/调低
spark.local.dir:这个参数允许开发者设置磁盘目录,存储rdd cache落盘数据和shuffle中间文件
spark硬件资源类的配置项有:
问题:
- 并行度设置过大会带来哪些弊端?文件过于分散,小分区过多,调度系统开销会增大
- 在Shuffle的计算过程中,有哪些Spark内置的数据结构可以充分利用堆外内存资源?
- 堆外与堆内的取舍,你还能想到其他的制约因素吗?中间变量和运行时变量的互相制约
- 如果内存资源足够丰富,有哪些方式可以开辟内存文件系统,用于配置spark.local.dir参数?
shuffle和spark sql类
shuffle类:通过调整map和reduce的缓冲区大小来调整刷盘的次数,进而调整单次IO的执行效率
- spark.shuffle.file.buffer:
- spark.reducer.maxSizeInFlight:
- spark.shuffle.sort.bypassMergeThreshold:调整reduce的分区数(默认是200),当分区数小于200时就不进行排序。shuffle时会自动进行sorted排序,对于repartition/groupby等仅分组无需分区的操作会带来额外开销,
spark sql大类配置
- 开启AQE:spark.sql.adaptive.enabled设置为true
7 spark本地调试技巧
- 如果包含复杂sql
版权归原作者 qqqahhh 所有, 如有侵权,请联系我们删除。