National Vocational College Skills Competition - Big Data: Offline Data Processing Module - Metrics Calculation

Source: the metrics calculation module of Task B, Set 1 of the 2023 National Vocational College Skills Competition.

Subtask 3: Metrics Calculation

Write Scala code and use Spark to compute the metrics below.

Note: in the metrics calculation, ignore the value of the order_status field in the order information table and treat every order as valid. When computing an order amount or a total order amount, use only the final_total_amount field. Also note that every dimension table in the dwd layer must be read from its latest partition.
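For reference, a minimal Scala sketch of reading only the latest etl_date partition of a dwd dimension table. It assumes the SparkSession named spark created in the solutions below, and that etl_date is a string partition column on dwd.dim_province; both are assumptions taken from the tasks, not guaranteed by them.

```scala
// Sketch: read only the latest partition of a dwd dimension table.
// Assumes etl_date is a string partition column (e.g. "20231201") on dwd.dim_province.
val latestEtlDate = spark.sql("select max(etl_date) from dwd.dim_province").first().getString(0)
val dimProvince = spark.table("dwd.dim_province").where(s"etl_date = '$latestEtlDate'")
dimProvince.show(5)
```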

Question 1

Using the dwd-layer tables, count the number of orders and the total order amount for each province, region, and month, and write the result into the provinceeverymonth table of the MySQL database shtd_result (table structure below). Then, in the MySQL command line on Linux, query the top 5 rows ordered by total order count, total order amount, and province primary key, all in descending order. Paste the SQL statement under the corresponding task number in 【Release\任务B提交结果.docx】 on the client desktop, and paste a screenshot of its result under the same task number.

| Field | Type | Meaning | Notes |
| --- | --- | --- | --- |
| provinceid | int | province table primary key | |
| provincename | text | province name | |
| regionid | int | region table primary key | |
| regionname | text | region name | |
| totalconsumption | double | total order amount | total order amount for the month |
| totalorder | int | total order count | total order count for the month |
| year | int | year the order was placed | |
| month | int | month the order was placed | |

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Compute01 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    // TODO Create the Spark session
    val conf = new SparkConf().setMaster("local[*]").setAppName("Compute01")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    // Enable non-strict dynamic partitioning
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    // Turn off log output
    spark.sparkContext.setLogLevel("OFF")
    // TODO Run the core query
    val result = spark.sql(
      """
        |select
        |  province.id                provinceid,
        |  province.name              provincename,
        |  region.id                  regionid,
        |  region.region_name         regionname,
        |  sum(od.final_total_amount) totalconsumption,  -- total order amount for the month
        |  count(od.id)               totalorder,        -- total order count for the month
        |  year(od.create_time)       year,
        |  month(od.create_time)      month
        |from (
        |  select id, province_id, final_total_amount, create_time
        |  from dwd.act_order_info
        |) od
        |left join (
        |  select id, name, region_id
        |  from dwd.dim_province
        |  where etl_date = (select max(etl_date) from dwd.dim_province)  -- latest partition of the dimension table
        |) province on od.province_id = province.id
        |left join (
        |  select id, region_name
        |  from dwd.dim_region
        |  where etl_date = (select max(etl_date) from dwd.dim_region)    -- latest partition of the dimension table
        |) region on province.region_id = region.id
        |group by province.id, province.name, region.id, region.region_name,
        |         year(od.create_time), month(od.create_time)
        |""".stripMargin)
    // Inspect the result
    result.show()
    // TODO Save the result to the target table: shtd_result.provinceeverymonth
    result.write
      .format("jdbc")          // write to MySQL through JDBC
      .mode(SaveMode.Append)   // append to the existing table
      .option("driver", "com.mysql.jdbc.Driver")
      // host and port are placeholders; adjust them to the competition environment
      .option("url", "jdbc:mysql://localhost:3306/shtd_result?useSSL=false&characterEncoding=utf8")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "provinceeverymonth")
      .save()
    // TODO Stop the Spark session
    spark.close()
  }
}
```
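The same per-province / per-month aggregation can also be expressed with the DataFrame API instead of SQL. Below is a minimal sketch under the same assumptions as the solution above (same dwd.act_order_info table and spark session); the joins to the province and region dimension tables are omitted for brevity.

```scala
import org.apache.spark.sql.functions._

// Aggregate order count and order amount per province and per month with groupBy/agg.
val monthly = spark.table("dwd.act_order_info")
  .select(col("id"), col("province_id"), col("final_total_amount"), col("create_time"))
  .withColumn("year", year(col("create_time")))
  .withColumn("month", month(col("create_time")))
  .groupBy("province_id", "year", "month")
  .agg(
    sum("final_total_amount").as("totalconsumption"),
    count("id").as("totalorder")
  )
monthly.show(5)
```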

Result query SQL

```sql
-- Top 5 by total order count
select *
from shtd_result.provinceeverymonth
order by totalorder desc
limit 5;
-- Top 5 by total order amount
select *
from shtd_result.provinceeverymonth
order by totalconsumption desc
limit 5;
-- Top 5 by province primary key
select *
from shtd_result.provinceeverymonth
order by provinceid desc
limit 5;
```

Question 2

Using the dwd-layer tables, compute each province's average order amount for April 2020 and compare it with the average order amount across all provinces (the result is "高" / "低" / "相同", i.e. higher / lower / the same). Write the result into the provinceavgcmp table of the MySQL database shtd_result (table structure below). Then, in the MySQL command line on Linux, query the top 5 rows ordered by province primary key and by the province's average order amount, both in descending order. Paste the SQL statement under the corresponding task number in 【Release\任务B提交结果.docx】 on the client desktop, and paste a screenshot of its result under the same task number.

| Field | Type | Meaning | Notes |
| --- | --- | --- | --- |
| provinceid | int | province table primary key | |
| provincename | text | province name | |
| provinceavgconsumption | double | the province's average order amount | |
| allprovinceavgconsumption | double | average order amount across all provinces | |
| comparison | text | comparison result | result of comparing the province's average order amount with the all-province average; the stored value is one of 高 / 低 / 相同 |

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Compute02 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    val conf = new SparkConf().setMaster("local[*]").setAppName("Compute02")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    spark.sparkContext.setLogLevel("OFF")
    val result = spark.sql(
      """
        |select
        |  province.provinceid,
        |  province.provincename,
        |  od.provinceavgconsumption,
        |  allod.allprovinceavgconsumption,
        |  case
        |    when od.provinceavgconsumption > allod.allprovinceavgconsumption then '高'
        |    when od.provinceavgconsumption < allod.allprovinceavgconsumption then '低'
        |    else '相同'
        |  end comparison  -- comparison result
        |from (
        |  select id provinceid, name provincename
        |  from dwd.dim_province
        |  where etl_date = (select max(etl_date) from dwd.dim_province)  -- latest partition of the dimension table
        |) province
        |join (
        |  select
        |    province_id,
        |    avg(final_total_amount) provinceavgconsumption  -- the province's average order amount
        |  from dwd.act_order_info
        |  where date_format(create_time, 'yyyy-MM') = '2020-04'
        |  group by province_id
        |) od on od.province_id = province.provinceid
        |cross join (
        |  select
        |    avg(final_total_amount) allprovinceavgconsumption  -- average order amount across all provinces
        |  from dwd.act_order_info
        |  where date_format(create_time, 'yyyy-MM') = '2020-04'
        |) allod
        |""".stripMargin)
    result.write
      .format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "com.mysql.jdbc.Driver")
      // host and port are placeholders; adjust them to the competition environment
      .option("url", "jdbc:mysql://localhost:3306/shtd_result?useSSL=false&characterEncoding=utf8")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "provinceavgcmp")
      .save()
    spark.close()
  }
}
```
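Before switching to the MySQL command line, the written table can be read back over the same JDBC connection to spot-check it from Spark. A minimal sketch, with the same placeholder host and port as above:

```scala
import org.apache.spark.sql.functions.desc

// Read the written table back from MySQL and preview it in the order the task asks for.
val check = spark.read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://localhost:3306/shtd_result?useSSL=false&characterEncoding=utf8")
  .option("user", "root")
  .option("password", "000000")
  .option("dbtable", "provinceavgcmp")
  .load()
check.orderBy(desc("provinceid"), desc("provinceavgconsumption")).show(5)
```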

Result query SQL

```sql
-- Top 5 by province primary key
select *
from shtd_result.provinceavgcmp
order by provinceid desc
limit 5;
-- Top 5 by the province's average order amount
select *
from shtd_result.provinceavgcmp
order by provinceavgconsumption desc
limit 5;
```

Question 3

Using the dwd-layer tables, find the users who placed orders on two consecutive days with an increasing order amount, and write the result into the usercontinueorder table of the MySQL database shtd_result (table structure below). Then, in the MySQL command line on Linux, query the top 5 rows ordered by total order count, total order amount, and customer primary key, all in descending order. Paste the SQL statement under the corresponding task number in 【Release\任务B提交结果.docx】 on the client desktop, and paste a screenshot of its result under the same task number.

| Field | Type | Meaning | Notes |
| --- | --- | --- | --- |
| userid | int | customer primary key | |
| username | text | customer name | |
| day | text | the two order dates, formatted as yyyyMMdd_yyyyMMdd | e.g. 20220101_20220102 |
| totalconsumption | double | total order amount | total order amount over the two consecutive days |
| totalorder | int | total order count | total order count over the two consecutive days |

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Compute03 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    val conf = new SparkConf().setMaster("local[*]").setAppName("Compute03")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    spark.sparkContext.setLogLevel("OFF")
    val result = spark.sql(
      """
        |select
        |  userid,
        |  username,
        |  concat(date_format(day_first, 'yyyyMMdd'), '_', date_format(day_second, 'yyyyMMdd')) day,
        |  amount_first + amount_second totalconsumption,  -- total order amount over the two days
        |  order_first + order_second   totalorder         -- total order count over the two days
        |from (
        |  select
        |    user_id    userid,
        |    consignee  username,
        |    buy_date   day_first,
        |    day_amount amount_first,
        |    day_order  order_first,
        |    lead(buy_date)   over (partition by user_id order by buy_date) day_second,    -- next day the user ordered
        |    lead(day_amount) over (partition by user_id order by buy_date) amount_second, -- that day's order amount
        |    lead(day_order)  over (partition by user_id order by buy_date) order_second   -- that day's order count
        |  from (
        |    -- first aggregate each user's orders by day
        |    select
        |      user_id,
        |      max(consignee)          consignee,
        |      to_date(create_time)    buy_date,
        |      sum(final_total_amount) day_amount,
        |      count(id)               day_order
        |    from dwd.act_order_info
        |    group by user_id, to_date(create_time)
        |  ) daily
        |) t
        |where datediff(day_second, day_first) = 1  -- orders on two consecutive days
        |  and amount_second > amount_first         -- and the order amount keeps increasing
        |""".stripMargin)
    result.write
      .format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "com.mysql.jdbc.Driver")
      // host and port are placeholders; adjust them to the competition environment
      .option("url", "jdbc:mysql://localhost:3306/shtd_result?useSSL=false&characterEncoding=utf8")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "usercontinueorder")
      .save()
    spark.close()
  }
}
```
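The lead + datediff filter at the heart of this query can be checked on a few in-memory rows before running it against the dwd tables. A small sketch with hypothetical sample data, assuming the same spark session:

```scala
import spark.implicits._

// Hypothetical per-user daily totals: only user 1 orders on two consecutive
// days (2020-04-01 -> 2020-04-02) with an increasing amount (100 -> 150).
val daily = Seq(
  (1, "2020-04-01", 100.0, 2L),
  (1, "2020-04-02", 150.0, 1L),
  (2, "2020-04-01",  80.0, 1L),
  (2, "2020-04-05",  90.0, 1L)
).toDF("user_id", "buy_date", "day_amount", "day_order")
daily.createOrReplaceTempView("daily_sample")

spark.sql(
  """
    |select user_id, buy_date, day_amount,
    |       lead(buy_date)   over (partition by user_id order by buy_date) next_date,
    |       lead(day_amount) over (partition by user_id order by buy_date) next_amount
    |from daily_sample
    |""".stripMargin)
  .where("datediff(next_date, buy_date) = 1 and next_amount > day_amount")
  .show()  // only the (user 1, 2020-04-01) row should remain
```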

Result query SQL

```sql
-- Top 5 by total order count
select *
from shtd_result.usercontinueorder
order by totalorder desc
limit 5;
-- Top 5 by total order amount
select *
from shtd_result.usercontinueorder
order by totalconsumption desc
limit 5;
-- Top 5 by customer primary key
select *
from shtd_result.usercontinueorder
order by userid desc
limit 5;
```

The hard part of the metrics calculation module is the multi-table queries and the proper use of window functions, so being fluent with the advanced functions of HiveSQL is essential; without that, this part is difficult to complete.
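Since the monthly totals in Question 1 must end up as one row per province / region / month, it helps to keep in mind that a window aggregate keeps every input row while GROUP BY collapses them. A short sketch on hypothetical in-memory data, assuming the same spark session as above:

```scala
import spark.implicits._

val toy = Seq(("A", 10.0), ("A", 20.0), ("B", 5.0)).toDF("province", "amount")
toy.createOrReplaceTempView("toy_orders")

// Window aggregate: every order row is kept, with the per-province total repeated on each.
spark.sql("select province, amount, sum(amount) over (partition by province) total from toy_orders").show()

// GROUP BY: one row per province, which is the shape the MySQL result tables expect.
spark.sql("select province, sum(amount) total from toy_orders group by province").show()
```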

Tags: big data, spark, scala

Reprinted from: https://blog.csdn.net/weixin_71868447/article/details/135429861
Copyright belongs to the original author 张一西158. In case of infringement, please contact us for removal.
