文章目录
一、前言
本文基于flink-1.13.6
SQL Client: Init scripts and Statement Sets
这个版本极大地改进了 SQL 客户端的功能。现在 SQL Client 和 SQL 脚本都支持 通过Java 应用程序执行的几乎所有操作(从 TableEnvironment 以编程方式启动查询)。这意味着 SQL 用户在 SQL 部署中需要的代码少了很多。其中最核心的功能就是支持了
-i
命令用来初始化脚本,
-f
命令用来执行 SQL 语句,之前的 YAML 文件这个版本不再支持了,相反更多的是通过 SQL 脚本的方式来配置会话和提交任务.
类似于下面这种方式:
sql-client.sh -i init.sql-f test.sql
1.1、 -i 初始化 SQL Client
SET execution.runtime-mode=batch;
SET sql-client.execution.result-mode=TABLEAU;
SET pipeline.name=batch_demo
init.sql 初始化脚本文件支持的功能还非常多,我这里就简单的设置了几个,更多的属性可以参考官网.
使用 -i <init.sql> 选项初始化 SQL Client 会话时,初始化 SQL 文件中允许以下语句:
DDL(CREATE/DROP/ALTER),USE CATALOG/DATABASE,LOAD/UNLOAD MODULE,SET command,
RESET command.
1.2、-f SQL脚本
create table rate_history (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time
) WITH (
'connector.type' = 'filesystem',
'connector.path' = '/tmp/ratesHistory.csv',
'format.type' = 'csv'
);
CREATE TABLE printb
(
num bigint
)
WITH ('connector' = 'print');
-- 两条sql语句
insert into printb select count(1) from rate_history;
insert into printb select count(1) from rate_history;
执行:
./bin/sql-client.sh -i test/init.sql -f test/batch.sql
查看flink web 页面发现两个job
SQL Client 将每个 INSERT INTO 语句作为单个 Flink 作业执行。但是,由于管道的某些部分可以重复使用,因此有时不是最佳选择。
SQL Client 支持 STATEMENT SET 语法来执行一组 SQL 语句。这是 Table API 中StatementSet 的等效功能。STATEMENT SET 语法包含一个或多个 INSERT INTO 语句。全面优化了STATEMENT SET 块中的所有语句,并将其作为单个 Flink 作业执行。联合优化和执行允许重用常见的中间结果,因此可以显着提高执行多个查询的效率。
STATEMENT SET 的语法格式如下:
BEGIN STATEMENT SET;-- one or more INSERT INTO statements
{ INSERTINTO|OVERWRITE <select_statement>; }+END;-- 修改上面的sql脚本-- 两条sql语句BEGIN STATEMENT SET;insertinto printb selectcount(1)from rate_history;insertinto printb selectcount(1)from rate_history;END;
.接下来就来看一下底层源码是怎么实现的.
二、源码分析
2.1、从
sql-client.sh
找到执行的入口类是
org.apache.flink.table.client.SqlClient
然后来看下 SqlClient 对象属性源码如下:
publicclassSqlClient{// 标记是否是 embedded 模式privatefinalboolean isEmbedded;// 提交命令选项privatefinalCliOptions options;// 用来返回结果的privatefinalSupplier<Terminal> terminalFactory;// 目前只支持 embeddedpublicstaticfinalString MODE_EMBEDDED ="embedded";publicstaticfinalString MODE_GATEWAY ="gateway";// ...}
2.2、接着来看 SqlClient 的 main 方法,也就是程序的入口
main 方法里面调用的是 startClient 方法,所以直接来看 startClient 方法的源码:
@VisibleForTestingprotectedstaticvoidstartClient(String[] args,Supplier<Terminal> terminalFactory){finalString mode;finalString[] modeArgs;// 设置启动模式默认是 embeddedif(args.length <1|| args[0].startsWith("-")){// mode is not specified, use the default `embedded` mode
mode = MODE_EMBEDDED;
modeArgs = args;}else{// mode is specified, extract the mode value and reaming args
mode = args[0];// remove mode
modeArgs =Arrays.copyOfRange(args,1, args.length);}switch(mode){case MODE_EMBEDDED:// 解析提交命令里的参数finalCliOptions options =CliOptionsParser.parseEmbeddedModeClient(modeArgs);// 打印参数说明if(options.isPrintHelp()){CliOptionsParser.printHelpEmbeddedModeClient();}else{try{// 构建 SqlClient 对象finalSqlClient client =newSqlClient(true, options, terminalFactory);
client.start();}catch(SqlClientException e){//...}}break;case MODE_GATEWAY:// gateway 模式暂时不支持thrownewSqlClientException("Gateway mode is not supported yet.");default:CliOptionsParser.printHelpClient();}}
2.2.1、解析参数
调用 parseEmbeddedModeClient 方法解析提交命令里面的各种参数.包括我们上面用到的 -i 和 -f 都是在这一步解析并赋值的.
publicstaticCliOptionsparseEmbeddedModeClient(String[] args){try{DefaultParser parser =newDefaultParser();CommandLine line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args,true);returnnewCliOptions(
line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),checkSessionId(line),// 解析 -i 初始化文件checkUrl(line,CliOptionsParser.OPTION_INIT_FILE),// 解析 -f sql脚本checkUrl(line,CliOptionsParser.OPTION_FILE),checkUrls(line,CliOptionsParser.OPTION_JAR),checkUrls(line,CliOptionsParser.OPTION_LIBRARY),
line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),getPythonConfiguration(line));}catch(ParseException e){thrownewSqlClientException(e.getMessage());}}publicstaticfinalOption OPTION_INIT_FILE =Option.builder("i").required(false).longOpt("init").numberOfArgs(1).argName("initialization file").desc("Script file that used to init the session context. "+"If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.").build();publicstaticfinalOption OPTION_FILE =Option.builder("f").required(false).longOpt("file").numberOfArgs(1).argName("script file").desc("Script file that should be executed. In this mode, "+"the client will not open an interactive terminal.").build();
2.2.2、构建 SqlClient
finalSqlClient client =newSqlClient(true, options, terminalFactory);
2.2.3、启动 SqlClient
client.start();privatevoidstart(){if(isEmbedded){// create local executor with default environmentDefaultContext defaultContext =LocalContextUtils.buildDefaultContext(options);// 创建一个 LocalExecutor 对象,用于本地执行程序finalExecutor executor =newLocalExecutor(defaultContext);
executor.start();// Open an new sessionString sessionId = executor.openSession(options.getSessionId());try{// add shutdown hookRuntime.getRuntime().addShutdownHook(newEmbeddedShutdownThread(sessionId, executor));// do the actual work 真正执行 SQL 的地方openCli(sessionId, executor);}finally{
executor.closeSession(sessionId);}}else{thrownewSqlClientException("Gateway mode is not supported yet.");}}
2.2.4、真正执行 SQL 的地方是 openCli 方法
/**
* Opens the CLI client for executing SQL statements.
*
* @param sessionId session identifier for the current client.
* @param executor executor
*/privatevoidopenCli(String sessionId,Executor executor){Path historyFilePath;if(options.getHistoryFilePath()!=null){
historyFilePath =Paths.get(options.getHistoryFilePath());}else{
historyFilePath =Paths.get(System.getProperty("user.home"),SystemUtils.IS_OS_WINDOWS ?"flink-sql-history":".flink-sql-history");}boolean hasSqlFile = options.getSqlFile()!=null;boolean hasUpdateStatement = options.getUpdateStatement()!=null;if(hasSqlFile && hasUpdateStatement){thrownewIllegalArgumentException(//...}try(CliClient cli =newCliClient(terminalFactory, sessionId, executor, historyFilePath)){// 执行初始化 SQL -i 参数if(options.getInitFile()!=null){boolean success = cli.executeInitialization(readFromURL(options.getInitFile()));if(!success){// ...}}if(!hasSqlFile &&!hasUpdateStatement){
cli.executeInInteractiveMode();}else{// 执行真正的 SQL 文件 -f
cli.executeInNonInteractiveMode(readExecutionContent());}}}
这个里面会先获取 historyFilePath 的路径,然后判断是否存在 -i -f 这两个文件,如果有的话会先调用 executeInitialization 执行初始化的脚本.实际调用的是 executeInitialization#executeFile 方法来执行脚本,executeFile 的源码如下:
privatebooleanexecuteFile(String content,ExecutionMode mode){
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EXECUTE_FILE).toAnsi());for(String statement :CliStatementSplitter.splitContent(content)){
terminal.writer().println(newAttributedString(String.format("%s%s", prompt, statement)).toString());
terminal.flush();// 执行 if(!executeStatement(statement, mode)){// cancel execution when meet error or ctrl + C;returnfalse;}}returntrue;}
其实不管是 -i 还是 -f 最终都会调用 executeFile 这个方法去解析脚本里的内容并且执行,这里方法里面先调用 splitContent 方法去做解析.
publicstaticList<String>splitContent(String content){List<String> statements =newArrayList<>();List<String> buffer =newArrayList<>();for(String line : content.split("\n")){if(isEndOfStatement(line)){
buffer.add(line);
statements.add(String.join("\n", buffer));
buffer.clear();}else{
buffer.add(line);}}if(!buffer.isEmpty()){
statements.add(String.join("\n", buffer));}return statements;}privatestaticbooleanisEndOfStatement(String line){return line.replaceAll(MASK,"").trim().endsWith(";");}
其实就是一行一行的读取初始化脚本和 SQL 脚本里面的内容,然后放到一个 List 里面.然后循环这个 List 调用 executeStatement 方法去执行 SQL 脚本.
// 执行 SQL 脚本.privatebooleanexecuteStatement(String statement,ExecutionMode executionMode){try{finalOptional<Operation> operation =parseCommand(statement);
operation.ifPresent(op ->callOperation(op, executionMode));}catch(SqlExecutionException e){printExecutionException(e);returnfalse;}returntrue;}
执行之前会先对 SQL 做一个清洗,具体逻辑在 parseCommand 方法中.
// 其实就是把 SQL 后面的 ; 去掉,并在遇到 bad case 的时候返回空.然后调用 parseStatement 方法将 SQL 语句解析成 Operation,后面的过程就跟 Flink SQL 翻译成代码的过程差不多.就不在往后面跟了.privateOptional<Operation>parseCommand(String stmt){// normalize
stmt = stmt.trim();// remove ';' at the endif(stmt.endsWith(";")){
stmt = stmt.substring(0, stmt.length()-1).trim();}// meet bad case, e.g ";\n"if(stmt.trim().isEmpty()){returnOptional.empty();}Operation operation = executor.parseStatement(sessionId, stmt);returnOptional.of(operation);}
-f 参数调用的是 executeInNonInteractiveMode 方法,实际也会调用 executeFile 方法,跟 -i 的执行逻辑是一样的.这里就不再分析了.
另外当前的 SQL Client 仅支持嵌入式模式(也就是 embedded 模式)。将来,社区计划通过提供基于 REST 的SQL 客户端网关来扩展其功能,有关更多信息,请参见 FLIP-24 和 FLIP-91。
版权归原作者 宝哥大数据 所有, 如有侵权,请联系我们删除。