环境介绍(hadoop、spark、jdk)
HDFS常用命令:
hadoop fs & hdfs dfs
注:path 为路径 src为文件路径 dist 为文件夹
-help[cmd] 显示命令的帮助信息
-ls(r) 显示当前目录下的所有文件 -R层层循出文件夹
-du(s) 显示目录中所有文件大小
-count[-q] 显示当前目录下的所有文件大小
-mv 移动多个文件目录到目标目录
-cp 复制多个文件到目标目录
-rm(r) 删除文件(夹)
-put 本地文件复制到hdfs
-copyFromLocal 本地文件复制到hdfs
-moveFromLocal 本地文件移动到hdfs
-get[-ignoreCrc] 复制文件到本地,可以忽略crc校验
-getmerge 将源目录中的所有文件排序合并到一个文件中
-appendToFile 将内容追加到指定文档中
-cat 在终端显示文件内容
-text 在终端显示文件内容
-copyToLocal[-ignoreCrc] 复制文件到本地
-moveToLocal 移动文件到本地
-mkdir 创建文件夹 后跟-p 可以创建不存在的父路径
-touchz 创建一个空文件
一、文件上传
hdfs上传:
hadoop fs -mkdir -p /data //创建目录
hadoop fs -put /data/wenjian1.csv /data //上传文件
(hadoop fs或者hdfs dfs上传都可以,命令区别不大)
二、修改文件名(根据自身情况而定)
这边建议有中文名的都改为英文名,有特殊字符的都改成标准的格式,以免后续报错
下面假设我们上传的文件名称为(文件1.csv)
hadoop fs -mv /data/文件1.csv /data/wenjian1.csv
三、多文件合并
(1)该命令为data1文件和data2文件末尾追加到data3.csv文件中,做到多文件合并成一个新的文件
hadoop fs -appendToFile /data/data1.csv /data/data2.csv /data/data3.csv
(2)在这个例子中,
file1.txt
的内容会首先出现在
merged_data.txt
中,然后是
file2.txt
的内容,最后是
file3.txt
的内容。达成合并文件目的
hdfs dfs -getmerge /user/data/file1.txt /user/data/file2.txt /user/data/file3.txt merged_data.txt
四、读取文件
- Text文件:这是最基本的文件格式,Spark可以通过
sc.textFile("path")
来读取Text文件,并且可以使用saveAsTextFile(path)
将RDD保存为Text文件。 - Json文件:对于Json文件,Spark提供了多种方式来读取和解析,例如使用SparkSQL或第三方库如fastjson。读取Json文件时,通常需要解析其Json格式。
- Sequence文件:这是针对key-value类型RDD的文件格式。可以通过
sc.sequenceFile[KeyClass, ValueClass]("path")
来读取,并可以创建RDD保存为Sequence文件。 - Object文件:可以将pairRDD保存为Object文件,并使用
sc.objectFile[KeyClass, ValueClass]("path")
来读取 - Csv文件:可以用spark.read.csv()来实现读取
val df = spark.read.option("header", "true").option("sep", ",").csv("/data/wenjian1.csv")
- **
val df =
**:这是Scala中的声明变量的方式。这里声明了一个名为df
的变量,它将被赋予一个DataFrame类型的值。 - **
spark.read
**:spark
是一个SparkSession的实例,它是Spark所有功能的入口点。read
方法用于从外部数据源读取数据。 - **
.option("header", "true")
**:这是为读取操作设置的一个选项。header
选项指定CSV文件的第一行是否包含列名。在这个例子中,我们设置header
为"true"
,意味着CSV文件的第一行是列名,并且这些列名将作为DataFrame的列名。 - **
.option("sep", ",")
**:这也是为读取操作设置的一个选项。sep
选项指定字段之间的分隔符。在这个例子中,我们设置分隔符为逗号(,
),这是CSV文件的标准分隔符。 .csv(
"/data/wenjian1.csv"**)
**:这指定了要读取的CSV文件的路径。在这个例子中,文件位于/data/
目录下,名为wenjian1.csv
。
五、显示数据
show()显示语句
df.show(5)显示前五行数据
六、处理数据
常用语句
select 选择 :可以选择指定行,指定类,并且可以重名
where 筛选 :给筛选的列值等于给定值的行
filter 筛选:可以对数据进行筛选
groupby 分组:对数据进行分组
distinct 去重 :对重复数据进行清洗
select:
对“学校”列进行去重:
val uniquewenjian = df.select("学校").distinct()
选择指定列
val df1 = df.select("columnName1", "columnName2")
选择指定列,并重命名
val df1 =df.select($"columnName",$"columnName")
where:
筛选指定列值等于给定值的行
val df1= df.where("columnName = 'value'")
fitter:
筛选列值大于 5的行
val df1 = df.filter("columnName > 5")
groupBy():
按指定列分组,并对每组 进行聚合计数
val df1 = df.groupBy("columnName").agg(count("columnName"))
对多列进行分组,并对每组进行聚合计数,这段代码首先创建了一个SparkSession实例,然后创建了一个包含三列("columnName1", "columnName2", "col")的DataFrame。之后,使用
groupBy
方法按照"columnName1"和"columnName2"进行分组,然后调用
agg
方法进行聚合,并使用
mean
函数计算"col"列的平均值,将结果列命名为"m"。最后,使用
show
方法打印出分组聚合后的结果。
val df1 = df.groupBy("columnName1", "columnName2").agg(mean("student").alias("s1"))
(补充)以下是一些常用的聚合函数及其在DataFrame中的应用:
- count(): 用于计算分组中的记录数。
- sum(): 计算分组中某列的总和。
- avg(): 计算分组中某列的平均值。
- max(): 获取分组中某列的最大值。
- min(): 获取分组中某列的最小值。
- collect_list() 和 collect_set(): 分别收集分组中某列的所有值到一个列表或集合中,去除重复项。
- first() 和 last(): 分别获取分组中某列的第一个和最后一个值。
- group_concat(): 连接分组中某列的所有值。注意,这不是Spark SQL的内置函数,但可以使用
concat_ws
函数达到类似的效果。- approx_count_distinct(): 计算近似不同的值数,这对于大数据集来说通常比
count(DISTINCT column)
更快,但结果可能不是精确的。
val countDF = df.groupBy("columnName").agg(count("*").alias("count"))
val sumDF = df.groupBy("columnName").agg(sum("numericColumn").alias("sum"))
val avgDF = df.groupBy("columnName").agg(avg("numericColumn").alias("avg"))
val maxDF = df.groupBy("columnName").agg(max("numericColumn").alias("max"))
val minDF = df.groupBy("columnName").agg(min("numericColumn").alias("min"))
val listDF = df.groupBy("columnName").agg(collect_list("anotherColumn").alias("list"))
val setDF = df.groupBy("columnName").agg(collect_set("anotherColumn").alias("set"))
val firstDF = df.groupBy("columnName").agg(first("anotherColumn").alias("firstValue"))
val lastDF = df.groupBy("columnName").agg(last("anotherColumn").alias("lastValue"))
val concatDF = df.groupBy("columnName").agg(concat_ws(",", collect_list("anotherColumn")).alias("concatenated"))
val approxCountDF = df.groupBy("columnName").agg(approx_count_distinct("anotherColumn").alias("approxCount"))
distinct:
获取指定列的唯一值
df.select("columnName").distinct()
版权归原作者 sc.溯琛 所有, 如有侵权,请联系我们删除。