背景
在处理键值分区状态时,使用ttl设置过期时间是我们经常使用的,但是任何代码的修改都需要首先进行单元测试,本文就使用单元测试来验证一下状态ttl的设置是否正确
测试状态ttl超时的单元测试
首先看一下处理函数:
// 处理函数publicclassMyStateProcessFunctionextendsKeyedProcessFunction<String,String,String>{// 键值分区状态ValueState<String> previousInput;@Overridepublicvoidopen(Configuration parameters)throwsException{ValueStateDescriptor stateDescriptor =newValueStateDescriptor<String>("previousInput",Types.STRING);// 状态ttl超时时间设置StateTtlConfig ttlConfig =StateTtlConfig.newBuilder(Time.minutes(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// check 10 keys for every state access.cleanupIncrementally(10,false).build();
stateDescriptor.enableTimeToLive(ttlConfig);
previousInput =getRuntimeContext().getState(stateDescriptor);}@OverridepublicvoidprocessElement(String in,Context context,Collector<String> collector)throwsException{
context.timerService().registerProcessingTimeTimer(100);String out =(Objects.nonNull(previousInput.value())? previousInput.value():"")+ in;
collector.collect(out);if(!in.contains("NotUpdate")){// 为了模仿有访问状态,但是不更新状态,正常情况下业务逻辑是访问其他key组的其它state,而一直没有访问的key的状态会在超时时间到之后被清理掉
previousInput.update(in);}}@OverridepublicvoidonTimer(long timestamp,OnTimerContext ctx,Collector<String> out)throwsException{if(Objects.nonNull(previousInput.value())){
out.collect(String.format("timer trigger %s", previousInput.value()));}else{
out.collect(String.format("timer trigger state clear", previousInput.value()));}}}
单元测试代码:
/**
* 测试状态处理函数,包含状态的ttl配置,以及ontimer方法
**/@TestpublicvoidtestKeyedStateProcessFunction()throwsException{MyStateProcessFunction myStateProcessFunction =newMyStateProcessFunction();OneInputStreamOperatorTestHarness<String,String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myStateProcessFunction, x ->"1",Types.STRING);
testHarness.open();
testHarness.processElement("hello",10);// 注册了一个定时器,定时器100后过期Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// 测试输出Assert.assertEquals(Lists.newArrayList("hello"), testHarness.extractOutputValues());ValueState<String> previousInput = myStateProcessFunction.getRuntimeContext().getState(newValueStateDescriptor<>("previousInput",Types.STRING));// 查看下状态应该已经被设置Assert.assertEquals("hello", previousInput.value());
testHarness.processElement("world",10);// 再次测试输出Assert.assertEquals(Lists.newArrayList("hello","helloworld"), testHarness.extractOutputValues());// 再次查看下状态应该已经被设置Assert.assertEquals("world", previousInput.value());// 设置时间为1分钟,让状态超时
testHarness.setStateTtlProcessingTime(Time.minutes(1).toMilliseconds());// 触发下状态访问,这样flink就会清理,正常生产中不需要这一步,访问状态本来就一直在进行中,只是可能是其他key分组的状态
testHarness.processElement("NotUpdate1",System.currentTimeMillis());// 查看下状态应该已经被清理Assert.assertNull(previousInput.value());// 设置让定时器过期,顺带确认下状态已经被清理
testHarness.setProcessingTime(100);// 测试输出(包含两个输入+一个定时器的输出)Assert.assertEquals(Lists.newArrayList("hello","helloworld","NotUpdate1","timer trigger state clear"),
testHarness.extractOutputValues());
testHarness.close();}
测试代码中已经包含了详细的注解,我们实现自己的ttl单元测试时可以参考下
本文转载自: https://blog.csdn.net/lixia0417mul2/article/details/134387754
版权归原作者 lixia0417mul2 所有, 如有侵权,请联系我们删除。
版权归原作者 lixia0417mul2 所有, 如有侵权,请联系我们删除。