0


restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控

要求任务

请写java程序,创建restful web服务,示范利用RestController、Debezium和KafkaListener通过binlog日志监控并获得mysql业务数据库变更,请用具体数据集举例说明以上程序各步骤处理结果

一、了解相关知识

该问题首先要了解restful web服务,Debezium和kafka相关知识。

1.restful web服务

RESTful是HTTP接口调用的一种特殊实现,遵循REST架构风格的规范,能够提供更加标准化、统一化、可读性和易用性的API设计。RESTful调用相对于HTTP接口调用来说,具有更加清晰明了、易于理解和维护的API设计,扩展性和灵活性也更强。

其接口的介绍和实现可参考RESTful接口介绍与实现-CSDN博客

2.Debezium

Debezium 是一个开源的分布式平台,用于监控和捕获数据库的更改事件。它可以连接到各种不同类型的数据库,捕获数据库的更改,并将这些更改以事件流的形式传递给 Apache Kafka 或其他支持的消息队列系统。Debezium 提供了一个强大的变更数据捕获 (CDC) 解决方案,允许您实时地捕获数据库的变更,并将其用于构建实时应用程序、数据湖、分析和报告。

以下是 Debezium 的一些主要特点和概念:

  1. 支持多种数据库: Debezium 支持连接到多种不同类型的数据库,包括 MySQL、PostgreSQL、MongoDB、SQL Server 等。每个数据库都有相应的 Debezium 连接器。
  2. 实时事件捕获: Debezium 使用数据库事务日志 (transaction log) 或类似机制,以实时捕获数据库的更改。这意味着当数据库发生变更时,这些变更将立即被捕获并传递给消费者。
  3. 基于 Apache Kafka 的事件流: Debezium 将捕获的变更以事件流的形式传递给 Apache Kafka,这是一个高性能、持久性的分布式消息队列。这使得可以方便地将数据库变更集成到 Apache Kafka 生态系统中。
  4. 可扩展性: Debezium 可以通过添加新的数据库连接器进行扩展,使其能够连接到支持的数据库类型。这使得它适用于不同的数据库环境。
  5. 事件格式: Debezium 将数据库变更表示为一系列的事件,每个事件包含了一个或多个数据库更改的详细信息。这些事件可以以 JSON 格式进行序列化,易于理解和处理。
  6. 保留数据库历史: Debezium 不仅捕获当前的数据库状态,还保留了变更的历史。这允许您在任意时间点回溯并查看数据库的先前状态。
  7. 用例举例: Debezium 可以用于许多不同的用例,包括实时数据仓库、微服务架构中的事件驱动开发、数据湖、审计跟踪等。

在使用 Debezium 时,您需要配置适当的数据库连接器,以便 Debezium 能够连接到目标数据库,并配置 Kafka 连接信息,以确保数据库变更能够被传递到 Kafka 主题中。然后,您可以使用 Kafka 消费者来订阅这些主题,以实时获取数据库的变更事件。

3.kafka

Apache Kafka 是一个分布式流处理平台,用于构建实时数据管道和流应用程序。它最初是由 LinkedIn 开发,后来成为 Apache 软件基金会的开源项目。Kafka 设计用于高吞吐量、可扩展性、可持久性和容错性,并被广泛用于构建实时数据流平台。

以下是 Kafka 的一些关键概念和特点:

  1. 消息传递系统: Kafka 是一个分布式的消息传递系统,用于可靠地发布和订阅消息。它允许应用程序之间以异步的方式传递数据,而不需要直接连接。
  2. 分布式架构: Kafka 的架构被设计为分布式的,允许它在多个节点上水平扩展。这使得 Kafka 能够处理大量数据并提供高可用性。
  3. 主题(Topics): 消息在 Kafka 中被组织成主题。主题是消息的逻辑容器,用于对消息进行分类。发布者(Producers)将消息发布到特定的主题,而订阅者(Consumers)从主题中读取消息。
  4. 分区(Partitions): 每个主题可以分为一个或多个分区。分区是消息的物理存储单位,并允许 Kafka 水平扩展。每个分区都可以在不同的节点上进行处理。
  5. 生产者(Producers): 生产者负责将消息发布到 Kafka 主题。它们将消息写入特定主题的一个或多个分区。
  6. 消费者(Consumers): 消费者订阅一个或多个主题,并从中读取消息。每个分区只能被一个消费者组中的一个消费者消费,但一个消费者组可以包含多个消费者,以实现水平扩展和容错。
  7. ZooKeeper: Kafka 使用 Apache ZooKeeper 来协调和管理集群中的节点。ZooKeeper用于维护集群的元数据和协助领导选举等任务。
  8. 持久性和日志结构存储: Kafka 使用日志结构存储来保证消息的持久性。消息被追加到不可变的日志中,允许它们在存储中进行有效地管理。
  9. 流处理: Kafka 还包括强大的流处理功能,使其成为构建实时数据流应用程序的理想平台。通过 Kafka Streams API,开发人员可以在 Kafka 上直接构建流处理应用程序。
  10. 社区和生态系统: Kafka 拥有强大的社区支持,并且在其周边有丰富的生态系统,包括各种连接器、工具和扩展。

Kafka 的设计目标之一是能够处理大量数据流,并提供可靠的消息传递和持久性。这使得 Kafka 成为构建大规模实时数据管道和流应用程序的首选选择。

二、debezium与kafka的关系

常见的是,Debezium是通过Apache Kafka连接部署的。Kafka Connect是一个用于实现和操作的框架和运行时 源连接器,它将数据摄取到Kafka 接收连接器,它将数据从Kafka主题写入到其他系统。 下图显示了一个基于Debezium的CDC管道的架构:

总得来说,我的理解就是Debezium时连接数据库与kafka的一个工具,它会实时监控数据库的binlog文件并将数据库变更的信息传输kafka相应主题上,而kafka就相当于一个数据流的管道,它主要有主题和消费者,消费者就是从主题中读取消息的功能,所以debezium是基于kafka实现该功能的一个插件。

三、环境及软件搭建

1.kafka安装及测试

可参考Windows安装启动Kafka,创建测试实例,关闭kafka_kafka本地测试启动-CSDN博客

2.debezium安装步骤:

(1)第一步就是版本匹配问题,一定要根据不同的jdk版本配备相应版本的debezium,然后选择相应数据库的debezium下载即可

查询自己电脑jdk版本可以cmd在终端输入:java -version查看jdk版本

插件下载地址:Debezium Releases Overview

由于我的jdk为8,所以选择1.5.4版本,然后在kafka目录下新建文件夹名称为plugins,将debezium压缩包解压到此文件夹。

然后修改一些kafka的config中distributed.propertites文档中的一些内容,其中

(1)bootstrap.servers=localhost:9092

这个修改为自己安装kafka的端口号即可,默认端口号为9092

(2)group.id=connect-mysqltest

这个是组名,可以自己任意设定

(3)key.converter.schemas.enable=false value.converter.schemas.enable=false

这两处将true该为false即可

(4)listeners=HTTP://:8083

这个是zookeeper接口号

(5)

修改地址为自己安装的debezium地址即可

3.测试debezium

首先打开数据库,终端先依次启动zookeeper与kafka,然后启动debezium,其中

注意要进入kafka文件家内启动一下命令:

(1)windows终端启动zookper命令: bin\windows\zookeeper-server-start.bat config\zookeeper.properties

表示启动zookeeper启动成功。

(2)windows终端启动kafka命令:bin\windows\kafka-server-start.bat config\server.properties

表示kafka服务器启动成功。

(3)windows终端启动debezium命令:bin\windows\connect-distributed.bat config\connect-distributed.properties

debezium启动成功。

想要验证自己的debezium插件路径是否修改正确,可以使用postman软件发送http://localhost:8083/connector-plugins出现红框中内容就表示路径修改正确。或者在终端利用curl命令也可以查询,该命令为curl -H "Content-Type: application/json" http://localhost:8083/connector-plugins能返回红框中内容则表示修改正确。

4.配置debezium服务连接到mysql

(1)想要利用debezium实现对mysql的具体数据库表的业务监控,就需要对debezium服务器进行配置,其中有两种方法可以采用,第一种就是利用postman软件向接受端口发送更新请求,返回没有错误就表示对debezium服务器配置成功。

第二种就是在终端利用curl命令对其进行服务器配置,输入curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"localhost","database.port":"3306","database.user":"root","database.password":"123456","database.server.id":"1","database.server.name":"my-app-connector","table.whitelist":"test01.user","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"schema-changes-topic"}}'不返回任何错误即可。

配置完成就可以在终端利用curl -H "Content-Type: application/json" http://localhost:8083/connectors/查询连接器,能查到也表明配置成功!

(2)配置文件解析

{

"name": "my-app-connector",//指定Connector的名称为mysql-connector。

"config": {//包含Connector的具体配置信息。

"connector.class": "io.debezium.connector.mysql.MySqlConnector",//不用修改

"database.hostname": "localhost",//指定MySQL数据库的主机名

"database.port": "3306", //指定MySQL数据库的端口号

"database.user": "root",//指定连接数据库所使用的用户名

"database.password": "123456",//指定连接数据库所使用的密码

"database.server.id": "1",//指定用于标识MySQL服务器的唯一ID,随便写个数字

"database.server.name": "my-app-connector",//指定在Debezium中用于标识此MySQL服务器的名称

"database.include.list": "test01",//指定要捕获变更事件的表

"database.serverTimezone":"UTC",

"key.deserializer.encoding":"UTF8",

"value.deserializer.encoding":"UTF8",

"key.serializer.encoding":"UTF8",

"value.serializer.encoding":"UTF8",

"database.history.kafka.bootstrap.servers": "localhost:9092",//用于保存数据库 schema 变更的历史记录

"database.history.kafka.topic": "schema-changes-topic"//用于保存数据库 schema 变更的历史记录。

}

}

5.实例测试

首先创建一个消费者来提取kafka对应主题中的数据,该命令为:bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-app-connector.test01.user --group my-listener,这个主题就是前面我们配置debezium时的主题,此时修改数据库中一个用户id由9变为11,在终端显示先删除一个用户id为9的用户,在创建一个用户id为11的用户。其中“op”表示当前数据库进行的操作。

四、利用java写restful web服务

到前面一步我们已经完成了debezium与kafka的安装测试,已经在终端可以实时对数据库业务进行监控输出。创建一个restful的java代码。

这是代码结构:

  1. //application.properties文件中主要是一些端口的配置及数据库连接的配置,前面已经讲过
  2. //application.properties
  3. # ?????
  4. spring.datasource.url=jdbc:mysql://localhost:3306/test01
  5. spring.datasource.username=root
  6. spring.datasource.password=123456
  7. spring.kafka.consumer.group-id=my-listener
  8. server.port=8081
  9. # Debezium ??
  10. debezium.connector.class=io.debezium.connector.mysql.MySqlConnector
  11. debezium.database.hostname=localhost
  12. debezium.database.port=3306
  13. debezium.database.user=root
  14. debezium.database.password=123456
  15. debezium.database.server.id=1
  16. debezium.database.server.name=my-app-connector
  17. debezium.database.include.list=test01
  18. debezium.database.serverTimezone=UTC
  19. debezium.key.deserializer.encoding=UTF8
  20. debezium.value.deserializer.encoding=UTF8
  21. debezium.key.serializer.encoding=UTF8
  22. debezium.value.serializer.encoding=UTF8
  23. debezium.database.history.kafka.bootstrap.servers=localhost:9092
  24. debezium.database.history.kafka.topic=schema-changes-topic
  25. # Kafka ??
  26. spring.kafka.bootstrap-servers=localhost:9092
  1. //启动类Application
  2. package com.example.demo;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. @SpringBootApplication
  6. public class Application {
  7. public static void main(String[] args) {
  8. SpringApplication.run(Application.class, args);
  9. }
  10. }
  1. //控制类主要实现restful服务的功能
  2. package com.example.demo.controller;
  3. import com.example.demo.service.KafkaConsumerService;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.kafka.annotation.KafkaListener;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. @RestController
  10. @RequestMapping("/api")
  11. public class KafkaRestController {
  12. @Autowired
  13. private KafkaConsumerService kafkaConsumerService;
  14. @GetMapping("/latestMessage")
  15. public String getLatestMessage() {
  16. String latestMessage = kafkaConsumerService.getLatestMessage();
  17. System.out.println("成功");
  18. System.out.println("Retrieving latest message: " + latestMessage);
  19. return latestMessage;
  20. }
  21. @KafkaListener(topics = "my-app-connector.test01.user", groupId = "my-listener")
  22. public void consumeMessage(String message) {
  23. kafkaConsumerService.handleMessage(message);
  24. }
  25. // 添加处理根路径的方法
  26. @GetMapping
  27. public String home() {
  28. return "Welcome to KafkaRestController!";
  29. }
  30. }
  1. //服务类:主要获取消费者数据信息的两个函数
  2. package com.example.demo.service;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaConsumerService {
  6. private String latestMessage;
  7. public void handleMessage(String message) {
  8. // Handle your Kafka message here
  9. this.latestMessage = message;
  10. System.out.println("Received message from Kafka: " + message);
  11. }
  12. public String getLatestMessage() {
  13. return latestMessage;
  14. }
  15. }
  1. //用户类
  2. package com.example.demo.entity;
  3. public class User {
  4. private int id;
  5. private String username;
  6. private String email;
  7. // Getter and Setter methods
  8. }
  1. //pom.xml文件主要是一些项目所用到的依赖声明
  2. <?xml version="1.0" encoding="UTF-8"?>
  3. <project xmlns="http://maven.apache.org/POM/4.0.0"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  6. <modelVersion>4.0.0</modelVersion>
  7. <groupId>org.example</groupId>
  8. <artifactId>Kafka-mysql</artifactId>
  9. <version>1.0-SNAPSHOT</version>
  10. <properties>
  11. <maven.compiler.source>8</maven.compiler.source>
  12. <maven.compiler.target>8</maven.compiler.target>
  13. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  14. </properties>
  15. <dependencies>
  16. <!-- Spring Boot Starter Web -->
  17. <dependency>
  18. <groupId>org.springframework.boot</groupId>
  19. <artifactId>spring-boot-starter-web</artifactId>
  20. <version>2.5.5</version> <!-- 使用你需要的版本 -->
  21. </dependency>
  22. <!-- Debezium Connector for MySQL -->
  23. <dependency>
  24. <groupId>io.debezium</groupId>
  25. <artifactId>debezium-connector-mysql</artifactId>
  26. <version>1.9.0.Final</version> <!-- 使用最新版本 -->
  27. </dependency>
  28. <dependency>
  29. <groupId>com.fasterxml.jackson.core</groupId>
  30. <artifactId>jackson-core</artifactId>
  31. <version>2.13.0</version> <!-- 替换为最新的版本号 -->
  32. </dependency>
  33. <!-- Spring Kafka -->
  34. <dependency>
  35. <groupId>org.springframework.kafka</groupId>
  36. <artifactId>spring-kafka</artifactId>
  37. <version>2.7.4</version> <!-- 使用你想要的版本号 -->
  38. </dependency>
  39. </dependencies>
  40. </project>

至此,我们就已经完成了请写java程序,创建restful web服务,示范利用RestController、Debezium和KafkaListener通过binlog日志监控并获得mysql业务数据库变更,请用具体数据集举例说明以上程序各步骤处理结果,可以通过postman软件访问http://localhost:8081/api/latestMessage地址查看数据库变更信息

标签: 数据库 restful kafka

本文转载自: https://blog.csdn.net/qq_50817552/article/details/135610669
版权归原作者 qq_50817552 所有, 如有侵权,请联系我们删除。

“restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控”的评论:

还没有评论