要求任务
请写java程序,创建restful web服务,示范利用RestController、Debezium和KafkaListener通过binlog日志监控并获得mysql业务数据库变更,请用具体数据集举例说明以上程序各步骤处理结果
一、了解相关知识
该问题首先要了解restful web服务,Debezium和kafka相关知识。
1.restful web服务
RESTful是HTTP接口调用的一种特殊实现,遵循REST架构风格的规范,能够提供更加标准化、统一化、可读性和易用性的API设计。RESTful调用相对于HTTP接口调用来说,具有更加清晰明了、易于理解和维护的API设计,扩展性和灵活性也更强。
其接口的介绍和实现可参考RESTful接口介绍与实现-CSDN博客
2.Debezium
Debezium 是一个开源的分布式平台,用于监控和捕获数据库的更改事件。它可以连接到各种不同类型的数据库,捕获数据库的更改,并将这些更改以事件流的形式传递给 Apache Kafka 或其他支持的消息队列系统。Debezium 提供了一个强大的变更数据捕获 (CDC) 解决方案,允许您实时地捕获数据库的变更,并将其用于构建实时应用程序、数据湖、分析和报告。
以下是 Debezium 的一些主要特点和概念:
- 支持多种数据库: Debezium 支持连接到多种不同类型的数据库,包括 MySQL、PostgreSQL、MongoDB、SQL Server 等。每个数据库都有相应的 Debezium 连接器。
- 实时事件捕获: Debezium 使用数据库事务日志 (transaction log) 或类似机制,以实时捕获数据库的更改。这意味着当数据库发生变更时,这些变更将立即被捕获并传递给消费者。
- 基于 Apache Kafka 的事件流: Debezium 将捕获的变更以事件流的形式传递给 Apache Kafka,这是一个高性能、持久性的分布式消息队列。这使得可以方便地将数据库变更集成到 Apache Kafka 生态系统中。
- 可扩展性: Debezium 可以通过添加新的数据库连接器进行扩展,使其能够连接到支持的数据库类型。这使得它适用于不同的数据库环境。
- 事件格式: Debezium 将数据库变更表示为一系列的事件,每个事件包含了一个或多个数据库更改的详细信息。这些事件可以以 JSON 格式进行序列化,易于理解和处理。
- 保留数据库历史: Debezium 不仅捕获当前的数据库状态,还保留了变更的历史。这允许您在任意时间点回溯并查看数据库的先前状态。
- 用例举例: Debezium 可以用于许多不同的用例,包括实时数据仓库、微服务架构中的事件驱动开发、数据湖、审计跟踪等。
在使用 Debezium 时,您需要配置适当的数据库连接器,以便 Debezium 能够连接到目标数据库,并配置 Kafka 连接信息,以确保数据库变更能够被传递到 Kafka 主题中。然后,您可以使用 Kafka 消费者来订阅这些主题,以实时获取数据库的变更事件。
3.kafka
Apache Kafka 是一个分布式流处理平台,用于构建实时数据管道和流应用程序。它最初是由 LinkedIn 开发,后来成为 Apache 软件基金会的开源项目。Kafka 设计用于高吞吐量、可扩展性、可持久性和容错性,并被广泛用于构建实时数据流平台。
以下是 Kafka 的一些关键概念和特点:
- 消息传递系统: Kafka 是一个分布式的消息传递系统,用于可靠地发布和订阅消息。它允许应用程序之间以异步的方式传递数据,而不需要直接连接。
- 分布式架构: Kafka 的架构被设计为分布式的,允许它在多个节点上水平扩展。这使得 Kafka 能够处理大量数据并提供高可用性。
- 主题(Topics): 消息在 Kafka 中被组织成主题。主题是消息的逻辑容器,用于对消息进行分类。发布者(Producers)将消息发布到特定的主题,而订阅者(Consumers)从主题中读取消息。
- 分区(Partitions): 每个主题可以分为一个或多个分区。分区是消息的物理存储单位,并允许 Kafka 水平扩展。每个分区都可以在不同的节点上进行处理。
- 生产者(Producers): 生产者负责将消息发布到 Kafka 主题。它们将消息写入特定主题的一个或多个分区。
- 消费者(Consumers): 消费者订阅一个或多个主题,并从中读取消息。每个分区只能被一个消费者组中的一个消费者消费,但一个消费者组可以包含多个消费者,以实现水平扩展和容错。
- ZooKeeper: Kafka 使用 Apache ZooKeeper 来协调和管理集群中的节点。ZooKeeper用于维护集群的元数据和协助领导选举等任务。
- 持久性和日志结构存储: Kafka 使用日志结构存储来保证消息的持久性。消息被追加到不可变的日志中,允许它们在存储中进行有效地管理。
- 流处理: Kafka 还包括强大的流处理功能,使其成为构建实时数据流应用程序的理想平台。通过 Kafka Streams API,开发人员可以在 Kafka 上直接构建流处理应用程序。
- 社区和生态系统: Kafka 拥有强大的社区支持,并且在其周边有丰富的生态系统,包括各种连接器、工具和扩展。
Kafka 的设计目标之一是能够处理大量数据流,并提供可靠的消息传递和持久性。这使得 Kafka 成为构建大规模实时数据管道和流应用程序的首选选择。
二、debezium与kafka的关系
常见的是,Debezium是通过Apache Kafka连接部署的。Kafka Connect是一个用于实现和操作的框架和运行时 源连接器,它将数据摄取到Kafka 接收连接器,它将数据从Kafka主题写入到其他系统。 下图显示了一个基于Debezium的CDC管道的架构:
总得来说,我的理解就是Debezium时连接数据库与kafka的一个工具,它会实时监控数据库的binlog文件并将数据库变更的信息传输kafka相应主题上,而kafka就相当于一个数据流的管道,它主要有主题和消费者,消费者就是从主题中读取消息的功能,所以debezium是基于kafka实现该功能的一个插件。
三、环境及软件搭建
1.kafka安装及测试
可参考Windows安装启动Kafka,创建测试实例,关闭kafka_kafka本地测试启动-CSDN博客
2.debezium安装步骤:
(1)第一步就是版本匹配问题,一定要根据不同的jdk版本配备相应版本的debezium,然后选择相应数据库的debezium下载即可
查询自己电脑jdk版本可以cmd在终端输入:java -version查看jdk版本
插件下载地址:Debezium Releases Overview
由于我的jdk为8,所以选择1.5.4版本,然后在kafka目录下新建文件夹名称为plugins,将debezium压缩包解压到此文件夹。
然后修改一些kafka的config中distributed.propertites文档中的一些内容,其中
(1)bootstrap.servers=localhost:9092
这个修改为自己安装kafka的端口号即可,默认端口号为9092
(2)group.id=connect-mysqltest
这个是组名,可以自己任意设定
(3)key.converter.schemas.enable=false value.converter.schemas.enable=false
这两处将true该为false即可
(4)listeners=HTTP://:8083
这个是zookeeper接口号
(5)
修改地址为自己安装的debezium地址即可
3.测试debezium
首先打开数据库,终端先依次启动zookeeper与kafka,然后启动debezium,其中
注意要进入kafka文件家内启动一下命令:
(1)windows终端启动zookper命令: bin\windows\zookeeper-server-start.bat config\zookeeper.properties
表示启动zookeeper启动成功。
(2)windows终端启动kafka命令:bin\windows\kafka-server-start.bat config\server.properties
表示kafka服务器启动成功。
(3)windows终端启动debezium命令:bin\windows\connect-distributed.bat config\connect-distributed.properties
debezium启动成功。
想要验证自己的debezium插件路径是否修改正确,可以使用postman软件发送http://localhost:8083/connector-plugins出现红框中内容就表示路径修改正确。或者在终端利用curl命令也可以查询,该命令为curl -H "Content-Type: application/json" http://localhost:8083/connector-plugins能返回红框中内容则表示修改正确。
4.配置debezium服务连接到mysql
(1)想要利用debezium实现对mysql的具体数据库表的业务监控,就需要对debezium服务器进行配置,其中有两种方法可以采用,第一种就是利用postman软件向接受端口发送更新请求,返回没有错误就表示对debezium服务器配置成功。
第二种就是在终端利用curl命令对其进行服务器配置,输入curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"localhost","database.port":"3306","database.user":"root","database.password":"123456","database.server.id":"1","database.server.name":"my-app-connector","table.whitelist":"test01.user","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"schema-changes-topic"}}'不返回任何错误即可。
配置完成就可以在终端利用curl -H "Content-Type: application/json" http://localhost:8083/connectors/查询连接器,能查到也表明配置成功!
(2)配置文件解析
{
"name": "my-app-connector",//指定Connector的名称为mysql-connector。
"config": {//包含Connector的具体配置信息。
"connector.class": "io.debezium.connector.mysql.MySqlConnector",//不用修改
"database.hostname": "localhost",//指定MySQL数据库的主机名
"database.port": "3306", //指定MySQL数据库的端口号
"database.user": "root",//指定连接数据库所使用的用户名
"database.password": "123456",//指定连接数据库所使用的密码
"database.server.id": "1",//指定用于标识MySQL服务器的唯一ID,随便写个数字
"database.server.name": "my-app-connector",//指定在Debezium中用于标识此MySQL服务器的名称
"database.include.list": "test01",//指定要捕获变更事件的表
"database.serverTimezone":"UTC",
"key.deserializer.encoding":"UTF8",
"value.deserializer.encoding":"UTF8",
"key.serializer.encoding":"UTF8",
"value.serializer.encoding":"UTF8",
"database.history.kafka.bootstrap.servers": "localhost:9092",//用于保存数据库 schema 变更的历史记录
"database.history.kafka.topic": "schema-changes-topic"//用于保存数据库 schema 变更的历史记录。
}
}
5.实例测试
首先创建一个消费者来提取kafka对应主题中的数据,该命令为:bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-app-connector.test01.user --group my-listener,这个主题就是前面我们配置debezium时的主题,此时修改数据库中一个用户id由9变为11,在终端显示先删除一个用户id为9的用户,在创建一个用户id为11的用户。其中“op”表示当前数据库进行的操作。
四、利用java写restful web服务
到前面一步我们已经完成了debezium与kafka的安装测试,已经在终端可以实时对数据库业务进行监控输出。创建一个restful的java代码。
这是代码结构:
//application.properties文件中主要是一些端口的配置及数据库连接的配置,前面已经讲过
//application.properties
# ?????
spring.datasource.url=jdbc:mysql://localhost:3306/test01
spring.datasource.username=root
spring.datasource.password=123456
spring.kafka.consumer.group-id=my-listener
server.port=8081
# Debezium ??
debezium.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.database.hostname=localhost
debezium.database.port=3306
debezium.database.user=root
debezium.database.password=123456
debezium.database.server.id=1
debezium.database.server.name=my-app-connector
debezium.database.include.list=test01
debezium.database.serverTimezone=UTC
debezium.key.deserializer.encoding=UTF8
debezium.value.deserializer.encoding=UTF8
debezium.key.serializer.encoding=UTF8
debezium.value.serializer.encoding=UTF8
debezium.database.history.kafka.bootstrap.servers=localhost:9092
debezium.database.history.kafka.topic=schema-changes-topic
# Kafka ??
spring.kafka.bootstrap-servers=localhost:9092
//启动类Application
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
//控制类主要实现restful服务的功能
package com.example.demo.controller;
import com.example.demo.service.KafkaConsumerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api")
public class KafkaRestController {
@Autowired
private KafkaConsumerService kafkaConsumerService;
@GetMapping("/latestMessage")
public String getLatestMessage() {
String latestMessage = kafkaConsumerService.getLatestMessage();
System.out.println("成功");
System.out.println("Retrieving latest message: " + latestMessage);
return latestMessage;
}
@KafkaListener(topics = "my-app-connector.test01.user", groupId = "my-listener")
public void consumeMessage(String message) {
kafkaConsumerService.handleMessage(message);
}
// 添加处理根路径的方法
@GetMapping
public String home() {
return "Welcome to KafkaRestController!";
}
}
//服务类:主要获取消费者数据信息的两个函数
package com.example.demo.service;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
private String latestMessage;
public void handleMessage(String message) {
// Handle your Kafka message here
this.latestMessage = message;
System.out.println("Received message from Kafka: " + message);
}
public String getLatestMessage() {
return latestMessage;
}
}
//用户类
package com.example.demo.entity;
public class User {
private int id;
private String username;
private String email;
// Getter and Setter methods
}
//pom.xml文件主要是一些项目所用到的依赖声明
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Kafka-mysql</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.5</version> <!-- 使用你需要的版本 -->
</dependency>
<!-- Debezium Connector for MySQL -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>1.9.0.Final</version> <!-- 使用最新版本 -->
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.0</version> <!-- 替换为最新的版本号 -->
</dependency>
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.4</version> <!-- 使用你想要的版本号 -->
</dependency>
</dependencies>
</project>
至此,我们就已经完成了请写java程序,创建restful web服务,示范利用RestController、Debezium和KafkaListener通过binlog日志监控并获得mysql业务数据库变更,请用具体数据集举例说明以上程序各步骤处理结果,可以通过postman软件访问http://localhost:8081/api/latestMessage地址查看数据库变更信息
版权归原作者 qq_50817552 所有, 如有侵权,请联系我们删除。