

Deploying Flink on Native Kubernetes (k8s) in Session Mode and Application Mode


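1. Session Mode

1.1 Install Java

For installing Java 8, please refer to my blog posts "Installing Java 8 and OpenJDK 11 side by side on CentOS 7" and "Installing Java 8 and OpenJDK 11 side by side on Windows".

1.2 Download and extract Flink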

[root@k8s-master ~]# wget https://downloads.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
[root@k8s-master ~]# 
[root@k8s-master ~]# tar -zxvf flink-1.15.0-bin-scala_2.12.tgz
[root@k8s-master ~]# 

1.3 Pull the Flink image on every node of the k8s cluster

[root@k8s-node1 ~]# crictl pull apache/flink:1.15.0-scala_2.12
Image is up to date for sha256:331d27a20cb411d54868766a387c1d6ad98f390528d730d498f8ecd8069df0c6
[root@k8s-node1 ~]# 

1.4 Create a namespace and a service account, and grant the account permissions

[root@k8s-master ~]# kubectl create ns flink
namespace/flink created
[root@k8s-master ~]# 
[root@k8s-master ~]# kubectl create serviceaccount flink -n flink
serviceaccount/flink created
[root@k8s-master ~]# 
[root@k8s-master ~]# kubectl create clusterrolebinding flink-role-bind --clusterrole=edit --serviceaccount=flink:flink
clusterrolebinding.rbac.authorization.k8s.io/flink-role-bind created
[root@k8s-master ~]# 
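The same binding can also be kept as a manifest instead of an imperative command. The sketch below is a minimal example. It assumes the names used above: namespace flink, service account flink, binding flink-role-bind, and the built-in ClusterRole edit. It only prints the manifest as JSON, which `kubectl apply -f` accepts just like YAML. The file name rbac.py in the comment is hypothetical.

```python
import json


def flink_rolebinding(namespace: str = "flink", service_account: str = "flink",
                      name: str = "flink-role-bind", cluster_role: str = "edit") -> dict:
    """Build a ClusterRoleBinding equivalent to the `kubectl create clusterrolebinding` call above."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "ClusterRoleBinding",
        "metadata": {"name": name},
        # Reference the built-in ClusterRole (here: "edit").
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": cluster_role,
        },
        # Grant it to the service account <namespace>:<service_account>.
        "subjects": [
            {"kind": "ServiceAccount", "name": service_account, "namespace": namespace}
        ],
    }


# Hypothetical usage: python3 rbac.py | kubectl apply -f -
print(json.dumps(flink_rolebinding(), indent=2))
```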

1.5 Start the Flink JobManager

[root@k8s-master ~]# cd flink-1.15.0
[root@k8s-master flink-1.15.0]# 
[root@k8s-master flink-1.15.0]# bin/kubernetes-session.sh \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.jobmanager.service-account=flink \
> -Dkubernetes.rest-service.exposed.type=NodePort \
> -Dkubernetes.cluster-id=flink-cluster \
> -Dkubernetes.jobmanager.cpu=0.2 \
> -Djobmanager.memory.process.size=1024m \
> -Dresourcemanager.taskmanager-timeout=3600000 \
> -Dkubernetes.taskmanager.cpu=0.2 \
> -Dtaskmanager.memory.process.size=1024m \
> -Dtaskmanager.numberOfTaskSlots=1
2022-05-26 17:59:30,644 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, localhost
2022-05-26 17:59:30,646 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2022-05-26 17:59:30,646 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.bind-host, localhost
2022-05-26 17:59:30,646 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2022-05-26 17:59:30,646 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.bind-host, localhost
2022-05-26 17:59:30,647 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.host, localhost
2022-05-26 17:59:30,647 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2022-05-26 17:59:30,647 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2022-05-26 17:59:30,647 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2022-05-26 17:59:30,648 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2022-05-26 17:59:30,648 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: rest.address, localhost
2022-05-26 17:59:30,648 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: rest.bind-address, localhost
2022-05-26 17:59:30,730 INFO  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2022-05-26 17:59:31,701 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2022-05-26 17:59:31,725 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2022-05-26 17:59:31,726 INFO  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction network memory (57.600mb (60397978 bytes)) is less than its min value 64.000mb (67108864 bytes), min value will be used instead
2022-05-26 17:59:31,886 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2022-05-26 17:59:31,887 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2022-05-26 17:59:33,485 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink session cluster flink-cluster successfully, JobManager Web Interface: http://192.168.23.160:32371
[root@k8s-master flink-1.15.0]#
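The three ProcessMemoryUtils INFO lines in the log follow from Flink's memory model. Below is a simplified sketch of the arithmetic for a 1024m TaskManager. It assumes the Flink 1.15 defaults:

- JVM metaspace: 256m.
- JVM overhead: fraction 0.1, bounded to [192m, 1g].
- Network memory: fraction 0.1 of total Flink memory, bounded to [64m, 1g].

It models only these components and is not Flink's actual implementation. The JobManager is also set to 1024m, so it hits the same 192m overhead floor.

```python
def clamp(value: float, lo: float, hi: float) -> float:
    """Bound value to the closed interval [lo, hi]."""
    return max(lo, min(value, hi))


def derive_tm_memory(process_mb: float = 1024, metaspace_mb: float = 256,
                     overhead_fraction: float = 0.1, overhead_min: float = 192,
                     overhead_max: float = 1024, network_fraction: float = 0.1,
                     network_min: float = 64, network_max: float = 1024) -> dict:
    # JVM overhead is a fraction of the whole process size, clamped to [min, max].
    overhead_raw = process_mb * overhead_fraction
    overhead = clamp(overhead_raw, overhead_min, overhead_max)
    # Total Flink memory is what remains after JVM metaspace and JVM overhead.
    total_flink = process_mb - metaspace_mb - overhead
    # Network memory is a fraction of total Flink memory, clamped the same way.
    network_raw = total_flink * network_fraction
    network = clamp(network_raw, network_min, network_max)
    return {
        "overhead_raw": round(overhead_raw, 3),
        "overhead": overhead,
        "total_flink": total_flink,
        "network_raw": round(network_raw, 3),
        "network": network,
    }


print(derive_tm_memory())
```

The raw values (102.4 and 57.6) are the "derived from fraction" numbers in the log. The clamped values (192 and 64) are the minimums Flink says it will use instead.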

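Parameter descriptions:

  • -Dkubernetes.rest-service.exposed.type=NodePort: exposes Flink's port 8081 as a NodePort, so the web UI can be reached from outside the k8s cluster
  • -Dkubernetes.cluster-id=flink-cluster: assigns a cluster ID to the Flink cluster. The names of many k8s resources are derived from this cluster ID
  • -Dtaskmanager.numberOfTaskSlots=1: the number of slots per TaskManager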
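Because resource names are derived from the cluster ID, they can be predicted before running kubectl. The helper below is hypothetical. Its naming pattern is inferred from the kubectl output in this post (deployment flink-cluster, services flink-cluster and flink-cluster-rest, pod flink-cluster-taskmanager-1-1), not taken from Flink's source.

```python
from typing import Dict, List, Union


def flink_resource_names(cluster_id: str, attempt: int = 1,
                         tm_count: int = 1) -> Dict[str, Union[str, List[str]]]:
    # Pattern inferred from the observed kubectl output; not an official contract.
    return {
        "jobmanager_deployment": cluster_id,
        # Headless service (CLUSTER-IP None) for ports 6123/6124.
        "internal_service": cluster_id,
        # Service exposing the REST/web port 8081 (a NodePort in this post).
        "rest_service": f"{cluster_id}-rest",
        # TaskManager pods: <cluster-id>-taskmanager-<attempt>-<index>.
        "taskmanager_pods": [
            f"{cluster_id}-taskmanager-{attempt}-{i}" for i in range(1, tm_count + 1)
        ],
    }


print(flink_resource_names("flink-cluster"))
```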

Check the JobManager pod:

[root@k8s-master ~]# kubectl get pod -n flink
NAME                            READY   STATUS    RESTARTS   AGE
flink-cluster-5dfc5d5f8-l69c2   1/1     Running   0          4m6s
[root@k8s-master ~]# 
[root@k8s-master ~]# kubectl describe pod flink-cluster-5dfc5d5f8-l69c2 -n flink
Name:         flink-cluster-5dfc5d5f8-l69c2
......(output omitted)......
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type    Reason     Age    From               Message
  ----    ------     ----   ----               -------
  Normal  Scheduled  4m19s  default-scheduler  Successfully assigned flink/flink-cluster-5dfc5d5f8-l69c2 to k8s-node1
  Normal  Pulled     4m18s  kubelet            Container image "apache/flink:1.15.0-scala_2.12" already present on machine
  Normal  Created    4m18s  kubelet            Created container flink-main-container
  Normal  Started    4m18s  kubelet            Started container flink-main-container
[root@k8s-master ~]# 
[root@k8s-master ~]# kubectl logs flink-cluster-5dfc5d5f8-l69c2 -n flink
sed: couldn't open temporary file /opt/flink/conf/sedfvCviq: Read-only file system
sed: couldn't open temporary file /opt/flink/conf/sed3Hnj0x: Read-only file system
/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only file system
/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
Starting kubernetes-session as a console application on host flink-cluster-5dfc5d5f8-l69c2.
......(output omitted)......
2022-05-26 10:18:28,537 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Starting the resource manager.
2022-05-26 10:18:38,638 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 pods from previous attempts, current attempt id is 1.
2022-05-26 10:18:38,639 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
[root@k8s-master ~]# 

1.6 Access the Flink Web UI

Check the k8s services:

[root@k8s-master ~]# kubectl get svc -n flink
NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
flink-cluster        ClusterIP   None           <none>        6123/TCP,6124/TCP   26m
flink-cluster-rest   NodePort    10.96.12.182   <none>        8081:32371/TCP      26m
[root@k8s-master ~]#

Visit http://<any k8s node>:32371. The page looks like this:
[Screenshot: Flink Web UI]
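Instead of reading the port off the table by eye, you can extract it from the kubectl output. This is a minimal sketch that assumes the column layout shown above; the function names are hypothetical. kubectl can also return the port directly with `kubectl get svc flink-cluster-rest -n flink -o jsonpath='{.spec.ports[0].nodePort}'`.

```python
import re


def rest_nodeport(svc_output: str, service: str) -> int:
    """Return the NodePort mapped to port 8081 for `service` in `kubectl get svc` output."""
    for line in svc_output.splitlines():
        cols = line.split()
        if cols and cols[0] == service:
            # The PORT(S) column looks like "8081:32371/TCP".
            m = re.search(r"\b8081:(\d+)/TCP\b", line)
            if m:
                return int(m.group(1))
    raise ValueError(f"no NodePort for port 8081 found for service {service!r}")


def web_ui_url(node_ip: str, nodeport: int) -> str:
    """Any node's IP works for a NodePort service."""
    return f"http://{node_ip}:{nodeport}"
```

In practice, the text would come from `subprocess.run(["kubectl", "get", "svc", "-n", "flink"], capture_output=True, text=True).stdout`.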

1.7 Submit a job to the Flink cluster

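When a job is submitted, the JobManager starts a TaskManager for it. Each TaskManager here has only one slot (taskmanager.numberOfTaskSlots=1). So if two jobs are submitted at the same time, two TaskManagers are started.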
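The number of TaskManagers follows from slot arithmetic. This is a rough sketch with hypothetical names, resting on two assumptions:

- Each job runs in the default slot-sharing group, so it needs as many slots as its highest operator parallelism. That is 1 here, with parallelism.default=1.
- Idle TaskManagers are ignored. In this setup they are kept for resourcemanager.taskmanager-timeout (one hour) and can be reused by later jobs.

```python
import math
from typing import List


def taskmanagers_needed(job_parallelisms: List[int], slots_per_tm: int) -> int:
    # With default slot sharing, each job needs max-parallelism slots;
    # the per-job values passed in are assumed to be that maximum already.
    total_slots = sum(job_parallelisms)
    return math.ceil(total_slots / slots_per_tm)


print(taskmanagers_needed([1, 1], slots_per_tm=1))
```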

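To submit a job to the Flink cluster from a remote server, point -m at the exposed REST endpoint:

bin/flink run -m 192.168.23.160:32371 examples/batch/WordCount.jar

Alternatively, on the k8s master, the client can locate the session cluster by its cluster ID: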
[root@k8s-master flink-1.15.0]# bin/flink run \
> -e kubernetes-session \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.rest-service.exposed.type=NodePort \
> -Dkubernetes.cluster-id=flink-cluster \
> examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2022-05-26 18:37:45,504 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster flink-cluster successfully, JobManager Web Interface: http://192.168.23.160:32371
Job has been submitted with JobID d133f7e4b38b2acaab927be1b1bae02b
Program execution finished
Job with JobID d133f7e4b38b2acaab927be1b1bae02b has finished.
Job Runtime: 1674 ms
Accumulator Results: 
- 28a985b1493a7d60bc4fe504259ea4cf (java.util.ArrayList) [170 elements]

(a,5)
(action,1)
......(output omitted)......
(wrong,1)
(you,1)
[root@k8s-master flink-1.15.0]#
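Here the CLI waits for the batch job to finish. The same status is also available from Flink's REST API through GET /jobs/<jobid>, whose response contains a `state` field. Below is a minimal polling sketch that assumes the NodePort URL from section 1.6. wait_for_job and http_get_json are hypothetical names. `fetch` is injectable, so the logic can be tried without a cluster.

```python
import json
import time
import urllib.request
from typing import Callable

# Globally terminal job states in Flink.
TERMINAL_STATES = {"FINISHED", "FAILED", "CANCELED"}


def http_get_json(url: str) -> dict:
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def wait_for_job(base_url: str, job_id: str,
                 fetch: Callable[[str], dict] = http_get_json,
                 interval_s: float = 2.0, max_polls: int = 30) -> str:
    """Poll GET /jobs/<job_id> until the job reaches a terminal state."""
    for _ in range(max_polls):
        state = fetch(f"{base_url}/jobs/{job_id}")["state"]
        if state in TERMINAL_STATES:
            return state
        time.sleep(interval_s)
    raise TimeoutError(f"job {job_id} not in a terminal state after {max_polls} polls")
```

For the WordCount run above, `wait_for_job("http://192.168.23.160:32371", "d133f7e4b38b2acaab927be1b1bae02b")` would be expected to return FINISHED, as long as the cluster still retains the job.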

1.8 Check the pods and the Flink Web UI again

Pod information:

[root@k8s-master ~]# kubectl get pod -n flink
NAME                            READY   STATUS    RESTARTS   AGE
flink-cluster-5dfc5d5f8-l69c2   1/1     Running   0          17m
flink-cluster-taskmanager-1-1   1/1     Running   0          3m42s
[root@k8s-master ~]# 

Flink Web UI:
[Screenshot: Flink Web UI]

1.9 Delete the Flink cluster

[root@k8s-master ~]# kubectl delete deploy flink-cluster -n flink
deployment.apps "flink-cluster" deleted
[root@k8s-master ~]#

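2. Application Mode

Application mode provides better isolation: each application starts its own cluster, and the cluster is shut down once the application finishes.

2.1 Install Java

For installing Java 8, please refer to my blog posts "Installing Java 8 and OpenJDK 11 side by side on CentOS 7" and "Installing Java 8 and OpenJDK 11 side by side on Windows".

2.2 Download and extract Flink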

[root@k8s-master ~]# wget https://downloads.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
[root@k8s-master ~]# 
[root@k8s-master ~]# tar -zxvf flink-1.15.0-bin-scala_2.12.tgz
[root@k8s-master ~]# 

2.3 Build the image

  1. Create a Dockerfile with the following content:
[root@k8s-master ~]# mkdir flink-image
[root@k8s-master ~]#
[root@k8s-master ~]# cd flink-image
[root@k8s-master flink-image]# 
[root@k8s-master flink-image]# cp /root/flink-1.15.0/examples/streaming/TopSpeedWindowing.jar .
[root@k8s-master flink-image]# 
[root@k8s-master flink-image]# cat Dockerfile 
FROM apache/flink:1.15.0-scala_2.12
RUN mkdir -p $FLINK_HOME/usrlib
# The pod's time zone defaults to UTC, eight hours behind local (Beijing) time. Change it to Asia/Shanghai
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar

[root@k8s-master flink-image]# 
  2. Then build the image:
[root@k8s-master flink-image]# nerdctl build -t apache/flink:1.15.0-scala_2.12-topSpeedWindowing .
[root@k8s-master flink-image]# 
[root@k8s-master flink-image]# nerdctl images
REPOSITORY                TAG                                                                 IMAGE ID        CREATED           SIZE
docker.io/apache/flink    1.15.0-scala_2.12-topSpeedWindowing                                 ca28d3b370f2    10 minutes ago    529.6 MiB
overlayfs@sha256          ca28d3b370f22b05f02c9955c487461a21c304c8c4b813335c557dba8edd81ab    ca28d3b370f2    10 minutes ago    529.6 MiB
[root@k8s-master flink-image]#
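  3. crictl has no build, load or import command, only pull, so the built image has to go through a registry. Run a private registry with nerdctl and push the built image to it. For details, see "Building a private image registry with the official Docker Registry".

If you are using the containerd runtime, see "Solving private registry image push and pull problems with the containerd runtime" for fixes to push/pull issues.

Then pull the image with crictl pull.

  4. The pulled image: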
[root@k8s-master ~]# crictl images
IMAGE                                                                TAG                                   IMAGE ID            SIZE
192.168.23.160:5000/apache/flink                                     1.15.0-scala_2.12-topSpeedWindowing   1e9d52d3f4d4b       555MB
......(output omitted)......
[root@k8s-master ~]#
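The image pulled above carries the registry prefix 192.168.23.160:5000/. After the build it therefore has to be tagged with that prefix and pushed. The minimal sketch below derives the registry-qualified reference and the matching `nerdctl tag` / `nerdctl push` command lines. It assumes the registry address used in this post, and the helper names are hypothetical. Insecure (HTTP) registry settings are left to the posts linked above.

```python
from typing import List


def registry_ref(image: str, registry: str) -> str:
    """Prefix a local image reference with a private registry host[:port]."""
    # nerdctl lists Docker Hub images as docker.io/<repo>; drop that prefix first.
    for prefix in ("docker.io/library/", "docker.io/"):
        if image.startswith(prefix):
            image = image[len(prefix):]
            break
    return f"{registry}/{image}"


def push_commands(image: str, registry: str) -> List[List[str]]:
    """Return the tag and push command lines for the registry-qualified image."""
    target = registry_ref(image, registry)
    return [["nerdctl", "tag", image, target], ["nerdctl", "push", target]]


for cmd in push_commands("apache/flink:1.15.0-scala_2.12-topSpeedWindowing", "192.168.23.160:5000"):
    print(" ".join(cmd))
```

Each command can then be run with `subprocess.run(cmd, check=True)`.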

2.4 Create a namespace and a service account, and grant the account permissions

[root@k8s-master ~]# kubectl create ns flink
namespace/flink created
[root@k8s-master ~]# 
[root@k8s-master ~]# kubectl create serviceaccount flink -n flink
serviceaccount/flink created
[root@k8s-master ~]# 
[root@k8s-master ~]# kubectl create clusterrolebinding flink-role-bind --clusterrole=edit --serviceaccount=flink:flink
clusterrolebinding.rbac.authorization.k8s.io/flink-role-bind created
[root@k8s-master ~]# 

2.5 Start the application

[root@k8s-master ~]# cd flink-1.15.0
[root@k8s-master flink-1.15.0]# 
[root@k8s-master flink-1.15.0]# bin/flink run-application \
> --target kubernetes-application \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.jobmanager.service-account=flink \
> -Dkubernetes.rest-service.exposed.type=NodePort \
> -Dkubernetes.cluster-id=flink-application-cluster \
> -Dkubernetes.container.image=192.168.23.160:5000/apache/flink:1.15.0-scala_2.12-topSpeedWindowing \
> -Denv.java.opts.jobmanager=-Duser.timezone=GMT+08 \
> -Dkubernetes.jobmanager.cpu=0.2 \
> -Djobmanager.memory.process.size=1024m \
> -Dresourcemanager.taskmanager-timeout=3600000 \
> -Denv.java.opts.taskmanager=-Duser.timezone=GMT+08 \
> -Dkubernetes.taskmanager.cpu=0.2 \
> -Dtaskmanager.memory.process.size=1024m \
> -Dtaskmanager.numberOfTaskSlots=1 \
> local:///opt/flink/usrlib/TopSpeedWindowing.jar \
> --output /opt/flink/log/topSpeedWindowing-output
2022-05-30 13:39:44,480 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2022-05-30 13:39:44,480 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2022-05-30 13:39:45,850 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster flink-application-cluster successfully, JobManager Web Interface: http://192.168.23.160:31027
[root@k8s-master flink-1.15.0]#
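When the option list grows, it can help to generate it rather than edit a long shell line. This sketch assembles the argument list for `bin/flink run-application` from a dict, covering only a subset of the options above; the helper name is hypothetical. It also rejects jar URIs whose scheme is not `local`, because Flink 1.15 native Kubernetes application mode only runs jars that are already inside the image.

```python
from typing import Dict, List, Optional
from urllib.parse import urlparse


def run_application_argv(jar_uri: str, conf: Dict[str, str],
                         program_args: Optional[List[str]] = None) -> List[str]:
    """Assemble a `bin/flink run-application` command line for native Kubernetes."""
    # The jar must already be inside the image and use the local:// scheme.
    if urlparse(jar_uri).scheme != "local":
        raise ValueError(f"expected a local:// jar URI, got {jar_uri!r}")
    argv = ["bin/flink", "run-application", "--target", "kubernetes-application"]
    # Every Flink option becomes a -Dkey=value pair.
    argv += [f"-D{key}={value}" for key, value in conf.items()]
    # Arguments after the jar go to the user program, not to Flink.
    argv.append(jar_uri)
    argv += program_args or []
    return argv


conf = {
    "kubernetes.namespace": "flink",
    "kubernetes.jobmanager.service-account": "flink",
    "kubernetes.rest-service.exposed.type": "NodePort",
    "kubernetes.cluster-id": "flink-application-cluster",
    "kubernetes.container.image": "192.168.23.160:5000/apache/flink:1.15.0-scala_2.12-topSpeedWindowing",
    "env.java.opts.jobmanager": "-Duser.timezone=GMT+08",
    "env.java.opts.taskmanager": "-Duser.timezone=GMT+08",
    "taskmanager.numberOfTaskSlots": "1",
}
argv = run_application_argv("local:///opt/flink/usrlib/TopSpeedWindowing.jar", conf,
                            ["--output", "/opt/flink/log/topSpeedWindowing-output"])
print(" ".join(argv))
```

Running this argv from the flink-1.15.0 directory with `subprocess.run(argv, check=True)` avoids shell quoting issues with values such as `-Duser.timezone=GMT+08`.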

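Parameter descriptions:

  • -Denv.java.opts.jobmanager=-Duser.timezone=GMT+08: the JobManager pod's clock already shows local time, but Flink's logs are still eight hours behind, so the JobManager's JVM time zone must be set as well
  • -Denv.java.opts.taskmanager=-Duser.timezone=GMT+08: the same issue and fix for the TaskManager's JVM time zone
  • local:///opt/flink/usrlib/TopSpeedWindowing.jar: the jar that the Flink application runs; currently only local jars (inside the image) can be specified
  • --output /opt/flink/log/topSpeedWindowing-output: the TaskManager's output cannot be viewed from the Flink Web UI here, so an output directory is specified and the results are read from there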
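GMT+08 is a fixed offset eight hours ahead of UTC, which is exactly the gap described above. The small sketch below uses only the standard library. It takes a timestamp from the output in section 2.7 (1653889299191 ms since the epoch) and renders the same instant in UTC and in GMT+08. The function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

# Fixed offset, so no tz database is needed.
GMT_PLUS_8 = timezone(timedelta(hours=8), "GMT+08")


def as_utc_and_gmt8(epoch_ms: int) -> Tuple[str, str]:
    """Render one instant as wall-clock time in UTC and in GMT+08."""
    instant = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    fmt = "%Y-%m-%d %H:%M:%S"
    return instant.strftime(fmt), instant.astimezone(GMT_PLUS_8).strftime(fmt)


print(as_utc_and_gmt8(1653889299191))
```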

Check the Flink JobManager and TaskManager pods:

[root@k8s-master ~]# kubectl get pod -n flink
NAME                                         READY   STATUS    RESTARTS   AGE
flink-application-cluster-667d4f4ccd-8rddd   1/1     Running   0          109s
flink-application-cluster-taskmanager-1-1    1/1     Running   0          39s
[root@k8s-master ~]# 

2.6 Access the Flink Web UI

Check the k8s services:

[root@k8s-master ~]# kubectl get svc -n flink
NAME                             TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
flink-application-cluster        ClusterIP   None           <none>        6123/TCP,6124/TCP   5m36s
flink-application-cluster-rest   NodePort    10.96.48.144   <none>        8081:31027/TCP      5m36s
[root@k8s-master ~]#

Visit http://<any k8s node>:31027. The page looks like this:
[Screenshot: Flink Web UI]

2.7 Enter the TaskManager pod to view the output

[root@k8s-master ~]# kubectl exec -it flink-application-cluster-taskmanager-1-1 -c flink-main-container -n flink -- /bin/bash
root@flink-application-cluster-taskmanager-1-1:/opt/flink# 
root@flink-application-cluster-taskmanager-1-1:/opt/flink# cd log/topSpeedWindowing-output/
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output# 
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output# ls
2022-05-30--13
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output# 
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output# cd 2022-05-30--13/
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output/2022-05-30--13# 
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output/2022-05-30--13# ls -a
.  ..  .part-13a74b8d-fcbd-47e2-bb8f-58511f66c6e5-0.inprogress.001c01f0-43f4-4374-a99e-0be212bf4f54
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output/2022-05-30--13# 
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output/2022-05-30--13# cat .part-13a74b8d-fcbd-47e2-bb8f-58511f66c6e5-0.inprogress.001c01f0-43f4-4374-a99e-0be212bf4f54 
(1,60,31.944444444444443,1653889299191)
(0,50,50.0,1653889299481)
(1,60,31.944444444444443,1653889299191)
......(output omitted)......
(1,75,58826.38888888935,1653889785193)
(0,100,81430.55555555639,1653889784589)
(1,75,58826.38888888935,1653889785193)
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output/2022-05-30--13#
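Each output line appears to be a (carId, speed, distance, timestamp) tuple, which is my reading of the TopSpeedWindowing example. The directory name 2022-05-30--13 matches the yyyy-MM-dd--HH bucket format of Flink's DateTimeBucketAssigner. The sketch below parses one line and recomputes its bucket directory, assuming the GMT+08 time zone configured through env.java.opts.taskmanager. Both function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

GMT_PLUS_8 = timezone(timedelta(hours=8))


def parse_record(line: str) -> Tuple[int, int, float, int]:
    """Parse '(carId,speed,distance,timestamp_ms)' into a typed tuple."""
    car_id, speed, distance, ts = line.strip().strip("()").split(",")
    return int(car_id), int(speed), float(distance), int(ts)


def bucket_dir(ts_ms: int, tz: timezone = GMT_PLUS_8) -> str:
    # Same layout as the "yyyy-MM-dd--HH" bucket directories seen above.
    return datetime.fromtimestamp(ts_ms / 1000, tz=tz).strftime("%Y-%m-%d--%H")


record = parse_record("(1,60,31.944444444444443,1653889299191)")
print(record, bucket_dir(record[3]))
```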

2.8 List running jobs and cancel a job with the Flink CLI

List running jobs:

[root@k8s-master flink-1.15.0]# bin/flink list \
> --target kubernetes-application \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.jobmanager.service-account=flink \
> -Dkubernetes.cluster-id=flink-application-cluster
2022-05-30 14:07:56,460 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
2022-05-30 14:07:56,478 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster flink-application-cluster successfully, JobManager Web Interface: http://192.168.23.160:31027
2022-05-30 14:07:56,510 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
Waiting for response...
------------------ Running/Restarting Jobs -------------------
30.05.2022 13:40:38 : c679ef666f598dfa19f036d6fbd91683 : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
[root@k8s-master flink-1.15.0]#
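The WARN lines appear to come from the client-side configuration. This command does not pass -Dkubernetes.rest-service.exposed.type=NodePort, so the client assumes ClusterIP, yet the listing still succeeded on the k8s master.

Running jobs can also be listed through the REST API with GET /jobs/overview. Below is a minimal sketch that assumes the NodePort URL from section 2.6 (http://192.168.23.160:31027). The function names are hypothetical, and `fetch` is injectable for testing.

```python
import json
import urllib.request
from typing import Callable, Dict, List


def http_get_json(url: str) -> dict:
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def running_jobs(base_url: str,
                 fetch: Callable[[str], dict] = http_get_json) -> List[Dict[str, str]]:
    """Return id and name of every RUNNING job reported by GET /jobs/overview."""
    overview = fetch(f"{base_url}/jobs/overview")
    return [{"jid": job["jid"], "name": job["name"]}
            for job in overview["jobs"] if job["state"] == "RUNNING"]
```

For example, `running_jobs("http://192.168.23.160:31027")` lists the same jobs as the `bin/flink list` output above.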

Cancel the job. If it is the application's last job, the cluster is shut down as well.

[root@k8s-master flink-1.15.0]# bin/flink cancel \
> --target kubernetes-application \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.jobmanager.service-account=flink \
> -Dkubernetes.cluster-id=flink-application-cluster \
> c679ef666f598dfa19f036d6fbd91683
Cancelling job c679ef666f598dfa19f036d6fbd91683.
2022-05-30 14:09:24,650 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
2022-05-30 14:09:24,672 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Retrieve flink cluster flink-application-cluster successfully, JobManager Web Interface: http://192.168.23.160:31027
2022-05-30 14:09:24,712 WARN  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
Cancelled job c679ef666f598dfa19f036d6fbd91683.
[root@k8s-master flink-1.15.0]#
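The REST API also exposes the cancel operation, as PATCH /jobs/<jobid> (with mode=cancel). Below is a minimal urllib sketch that assumes the NodePort URL from section 2.6. The helper names are hypothetical, and only cancel_request can be exercised without a cluster. Flink is expected to answer 202 Accepted.

```python
import urllib.request


def cancel_request(base_url: str, job_id: str) -> urllib.request.Request:
    """Build (but do not send) the PATCH request that cancels a job."""
    return urllib.request.Request(f"{base_url}/jobs/{job_id}?mode=cancel", method="PATCH")


def cancel_job(base_url: str, job_id: str) -> int:
    """Send the cancel request and return the HTTP status code."""
    with urllib.request.urlopen(cancel_request(base_url, job_id), timeout=10) as resp:
        return resp.status  # expected: 202 Accepted
```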

Reposted from: https://blog.csdn.net/yy8623977/article/details/124989262
Copyright belongs to the original author, Bulut0907. In case of infringement, please contact us for removal.
