一、RDD的练习可以使用两种方式
- 使用Shell
- 使用IDEA
二、使用Shell练习RDD
当你打开 Spark 的交互式命令行界面(也就是 Spark shell)的时候,它已经自动为你准备好了一个叫做
sc
的特殊对象,这个对象是用来和 Spark 集群沟通的。你不需要,也不应该自己再创建一个这样的对象。
如果你想告诉 Spark 用哪个计算机或者计算机集群来执行你的命令,可以通过
--master
这个选项来设置。比如,你想在本地计算机上只用四个核心来运行,就可以在命令里加上
--master local[4]
。
$ ./bin/spark-shell --master local[4]
如果你有一些自己的代码打包成了 JAR 文件,想要在 Spark shell 里用,可以通过
--jars
选项,后面跟上你的 JAR 文件名,用逗号分隔,来把它们加入到可以识别的路径里。
$ ./bin/spark-shell --master local[4] --jars code.jar
此外,如果你需要一些额外的库或者 Spark 的扩展包,可以通过
--packages
选项,后面跟上这些库的 Maven 坐标(一种常用的依赖管理方式),用逗号分隔,来添加它们。假设你需要的包是
org.apache.spark:spark-mllib_2.13:3.4.1
,这是Spark的机器学习库。
$ ./bin/spark-shell --master local[4] --packages "org.apache.spark:spark-mllib_2.13:3.4.1"
简单来说,这些选项就是让你告诉 Spark 怎么运行你的代码,以及在哪里找到运行代码所需要的资源。
RDD基础
// 从array中创建RDD
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.foreach(println)
// 读取文件创建RDD
val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
println(totalLength)
// 数据持久化
lineLengths.persist()
print(lineLengths.reduce((a, b) => a + b))
// 对象的函数
object MyFunctions {
def func1(s: String): String = { s"打印RDD中的字符串,包含的字符串有: $s" }
}
val myRdd = lines.flatMap(lines => lines.split(" "))
myRdd.map(MyFunctions.func1).foreach(println)
import org.apache.spark.rdd.RDD
// 类的函数
class MyClass extends Serializable {
def func1(s: String): String = { f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s" }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
val f1 = new MyClass()
f1.doStuff(myRdd).foreach(println)
// 类的应用
class MyClass2 extends Serializable {
val field = "你好,测试案例..."
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
val f2 = new MyClass2()
f2.doStuff(myRdd).foreach(println)
// Pair RDD应用
val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
// 交换键和值的位置
val swappedCounts = counts.map(_.swap)
// 先根据值排序(降序),然后根据键排序(升序)
val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
val CountsDescondvalue = sortedByValueThenKeyDesc .map(_.swap)
CountsDescondvalue .collect()
// 广播变量 Broadcast Variables
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value
三、使用IDEA练习RDD
基于Spark3.4.1,IDEA练习基础的RDD
package test
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
/**
* @projectName GNUSpark20204
* @package test
* @className test.RDD_spark341
* @description ${description}
* @author pblh123
* @date 2024/9/26 23:08
* @version 1.0
*
*/
object RDD_spark341 extends App {
// 创建SparkSession sparkcontext
val spark = SparkSession.builder
.appName("RDD_spark341")
.master("local[2]")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
// spark代码主体
// 从array中创建RDD
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.foreach(println)
// 读取文件创建RDD
val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
println(totalLength)
// 数据持久化
lineLengths.persist()
print(lineLengths.reduce((a, b) => a + b))
// 对象的函数
object MyFunctions {
def func1(s: String): String = {
s"打印RDD中的字符串,包含的字符串有: $s"
}
}
val myRdd = lines.flatMap(lines => lines.split(" "))
myRdd.map(MyFunctions.func1).foreach(println)
import org.apache.spark.rdd.RDD
// 类的函数
class MyClass extends Serializable {
def func1(s: String): String = {
f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s"
}
def doStuff(rdd: RDD[String]): RDD[String] = {
rdd.map(func1)
}
}
val f1 = new MyClass()
f1.doStuff(myRdd).foreach(println)
// 类的应用
class MyClass2 extends Serializable {
val field = "你好,测试案例..."
def doStuff(rdd: RDD[String]): RDD[String] = {
rdd.map(x => field + x)
}
}
val f2 = new MyClass2()
f2.doStuff(myRdd).foreach(println)
// Pair RDD应用
val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
// 交换键和值的位置
val swappedCounts = counts.map(_.swap)
// 先根据值排序(降序),然后根据键排序(升序)
val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
val CountsDescondvalue = sortedByValueThenKeyDesc.map(_.swap)
println(CountsDescondvalue.collect())
// 广播变量 Broadcast Variables
val broadcastVar = sc.broadcast(Array(1, 2, 3))
println(broadcastVar.value)
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value)
// 关闭sparkSesssion sparkcontext
sc.stop()
spark.stop()
}
版权归原作者 pblh123 所有, 如有侵权,请联系我们删除。