Flink Real-Time E-Commerce Data Warehouse: The DWS Layer

Requirements Analysis

  • Keywords
  • Count how often each keyword appears

IK Word Segmentation

Word segmentation uses the IK analyzer, which is pulled in through the dependencies below. It splits a search phrase the way people naturally read it; for example, "苹果iphone 手机" is split into "苹果", "iphone", and "手机".

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
</dependency>
<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
</dependency>

The test code is as follows:

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;

public class IkUtil {
    public static void main(String[] args) throws IOException {
        String s = "Apple 苹果15 5G手机";
        StringReader stringReader = new StringReader(s);
        // The second parameter controls whether the split words are split further; true means they are not split again
        IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
        Lexeme next = ikSegmenter.next();
        while (next != null) {
            System.out.println(next.getLexemeText());
            next = ikSegmenter.next();
        }
    }
}
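
The overall flow below refers to a utility class IKUtil built on top of this segmenter. The project's actual class is not shown in the post; as a minimal sketch (the method name split and its return type are assumptions), it could look like this:

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class IKUtil {
    // Split a phrase into keywords using IK's smart mode (sketch only, not the original project's code)
    public static List<String> split(String text) {
        List<String> result = new ArrayList<>();
        if (text == null) {
            return result;
        }
        IKSegmenter segmenter = new IKSegmenter(new StringReader(text), true);
        try {
            Lexeme lexeme = segmenter.next();
            while (lexeme != null) {
                result.add(lexeme.getLexemeText());
                lexeme = segmenter.next();
            }
        } catch (IOException e) {
            throw new RuntimeException("IK segmentation failed", e);
        }
        return result;
    }
}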

Overall Flow

  1. Create a custom segmentation utility class IKUtil (IK itself is only a segmentation dependency)
  2. Create a custom function class that wraps it (a hedged sketch of this class follows the list)
  3. Register the function
  4. Consume the Kafka DWD page topic and set a watermark
  5. Filter search behavior out of the main stream
     - page['item'] is not null
     - item_type = "keyword"
     - last_page_id = "search"
  6. Split keyword with the segmentation function
  7. Group by keyword and aggregate over windows
  8. Write out to Doris
     - create a Doris sink
     - Flink must have checkpointing enabled before it can write data to Doris
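
The custom function from step 2 is the KwSplit table function used in the implementation further down. Its source is not shown in the original post, so the following is only a minimal sketch, assuming it delegates to the IKUtil helper above and emits one row per keyword:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Sketch only: the real KwSplit in the project may differ in details.
@FunctionHint(output = @DataTypeHint("ROW<keyword STRING>"))
public class KwSplit extends TableFunction<Row> {
    public void eval(String keywords) {
        if (keywords == null) {
            return;
        }
        // Emit one row per segmented keyword.
        for (String keyword : IKUtil.split(keywords)) {
            collect(Row.of(keyword));
        }
    }
}

Once registered with createTemporarySystemFunction, it can be applied through a lateral join, as the implementation below does.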


Implementation

import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * title:
 *
 * @Author 浪拍岸
 * @Create 28/12/2023 11:06 AM
 * @Version 1.0
 */
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {

    public static void main(String[] args) {
        new DwsTrafficSourceKeywordPageViewWindow()
                .start(10021, 4, "dws_traffic_source_keyword_page_view_window");
    }

    @Override
    public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {
        // 1. Read the DWD page topic (the main stream)
        tableEnv.executeSql("create table page_info(\n" +
                "    `common` map<string,string>,\n" +
                "    `page` map<string,string>,\n" +
                "    `ts` bigint,\n" +
                "    `row_time` as to_timestamp_ltz(ts,3),\n" +
                "     WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));
        // Check that data arrives
        // tableEnv.executeSql("select * from page_info").print();

        // 2. Filter out the search keywords
        Table keywordTable = tableEnv.sqlQuery("select\n" +
                "    page['item'] keywords,\n" +
                "    `row_time`,\n" +
                "    ts\n" +
                " from page_info\n" +
                " where page['last_page_id'] = 'search'\n" +
                " and page['item_type'] = 'keyword'\n" +
                " and page['item'] is not null");
        tableEnv.createTemporaryView("keywords_table", keywordTable);
        // Check that data arrives
        // tableEnv.executeSql("select * from keywords_table").print();

        // 3. Register the custom segmentation function
        tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class);

        // 4. Call the segmentation function to split the keywords column
        Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, `row_time`" +
                " from keywords_table" +
                " left join lateral Table(kwSplit(keywords)) on true");
        tableEnv.createTemporaryView("split_kw_table", splitKwTable);
        // tableEnv.executeSql("select * from split_kw_table").print();

        // 5. Group by keyword and aggregate over tumbling windows
        Table windowAggTable = tableEnv.sqlQuery("select\n" +
                "    keyword,\n" +
                "    cast(tumble_start(row_time, interval '10' second) as string) wStart,\n" +
                "    cast(tumble_end(row_time, interval '10' second) as string) wEnd,\n" +
                "    cast(current_date as string) cur_date,\n" +
                "    count(*) keyword_count\n" +
                "from split_kw_table\n" +
                "group by tumble(row_time, interval '10' second), keyword");
        // tableEnv.createTemporaryView("result_table", windowAggTable);
        // tableEnv.executeSql("select keyword, keyword_count+1 from result_table").print();

        // 6. Write out to Doris
        tableEnv.executeSql("create table doris_sink\n" +
                "(\n" +
                "    keyword                STRING,\n" +
                "    wStart                 STRING,\n" +
                "    wEnd                   STRING,\n" +
                "    cur_date               STRING,\n" +
                "    keyword_count          BIGINT\n" +
                ")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));

        windowAggTable.insertInto("doris_sink").execute();
    }
}
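
The helper calls SQLUtil.getKafkaSourceSQL and SQLUtil.getDorisSinkSQL are not shown in the post. Below is a minimal sketch of the WITH clauses they presumably generate; apart from the standard Kafka and Doris connector option keys, everything (broker list, FE address, database name, credentials, label prefix) is a placeholder assumption, not the original project's configuration:

import java.util.UUID;

// Sketch only: the real SQLUtil in the project is not shown in the post.
public class SQLUtil {

    public static String getKafkaSourceSQL(String topic, String groupId) {
        return " WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + topic + "',\n" +
                "  'properties.bootstrap.servers' = 'broker1:9092',\n" +      // placeholder broker list
                "  'properties.group.id' = '" + groupId + "',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
    }

    public static String getDorisSinkSQL(String tableName) {
        return " WITH (\n" +
                "  'connector' = 'doris',\n" +
                "  'fenodes' = 'doris-fe:8030',\n" +                          // placeholder FE address
                "  'table.identifier' = 'gmall_dws." + tableName + "',\n" +   // placeholder database name
                "  'username' = 'root',\n" +
                "  'password' = '',\n" +
                "  'sink.label-prefix' = 'dws-" + UUID.randomUUID() + "'\n" + // unique Stream Load label prefix
                ")";
    }
}

As step 8 of the flow notes, checkpointing must be enabled (presumably done by BaseSQLApp in start()) before the Doris sink will commit any data.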