0


六十四、Spark-分别统计各个单词个数及特殊字符总个数

共享变量

广播变量(Broadcast Variables):广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本,简单理解:减少内存,减小计算压力;

累加器(Accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);

需求说明:以词频统计WordCount程序为例,处理特殊数据,包括非单词符号,做WordCount的同时统计出特殊字符的数量

原数据展示

  1. 注:原数据杂乱无章,与所需单词混淆,间隔且不等

业务逻辑

1、创建本地环境,并设置日志提示级别

  1. val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
  2. val sc: SparkContext = new SparkContext(conf)
  3. sc.setLogLevel("WARN")

2、创建累加器

  1. val mycounter: LongAccumulator = sc.longAccumulator("mycounter")

3、定义特殊字符集合

  1. val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")

4、将集合作为广播变量广播到各个节点

  1. val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)

5、加载数据,创建RDD

  1. val lines: RDD[String] = sc.textFile("data/input/words2.txt")

6、过滤筛选

  1. val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
  2. .flatMap(_.split("\\s+"))
  3. .filter(ch => {
  4. //获取广播数据
  5. val list: List[String] = broadcast.value
  6. if (list.contains(ch)) { //特殊字符
  7. mycounter.add(1)
  8. false
  9. } else { //单词
  10. true
  11. }
  12. }).map((_, 1))
  13. .reduceByKey(_ + _)

7、输出单词统计及特殊字符

  1. wordcountResult.foreach(println)
  2. val chResult: lang.Long = mycounter.value
  3. println("特殊字符的数量:" + chResult)

完整代码

  1. package org.example.spark
  2. import java.lang
  3. import org.apache.commons.lang3.StringUtils
  4. import org.apache.spark.broadcast.Broadcast
  5. import org.apache.spark.rdd.RDD
  6. import org.apache.spark.util.LongAccumulator
  7. import org.apache.spark.{SparkConf, SparkContext}
  8. /**
  9. * Author tuomasi
  10. * Desc 演示RDD的共享变量
  11. */
  12. object RDD_ShareVariable {
  13. def main(args: Array[String]): Unit = {
  14. //TODO 0.env/创建环境
  15. val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
  16. val sc: SparkContext = new SparkContext(conf)
  17. sc.setLogLevel("WARN")
  18. //创建计数器/累加器
  19. val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
  20. //定义一个特殊字符集合
  21. val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")
  22. //将集合作为广播变量广播到各个节点
  23. val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)
  24. //TODO 1.source/加载数据/创建RDD
  25. val lines: RDD[String] = sc.textFile("data/input/words2.txt")
  26. //TODO 2.transformation
  27. val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
  28. .flatMap(_.split("\\s+"))
  29. .filter(ch => {
  30. //获取广播数据
  31. val list: List[String] = broadcast.value
  32. if (list.contains(ch)) { //特殊字符
  33. mycounter.add(1)
  34. false
  35. } else { //单词
  36. true
  37. }
  38. }).map((_, 1))
  39. .reduceByKey(_ + _)
  40. //TODO 3.sink/输出
  41. wordcountResult.foreach(println)
  42. val chResult: lang.Long = mycounter.value
  43. println("特殊字符的数量:" + chResult)
  44. }
  45. }

程序运行

原数据:

控制台打印:

  1. 注:通过对比,该程序实现了单词与特殊字符的分别统计

项目总结

使用广播变量的好处:

1、Driver每次分发任务的时候会把task和计算逻辑的变量发送给Executor。不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。这样会导致消耗大量的内存导致严重的后果。
2、使用广播变量的好处,不需要每个task带上一份变量副本,而是变成每个节点的executor才一份副本。这样的话, 就可以让变量产生的副本大大减少;

标签: spark big data 运维

本文转载自: https://blog.csdn.net/m0_54925305/article/details/122699912
版权归原作者 托马斯-酷涛 所有, 如有侵权,请联系我们删除。

“六十四、Spark-分别统计各个单词个数及特殊字符总个数”的评论:

还没有评论