

A Complete Guide to Reading and Writing MySQL Data with Apache Flink

1. Introduction:

Apache Flink is a powerful stream processing engine for processing large-scale data in real time. This article shows how to use Flink to interact with a MySQL database, taking the cleaning of stock data as an example.

2. Environment Setup:

First, make sure Apache Flink is installed and the MySQL database is configured, and import the required dependencies. Two MySQL tables must be created in advance: one source table and one sink table.
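For reference, the connector dependencies can be declared in the build file. The build.sbt below is only a minimal sketch: the artifact names and versions are assumptions and must be matched to the Flink and Scala versions you actually run (from Flink 1.16 onward the JDBC connector is versioned separately from Flink itself).

// build.sbt -- minimal sketch; artifact names and versions are assumptions,
// adjust them to your Flink and Scala versions.
ThisBuild / scalaVersion := "2.12.17"

val flinkVersion = "1.14.6"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"        % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
  "org.apache.flink" %% "flink-table-planner"          % flinkVersion,
  "org.apache.flink" %% "flink-connector-jdbc"         % flinkVersion,
  "mysql"            %  "mysql-connector-java"         % "8.0.33"
)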

CREATE TABLE `re_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Stock code',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Stock name',
  `close` double DEFAULT NULL COMMENT 'Latest price',
  `change_percent` double DEFAULT NULL COMMENT 'Change percent',
  `change` double DEFAULT NULL COMMENT 'Change amount',
  `volume` double DEFAULT NULL COMMENT 'Volume (lots)',
  `amount` double DEFAULT NULL COMMENT 'Turnover',
  `amplitude` double DEFAULT NULL COMMENT 'Amplitude',
  `turnover_rate` double DEFAULT NULL COMMENT 'Turnover rate',
  `peration` double DEFAULT NULL COMMENT 'P/E ratio',
  `volume_rate` double DEFAULT NULL COMMENT 'Volume ratio',
  `hign` double DEFAULT NULL COMMENT 'High',
  `low` double DEFAULT NULL COMMENT 'Low',
  `open` double DEFAULT NULL COMMENT 'Open',
  `previous_close` double DEFAULT NULL COMMENT 'Previous close',
  `pb` double DEFAULT NULL COMMENT 'P/B ratio',
  `create_time` varchar(64) NOT NULL COMMENT 'Write time',
  `rise` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Stock code',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'Stock name',
  `close` double DEFAULT NULL COMMENT 'Latest price',
  `change_percent` double DEFAULT NULL COMMENT 'Change percent',
  `change` double DEFAULT NULL COMMENT 'Change amount',
  `volume` double DEFAULT NULL COMMENT 'Volume (lots)',
  `amount` double DEFAULT NULL COMMENT 'Turnover',
  `amplitude` double DEFAULT NULL COMMENT 'Amplitude',
  `turnover_rate` double DEFAULT NULL COMMENT 'Turnover rate',
  `peration` double DEFAULT NULL COMMENT 'P/E ratio',
  `volume_rate` double DEFAULT NULL COMMENT 'Volume ratio',
  `hign` double DEFAULT NULL COMMENT 'High',
  `low` double DEFAULT NULL COMMENT 'Low',
  `open` double DEFAULT NULL COMMENT 'Open',
  `previous_close` double DEFAULT NULL COMMENT 'Previous close',
  `pb` double DEFAULT NULL COMMENT 'P/B ratio',
  `create_time` varchar(64) NOT NULL COMMENT 'Write time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object TableETL {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    // Define the source table (mapped to the MySQL table t_stock_code_price)
    val source_table =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  -- other columns ...
        |  create_time STRING NOT NULL,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/mydb',
        |  'driver' = 'com.mysql.cj.jdbc.Driver',
        |  'table-name' = 't_stock_code_price',
        |  'username' = 'root',
        |  'password' = '12345678'
        |)
        |""".stripMargin

    // Define the sink table (mapped to the MySQL table re_stock_code_price)
    val sink_table =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  -- other columns ...
        |  create_time STRING NOT NULL,
        |  rise INT,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/mydb',
        |  'driver' = 'com.mysql.cj.jdbc.Driver',
        |  'table-name' = 're_stock_code_price',
        |  'username' = 'root',
        |  'password' = '12345678'
        |)
        |""".stripMargin

    tEnv.executeSql(source_table)
    tEnv.executeSql(sink_table)

In this code, we first create Flink's streaming execution environment and a StreamTableEnvironment. We then register two temporary tables that map to the MySQL tables: one for the raw stock data and one for the cleaned result.

3. Data Cleaning:

Next, we run the cleaning query and write the result into the sink table.

        // Run the cleaning query and write the result into the sink table
        tEnv.executeSql(
          "INSERT INTO re_stock_code_price " +
          "SELECT *, CASE WHEN change_percent > 0 THEN 1 ELSE 0 END AS rise FROM t_stock_code_price")

Here we derive each stock's rise/fall status and write the result into the sink table. In this example, the change_percent column holds the percentage change of the stock price; rise is 1 when the stock went up and 0 when it fell or stayed flat. Because SELECT * preserves the source column order, the appended rise value lines up with the final rise column of re_stock_code_price.
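The derivation itself is just a threshold on change_percent. Expressed as plain Scala, purely for illustration and not part of the Flink job (the helper rise below is hypothetical), it amounts to:

    // Plain-Scala illustration of the CASE WHEN rule used in the SQL above
    def rise(changePercent: Double): Int = if (changePercent > 0) 1 else 0

    rise(2.35)   // returns 1 -> the stock closed higher
    rise(-0.80)  // returns 0 -> the stock closed lower or flat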

4. Results:

Finally, we query the sink table and print the result.
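This corresponds to the last two statements of the complete program in the next section:

    // Query the cleaned data and print it to the console
    val user_DS = tEnv.executeSql("SELECT * FROM re_stock_code_price")
    user_DS.print()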


5. Complete Code:

Below is the complete code:

package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object TableETL {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val source_table =
      """
        |CREATE TEMPORARY TABLE t_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 't_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin

    val sink_table =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise INT,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin

    tEnv.executeSql(source_table)
    tEnv.executeSql(sink_table)
    tEnv.executeSql("insert into re_stock_code_price select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")val user_DS = tEnv.executeSql("select * from re_stock_code_price")

    user_DS.print()}}

If you run into any problems, feel free to reach out to the author. The author also offers help with course assignments and graduation projects, covering Python, Java, big data, model training, and more.


Reposted from: https://blog.csdn.net/lhyandlwl/article/details/137121693
Copyright belongs to the original author 放学-别走. In case of infringement, please contact us for removal.
