一、hive优化
要针对hive优化,首先要明白hive的本质和运行慢的原因
hive 的本质是:hdfs+MapReduce
hive 运行速度慢的原因大致有如下几个:
1、数据倾斜倾斜:抽样
1.1、内容倾斜
1.2、group by
1.3、小表 join 大表
2、连接操作过多
2.1、join过多导致job过多
2.2、小文件过多
2.3、Mapper或Reducer过多
3、使用不当
3.1、count(distinct)
3.2、join ... on ... where
3.3、select sum(field) from TABLE;
首要最好从源头,hive的模型设计上考虑。整体最优,考虑全局的情况下,合理减少表数量。数据建模上推荐使用【星型】,雪花,星座不宜使用。
使用维度表存放静态数据,事实表(动态数据:4W1H)
维度退化 => 星型
sqoop|maxwell|cancal : query "select ... join ..."
ods -> dwd insert into ... select ... join ...
充分了解业务,提前设计好预聚合
分层=>轻量聚合
分区=>避免交换
on... where...
分桶=>拉链表(分桶表)、抽样
压缩=>减少体量
配置压缩格式,表存储格式,查看是否支持压缩,是否支持切片。
hadoop 内存管理
# mapred
set mapreduce.map.memory.mb=256;
set mapreduce.reduce.memory.mb=512;
set mapreduce.map.java.opts=?
set mapreduce.reduce.java.opts=?
# yarn
set yarn.nodemanager.resource.memory-mb=-1;
set yarn.scheduler.minimum-allocation-mb=1024;
set yarn.scheduler.maximum-allocation-mb=8192;
数据倾斜(内置自动优化)
可能触发的原因:
连接字段在连接表之间分布不均,或缺乏连接关系
【连接键】的值分布不均:值集中的少数分区,其他分区没有值
某张表中数据分布不均
手动处理方案:
1、手动选择合适的连接键
2、连接键拆分或随机映射
3、引入hash分区或分桶使得数据分布均衡
4、增加或减少任务的并行度
hive自动处理配置项
默认false,如果join键倾斜则设为true
set hive.optimize.skewjoin=true;
# 默认join键对应的记录数超过该值则进行倾斜分析
set hive.skewjoin.key=100000;
# 默认10000,设置倾斜处理mapper数量,过大会导致过多的任务切割和额外的开销,过小会导致不能优化
set hive.skewjoin.mapjoin.map.tasks=10000;
# 默认32M,倾斜最小切片大小,配合上一项使用,避免Mapper数量过多
set hive.skewjoin.mapjoin.min.split=32M;
# map join: 大小表
# 默认true,即默认自动开启 mapjoin
set hive.auto.convert.join=true;
# 默认小表<=25M
set hive.mapjoin.smalltable.filesize=25M;
# 默认false,分桶表表mapjoin专用
set hive.optimize.bucketmapjoin=true;
# combiner: #默认true,即默认开启Mapper端聚合
set hive.map.aggr=true;
# groupby:HashPartitioner
# 默认-1,倾斜的倍数(倾斜度) n = 倾斜数据总均量/其他数据总均量 + 其他数据的差异数
# 抽样(tablesample(bucket COUNT outof TOTAL))确定是偶倾斜与倾斜程度
set mapreduce.job.reduces=n; (见下面 Reducer 数量控制)
# 默认false
set hive.groupby.skewindata=true;
Map或Reduce输出过多小文件合并
# 若满足以下设置条件,任务结束后会单起MapReduce对输出文件进行合并
# 默认为true,map-only输出是否合并
set hive.merge.mapfiles=true;
# 默认为false,mapreduce输出是否合并
set hive.merge.mapredfiles=true;
# 默认256M,合并文件操作阈值,如果输入数据超过256M,则触发合并操作
set hive.merge.size.per.task=256M;
# 默认16M,合并文件平均大小小于该阈值则将他们合并为大文件
set hive.merge.smallfiles.avgsize=16M;
控制Mapper和Reducer数量,mapper的启动和初始化开销较大,【数量过多】导致开销大于逻辑处理,浪费资源
Mapper端设置
# 默认的Mapper数量
int default_num = total_file_size/dfs.block.size;
#默认为2, 只有大于2时才会生效
set mapred.map.tasks=2; #旧版
set mapreduce.job.maps=2; #新版
#Mapper数量有限值
Math.max(min.split.size,Math.min(dfs.block.size,max.split.size))
# 默认128M
set dfs.block.size=128M;
# 默认单个Mapper处理数据上限256M
set mapred.max.split.size=256M;
# 默认1字节
set mapred.min.split.size=1;
# 默认单个节点处理的数据下限1字节
set mapred.min.split.size.per.node=1;
# 默认单个机架处理的数据下限1字节
set mapred.min.split.size.per.rack=1;
# Mapper输入多个小文件合并后再切片
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
Mapper切片的大小
表数据文件过大设置:
set mapred.min.split.size = N;
需要综合考虑Yarn的内存权限,分布式计算的均衡,若表A单行内容量大,且处理逻辑复杂,需要将文件拆分(列裁剪,行筛选),将数据通过分区表拆分成更小粒度
set mapreduce.job.reduces=3;
create table A_SPLITS as
select * from A distribute by rand(3);
Reducer端设置
# 默认-1,可以根据需要在客户端设置 :
int n = Math.min(SIZE/bytes.per.reducer, reducers.max) | num_partitions
set mapred.reduce.tasks=n; # 旧版本使用
set mapreduce.job.reduces=n; # 新版本使用 ✔
# 若存在数据倾斜,则 Hive 会单独分配 Reducer 处理倾斜数据
# 若未设置 Reducer 数量,自动计算 Reducer 数量
# 默认每个Reducer的数据为256M
set hive.exec.reducers.bytes.per.reducer=256M;
# 默认单个任务最大Reducer数量
set hive.exec.reducers.max=1009;
# Reducer只能为1的情况
# 没有group by,直接使用sum,count,max,min,avg,collect_list,concat_ws等聚合函数
#优化方案
select sum(sum_a) from (select sum(a) from A group by STH)T
# 使用了order by
#优化方案
#group by 同键分组且必须聚合,distribute by 仅按键分散数据
#orderby 全局排序仅1个Reducer,sort by 仅分区内有序
set mapreduce.job.reduces=N;
select * from (select * from A distribute by a sort by a) order by a;
# 存在笛卡尔积,尽量不用
# 若出现小表+大表的笛卡尔积
# 小表扩展join key,并根据需求复制 DN_COUNT 份
# 大表扩展join key,根据 DN_COUNT 随机生成
# 关闭自动mapjoin : set hive.auto.convert.join=false
# 设置reducer的数量为:set mapreduce.job.reduces=DN_COUNT
减少数据规模, 调整存储格式
# create 建表的默认格式和分区表
#默认TextFile, 可选:orc,RCFile,SequenceFile
set hive.default.fileformat=orc;
# 提升IO性能,但会增加CPU压力
# Mapper压缩
# 开启map输出压缩功能;默认false
set mapreduce.map.output.compress=true;
# 设置map输出数据的压缩方式;默认DefaultCodec
set mapreduce.map.output.compress.codec =
org.apache.hadoop.io.compress.SnappyCodec;
# 默认false,任务过程输出是否压缩
set hive.exec.compress.intermediate=true;
# Reducer压缩
# 开启Reducer输出压缩功能;默认false
set hive.exec.compress.output=true;
# reduce最终输出数据压缩;默认false
set mapreduce.output.fileoutputformat.compress=true;
# reduce最终数据输出压缩为块压缩;默认RECORD
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
# reduce最终数据输出压缩方式;默认DefaultCodec
set mapreduce.output.fileoutputformat.compress.codec =
org.apache.hadoop.io.compress.SnappyCodec;
动态分区设置
# 默认开启
set hive.exec.dynamic.partition=true;
# 默认strict
set hive.exec.dynamic.partition.mode=nonstrict;
# 默认最大动态分区数1000
set hive.exec.max.dynamic.partitions=1000;
# 默认单节点最大动态分区数100
set hive.exec.max.dynamic.partitions.pernode=100;
# 动态添加多分区数据(需要一张源数据表)
insert into table TABLE_PARTITION partition(partition_field)
select *, partition_field from TABLE_SOURCE where ...;
# 静态分区数据挂载
load data [local] inpath 'DATA_PATH'
[overwrite|into] table TABLE_PARTITION partition(partition_field=VALUE);
# 查看分区
show partitions TABLE_PARTITION;
# 添加分区
alter table TABLE_PARTITION add partition(partition_field=VALUE);
# 删除分区
alter table TABLE_PARTITION drop partition(partition_field=VALUE);
尽量不使用distinct
# count(distinct)
# 不妥:select count(distict b) from TAB group by a
# 稳妥:select count(b) from (select a,b from TAB group by a,b) group by a
# CBO (COST BASED OPTIMIZER)
# 默认true
set hive.cbo.enable=true;
分区裁剪:
以 on,where 多条件字段顺序,建【多重】分区表,默认开启支持,以分区字段为条件筛选数据,tez引擎:动态分区剪裁支持
# 默认mr, tez|spark DAG
set hive.execution.engine=tez;
并行执行无依赖job
# 默认false(关闭)
set hive.exec.parallel=true;
# 默认8,最大并行任务数
set hive.exec.parallel.thread.number=8;
JVM重用 : hive3已经取消
配置依赖hadoop mapred-site.xml
#每个jvm运行的任务数
set mapreduce.job.jvm.numtasks = 8;
本地化运算
# 默认1,启动本地化模式reducer数量必须为0|1
set mapreduce.job.reduces=0/1;
# 默认 yarn
set mapreduce.framework.name=local;
# 开启自动本地化模式
set hive.exec.mode.local.auto=true;
# 默认4,本地化文件数量上限
set hive.exec.mode.local.auto.input.files.max=4;
# 默认128M,本地化文件大小上限
set hive.exec.mode.local.auto.inputbytes.max=128M;
# 可能会导致内存溢出:java.lang.OutofMemoryError : java heap space
# 修改 mv hive-env.sh.template hive-env.sh
# 去掉注释
------------------------------
# export HADOOP_HEAPSIZE=1024
------------------------------
# llap
# 默认container,2.0之后扩展此项,llap可选
set hive.execution.mode=llap;
# llap为DataNode常驻进程,混合模型,小型任务可以由llap解决,大任务由yarn容器执行
# fetch
# 默认more,简单查询不走mr,直接提取
set hive.fetch.task.conversion=more;
开启谓词(筛选条件表达式)下推(见以下表格)
set hive.optimize.ppd=true;
join中谓词:on
join后谓词:where
左右外连接
主表:全部显示, on条件不能下推,where可以下推
从表:不存在以NULL填充,where不能下推,on可以下推
内连接:on和where都下推
全外连接:on和where都不下推
谓词下推 :过滤条件最接近数据源,换言之即先筛选掉无关数据再做其他处理
select * from A left join B on A.id=B.id where A.id=1;
select * from (select * from A where A.id=1)T left join B on T.id=B.id;
select * from A left join B on A.id=B.id and A.id=1;
whether pushinner joinleft joinright joinfull joinleftrightleftrightleftrightleftrightonyesyesnoyesyesnononowhereyesyesyesnonoyesnono
版权归原作者 inori1256 所有, 如有侵权,请联系我们删除。