Flink Catalog

1. Catalog Creation on the Flink Side

  Following the SQL processing flow, once the statement has been parsed, execution moves into executeInternal, which contains a branch dedicated to handling the CREATE CATALOG command:

} else if (operation instanceof CreateCatalogOperation) {
    return createCatalog((CreateCatalogOperation) operation);
  The createCatalog method does two things: (1) create the Catalog object, and (2) register it with the catalogManager:
Catalog catalog = FactoryUtil.createCatalog(
        catalogName, properties, tableConfig, userClassLoader);
catalogManager.registerCatalog(catalogName, catalog);

  Creating the Catalog scans the classpath for implementations of CatalogFactory, then instantiates the one selected by the configuration:

final CatalogFactory legacyFactory =
        TableFactoryService.find(CatalogFactory.class, options, classLoader);
return legacyFactory.createCatalog(catalogName, options);

  Note that this step only searches classes on the classpath. Externally added catalogs such as HiveCatalog are not found here; a NoMatchingTableFactoryException is thrown, and the lookup falls through to the new discovery stack:

} catch (NoMatchingTableFactoryException e) {
    // No matching legacy factory found, try using the new stack
    final DefaultCatalogContext discoveryContext =
            new DefaultCatalogContext(catalogName, options, configuration, classLoader);
    try {
        final CatalogFactory factory = getCatalogFactory(discoveryContext);

  The final filtering happens in FactoryUtil.discoverFactory, which uses the 'type' option as the filter: the identifier obtained from each Factory's factoryIdentifier() is compared against the configured value:

final List<Factory> matchingFactories =
        foundFactories.stream()
                .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
                .collect(Collectors.toList());
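
  To make this concrete, here is a minimal sketch (the catalog name is arbitrary, and the built-in in-memory catalog is used only as an example): the 'type' value in the DDL is exactly what discovery compares against each factory's factoryIdentifier().

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'type' = 'generic_in_memory' matches GenericInMemoryCatalogFactory's identifier
        tEnv.executeSql("CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory')");
    }
}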

2. HiveCatalog

  Once the matching Factory is found, its createCatalog method is called to create the corresponding Catalog:

return new HiveCatalog(
        context.getName(),
        helper.getOptions().get(DEFAULT_DATABASE),
        helper.getOptions().get(HIVE_CONF_DIR),
        helper.getOptions().get(HADOOP_CONF_DIR),
        helper.getOptions().get(HIVE_VERSION));

  Constructing a HiveCatalog is mostly a matter of discovering the Hive configuration; the rest of its interface is the usual set of database and table operations.
  Configuration discovery is driven by the hive-conf-dir and hadoop-conf-dir options above: those two locations are tried first, and if neither yields a configuration, the Hive config file is loaded from the classpath:

URL hiveSite = Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE);
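
  As an illustrative DDL (the paths are placeholders for this sketch, reusing the tEnv from the earlier example), a Hive catalog typically points hive-conf-dir at the directory holding hive-site.xml; only when neither option is set does the classpath fallback above apply.

// Illustrative only; assumes the TableEnvironment `tEnv` from the earlier sketch.
tEnv.executeSql(
        "CREATE CATALOG my_hive WITH (\n"
                + "  'type' = 'hive',\n"
                + "  'default-database' = 'default',\n"
                + "  'hive-conf-dir' = '/etc/hive/conf'\n"
                + ")");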

3. IcebergCatalog

  Iceberg presumably goes through the legacy path that TableFactoryService.find can resolve, since its factory implements the properties-based interface; clusterHadoopConf() simply calls into Flink to obtain the Hadoop configuration:

@Override
public Catalog createCatalog(String name, Map<String, String> properties) {
    return createCatalog(name, properties, clusterHadoopConf());
}
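
  In DDL form this looks as follows (an illustrative sketch; the metastore URI and warehouse path are placeholders, and tEnv is the TableEnvironment from the earlier example); the individual options are walked through in the sections below.

tEnv.executeSql(
        "CREATE CATALOG iceberg_hive WITH (\n"
                + "  'type' = 'iceberg',\n"
                + "  'catalog-type' = 'hive',\n"
                + "  'uri' = 'thrift://metastore-host:9083',\n"
                + "  'warehouse' = 'hdfs://namenode:8020/warehouse'\n"
                + ")");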

3.1. CatalogLoader

  The first step is to create a CatalogLoader, the component responsible for loading (instantiating) the Iceberg Catalog.
  A custom implementation can be configured via the catalog-impl option (see the sketch after the code below); if it is absent, the default flow applies.
  The default flow picks a Hive or Hadoop loader based on the catalog-type option, defaulting to Hive:

String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
switch (catalogType.toLowerCase(Locale.ENGLISH)) {
  case ICEBERG_CATALOG_TYPE_HIVE:
    // The values of properties 'uri', 'warehouse', 'hive-conf-dir' are allowed to be null,
    // in that case it will fallback to parse those values from hadoop configuration which is
    // loaded from classpath.
    String hiveConfDir = properties.get(HIVE_CONF_DIR);
    String hadoopConfDir = properties.get(HADOOP_CONF_DIR);
    Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);
    return CatalogLoader.hive(name, newHadoopConf, properties);
  case ICEBERG_CATALOG_TYPE_HADOOP:
    return CatalogLoader.hadoop(name, hadoopConf, properties);
}
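
  The catalog-impl branch mentioned above sits just before this switch; a simplified sketch of that path, based on the Iceberg source:

// Simplified from Iceberg's FlinkCatalogFactory: a user-supplied implementation
// configured via 'catalog-impl' takes precedence over 'catalog-type'.
String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
if (catalogImpl != null) {
    return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
}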

  Creating the CatalogLoader itself mostly just captures the basic parameters:

private HiveCatalogLoader(String catalogName, Configuration conf, Map<String, String> properties) {
  this.catalogName = catalogName;
  this.hadoopConf = new SerializableConfiguration(conf);
  this.uri = properties.get(CatalogProperties.URI);
  this.warehouse = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
  this.clientPoolSize =
      properties.containsKey(CatalogProperties.CLIENT_POOL_SIZE)
          ? Integer.parseInt(properties.get(CatalogProperties.CLIENT_POOL_SIZE))
          : CatalogProperties.CLIENT_POOL_SIZE_DEFAULT;
  this.properties = Maps.newHashMap(properties);
}

3.2. FlinkCatalog

  Next, a few options are read and the FlinkCatalog is created.
  Note one option specific to the Hadoop type: base-namespace, which configures a namespace prefix that is prepended automatically, presumably under the warehouse path.
  There are also cache options, cache-enabled and cache.expiration-interval-ms, which control whether the Catalog caches table entries; an illustrative DDL follows below.
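
  A sketch of a Hadoop-type catalog with these options spelled out (the warehouse path and values are placeholders; tEnv as in the earlier example):

tEnv.executeSql(
        "CREATE CATALOG iceberg_hadoop WITH (\n"
                + "  'type' = 'iceberg',\n"
                + "  'catalog-type' = 'hadoop',\n"
                + "  'warehouse' = 'hdfs://namenode:8020/warehouse',\n"
                + "  'base-namespace' = 'flink',\n" // namespace prefix
                + "  'cache-enabled' = 'true',\n"
                + "  'cache.expiration-interval-ms' = '60000'\n"
                + ")");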

3.3. loadCatalog

  FlinkCatalog uses the CatalogLoader to load the Catalog, which ultimately lands in CatalogUtil.loadCatalog().
  There the class is loaded via Class.forName and an instance is built through its Constructor:

ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();

catalog = ctor.newInstance();
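
  For completeness, a simplified sketch of how loadCatalog finishes up after instantiation (error handling omitted; in the real code the Hadoop configuration is injected reflectively):

// Simplified continuation of CatalogUtil.loadCatalog:
if (catalog instanceof Configurable) {
    ((Configurable) catalog).setConf(conf);   // hand over the Hadoop Configuration
}
catalog.initialize(catalogName, properties);  // catalog-specific setup, see 3.4
return catalog;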

3.4. HiveCatalog

  For the Hive type, the class ultimately created is org.apache.iceberg.hive.HiveCatalog.
  Its initialize method is again mostly configuration; two objects deserve attention: FileIO and CachedClientPool.
  The io-impl option configures the file I/O implementation, defaulting to Iceberg's HadoopFileIO:

this.fileIO =
    fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
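
  For example (illustrative; S3FileIO is just one well-known FileIO implementation, and the other values are placeholders), io-impl can be set in the catalog DDL:

// Illustrative only; assumes the TableEnvironment `tEnv` from the earlier sketch.
tEnv.executeSql(
        "CREATE CATALOG iceberg_s3 WITH (\n"
                + "  'type' = 'iceberg',\n"
                + "  'catalog-type' = 'hive',\n"
                + "  'uri' = 'thrift://metastore-host:9083',\n"
                + "  'warehouse' = 's3://bucket/warehouse',\n"
                + "  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'\n"
                + ")");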

  CachedClientPool is a cache of Hive connections; what it holds are HiveMetaStoreClient instances:

return GET_CLIENT.invoke(
    hiveConf, (HiveMetaHookLoader) tbl -> null, HiveMetaStoreClient.class.getName());
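
  Callers never hold a client directly; operations borrow one from the pool, run, and return it. A hypothetical usage sketch (the database and table names are made up):

// Hypothetical usage of the client pool inside HiveCatalog-style code;
// run() borrows a HiveMetaStoreClient, executes the action, and returns it.
org.apache.hadoop.hive.metastore.api.Table hmsTable =
        clients.run(client -> client.getTable("demo_db", "demo_table"));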