0


使用 Flink CDC 从 MySQL 同步数据到 ES,并实现数据检索

文章目录

背景

随着公司业务量越来越大、查询需求越来越复杂,MySQL 已经难以支撑多样化的复杂查询了。

于是,我们使用 Flink CDC 捕获 MySQL 的数据变化并同步到 ES 中,再通过 ES 进行数据检索。

一、环境准备

1、创建ES索引

// Create the index with an explicit mapping
PUT /course
{
  "mappings": {
    "properties": {
      "id":      { "type": "keyword" },
      "name":    { "type": "text" },
      "label":   { "type": "text" },
      "content": { "type": "text" }
    }
  }
}

// Query all documents in course (optional)
GET /course/_search

// Delete the index and its data (optional)
DELETE /course

2、创建mysql数据表

-- Source table captured by Flink CDC; its id is also used as the ES document id.
-- NOTE: the utf8mb4_0900_ai_ci collation requires MySQL 8.0+.
CREATE TABLE `course` (
  `id`      varchar(32)  NOT NULL,
  `name`    varchar(255) DEFAULT NULL,
  `label`   varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

二、使用FlinkCDC同步数据

1、引入依赖

<!-- Flink runtime (1.18.0) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>
<!-- MySQL CDC source (com.ververica.cdc.connectors.mysql.source.MySqlSource) -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>1.18.0</version>
</dependency>
<!-- JSON parsing of the change events: the demo code imports com.alibaba.fastjson.JSON / JSONObject,
     which no other dependency above declares. 1.2.83 is the patched 1.x release. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<!-- NOTE(review): ElasticSearchUtil comes from a separate article; add the Elasticsearch client
     dependency it relies on (not listed here). -->

2、demo

importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONObject;importcom.ververica.cdc.connectors.mysql.source.MySqlSource;importcom.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
 * cdc
 */publicclassCDCTest{publicstaticvoidmain(String[] args)throwsException{MySqlSource<String> mySqlSource =MySqlSource.<String>builder().hostname("192.168.56.10").port(3306).databaseList("mytest").tableList("mytest.course").username("root").password("root").deserializer(newJsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 开启检查点
        env.enableCheckpointing(3000);

        env
            .fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),"MySQL Source")// 1个并行任务.setParallelism(1).addSink(newRichSinkFunction<String>(){privatefinalstaticElasticSearchUtil es =newElasticSearchUtil("192.168.56.10");@Overridepublicvoidinvoke(String value,Context context)throwsException{super.invoke(value, context);JSONObject jsonObject =JSON.parseObject(value);DataInfo dataInfo =newDataInfo();
                    dataInfo.setOp(jsonObject.getString("op"));
                    dataInfo.setBefore(jsonObject.getJSONObject("before"));
                    dataInfo.setAfter(jsonObject.getJSONObject("after"));
                    dataInfo.setDb(jsonObject.getJSONObject("source").getString("db"));
                    dataInfo.setTable(jsonObject.getJSONObject("source").getString("table"));if(dataInfo.getDb().equals("mytest")&& dataInfo.getTable().equals("course")){String id = dataInfo.getAfter().get("id").toString();if(dataInfo.getOp().equals("d")){
                            es.deleteById("course", id);}else{
                            es.put(dataInfo.getAfter(),"course", id);}}}}).setParallelism(1);// 对接收器使用并行性1来保持消息顺序

        env.execute("Print MySQL Snapshot + Binlog");}}

DataInfo 类:
importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONObject;importjava.util.Map;/**
 * 收集的数据类型
 * @author cuixiangfei
 * @since 20234-03-20
 */publicclassDataInfo{// 操作 c是create;u是update;d是delete;r是readprivateString op;privateString db;privateString table;privateMap<String,Object> before;privateMap<String,Object> after;publicStringgetOp(){return op;}publicvoidsetOp(String op){this.op = op;}publicStringgetDb(){return db;}publicvoidsetDb(String db){this.db = db;}publicStringgetTable(){return table;}publicvoidsetTable(String table){this.table = table;}publicMap<String,Object>getBefore(){return before;}publicvoidsetBefore(Map<String,Object> before){this.before = before;}publicMap<String,Object>getAfter(){return after;}publicvoidsetAfter(Map<String,Object> after){this.after = after;}publicbooleancheckOpt(){if(this.op.equals("r")){returnfalse;}returntrue;}@OverridepublicStringtoString(){return"DataInfo{"+"op='"+ op +'\''+", db='"+ db +'\''+", table='"+ table +'\''+", before="+ before +", after="+ after +'}';}publicstaticvoidmain(String[] args){String value ="{\"before\":{\"id\":\"333\",\"name\":\"333\",\"label\":\"333\",\"content\":\"3333\"},\"after\":{\"id\":\"333\",\"name\":\"33322\",\"label\":\"333\",\"content\":\"3333\"},\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710923957000,\"snapshot\":\"false\",\"db\":\"mytest\",\"sequence\":null,\"table\":\"course\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000008\",\"pos\":1318,\"row\":0,\"thread\":9,\"query\":null},\"op\":\"u\",\"ts_ms\":1710923957825,\"transaction\":null}";JSONObject jsonObject =JSON.parseObject(value);System.out.println(jsonObject.get("op"));System.out.println(jsonObject.get("before"));System.out.println(jsonObject.get("after"));System.out.println(jsonObject.getJSONObject("source").get("db"));System.out.println(jsonObject.getJSONObject("source").get("table"));}}

3、es工具类

ES 工具类 ElasticSearchUtil 的实现参见《springboot集成elasticSearch(附带工具类)》一文。

三、测试

1、先创建几条数据

-- Seed rows used to verify the initial snapshot sync into ES.
INSERT INTO `mytest`.`course` (`id`, `name`, `label`, `content`) VALUES ('1', '11', '111', '1111');
INSERT INTO `mytest`.`course` (`id`, `name`, `label`, `content`) VALUES ('2', '22 33', '222 333', '2222 3333');
INSERT INTO `mytest`.`course` (`id`, `name`, `label`, `content`) VALUES ('3', '33 44', '33 444', '3333 4444');

2、启动cdc

3、查询es

(原文此处为 ES 查询结果截图)

4、增删改几条数据进行测试


本文转载自: https://blog.csdn.net/A_art_xiang/article/details/136877379
版权归原作者 秃了也弱了。 所有, 如有侵权,请联系我们删除。

“使用FlinkCDC从mysql同步数据到ES,并实现数据检索”的评论:

还没有评论