0


Flink On k8s

参考文章:
https://www.cnblogs.com/liugp/p/16755095.html

如果你的程序中需要用到HDFS(一般是checkpint的存储目录),可以先参考上一篇文章:
Hadoop、Hive On k8s
《Hadoop、Hive On k8s》

介绍

Flink On K8s 有两种方式

  • session:先启动一个jobmanager,在提交任务,启动taskmanger
  • application:直接将jar提交运行,每一个jar所在的环境都是单独的(主要讲这个)

组件版本:

  • hadoop:2.7.2
  • flink:1.15.1

一、构建DockerFile

vim dockerfile-flink-1.15.1
# 截至2023-03-01日,在dockerhub能下载最新的flink镜像版本为:flink:1.14.2-scala_2.12
# 这里将dockerhub的镜像上传到本地的阿里云仓库
# 由于我这边的程序是基于flink-1.15.1版本开发的,所以这边需要手动去升级flink镜像,并且在镜像内部添加一些环境
FROM registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.14.2-scala_2.12

USER root

ENV FLINK_VERSION 1.15.1
ENV SCALA_VERSION 2.12
ENV HADOOP_VERSION 2.7.2
ENV HADOOP_HOME=/opt/hadoop
ENV HADOOP_CONFIG_DIF=/etc/hadoop/conf/
ENV HADOOP_COMMON_HOME=${HADOOP_HOME} \
    HADOOP_HDFS_HOME=${HADOOP_HOME} \
    HADOOP_MAPRED_HOME=${HADOOP_HOME} \
    HADOOP_YARN_HOME=${HADOOP_HOME} \
    HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop \
    PATH=${PATH}:${HADOOP_HOME}/bin

RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8

# 手动删除flink-1.14.2的环境,替换为flink-1.15.1
RUN rm -rf $FLINK_HOME/*
COPY flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz /tmp/flink.tgz 
RUN tar --directory /opt -xf /tmp/flink.tgz && rm /tmp/flink.tgz 
RUN mkdir -p /opt/flink && mv /opt/flink-${FLINK_VERSION}/* /opt/flink/ && rm -rf  /opt/flink-${FLINK_VERSION}

# flink的checkpoint需要用到HDFS,引入hadoop环境
COPY flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar /opt/flink/lib/
COPY hadoop-${HADOOP_VERSION}.tar.gz /tmp/hadoop.tgz
RUN tar --directory /opt -xzf /tmp/hadoop.tgz && rm /tmp/hadoop.tgz
RUN ln -s /opt/hadoop-${HADOOP_VERSION} ${HADOOP_HOME}

# 拷贝HDFS的配置文件
COPY hadoop-env.sh ${HADOOP_CONFIG_DIF}/
COPY core-site.xml ${HADOOP_CONFIG_DIF}/
COPY hdfs-site.xml ${HADOOP_CONFIG_DIF}/ 

ENV FLINK_HOME /opt/flink
ENV PATH=${PATH}:${FLINK_HOME}/bin

RUN chown -R flink:flink /opt/flink

RUN mkdir -p $FLINK_HOME/usrlib

WORKDIR $FLINK_HOME

RUN chown -R flink:flink /opt/flink && chmod -R 666 /opt/flink/conf/*

# 拷贝启动jar和启动配置文件
COPY flink-realtime-1.0-SNAPSHOT.jar  $FLINK_HOME/lib/
COPY flink-realtime-hdfs.properties $FLINK_HOME/usrlib/
COPY handle-table.txt $FLINK_HOME/usrlib/

# 执行脚本,构建镜像时不执行,运行实例才会执行
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["help"]

注意:Dockerfile 中所需的flink、hadoop的tgz包,需要先手动下载至当前目录(速度更快)

docker build -f dockerfile-flink-1.15.1 -t flink:1.15.1 . --no-cache

后续通过

docker tag

+

docker push

上传至自己的镜像仓库
image.png

二、Flink环境部署

1、宿主机部署Flink-Client

wget https://archive.apache.org/dist/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
mv flink-1.15.2-bin-scala_2.12.tgz /opt &&cd /opt &&tar-xf flink-1.15.2-bin-scala_2.12.tgz
ll /opt/flink-1.15.1

image.png

2、创建命令空间和serviceacount

# 创建namespace
kubectl create ns flink
# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

3、提交Flink任务

提交Flink自带测试程序

./bin/flink run-application \--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster  \-Dkubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05 \-Dkubernetes.jobmanager.replicas=1\-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dexternal-resource.limits.kubernetes.cpu=1000m \
-Dexternal-resource.limits.kubernetes.memory=2Gi \
-Dexternal-resource.requests.kubernetes.cpu=1000m \
-Dexternal-resource.requests.kubernetes.memory=1Gi \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/examples/streaming/TopSpeedWindowing.jar

提交用户自己开发程序

./bin/flink run-application \--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster  \# 指定容器启动的镜像(与之前提交的保持一致)-Dkubernetes.container.image=registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05 \-Dkubernetes.jobmanager.replicas=1\# 指定容器运行的命名空间-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dexternal-resource.limits.kubernetes.cpu=1000m \
-Dexternal-resource.limits.kubernetes.memory=2Gi \
-Dexternal-resource.requests.kubernetes.cpu=1000m \
-Dexternal-resource.requests.kubernetes.memory=1Gi \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dclassloader.resolve-order=parent-first \# yaml 模板,为解决hosts映射,后续可以通过编排此yaml文件,实现动态替换启动jar包和配置文件
-Dkubernetes.pod-template-file=/opt/flink-1.14.2/flink-templeta.yaml \# Main方法-c com.clb.hadoop.hub.flink.realtime.launch.FlinkConsumeKafkaToHdfs \# 启动Jar包和启动配置文件的绝对路径(容器内部,不是宿主机)
local:///opt/flink/lib/flink-realtime-1.0-SNAPSHOT.jar /opt/flink/usrlib/flink-realtime-hdfs.properties

4、查看任务

kubectl get -n flink pod,svc,deployment,configmap,pv

image.png

  • pod/flink-cluster-79ff8b6dd-nn6sk:jobmanager服务
  • pod/flink-cluster-taskmanager-1-1:taskmanager服务
  • pod/hadoop-dn-node1-0:hadoop服务下的datanode
  • pod/hadoop-nn-0:hadoop服务下的namenode

可以通过下面命令查看Pod的状态及日志

# 查看jobmanager的日志
kubectl logs flink-cluster-79ff8b6dd-nn6sk  -n flink

# 查看taskmanager的日志
kubectl logs flink-cluster-taskmanager-1-1  -n flink

# 查看jobmanager的pod信息
kubectl describe pod flink-cluster-79ff8b6dd-nn6sk -n flink

# 停止刚刚提交的程序
kubectl delete deployment flink-cluster -n flink

三、注意事项:

1、yaml 模板

为什么需要用到yaml模板?
我的代码中用到了kafka组件。比如我当前的kafka环境为:node2:9092、node3:9092、node4:9094
KafkaClient在连接KafkaBroker时,会获取集群内部的所有broker的连接信息。如果我们没有提前映射IP,容器内部去根据node2、node3、node4这些主机名连接对应的服务时会报错(因为/etc/hosts文件内没有映射)

apiVersion: v1
kind: Pod
metadata:name: flink-pod-template
  namespace: flink
spec:initContainers:-name: artifacts-fetcher
      image: registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05command:volumeMounts:-mountPath: /opt/flink/usrHome
          name: flink-usr-home
        -mountPath: /opt/flink/usrlib
          name: flink-usr-extr-lib
  # 提前在容器内部映射域名hostAliases:-ip: 192.168.0.111
    hostnames:-"master"-ip: 192.168.0.113
    hostnames:-"node2"-ip: 192.168.0.114
    hostnames:-"node3"-ip: 192.168.0.115
    hostnames:-"node4"containers:# Do not change the main container name-name: flink-main-container
      image: registry.cn-hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05resources:requests:ephemeral-storage: 2048Mi
        limits:ephemeral-storage: 2048Mi
      volumeMounts:-mountPath: /opt/flink/usrHome
          name: flink-usr-home
        -mountPath: /opt/flink/lib/extr-lib
          name: flink-usr-extr-lib
  volumes:-name: flink-usr-home
      emptyDir:{}-name: flink-usr-extr-lib
      emptyDir:{}

2、hadoop-config-volume问题

问题:
如果你的程序时用到了hadoop环境,那么你的taskmanager在启动时可能会卡住
image.png
通过命令查看原因:

kubectl describe pod flink-cluster-taskmanager-1-1 -n flink

image.png

MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap "hadoop-config-flink-cluster" not found

解决:
添加一个configmap,将HDFS环境下的core-site.xml、hdfs-site.xml添加进去
注意:

  • namespace:要与之前创建的namespace保持一致
  • app:要与之前提交任务时的kubernetes.cluster-id保持一致
vim hadoop-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:labels:app: flink-cluster
    type: flink-native-kubernetes
  name: hadoop-config-flink-cluster
  namespace: flink
data:core-site.xml:|
          <configuration>
            <property>
                <name>fs.defaultFS</name>
                <value>hdfs://hadoop-nn-0.hadoop-nn-service:8020</value>
            </property>
            <property>
                <name>hadoop.tmp.dir</name>
                <value>/var/hadoop</value>
            </property>
          </configuration>hdfs-site.xml:|
          <configuration>
            <property>
                <name>dfs.name.dir</name>
                <value>/dfs/nn</value>
            </property>
            <property>
                <name>dfs.data.dir</name>
                <value>/dfs/dn/data/</value>
            </property>
            <property>
                <name>dfs.replication</name>
                <value>3</value>
            </property>
            <property>
                <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
                <value>false</value>
            </property>
            <property>
                <name>dfs.datanode.use.datanode.hostname</name>
                <value>true</value>
            </property>
          </configuration>

启动此configmap

kubectl apply -f hadoop-config.yaml

停止程序

kubectl delete deployment flink-cluster -n flink

,重新提交即可

3、启动参数

-Dresourcemanager.taskmanager-timeout=345600\-Dkubernetes.namespace=flink-session-cluster-test-1213 \
-Dkubernetes.service-account=flink \
-Dkubernetes.cluster-id=flink-stream-reatime-dw11 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://cdh104:8020/flink/recovery \-Dkubernetes.container.image=flink:1.13.2-scala_2.11-java8 \-Dstate.checkpoints.dir=hdfs://cdh104:8020/flink/checkpoints/flink-stream-application-cluster-08 \-Dstate.savepoints.dir=hdfs://cdh104:8020/flink/savepoints/flink-stream-application-cluster-08 \-Dexecution.checkpointing.interval=2s \-Dexecution.checkpointing.mode=EXACTLY_ONCE \-Dstate.backend=filesystem \
-Dkubernetes.rest-service.exposed.type=NodePort  \
-Drestart-strategy=failure-rate  \
-Drestart-strategy.failure-rate.delay=1s  \
-Drestart-strategy.failure-rate.failure-rate-interval=5s \
-Drestart-strategy.failure-rate.max-failures-per-interval=1\-Dtaskmanager.memory.process.size=1096m \-Dkubernetes.taskmanager.cpu=1\-Dtaskmanager.numberOfTaskSlots=1\

四、问题及解决

1、 flink任务的hosts问题?

可以通过flink 提供的yaml模板, 将hosts配置放在yaml中, 然后在命令使用-Dkubernetes.pod-template-file指定

2、 如果使用application模式, 解决自定义jar包不想打入镜像的问题?

可以在yaml模板中, initContainers使用wget方式引入

vi flink-template.yaml

apiVersion: v1
kind: Pod
metadata:name: flink-pod-template
spec:initContainers:-name: artifacts-fetcher
      image: native_realtime:1.0.3
      # 添加自定义运行的jar包以及各种配置文件command:["/bin/sh","-c"]args:["wget http://xxxxxx:8082/flinkhistory/1.13.2/tt.sql -O /opt/flink/usrHome/taa.sql ; wget http://xxxx:8082/flinkhistory/1.13.2/realtime-dw-service-1.0.1-SNAPSHOT.jar -O /opt/flink/usrHome/realtime-dw-service-1.0.1.jar"]volumeMounts:-mountPath: /opt/flink/usrHome
          name: flink-usr-home
  hostAliases:-ip: 10.1.1.103
    hostnames:-"cdh103"-ip: 10.1.1.104
    hostnames:-"cdh104"-ip: 10.1.1.105
    hostnames:-"cdh105"-ip: 10.1.1.106
    hostnames:-"cdh106"containers:# Do not change the main container name-name: flink-main-container
      resources:requests:ephemeral-storage: 2048Mi
        limits:ephemeral-storage: 2048Mi
      volumeMounts:-mountPath: /opt/flink/usrHome
          name: flink-usr-home
  volumes:-name: flink-usr-home
      hostPath:path: /tmp
        type: Directory

本文转载自: https://blog.csdn.net/weixin_44865574/article/details/129367383
版权归原作者 地球人是我哈 所有, 如有侵权,请联系我们删除。

“Flink On k8s”的评论:

还没有评论