0


flink on k8s

一、环境准备

k8s平台:kubesphere

image-20220804180605541

k8s中每个命名空间都有一个默认服务帐户。但是,default 服务帐户可能没有在 Kubernetes 集群中创建或删除 Pod 的权限。用户可能需要更新 default 服务账号的权限或指定另一个绑定了正确角色的服务账号。

kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default

如果不想使用 default 服务帐户,可以使用以下命令创建新的 flink-service-account 服务帐户并设置角色绑定。然后使用 config 选项 -Dkubernetes.service-account=flink-service-account 使 JobManager pod 使用 flink-service-account 服务帐户来创建/删除 TaskManager pod。

kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account

若权限不足 flink 在任务启动时将无法创建 akka

其他环境:

java(1.8)、docker(20.10.17)、flink(1.13.6)、mysql(8.0.23)

二、开始部署

2.1 编写 flink 任务

这里创建一个简单的 flink 任务,每秒生成一个随机数写入 mysql 中

packagetest;importorg.apache.flink.connector.jdbc.JdbcConnectionOptions;importorg.apache.flink.connector.jdbc.JdbcExecutionOptions;importorg.apache.flink.connector.jdbc.JdbcSink;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.source.SourceFunction;importjava.util.UUID;importjava.util.concurrent.TimeUnit;/**
 * @author wjun
 * @date 2022/8/4 15:09
 * @email [email protected]
 * @describe
 */publicclassK8sDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(newSourceFunction<String>(){privatevolatileboolean isRunning =true;@Overridepublicvoidrun(SourceContext<String> ctx)throwsException{while(isRunning){
          ctx.collect(UUID.randomUUID().toString());TimeUnit.SECONDS.sleep(1);}}@Overridepublicvoidcancel(){
        isRunning =false;}}).addSink(JdbcSink.sink("insert into dev.k8s values(?)",(ps, t)->{
        ps.setString(1, t);},JdbcExecutionOptions.builder().withBatchSize(1).build(),newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://***").withUsername("***").withPassword("***").build()));

    env.execute();}}

最终将任务打成 jar 包

2.2 构建镜像

这里使用 Application Mode 模式在生产环境可以为应用提供更好的隔离。on k8s 要求代码与 flink 镜像绑定在一起,Application Mode 确保在应用程序终止后正确清理所有 Flink 组件。

使用 flink 社区提供的基础 docker 镜像

FROM flink:1.13.6
RUN mkdir -p $FLINK_HOME/jobs
COPY flink-on-k8s.jar $FLINK_HOME/jobs/flink-on-k8s.jar

最终 dockerfile 的工作空间如下:

image-20220804174533420

构建 docker 镜像

docker build -t super/flink-on-k8s-demo .

image-20220804174634513

2.3 提交任务

使用下面命令提交任务

flink run-application \
--class test.K8sDemo \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=super/flink-on-k8s-demo \
local:///opt/flink/jobs/flink-on-k8s.jar
  • –class: 指定任务的主类名
  • – target: 指定任务运行模式为 native k8s application
  • -Dkubernetes.cluster-id: 指定集群名称并且必须是唯一的,若不指定 flink 将随机生成
  • -Dkubernetes.container.image: 用于启动 pod 的镜像
  • local: 指定镜像的任务 jar

image-20220804175056425

kubeshpere 平台中查看任务情况

image-20220804175158040

mysql 中观察数据是否写入

image-20220804175252557

2.4 任务取消

kubesphere 平台中点击应用负载-服务,根据提交任务时候指定的 cluster-id 找到对应的 rest 服务

image-20220804175447672

找到 NodePort 端口

image-20220804175625323

使用 节点ip:NodePort 即可进入熟悉的 flink web ui 点击 cancel 即可,同时 kubeshpere 会自动删除与之相关的组件。

这样 flink on k8s 初步的任务提交、运行、取消就搞定啦

标签: flink java kubernetes

本文转载自: https://blog.csdn.net/qq_41858402/article/details/126164308
版权归原作者 小王是个弟弟 所有, 如有侵权,请联系我们删除。

“flink on k8s”的评论:

还没有评论