0


Flink中aggregate[AggregateFunction]的使用及讲解

Flink的

aggregate()

方法一般是通过实现

AggregateFunction

接口对数据流进行聚合计算的场景。例如,在使用 Flink 的

DataStream API

时,用户经常需要对输入数据进行分组操作,并按照一组

key

对数据进行汇总、运算或聚合计算。对于这些场景,可以使用

aggregate()

方法来实现聚合计算。通过指定一个

AggregateFunction

类型的函数作为聚合操作来调用

aggregate()

方法,可以对元素流进行聚合和处理,生成新的输出流。在具体应用中,根据不同的业务需求,可以根据实际情况选择不同类型的

AggregateFunction

来完成聚合计算任务。
接下来先对

AggregateFunction

中的需要实现的4个方法进行说明
**1.

createAccumulator()

**
此方法用于创建累加器,并将其初始化为默认值
**2.

add()

**
此方法将输入的元素添加到累加器,返回更新后的累加器
**3.

getResult()

**
此方法用于从累加器中提取操作的结果
**4.

merge()

**
此方法将两个累加器合并为一个新的累加器

下面在通过代码实例说明AggregateFunction的使用,这里都以Tuple2类型作为举例说明

  • 求平均值
publicstaticclassAverageAggregateimplementsAggregateFunction<Tuple2<String,Double>,Tuple2<String,Double>,Tuple2<String,Double>>{@OverridepublicTuple2<String,Double>createAccumulator(){// 先将累加器进行初始化,这里给了一个""作为key, 0.0作为值returnTuple2.of("",0.0);}@OverridepublicTuple2<String,Double>add(Tuple2<String,Double> value,Tuple2<String,Double> accumulator){// 这里的实现是将输入的元素和累加器中的元素相加,并返回一个新的元素returnTuple2.of(value.f0, accumulator.f1 + value.f1);}@OverridepublicTuple2<String,Double>getResult(Tuple2<String,Double> accumulator){// 这里返回一个包含平均值的 Tuple2 对象,这里是将累加器中的元素除以2,然后返回一个新元素。returnTuple2.of(accumulator.f0, accumulator.f1 /2.0);}@OverridepublicTuple2<String,Double>merge(Tuple2<String,Double> a,Tuple2<String,Double> b){// 这里是将两个累加器中的元素相加并除以2,然后返回一个新的元素对。returnTuple2.of(a.f0,(a.f1 + b.f1)/2);}}
  • 求最大值
publicclassMaxAggregateFunctionimplementsAggregateFunction<Tuple2<String,Double>,Tuple2<String,Double>,Tuple2<String,Double>>{@OverridepublicTuple2<String,Double>createAccumulator(){returnTuple2.of("",Double.MIN_VALUE);// 将累加器初始化为最小值}@OverridepublicTuple2<String,Double>add(Tuple2<String,Double> value,Tuple2<String,Double> accumulator){if(value.f1 > accumulator.f1){returnTuple2.of(value.f0, value.f1);}else{return accumulator;}}@OverridepublicTuple2<String,Double>getResult(Tuple2<String,Double> accumulator){return accumulator;// 返回最大值}@OverridepublicTuple2<String,Double>merge(Tuple2<String,Double> a,Tuple2<String,Double> b){if(a.f1 > b.f1){return a;}else{return b;}}}
  • 求最小值
publicclassMinAggregateFunctionimplementsAggregateFunction<Tuple2<String,Double>,Tuple2<String,Double>,Tuple2<String,Double>>{@OverridepublicTuple2<String,Double>createAccumulator(){returnTuple2.of("",Double.MAX_VALUE);// 将累加器初始化为最大值}@OverridepublicTuple2<String,Double>add(Tuple2<String,Double> value,Tuple2<String,Double> accumulator){if(value.f1 < accumulator.f1){returnTuple2.of(value.f0, value.f1);}else{return accumulator;}}@OverridepublicTuple2<String,Double>getResult(Tuple2<String,Double> accumulator){return accumulator;// 返回最小值}@OverridepublicTuple2<String,Double>merge(Tuple2<String,Double> a,Tuple2<String,Double> b){if(a.f1 < b.f1){return a;}else{return b;}}}
  • 求和
publicclassSumAggregateFunctionimplementsAggregateFunction<Tuple2<String,Double>,Double,Double>{@OverridepublicDoublecreateAccumulator(){return0.0;// 将累加器初始化为0}@OverridepublicDoubleadd(Tuple2<String,Double> value,Double accumulator){return value.f1 + accumulator;// 将输入元素和累加器中的元素相加}@OverridepublicDoublegetResult(Double accumulator){return accumulator;// 返回总和}@OverridepublicDoublemerge(Double a,Double b){return a + b;// 合并两个累加器中的元素相加}}

以上代码就是通过实现AggregateFunction接口,自定义不同的逻辑达到求平均值、最大值、最小值、总和的目的。

  • 方法调用演示
importcom.alibaba.fastjson.JSONObject;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.AggregateFunction;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.api.common.typeinfo.TypeHint;importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.connector.kafka.source.KafkaSource;importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/2/1
 * @Description: 测试
 **/publicclassDemo1{publicstaticvoidmain(String[] args)throwsException{Properties prop =newProperties();StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 这里以kafka作为数据源KafkaSource<String> kafkaSource =KafkaSource.<String>builder().setBootstrapServers("lx01:9092").setTopics("topic-01").setGroupId("g02").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(newSimpleStringSchema()).build();DataStreamSource<String> stream = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"KafkaSource");// 先将数据转成需要的Tuple2的形式SingleOutputStreamOperator<Tuple2<String,Double>> mapStream = stream.map((MapFunction<String,Tuple2<String,Double>>) value ->{JSONObject data =JSONObject.parseObject(JSONObject.parseObject(value).get("data").toString());returnTuple2.of(data.getString("gender"), data.getDouble("salary"));}).returns(TypeInformation.of(newTypeHint<Tuple2<String,Double>>(){}));// 这里先通过keyBy将数据根据性别进行分组,然后5秒为一个窗口,再求不同性别对应的工资平均值SingleOutputStreamOperator<Tuple2<String,Double>> avg = mapStream.keyBy((KeySelector<Tuple2<String,Double>,String>) value ->{String key = value.f0;return key;}).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(newAverageAggregate());// 这里调用平均值的AggregateFunction

        avg.print();
        env.execute();}
  • AggregateFunction的这4个方法在flink中执行的原理 在 Flink 中AggregateFunction的这四个方法在执行过程中会被转化为对应的内部Function对象,用于Flink的运行时执行计算。1. 在数据输入流进入 Flink 的过程中,Flink会为每一个key创建对应的累加器。键值对流会按照键所在的组进行分区,然后把每一个组的所有元素分配到一个task slot中,并为每个key创建一个累加器。累加器的类型是任务的一个状态的函数,Flink根据累加器函数的类型来决定使用哪种累加器。2. 当每个数据元素输入到累加器中时,add()方法会被调用。add()方法对输入的元素进行变换,然后更新累加器中的结果,返回新的结果给Flink 的相应算子。3. 在结果计算完毕后,getResult()方法将被调用,并将结果返回给 Flink。最后,如果有多个累加器需要合并的情况,Flink 会调用merge()方法将结果进行合并。通过这样的执行机制,AggregateFunction对象可以更加灵活快捷地处理数据。
  • 累加器的选择 当 Flink 创建累加器时,它会根据AggregateFunction的类型来确定使用哪种类型的累加器。具体来说,Flink支持两种类型的累加器:heap-basedincrementalheap-based累加器需要在内存中存储完整的所有元素,对于数据量较小的情况,它可以提供最好的性能。对于数据量更大的情况,它可能会导致内存不足的问题。incremental累加器可以在输入元素上进行增量操作,并在内存中保存仅仅是必要的元素。它可以处理更大的数据量,并且在内存使用上更加高效。在使用增量式累加器时,用户需要重写accumulate()retract()方法。 根据AggregateFunction的类型,Flink会自动选择合适的累加器类型来进行计算,以提高计算的效率和性能。

以上就是对aggregate方法的使用讲解及简单的原理介绍

标签: flink java 大数据

本文转载自: https://blog.csdn.net/AnameJL/article/details/131063024
版权归原作者 飞天小老头 所有, 如有侵权,请联系我们删除。

“Flink中aggregate[AggregateFunction]的使用及讲解”的评论:

还没有评论