数据清洗
1.题目分析
使用Scala编写spark工程代码,将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。
根据以上提示分析得出以下:
- 当然是使用scala编写spark代码
- 将ods库的全部数据抽取到hive的dwd库中,ods和dwd都是数仓中的分层(具体可看数仓的分层概念)
- 表中涉及到的timestamp类型或者缺少时分秒的字段,需要进行时间格式化,转换为 yyyy-MM-dd HH:mm:ss格式
接下来我们看具体的题目:
抽取ods库中user_info表中昨天的分区(任务一生成的分区)数据,并结合dim_user_info最新分区现有的数据,根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_user_info命令,将结果截图粘贴至对应报告中;
首先这个题目是要我们实现从ods.user_info抽取一个分区的数据与dwd.dim_user_info最新分区的数据根据id进行合并,取出operate_time字段最大的一条数据,分区字段依旧是etldate,剩下的都是对dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time这四个字段填充值得解释,这道题得难点其实在在合并数据上。
2.代码实现
接下来我们针对题目理解开始代码实现
- 首先我们需要确保服务器已经启动hadoop、hive的metastore
hadoop 启动命令:start-all.sh
hive metastore 启动命令:hive --service metastore &
jps(jdk/bin下的命令)命令:查看hadoop各个组件是否成功运行
netstat -ntpl | grep 9083 :查看hive metastore 默认的9083端口是否已经正在运行
- 创建scala on spark工程,添加pom依赖,添加scala框架支持(如果idea创建文件时候没有scala选项) 此过程没有难点,所以不在叙述
- 项目的初始化先将hive安装目录下的配置文件(hive-site.xml)复制到resources目录下,目的是为了让spark程序读取hive的连接信息
编写初始化代码
packagecom.rj.qximportorg.apache.log4j.{Level, Logger}importorg.apache.spark.sql.expressions.{UserDefinedFunction, Window}importorg.apache.spark.sql._
importorg.apache.spark.sql.functions._
importorg.apache.spark.sql.types.LongType
importjava.util.Properties
object UserInfo {//设置hadoop的用户
System.setProperty("HADOOP_USER_NAME","root")// 设置日志等级,oFF代表关闭日志
Logger.getLogger("org").setLevel(Level.OFF)def main(args: Array[String]):Unit={val session: SparkSession = SparkSession
.builder()// 集群跑需要注释.master("local[*]").appName("Merge user_info from MYSQL to ODS")// 客户端访问datanode的时候是通过主机域名访问,就不会出现通过内网IP来访问了.config("dfs.client.use.datanode.hostname","true")// hive开启动态分区.config("hive.exec.dynamic.partition.mode","nonstrict")// 连接服务中的hive metastore.config("hive.metastore.uris","thrift://bigdata1:9083")// 需要开启hive支持,不开启会连接默认hive.enableHiveSupport().getOrCreate()// 关闭程序
session.stop()}}
- 经过分析我们第一步需要先抽取ods昨天的分区,比赛中也就是数据抽取阶段抽取的数据
// 读取ods的昨天分区数据val ods_user_nfo: Dataset[Row]= session
.table("ods.user_info").where(col("etldate")==="20230321")
- 读取dwd最新分区,取出分区字段最大的即可
// 读取dwd的最新分区数据val dwd_user_info: Dataset[Row]= session
.table("dwd.dim_user_info").where("etldate = (SELECT MAX(etldate) FROM dwd.dim_user_info)")
- 接下来就是最关键的一步,就是合并数据这一步需要使用spark的窗口函数ROW_NUMBER实现,row_number()函数是为每一行数据生成行号,从1开始那我们可以对id进行分组,根据operate_time倒序排序取出行号为1的那条数据就是我们最终合并保留的数据
// 合并ods和dwd的分区数据,添加一个行号列,取出行号为1的后删除字段val merged_user_info: Dataset[Row]= ods_user_nfo.union(dwd_user_info)// 根据ID进行分组.withColumn("rowNumber",
row_number().over(Window.partitionBy("id").orderBy(desc("operate_time")))).where(col("rowNumber")===1).drop("rowNumber")
- 我们对题目要求的时间进行填充
// 如果operate_time为空,则使用create_time填充val user_info_with_operate_time: DataFrame = merged_user_info.withColumn("operate_time",
when(col("operate_time").isNull, col("create_time")).otherwise(col("operate_time")))importsession.implicits._
val modify_timeFunction: UserDefinedFunction = session
.udf
.register("modify_time",(id:Long)=>{val modify_time:String= dwd_user_info.filter((r: Row)=>{
r.get(0).toString.toLong.equals(id)}).select("dwd_modify_time").first().get(0).toString
modify_time
})val ids: Array[Long]= dwd_user_info.select("id").map((_: Row)(0).toString.toLong).collect()val user_info_with_dwd_cols: DataFrame = user_info_with_operate_time
.withColumn("dwd_insert_user", lit("user1")).withColumn("dwd_modify_user", lit("user1")).withColumn("dwd_insert_time",
when(col("id").isin(ids: _*), modify_timeFunction(col("id"))).otherwise(date_format(current_timestamp(),"yyyy-MM-dd HH:mm:ss"))).withColumn("dwd_modify_time",
when(col("dwd_modify_time").isNull, date_format(current_timestamp(),"yyyy-MM-dd HH:mm:ss")).otherwise(when(col("id").isin(ids: _*), date_format(current_timestamp(),"yyyy-MM-dd HH:mm:ss")).otherwise(col("dwd_modify_time"))))
- 将结果写入dwd库的dim_user_info分区表
注意:因为本人的数据库中dwd.dim_user_info只有一个分区的数据,所以可以进行覆盖,但是如果还有其他分区,需要将我们合并的结果覆盖掉dwd.dim_user_info最新的一个分区
// 将结果写入dwd库的dim_user_info分区表
user_info_with_dwd_cols.write
.mode("append").partitionBy("etldate").saveAsTable("dwd.dim_user_info")
- 显示dim_user_info的所有分区
session.sql("SHOW PARTITIONS dwd.dim_user_info").show(false)
如果有问题,欢迎一起讨论!
版权归原作者 do{a++b++}while(a&b) 所有, 如有侵权,请联系我们删除。