0


Spark框架-离线数据统计

数据清洗

任务简介:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

第一小问:

第一步:输出日志(使用spark默认的log4j配置文件)

//此方法要放在主程序的首行,靠后对输出日志控制不起作用

Logger.getLogger("org").setLevel(Level.ERROR)

第二步:创建SparkSession对象(关闭严格模式,否则创建静态分区)

  val spark=SparkSession
      .builder().appName("c_dataClear1").master("local").config("hive.exec.dynamic.partition.mode","nonstrict")//关闭严格模式,否则会有一个静态分区//hive元数据仓库目录.config("hive.metastore.warehouse.dir","hdfs://192.168.3.89:9000/user/hive/warehouse")//目录地址.enableHiveSupport().getOrCreate()

第三步:拿出所有的表并进行清洗

val  tableArray=List("customer","lineitem","nation","orders","part","partsupp","region","supplier")//所有数据表的名称
    tableArray.foreach(table =>{

第四步:删除分区并且统计

//2.删除分区并统计  var odsData=spark.sql(s"select * from ods.${table}").drop("part_date")//去除原来字段
      val num=odsData.count()//查看数据表结构println(odsData.schema)//查看数据表的结构

第五步:将对于字段的日期改为timestamp类型

  • TimeStamp类型为:yyyy-MM-dd HH:mm:ss
//3.将对于字段的日期改为timestamp类型 格式为:yyyy-MM--dd HH:mm:ss
      odsData.columns.foreach(tableName =>{
       val startIndex=tableName.length -4
       val endIndex=tableName.length
       
       if(endIndex >=4){
         val tamp=tableName.substring(startIndex,endIndex)//查看每个字段中最后四位是否带有dateif(tamp.toLowerCase.equals("date")){println("==================带有日期字段=============="+tableName)
           odsData=odsData.withColumn(tableName,date_format(col(tableName),"yyyy-MM-dd HH:mm:ss").cast("timestamp"))}

第六步:去除重复字段并创建临时视图

odsData.distinct().createOrReplaceTempView(table)//去除重复字段并创建临时视图
      
      spark.sql(s"drop table if exists dwd.${table}")//4.创建表
      spark.sql(s"create table if not exists dwd.${table} like ods.${table}")//5.插入数据
      spark.sql(s"insert overwrite table dwd.${table} select * from ods.${table}")
      spark.sql(s"select * from dwd.${table} limit 5").show //查询前五条数据
      
      spark.sql(s"desc dwd.${table}").show//查看数据结构

第七步:查看去重启后数据的条数

//6.查看去重后数据的条数

val resultNum=spark.sql(s"select * from dwd.${table}").count()print(s"================去除重复前数据条数${num}=======================")print(s"================去除重复前数据条数${resultNum}=======================")

第七步:打包集群

在这里插入图片描述

在这里插入图片描述

第八步:进入环境并运行

在这里插入图片描述

第九步:代码结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

第十步:代码源码

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col,date_format}object C_DataClear1 {
  def main(args:Array[String]):Unit={
    Logger.getLogger("org").setLevel(Level.ERROR)//1:输出日志:使用spark默认的log4j配置文件设置日志输出级别 此方法要放在主程序的首行,靠后对输出日志控制不起作用//2.准备sparkSession对象
    val spark=SparkSession
      .builder().appName("c_dataClear1").master("local").config("hive.exec.dynamic.partition.mode","nonstrict")//关闭严格模式,否则会有一个静态分区//hive元数据仓库目录.config("hive.metastore.warehouse.dir","hdfs://192.168.3.89:9000/user/hive/warehouse")//目录地址.enableHiveSupport().getOrCreate()//拿出所有的表进行清洗
    val tableArray=List("customer","lineitem","nation","orders","part","partsupp","region","supplier")//所有数据表的名称
    tableArray.foreach(table =>{//2.删除分区并统计var odsData=spark.sql(s"select * from ods.${table}").drop("part_date")//去除原来字段
      val num=odsData.count()//查看数据表结构println(odsData.schema)//查看数据表的结构//3.将对于字段的日期改为timestamp类型 格式为:yyyy-MM--dd HH:mm:ss
      odsData.columns.foreach(tableName =>{
       val startIndex=tableName.length -4
       val endIndex=tableName.length

       if(endIndex >=4){
         val tamp=tableName.substring(startIndex,endIndex)//查看每个字段中最后四位是否带有dateif(tamp.toLowerCase.equals("date")){println("==================带有日期字段=============="+tableName)
           odsData=odsData.withColumn(tableName,date_format(col(tableName),"yyyy-MM-dd HH:mm:ss").cast("timestamp"))}}})
      odsData.distinct().createOrReplaceTempView(table)//去除重复字段并创建临时视图

      spark.sql(s"drop table if exists dwd.${table}")//4.创建表
      spark.sql(s"create table if not exists dwd.${table} like ods.${table}")//5.插入数据
      spark.sql(s"insert overwrite table dwd.${table} select * from ods.${table}")
      spark.sql(s"select * from dwd.${table} limit 5").show //查询前五条数据

      spark.sql(s"desc dwd.${table}").show//查看数据结构//6.查看去重后数据的条数
      val resultNum=spark.sql(s"select * from dwd.${table}").count()print(s"================去除重复前数据条数${num}=======================")print(s"================去除重复前数据条数${resultNum}=======================")})}}

第二小问:

第一步:创建SparkSession对象

val spark=SparkSession
      .builder().master("local").appName("c1_dataClear").config("hive.metastore.warehouse.dir","hdfs://192.168.3.89:9000/user/hive/warehouse").enableHiveSupport().getOrCreate()

第二步:输入SQL语句

spark.sql("show databases").show
    
    spark.sql("alter table ods.customer drop partition (part_date<'20220410')")//保留三天内的数据

整体代码如下:

import org.apache.spark.sql.SparkSession

object C1_DataClear {
  def main(args:Array[String]):Unit={
    val spark=SparkSession
      .builder().master("local").appName("c1_dataClear").config("hive.metastore.warehouse.dir","hdfs://192.168.3.89:9000/user/hive/warehouse").enableHiveSupport().getOrCreate()
      
    spark.sql("show databases").show
    
    spark.sql("alter table ods.customer drop partition (part_date<'20220410')")//保留三天内的数据}}
标签: 大数据 big data hive

本文转载自: https://blog.csdn.net/m0_62491934/article/details/124154736
版权归原作者 那人独钓寒江雪. 所有, 如有侵权,请联系我们删除。

“Spark框架-离线数据统计”的评论:

还没有评论