0


spark数据清洗练习

文章目录

通过编写Spark程序清洗酒店数据里的缺失数据、非法数据、重复数据

准备工作

  1. 搭建 hadoop 伪分布或 hadoop 完全分布
  2. 上传 hotal_data.csv 文件到 hadoop
  3. idea 配置好 scala 环境

删除缺失值 >= 3 的数据

  1. 读取 /hotel_data.csv
  2. 删除缺失值 >= 3 的数据, 打印剔除的数量
  3. 将清洗后的数据保存为/hotelsparktask1
importorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo01 {def main(args: Array[String]):Unit={// System.setProperty("HADOOP_USER_NAME", "root")//解决保存文件权限不够的问题val config: SparkConf =new SparkConf().setMaster("local[1]").setAppName("1")val sc =new SparkContext(config)val hdfsUrl ="hdfs://192.168.226.129:9000"val filePath:String= hdfsUrl+"/file3_1/hotel_data.csv"val data: RDD[Array[String]]= sc.textFile(filePath).map(_.split(",")).cache()val total:Long= data.count()val dataDrop: RDD[Array[String]]= data.filter(_.count(_.equals("NULL"))<=3)
    println("删除的数据条目有: "+(total - dataDrop.count()))
    dataDrop.map(_.mkString(",")).saveAsTextFile(hdfsUrl+"/hotelsparktask1")
    sc.stop()}}

删除星级、评论数、评分中任意字段为空的数据

  1. 读取 /hotel_data.csv
  2. 将字段{星级、评论数、评分}中任意字段为空的数据删除, 打印剔除的数量
  3. 保存 /hotelsparktask2
importorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo02 {def main(args: Array[String]):Unit={
    System.setProperty("HADOOP_USER_NAME","root")val config: SparkConf =new SparkConf().setMaster("local[1]").setAppName("2")val sc =new SparkContext(config)val hdfsUrl ="hdfs://192.168.226.129:9000"val filePath:String= hdfsUrl+"/file3_1/hotel_data.csv"val data: RDD[Array[String]]= sc.textFile(filePath).map(_.split(",")).cache()val total:Long= data.count()val dataDrop: RDD[Array[String]]= data.filter {
      arr: Array[String]=>!(arr(6).equals("NULL")|| arr(10).equals("NULL")|| arr(11).equals("NULL"))}
    println("删除的数据条目有: "+(total - dataDrop.count()))
    dataDrop.map(_.mkString(",")).saveAsTextFile(hdfsUrl+"/hotelsparktask2")
    sc.stop()}}

删除非法数据

  1. 读取第一题的 /hotelsparktask1
  2. 剔除数据集中评分和星级字段的非法数据,合法数据是评分[0,5]的实数,星级是指星级字段内容中包含 NULL、二星、三星、四星、五星的数据
  3. 剔除数据集中的重复数据
  4. 分别打印 删除含有非法评分、星级以及重复的数据条目数
  5. 保存 /hotelsparktask3
importorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo03 {def main(args: Array[String]):Unit={
    System.setProperty("HADOOP_USER_NAME","root")//解决权限问题val config: SparkConf =new SparkConf().setMaster("local[1]").setAppName("3")val sc =new SparkContext(config)val hdfsUrl ="hdfs://192.168.226.129:9000"val filePath:String= hdfsUrl+"/hotelsparktask1"val lines: RDD[String]= sc.textFile(filePath).cache()val data: RDD[Array[String]]= lines.map(_.split(","))val total:Long= data.count()val dataDrop: RDD[Array[String]]= data.filter {
      arr: Array[String]=>try{(arr(10).toDouble >=0)&&(arr(10).toDouble <=5)}catch{case _: Exception =>false}}val lab = Array("NULL","一星","二星","三星","四星","五星")val dataDrop1: RDD[Array[String]]= data.filter { arr: Array[String]=>var flag =falsefor(elem <- lab){if(arr(6).contains(elem)){
          flag =true}}
      flag
    }val dataDrop2: RDD[String]= lines.distinct

    println("删除的非法评分数据条目有: "+(total - dataDrop.count()))
    println("删除的非法星级数据条目有: "+(total - dataDrop1.count()))
    println("删除重复数据条目有: "+(total - dataDrop2.count()))val wordsRdd: RDD[Array[String]]= lines.distinct.map(_.split(",")).filter {
      arr: Array[String]=>try{(arr(10).toDouble >=0)&&(arr(10).toDouble <=5)}catch{case _: Exception =>false}}.filter { arr: Array[String]=>var flag =falsefor(elem <- lab){if(arr(6).contains(elem)){
          flag =true}}
      flag
    }

    wordsRdd.map(_.mkString(",")).saveAsTextFile(hdfsUrl +"/hotelsparktask3")

    sc.stop()}}

hotel_data.csv

下载数据:https://download.csdn.net/download/weixin_44018458/87437211

标签: spark 大数据 scala

本文转载自: https://blog.csdn.net/weixin_44018458/article/details/128980802
版权归原作者 码上行舟 所有, 如有侵权,请联系我们删除。

“spark数据清洗练习”的评论:

还没有评论