1. Background
- The project currently uses Flink CDC to synchronize MySQL data into a columnar database. The sync jobs occasionally fail, which breaks replication and affects business queries. This article sets up a Spring Boot service that uses Micrometer with Prometheus to monitor the Flink CDC jobs, so that job failures trigger a DingTalk notification.
2. Prerequisites
- A Prometheus monitoring stack is already in place; see the companion article "Prometheus Alertmanager 集成钉钉告警" (integrating Prometheus Alertmanager with DingTalk alerts).
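The Alertmanager-to-DingTalk wiring is covered in that companion article; as a reminder, the receiver side typically looks roughly like the sketch below. This assumes a prometheus-webhook-dingtalk bridge listening on port 8060; the URL, port, and receiver name are placeholders, not values from the original article.

```yaml
# alertmanager.yml (sketch; assumes a prometheus-webhook-dingtalk bridge on port 8060)
route:
  receiver: dingtalk
  group_by: ['alertname']
receivers:
  - name: dingtalk
    webhook_configs:
      - url: http://localhost:8060/dingtalk/webhook1/send
        send_resolved: true
```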
3. Developing the Flink CDC monitoring service
- Import the required dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- actuator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- prometheus -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<!-- okhttp -->
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.60</version>
</dependency>
<!-- hutool -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.17</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
- Define the interface ICustomPrometheusService; its flinkMeter method collects job information.
- The setFlinkUrl and setFlinkJobName methods hook into Nacos dynamic refresh, so the expected job names and the Flink API address can be changed at runtime.
public interface ICustomPrometheusService {

    /**
     * Collect Flink job monitoring information
     * @param registry the meter registry to publish metrics to
     * @author wys
     * @date 2024-08-30
     */
    void flinkMeter(MeterRegistry registry);

    /**
     * Set the Flink REST API address
     * @param flinkUrl the Flink API URL
     * @author wys
     * @date 2024-08-30
     */
    void setFlinkUrl(String flinkUrl);

    /**
     * Set the names of the Flink data-sync jobs to watch
     * @param flinkJobName comma-separated job names
     * @author wys
     * @date 2024-08-30
     */
    void setFlinkJobName(String flinkJobName);
}
- The implementation reads job information from Flink's REST API and exposes each job's status through a Gauge.
- flinkUrl is the Flink REST API address; flinkNames holds the names of the Flink CDC jobs we expect to be running, separated by commas.
- By comparing the expected jobs against the jobs actually running, each job is marked online or offline; offline jobs are reported to DingTalk through Prometheus.
@Slf4j
@Service
public class CustomPrometheusServiceImpl implements ICustomPrometheusService {

    @Resource
    private RestTemplateUtil restTemplateUtil;

    @Value("${flink.url}")
    private String flinkUrl;

    @Value("${flink.job.names:no_job}")
    private String flinkNames;

    private List<String> jobNames = new ArrayList<>();
    private Map<String, FlinkJobDto> jobMap = new HashMap<>();
    private Map<String, Gauge> gaugeMap = new HashMap<>();

    private final static int UP = 1;
    private final static int DOWN = 0;

    @PostConstruct
    public void initJobNames() {
        jobNames = Arrays.asList(flinkNames.split(","));
    }

    @Override
    public void flinkMeter(MeterRegistry registry) {
        List<FlinkJobDto> list = new ArrayList<>();
        try {
            FlinkDto flinkInfo = restTemplateUtil.getForObject(flinkUrl, param -> {}, FlinkDto.class);
            list = flinkInfo.getJobs();
        } catch (Exception e) {
            log.error("Failed to call the Flink REST API; Flink itself may be offline", e);
        }
        // Register the jobs that are currently running
        for (FlinkJobDto job : list) {
            setFlinkJobInfo(job, registry, UP);
        }
        // Names of the running jobs
        Set<String> runJobs = list.stream().map(FlinkJobDto::getName).collect(Collectors.toSet());
        // Expected jobs that are not running
        List<String> offlineJob = jobNames.stream().filter(item -> !runJobs.contains(item)).collect(Collectors.toList());
        // Mark offline jobs as DOWN
        if (!ObjectUtils.isEmpty(offlineJob)) {
            for (String name : offlineJob) {
                setFlinkJobInfo(getFailJob(name), registry, DOWN);
            }
        }
        // Jobs that are no longer expected: drop their gauges and cached state
        List<String> registryJobs = new ArrayList<>(gaugeMap.keySet());
        List<String> removeJobs = registryJobs.stream().filter(item -> !jobNames.contains(item)).collect(Collectors.toList());
        if (!ObjectUtils.isEmpty(removeJobs)) {
            for (String name : removeJobs) {
                Gauge gauge = gaugeMap.get(name);
                registry.remove(gauge);
                jobMap.remove(name);
                gaugeMap.remove(name);
            }
        }
    }

    /**
     * Register a job's gauge, or update the cached job object it reads from
     * @author wys
     * @date 2024-08-30
     */
    private void setFlinkJobInfo(FlinkJobDto job, MeterRegistry registry, int up) {
        // Already registered: update the cached object that the gauge references
        if (jobMap.containsKey(job.getName())) {
            FlinkJobDto flinkJob = jobMap.get(job.getName());
            if (up == UP) {
                // Copy into the cached instance; reassigning the local variable here
                // (as the original code did) would leave the object the gauge reads unchanged
                BeanUtil.copyProperties(job, flinkJob);
            }
            flinkJob.setUp(up);
            return;
        }
        Date date = new Date(job.getStartTime());
        // Register the gauge
        Gauge gauge = Gauge.builder("flink.job.infos", job, FlinkJobDto::getUp)
                .tag("name", job.getName())
                .tag("jid", job.getJid())
                .tag("startTime", DateUtil.format(date, "yyyy-MM-dd HH:mm:ss"))
                .register(registry);
        job.setUp(up);
        // Cache the object and gauge for later dynamic updates
        jobMap.put(job.getName(), job);
        gaugeMap.put(job.getName(), gauge);
    }

    /**
     * Build a placeholder DTO for an offline (failed) job
     * @author wys
     * @date 2024-09-02
     */
    private FlinkJobDto getFailJob(String name) {
        FlinkJobDto allFailJob = new FlinkJobDto();
        allFailJob.setName(name);
        allFailJob.setJid("");
        allFailJob.setUp(DOWN);
        allFailJob.setStartTime(new Date().getTime());
        return allFailJob;
    }

    @Override
    public void setFlinkUrl(String flinkUrl) {
        if (!ObjectUtils.isEmpty(flinkUrl)) {
            this.flinkUrl = flinkUrl;
            log.info("Flink API address updated: {}", flinkUrl);
        }
    }

    @Override
    public void setFlinkJobName(String flinkJobName) {
        if (!ObjectUtils.isEmpty(flinkJobName)) {
            this.flinkNames = flinkJobName;
            jobNames = Arrays.asList(flinkNames.split(","));
            log.info("Flink sync job names updated: {}", flinkJobName);
        }
    }
}
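The FlinkDto and FlinkJobDto classes referenced by the service are not shown in the original article; the sketch below is inferred from the getters the service calls (getJobs, getName, getJid, getStartTime, getUp), so the class and field names are assumptions. Note that Flink's REST API uses the hyphenated key "start-time", so the real DTO would need a mapping annotation (e.g. fastjson's @JSONField) for that field.

```java
import java.util.List;

// Hypothetical DTO for one job entry from Flink's /jobs/overview response.
// Field set is inferred from the service code, not taken from the article.
class FlinkJobDto {
    private String jid;      // job id assigned by Flink
    private String name;     // job name
    private long startTime;  // epoch millis; Flink's JSON key is "start-time"
    private int up;          // 1 = running, 0 = offline (set by the monitoring service)

    public String getJid() { return jid; }
    public void setJid(String jid) { this.jid = jid; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public long getStartTime() { return startTime; }
    public void setStartTime(long startTime) { this.startTime = startTime; }
    public int getUp() { return up; }
    public void setUp(int up) { this.up = up; }
}

// Hypothetical top-level DTO wrapping the "jobs" array.
class FlinkDto {
    private List<FlinkJobDto> jobs;
    public List<FlinkJobDto> getJobs() { return jobs; }
    public void setJobs(List<FlinkJobDto> jobs) { this.jobs = jobs; }
}
```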
- Meter registry configuration
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;

@Configuration
public class MeterRegistryConfig {

    /**
     * Filter out the built-in meters we do not care about,
     * keeping only the custom Flink metrics
     */
    @Bean
    MeterRegistryCustomizer<MeterRegistry> configurer() {
        return (registry) -> registry.config()
                .meterFilter(MeterFilter.denyNameStartsWith("tomcat"))
                .meterFilter(MeterFilter.denyNameStartsWith("logback"))
                .meterFilter(MeterFilter.denyNameStartsWith("system"))
                .meterFilter(MeterFilter.denyNameStartsWith("process"))
                .meterFilter(MeterFilter.denyNameStartsWith("http"))
                .meterFilter(MeterFilter.denyNameStartsWith("jvm"));
    }
}
- Poll the running jobs every 5 seconds
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import com.wellsun.custom.prometheus.service.ICustomPrometheusService;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;

@Component
public class CustomMeterBinder implements MeterBinder {

    @Resource
    private ICustomPrometheusService customPrometheusService;

    private ScheduledExecutorService executors = Executors.newSingleThreadScheduledExecutor();

    @Override
    public void bindTo(MeterRegistry registry) {
        // Collect job data every 5 seconds
        executors.scheduleAtFixedRate(() -> customPrometheusService.flinkMeter(registry), 1, 5, TimeUnit.SECONDS);
    }
}
- Nacos configuration file
management.endpoints.web.exposure.include=*
management.metrics.tags.application=${spring.application.name}
server.tomcat.mbeanregistry.enabled=true
## Flink API address
flink.url=http://xxx:8081/jobs/overview
## Names of the sync jobs to watch
flink.job.names=name,name1,name2
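For reference, Flink's /jobs/overview REST endpoint returns a payload shaped roughly like the following (fields abridged and values illustrative; check the REST API docs for your Flink version). Since the keys are hyphenated, mapping them onto Java DTOs requires annotations such as fastjson's @JSONField(name = "start-time").

```json
{
  "jobs": [
    {
      "jid": "a1b2c3d4e5f6",
      "name": "name1",
      "state": "RUNNING",
      "start-time": 1725000000000,
      "end-time": -1,
      "duration": 360000,
      "last-modification": 1725000360000
    }
  ]
}
```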
- nacos配置监听
importjavax.annotation.Resource;importorg.springframework.beans.factory.InitializingBean;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importcom.alibaba.cloud.nacos.NacosConfigManager;importcom.alibaba.nacos.api.config.ConfigChangeEvent;importcom.alibaba.nacos.api.config.ConfigChangeItem;importcom.alibaba.nacos.client.config.listener.impl.AbstractConfigChangeListener;importcom.wellsun.custom.prometheus.service.ICustomPrometheusService;@Component(value ="datacenterNacosConfigListener")publicclassNacosConfigListenerextendsAbstractConfigChangeListenerimplementsInitializingBean{@ResourceprivateNacosConfigManager nacosConfigManager;@AutowiredprivateICustomPrometheusService customPrometheusService;@OverridepublicvoidafterPropertiesSet()throwsException{
nacosConfigManager.getConfigService().addListener("prometheus-config","prometheus-group",this);}@OverridepublicvoidreceiveConfigChange(ConfigChangeEvent event){for(ConfigChangeItem changeItem : event.getChangeItems()){if(changeItem.getKey().equals("flink.url")){
customPrometheusService.setFlinkUrl(changeItem.getNewValue());}elseif(changeItem.getKey().equals("flink.job.names")){
customPrometheusService.setFlinkJobName(changeItem.getNewValue());}}}}
4. Prometheus configuration
- Add a scrape target for this service to Prometheus
- job_name: 'flink-job'
  static_configs:
    - targets: ['xxx:16100']
      labels:
        __metrics_path__: "/actuator/prometheus"  # the custom monitoring service
        group: flink
- Add the alert rule
- alert: FlinkJobOffline
  expr: flink_job_infos{job="flink-job"} == 0
  for: 20s
  labels:
    severity: critical
  annotations:
    summary: "A Flink sync job has failed, please handle it as soon as possible!"
    description: "Sync job {{ $labels.name }} has failed."
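Micrometer exports the gauge flink.job.infos under the Prometheus name flink_job_infos, which is what the alert expression above matches. Once the service is scraped, a sample from /actuator/prometheus looks roughly like this (label values are illustrative; the application label comes from the management.metrics.tags.application property):

```
# TYPE flink_job_infos gauge
flink_job_infos{application="flink-monitor",jid="a1b2c3d4e5f6",name="name1",startTime="2024-08-30 10:00:00"} 1.0
flink_job_infos{application="flink-monitor",jid="",name="name2",startTime="2024-09-02 09:30:00"} 0.0
```

A series with value 0 that persists for 20s fires the FlinkJobOffline alert.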
- Register the monitoring application with Nacos as a subservice, then start it and check the exposed metrics.
- Bring sync jobs up and down to test; the corresponding DingTalk alerts are pushed as expected.
Reproduced from: https://blog.csdn.net/qq_34052481/article/details/141859879
Copyright belongs to the original author 舒适边缘. If there is any infringement, please contact us for removal.