点一下关注吧!!!非常感谢!!持续更新!!!
Java篇开始了!
目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(已更完)
- Druid(已更完)
- Kylin(已更完)
- Elasticsearch(已更完)
- DataX(已更完)
- Tez(已更完)
- 数据挖掘(已更完)
- Prometheus(已更完)
- Grafana(已更完)
- 离线数仓(正在更新…)
章节内容
上节我们完成了如下的内容:
- ODS层的构建 Hive处理
- JSON 数据处理 结构化
JSON 数据处理
续接上节,上节到了内建函数。
使用UDF处理
自定义UDF处理JSON串中的数组,自定义UDF函数:
- 输入:JSON串、数组的Key
- 输出:字符串数组
UDF(User Defined Function)
UDF 是用户定义的函数,用于扩展大数据处理系统的功能。通过 UDF,用户可以实现特定的业务逻辑,用于数据的转换或计算。
UDF 的特点
- 扩展性:Hive 等工具提供内置函数,但 UDF 允许用户实现自定义逻辑,满足复杂需求。
- 灵活性:UDF 可以用多种编程语言实现(Java、Python 等)。
多种类型:
- 普通 UDF:用于单行输入的计算,返回一个值。
- UDAF(User Defined Aggregation Function):用户定义的聚合函数,处理多行数据并返回单个结果。
- UDTF(User Defined Table-generating Function):用户定义的表生成函数,处理单行输入并输出多行数据。
导入依赖
<dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>2.3.7</version><scope>provided</scope></dependency>
编写代码
packageicu.wzk;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONArray;importcom.alibaba.fastjson.JSONException;importcom.alibaba.fastjson.JSONObject;importorg.apache.hadoop.hive.ql.exec.UDF;importorg.apache.parquet.Strings;importjava.util.ArrayList;publicclassParseJsonArrayextends UDF {publicArrayList<String>evaluate(String jsonStr,String arrKey){if(Strings.isNullOrEmpty(jsonStr)){returnnull;}try{JSONObject object = JSON.parseObject(jsonStr);JSONArray jsonArray = object.getJSONArray(arrKey);ArrayList<String> result =newArrayList<>();for(Object o : jsonArray){
result.add(o.toString());}return result;}catch(JSONException e){returnnull;}}}
打包代码
mvn clean package;
上传:“hive-parse-json-array-1.0-SNAPSHOT-jar-with-dependencies.jar”文件到指定的目录下。
测试函数
使用自定义UDF函数:
-- 启动hive-- 添加自定义的jar包add jar /opt/wzk/hive-parse-json-array-1.0-SNAPSHOT-jar-with-dependencies.jar;-- 创建临时函数,指定类需要完整的路径CREATEtemporaryfunction wzk_json_array AS"icu.wzk.ParseJsonArray";
执行结果如下图所示:
运行函数进行解析:
-- 执行查询进行测试
SELECT
username, age, sex, wzk_json_array(json, "ids") ids
FROM jsont1;
运行结果如下图所示:
-- 解析json串中的数组,并展开SELECT
username, age, sex, ids1
FROM jsont1
lateral view explode(wzk_json_array(json,"ids")) t1 AS ids1;
运行结果如下图所示:
-- 解析json串中的数组,并展开SELECT
username, age, sex, ids1, id, num
FROM jsont1
lateral view explode(wzk_json_array(json,"ids")) t1 AS ids1
lateral view json_tuple(json,'id','total_number') t1 AS id, num;
运行结果如下图所示:
使用SerDe处理
SerDe(Serializer and Deserializer)
SerDe 是序列化与反序列化的缩写,用于定义数据的读写方式。在大数据框架中,数据通常以结构化或非结构化形式存储,SerDe 用于将这些数据转化为系统可以理解的格式,或从系统中导出成所需格式。
SerDe 的作用
- 反序列化:将存储中的字节流(例如文件)解析成 Hive 表中的行数据。
- 序列化:将 Hive 表中的行数据转换为存储格式(如 JSON、CSV、Avro 等)。
- 支持自定义数据格式:当 Hive 的内置格式不满足需求时,可以编写自定义 SerDe。
基本信息
序列化是对象转换为字节序列的过程,反序列化是字节序列恢复为对象的过程,对象的序列化主要有两种用途:
- 对象的序列化,即把对象转换为字节序列后保存到文件中
- 对象数据的网络传送
SerDe是Serializer和Deserilizer的简写形式,Hive使用Serde进行行对象的序列化与反序列化,最后实现把文件内容映射到Hive表中的字段数据类型。SerDe包括Serialize、Deserilize两个功能:
- Serializa把Hive使用的JavaObject转换成能写入HDFS字节序列,或者其他系统能识别的流文件
- Deserilize把字符串或者二进制流转换成Hive能识别的JavaObject对象
Read:HDFS Files => InputFileFormat => <Key, Value> => Deserializer => Row Object
Write: Row Object => Serializer => <Key, Value> => OutputFormat => HDFS files
Hive本身自带了几个内置的SerDe,还有其他一些第三方的SerDe可供选择。
CREATETABLE t11(id string)
stored AS parquet;CREATETABLE t12(id string)
stored AS ORC;DESC formatted t11;DESC formatted t12;
创建数据
对于纯JSON格式的数据,可以使用JsonSerDe来处理:
vim /opt/wzk/json2.dat
写入内容如下所示:
{"id":1,"ids":[101,102,103],"total_number":3}{"id":2,"ids":[201,202,203,204],"total_number":4}{"id":3,"ids":[301,302,303,304,305],"total_number":5}{"id":4,"ids":[401,402,403,304],"total_number":5}{"id":5,"ids":[501,502,503],"total_number":3}
写入的数据如下所示:
进行测试
我们先启动Hive
hive
然后执行SQL进行测试:
-- 创建表数据CREATETABLE jsont2(
id int,
ids array<string>,
total_number int)ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';-- 加载数据loaddatalocal inpath '/opt/wzk/json2.dat'intotable jsont2;
执行结果如下图所示:
最后小结
各种JSON格式处理的小结:
- 简单格式的JSON数据,使用 get_json_object、json_tuple处理
- 对于嵌套数据类型,可以使用UDF
- 纯JSON串可使用JsonSerDe处理更简单
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。