0


数仓基础知识_拉链表的详细讲解

拉链表

没错,就像衣服的拉链一样重要,实用性非常强,使用频率非常高。

拉链表核心思想,像个拉链,支持开链,支持闭链,支持退链,我们通常将最新的数据称为开链数据,历史数据称为闭链数据,拉链表支持历史数据查询,且空间占用较小,但是数据加工处理较为繁琐,属于时间换空间的设计方式,拉链表一个时间维度中同一个用户只保存一条用户状态。拉链表通常会增加三个技术字段“开始日期starttime、结束日期endtime、状态标识mark”。通过主键(PK)与历史数据进行对比,判断当前数据与历史数据是否发生变化,如果发生变化或者新增则进行相应的开链、闭链操作。

1.1 用户基础表拉链示例
以下使用用户基本信息表对拉链表操作进行深入刨析
2021-01-01用户基础表原始数据如下:

userID为主键,可变化字段为登录名(userName)、电话号码(phoneNum)、账号状态(status)、最近登录日期(lastLoginDate)。

setp1:首先设计拉链表的主键,根据原始数据表的表结构,选择userID作为拉链表PK键;
setp2:选择phoneNum、status作为notPK键,notPK作为对比字段,这里去除了lastLoginDate与dataTime这两个经常变化的字段,如果需要跟踪登陆日期lastLoginDate数据,请使用事件表,notPK键选择原则需要同时满足如下条件:

会发生变化的字段,且满足缓慢变化维SCD;

不能选择每天都发生变化的无对比意义字段(如dataTime字段,对于原始数据表这个字段是每天都变化的,所以没有对比意义);

setp3:确定数仓技术字段,本次加入数据加工日期etlTimestamp、开始时间startTime、结束时间endTime、标识位mark("i":新增,"u":修改,"d":删除)四个技术字段;

数据采集方式为T+1方式(今天计算昨天的数据)。

2021-01-02用户拉链表如下:

第一次加载因为拉链表历史数据为空,所以所有数据都为新增数据,标识位标为新增状态"i",开始时间为数据日期2021/1/1,当前所有数据都为最新数据,多以结束时间设置为一个较大的时间2999/12/31作为开链时间标识日期。

2021-01-02用户基础表原始数据如下:

使用原始数据表主键userID关联拉链表中开链数据(where endTime=2999/12/31)的userID,对比notPK字段是否相同,我们选择的notPK字段为phoneNum和status,发生变化的数据:
coolniu2021a0001用户电话号码发生变化;
coolniu2021a0005用户电话号码发生变化;

coolniu2021a0007用户状态发生变化;

新增coolniu2021a0008、coolniu2021a0009、coolniu2021a0010三位用户;

2021-01-03用户拉链表如下:

coolniu2021a0001用户拉链分析

coolniu2021a0001电话号码发生变化,在拉链表中将结束日期修改为变化时间的数据日期2021/1/2

代表coolniu2021a0001用户上一个状态结束。
又新增了一行coolniu2021a0001的数据

针对这条数据新的状态开始时间startTime为变化日期的数据日期2021/1/2,结束时间标识为2999/12/31(代表最新开链状态),标识位"u"(代表这条数据是修改状态)。
coolniu2021a0005用户拉链分析
coolniu2021a0005与coolniu2021a0001用户一样,都是电话号码发生了变化,拉链处理方式与coolniu2021a0001一致。

coolniu2021a0007用户拉链分析

coolniu2021a0007用户状态由1变成了销户状态"2",拉链表处理:

coolniu2021a0007账号状态为1的数据,结束时间修改为数据发生变化的时间2021/1/2

新增一条coolniu2021a0007记录,标识位mark标识为"d",开始时间为数据变化时间2021/1/2,结束时间修改为2021/1/2,表示该条数据已经闭链。

新增用户拉链分析
coolniu2021a0008、coolniu2021a0009、coolniu2021a0010三个用户为2021-01-02日新增用户,开始时间为2021/1/2,结束时间为2999/12/31(标识该数据为最新开链状态),mark位标为新增标识"i"。

未变化用户拉链分析
用户coolniu2021a0002、coolniu2021a0003、coolniu2021a0004、coolniu2021a0006用户notPK数据未变化,所以保持不变。

2021-01-03用户基础表原始数据如下:

1月3日新增了一条coolniu2021a0011数据。
2021-01-04用户拉链表如下:

因为其他天没有发生变化,所以在拉链表中保持不变,只新增一条coolniu2021a0011的记录。

1.2 拉链表算法示例
1.2.1 建表
source_table

CREATE TABLE

source_table

(

userid

string,

loginname

string,

regiondate

string,

phonenum

string,

birthday

string,

status

string,

lastlogindate

string)
PARTITIONED BY (datatime string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘|’
STORED AS TEXTFILE;

1.2.2 load数据

load data local INPATH ‘/tmp/xinniu/20210101’ into table source_table partition (datatime=‘20210101’);
load data local INPATH ‘/tmp/xinniu/20210102’ into table source_table partition (datatime=‘20210102’);
load data local INPATH ‘/tmp/xinniu/20210103’ into table source_table partition (datatime=‘20210103’);

1.2.3 创建拉链表

CREATE TABLE

zip_table

(

userid

string,

loginname

string,

regiondate

string,

phonenum

string,

birthday

string,

status

string,

lastlogindate

string,

datatime

string,

etltimestamp

string,

starttime

string,

endtime

string,

mark

string)

1.2.4 创建算法文件sqlfile

vim /tmp/xinniu/sqlfile

填写如下内容

– 创建一张拉链表的备份表 备份拉链表历史开链数据
CREATE TABLE IF NOT EXISTS xinniu.zip_table_bk stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
1 = 0 ;

– 将拉链表历史开链数据插入到bk备份表中
– 卡拉链条件:startTime < to_date(from_unixtime(unix_timestamp(‘

       h 
      
     
       i 
      
     
       v 
      
     
       e 
      
     
       c 
      
     
       o 
      
     
       n 
      
     
       f 
      
     
       : 
      
     
       b 
      
     
       a 
      
     
       t 
      
     
       c 
      
      
      
        h 
       
      
        d 
       
      
     
       a 
      
     
       t 
      
     
       e 
      
     
    
      ′ 
     
    
    
    
      , 
     
    
      ′ 
     
    
   
     y 
    
   
     y 
    
   
     y 
    
   
     y 
    
   
     M 
    
   
     M 
    
   
     d 
    
    
    
      d 
     
    
      ′ 
     
    
   
     ) 
    
   
     ) 
    
   
     ) 
    
   
     A 
    
   
     N 
    
   
     D 
    
   
     e 
    
   
     n 
    
   
     d 
    
   
     T 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     > 
    
   
     = 
    
   
     t 
    
    
    
      o 
     
    
      d 
     
    
   
     a 
    
   
     t 
    
   
     e 
    
   
     ( 
    
   
     f 
    
   
     r 
    
   
     o 
    
    
    
      m 
     
    
      u 
     
    
   
     n 
    
   
     i 
    
   
     x 
    
   
     t 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     ( 
    
   
     u 
    
   
     n 
    
   
     i 
    
    
    
      x 
     
    
      t 
     
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     s 
    
   
     t 
    
   
     a 
    
   
     m 
    
   
     p 
    
    
    
      ( 
     
    
      ′ 
     
    
   
  
    {hiveconf:batch_date}' ,'yyyyMMdd'))) AND endTime >= to_date(from_unixtime(unix_timestamp(' 
   
  
hiveconf:batchd​ate′,′yyyyMMdd′)))ANDendTime>=tod​ate(fromu​nixtime(unixt​imestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))

– 备份表非空判断:(select count(1) from xinniu.zip_table_bk limit 1) = 0 判断备份表非空才插入 此处必须判空 不能使用drop或者truncate的方式清空备份表 会导致失败重跑时丢数
INSERT
INTO
xinniu.zip_table_bk
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
join (select count(1) cnt from xinniu.zip_table_bk limit 1) b
WHERE
startTime < to_date(from_unixtime(unix_timestamp(‘

       h 
      
     
       i 
      
     
       v 
      
     
       e 
      
     
       c 
      
     
       o 
      
     
       n 
      
     
       f 
      
     
       : 
      
     
       b 
      
     
       a 
      
     
       t 
      
     
       c 
      
      
      
        h 
       
      
        d 
       
      
     
       a 
      
     
       t 
      
     
       e 
      
     
    
      ′ 
     
    
    
    
      , 
     
    
      ′ 
     
    
   
     y 
    
   
     y 
    
   
     y 
    
   
     y 
    
   
     M 
    
   
     M 
    
   
     d 
    
    
    
      d 
     
    
      ′ 
     
    
   
     ) 
    
   
     ) 
    
   
     ) 
    
   
     A 
    
   
     N 
    
   
     D 
    
   
     e 
    
   
     n 
    
   
     d 
    
   
     T 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     > 
    
   
     = 
    
   
     t 
    
    
    
      o 
     
    
      d 
     
    
   
     a 
    
   
     t 
    
   
     e 
    
   
     ( 
    
   
     f 
    
   
     r 
    
   
     o 
    
    
    
      m 
     
    
      u 
     
    
   
     n 
    
   
     i 
    
   
     x 
    
   
     t 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     ( 
    
   
     u 
    
   
     n 
    
   
     i 
    
    
    
      x 
     
    
      t 
     
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     s 
    
   
     t 
    
   
     a 
    
   
     m 
    
   
     p 
    
    
    
      ( 
     
    
      ′ 
     
    
   
  
    {hiveconf:batch_date}' ,'yyyyMMdd'))) AND endTime >= to_date(from_unixtime(unix_timestamp(' 
   
  
hiveconf:batchd​ate′,′yyyyMMdd′)))ANDendTime>=tod​ate(fromu​nixtime(unixt​imestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))

AND b.cnt = 0
;

– 创建拉链表闭链数据备份表bf
CREATE TABLE IF NOT EXISTS xinniu.zip_table_bf stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
1 = 0 ;
– 备份拉链表中历史闭链数据 卡拉链条件:endTime < to_date(to_timestamp(‘${hiveconf:batch_date}’ , ‘yyyyMMdd’))

– 备份表非空判断:(select count(1) from xinniu.zip_table_bf limit 1) = 0 同上,不能使用drop或者truncate的方式清空备份表
INSERT
INTO
xinniu.zip_table_bf
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
join
(
SELECT
count(1) cnt
FROM
xinniu.zip_table_bf
LIMIT 1) b
WHERE
endTime < to_date(from_unixtime(unix_timestamp(‘${hiveconf:batch_date}’ ,‘yyyyMMdd’)))
AND b.cnt = 0 ;

– 中间加工表清空
DROP TABLE IF EXISTS xinniu.zip_table_nw;
DROP TABLE IF EXISTS xinniu.zip_table_od;

– 创建中间表 新增变化修改中间表
CREATE TABLE IF NOT EXISTS xinniu.zip_table_nw stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
0 = 1;

– 创建中间表 未变化中间表
CREATE TABLE IF NOT EXISTS xinniu.zip_table_od stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
0 = 1;

– 新增、修改、删除变化数据插入变化中间表nw
– 原始数据表与拉链表进行full join关联,通过主键is_pk关联 根据不同情况生成对应的startTime与endTime及mark三个技术字段
– 字段值选择原始表与目标表的非空字段值 nvl(n.@{source_column_names}, o.@{xinniu.zip_table_column_names})
– 本逻辑中mark分为I、D两种,新增与修改为I,删除为D
INSERT
INTO
TABLE xinniu.zip_table_nw
SELECT
nvl(n.userID,o.userID) ,
nvl(n.loginName,o.loginName) ,
nvl(n.regionDate,o.regionDate) ,
nvl(n.phoneNum,o.phoneNum) ,
nvl(n.birthday,o.birthday) ,
nvl(n.status,o.status) ,
nvl(n.lastLoginDate,o.lastLoginDate) ,
nvl(n.dataTime,o.dataTime) ,
current_date AS etlTimestamp ,
CASE
WHEN n.dataTime IS NULL THEN o.startTime
ELSE to_date(from_unixtime(unix_timestamp(‘

       h 
      
     
       i 
      
     
       v 
      
     
       e 
      
     
       c 
      
     
       o 
      
     
       n 
      
     
       f 
      
     
       : 
      
     
       b 
      
     
       a 
      
     
       t 
      
     
       c 
      
      
      
        h 
       
      
        d 
       
      
     
       a 
      
     
       t 
      
     
       e 
      
     
    
      ′ 
     
    
    
    
      , 
     
    
      ′ 
     
    
   
     y 
    
   
     y 
    
   
     y 
    
   
     y 
    
   
     M 
    
   
     M 
    
   
     d 
    
    
    
      d 
     
    
      ′ 
     
    
   
     ) 
    
   
     ) 
    
   
     ) 
    
   
     E 
    
   
     N 
    
   
     D 
    
   
     A 
    
   
     S 
    
   
     s 
    
   
     t 
    
   
     a 
    
   
     r 
    
   
     t 
    
   
     T 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     , 
    
   
     C 
    
   
     A 
    
   
     S 
    
   
     E 
    
   
     W 
    
   
     H 
    
   
     E 
    
   
     N 
    
   
     n 
    
   
     . 
    
   
     d 
    
   
     a 
    
   
     t 
    
   
     a 
    
   
     T 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     I 
    
   
     S 
    
   
     N 
    
   
     U 
    
   
     L 
    
   
     L 
    
   
     T 
    
   
     H 
    
   
     E 
    
   
     N 
    
   
     t 
    
    
    
      o 
     
    
      d 
     
    
   
     a 
    
   
     t 
    
   
     e 
    
   
     ( 
    
   
     f 
    
   
     r 
    
   
     o 
    
    
    
      m 
     
    
      u 
     
    
   
     n 
    
   
     i 
    
   
     x 
    
   
     t 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     ( 
    
   
     u 
    
   
     n 
    
   
     i 
    
    
    
      x 
     
    
      t 
     
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     s 
    
   
     t 
    
   
     a 
    
   
     m 
    
   
     p 
    
    
    
      ( 
     
    
      ′ 
     
    
   
  
    {hiveconf:batch_date}' ,'yyyyMMdd'))) END AS startTime , CASE WHEN n.dataTime IS NULL THEN to_date(from_unixtime(unix_timestamp(' 
   
  
hiveconf:batchd​ate′,′yyyyMMdd′)))ENDASstartTime,CASEWHENn.dataTimeISNULLTHENtod​ate(fromu​nixtime(unixt​imestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))

ELSE to_date(from_unixtime(unix_timestamp(‘29991231’ ,‘yyyyMMdd’)))
END AS endTime ,
CASE
WHEN ( n.userID is null ) THEN ‘D’
ELSE ‘I’
END AS mark
FROM
(
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime
FROM
xinniu.source_table
WHERE
dataTime = ‘${hiveconf:batch_date}’ ) n
FULL JOIN xinniu.zip_table_bk o ON
o.userID = n.userID
WHERE
(
o.userID IS NULL )
OR (
n.userID IS NULL )
OR (
nvl( CAST(o.phoneNum AS string) , ‘’ ) <> nvl( CAST(n.phoneNum AS string) , ‘’ )
OR nvl( CAST(o.status AS string) , ‘’ ) <> nvl( CAST(n.status AS string) , ‘’ )
)
;

– 闭链发生变化的数据 endTime改为hiveconf:batch_date
– 未变化数据保持原来状态 新增与修改状态统一"I"
– 发生变化的endTime逻辑:when n.startTime is not null then to_date(to_timestamp(‘

       h 
      
     
       i 
      
     
       v 
      
     
       e 
      
     
       c 
      
     
       o 
      
     
       n 
      
     
       f 
      
     
       : 
      
     
       b 
      
     
       a 
      
     
       t 
      
     
       c 
      
      
      
        h 
       
      
        d 
       
      
     
       a 
      
     
       t 
      
     
       e 
      
     
    
      ′ 
     
    
    
    
      , 
     
    
      ′ 
     
    
   
     y 
    
   
     y 
    
   
     y 
    
   
     y 
    
   
     M 
    
   
     M 
    
   
     d 
    
    
    
      d 
     
    
      ′ 
     
    
   
     ) 
    
   
     ) 
    
   
     − 
    
   
     − 
    
   
     没发生变化的 
    
   
     e 
    
   
     n 
    
   
     d 
    
   
     T 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     逻辑: 
    
   
     w 
    
   
     h 
    
   
     e 
    
   
     n 
    
   
     o 
    
   
     . 
    
   
     e 
    
   
     n 
    
   
     d 
    
   
     T 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     > 
    
   
     = 
    
   
     t 
    
    
    
      o 
     
    
      d 
     
    
   
     a 
    
   
     t 
    
   
     e 
    
   
     ( 
    
   
     t 
    
    
    
      o 
     
    
      t 
     
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     s 
    
   
     t 
    
   
     a 
    
   
     m 
    
   
     p 
    
    
    
      ( 
     
    
      ′ 
     
    
   
  
    {hiveconf:batch_date}' , 'yyyyMMdd')) -- 没发生变化的endTime逻辑:when o.endTime >= to_date(to_timestamp(' 
   
  
hiveconf:batchd​ate′,′yyyyMMdd′))−−没发生变化的endTime逻辑:wheno.endTime>=tod​ate(tot​imestamp(′{hiveconf:batch_date}’ , ‘yyyyMMdd’)) then to_date(to_timestamp(‘29991231’,‘yyyyMMdd’))

INSERT
INTO
TABLE xinniu.zip_table_od
SELECT
o.userID,
o.loginName,
o.regionDate,
o.phoneNum,
o.birthday,
o.status,
o.lastLoginDate,
o.dataTime,
o.etlTimestamp ,
o.startTime ,
CASE
WHEN n.startTime IS NOT NULL THEN to_date(from_unixtime(unix_timestamp(‘

       h 
      
     
       i 
      
     
       v 
      
     
       e 
      
     
       c 
      
     
       o 
      
     
       n 
      
     
       f 
      
     
       : 
      
     
       b 
      
     
       a 
      
     
       t 
      
     
       c 
      
      
      
        h 
       
      
        d 
       
      
     
       a 
      
     
       t 
      
     
       e 
      
     
    
      ′ 
     
    
    
    
      , 
     
    
      ′ 
     
    
   
     y 
    
   
     y 
    
   
     y 
    
   
     y 
    
   
     M 
    
   
     M 
    
   
     d 
    
    
    
      d 
     
    
      ′ 
     
    
   
     ) 
    
   
     ) 
    
   
     ) 
    
   
     W 
    
   
     H 
    
   
     E 
    
   
     N 
    
   
     o 
    
   
     . 
    
   
     e 
    
   
     n 
    
   
     d 
    
   
     T 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     > 
    
   
     = 
    
   
     t 
    
    
    
      o 
     
    
      d 
     
    
   
     a 
    
   
     t 
    
   
     e 
    
   
     ( 
    
   
     f 
    
   
     r 
    
   
     o 
    
    
    
      m 
     
    
      u 
     
    
   
     n 
    
   
     i 
    
   
     x 
    
   
     t 
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     ( 
    
   
     u 
    
   
     n 
    
   
     i 
    
    
    
      x 
     
    
      t 
     
    
   
     i 
    
   
     m 
    
   
     e 
    
   
     s 
    
   
     t 
    
   
     a 
    
   
     m 
    
   
     p 
    
    
    
      ( 
     
    
      ′ 
     
    
   
  
    {hiveconf:batch_date}' ,'yyyyMMdd'))) WHEN o.endTime >= to_date(from_unixtime(unix_timestamp(' 
   
  
hiveconf:batchd​ate′,′yyyyMMdd′)))WHENo.endTime>=tod​ate(fromu​nixtime(unixt​imestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))

THEN to_date(from_unixtime(unix_timestamp(‘29991231’ ,‘yyyyMMdd’)))
ELSE o.endTime
END AS endTime ,
‘I’ AS mark
FROM
xinniu.zip_table_bk o
LEFT JOIN xinniu.zip_table_nw n ON
o.userID = n.userID
WHERE
nvl(n.endTime,to_date(from_unixtime(unix_timestamp(‘29991231’ ,‘yyyyMMdd’)))) <> to_date(from_unixtime(unix_timestamp(‘${hiveconf:batch_date}’ ,‘yyyyMMdd’)))
;

– 清空拉链表
TRUNCATE TABLE xinniu.zip_table;

– 插入数据到拉链表
INSERT
INTO
TABLE xinniu.zip_table
SELECT
*
FROM
xinniu.zip_table_nw
UNION ALL
SELECT
*
FROM
xinniu.zip_table_od
UNION ALL
SELECT
*
FROM
xinniu.zip_table_bf ;

– 清空临时表
DROP TABLE xinniu.zip_table_bk;

DROP TABLE xinniu.zip_table_bf;

DROP TABLE xinniu.zip_table_nw;

DROP TABLE xinniu.zip_table_od;

1.2.5 执行跑批任务

hive -hiveconf batch_date=20210101 -f /tmp/xinniu/sqlfile && hive -hiveconf batch_date=20210102 -f /tmp/xinniu/sqlfile && hive -hiveconf batch_date=20210103 -f /tmp/xinniu/sqlfile

1.3 效果验证
1.3.1 全表验证

select * from xinniu.zip_table;

1.3.2 卡拉链,查询历史某一节点数据

select * from xinniu.zip_table where startTime<=‘2021-01-01’ and endTime>‘2021-01-01’;

select * from xinniu.zip_table where startTime<=‘2021-01-02’ and endTime>‘2021-01-02’;


本文转载自: https://blog.csdn.net/qq_22201881/article/details/140838716
版权归原作者 数字天下 所有, 如有侵权,请联系我们删除。

“数仓基础知识_拉链表的详细讲解”的评论:

还没有评论