文章目录
1. 环境信息
类型版本/描述docker20.10.9Postgresql10.6初始化账号密码:postgres/postgres
普通用户:test1/test123
数据库:test_dbflink1.13.6
2. 安装
step1: 拉取 PostgreSQL 10.6 版本的镜像:
docker pull postgres:10.6
step2:创建并启动
PostgreSQL
容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为
postgres
,并将
pgoutput
插件加载到
PostgreSQL
实例中:
docker run -d-p30028:5432 --name postgres-10.6 -ePOSTGRES_PASSWORD=postgres postgres:10.6 -c'shared_preload_libraries=pgoutput'
step3: 查看容器是否创建成功:
dockerps|grep postgres-10.6
3. 配置
step1:docker进去Postgresql数据的容器:
dockerexec-it postgres-10.6 bash
step2:编辑
postgresql.conf
配置文件:
vi /var/lib/postgresql/data/postgresql.conf
配置内容如下:
# 更改wal日志方式为logical(方式有:minimal、replica 、logical )
wal_level = logical
# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots =20# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders =20# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s
step3:重启容器:
docker restart postgres-10.6
连接数据库,如果查询一下语句,返回
logical
表示修改成功:
SHOW wal_level;
4. 新建用户并赋权
使用创建容器时的账号密码(
postgres/postgres
)登录Postgresql数据库。
先创建数据库和表:
-- 创建数据库 test_dbCREATEDATABASE test_db;-- 连接到新创建的数据库 test_db
\c test_db
-- 创建 t_user 表CREATETABLE"public"."t_user"("id" int8 NOTNULL,"name"varchar(255),"age" int2,PRIMARYKEY("id"));
新建用户并且给用户权限:
-- pg新建用户CREATEUSER test1 WITH PASSWORD 'test123';-- 给用户复制流权限ALTER ROLE test1 replication;-- 给用户登录数据库权限GRANTCONNECTONDATABASE test_db to test1;-- 把当前库public下所有表查询权限赋给用户GRANTALLPRIVILEGESONALLTABLESINSCHEMApublicTO test1;
5. 发布表
-- 设置发布为trueupdate pg_publication set puballtables=truewhere pubname isnotnull;-- 把所有表进行发布CREATE PUBLICATION dbz_publication FORALLTABLES;-- 查询哪些表已经发布select*from pg_publication_tables;
更改表的复制标识包含更新和删除的值:
-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)ALTERTABLE t_user REPLICA IDENTITYFULL;-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)select relreplident from pg_class where relname='t_user';
6. flink sql
-- 源表定义CREATETABLE`table_source_pg`(
id BIGINT,
name STRING,
age INT)WITH('connector'='postgres-cdc','hostname'='10.194.183.120','port'='30028','username'='test1','password'='test123','database-name'='test_db','schema-name'='public','table-name'='t_user','decoding.plugin.name'='pgoutput')-- 目标表表定义CREATETABLE`table_sink_mysql`(
id BIGINT,
name STRING,
age INT,PRIMARYKEY(`id`)NOT ENFORCED
)WITH('connector'='jdbc','url'='jdbc:mysql://10.194.183.120:30306/test','username'='root','password'='root','table-name'='t_user_copy')-- insert语句INSERTINTO`table_sink_mysql`(`id`,`name`,`age`)(SELECT`id`,`name`,`age`FROM`table_source_pg`)
7. 命令汇总
-- pg新建用户CREATEUSER test1 WITH PASSWORD 'test123';-- 给用户复制流权限ALTER ROLE ODPS_ETL replication;-- 给用户数据库权限GRANTCONNECTONDATABASE test_db to test1;-- 设置发布开关update pg_publication set puballtables=truewhere pubname isnotnull;-- 把所有表进行发布CREATE PUBLICATION dbz_publication FORALLTABLES;-- 查询哪些表已经发布select*from pg_publication_tables;-- 给表查询权限grantselectonTABLE aa to ODPS_ETL;-- 给用户读写权限grantselect,insert,update,deleteONALLTABLESINSCHEMApublicto bd_test;-- 把当前库所有表查询权限赋给用户GRANTSELECTONALLTABLESINSCHEMApublicTO ODPS_ETL;-- 把当前库以后新建的表查询权限赋给用户alterdefaultprivilegesinschemapublicgrantselectontablesto ODPS_ETL;-- 更改复制标识包含更新和删除之前值ALTERTABLE test0425 REPLICA IDENTITYFULL;-- 查看复制标识select relreplident from pg_class where relname='test0425';-- 查看solt使用情况SELECT*FROM pg_replication_slots;-- 删除soltSELECT pg_drop_replication_slot('zd_org_goods_solt');-- 查询用户当前连接数select usename,count(*)from pg_stat_activity groupby usename orderbycount(*)desc;-- 设置用户最大连接数alter role odps_etl connection limit200;
附:
版权归原作者 杨林伟 所有, 如有侵权,请联系我们删除。