0


flink的常见的任务提交方式

1、以flinksql的方式直接提交任务

此方式使用起来相对比较简单,但是无法满足需要设置savepoint暂存点的流式任务需求。

使用此方式需要先创建Flink远方的执行环境,然后按序执行FlinkSql,流程如下:

java示例如下:

package com.xw.flink;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public class testSqlServer {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("192.168.1.88",18082);
        TableEnvironment tableEnv = TableEnvironment.create(env.getConfiguration());
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        //任务名称设定
        configuration.setString("pipeline.name","sqlserver");
        String sourceDDL = "CREATE TABLE Orders (f1 STRING,f2 STRING,f3 STRING) WITH ( " +
                " 'connector' = 'jdbc',  " +
                " 'driver'='com.microsoft.sqlserver.jdbc.SQLServerDriver',  " +
                " 'url'='jdbc:sqlserver://192.168.1.40:1433;databaseName=test;useLOBs=false',  " +
                " 'table-name'='test_czd1',  " +
                " 'username'='root',  " +
                " 'password'='root'" +
                ")";
        tableEnv.executeSql(sourceDDL);
        String rtLog = "CREATE TABLE logs (node_id STRING, send_count BIGINT,PRIMARY KEY(node_id) NOT ENFORCED) WITH ( " +
                " 'connector' = 'jdbc',  " +
                " 'driver'='com.mysql.cj.jdbc.Driver',  " +
                " 'url'='jdbc:mysql://192.168.0.68:3306/testDB?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=GMT%2B8',  " +
                " 'table-name'='rt_log',  " +
                " 'username'='root',  " +
                " 'password'='root'," +
                " 'sink.buffer-flush.max-rows' = '20000'," +
                " 'sink.buffer-flush.interval' = '3000'" +
                ")";
        tableEnv.executeSql(rtLog);
        String sql = "insert into logs(node_id) select f1 from Orders limit 5";
        tableEnv.executeSql(sql);
    }
}

2、以任务jar的方式上传任务

此方式主要通过用java编写一个任务,然后打成jar的形式上传到flink集群。此方式比较灵活,可以精确控制任务的算子。但是对于现场的运维来说是一个比较困难的问题,因为要求运维人员需要有代码开发的能力。

java实现示例:

public class testSqlServer {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        TableEnvironment tableEnv = TableEnvironment.create(env.getConfiguration());
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        //任务名称设定
        configuration.setString("pipeline.name","sqlserver");
        //todo 此部分可以是Flink-table  也可以是Flinksql
      
    }
}

然后将其打成jar包,然后上传到flink

填写class的全路径和并行度即可执行。

3、以Rest API方式进行提交

此方式综合了flinksql和flinkjar的两种形式,你也以在远方编写flinksql,然后通过调用API的形式将FlinkSql和参数发送到flink集群上可执行的jar。jar拿到参数组装flink任务并提交。

Rest API官网可参考REST API | Apache Flink

java编写一个接受参数的jar,模版可参考

public class SqlTemplate {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromArgs(args);//获取传递的参数
        String arg = parameters.get("arg",null);
        if(arg == null){
            return ;
        }
        arg = URLDecoder.decode(arg, StandardCharsets.UTF_8.toString());//URLDecoder解码
        String[] programArgs =  arg.split("\\|\\|");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置重启策略,最多重启三次,每次间隔5秒钟
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,                    Time.seconds(5)
        ));
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        //任务名称设定
        configuration.setString("pipeline.name",programArgs[0]);
        // 任务并行度设定
        env.setParallelism(Integer.parseInt(programArgs[1]));
        //任务类型,流式任务强制开启checkpoint
        if("stream".equals(programArgs[2])){
            //检查点设定
            if(!StringUtils.isNullOrWhitespaceOnly(programArgs[3])){
                CheckPoint cp = JSON.parseObject(programArgs[3],CheckPoint.class);
                //开启检查点
                if(cp.getEnable()){
                    //开启检查点,1S一次
                    env.enableCheckpointing(cp.getCheckpointInterval());
                    //检查点策略 EXACTLY_ONCE 精准一次  AT_LEAST_ONCE至少一次
                    env.getCheckpointConfig().setCheckpointingMode(cp.getCheckPointingMode()==1?CheckpointingMode.EXACTLY_ONCE:CheckpointingMode.AT_LEAST_ONCE);
                     Checkpoint 必须在一分钟内完成,否则就会被抛弃
                    env.getCheckpointConfig().setCheckpointTimeout(cp.getCheckpointTimeout());
                     同一时间只允许一个 checkpoint 进行
                    env.getCheckpointConfig().setMaxConcurrentCheckpoints(cp.getMaxConcurrentCheckpoints());
                    //设置检查点保存位置
                    env.getCheckpointConfig().setCheckpointStorage(cp.getCheckpointDirectory());
                    //开启实验性的 unaligned checkpoints
                    if(cp.getUnalignedCheckpointsEnabled()){
                        env.getCheckpointConfig().enableUnalignedCheckpoints();
                    }
                }
            }else{//开启默认配置
                //开启检查点,5S一次
                env.enableCheckpointing(5000);
                //检查点策略 EXACTLY_ONCE 精准一次  AT_LEAST_ONCE至少一次
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                 Checkpoint 必须在五分钟内完成,否则就会被抛弃
                env.getCheckpointConfig().setCheckpointTimeout(300000);
                 同一时间只允许一个 checkpoint 进行
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                //开启实验性的 unaligned checkpoints
                env.getCheckpointConfig().enableUnalignedCheckpoints();
            }
        }
        //可执行的SQL单节点执行
        String sql = programArgs[4];
        //特殊符号在连接器里都会被使用,采用双特殊符号进行分割
        String[] sqlExecu = sql.split(";;");
        List<String> create = new ArrayList<>();
        List<String> insert = new ArrayList<>();
        for (String script : sqlExecu) {
            if(!script.startsWith("insert") && !script.startsWith("INSERT")){
                create.add(script);
            }else{
                insert.add(script);
            }
        }
        //可执行的SQL单节点执行
        create.forEach(tableEnv::executeSql);
        // 运行多条 INSERT 语句,将原表数据输出到多个结果表中
        StatementSet stmtSet = tableEnv.createStatementSet();
        insert.forEach(stmtSet::addInsertSql);
        //开始执行任务
        TableResult execute = stmtSet.execute();
     }
标签: flink 大数据

本文转载自: https://blog.csdn.net/zhizhi120/article/details/135859287
版权归原作者 开心编码 所有, 如有侵权,请联系我们删除。

“flink的常见的任务提交方式”的评论:

还没有评论