Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
- 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。
- 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
- 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
- 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
- 5、Flink 监控系列 本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
本文以示例展示了sql 和 table api 操作hivecatalog。
一、通过 Table API 和 SQL Client 操作 HiveCatalog
1、注册 Catalog
用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中。
以下通过api 和 配置文件注册catalog及配置。
1)、方式一:java实现
publicclassTestCreateHiveTable{publicstaticfinalString tableName ="alan_hivecatalog_hivedb_testTable";publicstaticfinalString hive_create_table_sql ="CREATE TABLE "+ tableName +" (\n"+" id INT,\n"+" name STRING,\n"+" age INT"+") "+"TBLPROPERTIES (\n"+" 'sink.partition-commit.delay'='5 s',\n"+" 'sink.partition-commit.trigger'='partition-time',\n"+" 'sink.partition-commit.policy.kind'='metastore,success-file'"+")";/**
* @param args
* @throws DatabaseAlreadyExistException
* @throws CatalogException
*/publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);String hiveConfDir ="/usr/local/bigdata/apache-hive-3.1.2-bin/conf";String name ="alan_hive";// default 数据库名称String defaultDatabase ="default";HiveCatalog hiveCatalog =newHiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog("alan_hive", hiveCatalog);
tenv.useCatalog("alan_hive");String newDatabaseName ="alan_hivecatalog_hivedb";
tenv.useDatabase(newDatabaseName);// 创建表
tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
tenv.executeSql(hive_create_table_sql);// 插入数据String insertSQL ="insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
tenv.executeSql(insertSQL);// 查询数据String selectSQL ="select * from alan_hivecatalog_hivedb_testTable";Table table = tenv.sqlQuery(selectSQL);
table.printSchema();DataStream<Tuple2<Boolean,Row>> result = tenv.toRetractStream(table,Row.class);
result.print();
env.execute();}}
2)、方式二:yaml配置
# 定义 catalogscatalogs:-name: alan_hivecatalog
type: hive
property-version:1hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf # 须包含 hive-site.xml# 改变表程序基本的执行行为属性。execution:planner: blink # 可选: 'blink' (默认)或 'old'type: streaming # 必选:执行模式为 'batch' 或 'streaming'result-mode: table # 必选:'table' 或 'changelog'max-table-result-rows:1000000# 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制)time-characteristic: event-time # 可选: 'processing-time' 或 'event-time' (默认)parallelism:1# 可选:Flink 的并行数量(默认为 1)periodic-watermarks-interval:200# 可选:周期性 watermarks 的间隔时间(默认 200 ms)max-parallelism:16# 可选:Flink 的最大并行数量(默认 128)min-idle-state-retention:0# 可选:表程序的最小空闲状态时间max-idle-state-retention:0# 可选:表程序的最大空闲状态时间current-catalog: alan_hivecatalog # 可选:当前会话 catalog 的名称(默认为 'default_catalog')current-database: viewtest_db # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库)restart-strategy:# 可选:重启策略(restart-strategy)type: fallback # 默认情况下“回退”到全局重启策略
2、修改当前的 Catalog 和数据库
Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF。
1)、java实现
代码片段,只列出了关键的代码。
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv =StreamTableEnvironment.create(env);String catalogName ="alan_hive";String defaultDatabase ="default";String databaseName ="viewtest_db";String hiveConfDir ="/usr/local/bigdata/apache-hive-3.1.2-bin/conf";HiveCatalog hiveCatalog =newHiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tenv.registerCatalog(catalogName, hiveCatalog);
tenv.useCatalog(catalogName);
hiveCatalog.createDatabase(databaseName,newCatalogDatabaseImpl(newHashMap(), hiveConfDir){},true);// tenv.executeSql("create database "+databaseName);
tenv.useDatabase(databaseName);
2)、sql
Flink SQL>USE CATALOG alan_hive;
Flink SQL>USE viewtest_db;
通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息。
- java
tenv.from("not_the_current_catalog.not_the_current_db.my_table");
- sql
Flink SQL>SELECT*FROM not_the_current_catalog.not_the_current_db.my_table;
3、列出可用的 Catalog
1)、java实现
tenv.listCatalogs();
2)、sql
show catalogs;
4、列出可用的数据库
1)、java实现
tenv.listDatabases();
2)、sql
showdatabases;
5、列出可用的表
1)、java实现
tenv.listTables();
2)、sql
showtables;
以上,本文以示例展示了sql 和 table api 操作hivecatalog。
版权归原作者 一瓢一瓢的饮 alanchanchn 所有, 如有侵权,请联系我们删除。