0


Flink之JDBC Sink

这里介绍一下Flink Sink中jdbc sink的使用方法,以

mysql

为例,这里代码分为两种,事务和非事务

  • 非事务代码
importorg.apache.flink.connector.jdbc.JdbcConnectionOptions;importorg.apache.flink.connector.jdbc.JdbcExecutionOptions;importorg.apache.flink.connector.jdbc.JdbcSink;importorg.apache.flink.connector.jdbc.JdbcStatementBuilder;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.SinkFunction;importjava.sql.PreparedStatement;importjava.sql.SQLException;/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/8/2
 * @Description: 测试
 **/publicclassFlinkJdbcSink{publicstaticvoidmain(String[] args)throwsException{// 构建流环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(newCustomizeSource());// 构建jdbc sinkSinkFunction<CustomizeBean> jdbcSink =JdbcSink.sink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)",// 数据插入sql语句newJdbcStatementBuilder<CustomizeBean>(){@Overridepublicvoidaccept(PreparedStatement pStmt,CustomizeBean customizeBean)throwsSQLException{
                        pStmt.setString(1, customizeBean.getName());
                        pStmt.setInt(2, customizeBean.getAge());
                        pStmt.setString(3, customizeBean.getGender());
                        pStmt.setString(4, customizeBean.getHobbit());}},// 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withBatchSize(10)// 批次大小,条数.withBatchIntervalMs(5000)// 批次最大等待时间.withMaxRetries(1)// 重复次数.build(),// 写入参数配置newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false").withUsername("root").withPassword("password").build()// jdbc信息配置);// 添加jdbc sink
        customizeSource.addSink(jdbcSink);
        env.execute();}}
  • 事务代码
importcom.mysql.cj.jdbc.MysqlXADataSource;importorg.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;importorg.apache.flink.connector.jdbc.JdbcExecutionOptions;importorg.apache.flink.connector.jdbc.JdbcSink;importorg.apache.flink.connector.jdbc.JdbcStatementBuilder;importorg.apache.flink.streaming.api.CheckpointingMode;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.CheckpointConfig;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.SinkFunction;importorg.apache.flink.util.function.SerializableSupplier;importjavax.sql.XADataSource;/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/8/2
 * @Description: 测试
 **/publicclassFlinkJdbcSink{publicstaticvoidmain(String[] args)throwsException{// 构建流环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(newCustomizeSource());// 每20秒作为checkpoint的一个周期
        env.enableCheckpointing(20000);// 两次checkpoint间隔最少是10秒
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);// 程序取消或者停止时不删除checkpoint
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// checkpoint必须在60秒结束,否则将丢弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只能有一个checkpoint
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置EXACTLY_ONCE语义,默认就是这个
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// checkpoint存储位置
        env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpointSinkFunction<CustomizeBean> exactlyOneJdbcSink =JdbcSink.exactlyOnceSink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)",// 数据插入sql语句(JdbcStatementBuilder<CustomizeBean>)(pStmt, customizeBean)->{
                    pStmt.setString(1, customizeBean.getName());
                    pStmt.setInt(2, customizeBean.getAge());
                    pStmt.setString(3, customizeBean.getGender());
                    pStmt.setString(4, customizeBean.getHobbit());},// 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withMaxRetries(0)// 设置重复次数.withBatchSize(25)// 设置批次大小,数据条数.withBatchIntervalMs(1000)// 批次最大等待时间.build(),JdbcExactlyOnceOptions.builder()// 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的.withTransactionPerConnection(true).build(),(SerializableSupplier<XADataSource>)()->{// XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接MysqlXADataSource mysqlXADataSource =newMysqlXADataSource();
                    mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false");// 设置url
                    mysqlXADataSource.setUser("root");// 设置用户
                    mysqlXADataSource.setPassword("password");// 设置密码return mysqlXADataSource;});// 添加jdbc sink
        customizeSource.addSink(exactlyOneJdbcSink);
        env.execute();}}
  • pom依赖
<!-- 在原有的依赖中加入下面两个内容 --><!-- JDBC connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><!-- mysql驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency>
  • 结果在这里插入图片描述 jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.
标签: flink 大数据

本文转载自: https://blog.csdn.net/AnameJL/article/details/132065766
版权归原作者 飞天小老头 所有, 如有侵权,请联系我们删除。

“Flink之JDBC Sink”的评论:

还没有评论