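Write Scala project code that does the following. Fully extract the data in tables CUSTOMER, NATION, PART, PARTSUPP, REGION and SUPPLIER of the MySQL database shtd_store into the corresponding tables customer, nation, part, partsupp, region and supplier in Hive's ods database. Incrementally extract the data in tables ORDERS and LINEITEM into the corresponding tables ORDERS and LINEITEM in Hive's ods database.
Put plainly, the task is to use Spark SQL to move data from a MySQL database into Hive. It breaks down into three steps: first, connect to Hive; second, connect to MySQL and read the data; third, write the extracted data into the corresponding Hive tables.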
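The three steps are repeated for every table in the task, and the only difference between tables is whether they are loaded in full or incrementally. A minimal sketch that writes that plan down as plain data before any Spark code runs is shown below. It assumes each Hive target is `ods.` plus the lower-cased MySQL name. The names `ExtractPlan`, `TableJob`, `Full` and `Incremental` are hypothetical.

```scala
// Hypothetical job plan: which shtd_store tables are loaded in full and which incrementally.
object ExtractPlan {
  sealed trait Mode
  case object Full extends Mode
  case object Incremental extends Mode

  // One extraction job: MySQL source table -> Hive target table
  final case class TableJob(source: String, target: String, mode: Mode)

  val fullTables: Seq[String] =
    Seq("CUSTOMER", "NATION", "PART", "PARTSUPP", "REGION", "SUPPLIER")
  val incrementalTables: Seq[String] = Seq("ORDERS", "LINEITEM")

  // Assumption: target = "ods." + lower-cased source name. Hive table names are
  // case-insensitive, so ods.ORDERS and ods.orders refer to the same table.
  val jobs: Seq[TableJob] =
    fullTables.map(t => TableJob(t, s"ods.${t.toLowerCase}", Full)) ++
      incrementalTables.map(t => TableJob(t, s"ods.${t.toLowerCase}", Incremental))
}
```

Each entry in `jobs` would then go through steps 2 and 3, with the mode deciding whether the read is filtered and whether the write overwrites.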
Step 1: connect to Hive
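import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ExtractData")
  .enableHiveSupport()
  // Use the NameNode RPC address from fs.defaultFS (9000 assumed here; 8020 is also common).
  // 50070 is the NameNode web UI port and does not work in an hdfs:// URI.
  // The path should match hive.metastore.warehouse.dir in hive-site.xml.
  .config("spark.sql.warehouse.dir", "hdfs://master:9000/usr/hive/warehouse")
  .config("hive.metastore.uris", "thrift://master:9083") // metastore service address
  .getOrCreate()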
Step 2: connect to MySQL and read the data
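// Demo source: table `user` in the MySQL database `spark-sql`.
// For the task, point the URL at shtd_store and set dbtable to CUSTOMER, NATION, etc.
spark.read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver") // driver class for mysql-connector-java 5.1.x
  .option("url", "jdbc:mysql://localhost:3306/spark-sql")
  .option("user", "root")
  .option("password", "123456")
  .option("dbtable", "user")
  .load()
  .createOrReplaceTempView("data") // OrReplace: safe to re-run in the same session
spark.sql("select * from data").show()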
It is a good idea to call show() here to check that the data was actually read.
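The demo reads the `user` table of a `spark-sql` database. For the task itself the same reader would point at `shtd_store` and change only `dbtable` from table to table. A minimal sketch of building those reader options once is shown below. It assumes MySQL runs on host `master`, port 3306, with the demo's root/123456 credentials (all placeholders). `JdbcConf` and `mysqlOptions` are hypothetical names.

```scala
// Hypothetical helper: JDBC reader options for one table of shtd_store.
object JdbcConf {
  // Assumptions: host "master", port 3306, user root / password 123456 (placeholders).
  val url: String =
    "jdbc:mysql://master:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8"

  def mysqlOptions(table: String): Map[String, String] = Map(
    "driver"   -> "com.mysql.jdbc.Driver", // class name for mysql-connector-java 5.1.x
    "url"      -> url,
    "user"     -> "root",
    "password" -> "123456",
    "dbtable"  -> table                    // the only option that changes per table
  )

  // Intended use inside the Spark job:
  // spark.read.format("jdbc").options(JdbcConf.mysqlOptions("CUSTOMER")).load()
}
```

Keeping the connection settings in one place means the six full-load tables and the two incremental tables all use the same URL and credentials.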
Step 3: write the data into the corresponding Hive table
spark.sql("use study") // demo database; for the task this would be ods
// Static partition: the partition value is fixed in the SQL text
spark.sql(
  """
    |create table if not exists customer(
    |id int,
    |name string,
    |age int
    |)
    |partitioned by(time string)
    |row format delimited fields terminated by '\t'
    |""".stripMargin)
println("*************")
spark.sql(
  """
    |insert overwrite table customer partition (time='1001')
    |select id,name,age
    |from data
    |""".stripMargin)
spark.sql("select * from customer").show()
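Step 3 hard-codes one table. For the six full-load tables the insert statement differs only in the source view, the target table and the partition value, so it can be generated. A minimal sketch follows. It assumes each ods table already exists, is partitioned by a single column, and has the same non-partition columns in the same order as the MySQL table, so that `select *` lines up. The demo's `time` column is used in the example call, but the real partition column name is an assumption. `FullLoadSql` and `fullLoadSql` are hypothetical names.

```scala
// Hypothetical helper: build the static-partition "insert overwrite" used for full loads.
object FullLoadSql {
  // Assumption: `target` exists, is partitioned by `partCol`, and its non-partition
  // columns match the temp view's columns in the same order.
  def fullLoadSql(view: String, target: String, partCol: String, partValue: String): String =
    s"insert overwrite table $target partition ($partCol='$partValue') select * from $view"
}
```

For example, after registering the JDBC DataFrame for CUSTOMER as a temp view named `customer_src`, `spark.sql(FullLoadSql.fullLoadSql("customer_src", "ods.customer", "time", "1001"))` replaces only that one partition. As a result, re-running a full load does not duplicate rows.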
Complete code:
import org.apache.spark.sql.SparkSession

object ExtractData {
  def main(args: Array[String]): Unit = {
    // Access HDFS as root so Spark can write into the Hive warehouse directory
    System.setProperty("HADOOP_USER_NAME", "root")

    // Step 1: SparkSession with Hive support
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ExtractData")
      .enableHiveSupport()
      // NameNode RPC address from fs.defaultFS (9000 assumed); 50070 is only the web UI port
      .config("spark.sql.warehouse.dir", "hdfs://master:9000/usr/hive/warehouse")
      .config("hive.metastore.uris", "thrift://master:9083")
      .getOrCreate()

    // Step 2: read the MySQL table over JDBC and register it as a temp view
    spark.read
      .format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306/spark-sql")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "user")
      .load()
      .createOrReplaceTempView("data")
    spark.sql("select * from data").show()
    println("***************")

    // Step 3: write into a Hive table with a static partition
    spark.sql("use study")
    spark.sql(
      """
        |create table if not exists customer(
        |id int,
        |name string,
        |age int
        |)
        |partitioned by(time string)
        |row format delimited fields terminated by '\t'
        |""".stripMargin)
    println("*************")
    spark.sql(
      """
        |insert overwrite table customer partition (time='1001')
        |select id,name,age
        |from data
        |""".stripMargin)
    spark.sql("select * from customer").show()
    println("-------------------------------------------------------------")
    spark.stop()
  }
}
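The complete code above only demonstrates the full-load pattern. The task also asks for incremental extraction of ORDERS and LINEITEM, which means reading from MySQL only the rows newer than what ods already holds. A minimal sketch of that filter is shown below. It assumes a numeric, ever-increasing column serves as the watermark. The TPC-H style `O_ORDERKEY` and `L_ORDERKEY` are used in the example calls, but which column to use, and whether a date or timestamp column would be more appropriate, is an assumption to check against the real schema. `IncrementalSql` and `incrementalDbtable` are hypothetical names.

```scala
// Hypothetical helper: JDBC "dbtable" subquery that pulls only rows newer than Hive's watermark.
object IncrementalSql {
  // lastValue = max watermark already stored in Hive (None when the Hive table is still empty).
  // Assumption: `column` is numeric and only grows as new rows are inserted in MySQL.
  def incrementalDbtable(table: String, column: String, lastValue: Option[Long]): String =
    lastValue match {
      case Some(v) => s"(select * from $table where $column > $v) as t" // only newer rows
      case None    => s"(select * from $table) as t"                    // first run: take everything
    }
}
```

The returned string goes into `.option("dbtable", ...)` in place of the plain table name, so the filter runs inside MySQL. The watermark itself can be read with a query such as `select max(o_orderkey) from ods.orders`. Wrapping that result in `Option(...)` turns the null returned for an empty table into `None`. The new rows are then added with `insert into` (or written to a fresh partition) instead of overwriting what is already there.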
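The task itself is not hard. The main prerequisite is getting the Hive and Spark environments configured.
Required environment configuration:
pom.xml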
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>atguigu-classes</artifactId>
        <groupId>com.atguigu.bigdata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark-core</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <!-- same version as the other Spark modules -->
            <version>3.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
</project>
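Next, copy Hive's hive-site.xml into the project's resources directory so that Spark picks up the metastore settings from the classpath.
Before running the code, make sure Hive is running, including the Hive metastore service on the server side.
Start the metastore with: hive --service metastore &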
Copyright belongs to the original author guo_0423. If this infringes your rights, please contact us and it will be removed.