0


大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Sink 的基本概念等内容
  • Sink的相关信息 配置与使用
  • Sink案例写入Redis

在这里插入图片描述

JDBC Sink

在 Apache Flink 中,通过 JDBC Sink,可以将处理后的数据写入到 MySQL 数据库中。这对于将实时处理的数据持久化或与其他系统进行集成非常有用。

Flink JDBC Sink 简介

Flink 提供了 JdbcSink,它是基于 JDBC 协议的 Sink,可以将数据写入各种关系型数据库,包括 MySQL。在使用 JDBC Sink 时,需要提供数据库连接信息和 SQL 语句,通过这些信息,Flink 将数据流中的记录插入或更新到 MySQL 表中。

Flink 到 MySQL 的基本步骤

将数据流写入 MySQL 的步骤主要包括以下几点:

  • 依赖库配置:确保在项目中引入了 Flink 和 MySQL 相关的依赖库,通常需要配置 Maven 或 Gradle。
  • 定义数据源和数据流:创建并处理数据流。
  • 配置 JDBC Sink:提供数据库的连接信息和插入 SQL 语句。
  • 启动任务:将数据流写入 MySQL。

优化建议

在实际项目中,向 MySQL 插入大量数据时,应考虑以下优化策略:

  • 批量插入:通过 JdbcExecutionOptions 配置批量插入,可以大幅提升写入性能。
  • 连接池:对于高并发的写入操作,建议使用连接池来减少数据库连接开销。
  • 索引优化:为插入的表配置合适的索引,可以提高查询性能,但在大量写入时,索引可能会降低- 插入速度,因此需要权衡。
  • 数据分片:对于非常大规模的数据,可以考虑将数据分片并行写入不同的 MySQL 实例或分区表中。

案例:流数据下沉到MySQL

添加依赖

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency>

编写代码

一个Person的类,对应MySQL中的一张表的字段。
模拟几条数据流,写入到 MySQL中。

packageicu.wzk;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment;importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.PreparedStatement;publicclassSinkSqlTest{publicstaticvoidmain(String[] args){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Person> data = env.getJavaEnv().fromElements(newPerson("wzk",18,1),newPerson("icu",20,1),newPerson("wzkicu",13,2));
        data.addSink(newMySqlSinkFunction());

        env.execute();}publicstaticclassMySqlSinkFunctionextendsRichSinkFunction<Person>{privatePreparedStatement preparedStatement =null;privateConnection connection =null;@Overridepublicvoidopen(Configuration parameters)throwsException{String url ="jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";String username ="hive";String password ="[email protected]";
            connection =DriverManager.getConnection(url, username, password);String sql ="INSERT INTI PERSON(name, age, sex) VALUES(?, ?, ?)";
            preparedStatement = connection.prepareStatement(sql);}@Overridepublicvoidinvoke(Person value,Context context)throwsException{
            preparedStatement.setString(1, value.getName());
            preparedStatement.setInt(2, value.getAge());
            preparedStatement.setInt(3, value.getSex());
            preparedStatement.executeUpdate();}@Overridepublicvoidclose()throwsException{if(null!= connection){
                connection.close();}if(null!= preparedStatement){
                preparedStatement.close();}}}publicstaticclassPerson{privateString name;privateInteger age;privateInteger sex;publicPerson(){}publicPerson(String name,Integer age,Integer sex){this.name = name;this.age = age;this.sex = sex;}publicStringgetName(){return name;}publicvoidsetName(String name){this.name = name;}publicIntegergetAge(){return age;}publicvoidsetAge(Integer age){this.age = age;}publicIntegergetSex(){return sex;}publicvoidsetSex(Integer sex){this.sex = sex;}}}

数据库配置

我们新建一张表出来,person表,里边有我们需要的字段。
在这里插入图片描述

运行代码

我们运行代码,等待运行结束。
在这里插入图片描述

查看结果

查看数据库中的数据,我们可以看到刚才模拟的数据已经成功写入了。
在这里插入图片描述

案例:写入到Kafka

编写代码

packageicu.wzk;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.scala.DataStream;importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;publicclassSinkKafkaTest{publicstaticvoidmain(String[] args){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> data = env.socketTextStream("localhost",9999,'\n',0);String brokerList ="h121.wzk.icu:9092";String topic ="flink_test";FlinkKafkaProducer<String> producer =newFlinkKafkaProducer<>(brokerList, topic,newSimpleStringSchema());
        data.addSink(producer);
        env.execute("SinkKafkaTest");}}

运行代码

启动一个 nc

nc-lk9999

我们通过回车的方式,可以发送数据。
在这里插入图片描述
Java 程序中等待
在这里插入图片描述

查看结果

我们登录到服务器查看信息

./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test --from-beginning

可以看到刚才的数据已经写入了:
在这里插入图片描述

标签: 大数据 flink mysql

本文转载自: https://blog.csdn.net/w776341482/article/details/141885165
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。

“大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka”的评论:

还没有评论