0


大数据面试题——spark

文章目录

讲一下spark 的运行架构

👉**Cluster Manager(Master)**:在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器

👉 Worker节点:从节点,负责控制计算节点,启动Executor或者Driver。

👉 Driver: 运行Application 的main()函数

👉 Executor:执行器,是为某个Application运行在worker node上的一个进程

一个spark程序的执行流程

1、启动:用户程序启动

SparkContext

,是程序的总入口,初始化过程中启动

DAGScheduler

作业调度和

TaskScheduler

任务调度。
2、生成作业:

DAGScheduler

:根据

shuffleDependency

将作业划分为不同的

stage

,根据 RDD之间的依赖关系,宽依赖和窄依赖,划分原则就是遇见窄依赖就放进当前stage,遇到宽依赖则断开。(相当于shuffle是前后的stage分界线)每一个stage里面都会划分一个taskset,也就是数据集,而DAGSchedule的下一个任务就是将这个TaskSet传给TaskSchedule(在最后一个 stage划分结束,就会触发作业的提交)。
3、提交任务集:

TaskScheduler

:分配 Task到哪一个executor上去执行,

SchedulerBackend

配合

TaskScheduler 

完成具体任务的资源分配。
4、任务执行:

Executor

:实际任务的运行最终都 Execter 类来执行,对每个任务创建一个TaskRunner类,交给线程池去实现。


  1. spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGSchedulerTaskScheduler
  2. TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。
  3. Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executer。
  4. Executor 启动后,会自己反向注册到 TaskScheduler 中。 所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。
  5. 每执行到一个 Action,就会创建一个 Job。Job 会提交给 DAGScheduler。
  6. DAGScheduler 会将 Job划分为多个 stage,然后每个 stage 创建一个 TaskSet。
  7. TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。
  8. Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。(TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)

讲一下宽依赖和窄依赖

区别宽窄依赖的核心点是 子RDD的partition与父RDD的partition是否是1对多的关系,

如果是这样的关系的话,说明多个父rdd的partition需要经过shuffle过程汇总到一个子rdd的partition,这样就是一次宽依赖,在DAGScheduler中会产生stage的切分.

窄依赖:Narrow Dependency
父RDD和子RDD是一对一的依赖关系,如map,filter

宽依赖:Shuffle Dependency
本质就是shuffle。如reduceByKey,groupyByKey,父RDD一个分区数据给了子RDD的多个分区

存在shuffle就是宽依赖,否则就是窄依赖

RDD 作为数据结构,本质上是一个只读的分区记录集合。一个 RDD 可以包含多个分区,每个分区就是一个数据集片段。

首先,窄依赖可以支持在同一个节点上,以 pipeline 形式执行多条命令(也叫同一个 Stage 的操作),例如在执行了 map 后,紧接着执行 filter。相反,宽依赖需要所有的父分区都是可用的,可能还需要调用类似 MapReduce 之类的操作进行跨节点传递

其次,则是从失败恢复的角度考虑。窄依赖的失败恢复更有效,因为它只需要重新计算丢失的 parent partition 即可,而且可以并行地在不同节点进行重计算(一台机器太慢就会重新调度到多个节点进行)。

spark的stage是如何划分的

stage的划分依据就是看是否产生了shuflle(即宽依赖),遇到一个shuffle操作就划分为前后两个stage.

Spark的 RDD容错机制。

两个方法:利用“血缘(Lineage)容错”和检查点(checkpoint)机制。

  1. “血缘”容错:利用依赖关系进行数据恢复,在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。 在窄依赖中,在子RDD 的分区丢失、重算父RDD分区时,父RDD相应分区计算所得到的数据都是子RDD分区的数据(一对一),并不存在冗余计算。 而宽依赖需要父RDD的所有分区都存在,重算花销大。在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的(其他子分区也会获得父亲重新计算的数据),会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此如果使用Checkpoint算子来做检查点。
  2. 检查点机制:在宽依赖当中,如果 Lineage过长,重算开销会很大,通过设定检查点,在依赖关系中,对关系中间预算的结果进行数据冗余条份,所以数据恢复时,就可以l从检查点开始进行重新计算Lineage,减少开销。

checkpoint 检查点机制?

应用场景:当 spark 应用程序特别复杂,从初始的 RDD 开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用 checkpoint 功能。

原因:对于特别复杂的 Spark 应用,会出现某个反复使用的 RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。

Checkpoint 首先会调用

SparkContext 

setCheckPointDIR()

方法,设置一个容错的文件系统的目录,比如说

 HDFS

;然后对 RDD 调用

checkpoint()

方法。之后在

RDD

所处的

job

运行结束之后,会启动一个单独的

job

,来将

checkpoint 

过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。

检查点机制是我们在

spark streaming 

中用来保障容错性的主要机制,它可以使

spark streaming

阶段性的把应用数据存储到诸如 HDFS 等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:

  1. 控制发生失败时需要重算的状态数。Spark streaming 可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。
  2. 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样 spark streaming 就可以读取之前运行的程序处理数据的进度,并从那里继续。

RDD、DAG、 Stage、 Task 、 Job

RDD RDD 是 Spark 的灵魂,也称为弹性分布式数据集。一个 RDD 代表一个可以被分区的只读数据集。RDD 内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。

DAG Spark 中使用 DAG 对 RDD 的关系进行建模,描述了 RDD 的依赖关系,这种关系也被称之为 lineage(血缘),RDD 的依赖关系使用 Dependency 维护。

Stage 在 DAG 中又进行 Stage 的划分,划分的依据是依赖是否是 shuffle 的,每个 Stage 又可以划分成若干 Task。接下来的事情就是 Driver 发送 Task 到 Executor,Executor 线程池去执行这些 task,完成之后将结果返回给 Driver。

Job Spark 的 Job 来源于用户执行 action 操作(这是 Spark 中实际意义的 Job),就是从 RDD 中获取结果的操作,而不是将一个 RDD 转换成另一个 RDD 的 transformation 操作。

Task 一个 Stage 内,最终的 RDD 有多少个 partition,就会产生多少个 task。

Spark的shuffle介绍

shuffle简介:在 DAG 阶段以shuffle为界,划分 stage,
上游 stage做 map task,每个maptask将计算结果数据分成多份,每一份对应到下游stage 的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write;
下游stage 做reduce task,每个reduce task通过网络拉取上游 stage中所有map task的指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑。

Shuffle版本也随着spark不断进步和优化:
从2.0开始,把

Sort Based Shuffle

Tungsten-Sort

全部统一到

Sort Based Shuffle

中,

Hash Based Shuffle

退出历史舞台。
目前spark2.1,直接把

SortBased Shuffle

的writer分为三种:

BypassMergeSortShuffleWriter

,

SortShuffleWriter

UnsafeShuffle Writer

  1. BypassMergeSortShuffle Writer: Hash Shuffle 中的HashShuffle Writer 实现基本一致,唯一的区别在于,map端的多个输出文件会被汇总为一个文件。所有分区的数据会合并为同一个文件,会生成一个索引文件,是为了索引到每个分区的起始地址,可以随机 access某个partition 的所有数据。
  2. SortShuffleWriter:会对分区内进行排序或者全局排序。 处理步骤:使用 PartitionedAppendOnlyMap或者 PartitionedPairBuffer 在内存中进行排序,排序的K是(partitionld, hash (key))这样一个元组。如果超过内存limit,spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionld 时排厅,如宋 partona相同,再根据hash (key)进行比较排序。如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件和当前内存中的数据结构中的数据进行merge sort,实现全局排序。 最终读取的时候,从整个全局merge后的读取迭代器中读取的数据,就是按照partitionld 从小到大排序的数据,误取过在中使用丹仅州刀区力权,并且记录每个分区文件的起始写入位置,把这些位置数据写入索引文件中。
  3. UnsafeShuffleWriter:优化部分是在shuffle write进行序列化写入过程中,直接对二进制进行排序,减少了内存消耗,最终只是 partition 级别的排序。 但是这种需要一定条件:对单条记录、shuffle数量有限制,而且不能带有聚合函数。排序实现:利用一个LongArray存储分区 ID、pageNumber、offset in page,并对这个数组排序。每次插入一条 record 到 page 中,就把 partionld + pageNumber + offset in page,作可以迭代器 PackedRecordPointer 为一个元素插入到 LongArray中。要想反向获得 record, 定义的数据结构就是[24 bit partition number][13 bit memory page number][27 bit offset inpage]然后到根据该指针可以拿到真实的record。

Spark为什么快,Spark SQL 一定比 Hive 快吗

Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。

  1. 消除了冗余的 HDFS 读写: Hadoop 每次 shuffle 操作后,必须写到磁盘,而 Spark 在 shuffle 后不一定落盘,可以 persist 到内存中,以便迭代时使用。如果操作复杂,很多的 shufle 操作,那么 Hadoop 的读写 IO 时间会大大增加,也是 Hive 更慢的主要原因了。
  2. 消除了冗余的 MapReduce 阶段: Hadoop 的 shuffle 操作一定连着完整的 MapReduce 操作,冗余繁琐。而 Spark 基于 RDD 提供了丰富的算子操作,且 reduce 操作产生 shuffle 数据,可以缓存在内存中。
  3. JVM 的优化: Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多少。

Spark的 partitioner 都有哪些?/RDD的分区函数

RDD分区函数HashPatitoner和 RangePatitoner 实现

1、

HashPatitioner

分区:对于给定的key值,计算其 hashcode除以分区数取余,最后这个值就是分区的id。可能会出现数据不均匀,因为同一 key值都在同一分区。

2、

RangerPartitoner

分区:将一定范围的数映射到某一分区内。
第一步:确定边界,从所有的RDD当中随机抽取样本并进行排序,依次来确定分区的rangerBounds(边界)(这里因为RDD如果很大的话,没法按照计算总量,所以需要用到蓄水池抽样)。
第二步:计算key值在rangerBounds所处的范围,得出分区id。

额外:蓄水池抽样算法,不知总量的随机抽样。找个 leetcode敲下理解。


leetcode382.Linked List Random Node(算法随机采样)
Init : a reservoir with the size: k   //k is the sample number
fori=k+1 to N
    M=random(1, i);if(M<k)
    SWAP the Mth value and ith value
end for

Spark运行模式(资源调度框架的使用,了解)

  1. Local模式:启动多线程或者多进程对程序进行单机调试的。
  2. 分布式部署模式: 👉Standalone模式:独立模式,自带完整的模式。在架构上和 MapReduce1比较,具有一致性,都是由Master、worker组成(只是名称不一样),资源抽象为粗粒式的slot,多少slot多少task。 👉Spark on YARN:因为现在企业用到 hadoop是基于YARN 的,为了融合spark,进行统一资源管理。有两种方式, YARN-client(用于交互,client当中运行sparkContext进程进行任务分发监控),YARN-cluster 任务的分发和监控放在 MRAPPmaster当中。 👉 Spark on mesos:YARN和 mesos都是统一资源管理和调度系统。mesos支持粗粒式和细粒式调度,前者节省了资源调度时间的开销,后者是不存在资源的浪费,但是资源调度延迟较大。

Spark中的算子都有哪些

总的来说,spark分为三大类算子:

  • Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理; Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算
  • Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业; Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统
  • controller 控制操作:Spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。 控制算子有三种,cache,persist,(RDD 持久化原理)checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

1. Value数据类型的Transformation算子

输入分区与输出分区一对一型

  • map算子
  • flatMap算子
  • mapPartitions算子
  • glom算子

输入分区与输出分区多对一型

  • union算子
  • cartesian算子

输入分区与输出分区多对多型

  • grouBy算子

输出分区为输入分区子集型

  • filter算子
  • distinct算子
  • subtract算子
  • sample算子
  • takeSample算子

Cache型

  • cache算子
  • persist算子

2. Key-Value数据类型的Transfromation算子

输入分区与输出分区一对一

  • mapValues算子

对单个RDD或两个RDD聚集

  • combineByKey算子
  • reduceByKey算子
  • partitionBy算子
  • Cogroup算子

连接

  • join算子
  • leftOutJoin 和 rightOutJoin算子

3. Action算子

无输出

  • foreach算子

HDFS算子

  • saveAsTextFile算子
  • saveAsObjectFile算子

Scala集合和数据类型

  • collect算子
  • collectAsMap算子
  • reduceByKeyLocally算子
  • lookup算子
  • count算子
  • top算子
  • reduce算子
  • fold算子
  • aggregate算子
  • countByValue
  • countByKey

RDD 中 reduceBykey 与 groupByKey 哪个性能好,为什么

reduceByKey:reduceByKey 会在结果发送至 reducer 之前会对每个 mapper 在本地进行 merge,有点类似于在 MapReduce 中的 combiner。这样做的好处在于,在 map 端进行一次 reduce 之后,数据量会大幅度减小,从而减小传输,保证 reduce 端能够更快的进行结果计算。

groupByKey:groupByKey 会对每一个 RDD 中的 value 值进行聚合形成一个序列 (Iterator),此操作发生在 reduce 端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成 OutOfMemoryError。

所以在进行大量数据的 reduce 操作时候建议使用 reduceByKey。不仅可以提高速度,还可以防止使用 groupByKey 造成的内存溢出问题。

RDD 持久化原理?

spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中。

调用 cache() 和 persist() 方法即可。cache() 和 persist() 的区别在于,cache() 是 persist() 的一种简化方式,cache() 的底层就是调用 persist() 的无参版本 persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中清除缓存,可以使用 unpersist() 方法。RDD 持久化是可以手动选择不同的策略的。在调用 persist() 时传入对应的 StorageLevel 即可。

checkpoint 和持久化机制的区别?

最主要的区别在于持久化只是将数据保存在 BlockManager 中,但是 RDD 的 lineage(血缘关系,依赖关系) 是不变的。但是 checkpoint 执行完之后,rdd 已经没有之前所谓的依赖 rdd 了,而只有一个强行为其设置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改变了。

持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是 checkpoint 的数据通常是保存在高可用的文件系统中,比如 HDFS 中,所以数据丢失可能性比较低

RDD懒加载是什么意思

Transformation

操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Acion 操作的时候才会真正触发运算,这也就是懒加载.

RDD有哪些特点

顾名思义,从字面理解RDD就是 Resillient Distributed Dataset,即弹性分布式数据集
它是Spark提供的核心抽象。
RDD在抽象上来讲是一种抽象的分布式的数据集。它是被分区的,每个分区分布在集群中的不同的节点上。从而可以让数据进行并行的计算

rdd 分布式弹性数据集,简单的理解成一种数据结构,是 spark 框架上的通用货币。所有算子都是基于 rdd 来执行的,不同的场景会有不同的 rdd 实现类,但是都可以进行互相转换。rdd 执行过程中会形成 dag 图,然后形成 lineage 保证容错性等。从物理的角度来看 rdd 存储的是 block 和 node 之间的映射。

RDD 在逻辑上是一个 hdfs 文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让 RDD 中的数据可以被并行操作(分布式数据集)

比如有个 RDD 有 90W 数据,3 个 partition,则每个分区上有 30W 数据。RDD 通常通过 Hadoop 上的文件,即 HDFS 或者 HIVE 表来创建,还可以通过应用程序中的集合来创建;RDD 最重要的特性就是容错性,可以自动从节点失败中恢复过来。即如果某个结点上的 RDD partition 因为节点故障,导致数据丢失,那么 RDD 可以通过自己的数据来源重新计算该 partition。这一切对使用者都是透明的。

RDD 是 spark 提供的核心抽象,全称为弹性分布式数据集。

它主要特点就是弹性和容错性。
弹性:RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘
容错性:RDD可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。

spark 解决了 hadoop 的哪些问题(spark VS MR)?

  1. MR:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手;Spark:Spark 采用 RDD 计算模型,简单容易上手。
  2. MR:只提供 map 和 reduce 两个操作,表达能力欠缺;Spark:Spark 采用更加丰富的算子模型,包括 map、flatmap、groupbykey、reducebykey 等;
  3. MR:一个 job 只能包含 map 和 reduce 两个阶段,复杂的任务需要包含很多个 job,这些 job 之间的管理以来需要开发者自己进行管理;Spark:Spark 中一个 job 可以包含多个转换操作,在调度时可以生成多个 stage,而且如果多个 map 操作的分区不变,是可以放在同一个 task 里面去执行;
  4. MR:中间结果存放在 hdfs 中;Spark:Spark 的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁盘,而不是 hdfs;
  5. MR:只有等到所有的 map task 执行完毕后才能执行 reduce task;Spark:Spark 中分区相同的转换构成流水线在一个 task 中执行,分区不同的需要进行 shuffle 操作,被划分成不同的 stage 需要等待前面的 stage 执行完才能执行。
  6. MR:只适合 batch 批处理,时延高,对于交互式处理和实时处理支持不够;Spark:Spark streaming 可以将流拆成时间间隔的 batch 进行处理,实时计算。

spark有哪几种join

Spark 中和 join 相关的算子有这几个

join

fullOuterJoin

leftOuterJoin

rightOuterJoin
  • join join函数会输出两个RDD中key相同的所有项,并将它们的value联结起来,它联结的key要求在两个表中都存在,类似于SQL中的INNER JOIN。但它不满足交换律,a.join(b)与b.join(a)的结果不完全相同,值插入的顺序与调用关系有关。
  • leftOuterJoin leftOuterJoin会保留对象的所有key,而用None填充在参数RDD other中缺失的值,因此调用顺序会使结果完全不同。如下面展示的结果,
  • rightOuterJoin rightOuterJoin与leftOuterJoin基本一致,区别在于它的结果保留的是参数other这个RDD中所有的key。
  • fullOuterJoin fullOuterJoin会保留两个RDD中所有的key,因此所有的值列都有可能出现缺失的情况,所有的值列都会转为Some对象。

hadoop 和 spark 的相同点和不同点?

Hadoop 底层使用 MapReduce 计算架构,只有 map 和 reduce 两种操作,表达能力比较欠缺,而且在 MR 过程中会重复的读写 hdfs,造成大量的磁盘 io 读写操作,所以适合高时延环境下批处理计算的应用;

Spark 是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,数据分析更加快速,所以适合低时延环境下计算的应用;

spark 与 hadoop 最大的区别在于迭代式计算模型。基于 mapreduce 框架的 Hadoop 主要分为 map 和 reduce 两个阶段,两个阶段完了就结束了,所以在一个 job 里面能做的处理很有限;spark 计算模型是基于内存的迭代式计算模型,可以分为 n 个阶段,根据用户编写的 RDD 算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以 spark 相较于 mapreduce,计算模型更加灵活,可以提供更强大的功能。

但是 spark 也有劣势,由于 spark 基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的轻局昂下,可能会出现各种各样的问题,比如 OOM 内存溢出等情况,导致 spark 程序可能无法运行起来,而 mapreduce 虽然运行缓慢,但是至少可以慢慢运行完。

Hadoop/MapReduce 和 Spark 最适合的都是做离线型的数据分析,但 Hadoop 特别适合是单次分析的数据量 “很大” 的情景,而 Spark 则适用于数据量不是很大的情景。

  1. 一般情况下,对于中小互联网和企业级的大数据应用而言,单次分析的数量都不会 “很大”,因此可以优先考虑使用 Spark。
  2. 业务通常认为 Spark 更适用于机器学习之类的 “迭代式” 应用,80GB 的压缩数据(解压后超过 200GB),10 个节点的集群规模,跑类似 “sum+group-by” 的应用,MapReduce 花了 5 分钟,而 spark 只需要 2 分钟。

如何将spark-sql的Row转成Java对象?

Dataset转POJO

  1. 将查询出的结果转为RDD
  2. 将RDD创建为DataFrame,并传入schema参数
  3. 调用as方法,将Dataset转为相应的POJO Dataset
  4. 调用collectAsList()方法
SparkSession spark =CloudUtils.getSparkSession();// 查询原始数据Dataset<Row> student = spark.sql("select * from `event`.`student`");// 生成schemaList<StructField> fields =newArrayList<>();
        fields.add(DataTypes.createStructField("id",DataTypes.StringType,true));
        fields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
        fields.add(DataTypes.createStructField("major",DataTypes.StringType,true));StructType schema =DataTypes.createStructType(fields);// 转换查询结果为POJO ListList<Student> students = spark.createDataFrame(student.toJavaRDD(), schema).as(Encoders.bean(Student.class)).collectAsList();System.out.println(students);

Dataset中的日期类型为timestamp和java中的Date类型不兼容,和Timestamp类型相互兼容。
为了解决上述问题,我们可以先将Dataset转为JSON,然后将JSON转为POJO,代码

// 查出数据并转为json集合List<String> jsonList = spark.sql("select * from `event`.`user`").toJSON().collectAsList();// 将json转为pojo,这里使用的是FastJSON        List<User> users = jsonList.stream().map(jsonString -> JSON.parseObject(jsonString,User.class)).collect(Collectors.toList());System.out.println(users);

POJO转Dataset

// 获取users列表List<User> users =createUsers();// 使用createDataFrame转为datasetDataset<Row> ds = spark.createDataFrame(users,User.class);// 将驼峰式列名改为下划线式列名,camelToUnderline方法网上搜索String[] columns = ds.columns();String[] newColumns =Arrays.stream(columns).map(column ->camelToUnderline(column)).toArray(String[]::new);// 转为新的df(重命名后的)
        ds.toDF(newColumns);
        ds.show();

--------------------------------

Spark SQL三种join

Mysql 的 join怎么实现的?
对于Spark来说有3中Join的实现,每种 Join对应着不同的应用场景:

  • Broadcast Hash Join:适合一张较小的表和一张大表进行join
  • Shuffle Hash Join :适合一张小表和一张大表进行join,或者是两张小表之间的join
  • Sort Merge Join:适合两张较大的表之间进行 join
  1. Hash Join:小表会作为Build Table,大表作为Probe Table,依次读取Build Table的数据,对于每一行数据根据join key进行 hash,hash到对应的Bucket,生成 hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存;再依次扫描 Probe Table ( order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件,如果匹配成功就可以将两者 join在一起(为什么 Build Table选择小表?因为构建的 HashTable最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景)。
  2. Broadcast Hash Join:当有限维度表和事实表进行Join操作时,为了避免shuffle,我们可以将大小有限的维度表的全部数据分发到每个节点上,供事实表使用。executor存储维度表中的全部数据,一定程度上牺牲了空间,换取 shuffle操作大量的耗时。 👉(1)broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于bittorrete的 p2p思路; 👉(2) hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探;
  3. Shuffle Hash Join:利用key相同必然分区相同的这个原理,SparkSQL将较大表的join分而治之,先将表划分成n个分区,再对两个表中相对应分区的数据分别进行 Hash Join,这样即在一定程度上减少了driver广播一侧表的压力,也减少了executor端取整张被广播表的内存消耗。

数据倾斜的产生和解决办法?

数据倾斜以为着某一个或者某几个 partition 的数据特别大,导致这几个 partition 上的计算需要耗费相当长的时间。

在 spark 中同一个应用程序划分成多个 stage,这些 stage 之间是串行执行的,而一个 stage 里面的多个 task 是可以并行执行,task 数目由 partition 数目决定,如果一个 partition 的数目特别大,那么导致这个 task 执行时间很长,导致接下来的 stage 无法执行,从而导致整个 job 执行变慢。

避免数据倾斜,一般是要选用合适的 key,或者自己定义相关的 partitioner,通过加盐或者哈希值来拆分这些 key,从而将这些数据分散到不同的 partition 去执行。

如下算子会导致 shuffle 操作,是导致数据倾斜可能发生的关键点所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;

Spark 主备切换机制原理知道吗?

Master 实际上可以配置两个,Spark 原生的 standalone 模式是支持 Master 主备切换的。当 Active Master 节点挂掉以后,我们可以将 Standby Master 切换为 Active Master。

Spark Master 主备切换可以基于两种机制,一种是基于文件系统的,一种是基于 ZooKeeper 的。

基于文件系统的主备切换机制,需要在 Active Master 挂掉之后手动切换到 Standby Master 上;

而基于 Zookeeper 的主备切换机制,可以实现自动切换 Master。

RDD的缓存级别都有哪些

NONE :什么类型都不是
DISK_ONLY:磁盘
DISK_ONLY_2:磁盘;双副本
MEMORY_ONLY: 内存;反序列化;把RDD作为反序列化的方式存储,假如RDD的内容存不下,剩余的分区在以后需要时会重新计算,不会刷到磁盘上。
MEMORY_ONLY_2:内存;反序列化;双副本
MEMORY_ONLY_SER:内存;序列化;这种序列化方式,每一个partition以字节数据存储,好处是能带来更好的空间存储,但CPU耗费高
MEMORY_ONLY_SER_2 : 内存;序列化;双副本
MEMORY_AND_DISK:内存 + 磁盘;反序列化;双副本;RDD以反序列化的方式存内存,假如RDD的内容存不下,剩余的会存到磁盘
MEMORY_AND_DISK_2 : 内存 + 磁盘;反序列化;双副本
MEMORY_AND_DISK_SER:内存 + 磁盘;序列化
MEMORY_AND_DISK_SER_2:内存 + 磁盘;序列化;双副本

讲一下spark的几种部署方式

目前,除了local模式为本地调试模式以为, Spark支持三种分布式部署方式,分别是standalone、spark on mesos和 spark on YARN

  • Standalone模式即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。从一定程度上说,该模式是其他两种的基础。目前Spark在standalone模式下是没有任何单点故障问题的,这是借助zookeeper实现的,思想类似于Hbase master单点故障解决方案。将Spark standalone与MapReduce比较,会发现它们两个在架构上是完全一致的:- 都是由master/slaves服务组成的,且起初master均存在单点故障,后来均通过zookeeper解决(Apache MRv1的JobTracker仍存在单点问题,但CDH版本得到了解决);- 各个节点上的资源被抽象成粗粒度的slot,有多少slot就能同时运行多少task。不同的是,MapReduce将slot分为map slot和reduce slot,它们分别只能供Map Task和Reduce Task使用,而不能共享,这是MapReduce资源利率低效的原因之一,而Spark则更优化一些,它不区分slot类型,只有一种slot,可以供各种类型的Task使用,这种方式可以提高资源利用率,但是不够灵活,不能为不同类型的Task定制slot资源。总之,这两种方式各有优缺点。
  • Spark On YARN模式****spark on yarn 的支持两种模式:- yarn-cluster:适用于生产环境;- yarn-client:适用于交互、调试,希望立即看到app的输出yarn-cluster和yarn-client的区别在于yarn appMaster,每个yarn app实例有一个appMaster进程,是为app启动的第一个container;负责从ResourceManager请求资源,获取到资源后,告诉NodeManager为其启动container。yarn-cluster和yarn-client模式内部实现还是有很大的区别。如果你需要用于生产环境,那么请选择yarn-cluster;而如果你仅仅是Debug程序,可以选择yarn-client。
  • Spark On Mesos模式Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。目前在Spark On Mesos环境中,用户可选择两种调度模式之一运行自己的应用程序- 粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。- 细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。

spark on yarn 模式下的 cluster模式和 client模式有什么区别

  1. yarn-cluster 适用于生产环境。而 yarn-client 适用于交互和调试,也就是希望快速地看到 application 的输出.
  2. yarn-cluster 和 yarn-client 模式的区别其实就是 Application Master 进程的区别,yarn-cluster 模式下,driver 运行在 AM(Application Master)中,它负责向 YARN 申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉 Client,作业会继续在 YARN 上运行。然而 yarn-cluster 模式不适合运行交互类型的作业。而 yarn-client 模式下,Application Master 仅仅向 YARN 请求 executor,Client 会和请求的container 通信来调度他们工作,也就是说 Client 不能离开。

spark2.0为什么放弃了akka 而用netty

  1. 很多Spark用户也使用Akka,但是由于Akka不同版本之间无法互相通信,这就要求用户必须使用跟Spark完全一样的Akka版本,导致用户无法升级Akka。
  2. Spark的Akka配置是针对Spark自身来调优的,可能跟用户自己代码中的Akka配置冲突。
  3. Spark用的Akka特性很少,这部分特性很容易自己实现。同时,这部分代码量相比Akka来说少很多,debug比较容易。如果遇到什么bug,也可以自己马上fix,不需要等Akka上游发布新版本。而且,Spark升级Akka本身又因为第一点会强制要求用户升级他们使用的Akka,对于某些用户来说是不现实的。

spark的各种HA, master/worker/executor的ha

  • Master异常spark可以在集群运行时启动一个或多个standby Master,当 Master 出现异常时,会根据规则启动某个standby master接管,在standlone模式下有如下几种配置- ZOOKEEPER集群数据持久化到zk中,当master出现异常时,zk通过选举机制选出新的master,新的master接管是需要从zk获取持久化信息- FILESYSTEM集群元数据信息持久化到本地文件系统, 当master出现异常时,只需要在该机器上重新启动master,启动后新的master获取持久化信息并根据这些信息恢复集群状态- CUSTOM自定义恢复方式,对 standloneRecoveryModeFactory 抽象类 进行实现并把该类配置到系统中,当master出现异常时,会根据用户自定义行为恢复集群- None不持久化集群的元数据, 当 master出现异常时, 新启动的Master 不进行恢复集群状态,而是直接接管集群

  • Worker异常Worker 以定时发送心跳给 Master, 让 Master 知道 Worker 的实时状态,当worker出现超时时,Master 调用 timeOutDeadWorker 方法进行处理,在处理时根据 Worker 运行的是 Executor 和 Driver 分别进行处理- 如果是Executor, Master先把该 Worker 上运行的Executor 发送信息ExecutorUpdate给对应的Driver,告知Executor已经丢失,同时把这些Executor从其应用程序列表删除, 另外, 相关Executor的异常也需要处理- 如果是Driver, 则判断是否设置重新启动,如果需要,则调用Master.shedule方法进行调度,分配合适节点重启Driver, 如果不需要重启, 则删除该应用程序

  • Executor异常1. Executor发生异常时由ExecutorRunner捕获该异常并发送ExecutorStateChanged信息给Worker2. Worker接收到消息时, 在Worker的 handleExecutorStateChanged 方法中, 根据Executor状态进行信息更新,同时把Executor状态发送给Master3. Master在接受Executor状态变化消息之后,如果发现其是异常退出,会尝试可用的Worker节点去启动Executor

spark的内存管理机制

spark的内存结构分为3大块:storage/execution/系统自留

  • storage 内存:用于缓存 RDD、展开 partition、存放 Direct Task Result、存放广播变量。在 Spark Streaming receiver 模式中,也用来存放每个 batch 的 blocks
  • execution 内存:用于 shuffle、join、sort、aggregation 中的缓存、buffer
  • 系统自留:- 在 spark 运行过程中使用:比如序列化及反序列化使用的内存,各个对象、元数据、临时变量使用的内存,函数调用使用的堆栈等- 作为误差缓冲:由于 storage 和 execution 中有很多内存的使用是估算的,存在误差。当 storage 或 execution 内存使用超出其最大限制时,有这样一个安全的误差缓冲在可以大大减小 OOM 的概率

1.6版本以前的问题

  • 旧方案最大的问题是 storage 和 execution 的内存大小都是固定的,不可改变,即使 execution 有大量的空闲内存且 storage 内存不足,storage 也无法使用 execution 的内存,只能进行 spill,反之亦然。所以,在很多情况下存在资源浪费
  • 旧方案中,只有 execution 内存支持 off heap,storage 内存不支持 off heap

新方案的改进

  • 新方案 storage 和 execution 内存可以互相借用,当一方内存不足可以向另一方借用内存,提高了整体的资源利用率
  • 新方案中 execution 内存和 storage 内存均支持 off heap

spark中的广播变量

图片来源 /文字来源

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QEvJrIkw-1641869257567)(…/pictures/spark中的广播变量.png)]

顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去。这样的场景很多,比如 driver 上有一张表,其他节点上运行的 task 需要 lookup 这张表,那么 driver 可以先把这张表 copy 到这些节点,这样 task 就可以在本地查表了。如何实现一个可靠高效的 broadcast 机制是一个有挑战性的问题。先看看 Spark 官网上的一段话:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

问题:为什么只能 broadcast 只读的变量?

这就涉及一致性的问题,如果变量可以被更新,那么一旦变量被某个节点更新,其他节点要不要一块更新?如果多个节点同时在更新,更新顺序是什么?怎么做同步?还会涉及 fault-tolerance 的问题。为了避免维护数据一致性问题,Spark 目前只支持 broadcast 只读变量。

问题:broadcast 到节点而不是 broadcast 到每个 task?

因为每个 task 是一个线程,而且同在一个进程运行 tasks 都属于同一个 application。因此每个节点(executor)上放一份就可以被所有 task 共享。

问题: 具体怎么用 broadcast?

driver program 例子:

val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)

val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)

driver 使用

sc.broadcast()

声明要 broadcast 的 data,bdata 的类型是 Broadcast。

rdd.transformation(func)

需要用 bdata 时,直接在 func 中调用,比如上面的例子中的 map() 就使用了 bdata.value.size。

问题:怎么实现 broadcast?

broadcast 的实现机制很有意思:

1. 分发 task 的时候先分发 bdata 的元信息

Driver 先建一个本地文件夹用以存放需要 broadcast 的 data,并启动一个可以访问该文件夹的 HttpServer。当调用

val bdata = sc.broadcast(data)

时就把 data 写入文件夹,同时写入 driver 自己的 blockManger 中(StorageLevel 为内存+磁盘),获得一个 blockId,类型为 BroadcastBlockId。当调用

rdd.transformation(func)

时,如果 func 用到了 bdata,那么 driver submitTask() 的时候会将 bdata 一同 func 进行序列化得到 serialized task,注意序列化的时候不会序列化 bdata 中包含的 data。上一章讲到 serialized task 从 driverActor 传递到 executor 时使用 Akka 的传消息机制,消息不能太大,而实际的 data 可能很大,所以这时候还不能 broadcast data。

driver 为什么会同时将 data 放到磁盘和 blockManager 里面?放到磁盘是为了让 HttpServer 访问到,放到 blockManager 是为了让 driver program 自身使用 bdata 时方便(其实我觉得不放到 blockManger 里面也行)。

那么什么时候传送真正的 data?在 executor 反序列化 task 的时候,会同时反序列化 task 中的 bdata 对象,这时候会调用 bdata 的 readObject() 方法。该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,如果不在就使用下面的两种 fetch 方式之一去将 data fetch 过来。得到 data 后,将其存放到 blockManager 里面,这样后面运行的 task 如果需要 bdata 就不需要再去 fetch data 了。如果在,就直接拿来用了。

下面探讨 broadcast data 时候的两种实现方式:

2. HttpBroadcast

顾名思义,HttpBroadcast 就是每个 executor 通过的 http 协议连接 driver 并从 driver 那里 fetch data。

Driver 先准备好要 broadcast 的 data,调用

sc.broadcast(data)

后会调用工厂方法建立一个 HttpBroadcast 对象。该对象做的第一件事就是将 data 存到 driver 的 blockManager 里面,StorageLevel 为内存+磁盘,blockId 类型为 BroadcastBlockId。

同时 driver 也会将 broadcast 的 data 写到本地磁盘,例如写入后得到

/var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0

, 这个文件夹作为 HttpServer 的文件目录。

Driver 和 executor 启动的时候,都会生成 broadcastManager 对象,调用 HttpBroadcast.initialize(),driver 会在本地建立一个临时目录用来存放 broadcast 的 data,并启动可以访问该目录的 httpServer。

Fetch data:在 executor 反序列化 task 的时候,会同时反序列化 task 中的 bdata 对象,这时候会调用 bdata 的 readObject() 方法。该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,如果不在就使用 http 协议连接 driver 上的 httpServer,将 data fetch 过来。得到 data 后,将其存放到 blockManager 里面,这样后面运行的 task 如果需要 bdata 就不需要再去 fetch data 了。如果在,就直接拿来用了。

HttpBroadcast 最大的问题就是 driver 所在的节点可能会出现网络拥堵,因为 worker 上的 executor 都会去 driver 那里 fetch 数据。

3. TorrentBroadcast

为了解决 HttpBroadast 中 driver 单点网络瓶颈的问题,Spark 又设计了一种 broadcast 的方法称为 TorrentBroadcast,这个类似于大家常用的 BitTorrent 技术。基本思想就是将 data 分块成 data blocks,然后假设有 executor fetch 到了一些 data blocks,那么这个 executor 就可以被当作 data server 了,随着 fetch 的 executor 越来越多,有更多的 data server 加入,data 就很快能传播到全部的 executor 那里去了。

HttpBroadcast 是通过传统的 http 协议和 httpServer 去传 data,在 TorrentBroadcast 里面使用在上一章介绍的 blockManager.getRemote() => NIO ConnectionManager 传数据的方法来传递,读取数据的过程与读取 cached rdd 的方式类似,可以参阅 CacheAndCheckpoint 中的最后一张图。

下面讨论 TorrentBroadcast 的一些细节:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tQsYRSJQ-1641869257567)(https://github.com/JerryLead/SparkInternals/raw/master/markdown/PNGfigures/TorrentBroadcast.png)]

driver 端:

Driver 先把 data 序列化到 byteArray,然后切割成 BLOCK_SIZE(由

spark.broadcast.blockSize = 4MB

设置)大小的 data block,每个 data block 被 TorrentBlock 对象持有。切割完 byteArray 后,会将其回收,因此内存消耗虽然可以达到 2 * Size(data),但这是暂时的。

完成分块切割后,就将分块信息(称为 meta 信息)存放到 driver 自己的 blockManager 里面,StorageLevel 为内存+磁盘,同时会通知 driver 自己的 blockManagerMaster 说 meta 信息已经存放好。通知 blockManagerMaster 这一步很重要,因为 blockManagerMaster 可以被 driver 和所有 executor 访问到,信息被存放到 blockManagerMaster 就变成了全局信息。

之后将每个分块 data block 存放到 driver 的 blockManager 里面,StorageLevel 为内存+磁盘。存放后仍然通知 blockManagerMaster 说 blocks 已经存放好。到这一步,driver 的任务已经完成。

Executor 端:

executor 收到 serialized task 后,先反序列化 task,这时候会反序列化 serialized task 中包含的 bdata 类型是 TorrentBroadcast,也就是去调用 TorrentBroadcast.readObject()。这个方法首先得到 bdata 对象,然后发现 bdata 里面没有包含实际的 data。怎么办?先询问所在的 executor 里的 blockManager 是会否包含 data(通过查询 data 的 broadcastId),包含就直接从本地 blockManager 读取 data。否则,就通过本地 blockManager 去连接 driver 的 blockManagerMaster 获取 data 分块的 meta 信息,获取信息后,就开始了 BT 过程。

BT 过程:task 先在本地开一个数组用于存放将要 fetch 过来的 data blocks

arrayOfBlocks = new Array[TorrentBlock](totalBlocks)

,TorrentBlock 是对 data block 的包装。然后打乱要 fetch 的 data blocks 的顺序,比如如果 data block 共有 5 个,那么打乱后的 fetch 顺序可能是 3-1-2-4-5。然后按照打乱后的顺序去 fetch 一个个 data block。fetch 的过程就是通过 “本地 blockManager -本地 connectionManager-driver/executor 的 connectionManager-driver/executor 的 blockManager-data” 得到 data,这个过程与 fetch cached rdd 类似。每 fetch 到一个 block 就将其存放到 executor 的 blockManager 里面,同时通知 driver 上的 blockManagerMaster 说该 data block 多了一个存储地址。这一步通知非常重要,意味着 blockManagerMaster 知道 data block 现在在 cluster 中有多份,下一个不同节点上的 task 再去 fetch 这个 data block 的时候,可以有两个选择了,而且会随机选择一个去 fetch。这个过程持续下去就是 BT 协议,随着下载的客户端越来越多,data block 服务器也越来越多,就变成 p2p下载了。关于 BT 协议,Wikipedia 上有一个动画。

整个 fetch 过程结束后,task 会开一个大 Array[Byte],大小为 data 的总大小,然后将 data block 都 copy 到这个 Array,然后对 Array 中 bytes 进行反序列化得到原始的 data,这个过程就是 driver 序列化 data 的反过程。

最后将 data 存放到 task 所在 executor 的 blockManager 里面,StorageLevel 为内存+磁盘。显然,这时候 data 在 blockManager 里存了两份,不过等全部 executor 都 fetch 结束,存储 data blocks 那份可以删掉了。

问题:broadcast RDD 会怎样?

@Andrew-Xia 回答道:不会怎样,就是这个rdd在每个executor中实例化一份。

Discussion

公共数据的 broadcast 是很实用的功能,在 Hadoop 中使用 DistributedCache,比如常用的

-libjars

就是使用 DistributedCache 来将 task 依赖的 jars 分发到每个 task 的工作目录。不过分发前 DistributedCache 要先将文件上传到 HDFS。这种方式的主要问题是资源浪费,如果某个节点上要运行来自同一 job 的 4 个 mapper,那么公共数据会在该节点上存在 4 份(每个 task 的工作目录会有一份)。但是通过 HDFS 进行 broadcast 的好处在于单点瓶颈不明显,因为公共 data 首先被分成多个 block,然后不同的 block 存放在不同的节点。这样,只要所有的 task 不是同时去同一个节点 fetch 同一个 block,网络拥塞不会很严重。

对于 Spark 来讲,broadcast 时考虑的不仅是如何将公共 data 分发下去的问题,还要考虑如何让同一节点上的 task 共享 data。

对于第一个问题,Spark 设计了两种 broadcast 的方式,传统存在单点瓶颈问题的 HttpBroadcast,和类似 BT 方式的 TorrentBroadcast。HttpBroadcast 使用传统的 client-server 形式的 HttpServer 来传递真正的 data,而 TorrentBroadcast 使用 blockManager 自带的 NIO 通信方式来传递 data。TorrentBroadcast 存在的问题是慢启动占内存,慢启动指的是刚开始 data 只在 driver 上有,要等 executors fetch 很多轮 data block 后,data server 才会变得可观,后面的 fetch 速度才会变快。executor 所占内存的在 fetch 完 data blocks 后进行反序列化时需要将近两倍 data size 的内存消耗。不管哪一种方式,driver 在分块时会有两倍 data size 的内存消耗。

对于第二个问题,每个 executor 都包含一个 blockManager 用来管理存放在 executor 里的数据,将公共数据存放在 blockManager 中(StorageLevel 为内存+磁盘),可以保证在 executor 执行的 tasks 能够共享 data。

其实 Spark 之前还尝试了一种称为 TreeBroadcast 的机制,详情可以见技术报告 Performance and Scalability of Broadcast in Spark。

更深入点,broadcast 可以用多播协议来做,不过多播使用 UDP,不是可靠的,仍然需要应用层的设计一些可靠性保障机制。

什么是数据倾斜,怎样去处理数据倾斜

数据倾斜是一种很常见的问题(依据二八定律),简单来说,比方WordCount中某个Key对应的数据量非常大的话,就会产生数据倾斜,导致两个后果:

  • OOM(单或少数的节点);
  • 拖慢整个Job执行时间(其他已经完成的节点都在等这个还在做的节点)

数据倾斜主要分为两类: 聚合倾斜 和 join倾斜

  • 聚合倾斜- 双重聚合(局部聚合+全局聚合)****场景: 对RDD进行reduceByKey等聚合类shuffle算子,SparkSQL的groupBy做分组聚合这两种情况 思路:首先通过map给每个key打上n以内的随机数的前缀并进行局部聚合,即(hello, 1) (hello, 1) (hello, 1) (hello, 1)变为(1_hello, 1) (1_hello, 1) (2_hello, 1),并进行reduceByKey的局部聚合,然后再次map将key的前缀随机数去掉再次进行全局聚合;原理: 对原本相同的key进行随机数附加,变成不同key,让原本一个task处理的数据分摊到多个task做局部聚合,规避单task数据过量。之后再去随机前缀进行全局聚合; 优点:效果非常好(对聚合类Shuffle操作的倾斜问题); 缺点:范围窄(仅适用于聚合类的Shuffle操作,join类的Shuffle还需其它方案)
  • join倾斜- 将reduce join转为map join****场景: 对RDD或Spark SQL使用join类操作或语句,且join操作的RDD或表比较小(百兆或1,2G); 思路:使用broadcast和map类算子实现join的功能替代原本的join,彻底规避shuffle。对较小RDD直接collect到内存,并创建broadcast变量;并对另外一个RDD执行map类算子,在该算子的函数中,从broadcast变量(collect出的较小RDD)与当前RDD中的每条数据依次比对key,相同的key执行你需要方式的join;原理: 若RDD较小,可采用广播小的RDD,并对大的RDD进行map,来实现与join同样的效果。简而言之,用broadcast-map代替join,规避join带来的shuffle(无Shuffle无倾斜); 优点:效果很好(对join操作导致的倾斜),根治;缺点:适用场景小(大表+小表),广播(driver和executor节点都会驻留小表数据)小表也耗内存- 采样倾斜key并分拆join操作****场景: 两个较大的(无法采用方案五)RDD/Hive表进行join时,且一个RDD/Hive表中少数key数据量过大,另一个RDD/Hive表的key分布较均匀(RDD中两者之一有一个更倾斜);思路:1. 对更倾斜rdd1进行采样(RDD.sample)并统计出数据量最大的几个key;2. 对这几个倾斜的key从原本rdd1中拆出形成一个单独的rdd1_1,并打上0n的随机数前缀,被拆分的原rdd1的另一部分(不包含倾斜key)又形成一个新rdd1_2;3. 对rdd2过滤出rdd1倾斜的key,得到rdd2_1,并将其中每条数据扩n倍,对每条数据按顺序附加0n的前缀,被拆分出key的rdd2也独立形成另一个rdd2_2; 【个人认为,这里扩了n倍,最后union完还需要将每个倾斜key对应的value减去(n-1)】4. 将加了随机前缀的rdd1_1和rdd2_1进行join(此时原本倾斜的key被打散n份并被分散到更多的task中进行join); 【个人认为,这里应该做两次join,两次join中间有一个map去前缀】5. 另外两个普通的RDD(rdd1_2、rdd2_2)照常join;6. 最后将两次join的结果用union结合得到最终的join结果。 原理:对join导致的倾斜是因为某几个key,可将原本RDD中的倾斜key拆分出原RDD得到新RDD,并以加随机前缀的方式打散n份做join,将倾斜key对应的大量数据分摊到更多task上来规避倾斜;优点: 前提是join导致的倾斜(某几个key倾斜),避免占用过多内存(只需对少数倾斜key扩容n倍);缺点: 对过多倾斜key不适用。- 用随机前缀和扩容RDD进行join****场景: RDD中有大量key导致倾斜; 思路:与方案六类似。1. 查看RDD/Hive表中数据分布并找到造成倾斜的RDD/表;2. 对倾斜RDD中的每条数据打上n以内的随机数前缀;3. 对另外一个正常RDD的每条数据扩容n倍,扩容出的每条数据依次打上0到n的前缀;4. 对处理后的两个RDD进行join。原理: 与方案六只有唯一不同在于这里对不倾斜RDD中所有数据进行扩大n倍,而不是找出倾斜key进行扩容;优点: 对join类的数据倾斜都可处理,效果非常显著;缺点: 缓解,扩容需要大内存

分析一下一段spark代码中哪些部分在Driver端执行,哪些部分在Worker端执行

Driver Program是用户编写的提交给Spark集群执行的application,它包含两部分

  • 作为驱动: Driver与Master、Worker协作完成application进程的启动、DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等。
  • 计算逻辑本身,当计算任务在Worker执行时,执行计算逻辑完成application的计算任务

一般来说transformation算子均是在worker上执行的,其他类型的代码在driver端执行

三种Writer的分类

上面是使用哪种 writer 的判断依据, 是否开启 mapSideCombine 这个判断,是因为有些算子会在 map 端先进行一次 combine, 减少传输数据。 因为 BypassMergeSortShuffleWriter 会临时输出Reducer个(分区数目)小文件,所以分区数必须要小于一个阀值,默认是小于200

UnsafeShuffleWriter需要Serializer支持relocation,Serializer支持relocation:原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要Serializer支持relocation,在指定位置读取对应数据

标签: 大数据 spark hadoop

本文转载自: https://blog.csdn.net/jankin6/article/details/122426567
版权归原作者 孙中明 所有, 如有侵权,请联系我们删除。

“大数据面试题&mdash;&mdash;spark”的评论:

还没有评论