😄伙伴们,好久不见!这里是 叶苍ii ❀ 作为一名大数据博主,我一直致力于分享最新的技术趋势和实战经验。近期,我在参加Flink的顾客营销项目,使用了PyFlink项目进行数据处理和分析。 ❀ 在这个文章合集中,我将与大家分享我的实战经验,探索PyFlink项目的魅力。
1. 安装Apache Flink:
- PyFlink是Apache Flink的Python API,因此首先需要安装和配置Apache Flink。
- 我们从Apache Flink官方网站(https://flink.apache.org/)下载最新版本的Flink,并按照官方文档提供的指南进行安装和配置。
- 这个太慢了,我们使用 国内镜像
1.1.1. Flink版本
目前比较新的版本是 V 1.18.0
我查看了一下文档,总结了Flink 1.7、1.8、1.9和 1.18 新版本 重大区别:
Flink 1.7:
- 引入了基于事件时间的处理模式,使得在流处理中更容易处理乱序事件。
- 引入了动态表连接,允许在运行时动态地连接和断开表。
- 引入了Flink SQL的批处理模式,使得可以使用相同的SQL语法进行批处理作业。
- 引入了对Python Table API的初步支持。
Flink 1.8:
- 引入了状态后端的概念,允许用户选择将状态存储在不同的后端(如内存、RocksDB等)。
- 引入了异步快照机制,提高了检查点性能。
- 引入了对Python DataStream API的支持,使得可以使用Python编写Flink流处理作业。
- 引入了对Elasticsearch的集成,使得可以直接将数据写入Elasticsearch。
Flink 1.9:
- 引入了动态表功能,允许在运行时动态地创建、修改和删除表。
- 引入了对Avro格式的原生支持,使得可以直接读取和写入Avro格式的数据。
- 引入了对Kubernetes的本地集群部署支持,简化了在Kubernetes上部署Flink作业的过程。
- 引入了对Python UDF(用户定义函数)的支持,使得可以使用Python编写自定义函数。
Flink 1.18.0:
- 引入了基于时间特征的处理模式,使得可以更容易地处理事件时间和处理时间之间的转换。
- 引入了动态表函数,允许在运行时动态地创建、修改和删除表函数。
- 引入了对Apache Kafka 2.8的原生支持,包括新的Kafka消费者和生产者API。
- 引入了对Apache Iceberg的集成,使得可以直接读取和写入Iceberg格式的数据。
Flink 1.17.0:
- 引入了异步I/O线程池,提高了异步操作的性能和可扩展性。
- 引入了对Python UDF的改进,包括对Python Scalar UDF的支持。
- 引入了对Apache Hudi的集成,使得可以直接读取和写入Hudi格式的数据。
- 引入了对Apache Beam的批处理和流处理Runner的支持。
Flink 1.16:
- 引入了状态后端的改进,包括对RocksDB状态后端的优化和改进。
- 引入了对Apache ORC的集成,使得可以直接读取和写入ORC格式的数据。
- 引入了对Apache Calcite的升级,提供更好的SQL优化和查询计划生成。
从1.7版本之后,逐渐的对Python友好,我们这次可以使用的是 1.18.0版本
我们使用 国内镜像下载:Index of /dist/flink
详细下载地址:
scala : Index of /dist/flink/flink-1.18.0
python: Index of /dist/flink/flink-1.18.0/python
1.1.2. Windows部署Flink
下载完解压 tgz
进入bin 目录,新增两个.bat文件
start-cluster.bat文件 flink.bat文件
::###############################################################################
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
:: regarding copyright ownership. The ASF licenses this file
:: to you under the Apache License, Version 2.0 (the
:: "License"); you may not use this file except in compliance
:: with the License. You may obtain a copy of the License at
::
:: http://www.apache.org/licenses/LICENSE-2.0
::
:: Unless required by applicable law or agreed to in writing, software
:: distributed under the License is distributed on an "AS IS" BASIS,
:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
:: See the License for the specific language governing permissions and
:: limitations under the License.
::###############################################################################
@echo off
setlocal EnableDelayedExpansion
SET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET FLINK_CONF_DIR=%FLINK_HOME%\conf
SET FLINK_LOG_DIR=%FLINK_HOME%\log
SET JVM_ARGS=-Xms1024m -Xmx1024m
SET FLINK_CLASSPATH=%FLINK_LIB_DIR%\*
SET logname_jm=flink-%username%-jobmanager.log
SET logname_tm=flink-%username%-taskmanager.log
SET log_jm=%FLINK_LOG_DIR%\%logname_jm%
SET log_tm=%FLINK_LOG_DIR%\%logname_tm%
SET outname_jm=flink-%username%-jobmanager.out
SET outname_tm=flink-%username%-taskmanager.out
SET out_jm=%FLINK_LOG_DIR%\%outname_jm%
SET out_tm=%FLINK_LOG_DIR%\%outname_tm%
SET log_setting_jm=-Dlog.file="%log_jm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
SET log_setting_tm=-Dlog.file="%log_tm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
:: Log rotation (quick and dirty)
CD "%FLINK_LOG_DIR%"
for /l %%x in (5, -1, 1) do (
SET /A y = %%x+1
RENAME "%logname_jm%.%%x" "%logname_jm%.!y!" 2> nul
RENAME "%logname_tm%.%%x" "%logname_tm%.!y!" 2> nul
RENAME "%outname_jm%.%%x" "%outname_jm%.!y!" 2> nul
RENAME "%outname_tm%.%%x" "%outname_tm%.!y!" 2> nul
)
RENAME "%logname_jm%" "%logname_jm%.0" 2> nul
RENAME "%logname_tm%" "%logname_tm%.0" 2> nul
RENAME "%outname_jm%" "%outname_jm%.0" 2> nul
RENAME "%outname_tm%" "%outname_tm%.0" 2> nul
DEL "%logname_jm%.6" 2> nul
DEL "%logname_tm%.6" 2> nul
DEL "%outname_jm%.6" 2> nul
DEL "%outname_tm%.6" 2> nul
for %%X in (java.exe) do (set FOUND=%%~$PATH:X)
if not defined FOUND (
echo java.exe was not found in PATH variable
goto :eof
)
echo Starting a local cluster with one JobManager process and one TaskManager process.
echo You can terminate the processes via CTRL-C in the spawned shell windows.
echo Web interface by default on http://localhost:8081/.
start /b java %JVM_ARGS% %log_setting_jm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir "%FLINK_CONF_DIR%" > "%out_jm%" 2>&1
start /b java %JVM_ARGS% %log_setting_tm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir "%FLINK_CONF_DIR%" > "%out_tm%" 2>&1
endlocal
::###############################################################################
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
:: regarding copyright ownership. The ASF licenses this file
:: to you under the Apache License, Version 2.0 (the
:: "License"); you may not use this file except in compliance
:: with the License. You may obtain a copy of the License at
::
:: http://www.apache.org/licenses/LICENSE-2.0
::
:: Unless required by applicable law or agreed to in writing, software
:: distributed under the License is distributed on an "AS IS" BASIS,
:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
:: See the License for the specific language governing permissions and
:: limitations under the License.
::###############################################################################
@echo off
setlocal
SET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET JVM_ARGS=-Xmx512m
SET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\*
java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.cli.CliFrontend %*
endlocal
————————————————
版权声明:本文为CSDN博主「向嘴子哥哥」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/FrankHsiang/article/details/131701080
1.1.3. Mac部署Flink
1.1.3.1. 安装brew
使用以下指令一键安装brew
/bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"
开始执行Brew自动安装程序
[[email protected]]
[2020-08-28 11:26:26][10.15]
https://zhuanlan.zhihu.com/p/111014448
请选择一个下载镜像,例如中科大,输入1回车。
源有时候不稳定,如果git克隆报错重新运行脚本选择源。cask非必须,有部分人需要。
1、中科大下载源 2、清华大学下载源 3、北京外国语大学下载源 4、腾讯下载源(不显示下载进度) 5、阿里巴巴下载源(缺少cask源)
请输入序号: 1
你选择了中国科学技术大学下载源
!!!此脚本将要删除之前的brew(包括它下载的软件),请自行备份。
->是否现在开始执行脚本(N/Y)y
--> 脚本开始执行
==> 通过命令删除之前的brew、创建一个新的Homebrew文件夹
(设置开机密码:在左上角苹果图标->系统偏好设置->"用户与群组"->更改密码)
(如果提示This incident will be reported. 在"用户与群组"中查看是否管理员)
请输入开机密码,输入过程不显示,输入完后回车
Password:
开始执行
---备份要删除的文件夹到系统桌面....
---/usr/local/Homebrew 备份完成
-> 创建文件夹 /usr/local/Homebrew
此步骤成功
---备份要删除的文件夹到系统桌面....
---/usr/local/Caskroom 备份完成
-> 创建文件夹 /usr/local/Caskroom
此步骤成功
---备份要删除的文件夹到系统桌面....
---/usr/local/Cellar 备份完成
-> 创建文件夹 /usr/local/Cellar
此步骤成功
---备份要删除的文件夹到系统桌面....
---/usr/local/var/homebrew 备份完成
-> 创建文件夹 /usr/local/var/homebrew
此步骤成功
-> 创建文件夹 /usr/local/var/homebrew/linked
此步骤成功
git version 2.24.3 (Apple G
1.1.3.2. brew_Flink 部署
方式1:brew 在线安装 Flink Mac本地安装、运行_mac flink 关闭-CSDN博客
#1、查看java版本
java -version
#2、安装flink
brew install apache-flink
#3、查看flink版本
flink --version
1.1.3.3. 本地安装利用下好的tgz 安装
方式2 ,上述的brew 安装 失败的话 可以本地安装
Flink项目实践 | Flink 单机安装部署
Flink项目实践 | Flink 单机安装部署-腾讯云开发者社区-腾讯云
2. Python环境:
PyFlink是基于Python的API,所以你需要确保在你的机器上已经安装了Python。推荐使用Python 3.x版本,因为PyFlink对Python 3的支持更好。
2.1.1. python下载
我们从Python官方网站 下载并安装适合你操作系统的Python版本。
下载: Python Release Python 3.11.3 | Python.org
2.1.2. python配置环境变量
python安装及环境变量配置(mac版)
python安装及环境变量配置(mac版)_mac python 环境变量-CSDN博客
2.1.3. FLink包
- PyFlink包:一旦你有了Apache Flink和Python环境,你就可以通过pip或conda等包管理工具安装PyFlink包。运行以下命令即可安装PyFlink:
pip install apache-flink
或者
conda install -c conda-forge pyflink
- 配置文件:在安装完成后,你需要根据你的需求进行一些配置。主要的配置文件是flink-conf.yaml,它位于Flink的安装目录下的conf文件夹中。你可以根据需要调整配置项,如并行度、内存分配、检查点设置等。
- IDE或编辑器:为了编写和运行PyFlink程序,我们需要选择一个适合的集成开发环境(IDE)或文本编辑器。
- 常用的选择包括PyCharm、VS Code、Jupyter Notebook等。确保你在IDE或编辑器中正确配置了Python和PyFlink的环境。
✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈ ✈
Flink实战(1)-了解Flink
Python专题-pip切换
版权归原作者 叶沧ii 所有, 如有侵权,请联系我们删除。