1. Overview
- At its core, Flink is a streaming dataflow execution engine; on top of the same Flink runtime it supports both stream-processing and batch-processing applications. For distributed computation over data streams it provides data distribution, data communication, and fault-tolerance mechanisms.
- Flink official website
- Documentation for each version
- Official Flink on k8s documentation
- GitHub repository
2. Flink Run Modes
- Official documentation
- Flink on YARN has three run modes:
- yarn-session mode (Session Mode)
- yarn-cluster mode (Per-Job Mode)
- application mode (Application Mode)
- Note: Per-Job mode is deprecated. It is supported only by YARN, was deprecated in Flink 1.15, and its removal is tracked in FLINK-26000 (see the launch sketch below).
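- A rough sketch of how each YARN mode is launched, assuming a Flink distribution with HADOOP_CLASSPATH configured; the example jar ships with Flink:
# Session mode: start a long-running session, then submit jobs into it
./bin/yarn-session.sh -d
# submit into the session (pass -Dyarn.application.id=<appId> if needed)
./bin/flink run -t yarn-session ./examples/streaming/TopSpeedWindowing.jar
# Per-Job mode (deprecated since 1.15): one dedicated cluster per job
./bin/flink run -t yarn-per-job ./examples/streaming/TopSpeedWindowing.jar
# Application mode: the job's main() runs inside the cluster's JobManager
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar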
3. Flink on k8s in Practice
① Download Flink
- Download address, as shown below:
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
② Build the base image
docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12
③ Session mode
- A Flink Session cluster runs as a long-lived Kubernetes Deployment. Multiple Flink jobs can run on one Session cluster; each job is submitted to the cluster after the cluster has been deployed.
- A Flink Session cluster deployment on Kubernetes has at least three components:
- a Deployment that runs the JobManager;
- a Deployment for a pool of TaskManagers;
- a Service exposing the JobManager's REST and UI ports.
(A) Native Kubernetes mode
- Parameter reference:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace
- Dockerfile for the image:
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
- Build the image:
docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
- Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions to the serviceaccount
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
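- Optionally verify the binding before continuing (a quick RBAC check; it should print "yes"):
kubectl auth can-i create pods --as=system:serviceaccount:flink:flink-service-account -n flink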
- Create the Flink cluster:
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.rest-service.exposed.type=NodePort
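- A running session can later be re-attached and stopped from the client; a sketch of the documented pattern:
# Attach to the running session and stop it interactively
echo 'stop' | ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dexecution.attached=true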
- Submit a job (mind the JDK version; JDK 8 currently works fine):
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
./examples/streaming/TopSpeedWindowing.jar
# With resource parameters configured
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.taskmanager.cpu=2000m \
-Dexternal-resource.limits.kubernetes.cpu=4000m \
-Dexternal-resource.limits.kubernetes.memory=10Gi \
-Dexternal-resource.requests.kubernetes.cpu=2000m \
-Dexternal-resource.requests.kubernetes.memory=8Gi \
./examples/streaming/WordCount.jar
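- Jobs running in the session can be listed and cancelled from the same client; <jobId> below is a placeholder taken from the list output:
./bin/flink list --target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink
./bin/flink cancel --target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink <jobId>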
- Check:
kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1
- Delete the Flink cluster:
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
(B) Standalone mode
- Build the image. The default user is flink; here it is changed to admin (swap in whatever user your organization requires). The scripts can be copied out of the pod started above. Startup script docker-entrypoint.sh:
#!/usr/bin/env bash
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, the Flink image adopts jemalloc as the default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
- The Dockerfile:
FROM myharbor.com/bigdata/centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (UTC by default)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache

# Set the working directory
WORKDIR $FLINK_HOME

# Expose ports
EXPOSE 6123 8081

# The entrypoint script runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
- Build the image:
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Delete the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
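- Optionally smoke-test the image locally before deploying; this runs the help branch of docker-entrypoint.sh and should print the usage text:
docker run -it --rm myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 help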
- Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions to the serviceaccount
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
- Compose the yaml files:
- flink-configuration-configmap.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
- jobmanager-service.yaml (optional; required only in non-HA mode):
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
- jobmanager-rest-service.yaml (optional; exposes the JobManager REST port as a Kubernetes NodePort):
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
- taskmanager-query-state-service.yaml (optional; exposes the TaskManager queryable-state port as a Kubernetes NodePort):
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
- The following configuration files are the common (non-optional) ones:
- jobmanager-session-deployment-non-ha.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to the admin user (uid 9999) created in the image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
- taskmanager-session-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to the admin user (uid 9999) created in the image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
- Create the Flink cluster as follows:
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
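- Optionally wait for both Deployments to become ready before submitting jobs:
kubectl rollout status deployment/flink-jobmanager -n flink
kubectl rollout status deployment/flink-taskmanager -n flink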
- Reverse-engineering the Dockerfile from an image:
alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12
- Check:
kubectl get pods,svc -n flink -owide
- Web UI address.
- Submit a job:
./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink
- Delete the Flink cluster:
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
- Access the Flink web UI:
- The port is the NodePort defined in jobmanager-rest-service.yaml:
http://192.168.182.110:30081/#/overview
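- The same NodePort also serves Flink's REST API, so the cluster can be checked without a browser (assuming the node IP above):
# List jobs via the REST API
curl http://192.168.182.110:30081/jobs/overview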
④ Application mode (recommended)
- A basic Flink Application cluster deployment on Kubernetes has three components:
- an Application that runs the JobManager;
- a Deployment for a pool of TaskManagers;
- a Service exposing the JobManager's REST and UI ports.
(A) Native Kubernetes mode (commonly used)
- Dockerfile for the image:
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
- Build the image:
docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
# Delete the image
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
- Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions to the serviceaccount
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
- Create the Flink cluster and submit the job:
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
-Dkubernetes.jobmanager.replicas=1 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dexternal-resource.limits.kubernetes.cpu=2000m \
-Dexternal-resource.limits.kubernetes.memory=2Gi \
-Dexternal-resource.requests.kubernetes.cpu=1000m \
-Dexternal-resource.requests.kubernetes.memory=1Gi \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/usrlib/TopSpeedWindowing.jar
- local:// is the only scheme supported in application mode; "local" refers to the environment inside the pod/container, not the host machine. Check:
kubectl get pods,svc -n flink
kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
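- Jobs on the application cluster can likewise be listed and cancelled through the cluster id; <jobId> is a placeholder from the list output:
./bin/flink list --target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.namespace=flink
./bin/flink cancel --target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.namespace=flink <jobId>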
- Delete the Flink cluster:
kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
(B) Standalone mode
- Build the image; the startup script docker-entrypoint.sh is the same one used for session mode:
#!/usr/bin/env bash
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, the Flink image adopts jemalloc as the default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
- The Dockerfile:
FROM myharbor.com/bigdata/centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (UTC by default)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
RUN chmod +x /opt/apache/docker-entrypoint.sh

# Set the working directory
WORKDIR $FLINK_HOME

# Expose ports
EXPOSE 6123 8081

# The entrypoint script runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
- Build the image:
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Delete the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
- Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions to the serviceaccount
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
- Compose the yaml files:
- flink-configuration-configmap.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
- jobmanager-service.yaml (optional; required only in non-HA mode):
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
- jobmanager-rest-service.yaml (optional; exposes the JobManager REST port as a Kubernetes NodePort):
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
- taskmanager-query-state-service.yaml (optional; exposes the TaskManager queryable-state port as a Kubernetes NodePort):
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
- jobmanager-application-non-ha.yaml, non-high-availability (note the mounted job-artifacts directory; a shared directory works best here, see the artifact-copy sketch after this yaml):
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to the admin user (uid 9999) created in the image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
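- Since the Job's args reference the WordCount example class, the matching jar must be present under the hostPath (ideally a shared/NFS) directory on every schedulable node; a sketch, assuming the examples that ship with the Flink distribution:
mkdir -p /mnt/nfsdata/flink/application/job-artifacts
# org.apache.flink.examples.java.wordcount.WordCount ships in the batch examples
cp ./examples/batch/WordCount.jar /mnt/nfsdata/flink/application/job-artifacts/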
- taskmanager-job-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to the admin user (uid 9999) created in the image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
- Create the Flink cluster and submit the job:
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
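- The application itself runs as a Kubernetes Job; optionally block until the WordCount job completes (the timeout value is an arbitrary choice):
kubectl wait --for=condition=complete job/flink-jobmanager -n flink --timeout=300s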
- Check:
kubectl get pods,svc -n flink
- Delete the Flink cluster:
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
- Check:
kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
This article is reposted from: https://blog.csdn.net/Forever_wj/article/details/131490066
Copyright belongs to the original author ╰つ栺尖篴夢ゞ; in case of infringement, please contact us for removal.