0


Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)

文章目录

每日一句正能量

人生很长,不必慌张。你未长大,我要担当。

第3章 Spark RDD弹性分布式数据集

章节概要

传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。

RDD是Spark提供的最重要的抽象概念,我们可以将RDD理解为一个分布式存储在集群中的大型数据集合,不同RDD之间可以通过转换操作形成依赖关系实现管道化,从而避免了中间结果的I/O操作,提高数据处理的速度和性能。接下来,本章将针对RDD进行详细讲解。

3.3 RDD的处理过程

Spark用Scala语言实现了RDD的API,程序开发者可以通过调用API对RDD进行操作处理。RDD经过一系列的“转换”操作,每一次转换都会产生不同的RDD,以供给下一次“转换”操作使用,直到最后一个RDD经过“行动”操作才会被真正计算处理,并输出到外部数据源中,若是中间的数据结果需要复用,则可以进行缓存处理,将数据缓存到内存中。

Spark用Scala语言实现了RDD的API,程序开发者可以通过调用API对RDD进行操作处理。下面,通过一张图来描述RDD的处理过程。
在这里插入图片描述
RDD经过一系列的"转换”操作,每一次转换都会产生不同的RDD,以供给下一次转换”操作使用,直到最后一个RDD经过“行动”操作才会被真正计算处理。

需要注意的是,RDD采用了惰性调用,即在RDD的处理过程中,真正的计算发生在RDD的"行动”操作,对于"行动"之前的所有"转换"操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD相互之间的依赖关系,而不会触发真正的计算处理。

3.3.1 转换算子

RDD处理过程中的“转换”操作主要用于根据已有RDD创建新的RDD,每一次通过Transformation算子计算后都会返回一个新RDD,供给下一个转换算子使用。下面,通过一张表来列举一些常用转换算子操作的API,具体如下。
在这里插入图片描述
下面,我们通过结合具体的示例对这些转换算子API进行详细讲解。

  • filter(func) filter(func)操作会筛选出满足函数func的元素,并返回一个新的数据集。假设,有一个文件test.txt(内容如前面所示),下面,通过一张图来描述如何通过filter算子操作,筛选出包含单词“spark”的元素。在这里插入图片描述 通过从test.txt文件中加载数据的方式创建RDD,然后通过map操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String]= file:///export/data/test.txt MapPartitionsRDD[6] at textFile at <console>:24

scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String]= MapPartitionsRDD[7] at filter at <console>:25

具体步骤如下:
1.进入到hadoop01,进入/export/data目录,命令如下

cd /export/data

2.修改test.txt文件的内容与源数据保持一致(

vi test.txt

)。

hadoop spark
itcast     heima
scala      spark
spark     itcast
iscast     hadoop

3.进入到spark shell(参考之前的启动)。

cd export/servers/spark/
bin /spark-shell --master local[2]

4.加载文件并产生RDD,代码如下。

scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String]= file:///export/data/test.txt MapPartitionsRDD[6] at textFile at <console>:24

scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String]= MapPartitionsRDD[7] at filter at <console>:25

结果如下图所示
在这里插入图片描述

  • map(func) map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集。假设,有一个文件test.txt,接下来,通过一张图来描述如何通过map算子操作把文件内容拆分成一个个的单词并封装在数组对象中,具体过程如下图所示。在这里插入图片描述 通过从test.txt文件中加载数据的方式创建RDD,然后通过map操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String]= file:///export/data/test.txt MapPartitionsRDD[9] at textFile at <console>:24

scala> var words=lines.map(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[Array[String]]= MapPartitionsRDD[10] at map at <console>:25

结果如下所示:
在这里插入图片描述

  • flatMap(func) flatMap(func)与map(func)相似,但是每个输入的元素都可以映射到0或者多个输出的结果。有一个文件test.txt,接下来,通过一张图来描述如何通过flatMap算子操作,把文件内容拆分成一个个的单词。具体过程如下图所示。在这里插入图片描述 通过从test.txt文件中加载数据的方式创建RDD,然后通过flatMap操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String]= file:///export/data/test.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String]= MapPartitionsRDD[13] at flatMap at <console>:25

结果如下所示:
在这里插入图片描述

  • groupByKey() groupByKey()主要用于(Key,Value)键值对的数据集,将具有相同Key的Value进行分组,会返回一个新的(Key ,lterable)形式的数据集。同样以文件test.txt为例,接下来,通过一张图来描述如何通过groupByKey算子操作,将文件内容中的所有单词进行分组。具体过程如下图所示。在这里插入图片描述 通过groupByKey操作把(Key,Value))键值对类型的RDD,按单词将单词出现的次数进行分组,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String]= file:///export/data/test.txt MapPartitionsRDD[15] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)]= MapPartitionsRDD[17] at map at <console>:25

scala> val groupWords=words.groupByKey()
groupWords: org.apache.spark.rdd.RDD[(String, Iterable[Int])]= ShuffledRDD[18] at groupByKey at <console>:25

结果如下所示:
在这里插入图片描述

  • reduceByKey(func) reduceByKey()主要用于(Key,Value)键值对的数据集,返回的是一个新的(Key,Iterable)形式的数据集,该数据集是每个Key传递给函数func进行聚合运算后得到的结果。同样以文件test.txt,接下来,通过一张图来描述如何通过reduceByKey算子操作统计单词出现的次数。具体过程如下图所示。在这里插入图片描述 通过groupByKey操作把(Key,Value)键值对类型的RDD,按单词将单词出现的次数进行分组,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String]= file:///export/data/test.txt MapPartitionsRDD[20] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)]= MapPartitionsRDD[22] at map at <console>:25

scala> var reduceWords=words.reduceByKey((a,b)=>a+b)
reduceWords: org.apache.spark.rdd.RDD[(String, Int)]= ShuffledRDD[23] at reduceByKey at <console>:25

结果如下所示:
在这里插入图片描述

3.3.2 行动算子

行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发真正的计算。下面,通过一张表来列举一些常用行动算子操作的API,具体如下。
在这里插入图片描述
下面,结合具体的示例对这些行动算子API进行详细讲解。

  • count () count()主要用于返回数据集中的元素个数。假设,现有一个arrRdd,如果要统计arrRdd元素的个数,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.count()
res0: Long =5
  • first() first()主要用于返回教组的第一个元素。现有一个arrRdd,如果要获取arrRdd中第一个元素,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.first()
res1: Int =1

从上述结果可以看出,当执行arrRdd.first()操作后返回的结果是1,说明成功获取到了第1个元素。

  • take() take()主要用于以数组的形式返回数组集中的前n个元素。现有一个arrRdd,如果要获取arrRdd中的前三个元素,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.take(3)
res2: Array[Int]= Array(1, 2, 3)

从上述代码可以看出,执行arrRdd.take(3)操作后返回的结果是Array(1,2,3),说明成功获取到了RDD数据集的前3个元素。

  • reduce(func) reduce()主要用于通过函数func(输入两个参数并返回一个值)聚合数据集中的元素。现有一个arrRdd,如果要对arrRdd中的元素进行聚合,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.reduce((a,b)=>a+b)
res3: Int =15
  • collect() collect()主要用于以数组的形式返回数据集中的所有元素。现有一个rdd,如果希望rdd中的元素以数组的形式输出,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.collect()
res4: Array[Int]= Array(1, 2, 3, 4, 5)
  • foreach(func) foreach()主要用于将数据集中的每个元素传递到函数func中运行。现有一个arrRdd,如果希望遍历输出arrRdd中的元素,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int]= ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.foreach(x=>println(x))12345

3.3.3 编写WordCount词频统计案例

在Linux本地系统的/export/data目录下,有一个test.txt文件,文件里有多行文本,每行文本都是由2个单词构成,且单词之间都是用空格分隔。现在,我们需要通过RDD统计每个单词出现的次数(即词频),具体操作过程如下。
在这里插入图片描述
具体参见书本内容

转载自:https://blog.csdn.net/u014727709/article/details/136032993
欢迎 👍点赞✍评论⭐收藏,欢迎指正


本文转载自: https://blog.csdn.net/u014727709/article/details/136032993
版权归原作者 想你依然心痛 所有, 如有侵权,请联系我们删除。

“Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)”的评论:

还没有评论