

National Vocational College Skills Competition - Big Data: Offline Data Processing Module - Metric Computation

Source: the metric computation module of Task B, Question Set 1, of the 2023 National Vocational College Skills Competition.

Subtask 3: Metric Computation

Write Scala code that uses Spark to compute the metrics below.

Note: for the metric computations, ignore the value of the order_status field in the order info table and treat every order as valid. When computing an order amount or a total order amount, use only the final_total_amount field. Also note that every dwd dimension table must be read from its latest partition.
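
For example, the latest partition of a dimension table can be pinned with a max(etl_date) subquery. Below is a minimal, self-contained sketch of this rule, assuming (as the queries in this section do) that etl_date is the partition column of the dwd dimension tables:

import org.apache.spark.sql.SparkSession

object LatestPartitionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("LatestPartitionDemo")
      .enableHiveSupport()
      .getOrCreate()

    // Keep only the newest etl_date partition of a dwd dimension table.
    spark.sql(
      """
        |select *
        |from dwd.dim_province
        |where etl_date = (select max(etl_date) from dwd.dim_province)
        |""".stripMargin).show()

    spark.close()
  }
}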

Question 1

Using the dwd layer tables, compute the order count and total order amount for each province, each region, and each month, and store the result in the provinceeverymonth table of the MySQL database shtd_result (schema below). Then, in the MySQL command line on Linux, sort by total order count, total order amount, and province primary key, all in descending order, and query the top 5 rows. Copy the SQL into the corresponding task number in 【Release\任务B提交结果.docx】 on the client desktop, and paste a screenshot of the execution result under the same task number.

Field              Type    Meaning                       Remarks
provinceid         int     Province table primary key
provincename       text    Province name
regionid           int     Region table primary key
regionname         text    Region name
totalconsumption   double  Total order amount            Total order amount for the month
totalorder         int     Total order count             Total order count for the month
year               int     Year the order was placed
month              int     Month the order was placed

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Compute01 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")

    // TODO Create the Spark session
    val conf = new SparkConf().setMaster("local[*]").setAppName("Compute01")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    // Allow non-strict dynamic partitioning
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    // Turn off log output
    spark.sparkContext.setLogLevel("OFF")

    // TODO Run the core query: one row per province/region/month
    val result = spark.sql(
      """
        |select
        |    province.id provinceid,
        |    province.name provincename,
        |    region.id regionid,
        |    region.region_name regionname,
        |    sum(od.final_total_amount) totalconsumption,
        |    count(od.id) totalorder,
        |    year(od.create_time) year,
        |    month(od.create_time) month
        |from dwd.act_order_info od
        |left join (
        |    select
        |        id,
        |        name,
        |        region_id
        |    from dwd.dim_province
        |    where etl_date = (
        |        select max(etl_date)
        |        from dwd.dim_province
        |    )
        |) province on od.province_id = province.id
        |left join (
        |    select
        |        id,
        |        region_name
        |    from dwd.dim_region
        |    where etl_date = (
        |       select max(etl_date)
        |       from dwd.dim_region
        |    )
        |) region on province.region_id = region.id
        |group by
        |    province.id, province.name,
        |    region.id, region.region_name,
        |    year(od.create_time), month(od.create_time)
        |""".stripMargin)

    // Inspect the result
    result.show()

    // TODO Save the result to the target table
    result.write
      .format("jdbc")         // write to MySQL via JDBC
      .mode(SaveMode.Append)  // append mode
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306/shtd_result") // adjust host/port to your environment
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "provinceeverymonth") // shtd_result.provinceeverymonth
      .save()

    // TODO Close the Spark session
    spark.close()
  }
}
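
As an optional sanity check before switching to the MySQL command line, the written table can be read back through the same JDBC options; the host, port, and credentials below are placeholders matching the write above and should be adjusted to the actual environment:

import org.apache.spark.sql.SparkSession

object CheckProvinceEveryMonth {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("CheckProvinceEveryMonth")
      .getOrCreate()

    // Read the result table back from MySQL and peek at a few rows.
    spark.read
      .format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306/shtd_result") // placeholder host/port
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "provinceeverymonth")
      .load()
      .show(5)

    spark.close()
  }
}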

Result query SQL

-- Total order count
select *
from shtd_result.provinceeverymonth
order by totalorder desc
limit 5;
-- Total order amount
select *
from shtd_result.provinceeverymonth
order by totalconsumption desc
limit 5;
-- Province primary key
select *
from shtd_result.provinceeverymonth
order by provinceid desc
limit 5;

Question 2

Using the dwd layer tables, compute each province's average order amount for April 2020 and how it compares with the average order amount across all provinces (高 = higher, 低 = lower, 相同 = same), and store the result in the provinceavgcmp table of the MySQL database shtd_result (schema below). Then, in the MySQL command line on Linux, sort by province primary key and the province's average order amount, both descending, and query the top 5 rows. Copy the SQL into the corresponding task number in 【Release\任务B提交结果.docx】 on the client desktop, and paste a screenshot of the execution result under the same task number.

Field                       Type    Meaning                                Remarks
provinceid                  int     Province table primary key
provincename                text    Province name
provinceavgconsumption      double  Average order amount of the province
allprovinceavgconsumption   double  Average order amount of all provinces
comparison                  text    Comparison result                      高 (higher), 低 (lower), or 相同 (same), comparing the province average with the all-province average

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Compute02 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")

    val conf = new SparkConf().setMaster("local[*]").setAppName("Compute02")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    spark.sparkContext.setLogLevel("OFF")

    val result = spark.sql(
      """
        |select
        |    province.provinceid,
        |    province.provincename,
        |    od.provinceavgconsumption,
        |    avgorder.allprovinceavgconsumption,
        |    case
        |        when od.provinceavgconsumption > avgorder.allprovinceavgconsumption then '高'
        |        when od.provinceavgconsumption < avgorder.allprovinceavgconsumption then '低'
        |        else '相同'
        |    end comparison -- comparison result
        |from (
        |    select
        |        province_id,
        |        avg(final_total_amount) provinceavgconsumption -- average order amount of the province
        |    from dwd.act_order_info
        |    where date_format(create_time, 'yyyy-MM') = '2020-04'
        |    group by province_id
        |) od
        |left join (
        |    select
        |        id provinceid,
        |        name provincename
        |    from dwd.dim_province
        |    where etl_date = (
        |        select max(etl_date) from dwd.dim_province
        |    )
        |) province on od.province_id = province.provinceid
        |cross join (
        |    select
        |        avg(final_total_amount) allprovinceavgconsumption -- average order amount across all provinces
        |    from dwd.act_order_info
        |    where date_format(create_time, 'yyyy-MM') = '2020-04'
        |) avgorder
        |""".stripMargin)

    result
      .write
      .format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306/shtd_result") // adjust host/port to your environment
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "provinceavgcmp")
      .save()

    spark.close()
  }
}
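
An alternative to the cross join above is to compute the global average with an empty over () window, so both averages come out of one scan. A sketch under the same table and field assumptions as the query above:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Compute02Windowed {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Compute02Windowed")
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    // One row per province: the per-province and the global average side by side.
    spark.sql(
      """
        |select distinct
        |    province_id,
        |    avg(final_total_amount) over (partition by province_id) provinceavgconsumption,
        |    avg(final_total_amount) over ()                         allprovinceavgconsumption
        |from dwd.act_order_info
        |where date_format(create_time, 'yyyy-MM') = '2020-04'
        |""".stripMargin).show()

    spark.close()
  }
}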

Result query SQL

-- Province primary key
select *
from shtd_result.provinceavgcmp
order by provinceid desc
limit 5;
-- Average order amount of the province
select *
from shtd_result.provinceavgcmp
order by provinceavgconsumption desc
limit 5;

Question 3

Using the dwd layer tables, find the users who placed orders on two consecutive days with the order amount growing from the first day to the second, and store the result in the usercontinueorder table of the MySQL database shtd_result (schema below). Then, in the MySQL command line on Linux, sort by total order count, total order amount, and customer primary key, all descending, and query the top 5 rows. Copy the SQL into the corresponding task number in 【Release\任务B提交结果.docx】 on the client desktop, and paste a screenshot of the execution result under the same task number.

Field             Type    Meaning                Remarks
userid            int     Customer primary key
username          text    Customer name
day               text    The two order dates    Format yyyyMMdd_yyyyMMdd, e.g. 20220101_20220102
totalconsumption  double  Total order amount     Total order amount over the two consecutive days
totalorder        int     Total order count      Total order count over the two consecutive days

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Compute03 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")

    val conf = new SparkConf().setMaster("local[*]").setAppName("Compute03")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    spark.sparkContext.setLogLevel("OFF")

    val result = spark.sql(
      """
        |select
        |    userid,
        |    username,
        |    concat(date_format(buy_date_first, 'yyyyMMdd'), '_',
        |           date_format(buy_date_second, 'yyyyMMdd')) day,
        |    day_amount + next_amount totalconsumption,
        |    day_orders + next_orders totalorder
        |from (
        |    select
        |        userid,
        |        username,
        |        buy_date_first,
        |        day_amount,
        |        day_orders,
        |        -- the user's next order day and that day's totals
        |        lead(buy_date_first) over (partition by userid order by buy_date_first) buy_date_second,
        |        lead(day_amount)     over (partition by userid order by buy_date_first) next_amount,
        |        lead(day_orders)     over (partition by userid order by buy_date_first) next_orders
        |    from (
        |        -- aggregate each user's orders per day
        |        select
        |            user_id userid,
        |            max(consignee) username,
        |            to_date(create_time) buy_date_first,
        |            sum(final_total_amount) day_amount,
        |            count(id) day_orders
        |        from dwd.act_order_info
        |        group by user_id, to_date(create_time)
        |    ) daily
        |) paired
        |where datediff(buy_date_second, buy_date_first) = 1 -- orders on two consecutive days
        |  and next_amount > day_amount                      -- and the amount keeps growing
        |""".stripMargin)

    result
      .write
      .format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306/shtd_result") // adjust host/port to your environment
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "usercontinueorder")
      .save()

    spark.close()
  }
}
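
To see the lead/datediff pattern from the query above in isolation, here is a self-contained sketch on a few hypothetical rows of per-user daily totals (the column names mirror the query):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object LeadDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("LeadDemo").getOrCreate()
    import spark.implicits._

    // Hypothetical per-user daily totals: (userid, day, amount, orders).
    val daily = Seq(
      (1, "2022-01-01", 100.0, 2),
      (1, "2022-01-02", 150.0, 1), // next day, amount grew -> qualifies
      (2, "2022-01-01", 80.0, 1),
      (2, "2022-01-03", 90.0, 1)   // two-day gap -> filtered out
    ).toDF("userid", "day", "amount", "orders")

    val w = Window.partitionBy("userid").orderBy("day")
    daily
      .withColumn("next_day", lead("day", 1).over(w))
      .withColumn("next_amount", lead("amount", 1).over(w))
      .withColumn("next_orders", lead("orders", 1).over(w))
      .where(datediff($"next_day", $"day") === 1 && $"next_amount" > $"amount")
      .select(
        $"userid",
        concat(date_format($"day", "yyyyMMdd"), lit("_"),
               date_format($"next_day", "yyyyMMdd")).as("day"),
        ($"amount" + $"next_amount").as("totalconsumption"),
        ($"orders" + $"next_orders").as("totalorder"))
      .show()

    spark.close()
  }
}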

Result query SQL

-- Total order count
select *
from shtd_result.usercontinueorder
order by totalorder desc
limit 5;
-- Total order amount
select *
from shtd_result.usercontinueorder
order by totalconsumption desc
limit 5;
-- Customer primary key
select *
from shtd_result.usercontinueorder
order by userid desc
limit 5;

The hard parts of the metric computation module are the multi-table queries and the proper use of window functions, so a solid command of the advanced functions in Hive SQL is essential; without it, this part is very difficult to complete.

Tags: big data, Spark, Scala

Reprinted from: https://blog.csdn.net/weixin_71868447/article/details/135429861
Copyright belongs to the original author, 张一西158. In case of infringement, please contact us for removal.
