0


大数据国赛第2套任务B-子任务一数据抽取

抽取shtd_store库中user_info的增量数据进入Hudi的ods_ds_hudi库中表user_info。根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这

两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,若operate_time为空,则用create_time填充,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.user_info命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

创建ods_ds_hudi.user_info表
建表之前需要判断一下表是否存在,如果不存在才创建。涉及到两个概念,一个是ODS,一个是hudi。

在数据仓库(Data Warehouse)建模中, ODS(Operational Data Store,操作性数据存储)层 作为数据仓库架构的一部分,通常用于整合来自各种操作性系统的数据。这一层的主要目的是将这些数据整合到一个一致的、标准化的格式中,并进行清洗、转换以确保数据的质量。 一般来说ODS层只需要保存从数据库中抽取来的原始数据,不做任何的更改。 DWD (Data Warehouse Detailed Data) 层用于存储原始、详细的事务性数据,这些数据通常来自于企业的各个操作系统和数据源。这一层包含了数据仓库的底层原子数据,记录了每个操作的细节。 DWS层是DWD层的上一层,用于存储已经汇总和聚合过的数据。这一层的数据是经过处理、加工和聚合的,通常用于支持高层次的分析和决策。
Hudi(Hadoop Upserts Deletes and Incrementals)是一种开源的数据管理框架,旨在简化和优化Apache Hadoop和Apache Spark等大数据处理引擎上的数据湖(Data Lake)操作。Hudi 提供了一种方式,让用户更方便地实现增量数据存储、更新、删除以及实时查询等操作。
任务中要求分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。也就是说要根据etl_date时间不同,将抽取的数据放到对应etl时间的分区里,那么建表的时候就需要增加etl_date字段,类型为String。 任务要求中提到将id作为primaryKey,operate_time作为preCombineField。建表的时候需要配置表使用hudi,并且将id配置为主键,操作时间作为合并的字段。也就是说hudi会根据主键和操作时间来判断数据是插入还是更新。 建表语句如下:

spark.sql(
  """
    |create table if not exists `ods_ds_hudi`.`user_info` (
    |   `id` int,
    |  `login_name` string,
    |  `nick_name`  string,
    |  `passwd`  string,
    |  `name`  string,
    |  `phone_num`  string,
    |  `email`  string,
    |  `head_img`  string,
    |  `user_level`  string,
    |  `birthday` date,
    |  `gender`  string,
    |  `create_time` timestamp,
    |  `operate_time` timestamp,
    |  `etl_date` string
    |  )using hudi
    |options (
    |  type = 'cow',
    |  primaryKey = 'id',
    |  preCombineField = 'operate_time'
    |)
    |partitioned by (`etl_date`);
    |""".stripMargin)
语句中的如下部分是关于hudi的相关配置

using hudi
  options (
    type = 'cow',
    primaryKey = 'id',
    preCombineField = 'operate_time'
  )
using hudi: 这一部分指定了使用 Hudi 进行数据操作。
options: 这是 Hudi SQL 的 options 子句,用于指定 Hudi 表的一些配置选项。
type = 'cow': 这个选项指定了 Hudi 表的写入类型,这里是 'cow',表示 Copy on Write。Copy on Write 是 Hudi 的一种存储引擎,它在写入时会复制数据,保留历史版本。与之相对的是 'mor'(Merge on Read),它在写入时只更新最新版本,而历史版本会在查询时进行合并。
primaryKey = 'id': 这个选项指定了主键,即在 Hudi 表中用于唯一标识每条记录的字段。在这里,主键是 'id'。
preCombineField = 'operate_time': 这个选项指定了用于决定版本顺序的字段。Hudi 会使用这个字段的值来判断历史版本的先后顺序。在这里,使用了 'operate_time' 字段。
综合起来,这条 SQL 语句表示要使用 Hudi 操作数据,表的写入类型为 Copy on Write,主键是 'id',并且版本的顺序是通过 'operate_time' 字段来确定的。这些配置可以根据具体的需求进行调整。

如何在spark中查询MySQL中的数据
需要将MySQL中表映射到sparksql中。

spark.sql(
  """
    | create temporary view user_info_tmp
    |    using org.apache.spark.sql.jdbc
    |options(
    |    url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
    |    dbtable 'user_info',
    |    user 'root1',
    |    password '123456'
    |)
    |""".stripMargin)
create temporary view user_info_tmp: 这部分创建了一个临时视图(temporary view)叫做 "user_info_tmp"。这个视图用于在 Spark 中表示和查询 "user_info" 表的数据。

using org.apache.spark.sql.jdbc: 这表明使用了 JDBC 数据源来读取外部数据。具体地,这里使用的是 Spark SQL 的 JDBC 数据源。

options(...): 这里指定了连接 JDBC 数据源时的一些配置选项。

url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8': 指定了 MySQL 数据库的连接信息,包括数据库地址、端口以及其他连接参数。
dbtable 'user_info': 指定了要读取的数据库表,这里是 "user_info"。
user 'root1': 指定了连接数据库所使用的用户名。
password '123456': 指定了连接数据库所使用的密码。
stripMargin: 这是 Scala 语言中的一个特性,用于在多行字符串中去掉每行前面的空格。

获取比赛前一天新增的数据
从MySQL中查询出比赛前一天新增的数据,也就是创建时间是比赛前一天的数据。

select
  `id`,
  `login_name`,
  `nick_name`,
  `passwd`,
  `name`,
  `phone_num`,
  `email`,
  `head_img`,
  `user_level`,
  `birthday`,
  `gender`,
  `create_time`,
  `operate_time`
from user_info_tmp where create_time >= '2023-09-01 00:00:00';
获取比赛前一天更新的数据
从MySQL中查询出比赛前一天更新的数据,也就是更新时间是比赛前一天的数据。

select
  `id`,
  `login_name`,
  `nick_name`,
  `passwd`,
  `name`,
  `phone_num`,
  `email`,
  `head_img`,
  `user_level`,
  `birthday`,
  `gender`,
  `create_time`,
  `operate_time`
from user_info_tmp where operate_time >= '2023-09-01 00:00:00';
整合新增和更新的查询语句
将更新和新增的查询语句合并成一条sql语句,实现若operate_time为空,则用create_time填充的业务需求,sql如下:

select
  `id`,
  `login_name`,
  `nick_name`,
  `passwd`,
  `name`,
  `phone_num`,
  `email`,
  `head_img`,
  `user_level`,
  `birthday`,
  `gender`,
  `create_time`,
  `operate_time`
from user_info_tmp where coalesce(`operate_time`, `create_time`) >= '2023-09-01 00:00:00';
coalesce(..., ''):COALESCE 函数用于返回参数列表中的第一个非空表达式。在这里,如果 ‘operate_time’字段值为空,那么返回 'create_time'字段的值。

如何确定更新多少数据
思考两个问题: 第一次导入的时候ods层中没有历史数据,如何处理? 某段时间没有定时导入数据到ods层中,新增和更新的数据时间使用数据抽取的时间是否合理? 以上两种问题就会导致数据导入不完整,故而任务要求中说“根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入”。意思就是说要根据ODS层中历史数据的最近的操作时间(operate_time或create_time谁最新)来确定从MySQL中抽取哪段时间的数据。所以需要从ods层中查询出最大的操作时间。

select
  case
    when max(`operate_time`) > max(`create_time`) 
    then max(`operate_time`)
    else max(`create_time`)
  end
from `ods_ds_hudi`.`user_info`

case when max(operate_time) > max(create_time) then max(operate_time) else max(create_time) end: 这是一个 CASE 语句,用于根据条件判断选择不同的值。具体地,它比较 operate_time 列的最大值和 create_time 列的最大值,如果 operate_time 大于 create_time,则选择 operate_time 的最大值,否则选择 create_time 的最大值。

如果ods层的表中没有数据,那么这时候查到的值为空,这时候需要给个默认值,sql如下:

select
    coalesce(
      case
        when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
        else max(`create_time`)
      end, ''
    )
from `ods_ds_hudi`.`user_info`

coalesce(..., ''):COALESCE 函数用于返回参数列表中的第一个非空表达式。在这里,如果 CASE 语句的结果为空,那么返回空字符串 '' 在sql中日期的比较可以转换成字符串来比较,将查询到的最大时间转换为字符串,sql如下:

select
  cast(
    coalesce(
      case
        when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
        else max(`create_time`)
      end, ''
    ) as string
  )
from `ods_ds_hudi`.`user_info`
cast(... as string):CAST 函数用于将结果转换为指定的数据类型。在这里,将前面的结果强制转换为字符串类型。
综合起来,整个查询的目的是从 ods_ds_hudi 数据库的 user_info 表中选择一个字符串值,该值是根据比较 operate_time 和 create_time 列的最大值而确定的。如果 operate_time 的最大值大于 create_time 的最大值,则选择 operate_time 的最大值,否则选择 create_time 的最大值。如果这个值为空,那么返回一个空字符串。

从MySQL中查询操作时间大于等于ODS层中最大时间的数据
将查询的ods层中最大的时间作为查询更新数据的参数,将最大时间查询作为子查询放到where条件中,sql如下:

select
  `id`,
  `login_name`,
  `nick_name`,
  `passwd`,
  `name`,
  `phone_num`,
  `email`,
  `head_img`,
  `user_level`,
  `birthday`,
  `gender`,
  `create_time`,
  `operate_time`
from user_info_tmp
where
  cast(coalesce(`operate_time`, `create_time`) as string) >=
  (
    select
      cast(
        coalesce(
          case
            when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
            else max(`create_time`)
          end, ''
        ) as string
      )
    from `ods_ds_hudi`.`user_info`
  );

where cast(coalesce(operate_time, create_time) as string) >= ...: 这是一个 WHERE 子句,用于过滤数据。它选择了满足条件的行,条件是 operate_time 列或 create_time 列的非空最大值(作为字符串形式)要大于等于子查询的结果。
select cast(coalesce(case when max(operate_time) > max(create_time) then max(operate_time) else max(create_time) end, '') as string) from ods_ds_hudi.user_info``: 这是一个子查询,它计算了 ods_ds_hudi 数据库中 user_info 表中的 operate_time 和 create_time 列的非空最大值。这个值将被用于与外部查询中的条件比较。

将查询到的更新数据插入到ods_ds_hudi.user_info表中
结合上面的操作,只需要将查询结果直接插入到ods层的表中即可,sql如下:

insert into `ods_ds_hudi`.`user_info` partition(etl_date = '20230901')
select
  `id`,
  `login_name`,
  `nick_name`,
  `passwd`,
  `name`,
  `phone_num`,
  `email`,
  `head_img`,
  `user_level`,
  `birthday`,
  `gender`,
  `create_time`,
  `operate_time`
from user_info_tmp
where
  cast(coalesce(`operate_time`, `create_time`) as string) >=
  (
    select
      cast(
        coalesce(
          case
            when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
            else max(`create_time`)
          end, ''
        ) as string
      )
    from `ods_ds_hudi`.`user_info`
  );

insert into ods_ds_hudi.user_info partition(etl_date = '20230901'): 这部分指定了数据插入的目标表,即 ods_ds_hudi.user_info,并指定了插入的分区为 etl_date = '20230901'。这表示数据将插入到指定分区中。

查看分区

spark.sql("show partitions `ods_ds_hudi`.`user_info` ").show()

抽取shtd_store库中sku_info的增量数据进入Hudi的ods_ds_hudi库中表sku_info。根据ods_ds_hudi.sku_info表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.sku_info命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

--2.1创建ODS层表
spark.sql(
  """
    |create table if not exists `ods_ds_hudi`.`sku_info` (
    |  `id` int,
    |  `spu_id` int,
    |  `price` decimal,
    |  `sku_name` string,
    |  `sku_desc` string,
    |  `weight` decimal,
    |  `tm_id` int,
    |  `category3_id` int,
    |  `sku_default_img` string,
    |  `create_time` timestamp,
    |  `etl_date` string
    |  )using hudi
    |options (
    |  type = 'cow',
    |  primaryKey = 'id',
    |  preCombineField = 'create_time'
    |)
    |partitioned by (`etl_date`);
    |""".stripMargin)
--2.2创建MySQL表映射
spark.sql(
  """
    | create temporary view sku_info_tmp
    |    using org.apache.spark.sql.jdbc
    |options(
    |    url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
    |    dbtable 'sku_info',
    |    user 'root1',
    |    password '123456'
    |)
    |""".stripMargin)
--2.3增量导入数据
spark.sql("alter table `ods_ds_hudi`.`sku_info` drop partition(etl_date = '20230901') ")
spark.sql(
  """
    |insert into `ods_ds_hudi`.`sku_info` partition(etl_date = '20230901')
    |select
    |  `id`,
    |  `spu_id`,
    |  `price`,
    |  `sku_name`,
    |  `sku_desc`,
    |  `weight`,
    |  `tm_id`,
    |  `category3_id`,
    |  `sku_default_img`,
    |  `create_time`
    |  from sku_info_tmp
    |  where
    |  cast(`create_time` as string)>(select  cast(coalesce(max(`create_time`),'') as string) from `ods_ds_hudi`.`sku_info` )
    |""".stripMargin)
--2.4查看分区
spark.sql("show partitions `ods_ds_hudi`.`sku_info`").show()

抽取shtd_store库中base_province的增量数据进入Hudi的ods_ds_hudi库中表base_province。根据ods_ds_hudi.base_province表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,create_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.base_province命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

--3.1创建ODS层表
spark.sql(
  """
    |create table if not exists `ods_ds_hudi`.`base_province` (
    |  `id` int,
    |  `name` string,
    |  `region_id` string,
    |  `area_code` string,
    |  `iso_code` string,
    |  `create_time` timestamp,
    |  `etl_date` string
    |  )using hudi
    |options (
    |  type = 'cow',
    |  primaryKey = 'id',
    |  preCombineField = 'create_time'
    |)
    |partitioned by (`etl_date`);
    |""".stripMargin)
--3.2创建MySQL表映射
spark.sql(
  """
    | create temporary view base_province_tmp
    |    using org.apache.spark.sql.jdbc
    |options(
    |    url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
    |    dbtable 'base_province',
    |    user 'root1',
    |    password '123456'
    |)
    |""".stripMargin)
--3.3增量导入数据
spark.sql("alter table `ods_ds_hudi`.`base_province` drop partition(etl_date = '20230901') ")
spark.sql(
  """
    |insert into `ods_ds_hudi`.`base_province` partition(etl_date = '20230901')
    |select
    |  `id`,
    |  `name`,
    |  `region_id`,
    |  `area_code`,
    |  `iso_code`,
    |  current_timestamp as `create_time`
    |  from base_province_tmp
    |  where
    |  cast(`id` as string)>(select  cast(coalesce(max(`id`),'') as string) from `ods_ds_hudi`.`base_province` )
    |""".stripMargin)
--3.4查看分区
spark.sql("show partitions `ods_ds_hudi`.`base_province`").show()

抽取shtd_store库中base_region的增量数据进入Hudi的ods_ds_hudi库中表base_region。根据ods_ds_hudi.base_region表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,create_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.base_region命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

--4.1创建ODS层表
spark.sql(
  """
    |create table if not exists `ods_ds_hudi`.`base_region` (
    | `id` string,
    |  `region_name` string,
    |  `create_time` timestamp,
    |  `etl_date` string
    |  )using hudi
    |options (
    |  type = 'cow',
    |  primaryKey = 'id',
    |  preCombineField = 'create_time'
    |)
    |partitioned by (`etl_date`);
    |""".stripMargin)
--4.2创建MySQL表映射
spark.sql(
  """
    | create temporary view base_region_tmp
    |    using org.apache.spark.sql.jdbc
    |options(
    |    url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
    |    dbtable 'base_region',
    |    user 'root1',
    |    password '123456'
    |)
    |""".stripMargin)
--4.3增量导入数据
spark.sql("alter table `ods_ds_hudi`.`base_region` drop partition(etl_date = '20230901') ")
spark.sql(
  """
    |insert into `ods_ds_hudi`.`base_region` partition(etl_date = '20230901')
    |select
    | `id`,
    |  `region_name`,
    |  current_timestamp as `create_time`
    |  from base_region_tmp
    |  where
    |  cast(`id` as string)>(select  cast(coalesce(max(`id`),'') as string) from `ods_ds_hudi`.`base_region` )
    |""".stripMargin)
--4.4查看分区
spark.sql("show partitions `ods_ds_hudi`.`base_region`").show()

抽取shtd_store库中order_info的增量数据进入Hudi的ods_ds_hudi库中表order_info,根据ods_ds_hudi.order_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.order_info命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

--5.1创建ODS层表
spark.sql(
  """
    |create table if not exists `ods_ds_hudi`.`order_info` (
    |  `id` int,
    |  `consignee` string,
    |  `consignee_tel` string,
    |  `total_amount` decimal,
    |  `order_status` string,
    |  `user_id` int,
    |  `delivery_address` string,
    |  `order_comment` string,
    |  `out_trade_no` string,
    |  `trade_body` string,
    |  `create_time` timestamp,
    |  `operate_time` timestamp,
    |  `expire_time` timestamp,
    |  `tracking_no` string,
    |  `parent_order_id` int,
    |  `img_url` string,
    |  `province_id` int,
    |  `benefit_reduce_amount` decimal,
    |  `original_total_amount` decimal,
    |  `feight_fee` decimal,
    |  `payment_way` string,
    |  `etl_date` string
    |  )using hudi
    |options (
    |  type = 'cow',
    |  primaryKey = 'id',
    |  preCombineField = 'operate_time'
    |)
    |partitioned by (`etl_date`);
    |""".stripMargin)
--5.2创建MySQL表映射
spark.sql(
  """
    | create temporary view order_info_tmp
    |    using org.apache.spark.sql.jdbc
    |options(
    |    url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
    |    dbtable 'order_info',
    |    user 'root1',
    |    password '123456'
    |)
    |""".stripMargin)
--5.3增量导入数据
spark.sql("alter table `ods_ds_hudi`.`order_info` drop partition(etl_date = '20230901') ")
spark.sql(
  """
    |insert into `ods_ds_hudi`.`order_info` partition(etl_date = '20230901')
    |select
    |  `id`,
    |  `consignee` ,
    |  `consignee_tel`,
    |  `total_amount`,
    |  `order_status`,
    |  `user_id`,
    |  `delivery_address`,
    |  `order_comment`,
    |  `out_trade_no`,
    |  `trade_body`,
    |  `create_time`,
    |  `operate_time`,
    |  `expire_time`,
    |  `tracking_no`,
    |  `parent_order_id`,
    |  `img_url`,
    |  `province_id`,
    |  `benefit_reduce_amount`,
    |  `original_total_amount`,
    |  `feight_fee`,
    |  `payment_way`
    |  from order_info_tmp
    | where
    |  cast(coalesce(`operate_time`,`create_time`) as string) >(select cast(coalesce(case when max(`operate_time`) > max(`create_time`) then max(`operate_time`) else max(`create_time`) end,'') as string)  from `ods_ds_hudi`.`order_info` )
    |""".stripMargin)
--5.4查看分区
spark.sql("show partitions `ods_ds_hudi`.`order_info`").show()

抽取shtd_store库中order_detail的增量数据进入Hudi的ods_ds_hudi库中表order_detail,根据ods_ds_hudi.order_detail表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,create_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.order_detail命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下。

解题思路同任务要求1

--6.1创建ODS层表
spark.sql(
  """
    |create table if not exists `ods_ds_hudi`.`order_detail` (
    |  `id` int,
    |  `order_id` int,
    |  `sku_id` int,
    |  `sku_name` string,
    |  `img_url` string,
    |  `order_price` decimal,
    |  `sku_num` int,
    |  `create_time` timestamp,
    |  `etl_date` string
    |  )using hudi
    |options (
    |  type = 'cow',
    |  primaryKey = 'id',
    |  preCombineField = 'create_time'
    |)
    |partitioned by (`etl_date`);
    |""".stripMargin)
--6.2创建MySQL表映射
spark.sql(
  """
    | create temporary view order_detail_tmp
    |    using org.apache.spark.sql.jdbc
    |options(
    |    url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
    |    dbtable 'order_detail',
    |    user 'root1',
    |    password '123456'
    |)
    |""".stripMargin)
--6.3增量导入数据
spark.sql("alter table `ods_ds_hudi`.`order_detail` drop partition(etl_date = '20230901') ")
spark.sql(
  """
    |insert into `ods_ds_hudi`.`order_detail` partition(etl_date = '20230901')
    |select
    |  `id`,
    |  `order_id`,
    |  `sku_id`,
    |  `sku_name`,
    |  `img_url`,
    |  `order_price`,
    |  `sku_num`,
    |  `create_time`
    |  from order_detail_tmp
    | where
    |  cast(`create_time` as string) >(select   cast(coalesce(max(`create_time`),'') as string) from `ods_ds_hudi`.`order_detail` )
    |""".stripMargin)
--6.4查看分区
spark.sql("show partitions `ods_ds_hudi`.`order_detail`").show()
标签: 大数据

本文转载自: https://blog.csdn.net/weixin_44117248/article/details/138844979
版权归原作者 weixin_44117248 所有, 如有侵权,请联系我们删除。

“大数据国赛第2套任务B-子任务一数据抽取”的评论:

还没有评论