0


flink所有支持的catalog详解

1. 版本说明

本文档介绍的各种

flink sql

的语法基于

flink-1.13.x

,flink版本低于1.13.x的用户,在sql运行出错误时,需要自行去flink官网查看对应版本的语法支持。

另外,flink新版本支持的语法,文档中会进行特殊标注,说明对应语法在 flink 哪个版本开始支持,但凡是没有特殊标注的,均支持

flink-1.13.x

及以上版本。

2. hive catalog

sql

CREATE CATALOG myhive WITH('type'='hive','default-database'='mydatabase','hive-conf-dir'='/opt/hive-conf');-- SQLUSE CATALOG myhive;

下表列出了通过 DDL 定义 HiveCatalog 时所支持的参数。
参数必选默认值类型描述type是(无)StringCatalog 的类型。 创建 HiveCatalog 时,该参数必须设置为 hive。hive-conf-dir否(无)String指向包含 hive-site.xml 目录的 URI。 该 URI 必须是 Hadoop 文件系统所支持的类型。 如果指定一个相对 URI,即不包含 scheme,则默认为本地文件系统。如果该参数没有指定,我们会在 class path 下查找 hive-site.xml。default-database否defaultString当一个catalog被设为当前catalog时,所使用的默认当前database。hive-version否(无)StringHiveCatalog 能够自动检测使用的 Hive 版本。我们建议不要手动设置 Hive 版本,除非自动检测机制失败。hadoop-conf-dir否(无)StringHadoop 配置文件目录的路径。目前仅支持本地文件系统路径。我们推荐使用

HADOOP_CONF_DIR

环境变量来指定 Hadoop 配置。因此仅在环境变量不满足您的需求时再考虑使用该参数,例如当您希望为每个 HiveCatalog 单独设置 Hadoop 配置时。

3. postGres catalog

注意,该特性只支持到 flink-1.14.x ,在 flink-1.15.x 中,将

postgre

mysql

合并到一起实现了。

JdbcCatalog 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。

目前,PostgresCatalog 是 JDBC Catalog 的唯一实现,PostgresCatalog 只支持有限的 Catalog 方法,包括:

// Postgres Catalog 支持的方法PostgresCatalog.databaseExists(String databaseName)PostgresCatalog.listDatabases()PostgresCatalog.getDatabase(String databaseName)PostgresCatalog.listTables(String databaseName)PostgresCatalog.getTable(ObjectPath tablePath)PostgresCatalog.tableExists(ObjectPath tablePath)

其他的 Catalog 方法现在还是不支持的。

3.1. PostgresCatalog 的使用

请参阅 Dependencies 部分了解如何配置 JDBC 连接器和 Postgres 驱动。

Postgres catalog 支持以下参数:

  • name:必填,catalog 的名称。
  • default-database:必填,默认要连接的数据库。
  • username:必填,Postgres 账户的用户名。
  • password:必填,账户的密码。
  • base-url:必填,应该符合 jdbc:postgresql://<ip>:<port> 的格式,同时这里不应该包含数据库名。

SQL

CREATE CATALOG mypg WITH('type'='jdbc','default-database'='...','username'='...','password'='...','base-url'='...');USE CATALOG mypg;

3.2. PostgresSQL 元空间映射

除了数据库之外,postgresSQL 还有一个额外的命名空间

schema

。一个 Postgres 实例可以拥有多个数据库,每个数据库可以拥有多个 schema,其中一个 schema 默认名为

public

,每个 schema 可以包含多张表。

在 Flink 中,当查询由 Postgres catalog 注册的表时,用户可以使用

schema_name.table_name

或只写

table_name

,其中

schema_name

是可选的,默认值为

public

因此,Flink Catalog 和 Postgres 之间的元空间映射如下:
Flink Catalog 元空间结构Postgres 元空间结构catalog 名称 (只能在flink中定义)N/Adatabase 名称database 名称table 名称[schema_name.]table_name
Flink 中的 Postgres 表的完整路径应该是:

<catalog>.<db>.`<schema.table>`

如果指定了 schema,请注意需要转义

<schema.table>

,也就是使用转义字符 ` 将其括起来。

这里提供了一些访问 Postgres 表的例子:

-- 扫描 'public' schema(即默认 schema)中的 'test_table' 表,schema 名称可以省略SELECT*FROM mypg.mydb.test_table;SELECT*FROM mydb.test_table;SELECT*FROM test_table;-- 扫描 'custom_schema' schema 中的 'test_table2' 表,-- 自定义 schema 不能省略,并且必须与表一起转义。SELECT*FROM mypg.mydb.`custom_schema.test_table2`SELECT*FROM mydb.`custom_schema.test_table2`;SELECT*FROM`custom_schema.test_table2`;

4. jdbc catalog

从 flink-1.15.x 开始支持,并且将 postgre 和 mysql 合并到了一起实现。

JdbcCatalog 允许用户使用 flink 通过 JDBC 协议去连接关系型数据库。

目前已经有两个 JDBC catalog 的实现,Postgres CatalogMySQL Catalog 。他们支持下面的 catalog 函数,其他函数目前还不支持。

// Postgres 和 MySQL Catalog 支持的函数databaseExists(String databaseName);listDatabases();getDatabase(String databaseName);listTables(String databaseName);getTable(ObjectPath tablePath);tableExists(ObjectPath tablePath);

其他的 catalog 函数目前还不支持。

4.1. 使用 JDBC catalog

该章节描述怎么创建并使用 Postgres Catalog 或 MySQL Catalog 。怎么添加 JDBC 连接器和相关的驱动,请参考上面的依赖章节。

JDBC catalog 支持下面的选项配置:

  • name:必选,catalog的名称。
  • default-database:必选,连快将诶的默认数据库。
  • username:必选,Postgres/MySQL 账户的用户名。
  • password:必选,账户的密码。
  • base-url:必选,不要包含数据库名称。 - 对于 Postgres Catalog ,该参数应该为:jdbc:postgresql://<ip>:<port>- 对于 MySQL Catalog ,该参数应该为:jdbc:mysql://<ip>:<port>

SQL:

CREATE CATALOG my_catalog WITH('type'='jdbc','default-database'='...','username'='...','password'='...','base-url'='...');USE CATALOG my_catalog;

4.2. JDBC Catalog for PostgreSQL

PostgreSQL 元空间映射。

PostgreSQL 基于数据库有一个额外的命名空间作为 schema 。一个 Postgres 示例可以有多个数据库,每个数据可以有多个 schema ,默认的 schema 名为 public ,每个 schema 可以有多张表。

在 flink 中,当查询注册到 Postgres catalog 中的表时,用户可以使用 schema_name.table_name 或者是只使用 table_name 。schema_name 是可选的,默认为 public

在 flink catalog 和 Postgres 之间的元空间映射如下:
Flink Catalog 元空间结构Postgres 元空间结构catalog name (只能在 flink 中定义)N/Adatabase namedatabase nametable name[schema_name.]table_name
Flink 中的 Postgres 表的完整路径应该是:

<catalog>.<db>.`<schema.table>`

如果指定了 schema,请注意需要转义

<schema.table>

,也就是使用转义字符 ` 将其括起来。

下面是一些访问 Postgres 表的案例:

-- 浏览名为 'public' 的 schema 下的 'test_table' 表,使用默认的 schema,schema 名称可以被省略。SELECT*FROM mypg.mydb.test_table;SELECT*FROM mydb.test_table;SELECT*FROM test_table;-- 浏览名为 'custom_schema' 的 schema 下的 'test_table2' 表,自定义 schema 不能被省略,而且必须和表一起被转义。SELECT*FROM mypg.mydb.`custom_schema.test_table2`SELECT*FROM mydb.`custom_schema.test_table2`;SELECT*FROM`custom_schema.test_table2`;

4.3. JDBC Catalog for MySQL

MySQL 元空间映射。

MySQL 实例中的数据库和注册到 MySQL catalog 中的数据库有相同的映射级别。一个 MySQL 实例可以有多个数据库,每个数据库可以有多张表。

在 flink 中,当查询注册到 MySQL catalog 中的表时,用户可以使用 database.table_name 或者只指定 table_name 。默认的数据库名为创建 MySQL Catalog 时指定的默认数据库。

flink Catalog 和 MySQL Catalog 之间的元空间映射关系如下:
Flink Catalog Metaspace StructureMySQL Metaspace Structurecatalog name (只能在 flink 中定义)N/Adatabase namedatabase nametable nametable_name
flink 中的 MySQL 表的完全路径应该为:

`<catalog>`.`<db>`.`<table>`

下面是一些访问 MySQL 表的案例:

-- 浏览 'test_table' 表,默认数据库为 'mydb'SELECT*FROM mysql_catalog.mydb.test_table;SELECT*FROM mydb.test_table;SELECT*FROM test_table;-- 浏览指定数据库下的 'test_table' 表。SELECT*FROM mysql_catalog.given_database.test_table2;SELECT*FROM given_database.test_table2;
标签: flink flink sql

本文转载自: https://blog.csdn.net/u012443641/article/details/126539798
版权归原作者 第一片心意 所有, 如有侵权,请联系我们删除。

“flink所有支持的catalog详解”的评论:

还没有评论