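1. Session Mode
1.1 Install Java
For installing Java 8, see my blog posts "Installing Java 8 and OpenJDK 11 side by side on CentOS 7" and "Installing Java 8 and OpenJDK 11 side by side on Windows".
1.2 Download and extract Flink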
[root@k8s-master ~]# wget https://downloads.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
[root@k8s-master ~]#
[root@k8s-master ~]# tar -zxvf flink-1.15.0-bin-scala_2.12.tgz
[root@k8s-master ~]#
1.3 Pull the Flink image on every Kubernetes node
[root@k8s-node1 ~]# crictl pull apache/flink:1.15.0-scala_2.12
Image is up to date for sha256:331d27a20cb411d54868766a387c1d6ad98f390528d730d498f8ecd8069df0c6
[root@k8s-node1 ~]#
1.4 Create the namespace and service account, and grant the account permissions
[root@k8s-master ~]# kubectl create ns flink
namespace/flink created
[root@k8s-master ~]#
[root@k8s-master ~]# kubectl create serviceaccount flink -n flink
serviceaccount/flink created
[root@k8s-master ~]#
[root@k8s-master ~]# kubectl create clusterrolebinding flink-role-bind --clusterrole=edit --serviceaccount=flink:flink
clusterrolebinding.rbac.authorization.k8s.io/flink-role-bind created
[root@k8s-master ~]#
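To confirm that the binding took effect, you can ask the API server what the flink service account is allowed to do. The JobManager uses this account to create, watch and delete TaskManager pods, so those three verbs make a reasonable spot check. This is a minimal sketch. check_flink_sa is a hypothetical helper name. kubectl auth can-i and the --as=system:serviceaccount:<namespace>:<name> impersonation form are standard kubectl.

```bash
#!/usr/bin/env bash
# Hypothetical helper: spot-check the RBAC set up in this section.
# Asks the API server whether a service account may create, watch and delete pods.
check_flink_sa() {  # $1 = namespace, $2 = service account name
  local verb answer
  for verb in create watch delete; do
    # kubectl prints "yes" or "no" (and exits non-zero for "no").
    answer=$(kubectl auth can-i "$verb" pods \
      --as="system:serviceaccount:$1:$2" -n "$1")
    printf '%s pods: %s\n' "$verb" "$answer"
  done
}
```

Running check_flink_sa flink flink should print "yes" for all three verbs, because the edit ClusterRole includes them.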
1.5 Start the Flink JobManager
[root@k8s-master ~]# cd flink-1.15.0
[root@k8s-master flink-1.15.0]#
[root@k8s-master flink-1.15.0]# bin/kubernetes-session.sh \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.jobmanager.service-account=flink \
> -Dkubernetes.rest-service.exposed.type=NodePort \
> -Dkubernetes.cluster-id=flink-cluster \
> -Dkubernetes.jobmanager.cpu=0.2 \
> -Djobmanager.memory.process.size=1024m \
> -Dresourcemanager.taskmanager-timeout=3600000 \
> -Dkubernetes.taskmanager.cpu=0.2 \
> -Dtaskmanager.memory.process.size=1024m \
> -Dtaskmanager.numberOfTaskSlots=1
2022-05-26 17:59:30,644 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2022-05-26 17:59:30,646 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2022-05-26 17:59:30,646 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.bind-host, localhost
2022-05-26 17:59:30,646 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2022-05-26 17:59:30,646 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.bind-host, localhost
2022-05-26 17:59:30,647 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.host, localhost
2022-05-26 17:59:30,647 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2022-05-26 17:59:30,647 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2022-05-26 17:59:30,647 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2022-05-26 17:59:30,648 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2022-05-26 17:59:30,648 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: rest.address, localhost
2022-05-26 17:59:30,648 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: rest.bind-address, localhost
2022-05-26 17:59:30,730 INFO org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2022-05-26 17:59:31,701 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2022-05-26 17:59:31,725 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2022-05-26 17:59:31,726 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction network memory (57.600mb (60397978 bytes)) is less than its min value 64.000mb (67108864 bytes), min value will be used instead
2022-05-26 17:59:31,886 INFO org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2022-05-26 17:59:31,887 INFO org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2022-05-26 17:59:33,485 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink session cluster flink-cluster successfully, JobManager Web Interface: http://192.168.23.160:32371
[root@k8s-master flink-1.15.0]#
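The three ProcessMemoryUtils INFO lines in this log are expected for a 1024m process size and are not errors. Flink derives JVM overhead and network memory as fractions of the process size, then raises any value that falls below its configured minimum. There are two JVM-overhead lines, one for the JobManager and one for the TaskManager, because both are 1024m here. The sketch below reproduces that arithmetic. It assumes the Flink 1.15 defaults: JVM overhead fraction 0.1 with a 192m minimum, JVM metaspace 256m, and network memory at 0.1 of total Flink memory with a 64m minimum. It ignores the upper bounds, which a 1024m process never reaches. The function names are hypothetical; this is not Flink's own code.

```bash
#!/usr/bin/env bash
# Hypothetical helpers mirroring the "derived from fraction ... is less than its
# min value" checks for a given process size (in MB).
# Assumed Flink 1.15 defaults: jvm-overhead.fraction=0.1, jvm-overhead.min=192m,
# jvm-metaspace.size=256m, network.fraction=0.1, network.min=64m.
# Upper bounds (max) are ignored.

# Print the larger of a derived value and its configured minimum.
clamp_min() {
  awk -v v="$1" -v min="$2" 'BEGIN { printf "%g\n", (v < min ? min : v) }'
}

# JVM overhead (MB) = max(process size * 0.1, 192)
jvm_overhead_mb() {
  local derived
  derived=$(awk -v p="$1" 'BEGIN { printf "%g\n", p * 0.1 }')
  clamp_min "$derived" 192
}

# TaskManager network memory (MB):
# total Flink memory = process - JVM overhead - metaspace (256)
# network = max(total Flink memory * 0.1, 64)
network_mb() {
  local overhead total_flink derived
  overhead=$(jvm_overhead_mb "$1")
  total_flink=$(awk -v p="$1" -v o="$overhead" 'BEGIN { printf "%g\n", p - o - 256 }')
  derived=$(awk -v t="$total_flink" 'BEGIN { printf "%g\n", t * 0.1 }')
  clamp_min "$derived" 64
}
```

jvm_overhead_mb 1024 yields 192, because the derived 102.4 is below the minimum. network_mb 1024 yields 64, because the derived value is (1024 - 192 - 256) × 0.1 = 57.6. These are the values the log says Flink falls back to.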
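Parameter notes:
- -Dkubernetes.rest-service.exposed.type=NodePort: exposes Flink's REST/Web UI port 8081 through a NodePort Service so it can be reached from outside the Kubernetes cluster
- -Dkubernetes.cluster-id=flink-cluster: gives the Flink cluster an ID. Many Kubernetes resource names are derived from it, such as the flink-cluster Deployment and the flink-cluster-rest Service
- -Dtaskmanager.numberOfTaskSlots=1: the number of slots per TaskManager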
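Every -D option is an ordinary Flink configuration key. The same settings can therefore live in conf/flink-conf.yaml, which the client reads at startup (see the GlobalConfiguration lines in the log above). The hypothetical helper below converts -Dkey=value arguments into YAML lines that you could append to that file. It is a convenience sketch, not a Flink tool.

```bash
#!/usr/bin/env bash
# Hypothetical helper: convert "-Dkey=value" arguments into flink-conf.yaml lines.
# Only the first "=" separates key and value, so values may themselves contain "="
# (e.g. -Denv.java.opts.jobmanager=-Duser.timezone=GMT+08).
dflags_to_conf() {
  local arg kv
  for arg in "$@"; do
    case "$arg" in
      -D*=*)
        kv=${arg#-D}                                 # strip the leading -D
        printf '%s: %s\n' "${kv%%=*}" "${kv#*=}"     # key: value
        ;;
      *)
        printf 'skipping non -D argument: %s\n' "$arg" >&2
        ;;
    esac
  done
}
```

For example: dflags_to_conf -Dkubernetes.namespace=flink -Dkubernetes.cluster-id=flink-cluster >> conf/flink-conf.yaml. Before appending, check the file for keys it already sets. The log above shows jobmanager.memory.process.size and taskmanager.memory.process.size, and you do not want two values for one key.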
Check the JobManager pod:
[root@k8s-master ~]# kubectl get pod -n flink
NAME READY STATUS RESTARTS AGE
flink-cluster-5dfc5d5f8-l69c2 1/1 Running 0 4m6s
[root@k8s-master ~]#
[root@k8s-master ~]# kubectl describe pod flink-cluster-5dfc5d5f8-l69c2 -n flink
Name: flink-cluster-5dfc5d5f8-l69c2
......(partially omitted)......
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 4m19s default-scheduler Successfully assigned flink/flink-cluster-5dfc5d5f8-l69c2 to k8s-node1
Normal Pulled 4m18s kubelet Container image "apache/flink:1.15.0-scala_2.12" already present on machine
Normal Created 4m18s kubelet Created container flink-main-container
Normal Started 4m18s kubelet Started container flink-main-container
[root@k8s-master ~]#
[root@k8s-master ~]# kubectl logs flink-cluster-5dfc5d5f8-l69c2 -n flink
sed: couldn't open temporary file /opt/flink/conf/sedfvCviq: Read-only file system
sed: couldn't open temporary file /opt/flink/conf/sed3Hnj0x: Read-only file system
/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only file system
/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
Starting kubernetes-session as a console application on host flink-cluster-5dfc5d5f8-l69c2.
......(partially omitted)......
2022-05-26 10:18:28,537 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Starting the resource manager.
2022-05-26 10:18:38,638 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1.
2022-05-26 10:18:38,639 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
[root@k8s-master ~]#
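The sed and docker-entrypoint.sh messages at the top of this log appear to be harmless. In native Kubernetes mode, Flink mounts /opt/flink/conf from a ConfigMap, and ConfigMap volumes are read-only, so the image's entrypoint cannot rewrite flink-conf.yaml. The JobManager still starts, as the later lines show. The sketch below is a pair of hypothetical filters that keep this noise out of the way when scanning a pod log.

```bash
#!/usr/bin/env bash
# Hypothetical log filters for `kubectl logs` output from a Flink pod.

# Drop the "Read-only file system" lines printed by the image's
# docker-entrypoint.sh when /opt/flink/conf is mounted from a ConfigMap.
strip_conf_noise() {
  grep -v -e 'Read-only file system' || true
}

# Keep only WARN and ERROR lines after removing that noise.
flink_problems() {
  strip_conf_noise | { grep -E ' (WARN|ERROR) ' || true; }
}
```

Usage: kubectl logs flink-cluster-5dfc5d5f8-l69c2 -n flink | flink_problems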
1.6 Access the Flink Web UI
Check the Kubernetes Services:
[root@k8s-master ~]# kubectl get svc -n flink
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
flink-cluster ClusterIP None <none> 6123/TCP,6124/TCP 26m
flink-cluster-rest NodePort 10.96.12.182 <none> 8081:32371/TCP 26m
[root@k8s-master ~]#
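You can look up the NodePort (32371 here) without reading the table by eye. The REST Service is named <cluster-id>-rest, as the output above shows. kubectl's JSONPath output can then read the port directly. A second, hypothetical awk helper does the same from the plain kubectl get svc table. Both function names are illustrative.

```bash
#!/usr/bin/env bash
# Option 1: ask the API server for the NodePort of the <cluster-id>-rest Service.
rest_nodeport() {  # $1 = namespace, $2 = cluster-id
  kubectl get svc "$2-rest" -n "$1" -o jsonpath='{.spec.ports[0].nodePort}'
}

# Option 2 (hypothetical helper): parse the table printed by `kubectl get svc`.
# For a NodePort row, PORT(S) looks like 8081:32371/TCP; print the part after ":".
nodeport_from_table() {
  awk '$2 == "NodePort" { split($5, p, "[:/]"); print p[2] }'
}
```

Here, rest_nodeport flink flink-cluster and kubectl get svc -n flink | nodeport_from_table should both print 32371.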
Open http://<any node of the k8s cluster>:32371; the page looks like this:
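1.7 Submit a job to the Flink cluster
When a job is submitted, the JobManager starts a TaskManager for it. Because each TaskManager here has only one slot (taskmanager.numberOfTaskSlots=1), submitting two jobs at the same time starts two TaskManagers.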
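Here is a rough way to predict the TaskManager count: TaskManagers = ceil(total slots needed / taskmanager.numberOfTaskSlots). This assumes the default slot-sharing group, so a job needs as many slots as its highest operator parallelism, and that slots are not shared between jobs. taskmanagers_needed is a hypothetical helper. Note also that idle TaskManagers are released only after resourcemanager.taskmanager-timeout, which section 1.5 set to 3600000 ms (one hour). That is why the TaskManager pod is still running in section 1.8, even though WordCount finished in under two seconds.

```bash
#!/usr/bin/env bash
# Hypothetical estimate of how many TaskManager pods a session cluster requests.
# Assumes the default slot-sharing group (a job needs as many slots as its highest
# operator parallelism) and that slots are never shared between jobs.
taskmanagers_needed() {  # $1 = slots per TaskManager, remaining args = job parallelisms
  local slots_per_tm=$1 total=0 p
  shift
  for p in "$@"; do
    total=$(( total + p ))
  done
  echo $(( (total + slots_per_tm - 1) / slots_per_tm ))   # ceiling division
}
```

With numberOfTaskSlots=1 and two parallelism-1 jobs, taskmanagers_needed 1 1 1 gives 2, matching the behavior described above.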
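To submit a job from a remote server, point the client at the REST address with -m:
bin/flink run -m 192.168.23.160:32371 examples/batch/WordCount.jar
On the master node, the job can also be submitted through the kubernetes-session target: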
[root@k8s-master flink-1.15.0]# bin/flink run \
> -e kubernetes-session \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.rest-service.exposed.type=NodePort \
> -Dkubernetes.cluster-id=flink-cluster \
> examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2022-05-26 18:37:45,504 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Retrieve flink cluster flink-cluster successfully, JobManager Web Interface: http://192.168.23.160:32371
Job has been submitted with JobID d133f7e4b38b2acaab927be1b1bae02b
Program execution finished
Job with JobID d133f7e4b38b2acaab927be1b1bae02b has finished.
Job Runtime: 1674 ms
Accumulator Results:
- 28a985b1493a7d60bc4fe504259ea4cf (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
......(partially omitted)......
(wrong,1)
(you,1)
[root@k8s-master flink-1.15.0]#
1.8 Check the pods and the Flink Web UI again
Pods:
[root@k8s-master ~]# kubectl get pod -n flink
NAME READY STATUS RESTARTS AGE
flink-cluster-5dfc5d5f8-l69c2 1/1 Running 0 17m
flink-cluster-taskmanager-1-1 1/1 Running 0 3m42s
[root@k8s-master ~]#
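The same information is available from the JobManager's REST API, which the Web UI itself uses. GET /overview returns a small JSON object with fields such as taskmanagers, slots-total, slots-available and jobs-finished. The sketch extracts one integer field with grep, so it needs nothing beyond curl. With jq installed, a real JSON parser would be more robust. overview_field is a hypothetical helper, and 192.168.23.160:32371 is the NodePort address from this post.

```bash
#!/usr/bin/env bash
# Hypothetical helper: read one numeric field from the JobManager REST endpoint /overview.
# Uses grep instead of a JSON parser, so it only handles integer-valued fields.
overview_field() {  # $1 = host:port of the REST/Web UI, $2 = field name
  curl -s "http://$1/overview" | grep -o "\"$2\":[0-9]*" | cut -d: -f2
}
```

For example, overview_field 192.168.23.160:32371 taskmanagers should match the number of TaskManager pods listed above.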
The Flink Web UI now looks like this:
1.9 Delete the Flink cluster
[root@k8s-master ~]# kubectl delete deploy flink-cluster -n flink
deployment.apps "flink-cluster" deleted
[root@k8s-master ~]#
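Deleting the Deployment is enough to remove the session cluster. According to the Flink documentation, the Services, ConfigMaps and TaskManager pods that Flink creates carry an OwnerReference to deployment/<cluster-id>, so Kubernetes garbage-collects them along with it. The documentation also describes stopping a session from the client by attaching to it and sending "stop". Both approaches are wrapped below in hypothetical helpers. stop_flink_session is an untested sketch and must be run from the Flink distribution directory.

```bash
#!/usr/bin/env bash
# Hypothetical wrapper around the documented "attach and send stop" approach.
# Run from the Flink distribution directory (e.g. ~/flink-1.15.0).
stop_flink_session() {  # $1 = namespace, $2 = cluster-id
  echo 'stop' | ./bin/kubernetes-session.sh \
    -Dkubernetes.namespace="$1" \
    -Dkubernetes.cluster-id="$2" \
    -Dexecution.attached=true
}

# The approach used in this section: delete the JobManager Deployment.
delete_flink_session() {  # $1 = namespace, $2 = cluster-id
  kubectl delete deployment "$2" -n "$1"
}
```

Usage: delete_flink_session flink flink-cluster, which runs the same command as the transcript above.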
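2. Application Mode
Application mode provides better isolation: each application starts its own cluster, and the cluster is shut down when the application finishes.
2.1 Install Java
For installing Java 8, see my blog posts "Installing Java 8 and OpenJDK 11 side by side on CentOS 7" and "Installing Java 8 and OpenJDK 11 side by side on Windows".
2.2 Download and extract Flink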
[root@k8s-master ~]# wget https://downloads.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
[root@k8s-master ~]#
[root@k8s-master ~]# tar -zxvf flink-1.15.0-bin-scala_2.12.tgz
[root@k8s-master ~]#
2.3 Build the image
- Create a Dockerfile with the following content:
[root@k8s-master ~]# mkdir flink-image
[root@k8s-master ~]#
[root@k8s-master ~]# cd flink-image
[root@k8s-master flink-image]#
[root@k8s-master flink-image]# cp /root/flink-1.15.0/examples/streaming/TopSpeedWindowing.jar .
[root@k8s-master flink-image]#
[root@k8s-master flink-image]# cat Dockerfile
FROM apache/flink:1.15.0-scala_2.12
RUN mkdir -p $FLINK_HOME/usrlib
# The pod's time zone defaults to UTC, eight hours behind China Standard Time; switch it to Asia/Shanghai
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
[root@k8s-master flink-image]#
- Then build it:
[root@k8s-master flink-image]# nerdctl build -t apache/flink:1.15.0-scala_2.12-topSpeedWindowing .
[root@k8s-master flink-image]#
[root@k8s-master flink-image]# nerdctl images
REPOSITORY TAG IMAGE ID CREATED SIZE
docker.io/apache/flink 1.15.0-scala_2.12-topSpeedWindowing ca28d3b370f2 10 minutes ago 529.6 MiB
overlayfs@sha256 ca28d3b370f22b05f02c9955c487461a21c304c8c4b813335c557dba8edd81ab ca28d3b370f2 10 minutes ago 529.6 MiB
[root@k8s-master flink-image]#
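- crictl has only a pull command (no build, load or import), so run a private registry with nerdctl and push the built image to it. For the details, see "Building a private image registry with the official Docker Registry".
If you use the containerd runtime, see "Fixing image push and pull for a private registry with the containerd runtime" for solutions to push and pull problems.
Then pull the image with crictl pull.
- The pulled image: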
[root@k8s-master ~]# crictl images
IMAGE TAG IMAGE ID SIZE
192.168.23.160:5000/apache/flink 1.15.0-scala_2.12-topSpeedWindowing 1e9d52d3f4d4b 555MB
......(partially omitted)......
[root@k8s-master ~]#
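In the steps above, nerdctl build tags the image as apache/flink:1.15.0-scala_2.12-topSpeedWindowing. The pods later pull 192.168.23.160:5000/apache/flink:1.15.0-scala_2.12-topSpeedWindowing, so the image has to be re-tagged with the registry prefix before it is pushed. This is a minimal sketch. It assumes the plain-HTTP registry at 192.168.23.160:5000 used in this post, which is why it passes nerdctl's global --insecure-registry flag. The function names are hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: prefix a local image reference with a registry host:port.
registry_ref() {  # $1 = registry (host:port), $2 = local image reference
  echo "$1/$2"
}

# Re-tag and push with nerdctl; assumes the registry speaks plain HTTP.
push_to_registry() {  # $1 = registry (host:port), $2 = local image reference
  local target
  target=$(registry_ref "$1" "$2")
  nerdctl tag "$2" "$target"
  nerdctl --insecure-registry push "$target"
}
```

Usage: run push_to_registry 192.168.23.160:5000 apache/flink:1.15.0-scala_2.12-topSpeedWindowing on the master. Then run crictl pull 192.168.23.160:5000/apache/flink:1.15.0-scala_2.12-topSpeedWindowing on each node. crictl may still refuse a plain-HTTP registry until containerd is configured for it, which is what the containerd post referenced above covers.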
2.4 Create the namespace and service account, and grant the account permissions
[root@k8s-master ~]# kubectl create ns flink
namespace/flink created
[root@k8s-master ~]#
[root@k8s-master ~]# kubectl create serviceaccount flink -n flink
serviceaccount/flink created
[root@k8s-master ~]#
[root@k8s-master ~]# kubectl create clusterrolebinding flink-role-bind --clusterrole=edit --serviceaccount=flink:flink
clusterrolebinding.rbac.authorization.k8s.io/flink-role-bind created
[root@k8s-master ~]#
2.5 Start the application
[root@k8s-master ~]# cd flink-1.15.0
[root@k8s-master flink-1.15.0]#
[root@k8s-master flink-1.15.0]# bin/flink run-application \
> --target kubernetes-application \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.jobmanager.service-account=flink \
> -Dkubernetes.rest-service.exposed.type=NodePort \
> -Dkubernetes.cluster-id=flink-application-cluster \
> -Dkubernetes.container.image=192.168.23.160:5000/apache/flink:1.15.0-scala_2.12-topSpeedWindowing \
> -Denv.java.opts.jobmanager=-Duser.timezone=GMT+08 \
> -Dkubernetes.jobmanager.cpu=0.2 \
> -Djobmanager.memory.process.size=1024m \
> -Dresourcemanager.taskmanager-timeout=3600000 \
> -Denv.java.opts.taskmanager=-Duser.timezone=GMT+08 \
> -Dkubernetes.taskmanager.cpu=0.2 \
> -Dtaskmanager.memory.process.size=1024m \
> -Dtaskmanager.numberOfTaskSlots=1 \
> local:///opt/flink/usrlib/TopSpeedWindowing.jar \
> --output /opt/flink/log/topSpeedWindowing-output
2022-05-30 13:39:44,480 INFO org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2022-05-30 13:39:44,480 INFO org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2022-05-30 13:39:45,850 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink application cluster flink-application-cluster successfully, JobManager Web Interface: http://192.168.23.160:31027
[root@k8s-master flink-1.15.0]#
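Parameter notes:
- -Denv.java.opts.jobmanager=-Duser.timezone=GMT+08: although the clock in the JobManager pod already shows local time, Flink's log timestamps are still eight hours behind, so the time zone also has to be set for the Flink JobManager JVM
- -Denv.java.opts.taskmanager=-Duser.timezone=GMT+08: likewise for the TaskManager; without it, the TaskManager's Flink logs are eight hours behind even though the pod's clock is correct
- local:///opt/flink/usrlib/TopSpeedWindowing.jar: the jar the application runs; currently only a jar that is local to the image (the local:// scheme) is supported
- --output /opt/flink/log/topSpeedWindowing-output: the TaskManager's output cannot be viewed in the Flink Web UI here, so the results are written to this directory and read from there
Check the Flink JobManager and TaskManager pods: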
[root@k8s-master ~]# kubectl get pod -n flink
NAME READY STATUS RESTARTS AGE
flink-application-cluster-667d4f4ccd-8rddd 1/1 Running 0 109s
flink-application-cluster-taskmanager-1-1 1/1 Running 0 39s
[root@k8s-master ~]#
2.6 Access the Flink Web UI
Check the Kubernetes Services:
[root@k8s-master ~]# kubectl get svc -n flink
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
flink-application-cluster ClusterIP None <none> 6123/TCP,6124/TCP 5m36s
flink-application-cluster-rest NodePort 10.96.48.144 <none> 8081:31027/TCP 5m36s
[root@k8s-master ~]#
Open http://<any node of the k8s cluster>:31027; the page looks like this:
2.7 Check the output inside the TaskManager pod
[root@k8s-master ~]# kubectl exec -it flink-application-cluster-taskmanager-1-1 -c flink-main-container -n flink -- /bin/bash
root@flink-application-cluster-taskmanager-1-1:/opt/flink#
root@flink-application-cluster-taskmanager-1-1:/opt/flink# cd log/topSpeedWindowing-output/
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output#
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output# ls
2022-05-30--13
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output#
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output# cd 2022-05-30--13/
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output/2022-05-30--13#
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output/2022-05-30--13# ls -a
. .. .part-13a74b8d-fcbd-47e2-bb8f-58511f66c6e5-0.inprogress.001c01f0-43f4-4374-a99e-0be212bf4f54
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output/2022-05-30--13#
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output/2022-05-30--13# cat .part-13a74b8d-fcbd-47e2-bb8f-58511f66c6e5-0.inprogress.001c01f0-43f4-4374-a99e-0be212bf4f54
(1,60,31.944444444444443,1653889299191)
(0,50,50.0,1653889299481)
(1,60,31.944444444444443,1653889299191)
......(partially omitted)......
(1,75,58826.38888888935,1653889785193)
(0,100,81430.55555555639,1653889784589)
(1,75,58826.38888888935,1653889785193)
root@flink-application-cluster-taskmanager-1-1:/opt/flink/log/topSpeedWindowing-output/2022-05-30--13#
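The hidden .inprogress file is the part file the sink is still writing. Its lines can be summarized without opening a shell in the pod. The sketch below assumes each tuple is (carId, speed, distance, timestamp in ms), as in the TopSpeedWindowing example source. It prints the highest speed seen per car. top_speed_per_car is a hypothetical helper.

```bash
#!/usr/bin/env bash
# Hypothetical summary of TopSpeedWindowing output lines such as
#   (1,60,31.944444444444443,1653889299191)
# assuming the tuple fields are (carId, speed, distance, timestamp in ms).
# Prints "carId maxSpeed" for every car, sorted by carId.
top_speed_per_car() {
  tr -d '()' | awk -F',' '
    NF == 4 {
      if (!($1 in max) || $2 + 0 > max[$1]) max[$1] = $2 + 0
    }
    END { for (id in max) print id, max[id] }' | sort -n
}
```

Usage, from the master node: kubectl exec flink-application-cluster-taskmanager-1-1 -c flink-main-container -n flink -- sh -c 'cat /opt/flink/log/topSpeedWindowing-output/*/.part-*' | top_speed_per_car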
2.8 List running jobs and cancel a job with the Flink CLI
List running jobs:
[root@k8s-master flink-1.15.0]# bin/flink list \
> --target kubernetes-application \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.jobmanager.service-account=flink \
> -Dkubernetes.cluster-id=flink-application-cluster
2022-05-30 14:07:56,460 WARN org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
2022-05-30 14:07:56,478 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Retrieve flink cluster flink-application-cluster successfully, JobManager Web Interface: http://192.168.23.160:31027
2022-05-30 14:07:56,510 WARN org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
Waiting for response...
------------------ Running/Restarting Jobs -------------------
30.05.2022 13:40:38 : c679ef666f598dfa19f036d6fbd91683 : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
[root@k8s-master flink-1.15.0]#
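The WARN lines appear because these commands do not pass -Dkubernetes.rest-service.exposed.type=NodePort, so the client assumes ClusterIP. The submission in section 1.7 passed that option and printed no such warning. Despite the warning, list and cancel work here, as the output shows. To feed job IDs into cancel without copying them by hand, a hypothetical parser for the list output could look like this. It assumes the "date : JobID : name (STATE)" line format shown above.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the JobIDs of RUNNING jobs from `bin/flink list` output.
# Expects lines like: 30.05.2022 13:40:38 : <32-hex JobID> : <job name> (RUNNING)
running_job_ids() {
  awk -F' : ' '$3 ~ /\(RUNNING\)$/ && length($2) == 32 && $2 ~ /^[0-9a-f]+$/ { print $2 }'
}
```

Usage: bin/flink list --target kubernetes-application -Dkubernetes.namespace=flink -Dkubernetes.cluster-id=flink-application-cluster | running_job_ids. Each printed ID can then be passed to bin/flink cancel.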
Cancel the job. If it is the application's last job, the cluster is shut down as well:
[root@k8s-master flink-1.15.0]# bin/flink cancel \
> --target kubernetes-application \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.jobmanager.service-account=flink \
> -Dkubernetes.cluster-id=flink-application-cluster \
> c679ef666f598dfa19f036d6fbd91683
Cancelling job c679ef666f598dfa19f036d6fbd91683.
2022-05-30 14:09:24,650 WARN org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
2022-05-30 14:09:24,672 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Retrieve flink cluster flink-application-cluster successfully, JobManager Web Interface: http://192.168.23.160:31027
2022-05-30 14:09:24,712 WARN org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
Cancelled job c679ef666f598dfa19f036d6fbd91683.
[root@k8s-master flink-1.15.0]#
Copyright belongs to the original author, Bulut0907. In case of infringement, please contact us for removal.