0


Flink|《Flink 官方文档 - 应用开发 - Python API - 依赖管理》学习笔记

学习文档:《Flink 官方文档 - 应用开发 - Python API - 依赖管理》

学习笔记如下:


在很多场景下,使用 Python API 的程序都需要使用各种依赖,例如:

  • 在 Python UDF 中使用第三方 Python 库
  • 在使用机器学习时,可能需要在 Python UDF 中载入机器学习模型

当 PyFlink 在本地运行时,用户可以将第三方 Python 库安装在本地 Python 环境中,并将机器学习模型下载到本地。但是,这个方法在远端运行 PyFlink 作业时无法生效。

Python DataStream API 和 Python Table API 都提供了支持各种依赖的 API。如果作业中既有 Python DataStream API 又有 Python Table API,那么应该通过 Python DataStream API 指定特定依赖来保证在 Python DataStream API 和 Python Table API 中均能生效。

Jar 依赖

Python Table API 的指定方法如下:

# Specify a list of jar URLs via "pipeline.jars". The jars are separated by ";"# and will be uploaded to the cluster.# NOTE: Only local file URLs (start with "file://") are supported.
table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")# It looks like the following on Windows:
table_env.get_config().set("pipeline.jars","file:///E:/my/jar/path/connector.jar;file:///E:/my/jar/path/udf.jar")# Specify a list of URLs via "pipeline.classpaths". The URLs are separated by ";" # and will be added to the classpath during job execution.# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster.
table_env.get_config().set("pipeline.classpaths","file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

Python DataStream API 的指定方法如下:

# Use the add_jars() to add local jars and the jars will be uploaded to the cluster.# NOTE: Only local file URLs (start with "file://") are supported.
stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar","file:///my/jar/path/connector2.jar")# It looks like the following on Windows:
stream_execution_environment.add_jars("file:///E:/my/jar/path/connector1.jar","file:///E:/my/jar/path/connector2.jar")# Use the add_classpaths() to add the dependent jars URLs into the classpath.# The URLs will also be added to the classpath of both the client and the cluster.# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the # URLs are accessible on both the client and the cluster.
stream_execution_environment.add_classpaths("file:///my/jar/path/connector1.jar","file:///my/jar/path/connector2.jar")

或者在提交作业时通过命令行参数

--jarfile

指定。但是通过

--jarfile

参数只能指定一个 jar 包,所以需要如果有多个 jar 包的话,制作一个 far jar。

Python 依赖

Python libraries

如果你需要在 Python UDF 中使用第三方 Python 仓库,可以使用直接引入 Python 仓库的方式。

Python Table API 的指定方法:

table_env.add_python_file(file_path)

Python DataStream API 的指定方法:

stream_execution_environment.add_python_file(file_path)

也可以通过配置

python.files

或通过命令行参数

-pyfs

--pyFiles

指定。

这里设置的 Python libraries 可以是本地文件或本地路径,它们会被添加到 Python UDF Worker 的 PYTHONPATH 中。

requirements.txt

也可以通过指定

requirements.txt

来定义依赖的第三方 Python 模块。这些 Python 依赖会被安装到工作目录中,并添加到 Python UDF Worker 的 PYTHONPATH 中。

可以通过如下命令将指定库添加到

requirements.txt

中:

echonumpy==1.16.5 >> requirements.txt
echopandas==1.0.0 >> requirements.txt

或通过

pip freeze

命令将当前 Python 环境中的所有包添加到

requirements.txt

中:

pip freeze > requirements.txt
requirements.txt

中的内容类似于:

numpy==1.16.5
pandas==1.0.0

也可以手动修改

requirement.txt

中的内容。

在 Python Table API 中,可以使用如下方法添加

requirements.txt

依赖:

# requirements_cache_dir is optional
table_env.set_python_requirements(
    requirements_file_path="/path/to/requirements.txt",
    requirements_cache_dir="cached_dir")

在 Python DataStream API 中,可以使用如下方法添加

requirements.txt

依赖:

# requirements_cache_dir is optional
stream_execution_environment.set_python_requirements(
    requirements_file_path="/path/to/requirements.txt",
    requirements_cache_dir="cached_dir")

对于集群中无法访问的依赖,可以使用参数

requirements_cached_dir

指定一个包含这些依赖的安装包的目录。它将上传到集群以支持离线安装。您可以按照以下方式准备

requirements_cache_dir

pip download -d cached_dir -r requirements.txt --no-binary :all:

需要确保准备的安装包的能够适配 Flink 集群和 Python 环境。

也可以通过在

python.requirements

配置或通过命令行参数

-pyreq

--pyRequirements

来指定

requirements.txt

Flink 将会使用 pip 来安装

requirements.txt

中的包,所以需要确保 pip >= 20.3 和 setuptools >= 37.0.0 是可用的。

Archives

也可以通过存档文件的方法指定一个自定义的 Python 虚拟环境、数据文件等。

Python Table API 使用存档文件的方法:

table_env.add_python_archive(archive_path="/path/to/archive_file", target_dir=None)

Python DataStream API 使用存档文件的方法:

stream_execution_environment.add_python_archive(archive_path="/path/to/archive_file", target_dir=None)

其中,

target_dir

参数是可选的,如果添加了该参数,那么存档文件中的数据将在被抽取到被

target_dir

指定的目录中;否则,存档文件中的数据将被抽取到以它自身命名的路径中。

示例:在使用

target_dir

参数时配置并使用存档文件

table_env.add_python_archive("/path/to/py_env.zip","myenv")
defmy_udf():withopen("myenv/py_env/data/data.txt")as f:...

示例:在不使用

target_dir

参数时配置并使用存档文件

table_env.add_python_archive("/path/to/py_env.zip")
defmy_udf():withopen("py_env.zip/py_env/data/data.txt")as f:...

存档文件将被抽取到 Python UDF Worker 的工作目录,通过相对路径就可以直接访问存档文件中的文件。

也可以通过配置

python.archives

或通过命令行参数

-pyarch

--pyArchives

来设置存档文件。

如果存档文件中包含一个 Python 的虚拟环境,需要确定该 Python 的虚拟环境能够匹配 Flink 集群运行的平台。

当前,存档文件仅支持 zip 文件或 tar 文件。

Python 解释器

支持通过路径指定 Python UDF Worker 中使用的解释器。

Python Table API 设置 Python UDF 解释器的方法:

table_env.get_config().set_python_executable("/path/to/python")

Python DataStream API 配置 Python UDF 解释器的方法:

stream_execution_environment.set_python_executable("/path/to/python")

支持使用在存档文件中的 Python 解释器环境,如果使用存档文件中的解释器路径,那么需要使用相对路径而不是绝对路径:

# Python Table API
table_env.add_python_archive("/path/to/py_env.zip","venv")
table_env.get_config().set_python_executable("venv/py_env/bin/python")# Python DataStream API
stream_execution_environment.add_python_archive("/path/to/py_env.zip","venv")
stream_execution_environment.set_python_executable("venv/py_env/bin/python")

此外,也可以通过处设置

python.executable

或通过命令行参数

-pyexec

--pyExecutable

来设置 Python 解释器。

Python 客户端解释器

在编译 PyFlink 的作业时,需要在客户端侧也有一个 Python 环境用于解析 Python 的自定义函数。

可以通过在当前会话激活的方式指定 Python 解释器:

source my_env/bin/activate

此外也可以通过配置

python.client.executable

,或使用命令行参数

-pyclientexec

--pyClientExecutable

,或使用环境变量

PYFLINK_CLIENT_EXECUTABLE

来指定客户端解释器。

指定 Java / Scala 程序中的 Python 依赖

支持再 Java Table API 或纯 SQL 程序中使用 Python UDF。

示例:使用 Python UDF 的 Java Table API 程序

importorg.apache.flink.configuration.CoreOptions;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.TableEnvironment;TableEnvironment tEnv =TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM,1);// register the Python UDF
tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");

tEnv.createTemporaryView("source", tEnv.fromValues(1L,2L,3L).as("a"));// use Python UDF in the Java Table API program
tEnv.executeSql("select add_one(a) as a from source").collect();

然后可以通过 Python 诸如

python.archives

python.files

python.requirements

python.client.executable

python.executable

等参数来指定 Python 依赖,也可以在提交作业时通过命令行参数指定。

标签: flink Python API 部署

本文转载自: https://blog.csdn.net/Changxing_J/article/details/135819830
版权归原作者 长行 所有, 如有侵权,请联系我们删除。

“Flink|《Flink 官方文档 - 应用开发 - Python API - 依赖管理》学习笔记”的评论:

还没有评论