0


湖仓一体电商项目(十二):编写写入DM层业务代码

编写写入DM层业务代码

DM层主要是报表数据,针对实时业务将DM层设置在Clickhouse中,在此业务中DM层主要存储的是通过Flink读取Kafka “KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC” topic中的数据进行设置窗口分析,每隔10s设置滚动窗口统计该窗口内访问商品及商品一级、二级分类分析结果,实时写入到Clickhouse中。

一、​​​​​​​​​​​​​​代码编写

具体代码参照“ProcessBrowseLogInfoToDM.scala”,大体代码逻辑如下:

  1. object ProcessBrowseLogInfoToDM {
  2. def main(args: Array[String]): Unit = {
  3. //1.准备环境
  4. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  5. val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
  6. env.enableCheckpointing(5000)
  7. import org.apache.flink.streaming.api.scala._
  8. /**
  9. * 2.创建 Kafka Connector,连接消费Kafka dwd中数据
  10. *
  11. */
  12. tblEnv.executeSql(
  13. """
  14. |create table kafka_dws_user_login_wide_tbl (
  15. | user_id string,
  16. | product_name string,
  17. | first_category_name string,
  18. | second_category_name string,
  19. | obtain_points string
  20. |) with (
  21. | 'connector' = 'kafka',
  22. | 'topic' = 'KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC',
  23. | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
  24. | 'scan.startup.mode'='earliest-offset', --也可以指定 earliest-offset 、latest-offset
  25. | 'properties.group.id' = 'my-group-id',
  26. | 'format' = 'json'
  27. |)
  28. """.stripMargin)
  29. /**
  30. * 3.实时统计每个用户最近10s浏览的商品次数和商品一级、二级种类次数,存入到Clickhouse
  31. */
  32. val dwsTbl:Table = tblEnv.sqlQuery(
  33. """
  34. | select user_id,product_name,first_category_name,second_category_name from kafka_dws_user_login_wide_tbl
  35. """.stripMargin)
  36. //4.将Row 类型数据转换成对象类型操作
  37. val browseDS: DataStream[BrowseLogWideInfo] = tblEnv.toAppendStream[Row](dwsTbl)
  38. .map(row => {
  39. val user_id: String = row.getField(0).toString
  40. val product_name: String = row.getField(1).toString
  41. val first_category_name: String = row.getField(2).toString
  42. val second_category_name: String = row.getField(3).toString
  43. BrowseLogWideInfo(null, user_id, null, product_name, null, null, first_category_name, second_category_name, null)
  44. })
  45. val dwsDS: DataStream[ProductVisitInfo] = browseDS.keyBy(info => {
  46. info.first_category_name + "-" + info.second_category_name + "-" + info.product_name
  47. })
  48. .timeWindow(Time.seconds(10))
  49. .process(new ProcessWindowFunction[BrowseLogWideInfo, ProductVisitInfo, String, TimeWindow] {
  50. override def process(key: String, context: Context, elements: Iterable[BrowseLogWideInfo], out: Collector[ProductVisitInfo]): Unit = {
  51. val currentDt: String = DateUtil.getDateYYYYMMDD(context.window.getStart.toString)
  52. val startTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getStart.toString)
  53. val endTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getEnd.toString)
  54. val arr: Array[String] = key.split("-")
  55. val firstCatName: String = arr(0)
  56. val secondCatName: String = arr(1)
  57. val productName: String = arr(2)
  58. val cnt: Int = elements.toList.size
  59. out.collect(ProductVisitInfo(currentDt, startTime, endTime, firstCatName, secondCatName, productName, cnt))
  60. }
  61. })
  62. /**
  63. * 5.将以上结果写入到Clickhouse表 dm_product_visit_info 表中
  64. * create table dm_product_visit_info(
  65. * current_dt String,
  66. * window_start String,
  67. * window_end String,
  68. * first_cat String,
  69. * second_cat String,
  70. * product String,
  71. * product_cnt UInt32
  72. * ) engine = MergeTree() order by current_dt
  73. *
  74. */
  75. //准备向ClickHouse中插入数据的sql
  76. val insertIntoCkSql = "insert into dm_product_visit_info (current_dt,window_start,window_end,first_cat,second_cat,product,product_cnt) values (?,?,?,?,?,?,?)"
  77. val ckSink: SinkFunction[ProductVisitInfo] = MyClickHouseUtil.clickhouseSink[ProductVisitInfo](insertIntoCkSql,new JdbcStatementBuilder[ProductVisitInfo] {
  78. override def accept(pst: PreparedStatement, productVisitInfo: ProductVisitInfo): Unit = {
  79. pst.setString(1,productVisitInfo.currentDt)
  80. pst.setString(2,productVisitInfo.windowStart)
  81. pst.setString(3,productVisitInfo.windowEnd)
  82. pst.setString(4,productVisitInfo.firstCat)
  83. pst.setString(5,productVisitInfo.secondCat)
  84. pst.setString(6,productVisitInfo.product)
  85. pst.setLong(7,productVisitInfo.productCnt)
  86. }
  87. })
  88. //针对数据加入sink
  89. dwsDS.addSink(ckSink)
  90. env.execute()
  91. }
  92. }

二、创建Clickhouse-DM层表

代码在执行之前需要在Clickhouse中创建对应的DM层商品浏览信息表dm_product_visit_info,clickhouse建表语句如下:

  1. #node1节点启动clickhouse
  2. [root@node1 bin]# service clickhouse-server start
  3. #node1节点进入clickhouse
  4. [root@node1 bin]# clickhouse-client -m
  5. #node1节点创建clickhouse-DM层表
  6. create table dm_product_visit_info(
  7. current_dt String,
  8. window_start String,
  9. window_end String,
  10. first_cat String,
  11. second_cat String,
  12. product String,
  13. product_cnt UInt32
  14. ) engine = MergeTree() order by current_dt;

三、​​​​​​​代码测试

以上代码编写完成后,代码执行测试步骤如下:

1、将代码中消费Kafka数据改成从头开始消费

代码中Kafka Connector中属性“scan.startup.mode”设置为“earliest-offset”,从头开始消费数据。

这里也可以不设置从头开始消费Kafka数据,而是直接启动向日志采集接口模拟生产日志代码“RTMockUserLogData.java”,需要启动日志采集接口及Flume。

2、执行代码,查看对应结果

以上代码执行后在,在Clickhouse-DM层中表“dm_product_visit_info”中查看对应数据结果如下:

四、架构图


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文转载自: https://blog.csdn.net/xiaoweite1/article/details/126795045
版权归原作者 Lansonli 所有, 如有侵权,请联系我们删除。

“湖仓一体电商项目(十二):编写写入DM层业务代码”的评论:

还没有评论