0


FlinkCDC支持人大金仓

1.背景

    目前很多政府项目都在往信创上面迁移,实时处理这块就会涉及到信创的数据库,本文就是通过FlinkCDC来对人大金仓(kingbase)数据库进行处理。

    FlinkCDC本身不支持对Kingbase数据库的CDC,但是Kingbase底层是基于Postgresql数据开发,FlinkCDC已经支持了Postgresql的CDC,所以基于 Postgresql的CDC进行修改。

2.设计思路

    获取 flink-cdc-connectors 项目

    拷贝一份 flink-connector-postgres-cdc 重命名 flink-connector-kb-cdc 进行修改

    获取 debezium-v1.9.8.Final 项目

    拷贝一份 debezium-connector-postgres 重命名 debezium-connector-kb 进行修改

具体修改哪些这里不详细介绍,最后附上jar包

3.使用测试

本方法是基于逻辑复制的方式实现
PostgreSQL 目前不支持DDL解析,只能解析DML(INSERT、UPDATE、DELETE).

3.1 在Kingbase数据库设置

3.1.1 建测试表

CREATE TABLE "public"."test_001" (
  "a" int8 NOT NULL,
  "b" timestamp,
  PRIMARY KEY ("a")
)
;

3.1.2 设置 wal_level
*执行语句SHOW wal_level,查看返回值是否为'logical',否则进行修改wal日志方式wal_level = logical

3.1.3 创建复制槽

 -- 创建逻辑复制槽
 SELECT * FROM pg_create_logical_replication_slot('replica', 'decoderbufs');
 -- 逻辑解码
 SELECT * FROM pg_logical_slot_peek_changes('replica', NULL, 4096, 'debug-mode', '1');
 目前测试不支持pgoutput,decoderbufs,wal2json

3.1.4 设置发布

-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布
select * from pg_publication_tables;

3.1.5 更改复制标识

-- 更改复制标识包含更新和删除之前值(目的是为了确保表 test_001 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE test_001 REPLICA IDENTITY FULL;

-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='test_001';

3.2 在原来的FlinkCdc项目的pom.xml中添加

        <!-- kingbasees驱动 -->
        <dependency>
            <groupId>cn.com.kingbase</groupId>
            <artifa

本文转载自: https://blog.csdn.net/weixin_43966292/article/details/142255849
版权归原作者 cai yinting 所有, 如有侵权,请联系我们删除。

“FlinkCDC支持人大金仓”的评论:

还没有评论