spark第一篇简介
spark概述
- 什么是spark - spark是一个基于内存计算的引擎,计算速度非常快,但是并没有涉及到数据的存储,后期想要处理数据,需要引入外部的数据源(比如hadoop中数据)
- 为什么要用spark - 比mapreduce计算速度快很多。
spark的特性
- 1、速度快 - 比mapreduce在内存中快100x,在磁盘中快10x- 快的原因 - 1、spark的中间处理结果数据可以保存在内存中,mapreduce中间结果数据保存在磁盘中。- 2、mapreduce最终的任务是以进程中方式运行在集群中。比如有100个MapTask任务,1个ReduceTask任务。Spark任务是线程的方式运行在进程中。
- 2、易用性 - 可以快速的写一个spark应用程序通过4种语言(java/scala/Python/R)
- 3、通用性 - 可以使用spark sql /sparkStreaming/Mlib/Graphx
- 4、兼容性 - spark就是一个计算任务的程序,哪里可以给当前程序提供计算的资源,我们就可以把任务提交到哪里去运行 - yarn (资源的分配由resourceManager去分配资源)- standAlone(资源的分配由Master去分配资源)- mesos(apache下开源的资源调度框架)
spark集群安装
- 1、下载spark安装包- spark-2.0.2-bin-hadoop2.7.tgz- 下载地址spark官网:http://spark.apache.org/downloads.html
- 2、规划安装目录- /export/servers
- 3、上传安装包到服务器中
- 4、解压安装包到指定的安装目录- tar -zxvf spark-2.0.2-bin-hadoop2.7.tgz -C /export/servers
- 5、重命名安装目录- mv spark-2.0.2-bin-hadoop2.7 spark
- 6、修改配置文件- 6.1 进入到conf目录,修改spark-env.sh( mv spark-env.sh.template spark-env.sh)配置java环境变量 export JAVA_HOME=/export/servers/jdk配置master的地址 export SPARK_MASTER_HOST=node1配置master的端口 export SPARK_MASTER_PORT=7077- 6.2 修改slaves (mv slaves.template slaves) .指定整个集群中worker节点
node2node3
- 7、配置spark环境变量- 修改 /etc/profile
export SPARK_HOME=/export/servers/sparkexport PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
- 8、分发spark安装目录到其他节点
scp -r spark node2:/export/serversscp -r spark node3:/export/servers
- 9、分发spark环境变量到其他节点
scp /etc/profile node2:/etcscp /etc/profile node3:/etc
- 10、让所有节点的spark环境变量生效- 在所有节点上执行 - source /etc/profile- echo $SPARK_HOME 查看配置文件地址是否生效
spark集群启动和停止
- 1、启动spark集群 - 在主节点上执行脚本 - $SPARK_HOME/sbin/start-all.sh
- 2、停止spark集群 - 在主节点上执行脚本 - $SPARK_HOME/sbin/stop-all.sh
spark的web管理界面
- 启动spark集群后 - 访问 http://master地址:8080 可以通过这样一个界面,来查看整个spark集群相关信息和对应任务运行的情况。
基于zk搭建一个sparkHA
- 1、修改配置文件- vi spark-env.sh
1.1 注释掉手动指定master参数# export SPARK_MASTER_HOST=node11.2 引入zk 搭建sparkHAexport SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181 -Dspark.deploy.zookeeper.dir=/spark"
- 2、分发spark-env.sh到其他节点
scp spark-env.sh node2:$PWDscp spark-env.sh node3:$PWD
- 3、启动sparkHA集群- 3.1 启动zk- 3.2 在任意一台节点来启动 start-all.sh (保证每2台机器之间实现ssh免登陆) - 1、会在当前机器上产生一个Master进程- 2、worker进程通过slaves文件指定的节点上产生- 3.3 在另一台机器上单独启动master - sbin/start-master.sh
在整个spark集群中有一个活着的master,其他多个master都处于standby。此时,活着的master挂掉之后,zk会感知到,接下来会在所有的处于standby的master中进行选举,再次产生一个活着的master。此时这个活着的master会读取在zk中保存上次活着的master的相关信息。进行恢复!整个恢复过程需要1-2分钟。
在整个恢复的过程中:
之前运行的任务正常运行。只不过这里你再次提交新的任务到集群中的时候,此时没有活着的master去分配对应的资源,是无法提交到集群中去运行。
spark角色介绍
- 1、Driver - 运行客户端的main方法,创建sparkContext对象
- 2、Application - 一个应用程序,包括:driver的代码和任务计算所需要的资源
- 3、Master - 整个spark集群中的主节点,负责资源的调度和任务的分配
- 4、ClusterManager - 指的是在集群上获取资源的外部服务。目前有三种类型 - Standalone: spark原生的资源管理,由Master负责资源的分配- Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架- Hadoop Yarn: 主要是指Yarn中的ResourceManager
- 5、Worker - 就是具体干活的小弟- 在Standalone模式中指的是通过slaves文件配置的Worker节点- 在Spark on Yarn模式下就是NodeManager节点
- 6、executor - 是一个进程,它会在worker节点上启动一个进程
- 7、task - spark任务最后会以task线程的方式运行在worker节点的executor进程中
初识spark程序
- 1、普通模式提交(已经知道活着的master地址)
bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master spark://node1:7077 \--executor-memory 1G \--total-executor-cores 2 \examples/jars/spark-examples_2.11-2.0.2.jar \10
- 2、高可用模式提交(不确定活着的master是哪一个)
bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master spark://node1:7077,node2:7077,node3:7077 \--executor-memory 1G \--total-executor-cores 2 \examples/jars/spark-examples_2.11-2.0.2.jar \10此时指定master为一个列表:--master spark://node1:7077,node2:7077,node3:7077应用程序会轮询该master列表,找到活着的master,最后把任务提交给活着的master
spark-shell 使用
- 1、利用 spark-shell --master local[N] 读取本地数据文件实现单词统计- local[N] - local表示本地单机版运行程序,N表示一个正整数,表示本地采用几个线程去运行- 它会产生SparkSubmit进程
sc.textFile("file:///root/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
- 2、利用 spark-shell --master local[N] 读取HDFS上数据文件实现单词统计- spark整合HDFS- vi spark-env.sh
export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop``````sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
- 3、利用spark-shell 指定具体的master 读取HDFS上数据文件实现单词统计- 提交脚本
spark-shell --master spark://node1:7077 --executor-memory 1g --total-executor-cores 2``````sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
通过IDEA编写spark wordcount程序
1 利用IDEA开发spark wodcount程序(本地运行)
- 导包
<dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.4</version> </dependency> </dependencies>
- 开发实现
import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.rdd.RDD//todo:通过IDEA开发spark的wordcount程序object WordCount { def main(args: Array[String]): Unit = { //1、创建SparkConf对象 设置appName和master地址 local[2] 表示本地采用2个线程运行任务 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]") //2、创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler val sc = new SparkContext(sparkConf) //设置日志输出的级别 sc.setLogLevel("WARN") //3、读取数据文件 val data: RDD[String] = sc.textFile("D:\\words.txt") //4、切分每一行,获取所有的单词 val words: RDD[String] = data.flatMap(_.split(" ")) //5、每个单词记为1 val wordAndOne: RDD[(String, Int)] = words.map((_,1)) //6、相同单词出现的次数进行累加 val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_) //按照单词出现的次数降序排列 val sortByRDD: RDD[(String, Int)] = result.sortBy(x=>x._2,false) //7、收集打印结果数据 val finalResult: Array[(String, Int)] = sortByRDD.collect() println(finalResult.toBuffer) //8、关闭sparkContext对象 sc.stop() }}
2 利用IDEA开发spark wodcount程序(集群运行)
- 代码开发
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
//todo:通过IDEA开发spark wordcount程序,打成jar包,提交到集群中去运行
object WordCount_Online {
def main(args: Array[String]): Unit = {
//1、创建SparkConf对象 设置appName
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount_Online")
//2、创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
val sc = new SparkContext(sparkConf)
//设置日志输出的级别
sc.setLogLevel("WARN")
//3、读取数据文件
val data: RDD[String] = sc.textFile(args(0))
//4、切分每一行,获取所有的单词
val words: RDD[String] = data.flatMap(_.split(" "))
//5、每个单词记为1
val wordAndOne: RDD[(String, Int)] = words.map((_,1))
//6、相同单词出现的次数进行累加
val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
//7、把结果数据保存到HDFS上
result.saveAsTextFile(args(1))
//8、关闭sparkContext对象
sc.stop()
}
}
- 打包运行
spark-submit --master spark://node1:7077 --class cn.包名.WordCount_Online --executor-memory 1g --total-executor-cores 2 original-spark_class06-2.0.jar /words.txt /out
3 利用IDEA开发spark wordCount程序(java开发)
- 开发实现
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;import java.util.List;//todo:利用java来实现spark的wordcount程序public class WordCount_Java { public static void main(String[] args) { //1、创建SparkConf SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]"); //2、创建SparkContext JavaSparkContext jsc = new JavaSparkContext(sparkConf); //3、读取数据文件 JavaRDD<String> data = jsc.textFile("d:\\words.txt"); //4、切分每一行,获取所有的单词 JavaRDD<String> wordsJavaRDD = data.flatMap(new FlatMapFunction<String, String>() { public Iterator<String> call(String line) throws Exception { String[] words = line.split(" "); return Arrays.asList(words).iterator(); } }); //5、每个单词记为1 JavaPairRDD<String, Integer> wordAndOneJavaPairRDD = wordsJavaRDD.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); //6、相同单词出现的次数累加 (hadoop,List(1,1,1,1)) JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOneJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //按照单词出现的次数降序排列 (单词,次数)----->(次数,单词).sortByKey(false)----->(单词,次数) JavaPairRDD<Integer, String> reverseJavaPairRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception { return new Tuple2<Integer, String>(t._2, t._1); } }); JavaPairRDD<String, Integer> sortedJavaPairRDD = reverseJavaPairRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception { return new Tuple2<String, Integer>(t._2, t._1); } }); //7、收集打印 List<Tuple2<String, Integer>> finalResult = sortedJavaPairRDD.collect(); //遍历 快捷键 iter for (Tuple2<String, Integer> t : finalResult) { System.out.println("单词:"+t._1+" 次数:"+t._2); } //8、关闭jsc jsc.stop(); }}
本文转载自: https://blog.csdn.net/Emperor_rock/article/details/139435316
版权归原作者 帝乙岩 所有, 如有侵权,请联系我们删除。
版权归原作者 帝乙岩 所有, 如有侵权,请联系我们删除。