SparkRDD的用法
1、从已有的数据集合创建 RDD
:你可以使用
parallelize
方法从一个已有的 Scala 集合(如数组或列表)中创建 RDD。
val qxyrdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
2、从外部数据源读取数据创建 RDD
:你可以使用 Spark 提供的各种数据源来创建 RDD,比如文本文件、序列文件、JSON 文件、CSV 文件等。
val qxyrdd2 = sc.textFile("/data1/person.txt")
3、通过转换已有的 RDD 创建 RDD:
你可以通过对已有的 RDD 进行各种转换操作来创建新的 RDD。
val qxyrdd3 = qxyrdd1.map(_ * 2)
4. 显示RDD中的元素:
qxyrdd.collect().foreach(println)
- 对RDD中的每个元素应用函数:
val qxyRDD = rdd.map(x => x * 2)
6. 对RDD中的元素进行过滤:
val qxyfilteredRDD = qxyrdd.filter(x => x % 2 == 0)
7. 对RDD中的元素进行聚合:
val qxysum = qxyrdd.reduce((x, y) => x + y)
- 对RDD中的元素进行排序:
val qxysortedRDD = qxyrdd.sortBy(x => x, ascending = false)
- 对两个RDD进行笛卡尔积操作:
val qxycartesianRDD = qxyrdd.cartesian(qxyRDD)
10.对RDD中的每个分区应用函数:
val qxyPartitionsRDD = qxyrdd.mapPartitions(iter => iter.map(_ * 2))
Spark SQL
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API三种方式实现对结构化数据的处理。
•Spark SQL使用的数据抽象并非是RDD,而是DataFrame。
•在Spark 1.3.0版本之前,DataFrame被称为SchemaRDD。
•DataFrame使Spark具备处理大规模结构化数据的能力。
•在Spark中,DataFrame是一种以RDD为基础的分布式数据集。
•DataFrame的结构类似传统数据库的二维表格,可以从很多数据源中创建,如结构化文件、外部数据库、Hive表等数据源。
DataFrame的常见操作方法
# 创建DataFrame
val qxydf = spark.createDataFrame(Seq(
(1, "zhangsan",33),
(2, "lisi",54),
(3, "wangwu",44),
(4, "kk",28),
(5, "xiaoshuai",54),
(6, "xiaomei",88)
)).toDF("id", "name","age")
#select查看DataFrame中选取部分列的数据及进行重命名
# filter() 实现条件查询,过滤出想要的结果查询age为44的数据
# groupBy() 对记录进行分组
# sort()对特定字段进行排序操作(默认升序)
已存在的RDD调用toDF()方法转换得到DataFrame。通过Spark读取数据源直接创建DataFrame。
若使用SparkSession方式创建DataFrame,可以使用spark.read从不同类型的文件中加载数据创建DataFrame。spark.read的具体操作,在创建Dataframe之前,为了支持RDD转换成Dataframe及后续的SQL操作,需要导入import.spark.implicits._包启用隐式转换。若使用SparkSession方式创建Dataframe,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame
通过文件直接创建****DataFrame
#导入 Spark 的隐式转换方法和函数的包
#通过Spark读取数据源的方式进行创建DataFrame
#使用printSchema函数查看DataFrame的数据模式输出列的名称和类型
#使用show()方法查看数据
RDD直接转换为DataFrame
创建一个命名为qxy.txt文档内容如下
1 zhangsan 34
2 lisi 44
3 wangwu 45
4 zhaoliu 44
5 fuzhengting 55
6 fushao 36
上传hadoop
打开Hadoop浏览目录,发现已经创建了一个spark文件夹且上传了文档
直接将RDD转换为DataFrame
#定义一个名叫Qsy的样例类
#读取wsy.txt数据创建RDD wsyData 以空格分割
show()方法与show(true)方法查询结果一样,如需显示全部字符需使用show(false)方法
查看前4条数据
查看前4条数据并显示所有字符
版权归原作者 2302_78090734 所有, 如有侵权,请联系我们删除。