0


Spark--Spark编程基础和编程进阶知识总结(第三章和第四章)

一、配置Spark运行环境

1.1 Spark安装步骤

  • 选择“Libraries”选项,单击“+”按钮,选择“Java”选项
  • 在弹出的界面中找到Spark安装目录下的jars文件夹,将整个文件夹导入,如图所示
  • 点击“OK”

*1.2 编写Spark*程序

任何Spark程序都是以SparkContext对象开始的,因为SparkContext是Spark应用程序的上下文和入口,无论是Scala、Python、R程序,都是通过SparkContext对象的实例来创建RDD,Spark Shell中的sc就是SparkContext对象的实例。因此在实际Spark应用程序的开发中,在main方法中需要创建SparkContext对象,作为Spark应用程序的入口,并在Spark程序结束时关闭SparkContext对象。

  • 初始化SparkContext需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数,属性参数是一种键值对的格式,一般可以通过set(属性名,属性设置值)的方法修改属性。其中还包含了设置程序名setAppName、设置运行模式setMaster等方法。如下图所示
  • SparkContext对象的实例创建完成后,就可以通过实例变量转化集合或者读取数据,计算过程中转化操作和行动操作的使用方法与在Shell环境中一致。

1.3 从内存中读取数据创建****RDD

RDD是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点中的存放元素的集合。RDD的创建有3种不同的方法。

  • 第一种是将程序中已存在的Seq集合(如集合、列表、数组)转换成RDD。
  • 第二种是对已有RDD进行转换得到新的RDD,这两种方法都是通过内存中已有的集合创建RDD的。
  • 第三种是直接读取外部存储系统的数据创建RDD。

1.3.1 parallelize()

parallelize()方法有两个输入参数,说明如下。

  • 要转化的集合,必须是Seq集合。Seq表示序列,指的是一类具有一定长度的、可迭代访问的对象,其中每个数据元素均带有一个从0开始的、固定的索引。
  • 分区数。若不设分区数,则RDD的分区数默认为该程序分配到的资源的CPU核心数。

1.3.2 makeRDD()

  • makeRDD()方法有两种使用方式:
  1. 第一种方式的使用与parallelize()方法一致;
  2. 第二种方式是通过接收一个是Seq[(T,Seq[String])]参数类型创建RDD。
  • 第二种方式生成的RDD中保存的是T的值,Seq[String]部分的数据会按照Seq[(T,Seq[String])]的顺序存放到各个分区中,一个Seq[String]对应存放至一个分区,并为数据提供位置信息,通过preferredLocations()方法可以根据位置信息查看每一个分区的值。调用makeRDD()时不可以直接指定RDD的分区个数,分区的个数与Seq[String]参数的个数是保持一致的。

1.4 从外部存储系统中读取数据创建****RDD

  • 从外部存储系统中读取数据创建RDD是指直接读取存放在文件系统中的数据文件创建RDD。
  • 从内存中读取数据创建RDD的方法常用于测试,从外部存储系统中读取数据创建RDD才是用于实践操作的常用方法。
  • 从外部存储系统中读取数据创建RDD可以有很多种数据来源,可通过SparkContext对象的textFile()方法读取数据集,该方法支持多种类型的数据集,如目录、文本文件、压缩文件和通配符匹配的文件等,并且允许设定分区个数。
  • 分别读取HDFS文件和Linux本地文件的数据并创建RDD,具体操作如下。

通过HDFS文件创建RDD:

直接通过textFile()方法读取HDFS文件的位置即可。

通过Linux本地文件创建RDD:

本地文件的读取也是通过sc.textFile("路径")的方法实现的,在路径前面加上“file://”表示从Linux本地文件系统读取。在IntelliJ IDEA开发环境中可以直接读取本地文件;但在spark-shell中,要求在所有节点的相同位置保存该文件才可以读取它。

二、RDD方法

RDD提供了丰富的操作方法用于操作分布式的数据集合,包括转换操作和行动操作两部分。

  • 转换操作可以将一个RDD转换为一个新的RDD,但是转换操作是懒操作,不会立刻执行计算。
  • 行动操作是用于触发转换操作的操作,这时才会真正开始进行计算。

*2.1 使用***map()**方法转换数据

  • map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD。
  • map()方法是转换操作,不会立即进行计算。
  • 转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD。

2.2 使用**sortBy()**方法进行排序

  • sortBy()方法用于对标准RDD进行排序,有3个可输入参数,说明如下。
  1. 第1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
  2. 第2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false。
  3. 第3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即this.partitions.size。
  • 第一个参数是必须输入的,而后面的两个参数可以不输入。

*2.3 使用***collect()**方法查询数据

  • collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。
  • 因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。
  • 因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。

*2.4 使用***collect()**方法查询数据

collect()方法有以下两种操作方式:

  1. collect:直接调用collect返回该RDD中的所有元素,返回类型是一个Array[T]数组。

collect[U: ClassTag](f: PartialFunction[T, U]):RDD[U]。这种方式需要提供一个标准的偏函数,将元素保存至一个RDD中。首先定义一个函数one,用于将collect方法得到的数组中数值为1的值替换为“one”,将其他值替换为“other”。

2.5 使用**flatMap()**方法转换数据

  • flatMap()方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的RDD。
  • 使用flatMap()方法时先进行map(映射)再进行flat(扁平化)操作,数据会先经过跟map一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD。
  • 这个转换操作通常用来切分单词。

*2.6 使用***take()**方法查询某几个值

  • take(N)方法用于获取RDD的前N个元素,返回数据为数组。
  • take()与collect()方法的原理相似,collect()方法用于获取全部数据,take()方法获取指定个数的数据。
  • 获取RDD的前5个元素

*2.7 使用**union()方法合并多个RDD*

  • union()方法是一种转换操作,用于将两个RDD合并成一个,不进行去重操作,而且两个RDD中每个元素中的值的个数、数据类型需要保持一致。
  • 使用union()方法合并两个RDD

*2.8 使用***filter()**方法进行过滤

  • filter()方法是一种转换操作,用于过滤RDD中的元素。
  • filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。
  • filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD。
  • 创建一个RDD,并且过滤掉每个元组第二个值小于等于1的元素。

*2.9 使用***distinct()**方法进行去重

  • distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。
  • 创建一个带有重复数据的RDD,并使用distinct()方法去重。

三、使用简单的集合操作

Spark中的集合操作常用方法(转换操作)

**3.1 intersection()**方法

  • intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。
  • 创建两个RDD,其中有相同的元素,通过intersection()方法求出两个RDD的交集。

3.2 subtract()方法

  • subtract()方法用于将前一个RDD中在后一个RDD出现的元素删除,可以认为是求补集的操作,返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果。
  • 创建两个RDD,分别为rdd1和rdd2,包含相同元素和不同元素,通过subtract()方法求rdd1和rdd2彼此的补集。

**3.3 cartesian()**方法

  • cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。
  • 创建两个RDD,分别有4个元素,通过cartesian()方法求两个RDD的笛卡儿积。

3.4 了解键值对****RDD

  • Spark的大部分RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD。
  • 顾名思义,键值对RDD由一组组的键值对组成,这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口。
  • 例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD。

3.5 创建键值对****RDD

  • 有很多种创建键值对RDD的方式,很多存储键值对的数据格式会在读取时直接返回由其键值对组成的PairRDD。
  • 当需要将一个普通的RDD转化为一个PairRDD时可以使用map函数来进行操作,传递的函数需要返回键值对。

3.6 使用键值对RDDkeysvalues方法

  • 键值对RDD,包含键和值两个部分。
  • Spark提供了两种方法,分别获取键值对RDD的键和值。
  1. keys方法返回一个仅包含键的RDD。
  2. values方法返回一个仅包含值的RDD。

3.7 使用键值对RDD的**reduceByKey()**方法

  • 当数据集以键值对形式展现时,合并统计键相同的值是很常用的操作。
  • reduceByKey()方法用于合并具有相同键的值,作用对象是键值对,并且只对每个键的值进行处理,当RDD中有多个键相同的键值对时,则会对每个键对应的值进行处理。
  • reduceByKey()方法需要接收一个输入函数,键值对RDD相同键的值会根据函数进行合并并且创建一个新的RDD作为返回结果。

3.8 使用键值对RDD的**reduceByKey()**方法

在进行处理时,reduceByKey()方法将相同键的前两个值传给输入函数,产生一个新的返回值,新产生的返回值与RDD中相同键的下一个值组成两个元素,再传给输入函数,直到最后每个键只有一个对应的值为止。reduceByKey()方法不是一种行动操作,而是一种转换操作。

3.9 使用键值对RDD的**groupByKey()**方法

  • groupByKey()方法用于对具有相同键的值进行分组,可以对同一组的数据进行计数、求和等操作。
  • 对于一个由类型K的键和类型V的值组成的RDD,通过groupByKey()方法得到的RDD类型是[K,Iterable[V]]。

3.10 使用join()方法连接两个RDD

  • 将有键的一组数据与另一组有键的数据根据键进行连接,是对键值对数据常用的操作之一。
  • 与合并不同,连接会对键相同的值进行合并,连接方式多种多样,包含内连接、右外连接、左外连接、全外连接,不同的连接方式需要使用不同的连接方法。
  • 连接方法如下表。

3.10.1 join()方法

  • join()方法用于根据键对两个RDD进行内连接,将两个RDD中键相同的数据的值存放在一个元组中,最后只返回两个RDD中都存在的键的连接结果。
  • 例如,在两个RDD中分别有键值对(K,V)和(K,W),通过join()方法连接会返回(K,(V,W))。
  • 创建两个RDD,含有相同键和不同的键,通过join()方法进行内连接。

3.10.2 rightOuterJoin()方法

  • rightOuterJoin()方法用于根据键对两个RDD进行右外连接,连接结果是右边RDD的所有键的连接结果,不管这些键在左边RDD中是否存在。
  • 在rightOuterJoin()方法中,如果在左边RDD中有对应的键,那么连接结果中值显示为Some类型值;如果没有,那么显示为None值。

3.10.3 leftOuterJoin()方法

leftOuterJoin()方法用于根据键对两个RDD进行左外连接,与rightOuterJoin()方法相反,返回结果保留左边RDD的所有键。

3.10.4 fullOuterJoin()方法

fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。

3.11 使用zip()方法组合两个RDD

  • zip()方法用于将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常。
  • 将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

3.12 使用combineByKey()方法合并相同键的值

  • combineByKey()方法是Spark中一个比较核心的高级方法,键值对的其他一些高级方法底层均是使用combineByKey()方法实现的,如groupByKey()方法、reduceByKey()方法等。
  • combineByKey()方法用于将键相同的数据聚合,并且允许返回类型与输入数据的类型不同的返回值。
  • combineByKey()方法的使用方式如下。

  • combineByKey()方法接收3个重要的参数,具体说明如下:
  1. createCombiner:V=>C,V是键值对RDD中的值部分,将该值转换为另一种类型的值C,C会作为每一个键的累加器的初始值。
  2. mergeValue:(C,V)=>C,该函数将元素V聚合到之前的元素C(createCombiner)上(这个操作在每个分区内进行)。
  3. mergeCombiners:(C,C)=>C,该函数将两个元素C进行合并(这个操作在不同分区间进行)。

  • 由于合并操作会遍历分区中所有的元素,因此每个元素(这里指的是键值对)的键只有两种情况:以前没出现过或以前出现过。对于这两种情况,3个参数的执行情况描述如下:
  1. 如果以前没出现过,则执行的是createCombiner()方法,createCombiner()方法会在新遇到的键对应的累加器中赋予初始值,否则执行mergeValue()方法。
  2. 对于已经出现过的键,调用mergeValue()方法进行合并操作,对该键的累加器对应的当前值(C)与新值(V)进行合并。
  3. 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法对各个分区的结果(全是C)进行合并。

3.13 使用lookup()方法查找指定键的值

lookup(key:K)方法作用于键值对RDD,返回指定键的所有值。

四、读取与存储JSON文件

Spark支持的一些常见文件格式

4.1 JSON文件的读取

4.2 JSON文件的存储

五、读取与存储CSV文件

5.1 CSV文件的读取

5.2 CSV文件的存储

六、读取与存储SequenceFile文件

6.1 SequenceFile文件的存储

6.2 SequenceFile文件的读取

七、读取与存储文本文件

7.1 文本文件的读取

通过textFile()方法即可直接读取,一条记录(一行)作为一个元素。

7.2 文本文件的存储

RDD数据可以直接调用saveAsTextFile()方法将数据存储为文本文件。

扩展:RDD

RDD分布式对象集合,本质上是一个只读的分区记录集合,不能直接修改,通过转换得到新的RDD。

在RDD的执行过程中,真正的计算发生在行动操作中,在前面的所有转换,spark只是记录下转换操作应用的一些基础数据集和RDD生成轨迹,不会触发计算。

优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单、高效的容错性、存放的数据可以是JAVA对象

一.核心-RDD

1.1 设计背景

  • 许多迭代式算法《比如机器学习、图算法等》和交互式数据挖掘工具,共同之处是,不同计算阶段之间会重用中间结果
  • 目前的MapReduce框架都是把中间结果写入到磁盘中,带来大量的数据复制、磁盘Io和序列化开销
  • RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据结构
  • 我们不必担心底层数据的分布式持性,只需将具体的应用逻辑表达为一系列转换处理
  • 不同RDD之间的转换操作形成依赖关系,可以实现管道化,避免中间数据存储

1.2 RDD概念

  • 一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,不同节点上进行并行计算
  • RDD提供了一种高度受限的共享内存模型,RDD是只读的记录分区集合,不能直接修改,只能通过在转换的过程中改

1.3 RDD特性

  • 高效的容错性
  • 现有容错机制:数据复制或者记录日志RDD具有天生的容错性:血缘关系,重新计算丢失分区,无需回滚系统,重算过程在不同节点之间并行,只记录粗粒度的操作
  • 中间结果持久化到内存,数据在内存中的多个RDD操作直接按进行传递,避免了不必要的读写磁盘开销
  • 存放的数据可以是JAVA对象,避免了不必要的对象序列化和反序列化

1.4 RDD之间的依赖关系

  • 窄依赖指的是子RDD的一个分区只依赖于某个父RDD中的一个分区。
  • 宽依赖指的是子RDD的每一个分区都依赖于某个父RDD中一个以上的分区。
  • 理解宽、窄依赖的区别,需要先了解父RDD和子RDD。map()、filter()方法上方箭头左边的RDD是父RDD,而右边的RDD是子RDD。union()方法上方箭头左边的两个RDD均为右边RDD的父RDD, union()方法是有两个父RDD 的。

1.5 RDD运行过程

上述对RDD概念、依赖关系和Stage划分的介绍,结合之前介绍的Spark运行基本流程,再总结一下RDD在Spark架构中的运行过程:

  • 创建RDD对象
  • SparkContext负责计算RDD之间的依赖关系,构建DAG
  • DAGScheduler负责把DAG图分解成多个Stage,每个Stage中包含了多个Task,每个Task会被TaskScheduler分发给各个WorkerNode上的Executor去执行。

Spark编程进阶

一、Hadoop与spark区别

Hadoop虽然已经成为大数据技术的事实标准,但其本身存在很多缺陷。比如,mapreduce计算模型延迟过高,无法实现实时快速计算的需求,只适用于离线批处理,I/O磁盘开销大。

spark在借鉴mapreduce优点同时,很好解决了mapreduce存在的缺陷:

  1. spark计算也属于mapreduce计算,但不局限于map和reduce操作;
  2. spark提供内计算,中间结果放入内存,提高迭代运算效率;
  3. 基于DAG的任务调度执行机制,优于mapreduce调度机制。

二、安装IDEA

可以在官网下载安装社区版本:

IntelliJ IDEA – the Leading Java and Kotlin IDE (jetbrains.com)

2.1 安装Scala

在File菜单->Settings->Plugins 插件安装界面搜索scala插件安装。

2.2 Scala下载

(我选择的版本是2.12.15)安装及环境变量的配置

官方下载地址:The Scala Programming Language (scala-lang.org)

双击打开下载好的安装程序,一直“Next”即可,最好不要安装到C盘,中间修改一下安装路径即可,最后点击“Finish”。我将scala软件安装在了D盘目录下的Develop文件夹,bin路径如下:

配置scala的系统环境变量,将scala安装的bin目录路径加入到系统环境变量path中:

win+R打开命令窗口输入:scala -verison ,进行检测是否成功配置环境变量

2.3 Scala插件(版本要与IDEA版本保持一致,下载2019.2.3版本)的下载安装

官方下载: Versions: Scala Plugin for IntelliJ IDEA & Android Studio | JetBrains Marketplace

下载完成后,将下载的压缩包解压到IDEA安装目录下的plugins目录下

2.4 检测Scala插件是否在IDEA中已经安装成功

2.5 新建scala类文件编写代码

2.6 鼠标点击java文件夹,右键new--->Scala Class

在WordCount文件中编写如下代码:

import org.apache.spark.sql.SparkSession
object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("word count")
      .getOrCreate()
    val sc = spark.sparkContext
    val rdd = sc.textFile("data/input/words.txt")
    val counts = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    counts.collect().foreach(println)
    println("全部的单词数:"+counts.count())
    counts.saveAsTextFile("data/output/word-count")
  }
}

2.7 准备好测试文件words.txt,将文件存放在scalaproject-->data-->input-->words.txt

运行WordCount程序:

运行结果:

三、编写本地运行的spark程序

3.1 编写pom.xml 文件

管理spark程序依赖jar,此时要能上网,在pom.xml文件中,添加如下配置信息

<repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <properties>
        <encoding>UTF-8</encoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.10</scala.version>
        <spark.version>3.0.1</spark.version>
        <hadoop.version>2.7.7</hadoop.version>
    </properties>
    <dependencies>
        <!--依赖Scala语言-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
 
        <!--SparkCore依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
 
        <!--SparkSQL依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
      
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
 
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.7</version>
        </dependency>
      
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
 
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- 指定编译java的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- 指定编译scala的插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
</build>

刷新maven工程,会自动下载所需依赖jar,此时会下载时间较长,耐心等待

3.2 编写代码

WordCount .scala文件实现单词计数关键代码的解析如下:

 
 
   //1.env/准备sc/SparkContext/Spark上下文执行环境
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
 
 
    //2.source/读取数据
    val lines: RDD[String] = sc.textFile("data/input/words.txt")
//(这里要特别注意一下,你自己电脑的目录下要保证有这个words.txt文件)
 
 
    //3.transformation/数据操作/转换
    //切割:RDD[一个个的单词]
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //记为1:RDD[(单词, 1)]
    val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
    val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
 
 
    //4.输出
    //直接输出
    result.foreach(println)
 
    //输出到指定path(可以是文件/夹)
    result.repartition(1).saveAsTextFile("data/output/result")
 
    //为了便于查看Web-UI可以让程序
    Thread.sleep(1000 * 60)
 
    //5.关闭资源
    sc.stop()
  

四、spark-submit 详细参数说明

参数名参数说明--mastermaster 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local--deploy-mode 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client--class应用程序的主类,仅针对 java 或 scala 应用--name应用程序的名称--jars用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下--packages包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标--exclude-packages为了避免冲突 而指定不包含的 package--repositories远程 repository--conf PROP=VALUE
指定 spark 配置属性的值,

例如-conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"
--properties-file加载的配置文件,默认为 conf/spark-defaults.conf--driver-memory Driver内存,默认 1G--driver-java-options传给 driver 的额外的 Java 选项--driver-library-path 传给 driver 的额外的库路径--driver-class-path 传给 driver 的额外的类路径--driver-cores Driver 的核数,默认是1。在 yarn 或者 standalone 下使用--executor-memory每个 executor 的内存,默认是1G--total-executor-cores 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用--num-executors启动的 executor 数量。默认为2。在 yarn 下使用--executor-core每个 executor 的核数。在yarn或者standalone下使用

五、完整代码实现

5.1 WordCount.scala文件在本地运行:

package net.objet
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
 
object WordCount {
  def main(args: Array[String]): Unit = {
    //1.env/准备sc/SparkContext/Spark上下文执行环境
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
 
    //2.source/读取数据
    val lines: RDD[String] = sc.textFile("data/input/words.txt")
 
    //3.transformation/数据操作/转换
    //切割:RDD[一个个的单词]
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //记为1:RDD[(单词, 1)]
    val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
    val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
    //4.输出
    //直接输出
    result.foreach(println)
    //输出到指定path(可以是文件/夹)
    result.repartition(1).saveAsTextFile("data/output/result")
 
    //为了便于查看Web-UI可以让程序
    Thread.sleep(1000 * 60)
    //5.关闭资源
    sc.stop()
  }
 
}

5.2 WordCount.scala文件在yarm上运行:

package net
 
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
 
object WordCount {
  def main(args: Array[String]): Unit = {
 
    if(args.length < 2){
      println("请指定input和output")
      System.exit(1)//非0表示非正常退出程序
    }
 
    //1.env/准备sc/SparkContext/Spark上下文执行环境
    val conf: SparkConf = new SparkConf().setAppName("wc") //.setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
 
    //2.source/读取数据
    val lines: RDD[String] = sc.textFile(args(0))
 
    //3.transformation/数据操作/转换
    //切割:RDD[一个个的单词]
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //记为1:RDD[(单词, 1)]
    val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
    val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
    //4.输出
    //直接输出
    result.foreach(println)
    //输出到指定path(可以是文件/夹)
    result.repartition(1).saveAsTextFile(args(1))
 
    //为了便于查看Web-UI可以让程序Thread.sleep(1000 * 60)
    //5.关闭资源
    sc.stop()
  }
 
}

本文转载自: https://blog.csdn.net/2302_80385233/article/details/138791309
版权归原作者 番外剧. 所有, 如有侵权,请联系我们删除。

“Spark--Spark编程基础和编程进阶知识总结(第三章和第四章)”的评论:

还没有评论