0


Flink之常用处理函数

常用处理函数

处理函数

概述

处理函数(Processing Function)是Apache Flink中用于对数据流上的元素进行处理的核心组件之一。处理函数负责定义数据流上的数据如何被处理,允许开发人员编写自定义逻辑以执行各种操作,如转换、聚合、筛选、连接等,并在处理后生成输出数据流。

对于数据流,都可以直接调用.process()方法进行自定义处理,传入的参数就叫作处理函数,也可以把它划分为转换算子。

基本处理函数

ProcessFunction是最基本的处理函数,基于DataStream直接调用.process()时作为参数传入

ProcessFunction介绍

ProcessFunction是一个抽象类,它继承AbstractRichFunction,有两个泛型类型参数:

1.输入的数据类型

2.处理完成之后输出数据类型

内部单独定义了两个方法:

1.必须要实现的抽象方法.processElement()

2.一个非抽象方法.onTimer()

ProcessFunction类如下:

/**
 * 处理流元素的函数
 *
 * 对于输入流中的每个元素,调用processElement(Object,ProcessFunction.Context,Collector) 可以产生零个或多个元素作为输出
 * 还可以通过提供的ProcessFunction.Context查询时间和设置计时器
 *
 * 对于触发计时器,将调用onTimer(long,ProcessFunction.OnTimerContext,Collector) 可以再次产生零个或多个元素作为输出,并注册其他计时器
 *
 * @param <I> 输入元素的类型
 * @param <O> 输出元素的类型
 */@PublicEvolvingpublicabstractclassProcessFunction<I,O>extendsAbstractRichFunction{privatestaticfinallong serialVersionUID =1L;/**
     * 处理输入流中的一个元素,对于流中的每个元素都会调用一次
     *
     * 可以使用输出零个或多个元素收集器参数,并使用更新内部状态或设置计时器ProcessFunction.Context参数
     *
     * @param value 输入值,类型与流中数据类型一致
     * @param ctx ProcessFunction的内部抽象类Context,表示当前运行的上下文,可以获取当前时间戳,用于查询时间和注册定时器的定时服务
     * @param out 用于返回结果值的收集器,与out.collect()方法向下游发数据类似
     */publicabstractvoidprocessElement(I value,Context ctx,Collector<O> out)throwsException;/**
     * 当使用设置计时器时调用TimerService
     * 
     * 只有在注册好的定时器触发的时候才会调用,而定时器是通过定时服务TimerService来注册的
     * 
     * 事件时间语义下就是由水位线watermark来触发
     * 
     * 也可以自定义数据按照时间分组、定时触发计算输出结果,实现类似窗口window的功能
     *
     * @param timestamp 触发计时器的时间戳,指设定好的触发时间
     * @param ctx 上下文
     * @param out 用于返回结果值的收集器
     */publicvoidonTimer(long timestamp,OnTimerContext ctx,Collector<O> out)throwsException{}}

使用示例

基本处理函数ProcessFunction的使用与基本的转换操作类似,直接基于DataStream调用.process()方法,传入一个ProcessFunction作为参数,用来定义处理逻辑。

具体举例使用示例如下:

publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);DataStreamSource<Integer> stream = env.fromCollection(Arrays.asList(1,2,3,4,5,-6));/**
         * 创建OutputTag对象
         * 分别指定: 标签名、放入侧输出流的数据类型(Typeinformation)
         */OutputTag<Integer> evenTag =newOutputTag<>("even",Types.INT);OutputTag<Integer> oddTag =newOutputTag<>("odd",Types.INT);// 使用process算子SingleOutputStreamOperator<Integer> process = stream.process(newProcessFunction<Integer,Integer>(){@OverridepublicvoidprocessElement(Integer value,Context ctx,Collector<Integer> out)throwsException{if(value >0){if(value %2==0){// 偶数放到侧输出流evenTag中// 调用上下文对象ctx的output方法,分别传入 Tag对象、放入侧输出流中的数据
                                ctx.output(evenTag, value);}elseif(value %2==1){// 奇数放到侧输出流oddTag中
                                ctx.output(oddTag, value);}}else{// 负数 数据,放到主流中
                            out.collect(value);}}});// 在主流中,根据标签 获取 侧输出流SideOutputDataStream<Integer> even = process.getSideOutput(evenTag);SideOutputDataStream<Integer> odd = process.getSideOutput(oddTag);// 打印主流
        process.printToErr("主流-负数-job");//打印 侧输出流
        even.print("偶数-job");
        odd.print("奇数-job");
        env.execute();}
奇数-job:1>1
偶数-job:2>2
奇数-job:1>3
偶数-job:2>4
奇数-job:1>5
主流-负数-job:2>-6

按键分区处理函数

KeyedProcessFunction对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,必须基于KeyedStream

KeyedProcessFunction介绍

KeyedProcessFunction与ProcessFunction的定义几乎完全一样,区别只是在于类型参数多了一个K,这是当前按键分区的key的类型。

按键分区处理函数接口如下:

publicabstractclassKeyedProcessFunction<K,I,O>extendsAbstractRichFunction{publicabstractvoidprocessElement(I value,Context ctx,Collector<O> out)throwsException;publicvoidonTimer(long timestamp,OnTimerContext ctx,Collector<O> out)throwsException{}}

定时器Timer和定时服务TimerService

另外在KeyedStream中是支持使用定时服务TimerService,可以通过它访问流中的事件event、时间戳timestamp、水位线watermark,甚至可以注册定时事件。

在onTimer()方法中可以实现定时处理的逻辑,而它触发的前提是之前曾经注册过定时器、并且现在已经到了触发时间。

注册定时器的功能是通过上下文中提供的定时服务来实现的。

// 获取定时服务TimerService timerService = ctx.timerService();

TimerService是Flink关于时间和定时器的基础服务接口,对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器,具体方法如下:

// 获取当前的处理时间longcurrentProcessingTime();// 获取当前的水位线(事件时间)longcurrentWatermark();// 注册处理时间定时器,当处理时间超过time时触发voidregisterProcessingTimeTimer(long time);// 注册事件时间定时器,当水位线超过time时触发voidregisterEventTimeTimer(long time);// 删除触发时间为time的处理时间定时器voiddeleteProcessingTimeTimer(long time);// 删除触发时间为time的处理时间定时器voiddeleteEventTimeTimer(long time);

注意:

尽管处理函数中都可以访问TimerService,不过只有基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法

使用示例

直接基于keyBy之后的KeyedStream,直接调用.process()方法,传入KeyedProcessFunction的实现类参数

必须实现processElement()抽象方法,用来处理流中的每一个数据

必须实现非抽象方法onTimer(),用来定义定时器触发时的回调操作
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String,Integer>> streamSource = env.socketTextStream("IP",8086)// 将输入数据转换为Tuple2.map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{String[] split = value.split(",");returnTuple2.of(split[0],Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts)-> value.f1 *1000L));// keyBy分区KeyedStream<Tuple2<String,Integer>,String> keyByStream = streamSource.keyBy(a -> a.f0);// 按键分区处理函数SingleOutputStreamOperator<Integer> process = keyByStream.process(newKeyedProcessFunction<String,Tuple2<String,Integer>,Integer>(){/**
                     * 来一条数据调用一次
                     * @param value 当前数据
                     * @param ctx 上下文
                     * @param out 收集器
                     * @throws Exception
                     */@OverridepublicvoidprocessElement(Tuple2<String,Integer> value,Context ctx,Collector<Integer> out)throwsException{//获取当前数据的keyString currentKey = ctx.getCurrentKey();p();// 获取定时服务TimerService timerService = ctx.timerService();// 数据中提取出来的事件时间Long currentEventTime = ctx.timestam
                        // 注册事件时间定时器
                        timerService.registerEventTimeTimer(3000L);System.out.println("key: "+ currentKey +" 当前数据: "+ value +" 当前时间: "+ currentEventTime +" 注册一个3s定时器");/**
                     * 时间进展到定时器注册的时间,调用该方法
                     * @param timestamp 定时器被触发时的时间
                     * @param ctx       上下文
                     * @param out       采集器
                     */@OverridepublicvoidonTimer(long timestamp,OnTimerContext ctx,Collector<Integer> out)throwsException{super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key: "+ currentKey +" 时间: "+ timestamp +" 定时器触发");}});
        process.print();
        env.execute();}

其他

1.注册一个事件时间的定时器

事件时间定时器,通过watermark来触发,即watermark >= 注册的时间

水印watermark = 当前最大事件时间 - 等待时间 -1ms

例子:等待3s,3s定时器,事件时间6s 则watermark = 6s - 3s -1ms = 2.99s,不会触发3s的定时器
// 数据中提取出来的事件时间Long currentEventTime = ctx.timestam
// 注册事件时间定时器
timerService.registerEventTimeTimer(3000L);System.out.println("key: "+ currentKey +" 当前数据: "+ value +" 当前时间: "+ currentEventTime +" 注册一个3s定时器");

输入数据如下,当输入7时,水位线是7-3=4s-1ms=3.99s,即水位线超过定时器3s,执行触发回调操作

nc -lk 8086
key1,1
key1,2
key2,3
key2,4
key1,5
key2,6
key1,7

控制台输出:

key: key1 当前数据:(key1,1) 当前时间:1000 注册一个3s定时器
key: key1 当前数据:(key1,2) 当前时间:2000 注册一个3s定时器
key: key2 当前数据:(key2,3) 当前时间:3000 注册一个3s定时器
key: key2 当前数据:(key2,4) 当前时间:4000 注册一个3s定时器
key: key1 当前数据:(key1,5) 当前时间:5000 注册一个3s定时器
key: key2 当前数据:(key2,6) 当前时间:6000 注册一个3s定时器
key: key1 当前数据:(key1,7) 当前时间:7000 注册一个3s定时器
key: key1 时间:3000 定时器触发
key: key2 时间:3000 定时器触发

注意:

TimerService会以键和时间戳为标准,对定时器进行去重,因此对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

2.注册一个处理时间的定时器

long currentTs = timerService.currentProcessingTime();
timerService.registerProcessingTimeTimer(currentTs +3000L);System.out.println("key: "+ currentKey +" 当前数据: "+ value +" 当前时间: "+ currentTs +" 注册一个3s后的定时器");

输入测试数据如下:

key1,1
key2,2

当注册一个处理时间的定时器,3s后定时器会触发操作

key: key1 当前数据:(key1,1) 当前时间:1688136512301 注册一个3s后的定时器
key: key2 当前数据:(key2,2) 当前时间:1688136514179 注册一个3s后的定时器
key: key1 时间:1688136515301 定时器触发
key: key2 时间:1688136517179 定时器触发

3.获取process当前watermark

long currentWatermark = timerService.currentWatermark();System.out.println("当前数据: "+ value +" 当前watermark: "+ currentWatermark);
key1,1
key1,2
key1,3

结论:每次process处理,watermark是指上一条数据的事件时间-等待时间,例如:3-2-1ms=-1001

当前数据=(key1,1),当前watermark=-9223372036854775808
当前数据=(key1,2),当前watermark=-2001
当前数据=(key1,3),当前watermark=-1001

4.删除一个处理时间定时器

// 注册处理时间定时器long currentTs = timerService.currentProcessingTime();long timer = currentTs +3000;
                        timerService.registerProcessingTimeTimer(timer);System.out.println("key: "+ currentKey +" 当前数据: "+ value +" 当前时间: "+ currentTs +" 注册一个3s后的定时器");// 在3000毫秒后删除处理时间定时器if("key1".equals(currentKey)){
                            timerService.deleteProcessingTimeTimer(timer)}

输入测试数据:

key1,1
key2,2

控制台输出结果:

key: key1 当前数据:(key1,1) 当前时间:1688138104565 注册一个3s后的定时器
key: key2 当前数据:(key2,2) 当前时间:1688138106441 注册一个3s后的定时器
key: key2 时间:1688138109441 定时器触发

窗口处理函数

窗口处理函数就是一种典型的全窗口函数,它是基于WindowedStream直接调用.process()方法

窗口处理函数有2个:

1.ProcessWindowFunction:

开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入,必须是keyBy的数据流

2.ProcessAllWindowFunction:

同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入,必须是非keyBy的数据流

ProcessWindowFunction介绍

ProcessWindowFunction既是处理函数又是全窗口函数,具体接口如下:

/**
     * ProcessWindowFunction它有四个类型参数:

     * @param <IN> 数据流中窗口任务的输入数据类型
     * @param <OUT> 窗口任务进行计算之后的输出数据类型
     * @param <KEY> 数据中键key的类型
     * @param <W>  窗口的类型,是Window的子类型。一般情况下我们定义时间窗口,W就是TimeWindow
     */publicabstractclassProcessWindowFunction<IN, OUT, KEY,WextendsWindow>extendsAbstractRichFunction{/**
         * 处理数据的核心方法process()方法
         *
         * @param key 窗口做统计计算基于的键,也就是之前keyBy用来分区的字段
         * @param context 当前窗口进行计算的上下文,它的类型就是ProcessWindowFunction内部定义的抽象类Context
         * @param elements 窗口收集到用来计算的所有数据,这是一个可迭代的集合类型
         * @param out 用来发送数据输出计算结果的收集器,类型为Collector
         * @throws Exception
         */publicabstractvoidprocess(KEY key,Context context,Iterable<IN> elements,Collector<OUT> out)throwsException;/**
         * 主要是进行窗口的清理工作
         * 如果自定义了窗口状态,那么必须在clear()方法中进行显式地清除,避免内存溢出
         * @param context 当前窗口进行计算的上下文
         * @throws Exception
         */publicvoidclear(Context context)throwsException{}}

ProcessAllWindowFunction介绍

ProcessAllWindowFunction的用法类似,不过它是基于AllWindowedStream,也就是对没有keyBy的数据流直接开窗并调用.process()方法

stream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(newMyProcessAllWindowFunction())

使用示例

以使用ProcessWindowFunction为例说明:

publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 从socket接收数据流SingleOutputStreamOperator<String> source = env.socketTextStream("IP",8086);// 将输入数据转换为(key, value)元组DataStream<Tuple2<String,Integer>> dataStream = source.map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2map(String s)throwsException{int number =Integer.parseInt(s);String key = number %2==0?"key1":"key2";Tuple2 tuple2 =newTuple2(key, number);return tuple2;}}).returns(Types.TUPLE(Types.STRING,Types.INT));// 将数据流按键分组,并定义滚动窗口(处理时间窗口)DataStream<String> resultStream = dataStream
                .keyBy(tuple -> tuple.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(newMyProcessWindowFunction());

        resultStream.print();

        env.execute("ProcessWindowFunction Example");}publicstaticclassMyProcessWindowFunctionextendsProcessWindowFunction<Tuple2<String,Integer>,String,String,TimeWindow>{@Overridepublicvoidprocess(String key,Context context,Iterable<Tuple2<String,Integer>> elements,Collector<String> out){int sum =0;for(Tuple2<String,Integer> element : elements){
                sum += element.f1;}

            out.collect("Key: "+ key +", Window: "+ context.window()+", Sum: "+ sum);}}

流的合并处理函数

CoProcessFunction是合并connect两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入

CoProcessFunction介绍

调用.process()时,传入一个CoProcessFunction。它需要实现的就是processElement1()、processElement2()两个方法

CoProcessFunction类具体结构如下:

/**
 * 用于同时处理两个连接的流
 * 它允许定义自定义处理逻辑,以处理来自两个不同输入流的事件并生成输出
 *
 * @param <IN1> 第一个输入流的元素类型
 * @param <IN2> 第二个输入流的元素类型
 * @param <OUT> 输出元素的类型
 */publicabstractclassCoProcessFunction<IN1, IN2, OUT>extendsAbstractRichFunction{/**
     * 处理第一个输入流的元素
     *
     * @param value 第一个输入流的元素
     * @param ctx 用于访问上下文信息,例如事件时间和状态的Context对象
     * @param out 用于发射输出元素的Collector对象
     * @throws Exception 处理时可能抛出的异常
     */publicabstractvoidprocessElement1(IN1 value,Context ctx,Collector<OUT> out)throwsException;/**
     * 处理第二个输入流的元素
     *
     * @param value 第二个输入流的元素
     * @param ctx 用于访问上下文信息,可以使用Context对象来访问事件时间、水位线和状态等上下文信息
     * @param out 用于发射输出元素的Collector对象
     * @throws Exception 处理时可能抛出的异常
     */publicabstractvoidprocessElement2(IN2 value,Context ctx,Collector<OUT> out)throwsException;/**
     * 当定时器触发时调用的方法。可以重写这个方法来执行基于时间的操作
     *
     * @param timestamp 触发定时器的时间戳
     * @param ctx 用于访问上下文信息,如事件时间和状态的OnTimerContext对象
     * @param out 用于发射输出元素的Collector对象
     * @throws Exception 处理时可能抛出的异常
     */publicvoidonTimer(long timestamp,OnTimerContext ctx,Collector<OUT> out)throwsException{}}

使用示例

假设有两个输入流,将这两个流合并计算得到每个key对应的合计,并输出结果流

publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Tuple2<String,Integer>> source1 = env.fromElements(Tuple2.of("key1",1),Tuple2.of("key2",4),Tuple2.of("key1",2));DataStreamSource<Tuple2<String,Integer>> source2 = env.fromElements(Tuple2.of("key1",3),Tuple2.of("key2",5),Tuple2.of("key2",6));ConnectedStreams<Tuple2<String,Integer>,Tuple2<String,Integer>> connect = source1.connect(source2);// 进行keyby操作,将key相同数据放到一起ConnectedStreams<Tuple2<String,Integer>,Tuple2<String,Integer>> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);/**
         * 对2个流中相同key的值求和
         */SingleOutputStreamOperator<String> process = connectKeyby.process(newCoProcessFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,String>(){Map<String,Integer> map =newHashMap<>();/**
                     * 第一条流的处理逻辑
                     * @param value 第一条流的数据
                     * @param ctx   上下文
                     * @param out   采集器
                     * @throws Exception
                     */@OverridepublicvoidprocessElement1(Tuple2<String,Integer> value,Context ctx,Collector<String> out)throwsException{String key = value.f0;if(!map.containsKey(key)){// 如果key不存在,则将值直接put进map
                            map.put(key, value.f1);}else{// key存在,则计算:获取上一次put的值 + 本次的值Integer total = map.get(key)+ value.f1;
                            map.put(key, total);}

                        out.collect("processElement1  key = "+ key +" value = "+ value +"total = "+ map.get(key));}/**
                     * 第二条流的处理逻辑
                     * @param value 第二条流的数据
                     * @param ctx   上下文
                     * @param out   采集器
                     * @throws Exception
                     */@OverridepublicvoidprocessElement2(Tuple2<String,Integer> value,Context ctx,Collector<String> out)throwsException{String key = value.f0;if(!map.containsKey(key)){// 如果key不存在,则将值直接put进map
                            map.put(key, value.f1);}else{// key存在,则计算:获取上一次put的值 + 本次的值Integer total = map.get(key)+ value.f1;
                            map.put(key, total);}
                        out.collect("processElement2  key = "+ key +" value = "+ value +"total = "+ map.get(key));}});

        process.print();
        env.execute();}
3> processElement1  key = key2 value =(key2,4)total =44> processElement1  key = key1 value =(key1,1)total =14> processElement2  key = key1 value =(key1,3)total =44> processElement1  key = key1 value =(key1,2)total =63> processElement2  key = key2 value =(key2,5)total =93> processElement2  key = key2 value =(key2,6)total =15

流的联结处理函数

JoinFunction 和 ProcessJoinFunction 是 Flink 中用于执行窗口连接操作的两个不同接口

窗口联结 JoinFunction

Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键key的数据放在窗口中进行配对处理。

JoinFunction接口如下:

/**
     * 联接通过在指定的键上联接两个数据集的元素来组合它们,每对连接元素都调用此函数
     * 
     * 默认情况下,连接严格遵循SQL中 “inner join” 的语义
     *
     * @param <IN1> 第一个输入中元素的类型
     * @param <IN2> 第二个输入中元素的类型
     * @param <OUT> 结果元素的类型
     */publicinterfaceJoinFunction<IN1, IN2, OUT>extendsFunction,Serializable{/**
         * join方法,每对联接的元素调用一次
         *
         * @param first 来自第一个输入的元素
         * @param second 来自第二个输入的元素
         * @return 生成的元素
         */OUTjoin(IN1 first,IN2 second)throwsException;}

具体语法格式如下:

/**
 * 1.调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams
 * 2.通过.where()和.equalTo()方法指定两条流中联结的key。注意:两者相同的元素,如果在同一窗口中,才可以匹配起来
 * 3.通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算
 */
stream1.join(stream2)// where()参数是KeySelector键选择器,用来指定第一条流中的key.where(<KeySelector>)// equalTo()传入KeySelector则指定第二条流中的key.equalTo(<KeySelector>)// window()传入窗口分配器.window(<WindowAssigner>)// apply()看作实现一个特殊的窗口函数,只能调用.apply()。传入JoinFunction是一个函数类接口,使用时需要实现内部的.join()方法,方法有两个参数,分别表示两条流中成对匹配的数据。.apply(<JoinFunction>)

示例如下:

publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);// 生成数据源1DataStreamSource<Tuple2<String,Integer>> streamSource1 = env.fromElements(Tuple2.of("a",1),Tuple2.of("a",2),Tuple2.of("b",3),Tuple2.of("c",4));// 定义 使用 Watermark策略SingleOutputStreamOperator<Tuple2<String,Integer>> stream1 = streamSource1
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts)-> value.f1 *1000L));// 生成数据源2DataStreamSource<Tuple2<String,Integer>> streamSource2 = env.fromElements(Tuple2.of("a",1),Tuple2.of("a",2),Tuple2.of("b",3),Tuple2.of("c",4),Tuple2.of("d",5),Tuple2.of("e",6));// 定义 使用 Watermark策略SingleOutputStreamOperator<Tuple2<String,Integer>> stream2 = streamSource2
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts)-> value.f1 *1000L));/**
         * 根据keyby的key进行匹配关联
         *
         * 注意:落在同一个时间窗口范围内才能匹配
         */DataStream<String> join = stream1.join(stream2)// stream1的keyby.where(r1 -> r1.f0)// stream2的keyby.equalTo(r2 -> r2.f0)// 传入窗口分配器.window(TumblingEventTimeWindows.of(Time.seconds(10)))// 传入JoinFunction函数类接口,实现内部的.join()方法,方法有两个参数,分别表示两条流中成对匹配的数据.apply(newJoinFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,String>(){/**
                     * 关联上的数据,调用join方法
                     * @param first  stream1的数据
                     * @param second stream2的数据
                     */@OverridepublicStringjoin(Tuple2<String,Integer> first,Tuple2<String,Integer> second)throwsException{return"stream1 数据: "+ first +" 关联 stream2 数据: "+ second;}});

        join.print();
        env.execute();}

执行结果如下:

stream1 数据:(a,1) 关联 stream2 数据: (a,1)
stream1 数据:(a,1) 关联 stream2 数据: (a,2)
stream1 数据:(a,2) 关联 stream2 数据: (a,1)
stream1 数据:(a,2) 关联 stream2 数据: (a,2)
stream1 数据:(c,4) 关联 stream2 数据: (c,4)
stream1 数据:(b,3) 关联 stream2 数据: (b,3)

间隔联结 ProcessJoinFunction

Interval Join即间隔联结,它是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。

ProcessJoinFunction接口情况如下 :

/**
 * 处理两个连接流的关联操作的抽象类
 * 该类允许定义自定义的处理逻辑,以在连接两个流时处理匹配的元素
 *
 * @param <IN1> 第一个输入流的元素类型
 * @param <IN2> 第二个输入流的元素类型
 * @param <OUT> 输出元素的类型
 */publicinterfaceProcessJoinFunction<IN1, IN2, OUT>{/**
     * 处理连接两个流的元素
     *
     * @param left  第一个输入流的元素
     * @param right 第二个输入流的元素
     * @param ctx   用于访问上下文信息的 Context 对象
     * @param out   用于发射输出元素的 Collector 对象
     * @throws Exception 处理时可能抛出的异常
     */voidprocessElement(IN1 left,IN2 right,Context ctx,Collector<OUT> out)throwsException;}

间隔联结使用语法如下:

// 第一条流进行KeyedStream
stream1
    .keyBy(<KeySelector>)// 得到KeyedStream之后,调用.intervalJoin()合并两条流,传入一个KeyedStream参数,两者key类型应该一致,最终得到一个IntervalJoin类型.intervalJoin(stream2.keyBy(<KeySelector>))// 通过.between()方法指定间隔的上下界.between(Time.milliseconds(-2),Time.milliseconds(1))// 调用.process()方法,定义对匹配数据对的处理操作,传入一个处理函数.process (newProcessJoinFunction<Integer,Integer,String(){@OverridepublicvoidprocessElement(Integer left,Integer right,Context ctx,Collector<String> out){
                out.collect(left +","+ right);}});

使用示例如下:

publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);// 生成数据源1DataStreamSource<Tuple2<String,Integer>> streamSource1 = env.fromElements(Tuple2.of("a",1),Tuple2.of("a",2),Tuple2.of("b",3),Tuple2.of("c",4));// 定义 使用 Watermark策略SingleOutputStreamOperator<Tuple2<String,Integer>> stream1 = streamSource1
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts)-> value.f1 *1000L));// 生成数据源2DataStreamSource<Tuple2<String,Integer>> streamSource2 = env.fromElements(Tuple2.of("a",1),Tuple2.of("a",2),Tuple2.of("b",3),Tuple2.of("c",4),Tuple2.of("d",5),Tuple2.of("e",6));// 定义 使用 Watermark策略SingleOutputStreamOperator<Tuple2<String,Integer>> stream2 = streamSource2
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts)-> value.f1 *1000L));// 对2条流分别做keyby,key就是关联条件KeyedStream<Tuple2<String,Integer>,String> keyedStream1 = stream1.keyBy(r1 -> r1.f0);KeyedStream<Tuple2<String,Integer>,String> keyedStream2 = stream2.keyBy(r2 -> r2.f0);// 执行间隔联结
        keyedStream1.intervalJoin(keyedStream2).between(Time.seconds(-2),Time.seconds(2)).process(newProcessJoinFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,String>(){/**
                             * 当两条流数据匹配上时调用这个方法
                             * @param left  stream1的数据
                             * @param right stream2的数据
                             * @param ctx   上下文
                             * @param out   采集器
                             * @throws Exception
                             */@OverridepublicvoidprocessElement(Tuple2<String,Integer> left,Tuple2<String,Integer> right,Context ctx,Collector<String> out)throwsException{// 关联的数据
                                out.collect("stream1 数据: "+ left +" 关联 stream2 数据: "+ right);}}).print();

        env.execute();}
stream1 数据:(a,1) 关联 stream2 数据: (a,1)
stream1 数据:(a,1) 关联 stream2 数据: (a,2)
stream1 数据:(a,2) 关联 stream2 数据: (a,2)
stream1 数据:(a,2) 关联 stream2 数据: (a,1)
stream1 数据:(b,3) 关联 stream2 数据: (b,3)
stream1 数据:(c,4) 关联 stream2 数据: (c,4)

迟到数据的处理

窗口间隔联结处理函数可以实现对迟到数据的处理

publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String,Integer>> streamSource1 = env.socketTextStream("112.74.96.150",8086).map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{String[] split = value.split(",");returnTuple2.of(split[0],Integer.valueOf(split[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts)-> value.f1 *1000L));SingleOutputStreamOperator<Tuple2<String,Integer>> streamSource2 = env.socketTextStream("112.74.96.150",8087).map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{String[] split = value.split(",");returnTuple2.of(split[0],Integer.valueOf(split[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts)-> value.f1 *1000L));// 对2条流分别做keyby,key就是关联条件KeyedStream<Tuple2<String,Integer>,String> keyedStream1 = streamSource1.keyBy(r1 -> r1.f0);KeyedStream<Tuple2<String,Integer>,String> keyedStream2 = streamSource2.keyBy(r2 -> r2.f0);// 定义 标记操作符的侧面输出OutputTag<Tuple2<String,Integer>> keyedStream1OutputTag =newOutputTag<>("keyedStream1",Types.TUPLE(Types.STRING,Types.INT));OutputTag<Tuple2<String,Integer>> keyedStream2OutputTag =newOutputTag<>("keyedStream2",Types.TUPLE(Types.STRING,Types.INT));// 执行间隔联结SingleOutputStreamOperator<String> process = keyedStream1.intervalJoin(keyedStream2)// 指定间隔的上界、下界的偏移,负号代表时间往前,正号代表时间往后// 若keyedStream1中某事件时间为5,则其水位线是5-3=2,其上界是 5-2=3 下界是5+2=7 即2-7这个区间能匹配keyedStream2中事件时间是2-7的数据.between(Time.seconds(-2),Time.seconds(2))// 将streamSource1迟到数据,放入侧输出流.sideOutputLeftLateData(keyedStream1OutputTag)// 将streamSource2迟到数据,放入侧输出流.sideOutputRightLateData(keyedStream2OutputTag)// 对匹配数据对的处理操作 只能处理 join上的数据.process(newProcessJoinFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,String>(){/**
                             * 当两条流数据匹配上时调用这个方法
                             * @param left  stream1的数据
                             * @param right stream2的数据
                             * @param ctx   上下文
                             * @param out   采集器
                             * @throws Exception
                             */@OverridepublicvoidprocessElement(Tuple2<String,Integer> left,Tuple2<String,Integer> right,Context ctx,Collector<String> out)throwsException{// 进入这个方法,是关联上的数据
                                out.collect("stream1 数据: "+ left +" 关联 stream2 数据: "+ right);}});

        process.print("主流");
        process.getSideOutput(keyedStream1OutputTag).printToErr("streamSource1迟到数据");
        process.getSideOutput(keyedStream2OutputTag).printToErr("streamSource2迟到数据");
        env.execute();}

1.2条流数据匹配
若keyedStream1中某事件时间为5,则其水位线是5-3=2,其上界是 5-2=3 下界是5+2=7 即2-7这个区间能匹配keyedStream2中事件时间是2-7的数据

 nc -lk 8086
key1,5
nc -lk 8087
key1,3
key1,7
key1,8
主流> stream1 数据:(key1,5) 关联 stream2 数据: (key1,3)
主流> stream1 数据:(key1,5) 关联 stream2 数据: (key1,7)

2.keyedStream2迟到数据
此时,keyedStream1中水位线是5-3=2,keyedStream2中水位线是8-3=5,多并行度下水位线取最小,即取水位线2

在keyedStream2输入事件时间1

nc -lk 8087
key1,3
key1,7
key1,8
key1,1

事件时间1 < 水位线2,且事件时间1被keyedStream1的事件时间5的上界5-2=3与下界5+2=7不包含,即数据不匹配且streamSource2数据迟到

streamSource2迟到数据>(key1,1)

3.keyedStream1迟到数据

keyedStream1输入事件时间7

nc -lk 8086
key1,5
key1,7

此时匹配到streamSource2中的8、7

主流> stream1 数据:(key1,7) 关联 stream2 数据: (key1,8)
主流> stream1 数据:(key1,7) 关联 stream2 数据: (key1,7)

此时,keyedStream1的水位线是7-3=4,keyedStream2的水位线是8-3=5,多并行度下水位线取最小,即取水位线4

keyedStream1输入事件时间3

 nc -lk 8086
key1,5
key1,7
key1,3

事件时间3 < 水位线4,且事件时间3被keyedStream2的事件时间3的上界3-2=1与下界3+2=5包含,即数据匹配且streamSource1数据迟到

streamSource1迟到数据>(key1,3)

广播流处理函数

用于连接一个主数据流和多个广播数据流。可以实现processElement 方法来处理主数据流的每个元素,同时可以处理广播数据流,通常用于数据广播和连接。

广播流处理函数有2个:

1.BroadcastProcessFunction:

广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。它是一个未keyBy的普通DataStream与一个广播流BroadcastStream做连接之后的产物

2.KeyedBroadcastProcessFunction:

按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。它是一个KeyedStream与广播流做连接

KeyedBroadcastProcessFunction

/**
 * @param <KS>  输入键控流的键类型
 * @param <IN1> 键控 (非广播) 端的输入类型
 * @param <IN2> 广播端的输入类型
 * @param <OUT> 运算符的输出类型
 */publicabstractclassKeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>extendsBaseBroadcastProcessFunction{privatestaticfinallong serialVersionUID =-2584726797564976453L;/**
     * (非广播) 的键控流中的每个元素调用此方法
     *
     * @param value 流元素
     * @param ctx   允许查询元素的时间戳、查询当前处理/事件时间以及以只读访问迭代广播状态
     * @param out   将结果元素发出
     */publicabstractvoidprocessElement(finalIN1 value,finalReadOnlyContext ctx,finalCollector<OUT> out)throwsException;/**
     * 针对broadcast stream中的每个元素调用该方法
     *
     * @param value stream元素
     * @param ctx   上下文 许查询元素的时间戳、查询当前处理/事件时间和更新广播状态
     * @param out   将结果元素发射到
     */publicabstractvoidprocessBroadcastElement(finalIN2 value,finalContext ctx,finalCollector<OUT> out)throwsException;/**
     * 当使用TimerService设置的计时器触发时调用
     *
     * @param timestamp 触发计时器的时间戳
     * @param ctx       上下文
     * @param out       返回结果值的收集器
     */publicvoidonTimer(finallong timestamp,finalOnTimerContext ctx,finalCollector<OUT> out)throwsException{}}

BroadcastProcessFunction

BroadcastProcessFunction与KeyedBroadcastProcessFunction类似,不过它是基于AllWindowedStream,也就是对没有keyBy的数据流直接开窗并调用.process()方法

publicabstractclassBroadcastProcessFunction<IN1, IN2, OUT>extendsBaseBroadcastProcessFunction{publicabstractvoidprocessElement(finalIN1 value,finalReadOnlyContext ctx,finalCollector<OUT> out)throwsException;publicabstractvoidprocessBroadcastElement(finalIN2 value,finalContext ctx,finalCollector<OUT> out)throwsException;}

使用示例

以使用KeyedBroadcastProcessFunction为例说明:

publicclassKeyedBroadcastProcessFunctionExample{/**
     * 主流 数据对象
     */@Data@AllArgsConstructor@NoArgsConstructorpublicstaticclassMainRecord{privateString key;privateint value;}/**
     * 广播流 数据对象
     */@Data@AllArgsConstructor@NoArgsConstructorpublicstaticclassBroadcastRecord{privateString configKey;privateint configValue;}/**
     * 结果 数据对象
     */@Data@AllArgsConstructor@NoArgsConstructorpublicstaticclassResultRecord{privateString key;privateint result;}// 使用给定的名称和给定的类型信息新建一个MapStateDescriptorstaticMapStateDescriptor<String,Integer> mapStateDescriptor =newMapStateDescriptor<>("broadcastState",String.class,Integer.class);publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 创建主数据流DataStream<MainRecord> mainStream = env.fromElements(newMainRecord("A",10),newMainRecord("B",20),newMainRecord("A",30));// 创建广播数据流DataStream<BroadcastRecord> broadcastStream = env.fromElements(newBroadcastRecord("config",5));// 将广播数据流转化为 BroadcastStreamBroadcastStream<BroadcastRecord> broadcast = broadcastStream.broadcast(mapStateDescriptor);// 使用 KeyedBroadcastProcessFunction 连接主数据流和广播数据流DataStream<ResultRecord> resultStream = mainStream
                .keyBy(newMainRecordKeySelector()).connect(broadcast).process(newMyKeyedBroadcastProcessFunction());

        resultStream.print();

        env.execute("KeyedBroadcastProcessFunction Example");}/**
     * 使用提供的键对其运算符状态进行分区
     */publicstaticclassMainRecordKeySelectorimplementsKeySelector<MainRecord,String>{@OverridepublicStringgetKey(MainRecord mainRecord){return mainRecord.getKey();}}/**
     *
     */publicstaticclassMyKeyedBroadcastProcessFunctionextendsKeyedBroadcastProcessFunction<String,MainRecord,BroadcastRecord,ResultRecord>{@OverridepublicvoidprocessBroadcastElement(BroadcastRecord value,Context ctx,Collector<ResultRecord> out)throwsException{// 通过上下文获取广播状态BroadcastState<String,Integer> broadcastState = ctx.getBroadcastState(mapStateDescriptor);// 处理广播数据流中的每个元素,更新广播状态
            broadcastState.put(value.getConfigKey(), value.getConfigValue());}@OverridepublicvoidprocessElement(MainRecord value,ReadOnlyContext ctx,Collector<ResultRecord> out)throwsException{// 在 processElement 中访问广播状态ReadOnlyBroadcastState<String,Integer> broadcastState = ctx.getBroadcastState(mapStateDescriptor);// 从广播状态中获取配置值Integer configValue = broadcastState.get("config");// 注意:刚启动时,可能是数据流的第1 2 3...条数据先来 不是广播流先来if(configValue ==null){return;}System.out.println(String.format("主数据流的Key: %s, value: %s, 广播更新结果: %s", value.key, value.value, value.value + configValue));// 根据配置值和主数据流中的元素执行处理逻辑int result = value.getValue()+ configValue;// 发出结果记录
            out.collect(newResultRecord(value.getKey(), result));}}}
主数据流的Key:A, value:10, 广播更新结果:15
主数据流的Key:B, value:20, 广播更新结果:252>KeyedBroadcastProcessFunctionExample.ResultRecord(key=B, result=25)7>KeyedBroadcastProcessFunctionExample.ResultRecord(key=A, result=15)
主数据流的Key:A, value:30, 广播更新结果:357>KeyedBroadcastProcessFunctionExample.ResultRecord(key=A, result=35)
标签: flink java 大数据

本文转载自: https://blog.csdn.net/qq_38628046/article/details/131466269
版权归原作者 CodeDevMaster 所有, 如有侵权,请联系我们删除。

“Flink之常用处理函数”的评论:

还没有评论