0


【flink番外篇】22、通过 Table API 和 SQL Client 操作 Catalog 示例

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录


本文以示例展示了sql 和 table api 操作hivecatalog。

一、通过 Table API 和 SQL Client 操作 HiveCatalog

1、注册 Catalog

用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中。

以下通过api 和 配置文件注册catalog及配置。

1)、方式一:java实现

publicclassTestCreateHiveTable{publicstaticfinalString tableName ="alan_hivecatalog_hivedb_testTable";publicstaticfinalString hive_create_table_sql ="CREATE  TABLE  "+ tableName +" (\n"+"  id INT,\n"+"  name STRING,\n"+"  age INT"+") "+"TBLPROPERTIES (\n"+"  'sink.partition-commit.delay'='5 s',\n"+"  'sink.partition-commit.trigger'='partition-time',\n"+"  'sink.partition-commit.policy.kind'='metastore,success-file'"+")";/**
     * @param args
     * @throws DatabaseAlreadyExistException
     * @throws CatalogException
     */publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);String hiveConfDir ="/usr/local/bigdata/apache-hive-3.1.2-bin/conf";String name ="alan_hive";// default 数据库名称String defaultDatabase ="default";HiveCatalog hiveCatalog =newHiveCatalog(name, defaultDatabase, hiveConfDir);
        tenv.registerCatalog("alan_hive", hiveCatalog);
        tenv.useCatalog("alan_hive");String newDatabaseName ="alan_hivecatalog_hivedb";
        tenv.useDatabase(newDatabaseName);// 创建表
        tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tenv.executeSql(hive_create_table_sql);// 插入数据String insertSQL ="insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
        tenv.executeSql(insertSQL);// 查询数据String selectSQL ="select * from alan_hivecatalog_hivedb_testTable";Table table = tenv.sqlQuery(selectSQL);
        table.printSchema();DataStream<Tuple2<Boolean,Row>> result = tenv.toRetractStream(table,Row.class);
        result.print();
        env.execute();}}

2)、方式二:yaml配置

# 定义 catalogscatalogs:-name: alan_hivecatalog
     type: hive
     property-version:1hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf  # 须包含 hive-site.xml# 改变表程序基本的执行行为属性。execution:planner: blink                            # 可选: 'blink' (默认)或 'old'type: streaming                           # 必选:执行模式为 'batch' 或 'streaming'result-mode: table                        # 必选:'table' 或 'changelog'max-table-result-rows:1000000# 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制)time-characteristic: event-time           # 可选: 'processing-time' 或 'event-time' (默认)parallelism:1# 可选:Flink 的并行数量(默认为 1)periodic-watermarks-interval:200# 可选:周期性 watermarks 的间隔时间(默认 200 ms)max-parallelism:16# 可选:Flink 的最大并行数量(默认 128)min-idle-state-retention:0# 可选:表程序的最小空闲状态时间max-idle-state-retention:0# 可选:表程序的最大空闲状态时间current-catalog: alan_hivecatalog         # 可选:当前会话 catalog 的名称(默认为 'default_catalog')current-database: viewtest_db # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库)restart-strategy:# 可选:重启策略(restart-strategy)type: fallback                          # 默认情况下“回退”到全局重启策略

2、修改当前的 Catalog 和数据库

Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF。

1)、java实现

代码片段,只列出了关键的代码。

StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);String catalogName ="alan_hive";String defaultDatabase ="default";String databaseName ="viewtest_db";String hiveConfDir ="/usr/local/bigdata/apache-hive-3.1.2-bin/conf";HiveCatalog hiveCatalog =newHiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tenv.registerCatalog(catalogName, hiveCatalog);
tenv.useCatalog(catalogName);

hiveCatalog.createDatabase(databaseName,newCatalogDatabaseImpl(newHashMap(), hiveConfDir){},true);//        tenv.executeSql("create database "+databaseName);
tenv.useDatabase(databaseName);

2)、sql


Flink SQL>USE CATALOG alan_hive;
Flink SQL>USE viewtest_db;

通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息。

  • java

tenv.from("not_the_current_catalog.not_the_current_db.my_table");
  • sql

Flink SQL>SELECT*FROM not_the_current_catalog.not_the_current_db.my_table;

3、列出可用的 Catalog

1)、java实现


tenv.listCatalogs();

2)、sql

show catalogs;

4、列出可用的数据库

1)、java实现


tenv.listDatabases();

2)、sql

showdatabases;

5、列出可用的表

1)、java实现


tenv.listTables();

2)、sql

showtables;

以上,本文以示例展示了sql 和 table api 操作hivecatalog。

标签: flink 数据库 kafka

本文转载自: https://blog.csdn.net/chenwewi520feng/article/details/135459429
版权归原作者 一瓢一瓢的饮 alanchanchn 所有, 如有侵权,请联系我们删除。

“【flink番外篇】22、通过 Table API 和 SQL Client 操作 Catalog 示例”的评论:

还没有评论