导言
MongoDB 是一个比较成熟的文档数据库,在业务场景中,通常需要采集 MongoDB 的数据到数据仓库或数据湖中,面向分析场景使用。
Flink MongoDB CDC 是 Flink CDC 社区提供的一个用于捕获变更数据(Change Data Capturing)的 Flink 连接器,可连接到 MongoDB 数据库和集合,并捕获其中的文档增加、更新、替换、删除等变更操作。
Apache Paimon (incubating) 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。
Paimon CDC
Paimon CDC 是整合了 Flink CDC、Kafka、Paimon 的入湖工具,帮助你更好更方便的完成一键入湖。
你可以通过 Flink SQL 或者 Flink DataStream API 将 Flink CDC 数据写入 Paimon 中,也可以通过Paimon 提供的 CDC 工具来完成入湖。那这两种方式有什么区别呢?
上图是使用 Flink SQL 来完成入湖,简单,但是当源表添加新列后,同步作业不会同步新的列,下游 Paimon 表也不会增加新列。
上图是使用 Paimon CDC 工具来同步数据,可以看到,当源表发生列的新增后,流作业会自动新增列的同步,并传导到下游的 Paimon 表中,完成 Schema Evolution 的同步。
另外 Paimon CDC 工具也提供了整库同步:
整库同步可以帮助你:
- 一个作业同步多张表,以低成本的方式同步大量小表
- 作业里同时自动进行 Schema Evolution
- 新表将会被自动进行同步,你不用重启作业,全自动完成
Demo 说明
你可以跟随 Demo 步骤体验 Paimon CDC 的全自动同步之旅,Demo 展示同步 Mongo DB 的数据到Paimon 中,如下图。
以下的 Demo 使用 Flink 来完成入湖,使用 Spark SQL 来查询,当然你可以使用 Flink SQL 来查询,或者使用其它计算引擎,包括 Trino、Presto、StarRocks、Doris 、Hive 等等。
Demo 准备
步骤一:
首先下载 MongoDB Community Server,免费版,不用交钱。
https://www.mongodb.com/try/download/community
启动 MongoDB Server:
mkdir /tmp/mongodata ./mongod --replSet rs0 --dbpath /tmp/mongodata
注意:这里开启了replSet,详见 MongoDB 文档,只有开启了 replSet 的库才会产生 changelog,也就才会被 Flink Mongo CDC 可以增量读取 CDC 数据。
步骤二:
下载 MongoDB Shell:
https://www.mongodb.com/try/download/shell
并启动:
./mongosh
另外需要初始化 replSet,否者 MongoDB Server 会一直报错。
rs.initiate()
步骤三:
下载 Flink,请到官网下载最新 Flink:
https://www.apache.org/dyn/closer.lua/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
并依次下载以下 Jars 到 Flink 的 lib 目录中:
paimon-flink-1.18-0.6-*.jar,paimon-flink 集成 Jar:
https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.18/0.6-SNAPSHOT/
flink-shaded-hadoop-*.jar,Paimon 需要 hadoop 相关依赖:
flink-sql-connector-mongodb-cdc-*.jar:
在 flink/conf/flink-conf.yaml 文件中设置 checkpoint 间隔:
execution.checkpointing.interval: 10 s
生产中不推荐使用此间隔,太快会产生大量文件导致 Cost 上升,一般推荐的 Checkpoint 间隔是 1 - 5 分钟。
启动 Flink 集群:
./bin/start-cluster.sh
启动 Flink 同步任务:
./bin/flink run lib/paimon-flink-action-0.6-*.jar mongodb-sync-database --warehouse /tmp/warehouse1 --database test --mongodb-conf hosts=127.0.0.1:27017 --mongodb-conf database=test --table-conf bucket=1
参数说明:
- Warehouse 指定 paimon 所在文件系统目录,如你有 HDFS 集群或者对象存储,可以替换成你的目录。
- MongoDB 相关配置,如有密码,请填写密码。
- 最后指定 bucket 个数,目前整库同步只支持了固定 Bucket 的表,如有特殊需求,可以修改个别表的 Bucket 个数。
可以看到,作业已成功启动,拓扑主要包含三个节点:
- Source:Flink MongoDB CDC Source,并完成 Schema Evolution 和自动加表。
- CDC MultiplexWriter:复杂多个表的 Paimon 表 Writer,自动动态加表。
- Multiplex Global Committer:两阶段提交的文件提交节点。
Writer 和 Committer 都有可能成为瓶颈,Writer 和 Committer 的并发都可以通过 Flink 的配置影响。
你可以考虑打开全异步模式来避免 Writer 的 Compaction 瓶颈:
https://paimon.apache.org/docs/master/maintenance/write-performance/#asynchronous-compaction
步骤四:
下载 Spark,请到官网下载最新版本:
https://spark.apache.org/downloads.html
下载 Paimon Spark 集成 Jar:
启动 Spark SQL:
./bin/spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.warehouse=file:/tmp/warehouse1
使用 Paimon Catalog,指定 Database:
USE paimon;USE rs0;
Demo 开始
步骤一:
我们首先测试下写入的数据可以被成功读取到。
我们先给 MongoDB 插入一条数据:
db.orders.insertOne({id: 1, price: 5})
然后我们在 Spark SQL 里查询:
可以看到这条数据被同步到 Paimon 里,并且可以看到 orders 表的 Schema 里多了一列 “_id”,这列是MongoDB 自动生成的隐含的主键。
步骤二:
我们再来看看更新是如何被同步的。
在 Mongo Shell 里更新下数据:
db.orders.update({id: 1}, {$set: { price: 8 }})
Spark 里查询:
数据的 price 被更新为 8
步骤三:
我们再来看看添加字段的同步情况。
在 Mongo Shell 里新插入一条数据,多了一列:
db.orders.insertOne({id: 2, price: 6, desc: “haha”})
Spark 里查询:
可以看到,Paimon 对应的表里已经新增了一列,查询数据显示,老的数据默认值为 NULL。
步骤四:
我们再来看看新增表的同步情况。
在 Mongo Shell 里新插入一张表的数据:
db.brands.insertOne({id: 1, brand: “NBA”})
Spark 里查询:
Paimon 里已经自动多出来一张表,数据也被同步过来。
总结
通过上面的操作你感受到了吗,通过 Paimon CDC 的入湖程序可以让你全自动的同步业务数据库到 Paimon 里,数据、Schema Evolution、新增表,全部被自动完成,你只用管好这一个 Flink 作业即可。这套入湖程序已经被部署到各行各业,各个公司里,给业务数据带来非常方便的镜像到湖存储里面的能力。
更有其它数据源等你来体验:Mysql、Kafka、MongoDB、Pulsar、PostgresSQL。
Paimon 的长期使命包括:
- 极致易用性、高性能的数据入湖,方便的湖存储管理,丰富生态的查询。
- 方便的数据流读,与 Flink 生态的良好集成,给业务带来1分钟新鲜度的数据。
- 加强的 Append 数据处理,时间旅行、数据排序带来高效的查询,升级 Hive 数仓。
版权归原作者 之乎者也· 所有, 如有侵权,请联系我们删除。