0


spark

RDD的创建

1.从内存中的集合创建:使用**

parallelize()

**方法可以将一个集合转换为RDD

data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)

2.使用Spark提供的API可以从外部存储系统读取数据并创建RDD

rdd = spark.sparkContext.textFile("hdfs://path/to/your/file.txt")
3.通过转换操作创建:可以通过对已存在的RDD执行转换操作来创建新的RDD

old_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

new_rdd = old_rdd.map(lambda x: x * 2)

4.通过函数的结果创建:可以通过调用函数来创建RDD,其中函数的返回值会成为RDD的元素

rdd = spark.sparkContext.range(1, 100, 2)

RDD的方法

map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD。

map()方法是(懒操作)转换操作,不会立即进行计算。

转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD。

使用sortBy()方法排序

第1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。

第2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false。

第3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等。

除了第一个参数是必须输入的,而后面的两个参数可以不输入。

使用collect()方法查询数据

collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。

collect()方法有两种操作方式:

collect:直接调用collect返回该RDD中的所有元素,返回类型是一个Array[T]数组.

collect[U: ClassTag](f: PartialFunction[T, U]):RDD[U]。这种方式需要提供一个标准的偏函数,将元素保存至一个RDD中。首先定义一个函数one,用于将collect方法得到的数组中数值为1的值替换为“one”,将其他值替换为“other”。

使用flatMap()方法转换数据

flatMap()方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的RDD。使用flatMap()方法时先进行map(映射)再进行flat(扁平化)操作,数据会先经过跟map一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD。

简单的集合操作方法

intersection()方法

intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。

创建两个RDD,其中有相同的元素,通过intersection()方法求出两个RDD的交集。

subtract()方法

subtract()方法用于将前一个RDD中在后一个RDD出现的元素删除,可以认为是求补集的操作,返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果。

创建两个RDD,分别为rdd1和rdd2,包含相同元素和不同元素,通过subtract()方法求rdd1和rdd2彼此的补集。

cartesian()方法

cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。

创建两个RDD,分别有4个元素,通过cartesian()方法求两个RDD的笛卡儿积。

使用键值对RDD的reduceByKey()方法

spark提供了两种方法分别获取键值对RDD的键和值通过keys和values方法分别查看。

key.collect和value.collect。

当数据集以键值对形式展现时,合并统计键相同的值是很常用的操作。

reduceByKey()方法用于合并具有相同键的值,作用对象是键值对,并且只对每个键的值进行处理,当RDD中有多个键相同的键值对时,则会对每个键对应的值进行处理。reduceByKey()方法需要接收一个输入函数,键值对RDD相同键的值会根据函数进行合并并且创建一个新的RDD作为返回结果。在进行处理时,reduceByKey()方法将相同键的前两个值传给输入函数,产生一个新的返回值,新产生的返回值与RDD中相同键的下一个值组成两个元素,再传给输入函数,直到最后每个键只有一个对应的值为止。reduceByKey()方法不是一种行动操作,而是一种转换操作。

RDD的连接

join()方法

join()方法用于根据键对两个RDD进行内连接,将两个RDD中键相同的数据的值存放在一个元组中,最后只返回两个RDD中都存在的键的连接结果。例如,在两个RDD中分别有键值对(K,V)和(K,W),通过join()方法连接会返回(K,(V,W))。创建两个RDD,含有相同键和不同的键,通过join()方法进行内连接。

rightOuterJoin()方法

rightOuterJoin()方法用于根据键对两个RDD进行右外连接,连接结果是右边RDD的所有键的连接结果,不管这些键在左边RDD中是否存在。在rightOuterJoin()方法中,如果在左边RDD中有对应的键,那么连接结果中值显示为Some类型值;如果没有,那么显示为None值。

leftOuterJoin()方法

leftOuterJoin()方法用于根据键对两个RDD进行左外连接,与rightOuterJoin()方法相反,返回结果保留左边RDD的所有键。

fullOuterJoin()方法

fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。

使用zip()方法组合两个RDD

zip()方法用于将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常。

将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

使用lookup()方法查找指定键的值

lookup(key:K)方法作用于键值对RDD,返回指定键的所有值。

Spark SQL

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API三种方式实现对结构化数据的处理。

•Spark SQL使用的数据抽象并非是RDD,而是DataFrame。
•在Spark 1.3.0版本之前,DataFrame被称为SchemaRDD。
•DataFrame使Spark具备处理大规模结构化数据的能力。
•在Spark中,DataFrame是一种以RDD为基础的分布式数据集。
•DataFrame的结构类似传统数据库的二维表格,可以从很多数据源中创建,如结构化文件、外部数据库、Hive表等数据源。

创建DataFrame
1.通过Spark读取数据源直接创建DataFrame。

若使用SparkSession方式创建DataFrame,可以使用spark.read从不同类型的文件中加载数据创建DataFrame。spark.read的具体操作,在创建Dataframe之前,为了支持RDD转换成Dataframe及后续的SQL操作,需要导入import.spark.implicits._包启用隐式转换。若使用SparkSession方式创建Dataframe,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame

使用spark-shell进入交互界面

2.通过文件直接创建****DataFrame

#导入 Spark 的隐式转换方法和函数的包

import spark.implicits

#通过Spark读取数据源的方式进行创建DataFrame

val lhrDF=spark.read.text("/spark/lhr.text")

#使用printSchema函数查看DataFrame的数据模式输出列的名称和类型

lhrDF.printSchema()

RDD直接转换为DataFrame

#定义一个名叫lhr的样例类

case class lhr(id:Int,name:string,age:Int)

#读取lhr.txt数据创建RDD wsyData 以空格分割

val lhrData=sc.textFile("/spark/lhr.text").map(_.split(""))

#将lhrData转换为DataFrame

val lhr=lhrData.map(w=>lhr(w(0).trim.toInt,w(1),w(2).trim.toInt))toDF()

#使用where()方法查询lhr中age为44的用户

val lhrWhere=lhr.where("age=44")

lhrwhere.show

DataFrame的常见操作方法

获取第一条数据

lhr.first()

head()方法获取前三条数据

lhr.head(3)

takeAsList()方法获取前三条数据以列表形式展现

lhr.takeAsList(3)

printSchema():查看DataFrame的Schema信息

lhrDF.printSchema()

select():查看DataFrame中选取部分列的数据及进行重命名

val lhrSelect=lhr.select("name")

使用方法查询对id字段使用replace函数并取别名sex,查询第一条数据

lhrSelectExpr.show(1)

filter():实现条件查询,过滤出想要的结果查询age为44的数据

val lhrFilter=lhr.filter("age=44")

** sort()/orderBy()方法对特定字段进行排序操作(默认升序)**

val lhrsort=lhr.soprt(asc("id"))

降序排序

val lhrsort=lhr/soprt(desc("id"))


本文转载自: https://blog.csdn.net/dmnjsw/article/details/138799632
版权归原作者 dsvsvda 所有, 如有侵权,请联系我们删除。

“spark”的评论:

还没有评论