0


二百六十六、Hive——Hive的DWD层数据清洗、清洗记录、数据修复、数据补全

一、目的

数据清洗是数据治理的关键，也是提高数据质量的核心！完成数据清洗后，还需要处理错误数据、清洗记录、数据重复性统计、数据准确性统计、错误数据修复以及缺失数据补全等工作。

二、清洗步骤(以转向比数据为案例)

2.1 ODS层原始数据

-- ODS layer: raw turn-ratio records (external table, one partition per day).
-- Fields are comma-delimited text stored as SequenceFile.
CREATE EXTERNAL TABLE IF NOT EXISTS hurys_db.ods_turnratio (
    device_no           STRING  COMMENT '设备编号',
    source_device_type  STRING  COMMENT '设备类型',
    sn                  STRING  COMMENT '设备序列号 ',
    model               STRING  COMMENT '设备型号',
    create_time         STRING  COMMENT '创建时间',
    cycle               INT     COMMENT '转向比数据周期',
    volume_sum          INT     COMMENT '指定时间段内通过路口的车辆总数',
    speed_avg           FLOAT   COMMENT '指定时间段内通过路口的所有车辆速度的平均值',
    volume_left         INT     COMMENT '指定时间段内通过路口的左转车辆总数',
    speed_left          FLOAT   COMMENT '指定时间段内通过路口的左转车辆速度的平均值',
    volume_straight     INT     COMMENT '指定时间段内通过路口的直行车辆总数',
    speed_straight      FLOAT   COMMENT '指定时间段内通过路口的直行车辆速度的平均值',
    volume_right        INT     COMMENT '指定时间段内通过路口的右转车辆总数',
    speed_right         FLOAT   COMMENT '指定时间段内通过路口的右转车辆速度的平均值',
    volume_turn         INT     COMMENT '指定时间段内通过路口的掉头车辆总数',
    speed_turn          FLOAT   COMMENT '指定时间段内通过路口的掉头车辆速度的平均值'
)
COMMENT '转向比数据外部表——静态分区'
PARTITIONED BY (day STRING)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
STORED AS SEQUENCEFILE;

2.2 DWD层原始数据清洗

主要从数据格式、数值范围以及业务逻辑三个方面进行清洗。

2.2.1 建表语句

-- DWD layer: cleaned turn-ratio records with a surrogate id and speeds as DECIMAL(10,2).
-- The partition column day must not repeat an existing column, it behaves as a pseudo-column.
-- Data is stored in ORC format.
CREATE TABLE IF NOT EXISTS hurys_db.dwd_turnratio (
    id                  STRING          COMMENT '唯一ID',
    device_no           STRING          COMMENT '设备编号',
    source_device_type  STRING          COMMENT '设备类型',
    sn                  STRING          COMMENT '设备序列号 ',
    model               STRING          COMMENT '设备型号',
    create_time         STRING          COMMENT '创建时间',
    cycle               INT             COMMENT '转向比数据周期',
    volume_sum          INT             COMMENT '指定时间段内通过路口的车辆总数',
    speed_avg           DECIMAL(10,2)   COMMENT '指定时间段内通过路口的所有车辆速度的平均值',
    volume_left         INT             COMMENT '指定时间段内通过路口的左转车辆总数',
    speed_left          DECIMAL(10,2)   COMMENT '指定时间段内通过路口的左转车辆速度的平均值',
    volume_straight     INT             COMMENT '指定时间段内通过路口的直行车辆总数',
    speed_straight      DECIMAL(10,2)   COMMENT '指定时间段内通过路口的直行车辆速度的平均值',
    volume_right        INT             COMMENT '指定时间段内通过路口的右转车辆总数',
    speed_right         DECIMAL(10,2)   COMMENT '指定时间段内通过路口的右转车辆速度的平均值',
    volume_turn         INT             COMMENT '指定时间段内通过路口的掉头车辆总数',
    speed_turn          DECIMAL(10,2)   COMMENT '指定时间段内通过路口的掉头车辆速度的平均值'
)
COMMENT '转向比数据表——动态分区'
PARTITIONED BY (day STRING)
STORED AS ORC;

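2.2.2 清洗规则

以下规则与 2.2.3 中的清洗 SQL 保持一致：
1. 设备编号 device_no、创建时间 create_time 不能为空；
2. create_time 的日期必须等于处理日期；
3. 流量、速度字段为空时置为 0，速度字段统一转换为 decimal(10,2)；
4. 数值范围：volume_sum、volume_left、volume_straight、volume_right 为 0~1000，volume_turn 为 0~100；speed_avg、speed_straight 为 0~150，speed_left、speed_right、speed_turn 为 0~100；
5. 所有字段完全相同的重复记录只保留一条。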

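2.2.3 清洗SQL

注：本文所有 insert ... partition(day) 语句都是全动态分区写入。Hive 默认的严格模式（hive.exec.dynamic.partition.mode=strict）不允许这种写法，执行前需先设置：set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict;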

-- Hive defaults to strict dynamic-partition mode, which rejects an insert whose
-- only partition column is dynamic, so switch this session to non-strict mode.
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- DWD cleaning for one ODS partition:
--   * NULL volumes and speeds are replaced by 0, speeds are cast to DECIMAL(10,2)
--   * rows with a NULL device_no or create_time, a create_time outside the
--     processed day, or any value outside its valid range are dropped
--     (they are captured later by the error-row query)
--   * rows identical in every column are kept once, and only then get a new id
WITH normalized AS (
    SELECT
        device_no,
        source_device_type,
        sn,
        model,
        create_time,
        cycle,
        -- Speeds use CASE instead of COALESCE(CAST(...), 0): a float that the cast
        -- cannot represent yields NULL in Hive and must still fail the range checks.
        COALESCE(volume_sum, 0)                                                                AS volume_sum,
        CASE WHEN speed_avg      IS NULL THEN 0 ELSE CAST(speed_avg      AS DECIMAL(10,2)) END AS speed_avg,
        COALESCE(volume_left, 0)                                                               AS volume_left,
        CASE WHEN speed_left     IS NULL THEN 0 ELSE CAST(speed_left     AS DECIMAL(10,2)) END AS speed_left,
        COALESCE(volume_straight, 0)                                                           AS volume_straight,
        CASE WHEN speed_straight IS NULL THEN 0 ELSE CAST(speed_straight AS DECIMAL(10,2)) END AS speed_straight,
        COALESCE(volume_right, 0)                                                              AS volume_right,
        CASE WHEN speed_right    IS NULL THEN 0 ELSE CAST(speed_right    AS DECIMAL(10,2)) END AS speed_right,
        COALESCE(volume_turn, 0)                                                               AS volume_turn,
        CASE WHEN speed_turn     IS NULL THEN 0 ELSE CAST(speed_turn     AS DECIMAL(10,2)) END AS speed_turn,
        -- Date taken from the record itself, used as the DWD partition value.
        substr(create_time, 1, 10)                                                             AS data_day
    FROM hurys_db.ods_turnratio
    WHERE day = '2024-09-10'
),
valid_rows AS (
    -- Deduplicate before the id is generated, so the nondeterministic UUID()
    -- is never part of an aggregation.
    SELECT DISTINCT
        device_no,
        source_device_type,
        sn,
        model,
        create_time,
        cycle,
        volume_sum,
        speed_avg,
        volume_left,
        speed_left,
        volume_straight,
        speed_straight,
        volume_right,
        speed_right,
        volume_turn,
        speed_turn,
        data_day
    FROM normalized
    WHERE data_day = '2024-09-10'
      AND device_no   IS NOT NULL
      AND create_time IS NOT NULL
      AND volume_sum      BETWEEN 0 AND 1000 AND speed_avg      BETWEEN 0 AND 150
      AND volume_left     BETWEEN 0 AND 1000 AND speed_left     BETWEEN 0 AND 100
      AND volume_straight BETWEEN 0 AND 1000 AND speed_straight BETWEEN 0 AND 150
      AND volume_right    BETWEEN 0 AND 1000 AND speed_right    BETWEEN 0 AND 100
      AND volume_turn     BETWEEN 0 AND 100  AND speed_turn     BETWEEN 0 AND 100
)
INSERT OVERWRITE TABLE hurys_db.dwd_turnratio PARTITION (day)
SELECT
    UUID() AS id,
    device_no,
    source_device_type,
    sn,
    model,
    create_time,
    cycle,
    volume_sum,
    speed_avg,
    volume_left,
    speed_left,
    volume_straight,
    speed_straight,
    volume_right,
    speed_right,
    volume_turn,
    speed_turn,
    -- last column feeds the dynamic partition day
    data_day
FROM valid_rows;

2.3 转向比错误数据表

2.3.1 建表语句

-- Error rows: ODS records rejected by the DWD cleaning, kept with their raw (FLOAT) values.
-- The partition column day must not repeat an existing column, it behaves as a pseudo-column.
-- Data is stored in ORC format.
CREATE TABLE IF NOT EXISTS hurys_db.dwd_turnratio_error (
    id                  STRING  COMMENT '唯一ID',
    device_no           STRING  COMMENT '设备编号',
    source_device_type  STRING  COMMENT '设备类型',
    sn                  STRING  COMMENT '设备序列号 ',
    model               STRING  COMMENT '设备型号',
    create_time         STRING  COMMENT '创建时间',
    cycle               INT     COMMENT '转向比数据周期',
    volume_sum          INT     COMMENT '指定时间段内通过路口的车辆总数',
    speed_avg           FLOAT   COMMENT '指定时间段内通过路口的所有车辆速度的平均值',
    volume_left         INT     COMMENT '指定时间段内通过路口的左转车辆总数',
    speed_left          FLOAT   COMMENT '指定时间段内通过路口的左转车辆速度的平均值',
    volume_straight     INT     COMMENT '指定时间段内通过路口的直行车辆总数',
    speed_straight      FLOAT   COMMENT '指定时间段内通过路口的直行车辆速度的平均值',
    volume_right        INT     COMMENT '指定时间段内通过路口的右转车辆总数',
    speed_right         FLOAT   COMMENT '指定时间段内通过路口的右转车辆速度的平均值',
    volume_turn         INT     COMMENT '指定时间段内通过路口的掉头车辆总数',
    speed_turn          FLOAT   COMMENT '指定时间段内通过路口的掉头车辆速度的平均值'
)
COMMENT '转向比错误数据表——动态分区'
PARTITIONED BY (day STRING)
STORED AS ORC;

2.3.2 SQL语句

-- Error rows for one ODS day: every ODS row that has no DWD row with the same
-- device_no and create_time (anti-join), copied with its raw values and a new id.
INSERT OVERWRITE TABLE hurys_db.dwd_turnratio_error PARTITION (day)
SELECT
    UUID() AS id,
    ods.device_no,
    ods.source_device_type,
    ods.sn,
    ods.model,
    ods.create_time,
    ods.cycle,
    ods.volume_sum,
    ods.speed_avg,
    ods.volume_left,
    ods.speed_left,
    ods.volume_straight,
    ods.speed_straight,
    ods.volume_right,
    ods.speed_right,
    ods.volume_turn,
    ods.speed_turn,
    ods.day
FROM hurys_db.ods_turnratio AS ods
LEFT JOIN hurys_db.dwd_turnratio AS dwd
    ON  dwd.device_no   = ods.device_no
    AND dwd.create_time = ods.create_time
WHERE ods.day = '2024-09-10'
  -- a matched DWD row always has a non-NULL device_no, so NULL means no match
  AND dwd.device_no IS NULL;

2.4 转向比数据清洗记录表

2.4.1 建表语句

-- Cleaning record: one row per offending field of an error row (field name plus raw value).
CREATE TABLE IF NOT EXISTS hurys_db.dwd_data_clean_record_turnratio (
    id           STRING  COMMENT '唯一ID',
    data_type    INT     COMMENT '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no    STRING  COMMENT '设备编号',
    create_time  STRING  COMMENT '创建时间',
    field_name   STRING  COMMENT '字段名',
    field_value  STRING  COMMENT '字段值'
)
COMMENT '转向比数据清洗记录表'
PARTITIONED BY (day STRING)
STORED AS ORC;

2.4.2 SQL语句

-- Cleaning record for one day of error rows, one output row per offending field.
-- Each field that breaks a rule is rendered as name:value, the pieces are joined
-- with a comma and exploded back into rows. The ranges mirror the DWD cleaning filter.
WITH flagged AS (
    SELECT
        id,
        device_no,
        create_time,
        day,
        concat_ws(',',
            CASE WHEN device_no IS NULL                             THEN CONCAT('device_no:','null') END,
            CASE WHEN volume_sum < 0      OR volume_sum > 1000      THEN CONCAT('volume_sum:', CAST(volume_sum AS STRING)) END,
            CASE WHEN speed_avg < 0       OR speed_avg > 150        THEN CONCAT('speed_avg:', CAST(speed_avg AS STRING)) END,
            CASE WHEN volume_left < 0     OR volume_left > 1000     THEN CONCAT('volume_left:', CAST(volume_left AS STRING)) END,
            CASE WHEN speed_left < 0      OR speed_left > 100       THEN CONCAT('speed_left:', CAST(speed_left AS STRING)) END,
            CASE WHEN volume_straight < 0 OR volume_straight > 1000 THEN CONCAT('volume_straight:', CAST(volume_straight AS STRING)) END,
            CASE WHEN speed_straight < 0  OR speed_straight > 150   THEN CONCAT('speed_straight:', CAST(speed_straight AS STRING)) END,
            CASE WHEN volume_right < 0    OR volume_right > 1000    THEN CONCAT('volume_right:', CAST(volume_right AS STRING)) END,
            CASE WHEN speed_right < 0     OR speed_right > 100      THEN CONCAT('speed_right:', CAST(speed_right AS STRING)) END,
            CASE WHEN volume_turn < 0     OR volume_turn > 100      THEN CONCAT('volume_turn:', CAST(volume_turn AS STRING)) END,
            CASE WHEN speed_turn < 0      OR speed_turn > 100       THEN CONCAT('speed_turn:', CAST(speed_turn AS STRING)) END
        ) AS kv_pairs
    FROM hurys_db.dwd_turnratio_error
    WHERE day = '2024-09-10'
)
INSERT OVERWRITE TABLE hurys_db.dwd_data_clean_record_turnratio PARTITION (day)
SELECT
    id,
    '1'                  AS data_type,
    device_no,
    create_time,
    split(pair, ':')[0]  AS field_name,
    split(pair, ':')[1]  AS field_value,
    day
FROM flagged
LATERAL VIEW explode(split(kv_pairs, ',')) exploded_table AS pair
-- concat_ws skips NULL pieces, so an empty kv_pairs means no field broke a rule
WHERE kv_pairs <> '';

2.5 数据重复性统计表

2.5.1 建表语句

-- Per-device duplicate rate, one partition per day.
CREATE TABLE IF NOT EXISTS hurys_db.dwd_data_duplicate (
    data_type       INT     COMMENT '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no       STRING  COMMENT '设备编号',
    data_duplicate  FLOAT   COMMENT '数据重复率'
)
COMMENT '数据重复性统计表'
PARTITIONED BY (day STRING)
STORED AS ORC;

2.5.2 SQL语句

-- Duplicate rate per device for one ODS day:
--   rows whose (device_no, create_time) occurs more than once, summed,
--   divided by the total row count of the device, rounded to 2 decimals.
-- Devices without any duplicated create_time produce no output row.
WITH device_rows AS (
    SELECT
        device_no,
        create_time,
        day,
        COUNT(device_no) OVER (PARTITION BY device_no, day) AS count_num
    FROM hurys_db.ods_turnratio
    WHERE day = '2024-09-04'
),
duplicate_groups AS (
    SELECT
        device_no,
        create_time,
        count_num,
        day,
        COUNT(1) AS num
    FROM device_rows
    GROUP BY device_no, create_time, count_num, day
    HAVING COUNT(1) > 1
)
INSERT OVERWRITE TABLE hurys_db.dwd_data_duplicate PARTITION (day)
SELECT
    '1'                             AS data_type,
    device_no,
    ROUND(SUM(num) / count_num, 2)  AS data_duplicate,
    day
FROM duplicate_groups
GROUP BY device_no, count_num, day;

2.6 数据准确性统计表

2.6.1 建表语句

-- Per-device, per-field accuracy statistics, one partition per day.
CREATE TABLE IF NOT EXISTS hurys_db.dwd_data_accuracy (
    data_type               INT     COMMENT '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no               STRING  COMMENT '设备编号',
    field_name              STRING  COMMENT '字段名',
    data_unreasonable_rate  FLOAT   COMMENT '数据不合理率',
    data_null_rate          FLOAT   COMMENT '数据空值率'
)
COMMENT '数据准确性统计表'
PARTITIONED BY (day STRING)
STORED AS ORC;

2.6.2 SQL语句

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Accuracy per device and field for one day:
--   data_unreasonable_rate = cleaning-record rows of the field with a value / all ODS rows of the device
--   data_null_rate         = cleaning-record rows of the field without a value / all ODS rows of the device
-- Devices absent from ODS that day are not reported (inner join).
-- NOTE(review): field_value is never NULL in the cleaning record (a NULL device_no is
-- recorded as the literal string null, and NULL measures never satisfy the range
-- conditions there), so data_null_rate is always 0 here. Confirm whether null rates
-- should be computed from ODS instead.
WITH device_total AS (
    SELECT
        device_no,
        day,
        COUNT(device_no) AS count_device_all
    FROM hurys_db.ods_turnratio
    WHERE day = '2024-09-04'
    GROUP BY device_no, day
)
INSERT OVERWRITE TABLE hurys_db.dwd_data_accuracy PARTITION (day)
SELECT
    rec.data_type,
    rec.device_no,
    rec.field_name,
    ROUND(SUM(CASE WHEN rec.field_value IS NOT NULL THEN 1 ELSE 0 END) / tot.count_device_all, 2) AS data_unreasonable_rate,
    ROUND(SUM(CASE WHEN rec.field_value IS NULL     THEN 1 ELSE 0 END) / tot.count_device_all, 2) AS data_null_rate,
    rec.day
FROM hurys_db.dwd_data_clean_record_turnratio AS rec
INNER JOIN device_total AS tot
    ON  tot.device_no = rec.device_no
    AND tot.day       = rec.day
-- the join already restricts rec to this day, filtering here lets Hive prune partitions
WHERE rec.day = '2024-09-04'
GROUP BY rec.data_type, rec.device_no, rec.field_name, tot.count_device_all, rec.day;

2.7 转向比字段数据修复

2.7.1 修复策略

对错误数据中为空或超出合理范围的字段，使用同一设备前三周同期数据（即 7 天、14 天、21 天前同一时刻的 DWD 数据）的平均值进行替换修复；没有同期历史数据的记录不做修复。

2.7.2 SQL语句

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Field repair for one day of error rows. Each measure that is NULL or outside its
-- valid range is replaced by the average of the same device DWD rows at the same
-- time of day 7, 14 and 21 days earlier. The repaired row keeps the error row id.
-- Error rows without any such history are not written.
-- NOTE(review): INSERT INTO is not idempotent, re-running appends the rows again.
WITH error_rows AS (
    SELECT
        id,
        device_no,
        source_device_type,
        sn,
        model,
        create_time,
        cycle,
        volume_sum,
        speed_avg,
        volume_left,
        speed_left,
        volume_straight,
        speed_straight,
        volume_right,
        speed_right,
        volume_turn,
        speed_turn,
        day
    FROM hurys_db.dwd_turnratio_error
    WHERE day = '2024-09-04'
),
history_keys AS (
    -- One row per distinct error key and look-back offset. date_sub shifts the date and
    -- substr keeps the time part. Assumes create_time is yyyy-MM-dd HH:mm:ss, confirm.
    SELECT DISTINCT
        device_no,
        create_time,
        concat(date_sub(create_time, days_back), substr(create_time, 11, 10)) AS history_time
    FROM error_rows
    LATERAL VIEW explode(array(7, 14, 21)) lv AS days_back
),
history_avg AS (
    SELECT
        k.device_no,
        k.create_time,
        round(avg(h.volume_sum), 0)      AS avg_volume_sum,
        round(avg(h.speed_avg), 2)       AS avg_speed_avg,
        round(avg(h.volume_left), 0)     AS avg_volume_left,
        round(avg(h.speed_left), 2)      AS avg_speed_left,
        round(avg(h.volume_straight), 0) AS avg_volume_straight,
        round(avg(h.speed_straight), 2)  AS avg_speed_straight,
        round(avg(h.volume_right), 0)    AS avg_volume_right,
        round(avg(h.speed_right), 2)     AS avg_speed_right,
        round(avg(h.volume_turn), 0)     AS avg_volume_turn,
        round(avg(h.speed_turn), 2)      AS avg_speed_turn
    FROM history_keys AS k
    INNER JOIN hurys_db.dwd_turnratio AS h
        ON  h.device_no   = k.device_no
        AND h.create_time = k.history_time
    GROUP BY k.device_no, k.create_time
)
INSERT INTO TABLE hurys_db.dwd_turnratio PARTITION (day)
SELECT
    e.id,
    e.device_no,
    e.source_device_type,
    e.sn,
    e.model,
    e.create_time,
    e.cycle,
    CASE WHEN e.volume_sum      BETWEEN 0 AND 1000 THEN e.volume_sum      ELSE a.avg_volume_sum      END AS volume_sum,
    CASE WHEN e.speed_avg       BETWEEN 0 AND 150  THEN e.speed_avg       ELSE a.avg_speed_avg       END AS speed_avg,
    CASE WHEN e.volume_left     BETWEEN 0 AND 1000 THEN e.volume_left     ELSE a.avg_volume_left     END AS volume_left,
    CASE WHEN e.speed_left      BETWEEN 0 AND 100  THEN e.speed_left      ELSE a.avg_speed_left      END AS speed_left,
    CASE WHEN e.volume_straight BETWEEN 0 AND 1000 THEN e.volume_straight ELSE a.avg_volume_straight END AS volume_straight,
    CASE WHEN e.speed_straight  BETWEEN 0 AND 150  THEN e.speed_straight  ELSE a.avg_speed_straight  END AS speed_straight,
    CASE WHEN e.volume_right    BETWEEN 0 AND 1000 THEN e.volume_right    ELSE a.avg_volume_right    END AS volume_right,
    CASE WHEN e.speed_right     BETWEEN 0 AND 100  THEN e.speed_right     ELSE a.avg_speed_right     END AS speed_right,
    CASE WHEN e.volume_turn     BETWEEN 0 AND 100  THEN e.volume_turn     ELSE a.avg_volume_turn     END AS volume_turn,
    CASE WHEN e.speed_turn      BETWEEN 0 AND 100  THEN e.speed_turn      ELSE a.avg_speed_turn      END AS speed_turn,
    e.day
FROM error_rows AS e
INNER JOIN history_avg AS a
    ON  a.device_no   = e.device_no
    AND a.create_time = e.create_time;

2.8 转向比整条数据补全

2.8.1 补全策略

对按 5 分钟频率应有但缺失的时间点，使用同一设备前三周同期数据（即 7 天、14 天、21 天前同一时刻的 DWD 数据）的平均值补全整条记录；没有同期历史数据的时间点不做补全。

2.8.2 SQL语句

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Whole-row completion for one DWD day:
--   1. every device present that day is expanded to the expected 5-minute slots
--      listed in hurys_db.dwd_frequency_time
--   2. slots without a DWD row are treated as missing
--   3. each missing slot is filled with the average of the device DWD rows at the
--      same time of day 7, 14 and 21 days earlier, slots without history stay missing
-- NOTE(review): slots are matched on the exact create_time string, so this assumes
-- device timestamps are aligned to the frequency_time values, confirm.
-- NOTE(review): INSERT INTO is not idempotent, re-running appends the rows again.
WITH device_day AS (
    -- Distinct device attributes per date prefix (first 11 characters of create_time).
    -- Deduplicating before the cross join avoids multiplying every DWD row by every slot.
    SELECT DISTINCT
        device_no,
        day,
        source_device_type,
        sn,
        model,
        substr(create_time, 1, 11) AS date_prefix
    FROM hurys_db.dwd_turnratio
    WHERE day = '2024-09-04'
),
expected_slots AS (
    SELECT DISTINCT
        d.device_no,
        d.day,
        d.source_device_type,
        d.sn,
        d.model,
        concat(d.date_prefix, f.frequency_time) AS all_time
    FROM device_day AS d
    CROSS JOIN hurys_db.dwd_frequency_time AS f
    WHERE f.frequency_rate = '5分钟'
),
missing_slots AS (
    SELECT
        s.device_no,
        s.day,
        s.all_time AS miss_time,
        s.source_device_type,
        s.sn,
        s.model
    FROM expected_slots AS s
    LEFT JOIN hurys_db.dwd_turnratio AS t
        ON  t.device_no   = s.device_no
        AND t.create_time = s.all_time
        AND t.day         = '2024-09-04'
    WHERE t.create_time IS NULL
),
history_keys AS (
    -- Same time of day 1, 2 and 3 weeks before each missing slot.
    SELECT
        device_no,
        source_device_type,
        sn,
        model,
        miss_time,
        day,
        concat(date_sub(miss_time, days_back), substr(miss_time, 11, 10)) AS history_time
    FROM missing_slots
    LATERAL VIEW explode(array(7, 14, 21)) lv AS days_back
),
completed AS (
    SELECT
        k.device_no,
        k.source_device_type,
        k.sn,
        k.model,
        k.miss_time,
        round(avg(h.cycle), 0)           AS cycle,
        round(avg(h.volume_sum), 0)      AS volume_sum,
        round(avg(h.speed_avg), 2)       AS speed_avg,
        round(avg(h.volume_left), 0)     AS volume_left,
        round(avg(h.speed_left), 2)      AS speed_left,
        round(avg(h.volume_straight), 0) AS volume_straight,
        round(avg(h.speed_straight), 2)  AS speed_straight,
        round(avg(h.volume_right), 0)    AS volume_right,
        round(avg(h.speed_right), 2)     AS speed_right,
        round(avg(h.volume_turn), 0)     AS volume_turn,
        round(avg(h.speed_turn), 2)      AS speed_turn,
        k.day
    FROM history_keys AS k
    INNER JOIN hurys_db.dwd_turnratio AS h
        ON  h.device_no   = k.device_no
        AND h.create_time = k.history_time
    GROUP BY k.device_no, k.source_device_type, k.sn, k.model, k.miss_time, k.day
)
INSERT INTO TABLE hurys_db.dwd_turnratio PARTITION (day)
SELECT
    UUID() AS id,
    device_no,
    source_device_type,
    sn,
    model,
    miss_time AS create_time,
    cycle,
    volume_sum,
    speed_avg,
    volume_left,
    speed_left,
    volume_straight,
    speed_straight,
    volume_right,
    speed_right,
    volume_turn,
    speed_turn,
    day
FROM completed;

2.9 数据补全以及数据修复记录表

2.9.1 建表语句

-- Log of completed (record_type 0) and repaired (record_type 1) rows, one partition per day.
-- NOTE(review): create_time is TIMESTAMP here but STRING in the source tables, so the
-- inserts rely on implicit string-to-timestamp conversion.
CREATE TABLE IF NOT EXISTS hurys_db.dwd_data_correction_record (
    data_type    INT        COMMENT '数据类型 1:转向比,2:统计,3:评价,4:区域,6:静态排队,7:动态排队',
    device_no    STRING     COMMENT '设备编号',
    id           STRING     COMMENT '唯一ID',
    create_time  TIMESTAMP  COMMENT '创建时间',
    record_type  INT        COMMENT '记录类型 0:补全,1:修复'
)
COMMENT '数据补全以及数据修复记录表'
PARTITIONED BY (day STRING)
STORED AS ORC;

2.9.2 SQL语句

2.9.2.1 转向比数据修复记录
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Repair record for one day: an error row whose id and device_no also exist in
-- dwd_turnratio is logged with record_type 1 (repair). Presumably such rows were
-- written back by the field-repair step, which reuses the error row id.
INSERT INTO TABLE hurys_db.dwd_data_correction_record PARTITION (day)
SELECT
    '1'              AS data_type,
    err.device_no,
    err.id,
    err.create_time,
    '1'              AS record_type,
    err.day
FROM hurys_db.dwd_turnratio_error AS err
INNER JOIN hurys_db.dwd_turnratio AS dwd
    ON  dwd.id        = err.id
    AND dwd.device_no = err.device_no
WHERE err.day = '2024-09-04';
2.9.2.2 转向比补全记录
-- Completion record for one day: DWD rows with no ODS row of the same device_no and
-- create_time are logged with record_type 0 (completion).
INSERT INTO TABLE hurys_db.dwd_data_correction_record PARTITION (day)
SELECT
    '1'              AS data_type,
    dwd.device_no,
    dwd.id,
    dwd.create_time,
    '0'              AS record_type,
    dwd.day
FROM hurys_db.dwd_turnratio AS dwd
LEFT JOIN (
    SELECT
        device_no,
        create_time
    FROM hurys_db.ods_turnratio
    WHERE day = '2024-09-04'
) AS ods
    ON  ods.device_no   = dwd.device_no
    AND ods.create_time = dwd.create_time
WHERE dwd.day = '2024-09-04'
  -- a matched ODS row always has a non-NULL device_no, so NULL means no match
  AND ods.device_no IS NULL;

搞定！目前数据清洗这一层就这么多，后面继续完善！


本文转载自: https://blog.csdn.net/tiantang2renjian/article/details/142358838
版权归原作者 天地风雷水火山泽 所有, 如有侵权,请联系我们删除。

“二百六十六、Hive——Hive的DWD层数据清洗、清洗记录、数据修复、数据补全”的评论:

还没有评论