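1. Preface
In production, data processed by Flink is often written to databases such as MySQL or Doris. This article takes MySQL as the example and uses JDBC to write a Flink data stream to MySQL in real time.
2. Code Example
2.1 Versions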
<flink.version>1.14.6</flink.version>
<spark.version>2.4.3</spark.version>
<hadoop.version>2.8.5</hadoop.version>
<hbase.version>1.4.9</hbase.version>
<hive.version>2.3.5</hive.version>
<java.version>1.8</java.version>
<scala.version>2.11.8</scala.version>
<mysql.version>8.0.22</mysql.version>
<scala.binary.version>2.11</scala.binary.version>
2.2 Add the dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- MySQL connector dependency -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.22</version>
</dependency>
2.3 Connect to the database and create the table
mysql> CREATE TABLE `ws` (
    `id` varchar(100) NOT NULL,
    `ts` bigint(20) DEFAULT NULL,
    `vc` int(11) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
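Because `id` is the primary key of `ws`, a plain INSERT fails with a duplicate-key error when a record with an existing `id` arrives. One possible way around this is MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`. The sketch below only builds such a statement as a string; the class `UpsertSqlSketch` and the method `buildUpsertSql` are hypothetical helpers for illustration, not part of Flink or of the original article. Note also that newer MySQL 8.0 releases deprecate the `VALUES(col)` form in favor of row aliases, although it still works.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper (not a Flink API): builds a MySQL upsert statement with ? placeholders. */
class UpsertSqlSketch {

    static String buildUpsertSql(String table, String keyColumn, List<String> otherColumns) {
        if (otherColumns.isEmpty()) {
            // ON DUPLICATE KEY UPDATE needs at least one column to update.
            throw new IllegalArgumentException("at least one non-key column is required");
        }
        // Key column first, then the remaining columns, in placeholder order.
        List<String> allColumns = new ArrayList<>();
        allColumns.add(keyColumn);
        allColumns.addAll(otherColumns);

        String columnList = allColumns.stream()
                .map(c -> "`" + c + "`")
                .collect(Collectors.joining(", "));
        String placeholders = String.join(", ", Collections.nCopies(allColumns.size(), "?"));
        // On a key collision, overwrite every non-key column with the incoming value.
        String updates = otherColumns.stream()
                .map(c -> "`" + c + "` = VALUES(`" + c + "`)")
                .collect(Collectors.joining(", "));

        return "INSERT INTO `" + table + "` (" + columnList + ") VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        System.out.println(buildUpsertSql("ws", "id", Arrays.asList("ts", "vc")));
    }
}
```

The resulting string keeps the same three placeholders in the same order (`id`, `ts`, `vc`), so it could be passed to a JDBC prepared statement in place of a plain insert if overwriting rows with the same `id` is the desired behavior.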
2.4 Create the POJO class
package com.flink.POJOs;

import java.util.Objects;

/**
 * Characteristics of a POJO class:
 * - the class is public
 * - it has a no-argument constructor
 * - all fields are public
 * - the types of all fields are serializable
 */
public class WaterSensor {
    // Public fields of the class
    public String id;
    public Long ts;
    public Integer vc;

    // No-argument constructor
    public WaterSensor() {
        // System.out.println("No-argument constructor called");
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    // Getters and setters
    public void setId(String id) { this.id = id; }

    public void setTs(Long ts) { this.ts = ts; }

    public void setVc(Integer vc) { this.vc = vc; }

    public String getId() { return id; }

    public Long getTs() { return ts; }

    public Integer getVc() { return vc; }

    // Override toString
    @Override
    public String toString() {
        return "WaterSensor{" + "id='" + id + '\'' + ", ts=" + ts + ", vc=" + vc + '}';
    }

    // Override equals and hashCode
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WaterSensor that = (WaterSensor) o;
        // Objects.equals is null-safe, so fields that were never set do not cause a NullPointerException
        return Objects.equals(id, that.id) && Objects.equals(ts, that.ts) && Objects.equals(vc, that.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }
}
// Scala case class?
2.5 Custom map function
package com.flink.POJOs;

import org.apache.flink.api.common.functions.MapFunction;

public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
    @Override
    public WaterSensor map(String value) throws Exception {
        String[] datas = value.split(",");
        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
    }
}
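The map function above assumes every input line has exactly three comma-separated fields with numeric `ts` and `vc`. A malformed line typed into the socket throws an exception (`ArrayIndexOutOfBoundsException` or `NumberFormatException`) inside the job. The following is a minimal sketch of more defensive parsing, written without Flink dependencies so that it runs on its own. The names `WaterSensorParseSketch`, `Reading`, and `parseLine` are hypothetical, and `Reading` is only a stand-in for the `WaterSensor` POJO.

```java
import java.util.Optional;

/** Hypothetical sketch: validates an "id,ts,vc" line before converting it. */
class WaterSensorParseSketch {

    /** Minimal stand-in for the WaterSensor POJO so the sketch compiles on its own. */
    static final class Reading {
        final String id;
        final long ts;
        final int vc;

        Reading(String id, long ts, int vc) {
            this.id = id;
            this.ts = ts;
            this.vc = vc;
        }
    }

    /** Returns the parsed reading, or Optional.empty() when the line is malformed. */
    static Optional<Reading> parseLine(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.split(",");
        if (parts.length != 3) {
            return Optional.empty(); // wrong number of fields
        }
        String id = parts[0].trim();
        if (id.isEmpty()) {
            return Optional.empty(); // id is the primary key, so it must not be blank
        }
        try {
            long ts = Long.parseLong(parts[1].trim());
            int vc = Integer.parseInt(parts[2].trim());
            return Optional.of(new Reading(id, ts, vc));
        } catch (NumberFormatException e) {
            return Optional.empty(); // ts or vc is not a number
        }
    }

    public static void main(String[] args) {
        System.out.println(parseLine("s1,1,1").isPresent()); // well-formed line
        System.out.println(parseLine("s1,abc,1").isPresent()); // non-numeric ts
    }
}
```

In a Flink job, the same check could be placed in a `flatMap` that emits a record only when parsing succeeds, so that bad lines are skipped instead of failing the task. Whether to skip, log, or fail on bad input is a design choice that depends on the pipeline.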
2.5 Flink2MySQL
package com.flink.DataStream.Sink;

import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Flink output to MySQL (JDBC)
 */
public class flinkSinkJdbc {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);

        // Source: read text lines from a socket
        DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);

        // Transform: parse each line into a WaterSensor
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator =
                dataStreamSource.map(new WaterSensorMapFunction());

        /*
         * Write to MySQL
         * 1. JdbcSink.sink returns a SinkFunction, so only the older addSink(...) style can be used
         * 2. The four arguments of JdbcSink.sink:
         *    first:  the SQL to execute, usually an INSERT INTO statement
         *    second: the statement builder, which fills the placeholders of the prepared statement
         *    third:  execution options (batching, retries)
         *    fourth: connection options (url, username, password)
         */
        SinkFunction<WaterSensor> sinkFunction = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                        // Printed when the parameters are bound, before the batch is executed,
                        // so this line does not by itself confirm a successful write
                        System.out.println("Writing record: " + '(' + waterSensor.getId() + "," + waterSensor.getTs() + "," + waterSensor.getVc() + ")");
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3)          // number of retries
                        .withBatchSize(100)         // batch size, in records
                        .withBatchIntervalMs(3000)  // batch flush interval, in milliseconds
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("********")
                        .withConnectionCheckTimeoutSeconds(60) // timeout of the connection validity check, in seconds
                        .build());

        // Attach the sink that writes to MySQL
        waterSensorSingleOutputStreamOperator.addSink(sinkFunction);
        streamExecutionEnvironment.execute();
    }
}
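The execution options above mean that records are not sent to MySQL one by one: they are buffered and flushed when the batch reaches 100 records or when the 3000 ms interval fires, whichever comes first. With only a few test lines typed into the socket, rows can therefore take up to about three seconds to appear in the table. The following is a deliberately simplified model of that behavior in plain Java, not Flink's actual implementation. The class `BatchBufferSketch` and its methods are hypothetical, and the timer is simulated by calling `onIntervalTick()` by hand.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of size-or-interval batching (not Flink's real code). */
class BatchBufferSketch {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushedBatches = new ArrayList<>();

    BatchBufferSketch(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
    }

    /** Buffers a record and flushes as soon as the batch is full. */
    void add(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Stands in for the periodic timer: flushes whatever is buffered, even a partial batch. */
    void onIntervalTick() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return; // nothing to write
        }
        flushedBatches.add(new ArrayList<>(buffer)); // a real sink would execute the JDBC batch here
        buffer.clear();
    }

    List<List<String>> flushedBatches() {
        return flushedBatches;
    }

    public static void main(String[] args) {
        BatchBufferSketch sink = new BatchBufferSketch(3);
        sink.add("s1,1,1");
        sink.add("s2,2,2");
        System.out.println("batches after 2 records: " + sink.flushedBatches().size());
        sink.add("s3,3,3"); // third record fills the batch
        System.out.println("batches after 3 records: " + sink.flushedBatches().size());
        sink.add("s4,4,4");
        sink.onIntervalTick(); // the interval flushes the partial batch
        System.out.println("batches after tick: " + sink.flushedBatches().size());
    }
}
```

For interactive testing, a smaller batch size or a shorter interval makes rows show up sooner, at the cost of more round trips to the database.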
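2.6 Start netcat and Flink, then check the database writes
nc -lk 8888 # start netcat listening on port 8888 (the port the job reads from), then type records as id,ts,vc, for example s1,1,1
Start the Flink program.
Check that the records are written to the database as expected.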
Reposted from: https://blog.csdn.net/dgssd/article/details/134065171
Copyright belongs to the original author, 文文鑫. In case of infringement, please contact us for removal.