0


使用Flink CDC将Mysql中的数据实时同步到ES

前言

最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间……

我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过flink sql-client实现的,但这有个问题,当fink集群重启,JOB就没有了,没有办法通过savePointing来恢复。所以还是记录下。

代码

直接上代码:

publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(3000);
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/savepointings");StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);

        tableEnv.executeSql("CREATE TABLE orders (\n"+"   order_id INT,\n"+"   order_date TIMESTAMP(0),\n"+"   customer_name STRING,\n"+"   price DECIMAL(10, 5),\n"+"   product_id INT,\n"+"   order_status BOOLEAN,\n"+"   PRIMARY KEY (order_id) NOT ENFORCED\n"+" ) WITH (\n"+"   'connector' = 'mysql-cdc',\n"+"   'hostname' = 'localhost',\n"+"   'port' = '3306',\n"+"   'username' = 'root',\n"+"   'password' = '123456',\n"+"   'database-name' = 'mydb',\n"+"   'table-name' = 'orders'\n"+" );").await();

        tableEnv.executeSql("CREATE TABLE products (\n"+"    id INT,\n"+"    name STRING,\n"+"    description STRING,\n"+"    PRIMARY KEY (id) NOT ENFORCED\n"+"  ) WITH (\n"+"    'connector' = 'mysql-cdc',\n"+"    'hostname' = 'localhost',\n"+"    'port' = '3306',\n"+"    'username' = 'root',\n"+"    'password' = '123456',\n"+"    'database-name' = 'mydb',\n"+"    'table-name' = 'products'\n"+"  );").await();

        tableEnv.executeSql("CREATE TABLE enriched_orders (\n"+"   order_id INT,\n"+"   order_date TIMESTAMP(0),\n"+"   customer_name STRING,\n"+"   price DECIMAL(10, 5),\n"+"   product_id INT,\n"+"   order_status BOOLEAN,\n"+"   product_name STRING,\n"+"   product_description STRING,\n"+"   PRIMARY KEY (order_id) NOT ENFORCED\n"+" ) WITH (\n"+"     'connector' = 'elasticsearch-7',\n"+"     'hosts' = 'http://localhost:9200',\n"+"     'index' = 'enriched_orders_lhc'\n"+" );");

        tableEnv.executeSql("INSERT INTO enriched_orders\n"+" SELECT o.*, p.name, p.description\n"+" FROM orders AS o\n"+" LEFT JOIN products AS p ON o.product_id = p.id");

        env.execute("Mysql to ES");}

本文转载自: https://blog.csdn.net/lhcnicholas/article/details/129854091
版权归原作者 lhcnicholas 所有, 如有侵权,请联系我们删除。

“使用Flink CDC将Mysql中的数据实时同步到ES”的评论:

还没有评论