前言
最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间……
我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过flink sql-client实现的,但这有个问题,当fink集群重启,JOB就没有了,没有办法通过savePointing来恢复。所以还是记录下。
代码
直接上代码:
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/savepointings");StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE orders (\n"+" order_id INT,\n"+" order_date TIMESTAMP(0),\n"+" customer_name STRING,\n"+" price DECIMAL(10, 5),\n"+" product_id INT,\n"+" order_status BOOLEAN,\n"+" PRIMARY KEY (order_id) NOT ENFORCED\n"+" ) WITH (\n"+" 'connector' = 'mysql-cdc',\n"+" 'hostname' = 'localhost',\n"+" 'port' = '3306',\n"+" 'username' = 'root',\n"+" 'password' = '123456',\n"+" 'database-name' = 'mydb',\n"+" 'table-name' = 'orders'\n"+" );").await();
tableEnv.executeSql("CREATE TABLE products (\n"+" id INT,\n"+" name STRING,\n"+" description STRING,\n"+" PRIMARY KEY (id) NOT ENFORCED\n"+" ) WITH (\n"+" 'connector' = 'mysql-cdc',\n"+" 'hostname' = 'localhost',\n"+" 'port' = '3306',\n"+" 'username' = 'root',\n"+" 'password' = '123456',\n"+" 'database-name' = 'mydb',\n"+" 'table-name' = 'products'\n"+" );").await();
tableEnv.executeSql("CREATE TABLE enriched_orders (\n"+" order_id INT,\n"+" order_date TIMESTAMP(0),\n"+" customer_name STRING,\n"+" price DECIMAL(10, 5),\n"+" product_id INT,\n"+" order_status BOOLEAN,\n"+" product_name STRING,\n"+" product_description STRING,\n"+" PRIMARY KEY (order_id) NOT ENFORCED\n"+" ) WITH (\n"+" 'connector' = 'elasticsearch-7',\n"+" 'hosts' = 'http://localhost:9200',\n"+" 'index' = 'enriched_orders_lhc'\n"+" );");
tableEnv.executeSql("INSERT INTO enriched_orders\n"+" SELECT o.*, p.name, p.description\n"+" FROM orders AS o\n"+" LEFT JOIN products AS p ON o.product_id = p.id");
env.execute("Mysql to ES");}
版权归原作者 lhcnicholas 所有, 如有侵权,请联系我们删除。