0


Debezium和SeaTunnel实现MySQL到Hadoop的实时数据流和全量同步(基于尚硅谷的集群环境)

1、hadoop集群连接本地MySQL

1.1 首先测试集群是否可以ping通本地

虚拟机可以ping通网关(192.168.10.2),但不能ping通192.168.10.1,这表明问题可能出在Windows主机的防火墙设置或VMware的网络配置上。

1.1.1 检查Windows防火墙设置

确保Windows防火墙没有阻止ICMP请求(ping)可以临时禁用Windows防火墙进行测试:

  • 打开命令提示符(以管理员身份运行)
  • 输入以下命令来禁用防火墙
netsh advfirewall set allprofiles state off

然后再次尝试从虚拟机ping 192.168.10.1。如果现在可以ping通,说明防火墙是问题所在。需要添加一个允许ICMP的规则。

1.1.2 重新启用防火墙并添加规则

输入以下命令来重新启用防火墙

netsh advfirewall set allprofiles state on

在命令提示符中输入以下命令来添加允许ICMP请求的规则

netsh advfirewall firewall add rule name="Allow ICMPv4" protocol=icmpv4:8,any dir=in action=allow

在命令提示符中输入以下命令来添加允许3306端口的入站规则

netsh advfirewall firewall add rule name="Allow MySQL 3306" dir=in action=allow protocol=TCP localport=3306

1.2 虚拟机连接到物理机上的MySQL服务器

需要确保几个关键点:

  1. MySQL服务器配置:确保MySQL服务器允许远程连接。
  2. 网络配置:确保虚拟机和物理机在同一网络中,并且可以相互通信。
  3. 防火墙设置:确保没有防火墙阻止MySQL的端口(默认是3306)。
  4. MySQL用户权限:确保有用户权限从虚拟机的IP地址连接到MySQL服务器。

1.2.1 配置MySQL服务器以允许远程连接

  1. 登录MySQL服务器mysql -uroot -p123456
  2. 查看当前的用户权限SHOW GRANTS FOR 'root'@'localhost';
  3. 创建一个允许远程连接的用户CREATE USER 'remote_user'@'%' IDENTIFIED BY '123456';
  4. 给这个用户授权GRANT ALL PRIVILEGES ON *.* TO 'remote_user'@'%' WITH GRANT OPTION;FLUSH PRIVILEGES;
  5. 确保MySQL配置文件(my.cnf或my.ini)允许远程连接:- 找到MySQL的配置文件,通常在/etc/mysql/目录下。- 确保bind-address设置为0.0.0.0或者被注释掉,这样MySQL服务器可以监听所有网络接口。

在虚拟机上执行如下命令:

mysql -h 192.168.10.1 -u remote_user -p

已经成功配置了从Hadoop集群的

hadoop102

节点访问本地MySQL服务器,并且已经解决了网络和防火墙的问题,现在可以使用SeaTunnel来全量同步

news_data

数据库的数据到HDFS上 。

2、seatunnel导入数据库全量数据到HDFS(单表)

2.1 下载并安装SeaTunnel

官方网站:Apache SeaTunnel | Apache SeaTunnel

导入到/opt/software下,执行如下命令解压

tar -xzvf apache-seatunnel-2.3.8-bin.tar.gz -C /opt/module/

2.2 安装Connector插件

从2.2.0-beta版本开始,二进制包默认不提供connector依赖。因此,需要执行以下命令来安装所需的connector插件:

sh bin/install_plugin.sh $version

2.3 驱动jar包

  • 下载MySQL JDBC驱动: 确保下载了正确版本的MySQL JDBC驱动jar包。从MySQL官网下载。
  • 将驱动添加到SeaTunnel的lib目录: 将下载的MySQL JDBC驱动jar包放到SeaTunnel的lib目录下。这样,当SeaTunnel启动时,它就能够加载所需的MySQL驱动类

2.4 配置SeaTunnel任务

配置文件

mysql_to_hdfs.conf

,并将其放置在SeaTunnel的

config

目录下。这个配置文件应该包含了源(MySQL)和接收器(HDFS)的正确配置。

env {
  execution.parallelism = 1 # 根据你的集群资源调整并行度
}

source {
  JDBC {
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.10.1:3306/news_data?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
    user = "remote_user"
    password = "123456"
    query = "SELECT * FROM news_information"  # 使用查询语句来导入数据
  }
}

sink {
  HdfsFile {
    fs.defaultFS = "hdfs://hadoop102:8020"  # HDFS NameNode 地址
    path = "/news"  # HDFS 目标目录路径
    file_type = "text"  # 文件类型,例如 text, csv, orc 等
    field_delimiter = ","  # 字段分隔符,根据实际情况调整
    file_format_type = "text"  # 指定文件格式类型为文本
  }
}

选择执行模式

SeaTunnel支持多种执行模式,包括本地模式(local)、独立模式(standalone)、集群模式(yarn/cluster)等。对于大多数大数据任务,推荐使用集群模式来充分利用Hadoop集群的资源。

2.5 环境变量配置

/opt/module/hadoop/etc/hadoop

是包含Hadoop配置文件的目录。需要将这个目录路径设置为

HADOOP_CONF_DIR

环境变量,以便SeaTunnel的Spark作业能够正确地连接到YARN集群。

使用如下命令

sudo vim /etc/profile.d/my_env.sh

在环境变量最后添加如下

export HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop

运行以下命令使变量立即生效

source /etc/profile.d/my_env.sh

运行命令

./bin/start-seatunnel-spark-3-connector-v2.sh --config ./config/mysql_to_hdfs.conf --master yarn

SeaTunnel作业通过Spark提交到YARN集群,启动一个SeaTunnel作业,从MySQL读取数据并将其写入到HDFS中,使用YARN作为资源管理器。

3、seatunnel导入数据库全量数据到HDFS(多表)

在seatunnel的安装目录下创建脚本mysql_full_hdfs.sh

#!/bin/bash

# 获取当前日期
current_date=$(date +%Y-%m-%d)

# 定义数据库连接信息
DB_URL="jdbc:mysql://192.168.10.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
DB_USER="remote_user"
DB_PASSWORD="123456"

# 定义HDFS路径
HDFS_PATH="hdfs://hadoop102:8020"

# 定义表名数组
TABLES=("stu" "t_user")

# 循环处理每个表
for TABLE in "${TABLES[@]}"; do
  # 创建配置文件内容,包含当前日期
  conf_file="/opt/module/apache-seatunnel-2.3.8/config/${TABLE}_to_hdfs_${current_date}.conf"
  cat > "$conf_file" << EOF
env {
  execution.parallelism = 1
}
source {
  JDBC {
    driver = "com.mysql.cj.jdbc.Driver"
    url = "${DB_URL}"
    user = "${DB_USER}"
    password = "${DB_PASSWORD}"
    query = "SELECT * FROM ${TABLE}"
  }
}
sink {
  HdfsFile {
    fs.defaultFS = "${HDFS_PATH}"
    path = "/news/${TABLE}_full/${current_date}"  # HDFS 目标目录路径,每个表都有自己的目录,并且包含日期
    file_type = "parquet"  # 文件类型改为Parquet
    file_format_type = "parquet"  # 文件格式类型为Parquet
    parquet_compression_CODEC = "gzip"  # 使用gzip压缩
  }
}
EOF

  # 运行Seatunnel任务
  /opt/module/apache-seatunnel-2.3.8/bin/start-seatunnel-spark-3-connector-v2.sh --config "$conf_file" --master yarn

  # 删除配置文件
  rm "$conf_file"
done

使用chmod +x 脚本,再运行脚本。

4、MySQL到Kafka的实时数据流和变更数据捕获(单节点)

Debezium在Kafka Connect集群中监控MySQL数据库变更

4.1 MySQL配置

创建用户:首先,需要使用

CREATE USER

命令来创建一个新的用户。

CREATE USER 'debezium'@'192.168.10.102' IDENTIFIED BY '123456';

授予权限:然后,可以使用

GRANT

命令来授予这个用户所需的权限。

GRANT SUPER, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'192.168.10.102';

**为用户

debezium

授予

LOCK TABLES

权限**

请注意,授予

LOCK TABLES

权限可能会影响数据库的并发性,因为它允许用户锁定表。在授予权限时,应始终确保遵循最小权限原则,只授予用户完成其任务所需的权限。

GRANT SUPER, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'192.168.10.102';

**为用户

debezium

授予

RELOAD

权限 **

GRANT RELOAD ON *.* TO 'debezium'@'192.168.10.102';

请注意,授予

RELOAD

权限允许用户执行

FLUSH TABLES WITH READ LOCK

命令,这在 Debezium 连接器进行初始快照时是必要的。确保在授予权限时,用户

debezium

已经存在于 MySQL 中,并且使用的是正确的主机地址(在本例中为虚拟机的 IP 地址

192.168.10.102

)。

刷新权限:最后,需要刷新 MySQL 的权限设置,使新权限生效:

FLUSH PRIVILEGES;

请注意,上述步骤中的

'192.168.10.102'

应该替换为虚拟机的 IP 地址,因为这是 Kafka Connect 尝试连接到 MySQL 数据库的地址。

MySQL的配置文件:

# 后续换为允许的IP
bind-address = 0.0.0.0
log-bin=mysql-bin
binlog-format = ROW
binlog-row-image = FULL
expire_logs_days = 10

4.2 下载安装Zookeeper

在zoo.cfg下配置如下信息:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper/zkData
clientPort=2181

在bin目录下执行

./zkServer.sh start 

4.3 下载安装Kafka

在config目录的server.properties配置文件中,配置如下信息:

broker.id=0
advertised.listeners=PLAINTEXT://hadoop102:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/module/kafka/datas
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=hadoop102:2181/kafka
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

在bin目录下执行

./kafka-server-start.sh /opt/module/kafka/config/server.properties 

4.4 下载安装Debezium

下载到/opt/software

 wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.5.Final/debezium-connector-mysql-1.9.5.Final-plugin.tar.gz

解压到:/opt/module/kafka/libs 下

tar -xzf debezium-connector-mysql-1.9.5.Final-plugin.tar.gz -C /opt/module/kafka/libs/

把 debezium-connector-mysql 里面的jar包移动到libs目录下。

cp debezium-connector-mysql/*.jar /opt/module/kafka/libs/

在/opt/module/kafka/config目录下的connect-distributed.properties 配置文件中添加

plugin.path=/opt/module/kafka/libs

在config目录下创建配置文件mysql-connector.json,并且添加如下信息

{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"192.168.10.1","database.port":"3306","database.user":"debezium","database.password":"123456","database.server.id":"184054","database.server.name":"mydb","database.include.list":"test","database.history.kafka.bootstrap.servers":"hadoop102:9092","schema.history.internal.kafka.bootstrap.servers":"hadoop102:9092","schema.history.internal.kafka.topic":"schema-changes.test","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","database.history.kafka.topic":"dbhistory.test","database.ssl.mode":"REQUIRED","database.ssl.truststore.location":"/path/to/truststore.jks","database.ssl.truststore.password":"truststore-password","database.connectionTimeZone":"UTC"}}

在config目录下创建配置文件debezium.properties,如下信息:

bootstrap.servers=hadoop102:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.flush.interval.ms=10000
# 设置 offset topic 的复制因子为 1
config.storage.topic=connect-configs
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
status.flush.interval.ms=5000
group.id=connect-cluster
name=mysql-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=192.168.10.1
database.port=3306
database.user=debezium
database.password=123456
database.server.id=184054
database.server.name=mydb
database.history.kafka.bootstrap.servers=hadoop102:9092
database.history.kafka.topic=dbhistory.mydb
database.history.store.only.monitored.tables.ddl=true
table.include.list=test.stu,test.t_user

将zookeeper和kafka启动起来,再启动kafka-connect。

kafka的bin目录下

./connect-distributed.sh /opt/module/kafka/config/debezium.properties

执行注册命令,kafka的config目录下

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://hadoop102:8083/connectors/ -d @mysql-connector.json

查看状态

curl -s http://hadoop102:8083/connectors/mysql-connector/status

删除

url -X DELETE localhost:8083/connectors/mysql-connector

5、MySQL到Kafka的实时数据流和变更数据捕获(多节点)

5.1 Zookeeper配置

  • hadoop102节点上
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper/zkData
clientPort=2181
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888

/opt/module/zookeeper/zkData目录下有一个文件myid,里面的内容是:2

  • hadoop103节点上
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper/zkData
clientPort=2181
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888

/opt/module/zookeeper/zkData目录下有一个文件myid,里面的内容是:3

  • hadoop104节点上
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/module/zookeeper/zkData
clientPort=2181
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888

/opt/module/zookeeper/zkData目录下有一个文件myid,里面的内容是:4

5.2 Kafka配置

  • hadoop102节点上
broker.id=0
advertised.listeners=PLAINTEXT://hadoop102:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/module/kafka/datas
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
  • hadoop103节点上

只需要修改kafka/config/server.properties

broker.id=1
advertised.listeners=PLAINTEXT://hadoop103:9092
  • hadoop104节点上

只需要修改kafka/config/server.properties

broker.id=2
advertised.listeners=PLAINTEXT://hadoop104:9092

5.3 Dezebium下载安装

hadoop102、hadoop103、hadoop104下载了debezium,并且jar包放到了/opt/module/kafka/libs目录。每个节点的**/opt/module/kafka/config/connect-distributed.properties**配置文件内容都是

bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
plugin.path=/opt/module/kafka/libs
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
status.flush.interval.ms=5000

每个节点的**/opt/module/kafka/config下创建了一个配置文件:mysql-connector.json**,内容如下:

{
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.10.1",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "123456",
        "database.server.id": "184054",
        "database.server.name": "mydb",
        "database.include.list": "news_data",
        "database.history.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
        "schema.history.internal.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
        "schema.history.internal.kafka.topic": "schema-changes.news_data",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "database.history.kafka.topic": "dbhistory.news_data",
        "database.ssl.mode": "REQUIRED",
        "database.ssl.truststore.location": "/path/to/truststore.jks",
        "database.ssl.truststore.password": "truststore-password",
        "database.connectionTimeZone": "UTC"
    }
}

启动Zookeeper、Kafka,在hadoop102、hadoop103、hadoop104节点上依次启动Kafka-connect,并且在其中一台(hadoop102)注册即可。

  • hadoop102上执行如下语句
curl -s http://hadoop102:8083/connectors/mysql-connector/status
  • hadoop103上执行如下语句
curl -s http://hadoop103:8083/connectors/mysql-connector/status
  • hadoop104上执行如下语句
curl -s http://hadoop104:8083/connectors/mysql-connector/status

都得到如下结果:

{
    "name": "mysql-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.10.102:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "192.168.10.102:8083"
        }
    ],
    "type": "source"
}

当hadoop102的Kafka-connect关闭时

  • hadoop103上执行如下语句
curl -s http://hadoop103:8083/connectors/mysql-connector/status
  • hadoop104上执行如下语句
curl -s http://hadoop104:8083/connectors/mysql-connector/status

都得到如下结果:

{
    "name": "mysql-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.10.103:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "192.168.10.103:8083"
        }
    ],
    "type": "source"
}

成功地在多节点上配置了Zookeeper、Kafka和Debezium连接器,并且能够在Kafka Connect集群中注册和运行Debezium连接器

6、kafka写入HDFS

在hadoop102节点上的flume下编写一个配置文件kafka_to_hdfs.conf,内容如下:

# Define the sources, sinks, and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# # Configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = mydb.news_data.comment_information,mydb.news_data.media_information,mydb.news_data.news_information,mydb.news_data.user_information,mydb.news_data.user_auth

# # Interceptor configuration
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.atguigu.news.flume.interceptor.TimestampInterceptor$Builder

# # Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# # Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/news_data/%{table}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

编写拦截器代码打jar包放入hadoop102的flume的lib目录下,jar包代码如下:

package org.atguigu.news.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class TimestampInterceptor implements Interceptor {
    @Override
    public void initialize() {
        // Initialization logic, if needed
    }
    @Override
    public Event intercept(Event event) {
        // 1. Get header and body data
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody(), StandardCharsets.UTF_8);

        try {
            // 2. Parse body data as JSON
            JSONObject jsonObject = JSONObject.parseObject(body);
            // 3. Extract timestamp and table name
            String ts = jsonObject.getString("ts_ms");
            JSONObject source = jsonObject.getJSONObject("source");
            String tableName = source.getString("table");
            // 4. Put timestamp and table name into headers
            headers.put("timestamp", ts);
            headers.put("table", tableName);
            return event;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            if (intercept(event) == null) {
                events.remove(event);
            }
        }
        return events;
    }
    @Override
    public void close() {
        // Clean up resources, if needed
    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }
        @Override
        public void configure(Context context) {
            // Configuration logic, if needed
        }
    }
}
标签: 数据仓库

本文转载自: https://blog.csdn.net/weixin_64436232/article/details/142983770
版权归原作者 Li-477233969 所有, 如有侵权,请联系我们删除。

“Debezium和SeaTunnel实现MySQL到Hadoop的实时数据流和全量同步(基于尚硅谷的集群环境)”的评论:

还没有评论