文章目录
操作系统:ubuntu-22.04,运行于wsl 2【
注意,请务必使用wsl 2;wsl 1会出现各种各样的问题】
软件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0
一、PostgreSQL作为数据来源(source),由flink读取
1.postgre安装与配置
已有postgre的跳过此步
(1)pg安装
https://zhuanlan.zhihu.com/p/143156636
sudoaptinstall postgresql
sudo-u postgres psql -c"SELECT version();"sudo-u postgres psql # 连接进入postgre shell(以管理员用户)
(2)pg配置
# 创建新用户和数据库sudosu - postgres -c"createuser domeya"sudosu - postgres -c"createdb domeya_db"sudo-u postgres psql # 进入psql(管理员用户postgres)
grant all privileges on database domeya_db to domeya;# 授权用户操作数据库\password # 设置当前用户密码(\password domeya,可设置用户domeya的密码)\q # 退出psql# psql postgres://username:password@host:port/dbname
psql postgres://domeya:123@localhost:5432/domeya_db # 新用户测试连接
可能出现的问题
sudo -u postgres psql
报错:
psql: error: connection to server on socket “/var/run/postgresql/.s.PGSQL.5432” failed: No such file or directory
Is the server running locally and accepting connections on that socket?
https://blog.csdn.net/psiitoy/article/details/7310003
【解决关键】:重启pg服务
# 重启sudoservice postgresql restart # 重要!ps-ef|grep postgres
(可选)重装pg
# 卸载
dpkg --list|grep postgresql
dpkg --purge postgresql postgresql-14 postgresql-client-14 postgresql-client-common postgresql-common # 根据dpkg --list | grep postgresql中展示的结果进行填写# rm -rf /var/lib/postgresql/# 重装sudoaptinstall postgresql
2.flink安装与配置
已有flink的跳过此步
flink安装,配置环境变量
# https://flink.apache.org/downloads/curl-O-L https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar zxvf flink-1.17.1-bin-scala_2.12.tgz -C /opt
sudovim /etc/profile.d/flink.sh
# flink.shexportFLINK_HOME=/opt/flink-1.17.1
exportPATH=$PATH:$FLINK_HOME/bin
source /etc/profile
如果webUI无法外机访问把
rest.bind-address: 0.0.0.0
这个设置放开权限即可
cd$FLINK_HOME/conf
cp flink-conf.yaml flink-conf.yaml.backup
vim flink-conf.yaml
# 修改以下设置
rest.bind-address: 0.0.0.0
启动flink
cd$FLINK_HOME
./bin/start-cluster.sh # 启动flink
jps # 查看是否启动StandaloneSessionClusterEntrypoint, TaskManagerRunner# ./bin/stop-cluster.sh # 关闭flink
3.flink cdc postgre配置
3.1 postgre配置(for flink cdc)
https://www.cnblogs.com/xiongmozhou/p/14817641.html
(1)修改配置文件
cd /etc/postgresql/14/main
cp postgresql.conf postgresql.conf.backup
vim postgresql.conf
postgresql.conf
修改几个关键配置如下:
# 更改wal日志方式为logical
wal_level = logical # minimal, replica, or logical# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots =20# max number of replication slots# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders =20# max number of walsender processes# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable
修改完之后重启postgresql,
service postgresql restart
(2)赋予权限
以管理员进入psql,
sudo -u postgres psql
(可选)如果没有测试表,可以新建一个
-- 如果没有测试表,可创建一个CREATETABLE test_table1(
id varchar(8),
p_dt varchar(8));-- 查看表
\d
insertinto test_table1 values('1','20230820');select*from test_table1;
赋予普通用户复制流权限、发布表、更改表的复制标识包含更新和删除的值
-- 给用户复制流权限ALTER ROLE domeya replication;-- 查看权限
\du
\c domeya_db -- 重要:进入到domeya_db数据库(以管理员账号进入)-- 设置发布为trueupdate pg_publication set puballtables=truewhere pubname isnotnull;-- 把所有表进行发布(包括以后新建的表);-- 注意,此处PUBLICATION名字必须为dbz_publication,否则后续flink sql报错must be superuser to create FOR ALL TABLES publicationCREATE PUBLICATION dbz_publication FORALLTABLES;-- 查询哪些表已经发布select*from pg_publication_tables;-- 更改复制标识包含更新和删除之前值ALTERTABLE test_table1 REPLICA IDENTITYFULL;-- 对应前面创建的测试表-- 查看复制标识(为f标识说明设置成功)select relreplident from pg_class where relname='test_table1';-- 对应前面创建的测试表-- 退出
\q
wal_level = logical源表的数据修改时,默认的逻辑复制流只包含历史记录的primary key,如果需要输出更新记录的历史记录的所有字段,需要在表级别修改参数:ALTER TABLE tableName REPLICA IDENTITY FULL; 这样才能捕获到源表所有字段更新后的值
发布所有表可能太多,也可以创建publication,添加指定表到publication。
update pg_publication set puballtables=falsewhere pubname isnotnull;-- 默认发布所有表为falseCREATE PUBLICATION flink_cdc_publication;alter publication flink_cdc_publication addtable test_table1;select*from pg_publication;select*from pg_publication_tables;
3.2 flink cdc postgres的jar包下载
下载flink cdc postgres相关jar包,放在
$FLINK_HOME/lib
cd$FLINK_HOME/lib
# 以下用于flink cdc postgres连接# 注意:用于flink sql的jar包是flink-sql-connector-postgres-cdc,不是flink-connector-postgres-cdc# https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc/2.4.0wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.4.0/flink-sql-connector-postgres-cdc-2.4.0.jar
如果flink在运行状态,需要重启flink,之后再启动flink sql client
cd$FLINK_HOME
./bin/stop-cluster.sh
./bin/start-cluster.sh
4.flink cdc postgre测试
启动flink sql client(之前重启了flink cluster)
cd$FLINK_HOME
./bin/sql-client.sh
在flink sql client创建表,与pg中的表结构对应,表名字可以不同
CREATETABLE source_table (
id STRING,
p_dt STRING
)WITH('connector'='postgres-cdc','hostname'='localhost','port'='5432','username'='domeya','password'='123','database-name'='domeya_db','schema-name'='public','table-name'='test_table1','slot.name'='flink',-- experimental feature: incremental snapshot (default off)-- 'scan.incremental.snapshot.enabled' = 'true''decoding.plugin.name'='pgoutput'-- 必须加,否则报错could not access file "decoderbufs");select*from source_table;
可能出现的问题
运行
select * from source_table;
时报错
报错1:
[ERROR] Could not execute SQL statement. Reason:
org.postgresql.util.PSQLException: ERROR: could not access file “decoderbufs”: No such file or directory
https://github.com/ververica/flink-cdc-connectors/issues/37
table sql加:
WITH('decoding.plugin.name' = 'pgoutput')
【flink sql】
dataStream加:
PostgreSQLSource.<String>builder().decodingPluginName("pgoutput").build()
报错2:
[ERROR] Could not execute SQL statement. Reason:
org.postgresql.util.PSQLException: ERROR: must be superuser to create FOR ALL TABLES publication
https://gist.github.com/alexhwoods/4c4c90d83db3c47d9303cb734135130d
检查之前的操作(psql):
CREATE PUBLICATION dbz_publication FORALLTABLES;select*from pg_publication_tables;
报错3:
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot “flink” already exists
https://zhuanlan.zhihu.com/p/449066277
当上面debezium.slot.name的值超过20个,就会报错,即使之前的job已经下线,这个slot文件依旧在,此时需要执行下面语句并删除slot即可:
psql:
-- https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATIONSELECT slot_name, slot_type, active FROM pg_replication_slots;SELECT pg_drop_replication_slot('flink');# 这个和之前flink sql中的'slot.name' = 'flink'对应
注意,flink postgres-cdc只能读(作为source),不能写(作为sink)
Flink SQL>insertinto source_table values('3','20230820');[ERROR] Could notexecuteSQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'postgres-cdc' can only be used as a source. It cannot be used as a sink.
二、Tidb作为数据去向(sink),由flink写入
1.tidb安装与配置
已有tidb的跳过此步
https://docs.pingcap.com/zh/tidb/stable/quick-start-with-tidb
su xxx # 切换到你的普通用户curl--proto'=https'--tlsv1.2-sSf https://tiup-mirrors.pingcap.com/install.sh |shsource /home/xxx/.bashrc # 按上个命令输出的路径来,上面显示的是Shell profile: /home/xxx/.bashrc
tiup playground # 下载镜像,并启动某个版本的集群
- 以这种方式执行的 playground,在结束部署测试后 TiUP 会清理掉原集群数据,重新执行该命令后会得到一个全新的集群。
- 若希望持久化数据,可以执行 TiUP 的
--tag
参数:tiup --tag <your-tag> playground ...
下载完毕,启动成功之后展示信息:
Connect TiDB: mysql --comments --host 127.0.0.1 --port 4000 -u root
TiDB Dashboard: http://127.0.0.1:2379/dashboard
Grafana: http://127.0.0.1:3000
连接tidb
# 使用mysql client连接tidbsudoaptinstall mysql-client
mysql --comments--host127.0.0.1 --port4000-u root
# 设置root密码 # https://docs.pingcap.com/zh/tidb/stable/user-account-management#%E8%AE%BE%E7%BD%AE%E5%AF%86%E7%A0%81# https://blog.csdn.net/qq_45675449/article/details/106866700
SET PASSWORD FOR 'root'@'%'='123';# root的localhost是%,可通过 select user,host from mysql.user; 查看exit;# mysql -uroot -p无法连接,必须加上port和host,并且host不能写成localhost# https://blog.csdn.net/hjf161105/article/details/78850658
mysql -uroot--port4000-h127.0.0.1 -p
2.flink cdc tidb的jar包下载
下载用于jdbc mysql连接的jar包,用于flink cdc tidb连接。
特别注意:Tidb的sink模式得用jdbc+mysql连接,不用官方提供的tidb cdc因为其不能作为sink,只能曲线救国参考这种方法了。
cd$FLINK_HOME/lib
# 以下用于jdbc mysql(用于flink cdc tidb连接)# https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc/3.1.1-1.17wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
# https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.1.0/mysql-connector-j-8.1.0.jar
如果flink在运行状态,需要重启flink,之后再启动flink sql client
cd$FLINK_HOME
./bin/stop-cluster.sh
./bin/start-cluster.sh
3.flink cdc tidb测试
基于Flink CDC实时同步数据(MySQL到MySQL)
flink cdc tidb 官方文档demo(无法作为sink,只能作为source)
(可选)tidb创建测试表
# mysql -uroot --port 4000 -h 127.0.0.1 -p# 创建测试表CREATETABLE test.test_table1(
id varchar(8),
p_dt varchar(8));insertinto test.test_table1 values('3','20230819');
启动flink sql client(之前重启了flink cluster)
cd$FLINK_HOME
./bin/sql-client.sh
flink sql连接tidb,仿照mysql的连接
-- checkpoint every 3000 milliseconds SET'execution.checkpointing.interval'='3s';-- register a TiDB table in Flink SQLCREATETABLE sink_table (
id STRING,
p_dt STRING,PRIMARYKEY(id)NOT ENFORCED
-- 必须写PRIMARY KEY,否则报错:[ERROR] Could not execute SQL statement. Reason: java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.)WITH('connector'='jdbc','url'='jdbc:mysql://localhost:4000/test','driver'='com.mysql.cj.jdbc.Driver','username'='root','password'='123','table-name'='test_table1');-- read snapshot and binlogs from tableSELECT*FROM sink_table;
三、用Flink SQL Client同步PostgreSQL到Tidb
# 将会提交一个作业,进行source_table->sink_table的单向同步insertinto sink_table select*from source_table;
[INFO] Submitting SQL update statement to the cluster…
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8e47bfa3ea78da4c47b395f7517c2812
在flink web ui上可以看到作业运行状态。
只要这个作业是正常runnning,那么对source_table的任何修改都会同步到sink_table。注意这种是单向同步,source_table的变动(增/删/改)会同步到sink_table,但反过来sink_table的变动不会影响到source_table(不会触发source_table->sink_table的同步)。
版权归原作者 nefu-ljw 所有, 如有侵权,请联系我们删除。