Spark 有哪些核心组件
master&worker:(spark独立部署模式里的概念):
master是一个进程,主要负责资源的调度和分配,进行集群的监控,类似于yarn的RM。
worker也是一个进程,一个Worker运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算,类似于yarn中的NM。
Driver&Executor:
Driver是Spark驱动器节点,用于执行spark任务中的main方法,负责实际代码的执行工作。
- 将用户程序转化为作业(job);
- 在Executor之间调度任务(task);
- 跟踪Executor的执行情况;
- 通过UI展示查询运行情况。
Executor是集群工作节点(Worker)中的一个JVM进程,负责在Spark作业中运行具体的任务,任务彼此间相互独立。如果有Executor节点发生了故障,spark应用也可继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。
- 负责运行task,并将结果返回给驱动器进程;
- 通过自身的块管理器(Block Manager)为RDD的缓存提供内存存储。
ApplicationMaster(spark on yarn):
Hadoop用户向YARN集群提交应用程序时,提交程序包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,运行job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。实现了资源(RM)和计算(driver)之间的解耦合。
spark context:控制整个application的生命周期,包括dagsheduler和task scheduler等组件。
client:用户提交程序的入口。
Spark任务执行流程
1.构建Spark Application的运行环境(启动SparkContext)
2.SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
3.资源管理器分配Executor资源,Executor运行情况将随着心跳发送到资源管理器上;
4.SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler
5.Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行,SparkContext将应用程序代码发放给Executor。
6.Task在Executor上运行,运行完毕释放所有资源。
简述Spark的作业提交流程
YarnClient 运行模式介绍:
1.client向ResouceManager申请启动ApplicationMaster,同时在SparkContext初始化中创建DAGScheduler和TaskScheduler
2.ResouceManager收到请求后分配container,在合适的NodeManager中启动ApplicationMaster
3.Dirver中的SparkContext初始化完成后与ApplicationMaster建立通讯,ApplicationMaster向ResourceManager申请Application的资源
4.一旦ApplicationMaster申请到资源,便与之对应的NodeManager通讯,启动Executor,并把Executor信息反向注册给Dirver
5.Dirver分发task,并监控Executor的运行状态,负责重试失败的task
6.运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己
YarnCluster 模式介绍:
1.任务提交后会和ResourceManager通讯申请启动ApplicationMaster
2.ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。
3.Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册
4.Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。
Yarn-client和Yarn-cluster的区别:
yarn-cluster模式下,Dirver运行在ApplicationMaster中,负责申请资源并监控task运行状态和重试失败的task,当用户提交了作业之后就可以关掉client,作业会继续在yarn中运行;
yarn-client模式下,Dirver运行在本地客户端,client不能离开。
Spark作业提交参数
executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我 们企业是4个//8
num-executors —— 启动executors的数量,默认为2//10
executor-memory —— executor内存大小,默认1G//15g
driver-cores —— driver使用内核数,默认为1
driver-memory —— driver内存大小,默认512M//30g
Spark为什么比mapreduce快
1.基于内存计算,减少低效的磁盘交互;
2.基于DAG的高效的调度算法;
3.容错机制Linege:数据丢失或者出错,可以根据血缘进行数据重建。
spark相对于mapreduce的特点:
减少磁盘I/O:MR 会把map端中间结果输出和结果存储在磁盘中,reduce端又需要从磁盘读取中间结果,造成磁盘I/O瓶颈,而Spark允许将map端的中间输出和结果存储在内存中,reduce从中间结果拉取,避免了大量的磁盘I/O
增加并行度 :由于把中间结果写入磁盘与从磁盘读取中间结果属于不同的环境,hadoop简单的通过串行执行链接起来,而Spark则把不同的环节抽象成Stage,允许多个Stage既可以串行又可以并行执行
MapReduce 默认是排序的,spark 默认不排序,除非使用 sortByKey 算子。
spark中的RDD是什么,有哪些特性?
RDD是spark提供的核心抽象,全称为弹性分布式数据集,在逻辑上是一个hdfs文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让RDD中的数据可以被并行操作。RDD的数据默认存放在内存中,但是当内存资源不足时,spark会自动将RDD数据写入磁盘。
五大特性:
(1)A list of partitions
一个分区列表,RDD中的数据都存在一个分区列表里面
(2)A function for computing each split
作用在每一个分区中的函数
(3)A list of dependencies on other RDDs
一个RDD依赖于其他多个RDD,这个点很重要,RDD的容错机制就是依据这个特性而来的
(4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
可选的,针对于kv类型的RDD才具有这个特性,作用是决定了数据的来源以及数据处理后的去向
(5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
可选项,数据本地性,数据位置最优
RDD的弹性体现在哪里
1.可以根据数据量的大小,通过repartition、coalesce算子来增加或者减少分区数,进而决定Task数的多少。
2.对应的计算资源调整,可以通过提交任务时的参数来调整,也可以设置成动态调整方式。
3.自动进行内存和磁盘之间权衡和切换的机制
spark中的宽窄依赖:
窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖;
宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)
对于窄依赖: 窄依赖的多个分区可以并行计算; 窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。
对于宽依赖: 划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。
区分这两种依赖很有用,首先,窄依赖允许在一个集群节点上以流水线的方式计算所有父分区,
而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。
第二窄依赖能够更有效地进行失效节点的恢复,即只需要重新计算丢失分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。
DAG概念
DAG(Directed Acyclic Graph 有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程); 原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。
DAG 中为什么要划分 Stage
并行计算。
一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。
spark中如何划分stage:
根据DAG有向无环图进行划分,从当前job的最后一个算子往前推,遇到宽依赖,那么当前在这个批次中的所有算子操作都划分成一个stage,然后继续按照这种方式在继续往前推,如在遇到宽依赖,又划分成一个stage,一直到最前面的一个算子。最后整个job会被划分成多个stage,而stage之间又存在依赖关系,后面的stage依赖于前面的stage。
DAG 划分为 Stage 的算法了解吗
核心算法:回溯算法
从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分。
Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推,首先会为最后一个 RDD 创建一个 Stage,然后继续倒推,如果发现对某个 RDD 是宽依赖,那么就会将宽依赖的那个 RDD 创建一个新的 Stage,那个 RDD 就是新的 Stage 的最后一个 RDD。然后依次类推,继续倒推,根据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全部遍历完成为止
rdd.df.ds区别
1.RDD不支持sparkSQL操作,DF和DS支持sparkSQL
2.DF每一行类型固定为Row,只有通过解析才能获取值。
3.DS每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获 得 每一行的信息
4.DF和DS支持方便地保存文件格式,可以直接指定
引起Shuffle的算子有哪些?
spark中会导致shuffle操作的有以下几种算子:
1、repartition类的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等;
2、byKey类的操作:比如reduceByKey、groupByKey、sortByKey等;
3、join类的操作:比如join、cogroup等。
什么时候会产生spark shuffle?
- repartition类的操作:repartition、coalesce、repartitionAndSortWithinPartitions(分组然后排序)
- bykey:reducebykey、groupbykey、sortbykey
- join:join、cogroup(计算区内元素个数)
常用的行动算子与转换算子
Action算子:reduce(func)、collect()、count()、first()、take(n)、takeSample(withReplacement, num, [seed])、aggregate ()、saveAsTextFile(path)、foreach(func)、countByKey()
transformation算子:
一、Value类型
map()映射、flatMap()扁平化、groupBy()分组、filter()过滤、distinct()去重、sortBy()排序
二、key_Value类型
mapValues()只对V进行操作
groupByKey()按照K重新分组
reduceByKey()按照K聚合V
sortByKey()按照K进行排序
行动算子与转换算子的区别
转换算子(transformations):
转换算子会从一个已经存在的数据集(RDD)中生成一个新的数据集(RDD),比如map就是一个转换算子,它通过映射关系从一个RDD生成了一个新的RDD。
行动算子(actions):
行动算子在进行数据集计算后会给driver程序返回一个值。
转换算子和行动算子最大的区别:
转换算子返回一个数据集而行动算子返回一个具体值,如reduce算子是行动算子 而 reducebykey是转换算子;
同时由于spark的惰性求值特性,所有的转换算子是不会立即计算结果的,转换算子只记录它应用的数据集,在行动算子需要给drive返回数据时转换算子才会去计算结果。(这个设计能让spark运行效率更高)
RDD的操作分类
转换(transformations) :从已经存在的数据集中创建一个新的数据集,会创建一个新的RDD,例如map操作,会把数据集的每个元素传给函数处理,并生成一个新的RDD,常见如:Map,Filter,FlatMap,GroupByKey,ReduceByKey,Join,Sort,PartionBy
动作(actions) :在数据集上进行计算之后返回一个值到驱动程序,例如reduce动作,使用函数聚合RDD所有元素,并将结果返回给驱动程序,常见有:Collect,Reduce,Save,Lookup
map与flatMap的区别
map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象
flatMap:对RDD每个元素转换,然后再扁平化,将所有的对象合并为一个对象,会抛弃值为null的值
累加器和广播变量
累加器和广播变量属于共享变量,累加器是只写变量,广播变量是只读变量。
累加器:
原理:累加器用来把Executor端变量信息聚合到Driver端,在driver程序中定义的变量,在Executor端的每个task都会得到一份新的副本,每个task更新这些副本的值后,传回driver端进行合并。
用途:累加器的常见用途是在调试时对作业执行的过程中的事件进行计数。例如:统计 100 内的偶数的个数
用法:
通过调用 SparkContext 的 accumulator(initiaValue) 方法来创建累加器 ac
在 scala 中通过 += 来更改 ac(java 中通过 add 来修改)
使用 ac.value 来访问累加器的值
广播变量:
用途:
当多个 Executor 中的多个 Task 操作需要使用(读取)同一个很大变量时,如果我们采取常规方式把该变量发送到每一个 task 中,那么会极大地浪费性能,所以我们可以直接把该变量发送到每一个 Executor 上,Executor 上对应的 Task 可以共同访问该变量,这样就可以提高性能。
scala>val sourceRDD = sc.makeRDD(1 to 100,3)
sourceRDD: org.apache.spark.rdd.RDD[Int]= ParallelCollectionRDD[0] at makeRDD at :24//1. 创建累加器
scala>val accumulator = sc.accumulator(0)//2. 修改累加器的值
scala>val test = sourceRDD.map(x =>{if(x %2==0) accumulator +=1})//3. 访问累加器
scala> println(accumulator.value)50
Spark容错机制
- 血缘容错:血缘容错记录了较粗粒度的操作:例如filter、map、join,当rdd的部分分区数据丢失的时候,可以通过血缘来重新运算以及恢复丢失的分区。
- checkpoint机制:如果是窄依赖:只要把丢失的父依赖的分区重新计算即可;但是是宽依赖:需要恢复父依赖的分区并且重新计算,开销会大。 因此有了checkpoint机制:将内存的变化持久化到磁盘持久存储,可以把RDD保存在hdfs的namenode中元数据edit log中并刷新到磁盘fsimage,斩断所需的依赖链,如果没有才往前追溯。
- 持久化机制: cache机制:将RDD的结果写入内存,运行后缓存自动消失; persist机制:将结果写入磁盘。
sparkbatchsize中小文件问题
1.窄依赖计算结果会出现大量小文件,因此采用coalese方法和repartition方法最后返回一个特定分区的RDD;
2. 降低spark的并行度,生成的文件就会少一些;
3. 新增一个任务专门合并小文件。
Checkpoint 检查点机制
checkpoint是安全可靠、不保留RDD血统的持久化方式,checkpoint 的数据通常是保存在高可用的文件系统中,比如 HDFS 中,所以数据丢失可能性比较低,Checkpoint 首先会调用 SparkContext 的 setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说 HDFS;然后对 RDD 调用 checkpoint()方法。之后在 RDD 所处的 job 运行结束之后,会启动一个单独的 job,来将 checkpoint 过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类持久化操作
RDD 持久化原理
spark非常重要的一个功能特性就是可以将RDD持久化在内存中。
调用cache()和persist()方法即可。cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用persist()的无参版本persist(MEMORY_ONLY),将数据持久化到内存中。
如果需要从内存中清除缓存,可以使用unpersist()方法。RDD持久化是可以手动选择不同的策略的。在调用persist()时传入对应的StorageLevel即可。
checkpoint和持久化机制的区别?
最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的lineage就改变了。
持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低
Repartition和Coalesce 的关系与区别,能简单说说吗?
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
区别:
repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。
persist和cache的区别
(1)cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间;
(2)cache只有一个默认的缓存级别MEMORY_ONLY ,cache调用了persist,而persist可以根据情况设置其它的缓存级别;
Spark有哪几种存储级别
- MEMORY_ONLY:默认选项,RDD的(分区)数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不足,某些分区的数据将不会被缓存,需要在使用的时候根据世代信息重新计算。
- MEMORY_ONLY_SER:RDD的数据(Java对象)序列化之后存储于JVM的内存中(一个分区的数据为内存中的一个字节数组),相比于MEMORY_ONLY能够有效节约内存空间(特别是使用一个快速序列化工具的情况下),但读取数据时需要更多的CPU开销;如果内存空间不足,处理方式与MEMORY_ONLY相同。
- MYMORY_AND_DISK:RDD的数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不中,某些分区的数据会被存储至磁盘,使用的时候从磁盘读取。
- MEMORY_AND_DISK_SER:相比于MEMORY_ONLY_SER,在内存空间不足的情况下,将序列化之后的数据存储于磁盘。
- DISK_ONLY:仅仅使用磁盘存储RDD的数据(未经序列化)。
- xxx_2:以MEMORY_ONLY_2为例,MEMORY_ONLY_2相比于MEMORY_ONLY存储数据的方式是相同的,不同的是会将数据备份到集群中两个不同的节点,其余情况类似。
reduceByKey与groupByKey的区别
reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。
groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。
所以在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还可以防止使用groupByKey造成的内存溢出问题。
spark数据倾斜发生在什么阶段,怎么解决数据倾斜
Spark内存管理机制
Dstream基本工作原理
DStream是spark streaming提供的一种高级抽象,代表了一个持续不断的数据流。
DStream可以通过输入数据源来创建,比如Kafka、flume等,也可以通过其他DStream的高阶函数来创建,比如map、reduce、join和window等。
DStream内部其实不断产生RDD,每个RDD包含了一个时间段的数据。
Spark streaming一定是有一个输入的DStream接收数据,按照时间划分成一个一个的batch,并转化为一个RDD,RDD的数据是分散在各个子节点的partition中
Spark streaming以及基本工作原理?
Spark streaming内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成batch,比如每收集一秒的数据封装成一个batch,然后将每个batch交给spark的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的batch组成的。
Spark Streaming 整合 Kafka 的两种模式
Spark背压机制
背压机制:根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。主要是为了更好的协调数据接收速率与资源处理能力。启用反压机制很简单,只需要将 spark.streaming.backpressure.enabled 设置为 true 即可,这个参数的默认值为 false
Spark Streaming怎么实现精准一次消费
Spark Sreaming优雅关闭
计算节点不再接受新数据,而是将现有的数据处理完毕,然后再关闭。
具体实现:
另起一个线程:
new Thread({new Runnable{overridedef run():Unit={//mysql:增加一行记录,若查询表,记录存在,则执行关闭操作//redis: 在redis中设置一个标识,若满足要求,则进行关闭//zk: 设置/stopSpark 节点,若节点存在则进行关闭while(ture){//查询出状态,假设状态为true,执行以下操作if(true){//获取SparkStreaming状态val state : StreamingContextState = ssc.getState()if( state == StreamingContextState.ACTIVE){
ssc.stop(stopSparkContext =true, stopGracefully =true)}}
Thread.sleep(millis =5000)
System.exit(status =0)}}).start()
版权归原作者 探索者onex 所有, 如有侵权,请联系我们删除。