0


安装Spark-单机部署,Standalone集群部署,Spark on Yarn实现


安装spark的前提是已经在完成了Hadoop的部署

单机部署spark本地模式部署

部署:Anaconda部署Python(3台机器都需要)

  • 目标:实现Linux机器上使用Anaconda部署Python
  • 上传:
  1. cd /export/server/
  • 安装
  1. # 添加执行权限
  2. chmod u+x Anaconda3-2020.07-Linux-x86_64.sh
  3. # 执行
  4. sh ./Anaconda3-2020.07-Linux-x86_64.sh
  5. # 过程
  6. #第一次:【直接回车,然后按q】
  7. Please, press ENTER to continue
  8. >>>
  9. #第二次:【输入yes】
  10. Do you accept the license terms? [yes|no]
  11. [no] >>> yes
  12. #第三次:【输入解压路径:/export/server/anaconda3】
  13. [/root/anaconda3] >>> /export/server/anaconda3
  14. #第四次:【输入yes,是否在用户的.bashrc文件中初始化Anaconda3的相关内容】
  15. Do you wish the installer to initialize Anaconda3
  16. by running conda init? [yes|no]
  17. [no] >>> yes

  • 激活
  1. # 刷新环境变量
  2. source /root/.bashrc
  3. # 激活虚拟环境,如果需要关闭就使用:conda deactivate
  4. conda activate
  • 验证
  1. python3

  • 配置
  1. # 编辑环境变量
  2. vim /etc/profile
  3. # 添加以下内容
  4. # Anaconda Home
  5. export ANACONDA_HOME=/export/server/anaconda3
  6. export PATH=$PATH:$ANACONDA_HOME/bin
  7. # 刷新环境变量
  8. source /etc/profile
  9. # 创建软连接
  10. ln -s /export/server/anaconda3/bin/python3 /usr/bin/python3
  11. # 验证
  12. echo $ANACONDA_HOME

部署:Spark本地模式部署

  • 目标:实现Spark本地模式的单机部署
  • 上传
  1. cd /export/server/
  • 安装
  1. # 解压安装
  2. tar -zxf spark-3.1.2-bin-hadoop3.2.tgz -C /export/server
  3. # 重命名
  4. cd /export/server
  5. mv spark-3.1.2-bin-hadoop3.2 spark-local
  6. # 创建软连接
  7. ln -s spark-local spark

单机部署:Spark Python Shell

  • 目标:掌握Spark Shell的基本使用

  • 实施- 功能:提供一个交互式的命令行,用于测试开发Spark的程序代码- Spark的客户端bin目录下:提供了多个测试工具客户端- pyspark:Python命令行- spark-submit:用于提交我们开发的Spark程序的- beeline:SQL命令行- spark-sql:SQL命令行- 启动

  1. # --master指定运行的模式,local代表本地模式,[N]代表这个程序运行给定几核CPU
  2. /export/server/spark/bin/pyspark --master local[2]

  • 核心 - Spark context Web UI available at http://node1.itcast.cn:4040- Spark为每个程序都提供了一个Web监控界面,端口从4040开始,如果被占用就不断+1- 方便我们对每个程序的运行状况做监控用的- Spark context available as 'sc' (master = local[2], app id = local-1637509567232).- SparkContext是Spark程序中的一个类,这个类是所有Spark程序都必须有的,负责读取数据,调度Task等- 当前的程序中默认构建了一个SparkContext类的对象叫做sc- SparkSession available as 'spark'- SparkSession也是Spark程序中的一个类,功能类似于SparkContext,Spark2.0以后推出的,主要用于SparkSQL- 当前的程序中默认构建了一个SparkSession类的对象叫做spark

Spark的Standalone集群部署

Standalone集群架构

  • 目标:理解Standalone集群架构

  • 对比
    概念MR+YARNSpark Standalone主节点ResourceManagerMaster从节点NodeManagerWoker计算进程MapTask/ReduceTaskExecutor

  • 分布式主从架构:整体的功能及架构高度类似于YARN【ResourceManager、NodeManager】- 分布式架构- 普通分布式主从架构:HDFS、YARN、Spark、Flink、Hbase => 主节点单点故障问题- 解决主节点单点故障问题:HA高可用架构来解决- 公平分布式主从架构:Zookeeper- 不存在所讲的单点故障问题,Zookeeper负责帮别人解决单点故障问题- 整个大数据平台中ZK的场景:1-辅助实现HA,解决单点故障问题。2-存储实时工具元数据

  • 功能:提供分布式资源管理和任务调度

  • 主:Master:管理节点,类比于YARN中的RM- 接受客户端请求:所有程序的提交,都是提交给主节点- 管理从节点:通过心跳机制检测所有的从节点的健康状态- 资源管理和任务调度:将所有从节点的资源在逻辑上合并为一个整体,将任务分配给不同的从节点

  • 从:Worker:计算节点,类比于YARN中NM- 使用自己所在节点的资源运行计算进程Executor:给每个计算进程分配一定的资源- 所有Task线程计算任务就运行在Executor进程中- 假设每台机器机器:32Core - 64GB- 那么Worker的资源由配置决定,例如16Core - 32GB- 表示Worker最多能使用这台机器的16Core32GB的资源用于计算

  • 注意:Executor类似于MapTask或者ReduceTask进程,每个程序的Executor只启动一次

Standalone集群部署

  • 目标:实现Spark Standalone集群的部署

实施

  1. # 三台机器都执行
  2. cd /export/server
  3. ln -s jdk1.8.0_241 jdk
  4. ln -s hadoop-3.3.0 hadoop
  5. # 只在node1上执行
  6. ln -s hive-3.1.2/ hive
  • 安装Spark:第一台机器# 解压安装cd /export/server/tar -zxf spark-3.1.2-bin-hadoop3.2.tgz -C /export/server# 重命名mv spark-3.1.2-bin-hadoop3.2 spark-standalone# 重新构建软连接rm -rf sparkln -s spark-standalone spark

  • 配置集群节点【注意不要从PDF复制,要从md文件复制,进入vim以后要按i或者o】- spark-env.sh:Spark环境配置cd /export/server/spark/confmv spark-env.sh.template spark-env.shvim spark-env.sh``````# 22行:申明JVM环境路径以及Hadoop的配置文件路径export JAVA_HOME=/export/server/jdkexport HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop# 60行左右SPARK_MASTER_HOST=node1.itcast.cnSPARK_MASTER_PORT=7077SPARK_MASTER_WEBUI_PORT=8080SPARK_WORKER_CORES=1SPARK_WORKER_MEMORY=1gSPARK_WORKER_PORT=7078SPARK_WORKER_WEBUI_PORT=8081SPARK_DAEMON_MEMORY=1gSPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1.itcast.cn:8020/spark/eventLogs/ -Dspark.history.fs.cleaner.enabled=true"

  1. # 注释:不用放到配置文件中
  2. SPARK_MASTER_HOST=node1.itcast.cn #用于指定Master主节点的地址
  3. SPARK_MASTER_PORT=7077 #用于指定Master任务提交端口
  4. SPARK_MASTER_WEBUI_PORT=8080 #用于指定Master Web界面的端口,类似于YARN中的8088
  5. SPARK_WORKER_CORES=1 #用于指定每个Worker能用机器多少核CPU
  6. SPARK_WORKER_MEMORY=1g #用于指定每个Worker能用机器多少内存
  7. SPARK_WORKER_PORT=7078 #用于指定从节点的通信端口
  8. SPARK_WORKER_WEBUI_PORT=8081 #用于指定从节点的Web端口
  9. SPARK_DAEMON_MEMORY=1g #运行进程使用的资源
  10. SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1.itcast.cn:8020/spark/eventLogs/ -Dspark.history.fs.cleaner.enabled=true" # 配置Spark的HistoryServer进程日志存储配置
  • 在HDFS上创建程序日志存储目录# 第一台机器启动HDFSstart-dfs.sh# 创建程序运行日志的存储目录hdfs dfs -mkdir -p /spark/eventLogs/# 在浏览器中访问http://192.168.88.100:9870/

  • spark-defaults.conf:Spark属性配置文件

  1. cd /export/server/spark/conf
  2. mv spark-defaults.conf.template spark-defaults.conf
  3. vim spark-defaults.conf
  1. # 末尾
  2. spark.eventLog.enabled true
  3. spark.eventLog.dir hdfs://node1.itcast.cn:8020/spark/eventLogs
  4. spark.eventLog.compress true
  • workers:从节点地址配置文件
  1. mv workers.template workers
  2. vim workers
  1. # 删掉localhost,添加以下内容
  2. node1.itcast.cn
  3. node2.itcast.cn
  4. node3.itcast.cn
  • log4j.properties:日志配置文件
  1. mv log4j.properties.template log4j.properties
  2. vim log4j.properties
  1. # 19行:修改日志级别为WARN
  2. log4j.rootCategory=WARN, console
  • 分发同步- 分发
  1. cd /export/server/
  2. ln -s spark-standalone spark
  • 第二台和第三台创建软链接
  1. cd /export/server/
  2. ln -s spark-standalone spark

Standalone集群启动

  • 目标掌握Spark Standalone集群的启动管理
  • 实施- 启动- 第一台机器执行命令- 启动 hdfs
  1. # 第一台机器启动HDFS
  2. start-dfs.sh
  • 启动Master
  1. cd /export/server/spark
  2. sbin/start-master.sh

  • 启动Worker
  1. sbin/start-workers.sh

  • 启动HistoryServer
  1. sbin/start-history-server.sh

  • 监控:启动以后才能访问- Master监控服务:相当于YARN中的8080
  1. http://node1:8080/

HistoryServer历史监控服务:相当于MR中的19888

  1. http://node1:18080/

  • 端口不要记混了- Master提交程序端口:7077- Master Web界面的端口:8080- HistoryServer Web界面端口:18080

  • 关闭

  1. # 关闭Master
  2. sbin/stop-master.sh
  3. # 关闭Worker
  4. sbin/stop-workers.sh
  5. # 关闭HistoryServer
  6. sbin/stop-history-server.sh

Standalone集群测试

  • 目标实现Spark程序在Standalone集群上的测试
  • 实施- 圆周率测试
  1. # 提交程序脚本:bin/spark-submit
  2. # --master:指定运行模式,本地模式:local, Standalone集群:spark://主节点地址:7077
  3. /export/server/spark/bin/spark-submit \
  4. --master spark://node1.itcast.cn:7077 \
  5. --conf "spark.pyspark.driver.python=/export/server/anaconda3/bin/python3" \
  6. --conf "spark.pyspark.python=/export/server/anaconda3/bin/python3" \
  7. /export/server/spark/examples/src/main/python/pi.py \
  8. 200
  • 提交程序提交给主节点
  • 主节点是Master,Master用于接受客户端请求的7077

Spark on YARN的实现

Spark on YARN:资源设计

  • 目标掌握Spark on YARN的设计
  • 实施- 问题为什么要将Spark的程序运行在YARN上,不运行在自带的Standalone集群上?- 实现- 统一化资源管理- 工作中的计算集群大多数情况下只有1套集群- 如果Hadoop生态的程序,例如MR、Hive、Sqoop、Oozie等使用YARN来计算- 而Spark的程序单独用Standalone集群来计算- 就导致了一套硬件资源被两套资源管理平台所管理,使用时会导致资源竞争冲突等问题- 不能充分的发挥硬件资源的性能且管理麻烦- 自由开发模式- 使用YARN统一化管理整个硬件集群的所有计算资源:公共分布式资源平台- YARN支持多种类型程序的运行:MR、Tez、Spark、Flink等- 成熟的资源调度机制- 支持多队列、多种调度器可以实现不同场景下的计算资源隔离和任务调度- YARN中Capacity、Fair调度器- 关闭原来的Standalone集群
  1. cd /export/server/spark
  2. sbin/stop-master.sh
  3. sbin/stop-workers.sh
  4. sbin/stop-history-server.sh

Spark on YARN:配置测试

  • 目标实现Spark on YARN的配置及测试
  • 实施- 搭建- 准备工作(三台机器都需要做)
  1. cd /export/server
  2. ln -s jdk1.8.0_241 jdk
  3. ln -s hadoop-3.3.0 hadoop
  • 解压:第一台机器
  1. cd /export/server
  2. tar -zxf spark-3.1.2-bin-hadoop3.2.tgz -C /export/server
  3. mv spark-3.1.2-bin-hadoop3.2 spark-yarn
  4. rm -rf /export/server/spark
  5. ln -s /export/server/spark-yarn /export/server/spark
  • 修改配置- spark-env.sh
  1. cd /export/server/spark-yarn/conf
  2. mv spark-env.sh.template spark-env.sh
  3. vim /export/server/spark-yarn/conf/spark-env.sh
  1. ## 22行左右设置JAVA安装目录、HADOOP和YARN配置文件目录
  2. export JAVA_HOME=/export/server/jdk
  3. export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
  4. export YARN_CONF_DIR=/export/server/hadoop/etc/hadoop
  5. ## 历史日志服务器
  6. SPARK_DAEMON_MEMORY=1g
  7. SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1.itcast.cn:8020/spark/eventLogs/ -Dspark.history.fs.cleaner.enabled=true"
  • spark-defaults.conf
  1. cd /export/server/spark-yarn/conf
  2. mv spark-defaults.conf.template spark-defaults.conf
  3. vim spark-defaults.conf
  1. ## 添加内容:
  2. spark.eventLog.enabled true
  3. spark.eventLog.dir hdfs://node1.itcast.cn:8020/spark/eventLogs
  4. spark.eventLog.compress true
  5. spark.yarn.historyServer.address node1.itcast.cn:18080
  6. spark.yarn.jars hdfs://node1.itcast.cn:8020/spark/jars/*
  • 上传spark jar
  1. #因为YARN中运行Spark,需要用到Spark的一些类和方法
  2. #如果不上传到HDFS,每次运行YARN都要上传一次,比较慢
  3. #所以自己手动上传一次,以后每次YARN直接读取即可
  4. hdfs dfs -mkdir -p /spark/jars/
  5. hdfs dfs -put /export/server/spark-yarn/jars/* /spark/jars/
  6. hdfs dfs -mkdir -p /spark/eventLogs
  • 修改yarn-site.xml【不用做】
  1. vim /export/server/hadoop/etc/hadoop/yarn-site.xml
  1. ## 添加如下内容
  2. <!-- 开启日志聚合功能 -->
  3. <property>
  4. <name>yarn.log-aggregation-enable</name>
  5. <value>true</value>
  6. </property>
  7. <!-- 设置聚合日志在hdfs上的保存时间 -->
  8. <property>
  9. <name>yarn.log-aggregation.retain-seconds</name>
  10. <value>604800</value>
  11. </property>
  12. <!-- 设置yarn历史服务器地址 -->
  13. <property>
  14. <name>yarn.log.server.url</name>
  15. <value>http://node1.itcast.cn:19888/jobhistory/logs</value>
  16. </property>
  17. <!-- 关闭yarn内存检查 -->
  18. <property>
  19. <name>yarn.nodemanager.pmem-check-enabled</name>
  20. <value>false</value>
  21. </property>
  22. <property>
  23. <name>yarn.nodemanager.vmem-check-enabled</name>
  24. <value>false</value>
  25. </property>
  • 分发yarn-site.xml
  1. cd /export/server/hadoop/etc/hadoop
  2. scp -r yarn-site.xml root@node2.itcast.cn:$PWD
  3. scp -r yarn-site.xml root@node3.itcast.cn:$PWD
  • 分发:将第一台机器的spark-yarn分发到第二台和第三台** (可以省略)**

  • 分发:第一台机器操作

  1. cd /export/server/
  2. scp -r spark-yarn node2:$PWD
  3. scp -r spark-yarn node3:$PWD
  • 修改第二台和第三台软连接
  1. rm -rf /export/server/spark
  2. ln -s /export/server/spark-yarn /export/server/spark
  • 启动:第一台机器
  1. # 启动hdfs
  2. start-dfs.sh
  3. # 启动yarn
  4. start-yarn.sh
  5. # 启动MR的JobHistoryServer:19888
  6. mapred --daemon start historyserver
  7. # 启动Spark的HistoryServer:18080
  8. /export/server/spark/sbin/start-history-server.sh
  • 测试- 官方圆周率计算
  1. /export/server/spark/bin/spark-submit \
  2. --master yarn \
  3. /export/server/spark-yarn/examples/src/main/python/pi.py \
  4. 10
  • 自己开发WorkCount
  1. # 指定运行资源
  2. /export/server/spark/bin/spark-submit \
  3. --master yarn \
  4. --driver-memory 512M \
  5. --driver-cores 1 \
  6. --supervise \
  7. --executor-memory 1G \
  8. --executor-cores 1 \
  9. --num-executors 2 \
  10. hdfs://node1:8020/spark/app/py/pyspark_core_word_args.py \
  11. /spark/wordcount/input \
  12. /spark/wordcount/output9
  • pyspark_core_word_args.py 内容
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import time
  4. from pyspark import SparkContext, SparkConf
  5. import os
  6. import re
  7. import sys
  8. if __name__ == '__main__':
  9. # todo:0-设置系统环境变量:全部换成Linux地址
  10. os.environ['JAVA_HOME'] = '/export/server/jdk'
  11. os.environ['HADOOP_HOME'] = '/export/server/hadoop'
  12. os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
  13. os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'
  14. # todo:1-构建SparkContext
  15. # conf = SparkConf().setMaster("local[2]").setAppName("Remote Test APP")
  16. conf = SparkConf().setAppName("Remote Test APP")
  17. sc = SparkContext(conf=conf)
  18. # todo:2-数据处理:读取、转换、保存
  19. # 2.1 读
  20. # input_rdd = sc.textFile("hdfs://node1:8020/spark/wordcount/input/b_word_re.txt")
  21. input_rdd = sc.textFile(sys.argv[1])
  22. # print(input_rdd.take(10))
  23. # 2.2 处理
  24. rs_rdd = (
  25. input_rdd
  26. .filter(lambda line: len(line.strip())>0)
  27. # .flatMap(lambda line: line.strip().split(" "))
  28. .flatMap(lambda line: re.split('\\s+', line))
  29. .map(lambda word: (word, 1))
  30. .reduceByKey(lambda a, b: a + b)
  31. .sortBy(lambda x: x[1])
  32. )
  33. rs_rdd.foreach(lambda x: print(x))
  34. # 2.3 写
  35. # rs_rdd.saveAsTextFile("hdfs://node1:8020/spark/wordcount/output_8")
  36. rs_rdd.saveAsTextFile(sys.argv[2])
  37. # todo:3-关闭SparkContext
  38. sc.stop()

本文转载自: https://blog.csdn.net/weixin_58305115/article/details/142745516
版权归原作者 木鬼与槐 所有, 如有侵权,请联系我们删除。

“安装Spark-单机部署,Standalone集群部署,Spark on Yarn实现”的评论:

还没有评论