0


spark学习

本地跑SPARK任务一定要关了VPN,不然无法分配地址,会一致报错random IP。

spark为什么比hadoop快?

  1. 使用了内存缓存技术(并未在内存中持久化,shuffle时要写入磁盘)
  2. spark task启动时间快。spark启用fork线程方法,hadoop采用创建新进程(主要是MapReduce创建多进程)。
  3. spark只有在shuffle时将数据写入磁盘,hadoop多个mr作业间都要依赖磁盘交互
  4. spark的缓存机制比hdfs更高效。

sparkSession:从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext来实现对数据的加载、转换、处理等工作,并且实现了SQLcontext和HiveContext的所有功能。

我们在新版本中并不需要之前那么繁琐的创建很多对象,只需要创建一个SparkSession对象即可。

SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成SQLContext自身中的表。

用户群:互联网大厂,字节几乎所有产品线、百度基于spark推出的bigsql、美团地图

应用场景:海量批处理、实时流计算、图计算、数据分析、机器学习

sparksession是sparksql的入口

  1. 核心概念

HDFS:存储数据的集群,主从结构。

结构:1master(namenode,NN)和多个slave(DateNode,DN)

NN:

  1. 负责客户端请求的响应
  2. 负责元数据(文件的名称,副本系数,Block存放的DN)的管理

DN:

  1. 存储用户文件对应的block
  2. 定期向NN发送心跳信息,汇报本身及其所有的block的信息

client:

  1. 文件切分,文件上传HDFS时,client将文件切分为一个一个block
  2. 与datanode(DN)交互,获取文件的存储位置
  3. 与datanode(DN)交互,读写数据
  4. client会提供一些命令,管理HDFS,比如格式化namenode
  5. client会提供一些命令访问hdfs,比如增删改查

MapReduce:计算

hdfs文件的物理存储是分块(block),可以设置,默认大小在 Hadoop2.x/3.x 是128M(134217728),1.x 版本中是 64M。

  • HDFS 的块设置太小,会增加寻址时间,程序一直在找块的开始位置;
  • 如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间,导致程序在处理这块数据时,会非常慢。

hdfs详细介绍和常用命令:Hadoop生态圈(二)- HDFS操作详解_apache hadoop 如何使用hdfs-CSDN博客

1.1 yarn:资源管理

driver->AM(application master,进行资源调度,为driver分配执行的master)->master

1.2 spark三大数据结构

RDD

弹性分布式数据集。

rdd只有在遇到collect时才会执行逻辑,不保存数据,但是IO可以临时存储一部分数据。

累加器

分布式共享只写变量。

广播变量

分布式共享只读变量。

当需要的数据集不大时,可以通过广播的方式放到每个节点,在每个节点上与大数据集实现本地join,这样可以加快jion的速度。

从Spark Row 到 GenericRowWithSchema

Dataframe.collect () 是常用的将分布式数据载入到Driver的方法,得到的是Array[GenericRowWithSchema]类型,常常需要从GenericRowWithSchema提取数据,具体所以了解GenericRowWithSchema类型是十分有必要的。 而GenericRowWithSchema继承自 org.apache.spark.sql.Row,自己本身并没有定义多少方法。

dataframe的repartition:repartition(numPartitions, *cols) 重新分区(内存中,用于并行度),用于分区数变多,设置变小一般也只是action算子时才开始shuffing;而且当参数numPartitions小于当前分区个数时会保持当前分区个数等于失效。

1.3 shuffle

有些运算需要将各节点上的同一类数据汇集到某一节点进行计算,将分布在不同节点的数据按照一定规则汇集到一起的过程为shuffle。

哪些操作会引起shuffle:

  1. repartition
  2. coalesce:有shuffle参数,设置为true则会shuffle,false则只会合并分区
  3. groupby/groupbyKey
  4. reduceByKey/foldBykey
  5. aggregateByKey
  6. sortBy / sortByKey
  7. join操作
  8. distinct:全局shuffle
  9. cartesian:RDD的笛卡尔积操作

1.4 sparkConf

设置sparkSession的环境,注意一旦获得了sparkSession,最好不要修改sparkSession的参数了

shop设置sparkSession参数的方式:/opt/meituan/spark-2.2/bin/spark-submit

1.5 spark的stage

Exchange:用于两个stage的衔接阶段,处理数据的分区和分布。当需要对数据进行重分区时(例如,当执行join、groupBy等操作时),Spark会创建一个Exchange Stage。

WholeStageCodegen:全阶段代码生成,通过字节码提高CPU效率,减少CPU的执行时间。

在没有WholeStageCodegen的情况下,spark sql在执行物理计划时,每个操作符(filter,map等)都会生成一个迭代器,然后通过迭代器的方法逐行处理数据,这样效率不高,因为每行数据都需要通过虚拟函数调用,这会带来额外的CPU开销。

1.6 spark的操作

join

  • Shuffle Join:- 缺点:需要大量网络传输和磁盘I/O,比broadcast join慢- 使用场景:- 数据集都很大- 内存资源有限- 步骤:- 根据join的键,将两个数据集的数据进行分区和排序,所以相同键的数据会发送到同一个节点上- 在每个节点,spark先进行排序,然后进行join操作- 合并各个节点的结果
  • Broadcast Join- 缺点:占用节点内存- 使用场景:- 有一个数据集比较小- 内存充足- 提升执行速度- 步骤:- 较小的数据集被广播到每个节点,与大数据集进行本地join,可以避免shuffle操作

创建临时视图

作用:

  1. 执行SQL查询:一旦你创建了一个临时视图,你就可以像查询数据库表一样对这个视图执行SQL查询。
  2. 数据重用:如果你有一个DataFrame或者DataSet,你需要在多个地方使用这个数据,你可以创建一个临时视图,然后在需要的地方通过查询这个视图来使用这个数据。
  3. 数据共享:如果你有一个DataFrame或者DataSet,你需要在多个Spark作业或者Spark应用中共享这个数据,你可以创建一个临时视图,然后在其他作业或者应用中通过查询这个视图来使用这个数据。

要求:

  • 视图名由字母、数字和下划线组成,不能包含其他

2. 相比scala新增方法

  • reduceByKey:相同key的数据进行聚合。

Spark 读取csv文件操作,option参数解释:Spark 读取csv文件操作,option参数解释_spark option-CSDN博客

3. hive表

3.1 分区

意义:减轻存储数据的压力,分为静态分区和动态分区。

在数据处理的阶段被用到,具体在以下两个阶段:

  1. 数据加载阶段:- 通过在数据加载时指定分区键来实现,这样数据会根据分区键被分配到不同的分区中。- 后续可以直接对某个分区的数据进行操作,而不是对所有数据集操作。
  2. 数据处理阶段:- 如果需要对数据进行聚合等操作,可以使用静态分区先放到不同的分区中,避免后续的重新分配和网络传输。- 如果需要对数据进行复杂的转换曹组,可以使用动态分区。

静态分区

create table employees

(

name string,

salary float,

subordinated array<string>,

deductions map<string,float>,

address structstreet:string,city:string,state:string,zip:int

)

partitioned by (country string,state string)

//然后添加分区表

alter table employees add partition (country="china",state="Asia");

动态分区

往hive分区表中插入数据时,如果需要创建的分区很多,比如以表中某个字段进行分区存储,则需要复制粘贴修改很多sql去执行,效率低。因为hive是批处理系统所以hive提供了一个动态分区功能,其可以基于查询参数的位置去推断分区的名称,从而建立分区。

动态分区默认是没有开启。开启后默认是以严格模式执行的,在这种模式下需要至少一个分区字段是静态的。这有助于阻止因设计错误导致导致查询差生大量的分区。

//动态分区模式时是严格模式,也就是至少有一个静态分区。

hive.exec.dynamici.partition=true; #开启动态分区,默认是false

set hive.exec.dynamic.partition.mode=nonstrict //分区模式,默认strict

然后插入数据

注意:使用,insert...select 往表中导入数据时,查询的字段个数必须和目标的字段个数相同,不能多,也不能少,否则会报错。但是如果字段的类型不一致的话,则会使用null值填充,不会报错。而使用load data形式往hive表中装载数据时,则不会检查。如果字段多了则会丢弃,少了则会null值填充。同样如果字段类型不一致,也是使用null值填充。

自动分区

静态分区+动态分区,静态分区必须在动态分区的前面,动态分区字段一般在最后。

4. spark多线程

每个spark应用程序都需要一个SparkSession(Context)来配置和执行操作。 SparkSession对象是线程安全的,可以根据需要传递给你的Spark应用程序。

4.1 有共享的成员变量

  1. 工具类使用object。说明工具类是单例的,若对成员变量进行修改会有线程安全问题。在函数内部使用,是在excutor中创建的,每个excutor中有一个工具类,多个task共享一个工具类。
  2. 工具类使用class。说明工具类是多例的,不会有线程安全问题。每个task有一个工具类的实例。因为工具类是在driver端创建,然后要通过网络发给excutor的,所以必须要实现序列化。序列化可以通过new类的时候序列化或者在calss上实现序列化。

4.2 没有共享的成员变量

  1. 工具类使用object或者class都没有线程安全问题,但class需要序列化。

5 使用场景

打印日志

println打印的日志可以在driver端显现

但需要注意:

  • 如果是在transformation惰性算子(如map,filter,toDf)中,则该打印语句可能没被执行,只有在action操作时才会被打印出来。

统计数量:accumulator

收集报错日志等string类型的信息:accumulatorTool工具

6 spark调优

公司万象资源:https://km.sankuai.com/page/1330779823

spark操作分类:https://km.sankuai.com/page/119088877

6.1 原理

掌握rdd,DAG,调度,存储,内存管理的原理;

调优手段:

  • 通用:- 应用开发的基本原则- shuffle、配置项的设置
  • 数据分析通用:- spark sql的调优

etl:extract transform load

调优案例1:采用函数式编程、在函数体中减少副作用(如修改变量的状态)

反面案例:将给定数据条目的行数据转变为一个(String, Int)的二元组,读取该数据条目的第二和第4列

val extractFields: Seq[Row] => Seq[(String, Int)] = {
  (rows: Seq[Row]) => {
    var fields = Seq[(String, Int)]()
    rows.map(row => {
        fields = fields :+ (row.getString(2), row.getInt(4))
    })
  fields
  }

问题:fields会有追加操作,端到端的执行效率(即开始到结束)低

优化:不用fields承接,直接转换为二元组

示例:

val extractFields: Seq[Row] => Seq[(String, Int)] = {
  (rows: Seq[Row]) => 
    rows.map(row => (row.getString(2), row.getInt(4))).toSeq
}

结果:效率提升一倍

触发Spark延迟计算的Actions算子主要有两类:一类是将分布式计算结果直接落盘的操作,如DataFrame的write、RDD的saveAsTextFile等;另一类是将分布式结果收集到Driver端的操作,如first、take、collect。

编程时注意:尽量减少数据量大的表的全表扫描!

6.1.1 RDD

RDD是一个弹性数据集,核心特性有partitions,partitioner,dependencies和compute。

dependencies和computer构成了rdd的容错性。

partitions和partitioner构成了rdd的拓展性。

注意:

  • 只有key-value的rdd才有分区,非key-value的rdd分区是null
  • 每个rdd的分区范围是:0~numPartitions-1

分区的特点:

  1. RDD的的分区数就是并行度,每个分区会启动一个task。
  2. 分区体现了数据在集群中的物理分布
  3. 可调整,可通过repartition和coalescs
  4. 容错:RDD 的容错性是通过 lineage(血统信息)来实现的。如果某个分区的数据丢失,Spark 可以通过血统信息重新计算丢失的分区数据,而不需要重新计算整个 RDD。
  5. 分区器:通过不同的分区方式实现分区

分区方式:

  • HashPartitioner:可能导致每个分区中数据量的不均匀
  • RangePartitioner:将一定范围内的数映射到某一个分区内,可以保证不同分区间的有序性(a分区都大于或小于b分区),不能保证分区内的有序性。
  • CustomPartitioner:用户自己实现,可以自定义分区的方式,需要继承org.apache.spark.Partitioner类

不同分区方法采用的分区方式:

6.1.2 内存计算的含义

DAG:有向无环图

DAG的划分:从spark的action动作回溯,每次shuffle操作会划分一个stage,每个stage内部可能有多个stages和多个rdd

spark的内存计算:

  1. spark可以将数据cache到内存中进行,可以加快数据的存取速度
  2. spark stage内部的流水线操作:spark会将同一个stage下的多个stages函数(算子)融合为一个函数进行,避免中间变量的产生。注意:这一步可以极大地提升运行效率

综上:spark中减少shuffle的操作。

6.1.3 spark调度系统

调度系统的核心职责:

  1. 根据DAG划分stage
  2. 划分task和taskSet
  3. 获取集群的硬件资源情况,对每个计算节点,构建每一个计算节点的executor的内存、核数的画像。对外,提供wokerOffer,即提供计算能力,由taskScheduler调度使用。
  4. 按照调度原则决定任务/组的执行顺序
  5. 依序将任务分发到excutor

调度系统的三个核心组件:上面的1、2由DAGScheduler负责,4由TaskScheduler,3、5由SchedulerBackend

调度的核心原则:数据不动,任务动

调度策略分为stage之间的和stage内部的;

stage之间的有:

  • FIFO:先来先调度
  • FAIR:用户自定义调度池,每个调度池有不同的优先级,任务关联到不同的调度池机油不同的优先级

stage内部的有:按照任务优先级分发

  • Stages内部的任务调度相对来说简单得多。当TaskScheduler接收到来自SchedulerBackend的WorkerOffer后,TaskScheduler会优先挑选那些满足本地性级别要求的任务进行分发。众所周知,本地性级别有4种:Process local < Node local < Rack local < Any。从左至右任务查询数据的效率不断降低

请区分以下两个spark程序的区别:

功能为:Label Encoding(标签编码),将连续的字符串转为离散的数字,供机器学习使用。

//函数定义
def findIndex(templatePath: String, interest: String): Int = {
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
searchMap.getOrElse(interest, -1)
}
 
//Dataset中的函数调用
findIndex(filePath, "体育-篮球-NBA-湖人")
//函数定义
val findIndex: (String) => (String) => Int = {
(filePath) =>
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
(interest) => searchMap.getOrElse(interest, -1)
}
val partFunc = findIndex(filePath)
 
//Dataset中的函数调用
partFunc("体育-篮球-NBA-湖人")

结论:代码2比代码1在相同环境下快了15倍!

原因:代码2用了高级函数的特性,将findIndex设置为变量,findIndex中将得到的searchMap的getOrElse方法作为参数返回。执行时driver段只需执行一次读取文件构建map的操作。

反观代码1,读取文件和构建map有多少分区就执行多少次。

spark的不同本地性级别及含义:

  • PROCESS_LOCAL:进程本地化,代表任务所需数据都在一个进程
  • NODE_LOCAL:节点本地化,数据需要在不同进程交换/读取文件,速度稍慢;有两种可能:1)在不同进程中。2)task需要的数据在磁盘,或HDFS刚好有block在worker的executor中。
  • RACK_LOCAL 机架本地化,数据在同一机架的不同节点上。需要通过网络传输数据以及文件 IO,比 NODE_LOCAL 慢。情况一:task 计算的数据在 worker2 的 EXecutor 中。情况二:task 计算的数据在 work2 的磁盘上。ANY 跨机架,数据在非同一机架的网络上,速度最慢。
  1. DAGScheduler在创建Tasks的过程中,是如何设置每一个任务的本地性级别?答:DAGScheduler切分stage时,会通过submitStage创建每个stage的task,创建Tasks时,会通过getPreferrdeLocations先获取每个task的preferLocation,task的tpreferLocation和partition的preferLocation一样;taskSchedule会在运行时管理task的locality levels。综上:DAGScheduler和taskSchedule共同设置每个任务的本地性级别。
  2. 在计算与存储分离的云计算环境中,Node local本地性级别成立吗?你认为哪些情况下成立?哪些情况下不成立?

成立的情况:1)混合部署:有的节点既承担存储,也承担计算。2)缓存机制:有缓存的情况下,就算计算与存储分离,缓存有效的情况下也可以实现Node Local级别。3)数据复制:数据被复制到机器上。

不成立:1)完全分离,没有缓存且不支持数据复制。2)存储服务化:比如使用Amazon s3存储。

6.1.4 存储系统

spark存储系统的服务对象:rdd缓存,广播变量,shuffle的中间文件

  • rdd缓存:将rdd缓存到内存或磁盘上,优点:1)通过截断DAG,降低失败重新计算的开销。2)通过缓存,减少每次失败从头计算的开销。

  • shuffle的中间文件:存储位置,数据大小等信息都由存储系统维护。

map阶段:shuffle write操作,根据reducer的规则将数据写到本地磁盘。

reduce阶段:shuffle read操作,从各节点下载数据分片,并根据需要进行聚合。

  • 广播变量:利用存储系统,广播变量可以在executor进程内保存全量数据。

存储系统的核心组件:blockManager,blockManagerMaster,MemoryStore,DiskStore,DiskBlockManager。

blockManager:负责executor端数据的本地存取和跨节点传输,具体有:

  • 对外:与Driver的blockManagerMaster通信,会报本地元信息,按需拉取远程数据存取状态。不同executor间的blockManager通过server/client模式彼此交互数据。
  • 对内:通过组织存取系统的内部组件实现数据的存取、收发。

blockManager如何存储rdd缓存,shuffle中间件和广播变量呢?

有两种存储抽象,分为memoryStoreDiskStore,blockManager利用它们实现数据的存取。

  • 对于广播变量:全部存储在executor进程中,所以由memoryStore管理
  • 对于rdd缓存:既有memoryStore管理,也可以由diskStore管理
  • 对于shuffle中间变量:通常会落到本地节点,文件的落盘和访问由diskStore管理

这两种存储抽象是以什么形式存储的呢?

spark支持对象值(Object Values)和字节数组(Byte Array)。

rdd中数据以分区作为区分,数据存储中以block(块)作为区分,所以分区和block是一一对应的,即一个分区会被物化为存储中的一个block。

memoryStore同时支持两种:统一采用MemoryEntry抽象对这两种类型进行封装,对于存储数据的管理,通过LinkedHashMap[BlockId, MemoryEntry]进行。在rdd缓存场景中,数据的存储是通过把rdd的迭代器转变为一个个MemoryEntry进行的。具体步骤:

  1. rdd迭代器->数据值:调用putIteratorAsValues或是putIteratorAsBytes方法,把RDD迭代器展开为数据值,然后把这些数据值暂存到一个叫做ValuesHolder的数据结构里。
  2. 数据值->Array/byteBuffer:直接调用ValuesHolder的方法将其转换为需要的类型,转为MemoryEntry结构。注意:这一步不涉及内存拷贝,也不耗额外的内存。问题:为什么不涉及内存拷贝?可能原因:第1步转换时,已经存在数组中了,所以直接返回了该数组。
  3. MemoryEntry->链式哈希字典LinkedHashMap[BlockId, MemoryEntry]:哈希字典存储每个blockId和对应的数据信息,此时rdd算是缓存到内存中了。
  4. 注意:内存不够时会采用LRU算法定期清理过期的key

diskStore比较简单:

只支持字节数组类型的存储,通过putBytes方法将文件落盘,再通过getBytes方法读取文件为数据块。至于文件的路径,数据块与文件的对应关系由DiskBlockManager负责管理。

以shuffle中间件为例,介绍diskStore的管理方式:

stage之间的数据分发由SortShuffleManager负责,在Shuffle write过程中,有3类结果文件:temp_shuffle_XXX、shuffle_XXX.data和shuffle_XXX.index。Data文件存储分区数据,由temp合并而来.

6.1.5 内存管理基础

spark分为堆内(on heap)内存+堆外(off heap)内存

堆内内存:

  • 存储spark的rdd、dataframe、dateset,运行时的类对象
  • 由JVM管理,管理成本低,但GC会带来额外的性能消耗
  • spark对堆内内存的预估不精确(因为申请和删除会有延迟)

堆外内存:

  • 存储shuffle中间变量,广播变量,spark sql的中间结果
  • 利用unsafe类+反射直接申请和管理内存,管理成本高
  • 用紧凑的二进制数组存储应用数据
  • 预估准确

内存划分:

  • storageMemory:rdd缓存和广播变量的存放
  • executionMemory:shuffle时需要的计算和存储空间(执行时需要的内存空间)
  • ReserveMemory:存储spark内部对象,如BlockManager等
  • UserMemory:存储用户定义的数据结构

执行任务主要分为两类:一类是Shuffle Map阶段的数据转换、映射、排序、聚合、归并等操作;另一类是Shuffle Reduce阶段的数据排序和聚合操作。它们所涉及的数据结构,都需要消耗执行内存

动态内存管理:storageMemory和executionMemory的内存可以互相使用。

使用原则:

  1. 如果对方空间有空闲,可以互相抢占
  2. executionMemory抢占的空间,如果因rdd缓存等需要,需要使用storageMemory,等executionMemory的任务执行完后再释放
  3. storageMemory抢占的空间,如果因执行任务需要使用executionMemory,此时缓存的数据需要落盘/清除,给executionMemory腾空间

6.1.6 通用性能调优篇

调优方法:

  • 坐享其成
  • 能省则省、能拖则拖
  • 跳出单机思维

原则一:坐享其成:利用spark开发的特性,如钨丝计划(Tungsten),AQE特性,SQL function

Tungsten特点:

  • 存储:存储形式是紧凑的二进制,比JVM存储高效更多
  • 内存管理:利用unsafe+反射管理内存,比JVM管理更高效,内存评估也更准确
  • 运行:运行时利用全阶段代码生成取代火山迭代,开启后比开启前高效16倍
  • 如何使用:废弃rdd,改用dataframe,dateset

AQE(自适应查询引擎):

  • 自适应调节执行计划,可以更改逻辑计划,进而改变物理计划
  • 会进行自动分区合并,合并小分区
  • 解决数据倾斜问题:会自动给key加盐(随机前缀,让数据均匀),还会把大分区拆成多个小分区
  • 自动调整join策略:将join变为Sort Merge Join,或者如果join的表能放到广播变量里,就回自动变成Broadcast Join
  • 如何使用:spark.sql.adaptive.enabled,默认false,true为打开

spark执行计划:

原则二:能省则省,能拖则拖

原则:

  • 尽量将节省数据扫描和处理的操作往前提
  • 减少shuffle操作
  • 即使需要也尽量往后放

示例:

  • filter等减少数据的扫描行数的操作往前放
  • select等减少数据量和列的操作往前放
  • 引起shuffle的操作往后放

示例:

val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
val rootPath: String = _
 
//读取日志文件,去重、并展开userInterestList
def createDF(rootPath: String, date: String): DataFrame = {
val path: String = rootPath + date
val df = spark.read.parquet(path)
.distinct
.withColumn("userInterest", explode(col("userInterestList")))
df
}
 
//提取字段、过滤,再次去重,把多天的结果用union合并
val distinctItems: DataFrame = dates.map{
case date: String =>
val df: DataFrame = createDF(rootPath, date)
.select("userId", "itemId", "userInterest", "accessFreq")
.filter("accessFreq in ('High', 'Medium')")
.distinct
df
}.reduce(_ union _)
val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
val rootPath: String = _
 
val filePaths: List[String] = dates.map(rootPath + _)
 
/**
一次性调度所有文件
先进行过滤和列剪枝
然后再展开userInterestList
最后统一去重
*/
val distinctItems = spark.read.parquet(filePaths: _*)
.filter("accessFreq in ('High', 'Medium'))")
.select("userId", "itemId", "userInterestList")
.withColumn("userInterest", explode(col("userInterestList")))
.select("userId", "itemId", "userInterest")
.distinct

原则三:跳出单机思想

单机思想的一个示例:在map中做与转换操作无关的事情,比如实例化一个类,获取该类的方法

6.1.7 spark调优参数

硬件类

并行度:出发点是数据

  • 数据分区的数量

并行计算任务数:出发点数计算任务,是CPU

  • 可同时执行任务的数量:单个executor可执行的任务数= 单个CPU核数/单个任务的CPU核数;spark集群的任务数=spark的executor数*单个executor可执行的任务数

堆内堆外内存的分配原则:

  • 如果数据比较扁平,且数据大多是定长字段,可以更多使用堆外内存(堆外用指针+偏移量的二进制数组存储)
  • 如果数据比较复杂,用JVM堆内内存更好

spark与内存相关的配置项:

  • spark.memory.fraction:spark的可用内存占全部内存的比例,剩余的是user.memory(即存储用户自定义数据结构的空间),比例默认0.6,可以适当调高/调低

  • spark.local.dir:这个参数允许开发者设置磁盘目录,存储rdd cache落盘数据和shuffle中间文件

spark硬件资源类的配置项有:

问题:

  1. 并行度设置过大会带来哪些弊端?文件过于分散,小分区过多,调度系统开销会增大
  2. 在Shuffle的计算过程中,有哪些Spark内置的数据结构可以充分利用堆外内存资源?
  3. 堆外与堆内的取舍,你还能想到其他的制约因素吗?中间变量和运行时变量的互相制约
  4. 如果内存资源足够丰富,有哪些方式可以开辟内存文件系统,用于配置spark.local.dir参数?

shuffle和spark sql类

shuffle类:通过调整map和reduce的缓冲区大小来调整刷盘的次数,进而调整单次IO的执行效率

  • spark.shuffle.file.buffer:
  • spark.reducer.maxSizeInFlight:
  • spark.shuffle.sort.bypassMergeThreshold:调整reduce的分区数(默认是200),当分区数小于200时就不进行排序。shuffle时会自动进行sorted排序,对于repartition/groupby等仅分组无需分区的操作会带来额外开销,

spark sql大类配置
  • 开启AQE:spark.sql.adaptive.enabled设置为true

7 spark本地调试技巧

  1. 如果包含复杂sql
标签: spark 学习 大数据

本文转载自: https://blog.csdn.net/qqqahhh/article/details/136418011
版权归原作者 qqqahhh 所有, 如有侵权,请联系我们删除。

“spark学习”的评论:

还没有评论