0


PostgreSQL内核源码分析 逻辑复制基本流程,发布订阅创建背后的故事

逻辑复制代码框架

专栏内容

  • postgresql使用入门基础
  • 手写数据库toadb
  • 并发编程

个人主页:我的主页
管理社区:开源数据库
座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物.

✅ 🔥🔥🔥重大消息🔥🔥🔥 ❤️❤️❤️❤️ 关注公众号【开源无限】可免费领取《手写数据库内核toadb》源代码一份 ❤️❤️❤️❤️

一、概述


在我们使用数据库时,往往需要感知数据库中某些数据库对象的变化,比如表中insert/update操作使数据发生了变化。

为了及时得到数据的变化,我们常常会开启一个循环和定时器,不断的查询比较,这个工作非常耗时和容易出错,还不是很准确,令人非常头疼。

在Postgresql中有两种实时的复制模式:

  • 一种对文件内容的复制,也就是文件中二进制数据直接复制,也称为物理复制;
  • 另一种是对数据库对象的复制,数据库对象比如table, database都是逻辑概念,所以也称为逻辑复制;

逻辑复制本身就是基于数据库对象的变化,当有变化时就需要产生复制事件,这一特性刚好就可以用来作为数据库对象的变化事件,这样只需要订阅这一事件即可,由数据库来检查数据库对象的变化,并通知我们,即省力就及时。

PostgreSQL中如何实现逻辑复制功能呢?我们从几个方面来逐层展开介绍,首先介绍一下逻辑复制的代码结构,再来看一下产生通知的流程,以及如何应用到备份。

本文就来分享一下逻辑复制的代码框架结构,在整体上对逻辑复制有初步的认识。

二、 创建发布与订阅


create publication

create subscription

之后,整个逻辑复制就建立起来了,那么我们首先来看一下这两个命令处理中做了什么事情。

2.1 发布

在主库创建发布,它是一个命令处理,一般在

src/backend/commands/

路径下就有对应的命令处理,每个命令会对应一个或多个源代码文件,可以看到

src/backend/commands/publicationcmds.c

,就是发布命令的处理代码了。

在这个源文件中,可以看到创建create,删除 remove, 修改 alter等几个接口。

我们重点来看创建发布者中主要逻辑,下面摘选了代码中的部分内容来分析。

/*
 * Create new publication.
 */
ObjectAddress
CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt);
  • 检查是否已经存在
    puboid =GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,CStringGetDatum(stmt->pubname));if(OidIsValid(puboid))ereport(ERROR,(errcode(ERRCODE_DUPLICATE_OBJECT),errmsg("publication \"%s\" already exists",
                        stmt->pubname)));
  • 生成新发布者

一个新的发布者,新产生一行数据,插入到系统表中,以及相关系统表的处理。

/*  */
    tup =heap_form_tuple(RelationGetDescr(rel), values, nulls);recordDependencyOnOwner(PublicationRelationId, puboid,GetUserId());ObjectAddressSet(myself, PublicationRelationId, puboid);
  • 数据库对象的处理

下面是对于发布者关联数据库对象的处理,数据库对象为:所有表,某个表或某个字段,或者某个schema等等

if(stmt->for_all_tables){/* Invalidate relcache so that publication info is rebuilt. */CacheInvalidateRelcacheAll();}else{/* FOR TABLES IN SCHEMA requires superuser */if(schemaidlist != NIL &&!superuser())ereport(ERROR,errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));if(relations != NIL){PublicationAddTables(puboid, rels, true,NULL);}if(schemaidlist != NIL){PublicationAddSchemas(puboid, schemaidlist, true,NULL);}}
  • 配置参数

最后对于配置参数的检查,逻辑复制时WAL_LEVEL必须为logical

/*  */if(wal_level != WAL_LEVEL_LOGICAL)ereport(WARNING,(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),errmsg("wal_level is insufficient to publish logical changes"),errhint("Set wal_level to \"logical\" before creating subscriptions.")));

如果在搭建逻辑复制时,没有修改配置参数,这里就会报错,创建发布失败。

好了,至此发布源就创建好了,就等待订阅任务来触发了。

2.2 订阅

通过在备库创建订阅,来启动对数据库对象进行逻辑复制。

创建订阅主要会进行如下几步:

  • 系统表中创建订阅记录

同样

create subscription

命令的执行对应的源代码在

src/backend/commands/subscriptioncmds.c

中,下面我们摘取重点代码进行分享。

创建订阅的函数如下,同样还有删除和修改的函数。

/*
 * Create new subscription.
 */
ObjectAddress
CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                   bool isTopLevel);

首先是命令检查,权限检查,同时还会在系统表中检查是否已经存在。

这里使用了动态库

libpqwalreceiver

来处理连接和WAL接收,它是一个公共组件,在这里会用,同样在流复制的工具中也会用到。

load_file("libpqwalreceiver", false);walrcv_check_conninfo(conninfo, opts.passwordrequired &&!superuser());

在系统表

pg_subscription

中插入一条新的数据,同时处理与之依赖的其它系统表的数据。

tup =heap_form_tuple(RelationGetDescr(rel), values, nulls);/* Insert tuple into catalog. */CatalogTupleInsert(rel, tup);
  • 与发布者校验信息

通过动态库,创建与发布者的连接。


wrconn =walrcv_connect(conninfo, true, true, must_use_password,
                                stmt->subname,&err);

从发布者获取订阅的表的信息,还有数据库的版本信息等,进行检查。

  • 启动 worker

当上面订阅相关工作处理完成后,此时会注册一个在事务提交时执行的任务,这个任务就是通知

logical replication launcher

进程。

if(opts.enabled)ApplyLauncherWakeupAtCommit();

此后

create subscription

命令主体就结束了,最后事务提交后,在

logical replication launcher

进程中会收到信号通知。

当收到信号之后,会检查当前集簇中的订阅列表,如果订阅是启用状态,给每个订阅对应的启动一个

logical replication apply worker

进程,

用于从主库接收逻辑复制数据,并应用于备库。

这里查询备库对应的后台进程,可以看到多了一个

postgres: logical replication apply worker for subscription 16397

进程。

[senllang@hatch bin]$ ps -ef|grep postgres |grep2477570
senllang 247757010 Nov19 ?        00:00:00 /opt/postgres/bin/postgres -p 5433
senllang 247757124775700 Nov19 ?        00:00:00 postgres: checkpointer
senllang 247757224775700 Nov19 ?        00:00:00 postgres: background writer
senllang 247757424775700 Nov19 ?        00:00:00 postgres: walwriter
senllang 247757524775700 Nov19 ?        00:00:00 postgres: autovacuum launcher
senllang 247757624775700 Nov19 ?        00:00:00 postgres: logical replication launcher
senllang 251568124775700 08:26 ?        00:00:00 postgres: logical replication apply worker for subscription 16397
  • 建立主备间的逻辑复制通信

当备库的apply worker进程进行工作之后,会主动连接主库,主库也会创建一个

walsender

进程,来专门处理此订阅的WAL数据发送和逻辑复制的流程控制。

[senllang@hatch bin]$ ps -ef|grep postgres |grep2477547
senllang 247754710 Nov19 ?        00:00:00 /opt/postgres/bin/postgres -D pgA
senllang 247754824775470 Nov19 ?        00:00:00 postgres: checkpointer
senllang 247754924775470 Nov19 ?        00:00:00 postgres: background writer
senllang 247755124775470 Nov19 ?        00:00:00 postgres: walwriter
senllang 247755224775470 Nov19 ?        00:00:00 postgres: autovacuum launcher
senllang 247755324775470 Nov19 ?        00:00:00 postgres: logical replication launcher
senllang 251501924775470 08:21 ?        00:00:00 postgres: senllang postgres ::1(48026) idle
senllang 251568224775470 08:26 ?        00:00:00 postgres: walsender senllang postgres ::1(57106) START_REPLICATION

查询主数据库的后台服务进程,可以看到也会多出来一个

postgres: walsender senllang postgres ::1(57106) START_REPLICATION

进程,

START_REPLICATION

是逻辑复制的状态信息。

至此,整个逻辑复制从主库到备库的通信链路就建立起来了,当主库有事务提交时,就会有WAL日志的落盘,此时会触发

walsender

来检查,当有符合当前订阅的WAL数据时,就会发送到订阅端,订阅端会进行应用到当前备库中。

五、总结


本文主要分享了逻辑复制搭建时的内核框架代码逻辑,在主库创建发布者,在备库创建订阅时,会主动与发布者建立连接,此时发布者才会准备WAL进行发布,整个流程概览就是这样,内部细节步骤还有很多,后面会分多篇进行分类介绍。

结尾


非常感谢大家的支持,在浏览的同时别忘了留下您宝贵的评论,如果觉得值得鼓励,请点赞,收藏,我会更加努力!

作者邮箱:study@senllang.onaliyun.com
如有错误或者疏漏欢迎指出,互相学习。

注:未经同意,不得转载!


本文转载自: https://blog.csdn.net/senllang/article/details/143962319
版权归原作者 韩楚风 所有, 如有侵权,请联系我们删除。

“PostgreSQL内核源码分析 逻辑复制基本流程,发布订阅创建背后的故事”的评论:

还没有评论