0


使用spark做数据清洗(增量)

在这里插入图片描述

数据清洗

数据清洗是指对数据进行重新审查和校验的过程中,发现并纠正数据文件中可识别的错误,按照一定的规则把错误或冲突的数据洗掉,包括检查数据一致性,处理无效值和缺失值等。

清洗要求

编写Scala代码,使用Spark将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。(若dwd库中部分表没有数据,正常抽取即可)
抽取ods库中user_info表中昨天的分区数据,并结合dim_user_info最新分区现有的数据, 根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段, 根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充, 并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。 使用hive cli执行show partitions dwd.dim_user_info命令。

清洗准备

按要求全量抽取到dwd库中

编写sparkAPI 进行数据处理

    Logger.getLogger("org").setLevel(Level.OFF)
    System.setProperty("HADOOP_USER_NAME","root")
    val sparkconf=new SparkConf()
      .setAppName("qingxi")
      .setMaster("local[*]")
    val sparkSession=SparkSession.builder()
      .config(sparkconf)
      .config("hive.metastore.uris","thrift://bigdata1:9083")
      .config("dfs.client.use.datanode.hostname","true")
      .config("spark.sql.parquet.writeLegacyFormat","true")
      .enableHiveSupport()
      .getOrCreate()

根据题目要求,使用Spark将ods库中相应表数据全量抽取到Hive的dwd库中对应表中

清洗代码

 sparkSession.read.table("ods.user_info")
         .withColumn("dwd_insert_user",lit("user1"))
         .withColumn("dwd_insert_time",to_timestamp(from_unixtime(unix_timestamp(),"yyyy-MM-dd HH:mm:ss")))
         .withColumn("dwd_modify_user",lit("user1"))
         .withColumn("dwd_modify_time",to_timestamp(from_unixtime(unix_timestamp(),"yyyy-MM-dd HH:mm:ss")))
         .withColumn("birthday",to_timestamp(from_unixtime(unix_timestamp(col("birthday"),"yyyy-MM-dd HH:mm:ss"))))
         .write
         .mode(SaveMode.Overwrite)
         .partitionBy("etl_date")
         .saveAsTable("dwd.dim_user_info")
      

查看抽取内容

 sparkSession.sql("select * from dwd.dim_user_info").show()

在这里插入图片描述

增量清洗

清洗ods 库中user_info 表数据

清洗代码

val ods_user_info=sparkSession.table("ods.user_info")
      .withColumn("dwd_insert_user",lit("user1"))
      .withColumn("dwd_insert_time",to_timestamp(from_unixtime(unix_timestamp(),"yyyy-MM-dd HH:mm:ss")))
      .withColumn("dwd_modify_user",lit("user1"))
      .withColumn("dwd_modify_time",to_timestamp(from_unixtime(unix_timestamp(),"yyyy-MM-dd HH:mm:ss")))
      .withColumn("birthday",to_timestamp(from_unixtime(unix_timestamp(col("birthday"),"yyyy-MM-dd HH:mm:ss"))))
      .withColumn("operate_time",when(col("operate_time").isNull,col("create_time")).otherwise(col("operate_time")))
      .withColumn("etl_date",lit("20231211"))

将dwd库中dim_user_info表给到变量dwd_user_info

val dwd_user_info = sparkSession.table("dwd.dim_user_info")

将两个表内容合并

val union_table=ods_user_info.select(dwd_user_info.columns.map(col):_*).union(dwd_user_info)
      .withColumn("sortId",row_number().over(Window.partitionBy("id").orderBy(desc("operate_time"))))
      .withColumn("dwd_insert_time",lead("dwd_insert_time",1).over(Window.partitionBy("id").orderBy(desc("dwd_modify_time"))))
      .withColumn("dwd_insert_time",when(col("dwd_insert_time").isNull,col("dwd_modify_time")).otherwise(col("dwd_insert_time")))

左连接去重,把旧数据删掉

     val new_data=union_table.filter(col("sortId")===1).as("new_data")
      val old_data=union_table.filter(col("sortId")===2).as("old_data")
      new_data.join(old_data,new_data("id")===old_data("id"),"left")
       .select("new_data.*")
      .drop("sortId")
      .write
      .mode(SaveMode.Overwrite)
      .saveAsTable("dwd.zhang_dim_user_info")

完整代码

package qingxi.zengliang
import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, desc, format_string, from_unixtime, lead, lit, row_number, to_timestamp, unix_timestamp, when}
/**
 * 抽取ods库中user_info表中昨天的分区数据,并结合dim_user_info最新分区现有的数据,
 * 根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,
 * 根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充,
 * 并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。
 * 若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,
 * 并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。
 * 使用hive cli执行show partitions dwd.dim_user_info命令,
 */
object Test01 {
  def main(args: Array[String]): Unit = {
    //sparkAPI 编写
    Logger.getLogger("org").setLevel(Level.OFF)
    System.setProperty("HADOOP_USER_NAME","root")
    val sparkconf=new SparkConf()
      .setAppName("qingxi")
      .setMaster("local[*]")
    val sparkSession=SparkSession.builder()
      .config(sparkconf)
      .config("hive.metastore.uris","thrift://bigdata1:9083")
      .config("dfs.client.use.datanode.hostname","true")
      .config("spark.sql.parquet.writeLegacyFormat","true")
      .enableHiveSupport()
      .getOrCreate()
//清洗dwd.dim_user_info
//       sparkSession.read.table("ods.user_info")
//         .withColumn("dwd_insert_user",lit("user1"))
//         .withColumn("dwd_insert_time",to_timestamp(from_unixtime(unix_timestamp(),"yyyy-MM-dd HH:mm:ss")))
//         .withColumn("dwd_modify_user",lit("user1"))
//         .withColumn("dwd_modify_time",to_timestamp(from_unixtime(unix_timestamp(),"yyyy-MM-dd HH:mm:ss")))
//         .withColumn("birthday",to_timestamp(from_unixtime(unix_timestamp(col("birthday"),"yyyy-MM-dd HH:mm:ss"))))//若原数据中只有年月日,则在时分秒的位置添加00:00:00
//         .write
//         .mode(SaveMode.Overwrite)
//         .partitionBy("etl_date")
//         .saveAsTable("dwd.dim_user_info")
//       sparkSession.sql("select * from dwd.dim_user_info").show()
//ods_user_info 这个变量放着清洗过得ods中的数据
    val ods_user_info=sparkSession.table("ods.user_info")
      .withColumn("dwd_insert_user",lit("user1"))
      .withColumn("dwd_insert_time",to_timestamp(from_unixtime(unix_timestamp(),"yyyy-MM-dd HH:mm:ss")))
      .withColumn("dwd_modify_user",lit("user1"))
      .withColumn("dwd_modify_time",to_timestamp(from_unixtime(unix_timestamp(),"yyyy-MM-dd HH:mm:ss")))
      .withColumn("birthday",to_timestamp(from_unixtime(unix_timestamp(col("birthday"),"yyyy-MM-dd HH:mm:ss"))))
      .withColumn("operate_time",when(col("operate_time").isNull,col("create_time")).otherwise(col("operate_time")))
      .withColumn("etl_date",lit("20231211"))
//dwd_user_info 这个变量存放dwd.dim_user_info过去清洗的数据
    val dwd_user_info = sparkSession.table("dwd.dim_user_info")
//合并两表数据根据id分区,添加sortID字段区分新旧表,并实现dwd_insert_time时间不变
    val union_table=ods_user_info.select(dwd_user_info.columns.map(col):_*).union(dwd_user_info)
      .withColumn("sortId",row_number().over(Window.partitionBy("id").orderBy(desc("operate_time"))))
      .withColumn("dwd_insert_time",lead("dwd_insert_time",1).over(Window.partitionBy("id").orderBy(desc("dwd_modify_time"))))
      .withColumn("dwd_insert_time",when(col("dwd_insert_time").isNull,col("dwd_modify_time")).otherwise(col("dwd_insert_time")))
    val new_data=union_table.filter(col("sortId")===1).as("new_data")
    val old_data=union_table.filter(col("sortId")===2).as("old_data")
      new_data.join(old_data,new_data("id")===old_data("id"),"left")
       .select("new_data.*")
      .drop("sortId")
      .write
      .mode(SaveMode.Overwrite)
      .saveAsTable("dwd.zhang_dim_user_info")
    println("表结果")
    sparkSession.sql("select * from dwd.zhang_dim_user_info").show()
    sparkSession.stop()
  }
}

实现效果
在这里插入图片描述

清洗完成


本文转载自: https://blog.csdn.net/2301_80792800/article/details/134896428
版权归原作者 吃草料的羊 所有, 如有侵权,请联系我们删除。

“使用spark做数据清洗(增量)”的评论:

还没有评论