0


Hive on spark源码编译与调优

文章目录

一、编译环境准备

1、hadoop和hive安装

hive官网版本依赖:https://hive.apache.org/general/downloads/

这里都是用了hadoop3.1.3和hive3.1.3版本,具体的安装可以参考之前的文章

2、编译环境搭建

使用了ubuntu20作为的编译环境

# ==================安装jdk===========# 卸载现有JDK# centos 操作# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps# 下面是ubuntusudoapt-get remove openjdk-8-jre-headless

# 将JDK上传到虚拟机的/opt/software文件夹下面tar-zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
# 配置JDK环境变量sudovim /etc/profile.d/my_env.sh
#JAVA_HOMEexportJAVA_HOME=/opt/module/jdk1.8.0_212
exportPATH=$PATH:$JAVA_HOME/bin

# 让环境生效source /etc/profile.d/my_env.sh
# 查看是否成功java-version# ==================安装maven========wget https://dlcdn.apache.org/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
# 解压Maven到/opt/module目录下tar-zxvf apache-maven-3.6.3-bin.tar.gz -C /opt/module/
# 配置Maven环境变量sudovim /etc/profile.d/my_env.sh
# MAVEN_HOMEexportMAVEN_HOME=/opt/module/apache-maven-3.6.3
exportPATH=$PATH:$MAVEN_HOME/bin

# 让环境变量生效source /etc/profile.d/my_env.sh
mvn -version# 配置仓库镜像vim /opt/module/apache-maven-3.6.3/conf/settings.xml
# 在<mirrors></mirrors>节点中增加以下内容 <mirror><id>aliyunmaven</id><mirrorOf>central</mirrorOf><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url></mirror># ====================安装Git====================# 这是对于centos的sudo yum install https://repo.ius.io/ius-release-el7.rpm https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
sudo yum install-y git236
# 对于ubuntu来说,甚至可以自带sudoapt-getinstallgit# =====================安装IDEA=======================wget https://download.jetbrains.com/idea/ideaIU-2021.1.3.tar.gz
# 解压IDEA到/opt/module目录下tar-zxvf ideaIU-2021.1.3.tar.gz -C /opt/module/
# 启动IDEA(在图形化界面启动)nohup /opt/module/idea-IU-211.7628.21/bin/idea.sh 1>/dev/null 2>&1&# 配置Maven,配置好maven

3、Hive on Spark配置

# 1、Spark官网下载jar包地址,在Hive所在节点部署Spark纯净版
http://spark.apache.org/downloads.html
# 上传并解压解压spark-3.1.3-bin-without-hadoop.tgztar-zxvf spark-3.1.3-bin-without-hadoop.tgz -C /opt/module/
mv /opt/module/spark-3.1.3-bin-hadoop3 /opt/module/spark
# 修改spark-env.sh配置文件mv /opt/module/spark/conf/spark-env.sh.template /opt/module/spark/conf/spark-env.sh
# 增加如下内容exportSPARK_DIST_CLASSPATH=$(hadoop classpath)# 2、配置SPARK_HOME环境变量sudovim /etc/profile.d/my_env.sh
# 添加如下内容# SPARK_HOMEexportSPARK_HOME=/opt/module/spark
exportPATH=$PATH:$SPARK_HOME/bin

# 生效source /etc/profile.d/my_env.sh

# 3、在hive中创建spark配置文件vim /opt/module/hive/conf/spark-defaults.conf
spark.master                               yarn
spark.eventLog.enabled                   true
spark.eventLog.dir                        hdfs://hadoop102:8020/spark-history
spark.executor.memory                    1g
spark.driver.memory             1g

# 在HDFS创建如下路径,用于存储历史日志
hadoop fs -mkdir /spark-history

向HDFS上传Spark纯净版jar包

  • 说明1:由于Spark3.0.0非纯净版默认支持的是hive2.3.7版本,直接使用会和安装的Hive3.1.2出现兼容性问题。所以采用Spark纯净版jar包,不包含hadoop和hive相关依赖,避免冲突。
  • 说明2:Hive任务最终由Spark来执行,Spark任务资源分配由Yarn来调度,该任务有可能被分配到集群的任何一个节点。所以需要将Spark的依赖上传到HDFS集群路径,这样集群中任何一个节点都能获取到。
# 上传并解压spark-3.0.0-bin-without-hadoop.tgztar-zxvf /opt/software/spark-3.0.0-bin-without-hadoop.tgz
# 上传Spark纯净版jar包到HDFS
hadoop fs -mkdir /spark-jars
hadoop fs -put spark-3.0.0-bin-without-hadoop/jars/* /spark-jars

# 修改hive-site.xml文件vim /opt/module/hive/conf/hive-site.xml
<!--Spark依赖位置(注意:端口号8020必须和namenode的端口号一致)--><property><name>spark.yarn.jars</name><value>hdfs://hadoop102:8020/spark-jars/*</value></property><!--Hive执行引擎--><property><name>hive.execution.engine</name><value>spark</value></property>

Hive on Spark测试

# 启动hive客户端
bin/hive
# 创建一张测试表
hive (default)> create table student(id int, name string);# 通过insert测试效果
hive (default)> insert into table student values(1,'abc');

二、Hive相关问题

1、Hadoop和Hive的兼容性问题

1.1 问题描述

配置好3.1.3版本之后,启动hive会报错

java.lang.NoSuchMethodError: 
com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

上述问题是由Hadoop3.1.3版本所依赖的guava-27.0-jre和Hive-3.1.3版本所依赖的guava-19.0不兼容所致

1.2 解决思路

  • 更换Hadoop版本 经过观察发现,Hadoop-3.1.0,Hadoop-3.1.1,Hadoop-3.1.2版本的guava依赖均为guava-11.0.2,而到了Hadoop-3.1.3版本,guava依赖的版本突然升级到了guava-27.0-jre。Hive-3的所有发行版本的guava依赖均为guava-19.0。而guava-19.0和guava-11.0.2版本是兼容的,所以理论上降低Hadoop版本,这个问题就能得到有效的解决(将hadoop的guava-27.0-jre复制到hive中也可以暂时使用)
  • 升级Hive-3.1.3中的guava依赖版本,并重新编译Hive若将Hive-3.1.3中的guava依赖版本升级到guava-27.0-jre,这样就能避免不同版本的guava依赖冲突,上述问题同样能得到解决。

1.3 修改并编译Hive源码

# Hive源码的远程仓库地址:
https://github.com/apache/hive.git
# 国内镜像地址:
https://gitee.com/apache/hive.git
# 编译官网:https://cwiki.apache.org/confluence/display/Hive/GettingStarted#GettingStarted-BuildingHivefromSource# 首先测试一下能不能成功打包
mvn clean package -Pdist-DskipTests-Dmaven.javadoc.skip=true

# 修改Maven父工程的pom.xml文件中的guava.version参数# 将<guava.version>19.0</guava.version>改为<guava.version>27.0-jre</guava.version># 不停排错,直至编译打包成功

2、Hive插入数据StatsTask失败问题

3.1 问题描述

# 启动hive客户端
bin/hive
# 创建一张测试表
create table student(id int, name string);# 执行insert语句
insert into table student values(1,'abc');# 测试发现过程发现如下错误信息
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.StatsTask
# 该问题问由Hive自身存在的bug所致,bug详情可参照以下连接:https://issues.apache.org/jira/browse/HIVE-19316

3.2 解决思路

该bug已经在3.2.0, 4.0.0, 4.0.0-alpha-1等版本修复了,所以可以参考修复问题的PR,再修改Hive源码并重新编译

3、Hive和Spark兼容性问题

3.1 问题描述

# 配置好hive on spark 后,启动hive客户端
bin/hive
insert into table student values(1,'abc');# 测试发现过程发现如下错误信息
Job failed with java.lang.NoSuchMethodError: 
org.apache.spark.api.java.JavaSparkContext.accumulator(Ljava/lang/Object;Ljava/lang/String;Lorg/apache/spark/AccumulatorParam;)Lorg/apache/spark/Accumulator;

问题是官网下载的Hive3.1.3和Spark3.0.0默认是不兼容的。因为Hive3.1.3支持的Spark版本是2.3.0,所以需要我们重新编译Hive3.1.3版本

3.2 解决思路

  • 降低Spark版本经过观察发现Hive-3.1.3,版本所兼容的Spark版本为Spark-2.3.0,故降低Spark版本便可有效解决该问题。
  • 升级Hive-3.1.3中的Spark依赖版本至Spark-3.1.3,并重新编译Hive将Hive源码中的Spark依赖版本升级为Spark-3.1.3,并修改源码,重新编译打包后,同样能解决该问题。

3.3 修改实操

# 修改Hive项目的pom.xml文件,将spark依赖的版本改为3.1.3<spark.version>2.3.0</spark.version><scala.binary.version>2.11</scala.binary.version><scala.version>2.11.8</scala.version># 将上面的依赖变为<spark.version>3.1.3</spark.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.10</scala.version># 然后将错误信息弄好即可
mvn clean package -Pdist-DskipTests-Dmaven.javadoc.skip=true

4、Hive编译实战

两篇参考文章:Hive 3.1.3 编译 / hive on spark hadoop3.x修改源码依赖

下面我说一下我编译过程遇到的问题,首先将hive源码克隆下来,git选择

checkout Tag or Revision

,选择

rel/release-3.1.3

进行编译

# 开始前需要首先编译测试一下环境
mvn clean package -Pdist-DskipTests-Dmaven.javadoc.skip=true
# 这里有可能会遇到5.1.5-jhyde没有这个jar包错误,将其放入maven仓库即可# 然后0001-guava-27.0-jre.patch/HIVE-19316.patch/spark-3_1_3.patch这三个git补丁包分别对应上面三个问题# 选择git apply即可导入比较,这个自行研究了# 同时针对HIVE-19316问题,可以在可视化的git log中搜索HIVE-19316,选择最新的commit,点击cherry-pick和打补丁一样的效果

补丁文件包,依赖包,hive3.1.2-spark3.0.0和hive3.1.3-spark3.1.3二进制包已经全部放进该压缩包

hive3.x编译spark3.x包

三、调优之Yarn和Spark配置

1、环境配置介绍

一般生产环境NN和RM吃资源少的会单独配置,而工作节点会单独配置资源较多,例如Master节点配置为16核CPU、64G内存;Workder节点配置为32核CPU、128G内存,五台服务器如下所示
hadoop100hadoop101hadoop102hadoop103hadoop104mastermasterworkerworkerworkerNameNodeNameNodeDataNodeDataNodeDataNodeResourceManagerResourceManagerNodeManagerNodeManagerNodeManagerJournalNodeJournalNodeJournalNodeZookeeperZookeeperZookeeperKafkaKafkaKafkaHiveserver2Metastorehive-clienthive-clienthive-clientSparkSparkSparkSparkDS-masterDS-masterDS-workerDS-workerDS-worderMaxwellmysqlflumeflume

2、Yarn配置

2.1 Yarn配置说明

需要调整的Yarn参数均与CPU、内存等资源有关,核心配置参数如下

  • yarn.nodemanager.resource.memory-mb

一个NodeManager节点分配给Container使用的内存。该参数的配置,取决于NodeManager所在节点的总内存容量和该节点运行的其他服务的数量。考虑上述因素,此处可将该参数设置为64G

<property><name>yarn.nodemanager.resource.memory-mb</name><value>65536</value></property>
  • yarn.nodemanager.resource.cpu-vcores

一个NodeManager节点分配给Container使用的CPU核数。该参数的配置,同样取决于NodeManager所在节点的总CPU核数和该节点运行的其他服务。考虑上述因素,此处可将该参数设置为16

<property><name>yarn.nodemanager.resource.cpu-vcores</name><value>16</value></property>
  • yarn.scheduler.maximum-allocation-mb

该参数的含义是,单个Container能够使用的最大内存。由于Spark的yarn模式下,Driver和Executor都运行在Container中,故该参数不能小于Driver和Executor的内存配置,推荐配置如下

<property><name>yarn.scheduler.maximum-allocation-mb</name><value>16384</value></property>
  • yarn.scheduler.minimum-allocation-mb

该参数的含义是,单个Container能够使用的最小内存,推荐配置如下:

<property><name>yarn.scheduler.minimum-allocation-mb</name><value>512</value></property>

2.2 Yarn配置实操

修改

$HADOOP_HOME/etc/hadoop/yarn-site.xml

文件,修改如下参数,然后分发重启yarn(注意,对于单台的话,想修改哪台资源就动对应的机器)

<property><name>yarn.nodemanager.resource.memory-mb</name><value>65536</value></property><property><name>yarn.nodemanager.resource.cpu-vcores</name><value>16</value></property><property><name>yarn.scheduler.maximum-allocation-mb</name><value>16384</value></property><property><name>yarn.scheduler.minimum-allocation-mb</name><value>512</value></property>

3、Spark配置

3.1 Executor配置说明

  • Executor CPU核数配置

单个Executor的CPU核数,由spark.executor.cores参数决定,建议配置为4-6,具体配置为多少,视具体情况而定,原则是尽量充分利用资源

此处单个节点共有16个核可供Executor使用,则spark.executor.core配置为4最合适。原因是,若配置为5,则单个节点只能启动3个Executor,会剩余1个核未使用;若配置为6,则只能启动2个Executor,会剩余4个核未使用

  • Executor内存配置

Spark在Yarn模式下的Executor内存模型如下图所示

Executor相关的参数有:

spark.executor.memory

spark.executor.memoryOverhead

spark.executor.memory

用于指定Executor进程的堆内存大小,这部分内存用于任务的计算和存储;

spark.executor.memoryOverhead

用于指定Executor进程的堆外内存,这部分内存用于JVM的额外开销,操作系统开销等。两者的和才算一个Executor进程所需的总内存大小。默认情况下spark.executor.memoryOverhead的值等于spark.executor.memory*0.1

以上两个参数的推荐配置思路是,先按照单个NodeManager的核数和单个Executor的核数,计算出每个NodeManager最多能运行多少个Executor。在将NodeManager的总内存平均分配给每个Executor,最后再将单个Executor的内存按照大约10:1的比例分配到

spark.executor.memory

spark.executor.memoryOverhead

。根据上述思路,可得到如下关系:

# (spark.executor.memory+spark.executor.memoryOverhead)= # yarn.nodemanager.resource.memory-mb * (spark.executor.cores/yarn.nodemanager.resource.cpu-vcores)# 经计算,此处应做如下配置:
spark.executor.memory    14G
spark.executor.memoryOverhead    2G

3.2 Executor个数配置

此处的Executor个数是指分配给一个Spark应用的Executor个数,Executor个数对于Spark应用的执行速度有很大的影响,所以Executor个数的确定十分重要。一个Spark应用的Executor个数的指定方式有两种,静态分配动态分配

  • 静态分配

可通过

spark.executor.instances

指定一个Spark应用启动的Executor个数。这种方式需要自行估计每个Spark应用所需的资源,并为每个应用单独配置Executor个数。

  • 动态分配

动态分配可根据一个Spark应用的工作负载,动态的调整其所占用的资源(Executor个数)。这意味着一个Spark应用程序可以在运行的过程中,需要时,申请更多的资源(启动更多的Executor),不用时,便将其释放。在生产集群中,推荐使用动态分配。动态分配相关参数如下:

#启动动态分配
spark.dynamicAllocation.enabled    true#启用Spark shuffle服务
spark.shuffle.service.enabled    true#Executor个数初始值
spark.dynamicAllocation.initialExecutors    1#Executor个数最小值
spark.dynamicAllocation.minExecutors    1#Executor个数最大值
spark.dynamicAllocation.maxExecutors    12#Executor空闲时长,若某Executor空闲时间超过此值,则会被关闭
spark.dynamicAllocation.executorIdleTimeout    60s
#积压任务等待时长,若有Task等待时间超过此值,则申请启动新的Executor
spark.dynamicAllocation.schedulerBacklogTimeout 1s
#spark shuffle老版本协议
spark.shuffle.useOldFetchProtocol true

说明:Spark shuffle服务的作用是管理Executor中的各Task的输出文件,主要是shuffle过程map端的输出文件。由于启用资源动态分配后,Spark会在一个应用未结束前,将已经完成任务,处于空闲状态的Executor关闭。Executor关闭后,其输出的文件,也就无法供其他Executor使用了。需要启用Spark shuffle服务,来管理各Executor输出的文件,这样就能关闭空闲的Executor,而不影响后续的计算任务了

3.3 Driver配置说明

Driver主要配置内存即可,相关的参数有

spark.driver.memory

spark.driver.memoryOverhead

spark.driver.memory

用于指定Driver进程的堆内存大小,

spark.driver.memoryOverhead

用于指定Driver进程的堆外内存大小。默认情况下,两者的关系如下:

spark.driver.memoryOverhead=spark.driver.memory*0.1

。两者的和才算一个Driver进程所需的总内存大小。

一般情况下,按照如下经验进行调整即可:假定

yarn.nodemanager.resource.memory-mb

设置为X,若X>50G,则Driver可设置为12G,若12G<X<50G,则Driver可设置为4G。若1G<X<12G,则Driver可设置为1G。 此处

yarn.nodemanager.resource.memory-mb

为64G,则Driver的总内存可分配12G,所以上述两个参数可配置为。

spark.driver.memory    10G
spark.yarn.driver.memoryOverhead    2G

3.4 Spark配置实操

修改

$HIVE_HOME/conf/spark-defaults.conf

,注意hive连哪台就修改哪台,也可以都分发

spark.master                               yarn
spark.eventLog.enabled                   true
spark.eventLog.dir    hdfs://myNameService1/spark-history
spark.executor.cores    4
spark.executor.memory    14g
spark.executor.memoryOverhead    2g
spark.driver.memory    10g
spark.driver.memoryOverhead    2g
spark.dynamicAllocation.enabled  true
spark.shuffle.service.enabled  true
spark.dynamicAllocation.executorIdleTimeout  60s
spark.dynamicAllocation.initialExecutors    1
spark.dynamicAllocation.minExecutors  1
spark.dynamicAllocation.maxExecutors  12
spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.shuffle.useOldFetchProtocol    true

然后配置Spark shuffle服务,Spark Shuffle服务的配置因Cluster Manager(standalone、Mesos、Yarn)的不同而不同。此处以Yarn作为Cluster Manager

  • 拷贝$SPARK_HOME/yarn/spark-3.0.0-yarn-shuffle.jar$HADOOP_HOME/share/hadoop/yarn/lib
  • 分发$HADOOP_HOME/share/hadoop/yarn/lib/yarn/spark-3.0.0-yarn-shuffle.jar
  • 修改$HADOOP_HOME/etc/hadoop/yarn-site.xml文件
<property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle,spark_shuffle</value></property><property><name>yarn.nodemanager.aux-services.spark_shuffle.class</name><value>org.apache.spark.network.yarn.YarnShuffleService</value></property>
  • 分发$HADOOP_HOME/etc/hadoop/yarn-site.xml文件
  • 重启Yarn

四、查询优化

具体的hive优化可以参考Hive文章中的企业级调优,这里仅当复习

1、Hive SQL执行计划

参考1:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Explain
参考2:https://cwiki.apache.org/confluence/download/attachments/44302539/hos_explain.pdf?version=1&modificationDate=1425575903211&api=v2

Hive SQL的执行计划,可由Explain查看。Explain呈现的执行计划,由一系列Stage组成,这个Stage具有依赖关系,每个Stage对应一个MapReduce Job或者Spark Job,或者一个文件系统操作等。每个Stage由一系列的Operator组成,一个Operator代表一个逻辑操作,例如TableScan Operator,Select Operator,Join Operator等。

desc formated xxx

2、分组聚合优化

优化思路为map-side聚合。所谓map-side聚合,就是在map端维护一个hash table,利用其完成分区内的、部分的聚合,然后将部分聚合的结果,发送至reduce端,完成最终的聚合。map-side聚合能有效减少shuffle的数据量,提高分组聚合运算的效率。

--启用map-side聚合set hive.map.aggr=true;--hash map占用map端内存的最大比例set hive.map.aggr.hash.percentmemory=0.5;--用于检测源表是否适合map-side聚合的条数。set hive.groupby.mapaggr.checkinterval=100000;--map-side聚合所用的HashTable,占用map任务堆内存的最大比例,若超出该值,则会对HashTable进行一次flush。set hive.map.aggr.hash.force.flush.memory.threshold=0.9;

3、Join优化

3.1 Hive Join算法概述

Hive拥有多种join算法,包括common join,map join,sort Merge Bucket Map Join等

  • common join

Map端负责读取参与join的表的数据,并按照关联字段进行分区,将其发送到Reduce端,Reduce端完成最终的关联操作

  • map join

若参与join的表中,有n-1张表足够小,Map端就会缓存小表全部数据,然后扫描另外一张大表,在Map端完成关联操作

  • Sort Merge Bucket Map Join

若参与join的表均为分桶表,且关联字段为分桶字段,且分桶字段是有序的,且大表的分桶数量是小表分桶数量的整数倍。此时,就可以以分桶为单位,为每个Map分配任务了,Map端就无需再缓存小表的全表数据了,而只需缓存其所需的分桶

3.2 Map Join优化

join的两表一大一小,可考虑map join优化

--启用map join自动转换set hive.auto.convert.join=true;--common join转map join小表阈值set hive.auto.convert.join.noconditionaltask.size=1612000

3.3 Sort Merge Bucket Map Join

两张表都相对较大,可以考虑采用SMB Map Join对分桶大小是没有要求的。首先需要依据源表创建两个的有序的分桶表,dwd_trade_order_detail_inc建议分36个bucket,dim_user_zip建议分6个bucket,注意分桶个数的倍数关系以及分桶字段和排序字段。(创建的时候就要创建桶,一般应用场景比较小)

--启动Sort Merge Bucket Map Join优化set hive.optimize.bucketmapjoin.sortedmerge=true;--使用自动转换SMB Joinset hive.auto.convert.sortmerge.join=true;

4、数据倾斜优化

4.1 数据倾斜说明

数据倾斜问题,通常是指参与计算的数据分布不均,即某个key或者某些key的数据量远超其他key,导致在shuffle阶段,大量相同key的数据被发往一个Reduce,进而导致该Reduce所需的时间远超其他Reduce,成为整个任务的瓶颈。Hive中的数据倾斜常出现在分组聚合和join操作的场景中

4.2 分组聚合导致的数据倾斜

-- 第一种方案--启用map-side聚合set hive.map.aggr=true;--hash map占用map端内存的最大比例set hive.map.aggr.hash.percentmemory=0.5;-- 第二种方案-- 启用skew groupby优化-- 其原理是启动两个MR任务,第一个MR按照随机数分区,将数据分散发送到Reduce,完成部分聚合,第二个MR按照分组字段分区,完成最终聚合--启用分组聚合数据倾斜优化set hive.groupby.skewindata=true;

4.3 join导致的数据倾斜

-- 第一种方案--启用map join自动转换set hive.auto.convert.join=true;--common join转map join小表阈值set hive.auto.convert.join.noconditionaltask.size

-- 第二种方案--启用skew join优化set hive.optimize.skewjoin=true;--触发skew join的阈值,若某个key的行数超过该参数值,则触发set hive.skewjoin.key=100000;

5、任务并行度优化

5.1 优化说明

对于一个分布式的计算任务而言,设置一个合适的并行度十分重要。在Hive中,无论其计算引擎是什么,所有的计算任务都可分为Map阶段和Reduce阶段。所以并行度的调整,也可从上述两个方面进行调整

5.2 Map阶段并行度

ap端的并行度,也就是Map的个数。是由输入文件的切片数决定的。一般情况下,Map端的并行度无需手动调整。Map端的并行度相关参数如下

--可将多个小文件切片,合并为一个切片,进而由一个map任务处理,默认开启的set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;--一个切片的最大值set mapreduce.input.fileinputformat.split.maxsize=256000000;

5.3 Reduce阶段并行度

Reduce端的并行度,相对来说,更需要关注。默认情况下,Hive会根据Reduce端输入数据的大小,估算一个Reduce并行度。但是在某些情况下,其估计值不一定是最合适的,故需要人为调整其并行度

--指定Reduce端并行度,默认值为-1,表示用户未指定set mapreduce.job.reduces;--Reduce端并行度最大值set hive.exec.reducers.max;--单个Reduce Task计算的数据量,用于估算Reduce并行度set hive.exec.reducers.bytes.per.reducer;

Reduce端并行度的确定逻辑为,若指定参数mapreduce.job.reduces的值为一个非负整数,则Reduce并行度为指定值。否则,Hive会自行估算Reduce并行度,估算逻辑如下:

假设Reduce端输入的数据量大小为totalInputBytes,参数

hive.exec.reducers.bytes.per.reducer

的值为bytesPerReducer,参数

hive.exec.reducers.max

的值为maxReducers,则Reduce端的并行度为:

其中,Reduce端输入的数据量大小,是从Reduce上游的Operator的Statistics(统计信息)中获取的。为保证Hive能获得准确的统计信息,需配置如下参数

--执行DML语句时,收集表级别的统计信息,默认trueset hive.stats.autogather=true;--执行DML语句时,收集字段级别的统计信息,默认trueset hive.stats.column.autogather=true;--计算Reduce并行度时,从上游Operator统计信息获得输入数据量,默认trueset hive.spark.use.op.stats=true;--计算Reduce并行度时,使用列级别的统计信息估算输入数据量,默认falseset hive.stats.fetch.column.stats=true;

6、小文件合并优化

小文件合并优化,分为两个方面,分别是Map端输入的小文件合并,和Reduce端输出的小文件合并

--可将多个小文件切片,合并为一个切片,进而由一个map任务处理set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;--开启合并Hive on Spark任务输出的小文件set hive.merge.sparkfiles=true;

其他优化:

参考1:https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/admin_hos_tuning.html#hos_tuning

参考2:https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started

标签: hive spark hadoop

本文转载自: https://blog.csdn.net/lemon_TT/article/details/137791011
版权归原作者 魅Lemon 所有, 如有侵权,请联系我们删除。

“Hive on spark源码编译与调优”的评论:

还没有评论