0


flink postgresql cdc实时同步(含pg安装配置等)

文章目录

1. 环境信息

类型版本/描述docker20.10.9Postgresql10.6初始化账号密码:postgres/postgres
普通用户:test1/test123
数据库:test_dbflink1.13.6

2. 安装

step1: 拉取 PostgreSQL 10.6 版本的镜像:

docker pull postgres:10.6

step2:创建并启动

PostgreSQL

容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为

postgres

,并将

pgoutput

插件加载到

PostgreSQL

实例中:

docker run -d-p30028:5432 --name postgres-10.6 -ePOSTGRES_PASSWORD=postgres postgres:10.6 -c'shared_preload_libraries=pgoutput'

step3: 查看容器是否创建成功:

dockerps|grep postgres-10.6

3. 配置

step1:docker进去Postgresql数据的容器:

dockerexec-it postgres-10.6  bash

step2:编辑

postgresql.conf

配置文件:

vi /var/lib/postgresql/data/postgresql.conf 

配置内容如下:

# 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
wal_level = logical  

# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots =20# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders =20# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s              

step3:重启容器:

docker restart postgres-10.6

连接数据库,如果查询一下语句,返回

logical

表示修改成功:

SHOW wal_level;

4. 新建用户并赋权

使用创建容器时的账号密码(

postgres/postgres

)登录Postgresql数据库。

先创建数据库和表:

-- 创建数据库 test_dbCREATEDATABASE test_db;-- 连接到新创建的数据库 test_db
\c test_db

-- 创建 t_user 表CREATETABLE"public"."t_user"("id" int8 NOTNULL,"name"varchar(255),"age" int2,PRIMARYKEY("id"));

新建用户并且给用户权限:

-- pg新建用户CREATEUSER test1 WITH PASSWORD 'test123';-- 给用户复制流权限ALTER ROLE test1 replication;-- 给用户登录数据库权限GRANTCONNECTONDATABASE test_db to test1;-- 把当前库public下所有表查询权限赋给用户GRANTALLPRIVILEGESONALLTABLESINSCHEMApublicTO test1;

5. 发布表

-- 设置发布为trueupdate pg_publication set puballtables=truewhere pubname isnotnull;-- 把所有表进行发布CREATE PUBLICATION dbz_publication FORALLTABLES;-- 查询哪些表已经发布select*from pg_publication_tables;

更改表的复制标识包含更新和删除的值:

-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)ALTERTABLE t_user REPLICA IDENTITYFULL;-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)select relreplident from pg_class where relname='t_user';

6. flink sql

-- 源表定义CREATETABLE`table_source_pg`(
      id BIGINT,
      name STRING,
      age INT)WITH('connector'='postgres-cdc','hostname'='10.194.183.120','port'='30028','username'='test1','password'='test123','database-name'='test_db','schema-name'='public','table-name'='t_user','decoding.plugin.name'='pgoutput')-- 目标表表定义CREATETABLE`table_sink_mysql`(
      id BIGINT,
      name STRING,
      age INT,PRIMARYKEY(`id`)NOT ENFORCED
      )WITH('connector'='jdbc','url'='jdbc:mysql://10.194.183.120:30306/test','username'='root','password'='root','table-name'='t_user_copy')-- insert语句INSERTINTO`table_sink_mysql`(`id`,`name`,`age`)(SELECT`id`,`name`,`age`FROM`table_source_pg`)

7. 命令汇总

-- pg新建用户CREATEUSER test1 WITH PASSWORD 'test123';-- 给用户复制流权限ALTER ROLE ODPS_ETL replication;-- 给用户数据库权限GRANTCONNECTONDATABASE test_db to test1;-- 设置发布开关update pg_publication set puballtables=truewhere pubname isnotnull;-- 把所有表进行发布CREATE PUBLICATION dbz_publication FORALLTABLES;-- 查询哪些表已经发布select*from pg_publication_tables;-- 给表查询权限grantselectonTABLE aa to ODPS_ETL;-- 给用户读写权限grantselect,insert,update,deleteONALLTABLESINSCHEMApublicto bd_test;-- 把当前库所有表查询权限赋给用户GRANTSELECTONALLTABLESINSCHEMApublicTO ODPS_ETL;-- 把当前库以后新建的表查询权限赋给用户alterdefaultprivilegesinschemapublicgrantselectontablesto ODPS_ETL;-- 更改复制标识包含更新和删除之前值ALTERTABLE test0425 REPLICA IDENTITYFULL;-- 查看复制标识select relreplident from pg_class where relname='test0425';-- 查看solt使用情况SELECT*FROM pg_replication_slots;-- 删除soltSELECT pg_drop_replication_slot('zd_org_goods_solt');-- 查询用户当前连接数select usename,count(*)from pg_stat_activity groupby usename orderbycount(*)desc;-- 设置用户最大连接数alter role odps_etl connection limit200;

附:


本文转载自: https://blog.csdn.net/qq_20042935/article/details/131826615
版权归原作者 杨林伟 所有, 如有侵权,请联系我们删除。

“flink postgresql cdc实时同步(含pg安装配置等)”的评论:

还没有评论