Preface
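On large e-commerce sites, product managers and online marketing staff regularly need to analyze page click counts so that ad placement decisions on those pages rest on more precise data.
In practice the flow is roughly as follows: pages are instrumented to log user behavior; a real-time or near-real-time application runs the behavior data through ETL and persists it (to HDFS or another big-data storage engine); downstream applications then aggregate these logs by business metric and present the results on dashboards or in reports.
The input for this example is a CSV file of page click events produced by the ETL step. Each line is comma-separated, and the fields from left to right are: user ID, ad ID, province, city, and timestamp.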
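To make the record layout concrete, here is a minimal plain-Java sketch of parsing one such line. The class name `AdClickLineParser`, the nested `Click` type, and the sample line are made up for illustration; the timestamp is assumed to be in seconds, which is what the `* 1000L` conversion in the Flink job later in this article implies.

```java
public class AdClickLineParser {

    /** Parsed form of one CSV line: userId, adId, province, city, timestamp (seconds). */
    static final class Click {
        final long userId;
        final long adId;
        final String province;
        final String city;
        final long timestampSeconds;

        Click(long userId, long adId, String province, String city, long timestampSeconds) {
            this.userId = userId;
            this.adId = adId;
            this.province = province;
            this.city = city;
            this.timestampSeconds = timestampSeconds;
        }
    }

    /** Splits a line on commas and converts the numeric fields. */
    static Click parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 5) {
            throw new IllegalArgumentException(
                    "expected 5 fields but got " + fields.length + ": " + line);
        }
        return new Click(
                Long.parseLong(fields[0].trim()),
                Long.parseLong(fields[1].trim()),
                fields[2].trim(),
                fields[3].trim(),
                Long.parseLong(fields[4].trim()));
    }

    public static void main(String[] args) {
        // Hypothetical sample line, not taken from the real log file.
        Click click = parse("1001,2002,beijing,beijing,1511658000");
        // Event-time APIs generally work in milliseconds, hence the conversion.
        System.out.println(click.province + " " + click.timestampSeconds * 1000L);
    }
}
```

Rejecting lines with the wrong field count up front is a design choice of this sketch; the job in this article indexes into the split array directly.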
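Business requirements:
- From the instrumentation logs, count ad clicks per page over each hour, refreshed every 5 seconds and broken down by province;
- Filter out click-fraud-style repeated clicking and add the offending user to a blacklist;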
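Approach:
- Group by province and create a sliding time window of length 1 hour with a 5-second slide for the counting;
- Use a process function for the blacklist filtering: track how many times a user clicks the same ad (page), and once the count exceeds the configured limit, emit the user's information to a side output as a blacklist entry (for later handling of that user);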
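A sliding window with a short slide means every event belongs to many overlapping windows. The sketch below is a plain-Java illustration of that arithmetic, not Flink code: `SlidingWindowSketch` and `windowStarts` are hypothetical names, windows are assumed to be aligned to the epoch with no offset, and timestamps are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

    /** Returns the start (epoch ms) of every sliding window that contains the event. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window containing the event, aligned to the slide.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the event.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60 * 60 * 1000L; // 1 hour
        long slide = 5 * 1000L;      // 5 seconds
        // Hypothetical event timestamp in milliseconds.
        List<Long> starts = windowStarts(1_511_658_003_000L, size, slide);
        System.out.println("windows containing the event: " + starts.size());
    }
}
```

With a 1-hour size and a 5-second slide, each click falls into 3600 / 5 = 720 windows, which is why the counting is done incrementally rather than by buffering every event per window.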
This example has two main requirements, implemented in turn below.
I. Counting ad clicks per province over a time window
1. Define the value object for the CSV records
    public class AdClickEvent {

        private Long userId;
        private Long adId;
        private String province;
        private String city;
        private Long timestamp;

        public AdClickEvent() {
        }

        public AdClickEvent(Long userId, Long adId, String province, String city, Long timestamp) {
            this.userId = userId;
            this.adId = adId;
            this.province = province;
            this.city = city;
            this.timestamp = timestamp;
        }

        public Long getUserId() { return userId; }

        public void setUserId(Long userId) { this.userId = userId; }

        public Long getAdId() { return adId; }

        public void setAdId(Long adId) { this.adId = adId; }

        public String getProvince() { return province; }

        public void setProvince(String province) { this.province = province; }

        public String getCity() { return city; }

        public void setCity(String city) { this.city = city; }

        public Long getTimestamp() { return timestamp; }

        public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

        @Override
        public String toString() {
            return "AdClickEvent{" +
                    "userId=" + userId +
                    ", adId=" + adId +
                    ", province='" + province + '\'' +
                    ", city='" + city + '\'' +
                    ", timestamp=" + timestamp +
                    '}';
        }
    }
2. Define the value object for the output result
    public class AddCountResult {

        private String province;
        private String windowEnd;
        private Long count;

        public AddCountResult(String province, String windowEnd, Long count) {
            this.province = province;
            this.windowEnd = windowEnd;
            this.count = count;
        }

        public AddCountResult() {
        }

        public String getProvince() { return province; }

        public void setProvince(String province) { this.province = province; }

        public String getWindowEnd() { return windowEnd; }

        public void setWindowEnd(String windowEnd) { this.windowEnd = windowEnd; }

        public Long getCount() { return count; }

        public void setCount(Long count) { this.count = count; }

        @Override
        public String toString() {
            return "AddCountResult{" +
                    "province='" + province + '\'' +
                    ", windowEnd='" + windowEnd + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
3. Core code for counting and output
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    import java.sql.Timestamp;

    public class AdvertiseClickAgg {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Use event-time semantics
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Read the click log from a local file
            String path = "E:\\code-self\\flink_study\\src\\main\\resources\\AdClickLog.csv";
            SingleOutputStreamOperator<AdClickEvent> adClickEventStream = env.readTextFile(path)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new AdClickEvent(Long.valueOf(fields[0]), Long.valueOf(fields[1]),
                                fields[2], fields[3], Long.valueOf(fields[4]));
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdClickEvent>() {
                        @Override
                        public long extractAscendingTimestamp(AdClickEvent element) {
                            // The log stores seconds; Flink timestamps are in milliseconds
                            return element.getTimestamp() * 1000L;
                        }
                    });

            // Key by province, then aggregate over a 1-hour window that slides every 5 minutes
            SingleOutputStreamOperator<AddCountResult> addCountResultStream = adClickEventStream
                    .keyBy(AdClickEvent::getProvince)
                    .timeWindow(Time.hours(1), Time.minutes(5))
                    .aggregate(new AdCountAgg(), new AdCountRes());

            addCountResultStream.print();
            env.execute("add click count by province");
        }

        /** Wraps the pre-aggregated count with the province and the window end time. */
        public static class AdCountRes implements WindowFunction<Long, AddCountResult, String, TimeWindow> {
            @Override
            public void apply(String province, TimeWindow window, Iterable<Long> input,
                              Collector<AddCountResult> out) throws Exception {
                String windowEnd = new Timestamp(window.getEnd()).toString();
                Long count = input.iterator().next();
                out.collect(new AddCountResult(province, windowEnd, count));
            }
        }

        /** Incrementally counts the events in a window: the accumulator is the running count. */
        public static class AdCountAgg implements AggregateFunction<AdClickEvent, Long, Long> {
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            @Override
            public Long add(AdClickEvent adClickEvent, Long aLong) {
                return aLong + 1;
            }

            @Override
            public Long getResult(Long aLong) {
                return aLong;
            }

            @Override
            public Long merge(Long aLong, Long acc1) {
                return aLong + acc1;
            }
        }
    }
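Because the requirement is to count ad clicks per region within a window, this code combines two functions: an AggregateFunction that keeps a running count as events arrive, and a WindowFunction that attaches the province and the window end time to that count when the window fires.
Run the code and watch the console output. As defined in the program, each result covers one hour of data and a new result is emitted every 5 minutes as the window slides. Note that the code uses Time.minutes(5) for the slide; to get the 5-second refresh stated in the requirement, the slide would be Time.seconds(5).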
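The two-stage pattern (keep a running count per window, then label it with the window end) can be imitated without Flink. The following is a rough plain-Java sketch under stated assumptions: `ProvinceWindowCountSketch` and its `add` method are hypothetical, windows are 1 hour long with a 5-minute slide as in the code above, they are aligned to the epoch, and the events are invented.

```java
import java.util.Map;
import java.util.TreeMap;

public class ProvinceWindowCountSketch {

    static final long SIZE_MS = 60 * 60 * 1000L; // window length: 1 hour
    static final long SLIDE_MS = 5 * 60 * 1000L; // slide: 5 minutes

    /**
     * Adds one click to the running count of every window that contains it.
     * The map key is "province@windowEndMs"; the value plays the accumulator role.
     */
    static void add(Map<String, Long> counts, String province, long timestampMs) {
        long lastStart = timestampMs - (timestampMs % SLIDE_MS);
        for (long start = lastStart; start > timestampMs - SIZE_MS; start -= SLIDE_MS) {
            long windowEnd = start + SIZE_MS;
            counts.merge(province + "@" + windowEnd, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new TreeMap<>();
        // Invented events: two clicks in the first minute, one more at minute 6.
        add(counts, "beijing", 0L);
        add(counts, "beijing", 60_000L);
        add(counts, "beijing", 360_000L);
        // Print one line per (province, window end) pair with its count.
        counts.forEach((key, count) -> System.out.println(key + " -> " + count));
    }
}
```

Only a single number is kept per window here, which is the point of using an incremental aggregate instead of collecting all events and counting them when the window closes.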
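II. Counting ad clicks per province while filtering out malicious clickers
Building on the previous requirement, the program must now also detect users who maliciously hammer a page, filter them out, and emit a warning for each of them through a side output.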
1. Define a value object for the side-output warning
    public class BlackListUserWarning {

        private Long userId;
        private Long adId;
        private String warningMsg;

        public BlackListUserWarning(Long userId, Long adId, String warningMsg) {
            this.userId = userId;
            this.adId = adId;
            this.warningMsg = warningMsg;
        }

        public BlackListUserWarning() {
        }

        public Long getUserId() { return userId; }

        public void setUserId(Long userId) { this.userId = userId; }

        public Long getAdId() { return adId; }

        public void setAdId(Long adId) { this.adId = adId; }

        public String getWarningMsg() { return warningMsg; }

        public void setWarningMsg(String warningMsg) { this.warningMsg = warningMsg; }

        @Override
        public String toString() {
            return "BlackListUserWarning{" +
                    "userId=" + userId +
                    ", adId=" + adId +
                    ", warningMsg='" + warningMsg + '\'' +
                    '}';
        }
    }
2. Core code
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    import java.sql.Timestamp;

    public class AdvertiseClickAggBlackIp {

        // Side-output tag shared by the process function and the job definition
        private static final OutputTag<BlackListUserWarning> BLACKLIST_TAG =
                new OutputTag<BlackListUserWarning>("blacklist") {};

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Use event-time semantics
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Read the click log from a local file
            String path = "E:\\code-self\\flink_study\\src\\main\\resources\\AdClickLog.csv";
            SingleOutputStreamOperator<AdClickEvent> adClickEventStream = env.readTextFile(path)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new AdClickEvent(Long.valueOf(fields[0]), Long.valueOf(fields[1]),
                                fields[2], fields[3], Long.valueOf(fields[4]));
                    })
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdClickEvent>() {
                        @Override
                        public long extractAscendingTimestamp(AdClickEvent element) {
                            // The log stores seconds; Flink timestamps are in milliseconds
                            return element.getTimestamp() * 1000L;
                        }
                    });

            // Key by (user, ad) and drop clicks from users who exceed the limit
            SingleOutputStreamOperator<AdClickEvent> filterAddClickStream = adClickEventStream
                    .keyBy("userId", "adId")
                    .process(new FilterBalckIpListUser(100));

            // Key by province, then aggregate over a 1-hour window that slides every 5 minutes
            SingleOutputStreamOperator<AddCountResult> addCountResultStream = filterAddClickStream
                    .keyBy(AdClickEvent::getProvince)
                    .timeWindow(Time.hours(1), Time.minutes(5))
                    .aggregate(new AdCountAgg(), new AdCountRes());

            addCountResultStream.print("main output");
            filterAddClickStream.getSideOutput(BLACKLIST_TAG).print("side output");
            env.execute("add click count by province");
        }

        public static class FilterBalckIpListUser extends KeyedProcessFunction<Tuple, AdClickEvent, AdClickEvent> {

            private final Integer countUpperBound;

            public FilterBalckIpListUser(Integer countUpperBound) {
                this.countUpperBound = countUpperBound;
            }

            // Number of times the current user has clicked the current ad
            ValueState<Long> countState;
            // Whether a blacklist warning has already been emitted for this user and ad
            ValueState<Boolean> isSentState;

            @Override
            public void open(Configuration parameters) throws Exception {
                countState = getRuntimeContext().getState(
                        new ValueStateDescriptor<Long>("ad-count", Long.class, 0L));
                isSentState = getRuntimeContext().getState(
                        new ValueStateDescriptor<Boolean>("is-sent", Boolean.class, false));
            }

            @Override
            public void processElement(AdClickEvent adClickEvent, Context ctx,
                                       Collector<AdClickEvent> out) throws Exception {
                // Below the limit: increment the count and forward the click.
                // At the limit: drop the click and emit a warning to the side output.
                Long currentCount = countState.value();

                // The counts are cleared every day at midnight so they do not grow forever.
                // On the first click for this key, register a timer for the next midnight (UTC+8).
                if (currentCount == 0) {
                    long day = 24 * 60 * 60 * 1000L;
                    long offset = 8 * 60 * 60 * 1000L;
                    // Shift to local time before rounding so the timer is never in the past
                    long ts = ((ctx.timerService().currentProcessingTime() + offset) / day + 1) * day - offset;
                    ctx.timerService().registerProcessingTimeTimer(ts);
                }

                // ">=" so that at most countUpperBound clicks reach the main stream
                if (currentCount >= countUpperBound) {
                    // Emit the warning only once per user and ad
                    if (!isSentState.value()) {
                        isSentState.update(true);
                        ctx.output(BLACKLIST_TAG, new BlackListUserWarning(
                                adClickEvent.getUserId(), adClickEvent.getAdId(),
                                "click over " + countUpperBound + " times"));
                    }
                    return; // drop the click
                }

                // Not blocked: update the count and forward the click to the main stream
                countState.update(currentCount + 1);
                out.collect(adClickEvent);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdClickEvent> out) throws Exception {
                countState.clear();
                isSentState.clear();
            }
        }

        /** Wraps the pre-aggregated count with the province and the window end time. */
        public static class AdCountRes implements WindowFunction<Long, AddCountResult, String, TimeWindow> {
            @Override
            public void apply(String province, TimeWindow window, Iterable<Long> input,
                              Collector<AddCountResult> out) throws Exception {
                String windowEnd = new Timestamp(window.getEnd()).toString();
                Long count = input.iterator().next();
                out.collect(new AddCountResult(province, windowEnd, count));
            }
        }

        /** Incrementally counts the events in a window: the accumulator is the running count. */
        public static class AdCountAgg implements AggregateFunction<AdClickEvent, Long, Long> {
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            @Override
            public Long add(AdClickEvent adClickEvent, Long aLong) {
                return aLong + 1;
            }

            @Override
            public Long getResult(Long aLong) {
                return aLong;
            }

            @Override
            public Long merge(Long aLong, Long acc1) {
                return aLong + acc1;
            }
        }
    }
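Compared with the first requirement, this code mainly adds the side-output logic for filtering malicious clicks. The main stream still outputs windowed ad click counts per province, while warnings about malicious clickers are emitted through the side output.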
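The filtering rule itself is independent of Flink and can be sketched with ordinary collections. In the sketch below, `BlacklistFilterSketch` and its members are hypothetical stand-ins: a map replaces the per-key count state, a set replaces the "already warned" flag, and a list replaces the side output. It assumes the limit is inclusive, meaning at most `upperBound` clicks per user and ad are forwarded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BlacklistFilterSketch {

    private final int upperBound;
    private final Map<String, Long> counts = new HashMap<>(); // stands in for the count state
    private final Set<String> warned = new HashSet<>();       // stands in for the "warning sent" flag
    final List<String> warnings = new ArrayList<>();          // stands in for the side output

    BlacklistFilterSketch(int upperBound) {
        this.upperBound = upperBound;
    }

    /** Returns true if the click should be forwarded to the main stream. */
    boolean accept(long userId, long adId) {
        String key = userId + "_" + adId;
        long current = counts.getOrDefault(key, 0L);
        if (current >= upperBound) {
            // Warn only the first time this (user, ad) pair is blocked.
            if (warned.add(key)) {
                warnings.add("user " + userId + " clicked ad " + adId
                        + " over " + upperBound + " times");
            }
            return false;
        }
        counts.put(key, current + 1);
        return true;
    }

    /** Mirrors the daily timer: forget all counts and warnings. */
    void resetDaily() {
        counts.clear();
        warned.clear();
    }

    public static void main(String[] args) {
        BlacklistFilterSketch filter = new BlacklistFilterSketch(3);
        for (int i = 1; i <= 5; i++) {
            System.out.println("click " + i + " forwarded: " + filter.accept(1L, 10L));
        }
        System.out.println(filter.warnings);
    }
}
```

Keying on the (user, ad) pair means a user is limited per ad rather than globally, which matches the `keyBy("userId", "adId")` grouping in the job.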
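Run the code above and watch the console output. With the limit set to 100, users who click a page more than 100 times are reported on the side output, so in a real deployment the users appearing there can be handled by follow-up processing.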
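The daily reset of the click counts depends on a processing-time timer set for the next midnight in UTC+8. A minimal sketch of that arithmetic follows; `MidnightTimerSketch` and `nextMidnight` are hypothetical names, and a fixed offset with no daylight saving and a non-negative current time are assumed. Adding the offset before rounding to whole days keeps the result in the future, including during the 16:00 to 24:00 UTC range, when it is already past midnight in UTC+8.

```java
public class MidnightTimerSketch {

    static final long DAY_MS = 24 * 60 * 60 * 1000L;

    /** Next local midnight (epoch ms) strictly after nowMs, for a fixed UTC offset. */
    static long nextMidnight(long nowMs, long utcOffsetMs) {
        // Shift to local time, round up to the next whole day, then shift back to UTC.
        return ((nowMs + utcOffsetMs) / DAY_MS + 1) * DAY_MS - utcOffsetMs;
    }

    public static void main(String[] args) {
        long utcPlus8 = 8 * 60 * 60 * 1000L;
        long now = System.currentTimeMillis();
        long timer = nextMidnight(now, utcPlus8);
        System.out.println("ms until the next UTC+8 midnight: " + (timer - now));
    }
}
```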
Copyright belongs to the original author, 小码农叔叔. In case of infringement, please contact us for removal.