文章目录
前言
本题来源于全国职业技能大赛之大数据技术赛项赛题-离线数据处理-数据抽取(其他暂不透露)
题目:编写Scala代码,使用Spark将MySQL的shtd_industry库中表EnvironmentData,ChangeRecord,BaseMachine,MachineData,ProduceRecord全量抽取到Hive的ods库(需自建)中对应表environmentdata,changerecord,basemachine, machinedata, producerecord中。
以下面题目为例:
抽取MySQL的shtd_industry库中EnvironmentData表的全量数据进入Hive的ods库中表environmentdata,字段排序、类型不变,同时添加静态分区,分区字段类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。并在hive cli执行show partitions ods.environmentdata命令,将结果截图粘贴至对应报告中;
提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写)
一、读题分析
涉及组件:Spark,Mysql,Hive
涉及知识点:
- Spark读取数据库数据
- DataFrameAPI的使用(重点)
- Spark写入数据库数据
- Hive数据库的基本操作
二、使用步骤
1.导入配置文件到pom.xml
<!--SparkSQL配置-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--spark连接hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--mysql配置-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
2.代码部分
由于不是很难,直接上代码,代码如下(示例):
package A.offlineDataProcessing.shtd_industry.task1_dataExtraction
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
object SparkToMysqlToHive {
def main(args: Array[String]): Unit = {
// 创建Spark对象会话
val spark = SparkSession.builder()
.appName("MySQL to Hive")
.master("spark://bigdata1:7077")
.enableHiveSupport().getOrCreate()
// 连接MySQL数据库并设置属性
val jdbcUrl = "jdbc:mysql://bigdata1:3306/shtd_industry"
val table = "EnvironmentData"
val properties = new Properties
properties.put("user", "root")
properties.put("password", "123456")
// Read data from MySQL
val df: DataFrame = spark.read.jdbc(jdbcUrl, table, properties)
println("-------------------自定义操作-------------------------")
// Add partition column
val dateFormat = new SimpleDateFormat("yyyyMMdd")
// 第一个getTime返回的是一个 Date 对象
// 第二个 getTime 方法返回的是一个整数值,表示此 Date 对象表示的时间距离标准基准时间(1970年1月1日00:00:00 GMT)的毫秒数。
val yesterday = dateFormat.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
//对MySQL来的数据进行withCoulum操作,有就修改,没有就添加
val dfWithPartition: DataFrame = df.withColumn("etldate", lit(yesterday))
println("-------------------写入数据-------------------------")
// Write data to Hive
// mode模式为覆盖,还有append为追加
// partitionBy 根据指定列进行分区
// saveAsTable保存表
dfWithPartition.write.mode("overwrite")
.partitionBy("etldate")
.saveAsTable("ods.environmentdata")
}
}
hive数据库相关的操作在这不做演示
三、重难点分析
没有难点,主要涉及能否自定义函数完成任务需求
val dateFormat = new SimpleDateFormat("yyyyMMdd")
// 第一个getTime返回的是一个 Date 对象
// 第二个 getTime 方法返回的是一个整数值,表示此 Date 对象表示的时间距离标准基准时间(1970年1月1日00:00:00 GMT)的毫秒数。
val yesterday = dateFormat.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
//对MySQL来的数据进行withCoulum操作,有就修改,没有就添加
val dfWithPartition: DataFrame = df.withColumn("etldate", lit(yesterday))
总结
本文仅仅介绍了Spark读取MySQL的数据到hive数据库的操作,spark提供了许多方法,我们不必写SQL语法就可以直接对数据进行操作,还是很方便的,并且难度也不高(比flink简单)。
如转载请标明出处
版权归原作者 云梦泽·兮 所有, 如有侵权,请联系我们删除。