0


Flink Catalog解读

文章目录

01 引言

我们知道 Flink

Table

(表)、

View

(视图)、

Function

(函数/算子)、

Database

(数据库)的概念,这都类似于我们平常使用的关系型数据库里面的概念。

相对于关系型数据库的这些概念,Flink 里还有一个 **

Catalog

(目录)** 的概念,本文来讲解下。

在这里插入图片描述

02 Catalog

2.1 Catalog概述

数据处理最关键的方面之一是管理元数据

  • 元数据可以是临时的,例如在Flink中临时表、或者通过 TableEnvironment 注册的 UDF
  • 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。

Catalog在Flink中提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

Catalog 

提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

2.2 Catalog分类

Catalog目前分为以下几类:
分类描述缺陷GenericInMemoryCatalog基于内存实现的 Catalog所有元数据只在 session 的生命周期内可用JdbcCatalog可以将 Flink 通过 JDBC 协议连接到关系数据库JDBC Catalog只实现了PostgresCatalogHiveCatalog作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口Hive Metastore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 区分大小写。自定义 Catalog通过实现 Catalog 接口来开发自定义 Catalog-

2.3 Catalog API

2.3.1 数据库操作

// create database
catalog.createDatabase("mydb",newCatalogDatabaseImpl(...),false);// drop database
catalog.dropDatabase("mydb",false);// alter database
catalog.alterDatabase("mydb",newCatalogDatabaseImpl(...),false);// get database
catalog.getDatabase("mydb");// check if a database exist
catalog.databaseExists("mydb");// list databases in a catalog
catalog.listDatabases("mycatalog");

2.3.2 表操作

// create table
catalog.createTable(newObjectPath("mydb","mytable"),newCatalogTableImpl(...),false);// drop table
catalog.dropTable(newObjectPath("mydb","mytable"),false);// alter table
catalog.alterTable(newObjectPath("mydb","mytable"),newCatalogTableImpl(...),false);// rename table
catalog.renameTable(newObjectPath("mydb","mytable"),"my_new_table");// get table
catalog.getTable("mytable");// check if a table exist or not
catalog.tableExists("mytable");// list tables in a database
catalog.listTables("mydb");

2.3.3 视图操作

// create view
catalog.createTable(newObjectPath("mydb","myview"),newCatalogViewImpl(...),false);// drop view
catalog.dropTable(newObjectPath("mydb","myview"),false);// alter view
catalog.alterTable(newObjectPath("mydb","mytable"),newCatalogViewImpl(...),false);// rename view
catalog.renameTable(newObjectPath("mydb","myview"),"my_new_view",false);// get view
catalog.getTable("myview");// check if a view exist or not
catalog.tableExists("mytable");// list views in a database
catalog.listViews("mydb");

2.3.4 分区操作

// create view
catalog.createPartition(newObjectPath("mydb","mytable"),newCatalogPartitionSpec(...),newCatalogPartitionImpl(...),false);// drop partition
catalog.dropPartition(newObjectPath("mydb","mytable"),newCatalogPartitionSpec(...),false);// alter partition
catalog.alterPartition(newObjectPath("mydb","mytable"),newCatalogPartitionSpec(...),newCatalogPartitionImpl(...),false);// get partition
catalog.getPartition(newObjectPath("mydb","mytable"),newCatalogPartitionSpec(...));// check if a partition exist or not
catalog.partitionExists(newObjectPath("mydb","mytable"),newCatalogPartitionSpec(...));// list partitions of a table
catalog.listPartitions(newObjectPath("mydb","mytable"));// list partitions of a table under a give partition spec
catalog.listPartitions(newObjectPath("mydb","mytable"),newCatalogPartitionSpec(...));// list partitions of a table by expression filter
catalog.listPartitions(newObjectPath("mydb","mytable"),Arrays.asList(epr1,...));

2.3.5 函数操作

catalog.createFunction(newObjectPath("mydb","myfunc"),newCatalogFunctionImpl(...),false);// drop function
catalog.dropFunction(newObjectPath("mydb","myfunc"),false);// alter function
catalog.alterFunction(newObjectPath("mydb","myfunc"),newCatalogFunctionImpl(...),false);// get function
catalog.getFunction("myfunc");// check if a function exist or not
catalog.functionExists("myfunc");// list functions in a database
catalog.listFunctions("mydb");

2.4 Catalog 示例(SQL Client的方式)

① 首先需要注册Catalog:用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中,创建方式如下(可以使用Flink里面的Factory工厂模式动态加载):

tableEnv.registerCatalog(new CustomCatalog("myCatalog"));

② 指定使用的内容:Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF,代码如下:

Flink SQL>USE CATALOG myCatalog;
Flink SQL>USE myDB;

也可以通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息,代码如下:

Flink SQL>SELECT*FROM not_the_current_catalog.not_the_current_db.my_table;

③ 其它常规命令

-- 列出可用的 Catalog
Flink SQL>show catalogs;-- 列出可用的数据库 
Flink SQL>showdatabases;-- 列出可用的表
Flink SQL>showtables;

03 文末

本文主要讲解了Flink Catalog的概念以及用法,如果大家有兴趣可以进一步去官网查看相关的文档,这里我列出相关比较核心的文档:

接下来我的计划是编写 “如何自定义Catalog” ,以及Catalog的应用场景(有兴趣可先阅读《Ververica Platform-阿里巴巴全新Flink企业版揭秘》)相关的博客,谢谢大家的阅读,希望能帮助到大家,本文完!


本文转载自: https://blog.csdn.net/qq_20042935/article/details/125925410
版权归原作者 杨林伟 所有, 如有侵权,请联系我们删除。

“Flink Catalog解读”的评论:

还没有评论