0


Spark 的学习笔记

Spark 的学习笔记

文章目录

1. 概述

Apache Spark 是一个快速的,多用途的集群计算系统。它提供了 Java,Scala,Python 和 R 的高级 API,以及一个支持通用的执行图计算的优化过的引擎,它还支持一组丰富的高级工具,包括使用 SQL 处理结构化数据的 Spark SQL,用于机器学习的 MLlib,用于图计算的 GraphX,以及 Spark Streaming。

Spark官网下载https://spark.apache.org/downloads.html

Spark,是一种“One Stack to rule them all” 的大数据计算框架,期望用一个技术堆栈就完美地解决大数据领域的各种计算任务。Apache官方,对Spark的定义就是:通用的大数据快速处理引擎。

Spark 使用 Spark RDD 、Spark SQL、Spark Streaming,MLlib,GraphX 成功解决了大数据领域中,离线批处理、交互式查询、实时流计算、机器学习与图计算等最重要的任务和问题。

Spark 除了一站式的特点之外,另外一个最重要的特点,就是基于内存进行计算,从而让它的速度可以达到 MapReduce、Hive 的数倍甚至数十倍!

Spark 是一种通用的大数据计算框架,包含了大数据领域常见的各种计算框架:比如 Spark Core 用于离线计算,Spark SQL 用于交互式查询,Spark Streaming 用于实时流式计算,Spark MILlib 用于机器学习,Spark GraphX 用于图计算。

Spark 主要用于大数据的计算,而Hadoop以后主要用于大数据的存储(比如 HDFS、Hive,HBase等),以及资源调度(Yarn)。

Spark 优势及特点

(1)spark 计算速度块

Spark将每个任务构建成DAG进行计算,内部的计算过程通过弹性式分布式数据集RDD在内存上进行计算,相比于 hadoop 的mapreduce 效率提升了100 倍。

(2)易于使用

Spark 提供了大量的算子,开发只需调用相关api进行实现无法关注底层的实现原理。

相较于以前离线任务采用 mapreduce 实现,实时任务采用 storm 实现,目前这些都可以通过 spark 来实现,降低开发成本,同时 spark 通过Spark SQL 降低了用户的学习使用门槛,还提供了机器学习,图计算引擎等。

(3)支持多种的资源管理模式

学习使用中可以采用 local 模型进行任务的调试,在正式环境中又提供了 standalone,yarn 等模式,方便用户选择合适的资源管理模式进行适配。

(4)社区支持

spark 生态圈丰富,迭代更新块,成为大数据领域必备的计算引擎。

优秀的数据模型和丰富计算抽象

首先看看 MapReduce,它提供了对数据访问和计算的抽象,但是对于数据的复用就是简单的将中间数据写到一个稳定的文件系统中(例如 HDFS),所以会产生数据的复制备份,磁盘的 I/O 以及数据的序列化,所以在遇到需要在多个计算之间复用中间结果的操作时效率就会非常的低。而这类操作是非常常见的,例如迭代式计算,交互式数据挖掘,图计算等。

因此,AMPLab 提出了一个新的模型,叫做 RDD。

  • RDD 是一个可以容错且并行的数据结构(其实可以理解成分布式的集合,操作起来和操作本地集合一样简单),它可以让用户显式的将中间结果数据集保存在内存中,并且通过控制数据集的分区来达到数据存放处理最优化。同时 RDD 也提供了丰富的 API (map、reduce、filter、foreach、reduceByKey…)来操作数据集。

后来 RDD 被 AMPLab 在一个叫做 Spark 的框架中提供并开源。

Spark 生态圈

Spark 有完善的生态圈,如下:

  • Spark Core:实现了 Spark 的基本功能,包含 RDD 、任务调度、内存管理、错误恢复、与存储系统交互等模块。
  • Spark SQL:Spark 用来操作结构化数据的程序包。通过 Spark SQL,我们可以使用 SQL 操作数据。
  • Spark Streaming:Spark 提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API。
  • Spark MLlib:提供常见的及其学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。
  • GraphX(图计算):Spark 中用于图计算的API,性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂地图算法。
  • 集群管理器:Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。
  • Structured Streaming:处理结构化流,统一了离线和实时的API。

Spark 特点

  • :与 Hadoop 的 MapReduce 相比,Spark 基于内存的运算要快 100 倍以上,基于硬盘的运算也要快 10 倍以上。Spark 实现了高效的 DAG 执行引擎,可以通过基于内存来高效处理数据流。
  • 易用:Spark 支持 Java、Python、R 和 Scala 的 API,还支持超过 80 种高级算法,使用户可以快速构建不同的应用。而且 Spark 支持交互式的 Python 和 Scala 的shell,可以非常方便地在这些 shell 中使用 Spark 集群来验证解决问题地方法。
  • 通用:Spark 提供了统一地解决方案。Spark 可以用于批处理、交互式查询(Spark SQL )、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(Graph X),这些不同类型地处理都可以在同一个应用中无缝使用。
  • 兼容性:Spark 可以非常方便地与其他地开源产品进行融合。比如,Spark 可以使用 Hadoop 的 YARN 和 Apache Mesos 作为它的资源管理和调度器,并且可以处理所有 Hadoop 支持的数据,包括 HDFS、HBase 和 Cassandra等。这对于已经部署 Hadoop 集群的用户特别重要,因为不需要做任何数据迁移就可以使用 Spark 的强大处理能力。

Spark 与 Hadoop

尽管 Spark 相对于 Hadoop 而言具有较大优势,但 Spark 并不能完全替代 Hadoop,Spark 主要用于替代 Hadoop 中的 MapReduce 计算模型。存储依然可以使用HDFS,但是中间结果可以存放在内存中;调度可以使用 Spark 内置的,也可以使用更成熟的调度系统 YARN 等。
HadoopSpark类型分布式基础平台,包含计算,存储,调度分布式计算工具场景大规模数据集上的批处理迭代计算,交互式计算,流计算价格对机器要求低,便宜堆内存有要求,相对较贵编程范式Map+Reduce,API 较为底层,算法适应性差RDD组成 DAG 有向无环图,API 较为顶层,方便使用数据存储结构MapReduce 中间计算结果存在 HDFS 磁盘上,延迟大RDD 中间运算结果存在内存中,延迟小运行方式Task 以进程方式维护,任务启动慢Task 以线程方式维护,任务启动快
实际上,Spark 已经很好地融入了 Hadoop 生态圈,并成为其中的重要一员,它可以借助于 YARN 实现资源调度管理,借助于 HDFS 实现分布式存储。

此外,Hadoop 可以使用廉价的、异构的及其来做分布式存储与计算,但是,Spark 对硬件的要求稍高一些,对内存与 CPU 有一定的要求。

Spark与MR

MapReduce 能够完成的各种离线批处理功能,以及常见算法(比如 二次排序、topn等),基于Spark RDD 的核心编程,都可以实现,并且可以更好地、更容易地实现。而且基于 Spark RDD 编写地离线批处理程序,运行速度是MapReduce的数倍,速度上有非常明显的优势。

Spark 相较于 MapReduce 速度快的最主要原因就在于,MapReduce的计算模型太死板,必须是map-reduce模式,有时候即使完成一些诸如过滤之类的操作,也必须经过map-reduce 过程,这样就必须经过 shuffle过程。而 MapReduce 的 shuffle 过程是最消耗性能的,因为 shuffle 中间的过程必须基于磁盘来读写。而 Spark 的 shuffle 虽然也要基于磁盘,但是其大量transformation 操作,比如单纯的map或者filter等操作,可以直接基于内存进行pipeline操作,速度性能自然大大提升。

但是Spark也有其劣势。由于Spark基于内存进行计算,虽然开发容易,但是真正面对大数据的时候(比如一次操作针对10亿以上级别),在没有进行调优的情况下,可能会出现各种各样的问题,比如OOM内存溢出等等。导致Spark程序可能都无法完全运行起来,就报错挂掉了,而MapReduce即使是运行缓慢,但是至少可以慢慢运行完。

此外,Spark 由于是新崛起的技术新秀,因此在大数据领域的完善程度,肯定不如MapReduce,比如基于HBase、Hive 作为离线批处理程序的输入输出,Spark就远没有MapReduce来的完善。实现起来非常麻烦。

Spark Streaming与Storm

Spark Streaming 与 Storm 都可以用于进行实时流计算。但是他们两者的区别是非常大的。其中区别之一,就是 Spark Streaming 和 Storm 的计算模型完全不一样,Spark Streaming 是基于RDD的,因此需要将一小段时间内的,比如1秒内的数据,手机起来,作为一个 RDD,然后再针对这个 batch 的数据进行处理。而Storm 却可以做到每来一条数据,都可以立即进行处理和计算 。因此,Spark Streaming 实际上严格意义上来说,只能称作准实时的流计算框架;而Storm 是真正意义上的实时计算框架。

此外,Storm支持的一项高级特性,是 Spark Streaming 暂时不具备的,即 Storm 支持在分布式流式计算程序(Topolopy)在运行过程中,可以动态地调整并行度,从而动态提高并发处理能力。而Spark Streaming 是无法动态调整并行度的。

但是 Spark Streaming 也有其优点,首先 Spark Streaming由于是基于barch进行处理的,因此相较于 Storm 基于单挑数据进行处理,具有数倍甚至数十倍的吞吐量。

此外。Spark Streaming 由于也身处于Spark生态圈内,因此 Spark Streaming可以与 Spark Core、Spark SQL,甚至是 Spark Mllib.Spark GraphX 进行无缝整合。流式处理完的数据,可以立即进行各种 map、reduce 转换操作,可以立即使用sql进行查询,甚至可以立即使用macheine learning 或者图计算算法进行处理。这种一站式的大数据处理功能和优势,是 Storm 无法匹配的。

因此,综合上述来看,通常在对实时性要求特别高,而且实时数据量不稳定,比如在白天有高峰期的情况下,可以选择使用Storm。但是如果是对实时性要求一般,允许1秒的准实时处理,而且不要求动态调整并行度的话,选择Spark Streaming是更好的选择。

Spark SQL与Hive

Spark SQL实际上并不能完全替代Hive,因为Hive是一种基于HDFS的数据仓库,并且提供了基于SQL模型的,针对存储了大数据的数据仓库,进行分布式交互查询的查询引擎。

严格的来说,Spark SQL能够替代的,是Hive的查询引擎,而不是Hive本身,实际上即使在生产环境下,SparkSQL 也是针对Hive数据仓库中的数据进行查询,Spark 本身自己是不提供存储的,自然也不可能替代Hive作为数据仓库的这个功能。

Spark SQL 的一个优点,相较于Hive查询引擎来说,就是速度快,同样的SQL语句,可能使用Hive的查询引擎,由于其底层基于MapReduce,必须经过 shuffle 过程走磁盘,因此速度是非常缓慢的。很多复杂的SQL语句,在hive中执行都需要一个小时以上的时间。而 Spark SQL由于其底层基于自身内存的特点,因此速度达到了Hive查询引擎的数倍以上。

Spark 运行模式

① local 本地模式(单机)

  • 学习测试使用
  • 分为 local 单线程和 local-cluster 多线程。

② standalone 独立集群模式

  • 学习测试使用
  • 典型的 Mater/slave 模式。

③ standalone-HA 高可用模式

  • 生产环境使用
  • 基于 standalone 模式,使用 zk 搭建高可用,避免 Master 是有单点故障的。

④ on yarn 集群模式

  • 生产环境使用
  • 运行在 yarn 集群之上,由 yarn 负责资源管理,Spark 负责任务调度和计算。
  • 好处:计算资源按需伸缩,集群利用率高,共享底层存储,避免数据跨集群迁移。

⑤ on mesos 集群模式

  • 国内使用较少
  • 运行在 mesos 资源管理器框架之上,由 mesos 负责资源管理,Spark 负责任务调度和计算。

⑥ on cloud 集群模式

  • 中小公司未来会更多的使用云服务
  • 比如 AWS 的 EC2,使用这个模式能很方便的访问 Amazon 的 S3。

2. 快速入门

使用 Spark Shell 进行交互式分析

基础

Spark shell 提供了一种来学习该 API 比较简单的方式,以及一个强大的来分析数据交互的工具。在 Scala (运行于 Java 虚拟机之上,并能很好的调用已存在的 Java 类库)或者 Python 中它是可用的。通过在 Spark 目录中运行一下的命令来启动它:

./bin/spark-shell 

Spark 的主要抽象是一个称为 Dataset 的分布式的 item 集合。Datasets 可以从 Hadoop 的 InputFormats(例如 HDFS文件)或者 通过其他的 Datasets 转换来创建。让我们从 Spark 源目录中的 README 文件来创建一个新的 Dataset:

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

可以直接从 Dataset 中获取 values(值),通过调用一些 actions(动作),或者 transform(转换) Dataset 以获得一个新的。

cala> textFile.count()// Number of items in this Dataset
res0:Long=126// May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first()// First item in this Dataset
res1:String= # Apache Spark

可以 transform 这个 Dataset 以获得一个新的。通过调用

filter

以返回一个新的 Dataset,它是文件中的 items 的一个子集。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

可以链式操作 transformation(转换) 和 action(动作):

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

Dataset 上的更多操作

Dataset actions(操作)和 transformations(转换)可以用于更复杂的计算。

例如,统计出现次数最多的行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

第一个 map 操作创建一个新的 Dataset,将一行数据 map 为一个整型值。在 Dataset 上调用

reduce

来找到最大的行计数。参数

map

reduce

是 Scala 函数(closures),并且可以使用 Scala/Java 库的任何语言特性。例如,我们可以很容易地调用函数声明,我们将定义一个 max 函数来使代码更易于理解:

cala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一种常见地数据流模式是被 Hadoop 所推广的 MapReduce。Spark 可以很容易实现 MapReduce:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

缓存

Spark 还支持 Pulling(拉取)数据集到一个集群范围的内存缓存中。例如当查询一个小的 “hot” 数据集或运行一个像 PageRANK 这样的迭代算法时,在数据被重复访问时是非常高效的。举一个简单的例子,让我们标记我们的

linesWithSpark

数据集到缓存中:

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

使用 Spark 来探索和缓存一个 100 行的文本文件看起来比较愚蠢。有趣的是,即时在他们跨越几十或者几百个节点时,这些相同的函数也可以用于非常大的数据集。

独立的应用

假设我们希望使用 Spark API 来创建一个独立的应用程序。我们在 Scala(SBT),Java(Maven)和 Python 中联系一个简单应用程序。

我们将在 Scala 中创建一个非常简单的 Spark 应用程序 - 很简单的,事实上,它名为

SimpleApp.scala

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

注意,这个应用程序我们应该定义一个

main()

方法而不是去扩展

scala.App

。使用

scala.App

的子类可能不会正常运行。

该程序仅仅统计了 Spark README 文件中每一行包含 ‘a’ 的数量和包含 ‘b’ 的数量。注意,需要将 YOUR_SPARK_HOME 替换为 Spark 安装的位置。不想先前使用 spark shell 操作的示例,它们初始化了它们自己的 SparkContext,我们初始化了一个 SparkContext 作为应用程序的一部分。

我们调用

SparkSession.builder

以构造一个 [[SparkSession]],然后设置 application name (应用名称),最终调用

getOrCreate

以获得 [[SparkSession]] 实例。

我们的应用依赖了 Spark API,所以我们将包含一个名为

build.sbt

的 sbt 配置文件,它描述了 Spark 的依赖。该文件也会添加一个 Spark 依赖的 repository:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"

为了让 sbt 正常的运行,我们需要根据经典的目录解构来布局

SimpleApp.scala

build.sbt

文件。在成功后,我们可以创建一个包含应用程序代码的 JAR 包,然后使用

spark-submit

脚本来运行我们的程序。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23

这个例子使用 Maven 来编译成一个 jar 应用程序,其他的构建系统(如Ant、Gradle)也可以。

我们会创建一个非常简单的 Spark 应用,

SimpleApp.java

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read.textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

这个程序计算 Spark README 文档中包含字母 ‘a’ 和 字母 ‘b’ 的行数。注意把 YOUR_SPARK_HOME 修改成你的Spark 的安装目录。跟之前的 Spark shell 不同,我们需要初始化 SparkSession。

把Spark依赖添加到Maven的

pom.xml

文件里。注意Spark的artifacts 使用 Scala版本进行标记。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
  </dependencies>
</project>

我们按照Maven 经典的目录结构组织这些文件:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在我们用Maven打包这个应用,然后用

./bin/spark-submit

执行它。

# 打包包含应用程序的JAR
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# 用spark-submit来运行程序
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

3. Spark 编程指南

概述

在一个较高的概念上来说,每一个 Spark 应用程序由一个在集群上运行着用户的

main

函数和执行各种并行操作的 driver program (驱动程序)组成。

Spark 提供的主要抽象是一个

_弹性分布式数据集_

(RDD),它是可以执行并行操作且阔集群节点的元素的集合。RDD 可以从一个 Hadoop文件系统(或者任何其它 Hadoop 支持的文件系统),或者一个在 driver program(驱动程序)中已存在的 Scala 集合,以及通过 transforming(转换)来创建一个 RDD。用户为了让它在整个并行操作中更高效的启用,也许会让 Spark persist (持久化)一个 RDD 到内存中。最后,RDD 会自动地从节点故障中恢复。

在 Spark 中地第二个抽象是能够用于并行操作的 shared variables(共享变量),默认情况下,当 Spark 的一个函数作为一组不同节点上的任务运行时,它将每一个变量的副本应用到每一个任务的函数中去。有时候,一个变量需要在整个任务中,或者在任何和 driver program (驱动程序)之间来共享。Spark 支持两种类型的共享变量:broadcast variables(广播变量),它可以用于在所有节点上缓存一个值,和 accumulators(累加器),他是一个只能被 “added (增加)” 的变量,例如 counters 和 sums。

Spark 依赖

Spark 2.2.0 默认使用 Scala 2.11 来构建和发布直到运行。(当然,Spark 也可以与其它的 Scala 版本一起运行)。为了使用 Scala 编写应用程序,需要使用可兼容的 Scala 版本 (例如,2.11.X)。

要编写一个 Spark 的应用程序,您需要在 Spark 上添加一个 Maven 依赖。

Spark 可以通过 Maven 中央仓库获取:

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0 

此外,如果您像访问一个 HDFS 集群,则需要针对您的 HDFS 版本添加一个

hadoop-client

(hadoop 客户端) 依赖。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version> 

最好,需要导入一些 Spark classes (类) 到您的程序中去。添加下面几行:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在 Spark 1.3.0 之前,需要明确导入

org.apache.spark.SparkContext._

来启用必要的隐式转换。)

初始化 Spark

Spark 程序必须做的第一件事情是创建一个 SparkContext 对象,它会告诉Spark 如何访问集群。要创建一个

SparkContext

,首先需要构建一个包含应用程序的信息的 SparkConf 对象。

每一个 JVM 可能只能激活一个 SparkContext 对象。在创新一个新的对象之前,必须调用

stop()

该方法停止活跃的 SparkContext。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
appName

参数是一个在集群 UI 上展示应用程序的名称。

master

是一个 Spark,Mesos 或 YARN 的 cluster URL,或者指定为在 local mode (本地模式)中运行的 “local” 字符串。在实际工作中,当在集群上运行时,不希望在程序中将 master 给硬编码,而是用 使用

spark-submit

启动应用 并且接收它。然而,对于本地测试和单元测试,可以通过 “local” 来运行 Spark 进程。

使用 Shell

在 Spark Shell 中,一个特殊的 interpreter-aware(可用的解析器)SparkContext 已经为您创建好了,称之为

sc

的变量。创建您自己的 SparkContext 将不起作用。您可以使用

--master

参数设置这个 SparkContext 连接到哪一个 master 上,并且您可以通过

--jars

参数传递一个逗号分隔的列表来添加 JARs 到 classpath 中。也可以通过

--packages

参数应用一个用逗号分隔的 maven coordinates (maven 坐标)方式来添加依赖(例如,Spark 包)到您的 shell session 中去。任何额外存在且依赖的仓库(例如Sonatype)可以传递到

--repositories

参数。例如,要明确使用四个核 (CPU) 来运行

bin/spark-shell

,使用:

$ ./bin/spark-shell --master local[4]

或者,也可以添加

code.jar

到它的 classpath 中去,使用:

$ ./bin/spark-shell --master local[4] --jars code.jar

为了包含一个依赖,使用 Maven 坐标:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

弹性分布式数据集(RDDs)

Spark 主要以一个 弹性分布式数据集(RDD)的概念为中心,它是一个容错且可以执行并行操作的元素的集合。有两种方法可以创建 RDD:在你的 driver program (驱动程序)中 parallelizing 一个已存在的集合,或者在外部存储系统中引用一个数据集,例如,一个共享文件系统,HDFS,HBase,或者提供 Hadoop InputFormat 的任何数据源。

并行集合

可以在 driver program (a Scala

Seq

) 中已存在的集合上通过调用

SparkContext

parallelize

方法来创建并行集合。该集合的元素从一个可以并行操作的 distributed dataset (分布式数据集)中复制到另一个 dataset(数据集)中去。例如,这里是一个如何去创建一个保存数字 1~5 的并行集合。

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

在创建后,该 distributed dataset(分布式数据集)(

distData

)可以并行的执行操作。例如,我们可以调用

distData.reduce((a,b)=&gt;a+b)

来合计数组中的元素。

并行集合的一个重要参数是分区(partition),即这个分布式数据集可以分割为多少片。Spark 中每个任务(task)都是基于分区的,每个分区一个对应的任务(task)。典型场景

下,一般每个CPU对应2~4个分区。并且一般而言,Spark会基于集群的情况,自动设置这个分区数。当然,还是可以手动控制这个分区数,只需给parallelize 方法再传一个参数即可(如:sc.parallelize(data, 10))。注意:Spark 代码里有些地方仍然使用分片(slice)这个术语,这只不过是分区的一个别名,主要为了保持向后兼容。

外部数据集

Spark 可以通过 Hadoop 所支持的任何数据源来创建分布式数据集,包括:本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark 支持的文件格式包括:文本文件(text files)、SequenceFiles,以及其他 Hadoop 支持的输入格式(InputFormat)。

文本文件创建 RDD 可以用 SparkContext.textFile 方法。这个方法输入参数是一个文件的 URI(本地路径,或者 hdfs://,s3n://等),其输出RDD是一个文本行集合。如下是一个简单示例:

scala> val distFile = sc.textFile(“data.txt”) distFile: RDD[String] = MappedRDD@1d4cee08

创建后,distFile 就可以执行数据集的一些操作。比如,我们可以把所有文本行的长度加和:distFile.map(s => s.length).reduce((a,b) => a + b)

以下是一些 Spark 读取文件的要点:

  • 如果是本地文件系统,那么这个文件必须在所有的 worker 节点上能够以相同的路径访问到。所以要么把文件复制到所有 worker 节点上同一路径下,要么挂载一个共享文件系统。
  • 所有 Spark 基于文件输入的方法 (包括 textFile)都支持输入参数为:目录,压缩文件,以及通配符。例如:textFile(“/my/directory”),textFile(“/my/directory/.txt”),以及textFile(“/my/directory/.gz”)
  • textFile 方法同时还支持一个可选参数,用以控制数据的分区个数。默认地,Spark会为文件的每一个 block 创建一个分区(HDFS 上默认 block 大小为 64MB),你可以通过调整这个参数来控制数据的分区数。注意,分区数不能少于 block 个数。除了文本文件之外,Spark 的 Scala API 还支持其他几种数据格式。
  • SparkContext.wholeTextFiles 可以读取一个包含很多小文本文件的目录,并且以(filename,content)键值对的形式返回结果,这与 textFile 不同,textFile只返回文件的内容,每行作为一个元素。
  • 对于 SequenceFiles,可以调用 SparkContext.sequenceFile[K,V],其中 K 和 V 分别是文件中 key 和 value 的类型。这些类型都应该是 Writable 接口的子类,如:IntWritable and Text等。另外,Spark 允许你为一些常用 Writable 指定原生类型,例如:sequenceFile[Int,String] 将自动读取 IntWritable 和 Text。
  • 对于其他的 Hadoop InputFormat,你可以用 SparkContext,hadoopRDD 方法,并传入任意的 JobConf 对象和 InputFormat,以及 key class、value class。这和设置 Hadoop job 的输入源是同样的方法。你还可以使用 SparkContext.nextAPIHadoopRDD,该方法接收一个基于新版 Hadoop MapReduce API(org.apache.hadoop.mapreduce) 的 InputFormat 作为参数。
  • RDD.saveAsObjectFile 和 SparkContext.objectFile 支持将 RDD 中元素以 Java 对象序列化的格式保存成文件。虽然这种序列化方式不如 Avro 效率高,却为保存 RDD提供了一种简便方式。

RDD 算子

RDD 支持两种类型的算子:transformation 和 action。transformation 算子可以将已有 RDD转换得到一个新的RDD,而action算子则是基于RDD的计算,并将结果返回给驱动器(driver)。例如,map 是一个 transformation 算子,它将数据集中每个元素传给一个指定的函数,并将该函数返回结果构建为一个新的RDD;而 reduce 是一个 action 算子,它可以将 RDD 中所有元素传给指定的聚合函数,并将最终的聚合结果返回给驱动器(还有一个 reduceByKey 算子,其结果的聚合结果是一个RDD)。

Spark 中所有 transformation 算子都是懒惰的,也就是说,transformation 算子并不立即计算结果,而是记录下对基础数据集(如:一个数据文件)的转换操作。只有等到某个action 算子所记录的操作才会被计算。这种设计使 Spark 可以运行得更加高效-例如,map s算子创建了一个数据集,同时该数据集下一步会调用reduce算子,那么Spark将只会返回 reduce的最终聚合结果(单独的一个数据)给驱动器,而不是将map 所产生的数据集整个返回给驱动器。

默认情况下,每次调用action 算子的时候,每个由 transformation 转换得到的RDD都会被重新计算。然而,你也可以通过调用persist(或者cache)操作来持久化一个RDD,这意味着 Spark 将会把RDD的元素都保存在集群中,因此下一次访问这些元素的速度将大大提高。同时,Spark 还支持将RDD元素持久化到内存或者磁盘上,甚至可以支持跨节点多副本。

4. Spark 基本原理

(1)Spark Core

Spark Core 是Spark的核心,其包含如下几个部分:

在这里插入图片描述

1、spark 基础配置

SparkContext是Spark应用程序的入口,spark应用程序的提交和执行离不开sparkContext,它隐藏了网络通信,分布式部署,消息通信,存储体系,计算存储等,开发人员只需要通过sparkContext等api进行开发即可。

sparkRpc 基于netty实现,分为异步和同步两种方式。事件总线主要用于sparkContext组件间的交换,它属于监听者模式,采用异步调用。度量系统主要用于系统的运行监控。

2、spark 存储系统

它用于管理spark运行中依赖的数据存储方式和存储位置,spark的存储系统优先考虑在各节点以内存的方式存储数据,内存不足时将数据写入磁盘中,这也是spark计算性能高的重要原因。

我们可以灵活的控制数据存储在内存还是磁盘中,同时可以通过远程网络调用将结果输出到远程存储中,比如hdfs,hbase等。

3、spark 调度系统

spark 调度系统主要由DAGScheduler和TaskScheduler组成。

DAGScheduler 主要是把一个Job根据RDD间的依赖关系,划分为多个Stage,对于划分后的每个Stage都抽象为一个或多个Task组成的任务集,并交给TaskScheduler来进行进一步的任务调度。而TaskScheduler负责对每个具体的Task进行调度。

具体调度算法有FIFO,FAIR:

  • FIFO调度:先进先出,这是Spark默认的调度模式。
  • FAIR调度:支持将作业分组到池中,并为每个池设置不同的调度权重,任务可以按照权重来决定执行顺序。

(2)Spark SQL

spark sql 提供了基于sql的数据处理方法,使得分布式的数据集处理变的更加简单,这也是spark 广泛使用的重要原因。

目前大数据相关计算引擎一个重要的评价指标就是:是否支持sql,这样才会降低使用者的门槛。spark sql 提供了两种抽象的数据集合 DataFrame和DataSet。

DataFrame 是 spark sql 对结构化数据的抽象,可以简单的理解为 spark中的表,相比较于RDD多了数据的表结构信息(schema).DataFrame = Data + schema

RDD是分布式对象集合,DataFrame是分布式Row的集合,提供了比RDD更丰富的算子,同时提升了数据的执行效率。

DataSet 是数据的分布式集合,它具有RDD强类型的优点 和Spark SQL 优化后执行的优点。DataSet 可以由jvm 对象构建,然后使用map,filter,flatmap 等操作函数操作。

(3)Spark Streaming

Spark Streaming 是一个基于 Spark Core 之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的处理,具有高吞吐量和容错能力强等特点。

在这里插入图片描述

这个模块主要是对流数据的处理,支持流数据的可伸缩和容错处理,可以与Flume和Kafka等已建立的数据源集成。Spark Streaming的实现,也使用RDD抽象的概念,使得在为流数据编写应用程序时更为方便。

Sparking Streaming 的特点

  • 易用:可以像编写离线批处理一样去编写流式程序,支持 java/scala/python 语言。
  • 容错:SparkingStreaming 在没有额外代码和配置的情况下可以恢复丢失的工作。
  • 易整合到 Spark 体系:流式处理与批处理和交互式查询相结合。

整体流程

  • ① Spark Streaming 中,会有一个接收器组件 Receiver,作为一个长期运行的 task 泡在一个 Executor 上,Receiver 接收外部的数据流形成 input DStream。
  • ② DStream 会被按照时间间隔划分成一批一批的RDD,当批处理间隔缩短到秒级时,便可以用于处理实时数据流(时间间隔的大小可以由参数指定,一般设在500毫秒到几秒之间)。
  • ③ 对 DStream 进行操作就是对 RDD 进行操作,计算处理的结果可以传给外部系统。
  • ④ 接受到实时数据后,给数据分批次,然后传给 Spark Engine 处理最后生成该批次的结果。

数据抽象

Spark Streaming 的基础抽象是 DStream(Discretized Stream,离散化数据流,连续不断的数据流),代表持续性的数据流和经过各种 Spark 算子操作后的结果数据流。

可以从以下多个角度深入理解 DStream:

① DStream 本质上就是一系列时间上连续的 RDD:

在这里插入图片描述

② 对 DStream 的数据的进行操作也是按照 RDD为单位来进行的:

在这里插入图片描述

③ 容错性,底层 RDD 之间存在依赖关系,DStream 直接也有依赖关系,RDD 具有容错性,那么 DStream 也具有容错性。

④ 准实时性/近实时性

  • Spark Streaming 将流式计算分解成多个 Spark Job,对于每一时间段数据的处理都会经过 Spark DAG 图分解以及 Spark 的任务集的调度过程。
  • 对于目前版本的 Spark Streaming 而言,其最小的 Batch Size 的选取在 0.5~5 秒钟之间。

所以 Spark Streaming 能够满足流式准实时计算场景,对实时性要求非常高的如高频实时交易场景则不太适合。

总结:简单来说 DStream 就是对 RDD 的封装,你对 DStream 进行操作,就是对 RDD 进行操作。对于 DataFrame /DataSet/DStream 来说本质上都可以理解成 RDD。

(4)Spark 基本工作原理

Spark基本工作原理的理解,其最主要的是要搞清楚什么是RDD以及RDD的特性。深刻理解了RDD的特性,也就理解了数据在spark中是如何被处理的(spark的基本工作原理)

那么RDD是什么,官方说法:

RDD是Spark提供的核心抽象,全称为 Resillient Distributed Dataset,即弹性分布式数据集。

最简单的理解:

RDD就是源数据的抽象,或者叫映射,代表。也就是说,数据要被spark进行处理,在处理之前的首要任务就是要将数据映射成RDD,对于spark来说,RDD才是我们处理数据的规则,我只认RDD,只有RDD,通过spark的计算引擎,才能发挥巨大的威力!

1、分布式数据集

RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。

RDD在抽象上来说是一种元素集合,包含了数据。它是被区分的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。

在这里插入图片描述

2、弹性

RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。

在这里插入图片描述

3、迭代式处理

在这里插入图片描述

对节点1、2、3、4上的数据进行处理完成之后,可能会移动到其他的节点内存中继续处理!Spark与Mr最大的不同在于迭代式计算模型:MR分为两个阶段。map 和 reduce,两个阶段处理完了就结束了,所以我们在一个job中能做的处理很有限,只能在map和reduce中处理;而spark计算过程可以分为n个阶段,因为他是内存迭代式的,我们在处理完一个阶段之后,可以继续往下处理很多阶段,而不是两个阶段。所以Spark相较于MR,计算模型可以提供更强大的功能。

4、容错性

RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来,即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。

在这里插入图片描述

5. Spark 运行模式及集群角色

(1)Spark 运行模式

运行模式运行类型说明local本地模式常用于本地开发测试,分为local单线程和local-cluster多线程模式standalone集群模式独立模式,在spark自己的资源调度管理框架上运行,该框架采用master/salve结构yarn集群模式在yarn资源管理器框架上运行,由yarn负责资源管理,spark负责任务调度和计算mesos集群模式在mesos资源管理器框架上运行,由mesos负责资源管理,spark负责任务调度和计算k8s集群模式在k8s上运行

(2)Spark集群角色

下图是Spark的集群角色图,主要由集群管理节点cluster manager,工作节点worker,执行器executor,驱动器driver和应用程序application 五部分组成,下面详细说明每部分的特点。

在这里插入图片描述

1、Cluster Manager

集群管理器,它存在于Master进程中,主要用来对应用程序申请的资源进行管理,根据其部署模式的不同,可以分为local,standalone,yarn,mesos等模式。

2、worker

worker是spark的工作节点,用于执行任务的提交,主要工作职责有下面四点:

  • worker节点通过注册机向cluster manager汇报自身的cpu内存等信息。
  • worker 节点在spark master作用下创建并启用executor,executor是真正的计算单元。
  • spark master将任务Task分配给worker节点上的executor并执行运用。
  • worker节点同步资源信息和executor状态信息给cluster manager。

在这里插入图片描述

在 yarn 模式下运行 worker 系欸但一般指的是Nodemanager节点,standalone模式下运行一般指的是slave节点。

3、executor

executor 是真正执行计算任务的组件,它是application运行在worker上的一个进程。这个进程负责Task的运行,它能够将数据保存在内存或磁盘存储中,也能够将结果数据返回给Driver。

4、Application

application 是Spark API 编程的应用程序,它包括实现Driver功能的代码和在程序中各个executor上要执行的代码,一个application有多个job组成。其中应用程序的入口为用户所定义的main方法。

5、Driver

驱动器节点,它是一个运行Application中main函数并创建SparkContext的进程。application通过Driver 和 Cluster Manager 及 executor 进行通讯。它可以运行在application 节点上,也可以由application 提交给 Cluster Manager,再由Cluster Manager 安排worker 进行运行。

Driver 节点也负责提交Job,并将Job转化为Task,在各个Executor进程间协调Task的调度。

6、sparkContext

sparkContext是整个Spark应用程序最关键的一个对象,是Spark所有功能的主要入口点。核心作用是初始化spark影城程序所需要的组件,同时还负责向master程序进行注册等。

(3)Spark其他核心概念

1、RDD

它是Spark中最重要的一个概念,是弹性分布式数据集,是一种容错的、可以被并行操作的元素集合,是Spark对所有数据处理的一种基本抽象。可以通过一系列的算子对rdd进行操作,主要分为 Transformation 和 Action 两种操作。

  • Transformation(转换):是对已有的RDD进行换行生成新的RDD,对于转换过程采用惰性计算机制,不会立即计算出结果。常用的方法有map,filter,flatmap等。
  • Action(执行):对数据执行计算产生结果,并将结果返回Driver或者写入到外部存储中。常用到方法有reduce,collect,saveAsTextFile等。

在这里插入图片描述

2、DAG

DAG是一个有向无环图,在Spark中,使用 DAG 来描述我们的计算逻辑。主要分为 DAG Scheduler 和 Task Scheduler。

在这里插入图片描述

3、DAG Scheduler

DAG Scheduler 是面向stage的高层级的调度器,DAG Scheduler把DAG拆分为多个Task,每组Task都是一个stage,解析时是以shuffle为边界进行反向构建的,每当遇见一个shuffle,spark就会产生一个新的stage,接着以TaskSet的形式提交给底层的调度器(task scheduler),每个 Stage 封装成一个TaskSet。DAG Scheduler 需要记录RDD被存入磁盘物化等动作,同时会需要Task寻找最优等调度逻辑,以及监控因shuffle跨节点输出导致的失败。

在这里插入图片描述

4、Task Scheduler

Task Scheduler 负责每一个具体任务的执行。它的主要职责包括

  • 任务集的调度管理
  • 状态结果跟踪
  • 物理资源调度管理
  • 任务执行
  • 获取结果

5、Job

Job 是有多个 stage构建的并行的计算任务,job 是由spark 的action 操作来触发的,在spark 中一个job包含多个RDD以及作用在RDD的各种操作算子。

在这里插入图片描述

6、stage

DAG Scheduler会把 DAG切割成多个相互依赖的 Stage,划分Stage的一个依据是RDD间的宽窄依赖。

在对Job中的所有操作划分Stage时,一般会按照倒序进行,即从Action开始,遇到窄依赖操作,则划分到同一个执行阶段,遇到宽依赖操作,则划分一个新的执行阶段,且新的极端为之前阶段的parent,然后依次类推递归执行。

child Stage 需要等待所有的 parent Stage执行完之后才可以执行,这时Stage之间根据依赖关系构成了一个大粒度的DAG。在一个Stage内,所有的操作以串行的Pipeline的方式,由一组Task完成计算。

7、TaskSet Task

TaskSet 可以理解为一种任务,对应一个 stage,是Task 组成的任务集。一个TaskSet中的所有Task没有shuffle依赖可以并行计算。

Task是spark中最独立的计算单元,由Driver manager发送到executer执行,通常情况一个task处理spark RDD一个partition。Task分为ShuffleMap Task和ResultTask两种,位于最后一个Stage的Task为ResultTask,其他阶段的属于ShuffleMapTask。

6. Spark作业运行流程

(1)Spark作业运行流程

spark应用程序以进程集合为单位在分布式集群上运行,通过driver程序的main方法创建sparkContext的对象与集群进行交互。具体运行流程如下:

  • sparkContext 向cluster Manager申请CPU,内存等计算资源。
  • cluster Manager分配应用程序执行所需要的资源,在worker节点创建executor。
  • sparkContext将程序代码和task任务发送到executor上进行执行,代码可以是编译成的jar包或者python文件等。接着sparkContext会手机结果到Driver端。

在这里插入图片描述

(2)Spark RDD迭代过程

  • sparkContext创建RDD对象,计算RDD间的依赖关系,并组成一个DAG有向无环图。
  • DAGScheduler将DAG划分为多个stage,并将stage对应的TaskSet提交到集群的管理中心,stage的划分依据是RDD中的宽窄依赖,Spark遇见宽依赖就会划分为一个stage,每个stage中包含来一个或多个task任务,避免多个stage之间消息传递产生的系统开销。
  • taskScheduler 通过集群管理中心为每一个task申请资源并将task提交到worker的节点上进行执行。
  • worker 上的executor执行具体的任务。

在这里插入图片描述

(3)Yarn资源管理器介绍

spark 程序一般是运行在集群上的,spark on yarn 是工作或生产上用的非常多的一种运行模式。

没有yarn模式前,每个分布式框架都要跑在一个集群上面,比如说Hadoop要泡在一个集群上,Spark用集群的时候泡在standalone上。这样的话整个集群的资源的利用率低,且管理起来比较麻烦。

yarn是分布式资源管理和任务管理,主要由ResourceManager,Nodemanager和ApplicationMaster三个模块组成。

在这里插入图片描述

ResourceManager 主要负责集群的资源管理,监控和分配。对于所有的应用它有绝对的控制权和资源管理权限。

NodeManager 负责节点的维护,执行和监控task运行状况。会通过心跳的方式向Resourcemanager汇报自己的资源使用情况。

yarn资源管理器的每个节点都运行着一个NodeManager,是ResourceManager的代理。如果主节点的ResourceManager宕机后,会连接ResourceManager的备用节点。

ApplicationMaster负责具体应用程序的调度和资源的协调,它会与ResourceManager协商进行资源申请。ResourceManager以container容器的形式将资源分配给application进行运行。同时负责任务的启停。

container 是资源的抽象,它封装着每个节点上的资源信息(cpu,内存,磁盘,网络等),yarn将任务分配到container上运行,同时该任务只能使用container描述的资源,达到各个任务间资源的隔离。

(4)Spark程序在Yarn上执行流程

spark on yarn 分为两种模式yarn-client模式,和yarn-cluster模式,一般线上采用的是yarn-cluster模式。

1、yarn-client模式

driver在客户端本地执行,这种模式可以使得spark application和客户都但进行交互,因为driver在客户端可以通过webUI访问driver的状态。同时Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加。

2、yarn-cluster模式

Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台Nodemanage中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。

下图是yarn-cluster运行模式:

在这里插入图片描述

  • client 向yarn 提交应用程序,包含ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等。
  • ApplicationMaster 程序启动ApplicationMaster的命令、需要在Executor中运行的程序等。
  • ApplicationMaster 向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态。
  • ApplicationMaster申请到资源(也就是Container)后,便与对应的Nodemanager通信,启动Task。
  • Task向Applicationmaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。
  • 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。

7. Spark SQL概述及特点详解

(1)概念

它主要用于结构化数据处理和对Spark数据执行类SQL的查询。通过Spark SQL,可以针对不同格式的数据执行ETL操作(如JSON,Parquet,数据库)然后完成特定的查询操作。一般来说,Spark没支持一种新的应用开发,都会引入一个新的Context及相应的RDD,对于SQL这一特性来说,引入的就是SQLContext和SchemaRDD。注意:在Spark1.3之后,Schema RDD已经更名为DataFrame,但它本质就类似一个RDD,因为可以将DataFrame无缝的转换成一个RDD。

  • Hive 是将 SQL 转为 MapReduce。
  • SparkSQL 可以理解成是将 SQL 解析成:“RDD + 优化” 再执行

数据分类

数据分为如下几类:
定义特点举例结构化数据有固定的 Schema有预定义的 Schema关系型数据库的表半结构化数据没有固定的 Schema,但是有结构没有固定的 Schema,有结构信息,数据一般是自描述的指一些有结构的文件格式,例如JSON非结构化数据没有固定 Schema,也没有结构没有固定 Schema,也没有结构指图片/音频之类的格式
总结

  • RDD 主要用于处理非结构化数据、板结构化数据、结构化;
  • SparkSQL 是一个既支持 SQL 又支持命令式数据处理的工具;
  • SparkSQL 主要用于处理结构化数据(较为规范的半结构化数据也可以处理)。

(2)Spark SQL 的特点

  • 提供标准化的SQL支持和子查询支持
  • 支持多种数据源:Hive、RDD、Parquet、JSON、JDBC等。
  • 多种性能优化技术:in memory columnar storage、byte code generation、cost model动态评估等。
  • 组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户都可以自己重新开发,并且动态扩展。

1、内存列存储(in-memory columnar storage)

内存列存储意味着,Spark SQL的数据,不是使用Java对象的方式来进行存储,而是使用面向列的内存存储的方式来进行存储。也就是说,每一列,作为一个数据存储的单位。从而大大优化了内存使用的效率。采用了内存列存储之后,减少了对内存的消耗,也就避免了gc大量数据的性能开销。

2、字节码生成技术(byte code generation)

Spark SQL 在其catalyst模块的expressions中增加了codegen模块,对于SQL语句中的计算表达式,比如select num + num from t 这种的sql,就可以使用动态字节码生成技术来优化其性能。

3、Scala代码编写的优化

对于Scala代码编写中,可能会造成较大性能开销的地方,自己重写,使用更加复杂的方式,来获取更好的性能。比如Option样例类、for循环、map/filter/foreach等高阶函数,以及不可变对象,都改成了用null、while循环等来实现,并且重用可变的对象。

在这里插入图片描述

(3)Spark SQL 数据抽象

DataFrame 和 DataSet

Spark SQL数据抽象可以分为两类:

① DataFrame:DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格,带有 Schema 元信息(可以理解为数据库的列名和类型)。DataFrame = RDD + 泛型 + SQL的操作 + 优化

② DataSet:DataSet 是 DataFrame的进一步发展,它比 RDD 保存了更多的描述信息,概念上等同于关系型数据库中的二维表,它保存了类型信息,是强类型的。提供了编译时类型检查。调用 Dataset 的方法先会生成逻辑计划,然后被 spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行!DataFrame = Dataset[Row]

RDD、DataFrame、DataSet 的关系如下:

在这里插入图片描述

  • RDD[Person]:以 Person 为类型参数,但不了解其内部结构。
  • DataFrame:提供了详细的结构信息 schema 列的名称和类型。这样看起来就像一张表了。
  • DataSet[Person]:不光有 schema 信息,还有类型信息。

总结

  • DataFrame = RDD - 泛型 + Schema + SQL + 优化
  • DataSet = DataFrame + 泛型
  • DataSet = RDD + Schema + SQL + 优化

两种查询风格:DSL 和 SQL

DSL风格示例:

personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show

SQL 风格示例:

spark.sql("select * from t_person").show

总结

  • DataFrame 和 DataSet 都可以通过 RDD 来进行创建;
  • 也可以通过读取普通文本创建-注意:直接读取没有完整的约束,需要通过 RDD + Schema;
  • 通过 json/parquet 会有完整的约束;
  • 不管是 DataFrame 还是 DataSet 都可以注册成表,之后就可以使用 SQL 进行查询了!也可以使用 DSL!

(4)Spark SQL 与 Hive 的区别

什么是Spark SQL

  • Spark SQL 是Spark中准们用来处理结构化数据(每一行数据都遵循Schema信息建表时表的字段及其类型)的一个模块
  • 提供了 DataFrame/Dataset 的对分布式数据处理的基本抽象
  • 其实之上是一个分布式的 SQL 引擎

什么是 Hive

  • 数据仓库,能使用 SQL 读取、写入和管理存在于分布式存储架构上的大数据集
  • 结构可以映射到已经存储的数据上
  • 用户连接 Hive 可以使用命令行工具和 JDBC 驱动

两者的区别

都支持ThriftServer服务,为JDBC提供解决方案,区别如下:

**Spark SQL **

  • 是Spark的一个库文件
  • Spark SQL 元数据可有可无
  • Spark SQL 中 schema 是自动推断的
  • 支持标准 SQL 语句,也支持 HQL 语句等
  • 从开发角度来讲,即支持SQL方式开发,也支持HQL开发,还支持函数式编程(DSL)实现SQL语句

Hive

  • 是一个框架
  • Hive中必须有元数据,一般由 MySql 管理,必须开启 metastore 服务
  • Hive 中在建表时必须明确使用 DDL 声明 schema
  • 只支持 HQL 语句

Hive:处理海量数据,比如一个月、一个季度、一年的数据量,依然可以处理,虽然很慢;

Spark SQL:这种情况下 Spark SQL 不支持,无法处理;

8. Spark Core

RDD 详解

RDD 概念

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

RDD 单词拆解:

  • Resilient:它是弹性的,RDD 里面的数据可以保存在内存中或者磁盘里面;
  • Distributed:它里面的元素是分布式存储的,可以用于分布式计算;
  • Dataset:它是一个集合,可以存放很多元素。

RDD属性

RDD 的源码描述如下:

在这里插入图片描述

其含义如下:

  • A list of partitions:一组分片(Partition)/一个分区(Partition)列表,即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,分片数决定并行度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。
  • A function for computing each split:一个函数会被作用在每一个分区。Spark 中 RDD 的计算是以分区为单位的,compute 函数会被作用到每个分区上。
  • A list of dependencies on other RDDs:一个 RDD 会依赖于其他多个 RDD。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。(Spark 的容错机制)
  • **Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)**:可选项,对于 KV 类型的 RDD 会有一个 Partitioner,即 RDD 的分区函数,默认为 HashPartitioner。
  • **Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)**:可选项,一个列表,存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在任务调度的时候,会尽可能选择那些存有数据的 worker 节点来进行任务计算。

总结:RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算,主要属性包括:

  • 分区列表
  • 计算函数
  • 依赖关系
  • 分区函数(默认是 hash)
  • 最佳位置

分区列表、分区函数、最佳位置,这三个属性其实说的就是数据集在哪,在哪计算更合适,如何分区;

计算函数、依赖关系,这两个属性其实说的是数据集怎么来的。

RDD API

RDD的创建方式

① 由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase等

val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

② 通过已有的 RDD 经过算子转换生成新的 RDD:

val rdd2=rdd1.flatMap(_.split(" "))

③ 由一个已经存在的 Scala集合创建:

val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

makeRDD 方法底层调用了 parallelize 方法:

在这里插入图片描述

RDD 算子

RDD 的算子分为两类:

  • Transformation 转换操作:返回一个新的 RDD
  • Action 动态操作:返回值不是 RDD(无返回值或返回其他的)

注意:

  1. RDD 不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数)。
  2. RDD 中的所有准换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给 Driver 的 Action 动作时,这些转换才会真正运行。
  3. 之所以使用惰性求值/延迟执行,是因为这样可以在 Action 时对 RDD 操作形成 DAG 有向无环图进行 Stage的划分和并行优化,这种设计让 Spark 更加有效率地运行。

Transformation 转换算子:
转换算子含义map(func)返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成filter(func)返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成flatMap(func)类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素)mapPartitions(func)类似于 map,但独立地在 RDD的每一个分片上运行,因此在类型为 T 的 RDD 上运行时, func 的函数类型必须时 Iterator[T] => Iterator[U]mapPartitionsWithIndex(func)类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时, func 的函数类型必须是(Int,Iterator[T]) => Iterator[U]sample(withReplacement, fraction, seed)根据 fraction 指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed 用于指定随机数生成器种子union(otherDataset)对源 RDD 和参数 RDD 求并集后返回一个新的 RDDintersection(otherDataset)对源 RDD 和参数 RDD 求交集后返回一个新的 RDDdistinct([numTasks]))对源 RDD 进行去重后返回一个新的 RDDgroupByKey([numTasks])在一个(K,V)的 RDD 上调用,返回一个(K, Iterator[V])的 RDDreduceByKey(func, [numTasks])在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])对 PairRDD 中相同的 Key 值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和 aggregate 函数类似,aggregateByKey 返回值的类型不需要和 RDD 中 value 的类型一致sortByKey([ascending], [numTasks])在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDDsortBy(func,[ascending], [numTasks])与 sortByKey 类似,但是更灵活join(otherDataset, [numTasks])在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的 RDDcogroup(otherDataset, [numTasks])在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDDcartesian(otherDataset)笛卡尔积pipe(command, [envVars])对 rdd 进行管道操作coalesce(numPartitions)减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作repartition(numPartitions)重新给 RDD 分区
Action 动作算子:
动作算子含义reduce(func)通过 func 函数聚集 RDD 中的所有元素,这个功能必须时可交换且可并联的collect()在驱动程序中,以数组的形式返回数据集的所有元素count()返回 RDD 的元素个数first()返回 RDD 的第一个元素(类似于 take(1))take(n)返回一个由数据集的前 n 个元素组成的数组takeSample(withReplacement,num, [seed])返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定随机数生成器种子takeOrdered(n, [ordering])返回自然顺序或者自定义顺序的前 n 个元素saveAsTextFile(path)将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本saveAsSequenceFile(path)将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统saveAsObjectFile(path)将数据集的元素,以 Java 序列化的方式保存到指定的目录下countByKey()针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数foreach(func)在数据集的每一个元素上,运行函数 func 进行更新foreachPartition(func)在数据集的每一个分区上,运行函数 func
统计操作
算子含义count个数mean均值sum求和max最大值min最小值variance方差sampleVariance从采样中计算方差stdev标准差:衡量数据的离散程度sampleStdev采用的标准差stats查看统计结果

RDD 持久化/缓存

某些 RDD 的计算或转换可能会比较耗费时间,如果 这些 RDD 后续还会频繁的被使用到,那么可以将这些 RDD 进行持久化/缓存:

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了

persist 方法和 cache 方法

RDD 通过 persist 或 cache 方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

通过查找 RDD 的源码发现 cache 最终也是调用了 persist 无参方法(默认存储只存在内存中):

在这里插入图片描述

存储级别

默认的存储级别都是仅在内存存储一份,Spark 的存储级别还有好多种,存储级别在 object StorageLevel 中定义的。
持久化级别说明MORY_ONLY(默认)将 RDD 以非序列化的 Java 对象存储在 JVM 中。如果没有足够的内存存储 RDD,则某些分区将不会被缓存,每次需要时都会重新计算。这是默认级别MORY_AND_DISK(开发中可以使用这个)将 RDD 以非序列化的 Java 对象存储在 JVM 中。如果数据在内存中放不下,则溢写到磁盘上,需要时则会从磁盘上读取MEMORY_ONLY_SER(Java and Scala)将 RDD 以序列化的 Java对象(每个分区一个字节数组)的方式存储,这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的 CPUMEMORY_AND_DISK_SER(Java and Scala)与 MEMORY_ONLY_SER 类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们DISK_ONLY将 RDD 分区存储在磁盘上MEMORY_ONLY_2,MEMORY_AND_DISK_2等与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上OFF_HEAP(实验中)与 MEMORY_ONLY_SER 类似,但将数据存储在堆外内存中。(即不是直接存储在 JVM 内存中)
总结

  • RDD 持久化/缓存的目的是为了提高后续操作的速度
  • 缓存的级别有很多,默认只存在内存中,开发中使用 memory_and_disk
  • 只有执行 action 操作的时候才会真正将 RDD数据进行持久化/缓存
  • 实际开发中如果某一个 RDD 后续会被频繁的使用,可以将该RDD进行持久化/缓存

RDD容错机制Checkpoint

持久化的局限

  • 持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。

问题解决

  • Checkpoint 的产生就是为了更加可靠的数据持久化,在 Checkpoint 的时候一般把数据放在 HDFS 上,这就天然的借助了 HDFS 天生的高容错、高可靠来实现数据最大程度上的安全,实现了 RDD 的容错和高可用。

用法如下

SparkContext.setCheckpointDir("目录") //HDFS的目录

RDD.checkpoint

总结

  • 开发中如何保证数据的安全性及读取效率:可以对频繁使用且重要的数据,先做缓存/持久化,再做checkpoint 操作。

持久化和 Checkpoint 的区别

  • 位置:Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存-实验中)Check point 可以保存数据到 HDFS 这类可靠的存储上。
  • 生命周期:Cache 和 Persist 的 RDD 会在程序结束后被清除或者手动调用 unpersist 方法 Checkpoint 的 RDD 在程序结束后依然存在,不会被删除。

RDD 的依赖关系

RDD 有两种依赖,分别为 宽依赖(wide dependency/shuffle dependency)和 窄依赖(narrow dependency)

在这里插入图片描述

在这里插入图片描述

从上图可以看到:

  • 窄依赖: 父 RDD 的一个分区只会被子 RDD 的一个分区依赖;
  • 宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。

对于窄依赖

  • 窄依赖的多个分区可以并行计算;
  • 窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。

对于宽依赖

  • 划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。

DAG 的生成和划分 Stage

DAG

**DAG(Directed Acyclic Graph 有向无环图)**:指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程);

原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。

DAG 的边界

  • 开始:通过 SparkContext 创建的 RDD;
  • 结束:触发 Action,一旦触发 Action 就形成了一个完整的 DAG。

DAG 划分 Stage

在这里插入图片描述

从上图可以看出:

  • 一个 Spark 程序可以有多个 DAG(有几个 Action,就有几个 DAG,上图最后只有一个 Action (图中未表现),那么就是一个 DAG);
  • 一个 DAG 可以有多个 Stage(根据宽依赖/shuffle 进行划分);
  • 同一个 Stage 可以有多个 Task 并行执行(task 数=分区数,如上图,Stage1 中有三个分区 P1、P2、P3,对应的也有三个 Task);
  • 可以看到这个 DAG 中只 reduceByKey 操作是一个宽依赖,Spark 内核会以此为边界将其前后划分成不同的 Stage;
  • 在图中 Stage1 中,从 textFile 到 flatMap 到 map 都是窄依赖,这几部操作可以形成一个流水线操作,通过 flatMap 操作生成的 partition 可以不用等待整个 RDD 计算结束,而是继续进行 map 操作,这样大大提高了计算的效率。

为什么要划分 Stage?–并行计算

  • 一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖进行划分),就可以将一个 DAG 划分成多个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。

如何划分 DAG 的 stage?

  • 对于窄依赖,partition的转换处理在 stage 中完成计算,不划分(将窄依赖尽量放在同一个 stage 中,可以实现流水线计算)。
  • 对于宽依赖,由于有 shuffle 的存在,只能在父 RDD 处理完成后,才能开始接下来的计算,也就是说需要划分 stage。

总结:

  • Spark 会根据 shuffle /宽依赖使用回溯算法来对 DAG 进行 Stage 划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD加入到当前的 stage/阶段中。

RDD累加器和广播变量

在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都声称一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。

为了满足这种需求,Spark 提供了两种类型的变量:

  • 累加器(accumulators):累加器支持在所有不同节点之间进行累加计算(比如技术或者求和)。
  • 广播变量(broadcast variables):广播变量用来把白能量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。
  • 累加器

累加器

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果:

语法:

val xx: Accumulator[Int] = sc.accumulator(0)

示例代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //使用scala集合完成累加
    var counter1: Int = 0;
    var data = Seq(1,2,3)
    data.foreach(x => counter1 += x )
    println(counter1)//6

    println("+++++++++++++++++++++++++")

    //使用RDD进行累加
    var counter2: Int = 0;
    val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
    dataRDD.foreach(x => counter2 += x)
    println(counter2)//0
    //注意:上面的RDD操作运行结果是0
    //因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量
    //而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2
    //最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系

    //那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊!
    //如果解决?---使用累加器
    val counter3: Accumulator[Int] = sc.accumulator(0)
    dataRDD.foreach(x => counter3 += x)
    println(counter3)//6
  }
}

广播变量

关键词:

sc.broadcast()
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastVariablesTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //不使用广播变量
    val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
    val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
    //scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)
    val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))
    //根据水果编号取水果名称
    val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))
    fruitNames.foreach(println)
    //注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多,
    //那么会导致,被各个Task共用到的fruitMap会被多次传输
    //应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可
    //如何做到?---使用广播变量
    //注意:广播变量的值不能被修改,如需修改可以将数据存到外部数据源,如MySQL、Redis
    println("=====================")
    val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
    val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))
    fruitNames2.foreach(println)

  }
}
标签: spark 学习 大数据

本文转载自: https://blog.csdn.net/weixin_45866849/article/details/128491450
版权归原作者 白居不易. 所有, 如有侵权,请联系我们删除。

“Spark 的学习笔记”的评论:

还没有评论