参考文档及示例代码均基于pyspark==3.1.2
1.什么是RDD?
RDD,弹性分布式数据集(Resilient Distributed Datasets),即一个分布于多个节点机器上的数据集合。为开发人员提供编程抽象,具有只读的特点。这里只读的意思是,当对RDD中的数据修改时,并不修改原RDD,而是返回一个新的RDD。注意RDD本身并不保存数据,只是定义了一组计算规则。
RDD中的弹性体现在:
1)容错性:包括基于血缘关系的容错和自动失败重试的容错。
- 血缘关系的容错:RDD中一个分区的数据丢失,可以通过RDD间的血缘关系重新计算得到该分区的数据。单个节点的故障不影响其他节点的任务处理。
- 自动失败重试的容错:包括task失败重试和stage失败重试,由spark自动支持。且stage失败重试时只重试任务失败的分区,而不是全部计算。
2)计算存储方面:内存和磁盘空间的自动切换和管理。包括计算过程中RDD的存储,及持久化时持久化级别的动态管理。
- 计算过程中RDD的存储:当内存使用完毕时自动溢写磁盘,使得内存较小时也可以处理大数据量。
- 持久化方面:开发者可以自定义选择持久化级别,包括持久化内存,持久化磁盘,持久化内存磁盘相结合的方式。
3)计算过程中可动态调整分区(repartition、coalesce)。
2.job、stage、task如何划分?
job:应用程序中每遇到一个action算子就会划分为一个job。
stage:一个job任务中从后往前划分,分区间每产生了shuffle也就是宽依赖则划分为一个stage,stage的划分体现了spark的pipeline思想,即数据在内存中尽可能的往后多计算,减少磁盘或者网络IO。
task:RDD中一个分区对应一个task。
3.什么是宽窄依赖?
根据分区之间是否产生shuffle来确定。
宽依赖:上游一个分区的数据被打散到下游的多个分区,1:N
窄依赖:上游一个分区的数据全部进入到下游的一个分区,可以是1:1,也可以是N:1
4.spark有哪几种部署模式?
1.Local:本地模式,运行在单个机器,一般用作测试环境。
2.Standalone:一个基于Master+Slaves的资源调度集群。spark任务提交给Master调度管理,是spark自带的一个调度系统。
3.Yarn:spark客户端直接连接yarn,不需要额外构建spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:driver程序的运行节点。yarn-client时driver运行在本地提交任务的客户端,yarn-cluster是driver运行在集群中随机的任一节点。
4.Mesos:比较少用,不了解。
5.K8s:spark后续高版本新增支持。
5.spark中的算子分为哪些类型,举例说明。
spark中算子类型分为两类:
1)转换算子(Transformation):惰性求值,需要action算子进行触发才会执行。返回一个新的RDD。不负责数据存储,只是定义了一个计算规则。
- map:对RDD中的每个元素应用规则。 filter:对RDD中的每个元素按规则过滤。 groupByKey:将相同key的数据合并。 glom:将RDD中的每个分区合并为一个列表。 union:合并两个RDD。 simple:抽样。 注:关于持久化类算子,也有人叫控制算子(cache、persist、checkpoint),严格意义上也属于转换算子,需要动作算子才能触发。
2)动作算子(Action):触发spark任务执行,立即构建DAG有向无环图,不返回RDD,返回RDD的结果或者没有返回值。
- collect:以数组形式获取RDD中所有元素。 count:获取RDD中元素个数。 first:获取RDD中的第一个元素,等价于take(1)。 take:通过指定参数n获取RDD中前n个元素。 top:通过指定参数n获取RDD中排序后的前n个元素。
更多RDD相关API参考官方文档:https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.html#rdd-apis
6.cache、persist、checkpoint的区别,及各自的使用场景?
共同点:1)都用来做持久化,避免多个action算子对同一个RDD的重复计算。2)都遵循spark的惰性执行策略,需要通过action算子触发执行。
区别:
- cache:仅持久化到内存,MEMORY_ONLY级别。等价于persist的默认持久化级别。
- persist:默认持久化到内存(MEMORY_ONLY),但同时支持开发者自定义存储级别,例如仅磁盘(DISK_ONLY),磁盘内存结合(MEMORY_AND_DISK)。 更多的存储级别设置及使用场景参考:https://spark.apache.org/docs/3.1.2/rdd-programming-guide.html#rdd-persistence
- checkpoint:将数据持久化到节点指定路径中(sc.setCheckpointDir方法设置),如果执行模式是cluster则检查点路径必须为HDFS路径。该方法与上述两种方法最大的不同点在于会截断RDD的血缘关系,而上述两种方法不会截断血缘关系,只是起到了缓存数据避免重复计算的作用。checkpoint实际使用中有两点需要注意:1)checkpoint之前不要触发RDD的动作算子,否则会截断血缘关系,导致checkpoint重新计算时找不到血缘链条从而保存不到数据。2)checkpoint前最好将需要保存的RDD通过cache或者persist缓存一下,避免RDD的重复计算。
7.广播变量与累加器。
广播变量和累加器是spark中提供的两种共享变量,分别用来解决广播通信和任务结果汇总的两种业务场景问题。详细参考官方文档:https://spark.apache.org/docs/3.1.2/rdd-programming-guide.html#shared-variables
1)广播变量
简而言之,就是在每个集群节点中缓存一份driver端定义的公共变量,且该被广播的变量在executor中只读。
当不使用广播变量的时候,spark任务中需要用到的公共变量会copy到每个task中,这种方式弊端一是重复存储占用内存资源,二是增加了IO操作。而使用广播变量,driver端定义的公共变量只会往每个集群中的worker节点中copy一份,由executor中的所有task共享。且该方法的底层实现涉及到了序列化与反序列化以及高效的广播算法,所以效率比较高。
demo:
from pyspark.sql import SparkSession
"""
需求:从rdd中过滤掉singer中歌手的歌曲
"""
spark = SparkSession.builder \
.master("local[*]") \
.appName("broadcast_demo") \
.config("spark.executor.instances","4") \
.config("spark.executor.cores","2") \
.config("spark.executor.memory","1g") \
.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([("梁静茹","向左转向右转"),("梁静茹","亲亲"),("王诗安","Home"),("李宗盛","山丘"),("邵夷贝","未来俱乐部")],2)print(f"过滤前:{rdd.collect()}")
singer =["梁静茹","王诗安"]# 设置广播变量并将singer广播到executor
bc = sc.broadcast(singer)# 根据广播变量过滤并输出过滤结果
rdd_filter = rdd.filter(lambda x: x[0]notin bc.value)print(f"过滤后:{rdd_filter.collect()}")
sc.stop()
spark.stop()
2)累加器
累加器,简要的概括,是一种分布式共享只写变量。在driver端定义,并被序列化到每个executor中,在使用时被反序列化。所有executor中的task持有一个累加器的副本进行累加操作。并将结果回传给driver进行汇总。spark原生支持数值型累加器,也支持开发人员自定义累计器类型。
demo:
from pyspark.sql import SparkSession
"""
需求:统计rdd中属于singer中歌手的歌曲数量
"""
spark = SparkSession.builder \
.master("local[*]") \
.appName("accumulator_demo") \
.config("spark.executor.instances","4") \
.config("spark.executor.cores","2") \
.config("spark.executor.memory","1g") \
.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([("梁静茹","向左转向右转"),("梁静茹","亲亲"),("王诗安","Home"),("李宗盛","山丘"),("邵夷贝","未来俱乐部")],2)
singer =["梁静茹","王诗安"]# 初始化一个初值为0的累加器
acc = sc.accumulator(0)# 定义map函数,统计属于singer的歌曲数量defmap_fun(x, s):if x[0]in s:
acc.add(1)# 使用collect算子触发执行map函数并输出结果
rdd.map(lambda x: map_fun(x, singer)).collect()print(f"属于singer的歌曲数量:{acc.value}")
sc.stop()
spark.stop()
8.reduceByKey与groupByKey的区别?
9.spark数据倾斜及通用调优。
10.map与flatMap区别?
map:对RDD中的每个元素应用规则,并返回一个新的元素。也就是结果RDD的元素数量与原始RDD元素数量相等。
flatMap:对RDD中每个元素应用规则,并返回一个集合,集合中的元素可以为0个或多个。在此基础之上,再对所有的集合进行flat平铺操作,可以理解为将各个集合元素合并到一起。
demo:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[*]") \
.appName("demo") \
.config("spark.executor.instances","4") \
.config("spark.executor.cores","2") \
.config("spark.executor.memory","1g") \
.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([2,3,4],2)
rdd1 = rdd.map(lambda x:range(1, x))
rdd2 = rdd.flatMap(lambda x:range(1, x))print(f"map: {rdd1.collect()}")print(f"flatMap: {rdd2.collect()}")
sc.stop()
spark.stop()
11.spark中的shuffle有哪几种方式?
两种。早期的HashShuffle,和后期的SortShuffle。
HashShuffle(后续高版本已被SortShuffle取代):
- 未优化:基于对下游分区个数hash取模实现,下游有多少个分区,上游每个task都会产生多少个小文件,带来的问题是小文件过多,增大磁盘和网络IO,拖慢执行效率。同时上游每个task维护了多个小文件缓冲区,增加内存压力。理论上的小文件个数 = map task数量 x 下游分区数量。
- 优化后:HashShuffle的优化其实就是针对上游task产生的小文件的合并优化。未优化前,每个task维护各自的缓冲区并生成和下游分区数量相等的小文件,优化后,每个executor中属于同一个的core的task,会产生和下游分区数量相等的小文件并复用同一组小文件。所以理论上的小文件个数 = 上游core个数 x 下游分区数量。
SortShuffle:
- 普通SortShuffle:上游的每个map task会不断地往磁盘溢写小文件(溢写前会进行排序),每次溢写产生一个小文件,最终将所有属于同一个task溢写的小文件merge为一个大文件,并且产生一个索引文件,下游的reduce task根据索引文件去读取属于自己分区的数据。即产生的小文件个数 = map task数量 x 2。
- bypass机制:这种机制,可以理解为,在未优化的HashShuffle机制基础上,对同一个task产生的小文件进行了一个合并的功能,产生一个大文件,同时生成一个索引文件。这种机制相比普通SortShuffle省略了排序的过程。产生的文件个数 = map task数量 x 2。触发该机制的两个阈值条件:1)reduce task数量 <
spark.shuffle.sort.bypassMergeThreshold
参数的值,默认为200。2)不是聚合类的shuffle算子。准确来说,不是map端预聚合的算子(eg:reduceByKey,因为为了聚合的高效,通常要求数据有序,而bypass机制并不对数据排序)。
12.spark为什么比MR快?
1.spark是基于内存的计算引擎,MR是基于磁盘的计算引擎。spark会将中间计算结果最大化保存内存,而MR的中间计算结果序列化到磁盘,过多的磁盘IO导致执行效率低下。
2.DAG计算模型,通过对不需要shuffle的task组成任务链,使得数据尽可能的往后计算,减少磁盘IO和网络IO。
3. 灵活高效的容错机制。spark对于任务出错的分区,会根据血缘关系只重新计算出错分区子任务,而不是重新计算整个任务。
4.资源申请方面,spark属于粗粒度申请,MR属于细粒度申请。spark在应用开始会一次性申请资源,在应用程序生命周期内一直持有这些资源,为任务动态分配。MR每一个map任务或reduce任务都会进行资源的申请和释放。
5.spark基于线程执行,MR基于进程执行。线程的启动开销快于进程。
6.MR有reduce一定发生shuffle,且shuffle必然伴随着排序。spark有reduce不一定有shuffle,有shuffle也不一定排序。
13.spark中产生shuffle的算子举例。
版权归原作者 atwdy 所有, 如有侵权,请联系我们删除。