本文介绍从 MySQL 作为源到 ClickHouse 作为目标的整个过程。MySQL 数据库更改通过 Debezium 捕获,并作为事件发布在到 Kafka 上。ClickHouse 通过 Kafka 表引擎按部分顺序应用这些更改,实时并保持最终一致性。相关软件版本如下:
MySQL:8.0.16
ClickHouse:24.1.8
JDK:11.0.22
zookeeper:3.9.1
Kafka:3.7.0
debezium-connector-mysql:2.4.2
这种方案的优点之一是可以做到 ClickHouse 与 MySQL 的数据最终严格一致。
一、总体架构
总体结构如下图所示。
ClickHouse 是由四个实例构成的两分片、每分片两副本集群,票选和协调器使用 ClickHouse 自带的 keeper 组件。分片、副本、keeper 节点、Zookeeper集群、Kafaka集群、Debezium-Connector-MySQL 插件的部署如下表所示。
IP
主机名
实例角色
ClickHouse
Keeper
Zookeeper
Kafka
Debezium
Connector
MySQL
172.18.4.126
node1
分片1副本1
172.18.4.188
node2
分片1副本2
172.18.4.71
node3
分片2副本1
172.18.4.86
node4
分片2副本2
二、安装配置 MySQL 主从复制
配置好主从复制后,在主库创建测试库表及数据:
-- 建库
create database test;
-- 建表
create table test.t1 (
id bigint(20) not null auto_increment,
remark varchar(32) default null comment '备注',
createtime timestamp not null default current_timestamp comment '创建时间',
primary key (id));
-- 插入三条测试数据
insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');
commit;
三、安装配置 ClickHouse 集群
四、安装 JDK
五、安装配置 Zookeeper 集群
六、安装配置 Kafaka 集群
七、安装配置 Debezium-Connector-MySQL 插件
在 node2 上执行以下步骤。
1. 创建插件目录
mkdir $KAFKA_HOME/plugins
2. 解压文件到插件目录
cd ~
# debezium-connector-mysql
unzip debezium-debezium-connector-mysql-2.4.2.zip -d $KAFKA_HOME/plugins/
3. 配置 Kafka Connector
(1)配置属性文件
# 先备份
cp $KAFKA_HOME/config/connect-distributed.properties $KAFKA_HOME/config/connect-distributed.properties.bak
# 编辑 connect-distributed.properties 文件
vim $KAFKA_HOME/config/connect-distributed.properties
内容如下:
bootstrap.servers=node2:9092,node3:9092,node4:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3
offset.flush.interval.ms=10000
plugin.path=/root/kafka_2.13-3.7.0/plugins
(2)分发到其它节点
scp $KAFKA_HOME/config/connect-distributed.properties node3:$KAFKA_HOME/config/
scp $KAFKA_HOME/config/connect-distributed.properties node4:$KAFKA_HOME/config/
scp -r $KAFKA_HOME/plugins node3:$KAFKA_HOME/
scp -r $KAFKA_HOME/plugins node4:$KAFKA_HOME/
(3)以 distributed 方式启动 Kafka connect
connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
# 确认日志是否有 ERROR
grep ERROR ~/kafka_2.13-3.7.0/logs/connectDistributed.out
(4)确认 connector 插件和自动生成的 topic
查看连接器插件:
curl -X GET http://node2:8083/connector-plugins | jq
从输出中可以看到,Kafka connect 已经识别到了 MySqlConnector source 插件:
[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connector-plugins | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 403 100 403 0 0 3820 0 --:--:-- --:--:-- --:--:-- 3838
[
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "2.4.2.Final"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "3.7.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "3.7.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "3.7.0"
}
]
[root@vvml-yz-hbase-test~]#
查看 topic:
kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
从输出中可以看到,Kafka connect 启动时自动创建了 connect-configs、connect-offsets、connect-status 三个 topic:
[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
[root@vvml-yz-hbase-test~]#
4. 创建 source connector
(1)Debezium 三个必要的配置说明
Debezium 是一个众所周知的用于读取和解析 MySQL Binlog 的工具。它将 KafkaConnect 作为一个连接器进行集成,并对 Kafka 主题进行每一次更改。
只记录后状态
默认情况下,Debezium 会向 Kafka 发出每个操作的前状态和后状态的每条记录,这很难被 ClickHouse Kafka 表解析。此外,在执行删除操作的情况下(Clickhouse 同样无法解析),它会创建 tombstone 记录,即具有 Null 值的记录。下表展示了这个行为。
操作
操作前
操作后
附加记录
Create
Null
新纪录
Update
更新前的记录
更新后的记录
Delete
删除前的记录
Null
墓碑记录
在 Debezium 配置中使用 ExtractNewRecod 转换器来处理此问题。由于有了这个选项,Debezium 只为创建/更新操作保留 after 状态,而忽略 before 状态。但缺点是,它删除了包含先前状态的 Delete 记录和墓碑记录,换句话说就是不再捕获删除操作。紧接着说明如何解决这个问题。
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
重写删除事件
要捕获删除操作,必须添加如下所示的重写配置:
"transforms.unwrap.delete.handling.mode":"rewrite"
Debezium 使用此配置添加字段 __deleted,对于 delete 操作为 true,对于其他操作为 false。因此,删除将包含以前的状态以及 __deleted:true 字段。
处理非主键更新
在提供上述配置的情况下,更新记录(主键除外的每一列)会发出一个具有新状态的简单记录。通常在关系数据库系统中,更新后的记录会替换前一个记录,但在 ClickHouse 不行。出于性能考虑,ClickHouse 将行级更新变为多版本插入。在本示例中,MySQL 中的 test.t1 表以 id 列为主键,如果更新了 remark 列,在 ClikHouse 中,最终会得到重复的记录,这意味着 id 相同,但 remark 不同! 幸运的是有办法应付这种情况。默认情况下,Debezium 会创建一个删除记录和一个创建记录,用于更新主键。因此,如果源更新 id,它会发出一个带有前一个 id 的删除记录和一个带有新 id 的创建记录。带有 __deleted=ture 字段的前一个记录将替换 CH 中的 stall 记录。然后,可以在视图中过滤暗示删除的记录。可以使用以下选项将此行为扩展到其他列:
"message.key.columns": "test.t1:id;test.t1:remark;test.t1:createtime"
**注意:**
通过更改连接器的键列,Debezium 将这些列用作主键,而不是源表的默认主键。因此,与数据库的一条记录相关的不同操作可能最终会出现在 Kafka 中的其他分区。由于记录在不同分区中失去顺序,除非确保 ClickHouse 顺序键和 Debezium 消息键相同,否则可能会导致 Clikchouse 中的数据不一致。
**经验法则如下:**
根据想要的表结构来设计分区键和排序键。
提取分区和排序键的来源,假设它们是在物化过程中计算的。
合并所有这些列。
将步骤 3 的结果定义为 Debezium 连接器配置中的 message.column.keys。
检查 Clickhouse 排序键是否包含所有这些列。如果没有则添加它们。
现在,通过将上述所有选项和常用选项放在一起,将拥有一个功能齐全的 Debezium 配置,能够处理 ClickHouse 所需的任何更改。
(2)创建源 mysql 配置文件
# 编辑文件
vim $KAFKA_HOME/plugins/source-mysql.json
内容如下:
{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "172.18.16.156",
"database.port": "3307",
"database.user": "dba",
"database.password": "123456",
"database.server.id": "1563307",
"database.server.name": "dbserver1",
"database.include.list": "test",
"table.include.list": "test.t1",
"topic.prefix": "mysql-clickhouse-test",
"schema.history.internal.kafka.bootstrap.servers": "node2:9092,node3:9092,node4:9092",
"schema.history.internal.kafka.topic": "schemahistory.mysql-clickhouse-test",
"message.key.columns": "test.t1:id;test.t1:remark;test.t1:createtime",
"transforms":"unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}
(3)创建 mysql source connector
# 创建 connector
curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"; echo
# 查看 connector 状态
curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
# 查看 topic
kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
从输出中可以看到,mysql-source-connector 状态为 RUNNING,并自动创建了三个 topic:
[root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"; echo
HTTP/1.1 201 Created
Date: Thu, 25 Apr 2024 03:47:26 GMT
Location: http://node2:8083/connectors/mysql-source-connector
Content-Type: application/json
Content-Length: 818
Server: Jetty(9.4.53.v20231009)
{"name":"mysql-source-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"172.18.16.156","database.port":"3307","database.user":"dba","database.password":"123456","database.server.id":"1563307","database.server.name":"dbserver1","database.include.list":"test","table.include.list":"test.t1","topic.prefix":"mysql-clickhouse-test","schema.history.internal.kafka.bootstrap.servers":"node2:9092,node3:9092,node4:9092","schema.history.internal.kafka.topic":"schemahistory.mysql-clickhouse-test","message.key.columns":"test.t1:id;test.t1:remark;test.t1:createtime","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.delete.handling.mode":"rewrite","name":"mysql-source-connector"},"tasks":[],"type":"source"}
[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 182 100 182 0 0 24045 0 --:--:-- --:--:-- --:--:-- 26000
{
"name": "mysql-source-connector",
"connector": {
"state": "RUNNING",
"worker_id": "172.18.4.188:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "172.18.4.188:8083"
}
],
"type": "source"
}
[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
mysql-clickhouse-test
mysql-clickhouse-test.test.t1
schemahistory.mysql-clickhouse-test
[root@vvml-yz-hbase-test~]#
八、在 ClickHouse 中创建库表、物化视图和视图
ClickHouse 可以利用 Kafka 表引擎将 Kafka 记录放入一个表中。需要定义三个对象:Kafka 表、主表和消费者物化视图。
1. 建库
create database db2 on cluster cluster_2S_2R;
2. 创建 Kafka 表
CREATE TABLE db2.kafka_t1 on cluster cluster_2S_2R
(
`id` Int64,
`remark` Nullable(String),
`createtime` String,
`__deleted` String
)
ENGINE = Kafka('node2:9092,node3:9092,node4:9092', 'mysql-clickhouse-test.test.t1', 'clickhouse', 'JSONEachRow');
3. 创建主表
主表具有源结构和 __deleted 字段。这里使用的是 ReplicatedReplacingMergeTree,因为需要用已删除或更新的记录替换 stall 记录。
-- 创建本地表
CREATE TABLE db2.stream_t1 on cluster cluster_2S_2R
(
`id` Int64,
`remark` Nullable(String),
`createtime` timestamp,
`__deleted` String
)
ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/tables/{shard}/db2/t1',
'{replica}'
)
ORDER BY (id, createtime)
SETTINGS index_granularity = 8192;
-- 创建分布式表,以源表的主键 id 作为分片键,保证同一 id 的数据落在同一分片上
create table db2.t1_replica_all on cluster cluster_2S_2R
as db2.stream_t1
engine = Distributed(cluster_2S_2R, db2, stream_t1, id);
4. 创建消费者物化视图
在创建物化视图前,先停止MySQL从库的复制。从库停止复制,不影响主库的正常使用,也就不会影响业务。此时从库的数据处于静止状态,不会产生变化,这使得获取存量数据变得轻而易举。然后创建物化视图时会自动将数据写入 db2.t1_replica_all 对应的本地表中。之后在 ClickHouse 集群中的任一实例上,都能从物化视图中查询到一致的 MySQL 存量数据。
-- MySQL 从库停止复制
stop slave;
Kafka 表的每一条记录只读取一次,因为它的消费者组会改变偏移量,不能读取两次。因此,需要定义一个主表,并通过物化视图将每个 Kafka 表记录具化到它:
-- 注意时间戳的处理
CREATE MATERIALIZED VIEW db2.consumer_t1 on cluster cluster_2S_2R
TO db2.t1_replica_all
(
`id` Int64,
`remark` Nullable(String),
`createtime` timestamp,
`__deleted` String
) AS
SELECT id, remark, addHours(toDateTime(substring(createtime,1,length(createtime)-1)),8) createtime, __deleted FROM db2.kafka_t1;
5. 创建视图
最后需要过滤每个被删除的记录,并拥有最新的记录,以防不同的记录具有相同的排序键。可以定义一个简单的视图来隐式完成这项工作:
CREATE VIEW db2.t1 on cluster cluster_2S_2R
(
`id` Int64,
`remark` Nullable(String),
`createtime` String,
`__deleted` String
) AS
SELECT *
FROM db2.consumer_t1
FINAL
WHERE __deleted = 'false';
6. 验证
从 clickhouse 视图查询存量数据:
vvml-yz-hbase-test.172.18.4.126 :) select * from db2.t1;
SELECT *
FROM db2.t1
Query id: 2a51fd5e-6b4f-4b78-b522-62b7be32535b
┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│ 2 │ 第二行:row2 │ 2024-04-25 11:51:07 │ false │
└────┴──────────────┴─────────────────────┴───────────┘
┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│ 1 │ 第一行:row1 │ 2024-04-25 11:51:07 │ false │
│ 3 │ 第三行:row3 │ 2024-04-25 11:51:07 │ false │
└────┴──────────────┴─────────────────────┴───────────┘
3 rows in set. Elapsed: 0.007 sec.
vvml-yz-hbase-test.172.18.4.126 :)
可以看到,存量数据已经与 MySQL 同步。
-- MySQL 主库修改数据
insert into test.t1 (remark) values ('第四行:row4');
update test.t1 set remark = '第五行:row5' where id = 4;
delete from test.t1 where id =1;
insert into test.t1 (remark) values ('第六行:row6');
-- MySQL 从库启动复制
start slave;
此时 MySQL 的数据如下:
mysql> select * from test.t1;
+----+------------------+---------------------+
| id | remark | createtime |
+----+------------------+---------------------+
| 2 | 第二行:row2 | 2024-04-25 11:51:07 |
| 3 | 第三行:row3 | 2024-04-25 11:51:07 |
| 4 | 第五行:row5 | 2024-04-25 11:56:29 |
| 5 | 第六行:row6 | 2024-04-25 11:56:29 |
+----+------------------+---------------------+
4 rows in set (0.00 sec)
从 clickhouse 视图查询增量数据:
vvml-yz-hbase-test.172.18.4.126 :) select * from db2.t1;
SELECT *
FROM db2.t1
Query id: b34bb37b-091b-490e-b55b-a0e9eedf5573
┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│ 2 │ 第二行:row2 │ 2024-04-25 11:51:07 │ false │
└────┴──────────────┴─────────────────────┴───────────┘
┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│ 4 │ 第五行:row5 │ 2024-04-25 11:56:29 │ false │
└────┴──────────────┴─────────────────────┴───────────┘
┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│ 3 │ 第三行:row3 │ 2024-04-25 11:51:07 │ false │
└────┴──────────────┴─────────────────────┴───────────┘
┌─id─┬─remark───────┬─createtime──────────┬─__deleted─┐
│ 5 │ 第六行:row6 │ 2024-04-25 11:56:29 │ false │
└────┴──────────────┴─────────────────────┴───────────┘
4 rows in set. Elapsed: 0.008 sec.
vvml-yz-hbase-test.172.18.4.126 :)
可以看到,增量数据已经与 MySQL 同步,现在从 ClickHouse 视图查询的数据与 MySQL 一致。
查看 Kafka 消费:
kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group clickhouse
输出如下:
[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --describe --group clickhouse
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
clickhouse mysql-clickhouse-test.test.t1 0 8 8 0 ClickHouse-vvml-yz-hbase-test.172.18.4.126-db2-kafka_t1-26e6aa8e-1f08-4491-8af7-f1822f1a7e94 /172.18.4.126 ClickHouse-vvml-yz-hbase-test.172.18.4.126-db2-kafka_t1
[root@vvml-yz-hbase-test~]#
可以看到,最后被消费的消息偏移量是8,MySQL 的存量、增量数据都已经通过 Kafka 消息同步到了 ClickHouse。
参考:
- Apply CDC from MySQL to ClickHouse
- New Record State Extraction
- 基于 HBase & Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步
- Greenplum 实时数据仓库实践(5)——实时数据同步
版权归原作者 wzy0623 所有, 如有侵权,请联系我们删除。