前言
学习任何一门语言,都是从helloword开始,对于大数据框架来说,则是从wordcount开始,spark也不例外,作为一门大数据处理框架,在系统的学习spark之前,让我们先从一个wordcount开始吧!
环境准备说明
Spark 由 Scala 语言开发的,所以本课件接下来的开发所使用的语言也为 Scala,咱们当 前使用的 Spark 版本为 3.0.0,默认采用的 Scala 编译版本为 2.12,所以后续开发时。我们依 然采用这个版本。开发前请保证 IDEA 开发工具中含有 Scala 开发插件;
maven 中添加基础依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
在工程的resources目录下,准备如下的一个文件,里面有一行行单词,我们将在下面的程序中统计出这个文件中各个单词的数量;
三种方式实现 wordcount
方式一
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object HelloWord {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf);
// 读取文件数据
val lines: RDD[String] = sc.textFile("E:\\code-self\\spi\\spark-test\\src\\main\\resources\\data1.txt")
//字符串的拆分,整体拆分成个体数据【数据扁平化处理】
val words: RDD[String] = lines.flatMap(_.split(" "))
// 转换数据结构 word => (word, 1)
val wordGroup : RDD[(String,Iterable[String])] = words.groupBy(word => word)
//对分组后的数据进行转换
val wordCount = wordGroup.map {
case (word, list) => {
(word, list.size)
}
}
//将转换的结果采集并输出到控制台
val array : Array[(String,Int)] = wordCount.collect()
array.foreach(println)
//关闭 Spark 连接
sc.stop()
}
}
运行上面的代码,控制台观察输出效果,
方式二
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object WordCount2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf);
// 读取文件数据
val lines: RDD[String] = sc.textFile("E:\\code-self\\spi\\spark-test\\src\\main\\resources\\data1.txt")
//字符串的拆分,整体拆分成个体数据【数据扁平化处理】
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne = words.map(
word => (word,1)
)
// 转换数据结构 word => (word, 1)
val wordGroup = wordToOne.groupBy(
t => t._1
)
//对分组后的数据进行转换
val wordCount = wordGroup.map {
case (word, list) => {
list.reduce(
(t1,t2) =>{
(t1._1,t1._2 + t2._2)
}
)
}
}
//将转换的结果采集并输出到控制台
val array : Array[(String,Int)] = wordCount.collect()
array.foreach(println)
//关闭 Spark 连接
sc.stop()
}
}
运行上面的代码,观察控制台输出效果
方式三
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object WordCount3 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf);
// 读取文件数据
val lines: RDD[String] = sc.textFile("E:\\code-self\\spi\\spark-test\\src\\main\\resources\\data1.txt")
//字符串的拆分,整体拆分成个体数据【数据扁平化处理】
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne = words.map(
word => (word,1)
)
//相同的key的数据对value做聚合
//val wordCount = wordToOne.reduceByKey((x,y) => x + y)
//val wordCount = wordToOne.reduceByKey(_+_)
val wordCount = wordToOne.reduceByKey((x,y) => {x + y})
//将转换的结果采集并输出到控制台
val array : Array[(String,Int)] = wordCount.collect()
array.foreach(println)
//关闭 Spark 连接
sc.stop()
}
}
运行上面的代码,观察控制台输出效果
以上三种方式,最终能够达到统计单词的数量的目的,其中前面两种采用了比较常规的方式,即先对数据进行扁平化,拆分成一个个的单词,然后再对单词进行分组聚合,最后得到统计结果;
而第三种方式,则是使用了spark的map-reduce思想,直接通过reduceByKey这个方法,一次性的完成了对数据的分组聚合操作,省区了前面的步骤,属于一种更加优化的实现思路;
版权归原作者 逆风飞翔的小叔 所有, 如有侵权,请联系我们删除。