问题背景
一开始编写了一个udf函数:
public class ArrayContains extends ScalarFunction {
private static final int EXIST = 1;
private static final int NOT_EXIST = -1;
// 第一个参数是待检查的数组,第二个参数是待验证元素是否存在于第一个参数中
public static int eval(List<Integer> array, List<Integer> targetArray) {
if (CollectionUtils.isEmpty(array)) {
return NOT_EXIST;
}
for (Object target : targetArray) {
if (array.contains(target)) {
return EXIST;
}
}
return NOT_EXIST;
}
功能其实很简单:判断数组中是否包含特定内容,包含任意之一就返回1,否则返回-1
之前的参数类型都是List<Integer>
然后新的需求来了:需要传入的参数类型是List<String>
显然,我们不能新建一个udf来处理List<String>的情况,但是如果我们简单改写为:
public class ArrayContains extends ScalarFunction {
private static final int EXIST = 1;
private static final int NOT_EXIST = -1;
// 第一个参数是待检查的数组,第二个参数是待验证元素是否存在于第一个参数中
public static int eval(List<Object> array, List<Object> targetArray) {
if (CollectionUtils.isEmpty(array)) {
return NOT_EXIST;
}
for (Object target : targetArray) {
if (array.contains(target)) {
return EXIST;
}
}
return NOT_EXIST;
}
会报错:
Could not extract a data type from 'java.util.List<java.lang.Object>' in parameter 0 of method 'eval' in class 'dp.udf.ArrayContains'. Please pass the required data type manually or allow RAW types
Cannot extract a data type from a pure 'java.lang.Object' class. Usually, this indicates that class information is missing or got lost. Please specify a more concrete class or treat it as a RAW type.
所以就要用到flink的自动类型推导,具体来说,有时我们希望一种求值方法可以同时处理多种数据类型,有时又要求对重载的多个求值方法仅声明一次通用的结果类型,就可以用
@FunctionHint
注解来提供从入参数据类型到结果数据类型的映射。
(详细具体可以查看:自定义函数 | Apache Flink)
解决思路
具体udf修改如下:
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
* 判断数组中是否包含特定内容,包含任意之一就返回1,否则返回-1
* flink1.13版本还没有ARRAY_CONTAINS内置函数
*/
@FunctionHint(input = {@DataTypeHint("ARRAY<STRING>"), @DataTypeHint("ARRAY<Int>")}, output = @DataTypeHint("Int"))
@FunctionHint(input = {@DataTypeHint("ARRAY<Int>"), @DataTypeHint("ARRAY<Int>")}, output = @DataTypeHint("Int"))
public class ArrayContains extends ScalarFunction {
private static final int EXIST = 1;
private static final int NOT_EXIST = -1;
// 第一个参数是待检查的数组,第二个参数是待验证元素是否存在于第一个参数中
public static int eval(Object[] array, Object[] targetArray) {
if (array == null) {
return NOT_EXIST;
}
List arrayList = Arrays.asList(array);
if (CollectionUtils.isEmpty(arrayList)) {
return NOT_EXIST;
}
for (Object target : targetArray) {
if (arrayList.contains(target)) {
return EXIST;
}
}
return NOT_EXIST;
}
其实就是增加了类上面的 FunctionHint声明
验证
写了个几个测试用例,全部通过:
@Test
public void test() {
assertEquals(eval(null, new Integer[]{101, 1}), NOT_EXIST);
assertEquals(eval(new Integer[]{}, new Integer[]{101, 1}), NOT_EXIST);
assertEquals(eval(new Integer[]{1, 2, 101}, new Integer[]{101, 1}), EXIST);
assertEquals(eval(new Integer[]{1, 2, 101}, new Integer[]{1, 2}), EXIST);
assertEquals(eval(new Integer[]{1, 2, 101}, new Integer[]{3}), NOT_EXIST);
assertEquals(eval(new Integer[]{2}, new Integer[]{1, 2, 3, 4, 5, 6, 8, 100, 101}), EXIST);
assertEquals(eval(new Integer[]{3, 100, 101}, new Integer[]{1, 2, 3, 4, 5, 6, 8, 100, 101}), EXIST);
assertEquals(eval(new Integer[]{3, 100, 101}, new Integer[]{99, 101}), EXIST);
assertEquals(eval(new Integer[]{3, 100, 101}, new Integer[]{99}), NOT_EXIST);
assertEquals(eval(null, new String[]{"1", "101"}), NOT_EXIST);
assertEquals(eval(new String[]{}, new String[]{"1", "101"}), NOT_EXIST);
assertEquals(eval(new String[]{"1", "2", "101"}, new String[]{"1", "101"}), EXIST);
assertEquals(eval(new String[]{"1", "2", "101"}, new String[]{"1", "2"}), EXIST);
assertEquals(eval(new String[]{"1", "2", "101"}, new String[]{"3"}), NOT_EXIST);
assertEquals(eval(new String[]{"2"}, new String[]{"1", "2", "3", "4", "5", "6", "8", "100", "101"}), EXIST);
assertEquals(eval(new String[]{"101", "3", "100"}, new String[]{"1", "2", "3", "4", "5", "6", "8", "100", "101"}), EXIST);
assertEquals(eval(new String[]{"101", "3", "100"}, new String[]{"101", "99"}), EXIST);
assertEquals(eval(new String[]{"101", "3", "100"}, new String[]{"99"}), NOT_EXIST);
}
版权归原作者 天上地下 所有, 如有侵权,请联系我们删除。