Spark基础
简述
1、spark作业执行的特点:
(1)只有遇到行动算子的时候,整个spark作业才会被触发执行
(2)遇到几次,执行几次
2、RDD: 弹性分布式数据集
弹性:数据量可大可小
RDD类似于容器,但是本身存储的不是数据,是计算逻辑
当遇到行动算子的时候,整个spark作业才会被触发执行,从第一个RDD开始执行,数据才开始产生流动
数据在RDD之间只是流动关系,不会存储
流动的数据量可以很大,也可以很小,所以称为弹性
分布式:
spark本质上它是需要从HDFS中读取数据的,HDFS是分布式,数据block块将来可能会在不同的datanode上
RDD中流动的数据,可能会来自不同的datanode中的block块数据
数据集:
计算流动过程中,可以短暂地将RDD看成一个容器,容器中有数据,默认情况下在内存中不会进行存储
后面会有办法将一个RDD的数据存储到磁盘中
RDD的5大特性:(面试必问!)
1、RDD是由一系列分区构成
1)读文件时的minPartitions参数只能决定最小分区数,实际读取文件后的RDD分区数,由数据内容本身以及集群的分布来共同决定的
2)若设置minPartitions的数量比block块数量还少的话,实际上以block块数量来决定分区数
3)产生shuffle的算子调用时,可以传入numPartitions(例如:groupby()),可以真正改变RDD的分区数,设置多少,最终RDD就有多少分区
4)文件会以block块的形式存储在HDFS上,若文件未达到128M默认值的话也会被一个block块存储。
一开始RDD中的分区数由读取数据的block块数量决定的。
后一个RDD中的分区数据,除KV函数以外,对应的是前一个RDD中的分区数据所进行逻辑处理后的结果。默认情况下,若后续分区不做处理的话,后续所有的RDD的分区数取决于第一个RDD。
最终RDD中有几个分区,将来在HDFS中就会看到几个结果文件(HDFS -> RDD -> HDFS)
2、算子是作用在每一个分区上的(每一个分区都会处理)
3、RDD与RDD之间存在一些依赖关系
1)窄依赖 前一个RDD中的某一个分区数据只会到后一个RDD中的某唯一分区中 一对一(也可能前多个分区到后一个分区中)的关系
2)宽依赖 前一个RDD中的某一个分区数据会进入到后一个RDD中的不同分区中 一对多的关系 也可以通过查看是否产生shuffle来判断
3)整个spark作业会被宽依赖的个数划分若干个stage, Num(stage) = Num(宽依赖) + 1
4)当遇到产生shuffle的算子的时候,涉及到从前一个RDD写数据到磁盘中,从磁盘中读取数据到后一个RDD的现象,
注意:第一次触发执行的时候,磁盘是没有数据的,所以会从第一个RDD产生开始执行
当重复触发相同的执行的时候,对于同一个DAG有向无环图而言,会直接从shuffle之后的RDD开始执行(省略从前一个RDD写数据到磁盘中的过程),可以直接从磁盘读取数据。
5)在一个阶段中,RDD有几个分区,就会有几个并行task任务
4、kv算子只能作用在kv的RDD上
5、spark会提供最优的任务计算方式,只移动计算,不移动数据。
Spark的设计原则之一是数据本地化(Data Locality),即尽量让计算任务在数据所在的节点上执行,从而减少数据的网络传输开销。
Spark实例:wordcount
object WordCount2{
def main(args:Array[String]):Unit={//创建spark配置文件对象
val conf:SparkConf=newSparkConf()//设置运行模式//如果是本地local模式运行的话,需要设置setMaster//将来如果是集群进行,将这句话注释即可
conf.setMaster("local")//设置spark作业的名字
conf.setAppName("wordcount")//创建spark core上下文环境对象
val sc:SparkContext=newSparkContext(conf)//===================================================================================//读取文件,每次读取一行//RDD是spark core中的核心数据结构,将来运行的时候,数据会在RDD之间流动,默认基于内存计算
val linesRDD:RDD[String]= sc.textFile("spark/data/wcs/*")// println(s"linesRDD的分区数:${linesRDD.getNumPartitions}")//一行数据根据分隔符分割
val wordRDD:RDD[String]= linesRDD.flatMap(_.split("\\|"))// println(s"wordRDD的分区数:${wordRDD.getNumPartitions}")//将每一个单词组成(word,1)
val kvRDD:RDD[(String,Int)]= wordRDD.map((_,1))println(s"kvRDD的分区数:${kvRDD.getNumPartitions}")//根据键进行分组,并设置分区数为 5
val kvRDD2:RDD[(String,Iterable[(String,Int)])]= kvRDD.groupBy(_._1,numPartitions =5)println(s"kvRDD2的分区数:${kvRDD2.getNumPartitions}")
val resRDD:RDD[(String,Int)]= kvRDD2.map((e:(String,Iterable[(String,Int)]))=>(e._1, e._2.size))println(s"resRDD的分区数:${resRDD.getNumPartitions}")//打印
resRDD2.foreach(println)//指定的是所要写入数据的文件夹的路径//spark如果是local本地运行的话,会将本地文件系统看作一个hdfs文件系统
resRDD.saveAsTextFile("spark/data/outdata1")}}
Spark中RDD调用的函数,称之为算子
算子分为两类:
1、转换算子(RDD -> RDD,处理逻辑)
2、行动算子(触发作业的执行)
1、转换算子
1)Map
importorg.apache.spark.rdd.RDDimportorg.apache.spark.{SparkConf,SparkContext}
object Demo1Map{
def main(args:Array[String]):Unit={
val conf =newSparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc:SparkContext=newSparkContext(conf)
val lineRDD:RDD[String]= sc.textFile("spark/data/students.txt")//map操作算子:将rdd中的数据依次取出,传递给后面函数逻辑,将计算后的数据返回到新的rdd中//将rdd中的数据依次取出,处理完的数据返回下一个rdd直接继续执行后续的逻辑
val rdd2:RDD[(String,String,String,String,String)]= lineRDD.map((line:String)=>{println("==============处理后的数据========================")
val array1:Array[String]= line.split(",")(array1(0),array1(1),array1(2),array1(3),array1(4))})//foreach是一个行动算子,遇到行动算子,触发作业执行/**
* 转换操作(转换算子中定义了操作逻辑)仅仅是定义了数据应该如何被转换,而不会立即执行。
* 只有当需要计算一个结果时(即调用行动算子时),才会执行。
* 打印结果:
* ==============处理后的数据========================
* (1500100001,施笑槐,22,女,文科六班)
* ==============处理后的数据========================
* (1500100002,吕金鹏,24,男,文科六班)
*每次调用行动算子(foreach)打印一条数据,都会是整个RDD重新执行一次(所有RDD的执行关系是一个有向无环图)
*/
rdd2.foreach(println)}}
2)filter
importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.spark.rdd.RDD
object Demo2Filter{
def main(args:Array[String]):Unit={
val conf =newSparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc:SparkContext=newSparkContext(conf)//===============================================================
val lineRDD:RDD[String]= sc.textFile("spark/data/students.txt")//需求:过滤出所有的男生//filter转换算子:将rdd中的数据依次取出,传递给后面的函数,跟map一样,也是依次传递一条// 若不匹配,则无数据在RDD间流动,在下面执行.foreach(println)时也无数据进行打印,// 但是判断中的println()属于scala,并不受影响
val genderRDD:RDD[String]= lineRDD.filter((line:String)=>{var b:Boolean=falseif("女".equals(line.split(",")(3))){println("============这是女生==================")}else{println("============这是男生==================")
b ="男".equals(line.split(",")(3))}
b
})
genderRDD.foreach(println)}}
3)flatMap
importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.spark.rdd.RDD
object Demo3FlatMap{
def main(args:Array[String]):Unit={
val conf =newSparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc:SparkContext=newSparkContext(conf)//===============================================================
val lineRDD:RDD[String]= sc.textFile("spark/data/wcs/words.txt")/**
* flatMap: 将rdd中的每一条数据传递给后面的函数,最终将返回的数组或者是序列进行扁平化,返回给新的集合
* 由于flatMap会“扁平化”结果,因此words RDD将包含所有分割后的单词,而不是单词数组(返回一个元素为单个单词的集合)。
* 打印结果:
* ===============一条数据====================
* hello
* world
* ===============一条数据====================
* java
* hadoop
* linux
*/
val rdd1:RDD[String]= lineRDD.flatMap((line:String)=>{println("===============一条数据====================")
line.split("\\|")})
rdd1.foreach(println)}}
4)sample
importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.spark.rdd.RDD
object Demo4Sample{
def main(args:Array[String]):Unit={
val conf =newSparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")
val sc:SparkContext=newSparkContext(conf)//===============================================================
val lineRDD:RDD[String]= sc.textFile("spark/data/students.txt")/**
* sample抽样,1000条数据,抽0.1比例,结果的数量在100左右
* 这个函数主要在机器学习的时候会用到
* withReplacement :
* 为True时,抽样结果中可能会包含重复的元素。
* 为False时,抽样结果中不会包含重复的元素。
* fraction:这是一个浮点数(Double),指定了抽样的比例,取值范围在[0, 1]之间。
*/
val rdd1:RDD[String]= lineRDD.sample(withReplacement =false, fraction =0.1)
rdd1.foreach(println)}}
5)groupBy
importorg.apache.spark.rdd.RDDimportorg.apache.spark.{SparkConf,SparkContext}
object Demo5GroupBy{
def main(args:Array[String]):Unit={
val conf:SparkConf=newSparkConf().setMaster("local").setAppName("groupBy")
val sc:SparkContext=newSparkContext(conf)//===================================================
val linesRDD:RDD[String]= sc.textFile("spark/data/students.txt")//求每个班级的平均年龄
val arrayRDD:RDD[Array[String]]= linesRDD.map((line:String)=> line.split(","))//像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
val clazzWithAgeRDD:RDD[(String,Int)]= arrayRDD.map {caseArray(_, _, age:String, _, clazz:String)=>(clazz, age.toInt)}/**
* groupBy算子的使用
*
* 1、groupBy的算子,后面的分组条件是我们自己指定的
* 2、spark中groupBy之后的,所有值会被封装到一个Iterable迭代器中存储(与scala中不同)
* 输出:
* (理科二班,22.556962025316455)
* (文科三班,22.680851063829788)
* (理科四班,22.63736263736264)
* (理科一班,22.333333333333332)
* (文科五班,22.30952380952381)
*/// val map: Map[String, List[Score]] = scoreList.groupBy((s: Score) => s.id)
val groupRDD:RDD[(String,Iterable[(String,Int)])]= clazzWithAgeRDD.groupBy(_._1)// groupRDD.foreach(println)
val resKvRDD:RDD[(String,Double)]= groupRDD.map((kv:(String,Iterable[(String,Int)]))=>{
val clazz:String= kv._1
val avgAge:Double= kv._2.map(_._2).sum.toDouble / kv._2.size
(clazz, avgAge)})
resKvRDD.foreach(println)// while (true){//// }}}

6)groupByKey
importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.spark.rdd.RDD
object Demo6GroupByKey{
def main(args:Array[String]):Unit={
val conf:SparkConf=newSparkConf().setMaster("local").setAppName("groupByKey")
val sc:SparkContext=newSparkContext(conf)//===================================================
val linesRDD:RDD[String]= sc.textFile("spark/data/students.txt")//求每个班级的平均年龄
val arrayRDD:RDD[Array[String]]= linesRDD.map((line:String)=> line.split(","))//像这种RDD中的元素是(key,value)类型的,我们将这种RDD称之为键值对RDD(kv格式RDD)
val clazzWithAgeRDD:RDD[(String,Int)]= arrayRDD.map {caseArray(_, _, age:String, _, clazz:String)=>(clazz, age.toInt)}/**
* GroupByKey属于kv格式的算子,只能作用在kv格式的RDD上
* 也就说,只有kv格式的RDD才能调用kv格式的算子
* 输出:
* (理科二班,22.556962025316455)
* (文科三班,22.680851063829788)
* (理科四班,22.63736263736264)
* (理科一班,22.333333333333332)
* (文科五班,22.30952380952381)
*/
val groupByKeyRDD:RDD[(String,Iterable[Int])]= clazzWithAgeRDD.groupByKey()
val resKvRDD2:RDD[(String,Double)]= groupByKeyRDD.map((kv:(String,Iterable[Int]))=>(kv._1, kv._2.sum.toDouble / kv._2.size))
resKvRDD2.foreach(println)/**
* 面试题:spark core中 groupBy算子与groupByKey算子的区别?
* 1、代码格式上:
* groupBy的分组条件可以自己指定,并且绝大部分的RDD都可以调用该算子,返回的是键和元素本身组成的迭代器构成的kv格式RDD
* groupByKey算子,只能由kv格式的RDD进行调用,分组的条件会自动根据键进行分组,不需要在自己指定,返回的是键和值组成的迭代器构成的kv格式RDD
*
* 2、执行shuffle数据量来看
* groupBy产生的shuffle数据量在一定程度上要大于groupByKey产生的shuffle数据量
* 所以groupByKey算子的执行效率要比groupBy算子的执行效率要高
*/while(true){}}}

7)reduceByKey
importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.spark.rdd.RDD
object Demo7ReduceByKey{
def main(args:Array[String]):Unit={
val conf:SparkConf=newSparkConf().setMaster("local").setAppName("reduceByKey")
val sc:SparkContext=newSparkContext(conf)//===================================================
val linesRDD:RDD[String]= sc.textFile("spark/data/score.txt")//求每个班级的平均年龄
val arrayRDD:RDD[Array[String]]= linesRDD.map((line:String)=> line.split(","))//分别使用groupByKey和reduceBykey计算每个学生的总分
val idWithScoreRDD:RDD[(String,Int)]= arrayRDD.map {caseArray(id:String, _, score:String)=>(id, score.toInt)}/**
* groupByKey实现
*/// val kvRDD1: RDD[(String, Iterable[Int])] = idWithScoreRDD.groupByKey()// val resRDD1: RDD[(String, Int)] = kvRDD1.map((kv: (String, Iterable[Int])) => (kv._1, kv._2.sum))// resRDD1.foreach(println)/**
* reduceByKey实现
* 输出:
* (1500100113,519)
* (1500100724,440)
* (1500100369,376)
* (1500100378,402)
* (1500100306,505)
* (1500100578,397)
*/
val resRDD2:RDD[(String,Int)]= idWithScoreRDD.reduceByKey((v1:Int, v2:Int)=> v1 + v2)
resRDD2.foreach(println)/**
* 面试题:
* groupByKey与reduceBykey的区别?
* 相同点:
* 它们都是kv格式的算子,只有kv格式的RDD才能调用
* 不同点:
* 1)groupByKey只是单纯地根据键进行分组,分组后的逻辑可以在后续的处理中调用其他的算子实现
* 2)reduceByKey 相当于MR中的预聚合,所以shuffle产生的数据量要比groupByKey中shuffle产生的数据量少,效率高,速度要快一些
* 3)groupByKey的灵活度要比reduceByKey灵活度要高,reduceBykey无法做一些复杂的操作,比如方差。但是groupByKey可以在分组之后的RDD进行方差操作
*/while(true){}}}

8)union
importorg.apache.spark.rdd.RDDimportorg.apache.spark.{SparkConf,SparkContext}
object Demo8Union{
def main(args:Array[String]):Unit={
val conf:SparkConf=newSparkConf().setMaster("local").setAppName("reduceByKey")
val sc:SparkContext=newSparkContext(conf)//===================================================//parallelize:将scala的集合变成spark中的RDD
val rdd1:RDD[(String,String)]= sc.parallelize(List(("1001","张三"),("1002","张三2"),("1003","张三3"),("1004","张三4"),("1005","张三5")))println(s"rdd1的分区数:${rdd1.getNumPartitions}")
val rdd2:RDD[(String,String)]= sc.parallelize(List(("1006","李四6"),("1007","李四7"),("1003","张三3"),("1008","李四8"),("1009","李四9")))println(s"rdd2的分区数:${rdd2.getNumPartitions}")
val rdd3:RDD[(String,Int)]= sc.parallelize(List(("1006",111),("1007",22),("1003",33),("1008",444),("1009",55)))//两个RDD要想进行union合并,必须保证元素的格式和数据类型是一致的//分区数也会进行合并,最终的分区数由两个RDD总共的分区数决定// rdd1.union(rdd3)
val resRDD1:RDD[(String,String)]= rdd1.union(rdd2)
resRDD1.foreach(println)println(s"resRDD1的分区数:${resRDD1.getNumPartitions}")/**
* 输出:
* rdd1的分区数:1
* rdd2的分区数:1
* (1001,张三)
* (1002,张三2)
* (1003,张三3)
* (1004,张三4)
* (1005,张三5)
* (1006,李四6)
* (1007,李四7)
* (1003,张三3)
* (1008,李四8)
* (1009,李四9)
* resRDD1的分区数:2
*/}}
9)join
importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.spark.rdd.RDD/**
* join算子也要作用在kv格式的RDD上
*/
object Demo9Join{
def main(args:Array[String]):Unit={
val conf:SparkConf=newSparkConf().setMaster("local").setAppName("reduceByKey")
val sc:SparkContext=newSparkContext(conf)//===================================================//parallelize:将scala的集合变成spark中的RDD
val rdd1:RDD[(String,String)]= sc.parallelize(List(("1001","张三"),("1002","李四"),("1003","王五"),("1004","小明"),("1005","小红")))
val rdd2:RDD[(String,String)]= sc.parallelize(List(("1001","看美女"),("1002","看综艺"),("1003","看八卦"),("1004","打游戏"),("1009","学习")))/**
* join 内连接
* right join 右连接
* left join 左连接
* full join 全连接
*/// join 内连接 两个rdd共同拥有的键才会进行关联/**
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1003,王五,看八卦)
*/
val resRDD1:RDD[(String,(String,String))]= rdd1.join(rdd2)
val resRDD2:RDD[(String,String,String)]= resRDD1.map {case(id:String,(name:String, like:String))=>(id, name, like)}
resRDD2.foreach(println)//right join 右连接 保证右边rdd键的完整性/**
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1009,查无此人,学习)
* (1003,王五,看八卦)
*/
val resRDD2:RDD[(String,(Option[String],String))]= rdd1.rightOuterJoin(rdd2)
val resRDD3:RDD[(String,String,String)]= resRDD2.map {case(id:String,(Some(name), like:String))=>(id, name, like)case(id:String,(None, like:String))=>(id,"查无此人", like)}
resRDD3.foreach(println)//left join: 左连接/**
* (1005,小红,此人无爱好)
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1003,王五,看八卦)
*/
val resRDD1:RDD[(String,(String,Option[String]))]= rdd1.leftOuterJoin(rdd2)
val resRDD2:RDD[(String,String,String)]= resRDD1.map {case(id:String,(name:String,Some(like:String)))=>(id, name, like)case(id:String,(name:String,None))=>(id, name,"此人无爱好")}
resRDD2.foreach(println)//全连接,保证所有的键、值的完整/**
* (1005,小红,此人无爱好)
* (1001,张三,看美女)
* (1002,李四,看综艺)
* (1004,小明,打游戏)
* (1009,查无此人,学习)
* (1003,王五,看八卦)
*/
val resRDD2:RDD[(String,(Option[String],Option[String]))]= rdd1.fullOuterJoin(rdd2)
val resRDD3:RDD[(String,String,String)]= resRDD2.map {case(id:String,(Some(name),Some(like)))=>(id, name, like)case(id:String,(Some(name),None))=>(id, name,"此人无爱好")case(id:String,(None,Some(like)))=>(id,"查无此人", like)}
resRDD3.foreach(println)}}
版权归原作者 灌木丛中的微风 所有, 如有侵权,请联系我们删除。