Flink Table API 读写 MySQL
importorg.apache.flink.connector.jdbc.table.JdbcConnectorOptions;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.DataTypes;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.Schema;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.TableDescriptor;importorg.apache.flink.table.api.TableEnvironment;importorg.apache.flink.table.api.TableResult;importstaticorg.apache.flink.table.api.Expressions.$;publicclassTableApiMysql{publicstaticvoidmain(String[] args){EnvironmentSettings settings =EnvironmentSettings.newInstance().inBatchMode().build();TableEnvironment tableEnv =TableEnvironment.create(settings);Schema schema =Schema.newBuilder().column("user_id",DataTypes.BIGINT()).column("user_name",DataTypes.STRING()).build();TableDescriptor tableDescriptor =TableDescriptor.forConnector("jdbc").option(JdbcConnectorOptions.URL,"jdbc:mysql://localhost:3306/tmp").option(JdbcConnectorOptions.USERNAME,"root").option(JdbcConnectorOptions.PASSWORD,"123456").option(JdbcConnectorOptions.TABLE_NAME,"test").schema(schema).build();
tableEnv.createTable("source", tableDescriptor);// 通过API执行selectSystem.out.println("select format 1: ");
tableEnv.from("source").select($("user_id"), $("user_name")).execute().print();// 写入mysql user_id是自增主键
tableEnv.executeSql("insert into source(user_name) select 'hello'");// 直接SQL执行select *System.out.println("select format 2: ");Table table = tableEnv.sqlQuery("select * from source");TableResult execute = table.execute();
execute.print();}}
本文转载自: https://blog.csdn.net/hopyGreat/article/details/134587428
版权归原作者 hopyGreat 所有，如有侵权，请联系我们删除。