0


基于 RDD 的分布式数据处理实验(pyspark)

基于 RDD 的分布式数据处理实验

文章目录

一 实验目的

  1. 掌握 Spark 的 RDD 读写、转换、动作、持久化等基本操作
  2. 熟悉使用 RDD 解决实际具体问题的方法

二 实验环境

  1. Spark 分布式集群
  2. Python3.7 环境
  3. jupyter notebook

三 实验内容及要求

  1. 配置 jupyter notebook 与 spark 集群的连接
  2. RDD 的基本算子操作

四 实验步骤

4.1 配置 jupyter notebook 与 spark 集群的连接

4.1.1 在master节点安装Ananconda

Anaconda安装包链接:百度云提取码2022

首先在本地下载好安装包,通过以下命令将其上传到master节点的/usr/loacl目录下:

docker cp C:/Users/Fuwenshuai/Desktop/Anaconda3-2020.02-Linux-x86_64.sh master:/usr/local

image-20221005135404984

登录到master节点,访问/usr/local目录可以看到安装包已经上传成功。

image-20221005140540528

接下来就是安装,在该目录下,直接通过bash命令运行安装包,由于默认直接运行的话会安装在root目录下,这里在安装之前直接通过命令指定安装位置在/opt/app下

bash ./Anaconda3-2020.02-Linux-x86_64.sh -p /opt/app/anaconda3 -u

运行完毕bash命令之后,一直点击回车,遇到选项选择yes即可,可以看到这里边的安装位置就是我们指定好的位置:

image-20221005152739687

下边显示已经安装成功。

image-20221005152820765

接下来配置环境变量进入配置文件:输入以下命令:

vim ~/.bashrc 

在文件后边增加如下内容,其中/opt/app/anaconda3 为为 anaconda 的安装目录

export ANACONDA3_HOME=/opt/app/anaconda3 
export PATH=$ANACONDA3_HOME/bin:$PATH

image-20221006180109027

运行以下命令,使环境变量生效。

source ~/.bashrc  

在终端输入 python 命令,查看 python 命令行是否可以启动。

image-20221005142139625

在输入python之后,即跳转到了python命令行模式,尝试打印简单的“hello world”是完全没问题的

image-20221006180334272

4.1.2 配置 Jupyter 和 spark 连接

①Jupyter 配置文件

使用 jupyter notebook --generate-config 命令,生成 jupyter 配置文件

image-20221005142435235

此时在当前用户的主目录下会生成.jupyter 文件夹,里面包含了 jupyter 的配置文件,cd 到该目录下 :

cd ~/.jupyter

image-20221005142625348

此时再次输入python命令进入 python 命令行,执行下面的命令来设置密码:

from notebook.auth import passwd 
passwd()

image-20221005153351472

在这里我们会输入和验证密码,保存生成的密钥字符串,‘sha1:e298e8b040ca:366be3d9739685a83d322f27401b1046f0841e36’,这个字符串其实就是加密后的密码。

之后,ctrl-D退出python命令行,编辑该目录下的 jupyter_notebook_config.py 文件

vim jupyter_notebook_config.py 

在该文件下写入一下内容:

# 允许所有IP登录
c.NotebookApp.ip='*'# 使用刚刚生成的sha1值
c.NotebookApp.password='sha1:e298e8b040ca:366be3d9739685a83d322f27401b1046f0841e36'# 是否自动打开浏览器
c.NotebookApp.open_browser =False# 允许使用root用户登录
c.NotebookApp.allow_root =True# 设置访问jupyter notebook的端口为4040
c.NotebookApp.port =4040
c.NotebookApp.notebook_dir ='/opt/app/anaconda3/jupyternotebook'

image-20221005154028474

其中,c.NotebookApp.notebook_dir = ‘/opt/app/anaconda3/jupyternotebook’ 这行用于设置 Notebook 启动进入的目录,由于该目录还不存在,所以需要在终端中执行如下命令创建:

cd /opt/app/anaconda3 
mkdir jupyternotebook 

image-20221005153638590

②运行 jupyter notebook

在终端输入:jupyter notebook,运行 jupyter

image-20221005154510097

如果在上述配置 jupyter_notebook_config.py 文件时,将c.NotebookApp.allow_root权限设置为了False,就会报出下边的错误,因此在上述配置时就将登录权限设置为True

image-20221005150732740

接着在本地浏览器中输入localhost:4040,这个4040就是上边设置的访问端口,进入 jupyter。 第一次访问会要求输入密码,就是上边自己设置的密码。现在已经可以访问jupyter了,接下在就是与spark连接。

image-20221005160235823

4.1.3 配置 Jupyter Notebook 实现和 PySpark 交互

这里实现交互需要用到Py4J ,Py4J是一个用 Python 和 Java 编写的库。通过 Py4J,Python 程序 能够动态访问 Java 虚拟机 中的 Java 对象,Java 程序 也能够回调 Python 对象。可以根据需求点击下载网址 下载需要的版本,如果没有严格的选择也可以直接使用spark内自带的Py4J。位置在spark安装目录下的/python/lib/文件夹内,zip格式的压缩包,直接使用即可。

image-20221006203544963

接着来编辑 master 节点的环境变量

vim ~/.bashrc 

输入如下内容:

export PYSPARK_PYTHON=/opt/app/anaconda3/bin/python3 
export PYSPARK_DRIVER_PYTHON=/opt/app/anaconda3/bin/jupyter 
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9.5-src.zip:$PYTHONPATH 
export PYSPARK_DRIVER_PYTHON_OPTS="jupyter-notebook --no-browser --ip 0.0.0.0 --port=4040 --allow-root"

image-20221006203325645

最后,保存退出,运行 source ~/.bashrc,使文件生效。

然后,进入到 anaconda 的安装目录 中的 bin 目录下,再执行 pyspark,启动 jupyter。

image-20221005163258515

接着在本地浏览器打开localhost:4040,即可使用jupyter实现和pysaprk交互编程。

image-20221005164108711

4.2 RDD基本算子操作

4.2.1 通过并行集合创建RDD

from pyspark import SparkContext
#其中"local" 是指让Spark程序本地运行,"test" 是指Spark程序的名称(为了直观明了的查看,最好设置有意义的名称)
sc = SparkContext("local","test")
data=[1,2,3,4,5,6,7,8]#调用SparkContext的parallelize方法,在Driver中一个已经存在的列表上创建
rdd = sc.parallelize(data)#collect 在驱动程序中将数据集的所有元素作为数组返回;(属于行动操作)注意用collect方法的时候,数据集不能过大print(rdd.collect())

image-20221006204319887

4.2.2 从本地文件系统中加载数据创建RDD

使用/usr/local/data目录下的wordcount.txt数据为例,数据如下:

image-20221006205959506

from pyspark import SparkContext
sc = SparkContext("local","test")#textFile()方法来从文件系统中加载数据创建RDD该方法把文件的URI作为参数,这个URI可以是:本地文件系统的地址(file:///)#或者是分布式文件系统HDFS的地址(hdfs://)
rdd1=sc.textFile("file:///usr/local/data/wordcount.txt")print(rdd1.collect())

image-20221006210902269

4.2.3 基于4.2.2创建的RDD,进行词频统计并按频数排序输出

rdd2=rdd1.flatMap(lambda x:str(x).split(' ')).map(lambda x:(x,1))
rdd3=rdd2.reduceByKey(lambda x,y:x+y).sortBy(lambda x:tuple(x)[1],False)print(rdd3.collect())

image-20221006211314723

这里边:

  • map() 是将文件每一行进行操作,数量不会改变
  • flatMap() 是将所有元素进行操作,数量只会大于或者等于初始数量
  • reduce 将 RDD 中元素前两个传给输入函数,产生一个新的 return 值,新产生的 return 值与 RDD 中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
  • reduceByKey 就是对元素为键值对的 RDD 中 Key 相同的元素的 Value 进行 reduce 操作,因此,Key 相同的多个元素的值被 reduce 为一个值,然后与原 RDD 中的 Key 组成一个新的键值对。(去键重)
  • sortByKey 按照 key 进行排序,传入参数的默认值为 true,是按照从小到大排序,也可以传入参数 false,表示从大到小排序

4.3 RDD的综合案例

4.3.1 各国恐怖主义袭击次数分析

基于 RDD 的基本操作,计算各国恐怖主义袭击次数,并利用 PyEcharts 用地图展示统计结果

(1)导入所需的库

#导入 PySpark 库from pyspark import SparkConf, SparkContext
#导入 Pandas 库import pandas as pd

(2)实例化Spark context

sc.stop()  #如果出现以下这种错,说明已经打开过一次了,先stop()

1664989641463

conf = SparkConf()
conf.set("spark.cores.max","2")
conf.setAppName('MyTry')
conf.setMaster('local')
sc = SparkContext(conf=conf)

**(3)读取本地文件 GTD,生成 RDD,并将 RDD 中的每个元素以

tab

键进行分裂**

#读取本地文件 GTD
csvFile = sc.textFile('file:///usr/local/data/GTD1.txt')#将 RDD 中的每个元素以 tab 键进行分割
csvRow = csvFile.map(lambda x: x.split('\t'))

注意:对数据进行分割想了好久,原始数据是excel文件,转化为csv后,对数据进行元素分割,虽然csv是以’,’作为分隔符的,但在该数据集中不合适,因为数据几种有描述语句,在单元格中1就有’,’,所以分割出来的是混乱的。如下图所示:

1664989641463

因此,就想到换一种特殊的符号作为分隔符,故选择以tab为分隔符的txt文件

(4)基于 RDD 操作计算各国恐怖主义袭击次数

country_list = csvRow.map(lambda x: x[8]).collect()
rdd = sc.parallelize(country_list[1:]).map(lambda x:(x,1)).reduceByKey(lambda a, b: a + b)
country_count = rdd.collect()print(country_count)

1664989947686

(5)利用 PyEcharts 进行可视化

#GTD国家名字与pyecharts地图上的国家名字对齐
unzip_country_count =list(zip(*country_count))
attr_list =list(unzip_country_count[0])for i inrange(len(attr_list)):if attr_list[i]=='Democratic Republic of the Congo':
        attr_list[i]='Dem. Rep. Congo'elif attr_list[i]=='Republic of the Congo':
        attr_list[i]='Congo'elif attr_list[i]=='South Korea':
        attr_list[i]='Korea'elif attr_list[i]=='South Sudan':
        attr_list[i]='S. Sudan'elif attr_list[i]=='Dominican Republic':
        attr_list[i]='Dominican Rep.'elif attr_list[i]=='Central African Republic':
        attr_list[i]='Central African Rep.'elif attr_list[i]=='Equatorial Guinea':
        attr_list[i]='Eq. Guinea'elif attr_list[i]=='Laos':
        attr_list[i]='Lao PDR'elif attr_list[i]=='Czech Republic':
        attr_list[i]='Czech Rep.'elif attr_list[i]=='Slovak Republic':
        attr_list[i]='Slovakia'
count_list = unzip_country_count[1]#利用地图进行可视化from pyecharts import options as opts
from pyecharts.charts import Map

defmap_base()-> Map:# 基础数据
    value = count_list
    attr = attr_list

    data =[]for index inrange(len(attr)):
        city_ionfo=[attr[index],value[index]]
        data.append(city_ionfo)

    c =(
        Map().add("世界地图",data,"world", is_map_symbol_show=False).set_series_opts(label_opts=opts.LabelOpts(is_show=False)).set_global_opts(
            title_opts=opts.TitleOpts(title="各国恐怖袭击次数统计"),
            visualmap_opts=opts.VisualMapOpts(max_=500),).render_notebook())return c

1664990104236

根据恐怖袭击在全球的分布图可得出,恐怖袭击的发生主要还是分布在非洲、中东地区,主要原因还是因为这些地区战争频发、局势动荡不安

4.3.2 恐怖袭击伤亡与地点分析

基于 RDD 的基本操作,统计恐怖袭击伤亡与地点,并利用 PyEcharts 对统计结果进行展示 。

(1)获取袭击事件发生地的经纬度

latitude_list = csvRow.map(lambda x: x[13]).collect()
longitude_list = csvRow.map(lambda x: x[14]).collect()
coordinate_list =list(zip(latitude_list, longitude_list))#获取袭击事件的ID,以及被袭击的城市
city_list = csvRow.map(lambda x: x[12]).collect()
id_list = csvRow.map(lambda x: x[0]).collect()#获取袭击中的受伤人数和死亡人数
kill_list = csvRow.map(lambda x: x[98]).collect()
wound_list = csvRow.map(lambda x: x[101]).collect()print(wound_list)

1664990379322

(2)统计每个城市受伤的人数和死亡人数

city_coord_dict =dict()
kill_pair_list =[]
wound_pair_list =[]for i inrange(len(city_list)-1):if coordinate_list[i +1][0]!=''and coordinate_list[i +1][1]!='':
        lati =float(coordinate_list[i +1][0])long=float(coordinate_list[i +1][1])
        city = city_list[i +1]
        event_id = id_list[i +1]
        kill_num = kill_list[i +1]
        wound_num = wound_list[i +1]if kill_num =='':
            kill_num =0else:
            kill_num =float(kill_num)if wound_num =='':
            wound_num =0else:print(wound_num)
            wound_num =float(wound_num)if kill_num + wound_num >10:
            city_coord_dict[city +'-'+ event_id]=[lati,long]
            kill_pair_list.append((city +'-'+ event_id, kill_num))
            wound_pair_list.append((city +'-'+ event_id, wound_num))

(3)利用pyecharts进行结果展示

from pyecharts.charts import Geo
defgeo_base()-> Geo:# 基础数据   
    g = Geo()    
    g.add_schema(maptype="world")for city, coord in city_coord_dict.items():
        g.add_coordinate(city,coord[1],coord[0])
    
    g.add('死亡数', kill_pair_list, symbol_size=5)
    g.add('受伤数', wound_pair_list, symbol_size=5)  
    g.set_series_opts(label_opts=opts.LabelOpts(is_show=False)).set_global_opts(
            title_opts=opts.TitleOpts(title="恐怖袭击伤亡与地点统计"),
            visualmap_opts=opts.VisualMapOpts(max_=100))return g.render_notebook()

1664990456848

可以看出恐怖袭击造成伤亡的主要集中地点还是在非洲和中东地区,这也说明了恐怖袭击的系数是与伤亡人数成正相关的。

4.3.3 每年恐袭事件发生的次数的变化分析

基于 RDD 的基本操作,计算每年恐袭事件发生的次数的变化,并利用 PyEcharts 对统计结果进行展示

(1)统计每年每月的恐怖袭击的次数

rdd = csvRow.map(lambda x:(x[1],x[2])).map(lambda x:(x,1)).reduceByKey(lambda a, b: a + b)
year_month_count = rdd.collect()print(year_month_count)

(2)构建以年份为 key,每个月发生的恐袭事件的次数为 value,构建一个恐怖袭击每年每月发生的字典数据集

year_month_dict =dict()for year inrange(2008,2018):
    year_month_dict[year]=[0,0,0,0,0,0,0,0,0,0,0,0]for month inrange(1,13):match=Falsefor item in year_month_count:if item[0][0]==str(year)and item[0][1]==str(month):
                year_month_dict[year][month-1]= item[1]match=Trueifmatch==False:print(year, month)

(3)利用 PyEcharts 进行结果展示

from pyecharts.charts import Bar, Timeline
defbar_base()-> Bar:
    tl = Timeline()for i inrange(2008,2018):
        bar =(
            Bar().add_xaxis(['Jan.','Feb.','Mar.','Apr.','May.','Jun.','Jul.','Aug.','Sep.','Oct.','Nov.','Dec.']).add_yaxis("恐怖袭击次数", year_month_dict[i], label_opts=opts.LabelOpts(position="right")).reversal_axis().set_global_opts(
                title_opts=opts.TitleOpts("恐怖袭击时间统计 (时间: {} 年)".format(i)),
                xaxis_opts=opts.AxisOpts(max_=1800)))
        tl.add(bar,"{}年".format(i))return tl.render_notebook()

1664990641938

4.3.4 恐怖袭击组织的袭击对象分析

基于 RDD 的基本操作,统计每个恐怖袭击组织的袭击对象,并利用 PyEcharts 对统计结果进行展示。

(1)统计恐怖袭击发生的次数以及袭击的类型

top_terro_name=['Taliban','Islamic State of Iraq and the Levant (ISIL)','Al-Shabaab','Boko Haram','Communist Party of India - Maoist (CPI-Maoist)',"New People's Army (NPA)",'Maoists',\
                'Tehrik-i-Taliban Pakistan (TTP)',"Kurdistan Workers' Party (PKK)",'Houthi extremists (Ansar Allah)','Al-Qaida in the Arabian Peninsula (AQAP)','Revolutionary Armed Forces of Colombia (FARC)'\
                ,"Donetsk People's Republic",'Muslim extremists','Al-Qaida in Iraq','Fulani extremists']
attack_count = csvRow.map(lambda x:(x[29])).map(lambda x:(x,1)).reduceByKey(lambda a, b: a + b).collect()
attack_type =[attack[0]for attack in attack_count if attack[1]>1and attack[0]!='Unknown']
target_count = csvRow.map(lambda x:(x[35])).map(lambda x:(x,1)).reduceByKey(lambda a, b: a + b).collect()
target_type =[target[0]for target in target_count if target[1]>1and target[0]notin['Unknown','Other']]
terro_target = csvRow.map(lambda x:(x[58], x[35])).filter(lambda x: x[0]in top_terro_name).filter(lambda x: x[1]notin['Unknown','Other']).map(lambda x:(x,1)).reduceByKey(lambda a, b: a + b).collect()

(2)统计恐怖袭击的对象的类型

terro_target_dict =dict()#构建攻击目标的词典
new_terro_target ={item[0]: item[1]for item in terro_target}for terro in top_terro_name:
    terro_target_dict[terro]=dict()for target in target_type:if(terro, target)in new_terro_target.keys():
            terro_target_dict[terro][target]= new_terro_target[(terro, target)]else:
            terro_target_dict[terro][target]=0print(terro_target_dict)

image-20221006212153744

(3)利用pyecharts进行结果展示

from pyecharts.charts import Pie

defpie_base()-> Pie:#饼图位置
    pie_position =[["10%","15%"],["30%","15%"],["50%","15%"],["70%","15%"],["10%","39%"],["30%","39%"],["50%","39%"],["70%","39%"],["10%","63%"],["30%","63%"],["50%","63%"],["70%","63%"],["10%","87%"],["30%","87%"],["50%","87%"],["70%","87%"],]
    p = Pie()
    i =0for terro in terro_target_dict.keys():
        data_pair =[[target, count]for target,count in terro_target_dict[terro].items()]
        data_pair.sort(key=lambda x: x[1])
        p.add(
            terro,
            data_pair=data_pair,
            radius=[40,60],
            center=pie_position[i],
            label_opts=opts.LabelOpts(is_show=False),)
        i +=1
        p.set_global_opts(title_opts=[dict(text='恐怖组织袭击对象'),dict(text='Taliban', top='11%', left='6%'),dict(text='ISIL', top='11%', left='28%'),dict(text='Al-Shabaab', top='11%', left='44%'),dict(text='Boko Haram', top='11%', left='64%'),dict(text='CPI-Maoist', top='35%', left='4%'),dict(text='NPA', top='35%', left='27%'),dict(text='Maoists', top='35%', left='46%'),dict(text='TTP', top='35%', left='68%'),dict(text='PKK', top='59%', left='7%'),dict(text='Ansar Allah', top='59%', left='24%'),dict(text='AQAP', top='59%', left='47%'),dict(text='FARC', top='59%', left='67%'),dict(text='DPR', top='83%', left='7%'),dict(text='Muslim EXT', top='83%', left='24%'),dict(text='Al-Qaida', top='83%', left='45%'),dict(text='Fulani EXT', top='83%', left='64%'),],
        legend_opts=opts.LegendOpts(type_="scroll", pos_left="80%", orient="vertical"))#设置饼图的颜色
        p.set_colors(["tomato","wheat","greenyellow","yellow","gold","pink","orange","steelblue","beige","red","lightcyan","aquamarine","sandybrown","lightgreen","skyblue","peachpuff","salmon","lavender","teal","khaki"])return p.render_notebook()

image-20221006212223908

根据该图可以直观的分析各个恐怖组织发动恐怖袭击的对象,其中对军队、警察、个人公民等发动的袭击占比较多。

4.3.5 受恐怖袭击国家词云图

(1)统计每个国家受恐怖袭击的次数

country_list = csvRow.map(lambda x: x[8])
rdd=country_list.map(lambda x:(x,1))#组成键值对(word,1)
a=rdd.groupByKey().mapValues(len).sortBy(lambda x: x[1],False)print(a.collect())

image-20221009184443122

(2)利用pyecharts进行结果展示

import pyecharts.options as opts
from pyecharts.charts import WordCloud
from pyecharts.globalsimport SymbolType
defword_cloud()-> WordCloud():
    word = WordCloud()
    word.add(series_name="恐怖袭击词云", data_pair=a.collect(), word_size_range=[6,66],shape=SymbolType.DIAMOND)
    word.set_global_opts(
        title_opts=opts.TitleOpts(
            title="恐怖袭击词云", title_textstyle_opts=opts.TextStyleOpts(font_size=23)),
        tooltip_opts=opts.TooltipOpts(is_show=True),)return word.render_notebook()
word_cloud()

image-20221009184519730

根据词云图,恐怖袭击主要还是发生在伊拉克、阿富汗、巴基斯坦、印度等地区

4.3.6 每年恐怖袭击次数变化趋势分析

(1)统计每年全球发生恐怖袭击总次数

rdd = csvRow.map(lambda x:(x[1])).map(lambda x:(x,1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[0],True) 
year_month_count = rdd.collect()print(year_month_count)

image-20221009185305897

(2)利用pyecharts进行结果展示

import pyecharts.options as opts
from pyecharts.charts import Line

y=[i[1]for i in year_month_count[0:20]]
x=[i[0]for i in year_month_count[0:20]]

line=(
    Line().set_global_opts(
        tooltip_opts=opts.TooltipOpts(is_show=False),
        xaxis_opts=opts.AxisOpts(type_="category"),
        yaxis_opts=opts.AxisOpts(
            type_="value",
            axistick_opts=opts.AxisTickOpts(is_show=True),
            splitline_opts=opts.SplitLineOpts(is_show=True),),).add_xaxis(xaxis_data=x).add_yaxis(
        series_name="恐怖袭击年份图",
        y_axis=y,
        symbol="emptyCircle",
        is_symbol_show=True,
        label_opts=opts.LabelOpts(is_show=True),
        markline_opts=opts.MarkLineOpts(
            data=[opts.MarkLineItem(type_="average", name="平均值")]),))
line.render_notebook()

image-20221009185336977根据以上恐怖袭击数量在各个年份的折线图,可得自1970年到1992年,恐怖袭击数量逐年缓慢增加,之后逐年呈现缓慢减少趋势; 但在2005年之后,恐怖袭击数量再次回升且增速较大,至2014年后才开始减少可结合国际实时对数据进行分析,数量变化的原因包括政治、经济等原因。 在2005年之后,恐怖袭击数量再次回升,有很大程度原因是互联网技术的发展。

4.3.7 恐怖袭击攻击类型分析

(1)统计每种恐怖袭击类型的数量

rdd = csvRow.map(lambda x:(x[29])).map(lambda x:(x,1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[0],True) 
attack_type = rdd.collect()

image-20221009191559367

(2)利用pyecharts进行结果展示

from pyecharts import options as opts
from pyecharts.charts import Pie
from pyecharts.faker import Faker

typ =[i[0]for i in attack_type[0:8]]
value =[i[1]for i in attack_type[0:8]]
c =(
    Pie().add("",[list(z)for z inzip(typ, value)]).set_global_opts(
        title_opts=opts.TitleOpts(title="恐怖袭击类型分布图"),
        legend_opts=opts.LegendOpts(type_="scroll", pos_left="85%", orient="vertical"),).set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}")))
c.render_notebook()

image-20221009191709312

根据攻击类型饼图,恐怖袭击的主要方式还是采用炸弹、刺杀以及武装袭击。其中炸弹占比接近一半以上。

4.4 standalone的client模式以及cluster模式提交任务

standalone运行机制:

27909487-eb821fe0f6315d42

Standalone 集群有四个重要组成部分, 分别是:

  • Driver: 是一个进程,我们编写的 Spark 应用程序就运行在 Driver 上, 由Driver 进程执行;
  • Master:是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责;
  • Worker:是一个进程,一个 Worker 运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储 RDD 的某个或某些 partition;另一个是启动其他进程和线程(Executor) ,对 RDD 上的 partition 进行并行的处理和计算。
  • Executor:是一个进程, 一个 Worker 上可以运行多个 Executor, Executor 通过启动多个线程( task)来执行对 RDD 的 partition 进行并行计算,也就是执行我们对 RDD 定义的例如 map、flatMap、reduce 等算子操作。

spark-submit 详细参数说明
参数名****参数说明–mastermaster 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local–deploy-mode在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client–class应用程序的主类,仅针对 java 或 scala 应用–name应用程序的名称–jars用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下–packages包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标–exclude-packages为了避免冲突 而指定不包含的 package–repositories远程 repository–conf PROP=VALUE指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions=“-XX:MaxPermSize=256m”–properties-file加载的配置文件,默认为 conf/spark-defaults.conf–driver-memoryDriver内存,默认 1G–driver-java-options传给 driver 的额外的 Java 选项–driver-library-path传给 driver 的额外的库路径–driver-class-path传给 driver 的额外的类路径–driver-coresDriver 的核数,默认是1。在 yarn 或者 standalone 下使用–executor-memory每个 executor 的内存,默认是1G–total-executor-cores所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用–num-executors启动的 executor 数量。默认为2。在 yarn 下使用–executor-core每个 executor 的核数。在yarn或者standalone下使用
在选择提交任务的方式时仅需根据上边的提交参数进行修改即可,后边的yarn模式也一样。

4.4.1 standalone-client提交任务方式

27909487-8b404ed751f56fdb

  • 在Standalone Client模式下,Driver在任务提交的本地机器上运行,
  • Driver启动后向Master注册应用程序,Master根据submit脚本的资源需求找到内部资源至少可以启动一个Executor的所有Worker,
  • 然后在这些Worker之间分配Executor,Worker上的Executor启动后会向Driver反向注册,所有的Executor注册完成后,
  • Driver开始执行main函数,之后执行到Action算子时,开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。

这里以计算pi值为例子,进入到spark安装目录的bin文件夹下,通过以下命令提交:

./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://master:7077 \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 1G \
--total-executor-cores 2 \
--executor-cores 1 \
/usr/local/spark-3.3.0-bin-hadoop2/examples/jars/spark-examples_2.12-3.3.0.jar 10

image-20221006094333523

这种模式运行结果,直接在客户端显示出来了。

image-20221006094403183

4.4.2 standalone-cluster提交任务方式

27909487-7d9a640bba23c792

  • 在 Standalone Cluster 模式下,任务提交后,Master 会找到一个 Worker 启动 Driver进程,
  • Driver 启动后向 Master 注册应用程序,
  • Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个 Executor 的所有 Worker,
  • 然后在这些 Worker 之间分配 Executor,Worker 上的 Executor 启动后会向 Driver 反向注册,
  • 所有的 Executor 注册完成后,Driver 开始执行 main 函数,之后执行到 Action 算子时,开始划分 stage,每个 stage 生成对应的 taskSet,之后将 task 分发到各个 Executor 上执行。
./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://master:7077 \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 1G \
--total-executor-cores 2 \
--executor-cores 1 \
/usr/local/spark-3.3.0-bin-hadoop2/examples/jars/spark-examples_2.12-3.3.0.jar 10

这种模式基本上没什么输出信息,需要登录web页面查看

image-20221009193348464

image-20221006094451442

查看driver日志信息

image-20221006094519532

最终在driver日志里查看运行结果了。

image-20221008210003992

4.5 YARN的client模式以及cluster模式提交作业

Yarn模式下仅需要将上述–master下的参数调整为yarn即可。

4.5.1 yarn-client提交任务方式

在Yarn-client中,Driver运行在Client上,通过ApplicationMaster向RM获取资源。本地Driver负责与所有的executor container进行交互,并将最后的结果汇总。结束掉终端,相当于kill掉这个spark应用。

因为Driver在客户端,所以可以通过webUI访问Driver的状态,默认是http://master:4040访问,而YARN通过http:// master:8088访问。

因为是与Client端通信,所以Client不能关闭。客户端的Driver将应用提交给Yarn后,Yarn会先后启动ApplicationMaster和executor,另外ApplicationMaster和executor都 是装载在container里运行,container默认的内存是1G,ApplicationMaster分配的内存是driver- memory,executor分配的内存是executor-memory。同时,因为Driver在客户端,所以程序的运行结果可以在客户端显 示,Driver以进程名为SparkSubmit的形式存在。

image-20221009193603901

提交之后与standalone模式一样直接在下方命令行就可以显示出来,提交命令如下:

./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-client \
--driver-memory 1G \
--executor-memory 1G \
--executor-cores 1 \
/usr/local/spark-3.3.0-bin-hadoop2/examples/jars/spark-examples_2.12-3.3.0.jar 10

4.5.1 yarn-cluster提交任务方式

在YARN-Cluster模式中,当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动;第二个阶段是由ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成。

应用的运行结果不能在客户端显示,计算结果打开yarn,在yarn日志里可以查看。

./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--driver-memory 1G \
--executor-memory 1G \
--total-executor-cores 2 \
--executor-cores 1 \
/usr/local/spark-3.3.0-bin-hadoop2/examples/jars/spark-examples_2.12-3.3.0.jar 10

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2tJcqD1A-1665414075890)(C:\Users\Fuwenshuai\AppData\Roaming\Typora\typora-user-images\image-20221009185830956.png)]

五 实验遇到的问题和解决办法

问题1

在配置jupyter_notebook_config.py时,需要给jupyter notebook设置映射端口,由于在实验一创建master容器的时候只配置了50070与8088端口,于是需要添加新的端口。

具体解决步骤如下:

方法1:

  1. 在本电脑中输入\wsl$\docker-desktop-data\data\docker\containers,

也可能是\wsl$\docker-desktop-data\version-pack-data\community\docker\containers,安装的docker版本不一致可能路径不一样,根据需求找到containers文件夹即可

图片2

  1. 找到与master节点对应的文件夹,可以在docker中找到对应的文件夹(我的master对应的是第三个11ba021c…文件夹):

  1. 找到如下两个json文件

图片3

  1. 添加配置端口

在config.v2文件中找到ExposePorts:按照格式添加4040与7077端口
图片4
在hostconfig文件中找到PortBindings:按照格式 添加4040与7077端口:
图片5

  1. 最后重启docker,在master节点上即可发现新添加的4040与7077端口了

图片6

方法2

将现有的容器打包成镜像,然后在使用新的镜像运行容器时重新指定要映射的端口,大概过程如下:

  1. 先停止现有容器:
docker stop container-name
  1. 将容器commit成为一个镜像
docker commit container-name  new-image-name
  1. 用新镜像运行容器:
docker run -it -d --name container-name -p p1:p1 -p p2:p2 new-image-name

问题2

在运行Jupyter notebook时出现如下问题:

图片7

配置文件jupyter_notebook_config.py中的c.NotebookApp全部改为c.ServerApp。但是实际上解决办法是:输入jupyter notebook --allow-root 即可解决。

问题3

在运行Anaconda3-2021.05-Linux-x86_64.sh文件时,如何将Anaconda3安装到指定目录下?

按照实验指导书做的时候,最后配置文件jupyter_notebook_config.py与环境变量时找不到/opt/app/anaconda3,后面发现是最先运行.sh文件时,没有指定目录,默认装到了root下

方法1:

重新运行.sh文件,提示:

Do you accept the license terms? [yes|no]

输入yes

然后出现:

[/root/anaconda3] >>>

在后面输入你想要安装的目录路径,如/opt/app/anaconda3即可,然后后续运行jupyter时就不会出现找不到文件的情况。

方法2:

在最开始运行.sh文件时直接指定安装目录:如/opt/app/anaconda3

bash ./Anaconda3-2020.02-Linux-x86_64.sh -p /opt/app/anaconda3 -u

问题4

jupyter运行

jupyter notebook

jupyter lab

出现“Running as root is not recommended. Use --allow-root to bypass”错误:

解决方法:

  1. 使用 jupyter notebook --generate-config 命令生成jupyter配置文件,将会提示当前生成的配置文件的存放路径,一般为 ~/.jupyter/jupyter_notebook_config.py
  2. 修改配置文件vim ~/.jupyter/jupyter_notebook_config.py打开配置文件,找到 #c.NotebookApp.allow_root = False ,去掉#,并修改为True。
  3. 保存该文件,使用jupyter notebook重新运行程序。

问题5

关于使用Docker-desktop重启后/etc/hosts中之前设置的ip消失的问题,文件设置的ip和主机名的映射也会消失,重新再使用这几个容器的时候还需要重新设置ip,每次都手工写会很麻烦。

解决方法:

我们可以通过写一个脚本的方式来解决,如在master节点中,新建一个addHosts.sh脚本:

vim addHosts.sh

脚本内容为:

echo 172.18.0.3 slave1 >> /etc/hosts
echo 172.17.0.4 slave2 >> /etc/hosts

image-20221008222706544

然后添加执行权限:

chmod +x addHosts.sh 

同样的,在另外两台slave节点上也添加脚本,并分别执行权限:

image-20221008222844462

image-20221008223046037

之后在每次重启之后执行脚本命令即可。

问题6

在容器内安装ssh服务,但是会发现当容器重启后,ssh服务并不能跟着重启,需要手动进入容器执行命令才可以重启。

解决方法:

在 /root 目录下新建一个 start_ssh.sh文件,并给予该文件可执行权限

touch /root/start_ssh.sh
vim /root/start_ssh.sh
chmod +x /root/start_ssh.sh

start_ssh.sh 脚本的内容,如下:

#!/bin/bash
LOGTIME=$(date "+%Y-%m-%d %H:%M:%S")
echo "[$LOGTIME] startup run..." >>/root/start_ssh.log
service ssh start >>/root/start_ssh.log
#service mysql start >>/root/star_mysql.log   //其他服务也可这么实现

将start_ssh.sh脚本添加到启动文件中

vim .bashrc

在.bashrc 文件末尾加入如下内容:

# startup run
if [ -f /root/start_ssh.sh ]; then
      /root/start_ssh.sh
fi

保存后,等下次重启容器的时候,添加的服务也就跟着重启了。

问题7

在启动hadoop输入start-all.sh ,master节点尝试连接其他主机并启动相应节点,但始终连接不上,尝试直接通过ssh命令免密登录其他主机,结果报出警告:the ECDSA host key for ‘boylenubuntu0’ differs from the key for the IP address,输入yes后仍可以通过密码正常连接其他主机,但集群节点却启动不起来,还会报出“IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY。Someone could be eavesdropping on you”。

解决方法:

同一个ip地址,已经连过第一次了,那次保存了那个ip地址的“指纹”。现在仍然对这个ip地址进行连接,发现其“指纹”和保存的不一样。出于安全考虑,报错。为什么会不一样,这个原因有很多,例如服务器重装系统这个应该会导致这个现象,或者其他原因我也不得而知。

只需要删除:/home/root/.ssh目录下的known_hosts文件,该文件为ssh连接过程中记录下的各个主机的IP以及各种信息,上面报错也是因为该文件中存有的IP-用户名与现有的IP-用户名相冲突。该文件删除不影响任何使用。

image-20221008224017844

删除之后,重新配好.ssh下的权限,在终端中执行如下指令即可:

sudo chmod 700 ~/.ssh
sudo chmod 600 ~/.ssh/authorized_keys

注意以上两条指令必不可少,也不可以乱改权限码,否则依然会让你输入密码,实现不了ssh免密。

最后重新测试ssh主机名即可。免密登录也不会报错,集群也可以正常启动。

问题8

实验时老是会遇到可能某一步出现错误导致需要从头开始,有时甚至更危险的是需要将所有配置信息重新配置一遍,这就想到了在使用vmware虚拟机时的“快照”功能,在docker中,也可以通过备份来实现“快照”功能。具体实现方法:

在命令行中输入docker ps 命令:

image-20221008225314932

在此之后,我们要选择我们想要备份的容器,然后去创建该容器的快照。我们可以使用 docker commit 命令根据容器id来创建快照。

docker commit -p 3e4590b36d0d container-backup  #其中container-backup为镜像名称

该命令会生成一个作为Docker镜像的容器快照,我们可以通过运行

docker images

命令来查看Docker镜像,如下

image-20221008225952567

正如我们所看见的,上面做的快照已经作为Docker镜像保存了。现在,为了备份该快照,我们有两个选择,一个是我们可以登录进Docker注册中心,并推送该镜像;另一个是我们可以将Docker镜像打包成tar包备份,以供今后使用。

这里以打包成tar包为例将此镜像保存在本地机器中,以供日后使用。要完成该操作,我们需要运行以下

docker save

命令。

docker save -o ~/container-backup.tar container-backup

接下来,在我们成功备份了我们的Docker容器后,我们现在来恢复这些制作了Docker镜像快照的容器。

如果我们将这些Docker镜像作为tar包文件备份到了本地,那么我们只要使用 docker load 命令,后面加上tar包的备份路径,就可以加载该Docker镜像了。

docker load -i ~/container-backup.tar

现在,为了确保这些Docker镜像已经加载成功,我们来运行 docker images 命令。

docker images

在镜像被加载后,我们将用加载的镜像去运行Docker容器。

docker run -d  container-backup

六 实验思考与体会

  1. 本次实验的一大体会在于对全球恐怖主义袭击数据分析上,该数据集过于庞大,记得以前在数据挖掘课程上用的是pandas库对该数据集进行分析,由于数据量太大导致运行代码效率偏低,但是在通过spark集群,利用rdd分布式处理能够极大提高运行代码的效率。利用分布式集群极大地提高了数据分析领域的发展空间。

在命令行中输入docker ps 命令:

[外链图片转存中…(img-K211Tv6z-1665414075894)]

在此之后,我们要选择我们想要备份的容器,然后去创建该容器的快照。我们可以使用 docker commit 命令根据容器id来创建快照。

docker commit -p 3e4590b36d0d container-backup  #其中container-backup为镜像名称

该命令会生成一个作为Docker镜像的容器快照,我们可以通过运行

docker images

命令来查看Docker镜像,如下

[外链图片转存中…(img-CMzIECUD-1665414075895)]

正如我们所看见的,上面做的快照已经作为Docker镜像保存了。现在,为了备份该快照,我们有两个选择,一个是我们可以登录进Docker注册中心,并推送该镜像;另一个是我们可以将Docker镜像打包成tar包备份,以供今后使用。

这里以打包成tar包为例将此镜像保存在本地机器中,以供日后使用。要完成该操作,我们需要运行以下

docker save

命令。

docker save -o ~/container-backup.tar container-backup

接下来,在我们成功备份了我们的Docker容器后,我们现在来恢复这些制作了Docker镜像快照的容器。

如果我们将这些Docker镜像作为tar包文件备份到了本地,那么我们只要使用 docker load 命令,后面加上tar包的备份路径,就可以加载该Docker镜像了。

docker load -i ~/container-backup.tar

现在,为了确保这些Docker镜像已经加载成功,我们来运行 docker images 命令。

docker images

在镜像被加载后,我们将用加载的镜像去运行Docker容器。

docker run -d  container-backup

六 实验思考与体会

  1. 本次实验的一大体会在于对全球恐怖主义袭击数据分析上,该数据集过于庞大,记得以前在数据挖掘课程上用的是pandas库对该数据集进行分析,由于数据量太大导致运行代码效率偏低,但是在通过spark集群,利用rdd分布式处理能够极大提高运行代码的效率。利用分布式集群极大地提高了数据分析领域的发展空间。
  2. 其次,本次实验内容不多,但容易在许多细节的地方出现错误,也是因此浪费了许多时间,总结的体会就是在执行每一步时要明白这一步的意义所在,盲目去按照指导书去做可能会漏掉一些步骤也可能因为版本不匹配而出现错误。因此,对实验的深刻理解才是实验顺利完成的基础。
标签: 分布式

本文转载自: https://blog.csdn.net/m0_46526335/article/details/127254471
版权归原作者 小坏蛋儿& 所有, 如有侵权,请联系我们删除。

“基于 RDD 的分布式数据处理实验(pyspark)”的评论:

还没有评论