文章目录
一.前置工作
1.更改配置文件postgresql.conf
# 更改wal日志方式为logical
wal_level = logical # minimal, replica, or logical# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots =20# max number of replication slots# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders =20# max number of walsender processes# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable
wal_level是必须更改的,其它参数选着性更改,如果同步表数量超过10张建议修改为合适的值
更改配置文件postgresql.conf完成,需要重启pg服务生效,所以一般是在业务低峰期更改
2.新建用户并且给用户复制流权限
-- pg新建用户CREATEUSER hadoop WITH PASSWORD '***';-- 给用户复制流权限ALTER ROLE hadoop replication;-- 给用户数据库权限grantCONNECTONDATABASE hadoop to hadoop;-- 把当前库所有表查询权限赋给用户GRANTSELECTONALLTABLESINSCHEMApublicTO hadoop;
3.发布表
-- 设置发布为trueupdate pg_publication set puballtables=truewhere pubname isnotnull;-- 把所有表进行发布CREATE PUBLICATION dbz_publication FORALLTABLES;-- 查询哪些表已经发布select*from pg_publication_tables;
二.java代码示例
importcom.alibaba.ververica.cdc.connectors.postgres.PostgreSQLSource;importcom.yogorobot.gmall.realtime.function.MyDebezium;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.source.SourceFunction;importjava.time.Duration;importjava.util.Properties;publicclassFlink_CDCWIthProduct{privatestaticfinallongDEFAULT_HEARTBEAT_MS=Duration.ofMinutes(5).toMillis();//功能:测试实时读取pgsql数据publicstaticvoidmain(String[] args)throwsException{//TODO 创建执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();Properties properties =newProperties();
properties.setProperty("snapshot.mode","never");
properties.setProperty("debezium.slot.name","pg_cdc");
properties.setProperty("debezium.slot.drop.on.stop","true");
properties.setProperty("include.schema.changes","true");//使用连接器配置属性启用定期心跳记录生成
properties.setProperty("heartbeat.interval.ms",String.valueOf(DEFAULT_HEARTBEAT_MS));//TODO 创建Flink-PgSQL-CDC的Source 读取生产环境pgsql数据库SourceFunction<String> pgsqlSource =PostgreSQLSource.<String>builder().hostname("pgr-***.pg.rds.aliyuncs.com").port(1921).database("jarvis_ticket")// monitor postgres database.schemaList("jarvis_ticket")// monitor inventory schema.tableList("jarvis_ticket.t_category")// monitor products table.username("***").password("***")//反序列化.deserializer(newMyDebezium())//标准逻辑解码输出插件.decodingPluginName("pgoutput")//配置.debeziumProperties(properties).build();//TODO 使用CDC Source从PgSQL读取数据DataStreamSource<String> pgsqlDS = env.addSource(pgsqlSource);//TODO 将数据输出到kafka中//pgsqlDS.addSink(MyKafkaUtil.getKafkaSink("***"));//TODO 打印到控制台
pgsqlDS.print();//TODO 执行任务
env.execute();}}
三.new MyDebezium代码示例
importcom.alibaba.fastjson.JSONObject;importcom.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;importio.debezium.data.Envelope;importorg.apache.flink.api.common.typeinfo.BasicTypeInfo;importorg.apache.flink.api.common.typeinfo.TypeInformation;importorg.apache.flink.util.Collector;importorg.apache.kafka.connect.data.Field;importorg.apache.kafka.connect.data.Schema;importorg.apache.kafka.connect.data.Struct;importorg.apache.kafka.connect.source.SourceRecord;importjava.util.List;publicclassMyDebeziumimplementsDebeziumDeserializationSchema<String>{@Overridepublicvoiddeserialize(SourceRecord sourceRecord,Collector<String> collector)throwsException{//1.创建一个JSONObject用来存放最终封装好的数据JSONObject result =newJSONObject();//2.获取数据库以及表名String topic = sourceRecord.topic();String[] split = topic.split("\\.");//数据库名String schema = split[1];//表名String tableName = split[2];//4.获取数据Struct value =(Struct) sourceRecord.value();//5.获取before数据Struct structBefore = value.getStruct("before");JSONObject beforeJson =newJSONObject();if(structBefore !=null){Schema schemas = structBefore.schema();List<Field> fields = schemas.fields();for(Field field : fields){
beforeJson.put(field.name(), structBefore.get(field));}}//6.获取after数据Struct structAfter = value.getStruct("after");JSONObject afterJson =newJSONObject();if(structAfter !=null){Schema schemas = structAfter.schema();List<Field> fields = schemas.fields();for(Field field : fields){
afterJson.put(field.name(), structAfter.get(field));}}String type="update";if(structBefore==null){
type="insert";}if(structAfter==null){
type="delete";}//将数据封装到JSONObject中
result.put("schema", schema);
result.put("tableName", tableName);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("type", type);//将数据发送至下游
collector.collect(result.toJSONString());}@OverridepublicTypeInformation<String>getProducedType(){returnBasicTypeInfo.STRING_TYPE_INFO;}}
本文转载自: https://blog.csdn.net/qq_37698495/article/details/127771480
版权归原作者 The Great Ant 所有, 如有侵权,请联系我们删除。
版权归原作者 The Great Ant 所有, 如有侵权,请联系我们删除。