一、环境准备
k8s平台:kubesphere
k8s中每个命名空间都有一个默认服务帐户。但是,default 服务帐户可能没有在 Kubernetes 集群中创建或删除 Pod 的权限。用户可能需要更新 default 服务账号的权限或指定另一个绑定了正确角色的服务账号。
kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
如果不想使用 default 服务帐户,可以使用以下命令创建新的 flink-service-account 服务帐户并设置角色绑定。然后使用 config 选项 -Dkubernetes.service-account=flink-service-account 使 JobManager pod 使用 flink-service-account 服务帐户来创建/删除 TaskManager pod。
kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account
若权限不足 flink 在任务启动时将无法创建 akka
其他环境:
java(1.8)、docker(20.10.17)、flink(1.13.6)、mysql(8.0.23)
二、开始部署
2.1 编写 flink 任务
这里创建一个简单的 flink 任务,每秒生成一个随机数写入 mysql 中
packagetest;importorg.apache.flink.connector.jdbc.JdbcConnectionOptions;importorg.apache.flink.connector.jdbc.JdbcExecutionOptions;importorg.apache.flink.connector.jdbc.JdbcSink;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.source.SourceFunction;importjava.util.UUID;importjava.util.concurrent.TimeUnit;/**
* @author wjun
* @date 2022/8/4 15:09
* @email [email protected]
* @describe
*/publicclassK8sDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(newSourceFunction<String>(){privatevolatileboolean isRunning =true;@Overridepublicvoidrun(SourceContext<String> ctx)throwsException{while(isRunning){
ctx.collect(UUID.randomUUID().toString());TimeUnit.SECONDS.sleep(1);}}@Overridepublicvoidcancel(){
isRunning =false;}}).addSink(JdbcSink.sink("insert into dev.k8s values(?)",(ps, t)->{
ps.setString(1, t);},JdbcExecutionOptions.builder().withBatchSize(1).build(),newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://***").withUsername("***").withPassword("***").build()));
env.execute();}}
最终将任务打成 jar 包
2.2 构建镜像
这里使用 Application Mode 模式在生产环境可以为应用提供更好的隔离。on k8s 要求代码与 flink 镜像绑定在一起,Application Mode 确保在应用程序终止后正确清理所有 Flink 组件。
使用 flink 社区提供的基础 docker 镜像
FROM flink:1.13.6
RUN mkdir -p $FLINK_HOME/jobs
COPY flink-on-k8s.jar $FLINK_HOME/jobs/flink-on-k8s.jar
最终 dockerfile 的工作空间如下:
构建 docker 镜像
docker build -t super/flink-on-k8s-demo .
2.3 提交任务
使用下面命令提交任务
flink run-application \
--class test.K8sDemo \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=super/flink-on-k8s-demo \
local:///opt/flink/jobs/flink-on-k8s.jar
- –class: 指定任务的主类名
- – target: 指定任务运行模式为 native k8s application
- -Dkubernetes.cluster-id: 指定集群名称并且必须是唯一的,若不指定 flink 将随机生成
- -Dkubernetes.container.image: 用于启动 pod 的镜像
- local: 指定镜像的任务 jar
kubeshpere 平台中查看任务情况
mysql 中观察数据是否写入
2.4 任务取消
kubesphere 平台中点击应用负载-服务,根据提交任务时候指定的 cluster-id 找到对应的 rest 服务
找到 NodePort 端口
使用 节点ip:NodePort 即可进入熟悉的 flink web ui 点击 cancel 即可,同时 kubeshpere 会自动删除与之相关的组件。
这样 flink on k8s 初步的任务提交、运行、取消就搞定啦
版权归原作者 小王是个弟弟 所有, 如有侵权,请联系我们删除。