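Flink: what are the ways to load external JARs when starting a Job?
Apache Flink offers several ways to load external JAR files when a Job starts. They let you bring in custom UDFs (user-defined functions) or other dependencies. The common ones are listed below: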
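1. Start the Job directly with the flink run command
You can use the flink run command-line tool to specify the Job's main class and the location of external JARs. For example:
flink run -d -c com.example.YourMainClass -C file:///path/to/external-jar.jar /path/to/your-job.jar
Here -c specifies the Job's entry class, -d runs the Job in detached (background) mode, and -C (long form --classpath) adds the URL of an external JAR to the user-code classpath. The URL must include a protocol such as file:// and be reachable from every node. All options must come before the job JAR, because anything after it is passed to the program as an argument.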
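The help text of the -C option (quoted in the CliFrontendParser source later in this article) says each value must carry a protocol, and ProgramOptions parses each value with new URL(path). The following is a minimal sketch of that check using only the JDK. The class name ClasspathUrlCheck and the sample paths are made up for illustration and are not part of Flink.

```java
import java.net.MalformedURLException;
import java.net.URL;

public class ClasspathUrlCheck {

    /** Returns true if the value parses as a URL, i.e. it carries a protocol the JDK knows. */
    public static boolean isValidClasspathUrl(String value) {
        try {
            new URL(value);
            return true;
        } catch (MalformedURLException e) {
            // A bare path such as /path/to/x.jar has no protocol and ends up here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample values: one with a protocol, one without.
        System.out.println(isValidClasspathUrl("file:///path/to/external-jar.jar"));
        System.out.println(isValidClasspathUrl("/path/to/external-jar.jar"));
    }
}
```

A value without a protocol is rejected before the job is even submitted, which is why the example command writes the JAR location as a file:// URL.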
2. Submit the Job through the Flink Web UI
If you use Flink's Web UI, you can choose the JAR to submit directly in the interface. On the Web UI page, find "Submit New Job", then select the JAR to upload and run, and configure the job.
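3. Use Flink's YARN Session mode
If you run Flink in Session mode on YARN, you can specify an extra classpath when submitting a Job, either in the configuration file or with command-line arguments. For example, add the extra classpath in flink-conf.yaml (pipeline.classpaths takes a semicolon-separated list of URLs that must be reachable on every node):
pipeline.classpaths: file:///path/to/external-jar.jar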
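4. Use the YARN per-job or yarn-session.sh commands
When running Flink on a YARN cluster, you can also specify the location of external JARs on the command line. For a per-job cluster:
flink run -t yarn-per-job -C file:///path/to/external-jar.jar /path/to/your-job.jar
Or, when starting a session, ship a directory of JARs to the cluster:
yarn-session.sh --ship /path/to/external-jars/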
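5. Package the JARs into the main Job JAR
Another approach is to bundle all dependency JARs into a single JAR and use it as the main Job JAR. You then do not need to specify external JARs explicitly at run time.
Notes
- Make sure the external JARs are compatible with your Flink version.
- In YARN mode, make sure the external JARs are distributed correctly to every TaskManager.
- If you run into memory problems, you may need to adjust the YARN or Flink configuration, for example by increasing the available memory or adjusting the parallelism.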
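As you may have noticed, each of the methods above specifies one JAR at a time. If a job depends on many JARs, say 20 or 30, passing them one by one with -C is not practical.
Method 5 can combine several JARs into one, but when I tried it, the combined JAR was built and yet the classes still could not be found at run time. So here is a sixth method.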
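6. Modify the Flink source code to accept a JAR directory parameter
With a JAR directory parameter, you put all the JARs, however many there are, into one directory, and a single command-line option covers them. The added code loops over the directory and adds every JAR in it to the classpath. This saves effort, and the JARs do not conflict with those of the system or of other jobs.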
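The idea can be sketched with the JDK alone: list the .jar files in a directory, turn each into a URL, and hand the URLs to a class loader. This is only an illustration of the principle, not the Flink patch. In the patch shown later the URLs are appended to the job's classpath list and Flink builds the user-code class loader itself. The class name JarDirClasspath and the default directory name extlib are assumptions made for this sketch.

```java
import java.io.File;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JarDirClasspath {

    /**
     * Lists every regular *.jar file directly inside dir as a URL.
     * Returns an empty list if dir does not exist or is not a directory.
     */
    public static List<URL> listJarUrls(File dir) {
        List<URL> urls = new ArrayList<>();
        File[] jars = dir.listFiles(
                f -> f.isFile() && f.getName().toLowerCase().endsWith(".jar"));
        if (jars == null) {
            // listFiles returns null when dir is missing or not a directory.
            return urls;
        }
        Arrays.sort(jars); // keep the order deterministic
        for (File jar : jars) {
            try {
                urls.add(jar.toURI().toURL());
            } catch (MalformedURLException e) {
                throw new UncheckedIOException(e);
            }
        }
        return urls;
    }

    /** Builds a class loader that sees the JARs in dir in addition to the parent's classes. */
    public static URLClassLoader newLoader(File dir, ClassLoader parent) {
        return new URLClassLoader(listJarUrls(dir).toArray(new URL[0]), parent);
    }

    public static void main(String[] args) {
        // "extlib" is a hypothetical default; pass a real directory as the first argument.
        File dir = new File(args.length > 0 ? args[0] : "extlib");
        for (URL url : listJarUrls(dir)) {
            System.out.println(url);
        }
    }
}
```

Because each loader created this way has its own URL list, JARs placed in one job's directory are not visible to loaders built for other directories, which is the isolation the paragraph above refers to.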
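The following uses Flink 1.13 as the example.
First download the source from git, URL: https://github.com/apache/flink.git
Two files are modified in total:
CliFrontendParser.java and ProgramOptions.java
They live in the flink-clients module, but at run time the classes are inside flink-dist.jar, so after modifying them you need to replace the corresponding two classes in flink-dist.jar.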
CliFrontendParser.java
Note: the places marked "// added" are the modifications.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.client.cli;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

/**
 * A simple command line parser (based on Apache Commons CLI) that extracts command line options.
 */
public class CliFrontendParser {

    static final Option HELP_OPTION =
            new Option(
                    "h",
                    "help",
                    false,
                    "Show the help message for the CLI Frontend or the action.");

    static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");

    static final Option CLASS_OPTION =
            new Option(
                    "c",
                    "class",
                    true,
                    "Class with the program entry point (\"main()\" method). Only needed if the "
                            + "JAR file does not specify the class in its manifest.");

    static final Option CLASSPATH_OPTION =
            new Option(
                    "C",
                    "classpath",
                    true,
                    "Adds a URL to each user code "
                            + "classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be "
                            + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                            + "times for specifying more than one URL. The protocol must be supported by the "
                            + "{@link java.net.URLClassLoader}.");

    public static final Option PARALLELISM_OPTION =
            new Option(
                    "p",
                    "parallelism",
                    true,
                    "The parallelism with which to run the program. Optional flag to override the default value "
                            + "specified in the configuration.");

    public static final Option DETACHED_OPTION =
            new Option("d", "detached", false, "If present, runs " + "the job in detached mode");

    public static final Option SHUTDOWN_IF_ATTACHED_OPTION =
            new Option(
                    "sae",
                    "shutdownOnAttachedExit",
                    false,
                    "If the job is submitted in attached mode, perform a best-effort cluster shutdown "
                            + "when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");

    // ********** added **********
    static final Option JARDIR_OPTION =
            new Option(
                    "jd",
                    "jardir",
                    true,
                    "Adds all JAR files in a directory to each user code "
                            + "classloader on all nodes in the cluster. The directory must exist and be "
                            + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                            + "times for specifying more than one directory. ");
    // ********** added **********

    /**
     * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN
     *     deployments
     */
    @Deprecated
    public static final Option YARN_DETACHED_OPTION =
            new Option(
                    "yd",
                    "yarndetached",
                    false,
                    "If present, runs "
                            + "the job in detached mode (deprecated; use non-YARN specific option instead)");

    public static final Option ARGS_OPTION =
            new Option(
                    "a",
                    "arguments",
                    true,
                    "Program arguments. Arguments can also be added without -a, simply as trailing parameters.");

    public static final Option ADDRESS_OPTION =
            new Option(
                    "m",
                    "jobmanager",
                    true,
                    "Address of the JobManager to which to connect. "
                            + "Use this flag to connect to a different JobManager than the one specified in the configuration.");

    public static final Option SAVEPOINT_PATH_OPTION =
            new Option(
                    "s",
                    "fromSavepoint",
                    true,
                    "Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");

    public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION =
            new Option(
                    "n",
                    "allowNonRestoredState",
                    false,
                    "Allow to skip savepoint state that cannot be restored. "
                            + "You need to allow this if you removed an operator from your "
                            + "program that was part of the program when the savepoint was triggered.");

    static final Option SAVEPOINT_DISPOSE_OPTION =
            new Option("d", "dispose", true, "Path of savepoint to dispose.");

    // list specific options
    static final Option RUNNING_OPTION =
            new Option("r", "running", false, "Show only running programs and their JobIDs");

    static final Option SCHEDULED_OPTION =
            new Option("s", "scheduled", false, "Show only scheduled programs and their JobIDs");

    static final Option ALL_OPTION =
            new Option("a", "all", false, "Show all programs and their JobIDs");

    static final Option ZOOKEEPER_NAMESPACE_OPTION =
            new Option(
                    "z",
                    "zookeeperNamespace",
                    true,
                    "Namespace to create the Zookeeper sub-paths for high availability mode");

    static final Option CANCEL_WITH_SAVEPOINT_OPTION =
            new Option(
                    "s",
                    "withSavepoint",
                    true,
                    "**DEPRECATION WARNING**: "
                            + "Cancelling a job with savepoint is deprecated. Use \"stop\" instead. \n Trigger"
                            + " savepoint and cancel job. The target directory is optional. If no directory is "
                            + "specified, the configured default directory ("
                            + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
                            + ") is used.");

    public static final Option STOP_WITH_SAVEPOINT_PATH =
            new Option(
                    "p",
                    "savepointPath",
                    true,
                    "Path to the savepoint (for example hdfs:///flink/savepoint-1537). "
                            + "If no directory is specified, the configured default will be used (\""
                            + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
                            + "\").");

    public static final Option STOP_AND_DRAIN =
            new Option(
                    "d",
                    "drain",
                    false,
                    "Send MAX_WATERMARK before taking the savepoint and stopping the pipeline.");

    public static final Option PY_OPTION =
            new Option(
                    "py",
                    "python",
                    true,
                    "Python script with the program entry point. "
                            + "The dependent resources can be configured with the `--pyFiles` option.");

    public static final Option PYFILES_OPTION =
            new Option(
                    "pyfs",
                    "pyFiles",
                    true,
                    "Attach custom files for job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. "
                            + "These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. "
                            + "Files suffixed with .zip will be extracted and added to PYTHONPATH. "
                            + "Comma (',') could be used as the separator to specify multiple files "
                            + "(e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");

    public static final Option PYMODULE_OPTION =
            new Option(
                    "pym",
                    "pyModule",
                    true,
                    "Python module with the program entry point. "
                            + "This option must be used in conjunction with `--pyFiles`.");

    public static final Option PYREQUIREMENTS_OPTION =
            new Option(
                    "pyreq",
                    "pyRequirements",
                    true,
                    "Specify a requirements.txt file which defines the third-party dependencies. "
                            + "These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. "
                            + "A directory which contains the installation packages of these dependencies could be specified "
                            + "optionally. Use '#' as the separator if the optional parameter exists "
                            + "(e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).");

    public static final Option PYARCHIVE_OPTION =
            new Option(
                    "pyarch",
                    "pyArchives",
                    true,
                    "Add python archive files for job. The archive files will be extracted to the working directory "
                            + "of python UDF worker. Currently only zip-format is supported. For each archive file, a target directory "
                            + "be specified. If the target directory name is specified, the archive file will be extracted to a "
                            + "directory with the specified name. Otherwise, the archive file will be extracted to a "
                            + "directory with the same name of the archive file. The files uploaded via this option are accessible "
                            + "via relative path. '#' could be used as the separator of the archive file path and the target directory "
                            + "name. Comma (',') could be used as the separator to specify multiple archive files. "
                            + "This option can be used to upload the virtual environment, the data files used in Python UDF "
                            + "(e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable "
                            + "py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: "
                            + "f = open('data/data.txt', 'r').");

    public static final Option PYEXEC_OPTION =
            new Option(
                    "pyexec",
                    "pyExecutable",
                    true,
                    "Specify the path of the python interpreter used to execute the python UDF worker "
                            + "(e.g.: --pyExecutable /usr/local/bin/python3). "
                            + "The python UDF worker depends on Python 3.6+, Apache Beam (version == 2.27.0), "
                            + "Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). "
                            + "Please ensure that the specified environment meets the above requirements.");

    static {
        HELP_OPTION.setRequired(false);

        JAR_OPTION.setRequired(false);
        JAR_OPTION.setArgName("jarfile");

        CLASS_OPTION.setRequired(false);
        CLASS_OPTION.setArgName("classname");

        CLASSPATH_OPTION.setRequired(false);
        CLASSPATH_OPTION.setArgName("url");

        ADDRESS_OPTION.setRequired(false);
        ADDRESS_OPTION.setArgName("host:port");

        PARALLELISM_OPTION.setRequired(false);
        PARALLELISM_OPTION.setArgName("parallelism");

        DETACHED_OPTION.setRequired(false);
        SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
        YARN_DETACHED_OPTION.setRequired(false);
        JARDIR_OPTION.setRequired(false); // added

        ARGS_OPTION.setRequired(false);
        ARGS_OPTION.setArgName("programArgs");
        ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);

        RUNNING_OPTION.setRequired(false);
        SCHEDULED_OPTION.setRequired(false);

        SAVEPOINT_PATH_OPTION.setRequired(false);
        SAVEPOINT_PATH_OPTION.setArgName("savepointPath");

        SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);

        ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
        ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");

        CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false);
        CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
        CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);

        STOP_WITH_SAVEPOINT_PATH.setRequired(false);
        STOP_WITH_SAVEPOINT_PATH.setArgName("savepointPath");
        STOP_WITH_SAVEPOINT_PATH.setOptionalArg(true);

        STOP_AND_DRAIN.setRequired(false);

        PY_OPTION.setRequired(false);
        PY_OPTION.setArgName("pythonFile");

        PYFILES_OPTION.setRequired(false);
        PYFILES_OPTION.setArgName("pythonFiles");

        PYMODULE_OPTION.setRequired(false);
        PYMODULE_OPTION.setArgName("pythonModule");

        PYREQUIREMENTS_OPTION.setRequired(false);

        PYARCHIVE_OPTION.setRequired(false);

        PYEXEC_OPTION.setRequired(false);
    }

    static final Options RUN_OPTIONS = getRunCommandOptions();

    private static Options buildGeneralOptions(Options options) {
        options.addOption(HELP_OPTION);
        // backwards compatibility: ignore verbose flag (-v)
        options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
        return options;
    }

    private static Options getProgramSpecificOptions(Options options) {
        options.addOption(JAR_OPTION);
        options.addOption(CLASS_OPTION);
        options.addOption(CLASSPATH_OPTION);
        options.addOption(PARALLELISM_OPTION);
        options.addOption(ARGS_OPTION);
        options.addOption(DETACHED_OPTION);
        options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
        options.addOption(YARN_DETACHED_OPTION);
        options.addOption(PY_OPTION);
        options.addOption(PYFILES_OPTION);
        options.addOption(PYMODULE_OPTION);
        options.addOption(PYREQUIREMENTS_OPTION);
        options.addOption(PYARCHIVE_OPTION);
        options.addOption(PYEXEC_OPTION);
        options.addOption(JARDIR_OPTION); // added
        return options;
    }

    private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(CLASS_OPTION);
        options.addOption(CLASSPATH_OPTION);
        options.addOption(PARALLELISM_OPTION);
        options.addOption(DETACHED_OPTION);
        options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
        options.addOption(PY_OPTION);
        options.addOption(PYFILES_OPTION);
        options.addOption(PYMODULE_OPTION);
        options.addOption(PYREQUIREMENTS_OPTION);
        options.addOption(PYARCHIVE_OPTION);
        options.addOption(PYEXEC_OPTION);
        options.addOption(JARDIR_OPTION); // added
        return options;
    }

    public static Options getRunCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options = getProgramSpecificOptions(options);
        options.addOption(SAVEPOINT_PATH_OPTION);
        return options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
    }

    static Options getInfoCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        return getProgramSpecificOptions(options);
    }

    static Options getListCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options.addOption(ALL_OPTION);
        options.addOption(RUNNING_OPTION);
        return options.addOption(SCHEDULED_OPTION);
    }

    static Options getCancelCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
    }

    static Options getStopCommandOptions() {
        return buildGeneralOptions(new Options())
                .addOption(STOP_WITH_SAVEPOINT_PATH)
                .addOption(STOP_AND_DRAIN);
    }

    static Options getSavepointCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options.addOption(SAVEPOINT_DISPOSE_OPTION);
        return options.addOption(JAR_OPTION);
    }

    // --------------------------------------------------------------------------------------------
    //  Help
    // --------------------------------------------------------------------------------------------

    private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
        Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
        o.addOption(SAVEPOINT_PATH_OPTION);
        return o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
    }

    private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(CLASS_OPTION);
        options.addOption(PARALLELISM_OPTION);
        return options;
    }

    private static Options getListOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(RUNNING_OPTION);
        options.addOption(ALL_OPTION);
        options.addOption(SCHEDULED_OPTION);
        return options;
    }

    private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
        return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
    }

    private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
        return options.addOption(STOP_WITH_SAVEPOINT_PATH).addOption(STOP_AND_DRAIN);
    }

    private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(SAVEPOINT_DISPOSE_OPTION);
        options.addOption(JAR_OPTION);
        return options;
    }

    /** Prints the help for the client. */
    public static void printHelp(Collection<CustomCommandLine> customCommandLines) {
        System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
        System.out.println();
        System.out.println("The following actions are available:");

        printHelpForRun(customCommandLines);
        printHelpForRunApplication(customCommandLines);
        printHelpForInfo();
        printHelpForList(customCommandLines);
        printHelpForStop(customCommandLines);
        printHelpForCancel(customCommandLines);
        printHelpForSavepoint(customCommandLines);

        System.out.println();
    }

    public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"run\" compiles and runs a program.");
        System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix(" \"run\" action options:");
        formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, true);

        System.out.println();
    }

    public static void printHelpForRunApplication(
            Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"run-application\" runs an application in Application Mode.");
        System.out.println("\n Syntax: run-application [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix(" \"run-application\" action options:");

        // Only GenericCLI works with application mode, the other CLIs will be phased out
        // in the future
        List<CustomCommandLine> filteredCommandLines =
                customCommandLines.stream()
                        .filter((cli) -> cli instanceof GenericCLI)
                        .collect(Collectors.toList());

        printCustomCliOptions(filteredCommandLines, formatter, true);

        System.out.println();
    }

    public static void printHelpForInfo() {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"info\" shows the optimized execution plan of the program (JSON).");
        System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix(" \"info\" action options:");
        formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));

        System.out.println();
    }

    public static void printHelpForList(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"list\" lists running and scheduled programs.");
        System.out.println("\n Syntax: list [OPTIONS]");
        formatter.setSyntaxPrefix(" \"list\" action options:");
        formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForStop(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"stop\" stops a running program with a savepoint (streaming jobs only).");
        System.out.println("\n Syntax: stop [OPTIONS] <Job ID>");
        formatter.setSyntaxPrefix(" \"stop\" action options:");
        formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForCancel(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"cancel\" cancels a running program.");
        System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>");
        formatter.setSyntaxPrefix(" \"cancel\" action options:");
        formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForSavepoint(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
        System.out.println("\n Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]");
        formatter.setSyntaxPrefix(" \"savepoint\" action options:");
        formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    /**
     * Prints custom cli options.
     *
     * @param formatter The formatter to use for printing
     * @param runOptions True if the run options should be printed, False to print only general
     *     options
     */
    private static void printCustomCliOptions(
            Collection<CustomCommandLine> customCommandLines,
            HelpFormatter formatter,
            boolean runOptions) {
        // prints options from all available command-line classes
        for (CustomCommandLine cli : customCommandLines) {
            formatter.setSyntaxPrefix(" Options for " + cli.getId() + " mode:");
            Options customOpts = new Options();
            cli.addGeneralOptions(customOpts);
            if (runOptions) {
                cli.addRunOptions(customOpts);
            }
            formatter.printHelp(" ", customOpts);
            System.out.println();
        }
    }

    public static SavepointRestoreSettings createSavepointRestoreSettings(
            CommandLine commandLine) {
        if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
            String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
            boolean allowNonRestoredState =
                    commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
            return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
        } else {
            return SavepointRestoreSettings.none();
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Line Parsing
    // --------------------------------------------------------------------------------------------

    public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
            throws CliArgsException {
        final DefaultParser parser = new DefaultParser();

        try {
            return parser.parse(options, args, stopAtNonOptions);
        } catch (ParseException e) {
            throw new CliArgsException(e.getMessage());
        }
    }

    /**
     * Merges the given {@link Options} into a new Options object.
     *
     * @param optionsA options to merge, can be null if none
     * @param optionsB options to merge, can be null if none
     * @return
     */
    public static Options mergeOptions(@Nullable Options optionsA, @Nullable Options optionsB) {
        final Options resultOptions = new Options();
        if (optionsA != null) {
            for (Option option : optionsA.getOptions()) {
                resultOptions.addOption(option);
            }
        }

        if (optionsB != null) {
            for (Option option : optionsB.getOptions()) {
                resultOptions.addOption(option);
            }
        }

        return resultOptions;
    }
}
ProgramOptions.java
Note: the places marked "// added" are the modifications.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.client.cli;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;

import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JARDIR_OPTION; // added
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
import static org.apache.flink.client.cli.ProgramOptionsUtils.containsPythonDependencyOptions;
import static org.apache.flink.client.cli.ProgramOptionsUtils.createPythonProgramOptions;
import static org.apache.flink.client.cli.ProgramOptionsUtils.isPythonEntryPoint;

/** Base class for command line options that refer to a JAR file program. */
public class ProgramOptions extends CommandLineOptions {

    private String jarFilePath;

    protected String entryPointClass;

    private final List<URL> classpaths;

    private final String[] programArgs;

    private final int parallelism;

    private final boolean detachedMode;

    private final boolean shutdownOnAttachedExit;

    private final SavepointRestoreSettings savepointSettings;

    protected ProgramOptions(CommandLine line) throws CliArgsException {
        super(line);

        this.entryPointClass =
                line.hasOption(CLASS_OPTION.getOpt())
                        ? line.getOptionValue(CLASS_OPTION.getOpt())
                        : null;

        this.jarFilePath =
                line.hasOption(JAR_OPTION.getOpt())
                        ? line.getOptionValue(JAR_OPTION.getOpt())
                        : null;

        this.programArgs = extractProgramArgs(line);

        List<URL> classpaths = new ArrayList<URL>();
        if (line.hasOption(CLASSPATH_OPTION.getOpt())) {
            for (String path : line.getOptionValues(CLASSPATH_OPTION.getOpt())) {
                try {
                    classpaths.add(new URL(path));
                } catch (MalformedURLException e) {
                    throw new CliArgsException("Bad syntax for classpath: " + path);
                }
            }
        }
        // ***** added *****
        // Load every jar found in each -jd directory.
        if (line.hasOption(JARDIR_OPTION.getOpt())) {
            for (String path : line.getOptionValues(JARDIR_OPTION.getOpt())) {
                List<URL> jarFiles = null;
                try {
                    jarFiles = loadAllJarFromPathURl(path);
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                    throw new CliArgsException("Bad syntax for jar dir: " + path);
                }
                classpaths.addAll(jarFiles);
            }
        }
        // ***** added *****
        this.classpaths = classpaths;

        if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
            String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
            try {
                parallelism = Integer.parseInt(parString);
                if (parallelism <= 0) {
                    throw new NumberFormatException();
                }
            } catch (NumberFormatException e) {
                throw new CliArgsException(
                        "The parallelism must be a positive number: " + parString);
            }
        } else {
            parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
        }

        detachedMode =
                line.hasOption(DETACHED_OPTION.getOpt())
                        || line.hasOption(YARN_DETACHED_OPTION.getOpt());
        shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());

        this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
    }

    // ***** added ***** start
    private List<URL> loadAllJarFromPathURl(String path) throws MalformedURLException {
        List<URL> urls = new ArrayList<>();
        System.out.println("jar dir:" + path);
        // File object for the directory to search.
        File directory = new File(path);
        // Use a FilenameFilter to keep only files ending in .jar.
        File[] jarFiles =
                directory.listFiles(
                        new FilenameFilter() {
                            @Override
                            public boolean accept(File dir, String name) {
                                return name.toLowerCase().endsWith(".jar");
                            }
                        });
        // listFiles returns null if the path does not exist or is not a directory,
        // so check for null before reading the array length.
        if (jarFiles != null) {
            System.out.println("jarFiles len:" + jarFiles.length);
            // Add each jar that was found.
            for (File jarFile : jarFiles) {
                System.out.println(jarFile.getAbsolutePath());
                URL url = jarFile.toURI().toURL();
                urls.add(url);
            }
        }
        return urls;
    }
    // ***** added ***** end

    protected String[] extractProgramArgs(CommandLine line) {
        String[] args =
                line.hasOption(ARGS_OPTION.getOpt())
                        ? line.getOptionValues(ARGS_OPTION.getOpt())
                        : line.getArgs();

        if (args.length > 0 && !line.hasOption(JAR_OPTION.getOpt())) {
            jarFilePath = args[0];
            args = Arrays.copyOfRange(args, 1, args.length);
        }

        return args;
    }

    public void validate() throws CliArgsException {
        // Java program should be specified a JAR file
        if (getJarFilePath() == null) {
            throw new CliArgsException("Java program should be specified a JAR file.");
        }
    }

    public String getJarFilePath() {
        return jarFilePath;
    }

    public String getEntryPointClassName() {
        return entryPointClass;
    }

    public List<URL> getClasspaths() {
        return classpaths;
    }

    public String[] getProgramArgs() {
        return programArgs;
    }

    public int getParallelism() {
        return parallelism;
    }

    public boolean getDetachedMode() {
        return detachedMode;
    }

    public boolean isShutdownOnAttachedExit() {
        return shutdownOnAttachedExit;
    }

    public SavepointRestoreSettings getSavepointRestoreSettings() {
        return savepointSettings;
    }

    public void applyToConfiguration(Configuration configuration) {
        if (getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
            configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
        }

        configuration.setBoolean(DeploymentOptions.ATTACHED, !getDetachedMode());
        configuration.setBoolean(
                DeploymentOptions.SHUTDOWN_IF_ATTACHED, isShutdownOnAttachedExit());
        ConfigUtils.encodeCollectionToConfig(
                configuration, PipelineOptions.CLASSPATHS, getClasspaths(), URL::toString);
        SavepointRestoreSettings.toConfiguration(getSavepointRestoreSettings(), configuration);
    }

    public static ProgramOptions create(CommandLine line) throws CliArgsException {
        if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
            return createPythonProgramOptions(line);
        } else {
            return new ProgramOptions(line);
        }
    }
}
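Does it actually work? Let's write some verification code, in two parts.
(1) Flink job code
This simply copies the WordCount example, renames the class, and calls a class from the JAR in (2).
TestLoadExtJar.java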
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.examples.java.testloadextjar;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import com.test.A;

/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram over text
 * files.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from {@link WordCountData}.
 *
 * <p>This example shows how to:
 *
 * <ul>
 *   <li>write a simple Flink program.
 *   <li>use Tuple data types.
 *   <li>write and use user-defined functions.
 * </ul>
 */
public class TestLoadExtJar {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        A a = new A();
        a.test();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataSet should not be null.");
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
(2) Simulated third-party code
Create a Maven project in IDEA with a very simple class A for TestLoadExtJar to call.
A.java
package com.test;

public class A {
    public void test() {
        System.out.println("A");
    }
}
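Then package both projects, TestLoadExtJar and the simulated third-party code, as JARs. Suppose the TestLoadExtJar example is packaged as TestLoadExtJar.jar and the simulated third-party code as testcallextjar-1.0-SNAPSHOT.jar, placed in the /usr/local/flink-1.13.0/extlib directory.
Then run the job under Flink: first the original way, to see the error message, then with the jd parameter pointing at the JAR directory, to see whether that solves the problem.
Without the jd parameter:
As you can see, it reports that the class cannot be found.
With the jd parameter:
Now the job runs.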
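The two runs differ only in whether com.test.A is visible to the class loader that executes TestLoadExtJar.main. A small helper like the one below can confirm that outside of Flink. It is a hypothetical sketch using only the JDK, and the class name ClassVisibilityCheck is an assumption made for illustration.

```java
public class ClassVisibilityCheck {

    /** Returns true if the named class can be found through the given class loader. */
    public static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassVisibilityCheck.class.getClassLoader();
        // com.test.A is the simulated third-party class from this article; it is only
        // visible when testcallextjar-1.0-SNAPSHOT.jar is on the classpath.
        System.out.println("com.test.A visible: " + isVisible("com.test.A", loader));
    }
}
```

Running it with and without the third-party JAR on the classpath mirrors the two Flink runs described above.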
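[Note]: If you cannot download from https://github.com/apache/flink.git, you can download from https://gitee.com/longsebo/flink.git instead (the code in this repository is already modified).
Finally
If you have questions or want to discuss, leave a comment.
Copyright belongs to the original author, longxibo. In case of infringement, please contact us for removal.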