0


PyFlink使用教程,Flink,Python,Java

环境准备

环境要求

Java 11
Python 3.7, 3.8, 3.9 or 3.10

文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/installation/

打开

Anaconda3 Prompt
> java -version
java version "11.0.22"2024-01-16 LTS
Java(TM) SE Runtime Environment 18.9(build 11.0.22+9-LTS-219)
Java HotSpot(TM)64-Bit Server VM 18.9(build 11.0.22+9-LTS-219, mixed mode)> python --version
Python 3.8.8

> conda env list
# conda environments:#
base                  *  d:\ProgramData\Anaconda3
tensorflow2.4            d:\ProgramData\Anaconda3\envs\tensorflow2.4

> conda create -n PyFlink-1.17.1 python==3.8.8

> conda activate PyFlink-1.17.1

> python -m pip install apache-flink==1.17.1

> conda list
# packages in environment at d:\ProgramData\Anaconda3\envs\PyFlink-1.17.1:## Name                    Version                   Build  Channel
apache-beam               2.43.0                   pypi_0    pypi
apache-flink              1.17.1                   pypi_0    pypi
apache-flink-libraries    1.17.1                   pypi_0    pypi
avro-python3              1.9.2.1                  pypi_0    pypi
ca-certificates           2023.12.12           haa95532_0    defaults
certifi                   2023.11.17               pypi_0    pypi
charset-normalizer        3.3.2                    pypi_0    pypi
cloudpickle               2.2.0                    pypi_0    pypi
crcmod                    1.7                      pypi_0    pypi
dill                      0.3.1.1                  pypi_0    pypi
docopt                    0.6.2                    pypi_0    pypi
fastavro                  1.4.7                    pypi_0    pypi
fasteners                 0.19                     pypi_0    pypi
grpcio                    1.60.0                   pypi_0    pypi
hdfs                      2.7.3                    pypi_0    pypi
httplib2                  0.20.4                   pypi_0    pypi
idna                      3.6                      pypi_0    pypi
numpy                     1.21.6                   pypi_0    pypi
objsize                   0.5.2                    pypi_0    pypi
openssl                   1.1.1w               h2bbff1b_0    defaults
orjson                    3.9.12                   pypi_0    pypi
pandas                    1.3.5                    pypi_0    pypi
pip                       23.3.1           py38haa95532_0    defaults
proto-plus                1.23.0                   pypi_0    pypi
protobuf                  3.20.3                   pypi_0    pypi
py4j                      0.10.9.7                 pypi_0    pypi
pyarrow                   8.0.0                    pypi_0    pypi
pydot                     1.4.2                    pypi_0    pypi
pymongo                   3.13.0                   pypi_0    pypi
pyparsing                 3.1.1                    pypi_0    pypi
python                    3.8.0                hff0d562_2    defaults
python-dateutil           2.8.2                    pypi_0    pypi
pytz                      2023.3.post1             pypi_0    pypi
regex                     2023.12.25               pypi_0    pypi
requests                  2.31.0                   pypi_0    pypi
setuptools                68.2.2           py38haa95532_0    defaults
six                       1.16.0                   pypi_0    pypi
sqlite                    3.41.2               h2bbff1b_0    defaults
typing-extensions         4.9.0                    pypi_0    pypi
urllib3                   2.1.0                    pypi_0    pypi
vc                        14.2                 h21ff451_1    defaults
vs2015_runtime            14.27.29016          h5e58377_2    defaults
wheel                     0.41.2           py38haa95532_0    defaults
zstandard                 0.22.0                   pypi_0    pypi

下载的包存储在

Anaconda3\envs\PyFlink-1.17.1\Lib\site-packages

PyFlink 案例

从Flink 1.11版本开始, PyFlink 作业支持在 Windows 系统上运行,因此您也可以在 Windows 上开发和调试 PyFlink 作业了。

打开 VSCode 切换到 PyFlink-1.17.1 环境,按照 教程 写一个 Table API 的示例

learn_pyflink/tableAPIJob.py
import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import(EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data =["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]defword_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# write all the data to one file
    t_env.get_config().set("parallelism.default","1")# define the sourceif input_path isnotNone:
        t_env.create_temporary_table('source',
            TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())
        tab = t_env.from_path('source')else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i:(i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))# define the sinkif output_path isnotNone:
        t_env.create_temporary_table('sink',
            TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table('sink',
            TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udtf(result_types=[DataTypes.STRING()])defsplit(line: Row):for s in line[0].split():yield Row(s)# compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ =='__main__':
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO,format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
        dest='input',
        required=False,help='Input file to process.')
    parser.add_argument('--output',
        dest='output',
        required=False,help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

要在

PyFlink-1.17.1

环境下运行

> python tableAPIJob.py

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
+I[To, 1]
+I[be,, 1]
+I[or, 1]
+I[not, 1]
+I[to, 1]
+I[be,--that, 1]
+I[is, 1]
+I[the, 1]
+I[question:--, 1]
+I[Whether, 1]
+I['tis, 1]
+I[nobler, 1]
+I[in, 1]
-U[the, 1]
+U[the, 2]
+I[mind, 1]
-U[to, 1]
+U[to, 2]...

提交 PyFlink 作业到 Flink

参考:https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/cli/#submitting-pyflink-jobs

我的 Flink 是安装在 WSL 上面的,因此也要准备环境。

下载 java-11-linux:https://download.oracle.com/otn/java/jdk/11.0.22%2B9/8662aac2120442c2a89b1ee9c67d7069/jdk-11.0.22_linux-x64_bin.tar.gz

>tar -zxf jdk-11.0.22_linux-x64_bin.tar.gz -C /usr/lib/jdk

# 生成 jre> bin/jlink --module-path jmods --add-modules java.desktop --output jre

>vi /etc/profile

exportJAVA_HOME=/usr/lib/jdk/jdk-11.0.22
exportJRE_HOME=${JAVA_HOME}/jre    
exportCLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib    
exportPATH=${JAVA_HOME}/bin:$PATH>source /etc/profile 

>ls -l /usr/bin/python*

lrwxrwxrwx 1 root root       9 Mar 262019 /usr/bin/python3 -> python3.7
lrwxrwxrwx 1 root root      16 Mar 262019 /usr/bin/python3-config -> python3.7-config
-rwxr-xr-x 1 root root    1018 Mar  42018 /usr/bin/python3-jsondiff
-rwxr-xr-x 1 root root    3661 Mar  42018 /usr/bin/python3-jsonpatch
-rwxr-xr-x 1 root root    1342 May  22016 /usr/bin/python3-jsonpointer
-rwxr-xr-x 1 root root     398 Nov 222018 /usr/bin/python3-jsonschema
-rwxr-xr-x 2 root root 4877888 Apr  32019 /usr/bin/python3.7
lrwxrwxrwx 1 root root      33 Apr  32019 /usr/bin/python3.7-config -> x86_64-linux-gnu-python3.7-config
-rwxr-xr-x 2 root root 4877888 Apr  32019 /usr/bin/python3.7m
lrwxrwxrwx 1 root root      34 Apr  32019 /usr/bin/python3.7m-config -> x86_64-linux-gnu-python3.7m-config
lrwxrwxrwx 1 root root      10 Mar 262019 /usr/bin/python3m -> python3.7m
lrwxrwxrwx 1 root root      17 Mar 262019 /usr/bin/python3m-config -> python3.7m-config

> python --version

Command 'python' not found, but can be installed with:

aptinstall python3         # version 3.7.3-1, oraptinstall python          # version 2.7.16-1aptinstall python-minimal  # version 2.7.16-1

You also have python3 installed, you can run 'python3' instead.

> python3 --version
Python 3.7.3

# 已经安装了 python-3.7.3,创建一个软连接即可>ln -s /usr/bin/python3.7 /usr/local/bin/python

> python --version
Python 3.7.3

# 设置镜像源,否则会非常慢> python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip
> pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

# 启动 Flink-1.17.1> bin/start-cluster.sh

除此之外,在 WSL 上还需要安装有此python脚本依赖的库,也就是

apache-flink

库。因为 Flink 需要调用 python 命令来解析 pytion 脚本,这里面涉及到 python 和 java 之间的通讯。这一块还只是在 Flink 客户端上面(

bin/flink run ...

),而 Flink 的 TaskManager 在运行此任务的时候还需要调用 python 解释器,因为上面代码中有

UDF

函数,这个函数在Java中是不存在的,关于 Flink 支持 Python 任务的内部原理后面再写一篇。

> python -m pip install apache-flink==1.17.1
> pip list

然后将代码中的

.wait()

调用删掉

tab.flat_map(split).alias('word') \
       .group_by(col('word')) \
       .select(col('word'), lit(1).count) \
       .execute_insert('sink')

提交任务

> ./bin/flink run --python /mnt/d/dev/php/magook/trunk/server/learn-python/learn_pyflink/tableAPIJob.py

Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID a18e581d16785a9872336073efdf5df0

来到 webUI 查看任务

在这里插入图片描述

在这里插入图片描述

标签: flink python java

本文转载自: https://blog.csdn.net/raoxiaoya/article/details/135916513
版权归原作者 raoxiaoya 所有, 如有侵权,请联系我们删除。

“PyFlink使用教程,Flink,Python,Java”的评论:

还没有评论