1、以flinksql的方式直接提交任务
此方式使用起来相对比较简单,但是无法满足需要设置savepoint暂存点的流式任务需求。
使用此方式需要先创建Flink远方的执行环境,然后按序执行FlinkSql,流程如下:
java示例如下:
package com.xw.flink;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
public class testSqlServer {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("192.168.1.88",18082);
TableEnvironment tableEnv = TableEnvironment.create(env.getConfiguration());
Configuration configuration = tableEnv.getConfig().getConfiguration();
//任务名称设定
configuration.setString("pipeline.name","sqlserver");
String sourceDDL = "CREATE TABLE Orders (f1 STRING,f2 STRING,f3 STRING) WITH ( " +
" 'connector' = 'jdbc', " +
" 'driver'='com.microsoft.sqlserver.jdbc.SQLServerDriver', " +
" 'url'='jdbc:sqlserver://192.168.1.40:1433;databaseName=test;useLOBs=false', " +
" 'table-name'='test_czd1', " +
" 'username'='root', " +
" 'password'='root'" +
")";
tableEnv.executeSql(sourceDDL);
String rtLog = "CREATE TABLE logs (node_id STRING, send_count BIGINT,PRIMARY KEY(node_id) NOT ENFORCED) WITH ( " +
" 'connector' = 'jdbc', " +
" 'driver'='com.mysql.cj.jdbc.Driver', " +
" 'url'='jdbc:mysql://192.168.0.68:3306/testDB?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=GMT%2B8', " +
" 'table-name'='rt_log', " +
" 'username'='root', " +
" 'password'='root'," +
" 'sink.buffer-flush.max-rows' = '20000'," +
" 'sink.buffer-flush.interval' = '3000'" +
")";
tableEnv.executeSql(rtLog);
String sql = "insert into logs(node_id) select f1 from Orders limit 5";
tableEnv.executeSql(sql);
}
}
2、以任务jar的方式上传任务
此方式主要通过用java编写一个任务,然后打成jar的形式上传到flink集群。此方式比较灵活,可以精确控制任务的算子。但是对于现场的运维来说是一个比较困难的问题,因为要求运维人员需要有代码开发的能力。
java实现示例:
public class testSqlServer {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(env.getConfiguration());
Configuration configuration = tableEnv.getConfig().getConfiguration();
//任务名称设定
configuration.setString("pipeline.name","sqlserver");
//todo 此部分可以是Flink-table 也可以是Flinksql
}
}
然后将其打成jar包,然后上传到flink
填写class的全路径和并行度即可执行。
3、以Rest API方式进行提交
此方式综合了flinksql和flinkjar的两种形式,你也以在远方编写flinksql,然后通过调用API的形式将FlinkSql和参数发送到flink集群上可执行的jar。jar拿到参数组装flink任务并提交。
Rest API官网可参考REST API | Apache Flink
java编写一个接受参数的jar,模版可参考
public class SqlTemplate {
public static void main(String[] args) throws Exception {
ParameterTool parameters = ParameterTool.fromArgs(args);//获取传递的参数
String arg = parameters.get("arg",null);
if(arg == null){
return ;
}
arg = URLDecoder.decode(arg, StandardCharsets.UTF_8.toString());//URLDecoder解码
String[] programArgs = arg.split("\\|\\|");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置重启策略,最多重启三次,每次间隔5秒钟
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)
));
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Configuration configuration = tableEnv.getConfig().getConfiguration();
//任务名称设定
configuration.setString("pipeline.name",programArgs[0]);
// 任务并行度设定
env.setParallelism(Integer.parseInt(programArgs[1]));
//任务类型,流式任务强制开启checkpoint
if("stream".equals(programArgs[2])){
//检查点设定
if(!StringUtils.isNullOrWhitespaceOnly(programArgs[3])){
CheckPoint cp = JSON.parseObject(programArgs[3],CheckPoint.class);
//开启检查点
if(cp.getEnable()){
//开启检查点,1S一次
env.enableCheckpointing(cp.getCheckpointInterval());
//检查点策略 EXACTLY_ONCE 精准一次 AT_LEAST_ONCE至少一次
env.getCheckpointConfig().setCheckpointingMode(cp.getCheckPointingMode()==1?CheckpointingMode.EXACTLY_ONCE:CheckpointingMode.AT_LEAST_ONCE);
Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(cp.getCheckpointTimeout());
同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(cp.getMaxConcurrentCheckpoints());
//设置检查点保存位置
env.getCheckpointConfig().setCheckpointStorage(cp.getCheckpointDirectory());
//开启实验性的 unaligned checkpoints
if(cp.getUnalignedCheckpointsEnabled()){
env.getCheckpointConfig().enableUnalignedCheckpoints();
}
}
}else{//开启默认配置
//开启检查点,5S一次
env.enableCheckpointing(5000);
//检查点策略 EXACTLY_ONCE 精准一次 AT_LEAST_ONCE至少一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
Checkpoint 必须在五分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(300000);
同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
}
}
//可执行的SQL单节点执行
String sql = programArgs[4];
//特殊符号在连接器里都会被使用,采用双特殊符号进行分割
String[] sqlExecu = sql.split(";;");
List<String> create = new ArrayList<>();
List<String> insert = new ArrayList<>();
for (String script : sqlExecu) {
if(!script.startsWith("insert") && !script.startsWith("INSERT")){
create.add(script);
}else{
insert.add(script);
}
}
//可执行的SQL单节点执行
create.forEach(tableEnv::executeSql);
// 运行多条 INSERT 语句,将原表数据输出到多个结果表中
StatementSet stmtSet = tableEnv.createStatementSet();
insert.forEach(stmtSet::addInsertSql);
//开始执行任务
TableResult execute = stmtSet.execute();
}
版权归原作者 开心编码 所有, 如有侵权,请联系我们删除。