FlinkSQL + HDFS + Hive + SparkSQL: Incrementally Loading Business Data into the Data Warehouse

0. Related articles

Development notes: article index

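1. Why write business data into the data warehouse in real time

    In a data warehouse, business data is usually synchronized once a day with data transfer tools such as Sqoop and DataX.

    Depending on the type of the source table, the synchronization can be a full load (everything pulled daily or just once, e.g. a province table), an incremental load (rows pulled by their update-time column and merged into the existing table, e.g. an order table), or a zipper table (rows pulled by update time and then computed against the existing table, e.g. a user table).

    Sometimes, however, we need near-real-time data (typically fresher than one hour, but not down to the second). Daily synchronization cannot deliver that, and running every such task on Flink would waste resources.

    In that case, the business data can be synchronized into the data warehouse in real time, and the backend can use a framework such as the Presto API to recompute results at an interval such as every 5 minutes and cache them. This provides near-real-time data while saving resources.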

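2. Architecture design

  1. Hive holds a partitioned external table that stores the daily history of the corresponding business table. Its data has two sources: a full pull from the MySQL business database through Sqoop in the early morning, or the merge of yesterday's offline table with yesterday's incremental table.
  2. Canal captures the MySQL binlog and sends it to the corresponding Kafka topic.
  3. FlinkSQL reads the data from Kafka and writes it straight to HDFS through the filesystem sink; an external table points at that path.
  4. Every day in the early morning, run msck repair on the incremental external table so that the current day's partition becomes visible. Historical data can be deleted; keeping 7 days is enough.
  5. Create a view that merges the historical data with the incremental data.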

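3. Writing the binlog to HDFS with FlinkSQL

Note: the author works on Huawei Cloud, so the examples use FlinkSQL in Huawei Cloud DLI together with OBS; these correspond to open-source FlinkSQL and HDFS.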


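-- Sample data in the Kafka topic bigdata_mysql_binlog is shown below

-- Note: nanoTime is generated with Java's System.nanoTime(), so if the job fails, delete the affected partition data and rerun the Flink job from a configured start time

-- Field descriptions (the JSON keys are camelCase; the source stream below maps them to snake_case column names):
-- db:             database name in MySQL
-- tb:             table name in MySQL
-- columns:        JSON object containing every column of the table; each key is a column name, and the nested object's value attribute holds that column's value (see the sample below)
-- event:          operation type: insert = 0 ; delete = 1 ; update = 2 ; alter = 3 ;
-- sql:            SQL statement that triggered the change or delete
-- primary_key:    primary key of the changed row
-- create_time:    time the record was read from the binlog (used for partitioning)
-- send_time:      time the record entered Kafka (not used)
-- nano_time:      nanosecond timestamp used to order records; it also keeps records unique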

-- {
--     "columns": {
--         "admin_note": {
--             "key": false,
--             "mysqlType": "varchar(255)",
--             "name": "admin_note",
--             "null_val": false,
--             "update": false,
--             "value": ""
--         },
--         "allot_num": {
--             "key": false,
--             "mysqlType": "smallint(5) unsigned",
--             "name": "allot_num",
--             "null_val": false,
--             "update": false,
--             "value": "0"
--         }
--     },
--     "createTime": 1649213973044,
--     "db": "xxx",
--     "event": 2,
--     "nanoTime": 23494049498709146,
--     "primaryKey": "157128418",
--     "sendTime": 1649213973045,
--     "sql": "",
--     "tb": "fmys_order_infos"
-- }

-- Source: reads the data from Kafka
CREATE SOURCE STREAM bigdata_binlog_ingest_source_bigdata_mysql_binlog (
    db STRING,
    tb STRING,
    columns STRING,
    event INT,
    `sql` STRING,
    `primary_key` STRING,
    create_time bigint,
    send_time bigint,
    nano_time bigint
) WITH (
    type = "kafka",
    kafka_bootstrap_servers = "bigdata1:9092,bigdata2:9092,bigdata3:9092",
    kafka_group_id = "bigdata_mysql_binlog_dim_to_obs_test",
    kafka_topic = "bigdata_mysql_binlog_dim",
    encode = "json",
    json_config = "db=db;tb=tb;columns=columns;event=event;sql=sql;primary_key=primaryKey;create_time=createTime;send_time=sendTime;nano_time=nanoTime;"
);

-- Sink: writes the data to OBS
CREATE SINK STREAM bigdata_binlog_ingest_sink_bigdata_mysql_binlog (
    db STRING,
    tb STRING,
    columns STRING,
    event int,
    `sql` STRING,
    `primary_key` STRING,
    create_time bigint,
    nano_time bigint,
    dt STRING
) PARTITIONED BY(dt,tb) WITH (
    type = "filesystem",
    file.path = "obs://xxx-bigdata/xxx/xxx/ods_binlog_data",
    encode = "parquet",
    ak = "akxxx",
    sk = "skxxx"
);

-- Read from the source stream and write to the sink stream
CREATE FUNCTION date_formatted AS 'com.xxx.cdc.udf.DateFormattedUDF';
INSERT INTO
    bigdata_binlog_ingest_sink_bigdata_mysql_binlog
SELECT
    db,
    tb,
    columns,
    event,
    `sql`,
    `primary_key`,
    create_time,
    nano_time,
    date_formatted(
        cast(coalesce(create_time, UNIX_TIMESTAMP_MS()) / 1000 as VARCHAR(10)),
        'yyyyMMdd'
  ) as dt
FROM
    bigdata_binlog_ingest_source_bigdata_mysql_binlog;

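The FlinkSQL above reads the data from the Kafka topic through the source and writes it to HDFS unchanged. The output is partitioned by dt at the first level and tb at the second level, for these reasons:

  1. The incremental binlog volume is small overall and the data is partitioned, so writing all tables into a single table is sufficient.
  2. With the dt date as the first-level partition and the tb table name as the second-level partition, historical data is easy to delete.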
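
The partition layout described above can be sketched in plain Python. This is only an illustration, not the DLI implementation: `dt_of`, `partition_dir` and `expired_dts` are hypothetical helpers, the UTC+8 time zone is an assumption about how the `date_formatted` UDF behaves, and the 7-day retention comes from the architecture notes in section 2.

```python
from datetime import datetime, timedelta, timezone

# Assumption: dt is derived in UTC+8; the source does not state the UDF's time zone.
ASSUMED_TZ = timezone(timedelta(hours=8))
# Sink path taken from the FlinkSQL sink definition above.
BASE_PATH = "obs://xxx-bigdata/xxx/xxx/ods_binlog_data"


def dt_of(create_time_ms: int) -> str:
    """Format a 13-digit millisecond timestamp as yyyyMMdd."""
    seconds = create_time_ms // 1000  # drop the millisecond part
    return datetime.fromtimestamp(seconds, ASSUMED_TZ).strftime("%Y%m%d")


def partition_dir(create_time_ms: int, tb: str) -> str:
    """Directory for one record: dt is the first level, tb the second."""
    return f"{BASE_PATH}/dt={dt_of(create_time_ms)}/tb={tb}"


def expired_dts(existing_dts, today: str, keep_days: int = 7):
    """Return the dt values older than keep_days full days before today."""
    cutoff = datetime.strptime(today, "%Y%m%d") - timedelta(days=keep_days)
    return sorted(
        d for d in existing_dts if datetime.strptime(d, "%Y%m%d") < cutoff
    )


# createTime and tb from the sample Kafka message.
print(partition_dir(1649213973044, "fmys_order_infos"))
print(expired_dts(["20220329", "20220330", "20220405"], "20220406"))
```

Because dt is the first-level directory, dropping one expired `dt=` directory removes that day's data for every table at once, which is the convenience the second point refers to.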

4. Creating the incremental external table (binlog table)

The DDL for the external table is as follows:

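-- The Flink sink in section 3 writes parquet, so the file format is declared explicitly
create external table xxx.ods_binlog_data_table(
    db STRING comment 'database name',
    columns STRING comment 'column data',
    event int comment 'operation type',
    `sql` STRING comment 'executed SQL',
    `primary_key` STRING comment 'primary key value',
    create_time bigint comment '13-digit timestamp when the binlog record was read',
    nano_time bigint comment 'nanosecond update timestamp'
) COMMENT 'ODS binlog table (all data entering the warehouse via binlog lands here)'
PARTITIONED BY (dt STRING COMMENT 'date partition', tb string comment 'table name')
STORED AS parquet
location 'obs://xxx-bigdata/xxx/xxx/ods_binlog_data'
;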

After the table is created, query it to check the result:

select * from xxx.ods_binlog_data_table;

Notes:

  1. Run msck on the table: MSCK REPAIR TABLE xxx.ods_binlog_data_table;
  2. On Huawei Cloud, set spark.sql.forcePartitionPredicatesOnPartitionedTable.enabled to false so that the table can be queried without a partition filter.

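5. Creating the full history table

Note: each business table maps to one full history table in the warehouse. The goods table is used as the example here.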

CREATE EXTERNAL TABLE xxx.ods_fmys_goods_old(
    `goods_id`        BIGINT,
    `cat_id`          BIGINT,
    `special_id`      BIGINT,
    `goods_tag`       STRING,
    `brand_tag`       STRING,
    `custom_tag`      STRING,
    `goods_kh`        STRING,
    `goods_name`      STRING,
    `provider_name`   STRING,
    `enough_number`   BIGINT,
    `click_count`     BIGINT,
    `click_cr`        BIGINT,
    `goods_number`    BIGINT,
    `goods_weight`    BIGINT,
    `market_price`    DOUBLE,
    `shoppe_price`    DOUBLE,
    `shoppe_discount` BIGINT,
    `vip_discount`    BIGINT,
    `shop_price`      DOUBLE,
    `goods_brief`     STRING,
    `goods_desc`      STRING,
    `goods_thumb`     STRING,
    `goods_img`       STRING,
    `add_time`        BIGINT,
    `sort_order`      BIGINT,
    `is_delete`       BIGINT,
    `is_best`         BIGINT,
    `is_new`          BIGINT,
    `is_hot`          BIGINT,
    `last_update`     BIGINT,
    `seller_note`     STRING,
    `suppliers_id`    BIGINT,
    `is_on_sale`      BIGINT,
    `is_check`        BIGINT,
    `sales`           BIGINT,
    `real_sales`      BIGINT,
    `attribute`       STRING,
    `attribute_true`  STRING,
    `goods_middle`    STRING,
    `extra_price`     STRING,
    `goods_no`        BIGINT,
    `add_admin`       BIGINT,
    `picker_group`    BIGINT,
    `picker_group_code` STRING,
    `provider_id`     BIGINT,
    `brand_id`        BIGINT,
    `market_id`       BIGINT,
    `supplier_id`     BIGINT,
    `asking_status`   BIGINT,
    `goods_status`    BIGINT,
    `special_status`  BIGINT,
    `sales_of_7`      BIGINT,
    `real_sales_of_7` BIGINT,
    `is_show_stock`   BIGINT,
    `goods_model_id`  BIGINT,
    `fabric_tag`      STRING,
    `fabric_id`       BIGINT,
    `is_stockout`     BIGINT,
    `max_sales`       BIGINT,
    `is_replenish`    BIGINT,
    `origin`          BIGINT,
    `video_url`       STRING,
    `video_status`    BIGINT,
    `is_action`       BIGINT,
    `action_value`    BIGINT,
    `is_alone`        BIGINT,
    `alone_discount`  BIGINT,
    `alone_price`     DOUBLE,
    `update_admin`    BIGINT,
    `update_time`     BIGINT,
    `auto_time`       STRING,
    `alone_sort`      BIGINT
) COMMENT 'goods table'
PARTITIONED BY (dt bigint COMMENT 'date partition (yyyymmdd)')
STORED AS orc LOCATION 'obs://xxx-bigdata/xxx/xxx/ods_table_old/ods_fmys_goods_old'
;

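6. Creating the Sqoop job that synchronizes the goods table

Job creation is not covered in detail here. The job uses Huawei Cloud CDM; the main points are:

  1. In CDM, pull the full contents of the business table and map the partition to the date of the run (format yyyymmdd).
  2. Write to the warehouse in insert overwrite mode.
  3. Because insert overwrite mode is used, the parallelism can only be 1, so the pull is slow. This is acceptable, because normally the pull only needs to run once.
  4. Schedule the job for 00:30. Besides the CDM task, the job also runs a script, described in detail below.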

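7. Merging historical and incremental data

    The script runs every day in the early morning. It reads yesterday's data from ods_fmys_goods_old and yesterday's data for the same business table from ods_binlog_data_table.

    It then applies a window partitioned by the primary key, orders by nano_time descending, and keeps the first row. Rows whose event is 1 are filtered out, because they represent deleted records.

    The result is inserted into today's partition of ods_fmys_goods_old. Earlier partitions are kept, which reduces errors and makes the process more robust.

    Note: ods_fmys_goods_old has no nano_time or event columns. Set nano_time to -987654321012345 (any value works as long as it is small enough; see the meaning of nanoTime in section 8) and set event to 0, which marks the row as an insert.

The code is as follows: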


-- msck both tables (the goods old table and the binlog table)
MSCK REPAIR TABLE xxx.ods_fmys_goods_old;
MSCK REPAIR TABLE xxx.ods_binlog_data_table;

-- Read yesterday's data from ods_fmys_goods_old and yesterday's data for the same business table from ods_binlog_data_table, then merge them
insert overwrite table xxx.ods_fmys_goods_old partition(dt)
select
    goods_id
    , cat_id
    , special_id
    , goods_tag
    , brand_tag
    , custom_tag
    , goods_kh
    , goods_name
    , provider_name
    , enough_number
    , click_count
    , click_CR
    , goods_number
    , goods_weight
    , market_price
    , shoppe_price
    , shoppe_discount
    , vip_discount
    , shop_price
    , goods_brief
    , goods_desc
    , goods_thumb
    , goods_img
    , add_time
    , sort_order
    , is_delete
    , is_best
    , is_new
    , is_hot
    , last_update
    , seller_note
    , suppliers_id
    , is_on_sale
    , is_check
    , sales
    , real_sales
    , attribute
    , attribute_true
    , goods_middle
    , extra_price
    , goods_no
    , add_admin
    , picker_group
    , picker_group_code
    , provider_id
    , brand_id
    , market_id
    , supplier_id
    , asking_status
    , goods_status
    , special_status
    , sales_of_7
    , real_sales_of_7
    , is_show_stock
    , goods_model_id
    , fabric_tag
    , fabric_id
    , is_stockout
    , max_sales
    , is_replenish
    , origin
    , video_url
    , video_status
    , is_action
    , action_value
    , is_alone
    , alone_discount
    , alone_price
    , update_admin
    , update_time
    , auto_time
    , alone_sort
    , substr(${bdp.system.cyctime}, 1, 8)
from (
    select
        goods_id
        , cat_id
        , special_id
        , goods_tag
        , brand_tag
        , custom_tag
        , goods_kh
        , goods_name
        , provider_name
        , enough_number
        , click_count
        , click_CR
        , goods_number
        , goods_weight
        , market_price
        , shoppe_price
        , shoppe_discount
        , vip_discount
        , shop_price
        , goods_brief
        , goods_desc
        , goods_thumb
        , goods_img
        , add_time
        , sort_order
        , is_delete
        , is_best
        , is_new
        , is_hot
        , last_update
        , seller_note
        , suppliers_id
        , is_on_sale
        , is_check
        , sales
        , real_sales
        , attribute
        , attribute_true
        , goods_middle
        , extra_price
        , goods_no
        , add_admin
        , picker_group
        , picker_group_code
        , provider_id
        , brand_id
        , market_id
        , supplier_id
        , asking_status
        , goods_status
        , special_status
        , sales_of_7
        , real_sales_of_7
        , is_show_stock
        , goods_model_id
        , fabric_tag
        , fabric_id
        , is_stockout
        , max_sales
        , is_replenish
        , origin
        , video_url
        , video_status
        , is_action
        , action_value
        , is_alone
        , alone_discount
        , alone_price
        , update_admin
        , update_time
        , auto_time
        , alone_sort
        , nano_time
        , event
        , row_number() over(partition by goods_id order by nano_time desc) as row_number
    from (
        select
            goods_id
            , cat_id
            , special_id
            , goods_tag
            , brand_tag
            , custom_tag
            , goods_kh
            , goods_name
            , provider_name
            , enough_number
            , click_count
            , click_CR
            , goods_number
            , goods_weight
            , market_price
            , shoppe_price
            , shoppe_discount
            , vip_discount
            , shop_price
            , goods_brief
            , goods_desc
            , goods_thumb
            , goods_img
            , add_time
            , sort_order
            , is_delete
            , is_best
            , is_new
            , is_hot
            , last_update
            , seller_note
            , suppliers_id
            , is_on_sale
            , is_check
            , sales
            , real_sales
            , attribute
            , attribute_true
            , goods_middle
            , extra_price
            , goods_no
            , add_admin
            , picker_group
            , picker_group_code
            , provider_id
            , brand_id
            , market_id
            , supplier_id
            , asking_status
            , goods_status
            , special_status
            , sales_of_7
            , real_sales_of_7
            , is_show_stock
            , goods_model_id
            , fabric_tag
            , fabric_id
            , is_stockout
            , max_sales
            , is_replenish
            , origin
            , video_url
            , video_status
            , is_action
            , action_value
            , is_alone
            , alone_discount
            , alone_price
            , update_admin
            , update_time
            , auto_time
            , alone_sort
            , -987654321012345 as nano_time
            , 0 as event
        from xxx.ods_fmys_goods_old
        where dt = ${bdp.system.bizdate}

        union all

        select
            get_json_object(columns ,'$.goods_id.value') as goods_id
            , get_json_object(columns ,'$.cat_id.value') as cat_id
            , get_json_object(columns ,'$.special_id.value') as special_id
            , get_json_object(columns ,'$.goods_tag.value') as goods_tag
            , get_json_object(columns ,'$.brand_tag.value') as brand_tag
            , get_json_object(columns ,'$.custom_tag.value') as custom_tag
            , get_json_object(columns ,'$.goods_kh.value') as goods_kh
            , get_json_object(columns ,'$.goods_name.value') as goods_name
            , get_json_object(columns ,'$.provider_name.value') as provider_name
            , get_json_object(columns ,'$.enough_number.value') as enough_number
            , get_json_object(columns ,'$.click_count.value') as click_count
            , get_json_object(columns ,'$.click_CR.value') as click_CR
            , get_json_object(columns ,'$.goods_number.value') as goods_number
            , get_json_object(columns ,'$.goods_weight.value') as goods_weight
            , get_json_object(columns ,'$.market_price.value') as market_price
            , get_json_object(columns ,'$.shoppe_price.value') as shoppe_price
            , get_json_object(columns ,'$.shoppe_discount.value') as shoppe_discount
            , get_json_object(columns ,'$.vip_discount.value') as vip_discount
            , get_json_object(columns ,'$.shop_price.value') as shop_price
            , get_json_object(columns ,'$.goods_brief.value') as goods_brief
            , get_json_object(columns ,'$.goods_desc.value') as goods_desc
            , get_json_object(columns ,'$.goods_thumb.value') as goods_thumb
            , get_json_object(columns ,'$.goods_img.value') as goods_img
            , get_json_object(columns ,'$.add_time.value') as add_time
            , get_json_object(columns ,'$.sort_order.value') as sort_order
            , get_json_object(columns ,'$.is_delete.value') as is_delete
            , get_json_object(columns ,'$.is_best.value') as is_best
            , get_json_object(columns ,'$.is_new.value') as is_new
            , get_json_object(columns ,'$.is_hot.value') as is_hot
            , get_json_object(columns ,'$.last_update.value') as last_update
            , get_json_object(columns ,'$.seller_note.value') as seller_note
            , get_json_object(columns ,'$.suppliers_id.value') as suppliers_id
            , get_json_object(columns ,'$.is_on_sale.value') as is_on_sale
            , get_json_object(columns ,'$.is_check.value') as is_check
            , get_json_object(columns ,'$.sales.value') as sales
            , get_json_object(columns ,'$.real_sales.value') as real_sales
            , get_json_object(columns ,'$.attribute.value') as attribute
            , get_json_object(columns ,'$.attribute_true.value') as attribute_true
            , get_json_object(columns ,'$.goods_middle.value') as goods_middle
            , get_json_object(columns ,'$.extra_price.value') as extra_price
            , get_json_object(columns ,'$.goods_no.value') as goods_no
            , get_json_object(columns ,'$.add_admin.value') as add_admin
            , get_json_object(columns ,'$.picker_group.value') as picker_group
            , get_json_object(columns ,'$.picker_group_code.value') as picker_group_code
            , get_json_object(columns ,'$.provider_id.value') as provider_id
            , get_json_object(columns ,'$.brand_id.value') as brand_id
            , get_json_object(columns ,'$.market_id.value') as market_id
            , get_json_object(columns ,'$.supplier_id.value') as supplier_id
            , get_json_object(columns ,'$.asking_status.value') as asking_status
            , get_json_object(columns ,'$.goods_status.value') as goods_status
            , get_json_object(columns ,'$.special_status.value') as special_status
            , get_json_object(columns ,'$.sales_of_7.value') as sales_of_7
            , get_json_object(columns ,'$.real_sales_of_7.value') as real_sales_of_7
            , get_json_object(columns ,'$.is_show_stock.value') as is_show_stock
            , get_json_object(columns ,'$.goods_model_id.value') as goods_model_id
            , get_json_object(columns ,'$.fabric_tag.value') as fabric_tag
            , get_json_object(columns ,'$.fabric_id.value') as fabric_id
            , get_json_object(columns ,'$.is_stockout.value') as is_stockout
            , get_json_object(columns ,'$.max_sales.value') as max_sales
            , get_json_object(columns ,'$.is_replenish.value') as is_replenish
            , get_json_object(columns ,'$.origin.value') as origin
            , get_json_object(columns ,'$.video_url.value') as video_url
            , get_json_object(columns ,'$.video_status.value') as video_status
            , get_json_object(columns ,'$.is_action.value') as is_action
            , get_json_object(columns ,'$.action_value.value') as action_value
            , get_json_object(columns ,'$.is_alone.value') as is_alone
            , get_json_object(columns ,'$.alone_discount.value') as alone_discount
            , get_json_object(columns ,'$.alone_price.value') as alone_price
            , get_json_object(columns ,'$.update_admin.value') as update_admin
            , get_json_object(columns ,'$.update_time.value') as update_time
            , get_json_object(columns ,'$.auto_time.value') as auto_time
            , get_json_object(columns ,'$.alone_sort.value') as alone_sort
            , nano_time as nano_time
            , event as event
        from xxx.ods_binlog_data_table
        where 
            dt = ${bdp.system.bizdate}
            and tb = 'fmys_goods'
    )
)
where row_number = 1 and event != 1
;

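8. Java's nanoTime()

Java has two methods that return time-related numbers. The widely used System.currentTimeMillis() returns a long value in milliseconds.

The other, System.nanoTime(), returns nanoseconds.

The prefix "nano" is the same one as in "nanometer" and means 10^-9. In a vacuum, light travels only about 30 cm in one nanosecond. One level above the nanosecond is the microsecond (10^-6), followed by the millisecond (10^-3); below it are the picosecond, the femtosecond, and so on. Since a nanosecond is 10^6 times finer than a millisecond, one might expect the two return values to differ by a factor of 10^6. They do not.

The millisecond method returns the number of milliseconds since 1970, which is also how Java dates are represented, so the two are equivalent. The nanosecond method is not counted from 1970: its origin is arbitrary, and its output may differ from machine to machine.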

See the Javadoc of the nanosecond method:

Returns the current value of the running Java Virtual Machine's high-resolution time source, in nanoseconds.
This method can only be used to measure elapsed time and is not related to any other notion of system or wall-clock time. The value returned represents nanoseconds since some fixed but arbitrary origin time (perhaps in the future, so values may be negative). The same origin is used by all invocations of this method in an instance of a Java virtual machine; other virtual machine instances are likely to use a different origin.

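In short: the method returns the current value of the JVM's high-resolution time source. It can only be used to measure elapsed time and is unrelated to system time.
The value is counted from a fixed but arbitrary origin, which may even lie in the future.
Different JVMs may use different origins.

The unsettling consequence is that the same code can produce different results on different machines, and the value can be negative. The method is therefore normally used for measuring elapsed time rather than for computing with absolute values.

That is why, in section 7, nano_time for rows from ods_fmys_goods_old is set to a negative number, and a sufficiently small one.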
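
The difference between the two clocks can be checked with the sample Kafka message from section 3. The short calculation below is only a worked example on those two numbers; the constant names are introduced here for illustration.

```python
# Values copied from the sample Kafka message in section 3.
create_time_ms = 1649213973044     # createTime: milliseconds since 1970
nano_time_ns = 23494049498709146   # nanoTime: nanoseconds since an arbitrary origin

MS_PER_YEAR = 31_557_600_000       # 365.25 days, in milliseconds
NS_PER_DAY = 86_400 * 10**9

years_since_1970 = create_time_ms // MS_PER_YEAR   # whole years covered by createTime
days_since_origin = nano_time_ns // NS_PER_DAY     # whole days covered by nanoTime
ratio = nano_time_ns // create_time_ms             # would be 10**6 with a shared origin

print(years_since_1970, days_since_origin, ratio)
```

createTime spans about 52 years, as expected for a timestamp counted from 1970, while nanoTime spans only about 271 days, so the two values are clearly not measured from the same origin and their ratio is nowhere near 10^6.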

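9. Creating a view that updates the business table in the warehouse at minute-level latency

Create a view that reads today's data from ods_fmys_goods_old and today's data for the same business table from ods_binlog_data_table, then merges them.

Because the script in section 7 runs MSCK REPAIR on both tables, today's newest partition is registered and the latest data can be read.

Also note that the view must pin a specific partition. Every day in the early morning the view is therefore dropped and recreated with today's date passed in as a parameter. The code is as follows: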

drop view if exists xxx.ods_fmys_goods;
create view if not exists xxx.ods_fmys_goods(
    goods_id
    , cat_id
    , special_id
    , goods_tag
    , brand_tag
    , custom_tag
    , goods_kh
    , goods_name
    , provider_name
    , enough_number
    , click_count
    , click_CR
    , goods_number
    , goods_weight
    , market_price
    , shoppe_price
    , shoppe_discount
    , vip_discount
    , shop_price
    , goods_brief
    , goods_desc
    , goods_thumb
    , goods_img
    , add_time
    , sort_order
    , is_delete
    , is_best
    , is_new
    , is_hot
    , last_update
    , seller_note
    , suppliers_id
    , is_on_sale
    , is_check
    , sales
    , real_sales
    , attribute
    , attribute_true
    , goods_middle
    , extra_price
    , goods_no
    , add_admin
    , picker_group
    , picker_group_code
    , provider_id
    , brand_id
    , market_id
    , supplier_id
    , asking_status
    , goods_status
    , special_status
    , sales_of_7
    , real_sales_of_7
    , is_show_stock
    , goods_model_id
    , fabric_tag
    , fabric_id
    , is_stockout
    , max_sales
    , is_replenish
    , origin
    , video_url
    , video_status
    , is_action
    , action_value
    , is_alone
    , alone_discount
    , alone_price
    , update_admin
    , update_time
    , auto_time
    , alone_sort
) comment 'view of the fmys_goods table'
as
select
    goods_id
    , cat_id
    , special_id
    , goods_tag
    , brand_tag
    , custom_tag
    , goods_kh
    , goods_name
    , provider_name
    , enough_number
    , click_count
    , click_CR
    , goods_number
    , goods_weight
    , market_price
    , shoppe_price
    , shoppe_discount
    , vip_discount
    , shop_price
    , goods_brief
    , goods_desc
    , goods_thumb
    , goods_img
    , add_time
    , sort_order
    , is_delete
    , is_best
    , is_new
    , is_hot
    , last_update
    , seller_note
    , suppliers_id
    , is_on_sale
    , is_check
    , sales
    , real_sales
    , attribute
    , attribute_true
    , goods_middle
    , extra_price
    , goods_no
    , add_admin
    , picker_group
    , picker_group_code
    , provider_id
    , brand_id
    , market_id
    , supplier_id
    , asking_status
    , goods_status
    , special_status
    , sales_of_7
    , real_sales_of_7
    , is_show_stock
    , goods_model_id
    , fabric_tag
    , fabric_id
    , is_stockout
    , max_sales
    , is_replenish
    , origin
    , video_url
    , video_status
    , is_action
    , action_value
    , is_alone
    , alone_discount
    , alone_price
    , update_admin
    , update_time
    , auto_time
    , alone_sort
from (
    select
        goods_id
        , cat_id
        , special_id
        , goods_tag
        , brand_tag
        , custom_tag
        , goods_kh
        , goods_name
        , provider_name
        , enough_number
        , click_count
        , click_CR
        , goods_number
        , goods_weight
        , market_price
        , shoppe_price
        , shoppe_discount
        , vip_discount
        , shop_price
        , goods_brief
        , goods_desc
        , goods_thumb
        , goods_img
        , add_time
        , sort_order
        , is_delete
        , is_best
        , is_new
        , is_hot
        , last_update
        , seller_note
        , suppliers_id
        , is_on_sale
        , is_check
        , sales
        , real_sales
        , attribute
        , attribute_true
        , goods_middle
        , extra_price
        , goods_no
        , add_admin
        , picker_group
        , picker_group_code
        , provider_id
        , brand_id
        , market_id
        , supplier_id
        , asking_status
        , goods_status
        , special_status
        , sales_of_7
        , real_sales_of_7
        , is_show_stock
        , goods_model_id
        , fabric_tag
        , fabric_id
        , is_stockout
        , max_sales
        , is_replenish
        , origin
        , video_url
        , video_status
        , is_action
        , action_value
        , is_alone
        , alone_discount
        , alone_price
        , update_admin
        , update_time
        , auto_time
        , alone_sort
        , nano_time
        , event
        , row_number() over(partition by goods_id order by nano_time desc) as row_number
    from (
        select
            goods_id
            , cat_id
            , special_id
            , goods_tag
            , brand_tag
            , custom_tag
            , goods_kh
            , goods_name
            , provider_name
            , enough_number
            , click_count
            , click_CR
            , goods_number
            , goods_weight
            , market_price
            , shoppe_price
            , shoppe_discount
            , vip_discount
            , shop_price
            , goods_brief
            , goods_desc
            , goods_thumb
            , goods_img
            , add_time
            , sort_order
            , is_delete
            , is_best
            , is_new
            , is_hot
            , last_update
            , seller_note
            , suppliers_id
            , is_on_sale
            , is_check
            , sales
            , real_sales
            , attribute
            , attribute_true
            , goods_middle
            , extra_price
            , goods_no
            , add_admin
            , picker_group
            , picker_group_code
            , provider_id
            , brand_id
            , market_id
            , supplier_id
            , asking_status
            , goods_status
            , special_status
            , sales_of_7
            , real_sales_of_7
            , is_show_stock
            , goods_model_id
            , fabric_tag
            , fabric_id
            , is_stockout
            , max_sales
            , is_replenish
            , origin
            , video_url
            , video_status
            , is_action
            , action_value
            , is_alone
            , alone_discount
            , alone_price
            , update_admin
            , update_time
            , auto_time
            , alone_sort
            , -987654321012345 as nano_time
            , 0 as event
        from xxx.ods_fmys_goods_old
        where dt = substr(${bdp.system.cyctime}, 1, 8)

        union all

        select
            get_json_object(columns ,'$.goods_id.value') as goods_id
            , get_json_object(columns ,'$.cat_id.value') as cat_id
            , get_json_object(columns ,'$.special_id.value') as special_id
            , get_json_object(columns ,'$.goods_tag.value') as goods_tag
            , get_json_object(columns ,'$.brand_tag.value') as brand_tag
            , get_json_object(columns ,'$.custom_tag.value') as custom_tag
            , get_json_object(columns ,'$.goods_kh.value') as goods_kh
            , get_json_object(columns ,'$.goods_name.value') as goods_name
            , get_json_object(columns ,'$.provider_name.value') as provider_name
            , get_json_object(columns ,'$.enough_number.value') as enough_number
            , get_json_object(columns ,'$.click_count.value') as click_count
            , get_json_object(columns ,'$.click_CR.value') as click_CR
            , get_json_object(columns ,'$.goods_number.value') as goods_number
            , get_json_object(columns ,'$.goods_weight.value') as goods_weight
            , get_json_object(columns ,'$.market_price.value') as market_price
            , get_json_object(columns ,'$.shoppe_price.value') as shoppe_price
            , get_json_object(columns ,'$.shoppe_discount.value') as shoppe_discount
            , get_json_object(columns ,'$.vip_discount.value') as vip_discount
            , get_json_object(columns ,'$.shop_price.value') as shop_price
            , get_json_object(columns ,'$.goods_brief.value') as goods_brief
            , get_json_object(columns ,'$.goods_desc.value') as goods_desc
            , get_json_object(columns ,'$.goods_thumb.value') as goods_thumb
            , get_json_object(columns ,'$.goods_img.value') as goods_img
            , get_json_object(columns ,'$.add_time.value') as add_time
            , get_json_object(columns ,'$.sort_order.value') as sort_order
            , get_json_object(columns ,'$.is_delete.value') as is_delete
            , get_json_object(columns ,'$.is_best.value') as is_best
            , get_json_object(columns ,'$.is_new.value') as is_new
            , get_json_object(columns ,'$.is_hot.value') as is_hot
            , get_json_object(columns ,'$.last_update.value') as last_update
            , get_json_object(columns ,'$.seller_note.value') as seller_note
            , get_json_object(columns ,'$.suppliers_id.value') as suppliers_id
            , get_json_object(columns ,'$.is_on_sale.value') as is_on_sale
            , get_json_object(columns ,'$.is_check.value') as is_check
            , get_json_object(columns ,'$.sales.value') as sales
            , get_json_object(columns ,'$.real_sales.value') as real_sales
            , get_json_object(columns ,'$.attribute.value') as attribute
            , get_json_object(columns ,'$.attribute_true.value') as attribute_true
            , get_json_object(columns ,'$.goods_middle.value') as goods_middle
            , get_json_object(columns ,'$.extra_price.value') as extra_price
            , get_json_object(columns ,'$.goods_no.value') as goods_no
            , get_json_object(columns ,'$.add_admin.value') as add_admin
            , get_json_object(columns ,'$.picker_group.value') as picker_group
            , get_json_object(columns ,'$.picker_group_code.value') as picker_group_code
            , get_json_object(columns ,'$.provider_id.value') as provider_id
            , get_json_object(columns ,'$.brand_id.value') as brand_id
            , get_json_object(columns ,'$.market_id.value') as market_id
            , get_json_object(columns ,'$.supplier_id.value') as supplier_id
            , get_json_object(columns ,'$.asking_status.value') as asking_status
            , get_json_object(columns ,'$.goods_status.value') as goods_status
            , get_json_object(columns ,'$.special_status.value') as special_status
            , get_json_object(columns ,'$.sales_of_7.value') as sales_of_7
            , get_json_object(columns ,'$.real_sales_of_7.value') as real_sales_of_7
            , get_json_object(columns ,'$.is_show_stock.value') as is_show_stock
            , get_json_object(columns ,'$.goods_model_id.value') as goods_model_id
            , get_json_object(columns ,'$.fabric_tag.value') as fabric_tag
            , get_json_object(columns ,'$.fabric_id.value') as fabric_id
            , get_json_object(columns ,'$.is_stockout.value') as is_stockout
            , get_json_object(columns ,'$.max_sales.value') as max_sales
            , get_json_object(columns ,'$.is_replenish.value') as is_replenish
            , get_json_object(columns ,'$.origin.value') as origin
            , get_json_object(columns ,'$.video_url.value') as video_url
            , get_json_object(columns ,'$.video_status.value') as video_status
            , get_json_object(columns ,'$.is_action.value') as is_action
            , get_json_object(columns ,'$.action_value.value') as action_value
            , get_json_object(columns ,'$.is_alone.value') as is_alone
            , get_json_object(columns ,'$.alone_discount.value') as alone_discount
            , get_json_object(columns ,'$.alone_price.value') as alone_price
            , get_json_object(columns ,'$.update_admin.value') as update_admin
            , get_json_object(columns ,'$.update_time.value') as update_time
            , get_json_object(columns ,'$.auto_time.value') as auto_time
            , get_json_object(columns ,'$.alone_sort.value') as alone_sort
            , nano_time as nano_time
            , event as event
        from xxx.ods_binlog_data_table
        where 
            dt = substr(${bdp.system.cyctime}, 1, 8)
            and tb = 'fmys_goods'
    )
)
where row_number = 1 and event != 1
;
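
The two scheduler variables used in sections 7 and 9 select different days: the merge script filters on `${bdp.system.bizdate}` (yesterday), while the view filters on the first 8 characters of `${bdp.system.cyctime}` (today). The Python sketch below shows that relationship. It assumes that cyctime is the planned run time formatted as yyyyMMddHHmmss and that bizdate is the previous day formatted as yyyyMMdd, which matches how the SQL uses them but is not confirmed by the source; `schedule_params` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def schedule_params(plan_time: datetime) -> dict:
    """Derive the partition values used by the merge script and the view."""
    cyctime = plan_time.strftime("%Y%m%d%H%M%S")  # assumed format of bdp.system.cyctime
    bizdate = (plan_time - timedelta(days=1)).strftime("%Y%m%d")  # assumed: previous day
    return {
        "cyctime": cyctime,
        "bizdate": bizdate,          # dt read by the merge script (yesterday)
        "today_dt": cyctime[:8],     # substr(cyctime, 1, 8): dt read by the view
    }


params = schedule_params(datetime(2022, 4, 6, 0, 30, 0))
print(params)
```

For a run planned at 00:30 on 6 April 2022, the merge script reads dt = 20220405 and writes dt = 20220406, and the view created in the same run reads dt = 20220406 from both tables.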

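10. Creating the scheduled job

Goals:

  1. Create the view on a schedule.
  2. On a schedule, merge yesterday's old-table data with yesterday's binlog-table data and load the result into today's partition of the old table.
  3. When the data is wrong or a run fails, a rerun pulls the full data from the business database (improves resilience).

The job is structured as follows:

  1. The job is scheduled for 00:30.
  2. The first node is the view-creation script from section 9, which creates the view that reflects the business database at minute-level latency.
  3. The upper branch is the CDM task. Its IF condition is #{DateUtil.format(Job.planTime,"HHmmss") != '003000' ? 'true' : 'false'}, so it runs when the planned time is not 00:30:00 and pulls the full data from the business database into the warehouse (that is, it runs whenever the job is not triggered by the automatic schedule).
  4. The lower branch is the DLI task. Its IF condition is #{DateUtil.format(Job.planTime,"HHmmss") == '003000' ? 'true' : 'false'}, so it runs when the planned time is 00:30:00 and merges yesterday's ods data with yesterday's binlog data into today's ods partition (that is, it runs on the automatic schedule).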
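
The branching rule in the last two points can be written down as a small Python function. It is a sketch of the decision only: `branch_for` and the branch labels are hypothetical names, and the real job evaluates the two IF expressions quoted above.

```python
from datetime import datetime

AUTO_SCHEDULE_HHMMSS = "003000"  # planned time of the automatic daily run


def branch_for(plan_time: datetime) -> str:
    """Pick the branch the way the two IF conditions do."""
    if plan_time.strftime("%H%M%S") == AUTO_SCHEDULE_HHMMSS:
        return "merge_yesterday"   # DLI task: merge old table and binlog table
    return "full_pull"             # CDM task: reload everything from the business database


print(branch_for(datetime(2022, 4, 6, 0, 30, 0)))
print(branch_for(datetime(2022, 4, 6, 10, 15, 0)))
```

The two conditions are exact complements, so every run takes exactly one branch: the automatic 00:30:00 run merges, and any run with a different planned time falls back to a full pull.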

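11. Summary

  1. With the steps above, business tables in the warehouse are updated at minute-level latency. Traffic data can be handled the same way.
  2. Minute-level updates consume a fair amount of resources, but cluster resources are idle during the day when no batch jobs run, so this approach can use them to provide minute-level data.
  3. The approach targets minute-level data and consumes far fewer resources than second-level updates with Flink. Choose between the two based on the trade-off between resources and latency.
  4. Everything above was done on Huawei Cloud. On another cloud platform or an open-source big data stack, the same ideas apply.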




Reposted from: https://blog.csdn.net/yang_shibiao/article/details/124040348
Copyright belongs to the original author, 电光闪烁.
