0


基于postgresql传统数据仓库搭建

目录

概述

数仓选型对比

数据库存储过程性能可扩展性安全性成本支持度数据一致性数据压缩数据备份和恢复数据分析功能PostgreSQL支持高高高低高高支持支持支持Oracle支持高高高高高高支持支持支持MySQL支持中中中低中中支持支持支持MSSQL支持高高高高高高支持支持支持Greenplum支持高高高高高高支持支持高Starrocks支持高高高中中高支持支持高GBase支持高高高中中高支持支持支持Hive不支持低高中低中中支持支持高Impala不支持高高高中中高支持支持高GaussDB支持高高高中中高支持支持支持

  • 考虑数据规模、计算规模以及成本
  • 数据规模较小,计算能力要求不高,预算低,选型Postgresql主从
  • 数据规模较大,计算能力要求高,有一定预算,选型Greenplum

当前数仓架构问题

  • 无法保证数据一致性:没有严格的数据血缘依赖,导致数据在计算时可能出现不一致的情况
  • 不可见的任务调度:没有一站式界面管理调度
  • 无对外提供数据能力:数仓无法对外输出表
  • 无任务告警:任务报错对开发人员无感知
  • 长时间的锁等待:业务库直接查询物化视图,物化视图的刷新造成的长时间的锁等待
  • 重复工作较多:开发一套环境,生产一套环境,大量的重复开发工作
  • 大材小用:业务库Mysql,数仓Postgresql,使用kettle大材小用了
  • 任务调度不合理:配置调度任务需要java发版,不太合理;任务调度只能调度一条select语句,不合理;任务调度只能每15分钟跑一次,不合理
  • 任务调度不统一:宽表用java做任务调度,kettle用crontab做任务调度,其余还有的用pg_cron做任务调度
  • 数据安全性低:用户管理与权限控制不完善,敏感信息未做脱敏处理
  • 报表查询效率低:没有数仓分层,导致报表查询效率较低

解决方案

问题解决方案说明无法保证数据一致性改用海豚调度/等待与唤醒海豚调度提供层级间的依赖,等待与唤醒提供表级别依赖不可见的任务调度改用海豚调度海豚调度提供可视化的界面来管理任务无对外提供数据能力Magic-APIMagic-API提供统一的接口平台,对外提供数据无任务告警改用海豚调度海豚调度提供了钉钉、企业微信等告警接口长时间的锁等待弃用物化视图,改变etl方式采用drop table和rename的方式做etl处理重复工作较多弃用开发环境,改用pg的用户模式映射开发环境即在生产环境上,在生产环境中区分开发用户和集中用户,开发用户为个人开发(即开发环境),集中用户为集中跑批的用户(即生产环境)大材小用弃用kettle,改用外部数据包装器或datax业务库只有Mysql,直接通过pg的插件mysql_fdw进行数据抽取;也可以通过海豚调度提供的datax组件进行数据抽取调度不合理改用海豚调度海豚调度提供的sql组件可以执行sql脚本,海豚调度可以自定义调度时间、频率和并发度任务调度不统一改用海豚调度海豚调度提供统一的任务调度平台数据安全性低严格控制用户的权限用户模式一一对应,禁用public的create权限,回收函数的execute权限报表查询效率低数仓分层ods、dw、dm、dim、app(ads)

架构设计

数据仓库设计

在这里插入图片描述

  • public:所有用户的根
  • dev:开发用户组 - yuzhenchao和yuxiaotan:以开发人员的全拼命名,属于dev用户组,建立的所有的对象都在自己同名的模式下,对其他模式没有create权限,只有usage权限
  • pro:生产用户组 - mdl:用来集中跑模型的用户,最终生成的模型在mdl模式下- apl:用来集中跑应用(报表)的用户,最终生成的结果表在apl模式下- jkfw:用于Magic-API连接数仓的用户,jkfw模式下存取magic-api的元数据
  • tool:工具用户,etl相关的工具存储过程将存放在tool模式下
  • readonly:只读用户组,在该组下面的用户拥有只读权限 - finebi:用于finebi连接数仓的用户- dbselect:用于dblink和fdw连接数仓的用户

命名规范

对象类型对象格式例子序列seq_*seq_dblink_id临时表*tmp_tmp_jg624_rb1ods层表ods_原库名_原表名ods_xkorder_orderdw层表dwd_/dws_/dw_*dwd_order_stage/dws_order_stage/dw_order_stagedm层表*dm_*dm_order_efficiencydim层表*dim_*dim_dealerapp层表接口的实时表:*jk_需求号_real
接口的日表:jk_需求号_rb
接口的周表 :jk_需求号_zb
接口的月表:jk_需求号_yb
finebi的实时表:bi_需求号_real
finebi的日表:bi_需求号_rb
finebi的周表:bi_需求号_zb
finebi的月表:bi_需求号_yb接口的实时表:jk_jg624_real
接口的日表:jk_jg624_rb
接口的周表 :jk_jg624_zb
接口的月表:jk_jg624_yb
finebi的实时表:bi_jg624_real
finebi的日表:bi_jg624_rb
finebi的周表:bi_jg624_zb
finebi的月表:bi_jg624_yb主键约束pk_表名_字段名pk_order_order_no索引idx_表名_字段名idx_order_order_id

模型设计

  • ods层(数据贴源层):用于存储从各个业务系统中提取的原始数据,保留数据的完整性和一致性,做一些简单的处理。比如,空字符串处理成null,is_delete字段统一处理成0和1
  • dwd层(数据明细层):用于存储经过清洗、加工、转换后的数据,保留数据的历史变化,为后续的数据分析和决策提供基础数据。比如,宽带表、移动表、itv表
  • dws层(数据汇总层):用于业务层面汇合的数据,提供更高效的数据查询和分析。比如,dwd层的宽带表、移动表、itv表就会在dws层合并成一个全业务表
  • dm层(数据集市层):存储高度聚合的数据。比如,按月汇总的应收表和实收表
  • dim层(公共维度层):用于存储与业务相关的维度信息,如时间、地域、产品等,为数据分析和决策提供维度支持。比如:代理商表,产品表等
  • app层(应用层):用于存储各种业务应用所需的数据,如报表、分析、可视化等,为业务应用提供数据支持

PostgreSQL的安装

Centos7.6安装postgresql15

【postgresql 数据库运维文档】

数据仓库的建立

创建数据库

createdatabase etl;
\c etl postgres

创建用户组

create role dev;create role pro;create role readonly;

创建用户

create role yuzhenchao with login password '${YZC_PWD}' connection limit20;create role yuxiaotan with login password '${YXT_PWD}' connection limit20;create role mdl with login password '${MDL_PWD}' connection limit250;create role apl with login password '${APL_PWD}' connection limit250;create role tool with login password '${TOOL_PWD}' connection limit20;create role finebi with login password '${FINEBI_PWD}' connection limit100;create role dbselect with login password '${DBSELECT_PWD}' connection limit100;create role jkfw with login password '${JKFW_PWD}' connection limit100;

用户加入到用户组

altergroup dev adduser yuzhenchao;altergroup dev adduser yuxiaotan;altergroup pro adduser mdl;altergroup pro adduser apl;altergroup pro adduser jkfw;altergroup readonly adduser finebi;altergroup readonly adduser dbselect;

创建模式

--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool、jkfwcreateschema ${USERNAME};

模式授权用户

--用户同名模式授权所有权限--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool、jkfwgrantcreate,usageonschema ${USERNAME} to ${USERNAME};--公开模式的usage权限--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool、jkfwgrantusageonschema ${USERNAME} topublic;--任何用户都拥有public模式的所有权限--出于安全,回收任何用户在public的create权限revokecreateonschemapublicfrompublic;

收回函数的执行权限

/*
 * pg中函数默认公开execute权限
 * 通过pg的基于schema和基于role的默认权限实现
 */--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool、jkfw--在schema为yuzhenchao上创建的任何函数,除定义者外,其他人调用需要显式授权alterdefaultprivilegesfor role ${USERNAME} revokeexecuteon functions frompublic;--由yuzhenchao用户创建的任何函数,除定义者外,其他人调用需要显式授权alterdefaultprivilegesinschema ${USERNAME} revokeexecuteon functions frompublic;

公开表的select权限

--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、tool、jkfw--在schema为yuzhenchao上创建的任何表默认公开select权限alterdefaultprivilegesinschema ${USERNAME} grantselectontablestopublic;--由yuzhenchao用户创建的任何表默认公开select权限alterdefaultprivilegesfor role ${USERNAME} grantselectontablestopublic;

动态sql函数

/*
 * 为了方便各用户的管理
 * 需要用定义者权限创建动态sql函数
 * 最终由tool用户集中管理
 */createorreplacefunction tool.sp_exec(
    vsql charactervarying)returns void
    language plpgsql
    security defineras $function$ 
/*
 * 作者 : v-yuzhenc
 * 功能 : 以集定义者权限执行sql
 * vsql : 需要执行的sql语句
 * */beginexecute vsql;end;
$function$
;alterfunction tool.sp_exec(varchar) owner to tool;grantallonfunction tool.sp_exec(varchar)to tool;--${USERNAME}分别输入yuzhenchao、yuxiaotan、mdl、apl、jkfwcreateorreplacefunction ${USERNAME}.sp_exec(
    vsql charactervarying)returns void
    language plpgsql
    security defineras $function$ 
/*
 * 作者 : v-yuzhenc
 * 功能 : 以集定义者权限执行sql
 * vsql : 需要执行的sql语句
 * */beginexecute vsql;end;
$function$
;alterfunction ${USERNAME}.sp_exec(varchar) owner to ${USERNAME};grantallonfunction ${USERNAME}.sp_exec(varchar)to ${USERNAME};grantexecuteonfunction ${USERNAME}.sp_exec(varchar)to tool;

集中处理函数

createorreplacefunction tool.sp_execsql(
     exec_sql charactervarying,exec_user charactervarying)returns void
    language plpgsql
    security defineras $function$ 
/* 作者 : v-yuzhenc
 * 功能 : 集中处理程序,以某用户的权限执行某条sql语句
 * exec_sql : 需要执行的sql语句
 * exec_user : 需要以哪个用户的权限执行该sql语句
 * */declare 
    p_user varchar := exec_user;
    o_search_path varchar;begin--记录原来的模式搜索路径execute'show search_path;'into o_search_path;--临时切换模式搜索路径execute'SET search_path TO '||p_user||',public,oracle';case p_user 
        when'yuzhenchao'then perform yuzhenchao.sp_exec(exec_sql);when'yuxiaotan'then perform yuxiaotan.sp_exec(exec_sql);when'mdl'then perform mdl.sp_exec(exec_sql);when'apl'then perform apl.sp_exec(exec_sql);when'tool'then perform tool.sp_exec(exec_sql);when'jkfw'then perform jkfw.sp_exec(exec_sql);else raise exception '未配置该用户:%',p_user;endcase;--恢复模式搜索路径execute'SET search_path TO '||o_search_path;

    exception when others then--恢复模式搜索路径execute'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;end;
$function$
;alterfunction tool.sp_execsql(varchar,varchar) owner to tool;grantallonfunction tool.sp_execsql(varchar,varchar)to tool;

fdw实现数据抽取

安装mysql_fdw

mysql_fdw的安装与使用

安装postgres_fdw

create extension postgres_fdw;

授权tool用户fdw的使用

grantallonforeigndata wrapper mysql_fdw to tool;grantallonforeigndata WRAPPER postgres_fdw to tool;

创建连接信息表

\c etl tool
createtable tool.dblink_connection_info (
     connname varchar(63)notnull,conntype varchar(63)null,hostname varchar(15)null,port varchar(15)null,dbname varchar(63)null,username varchar(63)null,userpwd varchar(63)null,fdw_server varchar(63)null,createtime timestampnulldefaultcurrent_timestamp,constraint dblink_connection_info_pkey primarykey(connname));commentontable tool.dblink_connection_info is'存放用于dblink的连接信息,不公开';commentoncolumn tool.dblink_connection_info.connname is'连接名字,自定义即可,唯一';commentoncolumn tool.dblink_connection_info.conntype is'数据源类型';commentoncolumn tool.dblink_connection_info.hostname is'数据库所在主机ip地址';commentoncolumn tool.dblink_connection_info.port is'数据库所在端口';commentoncolumn tool.dblink_connection_info.dbname is'数据库名字';commentoncolumn tool.dblink_connection_info.username is'用于连接的用户名';commentoncolumn tool.dblink_connection_info.userpwd is'用户名对应的密码';commentoncolumn tool.dblink_connection_info.fdw_server is'对应的fdw_server的名字';commentoncolumn tool.dblink_connection_info.createtime is'创建时间';altertable tool.dblink_connection_info owner to tool;revokeselecton tool.dblink_connection_info frompublic;grantselect(connname,conntype,hostname,port,dbname,username,fdw_server ,createtime)on tool.dblink_connection_info topublic;

创建序列

create sequence tool.seq_tmp_fdw_id
    increment by1
    minvalue 1
    maxvalue 9999start1
    cache 1cycle;

创建fdw_server和用户映射

create server ${MYSQL_SERVER_NAME} foreigndata wrapper mysql_fdw options (host '${MYSQL_HOSTNAME}', port '${MYSQL_PORT}');createuser mapping forpublic server ${MYSQL_SERVER_NAME} options (username '${MYSQL_USERNAME}', password '${MYSQL_USERPWD}');create server ${PG_SERVER_NAME} foreigndata wrapper postgres_fdw options (host '${PG_HOSTNAME}', port '${PG_PORT}',dbname '${PG_DATABASE}');createuser mapping forpublic server ${PG_SERVER_NAME} options (user'${PG_USERNAME}', password '${PG_USERPWD}');

辅助函数

get_ddl

createorreplacefunction tool.get_ddl(
     schematable charactervarying,getmode charactervaryingdefault'table'::charactervarying,newtablename charactervaryingdefaultnull::charactervarying)returnstextlanguage plpgsql
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定表名(区分大小写),返回当前表名的建表语句,备注语句
 *         默认当前模式,其他模式请加 模式.表名
 * schematable : schemaname.tablename或者tablename
 * getmode : 默认table(获取表的建表语句)
 *         view(获取视图的建视图语句)
 *         viewtable(获取视图对应的建表语句)
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * */declare 
    p_tablename varchar;
    p_schemaname varchar :=user::varchar(64);
    p_newtablename varchar := newtablename;
    p_result text :=null;
    p_array varchar[];begin--校验getmode是否正确,不正确直接向外抛异常if getmode notin('table','view','viewtable')then 
        raise exception '参数2必须为table、view或者viewtable!';endif;--如果传参为null直接抛出异常if schematable isnullthen 
        raise exception '表名或视图名不能为空!';endif;--含有多个点时,直接抛出异常--if instr(schematable,'.',1,2) <> 0 then --    raise exception '表名或视图名输入不正确!';--end if;--解析schematable
    p_array := string_to_array(schematable,'.');if p_array[2]isnullthen
        p_tablename := p_array[1];else 
        p_tablename := trim(p_array[2]);
        p_schemaname := trim(p_array[1]);endif;
    p_newtablename :=coalesce(p_newtablename,lower(p_tablename));if getmode in('table','viewtable')thenif getmode ='table'andnotexists(select1from pg_tables where tablename = p_tablename and schemaname = p_schemaname)then 
            raise exception '%.%表不存在!',p_schemaname,p_tablename;
        elsif getmode ='viewtable'andnotexists(select1from pg_views where viewname = p_tablename and schemaname = p_schemaname)then
            raise exception '%.%视图不存在!',p_schemaname,p_tablename;endif;select'drop table if exists "'||p_newtablename||'";'||
            chr(10)||'create table "'||p_newtablename||'" ('||
            chr(10)||
            string_agg(chr(9)||casewhen attnum =1then' 'else','end||'"'||c.attname||'" '||--字段名
                format_type(c.atttypid, c.atttypmod)||--字段类型coalesce(' default '||substring(pg_catalog.pg_get_expr(d.adbin, d.adrelid)for128),'')||--字段默认值casewhen c.attnotnull =truethen' not null'else' null'end,chr(10)orderby c.attnum
            )||--主键约束coalesce(chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
            chr(10)||');'||--压缩信息--coalesce(' with ( '||chr(10)||chr(9)||' '||array_to_string(a.reloptions,chr(10)||chr(9)||',')||chr(10)||')','') ||--分布策略--case when e.policytype = 'r' then ' distributed replicated;' when e.policytype = 'p' then coalesce(' distributed by ('||--string_agg(case when array_position(string_to_array(array_to_string(e.distkey::int2[],','),',')::int[],c.attnum::int,1) <> 0 then '"'||c.attname||'"' end,',' order by string_to_array(array_to_string(e.distkey::int2[],','),',')::int[])||--');',' distributed randomly;') else ' distributed randomly;' end||--表备注(注释)coalesce(chr(10)||'comment on table "'||p_newtablename||'" is '''||replace(h.description,'''','''''')||''';','')||--字段备注(注释)coalesce(chr(10)||string_agg(casewhen f.description isnotnullthen'comment on column "'||p_newtablename||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';'end,chr(10)orderby c.attnum),'')into p_result
        from pg_class a 
        innerjoin pg_namespace b 
        on(a.relnamespace = b.oid)innerjoin pg_attribute c 
        on(a.oid = c.attrelid)leftjoin pg_attrdef d 
        on(c.attrelid = d.adrelid and c.attnum = d.adnum)--left join gp_distribution_policy e --on (a.oid = e.localoid)leftjoin pg_description f 
        on(a.oid = f.objoid and c.attnum = f.objsubid)leftjoin(select d.indrelid
                ,string_agg('"'||c.attname||'"',','orderby c.attnum) prikey
            from pg_class a, pg_namespace b, pg_attribute c, pg_index d 
            where a.relnamespace = b.oid
                and a.oid = c.attrelid
                and a.oid = d.indrelid
                and d.indisprimary =trueand c.attnum =any(d.indkey)and a.relname = p_tablename
                and b.nspname = p_schemaname  
             groupby d.indrelid
        ) g 
        on(a.oid = g.indrelid)leftjoin pg_description h 
        on(a.oid = h.objoid and h.objsubid =0)where c.attnum >0andnot c.attisdropped
            and a.relname = p_tablename
            and b.nspname = p_schemaname
        groupby b.nspname,a.relname,a.reloptions,h.description,g.prikey;elseif getmode ='view'andnotexists(select1from pg_views where viewname = p_tablename and schemaname = p_schemaname)then
            raise exception '%.%视图不存在!',p_schemaname,p_tablename;endif;select' CREATE OR REPLACE VIEW "'||p_newtablename||'" AS '||chr(10)||d.definition||--表备注(注释)coalesce(chr(10)||'comment on view "'||p_newtablename||'" is '''||replace(h.description,'''','''''')||''';','')||--字段备注(注释)coalesce(chr(10)||string_agg(casewhen f.description isnotnullthen'comment on column "'||p_newtablename||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';'end,chr(10)orderby c.attnum),'')into p_result
        from pg_class a 
        innerjoin pg_namespace b 
        on(a.relnamespace = b.oid)innerjoin pg_attribute c 
        on(a.oid = c.attrelid)leftjoin pg_description f 
        on(a.oid = f.objoid and c.attnum = f.objsubid)innerjoin pg_views d
        on(b.nspname = d.schemaname and a.relname = d.viewname)leftjoin pg_description h 
        on(a.oid = h.objoid and h.objsubid =0)where d.viewname = p_tablename
            and d.schemaname = p_schemaname
        groupby a.relname,d.definition,h.description;endif;return p_result;end;
$function$
;grantexecuteonfunction tool.get_ddl(varchar,varchar,varchar)topublic;

get_ddl_pg2mysql

createorreplacefunction tool.get_ddl_pg2mysql(
     tablename charactervarying,schemaname charactervarying,newtablename charactervaryingdefaultnull::charactervarying)returnstextlanguage plpgsql
AS $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定本地pg数据库的表名、模式名,
 *        以mysql的语法返回指定模式下指定表的ddl语句
 * tablename : 指定pg的表名
 * schemaname : 指定pg的模式名
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * */declare 
    p_tablename varchar := tablename;
    p_schemaname varchar := schemaname;
    p_newtablename varchar := newtablename;
    p_result text :=null;
    existbj int;
    v_sql varchar;begin--如果传参为null直接抛出异常if p_tablename isnullthen 
        raise exception '表名或视图名不能为空!';endif;if p_schemaname isnullthen 
        raise exception '模式名不能为空!';endif;
    p_newtablename :=coalesce(p_newtablename,lower(p_tablename));--判断表或视图是否存在execute $v_sql$
        select1from pg_tables
        where tablename ='$v_sql$||p_tablename||$v_sql$'and schemaname ='$v_sql$||p_schemaname||$v_sql$'unionallselect1from pg_views
        where viewname ='$v_sql$||p_tablename||$v_sql$'and schemaname ='$v_sql$||p_schemaname||$v_sql$';
    $v_sql$ into existbj;if existbj isnullthen
        raise exception '表或视图不存在!';endif;
    v_sql := $v_sql$
        select'drop table if exists `'||'$v_sql$||p_newtablename||$v_sql$'||'`;'||
            chr(10)||'create table `'||'$v_sql$||p_newtablename||$v_sql$'||'` ('||
            chr(10)||
            string_agg(chr(9)||casewhen c.attnum =1then' 'else','end||'`'||c.attname||'` '||--字段名casewhen i.data_type ='int'then'int(11)'when i.data_type ='character varying'thencasewhen regexp_substr(format_type(c.atttypid, c.atttypmod),'[1-9]+')isnullor regexp_substr(format_type(c.atttypid, c.atttypmod),'[1-9]+')::int>16341then'text'elsereplace(format_type(c.atttypid, c.atttypmod),'character varying','varchar')endwhen i.data_type ='character'thenreplace(format_type(c.atttypid, c.atttypmod),'character','char')when i.data_type ='date'then'date'when i.data_type ='timestamp with time zone'thenreplace(replace(format_type(c.atttypid, c.atttypmod),' with time zone',''),'timestamp','datetime')when i.data_type ='timestamp without time zone'thenreplace(replace(format_type(c.atttypid, c.atttypmod),' without time zone',''),'timestamp','datetime')when i.data_type ='bigint'then'bigint(20)'when i.data_type ='double precision'then'double'when i.data_type ='smallint'then'smallint(6)'when i.data_type ='text'then'text'when i.data_type ='bytea'then'blob'when i.data_type ='real'then'float'when i.data_type ='numeric'then format_type(c.atttypid, c.atttypmod)when i.data_type ='time'then'interval'when i.data_type ='json'then'json'else'text'end||--字段类型casewhen c.attnotnull =trueor((d.typtype ='d'::"char")AND d.typnotnull)then' not null'else' null'end||coalesce(' comment '''||replace(f.description,'''','''''')||'''',''),chr(10)orderby c.attnum
                
            )||--主键约束coalesce(chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
            chr(10)||')'||coalesce('comment '''||replace(h.description,'''','''''')||'''','')||';'from pg_class a 
        innerjoin pg_namespace b 
        on(a.relnamespace = b.oid)innerjoin pg_attribute c 
        on(a.oid = c.attrelid)leftjoin pg_type d
        on(c.atttypid = d.oid)leftjoin pg_description f 
        on(a.oid = f.objoid and c.attnum = f.objsubid)leftjoin(select d.indrelid
                ,string_agg('`'||c.attname||'`',','orderby c.attnum) prikey
            from pg_class a, pg_namespace b, pg_attribute c, pg_index d 
            where a.relnamespace = b.oid
                and a.oid = c.attrelid
                and a.oid = d.indrelid
                and d.indisprimary =trueand c.attnum =any(d.indkey)and a.relname ='$v_sql$||p_tablename||$v_sql$'and b.nspname ='$v_sql$||p_schemaname||$v_sql$'groupby d.indrelid
        ) g 
        on(a.oid = g.indrelid)leftjoin pg_description h 
        on(a.oid = h.objoid and h.objsubid =0)leftjoin information_schema.columns i
        on(a.relname = i.table_name and b.nspname = i.table_schema and c.attnum = i.ordinal_position)where c.attnum >0andnot c.attisdropped
            and a.relname ='$v_sql$||p_tablename||$v_sql$'and b.nspname ='$v_sql$||p_schemaname||$v_sql$'groupby b.nspname,a.relname,h.description,g.prikey;
    $v_sql$
    ;execute v_sql into p_result;return p_result;end;
$function$
;grantexecuteonfunction tool.get_ddl_pg2mysql(varchar,varchar,varchar)topublic;

get_ddl_remote_mysql2pg

createorreplacefunction tool.get_ddl_remote_mysql2pg(
     tablename charactervarying,schemaname charactervarying,newtablename charactervaryingdefaultnull::charactervarying,remote_connname charactervaryingdefault'${CONNNAME}'::charactervarying)returnstextlanguage plpgsql
    security defineras $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定远程mysql数据库的表名、库名和连接信息,
 *        以pg的语法返回指定库下指定表的ddl语句
 * tablename : 指定mysql的表名
 * schemaname : 指定mysql的库名
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * remote_connname:远程连接名,有效值为 select connname from tool.dblink_connection_info;
 * */declare 
    p_tablename varchar := tablename;
    p_schemaname varchar := schemaname;
    p_newtablename varchar := newtablename;
    p_remote_connname varchar := remote_connname;
    p_result text :=null;
    tmp_fdw_id varchar := nextval('seq_tmp_fdw_id')::varchar;
    tbname_1 varchar :='tmp_fdw_tables_'||tmp_fdw_id;
    tbname_2 varchar :='tmp_fdw_views_'||tmp_fdw_id;
    tbname_3 varchar :='tmp_fdw_columns_'||tmp_fdw_id;
    existbj int;
    v_sql varchar;
    o_search_path varchar;--模式搜索路径begin--如果传参为null直接抛出异常if p_tablename isnullthen 
        raise exception '表名或视图名不能为空!';endif;if p_schemaname isnullthen 
        raise exception '模式名(库名)不能为空!';endif;
    p_newtablename :=coalesce(p_newtablename,lower(p_tablename));--记录原来的模式搜索路径execute'show search_path;'into o_search_path;--临时切换模式搜索路径execute'SET search_path TO tool,'||o_search_path;--创建外部表select $v_sql$
        --存在临时的外部表时,直接删除dropforeigntableifexists $v_sql$||tbname_1||$v_sql$;dropforeigntableifexists $v_sql$||tbname_2||$v_sql$;dropforeigntableifexists $v_sql$||tbname_3||$v_sql$;--创建tables映射表createforeigntable $v_sql$||tbname_1||$v_sql$(
             table_name varchar(64),table_schema varchar(64),table_comment text) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'tables');--创建views映射表createforeigntable $v_sql$||tbname_2||$v_sql$(
             table_name varchar(64),table_schema varchar(64)) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'views');--创建columns映射表createforeigntable $v_sql$||tbname_3||$v_sql$(
             table_schema varchar(64),table_name varchar(64),column_name varchar(64),ordinal_position int,is_nullable varchar(3),data_type text,column_type text,column_comment text,column_key varchar(3)) server $v_sql$||fdw_server||$v_sql$ options(dbname 'information_schema',table_name 'columns');
    $v_sql$
    into v_sql
    from tool.dblink_connection_info
    where connname = p_remote_connname
    ;execute v_sql;--判断表或视图是否存在execute $v_sql$
        select1from"$v_sql$||tbname_1||$v_sql$"where table_name ='$v_sql$||p_tablename||$v_sql$'and table_schema ='$v_sql$||p_schemaname||$v_sql$'unionallselect1from"$v_sql$||tbname_2||$v_sql$"where table_name ='$v_sql$||p_tablename||$v_sql$'and table_schema ='$v_sql$||p_schemaname||$v_sql$';
    $v_sql$ into existbj;if existbj isnullthen
        raise exception '表或视图不存在!';endif;
    v_sql := $v_sql$
        with tmp_a as((select'$v_sql$||p_newtablename||$v_sql$'as table_name
                ,table_schema
                ,coalesce(chr(10)||'comment on table "'||'$v_sql$||p_newtablename||$v_sql$'||'" is '''||replace(casewhen table_comment =''thennullelse table_comment end,'''','''''')||''';','')as table_comment
            from $v_sql$||tbname_1||$v_sql$ 
            where upper(table_name)= upper('$v_sql$||p_tablename||$v_sql$')and upper(table_schema)= upper('$v_sql$||p_schemaname||$v_sql$')limit1)unionall(select'$v_sql$||p_newtablename||$v_sql$'as table_name
                ,table_schema
                ,nullas table_comment
            from $v_sql$||tbname_2||$v_sql$ 
            where upper(table_name)= upper('$v_sql$||p_tablename||$v_sql$')and upper(table_schema)= upper('$v_sql$||p_schemaname||$v_sql$')limit1)), tmp_b as(select'$v_sql$||p_newtablename||$v_sql$'as table_name
                ,table_schema
                ,string_agg(chr(9)||casewhen ordinal_position =1then' 'else','end||--字段名'"'||lower(column_name)||'"'||' '||--数据类型casewhen data_type ='int'then data_type
                        when data_type ='varchar'thenreplace(column_type,'varchar(0)','varchar(1)')when data_type ='char'thenreplace(replace(column_type,'char','varchar'),'varchar(0)','varchar(1)')when data_type ='date'then'date'when data_type ='datetime'thenreplace(column_type, data_type,'timestamp')when data_type ='timestamp'then'timestamp'when data_type ='bigint'then'bigint'when data_type ='double'then'double precision'when data_type ='smallint'then'smallint'when data_type ='decimal'thenreplace(column_type,'unsigned zerofill','')when data_type ='longtext'then'text'when data_type ='text'then'text'when data_type ='tinyint'then'int'when data_type ='longblob'then'bytea'when data_type ='blob'then'bytea'when data_type ='float'then'real'when data_type ='tinytext'then'text'when data_type ='mediumtext'then'text'when data_type ='numeric'then'numeric'when data_type ='time'then'interval'when data_type ='json'then'json'else'varchar'end||' '||--空约束casewhen is_nullable ='NO'then'not null'when is_nullable ='NO'then'null'else'null'end,chr(10)orderby ordinal_position
                 )||--主键约束coalesce(chr(10)||chr(9)||',primary key ('||string_agg(casewhen column_key ='PRI'then'"'||lower(column_name)||'"'end,','orderby ordinal_position)||')','')as column_info
                --字段备注,coalesce(chr(10)||string_agg('comment on column "'||'$v_sql$||p_newtablename||$v_sql$'||'"."'||lower(column_name)||'" is '''||replace(casewhen column_comment =''thennullelse column_comment end,'''','''''')||''';',chr(10)orderby ordinal_position),'') column_comment
            from $v_sql$||tbname_3||$v_sql$
            where upper(table_name)= upper('$v_sql$||p_tablename||$v_sql$')and upper(table_schema)= upper('$v_sql$||p_schemaname||$v_sql$')groupby 
                 table_schema
            limit1)select'drop table if exists "'||a.table_name||'";'||chr(10)||'create table "'||a.table_name||'" ('||chr(10)||
            b.column_info||chr(10)||');'||a.table_comment||b.column_comment
        from tmp_a a
        innerjoin tmp_b b 
        on(a.table_schema = b.table_schema and a.table_name = b.table_name);
    $v_sql$
    ;execute v_sql into p_result;--删除临时外部表execute $v_sql$
        --存在临时的外部表时,直接删除dropforeigntableifexists $v_sql$||tbname_1||$v_sql$;dropforeigntableifexists $v_sql$||tbname_2||$v_sql$;dropforeigntableifexists $v_sql$||tbname_3||$v_sql$;
    $v_sql$
    ;--恢复模式搜索路径execute'SET search_path TO '||o_search_path;return p_result;
    exception when others then--恢复模式搜索路径execute'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;end;
$function$
;grantexecuteonfunction tool.get_ddl_remote_mysql2pg(varchar,varchar,varchar,varchar)topublic;

get_ddl_remote_pg2pg

createorreplacefunction tool.get_ddl_remote_pg2pg(
     tablename charactervarying,schemaname charactervarying,newtablename charactervaryingdefaultnull::charactervarying,remote_connname charactervaryingdefault'${CONNNAME}'::charactervarying)returnstextlanguage plpgsql
    security defineras $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定远程pg数据库的表名、模式名和连接信息,
 *        以pg的语法返回指定模式下指定表的ddl语句
 * tablename : 指定pg的表名
 * schemaname : 指定pg的模式名
 * newtablename : 以指定新表名返回建表语句,默认与原表名相同
 * remote_connname:远程连接名,有效值为 select connname from tool.dblink_connection_info;
 * */declare 
    p_tablename varchar := tablename;
    p_schemaname varchar := schemaname;
    p_newtablename varchar := newtablename;
    p_remote_connname varchar := remote_connname;
    p_result text :=null;
    tmp_fdw_id varchar := nextval('seq_tmp_fdw_id')::varchar;
    tbname_1 varchar :='tmp_fdw_pg_class_'||tmp_fdw_id;
    tbname_2 varchar :='tmp_fdw_pg_namespace_'||tmp_fdw_id;
    tbname_3 varchar :='tmp_fdw_pg_attribute_'||tmp_fdw_id;
    tbname_5 varchar :='tmp_fdw_pg_description_'||tmp_fdw_id;
    tbname_6 varchar :='tmp_fdw_pg_index_'||tmp_fdw_id;
    tbname_8 varchar :='tmp_fdw_pg_tables_'||tmp_fdw_id;
    tbname_9 varchar :='tmp_fdw_pg_views_'||tmp_fdw_id;
    existbj int;
    v_sql varchar;
    o_search_path varchar;--模式搜索路径begin--如果传参为null直接抛出异常if p_tablename isnullthen 
        raise exception '表名或视图名不能为空!';endif;if p_schemaname isnullthen 
        raise exception '模式名不能为空!';endif;
    p_newtablename :=coalesce(p_newtablename,lower(p_tablename));--记录原来的模式搜索路径execute'show search_path;'into o_search_path;--临时切换模式搜索路径execute'SET search_path TO tool,'||o_search_path;--创建外部表select $v_sql$
        --存在临时的外部表时,直接删除dropforeigntableifexists $v_sql$||tbname_1||$v_sql$;dropforeigntableifexists $v_sql$||tbname_2||$v_sql$;dropforeigntableifexists $v_sql$||tbname_3||$v_sql$;dropforeigntableifexists $v_sql$||tbname_5||$v_sql$;dropforeigntableifexists $v_sql$||tbname_6||$v_sql$;dropforeigntableifexists $v_sql$||tbname_8||$v_sql$;dropforeigntableifexists $v_sql$||tbname_9||$v_sql$;--创建pg_class映射表createforeigntable $v_sql$||tbname_1||$v_sql$("oid" oid notnull,"relname" name notnull,"relnamespace" oid notnull) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_class');createforeigntable $v_sql$||tbname_2||$v_sql$("oid" oid notnull,"nspname" name notnull) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_namespace');createforeigntable $v_sql$||tbname_3||$v_sql$("attrelid" oid notnull,"attname" name notnull,"atttypid" oid notnull,"attnum"smallintnotnull,"atttypmod"integernotnull,"attnotnull"booleannotnull,"attisdropped"booleannotnull) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_attribute');createforeigntable $v_sql$||tbname_5||$v_sql$("objoid" oid notnull,"classoid" oid notnull,"objsubid"integernotnull,"description"textnotnull) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_description');createforeigntable $v_sql$||tbname_6||$v_sql$("indrelid" oid notnull,"indisprimary"booleannotnull,"indkey" int2vector notnull) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_index');createforeigntable $v_sql$||tbname_8||$v_sql$("schemaname" name null,"tablename" name null) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_tables');createforeigntable $v_sql$||tbname_9||$v_sql$("schemaname" name null,"viewname" name null) server $v_sql$||fdw_server||$v_sql$ options(schema_name 'pg_catalog',table_name 'pg_views');
    $v_sql$
    into v_sql
    from tool.dblink_connection_info
    where connname = p_remote_connname
    ;execute v_sql;--判断表或视图是否存在execute $v_sql$
        select1from"$v_sql$||tbname_8||$v_sql$"where tablename ='$v_sql$||p_tablename||$v_sql$'and schemaname ='$v_sql$||p_schemaname||$v_sql$'unionallselect1from"$v_sql$||tbname_9||$v_sql$"where viewname ='$v_sql$||p_tablename||$v_sql$'and schemaname ='$v_sql$||p_schemaname||$v_sql$';
    $v_sql$ into existbj;if existbj isnullthen
        raise exception '表或视图不存在!';endif;
    v_sql := $v_sql$
        select'drop table if exists "'||'$v_sql$||p_newtablename||$v_sql$'||'";'||
            chr(10)||'create table "'||'$v_sql$||p_newtablename||$v_sql$'||'" ('||
            chr(10)||
            string_agg(chr(9)||casewhen c.attnum =1then' 'else','end||'"'||c.attname||'" '||--字段名
                format_type(c.atttypid, c.atttypmod)||--字段类型casewhen c.attnotnull =truethen' not null'else' null'end,chr(10)orderby c.attnum
            )||--主键约束coalesce(chr(10)||chr(9)||',primary key ('||g.prikey||')','')||
            chr(10)||');'||--表备注(注释)coalesce(chr(10)||'comment on table "'||'$v_sql$||p_newtablename||$v_sql$'||'" is '''||replace(h.description,'''','''''')||''';','')||--字段备注(注释)coalesce(chr(10)||string_agg(casewhen f.description isnotnullthen'comment on column "'||'$v_sql$||p_newtablename||$v_sql$'||'"."'||c.attname||'" is '''||replace(f.description,'''','''''')||''';'end,chr(10)orderby c.attnum),'')from $v_sql$||tbname_1||$v_sql$ a 
        innerjoin $v_sql$||tbname_2||$v_sql$ b 
        on(a.relnamespace = b.oid)innerjoin $v_sql$||tbname_3||$v_sql$ c 
        on(a.oid = c.attrelid)leftjoin $v_sql$||tbname_5||$v_sql$ f 
        on(a.oid = f.objoid and c.attnum = f.objsubid)leftjoin(select d.indrelid
                ,string_agg('"'||c.attname||'"',','orderby c.attnum) prikey
            from $v_sql$||tbname_1||$v_sql$ a, $v_sql$||tbname_2||$v_sql$ b, $v_sql$||tbname_3||$v_sql$ c, $v_sql$||tbname_6||$v_sql$ d 
            where a.relnamespace = b.oid
                and a.oid = c.attrelid
                and a.oid = d.indrelid
                and d.indisprimary =trueand c.attnum =any(d.indkey)and a.relname ='$v_sql$||p_tablename||$v_sql$'and b.nspname ='$v_sql$||p_schemaname||$v_sql$'groupby d.indrelid
        ) g 
        on(a.oid = g.indrelid)leftjoin $v_sql$||tbname_5||$v_sql$ h 
        on(a.oid = h.objoid and h.objsubid =0)where c.attnum >0andnot c.attisdropped
            and a.relname ='$v_sql$||p_tablename||$v_sql$'and b.nspname ='$v_sql$||p_schemaname||$v_sql$'groupby b.nspname,a.relname,h.description,g.prikey;
    $v_sql$
    ;execute v_sql into p_result;--删除临时外部表execute $v_sql$
        --存在临时的外部表时,直接删除dropforeigntableifexists $v_sql$||tbname_1||$v_sql$;dropforeigntableifexists $v_sql$||tbname_2||$v_sql$;dropforeigntableifexists $v_sql$||tbname_3||$v_sql$;dropforeigntableifexists $v_sql$||tbname_5||$v_sql$;dropforeigntableifexists $v_sql$||tbname_6||$v_sql$;dropforeigntableifexists $v_sql$||tbname_8||$v_sql$;dropforeigntableifexists $v_sql$||tbname_9||$v_sql$;
    $v_sql$
    ;--恢复模式搜索路径execute'SET search_path TO '||o_search_path;return p_result;
    exception when others then--恢复模式搜索路径execute'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;end;
$function$
;grantexecuteonfunction tool.get_ddl_remote_pg2pg(varchar,varchar,varchar,varchar)topublic;

sp_extract_mapping

createorreplacefunction tool.sp_extract_mapping(
     tablename charactervarying,schemaname charactervarying,newtablename charactervarying,remote_connname charactervaryingdefault'${CONNNAME}'::charactervarying)returns void
    language plpgsql
    security defineras $function$
/* 作者 : v-yuzhenc
 * 功能 : 给定远程(远程连接)表的表名,模式名(mysql为库名),
 *        在tool用户下创建一个指定新表名的外部映射表,
 *        访问该映射表,相当于直接访问远程表。
 *        select * from tool.newtablename;
 * tablename : 指定远程表的表名
 * schemaname : 指定远程表的模式名(mysql为库名)
 * newtablename : 在tool用户下创建的外部表表名
 * remote_connname : 远程连接名,来自tool.dblink_connection_info.connname
 * */declare 
    p_tablename varchar := tablename;
    p_schemaname varchar := schemaname;
    p_newtablename varchar := newtablename;
    p_remote_connname tool.dblink_connection_info.connname%type := remote_connname;
    v_datasource_type tool.dblink_connection_info.conntype%type;
    v_sql varchar;
    o_search_path varchar;--模式搜索路径begin--判断表是否为空if p_tablename isnullthen 
        raise exception '参数tablename不能为空!';endif;if p_schemaname isnullthen 
        raise exception '参数schemaname不能为空!';endif;if p_newtablename notlike'tmp\_%'then 
        raise exception '参数newtablename必须以tmp_开头!';endif;--记录原来的模式搜索路径execute'show search_path;'into o_search_path;--临时切换模式搜索路径execute'SET search_path TO tool,'||o_search_path;
    
    v_datasource_type := conntype from tool.dblink_connection_info where connname = p_remote_connname limit1;case v_datasource_type 
        when'pg'then v_sql := tool.get_ddl_remote_pg2pg(p_tablename,p_schemaname,p_newtablename,p_remote_connname);when'mysql'then v_sql := tool.get_ddl_remote_mysql2pg(p_tablename,p_schemaname,p_newtablename,p_remote_connname);else v_sql :=null;endcase;if v_sql isnullthen 
        raise exception '不支持的数据源类型!目前只支持mysql和pg!';endif;--拼接外部表ddlselectreplace(replace(replace(replace(replace(regexp_replace(v_sql
        ,' not null| null','','g'),'drop table ','drop foreign table '),'create table ','create foreign table '),'comment on ','--comment on '),',primary key ','--,primary key '),');',') server '||fdw_server||' options('||case conntype when'pg'then'schema_name 'when'mysql'then'dbname'end||''''||p_schemaname||''''||',table_name '||''''||p_tablename||''''||');')into v_sql
    from tool.dblink_connection_info
    where connname = p_remote_connname;execute v_sql;--execute 'show search_path;' into o_search_path;--raise notice '%',v_sql;--raise notice '%',o_search_path;--恢复模式搜索路径execute'SET search_path TO '||o_search_path;
    exception when others then--恢复模式搜索路径execute'SET search_path TO '||o_search_path;
        raise exception '%',sqlerrm;end;
$function$
;grantexecuteonfunction tool.sp_extract_mapping(varchar,varchar,varchar,varchar)topublic;

等待与唤醒

创建dblink插件

\c etl postgres
create extension dblink;

ETL任务通知表

\c etl tool
CREATETABLE tool.etl_task_notice (
    table_name varchar(64)NULL,-- 表名
    schema_name varchar(64)NULL,-- 模式名
    update_time timestampNULLDEFAULTCURRENT_TIMESTAMP-- 表的更新时间);COMMENTONTABLE tool.etl_task_notice IS'etl任务通知,etl任务完成时,得到结果表后向该表中插入一条数据';COMMENTONCOLUMN tool.etl_task_notice.table_name IS'表名';COMMENTONCOLUMN tool.etl_task_notice.schema_name IS'模式名';COMMENTONCOLUMN tool.etl_task_notice.update_time IS'表的更新时间';ALTERTABLE tool.etl_task_notice OWNER TO tool;

创建序列

create sequence tool.seq_dblink_sessionid
    increment by1
    minvalue 1
    maxvalue 9999start1
    cache 1cycle;

辅助函数

is_return_result

createorreplacefunction tool.is_return_result(
     select_statement charactervarying,retrytimes integerdefault60)returnsintegerlanguage plpgsql
    security defineras $function$
/*
 * 作者:v-yuzhenc
 * 功能:执行动态select语句,并且该执行过程是自治的,
 *      判断是否有结果返回,有则返回1,否则返回0
 * vsql:执行的动态sql
 * retrytimes:拿不到连接时拿连接重试次数,默认重试60次
 * */declare
    p_select_statement varchar := select_statement;
    v_sql varchar;--动态sql
    p_retrytimes int := retrytimes;
    p_count int :=0;
    p_session_id varchar := nextval('seq_dblink_sessionid')::varchar;
    p_session_name varchar :='dblink_'||p_session_id;
    p_result int :=0;
    dblink_conn varchar;begin--尝试拿连接whiletrueloopbegin--获取dblink连接select'host='||hostname||' port='||port||' dbname='||dbname||' user='||username||' password='||userpwd 
        into dblink_conn
        from tool.dblink_connection_info
        where connname ='101.34.75.200-pg-etl';
        perform dblink_connect(p_session_name,dblink_conn);exit;
        exception when others thenif p_count >= p_retrytimes thenexit;endif;
        p_count := p_count +1;--睡眠1s再拿连接
        perform pg_sleep(1);continue;end;endloop;
    v_sql :='select 1 from ('|| p_select_statement ||') a limit 1';--执行动态sql语句selectcount(1)into p_result from dblink(p_session_name,v_sql)as(id int);--关闭dblink连接
    perform dblink_disconnect(p_session_name);return p_result;--报错时得先把连接关掉再把错误抛出来
    exception when others thenbegin
            perform dblink_disconnect(p_session_name);
            exception when others thennull;end;
        raise exception '%',SQLERRM;end;
$function$
;grantallonfunction tool.is_return_result(varchar, int4)topublic;

get_15_interval_time

createorreplacefunction tool.get_15_interval_time(
     v_time timestampwithtime zone defaultcurrent_timestamp,time_type charactervaryingdefault'before'::charactervarying)returnstimestampwithtime zone
    language plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:获取某个时间所在分钟0-15,15-30,30-45,45-60的先后时间
 * v_time:指定时间
 * time_type:时间类型,时间段的前一段则BEFORE(默认),时间段的后一段则AFTER
 * */declare
    p_v_time timestamptz := v_time;
    p_time_type varchar := upper(time_type);
    v_result timestamptz;beginif p_time_type notin('BEFORE','AFTER')then 
        raise exception 'p_time_type必须是BEFORE或者AFTER!';endif;if p_time_type ='BEFORE'then 
        v_result := to_timestamp(to_char(p_v_time,'yyyymmddhh24')||casewhen to_char(p_v_time,'mi')::numeric-0>=0and to_char(p_v_time,'mi')::numeric-0<15then'00'when to_char(p_v_time,'mi')::numeric-15>=0and to_char(p_v_time,'mi')::numeric-15<15then'15'when to_char(p_v_time,'mi')::numeric-30>=0and to_char(p_v_time,'mi')::numeric-30<15then'30'when to_char(p_v_time,'mi')::numeric-45>=0and  to_char(p_v_time,'mi')::numeric-45<15then'45'end||'00','yyyymmddhh24miss');
    elsif p_time_type ='AFTER'then 
        v_result := to_timestamp(to_char(p_v_time,'yyyymmddhh24')||casewhen to_char(p_v_time,'mi')::numeric-0>=0and to_char(p_v_time,'mi')::numeric-0<15then'00'when to_char(p_v_time,'mi')::numeric-15>=0and to_char(p_v_time,'mi')::numeric-15<15then'15'when to_char(p_v_time,'mi')::numeric-30>=0and to_char(p_v_time,'mi')::numeric-30<15then'30'when to_char(p_v_time,'mi')::numeric-45>=0and  to_char(p_v_time,'mi')::numeric-45<15then'45'end||'00','yyyymmddhh24miss')+interval'15 minutes';endif;return v_result;end;
$function$
;grantexecuteonfunction tool.get_15_interval_time(timestamptz,varchar)topublic;

wait_table

createorreplacefunction tool.wait_table(
     sql_statement charactervarying,check_freq integerdefault30,check_time integerdefault300)returnsintegerlanguage plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:等待某个sql执行返回结果集,只能是select语句,等待成功返回1,等待失败返回0
 * sql_statement:sql语句
 * check_freq:检测频率,默认每30s检测一次
 * check_time:最多检测多少秒,默认5分钟
 * */declare 
    p_sql_statement varchar := sql_statement;
    p_check_freq numeric := check_freq;
    p_check_time numeric := check_time;
    v_result int :=0;--返回结果
    v_sql varchar(32767);--动态sql
    v_id numeric;
    check_begin numeric :=0;
    v_hint varchar(400);begin--只能select语句if(trim(lower(p_sql_statement))~'^select')=falsethen
        raise exception 'sql_statement参数必须以select开头!';endif;--开始检测
    raise notice '----------开始检测----------';--动态sql
    v_sql :='select 1 from ('|| p_sql_statement ||') a limit 1';
    raise notice '检测语句为:%',v_sql;loop
        v_hint :='当前检测时间为:'||to_char(clock_timestamp(),'yyyy-mm-dd hh24:mi:ss');
        raise notice '%',v_hint;begin
            v_id := tool.is_return_result(v_sql);--判断v_id是否为空if v_id =1then--等表标识置为1
                v_result :=1;else--当前检测没有检测通过,则初始时间后移
                check_begin := check_begin + p_check_freq;endif;--若动态sql因为某表的不存在而产生异常,则不退出,继续下一次等表
            exception when others then
                check_begin := check_begin + p_check_freq;end;--退出循环的条件就是:等表超时或者等表标识为1exitwhen check_begin > p_check_time or v_result =1; 
        perform pg_sleep(p_check_freq);endloop;if v_result =1then 
        raise notice '----------等表成功----------';else
        raise notice '----------等表超时----------';
        raise exception '----------等表超时----------';endif;return v_result;end;
$function$
;grantexecuteonfunction tool.wait_table(varchar, int4, int4)topublic;

sp_wait

createorreplacefunction tool.sp_wait(
     tablename charactervarying,schemaname charactervarying,waittype charactervaryingdefault'real'::charactervarying,checkfreq integerdefault20,checktime integerdefault600)returns void
    language plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:等待某张表被sp_notify存过唤醒。
 * tablename : 等待的表名
 * schemaname : 表名对应的模式名
 * waittype : 等待类型 year month day hour real
 *     year:当年被唤醒过
 *     month:当月被唤醒过
 *     day:当天被唤醒过
 *     hour:当前所在的小时被唤醒过
 *     real:伪实时,0-15,15-30,30-45,45-60,当前分钟所在的分钟范围被唤醒过
 * checkfreq:检测频率,默认每隔20s检测一次
 * checktime:最多检测多少秒,默认检测600s
 * */declare 
    p_tablename varchar := lower(tablename);
    p_schemaname varchar := lower(schemaname);
    p_waittype varchar := lower(waittype);
    p_checkfreq int := checkfreq;
    p_checktime int := checktime;
    v_sql varchar;beginif p_tablename isnullthen 
        raise exception '参数tablename不能为空!';endif;if p_schemaname isnullthen 
        raise exception '参数schemaname不能为空!';endif;if p_waittype notin('year','month','day','hour','real')then 
        raise exception '参数waittype范围[''year'',''month'',''day'',''hour'',''real'']!';endif;if p_checkfreq >=61or p_checkfreq <=0then 
        raise exception '参数checkfreq范围 1 ~ 60 !';endif;if p_checktime >=1801or p_checktime <=0then 
        raise exception '参数checktime范围 1 ~ 1800 !';endif;
   
    v_sql := $v_sql$select1from tool.etl_task_notice
        where table_name ='$v_sql$||p_tablename||$v_sql$'and schema_name ='$v_sql$||p_schemaname||$v_sql$'and $v_sql$||case p_waittype
                when'year'then $v_sql$to_char(update_time,'yyyy')= to_char(current_timestamp,'yyyy')$v_sql$
                when'month'then $v_sql$to_char(update_time,'yyyymm')= to_char(current_timestamp,'yyyymm')$v_sql$
                when'hour'then $v_sql$to_char(update_time,'yyyymmddhh24')= to_char(current_timestamp,'yyyymmddhh24')$v_sql$
                when'real'then $v_sql$update_time >= tool.get_15_interval_time(current_timestamp,'BEFORE')and update_time <= tool.get_15_interval_time(current_timestamp,'AFTER')$v_sql$
            end;--raise notice '%',v_sql;
    perform tool.wait_table(v_sql,p_checkfreq,p_checktime);end;
$function$
;grantallonfunction tool.sp_wait(varchar,varchar,varchar, int4, int4)topublic;

sp_notify

createorreplacefunction tool.sp_notify(
     tablename charactervarying,schemaname charactervarying)returns void
    language plpgsql
    security defineras $function$
/*
 * 作者:v-yuzhenc
 * 功能:唤醒某个模式下的某个表,
 *       即通知该表已经etl完成
 * tablename:表名
 * schemaname:模式名
 * */declare 
    p_tablename tool.etl_task_notice.table_name%type := lower(tablename);
    p_schemaname tool.etl_task_notice.schema_name%type := lower(schemaname);beginif p_tablename isnullthen 
        raise exception '参数tablename不能为空';endif;if p_schemaname isnullthen 
        raise exception '参数schemaname不能为空';endif;insertinto tool.etl_task_notice(
         table_name 
        ,schema_name
    )values(
         p_tablename
        ,p_schemaname
    );end;
$function$
;grantallonfunction tool.sp_notify(varchar,varchar)topublic;

海豚调度

安装

Centos7.6集群部署海豚调度3.1.5

架构

在这里插入图片描述

搭建

对象对象实例说明租户root操作系统执行用户普通用户yuzhenchao姓名全拼普通用户yuxiaotan姓名全拼数据源dbselect@223.242.39.75:5432/dp海豚调度只有select权限的数据源数据源mdl@101.34.75.200:5432/etletl数据库中模型开发的数据源数据源apl@101.34.75.200:5432/etletl数据库中应用层开发的数据源项目海豚调度元数据建模对海豚调度的元数据进行etl处理项目海豚调度元数据应用开发——yuzhenchaoyuzhenchao的专属应用层项目,自己的报表或者应用自己维护,一个工作流一个应用一个调度项目海豚调度元数据应用开发——yuxiaotanyuxiaotan的专属应用层项目,自己的报表或者应用自己维护,一个工作流一个应用一个调度告警实例模型告警模型不管成功还是失败,都需要进行告警告警实例应用告警-yuzhenchao应用告警通知指定的人yuzhenchao,一般只告警失败的任务告警实例应用告警-yuxiaotan应用告警通知指定的人yuxiaotan,一般只告警失败的任务告警组模型告警直接与告警实例一一对应告警组应用告警-yuzhenchao直接与告警实例一一对应告警组应用告警-yuxiaotan直接与告警实例一一对应环境管理datax-execdatax的执行环境配置

创建租户root

在这里插入图片描述

创建普通用户

在这里插入图片描述
在这里插入图片描述

创建数据源

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建项目

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建告警实例(钉钉告警)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建告警组

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

创建datax环境

在这里插入图片描述

注:

  • 我这里选择的默认分组,所以需要在每台worker的机器上安装datax,并且让DATAX_HOME=/usr/local/datax
  • 数据库版本过高时,需要在对应的插件目录将旧版本的驱动删除,否则会连接失败

DataX / userGuid.md

授权管理

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

ETL过程

可回溯的etl过程——替代变量

变量名变量值变量说明today$[yyyyMMdd]今天yesterday$[yyyyMMdd-1]昨天day2before$[yyyyMMdd-2]2天前day3before$[yyyyMMdd-3]3天前day4before$[yyyyMMdd-4]4天前day5before$[yyyyMMdd-5]5天前day6before$[yyyyMMdd-6]6天前day7before$[yyyyMMdd-7]7天前

模型层工作流

ods层

  • ods层抽取数据,为了避免表产生死锁等待以及读脏数据,我们采用临时表方式进行数据抽取

  • 先创建临时表,将数据抽取到临时表,最终将临时表重命名为目标表
    源库名源表名目标模式目标表名表说明dpt_ds_projectmdlods_dp_t_ds_project项目表dpt_ds_process_definitionmdlods_dp_t_ds_process_definition工作流定义表dpt_ds_tenantmdlods_dp_t_ds_tenant租户表dpt_ds_schedulesmdlods_dp_t_ds_schedules调度表dpt_ds_process_task_relationmdlods_dp_t_ds_process_task_relation工作流任务关系表dpt_ds_task_definitionmdlods_dp_t_ds_task_definition任务定义表dpt_ds_process_instancemdlods_dp_t_ds_process_instance工作流实例表dpt_ds_task_instancemdlods_dp_t_ds_task_instance任务实例表dpt_ds_relation_process_instancemdlods_dp_t_ds_relation_process_instance存放的数据用于处理流程定义中含有子流程的情况dpt_ds_sessionmdlods_dp_t_ds_session会话表dpt_ds_usermdlods_dp_t_ds_user用户表dpt_ds_datasourcemdlods_dp_t_ds_datasource数据源表dpt_ds_access_tokenmdlods_dp_t_ds_access_token访问令牌表dpt_ds_relation_datasource_usermdlods_dp_t_ds_relation_datasource_user用户数据源关系表dpt_ds_queuemdlods_dp_t_ds_queue队列表

    fdw实现抽数(不建议)
  • 先选择项目在这里插入图片描述

  • 选择工作流定义在这里插入图片描述

  • 选择创建工作流在这里插入图片描述

  • 拖拉拽sql图标在这里插入图片描述

  • 在开发用户下开发ods的脚本,然后在海豚调度上配置任务,我们以表 t_ds_project举例

--通过该函数获取建表语句--select tool.get_ddl_remote_pg2pg('t_ds_project','public','tmp_ods_dp_t_ds_project');droptableifexists"tmp_ods_dp_t_ds_project";createtable"tmp_ods_dp_t_ds_project"("id"integernotnull,"name"charactervarying(100)null,"code"bigintnotnull,"description"charactervarying(255)null,"user_id"integernull,"flag"integernull,"create_time"timestamp without time zone null,"update_time"timestamp without time zone null,primarykey("id"));commentontable"tmp_ods_dp_t_ds_project"is'项目表';commentoncolumn"tmp_ods_dp_t_ds_project"."id"is'项目id';commentoncolumn"tmp_ods_dp_t_ds_project"."name"is'项目名称';commentoncolumn"tmp_ods_dp_t_ds_project"."code"is'项目编码';commentoncolumn"tmp_ods_dp_t_ds_project"."description"is'项目描述';commentoncolumn"tmp_ods_dp_t_ds_project"."user_id"is'用户id,对应t_ds_user.id';commentoncolumn"tmp_ods_dp_t_ds_project"."create_time"is'项目创建时间';commentoncolumn"tmp_ods_dp_t_ds_project"."update_time"is'项目最近更新时间';--创建映射表do $$
begin
    perform tool.sp_extract_mapping('t_ds_project','public','tmp_ods_dp_t_ds_project');end$$;--抽取数据insertinto tmp_ods_dp_t_ds_project
select*from tool.tmp_ods_dp_t_ds_project
;do $$
begin--增加last_pg_time时间字段altertable tmp_ods_dp_t_ds_project add last_pg_time timestampdefaultcurrent_timestamp;--空字符串处理成null
    perform tool.replace_to_null('tmp_ods_dp_t_ds_project');end$$;do $$
begin--如果ods表存在就删除droptableifexists ods_dp_t_ds_project;--表重命名altertable tmp_ods_dp_t_ds_project renameto ods_dp_t_ds_project;end$$;do $$
begin--ods_dp_t_ds_project
    perform tool.sp_notify('ods_dp_t_ds_project','mdl');--表分析analyze ods_dp_t_ds_project;end$$;

在这里插入图片描述

  • 点击保存在这里插入图片描述在这里插入图片描述
  • 点击上线然后运行工作流在这里插入图片描述在这里插入图片描述
  • 查看日志在这里插入图片描述
  • 其他同理,连线可以控制并发度在这里插入图片描述
datax实现抽数(建议)
  • 创建工作流,拖拉拽sql组件和datax组件在这里插入图片描述
  • 编写脚本,任务配置
  • 前置sql组件
droptableifexists"tmp_ods_dp_t_ds_project";createtable"tmp_ods_dp_t_ds_project"("id"integernotnull,"name"charactervarying(100)null,"code"bigintnotnull,"description"charactervarying(255)null,"user_id"integernull,"flag"integernull,"create_time"timestamp without time zone null,"update_time"timestamp without time zone null,primarykey("id"));commentontable"tmp_ods_dp_t_ds_project"is'项目表';commentoncolumn"tmp_ods_dp_t_ds_project"."id"is'项目id';commentoncolumn"tmp_ods_dp_t_ds_project"."name"is'项目名称';commentoncolumn"tmp_ods_dp_t_ds_project"."code"is'项目编码';commentoncolumn"tmp_ods_dp_t_ds_project"."description"is'项目描述';commentoncolumn"tmp_ods_dp_t_ds_project"."user_id"is'用户id,对应t_ds_user.id';commentoncolumn"tmp_ods_dp_t_ds_project"."create_time"is'项目创建时间';commentoncolumn"tmp_ods_dp_t_ds_project"."update_time"is'项目最近更新时间';

在这里插入图片描述

  • datax的select语句
--数据抽取select语句select"id","name","code","description","user_id","flag","create_time","update_time"from t_ds_project
;--后置操作do $$
begin--增加last_pg_time时间字段altertable tmp_ods_dp_t_ds_project add last_pg_time timestampdefaultcurrent_timestamp;--空字符串处理成null
    perform tool.replace_to_null('tmp_ods_dp_t_ds_project');end$$;do $$
begin--如果ods表存在就删除droptableifexists ods_dp_t_ds_project;--表重命名altertable tmp_ods_dp_t_ds_project renameto ods_dp_t_ds_project;end$$;do $$
begin--ods_dp_t_ds_project
    perform tool.sp_notify('ods_dp_t_ds_project','mdl');--表分析analyze ods_dp_t_ds_project;end$$;

在这里插入图片描述

  • 其他同理

在这里插入图片描述

  • 效率对比在这里插入图片描述

dw层——sql脚本

dim层——sql脚本

dm层——sql脚本

总控

应用层工作流

Magic-API统一接口平台

其他函数

array_position

createorreplacefunction tool.array_position(
     arrayint integer[],elementint integer,times integerdefault1)returnsintegerlanguage plpgsql
as $function$
/* 作者 : v-yuzhenc
 * 功能 : 返回数组指定元素所在的位置,未匹配到返回0
 * arrayint : 数组
 * element : 指定元素
 * times  : 第几次出现的位置
 * */declare 
    p_times int :=0;
    p_result int :=0;beginif array_length(arrayint,1)isnullthenreturn p_result;endif;for i in1..array_length(arrayint,1)loopif arrayint[i]= elementint then 
            p_times := p_times +1;endif;if p_times = times thenreturn i;endif;endloop;return p_result;end;
$function$
;grantexecuteonfunction tool.array_position(_int4, int4, int4)topublic;

replace_to_null

createorreplacefunction tool.replace_to_null(
     tablename charactervarying,schemaname charactervaryingdefault("current_user"())::charactervarying(64),execuser varchardefaultcurrent_user::varchar)returns void
    language plpgsql
    security defineras $function$
/* 作者 : v-yuzhenc
 * 功能:扫描指定表的所有varchar和text类型的字段,将字段值为''替换成null
 * tablename : 需要扫描的表名
 * schemaname : 需要扫描的模式名
 * */declare 
    p_tablename varchar := lower(tablename);
    p_schemaname varchar := lower(schemaname);
    p_execuser varchar(64) := execuser;--调用者
    existbj int :=0;--存在标记
    v_sql varchar;--动态sqlbeginif p_schemaname <> p_execuser then 
        raise exception '你只有权限操作自己模式下的表!';endif;--扫描varchar和text字段selectcount(1)into existbj
    from pg_class a
    innerjoin pg_namespace b
    on(a.relnamespace = b.oid)innerjoin pg_attribute c
    on(a.oid = c.attrelid)innerjoin pg_type d
    on(c.atttypid = d.oid)where c.attnum >0and d.typname in('varchar','text')and a.relname = p_tablename
        and b.nspname = p_schemaname;--若不存在varchar或者text字段,则不做处理if existbj =0then
        raise notice '%.%表不存在或者不需要处理空字符串!',p_schemaname,p_tablename;return;endif;--拼接处理空字符串语句select 
        string_agg('update '||p_tablename||' 
    set '||c.attname||' = null where '||c.attname||' = '''';',chr(10))into v_sql
    from pg_class a
    innerjoin pg_namespace b
    on(a.relnamespace = b.oid)innerjoin pg_attribute c
    on(a.oid = c.attrelid)innerjoin pg_type d
    on(c.atttypid = d.oid)where c.attnum >0and d.typname in('varchar','text')and a.relname = p_tablename
        and b.nspname = p_schemaname;--通过集中处理程序执行动态sql
    perform tool.sp_execsql(v_sql,p_schemaname);end;
$function$
;grantexecuteonfunction tool.replace_to_null(varchar,varchar,varchar)topublic;

sp_jzdb

createorreplacefunction tool.sp_jzdb(
     tablename charactervarying,oldschema charactervarying,newschema charactervarying,tablename_new charactervaryingdefaultnull::charactervarying,execuser varchardefaultcurrent_user::varchar)returns void
    language plpgsql
    security defineras $function$ 
/* 
 * 作者 : v-yuzhenc
 * 功能:集中导表,将指定模式下的表以新的表名导入到新的模式下
 * tablename : 指定模式下的表,不区分大小写
 * oldschema : 指定模式,不区分大小写
 * newschema : 新的模式,不区分大小写
 * tablename_new : 新的表名,不区分大小写,默认与旧表名相同
 * execuser : 调用者用户名,无需传参,默认值即可
 * */declare 
    p_tablename varchar(64) := lower(tablename);
    p_tablename_new varchar(64) :=coalesce(lower(tablename_new),p_tablename);
    p_oldschema varchar(64) := lower(oldschema);
    p_newschema varchar(64) := lower(newschema);
    p_execuser varchar(64) := execuser;--调用者用户名,无需人工传参
    jzdb_tname varchar(64) :='jzdb_'||p_tablename_new;
    existbj numeric;
    v_sql varchar;
    bak_tname varchar;begin--调用者可以将自己的表导入到其他模式--也可以将其他模式的表导入到自己模式if p_execuser <> p_oldschema and p_execuser <> p_newschema then 
        raise exception 'oldschema和newschema参数之一必须与当前用户名一致,因为只允许操作与自己相关的表';endif;--建表
    v_sql := tool.get_ddl(p_oldschema||'.'||p_tablename,'table',jzdb_tname);
    perform tool.sp_execsql(v_sql,p_newschema);--插入数据
    perform tool.sp_execsql('insert into "'||jzdb_tname||'" select * from "'||p_oldschema||'"."'||p_tablename||'";',p_newschema);--判断新模式下的新表是否有重名的selectcount(1)into existbj
    from pg_tables a 
    where a.tablename = p_tablename_new
        and a.schemaname = p_newschema;if existbj <>0then--如果有重名的表存在--则做备份,加前缀 o_模式名_表名_版本号selectcount(1)+1 v_no  --版本号into existbj
        from pg_tables a
        where substr(a.tablename,4+length(p_newschema),length(p_tablename_new))= p_tablename_new
            and a.schemaname ='tool';--新表名建在tool下,加前缀 o_模式名_表名_版本号
        bak_tname :='o_'||p_newschema||'_'||p_tablename_new||'_'||existbj::varchar;--建表
        v_sql := tool.get_ddl(p_newschema||'.'||p_tablename_new,'table',bak_tname);
        perform tool.sp_execsql(v_sql,'tool');--插入数据
        perform tool.sp_execsql('insert into "'||bak_tname||'" select * from "'||p_newschema||'"."'||p_tablename_new||'";','tool');--分析表
        perform tool.sp_execsql('analyze "'||bak_tname||'";','tool');
        perform tool.sp_execsql('drop table "'||p_tablename_new||'";',p_newschema);endif;--重命名表
    perform tool.sp_execsql('alter table "'||jzdb_tname||'" rename to '||p_tablename_new,p_newschema);--分析表
    perform tool.sp_execsql('analyze "'||p_tablename_new||'";',p_newschema);end;
$function$
;grantexecuteonfunction tool.sp_jzdb(varchar,varchar,varchar,varchar,varchar)topublic;

sp_sqlexec_efficient

createorreplacefunction tool.sp_sqlexec_efficient(
     sqlexec charactervarying,exectimes integerdefault1)returnscharactervaryinglanguage plpgsql
as $function$
/*
 * 作者:v-yuzhenc
 * 功能:sql执行效率检测,事物不会提交,
 *       返回每次执行时间和平均执行时间
 * sqlexec:需要检测的sql
 * exectimes:需要检测的次数,范围1-10
 * */declare 
    begin_time timestamp;
    end_time timestamp;
    exec_duration interval;
    v_result varchar :='';
    p_sqlexec varchar := sqlexec;
    p_exectimes int := exectimes;
    exec_begin int :=0;
    sum_sqlexec interval :='00:00:00.000000'::interval;
    sqlexec_avg interval;beginif p_exectimes <=0or p_exectimes >=11then 
        raise exception 'exectimes参数值范围为1~10';endif;while exec_begin >=0and exec_begin <= p_exectimes-1loopbegin
            begin_time := clock_timestamp();execute p_sqlexec;
            end_time = clock_timestamp();
            raise exception '回滚'using errcode ='12345';
            exception when sqlstate '12345'thennull;end;
        exec_duration = end_time-begin_time;
        v_result := v_result||to_char(begin_time,'yyyy-mm-dd hh24:mi:ss.us')||'~'||to_char(end_time,'yyyy-mm-dd hh24:mi:ss.us')||':'||exec_duration::varchar||';'||chr(10);
        sum_sqlexec := sum_sqlexec + exec_duration;
        exec_begin := exec_begin +1;endloop;
    sqlexec_avg := sum_sqlexec/p_exectimes;
    v_result := v_result||'avg:'||sqlexec_avg::varchar||';';return v_result;end;
$function$
;grantexecuteonfunction tool.sp_sqlexec_efficient(varchar, int4)topublic;

本文转载自: https://blog.csdn.net/qq_33445829/article/details/130485788
版权归原作者 sqlboy-yuzhenc 所有, 如有侵权,请联系我们删除。

“基于postgresql传统数据仓库搭建”的评论:

还没有评论