

Parsing DolphinScheduler's 7-field crontab expressions with Hive SQL

Background

First, a rough picture of our data warehouse's data pipeline:

Business-system databases -> Hive -> Doris -> reports / emails / systems

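Almost all ODS-layer data in Hive is pulled from the business databases. Every so often, though, a pull fails or a cluster (usually a business database) goes down. ODS pulls then fail in bulk. That affects thousands of Hive tables in the DWD, DWS and ADS layers, and ultimately the data delivered downstream. Whenever an outage happened, our fix was to re-pull the data and then rerun the failed workflows one at a time, working from the end of the list back to the start. A whole morning, sometimes most of the day, would disappear this way, and it was exhausting both physically and mentally.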

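The crux is that DS workflows must run in the correct order relative to each other. If the order is wrong, they have to be rerun yet again. Simply rerunning workflows in the order in which they failed is therefore both tedious and error-prone.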

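So we urgently needed a table showing the order in which workflows execute. The problem was that many existing libraries and tools cannot parse DS crontab expressions, because DS uses 7 fields. For example, 0 0 0 * * ? * means "run once at 00:00:00 on every day of every month of every year". Existing libraries such as Python's croniter could not parse these 7-field expressions.

We also tried some online tools, such as the Wawa Tools (蛙蛙工具) online crontab execution-time calculator. However, the tool itself states:

Only 5-field crontab expressions are supported.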

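After weighing the options, we decided to solve the problem in Hive SQL and hand-write a crontab parser in SQL. (It felt a bit like being back at university doing algorithm exercises.)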
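As a point of reference, and not part of the original solution, the Python sketch below splits a 7-field DS expression into named fields. It assumes the Quartz-style field order that the SQL relies on: position i in FIELDS corresponds to split(crontab,' ')[i] in the Hive code. So [2] is the hour field, [3] is day of month, and [5] is day of week. FIELDS and split_cron are hypothetical names chosen for illustration.

```python
# Illustrative sketch (hypothetical names): field order of a 7-field
# DolphinScheduler / Quartz-style cron expression.
# Position i corresponds to split(crontab, ' ')[i] in the Hive SQL.
FIELDS = ("second", "minute", "hour", "day_of_month", "month", "day_of_week", "year")


def split_cron(expr: str) -> dict:
    """Split a 7-field cron string into {field_name: raw_token}."""
    parts = expr.split()  # splits on any whitespace run; the SQL splits on single spaces
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(parts)}: {expr!r}")
    return dict(zip(FIELDS, parts))


if __name__ == "__main__":
    # The example from the text: 00:00:00 every day.
    print(split_cron("0 0 0 * * ? *"))
```

A standard 5-field Unix expression such as 0 0 * * * raises ValueError here. This is the mirror image of why 5-field tools reject DS expressions.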

General approach

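1. Parse each of the seven fields (second, minute, hour, day of month, month, day of week, year) separately to get the values each expression matches in that field. In the code below, the year field is not parsed and is treated as always matching.

2. Use day of week, day of month and month to keep only the crontab expressions that fire today. Then combine hour, minute and second into concrete times. This yields each crontab expression's execution times for today.

3. Finally, join the DS schedule definition tables with the result to get today's execution times for every workflow.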
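Steps 1 and 2 can be modelled outside Hive, which is handy for spot-checking the SQL output for a single expression. The Python sketch below is an illustrative re-implementation, not the author's code. The hypothetical expand_field mirrors the five branches the SQL uses for every field: comma list, start/step, start-end, *, and a single value. It works over an assumed inclusive range [lo, hi]. The hypothetical times_of_day reproduces the hour × minute × second cross join that produces the concrete times. Like the SQL, it does not handle L, W, # or combined forms such as 1-3,5 or 0-30/5.

```python
# Illustrative sketch (hypothetical names), not the original post's code.
from itertools import product


def expand_field(token: str, lo: int, hi: int) -> list:
    """Return the sorted values in [lo, hi] that one cron field matches.

    Mirrors the five SQL branches. Combined forms and L/W/# are not
    supported (int() raises ValueError on them).
    """
    if "," in token:                          # part_1: "a,b,c"
        return sorted({int(v) for v in token.split(",")})
    if "/" in token:                          # part_2: "start/step", up to hi
        start, step = token.split("/")
        return list(range(int(start), hi + 1, int(step)))
    if "-" in token:                          # part_3: "start-end", inclusive
        start, end = token.split("-")
        return list(range(int(start), int(end) + 1))
    if token == "*":                          # part_4: every value in [lo, hi]
        return list(range(lo, hi + 1))
    if token == "?":                          # "no specific value": contributes nothing
        return []
    return [int(token)]                       # part_5: one specific value


def times_of_day(second: str, minute: str, hour: str) -> list:
    """Cross join hours x minutes x seconds into sorted 'HH:MM:SS' strings."""
    hours = expand_field(hour, 0, 23)
    minutes = expand_field(minute, 0, 59)
    seconds = expand_field(second, 0, 59)
    return [f"{h:02d}:{m:02d}:{s:02d}" for h, m, s in product(hours, minutes, seconds)]


if __name__ == "__main__":
    print(expand_field("0/15", 0, 59))        # [0, 15, 30, 45]
    print(times_of_day("0", "0/30", "9-10"))  # ['09:00:00', '09:30:00', '10:00:00', '10:30:00']
```

The step branch enumerates start, start + step, and so on, up to hi. This matches what the SQL does when it generates positions 0..(hi - start) with posexplode(split(space(n), '')) and keeps those divisible by the step.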

Code

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
with tmp_current_date_each_process_crontab as (
    select distinct crontab
    from bak_dolphinscheduler.t_ds_schedules
), tmp_current_date_each_process_hour as (
    -- part_1: comma-separated list:
    select
        crontab,
        cast(t.hour as int) hour
    from
    (
        select
            crontab,
            split(crontab,' ')[2] hours
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[2] rlike ','
    )t lateral view explode(split(hours,',')) t as hour

    union all
    -- part_2: slash-separated start/step:
    select
        crontab,
        cast(tmp.start_hour+t.pos as int) as hour
    from
    (
        select
            crontab,
            split(split(crontab,' ')[2],'/')[0] start_hour,
            split(split(crontab,' ')[2],'/')[1] separator_hour
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[2] rlike '/'
    ) tmp
    lateral view posexplode(split(space(cast(23-start_hour as int)), '')) t as pos, val
    where pos % cast(separator_hour as int) = 0

    union all
    -- part_3: hyphen-separated range start-end:
    select
        crontab,
        cast(tmp.start_hour+t.pos as int) as hour
    from
    (
        select
            crontab,
            split(split(crontab,' ')[2],'-')[0] start_hour,
            split(split(crontab,' ')[2],'-')[1] end_hour
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[2] rlike '-'
    ) tmp
    lateral view posexplode(split(space(cast(end_hour-start_hour as int)), '')) t as pos, val

    union all
    -- part_4: every hour (*)
    select
        crontab,
        cast(t.pos as int) as hour
    from
    (
        select distinct
            crontab
        from tmp_current_date_each_process_crontab
        where split(crontab,' ')[2] = '*'
    )t lateral view posexplode(split(space(23), '')) t as pos, val

    union all
    -- part_5: a single specific hour
    select
        crontab,
        cast(hour as int) hour
    from
    (
        select distinct crontab,split(crontab,' ')[2] hour
        from tmp_current_date_each_process_crontab
        where split(crontab,' ')[2] not rlike '\\*|\\?|/|,|-'
    )t
) ,tmp_current_date_each_process_minute as (
    -- part_1: comma-separated list:
    select
        crontab,
        cast(t.minute as int) minute
    from
    (
        select
            crontab,
            split(crontab,' ')[1] minutes
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[1] rlike ','
    )t lateral view explode(split(minutes,',')) t as minute

    union all
    -- part_2: slash-separated start/step:
    select
        crontab,
        cast(tmp.start_minute+t.pos as int) as minute
    from
    (
        select
            crontab,
            split(split(crontab,' ')[1],'/')[0] start_minute,
            split(split(crontab,' ')[1],'/')[1] separator_minute
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[1] rlike '/'
    ) tmp
    lateral view posexplode(split(space(cast(59-start_minute as int)), '')) t as pos, val
    where pos % cast(separator_minute as int) = 0

    union all
    -- part_3: hyphen-separated range start-end:
    select
        crontab,
        cast(tmp.start_minute+t.pos as int) as minute
    from
    (
        select
            crontab,
            split(split(crontab,' ')[1],'-')[0] start_minute,
            split(split(crontab,' ')[1],'-')[1] end_minute
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[1] rlike '-'
    ) tmp
    lateral view posexplode(split(space(cast(end_minute-start_minute as int)), '')) t as pos, val

    union all
    -- part_4: every minute (*)
    select
        crontab,
        cast(t.pos as int) as minute
    from
    (
        select distinct
            crontab
        from tmp_current_date_each_process_crontab
        where split(crontab,' ')[1] = '*'
    )t lateral view posexplode(split(space(59), '')) t as pos, val

    union all
    -- part_5: a single specific minute
    select
        crontab,
        cast(minute as int) minute
    from
    (
        select distinct crontab,split(crontab,' ')[1] minute
        from tmp_current_date_each_process_crontab
        where split(crontab,' ')[1] not rlike '\\*|\\?|/|,|-'
    )t
), tmp_current_date_each_process_second as (
    -- part_1: comma-separated list:
    select
        crontab,
        cast(t.second as int) second
    from
    (
        select
            crontab,
            split(crontab,' ')[0] seconds
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[0] rlike ','
    )t lateral view explode(split(seconds,',')) t as second

    union all
    -- part_2: slash-separated start/step:
    select
        crontab,
        cast(tmp.start_second+t.pos as int) as second
    from
    (
        select
            crontab,
            split(split(crontab,' ')[0],'/')[0] start_second,
            split(split(crontab,' ')[0],'/')[1] separator_second
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[0] rlike '/'
    ) tmp
    lateral view posexplode(split(space(cast(59-start_second as int)), '')) t as pos, val
    where pos % cast(separator_second as int) = 0

    union all
    -- part_3: hyphen-separated range start-end:
    select
        crontab,
        cast(tmp.start_second+t.pos as int) as second
    from
    (
        select
            crontab,
            split(split(crontab,' ')[0],'-')[0] start_second,
            split(split(crontab,' ')[0],'-')[1] end_second
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[0] rlike '-'
    ) tmp
    lateral view posexplode(split(space(cast(end_second-start_second as int)), '')) t as pos, val

    union all
    -- part_4: every second (*)
    select
        crontab,
        cast(t.pos as int) as second
    from
    (
        select distinct
            crontab
        from tmp_current_date_each_process_crontab
        where split(crontab,' ')[0] = '*'
    )t lateral view posexplode(split(space(59), '')) t as pos, val

    union all
    -- part_5: a single specific second
    select
        crontab,
        cast(second as int ) second
    from
    (
        select distinct crontab,split(crontab,' ')[0] second
        from tmp_current_date_each_process_crontab
        where split(crontab,' ')[0] not rlike '\\*|\\?|/|,|-'
    )t
), tmp_current_date_each_process_crontab_week_trans as (
    select distinct crontab original_exp,replace(replace(replace(replace(replace(replace(replace(crontab,'SUN','1'),'MON','2'),'TUE','3'),'WED','4'),'THU','5'),'FRI','6'),'SAT','7') crontab
    from tmp_current_date_each_process_crontab
), tmp_current_date_each_process_weekday as (
    -- part_1_1: comma-separated list (only when day of month is '?'):
    select
        original_exp crontab,
        cast(week as int) weekday
    from
    (
        select
            crontab,original_exp,
            split(crontab,' ')[5] weeks
        from
        (
            select crontab, original_exp
            from tmp_current_date_each_process_crontab_week_trans
        )t where split(crontab,' ')[3] = '?'
                and split(crontab,' ')[5] rlike ','
    )t lateral view explode(split(weeks,',')) t as week

    union all
    -- part_1_2: slash-separated start/step:
    select
        original_exp crontab,
        cast(tmp.start_week+t.pos as int) as weekday
    from
    (
        select
            crontab,original_exp,
            split(split(crontab,' ')[5],'/')[0] start_week,
            split(split(crontab,' ')[5],'/')[1] separator_week
        from
        (
            select crontab, original_exp
            from tmp_current_date_each_process_crontab_week_trans
        )t where split(crontab,' ')[3] = '?' and split(crontab,' ')[5] rlike '/'
    ) tmp
    lateral view posexplode(split(space(cast(7-start_week as int)), '')) t as pos, val
    where pos % cast(separator_week as int) = 0

    union all
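    -- part_1_3: hyphen-separated range start-end (output the original expression so it joins back):
    select
        original_exp crontab,
        cast(tmp.start_week+t.pos as int) weekday
    from
    (
        select
            crontab,original_exp,
            split(split(crontab,' ')[5],'-')[0] start_week,
            split(split(crontab,' ')[5],'-')[1] end_week
        from
        (
            select crontab, original_exp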
            from tmp_current_date_each_process_crontab_week_trans
        )t where split(crontab,' ')[3] = '?' and split(crontab,' ')[5] rlike '-'
    ) tmp
    lateral view posexplode(split(space(cast(end_week-start_week as int)), '')) t as pos, val

    union all
    -- part_1_4: every day of week (*); in theory this case never occurs
    select
        original_exp crontab,
        cast(t.pos+1 as int) as weekday
    from
    (
        select crontab, original_exp
        from tmp_current_date_each_process_crontab_week_trans
        where split(original_exp,' ')[3] = '?' and split(original_exp,' ')[5] = '*'
    )t lateral view posexplode(split(space(6), '')) t as pos, val

    union all
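    -- part_1_5: a single specific day of week (output the original expression so it joins back)
    select
        original_exp crontab,
        cast(split(crontab,' ')[5] as int) weekday
    from
    (
        select crontab, original_exp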
        from tmp_current_date_each_process_crontab_week_trans
        where split(original_exp,' ')[5] not rlike '\\*|\\?|/|,|-'
    )t
), tmp_current_date_each_process_monthday as (
    -- part_2_1: comma-separated list:
    select
        crontab,
        cast(day as int) monthday
    from
    (
        select
            crontab,
            split(crontab,' ')[3] days
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[3] <> '?'
                and split(crontab,' ')[3] rlike ','
    )t lateral view explode(split(days,',')) t as day

    union all
    -- part_2_2: slash-separated start/step:
    select
        crontab,
        cast(tmp.start_day+t.pos as int) as monthday
    from
    (
        select
            crontab,
            split(split(crontab,' ')[3],'/')[0] start_day,
            split(split(crontab,' ')[3],'/')[1] separator_day
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[3] <> '?' and split(crontab,' ')[3] rlike '/'
    ) tmp
    lateral view posexplode(split(space(cast(dayofmonth(last_day(current_date()))-start_day as int)), '')) t as pos, val
    where pos % cast(separator_day as int) = 0

    union all
    -- part_2_3: hyphen-separated range start-end:
    select
        crontab,
        cast(tmp.start_day+t.pos as int) as monthday
    from
    (
        select
            crontab,
            split(split(crontab,' ')[3],'-')[0] start_day,
            split(split(crontab,' ')[3],'-')[1] end_day
        from
        (
            select distinct crontab
            from tmp_current_date_each_process_crontab
        )t where split(crontab,' ')[3] <> '?' and split(crontab,' ')[3] rlike '-'
    ) tmp
    lateral view posexplode(split(space(cast(end_day-start_day as int)), '')) t as pos, val

    union all
    -- part_2_4: every day of month (*)
    select
        crontab,
        cast(t.pos+1 as int) as monthday
    from
    (
        select distinct crontab
        from tmp_current_date_each_process_crontab
        where split(crontab,' ')[3] = '*'
    )t lateral view posexplode(split(space(dayofmonth(last_day(current_date()))-1), '')) t as pos, val

    union all
    -- part_2_5: a single specific day of month
    select
        crontab,
        cast(day as int) monthday
    from
    (
        select distinct crontab,split(crontab,' ')[3] day
        from tmp_current_date_each_process_crontab
        where split(crontab,' ')[3] not rlike '\\*|\\?|/|,|-'
    )t
), tmp_current_date_each_process_crontab_month_trans as (
    select distinct crontab original_exp, replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(crontab,'JAN','1'),'FEB','2'),'MAR','3'),'APR','4'),'MAY','5'),'JUN','6'),'JUL','7'),'AUG','8'),'SEP','9'),'OCT','10'),'NOV','11'),'DEC','12') crontab
    from tmp_current_date_each_process_crontab
), tmp_current_date_each_process_month as (
    -- part_1: comma-separated list:
    select
        original_exp crontab,
        cast(month as int) month
    from
    (
        select
            crontab,original_exp,
            split(crontab,' ')[4] months
        from
        (
            select crontab, original_exp
            from tmp_current_date_each_process_crontab_month_trans
        )t where split(crontab,' ')[4] not rlike '\\*|\\?'
               and split(crontab,' ')[4] rlike ','
    )t lateral view explode(split(months,',')) t as month

    union all
    -- part_2: slash-separated start/step:
    select
        original_exp crontab,
        cast(tmp.start_month+t.pos as int) as month
    from
    (
        select
            crontab,original_exp,
            split(split(crontab,' ')[4],'/')[0] start_month,
            split(split(crontab,' ')[4],'/')[1] separator_month
        from
        (
            select crontab, original_exp
            from tmp_current_date_each_process_crontab_month_trans
        )t where split(crontab,' ')[4] not rlike '\\*|\\?'
               and split(crontab,' ')[4] rlike '/'
    ) tmp
    lateral view posexplode(split(space(cast(12-start_month as int)), '')) t as pos, val
    where pos % cast(separator_month as int) = 0

    union all
    -- part_3: hyphen-separated range start-end:
    select
        original_exp crontab,
        cast(tmp.start_month+t.pos as int) as month
    from
    (
        select
            crontab,original_exp,
            split(split(crontab,' ')[4],'-')[0] start_month,
            split(split(crontab,' ')[4],'-')[1] end_month
        from
        (
            select crontab, original_exp
            from tmp_current_date_each_process_crontab_month_trans
        )t where split(crontab,' ')[4] not rlike '\\*|\\?'
               and split(crontab,' ')[4] rlike '-'
    ) tmp
    lateral view posexplode(split(space(cast(end_month-start_month as int)), '')) t as pos, val

    union all
    -- part_4: every month (*)
    select
        original_exp crontab,
        cast(t.pos+1 as int) as month
    from
    (
        select crontab, original_exp
        from tmp_current_date_each_process_crontab_month_trans
        where split(original_exp,' ')[4] = '*'
    )t lateral view posexplode(split(space(11), '')) t as pos, val

    union all
    -- part_5: a single specific month
    select
        original_exp crontab,
        cast(split(crontab,' ')[4] as int) month
    from
    (
        select crontab, original_exp
        from tmp_current_date_each_process_crontab_month_trans
        where split(original_exp,' ')[4] not rlike '\\*|\\?|/|,|-'
    )t
), tmp_current_date_each_process_scheduler_list_today as (
    select
        a.crontab,
        b.monthday,
        b.weekday,
        d.month
    from
    (
        select crontab
        from tmp_current_date_each_process_crontab
    )a
    inner join
    (
        select
            crontab,
            concat_ws(',',collect_set(cast(monthday as string))) monthday,
            concat_ws(',',collect_set(cast(weekday as string))) weekday
        from
        (
            -- filter by day of month
            select crontab,monthday,null weekday
            from tmp_current_date_each_process_monthday
            where cast(day(current_date()) as int)=cast(monthday as int)
            group by crontab, monthday
            union all
            -- filter by day of week
            select crontab,day(current_date()) monthday,weekday
            from tmp_current_date_each_process_weekday
            where cast(dayofweek(current_date()) as int)=cast(weekday as int)
            group by crontab, weekday
        )t group by crontab
    )b on a.crontab=b.crontab
    inner join
    (-- filter by month
        select crontab,month
        from tmp_current_date_each_process_month
        where cast(month(current_date()) as int)=cast(month as int)
        group by crontab,month
    )d on a.crontab=d.crontab
), tmp_current_date_each_process_scheduler_time_today as (
    select
        a.crontab,
        month,
        monthday,
        weekday,
        hour,
        minute,
        second,
        concat(year(current_date()),'-',month,'-',monthday,' ',hour,':',minute,':',second) scheduler_time,
        from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') etl_time
    from
    (
        select crontab,
               case when monthday<10 then concat('0',monthday) else monthday end monthday,
               case when weekday<10 then concat('0',weekday) else weekday end  weekday,
               case when month<10 then concat('0',month) else month end month
        from tmp_current_date_each_process_scheduler_list_today
    )a
    left join
    (   -- hours at which the expression fires
        select crontab,case when hour<10 then concat('0',hour) else hour end hour
        from tmp_current_date_each_process_hour
    )b on a.crontab=b.crontab
    left join
    (   -- minutes at which the expression fires
        select crontab,case when minute<10 then concat('0',minute) else minute end minute
        from tmp_current_date_each_process_minute
    )c on a.crontab=c.crontab
    left join
    (   -- seconds at which the expression fires
        select crontab,case when second<10 then concat('0',second) else second end second
        from tmp_current_date_each_process_second
    )d on a.crontab=d.crontab
)
insert overwrite table dwd.dwd_dolphinscheduler_process_scheduler_time partition (d)
select
    a.code project_code,
    a.project_name,
    b.code process_code,
    b.name process_name,
    b.description process_description,
    c.crontab,
    c.month scheduler_month,
    c.monthday scheduler_monthday,
    c.weekday scheduler_weekday,
    c.hour scheduler_hour,
    c.minute scheduler_minute,
    c.second scheduler_second,
    c.scheduler_time,
    from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') etl_time,
    date_format(current_date(),'yyyyMMdd') d
from
(
    select code,name project_name
    from bak_dolphinscheduler.t_ds_project
)a
left join
(
    select code,name,description,project_code,user_id
    from bak_dolphinscheduler.t_ds_process_definition
    where release_state=1 and flag=1
)b on a.code = b.project_code
inner join
(
    select
        aa.process_definition_code,
        aa.crontab,
        bb.month, monthday, weekday, hour, minute, second,
        bb.scheduler_time
    from
    (
        select process_definition_code,crontab
        from bak_dolphinscheduler.t_ds_schedules
        where release_state=1
    )aa
    left join
    (
        select crontab, month, monthday, weekday, hour, minute, second, scheduler_time
        from tmp_current_date_each_process_scheduler_time_today
    )bb on aa.crontab=bb.crontab
)c on b.code=c.process_definition_code  ;
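The day-level filter in tmp_current_date_each_process_scheduler_list_today is the subtle part. Below is a hedged Python model of it; the names are illustrative and do not come from the original post. It assumes the Quartz day-of-week numbering that the *_trans CTEs map names to (SUN = 1 ... SAT = 7). Hive's dayofweek() uses the same numbering (1 = Sunday). Python's date.isoweekday() counts Monday = 1 ... Sunday = 7, so it has to be shifted. As in the SQL, an expression fires on a day if the month matches and either the day-of-month set or the day-of-week set contains that day. Whichever field is ? contributes an empty set.

```python
# Illustrative sketch (hypothetical names) of the day-level filter.
import datetime as dt

# Name -> number maps, as in the nested replace() calls of the *_trans CTEs.
WEEK_NAMES = {"SUN": 1, "MON": 2, "TUE": 3, "WED": 4, "THU": 5, "FRI": 6, "SAT": 7}
MONTH_NAMES = {"JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6,
               "JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12}


def translate(token: str, names: dict) -> str:
    """Replace day/month names with their numbers, e.g. 'MON-FRI' -> '2-6'."""
    for name, number in names.items():
        token = token.replace(name, str(number))
    return token


def quartz_weekday(day: dt.date) -> int:
    """Day of week in Quartz/Hive numbering: Sunday = 1 ... Saturday = 7."""
    return day.isoweekday() % 7 + 1  # isoweekday(): Monday = 1 ... Sunday = 7


def runs_on(day: dt.date, months: set, monthdays: set, weekdays: set) -> bool:
    """True if the month matches and either day of month or day of week matches."""
    if day.month not in months:
        return False
    return day.day in monthdays or quartz_weekday(day) in weekdays


if __name__ == "__main__":
    monday = dt.date(2024, 1, 15)
    print(translate("MON-FRI", WEEK_NAMES))  # 2-6
    print(quartz_weekday(monday))            # 2
    print(runs_on(monday, set(range(1, 13)), set(), {2, 3, 4, 5, 6}))  # True
```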

Results

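Once the job has run, this partitioned table gives each workflow's theoretical execution times for the day, which makes the rerun order much clearer. The code is not complete, though. Readers familiar with cron expressions will notice that L and W are not handled. Neither are #, the year field, or combined forms such as 1-3,5. It already meets our needs, and anyone who needs these cases can add the logic on top of it. Once the dependencies between workflows have been worked out, the whole set can then be rerun with a shell script and the API that DS provides, provided that rerunning your workflows does not cause problems.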

select project_name,process_name,crontab,min(scheduler_time) first_scheduler_time,max(scheduler_time) last_scheduler_time,count(distinct scheduler_time) scheduler_times
from dwd.dwd_dolphinscheduler_process_scheduler_time
where d=date_format(current_date(),'yyyyMMdd') and scheduler_time is not null and project_name='dma'
group by project_name, process_name,crontab
order by first_scheduler_time ;
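If the partition is exported, for example to drive a rerun script, the same summary can be reproduced in a few lines of Python. This is a sketch under assumptions. Each row is assumed to be a (process_name, crontab, scheduler_time) tuple taken from the table above. rerun_order is a hypothetical name, and the process names in the example are hypothetical too. Sorting on the scheduler_time strings works only because the SQL zero-pads every component into yyyy-MM-dd HH:mm:ss form. That padding is also what makes min() and max() on the string column chronological in the Hive query.

```python
# Illustrative sketch (hypothetical names): Python analogue of the query above.
from collections import defaultdict
from typing import Iterable, List, Optional, Tuple

Row = Tuple[str, str, Optional[str]]       # (process_name, crontab, scheduler_time)
Summary = Tuple[str, str, str, str, int]   # (process_name, crontab, first, last, times)


def rerun_order(rows: Iterable[Row]) -> List[Summary]:
    """Group by (process_name, crontab) and order by first scheduled time."""
    times = defaultdict(set)
    for process_name, crontab, scheduler_time in rows:
        if scheduler_time is not None:     # same as "scheduler_time is not null"
            times[(process_name, crontab)].add(scheduler_time)
    summary = [(name, cron, min(ts), max(ts), len(ts)) for (name, cron), ts in times.items()]
    # Zero-padded 'yyyy-MM-dd HH:mm:ss' strings sort chronologically.
    return sorted(summary, key=lambda row: row[2])


if __name__ == "__main__":
    example_rows = [
        ("dws_daily_sales", "0 30 2 * * ? *", "2024-01-15 02:30:00"),
        ("ods_pull_orders", "0 0 1,13 * * ? *", "2024-01-15 01:00:00"),
        ("ods_pull_orders", "0 0 1,13 * * ? *", "2024-01-15 13:00:00"),
        ("monthly_report", "0 0 0 1 * ? *", None),  # does not fire today
    ]
    for summary_row in rerun_order(example_rows):
        print(summary_row)
```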

Appendix

Below are the table's DDL and its table lineage. The tables in the bak_dolphinscheduler database come from DolphinScheduler's MySQL database.

CREATE TABLE dwd.dwd_dolphinscheduler_process_scheduler_time(
  `project_code` string COMMENT 'project code', 
  `project_name` string COMMENT 'project name', 
  `process_code` string COMMENT 'workflow code', 
  `process_name` string COMMENT 'workflow name', 
  `process_description` string COMMENT 'workflow description', 
  `crontab` string COMMENT 'cron expression', 
  `scheduler_month` string COMMENT 'scheduled run - month', 
  `scheduler_monthday` string COMMENT 'scheduled run - day of month', 
  `scheduler_weekday` string COMMENT 'scheduled run - day of week', 
  `scheduler_hour` string COMMENT 'scheduled run - hour', 
  `scheduler_minute` string COMMENT 'scheduled run - minute', 
  `scheduler_second` string COMMENT 'scheduled run - second', 
  `scheduler_time` string COMMENT 'scheduled run time', 
  `etl_time` string COMMENT 'computation time'
)COMMENT 'daily theoretical schedule times of workflows, partitioned by day'
PARTITIONED BY (`d` string);

dwd.dwd_dolphinscheduler_process_scheduler_time
Hive source tables:
├─ bak_dolphinscheduler.t_ds_schedules
├─ bak_dolphinscheduler.t_ds_project
├─ bak_dolphinscheduler.t_ds_process_definition
Temporary tables from the WITH clause:
├─ tmp_current_date_each_process_hour
├─ tmp_current_date_each_process_minute
├─ tmp_current_date_each_process_second
├─ tmp_current_date_each_process_crontab_week_trans
├─ tmp_current_date_each_process_weekday
├─ tmp_current_date_each_process_monthday
├─ tmp_current_date_each_process_crontab_month_trans
├─ tmp_current_date_each_process_month
├─ tmp_current_date_each_process_scheduler_list_today
├─ tmp_current_date_each_process_scheduler_time_today
└─ tmp_current_date_each_process_crontab

Reposted from: https://blog.csdn.net/qq_45629229/article/details/135669285
Copyright belongs to the original author, Aser5544.
