本文将详情讲解ticdc同步kafka消息Topic分发规则
自定义 Kafka Sink 的 Topic 和 Partition 的分发规则
Matcher 匹配规则
以如下示例配置文件中的 dispatchers 配置项为例:
[sink]
dispatchers = [
{matcher = ['test1.*', 'test2.*'], topic = "Topic 表达式 1", partition = "ts" },
{matcher = ['test3.*', 'test4.*'], topic = "Topic 表达式 2", partition = "index-value" },
{matcher = ['test1.*', 'test5.*'], topic = "Topic 表达式 3", partition = "table"},
{matcher = ['test6.*'], partition = "ts"}
]
● 对于匹配了 matcher 规则的表,按照对应的 topic 表达式指定的策略进行分发。例如表 test3.aa,按照 topic 表达式 2 分发;表 test5.aa,按照 topic 表达式 3 分发。
● 对于匹配了多个 matcher 规则的表,以靠前的 matcher 对应的 topic 表达式为准。例如表 test1.aa,按照 topic 表达式 1 分发。
● 对于没有匹配任何 matcher 的表,将对应的数据变更事件发送到 --sink-uri 中指定的默认 topic 中。例如表 test10.aa 发送到默认 topic。
● 对于匹配了 matcher 规则但是没有指定 topic 分发器的表,将对应的数据变更发送到 --sink-uri 中指定的默认 topic 中。例如表 test6.aa 发送到默认 topic。
Topic 分发器
Topic 分发器用 topic = “xxx” 来指定,并使用 topic 表达式来实现灵活的 topic 分发策略。topic 的总数建议小于 1000。
Topic 表达式的基本规则为 [prefix][{schema}][middle][{table}][suffix],详细解释如下:
● prefix:可选项,代表 Topic Name 的前缀。
● {schema}:可选项,用于匹配库名。
● middle:可选项,代表库表名之间的分隔符。
● {table}:可选项,用于匹配表名。
● suffix:可选项,代表 Topic Name 的后缀。
其中 prefix、middle 以及 suffix 仅允许出现大小写字母(a-z、A-Z)、数字(0-9)、点号(.)、下划线(_)和中划线(-);{schema}、{table} 均为小写,诸如 {Schema} 以及 {TABLE} 这样的占位符是无效的。
一些示例如下:
● matcher = [‘test1.table1’, ‘test2.table2’], topic = “hello_{schema}{table}"
○ 对于表 test1.table1 对应的数据变更事件,发送到名为 hello_test1_table1 的 topic 中。
○ 对于表 test2.table2 对应的数据变更事件,发送到名为 hello_test2_table2 的 topic 中。
● matcher = [‘test3.', 'test4.’], topic = "hello{schema}world"
○ 对于 test3 下的所有表对应的数据变更事件,发送到名为 hello_test3_world 的 topic 中。
○ 对于 test4 下的所有表对应的数据变更事件,发送到名为 hello_test4_world 的 topic 中。
● matcher = [‘test5., 'test6.’], topic = “hard_code_topic_name”
○ 对于 test5 和 test6 下的所有表对应的数据变更事件,发送到名为 hard_code_topic_name 的 topic 中。你可以直接指定 topic 名称。
● matcher = [‘.’], topic = "{schema}{table}”
○ 对于 TiCDC 监听的所有表,按照“库名_表名”的规则分别分发到独立的 topic 中;例如对于 test.account 表,TiCDC 会将其数据变更日志分发到名为 test_account 的 Topic 中。
DDL 事件的分发
库级别 DDL
诸如 create database、drop database 这类和某一张具体的表无关的 DDL,称之为库级别 DDL。对于库级别 DDL 对应的事件,被发送到 --sink-uri 中指定的默认 topic 中。
表级别 DDL
诸如 alter table、create table 这类和某一张具体的表相关的 DDL,称之为表级别 DDL。对于表级别 DDL 对应的事件,按照 dispatchers 的配置,被发送到相应的 topic 中。
例如,对于 matcher = [‘test.*’], topic = {schema}_{table} 这样的 dispatchers 配置,DDL 事件分发情况如下:
● 若 DDL 事件中涉及单张表,则将 DDL 事件原样发送到相应的 topic 中。
○ 对于 DDL 事件 drop table test.table1,该事件会被发送到名为 test_table1 的 topic 中。
● 若 DDL 事件中涉及多张表(rename table / drop table / drop view 都可能涉及多张表),则将单个 DDL 事件拆分为多个发送到相应的 topic 中。
○ 对于 DDL 事件 rename table test.table1 to test.table10, test.table2 to test.table20,则将 rename table test.table1 to test.table10 的 DDL 事件发送到名为 test_table1 的 topic 中,将 rename table test.table2 to test.table20 的 DDL 事件发送到名为 test.table2 的 topic 中。
Partition 分发器
partition 分发器用 partition = “xxx” 来指定,支持 default、index-value、columns、table 和 ts 共五种 partition 分发器,分发规则如下:
● default:默认使用 table 分发规则。使用所属库名和表名计算 partition 编号,一张表的数据被发送到相同的 partition。单表数据只存在于一个 partition 中并保证有序,但是发送吞吐量有限,无法通过添加消费者的方式提升消费速度。
● index-value:使用事件所属表的主键、唯一索引或由 index-name 指定的索引的值计算 partition 编号,一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。
● columns:使用由 columns 指定的列的值计算 partition 编号。一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。
● table:使用事件所属的表的库名和表名计算 partition 编号。
● ts:使用事件的 commitTs 计算 partition 编号。一张表的数据被发送到多个 partition。单表数据被发送到多个 partition 中,每个 partition 中的数据有序,可以通过添加消费者的方式提升消费速度。一条数据的多次修改可能被发送到不同的 partition 中。消费者消费进度不同可能导致消费端数据不一致。因此,消费端需要对来自多个 partition 的数据按 commitTs 排序后再进行消费。
以如下示例配置文件中的 dispatchers 配置项为例:
[sink]
dispatchers = [
{matcher = ['test.*'], partition = "index-value"},
{matcher = ['test1.*'], partition = "index-value", index-name = "index1"},
{matcher = ['test2.*'], partition = "columns", columns = ["id", "a"]},
{matcher = ['test3.*'], partition = "table"},
]
● 任何属于库 test 的表均使用 index-value 分发规则,即使用主键或者唯一索引的值计算 partition 编号。如果有主键则使用主键,否则使用最短的唯一索引。
● 任何属于库 test1 的表均使用 index-value 分发规则,并且使用名为 index1 的索引的所有列的值计算 partition 编号。如果指定的索引不存在,则报错。注意,index-name 指定的索引必须是唯一索引。
● 任何属于库 test2 的表均使用 columns 分发规则,并且使用列 id 和 a 的值计算 partition 编号。如果任一列不存在,则报错。
● 任何属于库 test3 的表均使用 table 分发规则。
● 对于属于库 test4 的表,因为不匹配上述任何一个规则,所以使用默认的 default,即 table 分发规则。
如果一张表,匹配了多个分发规则,以第一个匹配的规则为准。
注意
从 v6.1 开始,为了明确配置项的含义,用来指定 partition 分发器的配置项由原来的 dispatcher 改为 partition,partition 为 dispatcher 的别名。例如,以下两条规则完全等价:
[sink]
dispatchers = [
{matcher = ['*.*'], dispatcher = "index-value"},
{matcher = ['*.*'], partition = "index-value"},
]
但是 dispatcher 与 partition 不能出现在同一条规则中。例如,以下规则非法:
{matcher = [‘*.*’], dispatcher = “index-value”, partition = “table”},
版权归原作者 monkey_2024 所有, 如有侵权,请联系我们删除。