50. Flink unit testing: introduction and examples

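Flink article series

I. Flink column

The Flink column introduces one topic at a time in a systematic way, backed by concrete examples.

  • 1. Flink deployment series: the basics of deploying and configuring Flink.
  • 2. Flink fundamentals series: terminology, architecture, programming model, programming guide, basic DataStream API usage, the four cornerstones, and so on.
  • 3. Flink Table API and SQL fundamentals series: basic usage of the Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
  • 4. Flink Table API and SQL advanced and applied series: applied Table API and SQL topics that are closer to real production use and somewhat harder to develop.
  • 5. Flink monitoring series: topics related to day-to-day operations and monitoring.

II. Flink examples column

The Flink examples column supplements the Flink column. It generally does not explain concepts and instead provides ready-to-use examples. This column is not split into sections; the link titles indicate what each article covers.

Entry point for all articles in both columns: Flink series article index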




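This article covers unit testing in Flink in detail: testing stateless functions, stateful functions, and complete jobs, with common usage examples for each.

For more, see the author's Flink column, which covers these topics more systematically.

The examples rely only on the Maven dependencies listed in this article. Note that some of the later examples import Lombok and Guava classes, which the dependency lists here do not declare explicitly.

I. Overview of Flink testing

Apache Flink provides tools for testing application code at multiple levels of the testing pyramid.
Maven dependencies used by the examples in this article: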

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-core</artifactId>
        <version>4.0.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

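II. Testing user-defined functions

It is usually safe to assume that Flink produces correct results outside of user-defined functions. It is therefore recommended to cover the classes that contain the main business logic with unit tests as much as possible.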

1. Unit testing stateless, timeless UDFs

1) Example: MapFunction

Take the following stateless MapFunction as an example:

public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}

Such a function is easy to unit test with the test framework of your choice: pass in suitable arguments and verify the output.

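import static org.junit.Assert.assertEquals;

import org.apache.flink.api.common.functions.MapFunction;
import org.junit.Test;

/**
 * @author alanchan
 */
public class TestDemo {

    // Declared static so that it does not hold a reference to the enclosing test class.
    public static class IncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    @Test
    public void testIncrement() throws Exception {
        IncrementMapFunction incrementer = new IncrementMapFunction();
        // The cast avoids an ambiguous call between assertEquals(long, long)
        // and assertEquals(Object, Object), because map() returns a boxed Long.
        assertEquals((Long) 3L, incrementer.map(2L));
    }
}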

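2) Example: FlatMapFunction

A user-defined function that uses an org.apache.flink.util.Collector (for example a FlatMapFunction or a ProcessFunction) can be tested easily by passing in a mock object instead of a real collector. The FlatMapFunction below, which sums the comma-separated numbers in its input string and emits the total, can be unit tested as follows.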

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * @author alanchan
 */
@RunWith(MockitoJUnitRunner.class)
public class TestDemo2 {

    // Despite its name, this function sums the comma-separated numbers in the input string.
    public static class IncrementFlatMapFunction implements FlatMapFunction<String, Long> {

        @Override
        public void flatMap(String value, Collector<Long> out) throws Exception {
            Long sum = 0L;
            for (String num : value.split(",")) {
                sum += Long.valueOf(num);
            }
            out.collect(sum);
        }
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testSum() throws Exception {
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
        // The mock records every call so that it can be verified afterwards.
        Collector<Long> collector = mock(Collector.class);
        incrementer.flatMap("1,2,3,4,5", collector);
        // Exactly one record, the sum 15, must have been emitted.
        Mockito.verify(collector, times(1)).collect(15L);
    }
}
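As an alternative to a mock, the summing logic can be written against a plain `java.util.function.Consumer` and tested with an ordinary list. The following is a minimal sketch under that assumption; `CsvSummer`, `sumInto`, and `CsvSummerDemo` are hypothetical names that do not appear in the original example, and the sketch deliberately uses no Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: the summing logic, with the output expressed as a plain Consumer. */
final class CsvSummer {

    private CsvSummer() {}

    /** Parses the comma-separated longs in value and emits their sum exactly once. */
    static void sumInto(String value, Consumer<Long> out) {
        long sum = 0L;
        for (String num : value.split(",")) {
            sum += Long.parseLong(num.trim());
        }
        out.accept(sum);
    }
}

/** Runs the helper against a list-backed output instead of a mock. */
final class CsvSummerDemo {

    public static void main(String[] args) {
        List<Long> collected = new ArrayList<>();
        CsvSummer.sumInto("1,2,3,4,5", collected::add);
        // The list now holds a single element: the sum of the five inputs.
        System.out.println(collected);
    }
}
```

Inside a real FlatMapFunction, the same helper could be called as `CsvSummer.sumInto(value, out::collect)`, since `Collector.collect` fits the `Consumer` shape. Whether this refactoring is worthwhile depends on how much logic the function contains.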

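2. Unit testing stateful or timely UDFs and custom operators

Testing the functionality of a user-defined function that uses managed state or timers is harder, because it involves the interaction between user code and the Flink runtime. For this purpose, Flink provides a set of so-called test harnesses that can be used to test such user-defined functions as well as custom operators:

  • OneInputStreamOperatorTestHarness (for operators on DataStreams)
  • KeyedOneInputStreamOperatorTestHarness (for operators on KeyedStreams)
  • TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
  • KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)

To use the test harnesses, a few additional dependencies are required, depending on whether you test DataStream or Table API programs.

1) DataStream API test dependencies

To develop tests for a job built with the DataStream API, add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>

Among various test utilities, this module provides MiniCluster, a lightweight, configurable Flink cluster that runs inside a JUnit test and can execute jobs directly.

2) Table API test dependencies

To test Table API and SQL programs locally in your IDE, add the following dependency in addition to the flink-test-utils dependency mentioned above:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>

This automatically brings in the query planner and the runtime, which are needed to plan and execute queries, respectively.

The flink-table-test-utils module was introduced in Flink 1.15 and is still considered experimental as of Flink 1.17.

3) Unit testing a FlatMapFunction

With the test harnesses, you can push records and watermarks into your user-defined functions or custom operators, control processing time, and finally assert on the operator's output (including side outputs).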

  • Example:
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Unit test for a flatMap that emits each even input followed by its square
 */
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

public class TestStatefulFlatMapDemo3 {

    static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {

        @Override
        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            // Odd values are dropped; even values are emitted together with their square.
            if (value % 2 == 0) {
                out.collect(value);
                out.collect(value * value);
            }
        }
    }

    OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        StreamFlatMap<Integer, Integer> operator =
                new StreamFlatMap<Integer, Integer>(new AlanFlatMapFunction());

        testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
        testHarness.open();
    }

    @Test
    public void testFlatMap2() throws Exception {
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        // Push records (value, timestamp) and one watermark into the operator.
        testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
        testHarness.processWatermark(new Watermark(initialTime + 2));
        testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
        testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
        testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
        testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
        testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
        testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));

        // Each output record carries the timestamp of the input that produced it.
        expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
        expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
        expectedOutput.add(new Watermark(initialTime + 2));
        expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
        expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
        expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
        expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
        expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
        expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));

        TestHarnessUtil.assertOutputEquals("Unexpected output", expectedOutput, testHarness.getOutput());
    }
}

KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector, together with the TypeInformation for the key class.

  • Example:
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Key users by city and convert the city abbreviation to upper case
 */
import com.google.common.collect.Lists;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestStatefulFlatMapDemo2 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private String city;
    }

    static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {

        // The state is only accessible by functions applied on a KeyedStream.
        ValueState<User> previousInput;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            previousInput = getRuntimeContext()
                    .getState(new ValueStateDescriptor<User>("previousInput", User.class));
        }

        @Override
        public void flatMap(User input, Collector<User> out) throws Exception {
            previousInput.update(input);
            input.setCity(input.getCity().toUpperCase());
            out.collect(input);
        }
    }

    AlanFlatMapFunction alanFlatMapFunction;
    OneInputStreamOperatorTestHarness<User, User> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        alanFlatMapFunction = new AlanFlatMapFunction();

        // The keyed harness needs a KeySelector and the TypeInformation of the key.
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new StreamFlatMap<>(alanFlatMapFunction),
                new KeySelector<User, String>() {
                    @Override
                    public String getKey(User value) throws Exception {
                        return value.getCity();
                    }
                },
                Types.STRING);

        testHarness.open();
    }

    @Test
    public void testFlatMap() throws Exception {
        testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);

        // The state handle is scoped to the key of the record processed last.
        ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext()
                .getState(new ValueStateDescriptor<>("previousInput", User.class));
        User stateValue = previousInput.value();

        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
                testHarness.extractOutputStreamRecords());
        Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);

        testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);

        // extractOutputStreamRecords() returns every record emitted so far, in order.
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
                        new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
                testHarness.extractOutputStreamRecords());
        Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());
    }
}
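The test above reads `previousInput.value()` twice and gets a different user each time, because keyed state is scoped to the key of the record currently being processed. The following sketch is a deliberately simplified plain-Java model of that scoping. `KeyedValueModel` and its methods are hypothetical names for illustration only, and Flink's real state backends work differently internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Simplified model of a keyed value state: one slot per key, addressed via the current key. */
final class KeyedValueModel<K, V> {

    private final Map<K, V> slots = new HashMap<>();
    private final Function<V, K> keySelector;
    private K currentKey;

    KeyedValueModel(Function<V, K> keySelector) {
        this.keySelector = keySelector;
    }

    /** Mimics processing one record: select its key first, then update that key's slot. */
    void process(V record) {
        currentKey = keySelector.apply(record);
        slots.put(currentKey, record);
    }

    /** Returns the value stored for the key of the most recently processed record. */
    V value() {
        return slots.get(currentKey);
    }

    /** Returns the value stored for an explicit key, or null if that key was never seen. */
    V valueFor(K key) {
        return slots.get(key);
    }
}

final class KeyedValueModelDemo {

    public static void main(String[] args) {
        // Records are "name:city" strings, keyed by the city part.
        KeyedValueModel<String, String> state = new KeyedValueModel<>(r -> r.split(":")[1]);
        state.process("alanchan:sh");
        System.out.println(state.value());
        state.process("alan:bj");
        System.out.println(state.value());
        // The slot for the first key is untouched by the second record.
        System.out.println(state.valueFor("sh"));
    }
}
```

In this model, `value()` follows the current key just as the `ValueState` handle does in the test, while the slot for "sh" still holds the first record.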

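4) Unit testing a ProcessFunction

Besides the test harnesses above, which can be used directly to test a ProcessFunction, Flink provides a test harness factory named ProcessFunctionTestHarnesses that simplifies instantiating a harness.

  • OneInputStreamOperatorTestHarness example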
import com.google.common.collect.Lists;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Unit test for a KeyedProcessFunction that registers a processing-time timer
 */
public class TestProcessOperatorDemo1 {

    // public abstract class KeyedProcessFunction<K, I, O>
    static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {

        @Override
        public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
                Collector<String> out) throws Exception {
            // Register a processing-time timer for timestamp 50.
            ctx.timerService().registerProcessingTimeTimer(50);
            out.collect("vx->" + value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Called when the timer fires; timestamp is the time the timer was registered for.
            out.collect(String.format("timer fired at %d", timestamp));
        }
    }

    private OneInputStreamOperatorTestHarness<String, String> testHarness;
    private AlanProcessFunction processFunction;

    @Before
    public void setupTestHarness() throws Exception {
        processFunction = new AlanProcessFunction();

        // Every record is mapped to the same key "1".
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(processFunction), x -> "1", Types.STRING);
        // Processing time starts at 0.
        testHarness.open();
    }

    @Test
    public void testProcessElement() throws Exception {
        testHarness.processElement("alanchanchn", 10);

        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>("vx->alanchanchn", 10)),
                testHarness.extractOutputStreamRecords());
    }

    @Test
    public void testOnTimer() throws Exception {
        // Process the first record, which registers the timer.
        testHarness.processElement("alanchanchn", 10);
        Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

        // Advance processing time to 100, which fires the timer registered for 50.
        testHarness.setProcessingTime(100);

        // onTimer receives the timer's own timestamp (50), and the record it emits
        // from a processing-time timer carries no timestamp.
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>("vx->alanchanchn", 10),
                        new StreamRecord<>("timer fired at 50")),
                testHarness.extractOutputStreamRecords());
    }
}
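In `testOnTimer`, the timer is registered for timestamp 50 and fires once processing time is advanced past it. The timestamp handed to `onTimer` is the time the timer was registered for, not the value the clock was advanced to. The sketch below is a simplified plain-Java model of that behaviour; `TimerModel`, `register`, and `advanceTo` are hypothetical names, and this is not how Flink implements its timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Simplified model of a processing-time timer service (not Flink's implementation). */
final class TimerModel {

    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> output = new ArrayList<>();

    /** Registers a timer; in this model, registering the same timestamp twice keeps one timer. */
    void register(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the clock and fires every timer whose timestamp is at or before the new time. */
    void advanceTo(long time) {
        while (!timers.isEmpty() && timers.first() <= time) {
            long fired = timers.pollFirst();
            // The callback sees the timer's registered timestamp, not the clock value.
            output.add("timer fired at " + fired);
        }
    }

    int pendingTimers() {
        return timers.size();
    }

    List<String> output() {
        return output;
    }
}

final class TimerModelDemo {

    public static void main(String[] args) {
        TimerModel model = new TimerModel();
        model.register(50);
        model.advanceTo(100);
        // The message reports the registered timestamp of the timer.
        System.out.println(model.output());
    }
}
```

A test that advances the clock to 100 should therefore expect the registered timestamp in the timer output, which is worth keeping in mind when writing assertions against `onTimer`.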
  • ProcessFunctionTestHarnesses example

This example uses ProcessFunctionTestHarnesses to test a ProcessFunction, a KeyedProcessFunction, a CoProcessFunction, a KeyedCoProcessFunction, and a BroadcastProcessFunction, which covers the most common cases.

import java.util.Arrays;
import java.util.Collections;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Tests for the process function variants using ProcessFunctionTestHarnesses
 */
public class TestProcessOperatorDemo3 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private String city;
    }

    // Tests processElement of a ProcessFunction.
    @Test
    public void testProcessFunction() throws Exception {
        // public abstract class ProcessFunction<I, O>
        ProcessFunction<String, String> function = new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                out.collect("vx->" + value);
            }
        };

        OneInputStreamOperatorTestHarness<String, String> harness =
                ProcessFunctionTestHarnesses.forProcessFunction(function);

        harness.processElement("alanchanchn", 10);

        Assert.assertEquals(Collections.singletonList("vx->alanchanchn"), harness.extractOutputValues());
    }

    // Tests processElement of a KeyedProcessFunction.
    @Test
    public void testKeyedProcessFunction() throws Exception {
        // public abstract class KeyedProcessFunction<K, I, O>
        KeyedProcessFunction<String, String, String> function =
                new KeyedProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value,
                            KeyedProcessFunction<String, String, String>.Context ctx,
                            Collector<String> out) throws Exception {
                        out.collect("vx->" + value);
                    }
                };

        OneInputStreamOperatorTestHarness<String, String> harness =
                ProcessFunctionTestHarnesses.forKeyedProcessFunction(
                        function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);

        harness.processElement("alanchan", 10);

        // The function prefixes its input, so the expected value is "vx->alanchan".
        Assert.assertEquals(Collections.singletonList("vx->alanchan"), harness.extractOutputValues());
    }

    // Tests processElement1 and processElement2 of a CoProcessFunction.
    @Test
    public void testCoProcessFunction() throws Exception {
        // public abstract class CoProcessFunction<IN1, IN2, OUT>
        CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {
            @Override
            public void processElement1(String value, CoProcessFunction<String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                String[] userStr = value.split(",");
                out.collect(new User(Integer.parseInt(userStr[0]), userStr[1],
                        Integer.parseInt(userStr[2]), userStr[3]));
            }

            @Override
            public void processElement2(User value, CoProcessFunction<String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                out.collect(value);
            }
        };

        TwoInputStreamOperatorTestHarness<String, User, User> harness =
                ProcessFunctionTestHarnesses.forCoProcessFunction(function);

        harness.processElement2(new User(2, "alan", 19, "bj"), 100);
        harness.processElement1("1,alanchan,18,sh", 10);

        // Output follows processing order: the second input was processed first.
        Assert.assertEquals(
                Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
                harness.extractOutputValues());
    }

    // Tests processElement1 and processElement2 of a KeyedCoProcessFunction.
    @Test
    public void testKeyedCoProcessFunction() throws Exception {
        // public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
        KeyedCoProcessFunction<String, String, User, User> function =
                new KeyedCoProcessFunction<String, String, User, User>() {
                    @Override
                    public void processElement1(String value,
                            KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                            Collector<User> out) throws Exception {
                        String[] userStr = value.split(",");
                        out.collect(new User(Integer.parseInt(userStr[0]), userStr[1],
                                Integer.parseInt(userStr[2]), userStr[3]));
                    }

                    @Override
                    public void processElement2(User value,
                            KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                            Collector<User> out) throws Exception {
                        out.collect(value);
                    }
                };

        // public static <K, IN1, IN2, OUT> KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
        // forKeyedCoProcessFunction(
        //         KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
        //         KeySelector<IN1, K> keySelector1,
        //         KeySelector<IN2, K> keySelector2,
        //         TypeInformation<K> keyType)
        KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness =
                ProcessFunctionTestHarnesses.forKeyedCoProcessFunction(
                        function,
                        new KeySelector<String, String>() {
                            @Override
                            public String getKey(String value) throws Exception {
                                return value.split(",")[3];
                            }
                        },
                        new KeySelector<User, String>() {
                            @Override
                            public String getKey(User value) throws Exception {
                                return value.getCity();
                            }
                        },
                        TypeInformation.of(String.class));

        harness.processElement2(new User(2, "alan", 19, "bj"), 100);
        harness.processElement1("1,alanchan,18,sh", 10);

        // Output follows processing order: the second input was processed first.
        Assert.assertEquals(
                Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
                harness.extractOutputValues());
    }

    // Tests processElement and processBroadcastElement of a BroadcastProcessFunction.
    @Test
    public void testBroadcastOperator() throws Exception {
        // Broadcast state descriptor. Broadcast data has the form "code,name":
        // sh,Shanghai
        // bj,Beijing
        // public class MapStateDescriptor<UK, UV>
        MapStateDescriptor<String, String> broadcastDesc =
                new MapStateDescriptor<>("Alan_RulesBroadcastState", String.class, String.class);

        // public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
        // <IN1> The input type of the non-broadcast side.
        // <IN2> The input type of the broadcast side.
        // <OUT> The output type of the operator.
        BroadcastProcessFunction<User, String, User> function =
                new BroadcastProcessFunction<User, String, User>() {

                    // Handles elements of the broadcast stream.
                    @Override
                    public void processBroadcastElement(String value,
                            BroadcastProcessFunction<User, String, User>.Context ctx,
                            Collector<User> out) throws Exception {
                        System.out.println("Received broadcast data: " + value);
                        // Store the mapping in the broadcast state.
                        ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
                    }

                    // Handles the non-broadcast stream and joins it with the dimension data.
                    @Override
                    public void processElement(User value,
                            BroadcastProcessFunction<User, String, User>.ReadOnlyContext ctx,
                            Collector<User> out) throws Exception {
                        // Read-only view of the broadcast state.
                        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);

                        value.setCity(state.get(value.getCity()));
                        out.collect(value);
                    }
                };

        BroadcastOperatorTestHarness<User, String, User> harness =
                ProcessFunctionTestHarnesses.forBroadcastProcessFunction(function, broadcastDesc);

        harness.processBroadcastElement("sh,Shanghai", 10);
        harness.processBroadcastElement("bj,Beijing", 20);

        harness.processElement(new User(2, "alan", 19, "bj"), 10);
        harness.processElement(new User(1, "alanchan", 18, "sh"), 30);

        // Output follows processing order: user 2 was processed before user 1.
        Assert.assertEquals(
                Arrays.asList(new User(2, "alan", 19, "Beijing"), new User(1, "alanchan", 18, "Shanghai")),
                harness.extractOutputValues());
    }
}

III. Testing Flink jobs

1. JUnit rule MiniClusterWithClientResource

Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local, embedded mini cluster.

To use MiniClusterWithClientResource, one additional dependency (test scope) is needed.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>

Let us take the same simple MapFunction as in the previous sections as an example.

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Integration test that runs a small pipeline on an embedded mini cluster
 */
package com.win;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class TestExampleIntegrationDemo {

    static class AlanIncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new AlanIncrementMapFunction())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}

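A few remarks on integration testing with MiniClusterWithClientResource:

  • To avoid copying your entire pipeline code from production to test, make the sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
  • The static variable in CollectSink is used because Flink serializes all operators before distributing them across the cluster. Communicating with operators instantiated by the local Flink mini cluster through static variables is one way around this issue. Alternatively, your test sink could write the data to files in a temporary directory.
  • If your job uses event-time timers, you can implement a custom parallel source function that emits watermarks.
  • It is recommended to always test your pipelines locally with a parallelism > 1, in order to catch bugs that only surface when the pipeline runs in parallel.
  • Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. This saves a significant amount of time, since starting and stopping a Flink cluster usually dominates the execution time of the actual tests.
  • If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. To do so, trigger a failure by throwing an exception from a (test-only) user-defined function in your pipeline.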
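The remark about the static variable in CollectSink can be illustrated without Flink. The sketch below round-trips an object through Java serialization and shows that the copy writes to its own instance field, while a static field stays visible to the code that created the original. `RecordingSink`, `copyOf`, and `RecordingSinkDemo` are hypothetical names, and the sketch assumes everything runs in one JVM, as it does with the mini cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sink-like class with one static and one per-instance result list. */
final class RecordingSink implements Serializable {

    private static final long serialVersionUID = 1L;

    // Shared by every instance in the same JVM, including deserialized copies.
    static final List<Long> SHARED = Collections.synchronizedList(new ArrayList<>());

    // Serialized with the object, so each deserialized copy gets its own list.
    final List<Long> perInstance = new ArrayList<>();

    void invoke(Long value) {
        SHARED.add(value);
        perInstance.add(value);
    }

    /** Returns a copy of the sink made by serializing and deserializing it. */
    static RecordingSink copyOf(RecordingSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (RecordingSink) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}

final class RecordingSinkDemo {

    public static void main(String[] args) {
        RecordingSink original = new RecordingSink();
        RecordingSink shipped = RecordingSink.copyOf(original);
        shipped.invoke(2L);
        // The original's instance list stays empty: the copy wrote to its own list.
        System.out.println(original.perInstance);
        // The static list contains the value written by the copy.
        System.out.println(RecordingSink.SHARED);
    }
}
```

This is also why a test should clear such a static list before each run, as the example does with `CollectSink.values.clear()`.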

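To sum up, this article covered unit testing in Flink in detail: testing stateless functions, stateful functions, and complete jobs, with common usage examples for each.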


Reposted from: https://blog.csdn.net/chenwewi520feng/article/details/134685259
Copyright belongs to the original author, 一瓢一瓢的饮 alanchanchn. If this infringes your rights, please contact us for removal.
