0


Spark基础进阶


一、Scala基础

1、Scala下载与安装

Scala官方网址:https://www.scala-lang.org/

(Scala是运行在JVM上的语言,因此必须确保系统环境中安装了JDK,即Java开发工具包)

2、Scala的特性

(1)面向对象

Scala是一种纯的面向对象语言。一个对象的类型和行为是由类和特征描述的。类通过子类化和灵活类进行拓展,成为多重继承的可靠解决方案。

(2)函数式编程

Scala提供了轻量级语法来定义匿名函数,支持高阶函数,允许函数嵌套,并支持函数柯里化。Scala的样例类与模式匹配支持函数式编程语言中的代数类型。Scala的单例对象提供了方便的方法来组合不属于类的函数。用户还可以使用Scala的模式匹配,便携类似正则表达式的代码处理可扩展标记语言(Extensible Markup Languege,XML)格式的数据。

(3)静态类型

Scala配备了表现型的系统,以静态的方式进行抽象,以安全和连贯的方式进行使用。系统支持将通用类、内部类、抽象类和复合类作为对象成员,也支持隐式参数、转换和多态方法等,这为抽象编程的安全重用和软件类型的安全扩展提供了强大的支持。

(4)可扩展性

在实践中,专用领域的应用程序开发往往需要特定的语言扩展。Scala提供了许多独特的语言机制,可以以库的形式无缝添加新的语言结构。

1、Scala常用数据类型

  • Int:32位有符号补码整数。数值区间为*-32768~32767*
  • Float:32位IEEE754(IEEE浮点数算数标准)单精度浮点数
  • Double:64位IEEE754(IEEE浮点数算数标准)双精度浮点数
  • String:字符序列,即字符串
  • Boolean:布尔值,truefalse
  • Unit:表示无值,作用与Java中的viod一样是不返回任何结果的方法的结果类型。Unit只有一个实例值,写成()

2、定义常量与变量

(1)常量

常量通过val关键字定义,在程序运行过程中值不会发生变化的量,其一旦定义就不可更改,无法对其进行重新计算或赋值。

(2)变量

变量通过var关键字定义,是在程序运行过程中值可以发生改变的量,在其定义后可以重新被赋值。但是在重新赋值时,只能将同类型的值赋值给变量。

  • 常量名要以字母或下划线开头,且变量名不能使用美元符号($)

3、运算符

Scala内置运算符包含了算术运算符、关系运算符、逻辑运算符、位运算符和赋值运算符等。

4、定义数组

数组是一种储存了相同类型元素的固定大小的顺序集合。

数组定义格式:

  • 方法一:var arr:Array[string] = new ArrayString
  • 方法二:var arr:Array[string] = Array(元素1 , 元素2 , ...)

数组相关常用方法:

  • length:返回数组长度
  • head:查看数组第一个元素
  • tail:查看数组中除了第一个元素外的其他元素
  • isEmpty:判断数组是否为空
  • xontains(x):判断数组是否包含元素x

二、Spark编程基础

1、创建RDD

RDD是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点中的存放元素的集合。RDD的创建方式有以下三种:

(1)从内存中读取并创建

①parallelize()

该方法有两个输入参数:

  • 要转化的集合。必须是Seq集合(具有一定长度且可以迭代访问的对象),其中每个数据元素均带有一个从0开始的固定索引。
  • 分区数。若未设定分区数,则默认为该程序分配到的资源的CPU核心数。
②makeRDD()

该方法有两种使用方式:

  • 第一种与parallelize()方法一致。
  • 第二种则是通过接收一个Seq[(T,Seq[String])]参数类型创建RDD。(保存的是T的值)

调用时不可直接指定RDD的分区个数。

(2)从外部储存系统中读取并创建

①通过Linux本地文件

Linux本地文件须通过textFile("file://路径")实现。

②HDFS文件创建

HDFS文件则更简单常用,可直接通过textFile("路径")读取

(3)使用map()方法转换数据

该方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD。

2、sortBy()方法排序

三个参数:

  • 函数f:(T) => K,左边是要被排序对象选中的每一个元素,右边返回的值是元素中要进行排序的值。
  • ascending,决定排序后RDD中的元素是升序还是降序,若未设定则默认值为true,即升序。
  • numPartitions,决定排序后RDD的分区个数,若未设定则默认为排序前后相等。

3、collect()方法查询数据

(1)collect

直接调用则返回RDD中所有元素,返回类型为一个Array[T]数组。较常用

(2)collect[U: ClassTag](f: PartialFunction[T,U]): RDD[U]

该方式须提供一个标准的偏函数,将元素保存至一个RDD中。

4、flatMap()方法转换数据

该方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的RDD。

先map(映射)再flat(扁平化)。

三、Spark编程进阶

1、搭建Spark开发环境

(1)下载与安装IntelliJ IDEA

官网链接:https://www.jetbrains.com.cn/idea/

(2)Scala插件安装与应用

  • 在线安装:Configure(设置)-Plugins(插件)-搜索Scala-Install(下载)-Restart IDEA(重启)
  • 离线安装:提前下载完成插件,在上述插件中点击齿轮状按钮,在下拉栏中点击“Install plugin from disk...”,选择插件所在路径单击OK等待安装,安装完成后点击上述重启按钮即可。

(3)配置Spark运行环境

  • 添加Spark开发依赖包:File(文件)-Project Structure(项目管理)/Ctrl+Alt+Shift+S-Libraries-+(添加)-Java-找到jars文件夹并导入)-OK
  • 编写Spark程序:导入com.tipdm.sparkDemo包并新建WordCount类,指定编写类型为object,编写Spark程序实现单词计数

导包:

package com.tipdm.sparkDemo

import org.apache.spark.(SparkConf, SparkContext)

object Wordcount (

def main (args: Array[String]): Unit = {

val conf = new SparkConf ().setAppName("WordCount")

val sc = new Sparkcontext (conf)

val input = "D:\\words.txt"

}

}

单词计数:

object WordCount{

def main(args: Array[String]): Unit ={

val conf = new SparkConf().setAppName("WordCount"].setMaster("local")

val sc = new Sparkcontext (conf)

System.setProperty("hadoop.home.dir", "F:\\hadoop-3.1.4")

val input = "D:\\words.txt"

val count = sc.textFile(input).flatMap(

  × => x.split(" ")).map( x =>( x,1)).reduceByKey((x, y) => x + y)

count.foreach(x => println(x._1+ ","+x._2))

}

}

2、运行Spark程序

(1)在开发环境中运行

  • 设置运行模式:Run-Edit Configurations...-Application-WordCount类-右侧VM options中输入“-Dspark.master=local”(local为指定使用运行模式)
  • 指定Hadoop安装包的bin文件夹的路径:可修改参数名为“hadoop.home.dir”的参数,也可自行前往官网下载对应Hadoop插件
  • 设置自定义参数:Run-Edit Configurations...-Program arguments

local 使用一个工作线程在本地模式下运行Spark(非并行模式)

local[K] 使用K个工作线程在本地模式下运行Spark(理想情况下,将K设置为机器上的内核数)

local[*] 在本地模式下运行Spark,工作线程与机器上的逻辑内核一样多

spark://HOST:PORT 连接到指定端口的Spark独立集群上,默认为7077端口

mesos://HOST:PORT 连接到指定端口的Mesos集群

yarn 根据配置的值连接到YARN集群,使用yarn-client或yam-cluster 模式

--deploy-mode client 客户端运行模式

--deploy-mode cluster集群运行模式

(2)在集群环境中运行

  • 直接在开发环境中运行 Spark 程序时通常选择的是本地模式。如果数据的规模比较庞大,更常用的方式还是在 Spark 程序开发测试完成后编译打包为Java 归档(Java Archive, JAR)包,并通过 spark-submit 命令提交到 Spark集群环境中运行

  • spark-submit的脚本在 Spark安装目录的bin 目录下,spark-submit 是 Spark 为所有支持的集群管理器提供的一个提交作业的工具。Spark在/example 目录下有 Scala、Java、Pyithon和R的示例程序,都可以通过 spark-submit运行

  • sporik-subomit 提交JAR 包到集群有一定的格式要求,需要设置一些参数

  • 如果除了设置运行的脚本名称之外不设置其他参数,那么 Spark 程序默认在本地运行。

    提交参数设置如下:

    ./bin/spark-submit --class <main-class>

    --master <master-url>

    --deploy-mode <deploy-mode>\

    --conf <"key=value">

    ...# other options

    <appli.cat.ion-jar> )

    [application-argaments」

注释

--class:应用程序的入口,指主程序。

--master:指定要连接的集群URL。

--deploy-mode:是否将驱动程序部署在工作节点(cluster)或本地作为外部客户端

(client)。

--conf:设置任意 Spark 配登属性,允许使用"key=value WordCount"的格式设置任意的SparkConf 配置属性。

application-jar:包含应用程序和所有依赖关系的JAR包的路径。

application-arguments:传递给main)方法的参数。

四、Spark SQL——结构化数据文件处理

1、Spark SQL的基本概念

Spark SQL是一个用于处理结构化数据框架,可以被视为一个分布式的SQL查询引擎,提供了一个抽象的可编程数据模型DataFrame。

2、DataFrame是什么?

DataFrame是一种二维的、表格型的数据结构,用于存储和处理结构化数据。它由行和列组成,每列可以包含不同类型的数据(如数值、字符串、布尔值等),并且具有行索引和列索引。DataFrame的设计初衷是将Series(一维数据结构)的使用场景从一维拓展到多维,使得数据操作更加灵活和高效。

3、创建DataFrame

(1)准备数据

例如在HDFS文件系统中/spark目录下有一个qq.txt文件,内容如下:

1 zhangsan 26

2 lisi 23

3 wangwu 28

4 gualiu 19

5 shenqi 16

(2)RDD直接转换为DataFrame

以上述文件进行举例:

val lineRDD=sc.textFile("/qq.txt").map(_.split(" "))

case class qq(id:Int,name:String,age:Int)

val pyxRDD=lineRDD.map(x=>qq(x(0).toInt,x(1),x(2).toInt))

val pyxDF=pyxRDD.toDF()

pyxDF.show

4、DataFrame的常用操作

  • show() :查看DataFrame中的具体内容信息
  • selectExpr() :对指定字段进行特殊处理(替换)
  • select() :查看DataFrame中获取指定字段
  • filter()/where :实现条件查询,过滤出想要的结果
  • groupBy() :对记录进行分组
  • sort() :对特定字段进行排序操作
标签: spark

本文转载自: https://blog.csdn.net/Seven_7bei/article/details/138770443
版权归原作者 Seven.7__ 所有, 如有侵权,请联系我们删除。

“Spark基础进阶”的评论:

还没有评论