零、pyspark模板
**import os
from pyspark import SparkContext, SparkConf
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'if name == 'main':
print('pyspark模板')**
一、初识spark
1. spark的基本介绍
MapReduce: 分布式计算引擎
分布式计算引擎,主要用于对大规模的数据进行统计分析的操作,主要用于批处理(离线处理)
MR有什么弊端?
1-执行效率比较低(慢):整个内部运转过程不断进行 磁盘和内存的交互,产生大量IO ,从而影响效率2-对于选代计算支持不够好,效率比较低: 选代计算:当计算操作,必须多个阶段,而且每个阶段之间存在依赖关系,只有当上一个阶段执行完成后,下一个阶段继续 3-MR的代码相对来说比较底层,开发难度系数较高
1.1 发展史
1.2 4个特点
1-速度快
- Spark基于RDD计算模型进行处理,整个计算操作可以基于内存来计算,也可以基于磁盘来计算,而且可以更好更方便的进行选代计算,整个选代过程汇总,中间的结果是可以保存在内存中,内存不足可以保存到磁盘。
- Spark是基于线程运行的,线程的启动和销毁要高于进程的启动和销毁,而MR是基于进程的。
2-易用性
- Spark提供多种语言的操作API,操作Spark不仅仅可以使用python,也可以使用 scala、SQL、java、R.....
- Spark提供的API更加的高阶,意味很多功能方法全部都定义好了,比如说转换,遍历,排序
3-通用性
spark提供多组组件,从而应对未来不同的场景
- Sparkcore:spark核心,学习spark基础,学习主要点就是RDD,其中包含各种操作语言的客户端,以及RDD维护,对资源的处理的操作API全部都是在CORE中
- Spark SQL:Spark可以使用SQL方式操作Spark,Spark sql组件,用于支持这种方案,需要将SQL翻译为RDD来运行
- Spark steaming:spark的流式处理Spark可以支持进行流式计算,也就是实时计算
- structured steaming:结构化的流式处理
- spark MLlib:Spark的机器学习库 主要是用于 进行机器学习 算法相关的行业使用的库,比如说 回归 聚类
- Spark graphx:spark的图计算库主要是用于进行图计算,比如说:地图中行程规划
4-随处运行
- Spark的计算程序可以运行在不同的资源调度平台上,比如 local yarn spark集群 还支持一些云上调度环境(mesos...)
- Spark可以和大数据生态圈中各种软件进行集成,这样可以更加方便的对接使用
1.3 通信方案
二、spark的环境安装
1. Local模式安装
local模式连要是用于开发测试环境,不能作为生产环境
local本质上就是一个JVM进程程序 在这个程序中,运行多个线程来分布式处理
local模式是一种单机模式, 仅适合于小量数据集的处理,无法处理大规模数据
解压安装进入spark bin 目录
** spark-shell**
2. PySpark 的安装
pyspark 是python下的一个库,如果需要安装pyspark,首先需要先保证有python的环境,而且当前这个spark版本要求python的环境必须为3以上版本,而目前虚拟机的版本为 Python2的版本
此时,需要先安装python3的版本,目前在虚拟机中,需要安装的python版本为: 3.8.8
此处在安装python环境的时候,我们不在采用原有的直接安装python包的方式,而且是选择使用 anaconda(数据科学库)
原因:
- 1-:anaconda是一个数据科学库,这个库包含有python的环境 + python各种进行数据分析的库,可以节省一部分关于数据科学库安装操作
- 2-:anaconda提交一套完善的虚拟环境,可以基于anaconda构建多套互相隔离的虚拟环境(沙箱环境),可以在不同环境中安装不同的python的版本,以及安装不同的python包
2.1 Anaconda安装
此环境三台节点都是需要安装的, 以下演示在node1安装, 其余两台也是需要安装的
cd /export/software
rz 上传Anaconda脚本环境
执行脚本:
bash Anaconda3-2021.05-Linux-x86_64.sh
不断输入空格, 直至出现以下解压, 然后输入yes
此时, anaconda需要下载相关的依赖包, 时间比较长, 耐心等待即可.... 如果遇到需要输入yes/no, 一律输入yes
配置anaconda的环境变量:
vim /etc/profile
**##**增加如下配置
export ANACONDA_HOME=/root/anaconda3/bin
export PATH=$PATH:$ANACONDA_HOME/bin
重新加载环境变量: source /etc/profile
修改bashrc文件
sudo vim ~/.bashrc
添加如下内容:
export PATH=~/anaconda3/bin:$PATH
说明:
profile
其实看名字就能了解大概了, profile 是某个用户唯一的用来设置环境变量的地方, 因为用户可以有多个 shell 比如 bash, sh, zsh 之类的, 但像环境变量这种其实只需要在统一的一个地方初始化就可以了, 而这就是 profile.
bashrc
bashrc 也是看名字就知道, 是专门用来给 bash 做初始化的比如用来初始化 bash 的设置, bash 的代码补全, bash 的别名, bash 的颜色. 以此类推也就还会有 shrc, zshrc 这样的文件存在了, 只是 bash 太常用了而已.
2.2 conda操作
Anaconda(水蟒):是一个科学计算软件发行版,集成了大量常用扩展包的环境,包含了 conda、Python 等 180 多个科学计算包及其依赖项,并且支持所有操作系统平台。下载地址:https://www.continuum.io/downloads
- 安装包:pip install xxx,conda install xxx
- 卸载包:pip uninstall xxx,conda uninstall xxx
- 升级包:pip install upgrade xxx,conda update xxx
Jupyter Notebook:启动命令
jupyter notebook
功能如下:
- Anaconda自带,无需单独安装
- 实时查看运行过程
- 基本的web编辑器(本地)
- ipynb 文件分享
- 可交互式
- 记录历史运行结果
修改jupyter显示的文件路径:
通过jupyter notebook --generate-config命令创建配置文件,之后在进入用户文件夹下面查看.jupyter隐藏文件夹,修改其中文件jupyter_notebook_config.py的202行为计算机本地存在的路径。
IPython:
命令:ipython,其功能如下
1.Anaconda自带,无需单独安装
2.Python的交互式命令行 Shell
3.可交互式
4.记录历史运行结果
5.及时验证想法
Spyder:
命令:spyder,其功能如下
1.Anaconda自带,无需单独安装
2.完全免费,适合熟悉Matlab的用户
3.功能强大,使用简单的图形界面开发环境
下面就Anaconda中的conda命令做详细介绍和配置。
- conda命令及pip命令
conda管理数据科学环境,conda和pip类似均为安装、卸载或管理Python第三方包。
conda install 包名 pip install 包名
conda uninstall 包名 pip uninstall 包名
conda install -U 包名 pip install -U 包名
(2) Anaconda设置为国内下载镜像
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --set show_channel_urls yes
(3)conda创建虚拟环境
conda env list
conda create py_env python=3.8.8 #创建python3.8.8环境
activate py_env #激活环境
deactivate py_env #退出环境
2.3 PySpark安装
仅需要在nodc1安装即可 后续会让pycharm连接n0de1的|远程python环境进行操作
2.3.1 直接安装PySpark
安装如下:
使用PyPI安装PySpark如下:也可以指定版本安装
pip install pyspark**==3.1.2**
或者指定清华镜像**(对于网络较差的情况)**:
**pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark==3.1.2 # **指定清华镜像源
如果要为特定组件安装额外的依赖项,可以按如下方式安装(此步骤暂不执行,后面Sparksql部分会执行):
pip install pyspark[sql]
2.3.2 创建Conda环境安装PySpark
#从终端创建新的虚拟环境,如下所示
conda create -n pyspark_env python=3.8
#创建虚拟环境后,它应该在 Conda 环境列表下可见,可以使用以下命令查看
conda env list
#现在使用以下命令激活新创建的环境:
source activate pyspark_env
或者
conda activate pyspark_env
如果报错: CommandNotFoundError: Your shell has not been properly configured to use 'conda deactivate'.切换使用 source activate
#您可以在新创建的环境中通过使用PyPI安装PySpark来安装pyspark,例如如下。它将pyspark_env在上面创建的新虚拟环境下安装 PySpark。
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark==3.1.2
#或者,可以从 Conda 本身安装 PySpark:
conda install pyspark==3.1.2
3.2.3 手动下载安装[不推荐]
将spark对应版本下的python目录下的pyspark复制到anaconda的
Library/Python3/site-packages/目录下即可。
请注意,PySpark 需要JAVA_HOME正确设置的Java 8 或更高版本。如果使用 JDK 11,请设置-Dio.netty.tryReflectionSetAccessible=true,Arrow相关功能才可以使用。
扩展:
conda虚拟环境 命令
查看所有环境
conda info --envs
新建虚拟环境
conda create -n myenv python=3.6
删除虚拟环境
conda remove -n myenv --all
激活虚拟环境
conda activate myenv
source activate base
退出虚拟环境
conda deactivate myenv
2.4 初体验-PySpark shell方式
前面的Spark Shell实际上使用的是Scala交互式Shell,实际上 Spark 也提供了一个用 Python 交互式Shell,即Pyspark。
bin/pyspark --master local[*]
三、Spark Standalone集群环境
Standalone模式是Spark自带的一种集群模式,不同于前面本地模式启动多个进程来模拟集群的环境,Standalone模式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理。
节点
主节点(master)
从节点(worker)
历史服务(history server)
node1
是
是
是
node2
否
是
否
node3
否
是
否
1. 修改配置文件
说明: 直接对local模型下的spark进行更改为standalone模式
【workers】
cd /export/server/spark/conf/
cp workers.template workers
vim workers
添加以下内容:
node1.itcast.cn
node2.itcast.cn
node3.itcast.cn
【spark-env.sh】
cd /export/server/spark/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
增加如下内容:
JAVA_HOME=/export/server/jdk1.8.0_241/
HADOOP_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop/
YARN_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop/
export SPARK_MASTER_HOST=node1
export SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1g
SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"
注意:
Jdk,hadoop, yarn的路径, 需要配置为自己的路径(可能与此位置不一致)
History配置中, 需要指定hdfs的地址, 其中端口号为8020或者9820, 大家需要参考hdfs上对应namenode的通信端口号
【配置spark应用日志】
第一步: 在HDFS上创建应用运行事件日志目录:
hdfs dfs -mkdir -p /sparklog/
第二步: 配置spark-defaults.conf
cd /export/server/spark/conf
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
添加以下内容:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:8020/sparklog/
spark.eventLog.compress true
其中HDFS的地址, 8020 还是9820 需要查看HDFS的界面显示
【log4j.properties】
cd /export/server/spark/conf
cp log4j.properties.template log4j.properties
vim log4j.properties
改变日志级别
2. 分发到其他机器
将配置好的将 Spark 安装包分发给集群中其它机器,命令如下:
cd /export/server/
scp -r spark-3.1.2-bin-hadoop3.2/ node2:$PWD
scp -r spark-3.1.2-bin-hadoop3.2/ node3:$PWD
##分别在node2, 和node3中创建软连接
ln -s /export/server/spark-3.1.2-bin-hadoop3.2/ /export/server/spark
3. 启动spark Standalone
- 启动方式1:集群启动和停止
在主节点上启动spark集群
cd /export/server/spark
sbin/start-all.sh
sbin/start-history-server.sh
在主节点上停止spark集群
/export/server/spark/sbin/stop-all.sh
- 启动方式2:单独启动和停止
在 master 安装节点上启动和停止 master:
start-master.sh
stop-master.sh
在 Master 所在节点上启动和停止worker(work指的是slaves 配置文件中的主机名)
start-slaves.sh
stop-slaves.sh
- WEB UI页面
可以看出,配置了3个Worker进程实例,每个Worker实例为1核1GB内存,总共是3核 3GB 内存。目前显示的Worker资源都是空闲的,当向Spark集群提交应用之后,Spark就会分配相应的资源给程序使用,可以在该页面看到资源的使用情况。
- 历史服务器HistoryServer:
/export/server/spark/sbin/start-history-server.sh
WEB UI页面地址:http://node1:18080
4. 连接集群
【spark-shell 连接】
cd /export/server/spark
bin/spark-shell --master spark://node1:7077
【pyspark 连接】
cd /export/server/spark
./bin/pyspark --master spark://node1:7077
四、基于pycharm完成PySpark入门案例
1. pycharm如何连接远程环境
- 一般在企业中,会存在两套线上环境,一套环境是用于开发(测试)环境,一套环境是用于生产环境,首先一般都是先在开发测试环境上进行编写代码,并且在此环境上进行测试,当整个项目全部开发完成后,需要将其上传到生产环境,面向用于使用
- 如果说还是按照之前的本地模式开发方案,每个人的环境有可能都不一致,导致整个团队无法统--套开发环境进行使用,从而导致后续在进行测试,上线的时候,出现各种各样环境问题
- pycharm提供了一些解决方案:远程连接方案,允许所有的程序员都去连接远端的测试环境的,确保大家的环境都是统一,避免各种环境问题发生,而且由于连接的远程环境,所有在pycharm编写代码,会自动上传到远端环境中,在执行代码的时候,相当于是直接在远端环境上进行执行操作
2. Wrodcount代码实现 local
2.1 Wrodcount案例流程实现
2.2 代码实现
# 演示 pyspark的入门系例:Wordcount
import os
from pyspark import SparkContext, SparkConf
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('pyspark的入门系例:Wordcount')
# 1- 创建spark核心对象 SparkConf
conf = SparkConf().setAppName('WordCount').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2- 读取数据
# 此处的路经地址,不应该这样写,因为后续有可能无法加裁到,建议添加文件协议
# 如果读取本地文件: file:///
# 如何该取HDFS文件:hdfs://node1:8020
rdd_init = sc.textFile('file:///export/data/workspace/pyspark_parent/01_pyspark_base/data/word.txt')
# print(rdd_init.collect())
"""
['hadoop hive sqoop hadoop', 'sqoop oozie hive java', 'spark python spark java', 'sqoop oozie hive java']
"""
# 3-对每一行的数据执行切割操作,转换为一个个列表
# 一到一的转换操作: map
# rdd_map = rdd_init.map(lambda line : line.split())
"""
[
['hadoop', 'hive', 'sqoop', 'hadoop'],
['sqoop', 'oozie', 'hive', 'java'],
['spark', 'python', 'spark', 'java'],
['sqoop', 'oozie', 'hive', 'java']]
"""
# 扁平化处理
# map转换增强版,用于进行一对多的转换操作,相当于 先执行map操作、然后执行flat(扁平化操作)
rdd_flatmap = rdd_init.flatMap(lambda line : line.split())
# print(rdd_flatmap.collect())
"""
['hadoop', 'hive', 'sqoop', 'hadoop', 'sqoop', 'oozie', 'hive', 'java', 'spark', 'python', 'spark', 'java', 'sqoop', 'oozie', 'hive', 'java']
"""
# 4- 将每一个单词转换为 (单词,1)
rdd_map = rdd_flatmap.map(lambda word:(word,1))
# print(rdd_map.collect())
"""
[('hadoop', 1), ('hive', 1), ('sqoop', 1), ('hadoop', 1), ('sqoop', 1), ('oozie', 1), ('hive', 1), ('java', 1), ('spark', 1), ('python', 1), ('spark', 1), ('java', 1), ('sqoop', 1), ('oozie', 1), ('hive', 1), ('java', 1)]
"""
# 5- 根据key分组聚合
rdd_res = rdd_map.reduceByKey(lambda agg,curr:agg+curr) #agg,curr 为值
# 6- 输出
print(rdd_res.collect())
无法加载到java_home
原因:
目前pycharm连接远程的python环境,执行python的代码,最终是将代码运行在远端环境的,但是在远端环境中,可能存在多个python环境,以及内部加载的 .bashrc中环境信息,但是这个环境中压根就没有 JAVA_HOME;安装pyspark库同步安装了另一个 py4j的库,spark程序运行,需要将python的代码 转换为java代码从而运行解决:
演示:pySpark入门案例: WordCount
需求:从HDFS中读取数据,对数据进行统计分析(Wordcount):最后将结果根据单词数量进行倒序排序,并将结果写出HDFS上
# 演示:pySpark入门案例: WordCount
# 需求:从HDFS中读取数据,对数据进行统计分析(Wordcount):最后将结果根据单词数量进行倒序排序,并将结果写出HDFS上
import os
from pyspark import SparkContext, SparkConf
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('pyspark的入门系例:Wordcount,从HDFS读取数据')
# 1- 创建spark核心对象 SparkConf
conf = SparkConf().setAppName('WordCount').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2- 读取HDFS数据
rdd_init = sc.textFile('hdfs://node1:8020/pyspark_data/word.txt')
# print(rdd_init.collect())
# 3- 对数据执行切割:每一行都有可能产生多个单词,所以这里切割,是一个 1对多操作 采用flatmap()
rdd_flatmap = rdd_init.flatMap(lambda line : line.split())
# 4- 将每一个单词转换为 单词,1
rdd_map = rdd_flatmap.map(lambda word: (word,1))
# 5- 根据key分组聚合
rdd_res = rdd_map.reduceByKey(lambda agg, curr: agg + curr)
#6- 根据结果进行排序
rdd_res.sortBy(lambda wd_tup: wd_tup[1])
# 7- 写入到hdfs
rdd_res.saveAsTextFile('hdfs://node1:8020/pyspark_data/wd_output')
# 8- 释放资源
sc.stop()
2.3 提交任务spark-submit
spark-submit
基础格式:
** ./spark-submit --master 文件**
说明:
./spark-submit:用于将spark程序提交到指定的平台来运行,同时可以设置各种相关配置参数./spark-submit --master local[*]
/export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pyspark_wd.py
五、Spark on yarn环境构建
1. Spark on Yarn的本质
Spark On Yarn的本质:指的将Spark程序提交到Yarn集群中,通过yarn进行统一的调度运行操作
这种操作,将会是以后主要的提交上线部署的方式
2- 提交测试
cd /export/server/spark/bin \
./spark-submit \
--master yann \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ /export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pyspark_wd.py
2.1 两种部署方式说明
指的在将Spark任务提交到集群(YARN, Spark集群为主)的时候,提供两种提交部署方案:
client模式,cluster模式
本质区别:Spark程序中Driver程序运行在什么位置上
client模式:客户端,Driver程序运行在执行spark-submit所在节点上
**好处**:由于Driver是运行在客户端,当执行完成后,需要查看结果,此时executor会将结果返回给Driver,Driver在客户端,直接答应,我们直接在客户端看到执行结果(**方便测试**) 在客户端模式下,不存在Driver的日志,因为日志是直接输出客户端 **弊端**:由于客户端和executor有可能不在同一个环境中,会致中间网络传输效率比较低,从而影响整体的效率(生产环境不适用)
cluster模式:集群模式,Driver程序运行在提交集群所在的某一个节点上
**好处**:个集群环境中,在进行传输数据的时候,提升效率可以更大利用内部网络带宽优势, **弊端**:不方便测试,Driver运行在集群环境中,所有的内容全部都会记录到日志文件中,无法返回给提交的客户端,所以客户端想要查看结果,需要看日志(生产环境适用)
如何使用这两种方式呢?
cd /export/server/spark/bin** **
**./spark-submit ****--master yann **
**--deploy-mode ****client ****| ****cluster ****\ ****(默认client ****)**
**--conf "spark.pyspark.driver.python=****/root/anaconda3/bin/python3" ****
--conf "spark.pyspark.python=****/root/anaconda3/bin/python3"** ****/export/data/workspace/pyspark_parent/01_pyspark_base/src/02_pyspark_wd.py**
如何查看(spark日志)
查看Spark日志一般有二种方式:
一种为通过yarn查看
![](https://i-blog.csdnimg.cn/direct/f48a3326a5bd4adcb88efd9a9d0ab276.png)
一种通过Spark的18080端口查看
Driver和executor日志
查看对应Task的日志
3. Spark程序和PySpark交互流程
3.1 提交到Spark集群 部署方式为client
- 1-启动Driver程序
- 2-向Master申请资源
- 3-Master根据要申请的资源,返回对应资源列表 - executor1 :node1 1核 1GB- executor2 :node3 1核 1GB
- 4-连接对应worker节点,通知他们启动Executor,当我worker启动完成后,需要反向注册回Driver(通知)
- 5-Driver开始执行Main函数: - 5.1 初始化sc对象:构建Sparkcontext,基于py4j.将python中定义的如何初始化sc对象的代码转换为iava代码进行执行处理 - 5.2 当Driver程序构建好SC对象后,开始将后续的所有的RDD相关的算子全部合并在一起,根据rdd之间的依赖的关系,形成一个DAG执行流程图 ,划分出一共有多少个阶段 以及每个阶段需要运行多少个线程,最后每个线程需要运行在那个executor上(任务分配)- 5.3 当确定每个节点中有多少个线程以及每个线程应该运行在那个executor后,接下来将这些任务信息发送给对应executor,让其进行执行处理- 5.4 当executor接收到任务后,根据任务信息,开始运行处理,当Task运行完成后,如果需要将结果返回给Driver(比如:执行co]lectO),直接通过网络返回执行,如果不需要返回(执行:saveasTextFi1eO)),线程直接处理,当内部Task执行完成后executor会通知Driver已经执行完成了- 5.5 当Driver程序接收到所有的节点都运行完成后,执行 后续的非RDD的代码,并最终将sc对象关闭,通知Master执行完成了Master回收资源
**3.2 提交到Spark集群 部署方式为Cluster **
- 1-首先会先将任务信息提交到Master主节点上
- 2-当Master收到任务信息后,首先会根据Driver的资源信息,随机找一台worker节点用于启动Driver程序,并将任务信息交给Driver
- 3-当对应worker节点收到请求后,开启启动Driver,启动后会和master保持心跳机制,告知Master启动成功了,启动后立即开始申请资源(executor)
- Master根据要申请的资源,返回对应资源列表 - executor1 :node1 1核4- 1GB- executor2 :node3 1核 1GB
- 5-连接对应worker节点,通知他们启动Executor,当我worker启动完成后,需要反向注册回Driver(通知)
- 6-Driver开始执行Main函数: - 6.1 初始化sc对象:构建Sparkcontext,基于py4j.将python中定义的如何初始化sc对象的代码转换为iava代码进行执行处理 - 6.2 当Driver程序构建好SC对象后,开始将后续的所有的RDD相关的算子全部合并在一起,根据rdd之间的依赖的关系,形成一个DAG执行流程图 ,划分出一共有多少个阶段 以及每个阶段需要运行多少个线程,最后每个线程需要运行在那个executor上(任务分配)- 6.3 当确定每个节点中有多少个线程以及每个线程应该运行在那个executor后,接下来将这些任务信息发送给对应executor,让其进行执行处理- 6.4 当executor接收到任务后,根据任务信息,开始运行处理,当Task运行完成后,如果需要将结果返回给Driver(比如:执行co]lectO),直接通过网络返回执行,如果不需要返回(执行:saveasTextFi1eO)),线程直接处理,当内部Task执行完成后executor会通知Driver已经执行完成了- 6.5 当Driver程序接收到所有的节点都运行完成后,执行 后续的非RDD的代码,并最终将sc对象关闭,通知Master执行完成了Master回收资源
3.4 提交到Yarn集群,部署方式为Client
在YARN Cient模式下,Driver在任务提交的本地机器上运行,示意图如下:
3.5 提交到Yarn集群,部署方式为Cluster
在YARN Cluster模式下,Driver运行在NodeManager Contanier中,此时Driver与AppMaster合为一体,示意图如下:
**3.6 spark集群角色 **
六、Spark-submit相大参数说明
说 local(本地), spark集群,yarn集群,云上调度平台(k8s ...)
spark-submit 这个命令 是我们spark提供的一个专门用于提交spark程序的客户端,可以将spark程序提交到各种资源调度平台上:比如
spark-submit在提交的过程中,设置非常多参数,调整任务相关信息
基本参数设置
资源配置参数
executor的资源配置参数
七、spark core(*)
1. RDD的介绍
RDD: 弹性分布式数据集
目的: 为了支持高效的迭代计算
早期计算模型:单机模型
比如说:python计算,MySQL
适用于:小规模数据的处理操作,中间结果也可以保存的内存或者磁盘随着发展,单机模型并不能够应对未来数据不断的增长,因为单机总有资源的上限,无法无限制的扩展 想到方案:分布式计算操作
产生了MapReduce计算框架,通过分而治之思想,将任务拆解多个部分,进行分布式运算,从而解决大规模数据计算的操作
MR的计算流程:
首先读取数据,将数据读取到内存中,对数据进行转换处理,将处理后的数据写入到环形缓冲区(内存),当环形缓冲区写满后,将数据溢写到磁盘上,接着将磁盘上多个溢写的文件进行合并操作(从磁盘到内存,从内存到磁盘)最后reduce来拉取处理又是从磁盘到内存,内存到磁盘操作 这种处理模型最大好处,可以在有限的资源下,处理大规模的数据 同样也存在着一些弊端,比如说IO比较大,运行效率降低
如果在计算过程中,需要进行选代计算操作,对于MR来说,需要使用多个MR串行处理,第一个MR执行完成后,将结果保存下来,然后第二个MR读取第一个MR执行结果,进行处理操作,依次类推这种方式同样效率比较低,因为每一次运行MR都要重新申请资源
思考:那么能不能有一种技术可以支持高效计算同时也可以更好的选代计算操作,并且也可以处理大规模的数据——RDD(弹性分布式数据集)而spark将RDD进行实现
MR迭代流程图
Spark迭代方案
算子: 在spark中将一些具有某种功能的RDD函数称为算子
1.1 RDD的五大特性
RDD的五大特性:
- (必须满足)可分区的:每一个RDD必须能够进行分区操作,每一个分区代表就是一个Task线程,可分区也是分布式计算要求
- (必须满足)存在计算函数:每一个RDD都是由一个计算函数而得出的
- (必须满足)RDD之间存在依赖关系
- (可选)对于kv类型的RDD都是存在分区函数的
- (可选)移动存储不如移动计算(让计算离数据越近越好)
1.2 RDD五大特点
- 分区:RDD可以分区的
- 只读:不允许进行修改,如果要修改,处理后会得到一个新的RDD,原有RDD保持现状
- 依赖:RDD之间有依赖关系(血缘关系):两种依赖关系(宽依赖、窄依赖)
- 缓存:在进行RDD计算操作的时候,当一个RDD的结果被重复使用多次的时候,可以将RDD的结果进行缓存
- 检查点(checkpoint):当RDD之间的依赖链条越长后,为了减少中间处理失败整个回溯效率低的问题,可以进行checkpoint操作,在RDD依赖链条中保存几个快照(checkpoint),将阶段的结果保存到磁盘(HDES)上,当后续RDD出现失败了 可以直接从磁盘上读取阶段结果,不需要进行回溯整个流程,从而提升效率
- RDD的创建
构建RDD主要有二种方式
一种是通过本地集合模拟数据方式(Test测试使用)一种通过读取外部文件的方式(正常使用方式)
2.1 通过本地集合模拟数据方式
import os
from pyspark import SparkContext, SparkConf
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('如何创建RDD')
# 1-构建SparkContext对象
conf = SparkConf().setAppName("create_rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 2- 构建RDD对象
rdd_init = sc.parallelize(['赵一','钱二','张三','李四','王五','赵六','李老八'],5) #5个分区
print(rdd_init.collect())
# 需求: 给每一个元素添加一个10序号
rdd_map = rdd_init.map(lambda name: name + '10')
print(rdd_map.collect())
# 如何査看RDD中每个分区的数据,以及分区的数最
print(rdd_init.getNumPartitions()) #两个分区
print(rdd_init.glom().collect()) #分区及内容
说明
验证RDD是存在分区的,而且每一个计算函数作用在每个分区上进行处理
如何查看RDD有多少个分区:rdd.getNumPartitions()
如何查看RDD中每一个分区的数据:rdd.glom()
当采用本地模拟数据(并行方式)构建RDD的时候,RDD分区数量取决于:
** 1- 默认:setMaster("local[N]")
取快于 N的值,当N为的时候,表示为当前节点的CPU核心数*** 2- 同时也可以通过para1lelize算子来设置分区数量,设置多少就为多少个分区**
2.2 通过读取外部文件的方式
# 1- 创建sparkContext对象
conf = SparkConf().setAppName('create_RDD').setMaster('local[*]')
sc = SparkContext(conf=conf)# 2- 读取外部文件
rdd_init=sc.textFile('file:///export/data/workspace/pyspark_parent/02_pyspark_core/data/word.txt')
print(rdd_init.getNumPartitions())
print(rdd_init.glom().collect())
当读取后,分区 过多,在写入到HDFS的时候,就会产生大量的小文件,如果由丁数据源文件比较多导致的,可以采用在读取数据的时候,尽量减少分区数
wholeTextFiles:尽可能的减少分区数量,从前减少最终输出到目的地文件数量
import os
from pyspark import SparkContext, SparkConf
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('如何创建RDD外部文件')
# 1- 创建sparkContext对象
conf = SparkConf().setAppName('create_RDD').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2- 读取外部文件
# 4个分区
# rdd_init = sc.textFile('file:///export/data/workspace/pyspark_parent/02_pyspark_core/data')
# 读取小文件 wholeTextFiles:尽可能的减少分区数量,从前减少最终输出到目的地文件数量
# 2个分区
rdd_init = sc.wholeTextFiles('file:///export/data/workspace/pyspark_parent/02_pyspark_core/data')
print(rdd_init.getNumPartitions())
print(rdd_init.glom().collect())
关于textFile在读取数据的时候,分区数量确定:
确定分区数量取决于两个公式:
defaultMinPartitions = min(default.parallelism,2)|设置minPartition的值
其中:
default.parallelism=setMaster('1ocal[N]')=N值设置minPartition的值 textFile(path,minPartition) 如果设置minpartition的值,defaultMinpartitions 等于此值,否则按 min(default.parallelism,2)来计算 本地文件 rdd分区数量= max(文件分片数量,defaultMinPartitions) HDFS文件: rdd分区数量= max(文件block块数量,defaultMinPartitions)
- RDD的操作
3.1 算子的分类
转换算子
- 返回一个新的RDD
- 所有的转换算子都是情性的,只有遇到action算子才会触发执行
- RDD并不存储实际的数据,只存储转换的规则,当遇到action算子后,根据规则对数据进行处理即可
动作算子
- 不会返回一个RDD,要不然没有返回值,要不返回其他的
- 所有动作算子都是立即执行,在代码中每一个action都会去触发一个JOB的任务
3.2.RDD的Transformation算子操作
3.2.1 值类型的算子
主要是针对value进行处理相关的算子
Map算子: 一对一的转换操作
作用: 根据用户传入的自定义转换规则(函数)将数据一对一的转换称为一个新的RDD map算子是作用在每一个分区上每一个元素上,对每一个元素执行自定义的函数
groupBy算子: 用于执行分组操作
格式: groupBy(fn)
作用: 根据传入的函数对数据进行分组操作,每一组都是一个迭代器(列表)
filter算子:
格式: filter(fn)
作用: 根据传入自定义函数对数据进行过滤操作,自定义函数用于定义过滤规则注意: 自定义函数返回false 表示过滤掉,返回true表示保留此数据
3.2.2 双值类型DoubleValueType
union和intersection:
union: 并集,用于计算两个RDD的并集intersection: 交集,用于计算两个RDD的交集
3.3.3 kv类型的相关的算子
groupByKey算子:
格式: groupByKey()
作用: 根据Key进行分组,将value合并为一个迭代器(列表)
reduceBykey算子
格式: reduceBykey(fn)
作用: 根据key进行分组,将value合并为一个迭代器(列表).,然后根据传入的自定义函数,对每组中value数据进行聚合处理
sortByKey算子:
格式: sortByKey()。
作用: 根据key进行排序操作,默认为升序,如果要倒序排列,需要设置 asc参数为 False
countByValue(了解)
格式:countByValue()
作用: 对value进行count的数量统计
3.3 RDD的action算子
collect():
格式: collect()
作用:将各个分区的处理完成的数据收集回来,统一的放置在一个列表中。
reduce():
格式: reduce(fn)
作用:对数据进行聚合操作,根据传入的fn完成相关聚合统计
first:
格式: first()
作用: 获取第一个数据
take:
格式: take(N)
作用: 获取前N个数据 类似于 SQL中limit
top:
格式:top(N,[fn])
作用: 首先对数据进行降序排序,排序后,获取前N个元素,同时支持自定义排序规则。对于单值类型的数据,直接降序排序,对于kv类型的,默认按照k进行倒序排序,也可以自定义排序方案
count:
格式: count()
作用: 获取一共返回多少个元素
takeSample:
格式: takeSample(是否允许重复抽取,抽样数量, 种子值)
作用: 用于对数据进行抽样说明: 一旦设置种子值,会导致每次采样的结果都是一致的
foreach:
格式: foreach(fn)作用: 对数据执行遍历操作,没有返回值
3.3 基本函数
3.4 分区函数
3.4.2 分区函数
分区函数:指的对rdd中每一个分区进行处理操作
普通函数:指的对RDD中每一个分区下每一个元素进行操作
比如说:map 和mapPartitions foreach 和foreachPartition
下面的两个图描述了分区函数和普通函数区别,一个对每个分区中每个元素执行自定义函数,而分区函数是直接将整个分区数据传递给了自定义函数,从而导致 分区函数触发执行自定义函数次数更低一些,普通函数自定义函数触发次数更多
分区函数的好处:
减少自定义函数触发的次数,如果说自定义函数中有一些资源相关的操作(比如IO),触发次数减少了 资源调度少了举例说明:
比如说,需要将每一个数据依次的写入到数据库中,此时可以通过foreach相关函数来处理,遍历一个,然后将数据在自定义函数开启连接数据库连接 将数据写入到数据库中,有多少个元素,就需要开启多少次的连接map:
mapPartitions:
map和mapPartitions
** foreach 和foreachPartition**
3.4.3 重分区的函数
作用:对RDD中分区数量进行增大或者减少
在什么时候需要增大分区数量?
一个分区最终是通过一个线程来处理,当分区中**数据量比较大**的时候,也就意味每个线程处理的数据量比较大,此时可以**增大分区数量**,从而减少每个线程处理的数据量** 提高并行度 提升效率**
在什么时候需要减少分区数量呢?
当每个分区中数据量比较少的时候,过多的分区增加了资源损耗,此时可以减少分区的数量,减少资源的浪费当需要将数据写出到文件系统的时候,分区越多意味着输出的文件数量越多,减少分区数量 从而减少输出文件数量,避免小文件过多
增加分区函数
函数名称:repartition,此函数使用的谨慎,会产生Shuffle。格式:reparation(N)
减少分区函数
coalesce算子:减少分区不会产生 Shuffle
格式:coalesce(N,ifshuffle=False) 参数2 表示是否有shuffle,默认为False 表示没有 此时只能减少分区
总结:
reparation 是coalesce的一种当参数2为True-种简写
coalesce 默认只能进行减少分区,如果要增大分区,需要将参数2设置为True,一旦设置为True就会产生shuffle
区别:
- 1)两个算子都是用于重分区的算子,一个能增大也能减少,而coalesce默认只能减少分区
- 2)reparation默认会产生shuffle 而cpalesce默认没有shuffle
- 3)reparation 是coalesce的一种当参数2为True一种简写
专门用于针对kv类型的调整分区的函数: partitionBy
格式:
** partitionBy(N[,fn]) **参数2是用于设置分区规则
3.5 聚合函数
3.5.1 单值聚合算子
reduce l fold l aggregate
格式:
- **reduce(fn)**:根据传入的自定义函数,对整个数据集进行聚合统计默认 agg初始值为0 ** **
- **fold(default,fn)**:根据传入的自定义函数,对整个数据集进行聚合统计 并且可以设置agg的初始值
- **aggregate(defaut,fn1,fn2)**:参数1表示agg初始值参数2:对每个分区执行函数参数3:对各个分区结果进行聚合函数
3.5.2 kv类型聚合算子
groupByKey() reduceByKey() foldByKey() aggregateByKey()
说明:
reduceByKey() foldByKey() aggregateByKey() 和 reduce l fold | aggregate都是类型的,只是比单值多了一个分组操作,对每一个组内部聚合聚合
**groupByKey **是仅分组 无聚合计算
面试题:reduceBykey ===groupByKey+ reduce 请问两种处理模式,那种效率更高一些呢?
肯定是reduceBykey效率更高一些,因为可以现在每个分区中执行提前聚合计算,从而减少中间shuffle的数据量 提升效率
3.6 关联函数
涉及到函数格式:
** join()**:内连接 **leftouterJoin()**:左关联 **rightouterJoin()**:右关联 ** fullouterJoin()**:全外关联
主要针对kv类型
4. 案例
4.1 搜索案例
数据处理:
**import os
from pyspark import SparkContext, SparkConf
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'if name == 'main':
print('搜狗案例')**
** # 需求-:统计每个关键词出现了多少次
# 需求二:统计每个用户每个搜索词点击的次数
# 需求三:统计每个小时点击次数**** #**** 1- 创建sparkcontext对象**
** conf = SparkConf().setAppName('sougou').setMaster('local[*]')
sc = SparkContext(conf=conf)# ****2- 读取外部数据**
** rdd_init=sc.textFile('file:///export/data/workspace/pyspark_parent/02_pyspark_core/data/SogouQ.sample')
# print(rdd_init.count())#**** 3- 过滤数据:保证数据不为空,并且字段为6个**
** rdd_filter = rdd_init.filter(lambda line : line.strip() != '' and len(line.split()) == 6)
# print(rdd_filter.count())# ****4- 对数据进行切割放入元组 一行一个元组**
** rdd_map = rdd_filter.map(lambda line : (
line.split()[0],
line.split()[1],
line.split()[2][1:-1],
line.split()[3],
line.split()[4],
line.split()[5]
))
print(rdd_map.take(10))**
需求计算
需求一:统计每个关键词出现了多少次
在目前的数据集中没有一个字段代表是关键词,但是关键词是包含在搜索词中,一个搜索词中可能包含了多个关键词倒如:
电脑创业 ---> 电脑 创业发现搜索词中包含了多个关键词,所以首先需要从搜索词中提取各个关键词,那么也就意味着要对数据进行分词操作
如何进行分词呢? 中文分词
python:jieba库
java:IK分词器
如何使用iieba分词器呢?
1- 需要安装jieba分词器库pip install jieba(local模式需要在node1安装即可,如果集群模式各个节点都需要安装)**pip install jieba**
2- 在代码中引入jieba库,进行使用
# 默认模式 print(list(jieba.cut('你好同志我毕业于清华大学'))) 结果:['你好', '同志', '我', '毕业', '于', '清华大学'] # 全模式(最细粒度分析) print(list(jieba.cut('你好同志我毕业于清华大学',cut_all=True))) 结果:[['你好', '同志', '我', '毕业', '于清华', '清华', '清华大学', '华大', '大学'] # 搜索引擎模式 print(list(jieba.cut_for_search('你好同志我毕业于清华大学'))) 结果:[['你好', '同志', '我', '毕业', '于', '清华', '华大', '大学', '清华大学']
代码:
# 5.1需求一:统计每个关键词出现了多少次 # 5.1.1获取搜索词 rdd_seach = rdd_map.map(lambda line_tup : line_tup[2]) # print(rdd_seach.take(10)) # 5.1.2对搜索词进行分词操作 rdd_keywords = rdd_seach.flatMap(lambda seach : jieba.cut(seach)) # print(rdd_keywords.take(10)) # 5.1.3将每个关键词转换为 (关键词,1),进行分组统计 rdd_res = rdd_keywords.map(lambda keyword : (keyword,1)).reduceByKey(lambda agg,curr : agg+curr) # print(rdd_res.take(10)) # # 5.1.4 对结果数据倒序排序 rdd_sort = rdd_res.sortBy(lambda res : res[1],ascending=False) # # 5.2.5 打印结果 print(rdd_sort.take(50))
需求2:统计每个用户每个搜索词点击的次数
def xuqiu2(rdd_map): # SQL:select user,搜索词,count(1) from 表 group by user,搜索词; # 提取用户和搜索词 rdd_user_search = rdd_map.map(lambda line_tup: (line_tup[1], line_tup[2])) # 基于用户和搜索词进行分组统计 rdd_res = rdd_user_search.map(lambda user_sarch: (user_sarch, 1)).reduceByKey(lambda agg, curr: agg + curr) rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False) return rdd_sort.take(30)
需求三:统计每个小时点击次数
def xuqiu3(rdd_map): # 提取小时 rdd_hour = rdd_map.map(lambda line: line[0].split(':')[0]) rdd_res = rdd_hour.map(lambda line: (line, 1)).reduceByKey(lambda agg, curr: agg + curr) rdd_sort = rdd_res.sortBy(lambda line: line[1], ascending=False) return rdd_sort.take(50)
示例代码:
import os
import jieba
from pyspark import SparkContext, SparkConf
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
def xuqiu1(rdd_map):
# 5.1.1获取搜索词
rdd_seach = rdd_map.map(lambda line_tup: line_tup[2])
# print(rdd_seach.take(10))
# 5.1.2对搜索词进行分词操作
rdd_keywords = rdd_seach.flatMap(lambda seach: jieba.cut(seach))
# print(rdd_keywords.take(10))
# 5.1.3将每个关键词转换为 (关键词,1),进行分组统计
rdd_res = rdd_keywords.map(lambda keyword: (keyword, 1)).reduceByKey(lambda agg, curr: agg + curr)
# print(rdd_res.take(10))
# # 5.1.4 对结果数据倒序排序
rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False)
# # 5.2.5 打印结果
return rdd_sort.take(50)
def xuqiu2(rdd_map):
# SQL:select user,搜索词,count(1) from 表 group by user,搜索词;
# 提取用户和搜索词
rdd_user_search = rdd_map.map(lambda line_tup: (line_tup[1], line_tup[2]))
# 基于用户和搜索词进行分组统计
rdd_res = rdd_user_search.map(lambda user_sarch: (user_sarch, 1)).reduceByKey(lambda agg, curr: agg + curr)
rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False)
return rdd_sort.take(30)
def xuqiu3(rdd_map):
# 提取小时
rdd_hour = rdd_map.map(lambda line: line[0].split(':')[0])
rdd_res = rdd_hour.map(lambda line: (line, 1)).reduceByKey(lambda agg, curr: agg + curr)
rdd_sort = rdd_res.sortBy(lambda line: line[1], ascending=False)
return rdd_sort.take(50)
if __name__ == '__main__':
print('搜狗案例')
# 需求-:统计每个关键词出现了多少次
# 需求二:统计每个用户每个搜索词点击的次数
# 需求三:统计每个小时点击次数
# 1- 创建sparkcontext对象
conf = SparkConf().setAppName('sougou').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2- 读取外部数据
rdd_init = sc.textFile('file:///export/data/workspace/pyspark_parent/02_pyspark_core/data/SogouQ.sample')
# print(rdd_init.count())
# 3- 过滤数据:保证数据不为空,并且字段为6个
rdd_filter = rdd_init.filter(lambda line : line.strip() != '' and len(line.split()) == 6)
# print(rdd_filter.count())
# 4- 对数据进行切割放入元组 一行一个元组
rdd_map = rdd_filter.map(lambda line : (
line.split()[0],
line.split()[1],
line.split()[2][1:-1],
line.split()[3],
line.split()[4],
line.split()[5]
))
# print(rdd_map.take(10))
# 5- 统计分析
# 5.1需求一:统计每个关键词出现了多少次
xuqiu1(rdd_map)
# 5.2 需求二:统计每个用户每个搜索词点击的次数
xuqiu2(rdd_map)
# 5.3 需求三:统计每个小时点击次数
xuqiu3(rdd_map)
4.2 通过PySpark实现点击流日志分析
需求一: 统计pv(访问次数) 和 uv(用户数量)
def pv_uv_cnt(rdd_filter): # pv:访问次数 rdd_pv = rdd_filter.count() # uv: 独立访客数 rdd_uv = rdd_filter.map(lambda line: line.split()[0]).distinct().count() return rdd_pv, rdd_uv
需求二:统计每个访问的URL的次数,找到前10个
def xuqiu2(rdd_filter): rdd_res = rdd_filter.map(lambda line: (line.split()[6], 1)).reduceByKey(lambda agg, curr: agg + curr).sortBy( lambda line: line[1], ascending=False) print(rdd_res.take(10))
示例代码:
import os
import jieba
from pyspark import SparkContext, SparkConf
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
def xuqiu2(rdd_filter):
rdd_res = rdd_filter.map(lambda line: (line.split()[6], 1)).reduceByKey(lambda agg, curr: agg + curr).sortBy(
lambda line: line[1], ascending=False)
print(rdd_res.take(10))
def pv_uv_cnt(rdd_filter):
# pv:访问次数
rdd_pv = rdd_filter.count()
# uv: 独立访客数
rdd_uv = rdd_filter.map(lambda line: line.split()[0]).distinct().count()
return rdd_pv, rdd_uv
if __name__ == '__main__':
print('点击流')
# 1- 创建sparkcontext对象
conf = SparkConf().setAppName('sougou').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2- 读取外部数据
rdd_init = sc.textFile('file:///export/data/workspace/pyspark_parent/02_pyspark_core/data/access.log')
# print(rdd_init.count())
# 3- 过滤数据:保证数据不为空,并且字段大于等于12个
rdd_filter = rdd_init.filter(lambda line : line.strip() != '' and len(line.split()) >= 12)
# print(rdd_filter.count())
# 4- 统计pv和uv
pv_uv_cnt(rdd_filter)
# 5 - 统计每个访问的URL的次数
xuqiu2(rdd_filter)
5. RDD 持久化
5.1 RDD的缓存
缓存:
当一个RDD的产生过程(计算过程),是比较昂贵的(生成RDD整个计算流程比较复杂),并且这个RDD可能会被多方(RDD会被重复使用)进行使用了此时为了提升计算效率,可以将RDD的结果设置为缓存,这样后续在使用这个RDD的时候,无需在重新计算了,直接获取缓存中数据
提升spark的容错的能力,正常情况,当spark中某一个RDD计算失败的时候,需要对整个RDD链条进行整体的回溯计算,有了缓存可以将某些阶段的RDD进行缓存操作,这样当后续的RDD计算失败的时候,可以从最近的一个缓存中恢复数据 重新计算即可,无需在后,回溯所有链条
应用场景:
- 1-当一个RDD被重复使用的时候,可以使用缓存来解决
- 2-当一个RDD产生非常昂贵的时候,可以将RDD设置为缓存
- 3-当需要提升容错能力的时候,可以在局部设置一些缓存来提升容错能力
注意事项:
- 1-缓存仅仅是一种临时存储,可以将RDD的结果数据存储到内存(executor)或者 磁盘 甚至可以存储到堆外内存中
- 2 -由于缓存的存储是一种临时存储,所以缓存的数据有可能丢失的,所以缓存操作并不会将RDD之间的依赖关系给截断掉(清除掉),以防止当缓存数据丢失的时候,可以让程序进行重新计算操作
- 缓存的API都是]azy的设置缓存后,并不会立即触发,如果需要立即触发,后续必须跟action算子建议使用 count
5.1.1 缓存函数
设置缓存的相关API:
**cache()**:执行设置缓存的操作,cache在设置缓存的时候,仅能将缓存数据放置到内存**persist(设置缓存级别)**:执行设置缓存的操作,默认情况下,将缓存数据放置到内存中,同时支持设置其他缓存方案
手动清理缓存:
**unpersisto**:清理缓存
默认情况下,当程序执行完成后,缓存会被自动清理
** 常用的缓存级别有那些呢?**
常用:
** MEMORY_ONLY:**仅缓存到内存中
** MEMORY ONLY SER:**仅缓存到内存中,同时在缓存数据的时候,会对数据进行序列化(从对象 --> 二进制数据)操作,可以在一定程度上减少内存使用量
**MEMORY_AND_DISK**: **MEMORY_AND_DISK_2**:优先将数据保存到内存中,当内存不足的时候,可以将数据保存到磁盘中,带2的表示保存二份 ** MEMORY AND DISK SER**: **MEMORY_AND_DISK_SER_2**:优先将数据保存到内存中,当内存不足的时候,可以将数据保存到磁盘中,带2的表示保存二份,对于保存到内存的数据,会进行序列化的操作,从而减少内存占用量 提升内存保存数据体量,对磁盘必须要进行序列化
序列化: 将数据 从 对象 转换为 二进制的数据,对于RDD的数据来说,内部数据都是一个个对象,如果没有序列化是直接将对象存储到内存中,如果有序列化会将对象转换为二进制然后存储到内存中
好处:减少内存的占用量,从而让有限内存可以存储更多的数据
弊端:会增大对CPu的占用量,因为转换的操作,要使用CPU来工作
代码演示
import os
import time
import jieba
from pyspark import SparkContext, SparkConf ,StorageLevel
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
def xuqiu1(rdd_map):
# 5.1.1获取搜索词
rdd_seach = rdd_map.map(lambda line_tup: line_tup[2])
# print(rdd_seach.take(10))
# 5.1.2对搜索词进行分词操作
rdd_keywords = rdd_seach.flatMap(lambda seach: jieba.cut(seach))
# print(rdd_keywords.take(10))
# 5.1.3将每个关键词转换为 (关键词,1),进行分组统计
rdd_res = rdd_keywords.map(lambda keyword: (keyword, 1)).reduceByKey(lambda agg, curr: agg + curr)
# print(rdd_res.take(10))
# # 5.1.4 对结果数据倒序排序
rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False)
# # 5.2.5 打印结果
return rdd_sort.take(50)
def xuqiu2(rdd_map):
# SQL:select user,搜索词,count(1) from 表 group by user,搜索词;
# 提取用户和搜索词
rdd_user_search = rdd_map.map(lambda line_tup: (line_tup[1], line_tup[2]))
# 基于用户和搜索词进行分组统计
rdd_res = rdd_user_search.map(lambda user_sarch: (user_sarch, 1)).reduceByKey(lambda agg, curr: agg + curr)
rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False)
return rdd_sort.take(30)
def xuqiu3(rdd_map):
# 提取小时
rdd_hour = rdd_map.map(lambda line: line[0].split(':')[0])
rdd_res = rdd_hour.map(lambda line: (line, 1)).reduceByKey(lambda agg, curr: agg + curr)
rdd_sort = rdd_res.sortBy(lambda line: line[1], ascending=False)
return rdd_sort.take(50)
if __name__ == '__main__':
print('搜狗案例')
# 需求-:统计每个关键词出现了多少次
# 需求二:统计每个用户每个搜索词点击的次数
# 需求三:统计每个小时点击次数
# 1- 创建sparkcontext对象
conf = SparkConf().setAppName('sougou').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2- 读取外部数据
rdd_init = sc.textFile('file:///export/data/workspace/pyspark_parent/02_pyspark_core/data/SogouQ.sample')
# print(rdd_init.count())
# 3- 过滤数据:保证数据不为空,并且字段为6个
rdd_filter = rdd_init.filter(lambda line : line.strip() != '' and len(line.split()) == 6)
# print(rdd_filter.count())
# 4- 对数据进行切割放入元组 一行一个元组
rdd_map = rdd_filter.map(lambda line : (
line.split()[0],
line.split()[1],
line.split()[2][1:-1],
line.split()[3],
line.split()[4],
line.split()[5]
))
# print(rdd_map.take(10))
#TODO---------------设置缓存------------------#
# storageLevel 这个类需要在前面的from pyspark中加入此对象的导入
# 一般建议,设置完缓存后,让其立即触发,调用action算子 -> count
rdd_map.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()
# 5- 统计分析
# 5.1需求一:统计每个关键词出现了多少次
xuqiu1(rdd_map)
# 5.2 需求二:统计每个用户每个搜索词点击的次数
# 清理缓存
rdd_map.unpersist().count()
xuqiu2(rdd_map)
# 5.3 需求三:统计每个小时点击次数
xuqiu3(rdd_map)
# time.sleep(1000)
**是否有缓存 **
**查看缓存信息 **
5.2 RDD的checkpoint检查点
checkpoint跟缓存类似,也可以将某一个RDD结果进行存储操作,一般都是将数据保存到HDFS中,提供一种更加可靠的存储方案,所以说采用checkpoint方案,会将RDD之间的依赖关系给截断掉
checkpoint出现, 从某种角度上也可以提升执行效率(没有缓存高),更多是为了容错能力
对于checkpoint来说,大家可以将其理解为对整个RDD链条进行设置阶段快照的操作
由于checkpoint这种可靠性,储着,不会被删除,需要手动删除所以Spark本身只管设置,不管删除,所以checkpoint即使程序停止了,checkpoint数据依然存
如何设置checkpoint呢?
1-通过sc对象,设置checkpoint保存数据的位置
** sc.setcheckpointDir( 'hdfs路径' )**
2-通过rdd.checkpoint()设置开启检查点(1azy)
3-通过rdd.count()触发检查点的执行
示例:
import os
import time
import jieba
from pyspark import SparkContext, SparkConf ,StorageLevel
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
def xuqiu1(rdd_map):
# 5.1.1获取搜索词
rdd_seach = rdd_map.map(lambda line_tup: line_tup[2])
# print(rdd_seach.take(10))
# 5.1.2对搜索词进行分词操作
rdd_keywords = rdd_seach.flatMap(lambda seach: jieba.cut(seach))
# print(rdd_keywords.take(10))
# 5.1.3将每个关键词转换为 (关键词,1),进行分组统计
rdd_res = rdd_keywords.map(lambda keyword: (keyword, 1)).reduceByKey(lambda agg, curr: agg + curr)
# print(rdd_res.take(10))
# # 5.1.4 对结果数据倒序排序
rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False)
# # 5.2.5 打印结果
return rdd_sort.take(50)
def xuqiu2(rdd_map):
# SQL:select user,搜索词,count(1) from 表 group by user,搜索词;
# 提取用户和搜索词
rdd_user_search = rdd_map.map(lambda line_tup: (line_tup[1], line_tup[2]))
# 基于用户和搜索词进行分组统计
rdd_res = rdd_user_search.map(lambda user_sarch: (user_sarch, 1)).reduceByKey(lambda agg, curr: agg + curr)
rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False)
return rdd_sort.take(30)
def xuqiu3(rdd_map):
# 提取小时
rdd_hour = rdd_map.map(lambda line: line[0].split(':')[0])
rdd_res = rdd_hour.map(lambda line: (line, 1)).reduceByKey(lambda agg, curr: agg + curr)
rdd_sort = rdd_res.sortBy(lambda line: line[1], ascending=False)
return rdd_sort.take(50)
if __name__ == '__main__':
print('搜狗案例')
# 需求-:统计每个关键词出现了多少次
# 需求二:统计每个用户每个搜索词点击的次数
# 需求三:统计每个小时点击次数
# 1- 创建sparkcontext对象
conf = SparkConf().setAppName('sougou').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2- 读取外部数据
rdd_init = sc.textFile('file:///export/data/workspace/pyspark_parent/02_pyspark_core/data/SogouQ.sample')
# print(rdd_init.count())
# 3- 过滤数据:保证数据不为空,并且字段为6个
rdd_filter = rdd_init.filter(lambda line : line.strip() != '' and len(line.split()) == 6)
# print(rdd_filter.count())
# 4- 对数据进行切割放入元组 一行一个元组
rdd_map = rdd_filter.map(lambda line : (
line.split()[0],
line.split()[1],
line.split()[2][1:-1],
line.split()[3],
line.split()[4],
line.split()[5]
))
# ---------------设置检查点------------------#
sc.setCheckpointDir('/spark/Checkpoint')
rdd_map.count()
# 5- 统计分析
# 5.1需求一:统计每个关键词出现了多少次
xuqiu1(rdd_map)
# 5.2 需求二:统计每个用户每个搜索词点击的次数
# 清理缓存
rdd_map.unpersist().count()
xuqiu2(rdd_map)
# 5.3 需求三:统计每个小时点击次数
xuqiu3(rdd_map)
# time.sleep(1000)
5.3 面试题:RDD缓存和检查点
区别:
区别一:存储位置
** 缓存**:会将RDD的结果数据缓存到内存或者磁盘,或者外内存** 检查点**:会将RDD的结果数据存储到HDFS(默认),当然也支持本地存储(仅在1oca1模式,但如果是1oca]模式,检查点无所谓)
区别二:依赖关系
** 缓存**:由于缓存存储是一种临时存储,所以缓存不会截断掉依赖关系,以防止缓存丢失后,进行回溯计算** 检查点**:会截断掉依赖关系,因为检查点方案认为存储数据是可靠的,不会丢失
区别三:生命周期
缓存:当整个程序执行完成后(一个程序中是包含多个J0B任务的),会自动清理掉缓存数据,或者也可以在程序运行中手动清理** 检查点**:会将数据保存到HDFS中,不会自动删除,即使程序停止了,检查点数据依然存在,只能手动删除数据(会永久保存)
请问: 在实际使用中,在Spark程序中,是使用缓存呢 还是检查点呢
会将两种方案都作用于程序中,一般是先设置检查点,然后设置缓存
- 共享变量
6.1 广播变量
广播变量:
目的:减少Driver和executor之间网络数据传输数据量,以及减少内存的使用从而提升效率** 适用于**:多个Task线程需要使用到同一个变量的值的时候(这个概念:闭包)
默认做法:
各个线程会将这个变量形成一个副本,然后拷贝到自己的线程中,进行使用即可,由于一个executor中有多个线程,那么意味需要拷贝多次,导致executor和 Driver之间的传输量增加,对带宽有一定影响,同时拷贝了多次,对内存占用量提升解决方案:引入一个广播变量
让executor从Driver中拉取过来一个副本即可,一个executor只需要拉取一次副本,让executor中各个线程读取executor中变量即可 注意:广播变量是只读的,各个线程只能读取数据,不能修改数据
如何使用
通过sc创建一个广播变量:在Driver设置
** 广播变量对象 = sc.broadcast(值)**
获取变量:在Task获取
** 广播变量对象.value**
代码演示:
import os
import jieba
from pyspark import SparkContext, SparkConf
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('广播变量')
conf = SparkConf().setAppName("create_rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 设置广播变量
bc = sc.broadcast(10000)
# 读取数据
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 将每个数据都加上指定值,委用广播变量
rdd_res = rdd_init.map(lambda num : num+bc.value)
rdd_res.foreach(lambda a : print(a))
6.2 累加器
累加器主要提供在多个线程中对同一个变量进行累加的操作,对于多个线程来说只能对数据进行累加,不能读取数据,读取数据的操作只能有Driver来处理
应用场景:全局累加操作
如何使用:
1-由于Driver设置一个累加器的初始值 ** 累加器对象 = sc.accumulator(初始值)**
** ** 2-由rdd(线程)来进行累加操作
**累加器对象.add(累加内容)** 3-在Driver中获取值: ** 累加器.value**
当没有累加器的时候
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
定义变量
a=10
3-处理数据:为a将列表中变量的值累加上去
rdd_map = rdd_init.map(lambda num:a + num)
当有累加器的时候
读取数据
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
定义变量,引入累加器
a=sc.accumulator(10)
3-处理数据:为a将列表中变量的值累加上去
**rdd_map = rdd_init.map(lambda num:a.add(num))
print(a.value) #65**
一个问题点:
当我们对设置过累加器的RDD,后续在进行一些其他的操作,调度多次action算子后,发现累加器被累加了多次,本应该只累加一次,这种情况是如何产生的呢?原因:当调度用多次action的时候,会产生多个J0B(计算任务),由于RDD值存储计算的规则,不存储数据,当第一个action计算完成后,得到一个结果,整个任务完成了,接下来再运行下一个job的任务,这个任务依然需要重头开始进行计算得到最终结果
这样就会 累加的操作就会被触发多次,从而被累加了多次
解决方案:
对累加器执行完的RDD 设置为缓存或者检查点,或者两个都设置,即可解决
7. Spark内核调度
Spark Job 默认的调度模式是 FIFO(先进先出)
7.1 RDD的依赖
RDD之间是存在依赖关系,这也是RDD中非常重要特性,一般将RDD之间的依赖关系划分为两种依赖关系: 窄依赖 和 宽依赖
窄依赖
目的:让各个分区的数据可以并行的计算操作
指的:上一个RDD的某一个分区的数据 被下一个RDD的某一个分区全部都继承下来
宽依赖
目的:划分stage阶段
指的:上一个RDD的分区数据被下一个RDD的多个分区所接收并处理(shuffle),我们将这种关系称为宽依赖所以说,判断两个RDD之间是否存在宽依赖,主要看两个RDD之间是否存在shuffle
说明:
在spark中,每一个算子是否存在shuffle操作,在spark设计的时候就已经确定了,比如说 map一定不会有shuffle,比如说reduceByKey一定是存在shuffle如何判断这个算子是否会走shuffle呢?
可以从查看DAG执行流程图,如果发现一执行到这个算子,阶段被分为多个,那么一定是存在shuffle,以及可以通过查看每个算子的文档的说明信息,里面也会有一定的说明
但是:在实际操作中,我们一般不会纠结这个事情,我们要以实现需求为导向,需要用什么算子的时候,我们就采用什么算子来计算即可,虽然说过多的shuff1e操作,会影响我们的执行的效率,但是依然该用的还是要用的
7.2 DAG和Stage
DAG:有向无环图 oozie里面的调度流程 DAG
如何形成一个DAG执行流程图呢?
- 1-Driver会将所有的RDD基于依赖关系,形成一个stage的流程图
- 2-对整个RDD从后往前进行回溯,当遇到RDD之间依赖关系为宽依赖的时候,自动分为一个阶段,如果是窄依赖,合并到一个阶段中
- 3-当整个回溯全部完成后,形成了DAG的执行流程图
7.3 RDD的shuflle
spark中shuffle历史进程:
- 1-在Spark 1.1以前的版本中,整个Spark采用shuffle方案为 HASH shuffle
- 2-在Spark 1.1版本的时候,引入 Sort shuffle, 主要增加合并排序操作
- 3-在Spark 1.5 版本的时候,引入钨丝计划:优化操作,提升内存以及CPU运行
- 4-在Spark 1.6版本的时候 将钨丝计划合并到sort shuffle中
- 5-在park 2.0版本以后,删除掉 HASH shuffle,全部合并到sort shuffle中
早期:
shuffle过程:
父RDD的每个分区(线程)在生产各个分区的数据的时候,会产生与子RDD分区数量相等的文件的数量,每个文件对应一个子RDD的分区当父RDD执行完成后,子RDD 从父RDD产生的文件中,找出对应分区文件,直接拉取处理即可思考:有什么弊端呢?
父RDD产出的分区文件数量太多了,从而在HDES上产生了大量的小文件
由于文件变多了 对应磁盘I0也增大了,需要打开文件N次
子RDD拉取数据,文件数量也比较多,磁盘I0比较大,对效率有比较大的影响
优化:
经过优化后的HASH SHUFFLE. 整个生成的文件数量整体下降很多
将原来由各个线程来生成N个分区文件,变更为由executor来统一生成与下游RDD分区数量相同的文件数量即可,这样各个线程在输出数据的时候 将对应分区的数据输出到对应分区文件上即可,下游的RDD在拉取数据的时候,只需要拉取自己分区文件的数据即可
sort shuffle
sort shuffle流程:
首先父RDD的各个线程将数据分好区后写入到内存中,当内存达到一定的阈值后,就会触发溢写操作,将数据溢写到磁盘上(分批次溢写:1w),不断输出,不断的溢写 ,产生多个小文件,当整个父rdd的数据处理完成后,然后对小文件进行合并操作,形成一个最终的文件,同时每一个文件都匹配一个索引文件,用于下游的RDD在拉取数据的时候,根据索引文件快速找到相对应的分区数据
在sort shuffle中两种机制:
** 普通机制 和 bypass机制**
普通机制:带有排序操作
首先父RDD的各个线程将数据分好区后写入到内存中,当内存达到一定的阈值后,就会触发溢写操作,将数据溢写到磁盘上(分批次溢写:1w),在溢写的过程中,会对数据进行排序操作 不断输出,不断的溢写 ,产生多个小文件,当整个父rdd的数据处理完成后,然后对小文件进行合并操作,形成一个最终的文件,狂形成的时候同样也会对数据进行排序操作,同时每一个文件都匹配一个索引文件,用于下游的RDD在拉取数据的时候,根据索引文件快速找到相对应的分区数据
bypass机制:不含排序 并不是所有的sort shuffle都可以走bypass
满足以下的条件:
1-上游的RDD的分区数量要小于200
2-上游不能执行提前聚合的操作
执行bypass机制,由于没有了排序的操作,整个执行效率要高于普通机制
7.4 JOB的调度流程
Driver底层调度方案:
- 1- 当启动spark应用的时候,首先执行main函数,创建一个 Sparkcontext对象,当这个对象的创建的时候,底层还同时构建DAGschedule和 Taskschedule
- 2-当spark发现后续的代码有action算子后,就会立即触发任务的执行,生成一个J0B任务,一个action就会触发一个Job任务
- 3-触发任务后,首先由Driver负责任务分配工作(DAG流程图,stage划分,每个stage需要运行多少个线程,每个线程需要在那个executor上 - 3.1 首先由Driver中DAGSchedule执行,主要进行DAG流程图的生成,以及划分stage,并且还会划分出每个stage阶段需要运行多少个线程,并将每个阶段的线程封装到一个Taskset的列表中,有多少个阶段,就会产生多少个Taskset,最后将Taskset传递给Taskscheduler- 3.2 接下来由Taskscheduler来处理,根据Taskset中描述的线程的信息,将线程执行任务发送给executor来执行,尽可能保证每一个分区的Task运行在不同的executor上,确保资源最大化,整个资源申请都是由Taskscheduler申请的
7.5 spark的并行度
Spark的并行度是决定Spark执行效率非常重要因素,一般可以说并行度越高,执行效率越高,
前提资源足够
**在Spark中并行度主要取决于以下两个因素: **
- 资源因素:由提交任务时候,所申请的executor的数量以及CPU核数和内存来决定
- 数据因素:数据的大小,对应分区数量以及Task线程
- 当申请的资源比较大的时候,如果数据量不大,这样虽然不会影响执行的效率,但是会导致资源的浪费
- 费当并行度不高的时候,但是数据量比较大,会导致没有相应资源来运行,本应该可以并行执行的操作,变成了串行执行
如何调整并行度呢?
调整的标准:在合适的资源上,运行合适的任务 产生合适的并行度 除了可以给出一些经验值以外,更多还需要我们不断的调试 建议值:一个CPU核数上运行2~3个线程 一个CPU对应内存大小为 3~5GB
可以通过这个参数设置spark的并行度,此并行度主要是决定经过shuffle后,分区的数量
7.6 了解combinerByKey
combinerByKey是一个非常底层的算子,是 aggregateByKey底层实现:
整体关系
combinerByKey --> aggregateBykey --> flogByKey --> reduceBykey
使用格式:
combinerBykey(fnl,fn2,fn3)参数1:fn1设置初始值 参数2:fn2对每个分区执行函数 参数3:fn3对各个分区执行完结果汇总处理
示例:
import os
from pyspark import SparkContext, SparkConf
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('combinerBykey')
conf = SparkConf().setAppName("combinerBykey").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 2- 切始化数据
rdd_init = sc.parallelize([('co1', '张三'),('c02', '李四'),('co1', '王五'),('c04', '周六'),('c02', '张四'),('co1', '李五'),('c04', '孙六')])
# 3- 处理数据
def fn1(agg):
return [agg]
def fn2(agg,curr):
agg.append(curr)
return agg
def fn3(agg,curr):
agg.extend(curr)
return agg
rdd_res = rdd_init.combineByKey(fn1,fn2,fn3)
print(rdd_res.collect())
八、Spark SQL
1. sparksql的基本概念
1.1 了解什么是Spark sQL
Spark SQL是 spark的一个模块,此模块主要用于处理结构化的数据
- Spark SQL 主要是处理结构化的数据,而Spark Core 可以处理任意数据类型
- Spark SQL中 核心的数据结构为 dataFrame: 数据(RDD)+元数据(schema)
为什么学习SparkSQL:
- SQL比较简单,会SQL的人一定比会大数据的人多:SQL更加通用
- Spark SQL可以兼容 HIVE ,可以让Spark sQL 和 hive集成,从而将执行引擎替换为Spark
- Spark SQL 不仅仅可以写SQL,还可以写代码,SQL和代码是可以共存,也可以单独使用
- Spark SQL可以处理大规模的数据,底层是基于SparkRDD
Spark SQL内特点:
- 融合性: Spark SQL中既可以编写SQL 也可以编写代码 也可以混合使用
- 统一的数据访问:使用Spark SQL 可以和各种数据源进行集成,比如 HIVE,MySQL,oracle ...…,集成后,可以使用一套Spark SoL的API来操作不同的数据源的数据
- HIVE兼容:Spark SQL 可以和 HIVE进行集成,集成后将HIVE执行引擎从MR替换为 spark,提升效率 集成核心是共享metastore
- 标准化的连接:SparkSQL 也是支持 JDBC/0DBC的连接方式,可以让各种连接数据库的工具来连接使用
1.2 spark sQL的发展史
从 2.0版本后,Spark SQL将Spark两个核心对象: dataset 和 dataframe 合二为一了,统一称为叫做 dataset,但是为了能够支持向python这样没有泛型的语言,在客户端依然保留dataframe,但是当dataFrame到达Spark后,依然会被转换为dataSet
1.3 spark sQL与hive异同
相同点
- 都是分布式SQL引擎
- Spark SOL 和 HIVE都可以处理大规模的数据
- 都是处理结构化的数据
不同点
- Spark sQL 是基于内存的选代计算, HIVE是基于磁盘的选代计算。
- HIVE仅能使用SQL来处理数据,而 Spark sQL 不仅可以使用SQL 还可以使用 DSL代码。
- HIVE提供了专门用于元数据管理的服务:metastore而 spark soL 没有元数据管理的服务,自己来进行维护。
- HIVE底层是基于MR来运行的,而sparksQL底层是基于RDD。
1.4 Spark SQL的数据结构对比
- **pandas的 dataFrame: ** 表示的是一个二维的表,仅能处理结构化的数据,单机处理操作,仅适合于处理小数据集分析
- Spark Core的RDD: 不局限于数据结构,分布式的处理引擎,可以处理大规模的数据
- Spark SQL的dataFrame: 表示的一个二维的表,仅能处理结构化的数据,可以分布式的处理,可以处理大规模的数据
在实际中:
一般如果遇到的数据集以kb MB 或者几个GB,此时可以使用pandas即可完成统计分析处理,比如财务的相关数据分析如果数据集以 几十GB 或者 TB 甚至 PB级别以上的数据集,必须使用大规模处理数据的引擎
- RDD表示的具体数据对象,一个RDD就代表一个数据集
- dataFrame:是将RDD中对象中各个属性拆解出来,形成一列列的数据,变更为一个二维的表
- dataSet:是在dataFrame的基础上,加入了泛型的支持,将每一行的数据,使用一个泛型来表示
从Spark SQL 2.0开始,整个Spark sQL只有一种数据结构:dataset
但是由于spark SQL需要支持多种语言的开发的工作 有一些语言并不支持泛型,所以spark SQL为了能够让这些语言对接spark所以在客户端依然保留了dataFrame的接口,让其他无泛型的语言使用dataFrame接口来对接即可,底层会将其转换为datasetSQL
2. Spark SQL的入门案例
2.1 Spark SQL的统一入口
从Spark SQL开始,需要将核心对象,从SparkContext切换为Spark Session对象
Spark session对象是spark2.0后推出一个全新的对象,此对象将会作为spark整个编码入口对象,此对象不仅仅可以操作spark SQL还可以获取到Sparkcontext对象,用于操作Spark core代码
构建saprkSession
import os
from pyspark.sql import SparkSession
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('pyspark构建Sparksession')
# 1- 创建SparkSession对象
spark = SparkSession.builder.master('local[*]').appName('create_sparkSession').getOrCreate()
print(spark)
2.2 Spark SQL入门案例
文件内容
需求: 请将年龄大于20岁的数据获取出来
import os
from pyspark.sql import SparkSession
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('sparksql_案例1')
# 1- 创建SparkSession对象
spark = SparkSession\
.builder\
.master('local[*]')\
.appName('init_pyspark_sql')\
.getOrCreate()
# 2- 读取外部文件的数据
df_init = spark.read.csv(path='file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/stu.csv'
,header=True #是否有表头
,sep=',' #分隔符
,inferSchema=True #自动识别字段类型
) # dataframe
# 3- 获取数据
df_init.show()
# 获取元数据信息
df_init.printSchema()
# 4- 需求: 请将年龄大于20岁的数据获取出来 :DSL写法
df_where = df_init.where('age > 20')
df_where.show()
# SQL写法:
df_init.createTempView('t1')
spark.sql('select * from t1 where age>20').show()
3. DataFrame详解
3.1 DataFrame基本介绍
dataFrame表示的是一个二维的表,既然是一个表,那么应该有 字段名字,字段的类型,数据
dataFrame中,主要由 structType和structField 和 ROW来组成的
其中:
StructType: 其实dataFrame中表示schema元数据信息的核心对象
StructField: 表示字段的对象,一个structType中可以有多个structField,类似于一个表中可以有多个列 ,涵盖三个部分的内容: 字段名称,字段的类型,字段数据是否可以为空**ROW**:行,表示的行数据,每一行的数据就是一个ROw对象 **column**:-列数据 含列信息和列数据
如何构建一个schema元数据信息:
3.2 DataFrame的构建方式
方式一:通过RDD转换为dataFrame对象
import os from pyspark.sql import SparkSession from pyspark.sql.types import * os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3' if __name__ == '__main__': print('方式一:通过RDD转换为dataFrame') # 1- 创建sparksession对象 spark = SparkSession.builder.master('local[*]').appName('create_df').getOrCreate() # 从spark中获取sparkContext对象 sc =spark.sparkContext # 2- 读取数据获取RDD rdd_init = sc.parallelize([('c01','张三',19),('c02','王麻子',29),('c03','赵六',16),('c02','王五',25),('c01','李四',39)]) # 3- 通过rdd将c01的数据过滤 rdd_filter = rdd_init.filter(lambda line:line[0] != 'c01') # print(rdd_filter.collect()) # 4- 将RDD转换为DF # 4.1 方案1 schema = StructType()\ .add('id',StringType(),True) \ .add('name', StringType(), False) \ .add('age',IntegerType()) df_init = spark.createDataFrame(rdd_filter,schema=schema) # 4.2 方案2 df_init = spark.createDataFrame(rdd_filter, schema=['id','name','age']) # 4.3 方案3 df_init = rdd_filter.toDF() # 4.4 方案4 df_init = rdd_filter.toDF(schema=schema) # 4.5 方案4 df_init = rdd_filter.toDF(schema=['id','name','age']) # 打印结果 df_init.printSchema() df_init.show()
** 作用:**
进行清洗转换处理的操作,将其转换为结构化的数据,然后将RDD转换DF 通过spark SQL来处理
当我们需要读取的数据是一种半结构化的数据,但是希望使用spark SQL来处理,此时可以先使用spark core来读取数据,将数据
方式二:通过pandas的DF对象 转换为 spark SQL的DF对象
import os
import pandas as pd
from pyspark.sql import SparkSession
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('演示pandas DF 转换为 spark sQL DF')
# 1- 创建SparkSession对象
spark = SparkSession\
.builder\
.master('local[*]')\
.appName('init_pyspark_sql')\
.getOrCreate()
# 2- 构建pandas DF对象
pd_df = pd.DataFrame({'id':['c01','c02'],'name':['张三','李四']})
# print(pd_df)
# 3- 将pd_df 转换为 spark SQL 的df
spark_df = spark.createDataFrame(pd_df)
spark_df.show()
当处理的数据源比较特殊的时候,比如 Exce1格式,此时可以通过pandas的DF直接读取,然后转换为 spark的DF进行处理,从而扩充spark的读取的数据源
方式三:通过读取外部的文件的方式
通用完整的格式:
** sparksession对象.read.format('text l csv l json l parquet l orc l avro l jdbc l....')**** .option('参数的key',参数的值') # 可选项:读取的格式不同 参数也不同**
** .load('指定读取数据的路径:支持本地和HDFS')**
**读取CSV **
spark = SparkSession.builder.master('local[*]').appName('create_df').getOrCreate()
# 2- 读取数据
# sep参数:没置csv文件中字版的分隔符号 默认 ,
# header参数:设置csv是否含有头信息 默认为 false
# inferSchema参数 用于让程序自动推断字段的类型默认为false默认所有的类型都是string
# encoding参数: 设置对应文件的字符集默认为 UTF-8
df_init = spark.read\
.format('csv')\
.option('sep',',') \
.option('header', True) \
.option('inferSchema', True) \
.option('encoding', 'UTF-8') \
.load('file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/stu.csv')
df_init.show()
读取 text方式
# 1- 创建sparksession对象
spark = SparkSession.builder.master('local[*]').appName('create_df').getOrCreate()
# 2- 读取数据
# #用text的方式来读取数据,仅支持产生一列数据,默认列名为value,
# 当然可以通过schema修改列名
df_init = spark.read\
.format('text')\
.schema(schema='id String')\
.load('file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/stu.csv')
df_init.show()
方式四:采用JSON方式来读取数据
# 1- 创建sparksession对象
spark = SparkSession.builder.master('local[*]').appName('create_df').getOrCreate()
# 2- 读取数据
df_init = spark.read\
.format('json')\
.load('file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/people.json')
df_init.show()
简写
上述所有方式均可简写
其他的类似:
** spark.read.text**
** json**
** parquet**
** Orc
jdbc**# 1- 创建sparksession对象 spark = SparkSession.builder.master('local[*]').appName('create_df').getOrCreate() # 2- 读取数据 :简单读取 df_init = spark.read.csv(path='file://',header=True,encoding='UTF-8',inferSchema=True) df_init = spark.read.json()
3.3 DataFrame的相关API
相关spark支持的所有的SQL函数
Spark SQL, Built-in FunctionsNonehttps://spark.apache.org/docs/3.1.2/api/sql/index.html
dataFrame的操作,主要支持两利方式: **DSL **和 SQL
DSL:特定领域语言
在Spark SQL中 DSL指的 dataFrame的相关的API,这些API相当简单,因为大多数的API都与SQL的关键词同名SQL :
主要通过编写SQL 完成统计分析操作
思考:在工作中,这两种方案,会使用那种呢?
- 在刚刚接触使用Spark SQL的时候,更多的使用的 SQL的方式来操作(大多数的行为)
- 从Spark官方角度来看,推荐使用DSL,因为DSL对于框架更容易解析处理,效率相对SQL来说更高效一些(本质差别不大)
- 一般编写DSL看起来要比SQL高大上一些
3.3.1 相关API
在使用dataFrame的相关API的时候,传入参数的说明:
** 在使用dataFrame的API的时候,每个API的传入的参数方式都支持多种方式**
** 字符串,列表,column对象 **
如何查看每个API支持传递那些方式呢?
以select API为例说明:
源码:*def select(self,cols): **
1- **show() **方法: 用于显示表中相关数据
一般都不设置相关的参数,直接用
2- printSchema(): 用于打印表的结构信息(元数据信息)
3- select(): 此API是用于实现 在SQL中 select后面放置的内容的
比如说: 可以放置字段,或者相关函数,或者表达式 ![](https://i-blog.csdnimg.cn/direct/9254d7cfa2d74a5c837315e9c8145121.png)
4- **filter **和 where:用于对数据进行过滤的操作
5- groupBy: 对数据执行分组
说明:**分组必聚合 **
说明: 如果想在DSL中使用SQL的函数,在spark SQL中,专门将函数放置在一个类中
1-先导入函数的核心对象:
** import pyspark.sql.functions as F**
2-使用F.函数名 即可使用
使用SQL方式来处理,必须将DF注册为一个视图(表): 支持临时视图和全局视图
spark.sql("编写SQL语句')
** df = spark.SQL()**
编写SQL语句即可
案例:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('案例')
# 1- 创建sparksession对象
spark = SparkSession.builder.master('local[*]').appName('create_df').getOrCreate()
# 2- 读取数据 :简单读取
df = spark.read.csv(
path='file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/stu.csv',
header=True,
encoding='UTF-8',
inferSchema=True
)
# 演示相关的APT:
df.printSchema()
df.show()
# select操作:查看id列和 address列
df.select('id','address').show()
df.select(df['id'],df['age']).show()
df.select([df['id'],df['age']]).show()
df.select(['id', 'address']).show()
# where filter
df.where('id>2').show()
df.where(df['id'] > 2).show()
# group by
df.groupby(df['address']).count().show()
# 统计每个地区人数及不同年龄
df.groupby('address').agg(
F.count('id').alias('cnt')
# F.count_distinct(),
# F.countDistinct('age').alias('age_cnt')
).show()
# SQL
df.createTempView('t1')
spark.sql("select address,count(id),count(distinct age) from t1 group by address").show()
3.4 案例
3.4.1 词频统计分析案例
方式一RDD 转换为 DF 方式完成
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('Wordcount案例:方式一RDD 转换为 DF 方式完成')
# 1- 创建sparksession对象 和 sc对象
spark = SparkSession.builder.master('local[*]').appName('create_df').getOrCreate()
sc = spark.sparkContext
# 2- 读取数据
rdd_init = sc.textFile('file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/word.txt')
# 3- 将数据转换为一个个单词
rdd_word= rdd_init.flatMap(lambda line:line.split()).map(lambda word:(word,))
# 4- 将数据转换为df对象
df_word = rdd_word.toDF(schema=['word'])
# 5- SQL计算
df_word.createTempView('t1')
spark.sql('select word,count(1) from t1 group by word').show()
# DSL
# withColumnRenamed:修改列表:参数1表示旧列名 参数2表示新列表
df_word.groupby(df_word['word']).count().withColumnRenamed('count','cnt').show()
df_word.groupby(df_word['word']).agg(
F.count('*').alias('cnt')
).show()
# 关闭资源
spark.stop()
方式二实现: 直接读取为DF处理 DSL SQL以及 DSL+SQL
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('Wordcount案例:方式二实现: 直接读取为DF处理 DSL SQL以及 DSL+SQL')
# 1- 创建sparksession对象 和 sc对象
spark = SparkSession.builder.master('local[*]').appName('create_df').getOrCreate()
# 2- 读取数据
df_init = spark.read.text('file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/word.txt')
# df_init.show()
# 3- 处理数据
# 纯DSL方案
# 3.1. 1首先将数期转换为一列数据,一行为一个单词 explain(列表)
df_word = df_init.select(F.explode(F.split('value',' ')).alias('word'))
# df_word.show()
df_word.groupby('word').count().show()
# 纯DSL+SQL 混合方案
# 3.2. 1首先将数期转换为一列数据,一行为一个单词 explain(列表)
df_init.createTempView('t1')
df_word1 = spark.sql('select explode(split(value," ")) as word from t1')
df_word1.groupby('word').count().show()
# 纯SQL方案
df_word1.createTempView('t2')
spark.sql('select word,count(1) as cnt from t2 group by word').show()
3.4.2 电影分析案例
字段说明
数据说明:userid ,movieid,score,datestr
字段的分隔符号为:\t
需求:
- 查询用户平均分
- 查询电影平均分
- 查询大于平均分的电影的数量
- 查询高分电影中(>3)打分次数最多的用户,并求出此人打的平均分
- 查询每个用户的平均打分,最低打分,最高打分
- 查询被评分超过100次的电影,的平均分 排名 TOP10
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
def xuqiu1(df_init):
# SQL 实现
spark.sql("""
select
userid,
round(avg(score),2) as u_avg
from t1
group by userid
order by u_avg desc
""").show()
#DSL
df_init.groupby(df_init['userid']).agg(
F.round(F.avg('score'),2).alias('u_avg')
).orderBy(F.desc('u_avg')).show()
def xuqiu2(df_init):
df_init.groupby(df_init['movieid']).agg(
F.round(F.avg('score'),2).alias('m_avg')
).orderBy(F.desc('m_avg')).show()
def xuqiu3(df_init):
# spark.sql("""
# select count(1)
# from t1 where score>
# (select avg(score)
# from t1)
# """).show()
df_avg = df_init.select(F.avg(df_init['score']).alias('s_avg'))
# df_avg.show()
df_a = df_init.where(df_init['score'] > df_avg.first()['s_avg'])
print(df_a.count())
def xuqiu4(df_init):
# SQL
# 3.3.1- 找到高分电影
df_top_movie = spark.sql("""
select
movieid,
round(avg(score),2) as avg_score
from t1
group by movieid having avg_score>3
""")
# df_top_movie.show()
df_top_movie.createTempView('t2')
# 3.3.2- 找到打分次数最多的用户
# df_top_user = spark.sql("""
# select
# t1.userid,
# count(1) as cnt
# from t2 join t1 on t1.movieid=t2.movieid
# group by t1.userid order by cnt desc limit 1
# """)
# df_top_user.createTempView('t3')
# res = spark.sql("""
# select
# round(avg(t1.score),2)
# from t1 join t3 on t1.userid=t3.userid
#
# """)
# res.show()
# DSL
df_top_movie = df_init.groupby('movieid').agg(
F.round(F.avg('score'),2).alias('m_avg')
).where('m_avg > 3')
df_u_top = df_top_movie.join(df_init,'movieid').groupby('userid').agg(
F.count('userid').alias('u_cnt')
).orderBy(F.desc('u_cnt')).select('userid').limit(1)
df_init.where(df_init['userid'] == df_u_top.first()['userid']).groupby('userid').agg(
F.round(F.avg('score'),2).alias('avg_score')
).show()
df_init.join(df_u_top,'userid').groupby('userid').agg(
F.round(F.avg('score'),2).alias('avg_score')).show()
if __name__ == '__main__':
print('电影分析案例')
# 1- 创建sparksession对象
spark = SparkSession.builder.master('local[*]').appName('create_df').getOrCreate()
# 2- 读取数据
df_init = spark.read.csv(path='hdfs://node1:8020/spark/movie_data//u.data',
sep='\t',
schema='userid string,movieid string ,score string,datestr string'
)
# df_init.printSchema()
# df_init.show()
# 3- 处理数据
df_init.createTempView('t1')
# 3.1- 查询用户平均分
xuqiu1(df_init)
# 3.2- 查询电影平均分
xuqiu2(df_init)
# 3.3- 查询大于平均分的电影的数量
xuqiu3(df_init)
# 3.3- 查询高分电影中(>3)打分次数最多的用户,并求出此人打的平均分
xuqiu4(df_init)
# 查询每个用户的平均打分,最低打分,最高打分
df_init.groupby('userid').agg(
F.round(F.avg('score'),2).alias('u_avg'),
F.min('score').alias('u_min'),
F.max('score').alias('u_max')
).show()
# 查询被评分超过100次的电影, 的平均分排名TOP10
df_init.groupby('movieid').agg(
F.count('score').alias('cnt')
).where('cnt > 100').join(df_init,'movieid').orderBy(F.desc('score')).limit(10).show()
4. Spark SQL中shuffle分区设置
spark SQL在执行的过程中,会将SQL翻译为Spark的RDD程序来运行,对于Spark SQL来说,执行的时候,同样也会触发shuffe操作,默认情况下, Spark SQL的shuffle的分区数量默认为 200个
5. Spark SQL中数据清洗的相关API
1- 去重APl: df.dropDuplicates()
说明: 当不加参数的时候,默认对数据整体进行去重,同样支持针对指定列进行去重操作
2- 删除null值数据:df.dropna()
说明: 默认支持对所有列进行判断,如果有一列的对应值为nul, 就会将为nul这一行数据全部都删除,也支持针对某些列
3- 替换null值: df.fillna()
说明: 将表中为null的数据替换为指定的值,也支持针对某些列
import os
from pyspark.sql import SparkSession
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('pyspark clear_data')
spark = SparkSession.builder.master('local[*]').appName('clear_data').getOrCreate()
df_init = spark.read.csv(
path='file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/stu.csv',
header= True,
inferSchema=True
)
df_init.show()
# 去重
df = df_init.dropDuplicates()
df1 = df_init.dropDuplicates(['name'])
# df.show()
# 删除空值
df2 = df_init.dropna()
# 至少保留3行有效
df3 = df_init.dropna(thresh=3)
# 在三列中至少有2行有效数据
df4 = df_init.dropna(thresh=2,subset=['name','address','age'])
df2.show()
# 替换空值
# 将空值替换为 aa
df_init.fillna('aa')
# 将指定列中空值替换为 aa
df_init.fillna('aa',subset=['name','address','age'])
df_init.fillna({'id':0,'name':'未知','age':0})
6. Spark SQL数据写出操作
上述的完整的API 同样也有简单写法:
**df.write.mode().输出类型() **
import os
from pyspark.sql import SparkSession
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('pyspark clear_data')
spark = SparkSession.builder.master('local[*]').appName('clear_data').config("spark.sql.shuffle.partitions",'1').getOrCreate()
df_init = spark.read.csv(
path='file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/stu.csv',
header= True,
inferSchema=True
)
# 清洗
df = df_init.dropDuplicates()
df = df.dropna()
df.show()
# 写出到文件
# 演示写出为CSV文件
# TODO 输出为目录
df.write\
.mode('overwrite')\
.format('csv')\
.option('header',True)\
.option('sep','|')\
.save('file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/stu_clear')
# 演示写出为 JSON
df.write.mode('overwrite').format('json').save('hdfs://node1:8020/sparkwrite/output2')
# 演示输出为text
df.select('id').write.mode('overwrite').format('text').save('hdfs://node1:8020/sparkwrite/output2')
print('完成')
spark.stop()
6.1 将数据写入到 hive中
df.write.mode('append overwritelignorelerror').saveAsTable('表名','存储类型')
6.2 将数据输出到MYSQL
df.write.mode('append | overwrite | ignorel | error').format('jdbc')
**.option("url","jdbc:mysql://xxx:3306/库名?useSSL=false&useUnicode=true&characterEncoding=utf-8")****.option("dbtable","表名")**
**.option("user", "root") **
**.option("password","密码")**
.save()
可能报错
原因:
当前Spark无法找到一个适合的驱动连接MySQL解决方案:添加MySQL的驱动包,需要在以下几个位置中添加驱动包
** 1- 在python的环境中添加mysql的驱动包**
Base的pyspark库的相关的jar包路径 cd /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/ 虚拟环境:/root/anaconda3/envs/虚拟环境名称/lib/python3.8/site-packages/pyspark/jars/
2- 需要在 Spark的家目录下jars目录下添加mysql的驱动包
(spark-submit提交到spark集群或者local模式需要使用) /export/server/spark/iars/
3-需要在HDFS的/spark/jars目录下添加mysql的驱动包(spark-submit提交到yarn环境的时候)
建议以上三个位置都添加建议:如果是常用的jar包
7. Pandas的相关的内容
适用场景:
数据量大到excel严重卡顿,且又都是单机数据的时候,我们使用pandaspandas用于处理单机数据(小数据集(相对于大数据来说))
在大数据ETL数据仓库中,对数据进行清洗及处理的环节使用pandas
import pandas as pd
data = pd.read_csv('file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/stu.csv')
print(data.iloc[:-1])
7.1 pandas 数据结构
7.2 series对象
7.3 dataframe 对象
7.3.1 数据读写
7.3.2 读写数据库
8. Spark SQL的函数定义
8.1 窗口函数
窗口函数格式:
分析函数 over(partition by xxx order by xxx [ascl desc] [rows between窗口范围 and 窗口范围 ])分析函数:
第一类函数: row_number() rank() dense_rank() ntile(N)
第二类函数: 与聚合函数组合使用 sum() max() min() avg() count()
第三类函数: lead() lag() frist_value() last_value()
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession,Window as win
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('pysparkSQL 窗口函数')
# 1- 创建对象
spark = SparkSession.builder.master('local[*]').appName('windowsSQL').getOrCreate()
# 2- 读取数据
df_init = spark.read.csv(
path='file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/pv.csv',
header=True,
sep=',',
inferSchema=True
)
df_init.printSchema()
df_init.show()
# 3-处理数据操作: 演示窗口函数
df_init.createTempView('pv_t1')
#3.1 SQL方式
spark.sql("""
select
uid,
datestr,
pv,
row_number() over(partition by uid order by pv desc) as rank1,
rank() over(partition by uid order by pv desc) as rank2,
dense_rank() over(partition by uid order by pv desc) as rank3
from pv_t1
""").show()
# 3.2 DSL方式
df_init.select(
df_init['uid'],
df_init['datestr'],
df_init['pv'],
F.row_number().over(win.partitionBy('uid').orderBy(F.desc('pv')) ).alias('rank1'),
F.rank().over(win.partitionBy('uid').orderBy(F.desc('pv'))).alias('rank2')
).show()
8.2 自定义UDF函数
8.3 Spark原生自定义UDF函数
第一步:根据也要功能要求,定义一个普通的Python的函数:
第二步:将这个python的函数注册到Spark SQL中:
往册方式有以下二种方案
** udf对象 = sparksession.udf.register(参数1,参数2,参数3)**
** ** 参数1:udf函数的函数名称,此名称用于在S0L风格中使用
参数2:需要将那个python函数注册为udf函数 参数3:设置python函数返回的类型
** udf对象 = F.udf(参数1,参数2) **(仅适用于DSL中)
参数1:需要将那个python函数注册为udf函数 参数2:设置python函数返回的类型 语法糖写法: **@F.udf(参数2)** 底层走的是装饰器
- 第三步:在SQL或者 DSL中使用即可
import os
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession,Window as win
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('pysparkSQL 自定义UDF函数')
# 1- 创建对象
spark = SparkSession.builder.master('local[*]').appName('clear_data').getOrCreate()
# 2- 读取数据
df = spark.read.csv(
path='file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/student.csv',
inferSchema=True,
header=True
)
# df.show()
df.createTempView('t1')
# 3- 处理数据
# 需求: 请在name名称后面添加一个_itcast
# 3.1 自定义一个python函数,完成主题功能
def concat_udf(name:str) -> str:
name = name+'_itcast'
return name
# 3.2 将函数注册给sparkSQL
# 方式一:
concat_udf_DSL = spark.udf.register('concat_udf_sql',concat_udf,StringType())
# 3.3 使用自定义函数
# SQL中使用
spark.sql("""
select
id,
concat_udf_sql(name) as name,
age
from t1
""").show()
# DSL使用
df.select(
df['id'],
concat_udf_DSL(df['name']).alias('name'),
df['age']
).show()
# 方式二:
concat_udf_DSL2 = F.udf(concat_udf,StringType())
df.select(
df['id'],
concat_udf_DSL2(df['name']).alias('name'),
df['age']
).show()
# 语法糖
@F.udf(returnType=StringType())
def concat_udf1(name: str) -> str:
name = name + '_itcast'
return name
df.select(
df['id'],
concat_udf1(df['name']).alias('name'),
df['age']
).show()
演示返回类型为 字典/列表
import os
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession,Window as win
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('pysparkSQL 自定义UDF函数 返回字典列表')
# 1- 创建对象
spark = SparkSession.builder.master('local[*]').appName('clear_data').getOrCreate()
# 2- 读取数据
df = spark.read.csv(
path='file:///export/data/workspace/pyspark_parent/03_pyspark_sql/data/user.csv',
inferSchema=True,
header=True
)
# df.printSchema()
# df.show()
df.createTempView('t1')
# 3- 处理数据
# 需求:自定义函数 请将line字殷切割开,将其转换姓名 地址年龄
# 自定义函数
def split_3col1(line):
return line.split("|")
def split_3col2(line):
arr=line.split("|")
return {'name':arr[0],'address':arr[1],'age':arr[2]}
# -------------------------------------列表方式------------------------------------------
# 3.2 注册
# 对于返回列表的注册方式
# 方式1
schema = StructType().add('name',StringType()).add('address',StringType()).add('age',StringType())
split_3col1_dls = spark.udf.register('split_3col1_sql', split_3col1,schema )
# 3.3 使用自定义函数
spark.sql("""
select
userid,
split_3col1_sql(line)['name'] as name,
split_3col1_sql(line)['address'] as address,
split_3col1_sql(line)['age'] as age
from t1
""").show()
# ----------------------------------------字典方式---------------------------------------
schema1 = StructType().add('name', StringType()).add('address', StringType()).add('age', StringType())
split_3col2_dsl = F.udf(split_3col2,schema1)
# DSL
df.select(
'userid',
split_3col2_dsl('line')['name'].alias('name'),
split_3col2_dsl('line')['address'].alias('address'),
split_3col2_dsl('line')['age'].alias('age')
).show()
8.4 基于pandas的Spark UDF函数
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
8.4.1 如何基于arrow完成pandas DF与 Spark DF的互转操作
如何将pandas的DF对象 转换 spark的DF,以及如何从spark df 转换为 pandas的 df对象
请注意:开启arrow方案,必须先安装arrow,否则无法使用,一执行就会报出:
no module'pyarrow'(没有此模板
import os
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession,Window as win
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('pysparkSQL pandas spark df 转换')
# 1- 创建对象
spark = SparkSession.builder.master('local[*]').appName('clear_data').getOrCreate()
# 2-创建pandas的DF对象:
pd_df = pd.DataFrame({'name':['张三','李四','王五'],'age':[12,45,23]})
# 3 - 可以使用panda的API米对数据进行处理操作
pd_df_fil = pd_df[pd_df['age']>20]
# 4-将其转换为Spark DF
spark_df=spark.createDataFrame(pd_df_fil)
spark_df.printSchema()
spark_df.show()
spark_df_sum = spark_df.select(F.sum('age').alias('sum_age'))
# 4-将Spark DF转换为 pandas DF
pd_df_new = spark_df_sum.toPandas()
pd_df_new_fil = pd_df[pd_df['age']>40]
print(pd_df_new_fil)
8.5 pandas 自定义UDF函数
方式一:** series To series**
描述: 定义一个python的函数,接收series类型,返回series类型,接收一列返回一列O
目的: 用于定义 pandas的UDF函数import os import pandas as pd import pyspark.sql.functions as F from pyspark.sql.types import * from pyspark.sql import SparkSession,Window as win os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3' if __name__ == '__main__': print('pysparkSQL pandas spark df 转换') # 1- 创建对象 spark = SparkSession.builder.master('local[*]').appName('clear_data').getOrCreate() # 2-构建数据 schema = StructType().add('a',IntegerType(),False).add('b',IntegerType(),False) pd_df = spark.createDataFrame([(1,2),(3,4),(5,6)],schema=schema) pd_df.createTempView('t1') # 处理数据 # 需求:基fpandas的UDF 完成对a和b列乘积计算 # #3.1 自定义一个python的函数:传入series类型,返回series类型 @F.pandas_udf(returnType=IntegerType()) #装饰器将pandas的函数转换为spark SQL的函数 def pd_cj(a:pd.Series,b:pd.Series) -> pd.Series: return a*b # 注册函数 # 方式一 pd_cj_DSL = spark.udf.register('pd_cj_sql',pd_cj) # 使用函数 # SQL spark.sql(""" select a, b, pd_cj_sql(a,b) as a_b from t1 """).show() # DSL pd_df.select('a','b',pd_cj_DSL('a','b').alias('a_b')).show()
8.6 pandas 自定义UDAF函数
从series类型 到 标量(python基本数据类型):
描述: 定义一个python函数,接收series类型的数据, 输出为标量,用于定义 UDAF函数
import os
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession,Window as win
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('pysparkSQL pandas spark df 转换')
# 1- 创建对象
spark = SparkSession.builder.master('local[*]').appName('clear_data').getOrCreate()
# 2-构建数据
schema = StructType().add('a',IntegerType(),False).add('b',IntegerType(),False)
pd_df = spark.createDataFrame([(1,2),(3,4),(5,6)],schema=schema)
pd_df.createTempView('t1')
# 处理数据
# 需求: pandas的UDAF需求 对B列求和
# 3.1 创建python的函数:按收series类型,输出基本数据类型
@F.pandas_udf(IntegerType())
def pd_b_sum(b:pd.Series) -> int:
return b.sum()
# 3.2 注册函数
pd_b_sum_dsl = spark.udf.register('pd_b_sum_sql',pd_b_sum)
# 使用函数
spark.sql("""
select
pd_b_sum_sql(b) as sum_b
from t1
""").show()
九、 spark on Hive
1. 集成原理说明
说明:
HIVE的Hiveserver2本质上作用:接收SQL语句 将 SOL 翻译为 MR程序,需要使用到相关元数据的时候,连接metastore来获取即可Spark on HIVE的本质
将HIVE的MR执行引擎替换为 Spark RDD思考:对于HIVE来说,本质上将SOL翻译为 MR,这个操作是有hiveserver2来负责的,所以说spark on HIve主要目的,将Hiveserver2替换掉,将其更换为Spark所提供的Sparkserver2
认为Spark on HIVE本质:
替换掉HIVE中Hiveserver2 让Spark提供一个spark的Hiveserver2,对接metastore 从而完成将SQL翻译为Spark RDDspark集成目的:抢占HIVE的市场
所以spark后续会提供一个分布式执行引警,此引擎就是为了模拟hiveserver2,一旦模拟后,会让用户感觉跟操作原来HIVE基本上雷同的,比如说;端口号,连接的方式,配置的方式全部都一致
** 好处:**
- 1-对于Spark来说,也不需要自己维护元数据,可以利用hive的metastore来维护
- 2-集成HIVE后,整个HIVE的执行效率也提高了
2. 配置措施
2.1 连接hive
1- 创建对象
enableHiveSupport : 是否开启hive集成
spark.sql.warehouse.dir: 默认加载数据路径
hive.metastore.uris: 指定metastore服务地址
*spark = SparkSession
.builder
.master('local[]').
appName('clear_data')
.config('spark.sql.shuffle.partitions','4')
.config('spark.sql.warehouse.dir','hdfs://node1:8020/user/hive/warehouse')
.config('hive.metastore.uris','thrift://node1:9083')
.enableHiveSupport()
.getOrCreate()spark.sql('show databases').show()**
十、Spark SQL 分布式执行引擎
如何启动spark的分布式执行引擎呢?这个引擎可以理解为 spark的hiveserver2服务
cd /export/server/spark
/sbin/start-thriftserver.sh
--hiveconf hive.server2.thrift.port=10000
--hiveconf hive.server2.thrift.bind.host=node1
-hiveconf spark.sgl.warehouse.dir=hdfs://nodel:8020/user/hive/warehouse
--master local[*]
十一、Spark SQL的运行机制
1. Spark RDD中 Driver的运行机制
Spark SQL底层依然需要将SQL语句翻译为Spark RDD操作 所以,Spark sQL也是存在上述的流程的,只不过在上述流程中加入了 从spark SQL 翻译为Spark RDD的过程
2. Spark SQL的运行机制
3. 如何查看物理执行计划呢?
方式1: 通过spark thrift server的服务界面: 大概率是 4040界面
方式2: 通过SQL的命令
** explain SQL语句**
十二、综合案例
1. 新零售案例
1.1 数据描述
1.2 数据清洗
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('完成新零售的清洗需求')
# 1- 创建对象SparkSession
spark = SparkSession.builder.master('local[*]').appName('xls_clear').getOrCreate()
# 2- 读取外部数据
df_init = spark.read.csv(
path='file:///export/data/workspace/pyspark_parent/04_新零售项目/data/E_Commerce_Data.csv',
inferSchema=True,
header=True,
sep=',',
encoding='utf-8'
)
# df_init.printSchema()
# df_init.show()
# 3- 数据清洗
df_clear = df_init.where('CustomerID != 0 and Description is not null')
df_clear = df_clear.withColumn(
'InvoiceDate',
F.from_unixtime(F.unix_timestamp(df_clear['InvoiceDate'],'M/d/yyyy H:mm'),'yyyy-MM-dd HH:mm')
)
# 4- 保存数据
df_clear.write.mode('overwrite').csv(
path='hdfs://node1:8020/xls/output',
header=True,
sep='\001'
)
# 5- 关闭连接
spark.stop()
1.3 相关需求
相关的需求(DSL和SQL):
- 客户数最多的10个国家
- 销量最高的10个国家
- 各个国家的总销售额分布情况
- 销量最高的10个商品
- 商品描述的热门关键词Top300
- 退货订单数最多的10个国家
- 月销售额随时间的变化趋势
- 日销量随时间的变化趋势
- 各国的购买订单量和退货订单量的关系
- 商品的平均单价与销量的关系
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('完成新零售的分析需求')
# 1- 创建对象SparkSession
spark = SparkSession.builder.master('local[*]').appName('xls_clear').getOrCreate()
# 2- 读取外部数据
df_init = spark.read.csv(
path='hdfs://node1:8020/xls/output',
inferSchema=True,
header=True,
sep='\001',
encoding='utf-8'
)
# df_init.printSchema()
df_init.createTempView('t1')
# 2- 需求分析
# 客户数最多的10个国家
df_init.groupby('Country').agg(
F.countDistinct('CustomerID').alias('country_Customer_cnt')
).orderBy(F.desc('country_Customer_cnt')).limit(10).show()
# 销量最高的10个国家
df_init.groupby('Country').agg(
F.sum('Quantity').alias('country_Quantity_cnt')
).orderBy(F.desc('country_Quantity_cnt')).limit(10).show()
# 各个国家的总销售额分布情况
df_init.withColumn(
'TotalSales', F.col('Quantity') * F.col('UnitPrice')
).groupBy('Country').agg(
F.sum('TotalSales').alias('Country_TotalSales')
).orderBy(F.desc('Country_TotalSales')).show()
#
# df_init.groupby('Country').agg(
# F.sum(F.col('Quantity') * F.col('UnitPrice')).alias('TotalSales')
# ).orderBy(F.desc('TotalSales')).show()
# 销量最高的10个商品
df_init.groupby('StockCode').agg(
F.sum('Quantity').alias('StockCode_Quantity_cnt')
).orderBy(F.desc('StockCode_Quantity_cnt')).limit(10).show()
# 商品描述的热门关键词Top300
# df_init.groupby('Description').agg(
# F.count('Description').alias('Description_cnt')
# ).orderBy(F.desc('Description_cnt')).limit(300).show()
df_init.withColumn('words',F.explode(F.split('Description',' '))).groupBy('words').agg(
F.count('words').alias('cnt_words')
).orderBy('cnt_words').limit(300).show()
# 退货订单数最多的10个国家
df_init.where('Quantity <0').groupby('Country').agg(
F.sum('Quantity').alias('country_Quantity_cnt')
).orderBy('country_Quantity_cnt').limit(10).show()
# 月销售额随时间的变化趋势
print('7')
df_init.groupby(F.month('InvoiceDate')).agg(
F.sum(F.col('Quantity') * F.col('UnitPrice')).alias('TotalSales')
).orderBy('month(InvoiceDate)').show()
# 日销量随时间的变化趋势
df_init.groupby(F.dayofmonth('InvoiceDate')).agg(
F.sum(F.col('Quantity') * F.col('UnitPrice')).alias('TotalSales')
).orderBy('dayofmonth(InvoiceDate)').show()
print('8')
# 各国的购买订单量和退货订单量的关系
spark.sql("""
select
Country,
count(distinct InvoiceNO) as cnt_oid,
count( distinct if(InvoiceNO like 'C%',InvoiceNO,null)) as c_cnt_oid
from t1
group by Country
""").show()
df_init.groupby('Country').agg(
F.countDistinct('InvoiceNO').alias('cnt_oid'),
F.countDistinct(F.expr("if(InvoiceNO like 'C%',InvoiceNO,null)")).alias('c_cnt')
).show()
print('9')
# 商品的平均单价与销量的关系
df_init.groupby('UnitPrice').agg(
F.count('Quantity').alias('TotalQuantity')
).orderBy(F.desc('TotalQuantity')).show()
2. 在线教育
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DIRVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print('完成教育分析需求')
# 1- 创建对象SparkSession
spark = SparkSession.builder.master('local[*]').appName('xls_clear').getOrCreate()
# 2- 读取外部数据
df_init = spark.read.csv(
path='file:///export/data/workspace/pyspark_parent/05_教育项目/data/eduxxx.csv',
inferSchema=True,
header=True,
sep='\t',
encoding='utf-8'
)
# df_init.printSchema()
# df_init.show()
df_init.createTempView('t1')
# 3- 处理数据
# 需求1 :找到TOP50热点题对应科目.然后统计这些科目中, 分别包含几道热点题目
# sql
# 1: 找到TOP50热点题
# df_top50 = spark.sql("""
# select
# question_id,
# sum(score) as total_question
# from t1
# group by question_id order by total_question desc limit 50
# """)
# df_top50.createTempView('top_50')
# # 2: 热点题对应科目,根据学科分组统计每个学科下有几道热点题
# spark.sql("""
# select
# subject_id,
# count(distinct top_50.question_id) as sub_cnt
# from top_50 join t1 on top_50.question_id = t1.question_id
# group by t1.subject_id
# """).show()
# dsl
df_top501 = df_init.groupby('question_id').agg(
F.sum('score').alias('total_question')
).orderBy(F.desc('total_question')).limit(50)
df_top501.join(df_init,'question_id').groupby('subject_id').agg(
F.countDistinct('question_id').alias('sub_cnt')
).show()
# 需求二: 各科目推荐题分析
# 要求: 找到Top20热点题对应的推荐题目,然后找到推荐题目对应的科目,并统计每个科目分别包含推荐题目的数量
df_top20 = df_init.groupby('question_id').agg(
F.sum('score').alias('total_question')
).orderBy(F.desc('total_question')).limit(20)
df_tuijian = df_top20.join(df_init,'question_id').select(F.explode(F.split('recommendations',',')).alias('question_id'))
df_tuijian.join(df_init,'question_id').groupby('subject_id').agg(
F.countDistinct('recommendations').alias('recommendations_cnt')
).show()
完结撒花
版权归原作者 叫我王富贵i 所有, 如有侵权,请联系我们删除。