什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据 处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
RDD特点:
1、弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片;
2、分布式
数据存储在大数据集群不同节点上;
3、数据集
RDD 封装了计算逻辑,并不保存数据;
4、数据抽象
RDD 是一个抽象类,需要子类具体实现;
5、不可变
RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的RDD 里面封装计算逻辑;
6、
可分区、并行计算;
执行原理
- 从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合;
- Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的 计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计 算。最后得到计算结果;
RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD
的工作原理:
**1、启动 Yarn **集群环境
2、Spark 通过申请资源创建调度节点和计算节点
3、Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
4、调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给 Executor 节点执行计算,接下来我们就一起看看 Spark 框架中 RDD 是具体是如何进行数据 处理的;
创建RDD
上面了解了RDD的基础概念之后,下面通过代码演示,来看看常用的几种创建RDD的方式吧
1、通过内存进行创建
这种方式最简单,通常可以在程序中创建一个集合的方式创建出来一个RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RDD_Memory {
def main(args: Array[String]): Unit = {
// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// TODO 创建RDD
// 从内存中创建RDD,将内存中集合的数据作为处理的数据源
val seq = Seq[Int](1,2,3,4,5)
// parallelize : 并行
//val rdd: RDD[Int] = sc.parallelize(seq)
// makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法。
val rdd: RDD[Int] = sc.makeRDD(seq)
rdd.collect().foreach(println)
// TODO 关闭环境
sc.stop()
}
}
运行这段程序,从控制台可以看出,从集合中创建的RDD并打印出了各个元素
2、读取外部文件创建RDD
在实际开发中,读取外部文件是非常常见的场景,本例在工程的根目录下,创建一个文件,内容如下;
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RDD_File {
def main(args: Array[String]): Unit = {
// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
//创建RDD ,可以是绝对路径,相对路径,也可以是 hdfs的路径
val rdd : RDD[String] = sc.textFile("E:\\code-self\\spi\\datas\\1.txt")
rdd.collect().foreach(println)
// TODO 关闭环境
sc.stop()
}
}
运行这段程序,观察控制台输出效果
3、读取HDFS文件创建RDD
在大数据处理场景中,从HDFS上面读取文件也是非常常用的,下面展示下如何读取hdfs的文件创建RDD;
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RDD_File {
def main(args: Array[String]): Unit = {
// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// TODO 创建RDD
val rdd1 = sc.textFile("hdfs://localhost:9000/sanguo/yingxiong.txt")
rdd1.collect().foreach(println)
// TODO 关闭环境
sc.stop()
}
}
在hdfs的sanguo目录下,有这样一个yingxiong.txt文件,里面内容如下:
运行上面的程序,观察控制台输出效果
版权归原作者 逆风飞翔的小叔 所有, 如有侵权,请联系我们删除。