

Monitoring Flink CDC Jobs with Prometheus

1. Background

  • The project currently uses Flink CDC to sync MySQL data into a columnar database. The sync jobs occasionally fail, which breaks replication and affects business queries. This article builds a Spring Boot service that uses Micrometer together with Prometheus to monitor the health of the Flink CDC jobs, and pushes a DingTalk notification when a job fails.

2. Prerequisites

  • A Prometheus monitoring stack is already in place; for setup, see "Integrating Prometheus Alertmanager with DingTalk alerts".

3. Developing the Flink CDC Monitoring Project

  • Import the required dependencies (pom.xml)
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- actuator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- prometheus -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<!-- okhttp -->
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.60</version>
</dependency>
<!-- hutool -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.17</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
  • Define the interface ICustomPrometheusService; its flinkMeter method collects the job information.
  • The setFlinkUrl and setFlinkJobName methods rely on Nacos dynamic refresh, so the expected job names and the Flink API address can be changed at runtime.
public interface ICustomPrometheusService {

    /**
     * Collect Flink monitoring information
     * @methodName flinkMeter
     * @param registry
     * @author wys
     * @date 2024-08-30
     */
    public void flinkMeter(MeterRegistry registry);

    /**
     * Set the Flink API address
     * @methodName setFlinkUrl
     * @param flinkUrl
     * @author wys
     * @date 2024-08-30
     */
    public void setFlinkUrl(String flinkUrl);

    /**
     * Set the names of the Flink data sync jobs
     * @methodName setFlinkJobName
     * @param flinkJobName
     * @author wys
     * @date 2024-08-30
     */
    public void setFlinkJobName(String flinkJobName);
}
  • Interface implementation: the job information is read through the REST API that Flink exposes, and a Gauge records each job's status.
  • flinkUrl is the Flink API address; the flinkNames property holds the names of the Flink CDC jobs we expect to be running, separated by commas.
  • Comparing the expected jobs with the actually running jobs determines which jobs are online and which are offline; offline jobs are reported to DingTalk through Prometheus.
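The heart of this check is a set difference between the expected job names and the names reported by Flink. In isolation it can be sketched as follows (the class and method names here are illustrative, not from the original project):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Standalone sketch of the expected-vs-running comparison.
class OfflineJobCheck {

    // Returns the expected jobs that are absent from the running set, i.e. offline jobs.
    static List<String> findOffline(List<String> expected, Set<String> running) {
        return expected.stream()
                .filter(name -> !running.contains(name))
                .collect(Collectors.toList());
    }
}
```

Jobs returned here get a placeholder entry with value 0, which is what the alerting rule later fires on.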
@Slf4j
@Service
public class CustomPrometheusServiceImpl implements ICustomPrometheusService {

    @Resource
    private RestTemplateUtil restTemplateUtil;

    @Value("${flink.url}")
    private String flinkUrl;

    @Value("${flink.job.names:no_job}")
    private String flinkNames;

    private List<String> jobNames = new ArrayList<>();

    private Map<String, FlinkJobDto> jobMap = new HashMap<>();

    private Map<String, Gauge> gaugeMap = new HashMap<>();

    private final static int UP = 1;

    private final static int DOWN = 0;

    @PostConstruct
    public void initJobNames() {
        jobNames = Arrays.asList(flinkNames.split(","));
    }

    @Override
    public void flinkMeter(MeterRegistry registry) {
        List<FlinkJobDto> list = new ArrayList<>();
        try {
            FlinkDto flinkInfo = restTemplateUtil.getForObject(flinkUrl, param -> {}, FlinkDto.class);
            list = flinkInfo.getJobs();
        } catch (Exception e) {
            log.error("Call to the Flink API failed; Flink itself may be down");
        }
        // Register the running jobs
        for (FlinkJobDto job : list) {
            setFlinkJobInfo(job, registry, UP);
        }
        // Names of the currently running jobs
        Set<String> runJobs = list.stream().map(FlinkJobDto::getName).collect(Collectors.toSet());
        // Expected jobs that are offline
        List<String> offlineJob = jobNames.stream().filter(item -> !runJobs.contains(item)).collect(Collectors.toList());
        // Mark any offline jobs as down
        if (!ObjectUtils.isEmpty(offlineJob)) {
            for (String name : offlineJob) {
                setFlinkJobInfo(getFailJob(name), registry, DOWN);
            }
        }
        // Deregister jobs that are no longer expected
        List<String> registryJobs = gaugeMap.keySet().stream().collect(Collectors.toList());
        List<String> removeJobs = registryJobs.stream().filter(item -> !jobNames.contains(item)).collect(Collectors.toList());
        if (!ObjectUtils.isEmpty(removeJobs)) {
            for (String name : removeJobs) {
                Gauge gauge = gaugeMap.get(name);
                registry.remove(gauge);
                jobMap.remove(name);
                gaugeMap.remove(name);
            }
        }
    }

    /**
     * Register job information
     * @methodName setFlinkJobInfo
     * @param job
     * @param registry
     * @author wys
     * @date 2024-08-30
     */
    private void setFlinkJobInfo(FlinkJobDto job, MeterRegistry registry, int up) {
        // If the job is already registered, update the cached object in place
        // (the Gauge keeps a reference to it, so copying into a new object would
        // leave the gauge reading stale data)
        if (jobMap.containsKey(job.getName())) {
            FlinkJobDto flinkJob = jobMap.get(job.getName());
            if (up == UP) {
                BeanUtil.copyProperties(job, flinkJob);
            }
            flinkJob.setUp(up);
            return;
        }
        Date date = new Date(job.getStartTime());
        // Register the gauge
        Gauge gauge = Gauge.builder("flink.job.infos", job, FlinkJobDto::getUp)
                .tag("name", job.getName())
                .tag("jid", job.getJid())
                .tag("startTime", DateUtil.format(date, "yyyy-MM-dd HH:mm:ss"))
                .register(registry);
        job.setUp(up);
        // Cache for later dynamic updates
        jobMap.put(job.getName(), job);
        gaugeMap.put(job.getName(), gauge);
    }

    /**
     * Build a placeholder entry for a failed job
     * @methodName getFailJob
     * @param name
     * @return FlinkJobDto
     * @author wys
     * @date 2024-09-02
     */
    private FlinkJobDto getFailJob(String name) {
        FlinkJobDto allFailJob = new FlinkJobDto();
        allFailJob.setName(name);
        allFailJob.setJid("");
        allFailJob.setUp(DOWN);
        allFailJob.setStartTime(new Date().getTime());
        return allFailJob;
    }

    @Override
    public void setFlinkUrl(String flinkUrl) {
        if (!ObjectUtils.isEmpty(flinkUrl)) {
            this.flinkUrl = flinkUrl;
            log.info("Flink API address updated to: {}", flinkUrl);
        }
    }

    @Override
    public void setFlinkJobName(String flinkJobName) {
        if (!ObjectUtils.isEmpty(flinkJobName)) {
            this.flinkNames = flinkJobName;
            jobNames = Arrays.asList(flinkNames.split(","));
            log.info("Flink sync job names updated to: {}", flinkJobName);
        }
    }
}
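The implementation above references two DTOs, FlinkDto and FlinkJobDto, that the original post does not show (RestTemplateUtil is likewise a project-internal wrapper and is left as-is). A minimal sketch of what they might look like, with fields inferred from how they are used; the exact names are assumptions:

```java
import java.util.List;

// Hypothetical DTOs inferred from usage; field names are assumptions.
// FlinkDto mirrors the top level of the /jobs/overview response.
class FlinkDto {
    private List<FlinkJobDto> jobs;

    public List<FlinkJobDto> getJobs() { return jobs; }
    public void setJobs(List<FlinkJobDto> jobs) { this.jobs = jobs; }
}

// FlinkJobDto carries one job's state; getUp() feeds the Micrometer Gauge.
class FlinkJobDto {
    private String jid;      // Flink job id
    private String name;     // job name, used as the "name" tag
    private long startTime;  // start timestamp in milliseconds
    private int up;          // 1 = online, 0 = offline

    public String getJid() { return jid; }
    public void setJid(String jid) { this.jid = jid; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public long getStartTime() { return startTime; }
    public void setStartTime(long startTime) { this.startTime = startTime; }
    public int getUp() { return up; }
    public void setUp(int up) { this.up = up; }
}
```

In the real project these would likely use Lombok's @Data instead of hand-written accessors, plus JSON field mapping for Flink's hyphenated keys (e.g. start-time).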
  • Metrics registry configuration
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;

@Configuration
public class MeterRegistryConfig {

    @Bean
    MeterRegistryCustomizer<MeterRegistry> configurer() {
        // Filter out built-in metrics that are not needed here
        return (registry) -> registry.config()
                .meterFilter(MeterFilter.denyNameStartsWith("tomcat"))
                .meterFilter(MeterFilter.denyNameStartsWith("logback"))
                .meterFilter(MeterFilter.denyNameStartsWith("system"))
                .meterFilter(MeterFilter.denyNameStartsWith("process"))
                .meterFilter(MeterFilter.denyNameStartsWith("http"))
                .meterFilter(MeterFilter.denyNameStartsWith("jvm"));
    }
}
  • Poll the running jobs every 5 seconds
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import com.wellsun.custom.prometheus.service.ICustomPrometheusService;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;

@Component
public class CustomMeterBinder implements MeterBinder {

    @Resource
    private ICustomPrometheusService customPrometheusService;

    private ScheduledExecutorService executors = Executors.newSingleThreadScheduledExecutor();

    @Override
    public void bindTo(MeterRegistry registry) {
        // Collect the data every 5 seconds
        executors.scheduleAtFixedRate(() -> customPrometheusService.flinkMeter(registry), 1, 5, TimeUnit.SECONDS);
    }
}
  • Nacos configuration file
management.endpoints.web.exposure.include=*
management.metrics.tags.application=${spring.application.name}
server.tomcat.mbeanregistry.enabled=true

## Flink API address
flink.url=http://xxx:8081/jobs/overview
## Sync job names
flink.job.names=name,name1,name2
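For reference, Flink's /jobs/overview REST endpoint returns JSON along these lines (abbreviated; the exact fields can vary by Flink version, and the values below are purely illustrative):

```json
{
  "jobs": [
    {
      "jid": "a1b2c3d4e5f6...",
      "name": "mysql-to-columnstore",
      "state": "RUNNING",
      "start-time": 1725000000000,
      "end-time": -1,
      "duration": 360000,
      "last-modification": 1725000000000
    }
  ]
}
```

Note the hyphenated keys such as start-time: the DTOs deserializing this response need an explicit field mapping (e.g. fastjson's @JSONField) for those.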
  • Nacos configuration listener
import javax.annotation.Resource;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.nacos.api.config.ConfigChangeEvent;
import com.alibaba.nacos.api.config.ConfigChangeItem;
import com.alibaba.nacos.client.config.listener.impl.AbstractConfigChangeListener;
import com.wellsun.custom.prometheus.service.ICustomPrometheusService;

@Component(value = "datacenterNacosConfigListener")
public class NacosConfigListener extends AbstractConfigChangeListener implements InitializingBean {

    @Resource
    private NacosConfigManager nacosConfigManager;

    @Autowired
    private ICustomPrometheusService customPrometheusService;

    @Override
    public void afterPropertiesSet() throws Exception {
        nacosConfigManager.getConfigService().addListener("prometheus-config", "prometheus-group", this);
    }

    @Override
    public void receiveConfigChange(ConfigChangeEvent event) {
        for (ConfigChangeItem changeItem : event.getChangeItems()) {
            if (changeItem.getKey().equals("flink.url")) {
                customPrometheusService.setFlinkUrl(changeItem.getNewValue());
            } else if (changeItem.getKey().equals("flink.job.names")) {
                customPrometheusService.setFlinkJobName(changeItem.getNewValue());
            }
        }
    }
}

4. Prometheus Configuration

  • Add a scrape job for this service to Prometheus
  - job_name: 'flink-job'
    static_configs:
      - targets: ['xxx:16100']
        labels:
          __metrics_path__: "/actuator/prometheus"  # the custom monitoring service
          group: flink
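With the Gauge registered under the name flink.job.infos, the scraped /actuator/prometheus output contains one flink_job_infos series per job, roughly as follows (label values here are illustrative):

```
# HELP flink_job_infos
# TYPE flink_job_infos gauge
flink_job_infos{application="prometheus-monitor",jid="a1b2c3d4",name="name1",startTime="2024-08-30 10:00:00"} 1.0
flink_job_infos{application="prometheus-monitor",jid="",name="name2",startTime="2024-09-02 09:00:00"} 0.0
```

A series with value 0 marks an offline job, which is exactly what the alerting rule matches with flink_job_infos == 0.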
  • Add an alerting rule
 - alert: FlinkJobOffline
   expr: flink_job_infos{job="flink-job"} == 0
   for: 20s
   labels:
     severity: critical
   annotations:
     summary: "A Flink sync job has failed, please handle it as soon as possible!"
     description: "Sync job {{ $labels.name }} has failed."
  • Register the custom monitoring application with Nacos as a sub-service, then start the project and check the monitoring data.
  • Take jobs offline and back online to test; the corresponding DingTalk alerts are pushed as expected.

Reposted from: https://blog.csdn.net/qq_34052481/article/details/141859879
Copyright belongs to the original author 舒适边缘. In case of infringement, please contact us for removal.
