0


Flink|《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》学习笔记

学习文档:《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》

学习笔记如下:


当前,用户可以通过 CLI 提交 PyFlink 作业。对于通过

flink run

提交的 Python 作业,Flink 会执行 python 命令。因此,在启动 Python 作业前,需要先确定当前环境中的 python 命令指向 3.7+ 版本的 Python。

示例:执行一个 PyFlink 作业

$ ./bin/flink run --python examples/python/table/word_count.py

通过

--pyFiles

参数,可以添加 Python 的文件依赖,这个文件可以是 Python 文件、Python 包或其他文件。

示例:执行一个 PyFlink 作业并添加依赖文件

$ ./bin/flink run \--python examples/python/table/word_count.py \--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt

通过

--jarFile

参数,可以添加 Python 的 Jar 包依赖。在

--jarFile

参数中添加的 Jar 包将会上传到集群。

实例:执行一个 PyFlink 作业并添加 Jar 包依赖

$ ./bin/flink run \--python examples/python/table/word_count.py \--jarfile<jarFile>

通过

--pyModule

参数,可以以 module 模式执行 PyFlink 任务。

示例:使用 module 模式执行 PyFlink 任务

$ ./bin/flink run \--pyModule word_count \--pyFiles examples/python/table

通过

--jobmanager

参数,可以将 PyFlink 作业提交到指定的 JobManager。

示例:将 PyFlink 任务提交到运行在

<jobmanagerHost>

的指定 JobManager

$ ./bin/flink run \--jobmanager<jobmanagerHost>:8081 \--python examples/python/table/word_count.py

通过

--target

参数,可以使用 YARN Cluster in Per-Job Mode 执行 PyFlink 作业。

示例:将 PyFlink 作业提交 YARN Cluster in Per-Job Mode

$ ./bin/flink run \--target yarn-per-job
   --python examples/python/table/word_count.py

可以使用

run-application -t yarn-application

命令将 PyFlink 作业提交到 YARN cluster in Application Mode。其中,通过

-pyarch

指定的存档文件将通过 blob 服务器分发到 TaskManagers,其中文件大小限制为 2GB。如果存档文件的大小超过 2GB,则可以先将其上传到分布式文件系统,然后使用命令行选项

-pyarch

指定路径。

示例:将 PyFlink 作业提交到 YARN cluster in Application Mode

$ ./bin/flink run-application -t yarn-application \-Djobmanager.memory.process.size=1024m \-Dtaskmanager.memory.process.size=1024m \-Dyarn.application.name=<ApplicationName>\
   -Dyarn.ship-files=/path/to/shipfiles \-pyarch shipfiles/venv.zip \-pyclientexec venv.zip/venv/bin/python3 \-pyexec venv.zip/venv/bin/python3 \-pyfs shipfiles \-pym word_count

在这个任务中,假定 Python 依赖位于

/path/to/shipfiles

中;例如,其中应该包含

venv.zip

word_count.py

当它在 YARN Application Mode 下的 JobManager 执行作业时,

-pyarch

pyfs

中指定的路径是相对于 shipfiles 的路径,

shipfiles

是已发送文件的目录名称。

可以使用

run-application -target kubernetes-application

命令将 PyFlink 作业提交到 native Kubernetes cluster,这需要一个已经安装了 PyFlink 的 Docker 镜像。

示例:将 PyFlink 作业提交到 native Kubernetes cluster

$ ./bin/flink run-application \--target kubernetes-application \--parallelism8\
   -Dkubernetes.cluster-id=<ClusterId>\-Dtaskmanager.memory.process.size=4096m \-Dkubernetes.taskmanager.cpu=2\-Dtaskmanager.numberOfTaskSlots=4\-Dkubernetes.container.image.ref=<PyFlinkImageName>\--pyModule word_count \--pyFiles /opt/flink/examples/python/table/word_count.py

run

命令行

run-application

命令中支持的参数清单:

  • -py / --python:Python 程序入口脚本,其依赖可以通过 --pyFiles 参数添加。
  • -pym / --pyModule:Module 模式的 Python 程序入口,这个参数必须与 --pyFiles 同时使用。
  • -pyfs / --pyFiles:为作业附加自定义文件,这些文件将被添加到本地客户端以及远端 Python UDF 的 PYTHONPATH 中;其中后缀为 .zip 的文件会被提取出来并添加到 PYTHONPATH 中;多个文件之间可以试用 , 分隔。例如 --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip
  • -pyarch / --pyArchives:为作业添加 Python 存档文件;这些存档文件会被提取到 Python UDF Worker 的工作目录中。例如 --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python
  • -pyclientexec / --pyClientExecutable:在提交 Python 任务时,以及编译 Python UDF 时使用的 Python 解释器路径。例如 --pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python
  • -pyexec / --pyExecutable:执行 Python UDF 的 Python 解释器路径。例如 --pyExecutable /usr/local/bin/python3
  • -pyreq / --pyRequirements:指定作业所需第三方模块的 requirements.txt 文件;这些文件将被安装并添加到 Python UDF 的 PYTHONPATH 中。可以选择指定一个目录,用于存储这些依赖的安装包,如果指定了目录,则使用 # 作为分隔符。例如 --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir.
标签: Flink Python PyFlink

本文转载自: https://blog.csdn.net/Changxing_J/article/details/135760541
版权归原作者 长行 所有, 如有侵权,请联系我们删除。

“Flink|《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》学习笔记”的评论:

还没有评论