SeaTunnel 2.3.0 Source Code Analysis

1. Overview

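SeaTunnel is a simple, easy-to-use data integration framework. In an enterprise, systems are built at different times and by different departments, so several heterogeneous information systems often run side by side on different hardware and software platforms. Data integration brings together data of different sources, formats, and characteristics, either logically or physically, so that the enterprise can share data comprehensively. SeaTunnel supports real-time synchronization of massive data volumes and can synchronize tens of billions of records per day stably and efficiently.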

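SeaTunnel is suited to the following scenarios

  • Synchronization of massive data volumes

  • Integration of massive data volumes

  • ETL on massive data volumes

  • Aggregation of massive data volumes

  • Multi-source data processing

SeaTunnel features

  • Configuration-driven, low-code development that is easy to use and maintain

  • Support for real-time streaming

  • Offline multi-source data analysis

  • High performance and the capacity to process massive data volumes

  • A modular plugin architecture that is easy to extend

  • Support for data manipulation and aggregation with SQL

  • Support for Spark Structured Streaming

  • Support for Spark 2.x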

2. How It Works

   Taking the official example, a Flink job is submitted with the bin/start-seatunnel-flink.sh script, whose contents are as follows:
set -eu
# resolve links - $0 may be a softlink
PRG="$0"

while [ -h "$PRG" ] ; do
  # shellcheck disable=SC2006
  ls=`ls -ld "$PRG"`
  # shellcheck disable=SC2006
  link=`expr "$ls" : '.*-> \(.*\)$'`
  if expr "$link" : '/.*' > /dev/null; then
    PRG="$link"
  else
    # shellcheck disable=SC2006
    PRG=`dirname "$PRG"`/"$link"
  fi
done

PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/starter/seatunnel-flink-13-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
    . "${CONF_DIR}/seatunnel-env.sh"
fi

if [ $# == 0 ]
then
    args="-h"
else
    args=$@
fi

set +u
# Log4j2 Config
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
  JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-flink-starter"
fi

CLASS_PATH=${APP_DIR}/starter/logging/*:${APP_JAR}

CMD=$(java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
if [ ${EXIT_CODE} -eq 234 ]; then
    # print usage
    echo "${CMD}"
    exit 0
elif [ ${EXIT_CODE} -eq 0 ]; then
    echo "Execute SeaTunnel Flink Job: $(echo "${CMD}" | tail -n 1)"
    eval $(echo "${CMD}" | tail -n 1)
else
    echo "${CMD}"
    exit ${EXIT_CODE}
fi                                                                       
     The two most important commands in it are:

APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter"

CMD=$(java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?

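     Through this script, SeaTunnel runs the starter jar (seatunnel-flink-13-starter.jar in the script above) with org.apache.seatunnel.core.starter.flink.FlinkStarter as the entry class. Next, we turn to the source code of the FlinkStarter class.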
public class FlinkStarter implements Starter {

    private static final String APP_NAME = SeatunnelFlink.class.getName();
    public static final String APP_JAR_NAME = "seatunnel-flink-starter.jar";
    /**
     * SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf`
     */
    private final FlinkCommandArgs flinkCommandArgs;
    /**
     * SeaTunnel flink job jar.
     */
    private final String appJar;

    FlinkStarter(String[] args) {
        this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), StarterConstant.SHELL_NAME, true);
        // set the deployment mode, used to get the job jar path.
        Common.setDeployMode(flinkCommandArgs.getDeployMode());
        Common.setStarter(true);
        this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
    }

    @SuppressWarnings("checkstyle:RegexpSingleline")
    public static void main(String[] args) {
        FlinkStarter flinkStarter = new FlinkStarter(args);
        System.out.println(String.join(" ", flinkStarter.buildCommands()));
    }
    @Override
    public List<String> buildCommands() {
        List<String> command = new ArrayList<>();
        command.add("${FLINK_HOME}/bin/flink");
        command.add(flinkCommandArgs.getRunMode().getMode());
        command.addAll(flinkCommandArgs.getOriginalParameters());
        command.add("-c");
        command.add(APP_NAME);
        command.add(appJar);
        command.add("--config");
        command.add(flinkCommandArgs.getConfigFile());
        if (flinkCommandArgs.isCheckConfig()) {
            command.add("--check");
        }
        //set job name
        command.add("-Dpipeline.name=" + flinkCommandArgs.getJobName());
        // set System properties
        flinkCommandArgs.getVariables().stream()
                .filter(Objects::nonNull)
                .map(String::trim)
                .forEach(variable -> command.add("-D" + variable));
        return command;
    }
}
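   In FlinkStarter, buildCommands() assembles the command that submits the Flink job. From this, the execution logic of SeaTunnel can be roughly inferred: it packages itself as a jar and submits that jar to Flink. At run time, SeaTunnel loads the Source, Transform, and Sink plugins named in the user's conf file, assembles them into a job, and hands the job to Flink, which builds the StreamGraph and JobGraph.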
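
   The ordering of the generated command can be reproduced without any SeaTunnel classes. The sketch below is a simplified stand-in rather than the project's code: the class FlinkCommandSketch, its parameter list, and the sample values in main are all hypothetical, and it only mirrors the sequence of add calls visible in buildCommands().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for FlinkStarter.buildCommands(); not SeaTunnel code.
final class FlinkCommandSketch {

    static List<String> build(String runMode, List<String> engineArgs, String mainClass,
                              String appJar, String configFile, boolean check,
                              String jobName, List<String> variables) {
        List<String> command = new ArrayList<>();
        command.add("${FLINK_HOME}/bin/flink"); // left unexpanded; the shell's eval resolves it
        command.add(runMode);                   // e.g. "run"
        command.addAll(engineArgs);             // options passed through to Flink untouched
        command.add("-c");
        command.add(mainClass);                 // main class inside the job jar
        command.add(appJar);
        command.add("--config");
        command.add(configFile);
        if (check) {
            command.add("--check");
        }
        command.add("-Dpipeline.name=" + jobName);
        // Each user variable becomes one -D option.
        variables.stream()
                .filter(Objects::nonNull)
                .map(String::trim)
                .forEach(v -> command.add("-D" + v));
        return command;
    }

    public static void main(String[] args) {
        List<String> cmd = build("run", List.of("-p", "2"), "demo.Main",
                "/opt/app/starter.jar", "job.conf", false, "demo-job", List.of(" key=value "));
        System.out.println(String.join(" ", cmd));
    }
}
```

   The starter only prints this command; the shell script then takes the last line of the output and runs it with eval, which is why ${FLINK_HOME} can stay as a literal string here.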

   In FlinkStarter, the main class of the jar to be executed is set to SeatunnelFlink.class. That class builds the Flink command and runs it.
public class SeatunnelFlink {

    public static void main(String[] args) throws CommandException {
        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), StarterConstant.SHELL_NAME, true);
        Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
            .buildCommand(flinkCommandArgs);
        Seatunnel.run(flinkCommand);
    }
}
    The CommandLineUtils class implements argument parsing for every subclass of AbstractCommandArgs:
public static <T extends AbstractCommandArgs> T parse(String[] args, T obj, String programName, boolean acceptUnknownOptions) {
        JCommander jCommander = JCommander.newBuilder()
                .programName(programName)
                .addObject(obj)
                .acceptUnknownOptions(acceptUnknownOptions)
                .build();
        try {
            jCommander.parse(args);
            // The args is not belongs to SeaTunnel, add into engine original parameters
            obj.setOriginalParameters(jCommander.getUnknownOptions());
        } catch (ParameterException e) {
            System.err.println(e.getLocalizedMessage());
            exit(jCommander);
        }

        if (obj.isHelp()) {
            exit(jCommander);
        }
        return obj;
    }
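
    The key idea in parse is that options SeaTunnel does not recognize are kept and forwarded to the engine instead of being rejected. The following sketch shows that split with a hand-rolled loop; it does not use JCommander, the class ArgSplitSketch is hypothetical, and only the option names -c, --config, and --check are taken from the listings above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, hand-rolled illustration; the real code delegates to JCommander.
final class ArgSplitSketch {
    String configFile;
    boolean check;
    // Arguments that are not SeaTunnel options and are handed to the engine as-is.
    final List<String> passThrough = new ArrayList<>();

    static ArgSplitSketch parse(String[] args) {
        ArgSplitSketch parsed = new ArgSplitSketch();
        for (int i = 0; i < args.length; i++) {
            switch (args[i]) {
                case "-c":
                case "--config":
                    if (i + 1 >= args.length) {
                        throw new IllegalArgumentException("missing value for " + args[i]);
                    }
                    parsed.configFile = args[++i]; // consume the option's value
                    break;
                case "--check":
                    parsed.check = true;
                    break;
                default:
                    parsed.passThrough.add(args[i]); // unknown: keep for the engine
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        ArgSplitSketch parsed = parse(new String[] {"--config", "job.conf", "-p", "2"});
        System.out.println(parsed.configFile + " " + parsed.passThrough);
    }
}
```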
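    The Seatunnel.run method submits the command for execution. Command is an interface with seven implementations. The Flink-related ones include FlinkApiConfValidateCommand (checks that the configuration is well-formed) and FlinkApiTaskExecuteCommand (actually builds and runs the job).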
    public static <T extends CommandArgs> void run(Command<T> command) throws CommandException {
        try {
            command.execute();
        } catch (ConfigRuntimeException e) {
            showConfigError(e);
            throw e;
        } catch (Exception e) {
            showFatalError(e);
            throw e;
        }
    }
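
    The relationship between the builder, the two Flink commands, and run can be pictured with a miniature version. This is a sketch under one assumption that the excerpts above do not show: that the builder picks the validate-only command when --check is set and the executing command otherwise. All names in it (CommandSketch, CommandDispatchSketch) are hypothetical.

```java
// Hypothetical miniature of the Command abstraction; names are illustrative.
interface CommandSketch {
    String execute();
}

final class CommandDispatchSketch {

    // Assumed selection rule: validate only when a config check was requested.
    static CommandSketch build(boolean checkConfig, String configFile) {
        if (checkConfig) {
            return () -> "validated " + configFile;
        }
        return () -> "executed " + configFile;
    }

    // Same shape as Seatunnel.run: execute, report the failure, rethrow.
    static String run(CommandSketch command) {
        try {
            return command.execute();
        } catch (RuntimeException e) {
            System.err.println("Fatal error: " + e.getMessage());
            throw e;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(build(true, "job.conf")));
        System.out.println(run(build(false, "job.conf")));
    }
}
```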

   FlinkApiConfValidateCommand reads the config and validates it, while FlinkApiTaskExecuteCommand is responsible for running the job. It does so by building a FlinkExecution object from the parsed config and calling its execute method. FlinkExecution is the concrete implementation of the TaskExecution interface.
public class FlinkApiTaskExecuteCommand implements Command<FlinkCommandArgs> {

    private final FlinkCommandArgs flinkCommandArgs;

    public FlinkApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
        this.flinkCommandArgs = flinkCommandArgs;
    }

    @Override
    public void execute() throws CommandExecuteException {
        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
        checkConfigExist(configFile);
        Config config = new ConfigBuilder(configFile).getConfig();
        FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);
        try {
            seaTunnelTaskExecution.execute();
        } catch (Exception e) {
            throw new CommandExecuteException("Flink job executed failed", e);
        }
    }
}
   The FlinkExecution constructor takes the config and initializes the jar paths, the environment, and the source, transform, and sink processors. The source, transform, and sink fields are each an implementation of the PluginExecuteProcessor interface.
private final FlinkEnvironment flinkEnvironment;
private final PluginExecuteProcessor sourcePluginExecuteProcessor;
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
private final List<URL> jarPaths;

public FlinkExecution(Config config) {
      try {
          jarPaths = new ArrayList<>(Collections.singletonList(
              new File(Common.appStarterDir().resolve(FlinkStarter.APP_JAR_NAME).toString()).toURI().toURL()));
      } catch (MalformedURLException e) {
          throw new SeaTunnelException("load flink starter error.", e);
      }
      registerPlugin(config.getConfig("env"));
      JobContext jobContext = new JobContext();
      jobContext.setJobMode(FlinkEnvironmentFactory.getJobMode(config));

      this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
      this.transformPluginExecuteProcessor = new TransformExecuteProcessor(jarPaths,
          TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList()), jobContext);
      this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(jarPaths, config.getConfigList(Constants.SINK), jobContext);

      this.flinkEnvironment = new FlinkEnvironmentFactory(this.registerPlugin(config, jarPaths)).getEnvironment();

      this.sourcePluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
      this.transformPluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
      this.sinkPluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
}
    Once the FlinkExecution object is built, its execute method calls execute on sourcePluginExecuteProcessor, transformPluginExecuteProcessor, and sinkPluginExecuteProcessor in turn. FlinkEnvironment, like SparkEnvironment, wraps the execution context of its engine (Flink and Spark respectively).
@Override
public void execute() throws TaskExecuteException {
    List<DataStream<Row>> dataStreams = new ArrayList<>();
    dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
    dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
    sinkPluginExecuteProcessor.execute(dataStreams);

    log.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
    try {
          flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
    } catch (Exception e) {
        throw new TaskExecuteException("Execute Flink job error", e);
    }
}
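
    The hand-off between the three processors follows a simple pattern: each stage takes the list of upstream streams and returns the list for the next stage. The sketch below imitates that pattern with plain lists standing in for DataStream<Row>. Everything in it is hypothetical, and unlike Flink, where these calls only build a graph that runs when the environment's execute is called, the sketch processes the data immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: List<String> stands in for DataStream<Row>.
final class PipelineSketch {

    // Each stage receives the upstream "streams" and returns the downstream ones.
    interface Stage {
        List<List<String>> execute(List<List<String>> upstream);
    }

    static List<String> runPipeline() {
        List<String> sinkOutput = new ArrayList<>();

        // Source: ignores its input and produces one stream.
        Stage source = upstream -> List.of(List.of("a", "b"));

        // Transform: maps every row of every upstream stream.
        Stage transform = upstream -> {
            List<List<String>> result = new ArrayList<>();
            for (List<String> stream : upstream) {
                List<String> mapped = new ArrayList<>();
                for (String row : stream) {
                    mapped.add(row.toUpperCase());
                }
                result.add(mapped);
            }
            return result;
        };

        // Sink: consumes the first stream and returns nothing further.
        Stage sink = upstream -> {
            sinkOutput.addAll(upstream.get(0));
            return null; // the sink is the last stage
        };

        List<List<String>> streams = new ArrayList<>();
        streams = source.execute(streams);
        streams = transform.execute(streams);
        sink.execute(streams);
        return sinkOutput;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline());
    }
}
```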
   The execute method of sourcePluginExecuteProcessor uses addSource to obtain a DataStreamSource<Row> that carries a schema, and finally returns a list of DataStream<Row>.
    @Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {
        StreamExecutionEnvironment executionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
        List<DataStream<Row>> sources = new ArrayList<>();
        for (int i = 0; i < plugins.size(); i++) {
            // plugin-based: data sources are loaded dynamically
            SeaTunnelSource internalSource = plugins.get(i);
            BaseSeaTunnelSourceFunction sourceFunction;
            // check whether the data source supports coordination
            if (internalSource instanceof SupportCoordinate) {
                sourceFunction = new SeaTunnelCoordinatedSource(internalSource);
            } else {
                sourceFunction = new SeaTunnelParallelSource(internalSource);
            }
            DataStreamSource<Row> sourceStream = addSource(executionEnvironment,
                sourceFunction,
                "SeaTunnel " + internalSource.getClass().getSimpleName(),
                internalSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.BOUNDED);
            Config pluginConfig = pluginConfigs.get(i);
            if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
                int parallelism = pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                sourceStream.setParallelism(parallelism);
            }
            // register the resulting stream as a table
            registerResultTable(pluginConfig, sourceStream);
            sources.add(sourceStream);
        }
        return sources;
    }

    private DataStreamSource<Row> addSource(
        final StreamExecutionEnvironment streamEnv,
        final BaseSeaTunnelSourceFunction function,
        final String sourceName,
        boolean bounded) {
        checkNotNull(function);
        checkNotNull(sourceName);
        checkNotNull(bounded);

        TypeInformation<Row> resolvedTypeInfo = function.getProducedType();
        boolean isParallel = function instanceof ParallelSourceFunction;
        streamEnv.clean(function);

        final StreamSource<Row, ?> sourceOperator = new StreamSource<>(function);
        return new DataStreamSource<>(streamEnv, resolvedTypeInfo, sourceOperator, isParallel, sourceName, bounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED);
    }
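
   Two decisions in the source processor's execute method are easy to isolate: the wrapper is chosen by testing for a marker interface, and the parallelism is overridden only when the plugin config contains a value. The sketch below shows both in plain Java. Its types are hypothetical stand-ins (SupportsCoordination plays the role of SupportCoordinate), and the config key "parallelism" is an assumption, since the listing only refers to it as SourceCommonOptions.PARALLELISM.key().

```java
import java.util.Map;

// Hypothetical stand-ins for the SeaTunnel types used in the listing above.
final class SourceWrapperSketch {

    interface Source {}

    // Marker interface, analogous to SupportCoordinate.
    interface SupportsCoordination {}

    static final class PlainSource implements Source {}

    static final class CoordinatedSource implements Source, SupportsCoordination {}

    // Pick the wrapper type from the marker interface.
    static String chooseWrapper(Source source) {
        return source instanceof SupportsCoordination ? "coordinated" : "parallel";
    }

    // Use the configured parallelism if present, otherwise keep the default.
    static int resolveParallelism(Map<String, Integer> pluginConfig, int defaultParallelism) {
        return pluginConfig.getOrDefault("parallelism", defaultParallelism); // assumed key name
    }

    public static void main(String[] args) {
        System.out.println(chooseWrapper(new PlainSource()));
        System.out.println(resolveParallelism(Map.of("parallelism", 4), 1));
    }
}
```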
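   The execute method of transformPluginExecuteProcessor first checks whether any transform plugins are configured. If there are none, no transform step is needed and the upstream streams are returned unchanged. Otherwise each transform plugin is applied in order, and the outputs are collected into result and returned.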
@Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
        if (plugins.isEmpty()) {
            return upstreamDataStreams;
        }
        DataStream<Row> input = upstreamDataStreams.get(0);
        List<DataStream<Row>> result = new ArrayList<>();
        for (int i = 0; i < plugins.size(); i++) {
            try {
                FlinkStreamTransform transform = plugins.get(i);
                Config pluginConfig = pluginConfigs.get(i);
                DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
                input = transform.processStream(flinkEnvironment, stream);
                registerResultTable(pluginConfig, input);
                transform.registerFunction(flinkEnvironment);
                result.add(input);
            } catch (Exception e) {
                throw new TaskExecuteException(
                    String.format("SeaTunnel transform task: %s execute error", plugins.get(i).getPluginName()), e);
            }
        }
        return result;
    }
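
   The expression fromSourceTable(pluginConfig).orElse(input) means: read from a previously registered table if the plugin names one, otherwise continue from the previous step's output. The sketch below imitates that lookup with a map and Optional. It is a hypothetical simplification: lists stand in for streams, table names are passed directly rather than read from a config, and an unknown table name silently falls back to the previous output, which the real code may handle differently.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration of "named table, else previous output".
final class TableLookupSketch {
    private final Map<String, List<String>> tables = new HashMap<>();

    // Remember a stage's output under a name, if a name was configured.
    void registerResultTable(String resultTable, List<String> stream) {
        if (resultTable != null) {
            tables.put(resultTable, stream);
        }
    }

    // Empty when no name is configured or the name is unknown.
    Optional<List<String>> fromSourceTable(String sourceTable) {
        return Optional.ofNullable(sourceTable).map(tables::get);
    }

    // Choose the input for the next stage.
    List<String> resolveInput(String sourceTable, List<String> previousOutput) {
        return fromSourceTable(sourceTable).orElse(previousOutput);
    }

    public static void main(String[] args) {
        TableLookupSketch sketch = new TableLookupSketch();
        sketch.registerResultTable("t1", List.of("row1"));
        System.out.println(sketch.resolveInput("t1", List.of("prev")));
        System.out.println(sketch.resolveInput(null, List.of("prev")));
    }
}
```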
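   The execute method of SinkExecuteProcessor works the same way: it writes each stream to its sink through stream.sinkTo and, if the sink's configuration specifies a parallelism, sets it on the sink.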
@Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
        DataStream<Row> input = upstreamDataStreams.get(0);
        for (int i = 0; i < plugins.size(); i++) {
            Config sinkConfig = pluginConfigs.get(i);
            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
            seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
            DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
            if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
                int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                dataStreamSink.setParallelism(parallelism);
            }
        }
        // the sink is the last stream
        return null;
}

3. Supplementary Source Notes

Flink DataSource

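   In the execute method of sourcePluginExecuteProcessor, a BaseSeaTunnelSourceFunction object named sourceFunction is created. That class extends the abstract class RichSourceFunction and implements three interfaces: CheckpointListener, CheckpointedFunction, and ResultTypeQueryable<Row>. RichSourceFunction in turn extends the abstract class AbstractRichFunction and implements the SourceFunction interface. AbstractRichFunction implements the **RichFunction** interface, which is the base interface for rich user-defined functions.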

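   In the execute method of sourcePluginExecuteProcessor, the **addSource** method wraps a SourceFunction in a StreamSource and finally returns a DataStreamSource object. DataStreamSource represents the starting point of a DataStream. It is a subclass of SingleOutputStreamOperator, whose ancestor is DataStream, and every data source is wrapped as a **DataStreamSource**. The addSource method adds a data source with **custom type information** to the job. User-defined data sources implement the **SourceFunction** interface, which provides two methods:

  • run: the execution method, which performs the actual reading of data

  • cancel: the cancellation method, used to cancel the source or close its connections

A source defined with SourceFunction is non-parallel. To define a parallel source, implement the ParallelSourceFunction interface or extend RichParallelSourceFunction. In SeaTunnel, SeaTunnelParallelSource implements ParallelSourceFunction to provide a parallel data source.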
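
The run/cancel contract described above can be shown without Flink on the classpath. The sketch below is a plain-Java imitation, not an implementation of Flink's SourceFunction: the class CountingSourceSketch is hypothetical, and a Consumer stands in for the source context that a real run method receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical imitation of the run/cancel contract; it does not use Flink.
final class CountingSourceSketch {
    // volatile because cancel() is normally called from another thread
    private volatile boolean running = true;
    private final int limit;

    CountingSourceSketch(int limit) {
        this.limit = limit;
    }

    // Counterpart of run(): emit records until finished or cancelled.
    void run(Consumer<Integer> collector) {
        for (int i = 0; i < limit && running; i++) {
            collector.accept(i);
        }
    }

    // Counterpart of cancel(): ask the emit loop to stop.
    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        CountingSourceSketch source = new CountingSourceSketch(5);
        // Cancel from inside the collector once the value 2 has been seen.
        source.run(v -> {
            out.add(v);
            if (v == 2) {
                source.cancel();
            }
        });
        System.out.println(out);
    }
}
```

The flag-based loop is the usual way to make a long-running run method respond to cancel, since cancel cannot interrupt the loop directly and can only signal it.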

SeaTunnel Source

SeaTunnel ExecuteProcessor

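This article is reposted from: https://blog.csdn.net/qq_25062299/article/details/129832957
Copyright belongs to the original author, kiss火葱花. If there is any infringement, please contact us for removal.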
