0


Spark 之WordCount

前言

学习任何一门语言,都是从helloword开始,对于大数据框架来说,则是从wordcount开始,spark也不例外,作为一门大数据处理框架,在系统的学习spark之前,让我们先从一个wordcount开始吧!

环境准备说明

Spark 由 Scala 语言开发的,所以本课件接下来的开发所使用的语言也为 Scala,咱们当 前使用的 Spark 版本为 3.0.0,默认采用的 Scala 编译版本为 2.12,所以后续开发时。我们依 然采用这个版本。开发前请保证 IDEA 开发工具中含有 Scala 开发插件;

maven 中添加基础依赖

<dependencies>
 <dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-core_2.12</artifactId>
 <version>3.0.0</version>
 </dependency>
</dependencies>

在工程的resources目录下,准备如下的一个文件,里面有一行行单词,我们将在下面的程序中统计出这个文件中各个单词的数量;

三种方式实现 wordcount

方式一

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object HelloWord {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf);

    // 读取文件数据
    val lines: RDD[String] = sc.textFile("E:\\code-self\\spi\\spark-test\\src\\main\\resources\\data1.txt")

    //字符串的拆分,整体拆分成个体数据【数据扁平化处理】
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // 转换数据结构 word => (word, 1)
    val wordGroup : RDD[(String,Iterable[String])] = words.groupBy(word => word)

    //对分组后的数据进行转换
    val wordCount = wordGroup.map {
      case (word, list) => {
        (word, list.size)
      }
    }

    //将转换的结果采集并输出到控制台
    val array : Array[(String,Int)] = wordCount.collect()

    array.foreach(println)

    //关闭 Spark 连接
    sc.stop()
  }

}

运行上面的代码,控制台观察输出效果,

方式二

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object WordCount2 {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf);

    // 读取文件数据
    val lines: RDD[String] = sc.textFile("E:\\code-self\\spi\\spark-test\\src\\main\\resources\\data1.txt")

    //字符串的拆分,整体拆分成个体数据【数据扁平化处理】
    val words: RDD[String] = lines.flatMap(_.split(" "))

    val wordToOne = words.map(
      word => (word,1)
    )

    // 转换数据结构 word => (word, 1)
    val wordGroup = wordToOne.groupBy(
      t => t._1
    )

    //对分组后的数据进行转换
    val wordCount = wordGroup.map {
      case (word, list) => {

        list.reduce(
          (t1,t2) =>{
            (t1._1,t1._2 + t2._2)
          }
        )
      }
    }

    //将转换的结果采集并输出到控制台
    val array : Array[(String,Int)] = wordCount.collect()

    array.foreach(println)

    //关闭 Spark 连接
    sc.stop()
  }

}

运行上面的代码,观察控制台输出效果

方式三

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object WordCount3 {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf);

    // 读取文件数据
    val lines: RDD[String] = sc.textFile("E:\\code-self\\spi\\spark-test\\src\\main\\resources\\data1.txt")

    //字符串的拆分,整体拆分成个体数据【数据扁平化处理】
    val words: RDD[String] = lines.flatMap(_.split(" "))

    val wordToOne = words.map(
      word => (word,1)
    )

    //相同的key的数据对value做聚合
    //val wordCount = wordToOne.reduceByKey((x,y) => x + y)
    //val wordCount = wordToOne.reduceByKey(_+_)
    val wordCount = wordToOne.reduceByKey((x,y) => {x + y})

    //将转换的结果采集并输出到控制台
    val array : Array[(String,Int)] = wordCount.collect()

    array.foreach(println)

    //关闭 Spark 连接
    sc.stop()
  }

}

运行上面的代码,观察控制台输出效果

以上三种方式,最终能够达到统计单词的数量的目的,其中前面两种采用了比较常规的方式,即先对数据进行扁平化,拆分成一个个的单词,然后再对单词进行分组聚合,最后得到统计结果;

而第三种方式,则是使用了spark的map-reduce思想,直接通过reduceByKey这个方法,一次性的完成了对数据的分组聚合操作,省区了前面的步骤,属于一种更加优化的实现思路;

标签: Spark WordCount

本文转载自: https://blog.csdn.net/congge_study/article/details/124087482
版权归原作者 逆风飞翔的小叔 所有, 如有侵权,请联系我们删除。

“Spark 之WordCount”的评论:

还没有评论