ReduceFunction
实例代码如下(本博客内容均基于按键分区)
reduce函数有两个参数,分别是IN(输入)和OUT(输出类型)
SingleOutputStreamOperator<WaterSensor> reduce = sensorWS.reduce(
new ReduceFunction<WaterSensor>() {
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println("调用reduce方法,value1=" + value1 + "value2=" + value2);
return new WaterSensor(value1.getId(), value2.ts, value1.vc + value2.vc);
}
}
);
reduce.print();
1、相同key的第一条数据来的时候,不会调用reduce方法
![](https://img-blog.csdnimg.cn/direct/0edffddd189c4f4a9248ea6e6192975e.png)
![](https://img-blog.csdnimg.cn/direct/1e7d35f160ec4aa5bccc9734176aaf4c.png)
2、相同key在一个窗口内传入的数据大于一条,则会调用reduce方法
![](https://img-blog.csdnimg.cn/direct/876c5b3a20d842f3b2924b6358584b38.png)
![](https://img-blog.csdnimg.cn/direct/03225f185903426d99468ddda98e9fd0.png)
3、不同key对应不同的窗口,互不影响
![](https://img-blog.csdnimg.cn/direct/9bd867a381224cb3a5058a6a8490dad5.png)
![](https://img-blog.csdnimg.cn/direct/b22d8242d31047d5967b6bab91d61446.png)
相比于ReduceFunction限制了输入、输出、聚合状态类型保持一致,AggregateFunction更加灵活,3个变量类型可以任意选择
AggregateFunction
// 2、增量聚合 Aggregate
/*
* 1、属于本窗口的第一条数据来,创建窗口,创建累加器
* 2、增量聚合: 来一条计算一条, 调用一次add方法
* 3、窗口输出时调用一次getresult方法
* 4、输入、中间累加器、输出 类型可以不一样,非常灵活
*/
SingleOutputStreamOperator<String> agg = sensorWS.aggregate(
/*
* 第一个类型: 输入数据的类型
* 第二个类型: 累加器的类型,存储的中间计算结果的类型
* 第三个类型: 输出的类型
*/
new AggregateFunction<WaterSensor, Integer, String>() {
// 初始化累加器
@Override
public Integer createAccumulator() {
return 0;
}
// 聚合逻辑
@Override
public Integer add(WaterSensor waterSensor, Integer integer) {
System.out.println("调用add方法,value" + integer);
return waterSensor.vc + integer;
}
// 获取最终结果
@Override
public String getResult(Integer integer) {
System.out.println("调用getResult方法");
return integer.toString();
}
@Override
public Integer merge(Integer integer, Integer acc1) {
return null;
}
}
);
agg.print();
1、初始化累加器之后,第一个值设置为0,add方法一定会执行
![](https://img-blog.csdnimg.cn/direct/013cea81ae9740a890d9a0662950667a.png)
![](https://img-blog.csdnimg.cn/direct/e08d0f5b675c436b8d7f0bd347476995.png)
2、有n条数据,就会执行n次add方法
![](https://img-blog.csdnimg.cn/direct/3263b0b97f8643629feeb36a289cf8df.png)
![](https://img-blog.csdnimg.cn/direct/6700c3906c8a486aa1deb976c5ba1b04.png)
以上两种方法不能看到窗口的信息,比如时间,因此引入全窗口函数,它提供了一个上下文对象,不仅能获取窗口信息,还能得到访问的当前时间和状态信息。
ProcessWindowFunction
该function的四个参数按顺序分别表示为输入,输出,key,窗口类型
SingleOutputStreamOperator<String> process = sensorWS.process(
// IN,OUT,KEY,Window
new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
/**
* 全窗口函数计算逻辑: 窗口触发时才会调用一次,统一计算窗口的所有数据
* @param s 分组的key
* @param context 上下文
* @param elements 存的数据
* @param out 采集器
* @throws Exception
*/
public void process(String s, Context context, Iterable<WaterSensor> iterable, Collector<String> collector) throws Exception {
// 拿到窗口的开始时间、结束时间
long startTS = context.window().getStart();
long endTS = context.window().getEnd();
String start_time = DateFormatUtils.format(startTS, "yyyy-MM-dd HH:mm:ss.SSS");
String end_time = DateFormatUtils.format(endTS, "yyyy-MM-dd HH:mm:ss.SSS");
// 去除窗口的size
long count = iterable.spliterator().estimateSize();
collector.collect("key=" + s + "的窗口[" + start_time + "->" +
end_time + "),长度为" + count);
}
}
);
process.print();
1、不同key会产生不同的窗口,而一个窗口的数据在结束时才会执行计算
![](https://img-blog.csdnimg.cn/direct/dab6a828ee6b417e978a940c3f25741e.png)
![](https://img-blog.csdnimg.cn/direct/b30b1eb7e1d148ff96c48f6e75acb05b.png)
结合增量聚合函数和全窗口函数
看过上面3个案例,可以发现增量聚合函数ReduceFunction和AggregateFunction都是数据来一条处理一条,而全窗口函数ProcessWindowFunction则是在窗口结束的时候才执行计算,因此process的效率相对更低。但是process的优点在于它可以得到上下文信息,因此,将两者结合起来使用,既可以提升计算效率,又能得到窗口信息。
public class WindowAggregateAndProcessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> source = env
.socketTextStream("node1", 7777)
.map(new WaterSensorMapFunction());
KeyedStream<WaterSensor, String> keyBy = source.keyBy(
new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor waterSensor) throws Exception {
return waterSensor.getId();
}
}
);
// 1、窗口分配器
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new MyAgg(), new MyProcess());
aggregate.print();
env.execute();
}
public static class MyAgg implements AggregateFunction<WaterSensor,Integer,String>{
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(WaterSensor waterSensor, Integer integer) {
System.out.println("调用add函数,value=" + integer);
return waterSensor.vc + integer;
}
@Override
public String getResult(Integer integer) {
System.out.println("调用getResult函数");
return integer.toString();
}
@Override
public Integer merge(Integer integer, Integer acc1) {
return null;
}
}
// 注意,这里ProcessWindowFunction的输入为String类型,也就是AggregateFunction的输出类型
public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{
// iterable 只有一条数据,即AggregateFunction的计算结果
@Override
public void process(String s, Context context, Iterable<String> iterable, Collector<String> collector) throws Exception {
// 拿到窗口的开始时间、结束时间
long startTS = context.window().getStart();
long endTS = context.window().getEnd();
String start_time = DateFormatUtils.format(startTS, "yyyy-MM-dd HH:mm:ss.SSS");
String end_time = DateFormatUtils.format(endTS, "yyyy-MM-dd HH:mm:ss.SSS");
// 去除窗口的size
long count = iterable.spliterator().estimateSize();
collector.collect("key=" + s + "的窗口[" + start_time + "->" +
end_time + "),长度为" + count);
}
}
}
1、结合两者既可以实现来一条数据执行一条数据,还能拿到上下文信息。
2、process中的长度均为1,因为其输入是aggregate的输出。
![](https://img-blog.csdnimg.cn/direct/adc1c192af814515a328f91bbf5710bb.png)
![](https://img-blog.csdnimg.cn/direct/2fcc827ab6434ceeb99ad290bcc9d9aa.png)
同理,reduce也支持结合使用增量聚合函数和全窗口函数,写法与aggregate基本保持一致,同样注意process的输入应该改成reduceFunction的输出类型。
本文转载自: https://blog.csdn.net/m0_63069778/article/details/136592573
版权归原作者 数据开发杰出cv工程师 所有, 如有侵权,请联系我们删除。
版权归原作者 数据开发杰出cv工程师 所有, 如有侵权,请联系我们删除。