文章目录
概要
利用python装包能力(pip)轻松实现flink安装,并通过flink参数配置,实现job、task节点配置,实现flink集群部署,并通过flink-python语句,将python程序部署到flink集群运行。
例如:以3台Linux虚拟机为基础,搭建flink集群,其中一台为jobmanager节点,本台+另外两台为taskmanager节点,jobmanager启动python任务并分发到task节点。
整体架构流程
技术名词解释
- python
- flink1.18集群
- jobmanager、taskmanager、sqlApi、tableApi
- Linux
技术细节
flink1.18的python接口,需要python版本在3.8以上,例中以python3.10为基础,pip、python命令软连接为pip3、python3,另外python需安装升级openssl、_bz2。
下面是具体安装步骤:
- 首先安装openssl、python3.10、_bz2包
- 安装pyflink,会自动安装好flink
- 3台服务器免密ssh登录
- 3台服务器同样操作安装openssl、python3.10、_bz2、pyflink
- 修改flink配置参数(flink-conf.yaml、master、worker)
- flink-conf、master、worker是参数分发
- 修改启动端口
- 启动flink集群
- 部署python程序到flink集群
编译安装openssl
解压安装包
tar -zxf openssl-1.1.1n.tar.gz
cd openssl-1.1.1n/
./config -fPIC --prefix=/usr/include/openssl enable-shared
make
make install
编译安装python3.10
tar -zxf Python-3.10.0.tgz
mkdir python310
cd Python-3.10.0/
./configure --prefix=/home/python310 --with-zlib=/usr/include/ --with-openssl-rpath=auto --with-openssl=/usr/include/openssl OPENSSL_LDFLAGS=-L/usr/include/openssl OPENSSL_LIBS=-l/usr/include/openssl/ssl OPENSSL_INCLUDES=-I/usr/include/openssl
make -j 4
make install
建立Python3.10和pip3.10的软链:
ln -s /home/python310/bin/python3.10 /usr/bin/python4
ln -s /home/python310/bin/pip3.10 /usr/bin/pip4
安装 _bz2
_bz2 文件放到 /home/python310/lib/python3.10/lib-dynload/_bz2.cpython-310-x86_64-linux-gnu.so
chmod +x _bz2.cpython-310-x86_64-linux-gnu.so
安装pyflink(会自动安装好flink1.18)
pip install apache-flink==1.18 -i https://pypi.tuna.tsinghua.edu.cn/simple
安装pyflink会自动完成flink的安装,无需再单独安装flink,后续只需修改flink参数,启动flink集群即可。
3台服务器免密ssh登录
配置免密钥ssh登录(flink-conf.yaml中host由localhost调整为ip地址后,也要添加本机免密登录,步骤2中拷贝地址写本机ip)
1.A服务器上生成密钥对:ssh-keygen
cd /root/.ssh
2.拷贝公钥到B服务器:ssh-copy-id root@127.0.0.0.2 -p 10022
(如B服务器上已经被拷贝过 则 删除authorized_keys 文件,/root/.ssh 下执行 rfile authorized_keys)
ssh -p ‘10022’ ‘root@127.0.0.0.2’ 查看是否通
修改flink参数
flink集群配置
vim flink-conf.yaml
jobmanager.rpc.address: 127.0.0.1–修改为主节点地址 原为 localhost
jobmanager.bind-host: 0.0.0.0 – 几个 bind-host 都调整为0.0.0.0 不然会报错(Could not connect to BlobServer at address localhost/127.0.0.1:35744)
taskmanager.bind-host: 0.0.0.0 (同上)
taskmanager.host: 127.0.0.1–修改为本task节点ip
jobmanager.memory.process.size: 1600m–修改job节点内存
taskmanager.memory.process.size: 4096m–修改task节点内存
taskmanager.numberOfTaskSlots: 8–修改task任务曹,一般与cpu核数对应
rest.address: 127.0.0.1
rest.bind-address: 0.0.0.0(同上)
vim ./conf/master --集群模式配置为一个jm,ha模式为两个jm
127.0.0.1:8081 --将localhost修改为主节点主机名或者ip 原为localhost:8081
vim ./conf/worker
127.0.0.1–将localhost修改为各个节点主机名 自己的ip
将配置文件分发到各个节点
scp -P 10022 -r /home/python310/lib/python3.10/site-packages/pyflink/conf/workers 127.0.0.2:/home/python310/lib/python3.10/site-packages/pyflink/conf/workers
上边同样语句发送master、flink-conf.yaml
分发完成后需将 work节点的ip调整为自己本机的ip
修改启动端口(如服务器默认端口为 22 则不需要修改,上边所有命令的对应的10022 调整为 22)
修改vim start-cluster.sh ssh -p 10022 -n $FLINK_SSH_OPTS KaTeX parse error: Expected group as argument to '"' at end of input: …/bin/bash -l "{FLINK_BIN_DIR}/jobmanager.sh" start ${master} ${webuiport} &"
修改vim config.sh ssh -p 10022 -n $FLINK_SSH_OPTS KaTeX parse error: Expected group as argument to '"' at end of input: …/bin/bash -l "{FLINK_BIN_DIR}/taskmanager.sh" “${CMD}” &"
至此完成flink集群配置,下边可以启动flink集群,运行flink程序
启动flink集群
job节点cd /home/python310/lib/python3.10/site-packages/pyflink/bin 进入flink安装路径
start-cluster.sh 为启动命令
stop-cluster.sh 为停止命令
./start-cluster.sh 启动flink集群 出现下列命令说明启动完成
Starting cluster.
Starting standalonesession daemon on host 127.0.0.1(服务器名字)
Starting taskexecutor daemon on host 127.0.0.1(服务器名字,task节点有3个)
ps -ef |grep flink 查看进程会发现多了两个进程,一个是job进程 一个是task进程,两个worker节点服务器只有一个task节点。
部署python程序到flink集群
/home/python310/lib/python3.10/site-packages/pyflink/bin/flink run -py /home/datax_pro/main_flink_cdc.py -pyexec /home/python310/bin/python3.10
/home/python310/lib/python3.10/site-packages/pyflink/bin/flink --为flink安装路径
/home/datax_pro/main_flink_cdc.py --为python跑批程序启动路径
/home/python310/bin/python3.10 --为python安装路径
启动后效果如下图
小结
利用python安装flink,简单、易上手,而且通过pyflink接口,将python程序部署到flink集群,可以运用flink集群的优势运行python程序,将简单的python开发与高大上的flink工具结合起来,使数据跑批更快、更高效
版权归原作者 欧阳伯疼 所有, 如有侵权,请联系我们删除。