

Data Lake (18): Flink and Iceberg Integration via the SQL API

Flink and Iceberg Integration: SQL API Operations

For Flink SQL operations on Iceberg, the matching versions are Flink 1.11.x and Iceberg 0.11.1. At the time of writing, Flink 1.14.2 and Iceberg 0.12.1 still have compatibility problems with the SQL API, so Flink 1.11.6 and Iceberg 0.11.1 are used here to demonstrate operating Iceberg through the Flink SQL API.

I. Creating an Iceberg Table and Writing Data with the SQL API

1. Create a new project and add the following Maven dependencies

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- Flink 1.11.x works with Iceberg 0.11.1 -->
    <flink.version>1.11.6</flink.version>
    <hadoop.version>3.2.2</hadoop.version>
  </properties>
  <dependencies>
    <!-- Iceberg runtime required for Flink to work with Iceberg -->
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-flink-runtime</artifactId>
      <version>0.11.1</version>
    </dependency>
    <!-- Dependencies for developing Flink jobs in Java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Required for reading files on HDFS -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- Flink SQL & Table -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime-blink_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- log4j and slf4j; comment these out if you do not want log output in the console -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-nop</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.5</version>
    </dependency>
  </dependencies>

2. Write Flink SQL to create an Iceberg table and insert data
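The Java snippets in this and the following sections are method bodies only. A minimal class skeleton with the imports they rely on could look like the sketch below (the class and method names are my own, not from the original post):

  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.TableResult;
  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

  public class FlinkIcebergSqlDemo {
      public static void main(String[] args) throws Exception {
          // Paste one of the snippets from this post here
      }
  }

With those imports in place, the snippet for this step is: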

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
  env.enableCheckpointing(1000);
  // 1. Create the catalog
  tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
          "'type'='iceberg'," +
          "'catalog-type'='hadoop'," +
          "'warehouse'='hdfs://mycluster/flink_iceberg')");
  // 2. Use the catalog
  tblEnv.useCatalog("hadoop_iceberg");
  // 3. Create a database
  tblEnv.executeSql("create database iceberg_db");
  // 4. Use the database
  tblEnv.useDatabase("iceberg_db");
  // 5. Create the Iceberg table flink_iceberg_tbl2
  tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");
  // 6. Write data into flink_iceberg_tbl2
  tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");
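Note that the CREATE statements above are not idempotent: re-running the job once iceberg_db and the table already exist will fail. A defensive variant of steps 3 and 5, a sketch using standard Flink SQL IF NOT EXISTS syntax, is:

  // Re-runnable variants of the DDL above: skip creation when the objects already exist
  tblEnv.executeSql("create database if not exists iceberg_db");
  tblEnv.executeSql("create table if not exists hadoop_iceberg.iceberg_db.flink_iceberg_tbl2" +
          "(id int,name string,age int,loc string) partitioned by (loc)");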

3. Map the Iceberg table in Hive and query it

Run the following statement in Hive to create the corresponding Iceberg table:

  -- Create the Iceberg table in Hive
  CREATE TABLE flink_iceberg_tbl2 (
    id int,
    name string,
    age int,
    loc string
  )
  STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
  LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2'
  TBLPROPERTIES ('iceberg.catalog'='location_based_table');
  # Query the Iceberg table from Hive
  hive> select * from flink_iceberg_tbl2;
  OK
  3    ww    20    guangzhou
  1    zs    18    beijing
  2    ls    19    shanghai
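To sanity-check what Flink actually wrote, you can also list the table directory under the warehouse path. With a Hadoop catalog the layout is the standard Iceberg split into data and metadata directories (the exact file names will differ in your environment):

  hdfs dfs -ls hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2
  # Expect two sub-directories:
  #   .../flink_iceberg_tbl2/data      Parquet data files, partitioned by loc
  #   .../flink_iceberg_tbl2/metadata  vN.metadata.json, manifest lists and manifest files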

II. Batch Querying Iceberg Table Data with the SQL API

Batch-querying Iceberg table data with the Flink SQL API is straightforward: run the query and print the result. The code is as follows:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
  env.enableCheckpointing(1000);
  // 1. Create the catalog
  tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
          "'type'='iceberg'," +
          "'catalog-type'='hadoop'," +
          "'warehouse'='hdfs://mycluster/flink_iceberg')");
  // 2. Read the table data in batch mode
  TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2");
  tableResult.print();

The result is printed to the console:
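With the three rows inserted earlier, TableResult.print() renders a small ASCII table along these lines (illustrative only; the exact formatting depends on the Flink version):

  +----+------+-----+-----------+
  | id | name | age |       loc |
  +----+------+-----+-----------+
  |  1 |   zs |  18 |   beijing |
  |  2 |   ls |  19 |  shanghai |
  |  3 |   ww |  20 | guangzhou |
  +----+------+-----+-----------+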

III. Streaming (Real-Time) Queries of Iceberg Table Data with the SQL API

When querying Iceberg table data in real time with the Flink SQL API, the option "table.dynamic-table-options.enabled" must be set to true so that the OPTIONS hint syntax is allowed in SQL. The code is as follows:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
  env.enableCheckpointing(1000);
  Configuration configuration = tblEnv.getConfig().getConfiguration();
  // Allow the OPTIONS hint syntax in SQL
  configuration.setBoolean("table.dynamic-table-options.enabled", true);
  // 1. Create the catalog
  tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
          "'type'='iceberg'," +
          "'catalog-type'='hadoop'," +
          "'warehouse'='hdfs://mycluster/flink_iceberg')");
  // 2. Read all data from the table's current snapshot, then keep reading incrementally
  //    streaming=true enables continuous reads; monitor-interval is how often new snapshots are checked (default 1s)
  TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
  tableResult.print();
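The same streaming read can also be issued interactively, as a rough sketch, from the Flink SQL client, assuming the Iceberg runtime jar is on the client's classpath and the hadoop_iceberg catalog has been created in the session with the CREATE CATALOG statement shown above:

  -- Enable the OPTIONS hint syntax for this session
  SET table.dynamic-table-options.enabled=true;

  SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2
  /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;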

After the code above is started, it first reads out and prints the data already present in the Iceberg table. If you then insert rows into the corresponding Iceberg table from Hive, the new data shows up in the console in real time.

  # Before inserting into the Iceberg table from Hive, add the following two jars:
  add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
  add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
  # Insert two rows into the Iceberg table from Hive
  hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');

The newly inserted rows can be seen arriving in the console in real time.

IV. Incremental Real-Time Reads Starting from a Specified Snapshot with the SQL API

The Flink SQL API also supports continuing to read data in real time starting from a given snapshot-id. The code is as follows:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
  env.enableCheckpointing(1000);
  Configuration configuration = tblEnv.getConfig().getConfiguration();
  // Allow the OPTIONS hint syntax in SQL
  configuration.setBoolean("table.dynamic-table-options.enabled", true);
  // 1. Create the catalog
  tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
          "'type'='iceberg'," +
          "'catalog-type'='hadoop'," +
          "'warehouse'='hdfs://mycluster/flink_iceberg')");
  // 2. Keep reading in real time from the specified Iceberg snapshot onwards;
  //    the snapshot ID can be found in the table's metadata
  // start-snapshot-id: the snapshot ID to start from
  TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
  tableResult2.print();
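The post does not show how the snapshot ID was obtained. Besides reading it out of the vN.metadata.json file under the table's metadata directory, one option is the Iceberg Java API; the following is a minimal sketch (the class name is my own) that lists the snapshots of the table used above:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.Snapshot;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.catalog.TableIdentifier;
  import org.apache.iceberg.hadoop.HadoopCatalog;

  public class ListSnapshots {
      public static void main(String[] args) {
          // Same Hadoop catalog warehouse path as in the Flink SQL examples
          HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://mycluster/flink_iceberg");
          Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "flink_iceberg_tbl2"));
          // Print every snapshot ID together with its commit timestamp
          for (Snapshot snapshot : table.snapshots()) {
              System.out.println(snapshot.snapshotId() + " committed at " + snapshot.timestampMillis());
          }
      }
  }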

Tags: flink, sql, big data

Reposted from: https://blog.csdn.net/xiaoweite1/article/details/125774962
Copyright belongs to the original author, Lansonli. If there is any infringement, please contact us for removal.
