Reference article:
https://www.cnblogs.com/liugp/p/16755095.html
If your job needs HDFS (typically as the checkpoint storage directory), first see the previous article: 《Hadoop、Hive On k8s》.
Introduction
There are two ways to run Flink on K8s:
- session: start a JobManager first, then submit jobs, which spin up TaskManagers
- application: submit the jar directly; each jar runs in its own isolated environment (this article focuses on this mode)
Component versions:
- hadoop: 2.7.2
- flink: 1.15.1
I. Building the Dockerfile
vim dockerfile-flink-1.15.1
# As of 2023-03-01, the latest Flink image available on Docker Hub was flink:1.14.2-scala_2.12
# That Docker Hub image has been pushed to a local Aliyun registry
# Since my job is built against flink-1.15.1, the Flink inside the image is upgraded by hand, and some extra environment is added
FROM registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.14.2-scala_2.12
USER root
ENV FLINK_VERSION 1.15.1
ENV SCALA_VERSION 2.12
ENV HADOOP_VERSION 2.7.2
ENV HADOOP_HOME=/opt/hadoop
ENV HADOOP_COMMON_HOME=${HADOOP_HOME} \
HADOOP_HDFS_HOME=${HADOOP_HOME} \
HADOOP_MAPRED_HOME=${HADOOP_HOME} \
HADOOP_YARN_HOME=${HADOOP_HOME} \
HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop \
PATH=${PATH}:${HADOOP_HOME}/bin
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Set the container locale; ENV persists across layers, unlike "RUN export"
ENV LANG=zh_CN.UTF-8
# Drop the bundled flink-1.14.2 environment and replace it with flink-1.15.1
RUN rm -rf $FLINK_HOME/*
COPY flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz /tmp/flink.tgz
RUN tar --directory /opt -xf /tmp/flink.tgz && rm /tmp/flink.tgz
RUN mkdir -p /opt/flink && mv /opt/flink-${FLINK_VERSION}/* /opt/flink/ && rm -rf /opt/flink-${FLINK_VERSION}
# Flink checkpoints are written to HDFS, so bring in the Hadoop environment
COPY flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar /opt/flink/lib/
COPY hadoop-${HADOOP_VERSION}.tar.gz /tmp/hadoop.tgz
RUN tar --directory /opt -xzf /tmp/hadoop.tgz && rm /tmp/hadoop.tgz
RUN ln -s /opt/hadoop-${HADOOP_VERSION} ${HADOOP_HOME}
# Copy the HDFS config files into HADOOP_CONF_DIR so Hadoop and Flink pick them up
COPY hadoop-env.sh ${HADOOP_CONF_DIR}/
COPY core-site.xml ${HADOOP_CONF_DIR}/
COPY hdfs-site.xml ${HADOOP_CONF_DIR}/
ENV FLINK_HOME /opt/flink
ENV PATH=${PATH}:${FLINK_HOME}/bin
RUN mkdir -p $FLINK_HOME/usrlib && chown -R flink:flink /opt/flink && chmod -R 666 /opt/flink/conf/*
WORKDIR $FLINK_HOME
# Copy the job jar and its runtime config files
COPY flink-realtime-1.0-SNAPSHOT.jar $FLINK_HOME/lib/
COPY flink-realtime-hdfs.properties $FLINK_HOME/usrlib/
COPY handle-table.txt $FLINK_HOME/usrlib/
# Entrypoint script: not run at build time, only when a container is started
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["help"]
Note: the Flink and Hadoop tarballs referenced in the Dockerfile must be downloaded into the current directory beforehand (which is also faster than fetching them during the build).
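For reference, both tarballs can be fetched from the Apache archive (URLs assume the versions chosen above); the flink-shaded-hadoop uber jar has to be sourced separately, e.g. from a Maven repository:
wget https://archive.apache.org/dist/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz
wget https://archive.apache.org/dist/hadoop/common/hadoop-2.7.2/hadoop-2.7.2.tar.gz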
docker build -f dockerfile-flink-1.15.1 -t flink:1.15.1 . --no-cache
Afterwards, upload the image to your own registry with docker tag + docker push.
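For example, assuming the Aliyun registry path and tag used in the submission commands later in this article:
docker tag flink:1.15.1 registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05
docker push registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05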
II. Deploying the Flink Environment
1. Install the Flink client on the host
wget https://archive.apache.org/dist/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz
mv flink-1.15.1-bin-scala_2.12.tgz /opt && cd /opt && tar -xf flink-1.15.1-bin-scala_2.12.tgz
ll /opt/flink-1.15.1
2. Create the namespace and serviceaccount
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant the serviceaccount permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
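Optionally, sanity-check what was just created:
kubectl get serviceaccount flink-service-account -n flink
kubectl describe clusterrolebinding flink-role-binding-flink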
3. Submit a Flink job
Submit Flink's bundled example job:
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dkubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05 \
-Dkubernetes.jobmanager.replicas=1 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dexternal-resource.limits.kubernetes.cpu=1000m \
-Dexternal-resource.limits.kubernetes.memory=2Gi \
-Dexternal-resource.requests.kubernetes.cpu=1000m \
-Dexternal-resource.requests.kubernetes.memory=1Gi \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/examples/streaming/TopSpeedWindowing.jar
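Since the REST service is exposed as a NodePort, the Flink Web UI is reachable on any node once the cluster is up; the assigned port can be looked up via the rest service, whose name follows the <cluster-id>-rest convention:
kubectl get svc flink-cluster-rest -n flink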
Submit your own job:
# kubernetes.container.image: image the containers start from (the same one pushed earlier)
# kubernetes.namespace: namespace the containers run in
# kubernetes.pod-template-file: YAML template, used here to inject /etc/hosts mappings; later the YAML can be
#   templated to swap in different job jars and config files dynamically
# -c: the Main class
# The last two arguments are the job jar and its config file, as absolute paths inside the container (not on the host)
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dkubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05 \
-Dkubernetes.jobmanager.replicas=1 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dexternal-resource.limits.kubernetes.cpu=1000m \
-Dexternal-resource.limits.kubernetes.memory=2Gi \
-Dexternal-resource.requests.kubernetes.cpu=1000m \
-Dexternal-resource.requests.kubernetes.memory=1Gi \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dclassloader.resolve-order=parent-first \
-Dkubernetes.pod-template-file=/opt/flink-1.15.1/flink-template.yaml \
-c com.clb.hadoop.hub.flink.realtime.launch.FlinkConsumeKafkaToHdfs \
local:///opt/flink/lib/flink-realtime-1.0-SNAPSHOT.jar /opt/flink/usrlib/flink-realtime-hdfs.properties
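Once submitted, running jobs can also be listed from the client side (cluster-id and namespace must match the submission):
./bin/flink list --target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dkubernetes.namespace=flink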
4. Inspect the job
kubectl get -n flink pod,svc,deployment,configmap,pv
- pod/flink-cluster-79ff8b6dd-nn6sk: the jobmanager
- pod/flink-cluster-taskmanager-1-1: the taskmanager
- pod/hadoop-dn-node1-0: the datanode of the Hadoop deployment
- pod/hadoop-nn-0: the namenode of the Hadoop deployment
Pod status and logs can be checked with the following commands:
# View the jobmanager logs
kubectl logs flink-cluster-79ff8b6dd-nn6sk -n flink
# View the taskmanager logs
kubectl logs flink-cluster-taskmanager-1-1 -n flink
# Describe the jobmanager pod
kubectl describe pod flink-cluster-79ff8b6dd-nn6sk -n flink
# Stop the job submitted above
kubectl delete deployment flink-cluster -n flink
III. Caveats
1. The YAML pod template
Why is a YAML template needed?
My code uses Kafka. Say the current Kafka cluster is node2:9092, node3:9092, node4:9094. When a KafkaClient connects to a broker, it fetches the connection info of every broker in the cluster. Without the IP mappings in place beforehand, connections from inside the container to the services behind the hostnames node2, node3, node4 fail, because /etc/hosts contains no entries for them.
apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
  namespace: flink
spec:
  initContainers:
    - name: artifacts-fetcher
      image: registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05
      command:
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
        - mountPath: /opt/flink/usrlib
          name: flink-usr-extr-lib
  # Map the hostnames inside the container up front
  hostAliases:
    - ip: 192.168.0.111
      hostnames:
        - "master"
    - ip: 192.168.0.113
      hostnames:
        - "node2"
    - ip: 192.168.0.114
      hostnames:
        - "node3"
    - ip: 192.168.0.115
      hostnames:
        - "node4"
  containers:
    # Do not change the main container name
    - name: flink-main-container
      image: registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
        - mountPath: /opt/flink/lib/extr-lib
          name: flink-usr-extr-lib
  volumes:
    - name: flink-usr-home
      emptyDir: {}
    - name: flink-usr-extr-lib
      emptyDir: {}
2. The hadoop-config-volume problem
Problem:
If your job uses the Hadoop environment, the taskmanager may hang during startup.
Check the cause with:
kubectl describe pod flink-cluster-taskmanager-1-1 -n flink
MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap "hadoop-config-flink-cluster" not found
Fix:
Add a ConfigMap containing the core-site.xml and hdfs-site.xml of the HDFS environment.
Note:
- namespace: must match the namespace created earlier
- app: must match the kubernetes.cluster-id used when submitting the job
vim hadoop-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    app: flink-cluster
    type: flink-native-kubernetes
  name: hadoop-config-flink-cluster
  namespace: flink
data:
  core-site.xml: |
    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop-nn-0.hadoop-nn-service:8020</value>
      </property>
      <property>
        <name>hadoop.tmp.dir</name>
        <value>/var/hadoop</value>
      </property>
    </configuration>
  hdfs-site.xml: |
    <configuration>
      <property>
        <name>dfs.name.dir</name>
        <value>/dfs/nn</value>
      </property>
      <property>
        <name>dfs.data.dir</name>
        <value>/dfs/dn/data/</value>
      </property>
      <property>
        <name>dfs.replication</name>
        <value>3</value>
      </property>
      <property>
        <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
        <value>false</value>
      </property>
      <property>
        <name>dfs.datanode.use.datanode.hostname</name>
        <value>true</value>
      </property>
    </configuration>
Apply the ConfigMap:
kubectl apply -f hadoop-config.yaml
Then stop the job
kubectl delete deployment flink-cluster -n flink
and resubmit it.
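To confirm the ConfigMap exists before resubmitting:
kubectl get configmap hadoop-config-flink-cluster -n flink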
3. Startup parameters
A fuller reference set of submission options (here from a flink-1.13.2 cluster), covering HA, checkpointing, and the restart strategy:
-Dresourcemanager.taskmanager-timeout=345600 \
-Dkubernetes.namespace=flink-session-cluster-test-1213 \
-Dkubernetes.service-account=flink \
-Dkubernetes.cluster-id=flink-stream-reatime-dw11 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://cdh104:8020/flink/recovery \
-Dkubernetes.container.image=flink:1.13.2-scala_2.11-java8 \
-Dstate.checkpoints.dir=hdfs://cdh104:8020/flink/checkpoints/flink-stream-application-cluster-08 \
-Dstate.savepoints.dir=hdfs://cdh104:8020/flink/savepoints/flink-stream-application-cluster-08 \
-Dexecution.checkpointing.interval=2s \
-Dexecution.checkpointing.mode=EXACTLY_ONCE \
-Dstate.backend=filesystem \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Drestart-strategy=failure-rate \
-Drestart-strategy.failure-rate.delay=1s \
-Drestart-strategy.failure-rate.failure-rate-interval=5s \
-Drestart-strategy.failure-rate.max-failures-per-interval=1 \
-Dtaskmanager.memory.process.size=1096m \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.numberOfTaskSlots=1 \
IV. Problems and Solutions
1. The hosts problem for Flink jobs?
Use the YAML pod template Flink provides: put the hosts mappings into the template, then point at it on the command line with -Dkubernetes.pod-template-file.
2. In application mode, how do you keep custom jars out of the image?
Fetch them with wget in an initContainers entry in the YAML template, as in the template below.
vi flink-template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
spec:
  initContainers:
    - name: artifacts-fetcher
      image: native_realtime:1.0.3
      # Fetch the custom job jar and any config files
      command: ["/bin/sh", "-c"]
      args: ["wget http://xxxxxx:8082/flinkhistory/1.13.2/tt.sql -O /opt/flink/usrHome/taa.sql ; wget http://xxxx:8082/flinkhistory/1.13.2/realtime-dw-service-1.0.1-SNAPSHOT.jar -O /opt/flink/usrHome/realtime-dw-service-1.0.1.jar"]
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
  hostAliases:
    - ip: 10.1.1.103
      hostnames:
        - "cdh103"
    - ip: 10.1.1.104
      hostnames:
        - "cdh104"
    - ip: 10.1.1.105
      hostnames:
        - "cdh105"
    - ip: 10.1.1.106
      hostnames:
        - "cdh106"
  containers:
    # Do not change the main container name
    - name: flink-main-container
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/usrHome
          name: flink-usr-home
  volumes:
    - name: flink-usr-home
      hostPath:
        path: /tmp
        type: Directory
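The template is then passed at submission time. A minimal sketch, reusing options from section II (the jar path assumes the initContainer above has fetched the jar into /opt/flink/usrHome):
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.pod-template-file=flink-template.yaml \
local:///opt/flink/usrHome/realtime-dw-service-1.0.1.jar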