0


【spark数据处理】文件上传+文件读取+文件筛选

环境介绍(hadoop、spark、jdk)

HDFS常用命令:
hadoop fs & hdfs dfs
注:path 为路径 src为文件路径 dist 为文件夹
-help[cmd] 显示命令的帮助信息
-ls(r) 显示当前目录下的所有文件 -R层层循出文件夹
-du(s) 显示目录中所有文件大小
-count[-q] 显示当前目录下的所有文件大小
-mv 移动多个文件目录到目标目录
-cp 复制多个文件到目标目录
-rm(r) 删除文件(夹)
-put 本地文件复制到hdfs
-copyFromLocal 本地文件复制到hdfs
-moveFromLocal 本地文件移动到hdfs
-get[-ignoreCrc] 复制文件到本地,可以忽略crc校验
-getmerge 将源目录中的所有文件排序合并到一个文件中
-appendToFile 将内容追加到指定文档中
-cat 在终端显示文件内容
-text 在终端显示文件内容
-copyToLocal[-ignoreCrc] 复制文件到本地
-moveToLocal 移动文件到本地
-mkdir 创建文件夹 后跟-p 可以创建不存在的父路径
-touchz 创建一个空文件

一、文件上传

hdfs上传:

hadoop fs -mkdir -p /data //创建目录

hadoop fs -put /data/wenjian1.csv /data //上传文件

(hadoop fs或者hdfs dfs上传都可以,命令区别不大)

二、修改文件名(根据自身情况而定)

这边建议有中文名的都改为英文名,有特殊字符的都改成标准的格式,以免后续报错

下面假设我们上传的文件名称为(文件1.csv)

hadoop fs -mv /data/文件1.csv /data/wenjian1.csv

三、多文件合并

(1)该命令为data1文件和data2文件末尾追加到data3.csv文件中,做到多文件合并成一个新的文件

hadoop fs -appendToFile /data/data1.csv /data/data2.csv /data/data3.csv

(2)在这个例子中,

file1.txt

的内容会首先出现在

merged_data.txt

中,然后是

file2.txt

的内容,最后是

file3.txt

的内容。达成合并文件目的

hdfs dfs -getmerge /user/data/file1.txt /user/data/file2.txt /user/data/file3.txt merged_data.txt

四、读取文件

  1. Text文件:这是最基本的文件格式,Spark可以通过sc.textFile("path")来读取Text文件,并且可以使用saveAsTextFile(path)将RDD保存为Text文件。
  2. Json文件:对于Json文件,Spark提供了多种方式来读取和解析,例如使用SparkSQL或第三方库如fastjson。读取Json文件时,通常需要解析其Json格式。
  3. Sequence文件:这是针对key-value类型RDD的文件格式。可以通过sc.sequenceFile[KeyClass, ValueClass]("path")来读取,并可以创建RDD保存为Sequence文件。
  4. Object文件:可以将pairRDD保存为Object文件,并使用sc.objectFile[KeyClass, ValueClass]("path")来读取
  5. Csv文件:可以用spark.read.csv()来实现读取

val df = spark.read.option("header", "true").option("sep", ",").csv("/data/wenjian1.csv")

  1. **val df =**:这是Scala中的声明变量的方式。这里声明了一个名为df的变量,它将被赋予一个DataFrame类型的值。
  2. **spark.read**:spark是一个SparkSession的实例,它是Spark所有功能的入口点。read方法用于从外部数据源读取数据。
  3. **.option("header", "true")**:这是为读取操作设置的一个选项。header选项指定CSV文件的第一行是否包含列名。在这个例子中,我们设置header"true",意味着CSV文件的第一行是列名,并且这些列名将作为DataFrame的列名。
  4. **.option("sep", ",")**:这也是为读取操作设置的一个选项。sep选项指定字段之间的分隔符。在这个例子中,我们设置分隔符为逗号(,),这是CSV文件的标准分隔符。
  5. .csv("/data/wenjian1.csv"**)**:这指定了要读取的CSV文件的路径。在这个例子中,文件位于/data/目录下,名为wenjian1.csv

五、显示数据

show()显示语句

df.show(5)显示前五行数据

六、处理数据

常用语句

select 选择 :可以选择指定行,指定类,并且可以重名

where 筛选 :给筛选的列值等于给定值的行

filter 筛选:可以对数据进行筛选

groupby 分组:对数据进行分组

distinct 去重 :对重复数据进行清洗

select:

对“学校”列进行去重:

val uniquewenjian = df.select("学校").distinct()

选择指定列

val df1 = df.select("columnName1", "columnName2")

选择指定列,并重命名

val df1 =df.select($"columnName",$"columnName")

where:

筛选指定列值等于给定值的行

val df1= df.where("columnName = 'value'")

fitter:

筛选列值大于 5的行

val df1 = df.filter("columnName > 5")

groupBy():

按指定列分组,并对每组 进行聚合计数

val df1 = df.groupBy("columnName").agg(count("columnName"))

对多列进行分组,并对每组进行聚合计数,这段代码首先创建了一个SparkSession实例,然后创建了一个包含三列("columnName1", "columnName2", "col")的DataFrame。之后,使用

groupBy

方法按照"columnName1"和"columnName2"进行分组,然后调用

agg

方法进行聚合,并使用

mean

函数计算"col"列的平均值,将结果列命名为"m"。最后,使用

show

方法打印出分组聚合后的结果。

val df1 = df.groupBy("columnName1", "columnName2").agg(mean("student").alias("s1"))

(补充)以下是一些常用的聚合函数及其在DataFrame中的应用:

  1. count(): 用于计算分组中的记录数。
  2. sum(): 计算分组中某列的总和。
  3. avg(): 计算分组中某列的平均值。
  4. max(): 获取分组中某列的最大值。
  5. min(): 获取分组中某列的最小值。
  6. collect_list() 和 collect_set(): 分别收集分组中某列的所有值到一个列表或集合中,去除重复项。
  7. first() 和 last(): 分别获取分组中某列的第一个和最后一个值。
  8. group_concat(): 连接分组中某列的所有值。注意,这不是Spark SQL的内置函数,但可以使用concat_ws函数达到类似的效果。
  9. approx_count_distinct(): 计算近似不同的值数,这对于大数据集来说通常比count(DISTINCT column)更快,但结果可能不是精确的。
val countDF = df.groupBy("columnName").agg(count("*").alias("count"))
val sumDF = df.groupBy("columnName").agg(sum("numericColumn").alias("sum"))
val avgDF = df.groupBy("columnName").agg(avg("numericColumn").alias("avg"))
val maxDF = df.groupBy("columnName").agg(max("numericColumn").alias("max"))
val minDF = df.groupBy("columnName").agg(min("numericColumn").alias("min"))
val listDF = df.groupBy("columnName").agg(collect_list("anotherColumn").alias("list"))  
val setDF = df.groupBy("columnName").agg(collect_set("anotherColumn").alias("set"))
val firstDF = df.groupBy("columnName").agg(first("anotherColumn").alias("firstValue"))  
val lastDF = df.groupBy("columnName").agg(last("anotherColumn").alias("lastValue"))
val concatDF = df.groupBy("columnName").agg(concat_ws(",", collect_list("anotherColumn")).alias("concatenated"))
val approxCountDF = df.groupBy("columnName").agg(approx_count_distinct("anotherColumn").alias("approxCount"))

distinct:

获取指定列的唯一值

df.select("columnName").distinct()


本文转载自: https://blog.csdn.net/m0_71868960/article/details/137760218
版权归原作者 sc.溯琛 所有, 如有侵权,请联系我们删除。

“【spark数据处理】文件上传+文件读取+文件筛选”的评论:

还没有评论