学习文档:《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》
学习笔记如下:
当前,用户可以通过 CLI 提交 PyFlink 作业。对于通过
flink run
提交的 Python 作业,Flink 会执行 python 命令。因此,在启动 Python 作业前,需要先确定当前环境中的 python 命令指向 3.7+ 版本的 Python。
示例:执行一个 PyFlink 作业
$ ./bin/flink run --python examples/python/table/word_count.py
通过
--pyFiles
参数,可以添加 Python 的文件依赖,这个文件可以是 Python 文件、Python 包或其他文件。
示例:执行一个 PyFlink 作业并添加依赖文件
$ ./bin/flink run \--python examples/python/table/word_count.py \--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
通过
--jarFile
参数,可以添加 Python 的 Jar 包依赖。在
--jarFile
参数中添加的 Jar 包将会上传到集群。
实例:执行一个 PyFlink 作业并添加 Jar 包依赖
$ ./bin/flink run \--python examples/python/table/word_count.py \--jarfile<jarFile>
通过
--pyModule
参数,可以以 module 模式执行 PyFlink 任务。
示例:使用 module 模式执行 PyFlink 任务
$ ./bin/flink run \--pyModule word_count \--pyFiles examples/python/table
通过
--jobmanager
参数,可以将 PyFlink 作业提交到指定的 JobManager。
示例:将 PyFlink 任务提交到运行在
<jobmanagerHost>
的指定 JobManager
$ ./bin/flink run \--jobmanager<jobmanagerHost>:8081 \--python examples/python/table/word_count.py
通过
--target
参数,可以使用 YARN Cluster in Per-Job Mode 执行 PyFlink 作业。
示例:将 PyFlink 作业提交 YARN Cluster in Per-Job Mode
$ ./bin/flink run \--target yarn-per-job --python examples/python/table/word_count.py
可以使用
run-application -t yarn-application
命令将 PyFlink 作业提交到 YARN cluster in Application Mode。其中,通过
-pyarch
指定的存档文件将通过 blob 服务器分发到 TaskManagers,其中文件大小限制为 2GB。如果存档文件的大小超过 2GB,则可以先将其上传到分布式文件系统,然后使用命令行选项
-pyarch
指定路径。
示例:将 PyFlink 作业提交到 YARN cluster in Application Mode
$ ./bin/flink run-application -t yarn-application \-Djobmanager.memory.process.size=1024m \-Dtaskmanager.memory.process.size=1024m \-Dyarn.application.name=<ApplicationName>\ -Dyarn.ship-files=/path/to/shipfiles \-pyarch shipfiles/venv.zip \-pyclientexec venv.zip/venv/bin/python3 \-pyexec venv.zip/venv/bin/python3 \-pyfs shipfiles \-pym word_count
在这个任务中,假定 Python 依赖位于
/path/to/shipfiles
中;例如,其中应该包含
venv.zip
和
word_count.py
。
当它在 YARN Application Mode 下的 JobManager 执行作业时,
-pyarch
和
pyfs
中指定的路径是相对于 shipfiles 的路径,
shipfiles
是已发送文件的目录名称。
可以使用
run-application -target kubernetes-application
命令将 PyFlink 作业提交到 native Kubernetes cluster,这需要一个已经安装了 PyFlink 的 Docker 镜像。
示例:将 PyFlink 作业提交到 native Kubernetes cluster
$ ./bin/flink run-application \--target kubernetes-application \--parallelism8\ -Dkubernetes.cluster-id=<ClusterId>\-Dtaskmanager.memory.process.size=4096m \-Dkubernetes.taskmanager.cpu=2\-Dtaskmanager.numberOfTaskSlots=4\-Dkubernetes.container.image.ref=<PyFlinkImageName>\--pyModule word_count \--pyFiles /opt/flink/examples/python/table/word_count.py
在
run
命令行
run-application
命令中支持的参数清单:
-py
/--python
:Python 程序入口脚本,其依赖可以通过--pyFiles
参数添加。-pym
/--pyModule
:Module 模式的 Python 程序入口,这个参数必须与--pyFiles
同时使用。-pyfs
/--pyFiles
:为作业附加自定义文件,这些文件将被添加到本地客户端以及远端 Python UDF 的PYTHONPATH
中;其中后缀为.zip
的文件会被提取出来并添加到PYTHONPATH
中;多个文件之间可以试用,
分隔。例如--pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip
。-pyarch
/--pyArchives
:为作业添加 Python 存档文件;这些存档文件会被提取到 Python UDF Worker 的工作目录中。例如--pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python
。-pyclientexec
/--pyClientExecutable
:在提交 Python 任务时,以及编译 Python UDF 时使用的 Python 解释器路径。例如--pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python
。-pyexec
/--pyExecutable
:执行 Python UDF 的 Python 解释器路径。例如--pyExecutable /usr/local/bin/python3
。-pyreq
/--pyRequirements
:指定作业所需第三方模块的requirements.txt
文件;这些文件将被安装并添加到 Python UDF 的PYTHONPATH
中。可以选择指定一个目录,用于存储这些依赖的安装包,如果指定了目录,则使用#
作为分隔符。例如--pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir
.
版权归原作者 长行 所有, 如有侵权,请联系我们删除。