0


Flink旁路输出特性简单实例:按照股价对股票进行数据分流并写出到文本文件

关于旁路输出的官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/side_output/

除了由 DataStream 操作产生的主要流之外,我们还可以产生任意数量的旁路输出结果流。结果流中的数据类型不必与主要流中的数据类型相匹配,并且不同旁路输出的类型也可以不同。

使用旁路输出时,首先需要定义用于标识旁路输出流的OutputTag类对象。
构造方法的第一个参数表示一个区分旁路输出流的id标识,第二个参数表示要处理的数据类型。

OutputTag<String> outputTag =newOutputTag<String>("side-output",Types.STRING);

定义旁路输出标签后,通过主输出流的process方法,把数据发送到旁路输出流中。

SingleOutputStreamOperator<String> process = input
  .process(newProcessFunction<String,Object>(){@OverridepublicvoidprocessElement(String value,Context ctx,Collector<Object> collector)throwsException{// 发送数据到主要的输出
        collector.collect(value);// 发送数据到旁路输出
        ctx.output(outputTag,"sideout-"+ value);}});

我们可以在 DataStream 运算结果上使用 getSideOutput(OutputTag) 方法获取旁路输出流,这将产生一个与旁路输出流结果类型一致的 DataStream。

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);

以股票为例,在发送股票数据时,我们假设股价小于50就是低价股,否则就是高级股,在发送股票数据时,我们希望把高价股和低价股分别写出到不同的文件中保存起来。

首先,创建一个股票类Stock:

publicclassStock{//股票名称privateString name;//股票价格privateInteger price;//构造方法、getter、setter、toString方法在此省略}

其次,编写Flink消费者程序:

publicclassFlinkKafkaConsumer{publicstaticvoidmain(String[] args)throwsException{//1.创建执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//2.配置文件Properties props =newProperties();
        props.put("bootstrap.servers","Kafka集群地址");//3.构造消费者FlinkKafkaConsumer<String> consumer =newFlinkKafkaConsumer<>("stock",newSimpleStringSchema(), props);//4.配置消费者DataStreamSource stream = env.addSource(consumer);//5.data sick
        stream.addSick(newSinkFunction<String>(){@Overridepublicvoidinvoke(String value,Context context){Syestem.out.println("当前已处理的数据:"+JsonUtils.deserialize(value,Stock.class));}});//6.执行程序
        env.execute("消费者程序");}}

接着,编写生产者程序:

​publicclassFlinkProducer{publicstaticvoidmain(String[] args)throwsException{//结果输出路径String outputPath1 ="C:\\Users\\xxx\\Desktop\\result1";String outputPath2 ="C:\\Users\\xxx\\Desktop\\result2";//创建数据源ArrayList<String> stocks =newArrayList<>();for(int i =0; i <100; i++){
            stocks.add(JsonUtils.serialize(newStock("stock-"+ i,(int)(Math.random()*100))));}//创建消费者环境StreamExecutionEnvironment producerEnvironment =StreamExecutionEnvironment.getExecutionEnvironment();//配置文件Properties props =newProperties();
        props.put("bootstrap.servers","10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");//构建生产者FlinkKafkaProducer<String> producer_all =newFlinkKafkaProducer<>("STOCK",newSimpleStringSchema(), props);//配置数据源DataStreamSource<String> stream_all = producerEnvironment.fromCollection(stocks);//创建旁路输出finalOutputTag<String> outputTag_lowPrice =newOutputTag<>("STOCK_LOW_PRICE",Types.STRING);finalOutputTag<String> outputTag_highPrice =newOutputTag<>("STOCK_HIGH_PRICE",Types.STRING);//配置旁路输出SingleOutputStreamOperator<Object> process = stream_all.process(newProcessFunction<String,Object>(){@OverridepublicvoidprocessElement(String s,ProcessFunction<String,Object>.Context context,Collector<Object> collector){Stock stock =JsonUtils.deserialize(s,Stock.class);
                collector.collect(s);if(stock.getPrice()<50){
                    context.output(outputTag_lowPrice, s);}else{
                    context.output(outputTag_highPrice, s);}}});//获取低价股票和高价股票的旁路输出DataStream<String> stream_lowPrice = process.getSideOutput(outputTag_lowPrice);DataStream<String> stream_highPrice = process.getSideOutput(outputTag_highPrice);//配置生产者
        stream_all.addSink(producer_all);//处理低价股票
        stream_lowPrice.writeAsText(outputPath1,FileSystem.WriteMode.OVERWRITE);//处理高价股票
        stream_highPrice.writeAsText(outputPath2,FileSystem.WriteMode.OVERWRITE);//执行flink程序
        producerEnvironment.execute("生产者流处理");}}

依次启动消费者程序、生产者程序,观察消费者程序控制台中的输出:
在这里插入图片描述
此时,桌面生成了两个文件夹,result1记录了小于50的股票,result2相反:
在这里插入图片描述


本文转载自: https://blog.csdn.net/qq_41112238/article/details/123113172
版权归原作者 2020GetGoodOffer 所有, 如有侵权,请联系我们删除。

“Flink旁路输出特性简单实例:按照股价对股票进行数据分流并写出到文本文件”的评论:

还没有评论