0


Hive 自定义函数

导读

大家好,我是数据开发者,非常感谢大家最近的关注,你们的关注是我持续输出的动力,让我们共同提高。

大数据平台下的数据仓库搭建过程中存在许多个性化的业务需求,或者说是系统需求,无法在 Hive 提供的内置函数中找到解决办法,为了解决这个问题,我们可以使用 Hive 自定义函数满足这种个性化需求。我们本篇就使用 Hive 中的自定义函数实现几种数仓搭建过程中的需求。

Hive 有哪些自定义函数?

Hive 中存在三种类型的函数,分别是 UDF, UDAF, UDTF,这三种类型的 UDF 分别是:

UDF(user defined function):用户自定义函数,操作单行数据,并产生单行数据,也就通常理解的单进单出,与 concat,nvl 等类似; UDAF(user defined aggregation function):用户自定义聚合函数,操作多行数据,并产生单条数据,与常见的 sum,count 函数类似,也可以理解为多进一出; UDTF(user defined table-generating function):用户自定义表生成函数,操作单行数据,并产生多行输出,可以理解为一进多出,类似于 Hive 内置的 explode 函数。

以上三种类型的函数对我们理解数据开发过程有很大帮助,无论是关系型数据库,还是大数据平台的数据开发,这三种类型分别代表了三种数据处理方式,可以映射到数据仓库开发过程中的 ETL 过程,为了更好的理解这三种自定义函数,我们需要使用 Java 开发自定义函数。

开发环境准备

本次代码可以通过代码包的方式发送给大家参考学习,大家可以关注公众号后发送 udf 获取代码包。
1,JDK 1.8.0
2,Eclipse 2022-06 (4.24.0)
2,Maven 3.8.1
3,Hive 3.1.2

除了以上软件的准备之外,我们还需要准备一张表,这张表结构如下:
字段名称字段类型业务含义idint代理键namestring姓名order_dtestring订单日期formulastring计算公式
根据上面的表结构准备测试样例数据,样例数据如下所示:

create table udf_exmaple(
id int,
name string,
order_dte string,
formula string
)
comment 'udf example'
stored as parquet;
OK
Time taken: 0.405 seconds

select id,name,order_dte, formula from udf_example;

id name order_dte formula
4029 udf_test4029 2021-12-12 1*3*2000/4029
4028 udf_test4028 2021-12-11 1*3*2000/4028
4027 udf_test4027 2021-12-10 1*3*2000/4027
4026 udf_test4026 2021-12-09 1*3*2000/4026
4025 udf_test4025 2021-12-08 1*3*2000/4025

UDF 开发过程

为了能够掌握 Hive UDF 的开发, 我们设定如下需求:

  • 通过 UDF 实现 eval() 公式,该函数可以完成字符串公式的计算,比如 eval(4/2) = 2.0;
    

1),首先通过终端控制台进入项目创建目录,通过下面的 maven archetype 创建 maven 项目,也可以通过 eclipse 创建项目向导创建 maven 项目。

mvn archetype:generate \
-DgroupId=com.data.developer \   
-DartifactId=UDF \
-DpackageName=com.data.developer.udf \
-Dversion=1.0 \
-DinteractiveMode=false \

2),打开 eclipse 开发工具,并导入第一步创建的 Maven 项目;
导入 Maven 项目
导入 Maven 项目

3),UDF 项目 POM 内容如下,其中依赖项中的

元素的作用是控制依赖元素的使用范围。通俗的将,就是控制 Jar 包在哪些范围被加载和使用。包括编译,测试,运行,并且还可以加上是否被打入包中。常见的作用域范围有: compile, test, provided, runtime, system, import。默认值是 compile,是比较强的依赖,适用于所有阶段,意味着依赖会被打包,随着项目一起发布,在编译,测试,运行都有效。我们这里对其中一个依赖包使用了编译范围,其余 Hive 相关依赖包在运行环境不需要,就使用了 provided 范围,所以运行时 provided 范围的依赖不会被打包。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.data.developer</groupId>
 <artifactId>HiveUDF</artifactId>
 <version>1.0.0</version>
 <packaging>jar</packaging>

 <name>HiveUDF</name>

 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <hadoop.version>3.1.2</hadoop.version>
  <hive.version>3.1.2</hive.version>
  <grovvy.version>3.0.9</grovvy.version>
  <junit.version>3.8.1</junit.version>
 </properties>

 <dependencies>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>${junit.version}</version>
   <scope>test</scope>

  </dependency>
  <dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-exec</artifactId>
   <version>${hive.version}</version>
   <scope>provided</scope>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>${hadoop.version}</version>
   <scope>provided</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.codehaus.groovy/groovy-jsr223 -->
  <dependency>
      <groupId>org.codehaus.groovy</groupId>
      <artifactId>groovy-jsr223</artifactId>
      <version>${grovvy.version}</version>
  </dependency>
 </dependencies>
</project>

4),创建 UDF 代码,Hive 提供了两种实现 UDF 的方法,一个是基础的 UDF (org.apache.hadoop.hive.ql. exec.UDF),另外一个是复杂的 GenericUDF (org.apache.hadoop.hive.ql.udf.generic.GenericUDF)。其中基础的 UDF 函数支持 Hive 的基本类型(Text, IntWritable, LongWritable, Doublewritable等),复杂的 GenericUDF 可以处理 Map, List, Set 类型。我这里提供了两种类型的实现方式,具体代码如下。

继承 UDF 的实现方式:

/**
 * 公众号: 数据开发者
 * @author Data Developer
 * 该函数用于对字符串表达式进行计算
 */
@SuppressWarnings("deprecation")
public class FomulaUDF extends UDF{
 
 private BigDecimal result;
 
 private ScriptEngineManager sem = new ScriptEngineManager();
 

 public BigDecimal evaluate(String str){
        
  ScriptEngine engine = sem.getEngineByName("JavaScript");
  result = new BigDecimal(0);
  
  try {
   
   Double eval = Double.valueOf(engine.eval(str).toString());
   result = new BigDecimal(eval);
   
  } catch (ScriptException e) {
   
   e.printStackTrace();
  }
  
  return result;
    }  

}

继承 GenericUDF 的实现方式如下:

/**
 * 公众号: 数据开发者
 * @author Data Developer
 * 该函数用于对字符串表达式进行计算
 */
public class FormulaGenericUDF extends GenericUDF {

 private ScriptEngineManager sem = new ScriptEngineManager();
 private StringObjectInspector formula;
 private HiveDecimalWritable result;

 @Override
 public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

  if (arguments.length != 1) {
   throw new UDFArgumentLengthException("eval only takes 1 argument: String");
  }
  // 1. 检查是否接收到正确的参数类型
  ObjectInspector input = arguments[0];

  if (!(input instanceof StringObjectInspector)) {
   throw new UDFArgumentException("first argument must be a string");
  }

  this.formula = (StringObjectInspector) input;

  // 返回类型是 HiveDecimal,所以我们提供了正确的 HiveDecimalObjectInspector
  return PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector;
 }

 @Override
 public Object evaluate(DeferredObject[] arguments) throws HiveException {

  String formulaStr = formula.getPrimitiveJavaObject(arguments[1].get());
  ScriptEngine engine = sem.getEngineByName("JavaScript");

  result = new HiveDecimalWritable(HiveDecimal.ZERO);

  try {

   Double eval = Double.valueOf(engine.eval(formulaStr).toString());

   result = new HiveDecimalWritable(HiveDecimal.create(eval));

  } catch (ScriptException e) {
   e.printStackTrace();
  }

  return result;
 }

 @Override
 public String getDisplayString(String[] children) {

  return "eval(str)";
 }

}

5),Maven 打包,在 target 目录下得到 UDF Jar 包,打包命令如下:

(base) leik@liuleideiMac HiveUDF % mvn clean package -U
[INFO] Scanning for projects...
[INFO] 
[INFO] ---------------------< com.data.developer:HiveUDF >---------------------
[INFO] Building HiveUDF 1.0.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ HiveUDF ---
[INFO] Deleting /Users/leik/Workspace/BigData/HiveUDF/HiveUDF/target
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ HiveUDF ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/leik/Workspace/BigData/HiveUDF/HiveUDF/src/main/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ HiveUDF ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 3 source files to /Users/leik/Workspace/BigData/HiveUDF/HiveUDF/target/classes
[WARNING] /Users/leik/Workspace/BigData/HiveUDF/HiveUDF/src/main/java/com/data/developer/hiveudf/FormulaUDF.java: 某些输入文件使用或覆盖了已过时的 API。
[WARNING] /Users/leik/Workspace/BigData/HiveUDF/HiveUDF/src/main/java/com/data/developer/hiveudf/FormulaUDF.java: 有关详细信息, 请使用 -Xlint:deprecation 重新编译。
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ HiveUDF ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/leik/Workspace/BigData/HiveUDF/HiveUDF/src/test/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ HiveUDF ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /Users/leik/Workspace/BigData/HiveUDF/HiveUDF/target/test-classes
[INFO] 
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ HiveUDF ---
[INFO] Surefire report directory: /Users/leik/Workspace/BigData/HiveUDF/HiveUDF/target/surefire-reports
Downloading from alimaven: https://maven.aliyun.com/repository/public/org/apache/maven/surefire/surefire-junit3/2.12.4/surefire-junit3-2.12.4.pom
Downloaded from alimaven: https://maven.aliyun.com/repository/public/org/apache/maven/surefire/surefire-junit3/2.12.4/surefire-junit3-2.12.4.pom (0 B at 0 B/s)
Downloading from alimaven: https://maven.aliyun.com/repository/public/org/apache/maven/surefire/surefire-providers/2.12.4/surefire-providers-2.12.4.pom
Downloaded from alimaven: https://maven.aliyun.com/repository/public/org/apache/maven/surefire/surefire-providers/2.12.4/surefire-providers-2.12.4.pom (0 B at 0 B/s)
Downloading from alimaven: https://maven.aliyun.com/repository/public/org/apache/maven/surefire/surefire-junit3/2.12.4/surefire-junit3-2.12.4.jar
Downloaded from alimaven: https://maven.aliyun.com/repository/public/org/apache/maven/surefire/surefire-junit3/2.12.4/surefire-junit3-2.12.4.jar (0 B at 0 B/s)

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.data.developer.hiveudf.AppTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.004 sec

Results :

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ HiveUDF ---
[INFO] Building jar: /Users/leik/Workspace/BigData/HiveUDF/HiveUDF/target/HiveUDF-1.0.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  2.575 s
[INFO] Finished at: 2022-08-18T22:50:48+08:00
[INFO] ------------------------------------------------------------------------

项目发布

1),上传 UDF Jar 包到 Hive 服务器;

hive> add jar /u01/app/udf/HiveUDF-1.0.0.jar;
Added [/u01/app/udf/HiveUDF-1.0.0.jar] to class path
Added resources: [/u01/app/udf/HiveUDF-1.0.0.jar]

2),创建 UDF 临时函数,临时函数只在 hive 当前会话有效,会话结束,函数也就不会存在了。函数创建后,再测试 UDF 函数;

使用第一种继承方法创建 UDF

hive> add jar /u01/app/udf/HiveUDF-1.0.0.jar;
Added [/u01/app/udf/HiveUDF-1.0.0.jar] to class path
Added resources: [/u01/app/udf/HiveUDF-1.0.0.jar]
hive> create temporary function udf_eval as 'com.data.developer.hiveudf.FormulaUDF';
OK
Time taken: 0.434 seconds
hive> select udf_eval('3*2'); 
OK
6.0
Time taken: 1.516 seconds, Fetched: 1 row(s)
hive> 

使用第二种继承方法创建 UDF

hive> create temporary function udf_eval2 as 'com.data.developer.hiveudf.FormulaGenericUDF';
OK
Time taken: 0.015 seconds
hive> select udf_eval2('2*3*5');
OK
30
Time taken: 1.52 seconds, Fetched: 1 row(s)

--测试样例数据如下:
hive> select formula,udf_eval2(formula) from udf_exmaple;
OK
1*2*19/2        19.000000000000000000
Time taken: 0.166 seconds, Fetched: 1 row(s)
hive> 

3),我们还可以上传 UDF Jar 包到 HDFS 文件夹,再使用 HDFS 上的 Jar 包创建函数,具体操作如下:

[hadoop@node02 udf]$ hadoop dfs -mkdir /user/hive/udf/jar
WARNING: Use of this script to execute dfs is deprecated.
WARNING: Attempting to execute replacement "hdfs dfs" instead.

[hadoop@node02 udf]$ hadoop dfs -put ./HiveUDF-1.0.0.jar /user/hive/udf/jar/
WARNING: Use of this script to execute dfs is deprecated.
WARNING: Attempting to execute replacement "hdfs dfs" instead.

[hadoop@node02 udf]$ hadoop dfs -ls /user/hive/udf/jar
WARNING: Use of this script to execute dfs is deprecated.
WARNING: Attempting to execute replacement "hdfs dfs" instead.

Found 1 items
-rw-r--r--   3 hadoop supergroup       5858 2022-08-19 06:15 /user/hive/udf/jar/HiveUDF-1.0.0.jar

创建 UDF 函数如下:

--创建临时函数
hive> create temporary function udf_example as 'com.data.developer.hiveudf.FormulaGenericUDF' using jar 'hdfs://hadoop33ha/user/hive/udf/jar/HiveUDF-1.0.0.jar';
Added [/u01/app/hive-3.1.2/iotmp/HiveUDF-1.0.0.jar] to class path
Added resources: [hdfs://hadoop33ha/user/hive/udf/jar/HiveUDF-1.0.0.jar]
OK
Time taken: 0.317 seconds

--创建永久函数
hive> create function udf_eval as 'com.data.developer.hiveudf.FormulaGenericUDF' using jar 'hdfs://hadoop33ha/user/hive/udf/jar/HiveUDF-1.0.0.jar';
Added [/u01/app/hive-3.1.2/iotmp/HiveUDF-1.0.0.jar] to class path
Added resources: [hdfs://hadoop33ha/user/hive/udf/jar/HiveUDF-1.0.0.jar]
OK
Time taken: 0.046 seconds

--测试永久函数
hive> select udf_eval('3*5*8/4');
OK
30
Time taken: 1.55 seconds, Fetched: 1 row(s)
  
--退出会话重新打开 Hive,调用永久 UDF 函数,依然可以使用,但是使用 Hive 本地服务器的 Jar 包创建的永久函数则会报错 "FAILED: SemanticException [Error 10011]: Invalid function udf_eval3",这里建议使用上传 Jar 包到 HDFS 的方法创建自定义函数。

hive> select udf_eval('3*5*8/4');
Added [/u01/app/hive-3.1.2/iotmp/HiveUDF-1.0.0.jar] to class path
Added resources: [hdfs://hadoop33ha/user/hive/udf/jar/HiveUDF-1.0.0.jar]
OK
30
Time taken: 2.065 seconds, Fetched: 1 row(s)
hive> 

4),使用样例数据测试 UDF 函数

hive> select udf_eval(formula), formula from udf_exmaple;
OK
19.000000000000000000   1*2*19/2
Time taken: 1.065 seconds, Fetched: 1 row(s)

5),删除 UDF 函数,如上所说临时函数在会话窗口关闭后不再存在,永久函数创建后一直存在,在不需要的情况下使用如下语句删除。

hive> drop function if exists udf_eval;
OK
Time taken: 0.019 seconds

最后,我们这里使用 Hive 提供的自定义函数方法创建了一个 UDF 函数,相信你们一定对 UDF 的创建,开发,部署和使用都有了基本的了解,实际项目需求中使用的 UDF 场景肯定会存在比本篇文章的案例更复杂的场景。UDF 是离线数仓开发过程中的一把利器,但也不是万万不能的,从未来运维和系统升级情况下考虑,如果 Hive 本身提供的内置函数可以实现的情况下,我们依然推荐使用 Hive 内置函数。最后谢谢大家的关注和阅读,如果感觉本篇文章对你未来的工作有帮助,还请点赞收藏转发。
alt

往期文章

数据仓库系列

数据仓库之维度表

数据仓库之日期维度表

Hive 系列

Hive 必知必会(一)介绍

Hive 必知必会(二)基本操作

Hive 必知必会(三)基本操作(续)

一文读懂 Hive Explain 执行计划

SQL Server 优化

一文读懂 SQL Server 执行计划

SQL 开发中的十个高级用法

本文由 mdnice 多平台发布

标签: 后端

本文转载自: https://blog.csdn.net/weixin_56046673/article/details/126478183
版权归原作者 Data-Developer 所有, 如有侵权,请联系我们删除。

“Hive 自定义函数”的评论:

还没有评论