消除重复的数据可以通过使用 distinct 和 dropDuplicates 两个方法。
distinct数据去重
distinct 是所有的列进行去重的操作,假如你的 DataFrame里面有10列,那么只有这10列完全相同才会去重。
使用distinct:返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果相同。
dropDuplicates()y有四个重载方法
- 第一个def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns) 这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。
def dropDuplicates(colNames: Array[String]): Dataset[T]= dropDuplicates(colNames.toSeq)
- 第二个def dropDuplicates(colNames: Seq[String]) 传入的参数是一个序列。你可以在序列中指定你要根据哪些列的重复元素对数据表进行去重,然后也是返回每一行数据出现的第一条
def dropDuplicates(colNames: Seq[String]): Dataset[T]= withTypedPlan {val resolver = sparkSession.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
val groupCols = colNames.toSet.toSeq.flatMap {(colName:String)=>// It is possibly there are more than one columns with the same name,// so we call filter instead of find.val cols = allColumns.filter(col => resolver(col.name, colName))if(cols.isEmpty){thrownew AnalysisException(s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")}
cols
}
Deduplicate(groupCols, planWithBarrier)}
- 第三个def dropDuplicates(colNames: Array[String]) 传入的参数是一个数组,然后方法会把数组转换为序列然后再调用第二个方法。
def dropDuplicates(colNames: Array[String]): Dataset[T]= dropDuplicates(colNames.toSeq)
- 第四个def dropDuplicates(col1: String, cols: String*) 传入的参数为字符串,在方法体内会把你传入的字符串组合成一个序列再调用第二个方法。
@scala.annotation.varargsdef dropDuplicates(col1:String, cols:String*): Dataset[T]={val colNames: Seq[String]= col1 +: cols
dropDuplicates(colNames)}
第三和第四个本质上还是调用了第二个方法,所以我们在使用的时候如果需要根据指定的列进行数据去重,可以直接传入一个Seq。
第一个方法默认根据所有列去重,实际上也是调用了第二个方法,然后传入参数this.columns,即所有的列组成的Seq。
所以各位想深究dropDuplicate()去重的核心代码,只需要研究第二个去重方法即可。
特别的
使用dropDuplicate时,可以先排序,去重时,会保留排第一的,如:
val sourceByTime = df.select("url","name","updatetime").repartition(col("url"))//这一步是为了解决下面注意事项里出现的问题.orderBy(desc("updatetime")).dropDuplicates(Seq("url"))
上面代码,会保留每个url最新的数据。
注意事项
用dropDuplicates可能会出现重复数据,原因:
数据存在多个excuter中,因为spark是分布式计算的,数据在计算的时候会分布在不同的excutor上,使用dropDuplicate去重的时候,可能只是一个excutor内的数据进行了去重,别的excutor上可能还会有重复的数据。
数据是存放在不同分区的,因为spark是分布式计算的,数据在计算的时候会分散在不同的分区中,使用dropDuplicate去重的时候,不同的区分可能还会存在相同的数据。
参考
https://www.cnblogs.com/Jaryer/p/13558701.html
https://blog.csdn.net/qq_39900031/article/details/115797287
https://www.yangch.net/archives/114
版权归原作者 Code_LT 所有, 如有侵权,请联系我们删除。