0


flink的副输出sideoutput单元测试

背景

处理函数中处理输出主输出的数据流数据外,也可以输出多个其他的副输出的数据流数据,当我们的处理函数有副输出时,我们需要测试他们功能的正确性,本文就提供一个测试flink副输出单元测试的例子

测试flink副输出单元测试

首先看一下处理函数,其中包含副输出逻辑

publicclassMySideOutputProcessFunctionextendsProcessFunction<String,String>{publicstaticfinalOutputTag<String>OUTPUT_TAG=newOutputTag<String>("sideoutput"){};@OverridepublicvoidprocessElement(String value,Context ctx,Collector<String> out)throwsException{
        out.collect("normal:"+ value);
        ctx.output(OUTPUT_TAG,"side:"+ value);}}

其次,看下对应的单元测试

/**
 * 测试sideOutput的输出功能
 */@TestpublicvoidtestSideOutput()throwsException{MySideOutputProcessFunction mySideOutputProcessFunction =newMySideOutputProcessFunction();OneInputStreamOperatorTestHarness<String,String> testHarness =ProcessFunctionTestHarnesses.forProcessFunction(mySideOutputProcessFunction);
    testHarness.open();
    testHarness.processElement("hello",10);// 测试主输出Assert.assertEquals(Lists.newArrayList("normal:hello"), testHarness.extractOutputValues());ConcurrentLinkedQueue<StreamRecord<String>> sideOutPutQueue =
            testHarness.getSideOutput(MySideOutputProcessFunction.OUTPUT_TAG);// 测试副输出Assert.assertEquals(Lists.newArrayList("side:hello"),
            sideOutPutQueue.stream().map(StreamRecord::getValue).collect(Collectors.toList()));
    testHarness.close();}

本文转载自: https://blog.csdn.net/lixia0417mul2/article/details/134387791
版权归原作者 lixia0417mul2 所有, 如有侵权,请联系我们删除。

“flink的副输出sideoutput单元测试”的评论:

还没有评论