0


【Flink metric(3)】chunjun是如何实现脏数据管理的

文章目录

一. 基础逻辑

脏数据管理模块的基本逻辑是:

  1. 当数据消费失败时,将脏数据拦截并保存到dirtyDataCollector中;
  2. 全局metric判断:脏数据达到设定值之后,任务报错,flink停止运行,并将脏数据输出到flink日志中、或mysql的配置中。

对于代码实现:

DirtyManager用于管理DirtyDataCollector,串起DirtyDataCollector的生命周期,DirtyDataCollector主要用于收集脏数据并输出(到日志中,mysql中),脏数据数量达到设定值之后,flink停止运行。

具体的DataCollector实现有:

在这里插入图片描述

分别用于输出到taskmanager的日志、(最后报错时)jobmanager日志、输出到mysql表中。

所以这里有三层代码结构:

  • DirtyManager:管理DirtyDataCollector
  • DirtyDataCollector:主要用于收集脏数据并输出,并判断脏数据是否达到临界值
  • 具体的DataCollector的实现:具体的输出实现:输出到日志,输出到mysql。

接下来我们逐个看每层的具体实现逻辑

二. DirtyManager

DirtyManager用于管理DirtyDataCollector,串起DirtyDataCollector的生命周期(open、run、close),主要流程如下:

  1. 设置系统配置给DirtyDataCollector
  2. 开启DirtyManager线程,主要用于DirtyDataCollector消费脏数据(收集脏数据)
  3. 关闭资源:DirtyDataCollector、DirtyManager的线程资源。

1. 初始化

初始化DirtyManager

  • 根据配置加载特定的DirtyDataCollector:用于脏数据的收集
  • 获取系统信息:jobId、jobName、operationName
  • 获取脏数据metric,用于定期合并脏数据为全局脏数据。
publicDirtyManager(DirtyConfig dirtyConfig,RuntimeContext runtimeContext){//通过反射注册DirtyDataCollectorthis.consumer =DataSyncFactoryUtil.discoverDirty(dirtyConfig);Map<String,String> allVariables = runtimeContext.getMetricGroup().getAllVariables();this.jobId = allVariables.get(JOB_ID);this.jobName = allVariables.getOrDefault(JOB_NAME,"defaultJobName");this.operationName = allVariables.getOrDefault(OPERATOR_NAME,"defaultOperatorName");this.errorCounter = runtimeContext.getLongCounter(Metrics.NUM_ERRORS);}

2. 收集脏数据并check

被具体的连接器调用:
具体当连接器生产数据或写数据到数据源报错时,调用此方法收集脏数据

  1. 创建线程,用于异步执行DirtyDataCollector,开始消费脏数据到日志或mysql表中
  2. 添加脏数据条数,同步到全局脏数据metric中
  3. 脏数据信息,存到队列中,等待具体的脏数据收集器消费
  4. 子流程:判断脏数据条数是否大于总脏数据条数
publicvoidcollect(Object data,Throwable cause,String field,long globalErrors){if(executor ==null){execute();}DirtyDataEntry entity =newDirtyDataEntry();  
    entity.setJobId(jobId);  
    entity.setJobName(jobName);  
    entity.setOperatorName(operationName);  
    entity.setCreateTime(newTimestamp(System.currentTimeMillis()));  
    entity.setDirtyContent(toString(data));  
    entity.setFieldName(field);  
    entity.setErrorMessage(ExceptionUtil.getErrorMessage(cause));//积累metric:errorCounter,这里直接同步到jobmanager?
    errorCounter.add(1L);//将脏数据添加到队列,等待消费。
    consumer.offer(entity, globalErrors);}/**  
 * 创建线程,用于异步执行DirtyDataCollector  
 */publicvoidexecute(){if(executor ==null){  
        executor =newThreadPoolExecutor(MAX_THREAD_POOL_SIZE,MAX_THREAD_POOL_SIZE,0,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<>(),newChunJunThreadFactory("dirty-consumer",true,(t, e)->{  
                                    log.error(String.format("Thread [%s] consume failed.", t.getName()),  
                                            e);}),newThreadPoolExecutor.CallerRunsPolicy());}//初始化DirtyDataCollector:比如脏数据定时发送到mysql时的线程注册  
    consumer.open();//拿出一个线程执行DirtyDataCollector的execute方法  
    executor.execute(consumer);}

3. 关闭资源

/** Close manager. */publicvoidclose(){if(!isAlive.get()){return;}//先关闭datacollector的资源if(consumer !=null){  
        consumer.close();}//再关闭executor线程if(executor !=null){  
        executor.shutdown();}  
  
    isAlive.compareAndSet(true,false);}

三. DirtyDataCollector

处于第二层的dirtyDataCollector实现了脏数据的临时保存并等待具体DataCollector的消费,
它的基本逻辑是:

  1. 当脏数据消费失败时,将脏数据拦截并保存到consumeQueue中,等待被消费
  2. 全局的metric:脏数据达到设定值之后,任务报错,flink停止运行,并将脏数据输出到flink日志中。

1. 初始化

在DirtyManager实例化时,注册DirtyDataCollector时的操作,

这里获取脏数据最大值,允许消费脏数据失败的条数,以及对具体DataCollector的初始化,我们下节分析。

publicvoidinitializeConsumer(DirtyConfig conf){this.maxConsumed = conf.getMaxConsumed();this.maxFailedConsumed = conf.getMaxFailedConsumed();this.init(conf);}

被DirtyManager调用:在开启脏数据收集器线程之前执行

初始化具体脏数据收集器:目前之后mysql脏数据收集器实现了此方法:消费线程、mysql连接

publicvoidopen(){}

2. 收集脏数据并check

offer方法被DirtyManager的collect方法调用

  • 用于存储具体脏数据并更新单个slot的脏数据条数。
  • 每添加一条脏数据,就判断脏数据是否达到了设定值,如果是则抛出异常。

其中:globalErrors是上文AccumulatorCollector定期更新的结果。

//存储脏数据具体内容,并更新单个slot的脏数据条数publicsynchronizedvoidoffer(DirtyDataEntry dirty,long globalErrors){  
    consumeQueue.offer(dirty);addConsumed(1L, dirty, globalErrors);}/**  
 * 添加脏数据  
 * 通过metric判断此时的脏数据条数,是否已经超过全局设置的脏数据条数  
 * @param count  
 * @param dirty  
 * @param globalErrors  
 */protectedvoidaddConsumed(long count,DirtyDataEntry dirty,long globalErrors){  
    consumedCounter.add(count);// 因为总体的脏数据需要tm和jm进行通讯(每tm心跳+1s),会有延迟,且当单slot运行时误差将达到最大  // 所以这里需要判断延迟情况  long max =  
            consumedCounter.getLocalValue()>= globalErrors  
                    ? consumedCounter.getLocalValue(): globalErrors;// 但这里仍然有误差:此时如果所有的slot都消费了脏数据那么其他slot的脏数据就记录不到。也就是会多消费脏数据  // 所以这里要有取舍:是否要消费完全准确的脏数据  if(max >= maxConsumed){StringJoiner dirtyMessage =newStringJoiner("\n").add("\n****************Dirty Data Begin****************\n").add(dirty.toString()).add("\n****************Dirty Data End******************\n");thrownewNoRestartException(String.format("The dirty consumer shutdown, due to the consumed count exceed the max-consumed [%s]",  
                        maxConsumed)+ dirtyMessage);}}

3. run:消费脏数据

由DirtyManager开启脏数据消费线程,

具体的DataCollector(log、mysql)消费脏数据,发送到Taskmanager日志或mysql表中。

/**  
 * 开启脏数据消费线程  
 * 定时消费脏数据,发送到执行脏数据管理器中:log、mysql等  
 */@Overridepublicvoidrun(){while(isRunning.get()){try{//指定的DataCollector消费脏数据DirtyDataEntry dirty = consumeQueue.take();consume(dirty);}catch(Exception e){//未成功将脏数据收集到脏数据管理模块中  addFailedConsumed(e,1L);}}}/**  
 *  消费脏数据用于输出到日志、mysql等  
 */protectedabstractvoidconsume(DirtyDataEntry dirty)throwsException;

4. 释放资源

不同的DataCollector有不同的操作,下节分析

publicabstractvoidclose();

四. LogDirtyDataCollector

实现比较简单:拿到的数据直接打印到Taskmanager中,关闭时,设定isRunning为false

/**  
 * 没有线程,调用即输出到日志中  
 */@Slf4jpublicclassLogDirtyDataCollectorextendsDirtyDataCollector{privatestaticfinallong serialVersionUID =7366317208451727471L;privateLong printRate;@Overrideprotectedvoidinit(DirtyConfig conf){this.printRate = conf.getPrintRate();}/**  
     * 输出脏数据到taskmanager  
     * @param dirty dirty-data which should be consumed.  
     */@Overrideprotectedvoidconsume(DirtyDataEntry dirty){if(consumedCounter.getLocalValue()% printRate ==0){StringJoiner dirtyMessage =newStringJoiner("\n").add("\n====================Dirty Data=====================").add(dirty.toString()).add("\n===================================================");  
            log.warn(dirtyMessage.toString());}}@Overridepublicvoidclose(){  
        isRunning.compareAndSet(true,false);  
        log.info("Print consumer closed.");}}

下篇分析MysqlDirtyDataCollector是如何消费数据。

标签: flink

本文转载自: https://blog.csdn.net/hiliang521/article/details/139940708
版权归原作者 roman_日积跬步-终至千里 所有, 如有侵权,请联系我们删除。

“【Flink metric(3)】chunjun是如何实现脏数据管理的”的评论:

还没有评论