0


Spark中方法运用

SparkRDD的用法

1、从已有的数据集合创建 RDD

:你可以使用

parallelize

方法从一个已有的 Scala 集合(如数组或列表)中创建 RDD。

val qxyrdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))

2、从外部数据源读取数据创建 RDD

:你可以使用 Spark 提供的各种数据源来创建 RDD,比如文本文件、序列文件、JSON 文件、CSV 文件等。

val qxyrdd2 = sc.textFile("/data1/person.txt")

3、通过转换已有的 RDD 创建 RDD

你可以通过对已有的 RDD 进行各种转换操作来创建新的 RDD。

val qxyrdd3 = qxyrdd1.map(_ * 2)

4. 显示RDD中的元素:

   qxyrdd.collect().foreach(println)

  1. 对RDD中的每个元素应用函数:
   val qxyRDD = rdd.map(x => x * 2)

6. 对RDD中的元素进行过滤:

val qxyfilteredRDD = qxyrdd.filter(x => x % 2 == 0)

7. 对RDD中的元素进行聚合:

val qxysum = qxyrdd.reduce((x, y) => x + y)

  1. 对RDD中的元素进行排序:
 val qxysortedRDD = qxyrdd.sortBy(x => x, ascending = false)

  1. 对两个RDD进行笛卡尔积操作:
val qxycartesianRDD = qxyrdd.cartesian(qxyRDD)

10.对RDD中的每个分区应用函数:

 val qxyPartitionsRDD = qxyrdd.mapPartitions(iter => iter.map(_ * 2))
   

Spark SQL

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API三种方式实现对结构化数据的处理。

•Spark SQL使用的数据抽象并非是RDD,而是DataFrame。
•在Spark 1.3.0版本之前,DataFrame被称为SchemaRDD。
•DataFrame使Spark具备处理大规模结构化数据的能力。
•在Spark中,DataFrame是一种以RDD为基础的分布式数据集。
•DataFrame的结构类似传统数据库的二维表格,可以从很多数据源中创建,如结构化文件、外部数据库、Hive表等数据源。

DataFrame的常见操作方法

# 创建DataFrame

   val qxydf = spark.createDataFrame(Seq(
     (1, "zhangsan",33),
     (2, "lisi",54),
     (3, "wangwu",44),
     (4, "kk",28),
     (5, "xiaoshuai",54),
     (6, "xiaomei",88)
   )).toDF("id", "name","age")

#select查看DataFrame中选取部分列的数据及进行重命名

# filter() 实现条件查询,过滤出想要的结果查询age为44的数据

# groupBy() 对记录进行分组

# sort()对特定字段进行排序操作(默认升序)

已存在的RDD调用toDF()方法转换得到DataFrame。通过Spark读取数据源直接创建DataFrame。

若使用SparkSession方式创建DataFrame,可以使用spark.read从不同类型的文件中加载数据创建DataFrame。spark.read的具体操作,在创建Dataframe之前,为了支持RDD转换成Dataframe及后续的SQL操作,需要导入import.spark.implicits._包启用隐式转换。若使用SparkSession方式创建Dataframe,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame

通过文件直接创建****DataFrame

#导入 Spark 的隐式转换方法和函数的包

#通过Spark读取数据源的方式进行创建DataFrame

#使用printSchema函数查看DataFrame的数据模式输出列的名称和类型

#使用show()方法查看数据

RDD直接转换为DataFrame

创建一个命名为qxy.txt文档内容如下

1 zhangsan 34
2 lisi 44
3 wangwu 45
4 zhaoliu 44
5 fuzhengting 55
6 fushao 36

上传hadoop

打开Hadoop浏览目录,发现已经创建了一个spark文件夹且上传了文档

直接将RDD转换为DataFrame

#定义一个名叫Qsy的样例类

#读取wsy.txt数据创建RDD wsyData 以空格分割

show()方法与show(true)方法查询结果一样,如需显示全部字符需使用show(false)方法

查看前4条数据

查看前4条数据并显示所有字符


本文转载自: https://blog.csdn.net/2302_78090734/article/details/138673493
版权归原作者 2302_78090734 所有, 如有侵权,请联系我们删除。

“Spark中方法运用”的评论:

还没有评论