

Monitoring Flink CDC Jobs with Prometheus

1. Background

  • The project currently uses Flink CDC to synchronize MySQL data into a columnar database. The Flink job occasionally fails during synchronization, breaking the data sync and affecting business queries. To catch this, we build a Spring Boot project that uses Micrometer together with Prometheus to monitor the health of the Flink CDC jobs, and pushes a DingTalk alert whenever a job fails.

2. Prerequisites

  • A Prometheus monitoring stack is already in place; see the companion article "Prometheus Alertmanager集成钉钉告警" on integrating Prometheus Alertmanager with DingTalk alerts.
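For reference, a typical setup routes Alertmanager notifications to DingTalk through the prometheus-webhook-dingtalk bridge. The snippet below is only a minimal sketch under that assumption; the webhook address, receiver name, and grouping values are placeholders rather than values from the original post.

```yaml
# alertmanager.yml -- minimal sketch, assuming prometheus-webhook-dingtalk listens on port 8060
route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 1m
  repeat_interval: 1h
  receiver: 'dingtalk'
receivers:
  - name: 'dingtalk'
    webhook_configs:
      # forwarded to the DingTalk robot configured in prometheus-webhook-dingtalk
      - url: 'http://xxx:8060/dingtalk/webhook1/send'
        send_resolved: true
```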

3. Developing the Flink CDC Monitoring Project

  • Import the required dependencies:
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- actuator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- prometheus -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<!-- okhttp -->
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.60</version>
</dependency>
<!-- hutool -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.17</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
```
  • Define the interface ICustomPrometheusService; its flinkMeter method collects the job information.
  • The setFlinkUrl and setFlinkJobName methods rely on Nacos dynamic refresh, so the expected job names and the Flink API address can be changed at runtime.
```java
public interface ICustomPrometheusService {

    /**
     * Collect Flink monitoring information.
     * @param registry meter registry to publish metrics to
     * @author wys
     * @date 2024-08-30
     */
    void flinkMeter(MeterRegistry registry);

    /**
     * Set the Flink API address.
     * @param flinkUrl
     * @author wys
     * @date 2024-08-30
     */
    void setFlinkUrl(String flinkUrl);

    /**
     * Set the names of the Flink data-sync jobs.
     * @param flinkJobName
     * @author wys
     * @date 2024-08-30
     */
    void setFlinkJobName(String flinkJobName);
}
```
  • Implementation of the interface: job information is read from the REST API that Flink exposes, and a Gauge is registered with each job's status (the FlinkDto/FlinkJobDto data classes are sketched after the implementation below).
  • flinkUrl is the Flink API address; flinkNames holds the names of the Flink CDC jobs we expect to be running, separated by commas.
  • By comparing the expected jobs against the jobs actually running, each job is marked as online or offline; offline jobs trigger a DingTalk notification through Prometheus.
```java
@Slf4j
@Service
public class CustomPrometheusServiceImpl implements ICustomPrometheusService {

    @Resource
    private RestTemplateUtil restTemplateUtil;

    @Value("${flink.url}")
    private String flinkUrl;

    @Value("${flink.job.names:no_job}")
    private String flinkNames;

    private List<String> jobNames = new ArrayList<>();
    private Map<String, FlinkJobDto> jobMap = new HashMap<>();
    private Map<String, Gauge> gaugeMap = new HashMap<>();

    private final static int UP = 1;
    private final static int DOWN = 0;

    @PostConstruct
    public void initJobNames() {
        jobNames = Arrays.asList(flinkNames.split(","));
    }

    @Override
    public void flinkMeter(MeterRegistry registry) {
        List<FlinkJobDto> list = new ArrayList<>();
        try {
            FlinkDto flinkInfo = restTemplateUtil.getForObject(flinkUrl, param -> {}, FlinkDto.class);
            list = flinkInfo.getJobs();
        } catch (Exception e) {
            log.error("Failed to call the Flink API; Flink itself may be offline");
        }
        // Register the jobs that are currently running
        for (FlinkJobDto job : list) {
            setFlinkJobInfo(job, registry, UP);
        }
        // Names of the running jobs
        Set<String> runJobs = list.stream().map(FlinkJobDto::getName).collect(Collectors.toSet());
        // Expected jobs that are not running
        List<String> offlineJob = jobNames.stream()
                .filter(item -> !runJobs.contains(item))
                .collect(Collectors.toList());
        // Mark offline jobs as DOWN
        if (!ObjectUtils.isEmpty(offlineJob)) {
            for (String name : offlineJob) {
                setFlinkJobInfo(getFailJob(name), registry, DOWN);
            }
        }
        // Jobs no longer expected: remove their gauges and cached entries
        List<String> registryJobs = gaugeMap.keySet().stream().collect(Collectors.toList());
        List<String> removeJobs = registryJobs.stream()
                .filter(item -> !jobNames.contains(item))
                .collect(Collectors.toList());
        if (!ObjectUtils.isEmpty(removeJobs)) {
            for (String name : removeJobs) {
                Gauge gauge = gaugeMap.get(name);
                registry.remove(gauge);
                jobMap.remove(name);
                gaugeMap.remove(name);
            }
        }
    }

    /**
     * Register a job gauge, or update the cached job if it is already registered.
     * @author wys
     * @date 2024-08-30
     */
    private void setFlinkJobInfo(FlinkJobDto job, MeterRegistry registry, int up) {
        // Already registered: update the cached instance the gauge is bound to
        if (jobMap.containsKey(job.getName())) {
            FlinkJobDto flinkJob = jobMap.get(job.getName());
            if (up == UP) {
                // Copy the latest job details into the cached instance so the gauge sees them
                BeanUtil.copyProperties(job, flinkJob);
            }
            flinkJob.setUp(up);
            return;
        }
        Date date = new Date(job.getStartTime());
        // Register the gauge; its value is the job's up/down flag
        Gauge gauge = Gauge.builder("flink.job.infos", job, FlinkJobDto::getUp)
                .tag("name", job.getName())
                .tag("jid", job.getJid())
                .tag("startTime", DateUtil.format(date, "yyyy-MM-dd HH:mm:ss"))
                .register(registry);
        job.setUp(up);
        // Cache the job and its gauge so later scrapes can update or remove them
        jobMap.put(job.getName(), job);
        gaugeMap.put(job.getName(), gauge);
    }

    /**
     * Build a placeholder entry for an expected job that is offline.
     * @author wys
     * @date 2024-09-02
     */
    private FlinkJobDto getFailJob(String name) {
        FlinkJobDto allFailJob = new FlinkJobDto();
        allFailJob.setName(name);
        allFailJob.setJid("");
        allFailJob.setUp(DOWN);
        allFailJob.setStartTime(new Date().getTime());
        return allFailJob;
    }

    @Override
    public void setFlinkUrl(String flinkUrl) {
        if (!ObjectUtils.isEmpty(flinkUrl)) {
            this.flinkUrl = flinkUrl;
            log.info("Flink API address changed to: {}", flinkUrl);
        }
    }

    @Override
    public void setFlinkJobName(String flinkJobName) {
        if (!ObjectUtils.isEmpty(flinkJobName)) {
            this.flinkNames = flinkJobName;
            jobNames = Arrays.asList(flinkNames.split(","));
            log.info("Flink sync job names changed to: {}", flinkJobName);
        }
    }
}
```
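The FlinkDto and FlinkJobDto classes referenced above are not shown in the original post. The sketch below is an assumption of what they might look like, based on the fields the implementation actually uses and on the JSON returned by Flink's /jobs/overview endpoint (which names the start field "start-time"); the fastjson mapping annotation assumes RestTemplateUtil parses the response with fastjson, which the original does not state.

```java
import java.util.List;

import com.alibaba.fastjson.annotation.JSONField;

import lombok.Data;

// Sketch only: in a real project each class would live in its own file.
@Data
class FlinkDto {
    // "jobs" array from the /jobs/overview response
    private List<FlinkJobDto> jobs;
}

@Data
class FlinkJobDto {
    private String jid;
    private String name;
    // 1 = running, 0 = offline; set by CustomPrometheusServiceImpl, not returned by Flink
    private int up;
    // Flink returns this field as "start-time"
    @JSONField(name = "start-time")
    private long startTime;
}
```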
  • Meter registry configuration, filtering out the built-in metrics we do not need:
```java
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;

@Configuration
public class MeterRegistrConfig {

    @Bean
    MeterRegistryCustomizer<MeterRegistry> configurer() {
        // Drop the built-in Tomcat/JVM/etc. metrics so only the custom flink metrics are exposed
        return (registry) -> registry.config()
                .meterFilter(MeterFilter.denyNameStartsWith("tomcat"))
                .meterFilter(MeterFilter.denyNameStartsWith("logback"))
                .meterFilter(MeterFilter.denyNameStartsWith("system"))
                .meterFilter(MeterFilter.denyNameStartsWith("process"))
                .meterFilter(MeterFilter.denyNameStartsWith("http"))
                .meterFilter(MeterFilter.denyNameStartsWith("jvm"));
    }
}
```
  • Collect the running jobs every 5 seconds:
```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import com.wellsun.custom.prometheus.service.ICustomPrometheusService;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;

@Component
public class CustomMeterBinder implements MeterBinder {

    @Resource
    private ICustomPrometheusService customPrometheusService;

    private ScheduledExecutorService executors = Executors.newSingleThreadScheduledExecutor();

    @Override
    public void bindTo(MeterRegistry registry) {
        // Collect Flink job data every 5 seconds
        executors.scheduleAtFixedRate(() -> customPrometheusService.flinkMeter(registry), 1, 5, TimeUnit.SECONDS);
    }
}
```
  • Nacos configuration file:
```properties
management.endpoints.web.exposure.include=*
management.metrics.tags.application=${spring.application.name}
server.tomcat.mbeanregistry.enabled=true
## Flink API address
flink.url=http://xxx:8081/jobs/overview
## Names of the sync jobs to watch
flink.job.names=name,name1,name2
```
  • Nacos configuration listener (a sketch of the matching local bootstrap configuration follows the listener):
```java
import javax.annotation.Resource;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.nacos.api.config.ConfigChangeEvent;
import com.alibaba.nacos.api.config.ConfigChangeItem;
import com.alibaba.nacos.client.config.listener.impl.AbstractConfigChangeListener;
import com.wellsun.custom.prometheus.service.ICustomPrometheusService;

@Component(value = "datacenterNacosConfigListener")
public class NacosConfigListener extends AbstractConfigChangeListener implements InitializingBean {

    @Resource
    private NacosConfigManager nacosConfigManager;

    @Autowired
    private ICustomPrometheusService customPrometheusService;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Listen for changes to the "prometheus-config" data id in the "prometheus-group" group
        nacosConfigManager.getConfigService().addListener("prometheus-config", "prometheus-group", this);
    }

    @Override
    public void receiveConfigChange(ConfigChangeEvent event) {
        for (ConfigChangeItem changeItem : event.getChangeItems()) {
            if (changeItem.getKey().equals("flink.url")) {
                customPrometheusService.setFlinkUrl(changeItem.getNewValue());
            } else if (changeItem.getKey().equals("flink.job.names")) {
                customPrometheusService.setFlinkJobName(changeItem.getNewValue());
            }
        }
    }
}
```
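The listener subscribes to data id prometheus-config in group prometheus-group. The application also needs local bootstrap settings pointing at the Nacos server so that this configuration is pulled at startup; the snippet below is only a hedged sketch, and the server address, application name, and extension-config layout are assumptions rather than values from the original post.

```properties
# bootstrap.properties -- minimal sketch, all values are placeholders
spring.application.name=custom-prometheus
spring.cloud.nacos.config.server-addr=xxx:8848
spring.cloud.nacos.config.file-extension=properties
# extension config carrying the flink.* keys, matching the data id/group used by the listener
spring.cloud.nacos.config.extension-configs[0].data-id=prometheus-config
spring.cloud.nacos.config.extension-configs[0].group=prometheus-group
spring.cloud.nacos.config.extension-configs[0].refresh=true
spring.cloud.nacos.discovery.server-addr=xxx:8848
```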

4. Prometheus Configuration

  • Add a scrape job for this service in Prometheus (an equivalent job-level form is shown after the snippet):
```yaml
- job_name: 'flink-job'
  static_configs:
    - targets: ['xxx:16100']
      labels:
        __metrics_path__: "/actuator/prometheus"   # custom monitoring service
        group: flink
```
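Setting __metrics_path__ as a target label works because labels starting with a double underscore are consumed by Prometheus before the series is stored, so it overrides the default scrape path. The more common way to express the same thing is the job-level metrics_path field:

```yaml
- job_name: 'flink-job'
  metrics_path: /actuator/prometheus   # custom monitoring service
  static_configs:
    - targets: ['xxx:16100']
      labels:
        group: flink
```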
  • Add an alerting rule:
```yaml
- alert: Flink任务下线
  expr: flink_job_infos{job="flink-job"} == 0
  for: 20s
  labels:
    severity: 严重告警
  annotations:
    summary: "Flink同步任务失败, 请尽快处理!"
    description: "{{ $labels.name }}同步任务失败."
```
  • Register the custom monitoring application with Nacos as a child service, start it, and check the exposed metrics (a sketch of the expected scrape output follows).
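The screenshot from the original post is not available; for illustration only, the output exposed at /actuator/prometheus would look roughly like the following, where a value of 1.0 means the job is running and 0.0 means it is offline (job names, jid values, tags, and timestamps below are made up):

```bash
curl -s http://xxx:16100/actuator/prometheus | grep flink_job_infos
# HELP flink_job_infos
# TYPE flink_job_infos gauge
flink_job_infos{application="custom-prometheus",jid="ab12cd34",name="name",startTime="2024-08-30 10:00:00",} 1.0
flink_job_infos{application="custom-prometheus",jid="",name="name2",startTime="2024-09-02 09:00:00",} 0.0
```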
  • Take a sync job down and bring it back up to test; the corresponding alerts are pushed to DingTalk.

This article was reposted from: https://blog.csdn.net/qq_34052481/article/details/141859879
Copyright belongs to the original author 舒适边缘. If there is any infringement, please contact us for removal.
