在之前的文章中,我们学习了如何在scala中定义与使用类和对象,并做了几道例题。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。
Spark-Scala语言实战(5)-CSDN博客文章浏览阅读1.6k次,点赞51次,收藏18次。今天我会给大家带来如何在Scala中定义类和对象,并正确使用它们同时也会给大家讲解一些实训题例。希望在本篇文章中,大家有所收获。也欢迎朋友们到评论区下一起交流学习,共同进步。https://blog.csdn.net/qq_49513817/article/details/136954217?spm=1001.2014.3001.5501今天的文章,我会教大家如何在IDEA中导入我们spark的jars包,并使用RDD方法查看是否导入成功,并完成一道相关例题。
一、知识回顾
在我们的Scala中,有着我们的类(Class)和对象(Object),而我们的类定义了对象的属性和方,而对象是类的实例。
我们可以使用new关键字来创建类的对象。
其次就是构造器
Scala的类可以有一个或多个构造器,它们用于初始化对象的属性,使用
this
关键字。
最后就是伴生对象
在Scala中,每个类都有一个与之关联的伴生对象。这个对象与类共享相同的名称,并且它的定义位于类定义的外部。
现在,开始今天的学习吧
二、导入jars包
首先,进入File中的project structure
进入Libraries 添加java
选择本地spark文件中的jars即可。
现在,写一个简单的RDD看下我们的jars包是否导入成功
在代码中有几行是我们要注意的
1.配置
import org.apache.spark.{SparkConf, SparkContext}
这一行的作用是配置spark应用程序,初始化spark的运行环境。
2.方法
val conf=new SparkConf().setMaster("local").setAppName("123456")
val sc=new SparkContext(conf)
这两行的作用是使用
.setMaster("local")
方法指定了 Spark 应用程序的运行模式。通过
.setAppName("123456")
方法,为 我的Spark 应用程序设置了一个名称。
现在完整代码附上:
import org.apache.spark.{SparkConf, SparkContext}
object p3 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("123456")
val sc=new SparkContext(conf)
val data = Array(1,2,3,4,5)
val distData = sc.parallelize(data)
print(distData.partitions.size)
}
}
运行结果 ,输出了1
三、任务实现
查询上半年实际薪资排名前3的员工信息,需要对上半年的实际薪资进行排序,而创建RDD时,textFile0方法是将每一行数据作为一条记录存储的,所以在排序前需要先对数据进行转换,实现步骤如下。(任务文档以文章附件提供)
(1)读取CSV文件,将第一行字段名称删除。
(2)将数据按分隔符“”分隔,取出第2列员工姓名和第7列实际薪员工信息 资数据,并将实际薪资数据转换成Int类型数据。
(3)通过sortBy()方法根据实际薪资进行降序排列。
(4)通过take()方法获取上半年实际薪资排名前3的员工信息。
import org.apache.spark.{SparkConf, SparkContext}
object p2 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("p2")
val sc=new SparkContext(conf)
val first_half = sc.textFile("C:\\Employee_salary_first_half.csv")
val drop_first = first_half.mapPartitionsWithIndex((ix,it) => {
if (ix ==0) it.drop(1)
it
})
val split_first = drop_first.map(line => {val data = line.split(","); (data(1),data(6).toInt)})
val sort_first =split_first.sortBy(x => x._2,false)
val ppp=sort_first.take(3)
println(ppp.toList)
}
}
使用了
sc.textFile
方法读取位于
"C:\\Employee_salary_first_half.csv"
的 CSV 文件,并将它作为一个 RDD返回。
使用了
mapPartitionsWithIndex
方法去除首行数据,因为首行数据是我们用不到的标题等,
使用了
sortBy
方法,按照元组的第二个元素(即第七列的值)进行降序排序。
最后将数据转化为列表输出。
执行代码获得最后结果。
拓展-RDD
1.什么是rdd
RDD,全称Resilient Distributed Dataset,即弹性分布式数据集,是分布式内存的一个抽象概念。它是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建。
RDD的主要特性包括:
- 分区列表:每个RDD被分为多个分区,这些分区运行在集群的不同节点上,每个分区都会被一个计算任务处理,分区数决定并行计算的数量。
- 依赖关系:RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系,包括窄依赖(一对一)和宽依赖(多对多)。
- 容错性:RDD支持高效的容错,数据既可以缓存在内存中,也可以缓存在磁盘中,或者缓存在外部存储中。
RDD的作用是降低了开发分布式应用程序的门槛,提高了执行效率。在RDD中,开发者可以通过简单的操作如map、reduce等完成数据的拆分、通信机制、作业调度等复杂任务。
2.RDD的特点
特点描述弹性RDD是弹性的,可以在内存中缓存数据,并支持容错性。当计算节点发生故障时,可以重新计算丢失的数据分区,而不需要重新启动整个计算过程。分区RDD将数据集合划分为多个分区,每个分区存储在不同的计算节点上。这样可以实现数据的并行处理,提高计算效率。不可变性RDD是不可变的,即不能直接修改RDD中的数据。如果需要对RDD进行转换或操作,会生成一个新的RDD。延迟计算RDD采用了惰性计算的策略,即只有在需要获取结果时才会进行计算。这样可以避免不必要的计算,提高计算效率。容错性RDD具有容错性,可以自动从节点失败中恢复过来。如果某个节点上的RDD分区因为节点故障导致数据丢失,RDD会自动通过自己的数据来源重新计算该分区。
3.RDD框架
组件描述Spark框架一个用于大规模数据处理的快速、通用计算引擎,支持批处理、流处理、图计算和机器学习等。RDD(弹性分布式数据集)Spark的核心抽象,表示一个只读、可分区的数据集,可以跨集群节点进行计算。分区(Partition)RDD中的数据被逻辑上划分为多个分区,每个分区存储在不同的节点上,以实现并行计算。转换操作(Transformations)创建新RDD的操作,如map、filter、flatMap等。这些操作是惰性的,只记录计算逻辑,不立即执行。动作操作(Actions)触发RDD计算并返回结果到驱动程序的操作,如collect、count、reduce等。依赖关系(Dependencies)RDD之间的依赖关系,用于描述RDD之间的转换链,支持容错和计算优化。缓存(Caching)RDD可以缓存在内存中,以便在多次计算中重用,提高计算效率。容错性(Fault Tolerance)RDD通过记录数据转换的血缘关系实现容错,当节点故障导致数据丢失时,可以重新计算丢失的分区。调度器(Scheduler)负责任务的调度和分配,将任务发送到合适的节点上执行,以实现高效的并行计算。执行器(Executor)运行在集群节点上的进程,负责执行具体的计算任务,与驱动程序通信以获取数据和指令。
版权归原作者 Peng0426. 所有, 如有侵权,请联系我们删除。