0


[大数据 Flink,Java实现不同数据库实时数据同步过程]

🌮前言:

 🌮笔记

🌮实现Mysql同步Es的过程包括以下步骤:

  • 配置Mysql数据库连接: 使用Flink的JDBC连接器来连接Mysql数据库,并定义查询语句,获取需要同步的数据。同时,需要在Flink的配置文件中配置Mysql数据库的连接信息。
  • 配置Elasticsearch连接: 使用Flink的Elasticsearch连接器来连接Elasticsearch,并定义索引和类型,用于将同步的数据写入到指定的索引中。同时,需要在Flink的配置文件中配置Elasticsearch的连接信息。
  • 实现数据的转换和处理: 通过Flink的DataStream API,将从Mysql中查询到的数据转换为Elasticsearch中的文档格式,并进行相应的处理和处理,如去重、过滤等。
  • 实现数据的批量写入: 使用Flink的Elasticsearch连接器提供的批量写入接口,将转换后的数据批量写入到Elasticsearch中。
  • 实现实时同步: 将以上步骤组合成一个Flink Job,并通过Flink的DataStream API实现实时同步,即从Mysql数据库中读取到最新的数据,经过转换和处理后,实时写入到Elasticsearch中。

需要注意的是,在实现实时同步过程中,需要考虑到数据的幂等性和错误处理机制,以保证同步过程的稳定性和可靠性。同时,也需要考虑到数据的增量同步和全量同步的情况,以便根据实际需求进行调整和优化。

🌮配置Mysql数据库连接

需要使用Flink的JDBC连接器来连接Mysql数据库,并定义查询语句,获取需要同步的数据。同时,需要在Flink的配置文件中配置Mysql数据库的连接信息。

🌮在Flink的配置文件中,添加Mysql数据库的连接信息。可以在flink-conf.yaml文件中添加如下配置:

Mysql数据库连接信息

env.java.opts: "-Dmysql.url=jdbc:mysql://localhost:3306/test -Dmysql.username=root -Dmysql.password=123456"

mysql.url表示Mysql数据库的连接地址,mysql.username表示Mysql数据库的用户名,mysql.password表示Mysql数据库的密码。

🌮在Flink程序中,使用JDBCInputFormat来连接Mysql数据库,并定义查询语句,获取需要同步的数据。具体代码如下:

// 定义Mysql数据库连接信息
String mysqlUrl = System.getProperty("mysql.url");
String mysqlUsername = System.getProperty("mysql.username");
String mysqlPassword = System.getProperty("mysql.password");

// 定义查询语句
String query = "SELECT * FROM user";

// 定义JDBC连接器
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl(mysqlUrl)
    .setUsername(mysqlUsername)
    .setPassword(mysqlPassword)
    .setQuery(query)
    .setRowTypeInfo(rowTypeInfo)
    .finish();

// 读取Mysql数据库中的数据
DataStream<Row> mysqlDataStream = env.createInput(jdbcInputFormat);

rowTypeInfo表示数据类型信息,需要根据Mysql数据库中的表结构来定义。

🌮最后,将步骤2中读取到的数据封装成一个Flink的DataStream程序,用于后续的数据处理和写入Es中。

// 将读取到的数据封装成一个Flink的DataStream程序
DataStream<String> jsonDataStream = mysqlDataStream.map(new MapFunction<Row, String>() {
    @Override
    public String map(Row row) throws Exception {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", row.getField(0));
        jsonObject.put("name", row.getField(1));
        jsonObject.put("age", row.getField(2));
        return jsonObject.toJSONString();
    }
});

🌮配置Elasticsearch连接

需要配置Elasticsearch连接,使用Flink的Elasticsearch连接器来连接Elasticsearch,并定义索引和类型,用于将同步的数据写入到指定的索引中。同时,需要在Flink的配置文件中配置Elasticsearch的连接信息。

🌮在Flink的配置文件中,添加Elasticsearch的连接信息。可以在flink-conf.yaml文件中添加如下配置:

Elasticsearch连接信息

env.java.opts: "-Delasticsearch.hosts=http://localhost:9200"

🌮在Flink程序中,使用ElasticsearchSinkFunction将数据写入Elasticsearch中。具体代码如下:

// 定义Elasticsearch连接信息
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200, "http"));

// 定义ElasticsearchSinkFunction
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        IndexRequest indexRequest = Requests.indexRequest()
            .index("user")
            .type("_doc")
            .source(element, XContentType.JSON);
        indexer.add(indexRequest);
    }
});

// 将数据写入Elasticsearch中
jsonDataStream.addSink(esSinkBuilder.build());

httpHosts表示Elasticsearch的连接地址,ElasticsearchSinkFunction用于将数据写入Elasticsearch中。在ElasticsearchSinkFunction中,可以定义索引和类型,用于将数据写入到指定的索引中。

以上代码中,将数据写入到名为"user"的索引中,类型为"_doc"。同时,使用IndexRequest将数据写入Elasticsearch中。

🌮实现数据的转换和处理

  • 在第二步中,已经将从Mysql中查询到的数据转换成了JSON格式。接下来,需要将JSON格式的数据转换成Elasticsearch中的文档格式。可以使用Elasticsearch的Bulk API来实现。
  • 在转换成Elasticsearch中的文档格式之前,需要进行去重操作,避免重复写入相同的数据。可以使用Flink的KeyedStream API来实现。
// 将JSON格式的数据转换成Elasticsearch中的文档格式
DataStream<IndexRequest> esDataStream = jsonDataStream.map(new MapFunction<String, IndexRequest>() {
    @Override
    public IndexRequest map(String json) throws Exception {
        JSONObject jsonObject = JSON.parseObject(json);
        String id = jsonObject.getString("id");
        IndexRequest indexRequest = new IndexRequest("user", "_doc", id);
        indexRequest.source(json, XContentType.JSON);
        return indexRequest;
    }
});

// 进行去重操作
KeyedStream<IndexRequest, String> keyedStream = esDataStream.keyBy(new KeySelector<IndexRequest, String>() {
    @Override
    public String getKey(IndexRequest indexRequest) throws Exception {
        return indexRequest.id();
    }
});

// 将去重后的数据写入Elasticsearch中
keyedStream.addSink(esSinkBuilder.build());

使用MapFunction将JSON格式的数据转换成Elasticsearch中的文档格式。在转换成Elasticsearch中的文档格式之前,使用KeyedStream API进行去重操作,避免重复写入相同的数据。最后,将去重后的数据写入Elasticsearch中。

🌮实现数据的批量写入:

在第三步中已经使用了Elasticsearch的Bulk API来实现将转换后的数据批量写入到Elasticsearch中。具体代码如下:

// 将JSON格式的数据转换成Elasticsearch中的文档格式
DataStream<IndexRequest> esDataStream = jsonDataStream.map(new MapFunction<String, IndexRequest>() {
    @Override
    public IndexRequest map(String json) throws Exception {
        JSONObject jsonObject = JSON.parseObject(json);
        String id = jsonObject.getString("id");
        IndexRequest indexRequest = new IndexRequest("user", "_doc", id);
        indexRequest.source(json, XContentType.JSON);
        return indexRequest;
    }
});

// 进行去重操作
KeyedStream<IndexRequest, String> keyedStream = esDataStream.keyBy(new KeySelector<IndexRequest, String>() {
    @Override
    public String getKey(IndexRequest indexRequest) throws Exception {
        return indexRequest.id();
    }
});

// 将去重后的数据写入Elasticsearch中
ElasticsearchSink.Builder<IndexRequest> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<IndexRequest>() {
    @Override
    public void process(IndexRequest indexRequest, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(indexRequest);
    }
});

keyedStream.addSink(esSinkBuilder.build());

在ElasticsearchSinkFunction中,使用RequestIndexer将数据批量写入到Elasticsearch中。需要注意的是,ElasticsearchSinkFunction的泛型类型需要与KeyedStream的泛型类型保持一致。

以上代码中,使用KeyedStream API进行去重操作,避免重复写入相同的数据。最后,使用Elasticsearch的Bulk API将去重后的数据批量写入到Elasticsearch中。

🌮实现实时同步:

// 定义Mysql数据库连接信息
String mysqlUrl = System.getProperty("mysql.url");
String mysqlUsername = System.getProperty("mysql.username");
String mysqlPassword = System.getProperty("mysql.password");

// 定义查询语句
String query = "SELECT * FROM user";

// 定义JDBC连接器
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl(mysqlUrl)
    .setUsername(mysqlUsername)
    .setPassword(mysqlPassword)
    .setQuery(query)
    .setRowTypeInfo(rowTypeInfo)
    .finish();

// 读取Mysql数据库中的数据
DataStream<Row> mysqlDataStream = env.createInput(jdbcInputFormat);

// 将读取到的数据转换成JSON格式
DataStream<String> jsonDataStream = mysqlDataStream.map(new MapFunction<Row, String>() {
    @Override
    public String map(Row row) throws Exception {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", row.getField(0));
        jsonObject.put("name", row.getField(1));
        jsonObject.put("age", row.getField(2));
        return jsonObject.toJSONString();
    }
});

// 将JSON格式的数据转换成Elasticsearch中的文档格式
DataStream<IndexRequest> esDataStream = jsonDataStream.map(new MapFunction<String, IndexRequest>() {
    @Override
    public IndexRequest map(String json) throws Exception {
        JSONObject jsonObject = JSON.parseObject(json);
        String id = jsonObject.getString("id");
        IndexRequest indexRequest = new IndexRequest("user", "_doc", id);
        indexRequest.source(json, XContentType.JSON);
        return indexRequest;
    }
});

// 进行去重操作
KeyedStream<IndexRequest, String> keyedStream = esDataStream.keyBy(new KeySelector<IndexRequest, String>() {
    @Override
    public String getKey(IndexRequest indexRequest) throws Exception {
        return indexRequest.id();
    }
});

// 将去重后的数据写入Elasticsearch中
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200, "http"));

ElasticsearchSink.Builder<IndexRequest> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<IndexRequest>() {
    @Override
    public void process(IndexRequest indexRequest, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(indexRequest);
    }
});

keyedStream.addSink(esSinkBuilder.build());

// 执行Flink程序
env.execute("Mysql to Es");

🌮依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.76</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.15.0</version>
    </dependency>
</dependencies>

flink-java、flink-streaming-java_2.12、flink-connector-jdbc_2.12、flink-connector-elasticsearch7_2.12是Flink的核心依赖;fastjson是用于将数据转换成JSON格式的依赖;elasticsearch-rest-high-level-client是Elasticsearch的Java客户端依赖。


本文转载自: https://blog.csdn.net/sqL520lT/article/details/131324289
版权归原作者 是汤圆丫 所有, 如有侵权,请联系我们删除。

“[大数据 Flink,Java实现不同数据库实时数据同步过程]”的评论:

还没有评论