0


Flink 滚动窗口、滑动窗口详解

1 滚动窗口(Tumbling Windows)

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计。
在这里插入图片描述

2 滑动窗口(Sliding Windows)

与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。

既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代
表了窗口计算的频率。滑动的距离代表了下个窗口开始的时间间隔,而窗口大小是固定的,所以也就是两个窗口结束时间的间隔;窗口在结束时间触发计算输出结果,那么滑动步长就代表
了计算频率。例如,我们定义一个长度为 1 小时、滑动步长为 5 分钟的滑动窗口,那么就会统计 1 小时内的数据,每 5 分钟统计一次。同样,滑动窗口可以基于时间定义,也可以基于数据
个数定义。
在这里插入图片描述
在这里插入图片描述
我们可以看到,当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide)来决
定。如图 6-18 所示,滑动步长刚好是窗口大小的一半,那么每个数据都会被分配到 2 个窗口里。比如我们定义的窗口长度为 1 小时、滑动步长为 30 分钟,那么对于 8 点 55 分的数据,应该同时属于[8 点, 9 点)和[8 点半, 9 点半)两个窗口;而对于 8 点 10 分的数据,则同时属于[8点, 9 点)和[7 点半, 8 点半)两个窗口。所以,滑动窗口其实是固定大小窗口的更广义的一种形式;换句话说,滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长(size = slide)。当然,我们也可以定义滑动步长大于窗口大小,这样的话就会出现窗口不重叠、但会有间隔的情况;这时有些数据不
属于任何一个窗口,就会出现遗漏统计。所以一般情况下,我们会让滑动步长小于窗口大小,并尽量设置为整数倍的关系。

在一些场景中,可能需要统计最近一段时间内的指标,而结果的输出频率要求又很高,甚至要求实时更新,比如股票价格的 24 小时涨跌幅统计,或者基于一段时间内行为检测的异常报警。这时滑动窗口无疑就是很好的实现方式。

3 窗口API

3.1 按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时
执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。

在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...).window(...)

3.2 非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。

在代码中,直接基于 DataStream 调用.windowAll()定义窗口。

stream.windowAll(...)

3.3 代码中窗口 API 的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做 keyBy,后面调用.window()时直接换成.windowAll()就可以了。

3.4 滚动处理时间窗口

窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。

stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)

这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。另外,.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量,用这个偏移量可以处理时区。

例如:我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了。

.window(TumblingProcessingTimeWindows.of(Time.days(1),Time.hours(-8)))

3.5 滑动处理时间窗口

窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。

这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。
滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).aggregate(...)

4 窗口函数

定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗
口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream。
在这里插入图片描述

4.1 增量聚合函数(incremental aggregation functions)

4.1.1 归约函数(ReduceFunction)

将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。

统计每一小时用户的访问量:

packagecom.rosh.flink.pojo;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;@AllArgsConstructor@NoArgsConstructor@DatapublicclassUserPojo{privateInteger userId;privateString name;privateString uri;privateLong timestamp;}
packagecom.rosh.flink.wartermark;importcom.rosh.flink.pojo.UserPojo;importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.ReduceFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importjava.security.NoSuchAlgorithmException;importjava.security.SecureRandom;importjava.util.ArrayList;importjava.util.List;importjava.util.Random;publicclassWindowTS{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);DataStreamSource<UserPojo> dataDS = env.fromCollection(getUserLists());//生成有序水位线SingleOutputStreamOperator<UserPojo> orderStreamDS = dataDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(newSerializableTimestampAssigner<UserPojo>(){@OverridepubliclongextractTimestamp(UserPojo element,long recordTimestamp){return element.getTimestamp();}}));//聚合SingleOutputStreamOperator<Tuple2<Integer,Long>> userDS = orderStreamDS.map(newMapFunction<UserPojo,Tuple2<Integer,Long>>(){@OverridepublicTuple2<Integer,Long>map(UserPojo value)throwsException{returnTuple2.of(value.getUserId(),1L);}});//开窗统计每1小时用户访问了多少次SingleOutputStreamOperator<Tuple2<Integer,Long>> resultDS = userDS.keyBy(tuple -> tuple.f0).window(TumblingEventTimeWindows.of(Time.hours(1))).reduce(newReduceFunction<Tuple2<Integer,Long>>(){@OverridepublicTuple2<Integer,Long>reduce(Tuple2<Integer,Long> value1,Tuple2<Integer,Long> value2)throwsException{
                        value1.f1 = value1.f1 + value2.f1;return value1;}});

        resultDS.print();

        env.execute("WarterMarkTest");}privatestaticList<UserPojo>getUserLists()throwsNoSuchAlgorithmException{List<UserPojo> lists =newArrayList<>();Random random =SecureRandom.getInstanceStrong();for(int i =1; i <=1000; i++){String uri ="/goods/"+ i;int userId = random.nextInt(10);//有序时间UserPojo userPojo =newUserPojo(userId,"name"+ userId, uri,(long)(1000* i));//无序时间
            lists.add(userPojo);}return lists;}}

在这里插入图片描述

4.1.2 聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数
据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦。

例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?很明显,这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用 ReduceFunction,那么我们应该先把数据转换成二元组(sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。

于是自然可以想到,如果取消类型一致的限制,让输入数据、中间状态、输出结果三者类型都可以不同,不就可以一步直接搞定了吗?Flink 的 Window API 中的 aggregate 就提供了这样的操作。直接基于 WindowedStream 调用.aggregate()方法,就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction 的实现类作为参数。AggregateFunction 在源码中的定义如下:

/**
 *
 *  The type of the values that are aggregated (input values)
 *    The type of the accumulator (intermediate aggregate state).
 *    The type of the aggregated result
 *
 */publicinterfaceAggregateFunction<IN, ACC, OUT>extendsFunction,Serializable{/**
      * 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
            */ACCcreateAccumulator();/**
        * 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;
          * 返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
        */ACCadd(IN value,ACC accumulator);/**
       * 从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均
       * 值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
       */OUTgetResult(ACC accumulator);/**
       * 合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景
       * 就是会话窗口(Session Windows)。
       */ACCmerge(ACC a,ACC b);}

所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的
结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输
出的类型可以不同,使得应用更加灵活方便。

·统计人均访问次数:

packagecom.rosh.flink.wartermark;importcom.rosh.flink.pojo.UserPojo;importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.AggregateFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importjava.security.NoSuchAlgorithmException;importjava.security.SecureRandom;importjava.util.*;publicclassAggWindowTest{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);SingleOutputStreamOperator<UserPojo> userDS = env.fromCollection(getUserLists()).assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(newSerializableTimestampAssigner<UserPojo>(){@OverridepubliclongextractTimestamp(UserPojo element,long recordTimestamp){return element.getTimestamp();}}));//统计5秒内,人均访问次数SingleOutputStreamOperator<Double> resultDS = userDS.keyBy(key ->true).window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).aggregate(newPeopleHourAvgCount());

        resultDS.print("人均访问次数为:");

        env.execute("AggWindowTest");}privatestaticclassPeopleHourAvgCountimplementsAggregateFunction<UserPojo,Tuple2<HashSet<Integer>,Long>,Double>{/**
         * 初始化累加器
         */@OverridepublicTuple2<HashSet<Integer>,Long>createAccumulator(){returnTuple2.of(newHashSet<>(),0L);}/**
         *
         */@OverridepublicTuple2<HashSet<Integer>,Long>add(UserPojo value,Tuple2<HashSet<Integer>,Long> accumulator){//distinct userId
            accumulator.f0.add(value.getUserId());//次数+1
            accumulator.f1 = accumulator.f1 +1;//返回累加器return accumulator;}@OverridepublicDoublegetResult(Tuple2<HashSet<Integer>,Long> accumulator){return accumulator.f1 *1.0/ accumulator.f0.size();}@OverridepublicTuple2<HashSet<Integer>,Long>merge(Tuple2<HashSet<Integer>,Long> a,Tuple2<HashSet<Integer>,Long> b){returnnull;}}/**
     * 获取随机人数的1000次访问
     */privatestaticList<UserPojo>getUserLists()throwsNoSuchAlgorithmException{List<UserPojo> lists =newArrayList<>();Random random =SecureRandom.getInstanceStrong();//获取随机人数int peopleCount = random.nextInt(20);System.out.println("随机人数为:"+ peopleCount);for(int i =1; i <=1000; i++){String uri ="/goods/"+ i;int userId = random.nextInt(peopleCount);//有序时间UserPojo userPojo =newUserPojo(userId,"name"+ userId, uri,newDate().getTime());//无序时间
            lists.add(userPojo);}return lists;}}

在这里插入图片描述

4.2 全窗口函数(full window functions)

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员。

统计10秒访问UV:

packagecom.rosh.flink.wartermark;importcom.rosh.flink.pojo.UserPojo;importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;importorg.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importorg.apache.flink.util.Collector;importjava.security.NoSuchAlgorithmException;importjava.security.SecureRandom;importjava.sql.Timestamp;importjava.util.*;publicclassProcessWindowTest{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists());//水位线SingleOutputStreamOperator<UserPojo> watermarks = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(newSerializableTimestampAssigner<UserPojo>(){@OverridepubliclongextractTimestamp(UserPojo element,long recordTimestamp){return element.getTimestamp();}}));//开窗10秒UV统计SingleOutputStreamOperator<String> resultDS = watermarks.keyBy(key ->true).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(newUserUVCount());

        resultDS.print("UV:");

        env.execute("ProcessWindowTest");}privatestaticclassUserUVCountextendsProcessWindowFunction<UserPojo,String,Boolean,TimeWindow>{@Overridepublicvoidprocess(Boolean aBoolean,ProcessWindowFunction<UserPojo,String,Boolean,TimeWindow>.Context context,Iterable<UserPojo> elements,Collector<String> out)throwsException{//用户集合HashSet<Integer> hashSet =newHashSet<>();for(UserPojo user : elements){
                hashSet.add(user.getUserId());}//获取时间信息long start = context.window().getStart();long end = context.window().getEnd();String rs ="窗口信息,startTime:"+newTimestamp(start)+",endTime: "+newTimestamp(end)+",用户访问的次数为:"+ hashSet.size();
            out.collect(rs);}}privatestaticList<UserPojo>getUserLists()throwsNoSuchAlgorithmException{List<UserPojo> lists =newArrayList<>();Random random =SecureRandom.getInstanceStrong();int userCount = random.nextInt(100);for(int i =1; i <=1000; i++){String uri ="/goods/"+ i;int userId = random.nextInt(userCount);//有序时间UserPojo userPojo =newUserPojo(userId,"name"+ userId, uri,newDate().getTime());//无序时间
            lists.add(userPojo);}return lists;}}

在这里插入图片描述

4.3 增量聚合和全窗口函数的结合使用

增量聚合函数处理计算会更高效。全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。

在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction。

// ReduceFunction 与 WindowFunction 结合public<R>SingleOutputStreamOperator<R>reduce(ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function)// ReduceFunction 与 ProcessWindowFunction 结合public<R>SingleOutputStreamOperator<R>reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function)// AggregateFunction 与 WindowFunction 结合public<ACC,V,R>SingleOutputStreamOperator<R>aggregate(AggregateFunction<T, ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction)// AggregateFunction 与 ProcessWindowFunction 结合public<ACC,V,R>SingleOutputStreamOperator<R>aggregate(AggregateFunction<T, ACC,V> aggFunction,ProcessWindowFunction<V,R,K,W> windowFunction)

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输
出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。

统计10秒的url浏览量:

packagecom.rosh.flink.wartermark;importcom.alibaba.fastjson.JSONObject;importcom.rosh.flink.pojo.UserPojo;importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.ReduceFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.windowing.WindowFunction;importorg.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importorg.apache.flink.util.Collector;importjava.security.NoSuchAlgorithmException;importjava.security.SecureRandom;importjava.sql.Timestamp;importjava.util.ArrayList;importjava.util.Date;importjava.util.List;importjava.util.Random;publicclassUrlWindowTest{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);//读取数据源DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists());//水位线SingleOutputStreamOperator<UserPojo> waterDS = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(newSerializableTimestampAssigner<UserPojo>(){@OverridepubliclongextractTimestamp(UserPojo element,long recordTimestamp){return element.getTimestamp();}}));//url countSingleOutputStreamOperator<Tuple2<String,Long>> urlDS = waterDS.map(newMapFunction<UserPojo,Tuple2<String,Long>>(){@OverridepublicTuple2<String,Long>map(UserPojo value)throwsException{returnTuple2.of(value.getUri(),1L);}});SingleOutputStreamOperator<JSONObject> resultDS = urlDS.keyBy(data -> data.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce(newReduceFunction<Tuple2<String,Long>>(){@OverridepublicTuple2<String,Long>reduce(Tuple2<String,Long> value1,Tuple2<String,Long> value2)throwsException{
                        value1.f1 = value1.f1 + value2.f1;return value1;}},newWindowFunction<Tuple2<String,Long>,JSONObject,String,TimeWindow>(){@Overridepublicvoidapply(String s,TimeWindow window,Iterable<Tuple2<String,Long>> input,Collector<JSONObject> out)throwsException{Tuple2<String,Long> tuple2 = input.iterator().next();JSONObject jsonObject =newJSONObject();
                        jsonObject.put("url", tuple2.f0);
                        jsonObject.put("count", tuple2.f1);newTimestamp(window.getStart());

                        jsonObject.put("startTime",newTimestamp(window.getStart()).toString());
                        jsonObject.put("endTime",newTimestamp(window.getEnd()).toString());
                        out.collect(jsonObject);}});

        resultDS.print();

        env.execute("UrlWindowTest");}privatestaticList<UserPojo>getUserLists()throwsNoSuchAlgorithmException{List<UserPojo> lists =newArrayList<>();Random random =SecureRandom.getInstanceStrong();for(int i =1; i <=1000; i++){//随机生成userId、goodIdint userId = random.nextInt(100);int goodId = random.nextInt(50);String uri ="/goods/"+ goodId;//有序时间UserPojo userPojo =newUserPojo(userId,"name"+ userId, uri,newDate().getTime());//无序时间
            lists.add(userPojo);}return lists;}}

在这里插入图片描述

标签: flink 算法 java

本文转载自: https://blog.csdn.net/qq_34125999/article/details/128971255
版权归原作者 响彻天堂丶 所有, 如有侵权,请联系我们删除。

“Flink 滚动窗口、滑动窗口详解”的评论:

还没有评论