抽取shtd_store库中user_info的增量数据进入Hudi的ods_ds_hudi库中表user_info。根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这
两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,若operate_time为空,则用create_time填充,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.user_info命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;
创建ods_ds_hudi.user_info表
建表之前需要判断一下表是否存在,如果不存在才创建。涉及到两个概念,一个是ODS,一个是hudi。
在数据仓库(Data Warehouse)建模中, ODS(Operational Data Store,操作性数据存储)层 作为数据仓库架构的一部分,通常用于整合来自各种操作性系统的数据。这一层的主要目的是将这些数据整合到一个一致的、标准化的格式中,并进行清洗、转换以确保数据的质量。 一般来说ODS层只需要保存从数据库中抽取来的原始数据,不做任何的更改。 DWD (Data Warehouse Detailed Data) 层用于存储原始、详细的事务性数据,这些数据通常来自于企业的各个操作系统和数据源。这一层包含了数据仓库的底层原子数据,记录了每个操作的细节。 DWS层是DWD层的上一层,用于存储已经汇总和聚合过的数据。这一层的数据是经过处理、加工和聚合的,通常用于支持高层次的分析和决策。
Hudi(Hadoop Upserts Deletes and Incrementals)是一种开源的数据管理框架,旨在简化和优化Apache Hadoop和Apache Spark等大数据处理引擎上的数据湖(Data Lake)操作。Hudi 提供了一种方式,让用户更方便地实现增量数据存储、更新、删除以及实时查询等操作。
任务中要求分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。也就是说要根据etl_date时间不同,将抽取的数据放到对应etl时间的分区里,那么建表的时候就需要增加etl_date字段,类型为String。 任务要求中提到将id作为primaryKey,operate_time作为preCombineField。建表的时候需要配置表使用hudi,并且将id配置为主键,操作时间作为合并的字段。也就是说hudi会根据主键和操作时间来判断数据是插入还是更新。 建表语句如下:
spark.sql(
"""
|create table if not exists `ods_ds_hudi`.`user_info` (
| `id` int,
| `login_name` string,
| `nick_name` string,
| `passwd` string,
| `name` string,
| `phone_num` string,
| `email` string,
| `head_img` string,
| `user_level` string,
| `birthday` date,
| `gender` string,
| `create_time` timestamp,
| `operate_time` timestamp,
| `etl_date` string
| )using hudi
|options (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'operate_time'
|)
|partitioned by (`etl_date`);
|""".stripMargin)
语句中的如下部分是关于hudi的相关配置
using hudi
options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'operate_time'
)
using hudi: 这一部分指定了使用 Hudi 进行数据操作。
options: 这是 Hudi SQL 的 options 子句,用于指定 Hudi 表的一些配置选项。
type = 'cow': 这个选项指定了 Hudi 表的写入类型,这里是 'cow',表示 Copy on Write。Copy on Write 是 Hudi 的一种存储引擎,它在写入时会复制数据,保留历史版本。与之相对的是 'mor'(Merge on Read),它在写入时只更新最新版本,而历史版本会在查询时进行合并。
primaryKey = 'id': 这个选项指定了主键,即在 Hudi 表中用于唯一标识每条记录的字段。在这里,主键是 'id'。
preCombineField = 'operate_time': 这个选项指定了用于决定版本顺序的字段。Hudi 会使用这个字段的值来判断历史版本的先后顺序。在这里,使用了 'operate_time' 字段。
综合起来,这条 SQL 语句表示要使用 Hudi 操作数据,表的写入类型为 Copy on Write,主键是 'id',并且版本的顺序是通过 'operate_time' 字段来确定的。这些配置可以根据具体的需求进行调整。
如何在spark中查询MySQL中的数据
需要将MySQL中表映射到sparksql中。
spark.sql(
"""
| create temporary view user_info_tmp
| using org.apache.spark.sql.jdbc
|options(
| url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
| dbtable 'user_info',
| user 'root1',
| password '123456'
|)
|""".stripMargin)
create temporary view user_info_tmp: 这部分创建了一个临时视图(temporary view)叫做 "user_info_tmp"。这个视图用于在 Spark 中表示和查询 "user_info" 表的数据。
using org.apache.spark.sql.jdbc: 这表明使用了 JDBC 数据源来读取外部数据。具体地,这里使用的是 Spark SQL 的 JDBC 数据源。
options(...): 这里指定了连接 JDBC 数据源时的一些配置选项。
url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8': 指定了 MySQL 数据库的连接信息,包括数据库地址、端口以及其他连接参数。
dbtable 'user_info': 指定了要读取的数据库表,这里是 "user_info"。
user 'root1': 指定了连接数据库所使用的用户名。
password '123456': 指定了连接数据库所使用的密码。
stripMargin: 这是 Scala 语言中的一个特性,用于在多行字符串中去掉每行前面的空格。
获取比赛前一天新增的数据
从MySQL中查询出比赛前一天新增的数据,也就是创建时间是比赛前一天的数据。
select
`id`,
`login_name`,
`nick_name`,
`passwd`,
`name`,
`phone_num`,
`email`,
`head_img`,
`user_level`,
`birthday`,
`gender`,
`create_time`,
`operate_time`
from user_info_tmp where create_time >= '2023-09-01 00:00:00';
获取比赛前一天更新的数据
从MySQL中查询出比赛前一天更新的数据,也就是更新时间是比赛前一天的数据。
select
`id`,
`login_name`,
`nick_name`,
`passwd`,
`name`,
`phone_num`,
`email`,
`head_img`,
`user_level`,
`birthday`,
`gender`,
`create_time`,
`operate_time`
from user_info_tmp where operate_time >= '2023-09-01 00:00:00';
整合新增和更新的查询语句
将更新和新增的查询语句合并成一条sql语句,实现若operate_time为空,则用create_time填充的业务需求,sql如下:
select
`id`,
`login_name`,
`nick_name`,
`passwd`,
`name`,
`phone_num`,
`email`,
`head_img`,
`user_level`,
`birthday`,
`gender`,
`create_time`,
`operate_time`
from user_info_tmp where coalesce(`operate_time`, `create_time`) >= '2023-09-01 00:00:00';
coalesce(..., ''):COALESCE 函数用于返回参数列表中的第一个非空表达式。在这里,如果 ‘operate_time’字段值为空,那么返回 'create_time'字段的值。
如何确定更新多少数据
思考两个问题: 第一次导入的时候ods层中没有历史数据,如何处理? 某段时间没有定时导入数据到ods层中,新增和更新的数据时间使用数据抽取的时间是否合理? 以上两种问题就会导致数据导入不完整,故而任务要求中说“根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入”。意思就是说要根据ODS层中历史数据的最近的操作时间(operate_time或create_time谁最新)来确定从MySQL中抽取哪段时间的数据。所以需要从ods层中查询出最大的操作时间。
select
case
when max(`operate_time`) > max(`create_time`)
then max(`operate_time`)
else max(`create_time`)
end
from `ods_ds_hudi`.`user_info`
case when max(operate_time) > max(create_time) then max(operate_time) else max(create_time) end: 这是一个 CASE 语句,用于根据条件判断选择不同的值。具体地,它比较 operate_time 列的最大值和 create_time 列的最大值,如果 operate_time 大于 create_time,则选择 operate_time 的最大值,否则选择 create_time 的最大值。
如果ods层的表中没有数据,那么这时候查到的值为空,这时候需要给个默认值,sql如下:
select
coalesce(
case
when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
else max(`create_time`)
end, ''
)
from `ods_ds_hudi`.`user_info`
coalesce(..., ''):COALESCE 函数用于返回参数列表中的第一个非空表达式。在这里,如果 CASE 语句的结果为空,那么返回空字符串 '' 在sql中日期的比较可以转换成字符串来比较,将查询到的最大时间转换为字符串,sql如下:
select
cast(
coalesce(
case
when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
else max(`create_time`)
end, ''
) as string
)
from `ods_ds_hudi`.`user_info`
cast(... as string):CAST 函数用于将结果转换为指定的数据类型。在这里,将前面的结果强制转换为字符串类型。
综合起来,整个查询的目的是从 ods_ds_hudi 数据库的 user_info 表中选择一个字符串值,该值是根据比较 operate_time 和 create_time 列的最大值而确定的。如果 operate_time 的最大值大于 create_time 的最大值,则选择 operate_time 的最大值,否则选择 create_time 的最大值。如果这个值为空,那么返回一个空字符串。
从MySQL中查询操作时间大于等于ODS层中最大时间的数据
将查询的ods层中最大的时间作为查询更新数据的参数,将最大时间查询作为子查询放到where条件中,sql如下:
select
`id`,
`login_name`,
`nick_name`,
`passwd`,
`name`,
`phone_num`,
`email`,
`head_img`,
`user_level`,
`birthday`,
`gender`,
`create_time`,
`operate_time`
from user_info_tmp
where
cast(coalesce(`operate_time`, `create_time`) as string) >=
(
select
cast(
coalesce(
case
when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
else max(`create_time`)
end, ''
) as string
)
from `ods_ds_hudi`.`user_info`
);
where cast(coalesce(operate_time, create_time) as string) >= ...: 这是一个 WHERE 子句,用于过滤数据。它选择了满足条件的行,条件是 operate_time 列或 create_time 列的非空最大值(作为字符串形式)要大于等于子查询的结果。
select cast(coalesce(case when max(operate_time) > max(create_time) then max(operate_time) else max(create_time) end, '') as string) from ods_ds_hudi.user_info``: 这是一个子查询,它计算了 ods_ds_hudi 数据库中 user_info 表中的 operate_time 和 create_time 列的非空最大值。这个值将被用于与外部查询中的条件比较。
将查询到的更新数据插入到ods_ds_hudi.user_info表中
结合上面的操作,只需要将查询结果直接插入到ods层的表中即可,sql如下:
insert into `ods_ds_hudi`.`user_info` partition(etl_date = '20230901')
select
`id`,
`login_name`,
`nick_name`,
`passwd`,
`name`,
`phone_num`,
`email`,
`head_img`,
`user_level`,
`birthday`,
`gender`,
`create_time`,
`operate_time`
from user_info_tmp
where
cast(coalesce(`operate_time`, `create_time`) as string) >=
(
select
cast(
coalesce(
case
when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
else max(`create_time`)
end, ''
) as string
)
from `ods_ds_hudi`.`user_info`
);
insert into ods_ds_hudi.user_info partition(etl_date = '20230901'): 这部分指定了数据插入的目标表,即 ods_ds_hudi.user_info,并指定了插入的分区为 etl_date = '20230901'。这表示数据将插入到指定分区中。
查看分区
spark.sql("show partitions `ods_ds_hudi`.`user_info` ").show()
抽取shtd_store库中sku_info的增量数据进入Hudi的ods_ds_hudi库中表sku_info。根据ods_ds_hudi.sku_info表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.sku_info命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;
--2.1创建ODS层表
spark.sql(
"""
|create table if not exists `ods_ds_hudi`.`sku_info` (
| `id` int,
| `spu_id` int,
| `price` decimal,
| `sku_name` string,
| `sku_desc` string,
| `weight` decimal,
| `tm_id` int,
| `category3_id` int,
| `sku_default_img` string,
| `create_time` timestamp,
| `etl_date` string
| )using hudi
|options (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'create_time'
|)
|partitioned by (`etl_date`);
|""".stripMargin)
--2.2创建MySQL表映射
spark.sql(
"""
| create temporary view sku_info_tmp
| using org.apache.spark.sql.jdbc
|options(
| url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
| dbtable 'sku_info',
| user 'root1',
| password '123456'
|)
|""".stripMargin)
--2.3增量导入数据
spark.sql("alter table `ods_ds_hudi`.`sku_info` drop partition(etl_date = '20230901') ")
spark.sql(
"""
|insert into `ods_ds_hudi`.`sku_info` partition(etl_date = '20230901')
|select
| `id`,
| `spu_id`,
| `price`,
| `sku_name`,
| `sku_desc`,
| `weight`,
| `tm_id`,
| `category3_id`,
| `sku_default_img`,
| `create_time`
| from sku_info_tmp
| where
| cast(`create_time` as string)>(select cast(coalesce(max(`create_time`),'') as string) from `ods_ds_hudi`.`sku_info` )
|""".stripMargin)
--2.4查看分区
spark.sql("show partitions `ods_ds_hudi`.`sku_info`").show()
抽取shtd_store库中base_province的增量数据进入Hudi的ods_ds_hudi库中表base_province。根据ods_ds_hudi.base_province表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,create_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.base_province命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;
--3.1创建ODS层表
spark.sql(
"""
|create table if not exists `ods_ds_hudi`.`base_province` (
| `id` int,
| `name` string,
| `region_id` string,
| `area_code` string,
| `iso_code` string,
| `create_time` timestamp,
| `etl_date` string
| )using hudi
|options (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'create_time'
|)
|partitioned by (`etl_date`);
|""".stripMargin)
--3.2创建MySQL表映射
spark.sql(
"""
| create temporary view base_province_tmp
| using org.apache.spark.sql.jdbc
|options(
| url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
| dbtable 'base_province',
| user 'root1',
| password '123456'
|)
|""".stripMargin)
--3.3增量导入数据
spark.sql("alter table `ods_ds_hudi`.`base_province` drop partition(etl_date = '20230901') ")
spark.sql(
"""
|insert into `ods_ds_hudi`.`base_province` partition(etl_date = '20230901')
|select
| `id`,
| `name`,
| `region_id`,
| `area_code`,
| `iso_code`,
| current_timestamp as `create_time`
| from base_province_tmp
| where
| cast(`id` as string)>(select cast(coalesce(max(`id`),'') as string) from `ods_ds_hudi`.`base_province` )
|""".stripMargin)
--3.4查看分区
spark.sql("show partitions `ods_ds_hudi`.`base_province`").show()
抽取shtd_store库中base_region的增量数据进入Hudi的ods_ds_hudi库中表base_region。根据ods_ds_hudi.base_region表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,create_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.base_region命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;
--4.1创建ODS层表
spark.sql(
"""
|create table if not exists `ods_ds_hudi`.`base_region` (
| `id` string,
| `region_name` string,
| `create_time` timestamp,
| `etl_date` string
| )using hudi
|options (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'create_time'
|)
|partitioned by (`etl_date`);
|""".stripMargin)
--4.2创建MySQL表映射
spark.sql(
"""
| create temporary view base_region_tmp
| using org.apache.spark.sql.jdbc
|options(
| url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
| dbtable 'base_region',
| user 'root1',
| password '123456'
|)
|""".stripMargin)
--4.3增量导入数据
spark.sql("alter table `ods_ds_hudi`.`base_region` drop partition(etl_date = '20230901') ")
spark.sql(
"""
|insert into `ods_ds_hudi`.`base_region` partition(etl_date = '20230901')
|select
| `id`,
| `region_name`,
| current_timestamp as `create_time`
| from base_region_tmp
| where
| cast(`id` as string)>(select cast(coalesce(max(`id`),'') as string) from `ods_ds_hudi`.`base_region` )
|""".stripMargin)
--4.4查看分区
spark.sql("show partitions `ods_ds_hudi`.`base_region`").show()
抽取shtd_store库中order_info的增量数据进入Hudi的ods_ds_hudi库中表order_info,根据ods_ds_hudi.order_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.order_info命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;
--5.1创建ODS层表
spark.sql(
"""
|create table if not exists `ods_ds_hudi`.`order_info` (
| `id` int,
| `consignee` string,
| `consignee_tel` string,
| `total_amount` decimal,
| `order_status` string,
| `user_id` int,
| `delivery_address` string,
| `order_comment` string,
| `out_trade_no` string,
| `trade_body` string,
| `create_time` timestamp,
| `operate_time` timestamp,
| `expire_time` timestamp,
| `tracking_no` string,
| `parent_order_id` int,
| `img_url` string,
| `province_id` int,
| `benefit_reduce_amount` decimal,
| `original_total_amount` decimal,
| `feight_fee` decimal,
| `payment_way` string,
| `etl_date` string
| )using hudi
|options (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'operate_time'
|)
|partitioned by (`etl_date`);
|""".stripMargin)
--5.2创建MySQL表映射
spark.sql(
"""
| create temporary view order_info_tmp
| using org.apache.spark.sql.jdbc
|options(
| url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
| dbtable 'order_info',
| user 'root1',
| password '123456'
|)
|""".stripMargin)
--5.3增量导入数据
spark.sql("alter table `ods_ds_hudi`.`order_info` drop partition(etl_date = '20230901') ")
spark.sql(
"""
|insert into `ods_ds_hudi`.`order_info` partition(etl_date = '20230901')
|select
| `id`,
| `consignee` ,
| `consignee_tel`,
| `total_amount`,
| `order_status`,
| `user_id`,
| `delivery_address`,
| `order_comment`,
| `out_trade_no`,
| `trade_body`,
| `create_time`,
| `operate_time`,
| `expire_time`,
| `tracking_no`,
| `parent_order_id`,
| `img_url`,
| `province_id`,
| `benefit_reduce_amount`,
| `original_total_amount`,
| `feight_fee`,
| `payment_way`
| from order_info_tmp
| where
| cast(coalesce(`operate_time`,`create_time`) as string) >(select cast(coalesce(case when max(`operate_time`) > max(`create_time`) then max(`operate_time`) else max(`create_time`) end,'') as string) from `ods_ds_hudi`.`order_info` )
|""".stripMargin)
--5.4查看分区
spark.sql("show partitions `ods_ds_hudi`.`order_info`").show()
抽取shtd_store库中order_detail的增量数据进入Hudi的ods_ds_hudi库中表order_detail,根据ods_ds_hudi.order_detail表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,create_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.order_detail命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下。
解题思路同任务要求1
--6.1创建ODS层表
spark.sql(
"""
|create table if not exists `ods_ds_hudi`.`order_detail` (
| `id` int,
| `order_id` int,
| `sku_id` int,
| `sku_name` string,
| `img_url` string,
| `order_price` decimal,
| `sku_num` int,
| `create_time` timestamp,
| `etl_date` string
| )using hudi
|options (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'create_time'
|)
|partitioned by (`etl_date`);
|""".stripMargin)
--6.2创建MySQL表映射
spark.sql(
"""
| create temporary view order_detail_tmp
| using org.apache.spark.sql.jdbc
|options(
| url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
| dbtable 'order_detail',
| user 'root1',
| password '123456'
|)
|""".stripMargin)
--6.3增量导入数据
spark.sql("alter table `ods_ds_hudi`.`order_detail` drop partition(etl_date = '20230901') ")
spark.sql(
"""
|insert into `ods_ds_hudi`.`order_detail` partition(etl_date = '20230901')
|select
| `id`,
| `order_id`,
| `sku_id`,
| `sku_name`,
| `img_url`,
| `order_price`,
| `sku_num`,
| `create_time`
| from order_detail_tmp
| where
| cast(`create_time` as string) >(select cast(coalesce(max(`create_time`),'') as string) from `ods_ds_hudi`.`order_detail` )
|""".stripMargin)
--6.4查看分区
spark.sql("show partitions `ods_ds_hudi`.`order_detail`").show()
版权归原作者 weixin_44117248 所有, 如有侵权,请联系我们删除。