0


Spark - 介绍及使用 Scala、Java、Python 三种语言演示

一、Spark

在这里插入图片描述

  1. Apache Spark

是一个快速的,多用途的集群计算系统, 相对于

  1. Hadoop MapReduce

将中间结果保存在磁盘中,

  1. Spark

使用了内存保存中间结果, 能在数据尚未写入硬盘时在内存中进行运算。

  1. Spark

只是一个计算框架, 不像

  1. Hadoop

一样包含了分布式文件系统和完备的调度系统, 如果要使用

  1. Spark

, 需要搭载其它的文件系统。

Hadoop 之父 Doug Cutting 指出:Use of MapReduce engine for Big Data projects will decline, replaced by Apache Spark (大数据项目的 MapReduce 引擎的使用将下降,由 Apache Spark 取代)。

当然现在有了更为发展趋势,更好处理流式数据的

  1. Flink

,但

  1. Spark

在大数据处理领域仍有一席之地。

1. Spark的优点:

  • 速度快Spark 在内存时的运行速度是 Hadoop MapReduce100倍,基于硬盘的运算速度大概是 Hadoop MapReduce10倍,并且Spark 实现了一种叫做 RDDsDAG 执行引擎, 其数据缓存在内存中可以进行迭代处理。
  • 易上手Spark 支持 Java、Scala、Python、R,、SQL 等多种语言的API,并且支持超过80个高级运算符使得用户非常轻易的构建并行计算程序,同时Spark 也可以使用基于 Scala, Python, R, SQLShell 交互式查询。
  • 通用性强Spark 提供一个完整的技术栈,,包括 SQL执行, Dataset命令式API, 机器学习库MLlib, 图计算框架GraphX, 流计算SparkStreaming等。
  • 兼容性好Spark 可以运行在 Hadoop Yarn、Apache Mesos、 Kubernets、 Spark Standalone等集群中,可以访问 HBase、 HDFS、Hive、 Cassandra 在内的多种数据库。

2. Spark中的组件

  • Spark-Core:整个 Spark 的基础,,提供了分布式任务调度和基本的 I/O 功能,并且Spark 最核心的功能是 RDDsRDDs 就存在于这个包内。同时 RDDs 简化了编程复杂性,操作 RDDs 类似 Jdk8Streaming 操作本地数据集合。
  • Spark SQL:在 spark-core 基础之上带出了 DataSetDataFrame 的数据抽象化的概念,提供了在 DatasetDataFrame 之上执行 SQL 的能力,提供了 DSL, 可以通过 Scala, Java, Python 等语言操作 DataSetDataFrame,还支持使用 JDBC/ODBC 服务器操作 SQL 语言。
  • Spark Streaming:利用 spark-core 的快速调度能力来运行流分析,通过时间窗口截取小批量的数据并可以对之运行 RDD Transformation
  • MLlib:分布式机器学习的框架,可以使用许多常见的机器学习和统计算法,例如:支持向量机、 回归、 线性回归、 逻辑回归、 决策树、 朴素贝叶斯、汇总统计、相关性、分层抽样、 假设检定、随机数据生成等,简化大规模机器学习。
  • GraphX:分布式图计算框架, 提供了一组可以表达图计算的 API,还对这种抽象化提供了优化运行。

3. Spark 和 Hadoop 对比

对比项Sparkhadoop类型分布式计算工具基础平台, 包含计算, 存储, 调度延迟中间运算结果存在内存中,延迟小中间计算结果存在 HDFS 磁盘上,延迟大场景迭代计算, 交互式计算, 流计算大规模数据集上的批处理易用性RDD 组成 DAG 有向无环图, API 较为顶层, 方便使用Map+Reduce, API 较为底层, 算法适应性差硬件要求对内存有要求对机器要求低

4. Spark 中的 RDD

  1. RDD

  1. Spark

的核心,在

  1. RDD

之前

  1. MapReduce

的处理过程如下:

在这里插入图片描述
多个任务之间通过磁盘来共享数据,

  1. RDD

出现后的处理过程:

在这里插入图片描述
整个过程共享内存,不需要将中间结果存放在磁盘中。

5. RDD 的分区

  1. RDD

使用分区来分布式并行处理数据,做到尽量少的在不同的

  1. Executor

之间使用网络交换数据,使用

  1. RDD

读取数据的时候,会尽量的在物理上靠近数据源,比如在读取

  1. Cassandra

或者

  1. HDFS

中数据的时候,会尽量的保持

  1. RDD

的分区和数据源的分区数对应。

6. RDD 的 Shuffle

分区的主要作用是用来实现并行计算,但是往往在进行数据处理的时候,例如

  1. reduceByKey, groupByKey

等聚合操作时, 需要把

  1. Key

相同的

  1. Value

拉取到一起进行计算, 这个时候有可能这些

  1. Key

相同的

  1. Value

会坐落于不同的分区,因此需要进行

  1. Shuffle

对数据处理。

  1. Spark

  1. Shuffle

操作的特点:

  • 只有 Key-Value 型的 RDD 才会有 Shuffle 操作,但是有一个特例,就是 repartition 算子可以对任何数据类型 Shuffle
  • 早期版本 SparkShuffle 算法是 Hash base shuffle,后来改为 Sort base shuffle, 更适合大吞吐量的场景

7. Spark 运行模式

  1. Hadoop

  1. Mapreduce

类似,Spark 也有本地模式,和线上集群模式,不过不同的是,

  1. Spark

有自己的调度集群

  1. standalone

,并且支持

  1. Hadoop

  1. yarn

,一般情况下本地开发使用

  1. local

本地模式,生产环境可以使用

  1. standalone-HA

或者

  1. on yarn

二、Spark WordCount 演示

  1. WordCount

是大数据中的 和

  1. hello word

,前面在学习

  1. Hadopp Mapreduce

时,使用

  1. Mapreduce

的方式进行了实现,下面我们基于

  1. Spark

分别从

  1. Scala

语言、

  1. Java

语言、

  1. Python

语言进行实现,下面是

  1. Mapreduce

讲解时的实现文章:

https://blog.csdn.net/qq_43692950/article/details/127195121

下面我在本地

  1. D:/test/input

下,创建了一个

  1. txt

文件,内容如下:

  1. hello map reduce abc
  2. apple spark map
  3. reduce abc hello
  4. spark map

在这里插入图片描述

1. Scala 语言

  1. Spark

源码是使用

  1. Scala

语言开发的,因此使用

  1. Scala

开发是首选方案,如果对

  1. Scala

语言还不是很了解的,可以看下下面的教程学习下:

https://www.cainiaojc.com/scala/scala-tutorial.html

下面创建一个

  1. Maven

项目,在 pom 中加入

  1. scala

  1. spark

的依赖:

  1. <!--依赖Scala语言--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.11</version></dependency><!--SparkCore依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.1</version></dependency>

创建

  1. object
  1. WordCountScala

  1. object WordCountScala {def main(args: Array[String]):Unit={val conf =new SparkConf().setAppName("spark").setMaster("local[*]")val sc =new SparkContext(conf)
  2. sc.setLogLevel("WARN")//读取数据val textFile = sc.textFile("D:/test/wordcount/")//处理统计
  3. textFile.filter(StringUtils.isNotBlank)//过滤空内容.flatMap(_.split(" "))//根据空格拆分.map((_,1))// 构建减值,value 固定 1.reduceByKey(_ + _)// 同一个 key 下面的 value 相加.foreach(s => println(s._1 +" "+ s._2))}}

直接运行查看结果:

在这里插入图片描述

2. Java 语言

由于

  1. Java

  1. Scala

都是运行在

  1. JVM

之上的编程语言,这里可以直接在上面

  1. Scala

的项目中创建

  1. Java

类进行测试:

创建

  1. WordCountJava

测试类:

  1. publicclassWordCountJava{publicstaticvoidmain(String[] args){SparkConf conf =newSparkConf().setAppName("spark").setMaster("local[*]");JavaSparkContext sc =newJavaSparkContext(conf);
  2. sc.setLogLevel("WARN");//读取数据JavaRDD<String> textFile = sc.textFile("D:/test/wordcount/");//处理统计
  3. textFile.filter(StringUtils::isNoneBlank)//过滤空内容.flatMap(s ->Arrays.asList(s.split(" ")).iterator())//根据空格拆分.mapToPair(s ->newTuple2<>(s,1))// 构建减值,value 固定 1.reduceByKey(Integer::sum)// 同一个 key 下面的 value 相加.foreach(s->System.out.println(s._1 +" "+ s._2));}}

直接运行查看结果:
在这里插入图片描述

3. Python 语言

使用

  1. pyspark

前,先安装相关依赖:

  1. pip install pyspark
  2. pip install psutil
  3. pip install findspark

创建

  1. WordCountPy

测试脚本:

  1. from pyspark import SparkConf, SparkContext
  2. import findspark
  3. if __name__ =='__main__':
  4. findspark.init()
  5. conf = SparkConf().setAppName('spark').setMaster('local[*]')
  6. sc = SparkContext(conf=conf)
  7. sc.setLogLevel("WARN")# 读取数据
  8. textFile = sc.textFile("D:/test/wordcount/")# 处理统计
  9. textFile.filter(lambda s: s and s !='') \
  10. .flatMap(lambda s: s.split(" ")) \
  11. .map(lambda s:(s,1)) \
  12. .reduceByKey(lambda v1, v2: v1 + v2) \
  13. .foreach(lambda s:print(s[0]+" "+str(s[1])))

运行查看结果:

在这里插入图片描述

标签: spark scala java

本文转载自: https://blog.csdn.net/qq_43692950/article/details/128063998
版权归原作者 小毕超 所有, 如有侵权,请联系我们删除。

“Spark - 介绍及使用 Scala、Java、Python 三种语言演示”的评论:

还没有评论