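Requirements analysis
- Keywords
- Count how often each keyword appears
IK word segmentation
Word segmentation is done with the IK analyzer, which has to be added to the project as a dependency. It splits a search phrase the way people read it in everyday use. For example, 苹果iphone 手机 ("Apple iPhone phone") is split into 苹果, iphone, and 手机.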
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
</dependency>
<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
</dependency>
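Test code:
import java.io.IOException;
import java.io.StringReader;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class IkUtil {
    public static void main(String[] args) throws IOException {
        String s = "Apple 苹果15 5G手机";
        StringReader stringReader = new StringReader(s);
        // The second argument (useSmart) controls whether the resulting words are split again:
        // true means smart mode, in which the words are not split any further.
        IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
        // next() returns null once the input is exhausted.
        Lexeme next = ikSegmenter.next();
        while (next != null) {
            System.out.println(next.getLexemeText());
            next = ikSegmenter.next();
        }
    }
}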
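The test code prints each token as it is produced. For a reusable utility, the same "call next() until it returns null" loop could collect the tokens instead. The following is a minimal sketch of that idea in plain Java. `TokenDrainSketch`, `drain`, and the `Supplier<String>` token source are hypothetical names introduced here so that the example runs without the IK dependency, and keeping each token only once is an assumption rather than something the original states.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Supplier;

final class TokenDrainSketch {

    /**
     * Pulls tokens until the source returns null, mirroring the loop in the
     * test code, and keeps each distinct token once in first-seen order.
     * Deduplication is an assumption made for this sketch.
     */
    static Set<String> drain(Supplier<String> nextToken) {
        Set<String> tokens = new LinkedHashSet<>();
        for (String t = nextToken.get(); t != null; t = nextToken.get()) {
            tokens.add(t);
        }
        return tokens;
    }

    public static void main(String[] args) {
        // Hypothetical token stream standing in for the segmenter output.
        Iterator<String> fake = Arrays.asList("apple", "15", "5g", "apple").iterator();
        System.out.println(drain(() -> fake.hasNext() ? fake.next() : null));
    }
}
```

With the real segmenter, the supplier would wrap `ikSegmenter.next()` and map a non-null `Lexeme` to `getLexemeText()`.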
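Overall flow
- Create a custom word-segmentation utility class, IKUtil (IK is the segmentation library it depends on)
- Create the custom function class
- Register the function
- Consume the Kafka DWD page topic and set the watermark
- Filter search behavior out of the main stream
  - page['item'] is not null
  - item_type: 'keyword'
  - last_page_id: 'search'
- Split each keyword with the segmentation function
- Group by keyword and aggregate over a window
- Write the result to Doris
  - Create the Doris sink
  - Flink must have checkpointing enabled before it can write data to Doris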
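The filter, split, and windowed-count steps above can be modeled in plain Java to make the data flow concrete. This is only a sketch under stated assumptions: `KeywordWindowSketch`, `PageEvent`, and `event` are hypothetical names, whitespace splitting stands in for the IK segmenter, and an in-memory map stands in for Flink's windowed aggregation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class KeywordWindowSketch {

    /** Hypothetical stand-in for one page record: the page map plus event time in ms. */
    static final class PageEvent {
        final Map<String, String> page;
        final long ts;

        PageEvent(Map<String, String> page, long ts) {
            this.page = page;
            this.ts = ts;
        }
    }

    /** Convenience factory for the demo; item may be null. */
    static PageEvent event(String lastPageId, String itemType, String item, long ts) {
        Map<String, String> page = new HashMap<>();
        page.put("last_page_id", lastPageId);
        page.put("item_type", itemType);
        if (item != null) {
            page.put("item", item);
        }
        return new PageEvent(page, ts);
    }

    /** Same three conditions as the search-behavior filter in the list above. */
    static boolean isSearch(PageEvent e) {
        return "search".equals(e.page.get("last_page_id"))
                && "keyword".equals(e.page.get("item_type"))
                && e.page.get("item") != null;
    }

    /** Start of the tumbling window that contains ts (assumes ts >= 0). */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /** Counts keywords per "windowStart|keyword"; whitespace splitting stands in for IK. */
    static Map<String, Long> count(List<PageEvent> events, long sizeMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (PageEvent e : events) {
            if (!isSearch(e)) {
                continue;
            }
            for (String kw : e.page.get("item").trim().split("\\s+")) {
                if (!kw.isEmpty()) {
                    counts.merge(windowStart(e.ts, sizeMs) + "|" + kw, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<PageEvent> events = Arrays.asList(
                event("search", "keyword", "apple phone", 1_000L),
                event("search", "keyword", "phone", 9_999L),
                event("home", "sku_id", "42", 5_000L),
                event("search", "keyword", "phone", 10_000L));
        count(events, 10_000L).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

In the demo data, the event at 9,999 ms still belongs to the window starting at 0, while the event at 10,000 ms opens the next window, because each window covers its start time but not its end time.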
Implementation
import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * title:
 *
 * @Author 浪拍岸
 * @Create 28/12/2023 11:06 AM
 * @Version 1.0
 */
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {
    public static void main(String[] args) {
        new DwsTrafficSourceKeywordPageViewWindow().start(10021, 4, "dws_traffic_source_keyword_page_view_window");
    }

    @Override
    public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {
        // 1. Read the main stream: the DWD page topic, with a 5-second watermark on row_time
        tableEnv.executeSql("create table page_info(\n" +
                " `common` map<string,string>,\n" +
                " `page` map<string,string>,\n" +
                " `ts` bigint,\n" +
                " `row_time` as to_timestamp_ltz(ts,3),\n" +
                " WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));
        // Check that data is arriving
        //tableEnv.executeSql("select * from page_info").print();

        // 2. Filter out the search keywords
        Table keywordTable = tableEnv.sqlQuery("select\n" +
                " page['item'] keywords,\n" +
                " `row_time`,\n" +
                " ts\n" +
                " from page_info\n" +
                " where page['last_page_id'] = 'search'\n" +
                " and page['item_type'] = 'keyword'\n" +
                " and page['item'] is not null");
        tableEnv.createTemporaryView("keywords_table", keywordTable);
        // Check that data is arriving
        //tableEnv.executeSql("select * from keywords_table").print();

        // 3. Register the custom word-segmentation function
        tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class);

        // 4. Call the segmentation function to split keywords into individual keyword rows
        Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, `row_time`" +
                " from keywords_table" +
                " left join lateral Table(kwSplit(keywords)) on true");
        tableEnv.createTemporaryView("split_kw_table", splitKwTable);
        //tableEnv.executeSql("select * from split_kw_table").print();

        // 5. Group by keyword and aggregate over 10-second tumbling windows
        Table windowAggTable = tableEnv.sqlQuery("select\n" +
                " keyword,\n" +
                " cast(tumble_start(row_time,interval '10' second ) as string) wStart,\n" +
                " cast(tumble_end(row_time,interval '10' second ) as string) wEnd,\n" +
                " cast(current_date as string) cur_date,\n" +
                " count(*) keyword_count\n" +
                "from split_kw_table\n" +
                "group by tumble(row_time, interval '10' second), keyword");
        //tableEnv.createTemporaryView("result_table", windowAggTable);
        //tableEnv.executeSql("select keyword,keyword_count+1 from result_table").print();

        // 6. Write the result to Doris
        tableEnv.executeSql("create table doris_sink\n" +
                "(\n" +
                " keyword STRING,\n" +
                " wStart STRING,\n" +
                " wEnd STRING,\n" +
                " cur_date STRING,\n" +
                " keyword_count BIGINT\n" +
                ")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));
        windowAggTable.insertInto("doris_sink").execute();
    }
}
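The 5-second watermark and the 10-second tumbling window in the job above interact: a window's result is only emitted once the watermark has passed the window. The following plain-Java sketch models that timing in a simplified way. `WatermarkSketch` and its methods are hypothetical names, and the model ignores details such as periodic watermark emission and multiple parallel sources.

```java
final class WatermarkSketch {

    // INTERVAL '5' SECOND from the WATERMARK clause.
    static final long DELAY_MS = 5_000L;
    // interval '10' second from tumble(...).
    static final long WINDOW_MS = 10_000L;

    /** Watermark after the largest event time seen so far is maxTs. */
    static long watermark(long maxTs) {
        return maxTs - DELAY_MS;
    }

    /** Exclusive end of the tumbling window that contains ts (assumes ts >= 0). */
    static long windowEnd(long ts) {
        return ts - (ts % WINDOW_MS) + WINDOW_MS;
    }

    /** Simplified rule: a window can emit once the watermark reaches its last millisecond. */
    static boolean canEmit(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long end = windowEnd(3_000L); // window [0, 10000)
        for (long maxTs : new long[] {9_999L, 14_998L, 14_999L}) {
            System.out.println("max event time " + maxTs + " ms, window ending at " + end
                    + " can emit: " + canEmit(end, watermark(maxTs)));
        }
    }
}
```

Under this model, the count for a given 10-second window only reaches the sink after an event roughly 5 seconds past the window end has been read, which is worth keeping in mind when checking the Doris table during testing.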
This article is reposted from: https://blog.csdn.net/qq_44273739/article/details/135236762
Copyright belongs to the original author, 十七✧ᐦ̤. In case of infringement, please contact us for removal.