学习文档:《Flink 官方文档 - 应用开发 - Python API - 依赖管理》
学习笔记如下:
在很多场景下,使用 Python API 的程序都需要使用各种依赖,例如:
- 在 Python UDF 中使用第三方 Python 库
- 在使用机器学习时,可能需要在 Python UDF 中载入机器学习模型
当 PyFlink 在本地运行时,用户可以将第三方 Python 库安装在本地 Python 环境中,并将机器学习模型下载到本地。但是,这个方法在远端运行 PyFlink 作业时无法生效。
Python DataStream API 和 Python Table API 都提供了支持各种依赖的 API。如果作业中既有 Python DataStream API 又有 Python Table API,那么应该通过 Python DataStream API 指定特定依赖来保证在 Python DataStream API 和 Python Table API 中均能生效。
Jar 依赖
Python Table API 的指定方法如下:
# Specify a list of jar URLs via "pipeline.jars". The jars are separated by ";"# and will be uploaded to the cluster.# NOTE: Only local file URLs (start with "file://") are supported.
table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")# It looks like the following on Windows:
table_env.get_config().set("pipeline.jars","file:///E:/my/jar/path/connector.jar;file:///E:/my/jar/path/udf.jar")# Specify a list of URLs via "pipeline.classpaths". The URLs are separated by ";" # and will be added to the classpath during job execution.# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster.
table_env.get_config().set("pipeline.classpaths","file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
Python DataStream API 的指定方法如下:
# Use the add_jars() to add local jars and the jars will be uploaded to the cluster.# NOTE: Only local file URLs (start with "file://") are supported.
stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar","file:///my/jar/path/connector2.jar")# It looks like the following on Windows:
stream_execution_environment.add_jars("file:///E:/my/jar/path/connector1.jar","file:///E:/my/jar/path/connector2.jar")# Use the add_classpaths() to add the dependent jars URLs into the classpath.# The URLs will also be added to the classpath of both the client and the cluster.# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the # URLs are accessible on both the client and the cluster.
stream_execution_environment.add_classpaths("file:///my/jar/path/connector1.jar","file:///my/jar/path/connector2.jar")
或者在提交作业时通过命令行参数
--jarfile
指定。但是通过
--jarfile
参数只能指定一个 jar 包,所以需要如果有多个 jar 包的话,制作一个 far jar。
Python 依赖
Python libraries
如果你需要在 Python UDF 中使用第三方 Python 仓库,可以使用直接引入 Python 仓库的方式。
Python Table API 的指定方法:
table_env.add_python_file(file_path)
Python DataStream API 的指定方法:
stream_execution_environment.add_python_file(file_path)
也可以通过配置
python.files
或通过命令行参数
-pyfs
或
--pyFiles
指定。
这里设置的 Python libraries 可以是本地文件或本地路径,它们会被添加到 Python UDF Worker 的 PYTHONPATH 中。
requirements.txt
也可以通过指定
requirements.txt
来定义依赖的第三方 Python 模块。这些 Python 依赖会被安装到工作目录中,并添加到 Python UDF Worker 的 PYTHONPATH 中。
可以通过如下命令将指定库添加到
requirements.txt
中:
echonumpy==1.16.5 >> requirements.txt
echopandas==1.0.0 >> requirements.txt
或通过
pip freeze
命令将当前 Python 环境中的所有包添加到
requirements.txt
中:
pip freeze > requirements.txt
requirements.txt
中的内容类似于:
numpy==1.16.5
pandas==1.0.0
也可以手动修改
requirement.txt
中的内容。
在 Python Table API 中,可以使用如下方法添加
requirements.txt
依赖:
# requirements_cache_dir is optional
table_env.set_python_requirements(
requirements_file_path="/path/to/requirements.txt",
requirements_cache_dir="cached_dir")
在 Python DataStream API 中,可以使用如下方法添加
requirements.txt
依赖:
# requirements_cache_dir is optional
stream_execution_environment.set_python_requirements(
requirements_file_path="/path/to/requirements.txt",
requirements_cache_dir="cached_dir")
对于集群中无法访问的依赖,可以使用参数
requirements_cached_dir
指定一个包含这些依赖的安装包的目录。它将上传到集群以支持离线安装。您可以按照以下方式准备
requirements_cache_dir
:
pip download -d cached_dir -r requirements.txt --no-binary :all:
需要确保准备的安装包的能够适配 Flink 集群和 Python 环境。
也可以通过在
python.requirements
配置或通过命令行参数
-pyreq
或
--pyRequirements
来指定
requirements.txt
。
Flink 将会使用 pip 来安装
requirements.txt
中的包,所以需要确保 pip >= 20.3 和 setuptools >= 37.0.0 是可用的。
Archives
也可以通过存档文件的方法指定一个自定义的 Python 虚拟环境、数据文件等。
Python Table API 使用存档文件的方法:
table_env.add_python_archive(archive_path="/path/to/archive_file", target_dir=None)
Python DataStream API 使用存档文件的方法:
stream_execution_environment.add_python_archive(archive_path="/path/to/archive_file", target_dir=None)
其中,
target_dir
参数是可选的,如果添加了该参数,那么存档文件中的数据将在被抽取到被
target_dir
指定的目录中;否则,存档文件中的数据将被抽取到以它自身命名的路径中。
示例:在使用
target_dir
参数时配置并使用存档文件
table_env.add_python_archive("/path/to/py_env.zip","myenv")
defmy_udf():withopen("myenv/py_env/data/data.txt")as f:...
示例:在不使用
target_dir
参数时配置并使用存档文件
table_env.add_python_archive("/path/to/py_env.zip")
defmy_udf():withopen("py_env.zip/py_env/data/data.txt")as f:...
存档文件将被抽取到 Python UDF Worker 的工作目录,通过相对路径就可以直接访问存档文件中的文件。
也可以通过配置
python.archives
或通过命令行参数
-pyarch
或
--pyArchives
来设置存档文件。
如果存档文件中包含一个 Python 的虚拟环境,需要确定该 Python 的虚拟环境能够匹配 Flink 集群运行的平台。
当前,存档文件仅支持 zip 文件或 tar 文件。
Python 解释器
支持通过路径指定 Python UDF Worker 中使用的解释器。
Python Table API 设置 Python UDF 解释器的方法:
table_env.get_config().set_python_executable("/path/to/python")
Python DataStream API 配置 Python UDF 解释器的方法:
stream_execution_environment.set_python_executable("/path/to/python")
支持使用在存档文件中的 Python 解释器环境,如果使用存档文件中的解释器路径,那么需要使用相对路径而不是绝对路径:
# Python Table API
table_env.add_python_archive("/path/to/py_env.zip","venv")
table_env.get_config().set_python_executable("venv/py_env/bin/python")# Python DataStream API
stream_execution_environment.add_python_archive("/path/to/py_env.zip","venv")
stream_execution_environment.set_python_executable("venv/py_env/bin/python")
此外,也可以通过处设置
python.executable
或通过命令行参数
-pyexec
或
--pyExecutable
来设置 Python 解释器。
Python 客户端解释器
在编译 PyFlink 的作业时,需要在客户端侧也有一个 Python 环境用于解析 Python 的自定义函数。
可以通过在当前会话激活的方式指定 Python 解释器:
source my_env/bin/activate
此外也可以通过配置
python.client.executable
,或使用命令行参数
-pyclientexec
或
--pyClientExecutable
,或使用环境变量
PYFLINK_CLIENT_EXECUTABLE
来指定客户端解释器。
指定 Java / Scala 程序中的 Python 依赖
支持再 Java Table API 或纯 SQL 程序中使用 Python UDF。
示例:使用 Python UDF 的 Java Table API 程序
importorg.apache.flink.configuration.CoreOptions;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.TableEnvironment;TableEnvironment tEnv =TableEnvironment.create(EnvironmentSettings.inBatchMode()); tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM,1);// register the Python UDF tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python"); tEnv.createTemporaryView("source", tEnv.fromValues(1L,2L,3L).as("a"));// use Python UDF in the Java Table API program tEnv.executeSql("select add_one(a) as a from source").collect();
然后可以通过 Python 诸如
python.archives
、
python.files
、
python.requirements
、
python.client.executable
、
python.executable
等参数来指定 Python 依赖,也可以在提交作业时通过命令行参数指定。
版权归原作者 长行 所有, 如有侵权,请联系我们删除。