1.背景
目前很多政府项目都在往信创上面迁移,实时处理这块就会涉及到信创的数据库,本文就是通过FlinkCDC来对人大金仓(kingbase)数据库进行处理。
FlinkCDC本身不支持对Kingbase数据库的CDC,但是Kingbase底层是基于Postgresql数据开发,FlinkCDC已经支持了Postgresql的CDC,所以基于 Postgresql的CDC进行修改。
2.设计思路
获取 flink-cdc-connectors 项目
拷贝一份 flink-connector-postgres-cdc 重命名 flink-connector-kb-cdc 进行修改
获取 debezium-v1.9.8.Final 项目
拷贝一份 debezium-connector-postgres 重命名 debezium-connector-kb 进行修改
具体修改哪些这里不详细介绍,最后附上jar包
3.使用测试
本方法是基于逻辑复制的方式实现
PostgreSQL 目前不支持DDL解析,只能解析DML(INSERT、UPDATE、DELETE).
3.1 在Kingbase数据库设置
3.1.1 建测试表
CREATE TABLE "public"."test_001" (
"a" int8 NOT NULL,
"b" timestamp,
PRIMARY KEY ("a")
)
;
3.1.2 设置 wal_level
*执行语句SHOW wal_level,查看返回值是否为'logical',否则进行修改wal日志方式wal_level = logical
3.1.3 创建复制槽
-- 创建逻辑复制槽
SELECT * FROM pg_create_logical_replication_slot('replica', 'decoderbufs');
-- 逻辑解码
SELECT * FROM pg_logical_slot_peek_changes('replica', NULL, 4096, 'debug-mode', '1');
目前测试不支持pgoutput,decoderbufs,wal2json
3.1.4 设置发布
-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;
3.1.5 更改复制标识
-- 更改复制标识包含更新和删除之前值(目的是为了确保表 test_001 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE test_001 REPLICA IDENTITY FULL;
-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='test_001';
3.2 在原来的FlinkCdc项目的pom.xml中添加
<!-- kingbasees驱动 -->
<dependency>
<groupId>cn.com.kingbase</groupId>
<artifa
版权归原作者 cai yinting 所有, 如有侵权,请联系我们删除。