0


2023_Spark_实验十一:RDD基础算子操作

一、RDD的练习可以使用两种方式

  1. 使用Shell
  2. 使用IDEA

二、使用Shell练习RDD

当你打开 Spark 的交互式命令行界面(也就是 Spark shell)的时候,它已经自动为你准备好了一个叫做

  1. sc

的特殊对象,这个对象是用来和 Spark 集群沟通的。你不需要,也不应该自己再创建一个这样的对象。

如果你想告诉 Spark 用哪个计算机或者计算机集群来执行你的命令,可以通过

  1. --master

这个选项来设置。比如,你想在本地计算机上只用四个核心来运行,就可以在命令里加上

  1. --master local[4]

  1. $ ./bin/spark-shell --master local[4]

如果你有一些自己的代码打包成了 JAR 文件,想要在 Spark shell 里用,可以通过

  1. --jars

选项,后面跟上你的 JAR 文件名,用逗号分隔,来把它们加入到可以识别的路径里。

  1. $ ./bin/spark-shell --master local[4] --jars code.jar

此外,如果你需要一些额外的库或者 Spark 的扩展包,可以通过

  1. --packages

选项,后面跟上这些库的 Maven 坐标(一种常用的依赖管理方式),用逗号分隔,来添加它们。假设你需要的包是

  1. org.apache.spark:spark-mllib_2.13:3.4.1

,这是Spark的机器学习库。

  1. $ ./bin/spark-shell --master local[4] --packages "org.apache.spark:spark-mllib_2.13:3.4.1"

简单来说,这些选项就是让你告诉 Spark 怎么运行你的代码,以及在哪里找到运行代码所需要的资源。

RDD基础

  1. // 从array中创建RDD
  2. val data = Array(1, 2, 3, 4, 5)
  3. val distData = sc.parallelize(data)
  4. distData.foreach(println)
  5. // 读取文件创建RDD
  6. val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
  7. val lineLengths = lines.map(s => s.length)
  8. val totalLength = lineLengths.reduce((a, b) => a + b)
  9. println(totalLength)
  10. // 数据持久化
  11. lineLengths.persist()
  12. print(lineLengths.reduce((a, b) => a + b))
  13. // 对象的函数
  14. object MyFunctions {
  15. def func1(s: String): String = { s"打印RDD中的字符串,包含的字符串有: $s" }
  16. }
  17. val myRdd = lines.flatMap(lines => lines.split(" "))
  18. myRdd.map(MyFunctions.func1).foreach(println)
  19. import org.apache.spark.rdd.RDD
  20. // 类的函数
  21. class MyClass extends Serializable {
  22. def func1(s: String): String = { f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s" }
  23. def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
  24. }
  25. val f1 = new MyClass()
  26. f1.doStuff(myRdd).foreach(println)
  27. // 类的应用
  28. class MyClass2 extends Serializable {
  29. val field = "你好,测试案例..."
  30. def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
  31. }
  32. val f2 = new MyClass2()
  33. f2.doStuff(myRdd).foreach(println)
  34. // Pair RDD应用
  35. val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
  36. val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
  37. val counts = pairs.reduceByKey((a, b) => a + b)
  38. // 交换键和值的位置
  39. val swappedCounts = counts.map(_.swap)
  40. // 先根据值排序(降序),然后根据键排序(升序)
  41. val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
  42. val CountsDescondvalue = sortedByValueThenKeyDesc .map(_.swap)
  43. CountsDescondvalue .collect()
  44. // 广播变量 Broadcast Variables
  45. val broadcastVar = sc.broadcast(Array(1, 2, 3))
  46. broadcastVar.value
  47. val accum = sc.longAccumulator("My Accumulator")
  48. sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
  49. accum.value

三、使用IDEA练习RDD

基于Spark3.4.1,IDEA练习基础的RDD

  1. package test
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.sql.SparkSession
  4. /**
  5. * @projectName GNUSpark20204
  6. * @package test
  7. * @className test.RDD_spark341
  8. * @description ${description}
  9. * @author pblh123
  10. * @date 2024/9/26 23:08
  11. * @version 1.0
  12. *
  13. */
  14. object RDD_spark341 extends App {
  15. // 创建SparkSession sparkcontext
  16. val spark = SparkSession.builder
  17. .appName("RDD_spark341")
  18. .master("local[2]")
  19. .getOrCreate()
  20. val sc: SparkContext = spark.sparkContext
  21. // spark代码主体
  22. // 从array中创建RDD
  23. val data = Array(1, 2, 3, 4, 5)
  24. val distData = sc.parallelize(data)
  25. distData.foreach(println)
  26. // 读取文件创建RDD
  27. val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
  28. val lineLengths = lines.map(s => s.length)
  29. val totalLength = lineLengths.reduce((a, b) => a + b)
  30. println(totalLength)
  31. // 数据持久化
  32. lineLengths.persist()
  33. print(lineLengths.reduce((a, b) => a + b))
  34. // 对象的函数
  35. object MyFunctions {
  36. def func1(s: String): String = {
  37. s"打印RDD中的字符串,包含的字符串有: $s"
  38. }
  39. }
  40. val myRdd = lines.flatMap(lines => lines.split(" "))
  41. myRdd.map(MyFunctions.func1).foreach(println)
  42. import org.apache.spark.rdd.RDD
  43. // 类的函数
  44. class MyClass extends Serializable {
  45. def func1(s: String): String = {
  46. f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s"
  47. }
  48. def doStuff(rdd: RDD[String]): RDD[String] = {
  49. rdd.map(func1)
  50. }
  51. }
  52. val f1 = new MyClass()
  53. f1.doStuff(myRdd).foreach(println)
  54. // 类的应用
  55. class MyClass2 extends Serializable {
  56. val field = "你好,测试案例..."
  57. def doStuff(rdd: RDD[String]): RDD[String] = {
  58. rdd.map(x => field + x)
  59. }
  60. }
  61. val f2 = new MyClass2()
  62. f2.doStuff(myRdd).foreach(println)
  63. // Pair RDD应用
  64. val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
  65. val counts = pairs.reduceByKey((a, b) => a + b)
  66. // 交换键和值的位置
  67. val swappedCounts = counts.map(_.swap)
  68. // 先根据值排序(降序),然后根据键排序(升序)
  69. val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
  70. val CountsDescondvalue = sortedByValueThenKeyDesc.map(_.swap)
  71. println(CountsDescondvalue.collect())
  72. // 广播变量 Broadcast Variables
  73. val broadcastVar = sc.broadcast(Array(1, 2, 3))
  74. println(broadcastVar.value)
  75. val accum = sc.longAccumulator("My Accumulator")
  76. sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
  77. println(accum.value)
  78. // 关闭sparkSesssion sparkcontext
  79. sc.stop()
  80. spark.stop()
  81. }

本文转载自: https://blog.csdn.net/pblh123/article/details/142578067
版权归原作者 pblh123 所有, 如有侵权,请联系我们删除。

“2023_Spark_实验十一:RDD基础算子操作”的评论:

还没有评论