0


Flink将数据写入MySQL(JDBC)

一、写在前面

在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。

二、代码示例

2.1 版本说明

<flink.version>1.14.6</flink.version><spark.version>2.4.3</spark.version><hadoop.version>2.8.5</hadoop.version><hbase.version>1.4.9</hbase.version><hive.version>2.3.5</hive.version><java.version>1.8</java.version><scala.version>2.11.8</scala.version><mysql.version>8.0.22</mysql.version><scala.binary.version>2.11</scala.binary.version>

2.2 导入相关依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><!--mysql连接器依赖--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.22</version></dependency>

2.3 连接数据库,创建表

mysql> CREATE TABLE `ws`(`id` varchar(100) NOT NULL
      ,`ts` bigint(20) DEFAULT NULL
      ,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`))ENGINE=InnoDB DEFAULT CHARSET=utf8

2.4 创建POJO类

packagecom.flink.POJOs;importjava.util.Objects;/**
 * TODO POJO类的特点
 * 类是公有(public)的
 * 有一个无参的构造方法
 * 所有属性都是公有(public)的
 * 所有属性的类型都是可以序列化的
 */publicclassWaterSensor{//类的公共属性publicString id;publicLong ts;publicInteger vc;//无参构造方法publicWaterSensor(){//System.out.println("调用了无参数的构造方法");}publicWaterSensor(String id,Long ts,Integer vc){this.id = id;this.ts = ts;this.vc = vc;}//生成get和set方法publicvoidsetId(String id){this.id = id;}publicvoidsetTs(Long ts){this.ts = ts;}publicvoidsetVc(Integer vc){this.vc = vc;}publicStringgetId(){return id;}publicLonggetTs(){return ts;}publicIntegergetVc(){return vc;}//重写toString方法@OverridepublicStringtoString(){return"WaterSensor{"+"id='"+ id +'\''+", ts="+ ts +", vc="+ vc +'}';}//重写equals和hasCode方法@Overridepublicbooleanequals(Object o){if(this== o)returntrue;if(o ==null||getClass()!= o.getClass())returnfalse;WaterSensor that =(WaterSensor) o;return id.equals(that.id)&& ts.equals(that.ts)&& vc.equals(that.vc);}@OverridepublicinthashCode(){returnObjects.hash(id, ts, vc);}}//scala的case类?

2.5 自定义map函数

packagecom.flink.POJOs;importorg.apache.flink.api.common.functions.MapFunction;publicclassWaterSensorMapFunctionimplementsMapFunction<String,WaterSensor>{@OverridepublicWaterSensormap(String value)throwsException{String[] datas = value.split(",");returnnewWaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}}

2.5 Flink2MySQL

packagecom.flink.DataStream.Sink;importcom.flink.POJOs.WaterSensor;importcom.flink.POJOs.WaterSensorMapFunction;importorg.apache.flink.connector.jdbc.JdbcConnectionOptions;importorg.apache.flink.connector.jdbc.JdbcExecutionOptions;importorg.apache.flink.connector.jdbc.JdbcSink;importorg.apache.flink.connector.jdbc.JdbcStatementBuilder;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.SinkFunction;importjava.sql.PreparedStatement;importjava.sql.SQLException;/**
 * Flink 输出到 MySQL(JDBC)
 */publicclass flinkSinkJdbc {publicstaticvoidmain(String[] args)throwsException{//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);//TODO SourceDataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost",8888);//TODO TransferSingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = dataStreamSource.map(newWaterSensorMapFunction());/**TODO 写入 mysql
         * 1、只能用老的 sink 写法
         * 2、JDBCSink 的 4 个参数:
         *   第一个参数: 执行的 sql,一般就是 insert into
         *   第二个参数: 预编译 sql, 对占位符填充值
         *   第三个参数: 执行选项 ---->攒批、重试
         *   第四个参数: 连接选项---->url、用户名、密码
         */SinkFunction<WaterSensor> sinkFunction =JdbcSink.sink("insert into ws values(?,?,?)",newJdbcStatementBuilder<WaterSensor>(){@Overridepublicvoidaccept(PreparedStatement preparedStatement,WaterSensor waterSensor)throwsSQLException{
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");}},JdbcExecutionOptions.builder().withMaxRetries(3)// 重试次数.withBatchSize(100)// 批次的大小:条数.withBatchIntervalMs(3000)// 批次的时间.build(),newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("********").withConnectionCheckTimeoutSeconds(60)// 重试的超时时间.build());//TODO 写入到Mysql
        waterSensorSingleOutputStreamOperator.addSink(sinkFunction);

        streamExecutionEnvironment.execute();}}

2.6 启动necat、Flink,观察数据库写入情况

nc -lk 9999 #启动necat、并监听8888端口,写入数据

在这里插入图片描述
启动Flink程序
在这里插入图片描述
查看数据库写入是否正常
在这里插入图片描述

标签: flink 大数据

本文转载自: https://blog.csdn.net/dgssd/article/details/134065171
版权归原作者 文文鑫 所有, 如有侵权,请联系我们删除。

“Flink将数据写入MySQL(JDBC)”的评论:

还没有评论