0


使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。

  • 1. 从源数据库(MySQL和Oracle)实时抽取数据
  • 2. 对数据进行清洗和转换
  • **3. 将转换后的数据写入目标数据库(MySQL)**。请添加图片描述

我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力,适合处理实时数据同步和转换任务。

环境准备

  • 确保MySQL和Oracle数据库运行**,并创建相应的表。
  • 创建Spring Boot项目,并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。

第一步:创建源和目标数据库表

假设我们有以下三个表:

  • source_mysql_table(MySQL中的源表)
  • source_oracle_table(Oracle中的源表)
  • target_table(目标MySQL表)

MySQL源表

  1. CREATEDATABASE source_mysql_db;USE source_mysql_db;CREATETABLE source_mysql_table (
  2. id INTAUTO_INCREMENTPRIMARYKEY,
  3. user_id VARCHAR(255)NOTNULL,actionVARCHAR(255)NOTNULL,timestampVARCHAR(255)NOTNULL);

Oracle源表

  1. CREATETABLE source_oracle_table (
  2. id NUMBER GENERATED BYDEFAULTONNULLASIDENTITY,
  3. user_id VARCHAR2(255)NOTNULL,action VARCHAR2(255)NOTNULL,timestamp VARCHAR2(255)NOTNULL,PRIMARYKEY(id));

目标MySQL表

  1. CREATEDATABASE target_db;USE target_db;CREATETABLE target_table (
  2. id INTAUTO_INCREMENTPRIMARYKEY,
  3. user_id VARCHAR(255)NOTNULL,actionVARCHAR(255)NOTNULL,timestampVARCHAR(255)NOTNULL);

第二步:添加项目依赖

在pom.xml中添加Flink、MySQL和Oracle相关的依赖:

  1. <dependencies><!-- Spring Boot dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!-- MySQL JDBC driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version></dependency><!-- Oracle JDBC driver --><dependency><groupId>com.oracle.database.jdbc</groupId><artifactId>ojdbc8</artifactId><version>19.8.0.0</version></dependency></dependencies>

第三步:编写Flink ETL任务

创建一个Flink任务类来实现ETL逻辑。

创建一个POJO类表示数据结构

  1. packagecom.example.flink;publicclassUserAction{privateint id;privateString userId;privateString action;privateString timestamp;// Getters and setterspublicintgetId(){return id;}publicvoidsetId(int id){this.id = id;}publicStringgetUserId(){return userId;}publicvoidsetUserId(String userId){this.userId = userId;}publicStringgetAction(){return action;}publicvoidsetAction(String action){this.action = action;}publicStringgetTimestamp(){return timestamp;}publicvoidsetTimestamp(String timestamp){this.timestamp = timestamp;}}

编写Flink任务类

  1. packagecom.example.flink;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.source.SourceFunction;importorg.apache.flink.util.Collector;importorg.springframework.boot.CommandLineRunner;importorg.springframework.stereotype.Component;importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.PreparedStatement;importjava.sql.ResultSet;@ComponentpublicclassFlinkETLJobimplementsCommandLineRunner{@Overridepublicvoidrun(String... args)throwsException{finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 从MySQL读取数据DataStream<UserAction> mysqlDataStream = env.addSource(newMySQLSource());// 从Oracle读取数据DataStream<UserAction> oracleDataStream = env.addSource(newOracleSource());// 合并两个数据流DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);// 清洗和转换数据DataStream<UserAction> transformedStream = mergedStream.map(newMapFunction<UserAction,UserAction>(){@OverridepublicUserActionmap(UserAction value)throwsException{// 进行清洗和转换
  2. value.setAction(value.getAction().toUpperCase());return value;}});// 将数据写入目标MySQL数据库
  3. transformedStream.addSink(newMySQLSink());// 执行任务
  4. env.execute("Flink ETL Job");}publicstaticclassMySQLSourceimplementsSourceFunction<UserAction>{privatestaticfinalStringJDBC_URL="jdbc:mysql://localhost:3306/source_mysql_db";privatestaticfinalStringJDBC_USER="source_user";privatestaticfinalStringJDBC_PASSWORD="source_password";privatevolatileboolean isRunning =true;@Overridepublicvoidrun(SourceContext<UserAction> ctx)throwsException{try(Connection connection =DriverManager.getConnection(JDBC_URL,JDBC_USER,JDBC_PASSWORD)){while(isRunning){String sql ="SELECT * FROM source_mysql_table";try(PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()){while(resultSet.next()){UserAction userAction =newUserAction();
  5. userAction.setId(resultSet.getInt("id"));
  6. userAction.setUserId(resultSet.getString("user_id"));
  7. userAction.setAction(resultSet.getString("action"));
  8. userAction.setTimestamp(resultSet.getString("timestamp"));
  9. ctx.collect(userAction);}}Thread.sleep(5000);// 模拟实时数据流,每5秒查询一次}}}@Overridepublicvoidcancel(){
  10. isRunning =false;}}publicstaticclassOracleSourceimplementsSourceFunction<UserAction>{privatestaticfinalStringJDBC_URL="jdbc:oracle:thin:@localhost:1521:orcl";privatestaticfinalStringJDBC_USER="source_user";privatestaticfinalStringJDBC_PASSWORD="source_password";privatevolatileboolean isRunning =true;@Overridepublicvoidrun(SourceContext<UserAction> ctx)throwsException{try(Connection connection =DriverManager.getConnection(JDBC_URL,JDBC_USER,JDBC_PASSWORD)){while(isRunning){String sql ="SELECT * FROM source_oracle_table";try(PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()){while(resultSet.next()){UserAction userAction =newUserAction();
  11. userAction.setId(resultSet.getInt("id"));
  12. userAction.setUserId(resultSet.getString("user_id"));
  13. userAction.setAction(resultSet.getString("action"));
  14. userAction.setTimestamp(resultSet.getString("timestamp"));
  15. ctx.collect(userAction);}}Thread.sleep(5000);// 模拟实时数据流,每5秒查询一次}}}@Overridepublicvoidcancel(){
  16. isRunning =false;}}publicstaticclassMySQLSinkextendsRichFlatMapFunction<UserAction,Void>{privatestaticfinalStringJDBC_URL="jdbc:mysql://localhost:3306/target_db";privatestaticfinalStringJDBC_USER="target_user";privatestaticfinalStringJDBC_PASSWORD="target_password";privatetransientConnection connection;privatetransientPreparedStatement statement;@Overridepublicvoidopen(Configuration parameters)throwsException{super.open(parameters);
  17. connection =DriverManager.getConnection(JDBC_URL,JDBC_USER,JDBC_PASSWORD);String sql ="INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";
  18. statement = connection.prepareStatement(sql);}@OverridepublicvoidflatMap(UserAction value,Collector<Void> out)throwsException{
  19. statement.setString(1, value.getUserId());
  20. statement.setString(2, value.getAction());
  21. statement.setString(3, value.getTimestamp());
  22. statement.executeUpdate();}@Overridepublicvoidclose()throwsException{super.close();if(statement !=null){
  23. statement.close();}if(connection !=null){
  24. connection.close();}}}}

第四步:配置Spring Boot

在application.properties中添加必要的配置:

  1. # Spring Boot configuration
  2. server.port=8080

第五步:运行和测试

  • 启动MySQL和Oracle数据库:确保你的源和目标数据库已经运行,并且创建了相应的数据库和表。
  • 启动Spring Boot应用:启动Spring Boot应用程序,会自动运行Flink ETL任务。
  • 测试Flink ETL任务:插入一些数据到源数据库的表中,验证数据是否同步到目标数据库的表中。

总结

通过上述步骤,你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据,进行数据清洗和转换,并将结果加载到目标MySQL数据库中。根据你的具体需求,你可以扩展和修改这个示例,处理更复杂的数据转换和加载逻辑。

标签: apache flink mysql

本文转载自: https://blog.csdn.net/qq_38411796/article/details/139770741
版权归原作者 Jack_hrx 所有, 如有侵权,请联系我们删除。

“使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程”的评论:

还没有评论