0


Spark 创建RDD的几种方式

什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据 处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

RDD特点:

1、弹性

  • 存储的弹性:内存与磁盘的自动切换;
  • 容错的弹性:数据丢失可以自动恢复;
  • 计算的弹性:计算出错重试机制;
  • 分片的弹性:可根据需要重新分片;

2、分布式

数据存储在大数据集群不同节点上;

3、数据集

RDD 封装了计算逻辑,并不保存数据;

4、数据抽象

RDD 是一个抽象类,需要子类具体实现;

5、不可变

RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的RDD 里面封装计算逻辑;

6、
可分区、并行计算;

执行原理

  • 从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合;
  • Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的 计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计 算。最后得到计算结果;

RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD

的工作原理:

**1、启动 Yarn **集群环境

2、Spark 通过申请资源创建调度节点和计算节点

3、Spark 框架根据需求将计算逻辑根据分区划分成不同的任务

4、调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给 Executor 节点执行计算,接下来我们就一起看看 Spark 框架中 RDD 是具体是如何进行数据 处理的;

创建RDD

上面了解了RDD的基础概念之后,下面通过代码演示,来看看常用的几种创建RDD的方式吧

1、通过内存进行创建

这种方式最简单,通常可以在程序中创建一个集合的方式创建出来一个RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDD_Memory {

  def main(args: Array[String]): Unit = {
    // TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    // TODO 创建RDD
    // 从内存中创建RDD,将内存中集合的数据作为处理的数据源
    val seq = Seq[Int](1,2,3,4,5)

    // parallelize : 并行
    //val rdd: RDD[Int] = sc.parallelize(seq)
    // makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法。
    val rdd: RDD[Int] = sc.makeRDD(seq)

    rdd.collect().foreach(println)

    // TODO 关闭环境
    sc.stop()
  }

}

运行这段程序,从控制台可以看出,从集合中创建的RDD并打印出了各个元素

2、读取外部文件创建RDD

在实际开发中,读取外部文件是非常常见的场景,本例在工程的根目录下,创建一个文件,内容如下;

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDD_File {

  def main(args: Array[String]): Unit = {

    // TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    //创建RDD ,可以是绝对路径,相对路径,也可以是 hdfs的路径
    val rdd : RDD[String] =  sc.textFile("E:\\code-self\\spi\\datas\\1.txt")
    rdd.collect().foreach(println)

    // TODO 关闭环境
    sc.stop()

  }

}

运行这段程序,观察控制台输出效果

3、读取HDFS文件创建RDD

在大数据处理场景中,从HDFS上面读取文件也是非常常用的,下面展示下如何读取hdfs的文件创建RDD;

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDD_File {

  def main(args: Array[String]): Unit = {

    // TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    // TODO 创建RDD
    val rdd1 = sc.textFile("hdfs://localhost:9000/sanguo/yingxiong.txt")
    rdd1.collect().foreach(println)

    // TODO 关闭环境
    sc.stop()

  }

}

在hdfs的sanguo目录下,有这样一个yingxiong.txt文件,里面内容如下:

运行上面的程序,观察控制台输出效果


本文转载自: https://blog.csdn.net/congge_study/article/details/124196343
版权归原作者 逆风飞翔的小叔 所有, 如有侵权,请联系我们删除。

“Spark 创建RDD的几种方式”的评论:

还没有评论