拉链表
没错,就像衣服的拉链一样重要,实用性非常强,使用频率非常高。
拉链表核心思想,像个拉链,支持开链,支持闭链,支持退链,我们通常将最新的数据称为开链数据,历史数据称为闭链数据,拉链表支持历史数据查询,且空间占用较小,但是数据加工处理较为繁琐,属于时间换空间的设计方式,拉链表一个时间维度中同一个用户只保存一条用户状态。拉链表通常会增加三个技术字段“开始日期starttime、结束日期endtime、状态标识mark”。通过主键(PK)与历史数据进行对比,判断当前数据与历史数据是否发生变化,如果发生变化或者新增则进行相应的开链、闭链操作。
1.1 用户基础表拉链示例
以下使用用户基本信息表对拉链表操作进行深入刨析
2021-01-01用户基础表原始数据如下:
userID为主键,可变化字段为登录名(userName)、电话号码(phoneNum)、账号状态(status)、最近登录日期(lastLoginDate)。
setp1:首先设计拉链表的主键,根据原始数据表的表结构,选择userID作为拉链表PK键;
setp2:选择phoneNum、status作为notPK键,notPK作为对比字段,这里去除了lastLoginDate与dataTime这两个经常变化的字段,如果需要跟踪登陆日期lastLoginDate数据,请使用事件表,notPK键选择原则需要同时满足如下条件:
会发生变化的字段,且满足缓慢变化维SCD;
不能选择每天都发生变化的无对比意义字段(如dataTime字段,对于原始数据表这个字段是每天都变化的,所以没有对比意义);
setp3:确定数仓技术字段,本次加入数据加工日期etlTimestamp、开始时间startTime、结束时间endTime、标识位mark("i":新增,"u":修改,"d":删除)四个技术字段;
数据采集方式为T+1方式(今天计算昨天的数据)。
2021-01-02用户拉链表如下:
第一次加载因为拉链表历史数据为空,所以所有数据都为新增数据,标识位标为新增状态"i",开始时间为数据日期2021/1/1,当前所有数据都为最新数据,多以结束时间设置为一个较大的时间2999/12/31作为开链时间标识日期。
2021-01-02用户基础表原始数据如下:
使用原始数据表主键userID关联拉链表中开链数据(where endTime=2999/12/31)的userID,对比notPK字段是否相同,我们选择的notPK字段为phoneNum和status,发生变化的数据:
coolniu2021a0001用户电话号码发生变化;
coolniu2021a0005用户电话号码发生变化;
coolniu2021a0007用户状态发生变化;
新增coolniu2021a0008、coolniu2021a0009、coolniu2021a0010三位用户;
2021-01-03用户拉链表如下:
coolniu2021a0001用户拉链分析
coolniu2021a0001电话号码发生变化,在拉链表中将结束日期修改为变化时间的数据日期2021/1/2
代表coolniu2021a0001用户上一个状态结束。
又新增了一行coolniu2021a0001的数据
针对这条数据新的状态开始时间startTime为变化日期的数据日期2021/1/2,结束时间标识为2999/12/31(代表最新开链状态),标识位"u"(代表这条数据是修改状态)。
coolniu2021a0005用户拉链分析
coolniu2021a0005与coolniu2021a0001用户一样,都是电话号码发生了变化,拉链处理方式与coolniu2021a0001一致。
coolniu2021a0007用户拉链分析
coolniu2021a0007用户状态由1变成了销户状态"2",拉链表处理:
coolniu2021a0007账号状态为1的数据,结束时间修改为数据发生变化的时间2021/1/2
新增一条coolniu2021a0007记录,标识位mark标识为"d",开始时间为数据变化时间2021/1/2,结束时间修改为2021/1/2,表示该条数据已经闭链。
新增用户拉链分析
coolniu2021a0008、coolniu2021a0009、coolniu2021a0010三个用户为2021-01-02日新增用户,开始时间为2021/1/2,结束时间为2999/12/31(标识该数据为最新开链状态),mark位标为新增标识"i"。
未变化用户拉链分析
用户coolniu2021a0002、coolniu2021a0003、coolniu2021a0004、coolniu2021a0006用户notPK数据未变化,所以保持不变。
2021-01-03用户基础表原始数据如下:
1月3日新增了一条coolniu2021a0011数据。
2021-01-04用户拉链表如下:
因为其他天没有发生变化,所以在拉链表中保持不变,只新增一条coolniu2021a0011的记录。
1.2 拉链表算法示例
1.2.1 建表
source_table
CREATE TABLE
source_table
(
userid
string,
loginname
string,
regiondate
string,
phonenum
string,
birthday
string,
status
string,
lastlogindate
string)
PARTITIONED BY (datatime string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘|’
STORED AS TEXTFILE;
1.2.2 load数据
load data local INPATH ‘/tmp/xinniu/20210101’ into table source_table partition (datatime=‘20210101’);
load data local INPATH ‘/tmp/xinniu/20210102’ into table source_table partition (datatime=‘20210102’);
load data local INPATH ‘/tmp/xinniu/20210103’ into table source_table partition (datatime=‘20210103’);
1.2.3 创建拉链表
CREATE TABLE
zip_table
(
userid
string,
loginname
string,
regiondate
string,
phonenum
string,
birthday
string,
status
string,
lastlogindate
string,
datatime
string,
etltimestamp
string,
starttime
string,
endtime
string,
mark
string)
1.2.4 创建算法文件sqlfile
vim /tmp/xinniu/sqlfile
填写如下内容
– 创建一张拉链表的备份表 备份拉链表历史开链数据
CREATE TABLE IF NOT EXISTS xinniu.zip_table_bk stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
1 = 0 ;
– 将拉链表历史开链数据插入到bk备份表中
– 卡拉链条件:startTime < to_date(from_unixtime(unix_timestamp(‘
h
i
v
e
c
o
n
f
:
b
a
t
c
h
d
a
t
e
′
,
′
y
y
y
y
M
M
d
d
′
)
)
)
A
N
D
e
n
d
T
i
m
e
>
=
t
o
d
a
t
e
(
f
r
o
m
u
n
i
x
t
i
m
e
(
u
n
i
x
t
i
m
e
s
t
a
m
p
(
′
{hiveconf:batch_date}' ,'yyyyMMdd'))) AND endTime >= to_date(from_unixtime(unix_timestamp('
hiveconf:batchdate′,′yyyyMMdd′)))ANDendTime>=todate(fromunixtime(unixtimestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))
– 备份表非空判断:(select count(1) from xinniu.zip_table_bk limit 1) = 0 判断备份表非空才插入 此处必须判空 不能使用drop或者truncate的方式清空备份表 会导致失败重跑时丢数
INSERT
INTO
xinniu.zip_table_bk
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
join (select count(1) cnt from xinniu.zip_table_bk limit 1) b
WHERE
startTime < to_date(from_unixtime(unix_timestamp(‘
h
i
v
e
c
o
n
f
:
b
a
t
c
h
d
a
t
e
′
,
′
y
y
y
y
M
M
d
d
′
)
)
)
A
N
D
e
n
d
T
i
m
e
>
=
t
o
d
a
t
e
(
f
r
o
m
u
n
i
x
t
i
m
e
(
u
n
i
x
t
i
m
e
s
t
a
m
p
(
′
{hiveconf:batch_date}' ,'yyyyMMdd'))) AND endTime >= to_date(from_unixtime(unix_timestamp('
hiveconf:batchdate′,′yyyyMMdd′)))ANDendTime>=todate(fromunixtime(unixtimestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))
AND b.cnt = 0
;
– 创建拉链表闭链数据备份表bf
CREATE TABLE IF NOT EXISTS xinniu.zip_table_bf stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
1 = 0 ;
– 备份拉链表中历史闭链数据 卡拉链条件:endTime < to_date(to_timestamp(‘${hiveconf:batch_date}’ , ‘yyyyMMdd’))
– 备份表非空判断:(select count(1) from xinniu.zip_table_bf limit 1) = 0 同上,不能使用drop或者truncate的方式清空备份表
INSERT
INTO
xinniu.zip_table_bf
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
join
(
SELECT
count(1) cnt
FROM
xinniu.zip_table_bf
LIMIT 1) b
WHERE
endTime < to_date(from_unixtime(unix_timestamp(‘${hiveconf:batch_date}’ ,‘yyyyMMdd’)))
AND b.cnt = 0 ;
– 中间加工表清空
DROP TABLE IF EXISTS xinniu.zip_table_nw;
DROP TABLE IF EXISTS xinniu.zip_table_od;
– 创建中间表 新增变化修改中间表
CREATE TABLE IF NOT EXISTS xinniu.zip_table_nw stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
0 = 1;
– 创建中间表 未变化中间表
CREATE TABLE IF NOT EXISTS xinniu.zip_table_od stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
0 = 1;
– 新增、修改、删除变化数据插入变化中间表nw
– 原始数据表与拉链表进行full join关联,通过主键is_pk关联 根据不同情况生成对应的startTime与endTime及mark三个技术字段
– 字段值选择原始表与目标表的非空字段值 nvl(n.@{source_column_names}, o.@{xinniu.zip_table_column_names})
– 本逻辑中mark分为I、D两种,新增与修改为I,删除为D
INSERT
INTO
TABLE xinniu.zip_table_nw
SELECT
nvl(n.userID,o.userID) ,
nvl(n.loginName,o.loginName) ,
nvl(n.regionDate,o.regionDate) ,
nvl(n.phoneNum,o.phoneNum) ,
nvl(n.birthday,o.birthday) ,
nvl(n.status,o.status) ,
nvl(n.lastLoginDate,o.lastLoginDate) ,
nvl(n.dataTime,o.dataTime) ,
current_date AS etlTimestamp ,
CASE
WHEN n.dataTime IS NULL THEN o.startTime
ELSE to_date(from_unixtime(unix_timestamp(‘
h
i
v
e
c
o
n
f
:
b
a
t
c
h
d
a
t
e
′
,
′
y
y
y
y
M
M
d
d
′
)
)
)
E
N
D
A
S
s
t
a
r
t
T
i
m
e
,
C
A
S
E
W
H
E
N
n
.
d
a
t
a
T
i
m
e
I
S
N
U
L
L
T
H
E
N
t
o
d
a
t
e
(
f
r
o
m
u
n
i
x
t
i
m
e
(
u
n
i
x
t
i
m
e
s
t
a
m
p
(
′
{hiveconf:batch_date}' ,'yyyyMMdd'))) END AS startTime , CASE WHEN n.dataTime IS NULL THEN to_date(from_unixtime(unix_timestamp('
hiveconf:batchdate′,′yyyyMMdd′)))ENDASstartTime,CASEWHENn.dataTimeISNULLTHENtodate(fromunixtime(unixtimestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))
ELSE to_date(from_unixtime(unix_timestamp(‘29991231’ ,‘yyyyMMdd’)))
END AS endTime ,
CASE
WHEN ( n.userID is null ) THEN ‘D’
ELSE ‘I’
END AS mark
FROM
(
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime
FROM
xinniu.source_table
WHERE
dataTime = ‘${hiveconf:batch_date}’ ) n
FULL JOIN xinniu.zip_table_bk o ON
o.userID = n.userID
WHERE
(
o.userID IS NULL )
OR (
n.userID IS NULL )
OR (
nvl( CAST(o.phoneNum AS string) , ‘’ ) <> nvl( CAST(n.phoneNum AS string) , ‘’ )
OR nvl( CAST(o.status AS string) , ‘’ ) <> nvl( CAST(n.status AS string) , ‘’ )
)
;
– 闭链发生变化的数据 endTime改为hiveconf:batch_date
– 未变化数据保持原来状态 新增与修改状态统一"I"
– 发生变化的endTime逻辑:when n.startTime is not null then to_date(to_timestamp(‘
h
i
v
e
c
o
n
f
:
b
a
t
c
h
d
a
t
e
′
,
′
y
y
y
y
M
M
d
d
′
)
)
−
−
没发生变化的
e
n
d
T
i
m
e
逻辑:
w
h
e
n
o
.
e
n
d
T
i
m
e
>
=
t
o
d
a
t
e
(
t
o
t
i
m
e
s
t
a
m
p
(
′
{hiveconf:batch_date}' , 'yyyyMMdd')) -- 没发生变化的endTime逻辑:when o.endTime >= to_date(to_timestamp('
hiveconf:batchdate′,′yyyyMMdd′))−−没发生变化的endTime逻辑:wheno.endTime>=todate(totimestamp(′{hiveconf:batch_date}’ , ‘yyyyMMdd’)) then to_date(to_timestamp(‘29991231’,‘yyyyMMdd’))
INSERT
INTO
TABLE xinniu.zip_table_od
SELECT
o.userID,
o.loginName,
o.regionDate,
o.phoneNum,
o.birthday,
o.status,
o.lastLoginDate,
o.dataTime,
o.etlTimestamp ,
o.startTime ,
CASE
WHEN n.startTime IS NOT NULL THEN to_date(from_unixtime(unix_timestamp(‘
h
i
v
e
c
o
n
f
:
b
a
t
c
h
d
a
t
e
′
,
′
y
y
y
y
M
M
d
d
′
)
)
)
W
H
E
N
o
.
e
n
d
T
i
m
e
>
=
t
o
d
a
t
e
(
f
r
o
m
u
n
i
x
t
i
m
e
(
u
n
i
x
t
i
m
e
s
t
a
m
p
(
′
{hiveconf:batch_date}' ,'yyyyMMdd'))) WHEN o.endTime >= to_date(from_unixtime(unix_timestamp('
hiveconf:batchdate′,′yyyyMMdd′)))WHENo.endTime>=todate(fromunixtime(unixtimestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))
THEN to_date(from_unixtime(unix_timestamp(‘29991231’ ,‘yyyyMMdd’)))
ELSE o.endTime
END AS endTime ,
‘I’ AS mark
FROM
xinniu.zip_table_bk o
LEFT JOIN xinniu.zip_table_nw n ON
o.userID = n.userID
WHERE
nvl(n.endTime,to_date(from_unixtime(unix_timestamp(‘29991231’ ,‘yyyyMMdd’)))) <> to_date(from_unixtime(unix_timestamp(‘${hiveconf:batch_date}’ ,‘yyyyMMdd’)))
;
– 清空拉链表
TRUNCATE TABLE xinniu.zip_table;
– 插入数据到拉链表
INSERT
INTO
TABLE xinniu.zip_table
SELECT
*
FROM
xinniu.zip_table_nw
UNION ALL
SELECT
*
FROM
xinniu.zip_table_od
UNION ALL
SELECT
*
FROM
xinniu.zip_table_bf ;
– 清空临时表
DROP TABLE xinniu.zip_table_bk;
DROP TABLE xinniu.zip_table_bf;
DROP TABLE xinniu.zip_table_nw;
DROP TABLE xinniu.zip_table_od;
1.2.5 执行跑批任务
hive -hiveconf batch_date=20210101 -f /tmp/xinniu/sqlfile && hive -hiveconf batch_date=20210102 -f /tmp/xinniu/sqlfile && hive -hiveconf batch_date=20210103 -f /tmp/xinniu/sqlfile
1.3 效果验证
1.3.1 全表验证
select * from xinniu.zip_table;
1.3.2 卡拉链,查询历史某一节点数据
select * from xinniu.zip_table where startTime<=‘2021-01-01’ and endTime>‘2021-01-01’;
select * from xinniu.zip_table where startTime<=‘2021-01-02’ and endTime>‘2021-01-02’;
版权归原作者 数字天下 所有, 如有侵权,请联系我们删除。