0


flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读.
在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务,
然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交,
本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命令+yaml文件将任务提交,其他的功能会在之后的文章中解读
大数据小菜鸡在努力学习中,文中内容有误多多指点.

目录

概述
流程图
flink-cdc.sh解读
完整代码
逐行解读
参考

概述

首先需要思考一下,如果是自己来实现这一效果,那么应该如何设计,用什么技术?

我们知道flinkcdc的同步任务实际上也是一个flink任务,最终的提交的还是一个flink任务,而flink任务实际上就是个java任务,用jps命令都是可以查到的.

我们在编写flink streaming程序的时候,实际上主要的流程都是在一个main方法中,而main方法是可以接收参数的,所以这块设计起来其实很简单就是在shell脚本中获取到FLINK_HOME路径,然后将yaml文件通过命令行的方式传递到main方法中,然后再设计一个类来解析这个yaml文件形成一个任务实体类,然后根据这个实体类来生成一个flink任务,这就是一个大概的思路,里面肯定还有很多的细节,接下来就通过这个flink-cdc.sh脚本的解读来进一步看看大佬们是如何来实现这一功能的.

流程图

这里使用一个流程图来描述整个的流程,看完这个就知道这一脚本的大概内容了,如果有兴趣可以继续往下阅读,后面都是将脚本的一行一行的解读并配有中文注释.
image.png

flink-cdc.sh解读

源码路径 : flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh

完整代码

#!/usr/bin/env bash#################################################################################  Copyright 2023 Ververica Inc.##  Licensed under the Apache License, Version 2.0 (the "License");#  you may not use this file except in compliance with the License.#  You may obtain a copy of the License at##      http://www.apache.org/licenses/LICENSE-2.0##  Unless required by applicable law or agreed to in writing, software#  distributed under the License is distributed on an "AS IS" BASIS,#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.#  See the License for the specific language governing permissions and# limitations under the License.################################################################################# Setup FLINK_HOMEargs=("$@")# Check if FLINK_HOME is set in command-line arguments by "--flink-home"for((i=0; i < ${#args[@]}; i++));docase"${args[i]}"in
        --flink-home)if[[-n"${args[i+1]}"]];thenFLINK_HOME="${args[i+1]}"breakfi;;esacdoneif[[-z$FLINK_HOME]];thenecho"[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"."exit1fi# Setup Flink related configurations# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it_FLINK_HOME_DETERMINED=1# FLINK_CONF_DIR is required by config.shFLINK_CONF_DIR=$FLINK_HOME/conf
# Use config.sh to setup Flink related configurations.$FLINK_HOME/bin/config.sh

# Define Flink CDC directoriesSCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")"&> /dev/null &&pwd)FLINK_CDC_HOME="$SCRIPT_DIR"/..exportFLINK_CDC_HOME=$FLINK_CDC_HOMEFLINK_CDC_CONF="$FLINK_CDC_HOME"/conf
FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib
FLINK_CDC_LOG="$FLINK_CDC_HOME"/log

# Build Java classpathCLASSPATH=""# Add Flink libraries to the classpathforjarin"$FLINK_HOME"/lib/*.jar;doCLASSPATH=$CLASSPATH:$jardone# Add Flink CDC libraries to classpathforjarin"$FLINK_CDC_LIB"/*.jar;doCLASSPATH=$CLASSPATH:$jardone# Add Hadoop classpath, which is defined in config.shCLASSPATH=$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS# Trim classpathCLASSPATH=${CLASSPATH#:}# Setup loggingLOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log
LOG_SETTINGS=(-Dlog.file="$LOG"-Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties)# JAVA_RUN should have been setup in config.shexec"$JAVA_RUN"-classpath"$CLASSPATH""${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"

逐行解析

参数传入

#!/usr/bin/env bash# Setup FLINK_HOME# 获取这个脚本的所有参数,然后存储到args变量中# ${#args[@]} 获取数组长度# ${args[i]} 获取数组第i个值args=("$@")

设置FLINK_HOME这个变量

# Check if FLINK_HOME is set in command-line arguments by "--flink-home"# 遍历传入的参数检查是否FLINK_HOME这个环境变量是通过命令行参数 --flink-home传递进来的# shell中case的语法# case 值 in#      模式1) # 这里的模式指的是shell中的通配符模式不是正则表达式,例如 a*,就是a开头的任意字符串#           代码块#           ;;#      模式2)#           代码块#           ;;#      *)#           默认代码块#           ;;# esacfor((i=0; i < ${#args[@]}; i++));docase"${args[i]}"in
        --flink-home)# 如果匹配到到了就取他的下一个值给FLINK_HOME赋值,取值之前要判断一下是否存在# -n 就是检查字符串长度是否大于0,大于0返回true,否则falseif[[-n"${args[i+1]}"]];thenFLINK_HOME="${args[i+1]}"breakfi;;esacdone

校验FLINK_HOME这个变量是否设置成功

# 如果经过上面的循环还是没有给FLINK_HOME赋值就退出程序# 提示 [错误] 不能够在命令行参数--flink-home 或者 环境变量FLINK_HOME 找到 FLINK_HOME的值 if[[-z$FLINK_HOME]];thenecho"[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"."exit1fi

获取Flink的一些相关配置

# Setup Flink related configurations# 设置flink相关的配置# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it# 为了避免config.sh(这个文件在$FLINK_HOME/bin/config.sh)覆盖掉FLINK_HOME这个变量,所以这里将它置位1# 为什么置为1呢,这里可以看一下config.sh中的相关代码,如下# 可以看到如果变量_FLINK_HOME_DETERMINED为空那么就会把FLINK_HOME的值替换掉,所以这里将它的值赋值为1就是为了避免这个# 具体FLINK_HOME会被替换成什么值呢# dirname 就是要获取文件路径的路径,例如dirname /home/user/a.txt 返回 /home/user/# $SYMLINK_RESOLVED_BIN 是什么值呢# 是切换到$bin路径下,的绝对路径(pwd -P的意思就是获取实际文件系统路径,pwd是获取链接路径)# $bin是target的路径# target="$0" # $0就是当前脚本的名称# -L 判断是否是一个链接符号,判断target是否是一个链接符号# 如果是一个链接符号,那么就执行循环的代码块# 跳出的条件是target变量不是一个链接符号或者循环了100次跳出循环,-gt是大于 -ge是大于等于# ls 就是列出目录信息# -ld 有两个参数 -l和-d,-l是长格式进行显示,包括文件的属性和权限信息,相当于ll# -d是只显示目录自身的信息,而不列出目录中的文件,无论是文件还是目录,都不会进入它,仅是显示它自身的信息# -- 是一个特殊的选项,    用于分隔选项与参数.它的作用是确保$target被视作参数,即使$target是 - 开头的,避免将其解析成选项# 解释一下 target=`expr "$ls" : '.* -> \(.*\)$'`# 这行大概意思就是通过expr命令和正则表达式提取$ls变量中符号链接的目标路径或者目录,然后赋值给target# expr 是一个执行表达式的命令# "$ls" 是作为参数传递给expr# : '.* -> \(.*\)$' 这是一个正则表达式,用于匹配符号链接中的目标文件或目录.通过使用圆括号 ( ) 捕获模式,可以将匹配到的部分提取出来# target="$0"# # For the case, the executable has been directly symlinked, figure out# # the correct bin path by following its symlink up to an upper bound.# # Note: we can't use the readlink utility here if we want to be POSIX# # compatible.# iteration=0# while [ -L "$target" ]; do#     if [ "$iteration" -gt 100 ]; then#         echo "Cannot resolve path: You have a cyclic symlink in $target."#         break#     fi#     ls=`ls -ld -- "$target"`#     target=`expr "$ls" : '.* -> \(.*\)$'`#     iteration=$((iteration + 1))# done# Convert relative path to absolute path and resolve directory symlinks# bin=`dirname "$target"`# SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`# if [ -z "$_FLINK_HOME_DETERMINED" ]; then#     FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`# fi_FLINK_HOME_DETERMINED=1# FLINK_CONF_DIR is required by config.sh# config.sh 需要 FLINK_CONF_DIR 配置FLINK_CONF_DIR=$FLINK_HOME/conf
# Use config.sh to setup Flink related configurations# 使用config.sh来配置 Flink相关的配置.$FLINK_HOME/bin/config.sh

定义Flink cdc 的一些路径

# Define Flink CDC directories# 定义Flink cdc 的路径# SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )# 这行的大概意思就是要获取脚本的绝对路径# ${BASH_SOURCE[0]} bash的特殊变量,获取当前运行脚本的名称# $(dirname -- ${BASH_SOURCE[0]}) 获取当前运行脚本的路径(不能直接用这个,因为可能会因为软连接或者其他情况导致路径获取不准确,最稳妥的方法就是cd 到这个路径然后pwd获取绝对路径),这里的 -- 就是防止后面的变量被识别成选项例如-开头# cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" 切换到这个路径下# &> /dev/null 就是将一些标准输出和错误输出都重定向到/dev/null,这样可以使输出更清晰# && 当前一个命令执行成功后执行后面的命令# pwd 获取当前路径# FLINK_CDC_HOME="$SCRIPT_DIR"/..# SCRIPT_DIR 的上级路径就是FLINK_CDC_HOME的值,就是切换到了bin目录的根目录SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")"&> /dev/null &&pwd)FLINK_CDC_HOME="$SCRIPT_DIR"/..exportFLINK_CDC_HOME=$FLINK_CDC_HOMEFLINK_CDC_CONF="$FLINK_CDC_HOME"/conf
FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib
FLINK_CDC_LOG="$FLINK_CDC_HOME"/log

构建任务启动需要的classpath

# Build Java classpath# 构建 Java的calsspathCLASSPATH=""# Add Flink libraries to the classpath# 将flink路径下lib的jar包都添加到classpath中forjarin"$FLINK_HOME"/lib/*.jar;doCLASSPATH=$CLASSPATH:$jardone# Add Flink CDC libraries to classpath# 将cdc下lib的jar包都添加到classpathforjarin"$FLINK_CDC_LIB"/*.jar;doCLASSPATH=$CLASSPATH:$jardone# Add Hadoop classpath, which is defined in config.sh# 添加hadoop 的classpathCLASSPATH=$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS# Trim classpath# 去掉字符串开头的冒号 ,如果要去掉结尾的冒号 ${CLASSPATH%:}CLASSPATH=${CLASSPATH#:}

设置日志相关的配置

# Setup logging# 配置日志LOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log
# 启动命令中将日志的配置参数拼接,指定日志文件以及日志配置文件LOG_SETTINGS=(-Dlog.file="$LOG"-Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties)

启动任务

# JAVA_RUN should have been setup in config.sh# exec 是一个用于替换当前进程的命令,一般用在脚本中,会将当前脚本的执行进程执行的内容替换成exec后面命令# 有什么作用呢?# 1.减少系统资源 : 不用创建一个新的进程# 2.重定向标准输入/输出 : 通过使用 exec 命令执行新的命令.可以将标准输入,输出和错误重定向到新命令所指定的位置.# 3.执行后续操作:在脚本中,使用 exec 命令可以执行一些命令或操作后,将控制权交给新的命令.这可以用于在脚本中完成某些初始化操作后,将脚本完全替换为另一个命令或程序.# $JAVA_RUN 在config.sh就定义了,一般是java 或者 /bin/java# -classpath 指定classpath路径# "${LOG_SETTINGS[@]}" 日志的一些配置信息# com.ververica.cdc.cli.CliFrontend 入口类# "$@" 所有的命令行参数传到入口类中,通过String args[] 来接收exec"$JAVA_RUN"-classpath"$CLASSPATH""${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"

参考

[1] : https://github.com/apache/flink

[2] : https://github.com/ververica/flink-cdc-connectors

[3] : https://blog.csdn.net/wang2leee/article/details/132521566

标签: flink flinkcdc 源码

本文转载自: https://blog.csdn.net/weixin_44745147/article/details/136034151
版权归原作者 Antgeek 所有, 如有侵权,请联系我们删除。

“flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh”的评论:

还没有评论