Analyzing Hive's TBLPROPERTIES, starting from 'discover.partitions'='true'
Preface
First, create two tables in Hive 3.1.2:
show databases;
use db_lzy;
show tables;
create external table if not exists test_external_20230502(
id int,
comment1 string,
comment2 string
)
stored as parquet
;
create external table if not exists test_external_par_20230502(
id int,
comment1 string,
comment2 string
)
partitioned by(
dt string
)
stored as parquet
;
Then look at the generated DDL:
show create table test_external_20230502;
show create table test_external_par_20230502;
The output:
hive (db_lzy)> show create table test_external_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_20230502`(`id` int,
`comment1` string,
`comment2` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_20230502'
TBLPROPERTIES ('bucketing_version'='2',
'transient_lastDdlTime'='1683028671')
Time taken: 0.181 seconds, Fetched: 15 row(s)
hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(`id` int,
`comment1` string,
`comment2` string)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES ('bucketing_version'='2',
'transient_lastDdlTime'='1683028770')
Time taken: 0.121 seconds, Fetched: 17 row(s)
hive (db_lzy)>
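Both statements include TBLPROPERTIES information that was never specified in the DDL, so it was clearly generated automatically.
On CDP, creating a table sometimes adds one more table property by default:
'discover.partitions'='true'
This table property enables automatic partition discovery. The rest of this post starts from that property and digs into Hive's TBLPROPERTIES.
Adding a table property via JDBC
Start with a simple JDBC program: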
package com.zhiyong;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @program: zhiyong_study
 * @description: test Hive's TBLPROPERTIES
 * @author: zhiyong
 * @create: 2023-05-02 19:31
 **/
public class TblpropertiesTest1 {
    public static void main(String[] args) throws Exception {
        final String HIVE_JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
        final String HIVE_HOST = "192.168.88.101";
        final String HIVE_PORT = "10000";
        final String HIVE_USER = "root";
        final String HIVE_PASSWORD = "123456";
        String HIVE_URL = "jdbc:hive2://" + HIVE_HOST + ":" + HIVE_PORT + "/default";
        String sql_Alter_TBLPROPERTIES = "alter table db_lzy.test_external_par_20230502 set TBLPROPERTIES('zhiyong_key1' = 'zhiyong_value1')";

        Class.forName(HIVE_JDBC_DRIVER);
        // try-with-resources closes the statement and the connection even if execute() throws
        try (Connection conn = DriverManager.getConnection(HIVE_URL, HIVE_USER, HIVE_PASSWORD);
             Statement stmt = conn.createStatement()) {
            System.out.println("Altering table properties:");
            stmt.execute(sql_Alter_TBLPROPERTIES);
        }
    }
}
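Setting a breakpoint and inspecting the call stack shows the call going from HiveConnection through TCLIService and TBinaryProtocol down to TSocket. The call path is essentially the same as the one analyzed earlier in:
https://lizhiyong.blog.csdn.net/article/details/129742904
Underneath, it is simply the Thrift protocol. So the language-agnostic SQL client side will not reveal much. The more useful source code is on the server side, which we set aside for now.
At this point: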
hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(`id` int,
`comment1` string,
`comment2` string)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES ('bucketing_version'='2',
'last_modified_by'='hadoop',
'last_modified_time'='1683030293',
'transient_lastDdlTime'='1683030293',
'zhiyong_key1'='zhiyong_value1')
Time taken: 0.155 seconds, Fetched: 20 row(s)
hive (db_lzy)>
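The key/value pair has been written into the table properties.
Now look at the MetaStore's backing database:
show databases;
use db_hive_metastore;
show tables;
select * from TBLS;
In the result, the table name identifies the table whose properties were just modified: it is the one with TBL_ID=17.
select * from TABLE_PARAMS
where TBL_ID=17;
The result shows that properties set this way are recorded in the TABLE_PARAMS table of the MetaStore database (my Apache 3.1.2 setup keeps it in MySQL).
This table also records whether the table is external. That is why SQL Boys sometimes run the following before deleting a table's data:
alter table db_name.table_name set TBLPROPERTIES ('EXTERNAL'='FALSE');
Oddly, the metadata table holds no entry for the property set by:
alter table db_name.table_name set TBLPROPERTIES ("external.table.purge"="true");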
However, HiveStrictManagedMigration.java contains:
boolean migrateToExternalTable(Table tableObj, TableType tableType) throws HiveException {
  String msg;
  switch (tableType) {
  case MANAGED_TABLE:
    if (AcidUtils.isTransactionalTable(tableObj)) {
      msg = createExternalConversionExcuse(tableObj, "Table is a transactional table");
      LOG.debug(msg);
      return false;
    }
    LOG.info("Converting {} to external table ...", getQualifiedName(tableObj));
    if (!runOptions.dryRun) {
      String command = String.format(
          "ALTER TABLE %s SET TBLPROPERTIES ('EXTERNAL'='TRUE', 'external.table.purge'='true')",
          getQualifiedName(tableObj));
      runHiveCommand(command);
    }
    return true;
  case EXTERNAL_TABLE:
    msg = createExternalConversionExcuse(tableObj, "Table is already an external table");
    LOG.debug(msg);
    break;
  default: // VIEW/MATERIALIZED_VIEW
    msg = createExternalConversionExcuse(tableObj, "Table type " + tableType + " cannot be converted");
    LOG.debug(msg);
    break;
  }
  return false;
}
This is where the property shows up, which means it is valid in Hive 3.1.2. The purge property also suggests that an external table can take ownership of its files: with 'external.table.purge'='true', dropping the table removes the data as well.
Now manually insert one more row (zhiyong_key2) into TABLE_PARAMS.
At this point:
hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(`id` int,
`comment1` string,
`comment2` string)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES ('bucketing_version'='2',
'last_modified_by'='hadoop',
'last_modified_time'='1683030293',
'transient_lastDdlTime'='1683030293',
'zhiyong_key1'='zhiyong_value1',
'zhiyong_key2'='zhiyong_value2')
Time taken: 0.098 seconds, Fetched: 21 row(s)
hive (db_lzy)>
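The regenerated DDL already contains this entry.
This explains why Spark recreates the table when writing in Overwrite mode and adds a big pile of spark.sql properties. It also explains why Iceberg and Hudi choose Hive's MetaStore for metadata storage, and why Flink uses Hive's MetaStore as a Catalog.
They simply use what is at hand: treating this table as a key-value store makes it possible to keep all kinds of table properties, and reading and writing them is easy. Reusing Hive's MetaStore directly also makes it easy to parse other tables' metadata. Whether through Thrift RPC or by reading the metadata tables over JDBC, everything plugs seamlessly into the components of the Hive ecosystem. This may be why, in the cloud-native era, Hadoop and YARN are being beaten by OSS and K8S while Hive still stands strong: storage, compute, and resource scheduling can all be replaced, but the MetaStore that maps files to tables has no good substitute yet.
Searching on GitHub
I had already seen this property when creating tables on CDP 7.1.5:
'discover.partitions'='true'
So it did not come from nowhere. Since it cannot be found in Apache 3.1.2, the next step is to check on GitHub whether another version has it.
The PartitionManagementTask class
Here it is: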
package org.apache.hadoop.hive.metastore;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.TimeValidator;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Partition management task is primarily responsible for partition retention and discovery based on table properties.
 *
 * Partition Retention - If "partition.retention.period" table property is set with retention interval, when this
 * metastore task runs periodically, it will drop partitions with age (creation time) greater than retention period.
 * Dropping partitions after retention period will also delete the data in that partition.
 *
 * Partition Discovery - If "discover.partitions" table property is set, this metastore task monitors table location
 * for newly added partition directories and create partition objects if it does not exist. Also, if partition object
 * exist and if corresponding directory does not exists under table location then the partition object will be dropped.
 *
 */
public class PartitionManagementTask implements MetastoreTaskThread {
  private static final Logger LOG = LoggerFactory.getLogger(PartitionManagementTask.class);
  public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions";
  public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period";
  private static final Lock lock = new ReentrantLock();
  // these are just for testing
  private static int completedAttempts;
  private static int skippedAttempts;

  private Configuration conf;

  @Override
  public long runFrequency(TimeUnit unit) {
    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_FREQUENCY, unit);
  }

  @Override
  public void setConf(Configuration configuration) {
    // we modify conf in setupConf(), so we make a copy
    conf = new Configuration(configuration);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  private static boolean partitionDiscoveryEnabled(Map<String, String> params) {
    return params != null && params.containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
      params.get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true");
  }

  @Override
  public void run() {
    if (lock.tryLock()) {
      skippedAttempts = 0;
      String qualifiedTableName = null;
      IMetaStoreClient msc = null;
      try {
        msc = new HiveMetaStoreClient(conf);
        String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME);
        String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN);
        String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN);
        String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
        Set<String> tableTypesSet = new HashSet<>();
        for (String type : tableTypes.split(",")) {
          try {
            tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name());
          } catch (IllegalArgumentException e) {
            // ignore
            LOG.warn("Unknown table type: {}", type);
          }
        }
        // if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
        // specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables.
        if (tableTypesSet.isEmpty()) {
          LOG.info("Skipping partition management as no table types specified");
          return;
        }

        StringBuilder filterBuilder = new StringBuilder()
          .append(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS)
          .append("discover__partitions").append(" like \"true\" ");
        boolean external = tableTypesSet.contains(TableType.EXTERNAL_TABLE.name());
        boolean managed = tableTypesSet.contains(TableType.MANAGED_TABLE.name());
        if (!managed && external) {
          // only for external tables
          filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
            .append(" = \"").append(TableType.EXTERNAL_TABLE.name()).append("\" ");
        } else if (managed && !external) {
          // only for managed tables
          filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
            .append(" = \"").append(TableType.MANAGED_TABLE.name()).append("\" ");
        }
        if (!tablePattern.trim().isEmpty()) {
          filterBuilder.append(" and ")
            .append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME)
            .append(" like \"").append(tablePattern.replaceAll("\\*", ".*")).append("\"");
        }

        List<String> databases = msc.getDatabases(catalogName, dbPattern);
        List<TableName> candidates = new ArrayList<>();
        for (String db : databases) {
          Database database = msc.getDatabase(catalogName, db);
          if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
            LOG.debug("Skipping table under database: {}", db);
            continue;
          }
          if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
            LOG.info("Skipping table belongs to database {} being failed over.", db);
            continue;
          }
          List<String> tablesNames = msc.listTableNamesByFilter(catalogName, db,
            filterBuilder.toString(), -1);
          tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
        }

        if (candidates.isEmpty()) {
          LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);
          return;
        }

        // TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive. Sharing MSC also
        // will not be safe unless synchronized MSC is used. Using synchronized MSC in multi-threaded context also
        // defeats the purpose of thread pooled msck repair.
        int threadPoolSize = MetastoreConf.getIntVar(conf,
          MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE);
        final ExecutorService executorService = Executors
          .newFixedThreadPool(Math.min(candidates.size(), threadPoolSize),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PartitionDiscoveryTask-%d").build());
        CountDownLatch countDownLatch = new CountDownLatch(candidates.size());
        LOG.info("Found {} candidate tables for partition discovery", candidates.size());
        setupMsckPathInvalidation();
        Configuration msckConf = Msck.getMsckConf(conf);
        for (TableName table : candidates) {
          // this always runs in 'sync' mode where partitions can be added and dropped
          MsckInfo msckInfo = new MsckInfo(table.getCat(), table.getDb(), table.getTable(),
            null, null, true, true, true, -1);
          executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch));
        }
        countDownLatch.await();
        executorService.shutdownNow();
      } catch (Exception e) {
        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
      } finally {
        if (msc != null) {
          msc.close();
        }
        lock.unlock();
      }
      completedAttempts++;
    } else {
      skippedAttempts++;
      LOG.info("Lock is held by some other partition discovery task. Skipping this attempt..#{}", skippedAttempts);
    }
  }

  public static long getRetentionPeriodInSeconds(final Table table) {
    String retentionPeriod;
    long retentionSeconds = -1;
    if (table.getParameters() != null && table.getParameters().containsKey(PARTITION_RETENTION_PERIOD_TBLPROPERTY)) {
      retentionPeriod = table.getParameters().get(PARTITION_RETENTION_PERIOD_TBLPROPERTY);
      if (retentionPeriod.isEmpty()) {
        LOG.warn("'{}' table property is defined but empty. Skipping retention period..",
          PARTITION_RETENTION_PERIOD_TBLPROPERTY);
      } else {
        try {
          TimeValidator timeValidator = new TimeValidator(TimeUnit.SECONDS);
          timeValidator.validate(retentionPeriod);
          retentionSeconds = MetastoreConf.convertTimeStr(retentionPeriod, TimeUnit.SECONDS, TimeUnit.SECONDS);
        } catch (IllegalArgumentException e) {
          LOG.warn("'{}' retentionPeriod value is invalid. Skipping retention period..", retentionPeriod);
          // will return -1
        }
      }
    }
    return retentionSeconds;
  }

  private void setupMsckPathInvalidation() {
    // if invalid partition directory appears, we just skip and move on. We don't want partition management to throw
    // when invalid path is encountered as these are background threads. We just want to skip and move on. Users will
    // have to fix the invalid paths via external means.
    conf.set(MetastoreConf.ConfVars.MSCK_PATH_VALIDATION.getVarname(), "skip");
  }

  private static class MsckThread implements Runnable {
    private MsckInfo msckInfo;
    private Configuration conf;
    private String qualifiedTableName;
    private CountDownLatch countDownLatch;

    MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, CountDownLatch countDownLatch) {
      this.msckInfo = msckInfo;
      this.conf = conf;
      this.qualifiedTableName = qualifiedTableName;
      this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
      try {
        Msck msck = new Msck(true, true);
        msck.init(conf);
        msck.repair(msckInfo);
      } catch (Exception e) {
        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
      } finally {
        // there is no recovery from exception, so we always count down and retry in next attempt
        countDownLatch.countDown();
      }
    }
  }

  @VisibleForTesting
  public static int getSkippedAttempts() {
    return skippedAttempts;
  }

  @VisibleForTesting
  public static int getCompletedAttempts() {
    return completedAttempts;
  }
}
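The search paid off: this class contains the table property. The task is a background thread that periodically refreshes the MetaStore for Hive tables. It drops stale partitions (present in the MetaStore, but with no corresponding directory under the table location) and adds missing ones (directories present on HDFS, but not registered in the MetaStore).
Once it acquires the lock, it runs an MSCK repair.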
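To make the add/drop behaviour concrete, here is a minimal Java sketch of the reconciliation that a sync-mode repair amounts to. It is not Hive's Msck implementation: the class PartitionSyncSketch and its methods are hypothetical names, and partitions are modelled as plain strings such as dt=2023-05-01 rather than metastore objects.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration only; Hive's real logic lives in Msck, not here. */
final class PartitionSyncSketch {

    /** Partitions found as directories on the file system but missing from the metastore. */
    static Set<String> toAdd(Set<String> onFileSystem, Set<String> inMetastore) {
        Set<String> result = new TreeSet<>(onFileSystem);
        result.removeAll(inMetastore);
        return result;
    }

    /** Partitions registered in the metastore whose directory no longer exists. */
    static Set<String> toDrop(Set<String> onFileSystem, Set<String> inMetastore) {
        Set<String> result = new TreeSet<>(inMetastore);
        result.removeAll(onFileSystem);
        return result;
    }

    public static void main(String[] args) {
        // Assumed sample data: two directories on disk, two partitions in the metastore.
        Set<String> fs = Set.of("dt=2023-05-01", "dt=2023-05-02");
        Set<String> hms = Set.of("dt=2023-04-30", "dt=2023-05-01");
        System.out.println("add:  " + toAdd(fs, hms));
        System.out.println("drop: " + toDrop(fs, hms));
    }
}
```

With the sample data in main, dt=2023-05-02 would be added and dt=2023-04-30 would be dropped, while dt=2023-05-01 is left alone.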
The frequency returned by the runFrequency method is evidently how often the thread pool fires the task:
package org.apache.hadoop.hive.metastore;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * Any task that will run as a separate thread in the metastore should implement this
 * interface.
 */
public interface MetastoreTaskThread extends Configurable, Runnable {

  /**
   * Get the frequency at which the thread should be scheduled in the thread pool. You must call
   * {@link #setConf(Configuration)} before calling this method.
   * @param unit TimeUnit to express the frequency in.
   * @return frequency
   */
  long runFrequency(TimeUnit unit);
}
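Unfortunately, Hive 3.1.2 does not have this class yet.
So the corresponding option cannot be found in its configuration class either.
A newer version on GitHub is needed again.
The MetastoreConf class
Here it is: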
// Partition management task params
PARTITION_MANAGEMENT_TASK_FREQUENCY("metastore.partition.management.task.frequency",
    "metastore.partition.management.task.frequency",
    6, TimeUnit.HOURS, "Frequency at which timer task runs to do automatic partition management for tables\n"
    + "with table property 'discover.partitions'='true'. Partition management include 2 pieces. One is partition\n"
    + "discovery and other is partition retention period. When 'discover.partitions'='true' is set, partition\n"
    + "management will look for partitions in table location and add partitions objects for it in metastore.\n"
    + "Similarly if partition object exists in metastore and partition location does not exist, partition object\n"
    + "will be dropped. The second piece in partition management is retention period. When 'discover.partition'\n"
    + "is set to true and if 'partition.retention.period' table property is defined, partitions that are older\n"
    + "than the specified retention period will be automatically dropped from metastore along with the data."),
Clearly this option does not appear in Hive 3.1.2 either, only in newer versions. And the default value is 6 hours?
HiveMetaStore
In HiveMetaStore:
private static void startRemoteOnlyTasks(Configuration conf) throws Exception {
    if (MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) {
        ThreadPool.initialize(conf);
        Collection<String> taskNames = MetastoreConf.getStringCollection(conf, ConfVars.TASK_THREADS_REMOTE_ONLY);
        Iterator var2 = taskNames.iterator();
        while (var2.hasNext()) {
            String taskName = (String) var2.next();
            MetastoreTaskThread task = (MetastoreTaskThread) JavaUtils.newInstance(JavaUtils.getClass(taskName, MetastoreTaskThread.class));
            task.setConf(conf);
            long freq = task.runFrequency(TimeUnit.MILLISECONDS);
            ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
        }
    }
}
These tasks are scheduled on a thread pool at their configured frequency, expressed in milliseconds.
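The scheduling pattern in the snippet above can be tried in isolation with the JDK alone. The following is a minimal sketch, not Hive code: FixedRateSketch and runPeriodically are hypothetical names, and a plain ScheduledExecutorService stands in for the metastore's own ThreadPool wrapper. As in the snippet, the initial delay equals the period, so the first run only happens after one full interval.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a metastore background task scheduler; not Hive code. */
final class FixedRateSketch {

    /**
     * Schedules the task with initial delay == period and waits until it has run
     * the requested number of times, or until the timeout expires.
     * Returns true if all runs completed in time.
     */
    static boolean runPeriodically(Runnable task, long freqMillis, int runs, long timeoutMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            pool.scheduleAtFixedRate(() -> {
                task.run();
                latch.countDown();
            }, freqMillis, freqMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A frequency expressed in seconds is converted to milliseconds before scheduling.
        long freqMillis = TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS);
        boolean done = runPeriodically(() -> System.out.println("tick"), freqMillis, 2, 5_000);
        System.out.println("completed: " + done);
    }
}
```

One consequence of this pattern worth keeping in mind: with a long interval, a freshly started service does nothing for the whole first interval.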
Meanwhile, the switch that guards them:
COMPACTOR_INITIATOR_ON("metastore.compactor.initiator.on", "hive.compactor.initiator.on", false,
    "Whether to run the initiator and cleaner threads on this metastore instance or not.\nSet this to true on one instance of the Thrift metastore service as part of turning\non Hive transactions. For a complete list of parameters required for turning on\ntransactions, see hive.txn.manager."),
is in turn related to:
/**
 * An interface that allows Hive to manage transactions. All classes
 * implementing this should extend {@link HiveTxnManagerImpl} rather than
 * implementing this directly.
 */
public interface HiveTxnManager {}
Evidently this is the interface behind Hive transactions.
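At this point it is clear that Hive keeps some code-level settings in the MetastoreConf class (held on the heap; some values can be set within a session) and persists other key-value settings in the TABLE_PARAMS metadata table. The MetaStore's background threads can then look up values by key from both places and carry out the corresponding operations.
A property that CDP 7.1.5 (or an even earlier release) already supported only arrives on the Apache side with 4.0 alpha. The free edition always lags the paid enterprise edition, in stability as well as features. The features of Kylin 4.0 enterprise showing up late in open-source 5.0 is the same story: paying customers are served first, and free users get what is left. Still, free is free, so no complaints.
Using automatic partition discovery
Apache version
According to the official Apache Hive documentation: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-DiscoverPartitions
this is a Hive 4.0 feature.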
Discover Partitions
Table property “discover.partitions” can now be specified to control automatic discovery and synchronization of partition metadata in Hive Metastore.
When Hive Metastore Service (HMS) is started in remote service mode, a background thread (PartitionManagementTask) gets scheduled periodically every 300s (configurable via metastore.partition.management.task.frequency config) that looks for tables with “discover.partitions” table property set to true and performs MSCK REPAIR in sync mode. If the table is a transactional table, then Exclusive Lock is obtained for that table before performing MSCK REPAIR. With this table property, “MSCK REPAIR TABLE table_name SYNC PARTITIONS” is no longer required to be run manually.
Version information
As of Hive 4.0.0 (HIVE-20707).
This corresponds to the issue:
https://issues.apache.org/jira/browse/HIVE-20707
Partition Retention
Table property “partition.retention.period” can now be specified for partitioned tables with a retention interval. When a retention interval is specified, the background thread running in HMS (refer Discover Partitions section), will check the age (creation time) of the partition and if the partition’s age is older than the retention period, it will be dropped. Dropping partitions after retention period will also delete the data in that partition. For example, if an external partitioned table with ‘date’ partition is created with table properties “discover.partitions”=“true” and “partition.retention.period”=“7d” then only the partitions created in last 7 days are retained.
Version information
As of Hive 4.0.0 (HIVE-20707).
This second feature is partition retention.
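As a rough illustration of the retention rule described above, the sketch below converts an interval such as 7d into seconds and picks the partitions whose age exceeds it. Everything here is an assumption for demonstration: RetentionSketch is a hypothetical class, the parser accepts only a number followed by d, h, m or s (Hive's own MetastoreConf.convertTimeStr handles more forms), and creation times are passed in as epoch seconds instead of being read from the metastore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of partition retention; not Hive's implementation. */
final class RetentionSketch {

    /** Parses a simplified interval such as "7d", "12h", "30m" or "45s"; returns -1 if invalid. */
    static long toSeconds(String period) {
        if (period == null || period.length() < 2) {
            return -1;
        }
        TimeUnit unit;
        switch (period.charAt(period.length() - 1)) {
            case 'd': unit = TimeUnit.DAYS; break;
            case 'h': unit = TimeUnit.HOURS; break;
            case 'm': unit = TimeUnit.MINUTES; break;
            case 's': unit = TimeUnit.SECONDS; break;
            default: return -1;
        }
        try {
            long amount = Long.parseLong(period.substring(0, period.length() - 1));
            return amount < 0 ? -1 : unit.toSeconds(amount);
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** Returns the partitions whose age (now minus creation time) exceeds the retention period. */
    static List<String> expired(Map<String, Long> createTimeByPartition, long retentionSeconds, long nowSeconds) {
        List<String> result = new ArrayList<>();
        if (retentionSeconds < 0) {
            return result; // retention not configured or invalid: keep everything
        }
        for (Map.Entry<String, Long> e : createTimeByPartition.entrySet()) {
            if (nowSeconds - e.getValue() > retentionSeconds) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long retention = toSeconds("7d");
        long now = 1_000_000L; // assumed "current time" in epoch seconds, for the demo only
        Map<String, Long> partitions = Map.of("dt=old", 0L, "dt=new", 900_000L);
        System.out.println("retention seconds: " + retention);
        System.out.println("expired: " + expired(partitions, retention, now));
    }
}
```

Keep in mind that, per the quoted documentation, dropping a partition through retention also deletes its data, so a real job should treat an unparseable interval as "do nothing", which is what the -1 path models here.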
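CDP version
So in practice this feature can only be used on CDP. The alpha Apache Hive 4.0 probably will not see many users for a while.
When using Hive on CDP, refer to the official documentation: https://docs.cloudera.com/runtime/7.0.3/using-hiveql/topics/hive-automate-msck.html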
Automated partition discovery and repair is useful for processing log data, and other data, in Spark and Hive catalogs. You learn how to set the partition discovery parameter to suit your use case. An aggressive partition discovery and repair configuration can delay the upgrade process.
Apache Hive can automatically and periodically discover discrepancies in partition metadata in the Hive metastore and in corresponding directories, or objects, on the file system. After discovering discrepancies, Hive performs synchronization.
The discover.partitions table property enables and disables synchronization of the file system with partitions. In external partitioned tables, this property is enabled (true) by default when you create the table. To a legacy external table (created using a version of Hive that does not support this feature), you need to add discover.partitions to the table properties to enable partition discovery.
By default, the discovery and synchronization of partitions occurs every 5 minutes. This is too often if you are upgrading and can result in the Hive DB being queried every few milliseconds, leading to performance degradation. During upgrading the high frequency of batch routines dictates running discovery and synchronization infrequently, perhaps hourly or even daily. You can configure the frequency as shown in this task.
- Assuming you have an external table created using a version of Hive that does not support partition discovery, enable partition discovery for the table.
ALTER TABLE exttbl SET TBLPROPERTIES ('discover.partitions' = 'true');
- In Cloudera Manager, click Clusters > Hive > Configuration, search for Hive Server Advanced Configuration Snippet (Safety Valve) for hive-site.xml.
- Add the following property and value to hive-site.xml: Property: metastore.partition.management.task.frequency Value: 600. This action sets synchronization of partitions to occur every 10 minutes expressed in seconds. If you are upgrading, consider running discovery and synchronization once every 24 hours by setting the value to 86,400 seconds.
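In other words, on CDP partition discovery is enabled by default for external partitioned tables and fires every 5 minutes by default. The value here is set in seconds. The feature has been available since at least CDP 7.0.3.
Usage
If the defaults do not fit, change them manually:
ALTER TABLE table_name SET TBLPROPERTIES ('discover.partitions'='true');
ALTER TABLE table_name SET TBLPROPERTIES ('metastore.partition.management.task.frequency'=600);
This one is persisted in the metadata table, which differs from Apache's in-heap configuration object.
But according to: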
/**
 * Get the variable as a long indicating a period of time
 * @param conf configuration to retrieve it from
 * @param var variable to retrieve
 * @param outUnit Timeout to return value in
 * @return value, or default value if value not in config file
 */
public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) {
  assert var.defaultVal.getClass() == TimeValue.class;
  String val = conf.get(var.varname);
  if (val == null) {
    // Look for it under the old Hive name
    val = conf.get(var.hiveName);
  }
  if (val != null) {
    return convertTimeStr(val, ((TimeValue) var.defaultVal).unit, outUnit);
  } else {
    return outUnit.convert(((TimeValue) var.defaultVal).val, ((TimeValue) var.defaultVal).unit);
  }
}
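if the value is not set under the metastore name, it is looked up under the old Hive name and finally falls back to the built-in default, so this works out too.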
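The three-step lookup can be reproduced with a plain map in place of Hadoop's Configuration. This is a simplified sketch under stated assumptions: TimeVarFallbackSketch is a hypothetical class, the key names passed in are up to the caller, and only bare numbers are parsed (the real convertTimeStr is assumed to accept more formats). What it does mirror from the method above is that a configured value is interpreted in the unit of the default value before being converted to the requested unit.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified mirror of the lookup order in getTimeVar; not Hive code. */
final class TimeVarFallbackSketch {

    /**
     * Looks up newName, then oldName, then falls back to the default.
     * A configured bare number is read in defaultUnit and converted to outUnit.
     */
    static long getTimeVar(Map<String, String> conf, String newName, String oldName,
                           long defaultVal, TimeUnit defaultUnit, TimeUnit outUnit) {
        String val = conf.get(newName);
        if (val == null) {
            val = conf.get(oldName); // legacy key
        }
        if (val != null) {
            return outUnit.convert(Long.parseLong(val.trim()), defaultUnit);
        }
        return outUnit.convert(defaultVal, defaultUnit);
    }

    public static void main(String[] args) {
        String key = "metastore.partition.management.task.frequency";
        // Nothing configured: the 6-hour default from the entry quoted earlier applies.
        System.out.println(getTimeVar(Map.of(), key, key, 6, TimeUnit.HOURS, TimeUnit.MILLISECONDS));
        // Configured as 600 with a seconds-based default, as the CDP documentation describes.
        System.out.println(getTimeVar(Map.of(key, "600"), key, key, 300, TimeUnit.SECONDS, TimeUnit.MILLISECONDS));
    }
}
```

The unit of the default matters: the same bare 600 would mean 600 hours against an hours-based default, so it is worth checking which default a given build ships with before setting a bare number.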
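Other issues
Automatic discovery runs every 5 minutes, so it is only near-real-time. If one job finishes and the next is triggered immediately, a Hive on Tez job cannot see the new partition for those 5 minutes (or even longer). Spark or Flink reading the Parquet files directly is a different matter.
Hence: https://lizhiyong.blog.csdn.net/article/details/127680034
When I was doing platform development, I still had to add an msck or an alter table add partition step. Only after that command has finished is eventual consistency guaranteed.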
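One way to wire that extra step into a pipeline is to build the statement right after the writing job finishes and run it through the same kind of JDBC Statement used earlier. The helper below is only a sketch: PartitionRefreshSql is a hypothetical class, the partition column is assumed to be dt as in the test table, and the name checks are a deliberately strict assumption to avoid concatenating arbitrary input into SQL.

```java
/** Hypothetical helper that builds explicit partition-refresh statements; not part of Hive. */
final class PartitionRefreshSql {

    private static final String TABLE_PATTERN = "[A-Za-z0-9_]+(\\.[A-Za-z0-9_]+)?";
    private static final String VALUE_PATTERN = "[A-Za-z0-9_-]+";

    /** Full resync: adds missing partitions and drops stale ones. */
    static String msckSync(String qualifiedTable) {
        requireMatch(qualifiedTable, TABLE_PATTERN, "table name");
        return "MSCK REPAIR TABLE " + qualifiedTable + " SYNC PARTITIONS";
    }

    /** Cheaper when the job knows exactly which dt partition it just wrote. */
    static String addPartition(String qualifiedTable, String dt) {
        requireMatch(qualifiedTable, TABLE_PATTERN, "table name");
        requireMatch(dt, VALUE_PATTERN, "partition value");
        return "ALTER TABLE " + qualifiedTable + " ADD IF NOT EXISTS PARTITION (dt='" + dt + "')";
    }

    private static void requireMatch(String value, String pattern, String what) {
        if (value == null || !value.matches(pattern)) {
            throw new IllegalArgumentException("Invalid " + what + ": " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(msckSync("db_lzy.test_external_par_20230502"));
        System.out.println(addPartition("db_lzy.test_external_par_20230502", "2023-05-02"));
    }
}
```

The downstream job would then only be triggered after something like stmt.execute(PartitionRefreshSql.addPartition(...)) has returned, where stmt is a java.sql.Statement obtained as in the JDBC example.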
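Summary
After all this digging, I located the metadata stored in Hive's MetaStore and worked out how table-level configuration operates. Along the way, the feature lag of the free edition also became apparent. Not a bad haul.
Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/130468723
Copyright belongs to the original author, 虎鲸不是鱼. In case of infringement, please contact us for removal.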