一、本地文件系统的数据读写
1,从文件中读取数据创建RDD
从本地文件系统读取数据,可以采用textFile()方法,可以为textFile()方法提供一个本地文件或目录地址,如果是一个文件地址,它会加载该文件,如果是一个目录地址,它会加载该目录下的所有文件的数据。
示例:读取一个本地文件word.txt
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
val textFile中的textFile是变量名称,sc.textFile()中的textFile是方法名称,二者同时使用时要注意区分,它们所代表的含义是不同的。执行上面这条命令以后,并不会马上显示结果
Spark采用惰性机制。可以查看textFile中的内容:
scala> textFile.first()
正因为Spark采用了惰性机制,在执行转换操作的时候,即使输入了错误的语句,spark-shell也不会马上报错,而是等到执行“行动”类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来。
示例:
使用一个根本就不存在的word123.txt,执行上面语句时,spark-shell根本不会报错,因为,没有遇到“行动”类型的操作first()之前,这个加载操作是不会真正执行的。
scala> val textFile = sc.| textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")
2,把RDD写入到文本文件中
可以使用saveAsTextFile()方法把RDD中的数据保存到文本文件中。需要注意的是,saveAsTextFile()中提供的参数,不是文件名称,而是一个目录名称,因为,Spark通常是在分布式环境下执行,RDD会存在多个分区,由多个任务对这些分区进行并行计算,每个分区的计算结果都会保存到一个单独的文件中。例如,如果RDD有3个分区,saveAsTextFile()方法就会产生part-00001、part-00002和part-00003,以及一个_SUCCESS文件,其中,part-00001、part-00002和part-00003包含了RDD中的数据,_SUCCESS文件只是用来表示写入操作已经成功执行,该文件里面是空的,可以忽略该文件。因此,在Spark编程中,需要改变传统单机环境下编程的思维习惯,在单机编程中,我们已经习惯把数据保存到一个文件中,而作为分布式编程框架,因为RDD被分成多个分区,由多个任务并行执行计算,Spark通常都会产生多个文件,我们需要为这些文件提供一个保存目录,因此,需要为saveAsTextFile()方法提供一个目录地址,而不是一个文件地址。saveAsTextFile()要求提供一个事先不存在的保存目录,如果事先已经存在该目录,Spark就会报错。所以,如果是在独立应用程序中执行,最好在程序执行saveAsTextFile()之前先判断一下目录是否存在。
把textFile变量中的内容再次写回到另外一个目录wordback中,命令如下:
scala> val textFile = sc.| textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
scala> textFile.| saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")
打开一个新的Linux终端,进入到“/usr/local/spark/mycode/wordcount/”目录,可以看到这个目录下面多了一个名称为“writeback”的子目录。进入writeback子目录以后,可以看到该目录中生成了两个文件part-00000和_SUCCESS(可以忽略),part-00000文件就包含了刚才写入的数据。之所以writeback目录下只包含一个文件part-00000,而不是多个part文件。
在启动进入Spark Shell环境:
$ cd /usr/local/spark
$ ./bin/spark-shell
spark-shell命令后面没有带上任何参数,则系统默认采用local模式启动spark-shell,即只使用一个Worker线程本地化运行Spark(完全不并行)。而且,在读取文件时,sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)语句的圆括号中的参数只有文件地址,并没有包含分区数量,因此,生成的textFile这个RDD就只有一个分区,这样导致saveAsTextFile()最终生成的文件就只有一个part-00000。
在读取文件时进行分区:
scala> val textFile = sc.| textFile("file:///usr/local/spark/mycode/wordcount/word.txt",2)
scala> textFile.| saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")
可以在writeback子目录下看到两个part文件,即part-00000和part-00001。
二、分布式文件系统HDFS的数据读写
从分布式文件系统HDFS中读取数据,也是采用textFile()方法,可以为textFile()方法提供一个HDFS文件或目录地址,如果是一个文件地址,它会加载该文件,如果是一个目录地址,它会加载该目录下的所有文件的数据。
读取一个HDFS文件:
scala> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> textFile.first()
为textFile()方法提供的文件地址格式可以有多种,如下3条语句都是等价的:
scala> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> val textFile = sc.textFile("/user/hadoop/word.txt")
scala> val textFile = sc.textFile("word.txt")
可以使用saveAsTextFile()方法把RDD中的数据保存到HDFS文件中:
scala> val textFile = sc.textFile("word.txt")
scala> textFile.saveAsTextFile("writeback")
三、JSON文件的读取
JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,它基于ECMAScript规范的一个子集,采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清晰的层次结构使得JSON成为理想的数据交换语言,不仅易于阅读和编写,同时也易于机器解析和生成,并能够有效提升网络传输效率。Spark提供了一个JSON样例数据文件,存放在“/usr/local/spark/examples/src/main/resources/people.json”中(注意,“/usr/local/spark/”是Spark的安装目录)。
people.json文件内容如下:
{"name":"Michael"}{"name":"Andy","age":30}{"name":"Justin","age":19}
Scala中有一个自带的JSON库——scala.util.parsing.json.JSON,可以实现对JSON数据的解析,JSON.parseFull(jsonString:String)函数以一个JSON字符串作为输入并进行解析,如果解析成功,则返回一个Some(map: Map[String,Any]),如果解析失败,则返回None。
新建一个JSONRead.scala代码文件,输入以下内容:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
._ import org.apache.spark.SparkConf import scala.util.parsing.json.JSON
object JSONRead {
def main(args: Array[String]){
val inputFile = "file:///usr/local/spark/examples/src/main/resources/people. json"
val conf = new SparkConf().setAppName("JSONRead")
val sc = new SparkContext(conf)
val jsonStrs = sc.textFile(inputFile)
val result = jsonStrs.map(s => JSON.parseFull(s))
result.foreach({
r => r match {
case Some(map: Map[String, Any]) => println(map)
case None =>println("Parsing failed")
case other => println("Unknown data structure: "+ other)}})}}
val jsonStrs = sc.textFile(inputFile)语句执行后,会生成一个名称为jsonStrs的RDD,这个RDD中的每个元素都是来自people.json文件中的一行,即一个JSON字符串。valresult =jsonStrs.map(s => JSON.parseFull(s))语句会对jsonStrs中的每个元素(即每个JSON字符串)进行解析,解析后的结果,存放到一个新的RDD(即result)中。如果解析成功,则返回一个Some(map: Map[String,Any]),如果解析失败,则返回None。所以,result中的元素,或者是一个Some(map: Map[String, Any]),或者是一个None。result.foreach()语句执行时,会依次扫描result中的每个元素,并对当前取出的元素进行模式匹配,如果是一个Some(map: Map[String, Any]),就打印出来,如果是一个None,就打印出“Parsing failed”。
使用sbt工具把JSONRead.scala代码文件编译打包成JAR包,通过spark-submit运行程序,屏幕上会输出如下信息:
Map(name -> Michael)
Map(name -> Andy, age -> 30.0)
Map(name -> Justin, age -> 19.0)
文章来源:《Spark编程基础》 作者:林子雨
文章内容仅供学习交流,如有侵犯,联系删除哦!
版权归原作者 晓之以理的喵~~ 所有, 如有侵权,请联系我们删除。