目录
前言
本题来源于2022 年全国职业院校技能大赛(高职组)“大数据技术与应用”赛项(电商)- 离线数据处理 - 数据抽取
题目:
提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写)
一、读题分析
涉及组件:MYSQL,HIVE,SPARK,SCALA
涉及知识点:
- 与大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)一样
- 与(1)不同的是,1是针对单列的时间进行比较,本题是在表上的两列当中选取时间较大的那一列的值作为判定时间
二、处理过程
比较每一行两列的值,将他们筛选出来,然后赋给新的一列临时列,最后在删除临时列
1.常规思路
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.time.LocalDate
object sparkMysqltoHive {
def main(args: Array[String]): Unit = {
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
val spark = SparkSession.builder()
.appName("Incremental Data Extraction").master("spark://host:7077")
.enableHiveSupport()
.getOrCreate()
val jdbcurl = "jdbc:mysql://bigdata1:3306/db"
val tablename = "table1"
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")
properties.setProperty("driver", "com.mysql.jdbc.Driver")
// 读取mysql数据创建dataframe
val mysqlDF = spark.read.jdbc(jdbcurl, tablename, properties)
// val maxtime = mysqlDF.agg(max(greatest(col("time1"), col("time2")))).toString() 最大时间
// 创建临时视图
mysqlDF.createOrReplaceTempView("mysql_user_info")
// 拿到之前的数据和两比较中最大的数据列
val mysqlDF2 = spark.sql("select *, greatest(operate_time, create_time) as incremental_field from mysql_user_info")
// 读取hive数据
val hiveDF = spark.sql("select * from ods.user_info")
// 在hive中生成一列,并找到hive那两列最大的列
val hiveDF2 = hiveDF.withColumn("incremental_field", greatest(col("operate_time"), col("create_time")))
// 在两列比较最大的同时在比较最大转换为时间
val maxDate = hiveDF2.agg(max(col("incremental_field"))).first().getTimestamp(0)
// agg被用来处理聚合数据的函数
// 在MySQL全部数据中将不满足mysql最大时间大于hive最大时间的数据过滤
val newRecords = mysqlDF2.filter(col("incremental_field") > maxDate)
val yesterday = new SimpleDateFormat("yyyyMMdd")
.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
// 筛选后删除临时列,并存上分区列
val result = newRecords
.drop(col("incremental_field"))
.withColumn("etl_date", lit(yesterday))
// Write the new records to Hive table with static partitioning
result.write.mode("append")
.partitionBy("etl_date")
.saveAsTable("ods_userinfo")
}
}
2.这里提供第二种比较和筛选数据
// 从hive读取数据
val hiveDF = spark.sql("select max(greatest(time1,time2)) from ods.table4")
val maxTime = hiveDF.collect()(0)(0).toString
// 筛选出增量数据
val valueDF = mysqlDF.filter(col("time1").gt(maxTime) || col("time2").gt(maxTime))
三、重难点分析
必须要知道sql中greatest函数,这个函数能够筛选出一行中的两列中最大的值,与max不同,max是选出单独的一列中最大的值。
总结
数据处理需要灵活的使用sql函数或者spark相关函数对数据进行处理,但思路总体上一样。对于数据处理,还需要掌握好一些非常见但是又很使用的函数。
可以与大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)进行比较,找到他们的不同点,对比一下。
链接:大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)
原创作品如需引用请标明出处
版权归原作者 云梦泽·兮 所有, 如有侵权,请联系我们删除。