0


大数据之使用Spark增量抽取MySQL的数据到Hive数据库(2)

目录


前言

本题来源于2022 年全国职业院校技能大赛(高职组)“大数据技术与应用”赛项(电商)- 离线数据处理 - 数据抽取

题目:


提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写)

一、读题分析

涉及组件:MYSQL,HIVE,SPARK,SCALA

涉及知识点:

  1. 与大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)一样
  2. 与(1)不同的是,1是针对单列的时间进行比较,本题是在表上的两列当中选取时间较大的那一列的值作为判定时间

二、处理过程

比较每一行两列的值,将他们筛选出来,然后赋给新的一列临时列,最后在删除临时列

1.常规思路


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.time.LocalDate

object sparkMysqltoHive {
  def main(args: Array[String]): Unit = {
    import java.text.SimpleDateFormat
    import java.util.{Calendar, Properties}

    val spark = SparkSession.builder()
      .appName("Incremental Data Extraction").master("spark://host:7077")
      .enableHiveSupport()
      .getOrCreate()

    val jdbcurl = "jdbc:mysql://bigdata1:3306/db"
    val tablename = "table1"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    properties.setProperty("driver", "com.mysql.jdbc.Driver")

    //    读取mysql数据创建dataframe
    val mysqlDF = spark.read.jdbc(jdbcurl, tablename, properties)
    //    val maxtime = mysqlDF.agg(max(greatest(col("time1"), col("time2")))).toString()   最大时间

    // 创建临时视图
    mysqlDF.createOrReplaceTempView("mysql_user_info")

    // 拿到之前的数据和两比较中最大的数据列
    val mysqlDF2 = spark.sql("select *, greatest(operate_time, create_time) as incremental_field from mysql_user_info")

    // 读取hive数据
    val hiveDF = spark.sql("select * from ods.user_info")

    // 在hive中生成一列,并找到hive那两列最大的列
    val hiveDF2 = hiveDF.withColumn("incremental_field", greatest(col("operate_time"), col("create_time")))

    // 在两列比较最大的同时在比较最大转换为时间
    val maxDate = hiveDF2.agg(max(col("incremental_field"))).first().getTimestamp(0)
    // agg被用来处理聚合数据的函数

    // 在MySQL全部数据中将不满足mysql最大时间大于hive最大时间的数据过滤
    val newRecords = mysqlDF2.filter(col("incremental_field") > maxDate)

    val yesterday = new SimpleDateFormat("yyyyMMdd")
      .format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)

    // 筛选后删除临时列,并存上分区列
    val result = newRecords
      .drop(col("incremental_field"))
      .withColumn("etl_date", lit(yesterday))

    // Write the new records to Hive table with static partitioning
    result.write.mode("append")
      .partitionBy("etl_date")
      .saveAsTable("ods_userinfo")
  }
}

2.这里提供第二种比较和筛选数据

    //    从hive读取数据
    val hiveDF = spark.sql("select max(greatest(time1,time2)) from ods.table4")
    val maxTime = hiveDF.collect()(0)(0).toString

    //    筛选出增量数据
    val valueDF = mysqlDF.filter(col("time1").gt(maxTime) || col("time2").gt(maxTime))

三、重难点分析

必须要知道sql中greatest函数,这个函数能够筛选出一行中的两列中最大的值,与max不同,max是选出单独的一列中最大的值。


总结

数据处理需要灵活的使用sql函数或者spark相关函数对数据进行处理,但思路总体上一样。对于数据处理,还需要掌握好一些非常见但是又很使用的函数。

可以与大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)进行比较,找到他们的不同点,对比一下。

链接:大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)

原创作品如需引用请标明出处

标签: 大数据 hive spark

本文转载自: https://blog.csdn.net/qq_36920766/article/details/130389118
版权归原作者 云梦泽·兮 所有, 如有侵权,请联系我们删除。

“大数据之使用Spark增量抽取MySQL的数据到Hive数据库(2)”的评论:

还没有评论