0


Flink读取mysql数据库(java)

代码如下:

package com.weilanaoli.ruge.vlink.flink;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.logging.Level;
import java.util.logging.Logger;

class MysqlExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("xx.xx.xx.xx")    //输入地址
                .port(3306)                 //输入端口
                .databaseList("xx")         //输入库名
                .tableList("xx.test")       //输入表名
                .username("xx")             //输入用户名
                .password("xxxx")           //输入密码
                .startupOptions(StartupOptions.initial())  //读取binlog策略,这个启动选项有五种
                .deserializer(new JsonDebeziumDeserializationSchema()) //配置不要锁表,但是数据一致性不是精准一次,会变成最少一次
                .build();
        
        //配置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);
        DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
        SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource.process(new ProcessFunction<String, Object>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {
                try {
                    System.out.println("processElement=====" + value);
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        dataStreamSource.print("原始数据=====");
        env.execute("Print MySQL Snapshot + Binlog");
    }
}

运行结果如下:

标签: 数据库 flink mysql

本文转载自: https://blog.csdn.net/Jmayday/article/details/132067264
版权归原作者 Jmayday 所有, 如有侵权,请联系我们删除。

“Flink读取mysql数据库(java)”的评论:

还没有评论