0


二百六十八、Kettle——同步ClickHouse清洗数据到Hive的DWD层静态分区表中(每天一次)

一、目的

实时数仓用的是ClickHouse,为了避免Hive还要清洗数据,因此就直接把ClickHouse中清洗数据同步到Hive中就行

二、所需工具

ClickHouse:clickhouse-client-21.9.5.16

Kettle:kettle9.2

Hadoop:hadoop-3.1.3

Hive:hive-3.1.2

海豚调度器:dolphinscheduler-2.0.5

三、技术路径

由于Hive中DWD层是静态分区表,而无法在Kettle中动态指定分区日期

因此只能每日执行kettle任务,从ClickHouse同步到HDFS中,然后到Hive清洗表的每日分区下

四、表结构

4.1 clickhouse

create  table  if not exists  hurys_jw.dwd_statistics(
    id                  String                          comment '唯一ID',
    device_no           String                          comment '设备编号',
    source_device_type  Nullable(String)                comment '设备类型',
    sn                  Nullable(String)                comment '设备序列号 ',
    model               Nullable(String)                comment '设备型号',
    create_time         DateTime                        comment '创建时间',
    cycle               Nullable(Int32)                 comment '统计数据周期' ,
    lane_no             Nullable(Int32)                 comment '车道编号',
    lane_type           Nullable(Int32)                 comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',
    section_no          Nullable(Int32)                 comment '断面编号',
    coil_no             Nullable(Int32)                 comment '线圈编号',
    volume_sum          Nullable(Int32)                 comment '不区分车型机动车总流量',
    volume_person       Nullable(Int32)                 comment '行人流量',
    volume_car_non      Nullable(Int32)                 comment '非机动车流量',
    volume_car_small    Nullable(Int32)                 comment '小车流量',
    volume_car_middle   Nullable(Int32)                 comment '中车流量',
    volume_car_big      Nullable(Int32)                 comment '大车流量',
    speed_avg           Nullable(Decimal(10, 2))        comment '平均速度(km/h)',
    speed_85            Nullable(Decimal(10, 2))        comment '85位速度(km/h)',
    time_occupancy      Nullable(Decimal(10, 2))        comment '时间占有率(%)',
    average_headway     Nullable(Decimal(10, 2))        comment '平均车头时距(s)',
    average_gap         Nullable(Decimal(10, 2))        comment '平均车间时距(s)',
    day                 Date                            comment '日期'
)
ENGINE = MergeTree
PARTITION BY day
PRIMARY KEY (day,id)
ORDER BY (day,id)
TTL day + toIntervalDay(7)
SETTINGS index_granularity = 8192;

4.2 hive

create external table  if not exists  hurys_db.dwd_statistics(
    id                  string              comment '唯一ID',
    device_no           string              comment '设备编号',
    source_device_type  string              comment '设备类型',
    sn                  string              comment '设备序列号 ',
    model               string              comment '设备型号',
    create_time         string              comment '创建时间',
    cycle               int                 comment '统计数据周期' ,
    lane_no             int                 comment '车道编号',
    lane_type           int                 comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',
    section_no          int                 comment '断面编号',
    coil_no             int                 comment '线圈编号',
    volume_sum          int                 comment '不区分车型机动车总流量',
    volume_person       int                 comment '行人流量',
    volume_car_non      int                 comment '非机动车流量',
    volume_car_small    int                 comment '小车流量',
    volume_car_middle   int                 comment '中车流量',
    volume_car_big      int                 comment '大车流量',
    speed_avg           decimal(10,2)       comment '平均速度(km/h)',
    speed_85            decimal(10,2)       comment '85位速度(km/h)',
    time_occupancy      decimal(10,2)       comment '时间占有率(%)',
    average_headway     decimal(10,2)       comment '平均车头时距(s)',
    average_gap         decimal(10,2)       comment '平均车间时距(s)'
)
comment '统计数据外部表——静态分区'
partitioned by (day string)
row format delimited fields terminated by ','
tblproperties("skip.header.line.count"="1")
;

五、实施步骤

5.1 Kettle任务(clickhouse到hdfs)

5.1.1 获取系统信息

5.1.2 字段选择1

5.1.3 自动获取当前日期1

//Script here

var currentDate = date; // 这里 date 应该是从输入流中获取的 Date 对象

// 计算前一天的日期
var previousDate = new Date(currentDate.getTime() - 246060*1000);

5.1.4 字段选择2

5.1.5 clickhouse输入

注意:day字段类型转换

5.1.6 字段选择3

5.1.7 Hadoop file output

5.1.8 运行kettle任务

5.1.9 HDFS文件

5.2 海豚任务(从HDFS到Hive表分区中)

5.2.1 配置海豚任务

#! /bin/bash
source /etc/profile

nowdate=date --date='0 days ago' "+%Y%m%d"
yesdate=date -d yesterday +%Y-%m-%d

hadoop fs -test -e /user/hive/warehouse/hurys_db.db/dwd_statistics/day=$yesdate
if [ $? -ne 0 ]; then
echo "文件不存在"
else
hdfs dfs -rm -r /user/hive/warehouse/hurys_db.db/dwd_statistics/day=$yesdate
fi

/usr/local/hurys/dc_env/kettle/data-integration/pan.sh -rep=hurys_linux_kettle_repository -user=admin -pass=admin -dir=/clickhouse_to_hive/ -trans=01_ClickHouse_to_Hive_dwd_statistics

hdfs dfs -mkdir -p /user/hive/warehouse/hurys_db.db/dwd_statistics/day=$yesdate

hdfs dfs -mv /user/hive/warehouse/hurys_db.db/dwd_statistics/statistics.gz /user/hive/warehouse/hurys_db.db/dwd_statistics/day=$yesdate/statistics.gz

5.2.2 执行海豚任务

5.2.3 Hive分区表

5.2.4 刷新表分区,查看分区数据

--刷新表分区
msck repair table hurys_db.dwd_statistics;
--查看表分区
show partitions hurys_db.dwd_statistics;
--查看表数据
select * from hurys_db.dwd_statistics
where day = '2024-10-16';

搞定!

标签: clickhouse hive kettle

本文转载自: https://blog.csdn.net/tiantang2renjian/article/details/143015930
版权归原作者 天地风雷水火山泽 所有, 如有侵权,请联系我们删除。

“二百六十八、Kettle——同步ClickHouse清洗数据到Hive的DWD层静态分区表中(每天一次)”的评论:

还没有评论