0


restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控

要求任务

请写java程序,创建restful web服务,示范利用RestController、Debezium和KafkaListener通过binlog日志监控并获得mysql业务数据库变更,请用具体数据集举例说明以上程序各步骤处理结果

一、了解相关知识

该问题首先要了解restful web服务,Debezium和kafka相关知识。

1.restful web服务

RESTful是HTTP接口调用的一种特殊实现,遵循REST架构风格的规范,能够提供更加标准化、统一化、可读性和易用性的API设计。RESTful调用相对于HTTP接口调用来说,具有更加清晰明了、易于理解和维护的API设计,扩展性和灵活性也更强。

其接口的介绍和实现可参考RESTful接口介绍与实现-CSDN博客

2.Debezium

Debezium 是一个开源的分布式平台,用于监控和捕获数据库的更改事件。它可以连接到各种不同类型的数据库,捕获数据库的更改,并将这些更改以事件流的形式传递给 Apache Kafka 或其他支持的消息队列系统。Debezium 提供了一个强大的变更数据捕获 (CDC) 解决方案,允许您实时地捕获数据库的变更,并将其用于构建实时应用程序、数据湖、分析和报告。

以下是 Debezium 的一些主要特点和概念:

  1. 支持多种数据库: Debezium 支持连接到多种不同类型的数据库,包括 MySQL、PostgreSQL、MongoDB、SQL Server 等。每个数据库都有相应的 Debezium 连接器。
  2. 实时事件捕获: Debezium 使用数据库事务日志 (transaction log) 或类似机制,以实时捕获数据库的更改。这意味着当数据库发生变更时,这些变更将立即被捕获并传递给消费者。
  3. 基于 Apache Kafka 的事件流: Debezium 将捕获的变更以事件流的形式传递给 Apache Kafka,这是一个高性能、持久性的分布式消息队列。这使得可以方便地将数据库变更集成到 Apache Kafka 生态系统中。
  4. 可扩展性: Debezium 可以通过添加新的数据库连接器进行扩展,使其能够连接到支持的数据库类型。这使得它适用于不同的数据库环境。
  5. 事件格式: Debezium 将数据库变更表示为一系列的事件,每个事件包含了一个或多个数据库更改的详细信息。这些事件可以以 JSON 格式进行序列化,易于理解和处理。
  6. 保留数据库历史: Debezium 不仅捕获当前的数据库状态,还保留了变更的历史。这允许您在任意时间点回溯并查看数据库的先前状态。
  7. 用例举例: Debezium 可以用于许多不同的用例,包括实时数据仓库、微服务架构中的事件驱动开发、数据湖、审计跟踪等。

在使用 Debezium 时,您需要配置适当的数据库连接器,以便 Debezium 能够连接到目标数据库,并配置 Kafka 连接信息,以确保数据库变更能够被传递到 Kafka 主题中。然后,您可以使用 Kafka 消费者来订阅这些主题,以实时获取数据库的变更事件。

3.kafka

Apache Kafka 是一个分布式流处理平台,用于构建实时数据管道和流应用程序。它最初是由 LinkedIn 开发,后来成为 Apache 软件基金会的开源项目。Kafka 设计用于高吞吐量、可扩展性、可持久性和容错性,并被广泛用于构建实时数据流平台。

以下是 Kafka 的一些关键概念和特点:

  1. 消息传递系统: Kafka 是一个分布式的消息传递系统,用于可靠地发布和订阅消息。它允许应用程序之间以异步的方式传递数据,而不需要直接连接。
  2. 分布式架构: Kafka 的架构被设计为分布式的,允许它在多个节点上水平扩展。这使得 Kafka 能够处理大量数据并提供高可用性。
  3. 主题(Topics): 消息在 Kafka 中被组织成主题。主题是消息的逻辑容器,用于对消息进行分类。发布者(Producers)将消息发布到特定的主题,而订阅者(Consumers)从主题中读取消息。
  4. 分区(Partitions): 每个主题可以分为一个或多个分区。分区是消息的物理存储单位,并允许 Kafka 水平扩展。每个分区都可以在不同的节点上进行处理。
  5. 生产者(Producers): 生产者负责将消息发布到 Kafka 主题。它们将消息写入特定主题的一个或多个分区。
  6. 消费者(Consumers): 消费者订阅一个或多个主题,并从中读取消息。每个分区只能被一个消费者组中的一个消费者消费,但一个消费者组可以包含多个消费者,以实现水平扩展和容错。
  7. ZooKeeper: Kafka 使用 Apache ZooKeeper 来协调和管理集群中的节点。ZooKeeper用于维护集群的元数据和协助领导选举等任务。
  8. 持久性和日志结构存储: Kafka 使用日志结构存储来保证消息的持久性。消息被追加到不可变的日志中,允许它们在存储中进行有效地管理。
  9. 流处理: Kafka 还包括强大的流处理功能,使其成为构建实时数据流应用程序的理想平台。通过 Kafka Streams API,开发人员可以在 Kafka 上直接构建流处理应用程序。
  10. 社区和生态系统: Kafka 拥有强大的社区支持,并且在其周边有丰富的生态系统,包括各种连接器、工具和扩展。

Kafka 的设计目标之一是能够处理大量数据流,并提供可靠的消息传递和持久性。这使得 Kafka 成为构建大规模实时数据管道和流应用程序的首选选择。

二、debezium与kafka的关系

常见的是,Debezium是通过Apache Kafka连接部署的。Kafka Connect是一个用于实现和操作的框架和运行时 源连接器,它将数据摄取到Kafka 接收连接器,它将数据从Kafka主题写入到其他系统。 下图显示了一个基于Debezium的CDC管道的架构:

总得来说,我的理解就是Debezium时连接数据库与kafka的一个工具,它会实时监控数据库的binlog文件并将数据库变更的信息传输kafka相应主题上,而kafka就相当于一个数据流的管道,它主要有主题和消费者,消费者就是从主题中读取消息的功能,所以debezium是基于kafka实现该功能的一个插件。

三、环境及软件搭建

1.kafka安装及测试

可参考Windows安装启动Kafka,创建测试实例,关闭kafka_kafka本地测试启动-CSDN博客

2.debezium安装步骤:

(1)第一步就是版本匹配问题,一定要根据不同的jdk版本配备相应版本的debezium,然后选择相应数据库的debezium下载即可

查询自己电脑jdk版本可以cmd在终端输入:java -version查看jdk版本

插件下载地址:Debezium Releases Overview

由于我的jdk为8,所以选择1.5.4版本,然后在kafka目录下新建文件夹名称为plugins,将debezium压缩包解压到此文件夹。

然后修改一些kafka的config中distributed.propertites文档中的一些内容,其中

(1)bootstrap.servers=localhost:9092

这个修改为自己安装kafka的端口号即可,默认端口号为9092

(2)group.id=connect-mysqltest

这个是组名,可以自己任意设定

(3)key.converter.schemas.enable=false value.converter.schemas.enable=false

这两处将true该为false即可

(4)listeners=HTTP://:8083

这个是zookeeper接口号

(5)

修改地址为自己安装的debezium地址即可

3.测试debezium

首先打开数据库,终端先依次启动zookeeper与kafka,然后启动debezium,其中

注意要进入kafka文件家内启动一下命令:

(1)windows终端启动zookper命令: bin\windows\zookeeper-server-start.bat config\zookeeper.properties

表示启动zookeeper启动成功。

(2)windows终端启动kafka命令:bin\windows\kafka-server-start.bat config\server.properties

表示kafka服务器启动成功。

(3)windows终端启动debezium命令:bin\windows\connect-distributed.bat config\connect-distributed.properties

debezium启动成功。

想要验证自己的debezium插件路径是否修改正确,可以使用postman软件发送http://localhost:8083/connector-plugins出现红框中内容就表示路径修改正确。或者在终端利用curl命令也可以查询,该命令为curl -H "Content-Type: application/json" http://localhost:8083/connector-plugins能返回红框中内容则表示修改正确。

4.配置debezium服务连接到mysql

(1)想要利用debezium实现对mysql的具体数据库表的业务监控,就需要对debezium服务器进行配置,其中有两种方法可以采用,第一种就是利用postman软件向接受端口发送更新请求,返回没有错误就表示对debezium服务器配置成功。

第二种就是在终端利用curl命令对其进行服务器配置,输入curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"localhost","database.port":"3306","database.user":"root","database.password":"123456","database.server.id":"1","database.server.name":"my-app-connector","table.whitelist":"test01.user","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"schema-changes-topic"}}'不返回任何错误即可。

配置完成就可以在终端利用curl -H "Content-Type: application/json" http://localhost:8083/connectors/查询连接器,能查到也表明配置成功!

(2)配置文件解析

{

"name": "my-app-connector",//指定Connector的名称为mysql-connector。

"config": {//包含Connector的具体配置信息。

"connector.class": "io.debezium.connector.mysql.MySqlConnector",//不用修改

"database.hostname": "localhost",//指定MySQL数据库的主机名

"database.port": "3306", //指定MySQL数据库的端口号

"database.user": "root",//指定连接数据库所使用的用户名

"database.password": "123456",//指定连接数据库所使用的密码

"database.server.id": "1",//指定用于标识MySQL服务器的唯一ID,随便写个数字

"database.server.name": "my-app-connector",//指定在Debezium中用于标识此MySQL服务器的名称

"database.include.list": "test01",//指定要捕获变更事件的表

"database.serverTimezone":"UTC",

"key.deserializer.encoding":"UTF8",

"value.deserializer.encoding":"UTF8",

"key.serializer.encoding":"UTF8",

"value.serializer.encoding":"UTF8",

"database.history.kafka.bootstrap.servers": "localhost:9092",//用于保存数据库 schema 变更的历史记录

"database.history.kafka.topic": "schema-changes-topic"//用于保存数据库 schema 变更的历史记录。

}

}

5.实例测试

首先创建一个消费者来提取kafka对应主题中的数据,该命令为:bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-app-connector.test01.user --group my-listener,这个主题就是前面我们配置debezium时的主题,此时修改数据库中一个用户id由9变为11,在终端显示先删除一个用户id为9的用户,在创建一个用户id为11的用户。其中“op”表示当前数据库进行的操作。

四、利用java写restful web服务

到前面一步我们已经完成了debezium与kafka的安装测试,已经在终端可以实时对数据库业务进行监控输出。创建一个restful的java代码。

这是代码结构:

//application.properties文件中主要是一些端口的配置及数据库连接的配置,前面已经讲过
//application.properties
# ?????
spring.datasource.url=jdbc:mysql://localhost:3306/test01
spring.datasource.username=root
spring.datasource.password=123456
spring.kafka.consumer.group-id=my-listener
server.port=8081
​
​
# Debezium ??
debezium.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.database.hostname=localhost
debezium.database.port=3306
debezium.database.user=root
debezium.database.password=123456
debezium.database.server.id=1
debezium.database.server.name=my-app-connector
debezium.database.include.list=test01
debezium.database.serverTimezone=UTC
debezium.key.deserializer.encoding=UTF8
debezium.value.deserializer.encoding=UTF8
debezium.key.serializer.encoding=UTF8
debezium.value.serializer.encoding=UTF8
debezium.database.history.kafka.bootstrap.servers=localhost:9092
debezium.database.history.kafka.topic=schema-changes-topic
​
# Kafka ??
spring.kafka.bootstrap-servers=localhost:9092
//启动类Application
package com.example.demo;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class Application {
​
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
//控制类主要实现restful服务的功能
package com.example.demo.controller;
​
import com.example.demo.service.KafkaConsumerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
​
@RestController
@RequestMapping("/api")
public class KafkaRestController {
​
    @Autowired
    private KafkaConsumerService kafkaConsumerService;
​
    @GetMapping("/latestMessage")
    public String getLatestMessage() {
​
        String latestMessage = kafkaConsumerService.getLatestMessage();
        System.out.println("成功");
        System.out.println("Retrieving latest message: " + latestMessage);
        return latestMessage;
    }
    @KafkaListener(topics = "my-app-connector.test01.user", groupId = "my-listener")
    public void consumeMessage(String message) {
        kafkaConsumerService.handleMessage(message);
    }
    // 添加处理根路径的方法
    @GetMapping
    public String home() {
        return "Welcome to KafkaRestController!";
    }
}
//服务类:主要获取消费者数据信息的两个函数
package com.example.demo.service;
​
import org.springframework.stereotype.Service;
​
@Service
public class KafkaConsumerService {
​
    private String latestMessage;
​
​
    public void handleMessage(String message) {
        // Handle your Kafka message here
        this.latestMessage = message;
        System.out.println("Received message from Kafka: " + message);
    }
​
    public String getLatestMessage() {
        return latestMessage;
    }
}
//用户类
package com.example.demo.entity;
​
public class User {
    private int id;
    private String username;
    private String email;
​
    // Getter and Setter methods
}
//pom.xml文件主要是一些项目所用到的依赖声明
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
​
    <groupId>org.example</groupId>
    <artifactId>Kafka-mysql</artifactId>
    <version>1.0-SNAPSHOT</version>
​
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.5.5</version> <!-- 使用你需要的版本 -->
        </dependency>
​
        <!-- Debezium Connector for MySQL -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>1.9.0.Final</version> <!-- 使用最新版本 -->
        </dependency>
​
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.13.0</version> <!-- 替换为最新的版本号 -->
        </dependency>
​
​
​
        <!-- Spring Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.7.4</version> <!-- 使用你想要的版本号 -->
        </dependency>
​
    </dependencies>
​
​
​
</project>

至此,我们就已经完成了请写java程序,创建restful web服务,示范利用RestController、Debezium和KafkaListener通过binlog日志监控并获得mysql业务数据库变更,请用具体数据集举例说明以上程序各步骤处理结果,可以通过postman软件访问http://localhost:8081/api/latestMessage地址查看数据库变更信息

标签: 数据库 restful kafka

本文转载自: https://blog.csdn.net/qq_50817552/article/details/135610669
版权归原作者 qq_50817552 所有, 如有侵权,请联系我们删除。

“restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控”的评论:

还没有评论