文档地址:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform
一、介绍
和udf差不多的作用,支持用python实现。通过标准输入流从hive读取数据,内部处理完再通过标准输出流将处理结果返回给hive。实现流程上比udf要更简单灵活一些,只需要上传脚本=>add file加载到分布式缓存=>使用。
二、实现
先定义一个名为
transform.py
的脚本,将传入的两个字段值都+1。
#!/usr/bin/env pythonimport sys
for line in sys.stdin:try:
x, y =map(float, line.strip().split('\t'))
x +=1
y +=1print('\t'.join(map(str,[x, y])))except ValueError as e:print('\t'.join([r'\N']*2))
上面对输入流按照
\t
分隔是因为hive中的数据在传递到py脚本时,多个字段间默认会用
\t
分隔拼接为字符串,并且空值null会被转为字符串
\N
。同样将处理结果返回给hive时,如果多个字段,为了hive能够正确解析,也需要用
\t
拼接输出,单独的
\N
在hive中也会被重新解释为null。
除了单独的\N
会被重新解释为null外,还有一种情况也会被hive解释为null,就是脚本里返回的字段个数小于hive中接收的字段个数时,hive中多余的字段会被赋值为null。
1.脚本上传到本地
这里的本地指的是hive主服务hive server2所在的节点,也就是我们客户端连接的那个机器。
先上传到主服务机器下的某个路径:
# 文件上传路径[root@node1 HiveLib]# readlink -e transform.py
/root/HiveLib/transform.py
上传后通过add file命令将脚本添加到分布式缓存,之后就可以直接使用了。
-- 添加到分布式缓存addfile/root/HiveLib/transform.py;-- 创建一个临时表测试执行with`table`as(select'1'as id,'1.6789'as col1,'7.13'as col2
unionallselect'2'as id,'11.568'as col1,nullas col2
unionallselect'3'as id,'26.09761'as col1,'71.89002'as col2
)-- as后面接收脚本返回值的字段也可以指定字段类型, eg:(col1 double, col2 double), 省略时默认都是字符串string类型select transform (col1, col2)using'transform.py'as(col1, col2)from`table`;
2.脚本上传到hdfs
这种方式和本地实现基本一致,只不过需要将脚本上传到hdfs中,add file时后面跟的是hdfs路径。
[root@node1 HiveLib]# hadoop fs -put ./transform.py /user/hive/lib[root@node1 HiveLib]# hadoop fs -ls /user/hive/lib
Found 2 items
-rw-r--r-- 3 root supergroup 41642022-12-18 00:48 /user/hive/lib/hive_udf-1.0-SNAPSHOT.jar
-rw-r--r-- 3 root supergroup 2572024-05-05 19:13 /user/hive/lib/transform.py
sql客户端中执行:
-- 脚本路径换为hdfs路径addfile hdfs://node1:8020/user/hive/lib/transform.py;with`table`as(select'1'as id,'1.6789'as col1,'7.13'as col2
unionallselect'2'as id,'11.568'as col1,nullas col2
unionallselect'3'as id,'26.09761'as col1,'71.89002'as col2
)select transform (col1, col2)using'transform.py'as(col1, col2)from`table`;
三、几个需要注意的点
1.脚本名不要写全路径
using语句后面指定脚本只写脚本名即可,不要写全路径。全路径的话会报错
[08S01][20000] Error while processing statement: FAILED: Execution Error, return code 20000 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. Unable to initialize custom script.
2.using后面语句中,带不带"python"的问题
这里说的是sql语句中,是
using 'transform.py'
还是
using 'python transform.py'
的问题。可以不带python这个关键字,但是前提脚本中必须指定了Shebang,类似于
#!/usr/bin/env python
这样,指定脚本的解释器。如果指定Shebang,using后面带不带python都可以,如果脚本中没指定,using后面必须带python这个关键字,否则报错。
看到有人说需要给py脚本
chmod +x transform.py
赋予可执行权限,实际操作中经过验证本地和hdfs都不需要。
3.py脚本Shebang:#!/usr/bin/env python
Shebang(也称为Hashbang)是一个源于Unix系统中的概念,特别是在类Unix操作系统中广泛使用。它是指脚本文件第一行以#!开头的特殊注释行,用于指定该脚本应该由哪个解释器程序来执行。这个名称来源于这两个起始字符—井号(#)和叹号(!)。
主要解释下
#!/usr/bin/env python
和
#!/usr/bin/python
的区别。两者都是用来指定该脚本的解释器,但是前者比后者有更好的兼容性,可以理解为:后者是指定了一个固定的解释器路径,虽然多数情况下遵循规范解释器路径会在该目录下,但是并不能保证一定存在。而前者逻辑上等价于
env | grep python
,它是从当前所有的环境变量中按照一定的优先级顺序去找python解释器,最先找到哪个就用哪个执行,所以可以有效避免路径指定错误的问题,推荐前面这种写法。
[root@node1 HiveLib]# which python
/root/anaconda3/bin/python
[root@node1 HiveLib]# which env
/usr/bin/env
[root@node1 HiveLib]# env | grep pythonCONDA_PYTHON_EXE=/root/anaconda3/bin/python
4.通过约定增强脚本的通用性
hive transform脚本相比udf有个缺点,就是select中如果使用了transform就不能再选择其它字段了,而实际业务需求每次处理的字段并不总是固定。为了避免每次需求变化而导致需要修改脚本,可以在调用时通过通配符
*
将外部所有参数传递给脚本,脚本处理完再将所有参数返回。这样即使字段发生变化,也只需要在外部sql中动态调整接收脚本返回值的列名即可。这个过程中需要处理的字段在输入输出时只需要按照约定放在一个指定的位置就行了。
select transform (*) using 'transform.py' as (col1, col2, ...) from `table`;
版权归原作者 atwdy 所有, 如有侵权,请联系我们删除。