使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程
实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。
- 1. 从源数据库(MySQL和Oracle)实时抽取数据。
- 2. 对数据进行清洗和转换。
- **3. 将转换后的数据写入目标数据库(MySQL)**。
我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力,适合处理实时数据同步和转换任务。
环境准备
- 确保MySQL和Oracle数据库运行**,并创建相应的表。
- 创建Spring Boot项目,并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。
第一步:创建源和目标数据库表
假设我们有以下三个表:
- source_mysql_table(MySQL中的源表)
- source_oracle_table(Oracle中的源表)
- target_table(目标MySQL表)
MySQL源表
CREATEDATABASE source_mysql_db;USE source_mysql_db;CREATETABLE source_mysql_table (
id INTAUTO_INCREMENTPRIMARYKEY,
user_id VARCHAR(255)NOTNULL,actionVARCHAR(255)NOTNULL,timestampVARCHAR(255)NOTNULL);
Oracle源表
CREATETABLE source_oracle_table (
id NUMBER GENERATED BYDEFAULTONNULLASIDENTITY,
user_id VARCHAR2(255)NOTNULL,action VARCHAR2(255)NOTNULL,timestamp VARCHAR2(255)NOTNULL,PRIMARYKEY(id));
目标MySQL表
CREATEDATABASE target_db;USE target_db;CREATETABLE target_table (
id INTAUTO_INCREMENTPRIMARYKEY,
user_id VARCHAR(255)NOTNULL,actionVARCHAR(255)NOTNULL,timestampVARCHAR(255)NOTNULL);
第二步:添加项目依赖
在pom.xml中添加Flink、MySQL和Oracle相关的依赖:
<dependencies><!-- Spring Boot dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!-- MySQL JDBC driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version></dependency><!-- Oracle JDBC driver --><dependency><groupId>com.oracle.database.jdbc</groupId><artifactId>ojdbc8</artifactId><version>19.8.0.0</version></dependency></dependencies>
第三步:编写Flink ETL任务
创建一个Flink任务类来实现ETL逻辑。
创建一个POJO类表示数据结构
packagecom.example.flink;publicclassUserAction{privateint id;privateString userId;privateString action;privateString timestamp;// Getters and setterspublicintgetId(){return id;}publicvoidsetId(int id){this.id = id;}publicStringgetUserId(){return userId;}publicvoidsetUserId(String userId){this.userId = userId;}publicStringgetAction(){return action;}publicvoidsetAction(String action){this.action = action;}publicStringgetTimestamp(){return timestamp;}publicvoidsetTimestamp(String timestamp){this.timestamp = timestamp;}}
编写Flink任务类
packagecom.example.flink;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.RichFlatMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.source.SourceFunction;importorg.apache.flink.util.Collector;importorg.springframework.boot.CommandLineRunner;importorg.springframework.stereotype.Component;importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.PreparedStatement;importjava.sql.ResultSet;@ComponentpublicclassFlinkETLJobimplementsCommandLineRunner{@Overridepublicvoidrun(String... args)throwsException{finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 从MySQL读取数据DataStream<UserAction> mysqlDataStream = env.addSource(newMySQLSource());// 从Oracle读取数据DataStream<UserAction> oracleDataStream = env.addSource(newOracleSource());// 合并两个数据流DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);// 清洗和转换数据DataStream<UserAction> transformedStream = mergedStream.map(newMapFunction<UserAction,UserAction>(){@OverridepublicUserActionmap(UserAction value)throwsException{// 进行清洗和转换
value.setAction(value.getAction().toUpperCase());return value;}});// 将数据写入目标MySQL数据库
transformedStream.addSink(newMySQLSink());// 执行任务
env.execute("Flink ETL Job");}publicstaticclassMySQLSourceimplementsSourceFunction<UserAction>{privatestaticfinalStringJDBC_URL="jdbc:mysql://localhost:3306/source_mysql_db";privatestaticfinalStringJDBC_USER="source_user";privatestaticfinalStringJDBC_PASSWORD="source_password";privatevolatileboolean isRunning =true;@Overridepublicvoidrun(SourceContext<UserAction> ctx)throwsException{try(Connection connection =DriverManager.getConnection(JDBC_URL,JDBC_USER,JDBC_PASSWORD)){while(isRunning){String sql ="SELECT * FROM source_mysql_table";try(PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()){while(resultSet.next()){UserAction userAction =newUserAction();
userAction.setId(resultSet.getInt("id"));
userAction.setUserId(resultSet.getString("user_id"));
userAction.setAction(resultSet.getString("action"));
userAction.setTimestamp(resultSet.getString("timestamp"));
ctx.collect(userAction);}}Thread.sleep(5000);// 模拟实时数据流,每5秒查询一次}}}@Overridepublicvoidcancel(){
isRunning =false;}}publicstaticclassOracleSourceimplementsSourceFunction<UserAction>{privatestaticfinalStringJDBC_URL="jdbc:oracle:thin:@localhost:1521:orcl";privatestaticfinalStringJDBC_USER="source_user";privatestaticfinalStringJDBC_PASSWORD="source_password";privatevolatileboolean isRunning =true;@Overridepublicvoidrun(SourceContext<UserAction> ctx)throwsException{try(Connection connection =DriverManager.getConnection(JDBC_URL,JDBC_USER,JDBC_PASSWORD)){while(isRunning){String sql ="SELECT * FROM source_oracle_table";try(PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()){while(resultSet.next()){UserAction userAction =newUserAction();
userAction.setId(resultSet.getInt("id"));
userAction.setUserId(resultSet.getString("user_id"));
userAction.setAction(resultSet.getString("action"));
userAction.setTimestamp(resultSet.getString("timestamp"));
ctx.collect(userAction);}}Thread.sleep(5000);// 模拟实时数据流,每5秒查询一次}}}@Overridepublicvoidcancel(){
isRunning =false;}}publicstaticclassMySQLSinkextendsRichFlatMapFunction<UserAction,Void>{privatestaticfinalStringJDBC_URL="jdbc:mysql://localhost:3306/target_db";privatestaticfinalStringJDBC_USER="target_user";privatestaticfinalStringJDBC_PASSWORD="target_password";privatetransientConnection connection;privatetransientPreparedStatement statement;@Overridepublicvoidopen(Configuration parameters)throwsException{super.open(parameters);
connection =DriverManager.getConnection(JDBC_URL,JDBC_USER,JDBC_PASSWORD);String sql ="INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";
statement = connection.prepareStatement(sql);}@OverridepublicvoidflatMap(UserAction value,Collector<Void> out)throwsException{
statement.setString(1, value.getUserId());
statement.setString(2, value.getAction());
statement.setString(3, value.getTimestamp());
statement.executeUpdate();}@Overridepublicvoidclose()throwsException{super.close();if(statement !=null){
statement.close();}if(connection !=null){
connection.close();}}}}
第四步:配置Spring Boot
在application.properties中添加必要的配置:
# Spring Boot configuration
server.port=8080
第五步:运行和测试
- 启动MySQL和Oracle数据库:确保你的源和目标数据库已经运行,并且创建了相应的数据库和表。
- 启动Spring Boot应用:启动Spring Boot应用程序,会自动运行Flink ETL任务。
- 测试Flink ETL任务:插入一些数据到源数据库的表中,验证数据是否同步到目标数据库的表中。
总结
通过上述步骤,你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据,进行数据清洗和转换,并将结果加载到目标MySQL数据库中。根据你的具体需求,你可以扩展和修改这个示例,处理更复杂的数据转换和加载逻辑。
版权归原作者 Jack_hrx 所有, 如有侵权,请联系我们删除。