第一章 Spark概述
Spark是Apache软件基金会下的一个开源大数据处理框架,它最初由加州大学伯克利分校的AMPLab开发。Spark提供了一个快速、通用的大规模数据处理引擎,具有内存计算的优势,使得它能够比传统的基于磁盘的数据处理系统(如Hadoop MapReduce)快得多。以下是Spark的一些关键概述:
- 内存计算:Spark的主要优势之一是它的内存计算能力。通过将数据加载到内存中,Spark能够减少磁盘I/O的延迟,从而显著提高处理速度。Spark还支持将数据缓存在内存中,以便在多个作业之间重复使用,进一步提高性能。
- 统一编程模型:Spark提供了多种编程接口,如Scala、Java、Python和R,以及丰富的库来支持各种数据处理任务,如批处理、流处理、图形处理和机器学习。这些库都基于Spark的核心RDD(弹性分布式数据集)API构建,使得开发者可以在一个统一的编程模型下处理各种类型的数据和计算任务。
- 容错性和可扩展性:Spark具有强大的容错性和可扩展性。它基于Hadoop的分布式文件系统(HDFS)或类似的存储系统来存储数据,并使用Apache ZooKeeper等分布式协调服务来管理集群状态。Spark还提供了丰富的API和工具来监控和管理集群,以确保系统的稳定性和可用性。
- 生态系统和集成:Spark已经发展成为一个庞大的生态系统,包括多个与数据处理和分析相关的项目。例如,Spark SQL用于处理结构化数据,Spark Streaming用于处理实时数据流,MLlib用于机器学习,GraphX用于图形处理。此外,Spark还可以与Hadoop、Kafka、Flume等其他大数据工具和系统无缝集成。
- 实时分析:除了传统的批处理任务外,Spark还提供了强大的流处理能力,支持实时数据流的分析和处理。这使得Spark成为构建实时大数据应用程序的理想选择。
- 易用性和灵活性:Spark的API设计简洁明了,易于学习和使用。同时,Spark的编程模型也非常灵活,可以轻松地与其他工具和库集成,以满足各种复杂的数据处理需求。
总的来说,Spark是一个功能强大、灵活易用的大数据处理框架,已经成为大数据领域的重要支柱之一。
第二章Scala基础
(1)定义使用常量变量
1.val定义常量:
2.var定义变量:
(2)定义使用数组
方法一:
方法二:
(3)操作数组
1.查看数组长度:
2.查看数组z的第一个元素:
3.查看数组z中除了第一个元素外的其他元素:
4.判断数组z是否为空:
5.判断数组z 是否包含元素“a”:
(4)连接数组
1.“++”连接:
2.concat()l连接:
(5)创建区间函数,生成数组
1.调用函数:
2.Scala占位符:
(6)高阶函数 —函数作为返回值
定义高阶函数计算矩形周长:
(7)函数柯里化
定义两个整数相加函数的写法以及其调用方式:
函数柯里化:
(8)使用for循环
环嵌套if判断
(9)定义不同数据类型的列表
val:name:List(类型)=List(“ ”,” ”,” ”)
使用“Nil”和“::”定义列表:
(10)定义和使用元祖
(11)使用函数组合器
a.Map函数可以通过一个函数重新计算列表中所有的元素
b.Foreach()方法和map()方法类似但是无返回值
c.Filtet()方法可以移除传入函数为false的函数
d.flatten()方法可以将嵌套结构展开
e.flaMap()方法结合了map()方法和flatten()方法的功能
f.groupBy()方法可以对元素的集合进行分组操作
第三章Spark编程基础
RDD简介
RDD 是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点中的存放元素的集合。RDD有3种不同的创建方法。第一种是将程序中已存在的 Sea 集合(如集合、列表、数组)转换成RDD,第二种是对已有RDD 进行转换得到新的RDD,这两种方法都是通过内存中已有的数据创建RDD 的。第三种是直接读取外部存储系统的数据创建 RDD
1).parallelize()
创建RDD及查看分区个数
创建RDD及查看分区个数
package test
import org.apache.spark.{SparkConf, SparkContext}
object d {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("worldcount").setMaster("local")
val sc = new SparkContext(conf)
val data = Array(1, 2, 3, 4, 5)
//val distData = sc.parallelize(data)显示默认分区个数
val distData = sc.parallelize(data, 4)//设置分区为4
print(distData.partitions.size)
}
}
2).makeRDD()
创建RDD并查看分区值
//使用makeRDD()方法创建RDD并查看各分区的值
val seq=Seq((1,seq("iteblogs.com","sparkhost1.com")),
(3,Seq("iteblog.com","sparkhost2.com")),
(2,Seq("iteblog.com","sparkhost3.com")))
//使用makeRDD创建RDD
val iteblog=sc.makeRDD(seq)
//查看RDD的值
iteblog.collect.foreach(print)
3).map()方法转换数据
可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD
//创建RDD
val disData = sc.parallelize(List(1,3,45,3,76))
//map()方法求平均值
val sq_list=disData.map(x => x*x)
println(sq_list)
4).sortBy()方法进行排序
P58 获取上半年实际薪资排名前三的员工信息
//创建RDD
val data = sc.parallelize(List((1,3),(45,3),(7,6)))
//使用sortBy()方法对元祖的第二个值进行降序排序,分区个数设置为1
val sort_data=data.sortBy(x => x._2,false,1)
println(sort_data)
练习: 获取上半年实际薪资排名前三的员工信息
package test
import org.apache.spark.{SparkConf, SparkContext}
object a {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("worldcount").setMaster("local")
val sc=new SparkContext(conf)
val first_half = sc.textFile("C:\\Users\\Administrator\\Desktop\\Employee_salary_first_half.csv")
val drop_first =first_half.mapPartitionsWithIndex((ix,it) => {
if (ix==0 ) it drop(1)
it
})
val split_first = drop_first.map(line => {val data = line.split(",");
(data(1),data(6).toInt)
})
val sort_first= split_first.sortBy(x => x._2,false)
sort_first.take(3)
}
}
5).filter()方法进行过滤
package test
import org.apache.spark.{SparkConf, SparkContext}
object b {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setMaster("local").setAppName("PartialFumction")
val sc=new SparkContext(conf)
val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3)))
rdd1.filter(_._2>1).collect.foreach(println)
rdd1.filter(x => x._2 > 1).collect.foreach(println)
}
}
6).Distinct()方法去重
package test
import org.apache.spark.{SparkConf, SparkContext}
object c {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setMaster("local").setAppName("PartialFumction")
val sc=new SparkContext(conf)
val rdd3=sc.makeRDD(List(1,100,200,300,100))
rdd3.filter(x => x>99).collect()
rdd3.distinct().collect().foreach(println)
}
}
7).Intersection方法
package test
import org.apache.spark.{SparkConf, SparkContext}
object e {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setMaster("local").setAppName("PartialFumction")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(('a',1),('a',1),('b',1),('c',1)))
val rdd2=sc.parallelize(List(('a',1),('b',1),('d',1)))
//用intersection()求两个RDD的共同元素
rdd1.intersection(rdd2).collect.foreach(print)
}
}
第四章 Spark编程进阶
1.在集群环境中运行 Spark
直接在开发环境中运行 Spark 程序时通常选择的是本地模式。如果数据的规模比较庞大,更常用的方式还是在 Spark 程序开发测试完成后编译打包为Java 归档(Java Archive, JAR)包,并通过 spark-submit 命令提交到 Spark集群环境中运行。
spark-submit的脚本在 Spark安装目录的bin 目录下,spark-submit 是 Spark 为所有支特
的集群管理器提供的一个提交作业的工具。Spark 在/example 目录下有 Scala、Java、 Pyithon 和R的示例程序,都可以通过 spark-submit运行。
sporik-subomit 提交JAR 包到集群有一定的格式要求,需要设置一些参数,语法如下,
./bin/spark-submit --class <main-class>
--master <master-url>
--deploy-mode <deploy-mode>
--conf <"key=value">
...# other options
<appli.cat.ion-jar> )
[application-argaments」
如果除了设置运行的脚本名称之外不设置其他参数,那么 Spark 程序默认在本地
运行。
--class:应用程序的入口,指主程序。
--master:指定要连接的集群URL。
-deploy-mode:是否将驱动程序部署在工作节点(cluster)或本地作为外部客户端
(client)。
--conf:设置任意 Spark 配登属性,允许使用"key=value WordCount"的格式设置任意的SparkConf 配置属性。
application-jar:包含应用程序和所有依赖关系的JAR包的路径。
application-arguments:传递给main)方法的参数。
将代码 4-3 所示的程序运行模式更改为打包到集样中运行。程序中无须设置 master 地址、Hadoop 安装包位置。输人、输出路径可通过 spark-submit 指定。
2.spark-submit常用项配置
ndme Name 设置程序名
--jars JARS 添加依赖包
-driver-memoryMEM Driver 程序使用的内存大小
-executor-memoryMEM Executor使用的内存大小
-total-executor-cores NUM Executor使用的总内核数
--executor-cores NUM 每个Bxecutor使用的内核数
-num-executors NUM 启动的Executor数量
spark.eventLog.dir
保存日志相关信息的路径,可以是“hdfs://” 开头的 HDES 路径,也可以
是“e:// 开头的本地路径,路径均需要提前创建
spark.eventLog.enabled 是否开启日志记录
spark.cores.max
当应用程序运行在 Standalone 集群或粗粒度共享模式 Mesos 集群时,应用
程序向集群(不是每台机器,而是整个集群)请求的最大 CPU内核总数。如果不设置,那么对于 Standalone 集群将使用 spark,deploy. defaultCores 指定的数值,而 Mesos 集群将使用集群中可用的内核
第五章 Spark SQL——结构化数据文件处理
1.Spark SQL的简介
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API三种方式实现对结构化数据的处理。
2.Spark SQL主要提供了以下三个功能:
Ø Spark SQL 可从各种结构化数据源中读取数据,进行数据分析。
Ø Spark SQL 包含行业标准的 JDBC 和 ODBC 连接方式,因此它不局限于在 Spark 程序内使用 SQL 语句进行查询。
Ø Spark SQL 可以无缝地将 SQL 查询与 Spark 程序进行结合,它能够将结构化数据作为 Spark 中的分布式数据集( RDD )进行查询。
3.Spark SQL 的核心组成部分包括:
DataFrame: 这是 Spark SQL 提供的一个分布式数据集,它带有 Schema 元信息,使得 Spark SQL 可以进行某些形式的执行优化。DataFrame 是从 RDD 派生而来的,但它带有额外的结构信息,使得 Spark 可以对数据进行更好的优化处理23。
SQL 查询引擎: Spark SQL 内置了一个 SQL 查询引擎,它支持执行 SQL 语句,使得用户可以以 SQL 的方式查询数据。Spark SQL 还支持将 SQL 查询与 Spark 程序无缝混合使用,这大大提高了其灵活性和易用性23。
优化器: Catalyst 是 Spark SQL 的执行优化器,它负责对 SQL 查询进行解析和优化,以提高执行效率。Catalyst 优化器可以处理和优化多种类型的 SQL 语句,包括那些涉及复杂数据处理逻辑的语句3。
插件系统: Spark SQL 有一个插件系统,允许开发者扩展其功能,如添加新的数据源或数据 sink。
4.DataFrame简介
DataFrame可以看作是分布式的Row对象的集合,在二维表数据集的每一列都带有名称和类型,这就是Schema元信息,这使得Spark框架可获取更多数据结构信息,从而对在DataFrame背后的数据源以及作用于DataFrame之上数据变换进行针对性的优化,最终达到提升计算效率。
DataFrame 是 Spark SQL 提供的一个分布式数据集,它是带有 Schema 元信息的 DataSet,可以看作是关系数据库中的一个表。DataFrame 可以从多种源构建,如结构化数据文件、Hive 表、外部数据库或现有的 RDD。它支持 Spark SQL 查询和 DataFrame API,可以方便地进行数据处理和转换23
5.DataFrame的创建
创建DataFrame的两种基本方式:
•已存在的RDD调用toDF()方法转换得到DataFrame。
通过Spark读取数据源直接创建DataFrame
若使用SparkSession方式创建DataFrame,可以使用spark.read从不同类型的文件中加载数据创建DataFrame。spark.read的具体操作,在创建Dataframe之前,为了支持RDD转换成Dataframe及后续的SQL操作,需要导入import.spark.implicits._包启用隐式转换。若使用SparkSession方式创建Dataframe,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,具体操作API如表4-1所示。
(1)数据准备
在HDFS文件系统中的/spark目录中有一个person.txt文件,内容如下:
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 jerry 40
(2)通过文件直接创建DataFrame
我们通过Spark读取数据源的方式进行创建DataFrame
scala > val personDF = spark.read.text("/spark/person.txt")
personDF: org.apache.spark.sql.DataFrame = [value: String]
scala > personDF.printSchema()
root
|-- value: String (Nullable = true)
(3)RDD直接转换为DataFrame
scala > val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" "))
lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:24
scala > case class Person(id:Int,name:String,age:Int) defined class Person
scala > val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[7] at map at <console>:27
scala > val personDF = personRDD.toDF()
personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
6.DataFrame的常用操作
** **操作DataFrame的常用方法,具体如下表如示。
Dataframe提供了两种谱法风格,即DSL风格语法和SQL风格语法,二者在功能上并无区别,仅仅是根据用户习惯自定义选择操作方式。接下来,我们通过两种语法风格,分讲解Dstaframe操作的具体方法。
一.DSL风格操作
DataFrame提供了一个领域特定语言(DSL)以方便操作结构化数
据,下面将针对DSL操作风格,讲解DataFrame
常用操作示例:
**1.show():**查看DataFrame中的只体内容信息
**2.pritSchema0:**查看0staFrame的Schema信息
3.select():查看DataFmame中造取部分列的数据,
下面演示查看xixiDF对象的name字段数据,具体代码如下所示:
** DataFrame**
7.filter()/where条件查询
filter() 与 where 条件查询
在JavaScript中,
filter()
方法可用于从数组中筛选出符合特定条件的元素。该方法接受一个回调函数作为参数,该回调函数定义了筛选的条件。例如,如果要筛选出数组中所有年龄超过20岁的对象,我们可以使用以下代码:
const people = [
{ name: 'Alice', age: 25 },
{ name: 'Bob', age: 18 },
{ name: 'Charlie', age: 22 }
];
const adults = people.filter(person => person.age > 20);
console.log(adults);
输出结果将是所有年龄超过20岁的对象。
类似地,在数据库查询中,
WHERE
条件用于过滤结果集,只返回符合特定条件的记录。例如,在SQL中,如果我们想查询所有年龄超过20岁的用户,我们会使用:
SELECT * FROM users WHERE age > 20;
8.mgroupBy()对数据进行分组
mgroupBy() 对数据进行分组
在JavaScript中,尽管没有内置的
mgroupBy()
方法,但我们可以通过其他方法实现数据的分组。常见的方法包括使用哈希表(
Object.keys()
)和数组的
reduce()
方法来进行分组。
例如,以下代码展示了如何使用哈希表对一组数据进行分组:
const data = [
{ key: 'a', value: 1 },
{ key: 'b', value: 2 },
{ key: 'c', value: 3 }
];
const groupedData = Object.keys(data).reduce((acc, key) => {
acc[key] = data[key];
return acc;
}, {});
console.log(groupedData);
这段代码将
data
数组中的数据按
key
进行分组,最终输出一个对象,其键为
a
、
b
和
c
,对应的值为1、2和3。
在数据库中,
GROUP BY
用于对数据进行分组。例如,在SQL中,如果我们想对用户按年龄分组:
SELECT age, COUNT(*) FROM users GROUP BY age;
9.sort()/orderBy():对特定字段进行排序
desc:降序;asc:升序
sort() / orderBy() 对特定字段进行排序
JavaScript的
sort()
方法可用于对数组中的元素进行排序。当应用于数组对象时,可以提供比较函数来根据特定字段进行排序。例如,以下代码展示了如何根据年龄对用户数组进行排序:
const users = [
{ name: 'Alice', age: 25 },
{ name: 'Bob', age: 18 },
{ name: 'Charlie', age: 22 }
];
users.sort((a, b) => a.age - b.age);
console.log(users);
输出结果将是按年龄排序的用户数组。
在数据库中,
ORDER BY
用于根据一个或多个字段对结果进行排序。例如,在SQL中,如果我们想按年龄降序排序用户:
SELECT * FROM users ORDER BY age DESC;
10.RDD、DataFrame及Dataset的区别
接下来,我们来比较一下这三者的不同之处:
综上所述,RDD、DataFrame和Dataset在Spark SQL中各自扮演着不同的角色,它们共同构成了Spark SQL强大的数据处理能力。在选择使用哪一个时,我们需要根据自己的数据处理需求和场景来决定。
11.Dataset****对象的创建
(1)通过SparkSession中的createDataset来创建****Dataset
(2)DataFrame通过“as[ElementType]”方法转换得到Dataset
12.RDD转换DataFrame
•Spark官方提供了两种方法实现从RDD转换得到DataFrame。
•第一种方法是利用反射机制来推断包含特定类型对象的Schema,这种方式适用于对已知数据结构的RDD转换
•第二种方法通过编程接口构造一个Schema,并将其应用在已知的RDD数据中。
13.反射机制推断****Schema
反射机制推断Schema的过程
反射机制推断Schema的过程可以通过以下几个步骤来概括:
- 定义样例类:首先需要定义一个包含所需字段的样例类,例如
Person
,它代表了数据框中每行的结构。 - 创建SparkSession:建立与Spark集群的连接,并创建一个
SparkSession
对象,它是Spark 2.0之后用于数据处理的入口。 - 读取数据:使用
textFile
或csv
等方法读取数据文件,并将数据解析为一个RDD(Resilient Distributed Dataset)。 - 将RDD与样例类关联:通过
map
函数将RDD中的每个元素转换为样例类实例,从而生成一个新的RDD。 - 转换为DataFrame:使用
toDF
方法将RDD转换为一个DataFrame对象,这个对象包含了数据的逻辑结构和类型信息。 - 展示和处理DataFrame:可以使用
show
方法查看DataFrame的前几行数据,使用printSchema
方法查看其Schema。 - 执行SQL查询:可以将DataFrame注册为一个临时表,然后使用SQL语句对其进行查询和分析。
示例代码
以下是一个简化的代码示例,展示了如何使用反射机制推断Schema:
import org.apache.spark.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
// 定义样例类
case class Person(id: Int, name: String, age: Int)
object ReflectiveSchemaInference {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("ReflectiveSchemaInference")
.master("local[*]")
.getOrCreate()
// 读取数据
val data = spark.textFile("data/people.txt").map(line => line.split("\t"))
// 将RDD与样例类关联
val peopleRdd = data.map(row => Person(row(0).toInt, row(1), row(2).toInt))
// 转换为DataFrame
val peopleDf = peopleRdd.toDF
// 展示DataFrame前20行数据
peopleDf.show(20)
// 打印DataFrame的Schema
peopleDf.printSchema()
// 统计年龄大于30的人数
println(peopleDf.filter($"age" > 30).count())
// 关闭SparkSession
spark.stop()
}
}
在这个示例中,首先定义了一个Person样例类,然后从文本文件中读取数据,并将其转换为Person类型的RDD。接着,将这个RDD转换为DataFrame,并展示了它的前20行数据以及完整的Schema。最后,进行了一个简单的统计查询,并关闭了SparkSession。
结论
反射机制推断Schema是一种在Spark中自动化数据处理任务的方法,它可以减少手动编写数据处理逻辑的工作量。这种方法的核心在于使用样例类来定义数据结构,并通过反射来自动推断数据的逻辑类型和关系。这种方法不仅适用于Spark,也适用于其他支持类似API的类似大数据处理框架
15.编程方式定义****Schema
a.概述
定义Schema有多种方式,可以通过编程语言中的数据类型定义,也可以使用特定的数据描述语言,如XML Schema(XSD)或JSON Schema等。在编程中,定义Schema通常涉及到定义数据类型、数据结构以及数据之间的关系。以下将分别介绍在不同编程环境和语言下定义Schema的方法。
b.详细分析
1)Java环境下的定义
在Java环境中,可以使用如Java对象表示法(Java Object Notation)来定义数据类型,并通过反射机制来实现动态类型检查。此外,还可以使用第三方库如Jackson或Gson来更方便地处理JSON格式的Schema定义。
2)Python环境下的定义
Python通常使用内建的dict类型作为数据结构,通过键值对的方式定义数据类型和关系。在Python 3.5及之后的版本中,可以使用typing模块来定义类型注释,使得代码更为清晰。
3)JavaScript环境下的定义
在JavaScript中,可以使用对象字面量(Object Literal)来定义数据结构,并通过函数来定义数据类型。ES6之后,可以使用class关键字来定义类,从而更好地组织和管理数据类型。
4)XML环境下的定义
XML使用标记语言(如XML Schema Definition, XSD)来定义数据类型。通过编写XML Schema文件,可以定义数据元素的类型、结构和约束。
5)JSON环境下的定义
JSON本身是一种轻量级的数据交换格式,但它也可以用来定义数据类型。通过编写JSON文档,可以描述数据结构及其相互关系。
总结
定义Schema的方法取决于使用的编程语言和环境。无论是在编程语言中直接定义数据类型,还是使用特定的数据描述语言,重要的是确保定义能够准确反映所需的数据结构和行为。
版权归原作者 yu: 所有, 如有侵权,请联系我们删除。