Background
First, a quick overview of the data flow in our data warehouse:
business system databases -> Hive -> Doris -> reports / emails / other systems
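Almost all ODS-layer data in Hive is pulled from the business databases. Occasionally a pull fails, or a cluster (usually a business database) goes down. When that happens, ODS pulls fail in bulk. The failures carry over to thousands of Hive tables in the DWD, DWS and ADS layers and, in the end, to the data itself. Each time, our fix was to re-pull the data and then rerun the failed workflows one by one, from back to front. A whole morning, sometimes most of the day, went by like that. It was tiring for the hands and for the mind.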
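The main difficulty is that the execution order between DS workflows must be right. If it is wrong, the workflows have to be rerun yet again. Simply rerunning them in the order in which they failed is therefore both tedious and error-prone.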
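What we badly needed was a table giving the execution order of the workflows. The catch is that many existing libraries and tools cannot parse DS crontab expressions. DS uses 7-field, Quartz-style expressions: second, minute, hour, day of month, month, day of week, year. For example, 0 0 0 * * ? * means "run once at 00:00:00 every day of every month of every year". Existing libraries, such as croniter in Python, could not parse these 7-field expressions.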
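The minimal Python sketch below makes the field layout concrete. It is for illustration only; the actual implementation in this post is Hive SQL. It splits a 7-field DS expression into named fields in Quartz order. The helper `split_quartz` and the field names are hypothetical. The sketch assumes DS always writes all seven fields, although Quartz itself also accepts expressions without the year.

```python
from typing import Dict

# Hypothetical field names, in Quartz order (seconds first, year last).
QUARTZ_FIELDS = ("second", "minute", "hour", "day_of_month",
                 "month", "day_of_week", "year")


def split_quartz(expr: str) -> Dict[str, str]:
    """Hypothetical helper: split a 7-field DS/Quartz cron expression into named fields."""
    parts = expr.split()
    if len(parts) != len(QUARTZ_FIELDS):
        # A standard 5-field crontab (no seconds, no year) ends up here.
        raise ValueError(f"expected 7 fields, got {len(parts)}: {expr!r}")
    return dict(zip(QUARTZ_FIELDS, parts))


print(split_quartz("0 0 0 * * ? *"))
# {'second': '0', 'minute': '0', 'hour': '0', 'day_of_month': '*',
#  'month': '*', 'day_of_week': '?', 'year': '*'}
```

The length check is where a 5-field-only tool and a 7-field expression stop being compatible. Also note that the seconds field comes first, not last.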
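We also tried some online tools, such as the online crontab run-time calculator on 蛙蛙工具, but its page states plainly that it supports only 5-field crontab expressions.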
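After some back and forth, we decided to solve the problem in Hive SQL by hand-writing a crontab parser in SQL. It felt a bit like being back in college, grinding through algorithm problems.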
General approach
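1. Parse each of the seven fields (second, minute, hour, day of month, month, day of week, year) separately, to get the values each expression matches in that field. In the code below only six fields are parsed: the year field is skipped and the current year is assumed.
2. Filter by month, day of month and day of week to keep only the expressions that fire today. Then combine their hour, minute and second values into concrete times. This gives each crontab expression's run times for today.
3. Finally, join the result with the DS schedule definition table to get today's run times for every workflow.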
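Step 1 can be sketched in Python as follows. `expand_field` is a hypothetical helper, not a line-by-line translation of the SQL. It handles the same five shapes as the part_1 to part_5 branches of the SQL: comma list, start/step, start-end, `*` and single value. It also applies the same name mapping (SUN=1 to SAT=7, JAN=1 to DEC=12).

There are two deliberate differences from the SQL:
- The SQL enumerates positions with `posexplode` and keeps those whose `pos/step` has a zero fractional part. That is a divisibility test, which the sketch replaces with `range(first, hi + 1, step)`.
- The sketch also accepts `*/step` and mixed lists such as `1-3,5`, which the SQL does not.

`L`, `W` and `#` are out of scope here, as they are in the SQL.

```python
import calendar
from typing import Dict, Optional, Set

# Name aliases, mirroring the REPLACE chains in the SQL (SUN=1 ... SAT=7, JAN=1 ... DEC=12).
DOW_NAMES = {name: i for i, name in enumerate(
    ["SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"], start=1)}
MONTH_NAMES = {name: i for i, name in enumerate(
    ["JAN", "FEB", "MAR", "APR", "MAY", "JUN",
     "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"], start=1)}


def expand_field(token: str, lo: int, hi: int,
                 names: Optional[Dict[str, int]] = None) -> Set[int]:
    """Hypothetical helper: the set of integers in [lo, hi] matched by one cron field.

    'L', 'W' and '#' are not supported and make int() raise ValueError.
    """
    if names:
        for name, num in names.items():
            token = token.replace(name, str(num))
    if token == "?":                 # no specific value: the other day field decides
        return set()
    values: Set[int] = set()
    for item in token.split(","):    # part_1: comma-separated list
        if item == "*":              # part_4: every value in the range
            values.update(range(lo, hi + 1))
        elif "/" in item:            # part_2: start/step
            start, step = item.split("/")
            first = lo if start == "*" else int(start)
            values.update(range(first, hi + 1, int(step)))
        elif "-" in item:            # part_3: inclusive range start-end
            a, b = (int(x) for x in item.split("-"))
            values.update(range(a, b + 1))
        else:                        # part_5: single value
            values.add(int(item))
    return values


print(sorted(expand_field("0/15", 0, 59)))               # [0, 15, 30, 45]
print(sorted(expand_field("MON-FRI", 1, 7, DOW_NAMES)))  # [2, 3, 4, 5, 6]

# Day-of-month upper bound, playing the role of dayofmonth(last_day(current_date())).
last_day = calendar.monthrange(2024, 2)[1]               # 29
print(len(expand_field("*", 1, last_day)))               # 29
```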
The code
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
with tmp_current_date_each_process_crontab as (
select distinct crontab
from bak_dolphinscheduler.t_ds_schedules
), tmp_current_date_each_process_hour as (
-- part_1: comma-separated list
select
crontab,
cast(t.hour as int) hour
from
(
select
crontab,
split(crontab,' ')[2] hours
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[2] rlike ','
)t lateral view explode(split(hours,',')) t as hour
union all
-- part_2: slash syntax (start/step)
select
crontab,
cast(tmp.start_hour+t.pos as int) as hour
from
(
select
crontab,
split(split(crontab,' ')[2],'/')[0] start_hour,
split(split(crontab,' ')[2],'/')[1] separator_hour
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[2] rlike '/'
) tmp
lateral view posexplode(split(space(cast(23-start_hour as int)), '')) t as pos, val
where split(cast(pos/separator_hour as string),'\\.')[1]='0'
union all
-- part_3: dash syntax (range start-end)
select
crontab,
cast(tmp.start_hour+t.pos as int) as hour
from
(
select
crontab,
split(split(crontab,' ')[2],'-')[0] start_hour,
split(split(crontab,' ')[2],'-')[1] end_hour
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[2] rlike '-'
) tmp
lateral view posexplode(split(space(cast(end_hour-start_hour as int)), '')) t as pos, val
union all
-- part_4: every hour (*)
select
crontab,
cast(t.pos as int) as hour
from
(
select distinct
crontab
from tmp_current_date_each_process_crontab
where split(crontab,' ')[2] = '*'
)t lateral view posexplode(split(space(23), '')) t as pos, val
union all
-- part_5: a single specified hour
select
crontab,
cast(hour as int) hour
from
(
select distinct crontab,split(crontab,' ')[2] hour
from tmp_current_date_each_process_crontab
where split(crontab,' ')[2] not rlike '\\*|\\?|/|,|-'
)t
) ,tmp_current_date_each_process_minute as (
-- part_1: comma-separated list
select
crontab,
cast(t.minute as int) minute
from
(
select
crontab,
split(crontab,' ')[1] minutes
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[1] rlike ','
)t lateral view explode(split(minutes,',')) t as minute
union all
-- part_2: slash syntax (start/step)
select
crontab,
cast(tmp.start_minute+t.pos as int) as minute
from
(
select
crontab,
split(split(crontab,' ')[1],'/')[0] start_minute,
split(split(crontab,' ')[1],'/')[1] separator_minute
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[1] rlike '/'
) tmp
lateral view posexplode(split(space(cast(59-start_minute as int)), '')) t as pos, val
where split(cast(pos/separator_minute as string),'\\.')[1]='0'
union all
-- part_3: dash syntax (range start-end)
select
crontab,
cast(tmp.start_minute+t.pos as int) as minute
from
(
select
crontab,
split(split(crontab,' ')[1],'-')[0] start_minute,
split(split(crontab,' ')[1],'-')[1] end_minute
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[1] rlike '-'
) tmp
lateral view posexplode(split(space(cast(end_minute-start_minute as int)), '')) t as pos, val
union all
-- part_4: every minute (*)
select
crontab,
cast(t.pos as int) as minute
from
(
select distinct
crontab
from tmp_current_date_each_process_crontab
where split(crontab,' ')[1] = '*'
)t lateral view posexplode(split(space(59), '')) t as pos, val
union all
-- part_5: a single specified minute
select
crontab,
cast(minute as int) minute
from
(
select distinct crontab,split(crontab,' ')[1] minute
from tmp_current_date_each_process_crontab
where split(crontab,' ')[1] not rlike '\\*|\\?|/|,|-'
)t
), tmp_current_date_each_process_second as (
-- part_1: comma-separated list
select
crontab,
cast(t.second as int) second
from
(
select
crontab,
split(crontab,' ')[0] seconds
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[0] rlike ','
)t lateral view explode(split(seconds,',')) t as second
union all
-- part_2: slash syntax (start/step)
select
crontab,
cast(tmp.start_second+t.pos as int) as second
from
(
select
crontab,
split(split(crontab,' ')[0],'/')[0] start_second,
split(split(crontab,' ')[0],'/')[1] separator_second
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[0] rlike '/'
) tmp
lateral view posexplode(split(space(cast(59-start_second as int)), '')) t as pos, val
where split(cast(pos/separator_second as string),'\\.')[1]='0'
union all
-- part_3: dash syntax (range start-end)
select
crontab,
cast(tmp.start_second+t.pos as int) as second
from
(
select
crontab,
split(split(crontab,' ')[0],'-')[0] start_second,
split(split(crontab,' ')[0],'-')[1] end_second
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[0] rlike '-'
) tmp
lateral view posexplode(split(space(cast(end_second-start_second as int)), '')) t as pos, val
union all
-- part_4: every second (*)
select
crontab,
cast(t.pos as int) as second
from
(
select distinct
crontab
from tmp_current_date_each_process_crontab
where split(crontab,' ')[0] = '*'
)t lateral view posexplode(split(space(59), '')) t as pos, val
union all
-- part_5: a single specified second
select
crontab,
cast(second as int ) second
from
(
select distinct crontab,split(crontab,' ')[0] second
from tmp_current_date_each_process_crontab
where split(crontab,' ')[0] not rlike '\\*|\\?|/|,|-'
)t
), tmp_current_date_each_process_crontab_week_trans as (
select distinct crontab original_exp,replace(replace(replace(replace(replace(replace(replace(crontab,'SUN','1'),'MON','2'),'TUE','3'),'WED','4'),'THU','5'),'FRI','6'),'SAT','7') crontab
from tmp_current_date_each_process_crontab
), tmp_current_date_each_process_weekday as (
-- part_1_1: comma-separated list
select
original_exp crontab,
cast(week as int) weekday
from
(
select
crontab,original_exp,
split(crontab,' ')[5] weeks
from
(
select crontab, original_exp
from tmp_current_date_each_process_crontab_week_trans
)t where split(crontab,' ')[3] = '?'
and split(crontab,' ')[5] rlike ','
)t lateral view explode(split(weeks,',')) t as week
union all
-- part_1_2: slash syntax (start/step)
select
original_exp crontab,
cast(tmp.start_week+t.pos as int) as weekday
from
(
select
crontab,original_exp,
split(split(crontab,' ')[5],'/')[0] start_week,
split(split(crontab,' ')[5],'/')[1] separator_week
from
(
select crontab, original_exp
from tmp_current_date_each_process_crontab_week_trans
)t where split(crontab,' ')[3] = '?' and split(crontab,' ')[5] rlike '/'
) tmp
lateral view posexplode(split(space(cast(7-start_week as int)), '')) t as pos, val
where split(cast(pos/separator_week as string),'\\.')[1]='0'
union all
-- part_1_3: dash syntax (range start-end)
-- return original_exp, not the translated string, so rows join back on the original expression (e.g. MON-FRI)
select
original_exp crontab,
cast(tmp.start_week+t.pos as int) weekday
from
(
select
crontab,original_exp,
split(split(crontab,' ')[5],'-')[0] start_week,
split(split(crontab,' ')[5],'-')[1] end_week
from
(
select crontab, original_exp
from tmp_current_date_each_process_crontab_week_trans
)t where split(crontab,' ')[3] = '?' and split(crontab,' ')[5] rlike '-'
) tmp
lateral view posexplode(split(space(cast(end_week-start_week as int)), '')) t as pos, val
union all
-- part_1_4: every day of the week (*); in theory this case should not come up
select
original_exp crontab,
cast(t.pos+1 as int) as weekday
from
(
select crontab, original_exp
from tmp_current_date_each_process_crontab_week_trans
where split(original_exp,' ')[3] = '?' and split(original_exp,' ')[5] = '*'
)t lateral view posexplode(split(space(6), '')) t as pos, val
union all
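-- part_1_5: a single specified day of week
-- return original_exp so that e.g. MON (translated to 2) still joins back on the original expression
select
original_exp crontab,
cast(split(crontab,' ')[5] as int) weekday
from
(
select crontab, original_exp
from tmp_current_date_each_process_crontab_week_trans
where split(original_exp,' ')[5] not rlike '\\*|\\?|/|,|-'
)t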
), tmp_current_date_each_process_monthday as (
-- part_2_1: comma-separated list
select
crontab,
cast(day as int) monthday
from
(
select
crontab,
split(crontab,' ')[3] days
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[3] <> '?'
and split(crontab,' ')[3] rlike ','
)t lateral view explode(split(days,',')) t as day
union all
-- part_2_2: slash syntax (start/step)
select
crontab,
cast(tmp.start_day+t.pos as int) as monthday
from
(
select
crontab,
split(split(crontab,' ')[3],'/')[0] start_day,
split(split(crontab,' ')[3],'/')[1] separator_day
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[3] <> '?' and split(crontab,' ')[3] rlike '/'
) tmp
lateral view posexplode(split(space(cast(dayofmonth(last_day(current_date()))-start_day as int)), '')) t as pos, val
where split(cast(pos/separator_day as string),'\\.')[1]='0'
union all
-- part_2_3: dash syntax (range start-end)
select
crontab,
cast(tmp.start_day+t.pos as int) as monthday
from
(
select
crontab,
split(split(crontab,' ')[3],'-')[0] start_day,
split(split(crontab,' ')[3],'-')[1] end_day
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
)t where split(crontab,' ')[3] <> '?' and split(crontab,' ')[3] rlike '-'
) tmp
lateral view posexplode(split(space(cast(end_day-start_day as int)), '')) t as pos, val
union all
-- part_2_4: every day of the month (*)
select
crontab,
cast(t.pos+1 as int) as monthday
from
(
select distinct crontab
from tmp_current_date_each_process_crontab
where split(crontab,' ')[3] = '*'
)t lateral view posexplode(split(space(dayofmonth(last_day(current_date()))-1), '')) t as pos, val
union all
-- part_2_5: a single specified day
select
crontab,
cast(day as int) monthday
from
(
select distinct crontab,split(crontab,' ')[3] day
from tmp_current_date_each_process_crontab
where split(crontab,' ')[3] not rlike '\\*|\\?|/|,|-'
)t
), tmp_current_date_each_process_crontab_month_trans as (
select distinct crontab original_exp, replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(replace(crontab,'JAN','1'),'FEB','2'),'MAR','3'),'APR','4'),'MAY','5'),'JUN','6'),'JUL','7'),'AUG','8'),'SEP','9'),'OCT','10'),'NOV','11'),'DEC','12') crontab
from tmp_current_date_each_process_crontab
), tmp_current_date_each_process_month as (
-- part_1: comma-separated list
select
original_exp crontab,
cast(month as int) month
from
(
select
crontab,original_exp,
split(crontab,' ')[4] months
from
(
select crontab, original_exp
from tmp_current_date_each_process_crontab_month_trans
)t where split(crontab,' ')[4] not rlike '\\*|\\?'
and split(crontab,' ')[4] rlike ','
)t lateral view explode(split(months,',')) t as month
union all
-- part_2: slash syntax (start/step)
select
original_exp crontab,
cast(tmp.start_month+t.pos as int) as month
from
(
select
crontab,original_exp,
split(split(crontab,' ')[4],'/')[0] start_month,
split(split(crontab,' ')[4],'/')[1] separator_month
from
(
select crontab, original_exp
from tmp_current_date_each_process_crontab_month_trans
)t where split(crontab,' ')[4] not rlike '\\*|\\?'
and split(crontab,' ')[4] rlike '/'
) tmp
lateral view posexplode(split(space(cast(12-start_month as int)), '')) t as pos, val
where split(cast(pos/separator_month as string),'\\.')[1]='0'
union all
-- part_3: dash syntax (range start-end)
select
original_exp crontab,
cast(tmp.start_month+t.pos as int) as month
from
(
select
crontab,original_exp,
split(split(crontab,' ')[4],'-')[0] start_month,
split(split(crontab,' ')[4],'-')[1] end_month
from
(
select crontab, original_exp
from tmp_current_date_each_process_crontab_month_trans
)t where split(crontab,' ')[4] not rlike '\\*|\\?'
and split(crontab,' ')[4] rlike '-'
) tmp
lateral view posexplode(split(space(cast(end_month-start_month as int)), '')) t as pos, val
union all
-- part_4: every month (*)
select
original_exp crontab,
cast(t.pos+1 as int) as month
from
(
select crontab, original_exp
from tmp_current_date_each_process_crontab_month_trans
where split(original_exp,' ')[4] = '*'
)t lateral view posexplode(split(space(11), '')) t as pos, val
union all
-- part_5: a single specified month
select
original_exp crontab,
cast(split(crontab,' ')[4] as int) month
from
(
select crontab, original_exp
from tmp_current_date_each_process_crontab_month_trans
where split(original_exp,' ')[4] not rlike '\\*|\\?|/|,|-'
)t
), tmp_current_date_each_process_scheduler_list_today as (
select
a.crontab,
b.monthday,
b.weekday,
d.month
from
(
select crontab
from tmp_current_date_each_process_crontab
)a
inner join
(
select
crontab,
concat_ws(',',collect_set(cast(monthday as string))) monthday,
concat_ws(',',collect_set(cast(weekday as string))) weekday
from
(
-- filter by day of month
select crontab,monthday,null weekday
from tmp_current_date_each_process_monthday
where cast(day(current_date()) as int)=cast(monthday as int)
group by crontab, monthday
union all
-- filter by day of week
select crontab,day(current_date()) monthday,weekday
from tmp_current_date_each_process_weekday
where cast(dayofweek(current_date()) as int)=cast(weekday as int)
group by crontab, weekday
)t group by crontab
)b on a.crontab=b.crontab
inner join
(-- filter by month
select crontab,month
from tmp_current_date_each_process_month
where cast(month(current_date()) as int)=cast(month as int)
group by crontab,month
)d on a.crontab=d.crontab
), tmp_current_date_each_process_scheduler_time_today as (
select
a.crontab,
month,
monthday,
weekday,
hour,
minute,
second,
concat(year(current_date()),'-',month,'-',monthday,' ',hour,':',minute,':',second) scheduler_time,
from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') etl_time
from
(
select crontab,
case when monthday<10 then concat('0',monthday) else monthday end monthday,
case when weekday<10 then concat('0',weekday) else weekday end weekday,
case when month<10 then concat('0',month) else month end month
from tmp_current_date_each_process_scheduler_list_today
)a
left join
( -- hours at which the schedule fires
select crontab,case when hour<10 then concat('0',hour) else hour end hour
from tmp_current_date_each_process_hour
)b on a.crontab=b.crontab
left join
( -- minutes at which the schedule fires
select crontab,case when minute<10 then concat('0',minute) else minute end minute
from tmp_current_date_each_process_minute
)c on a.crontab=c.crontab
left join
( -- seconds at which the schedule fires
select crontab,case when second<10 then concat('0',second) else second end second
from tmp_current_date_each_process_second
)d on a.crontab=d.crontab
)
insert overwrite table dwd.dwd_dolphinscheduler_process_scheduler_time partition (d)
select
a.code project_code,
a.project_name,
b.code process_code,
b.name process_name,
b.description process_description,
c.crontab,
c.month scheduler_month,
c.monthday scheduler_monthday,
c.weekday scheduler_weekday,
c.hour scheduler_hour,
c.minute scheduler_minute,
c.second scheduler_second,
c.scheduler_time,
from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') etl_time,
date_format(current_date(),'yyyyMMdd') d
from
(
select code,name project_name
from bak_dolphinscheduler.t_ds_project
)a
left join
(
select code,name,description,project_code,user_id
from bak_dolphinscheduler.t_ds_process_definition
where release_state=1 and flag=1
)b on a.code = b.project_code
inner join
(
select
aa.process_definition_code,
aa.crontab,
bb.month, monthday, weekday, hour, minute, second,
bb.scheduler_time
from
(
select process_definition_code,crontab
from bak_dolphinscheduler.t_ds_schedules
where release_state=1
)aa
left join
(
select crontab, month, monthday, weekday, hour, minute, second, scheduler_time
from tmp_current_date_each_process_scheduler_time_today
)bb on aa.crontab=bb.crontab
)c on b.code=c.process_definition_code ;
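Step 2 of the SQL, the filtering done by the scheduler_list_today and scheduler_time_today CTEs, can be sketched the same way. The hypothetical `fire_times_today` takes fields that are already expanded into integer sets. It keeps an expression if the month matches and either the day of month or the day of week matches. In Quartz one of those two fields is `?`, which the sketch represents as an empty set.

Day of week uses the Quartz numbering, 1 = Sunday to 7 = Saturday, which is also what Hive's `dayofweek` returns. Python's `isoweekday()` counts Monday = 1 to Sunday = 7, hence the rotation in `quartz_dow`. Like the SQL, the sketch ignores the year field and zero-pads every time component. The sample expression and dates are illustrative.

```python
from datetime import date
from typing import Dict, List, Set


def quartz_dow(d: date) -> int:
    """Day of week in Quartz/Hive numbering: 1 = Sunday ... 7 = Saturday."""
    # date.isoweekday() is Monday = 1 ... Sunday = 7, so rotate by one.
    return d.isoweekday() % 7 + 1


def fire_times_today(expanded: Dict[str, Set[int]], today: date) -> List[str]:
    """Hypothetical helper mimicking scheduler_list_today + scheduler_time_today."""
    if today.month not in expanded["month"]:
        return []
    # Keep the expression if either day field matches (the '?' one is an empty set).
    if (today.day not in expanded["day_of_month"]
            and quartz_dow(today) not in expanded["day_of_week"]):
        return []
    # Every hour x minute x second combination, zero-padded like the SQL.
    return [f"{today:%Y-%m-%d} {h:02d}:{m:02d}:{s:02d}"
            for h in sorted(expanded["hour"])
            for m in sorted(expanded["minute"])
            for s in sorted(expanded["second"])]


# Hypothetical pre-expanded fields for "0 30 2 ? * MON-FRI *".
expanded = {
    "second": {0}, "minute": {30}, "hour": {2},
    "day_of_month": set(),            # "?"
    "month": set(range(1, 13)),       # "*"
    "day_of_week": {2, 3, 4, 5, 6},   # MON-FRI
}
print(fire_times_today(expanded, date(2024, 1, 8)))  # a Monday: ['2024-01-08 02:30:00']
print(fire_times_today(expanded, date(2024, 1, 7)))  # a Sunday: []
```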
Final result
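Once the job has run, this partitioned table gives each workflow's theoretical run times for the day. That gives us a much clearer order in which to rerun workflows. The code is not complete, though. Readers familiar with cron expressions will notice that the special characters L and W are not handled, and neither is #. It already meets our needs, and anyone who needs more can add that logic on top. Once the dependencies between workflows have been worked out, a full rerun can be automated with shell scripts and the API that DS provides. This assumes your workflows can safely be rerun.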
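As a starting point for the missing L and W support, here is a hypothetical Python sketch of three day-of-month forms:
- `L`: the last day of the month.
- `nW`: the weekday (Monday to Friday) nearest to day n.
- `LW`: the last weekday of the month.

It follows the Quartz rules as documented: the nearest weekday never crosses into another month. A Saturday the 1st therefore moves forward to Monday the 3rd, and a Sunday that is the last day moves back to the Friday before. It does not cover `L` in the day-of-week field, or `#`. In the SQL, the same rules would become extra branches next to part_2_1 to part_2_5.

```python
import calendar
from datetime import date
from typing import Set


def nearest_weekday(year: int, month: int, day: int) -> int:
    """Hypothetical helper for Quartz 'nW': nearest Mon-Fri to `day`, within the month."""
    last = calendar.monthrange(year, month)[1]
    wd = date(year, month, day).weekday()    # Monday = 0 ... Sunday = 6
    if wd == 5:                              # Saturday: Friday before, or Monday after the 1st
        return day - 1 if day > 1 else day + 2
    if wd == 6:                              # Sunday: Monday after, or Friday before the last day
        return day + 1 if day < last else day - 2
    return day


def expand_dom_special(token: str, year: int, month: int) -> Set[int]:
    """Hypothetical helper for the day-of-month forms the SQL leaves out: L, LW, nW."""
    last = calendar.monthrange(year, month)[1]
    if token == "L":                         # last day of the month
        return {last}
    if token == "LW":                        # last weekday of the month
        return {nearest_weekday(year, month, last)}
    if token.endswith("W"):                  # weekday nearest to day n
        return {nearest_weekday(year, month, int(token[:-1]))}
    raise ValueError(f"unsupported day-of-month token: {token!r}")


# June 2024 starts on a Saturday and ends on a Sunday.
print(expand_dom_special("L", 2024, 6))    # {30}
print(expand_dom_special("LW", 2024, 6))   # {28}
print(expand_dom_special("1W", 2024, 6))   # {3}
```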
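Example: for each workflow in the dma project, today's first and last scheduled run time and the number of runs, ordered by first run time:
select project_name,process_name,crontab,min(scheduler_time) first_scheduler_time,max(scheduler_time) last_scheduler_time,count(distinct scheduler_time) scheduler_times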
from dwd.dwd_dolphinscheduler_process_scheduler_time
where d=date_format(current_date(),'yyyyMMdd') and scheduler_time is not null and project_name='dma'
group by project_name, process_name,crontab
order by first_scheduler_time ;
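The query above takes `min` and `max` over `scheduler_time`, which is a string. The result is chronological only because the SQL zero-pads month, day, hour, minute and second.

The hypothetical Python sketch below rebuilds the same per-workflow summary from (process_name, scheduler_time) rows. It adds a tie-break on the workflow name, which the query does not have. It also shows how an unpadded hour would break the ordering. The workflow names and times in the sample rows are made up.

```python
from collections import defaultdict
from typing import Iterable, List, Optional, Tuple


def rerun_order(rows: Iterable[Tuple[str, Optional[str]]]) -> List[Tuple[str, str, str, int]]:
    """Hypothetical helper: (process_name, first_time, last_time, run_count), by first run."""
    times = defaultdict(set)
    for name, ts in rows:
        if ts is not None:                   # same as "scheduler_time is not null"
            times[name].add(ts)
    summary = [(name, min(ts), max(ts), len(ts)) for name, ts in times.items()]
    return sorted(summary, key=lambda r: (r[1], r[0]))


# Hypothetical rows; the workflow names are made up for illustration.
rows = [
    ("ods_pull_orders", "2024-01-08 01:00:00"),
    ("ods_pull_orders", "2024-01-08 13:00:00"),
    ("dwd_orders", "2024-01-08 02:30:00"),
    ("ads_report", None),
]
for row in rerun_order(rows):
    print(row)

# Without zero padding, string order breaks: "9:00" sorts after "10:00".
print("2024-01-08 9:00:00" < "2024-01-08 10:00:00")  # False
```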
Appendix
Below are the table DDL and the table lineage. The tables in the bak_dolphinscheduler database come from DolphinScheduler's MySQL database.
CREATE TABLE dwd.dwd_dolphinscheduler_process_scheduler_time(
`project_code` string COMMENT 'project code',
`project_name` string COMMENT 'project name',
`process_code` string COMMENT 'workflow code',
`process_name` string COMMENT 'workflow name',
`process_description` string COMMENT 'workflow description',
`crontab` string COMMENT 'cron expression',
`scheduler_month` string COMMENT 'scheduled run: month',
`scheduler_monthday` string COMMENT 'scheduled run: day of month',
`scheduler_weekday` string COMMENT 'scheduled run: day of week',
`scheduler_hour` string COMMENT 'scheduled run: hour',
`scheduler_minute` string COMMENT 'scheduled run: minute',
`scheduler_second` string COMMENT 'scheduled run: second',
`scheduler_time` string COMMENT 'scheduled run time',
`etl_time` string COMMENT 'computation time'
)COMMENT 'theoretical daily schedule times of each workflow, partitioned by day'
PARTITIONED BY (`d` string);
dwd.dwd_dolphinscheduler_process_scheduler_time
Hive source tables:
├─ bak_dolphinscheduler.t_ds_schedules
├─ bak_dolphinscheduler.t_ds_project
├─ bak_dolphinscheduler.t_ds_process_definition
CTEs in the WITH clause:
├─ tmp_current_date_each_process_hour
├─ tmp_current_date_each_process_minute
├─ tmp_current_date_each_process_second
├─ tmp_current_date_each_process_crontab_week_trans
├─ tmp_current_date_each_process_weekday
├─ tmp_current_date_each_process_monthday
├─ tmp_current_date_each_process_crontab_month_trans
├─ tmp_current_date_each_process_month
├─ tmp_current_date_each_process_scheduler_list_today
├─ tmp_current_date_each_process_scheduler_time_today
└─ tmp_current_date_each_process_crontab
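The lineage above is a dependency graph, so a valid evaluation order can be derived mechanically. The hypothetical sketch below needs Python 3.9+ for `graphlib`. It encodes the dependencies as read off the SQL and prints one valid topological order. The same technique could apply to the dependencies between workflows mentioned earlier, once those are known. The variable names `P`, `SRC`, `TARGET` and `DEPS` are illustrative only.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Illustrative name prefixes for the CTEs and source tables in the SQL.
P = "tmp_current_date_each_process_"
SRC = "bak_dolphinscheduler."
TARGET = "dwd.dwd_dolphinscheduler_process_scheduler_time"

# node -> the tables/CTEs it reads from, as read off the SQL.
DEPS = {
    P + "crontab": {SRC + "t_ds_schedules"},
    P + "hour": {P + "crontab"},
    P + "minute": {P + "crontab"},
    P + "second": {P + "crontab"},
    P + "crontab_week_trans": {P + "crontab"},
    P + "weekday": {P + "crontab_week_trans"},
    P + "monthday": {P + "crontab"},
    P + "crontab_month_trans": {P + "crontab"},
    P + "month": {P + "crontab_month_trans"},
    P + "scheduler_list_today": {P + "crontab", P + "monthday", P + "weekday", P + "month"},
    P + "scheduler_time_today": {P + "scheduler_list_today", P + "hour",
                                 P + "minute", P + "second"},
    TARGET: {P + "scheduler_time_today", SRC + "t_ds_schedules",
             SRC + "t_ds_project", SRC + "t_ds_process_definition"},
}

# static_order() yields every node after all of its predecessors.
order = list(TopologicalSorter(DEPS).static_order())
for name in order:
    print(name)
```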
Copyright belongs to the original author, Aser5544. If this infringes your rights, please contact us and we will remove it.