0


使用 Kafka 和 CDC 将数据从 MongoDB Atlas 流式传输到 SingleStore Kai

SingleStore 提供了变更数据捕获 (CDC) 解决方案,可将数据从 MongoDB 流式传输到 SingleStore Kai。在本文中,我们将了解如何将 Apache Kafka 代理连接到 MongoDB Atlas,然后使用 CDC 解决方案将数据从 MongoDB Atlas 流式传输到 SingleStore Kai。我们还将使用 Metabase 为 SingleStore Kai 创建一个简单的分析仪表板。

介绍

CDC 是一种跟踪数据库或系统中发生的更改的方法。SingleStore 现在提供了与 MongoDB 配合使用的 CDC 解决方案。

为了演示 CDC 解决方案,我们将使用Kafka 代理将数据流式传输到 MongoDB Atlas 集群,然后使用 CDC 管道将数据从 MongoDB Atlas 传播到 SingleStore Kai。我们还将使用 Metabase 创建一个简单的分析仪表板。

图 1 显示了我们系统的高级架构。

高层架构

图 1. 高级架构(来源:SingleStore)。

我们将在以后的文章中重点介绍使用 CDC 解决方案的其他场景。

MongoDB Atlas

我们将在 M0 沙箱中使用 MongoDB Atlas。我们将在Database Access下配置具有atlasAdmin权限的管理员用户。我们将暂时允许从网络访问下的任何地方(IP 地址 0.0.0.0/0)进行访问。我们将记下用户名密码主机

Apache Kafka

我们将配置 Kafka 代理将数据流式传输到MongoDB Atlas中。我们将使用 Jupyter Notebook 来实现此目的。

首先,我们将安装一些库:

  1. !pip install pymongo kafka-python --quiet

接下来,我们将连接到 MongoDB Atlas 和 Kafka 代理:

  1. from kafka import KafkaConsumer
  2. from pymongo import MongoClient
  3. try:
  4. client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")
  5. db = client.adtech
  6. print("Connected successfully")
  7. except:
  8. print("Could not connect")
  9. consumer = KafkaConsumer(
  10. "ad_events",
  11. bootstrap_servers = ["public-kafka.memcompute.com:9092"]

我们将用我们之前从 MongoDB Atlas 保存的值替换

  1. <username>

,

  1. <password>

和。

  1. <host>

最初,我们将 100 条记录加载到 MongoDB Atlas 中,如下所示:

  1. MAX_ITERATIONS = 100
  2. for iteration, message in enumerate(consumer, start = 1):
  3. if iteration > MAX_ITERATIONS:
  4. break
  5. try:
  6. record = message.value.decode("utf-8")
  7. user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))
  8. events_record = {
  9. "user_id": int(user_id),
  10. "event_name": event_name,
  11. "advertiser": advertiser,
  12. "campaign": int(campaign.split()[0]),
  13. "gender": gender,
  14. "income": income,
  15. "page_url": page_url,
  16. "region": region,
  17. "country": country
  18. }
  19. db.events.insert_one(events_record)
  20. except Exception as e:
  21. print(f"Iteration {iteration}: Could not insert data - {str(e)}")

数据应该成功加载,我们应该看到一个名为 的数据库,

  1. adtech

其中包含一个名为 的集合

  1. events

。集合中的文档在结构上应类似于以下示例:

  1. _id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
  2. user_id: 3857963415
  3. event_name: "Impression"
  4. advertiser: "Sherwin-Williams"
  5. campaign: 13
  6. gender: "Female"
  7. income: "25k and below",
  8. page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
  9. region: "Michigan"
  10. country: "US"

这些文档代表广告活动事件。该

  1. events

集合存储 的详细信息

  1. advertiser

以及

  1. campaign

有关用户的各种人口统计信息,例如

  1. gender

  1. income

SingleStore Kai

上一篇文章介绍了创建免费 SingleStoreDB 云帐户的步骤。我们将使用以下设置:

  • 工作区组名称: CDC 演示组
  • 云提供商: AWS
  • 区域:美国东部 1(弗吉尼亚北部)
  • 工作区名称: cdc-demo
  • 尺码: S-00
  • 设置: - SingleStore Kai 选择

一旦工作区可用,我们将记下密码主机该主机可从CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host获取。稍后我们将需要元数据库的此信息。我们还将通过在CDC 演示组 > 防火墙下配置防火墙来暂时允许从任何地方进行访问。

从左侧导航窗格中,我们选择DEVELOP > SQL Editor来创建

  1. adtech

数据库

  1. link

,如下所示:

  1. CREATE DATABASE IF NOT EXISTS adtech;
  2. USE adtech;
  3. DROP LINK adtech.link;
  4. CREATE LINK adtech.link AS MONGODB
  5. CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017",
  6. "collection.include.list": "adtech.*",
  7. "mongodb.ssl.enabled": "true",
  8. "mongodb.authsource": "admin",
  9. "mongodb.members.auto.discover": "false"}'
  10. CREDENTIALS '{"mongodb.user": "<username>",
  11. "mongodb.password": "<password>"}';
  12. CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;

我们将用我们之前从 MongoDB Atlas 保存的值替换

  1. <username>

和。

  1. <password>

我们还需要将

  1. <primary>

  1. <secondary>

和的值替换

  1. <secondary>

为 MongoDB Atlas 中每个值的完整地址。

我们现在将检查是否有任何表,如下所示:

  1. SHOW TABLES;

这应该显示一张名为

  1. events

  1. +------------------+
  2. | Tables_in_adtech |
  3. +------------------+
  4. | events |
  5. +------------------+

我们将检查表的结构:

  1. DESCRIBE events;

输出应如下所示:

  1. +-------+------+------+------+---------+-------+
  2. | Field | Type | Null | Key | Default | Extra |
  3. +-------+------+------+------+---------+-------+
  4. | _id | text | NO | UNI | NULL | |
  5. | _more | JSON | NO | | NULL | |
  6. +-------+------+------+------+---------+-------+

接下来,我们将检查是否有

  1. pipelines

  1. SHOW PIPELINES;

这将显示

  1. events

当前调用的一个管道

  1. Stopped

  1. +---------------------+---------+-----------+
  2. | Pipelines_in_adtech | State | Scheduled |
  3. +---------------------+---------+-----------+
  4. | events | Stopped | False |
  5. +---------------------+---------+-----------+

现在我们将启动

  1. events

管道:

  1. START ALL PIPELINES;

并且状态应更改为

  1. Running

  1. +---------------------+---------+-----------+
  2. | Pipelines_in_adtech | State | Scheduled |
  3. +---------------------+---------+-----------+
  4. | events | Running | False |
  5. +---------------------+---------+-----------+

如果我们现在运行以下命令:

  1. SELECT COUNT(*) FROM events;

它应该返回 100 作为结果:

  1. +----------+
  2. | COUNT(*) |
  3. +----------+
  4. | 100 |
  5. +----------+

我们将检查表中的一行

  1. events

,如下所示:

  1. SELECT * FROM events LIMIT 1;

输出应类似于以下内容:

  1. +--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  2. | _id | _more |
  3. +--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  4. | {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
  5. +--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

CDC 解决方案已成功连接到 MongoDB Atlas 并将所有 100 条记录复制到 SingleStore Kai。

现在让我们使用 Metabase 创建一个仪表板。

元数据库

上一篇文章描述了如何安装、配置和创建元数据库连接的详细信息。我们将使用前一篇文章中使用的查询的细微变化来创建可视化。

1. 活动总数

  1. SELECT COUNT(*) FROM events;

2. 各地区活动

  1. SELECT _more::country AS `events.country`, COUNT(_more::country) AS 'events.countofevents'
  2. FROM adtech.events AS events
  3. GROUP BY 1;

3. Top 5 广告商活动

  1. SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
  2. FROM adtech.events AS events
  3. WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
  4. GROUP BY 1
  5. ORDER BY `events.count` DESC;

4. 按性别和收入划分的广告访问者

  1. SELECT *
  2. FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
  3. WHEN xx.z___min_rank = xx.z___rank THEN 1
  4. ELSE 0
  5. END AS z__is_highest_ranked_cell
  6. FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
  7. FROM (SELECT *, RANK() OVER (ORDER BY CASE
  8. WHEN bb.z__pivot_col_rank = 1 THEN (CASE
  9. WHEN bb.`events.count` IS NOT NULL THEN 0
  10. ELSE 1
  11. END)
  12. ELSE 2
  13. END, CASE
  14. WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
  15. ELSE NULL
  16. END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
  17. FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
  18. WHEN ww.`events.gender` IS NULL THEN 1
  19. ELSE 0
  20. END, ww.`events.gender`) AS z__pivot_col_rank
  21. FROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`
  22. FROM adtech.events AS events
  23. WHERE (_more::income <> 'unknown' OR _more::income IS NULL)
  24. GROUP BY 1, 2) ww) bb
  25. WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
  26. WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
  27. ORDER BY zz.z___pivot_row_rank;

图 2 显示了 AdTech 仪表板上图表大小和位置的示例。我们将自动刷新选项设置为 1 分钟。

图 2.最终仪表板。

图 2.最终仪表板。

如果我们通过更改 使用 Jupyter Notebook 将更多数据加载到 MongoDB Atlas 中

  1. MAX_ITERATIONS

,我们将看到数据传播到 SingleStore Kai 以及 AdTech 仪表板中反映的新数据。

总结

在本文中,我们创建了一个 CDC 管道,以使用 SingleStore Kai 增强 MongoDB Atlas。正如多个基准测试所强调的那样,SingleStore Kai 因其卓越的性能而可用于分析。我们还使用 Metabase 创建了一个快速的可视化仪表板,以帮助我们深入了解我们的广告活动。


作者:Akmal Chaudhri ​

更多技术干货请关注公号【云原生数据库

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

irds.cn,多数据库管理平台(私有云)。

标签: kafka mongodb 分布式

本文转载自: https://blog.csdn.net/weixin_48804451/article/details/135401681
版权归原作者 沃趣数据库管理平台 所有, 如有侵权,请联系我们删除。

“使用 Kafka 和 CDC 将数据从 MongoDB Atlas 流式传输到 SingleStore Kai”的评论:

还没有评论