4、Flink SQl 客户端
1、启动一个flink的 集群
- 可以使用flink独立集群
- 也可以使用yarn-session.sh
# 启动一个flinkyarn-sesion集群
yarn-sesion.sh -d
2、启动sql-client
sql-client.sh
3、测试命令行
-- 创建source表
CREATE TABLE datagen (
id STRING,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5', -- 每秒生成的数据行数据
'fields.id.length' = '5', --字段长度限制
'fields.name.length'='3',
'fields.age.min' ='1', -- 最小值
'fields.age.max'='100' -- 最大值
)
-- 执行sql
-- 可以直接在命令行中查看直接结果
-- 会实时打印结果
select age,count(1) as c from datagen group by age;
-- 输出结果模式
SET 'sql-client.execution.result-mode' = 'table';
SET 'sql-client.execution.result-mode' = 'changelog';
SET 'sql-client.execution.result-mode' = 'tableau';
-- 在flinksql中执行insert into
CREATE TABLE age_num_mysql (
age INT,
num BIGINT,
PRIMARY KEY (age) NOT ENFORCED -- 按照主键更新数据
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'age_num', -- 需要手动到数据库中创建表
'username' = 'root',
'password' = '123456'
)
-- 在数据库中创建表
CREATE TABLE `age_num` (
`age` int NOT NULL,
`num` bigint(20) DEFAULT NULL,
PRIMARY KEY (`age`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 插入数据
-- insert into 的语句会提交到flink的集群中运行,和本地客户端就没有关系了
insert into age_num_mysql
select age,count(1) as num from datagen group by age;
执行:select age,count(1) as c from datagen group by age;
结果如下
执行:insert into age_num_mysql
select age,count(1) as num from datagen group by age;效果如下
4、sql-client.sh -i
-- 可以将通用的sql放在一个初始的sql文件中
--文件中可以写多个sql,
vim sql-client.sql
CREATE CATALOG mysql_catalog WITH(
'type' = 'jdbc',
'default-database' = 'bigdata17',
'username' = 'root',
'password' = '123456',
'base-url' = 'jdbc:mysql://master:3306'
);
use catalog mysql_catalog;
-- 启动sql-client
sql-client.sh -i sql-client.sql
5、sql-client.sh -f
可以直接执行一个sql文件
-- 创建一个sql文件
vim age_num.sql
-- source 表
CREATE TABLE datagen (
id STRING,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5', -- 每秒生成的数据行数据
'fields.id.length' = '5', --字段长度限制
'fields.name.length'='3',
'fields.age.min' ='1', -- 最小值
'fields.age.max'='100' -- 最大值
);
-- 多个sql使用分号分隔
-- sink表
CREATE TABLE age_num_mysql (
age INT,
num BIGINT,
PRIMARY KEY (age) NOT ENFORCED -- 按照主键更新数据
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'age_num', -- 需要手动到数据库中创建表
'username' = 'root',
'password' = '123456'
);
-- 插入数据
insert into age_num_mysql
select age,count(1) as num from datagen group by age;
-- 启动
sql-client.sh -f age_num.sql
-- 效果和刚才的分步骤操作差不多,只不过这次是通过一个sql脚本文件一起操作
5、Catalog
catalog ---> database ---> table ---> 字段
catalog是flink用于保存元数据的一种机制
元数据(表结构)
1、GenericInMemoryCatalog(默认)
基于内存的catalog,元数据只在当前会话中起作用
也是flink默认的catalog
2、Jdbc Catalog 整库同步
只能在flink中直接读写数据库中的表
不能在JdbcCatalog中创建flink的表
-- 创建jdbc catalog
CREATE CATALOG mysql_catalog WITH(
'type' = 'jdbc',
'default-database' = 'bigdata',
'username' = 'root',
'password' = '123456',
'base-url' = 'jdbc:mysql://master:3306'
);
-- 查看当前所有的catalog;
show catalogs;
-- 切换catalog
use catalog mysql_catalog;
3、hive catalog(重点)
hive catalog 可以用于flink读取hvie中的表,可以用于在hive元数据中保存flink的
- 配置Hadoop classpath
vim /etc/profile# 放在PAth的后面export HADOOP_CLASSPATH=`hadoop classpath`source /etc/profile
- 将flink-sql-connector-hive-1.2.2_2.12-1.15.0.jar依赖包上传到flink的lib目录下
flink-sql-connector-hive-1.2.2_2.12-1.15.0.jar
- 将mysql驱动放在flink lib目录下
mysql-connector-java-5.1.49.jar
- 删除flink lib目录下flink-table-planner-loader-1.15.0.jar包
flink-table-planner-loader-1.15.0.jar
- 将flink opt目录flink-table-planner_2.12-1.15.0.jar包复制到flink的 lib目录下
cp /usr/local/soft/flink-1.15.0/opt/flink-table-planner_2.12-1.15.0.jar /usr/local/soft/flink-1.15.0/lib/
- 启动hive的元数据服务
nohup hive --service metastore >> metastore.log 2>&1 &
- 重启yarn-session集群
# 查看yarn中正在运行的任务yarn application -list# 关闭yarn-sessionyarn application -kill application_1659099426082_0003# 启动yarn-sessionyarn-session.sh -d
- 在sql-client中创建hive 的catalog
-- 进入sql客户端
sql-client.sh
-- 创建hive catalog
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'default-database' = 'bigdata17',
'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hive_catalog;
- 在flink中可以直接查询hive的表
select * from students3;
将 hadoop-mapreduce-client-core-2.7.6.jar 放到 flink/lib 的目录下,然后kill掉已经启动的yarn任务,重新生成任务,再次执行结果如下:
- 在flink中创建表,表的元数据可以保存到hive的元数据中> flink将表的元数据保存在hive的元数据中,在hive中可以看到flink的表,但是不能对flink的表进行查询> > flink的元数据保存在hive中,元数据不会丢失
CREATE TABLE datagen ( id STRING, name STRING, age INT) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', -- 每秒生成的数据行数据 'fields.id.length' = '5', --字段长度限制 'fields.name.length'='3', 'fields.age.min' ='1', -- 最小值 'fields.age.max'='100' -- 最大值);
4、使用hive的方言
spark sql默认就监控hive语法的
-- 指定hive方言
-- 开启hive的方言之后就不能使用flink自己的语法了
set table.sql-dialect=hive;
--默认方言
set table.sql-dialect=default;
5、使用hive的函数
-- 加载hive模块,在fink中就可以直接使用hive的函数了
LOAD MODULE hive WITH ('hive-version' = '1.2.1');
-- 查看所有的模块
SHOW MODULES;
flink 中不支持hive中的很多语法比如split
加载hive模块后,就能使用了如下:
版权归原作者 a-tao必须奥利给 所有, 如有侵权,请联系我们删除。