0


Hive拉链表

Hive拉链表

背景

笔者在将DataStage任务翻写为Hive On Tez任务时,遇到一个拉链表,实在是头大,特此将脱敏后的套路及心得记录下来,以备后续翻阅。

原理

DataStage

在这里插入图片描述

脱敏后大致如图所示。首先从异构数据源获取新数据。然后和Oracle的结果表做left join,关联失败的当然是要插入到结果表的新数据,不解释。关联成功后,要去判断非主键的字段是否发生了变化,如果没有变化,当然是还要保留;如果发生了变化,就要对历史数据做Update,并且将变更后的新数据给插入到结果表。古人用DataStage操作的还是相当的熟练,奈何Oracle大势已去,现在是大数据的天下了。。。

HQL

HQL不像Spark和Flink那样可以从异构数据源做联邦查询。但是异构数据源的问题好办,统一入湖到Hive的ODS即可,就可以一句HQL读到数据。

由于结果表不是那种ACID的Orc表,Parquet表就没有办法去Update了。但是问题总还是需要解决的。

可以考虑全量覆盖的方式,将保留不变的历史数据、新增的数据、修改后的数据、修改后补充插入的新数据这4部分累加起来,就是所需的结果了。但是从没有分区的Hive表读数据再回灌回去有个问题。。。

由于做insert overwrite时,会在location指定的hdfs路径生成一个./tmp的中间路径,写完后,会删除老的Parquet文件,再去rename将./tmp的parquet“搬”回来。当删除老的Parquet文件,还没有来得及rename时MR任务失败,这种小概率事件有可能导致数据发生丢失,就需要手动敲命令抓trash或者手动rename,具有一定的风险,而且不利于发生异常时重跑和修数据,所以要采用类似Java中2PC【两阶段提交】的模式,先将结果灌入到tmp表,再从tmp表回灌到结果表。这么做可以保障事务,当HQL任务跑失败时可以回滚。就可以方便后续SQL Boy们运维时重跑任务。如果数据全部是从上游运算出来的,那么这种不保留历史数据的情况,大不了重跑,直接从结果表读数据运算后写回去也没啥太大的问题。

遇到的问题

Caused by:ong.apache.hadoop.hive.gl.parse.SemanticException:Column 代理键SK Found in more than OneTables/Subauenies--所以使用如下方式:insert overwrite覆盖历史数据,再insert into追加新数据

由于HQL写的太长了,可能遇到了解析的问题,所以解析失败,但是4个tmp单独都是能跑出数据的。所以笔者灵机一动,采用先overwrite覆盖历史数据,再去insert into追加数据的方式。由于迁移平台一定要迁移历史数据,那么这样也可以保证重跑任务时的幂等性。

伪代码

具体的业务不同,大体上拉链表都是这样的。

脱敏后大体如下:

with tmp_new as(--获取到异构数据源的新数据select
        需要的字段
    from
        统一入湖到ODS的Hive表1where
        某些条件
    
    unionallselect
        需要的字段
    from
        统一入湖到ODS的Hive表2where
        某些条件
    
    unionallselect
        需要的字段
    from
        统一入湖到ODS的Hive表3where
        某些条件
),
tmp_self1 as(--从自己取数select
        需要的字段,
        代理键sk
    from
        result_table t1
),
tmp_main as(--join出主数据select
        t1.需要的字段,
        t2.需要的字段
    from
        tmp_new t1
    leftjoin
        tmp_self1 t2
        on t1.主键=t2.主键
),
tmp_change as(--获取到发生变化的数据select
        需要的字段,
        t2.需要的字段,
        t2.代理键sk
    from
        tmp_main t1
    where
        代理键sk isnotnull--tmp_self1的代理键and(
            新的非主键字段值!=对应的原来的非主键字段值
        )),
tmp_exists_no as(--join失败的数据就是结果表不存在的数据,后续insert用select
        需要的字段,
        t2.需要的字段
    from
        tmp_main t1
    where
        代理键sk isnull),
tmp_exists_no_ins as(--实际插入的数据select
        需要的字段,1as flg_是否有效,
        昨天 as 开始的有效日期,
        to_date('9999-12-31')as 结束日期,casewhen 最大的sk isnullthen 代理键sk,else 代理键sk+最大的sk
            endas 代理键sk        
    from(select
            需要的字段,
            row_number()over(orderby 随便一个字段)as 代理键sk--后续生成实际的代理键时要用from(select
                t3.需要的字段,
                t3.代理键sk,
                t4.最大的sk
            from(select
                    t1.需要的字段,
                    t2.代理键sk_self
                from
                    tmp_exists_no t1
                leftjoin(--需要改动的数据select
                        主键,
                        代理键sk as 代理键sk
                    from
                        result_table
                    where
                        结束日期=前天
                    ) t2
                    on t1.主键=t2.主键
                )t3,(select
                        cast(max(代理键sk)asint)as 最大的sk
                    from
                        result_table
                )t4
            )t5
        ) t6
),
tmp_change_update as(--发生变化的数据要更新select
        t1.代理键,
        t2.所需保留数据的字段
        前天 as 结束日期,--需要修改的字段0as flg_是否有效,--需要修改的字段current_timestamp()as 末次时间戳
    from
        tmp_change t1,
        result_table t2
    where
        t1.主键=t2.主键
),
tmp_change_ins as(selectcasewhen 最大的sk isnullthen 代理键sk,else 代理键sk+最大的sk
            endas 代理键sk,
        需要的字段,
        昨天 as 开始的有效日期,current_timestamp()as 加载时间戳,from(select
            需要的字段,
            最大的sk,
            row_number()over(orderby 随便一个字段)as 代理键sk--生成从1开始的连续自然数from(select
                t1.需要的字段,
                t2.最大的sk
            from
                tmp_change t1,(select
                        cast(max(代理键sk)asint)as 最大的sk
                    from(select
                            代理键sk
                        from
                            result_table
                        
                        unionallselect
                            代理键sk
                        from
                            tmp_exists_no_ins    --因为是先插入这部分,才去插入修改后的数据。当然也可以换个顺序,这一坨就移到tmp_exists_no_ins中) t1
                ) t2
        )t3
    )t4
),
tmp_his as(--需要保留的历史数据select
    需要的字段
from
    result_table
where
    代理键sk notin(--不需要update的数据就是需要保留的历史数据,代理键sk唯一确定数据,可以当主键用select
            代理键sk
        from
            tmp_change_update
    ))insert overwrite table tmp表    --一定有历史数据select
    需要的字段
from
    tmp_his
;--这里在ctrl v一大坨一毛一样的HQLinsertintotable tmp表    --追加新insert的数据select
    需要的字段
from
    tmp_change_update
    
unionallselect
    需要的字段
from
    tmp_change_ins
    
unionallselect
    需要的字段
from
    tmp_exists_no_ins
;

由于笔者不是专业的SQL Boy,实在是没有能力一句insert 写完,故按照Spark的那种一步一个DataFrame的方式,HQL中一步一个tmp。。。

如果解决了

Found in more than OneTables/Subauenies

的问题,其实HQL可以短一点。。。

广大学徒工们可以参照笔者的套路,保持大体结构,修改业务不相同的部分,即可开发出拉链表的Hive On Tez任务。
在这里插入图片描述

转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/129679071

标签: hive 大数据 hadoop

本文转载自: https://blog.csdn.net/qq_41990268/article/details/129679071
版权归原作者 虎鲸不是鱼 所有, 如有侵权,请联系我们删除。

“Hive拉链表”的评论:

还没有评论