Flink process functions: side outputs

Background

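In Flink, if you want to access a record's processing time or event time, register timers, or emit records to more than one output stream, you need a process function. This article walks through an example of the last of these: side outputs.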

Side outputs

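As before, this article uses an example from the book streaming-with-flink. It emits an alarm message to a side output for every reading whose temperature is below 32 degrees (the source generates Fahrenheit values, so 32 is the freezing point). The readings themselves, including the cold ones, are still emitted on the main output. The code is as follows: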

package wikiedits.processfunc.job;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;
import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;

public class SideOutPutJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<SensorReading> readings = see.addSource(new SensorSource());
        SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());

        // print the side output; the tag id and type must match the tag used in FreezingMonitor
        monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms") {}).print();
        // print the main output
        monitoredReadings.print();

        see.execute();
    }
}

package wikiedits.processfunc.process;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import wikiedits.processfunc.pojo.SensorReading;

public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {

    // tag identifying the side output that carries the alarm messages
    private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};

    @Override
    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
        // emit an alarm to the side output if the temperature is below 32
        if (value.temperature < 32.0) {
            ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);
        }
        // forward every reading to the main output
        out.collect(value);
    }
}

package wikiedits.processfunc.source;

/*
 * Copyright 2015 Fabian Hueske / Vasia Kalavri
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;

import java.util.Calendar;
import java.util.Random;

/**
 * Flink SourceFunction to generate SensorReadings with random temperature values.
 *
 * Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 3000 ms
 * (see the Thread.sleep(3000) call in run()).
 *
 * Note: This is a simple data-generating source function that does not checkpoint its state.
 * In case of a failure, the source does not replay any data.
 */
public class SensorSource extends RichParallelSourceFunction<SensorReading> {

    // flag indicating whether source is still running
    private boolean running = true;

    /** run() continuously emits SensorReadings by emitting them through the SourceContext. */
    @Override
    public void run(SourceContext<SensorReading> srcCtx) throws Exception {

        // initialize random number generator
        Random rand = new Random();
        // look up index of this parallel task
        int taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();

        // initialize sensor ids and temperatures
        String[] sensorIds = new String[10];
        double[] curFTemp = new double[10];
        for (int i = 0; i < 10; i++) {
            sensorIds[i] = "sensor_" + (taskIdx * 10 + i);
            curFTemp[i] = 65 + (rand.nextGaussian() * 20);
        }

        while (running) {

            // get current time
            long curTime = Calendar.getInstance().getTimeInMillis();

            // emit SensorReadings
            for (int i = 0; i < 10; i++) {
                // update current temperature
                curFTemp[i] += rand.nextGaussian() * 0.5;
                // emit reading
                srcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));
            }

            // wait for 3000 ms before emitting the next batch
            Thread.sleep(3000);
        }
    }

    /** Cancels this SourceFunction. */
    @Override
    public void cancel() {
        this.running = false;
    }
}
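
The listing imports wikiedits.processfunc.pojo.SensorReading but never shows it. Below is a minimal sketch of what that class might look like, inferred only from how the listing uses it: a three-argument constructor and public id and temperature fields. The field name timestamp, the no-argument constructor, and toString() are assumptions, and the package declaration is left out so the snippet compiles on its own.

```java
// Hypothetical reconstruction of the SensorReading POJO; in the article's project
// it would live in the package wikiedits.processfunc.pojo.
public class SensorReading {

    // id of the sensor, e.g. "sensor_3"
    public String id;
    // time of the reading in milliseconds (field name is an assumption)
    public long timestamp;
    // temperature value of the reading
    public double temperature;

    // Flink's POJO rules require a public no-argument constructor
    public SensorReading() {}

    public SensorReading(String id, long timestamp, double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "(" + id + ", " + timestamp + ", " + temperature + ")";
    }
}
```

Public fields keep the example short and match the direct field access (value.id, value.temperature) in FreezingMonitor; private fields with getters and setters would also satisfy Flink's POJO rules, but the process function would then need to call the getters.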

Program output:
[Figure: screenshot of the program output]
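
To make the routing rule concrete without starting a Flink job, here is a minimal plain-Java sketch of what FreezingMonitor does to each record. It is not Flink code: Reading, Outputs, and route are hypothetical names introduced only for this illustration, and two lists stand in for the main output and the "freezing-alarms" side output.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the routing rule only; this does not use Flink.
public class SideOutputRoutingSketch {

    // Hypothetical stand-in for the article's SensorReading.
    static final class Reading {
        final String id;
        final double temperature;

        Reading(String id, double temperature) {
            this.id = id;
            this.temperature = temperature;
        }
    }

    // Two lists standing in for the main output and the side output.
    static final class Outputs {
        final List<Reading> main = new ArrayList<>();
        final List<String> freezingAlarms = new ArrayList<>();
    }

    // Mirrors processElement(): alarm on the side output if below 32,
    // and every reading is forwarded on the main output regardless.
    static Outputs route(List<Reading> readings) {
        Outputs outputs = new Outputs();
        for (Reading r : readings) {
            if (r.temperature < 32.0) {
                outputs.freezingAlarms.add("freezing alarm for " + r.id + " :" + r.temperature);
            }
            outputs.main.add(r);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Reading> readings = new ArrayList<>();
        readings.add(new Reading("sensor_0", 65.2));
        readings.add(new Reading("sensor_1", 31.5));

        Outputs outputs = route(readings);
        System.out.println("main output size: " + outputs.main.size());
        for (String alarm : outputs.freezingAlarms) {
            System.out.println(alarm);
        }
    }
}
```

With the two sample readings above, the main output receives both records and the side output receives one alarm for sensor_1. This shows that a side output in this example adds a second stream of a different type (String) rather than filtering the main stream.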


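Reposted from: https://blog.csdn.net/lixia0417mul2/article/details/133486768
Copyright remains with the original author, lixia0417mul2. If this repost infringes your rights, please contact us to have it removed.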
