Data Lake (17): Integrating Flink with Iceberg Using the DataStream API

Flink and Iceberg Integration: DataStream API Operations

Flink currently supports reading from and writing to Iceberg tables in real time with both the DataStream API and the SQL API; the SQL API is the recommended approach.

Iceberg supports Flink 1.11.x and later. The tested compatibility between Iceberg and Flink versions is as follows:

  • Flink 1.11.x works with Iceberg 0.11.1.
  • Flink 1.12.x–1.13.x work with Iceberg 0.12.1, although the SQL API has some bugs.
  • Flink 1.14.x can be integrated with Iceberg 0.12.1, but there are minor bugs, for example when reading Iceberg data in streaming mode.

The Flink–Iceberg integration below uses Flink 1.13.5 and Iceberg 0.12.1. The later SQL API examples use Flink 1.11.6 and Iceberg 0.11.1.

I. Writing to an Iceberg Table in Real Time with the DataStream API

Operating on Iceberg through the DataStream API is currently supported only with the Java API. The steps for writing to an Iceberg table in real time with the DataStream API are as follows:

1. First, add the following dependencies to Maven

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- Flink 1.12.x - 1.13.x is compatible with Iceberg 0.12.1; not compatible with Flink 1.14 -->
    <flink.version>1.13.5</flink.version>
    <!--<flink.version>1.12.1</flink.version>-->
    <!--<flink.version>1.14.2</flink.version>-->
    <!-- Flink 1.11.x matches Iceberg 0.11.1 -->
    <!--<flink.version>1.11.6</flink.version>-->
    <hadoop.version>3.2.2</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <artifactId>ververica-connector-iceberg</artifactId>
      <version>1.13-vvr-4.0.7</version>
    </dependency>
    <!-- Iceberg dependency required for Flink to operate on Iceberg -->
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-flink-runtime</artifactId>
      <version>0.12.1</version>
      <!--<version>0.11.1</version>-->
    </dependency>
    <!-- Dependencies required for developing Flink jobs in Java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Required for reading files from HDFS -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- Flink SQL & Table -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime-blink_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- log4j and slf4j; comment out the dependencies below if you do not want log output in the console -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-nop</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.5</version>
    </dependency>
  </dependencies>

2. Write code that uses the DataStream API to write Kafka data into an Iceberg table

  import com.google.common.collect.ImmutableMap;
  import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.connector.kafka.source.KafkaSource;
  import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
  import org.apache.flink.streaming.api.datastream.DataStreamSource;
  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.data.GenericRowData;
  import org.apache.flink.table.data.RowData;
  import org.apache.flink.table.data.StringData;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.*;
  import org.apache.iceberg.catalog.Catalog;
  import org.apache.iceberg.catalog.TableIdentifier;
  import org.apache.iceberg.flink.TableLoader;
  import org.apache.iceberg.flink.sink.FlinkSink;
  import org.apache.iceberg.hadoop.HadoopCatalog;
  import org.apache.iceberg.types.Types;

  import java.util.Map;

  /**
   * Write data into an Iceberg table using the DataStream API.
   */
  public class StreamAPIWriteIceberg {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          //1. Checkpointing must be enabled: Flink only commits data to Iceberg after a checkpoint completes.
          env.enableCheckpointing(5000);

          //2. Read data from the Kafka topic.
          KafkaSource<String> source = KafkaSource.<String>builder()
                  .setBootstrapServers("node1:9092,node2:9092,node3:9092")
                  .setTopics("flink-iceberg-topic")
                  .setGroupId("my-group-id")
                  .setStartingOffsets(OffsetsInitializer.latest())
                  .setValueOnlyDeserializer(new SimpleStringSchema())
                  .build();

          DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

          //3. Wrap each record in a RowData object so it can be written to the Iceberg table.
          SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {
              @Override
              public RowData map(String s) throws Exception {
                  System.out.println("s = " + s);
                  String[] split = s.split(",");
                  GenericRowData row = new GenericRowData(4);
                  row.setField(0, Integer.valueOf(split[0]));
                  row.setField(1, StringData.fromString(split[1]));
                  row.setField(2, Integer.valueOf(split[2]));
                  row.setField(3, StringData.fromString(split[3]));
                  return row;
              }
          });

          //4. Create the Hadoop configuration, catalog configuration, and table schema so the table can be located when writing to its path.
          Configuration hadoopConf = new Configuration();
          Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");

          //Configure the Iceberg database name and table name.
          TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");

          //Create the Iceberg table schema.
          Schema schema = new Schema(
                  Types.NestedField.required(1, "id", Types.IntegerType.get()),
                  Types.NestedField.required(2, "name", Types.StringType.get()),
                  Types.NestedField.required(3, "age", Types.IntegerType.get()),
                  Types.NestedField.required(4, "loc", Types.StringType.get()));

          //If the table is partitioned, specify the partition columns; here "loc" is the partition column.
          //Use the unpartitioned() method to create an unpartitioned table instead.
          // PartitionSpec spec = PartitionSpec.unpartitioned();
          PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();

          //Store the Iceberg table data in Parquet format.
          Map<String, String> props =
                  ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());

          Table table = null;
          //Use the catalog to check whether the table exists: create it if it does not, otherwise load it.
          if (!catalog.tableExists(name)) {
              table = catalog.createTable(name, schema, spec, props);
          } else {
              table = catalog.loadTable(name);
          }

          TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

          //5. Write the data to Iceberg through the DataStream API.
          FlinkSink.forRowData(dataStream)
                  //.table() can be omitted; the path supplied to the tableLoader is enough to locate the table.
                  .table(table)
                  .tableLoader(tableLoader)
                  //Defaults to false (append); set to true to overwrite existing data.
                  .overwrite(false)
                  .build();

          env.execute("DataStream Api Write Data To Iceberg");
      }
  }

A few points to note about the code above:

  • Checkpointing must be enabled. Flink commits data to Iceberg only after a checkpoint succeeds; without successful checkpoints, the data cannot be queried from Hive later. (A minimal checkpoint-configuration sketch follows this list.)
  • After reading from Kafka, each record must be wrapped as a RowData or Row object before it can be written to the Iceberg table. Writes append by default; specifying overwrite replaces all existing data.
  • Before writing to the Iceberg table, the corresponding catalog and table schema must be created; otherwise, writing with only a path will fail because the Iceberg table cannot be found.
  • Writing to Iceberg with the DataStream API is not recommended; use the SQL API instead.
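Because commits only happen when a checkpoint completes, the checkpoint interval and mode effectively control how often newly written data becomes visible in the Iceberg table. Below is a minimal sketch (assuming Flink 1.13) of a slightly more explicit checkpoint configuration; the checkpoint storage path and timeout are illustrative values, not part of the original example:

  import org.apache.flink.streaming.api.CheckpointingMode;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class CheckpointConfigSketch {
      public static void main(String[] args) {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          // Checkpoint (and therefore commit to Iceberg) roughly every 5 seconds, with exactly-once semantics.
          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

          // Hypothetical checkpoint storage path; adjust to your own cluster.
          env.getCheckpointConfig().setCheckpointStorage("hdfs://mycluster/flink_checkpoints/");

          // Fail a checkpoint that has not completed within 60 seconds (illustrative value).
          env.getCheckpointConfig().setCheckpointTimeout(60000);
      }
  }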

3. Create the "flink-iceberg-topic" topic referenced in the code, start the job, and produce data

  # Create the flink-iceberg-topic topic in Kafka
  [root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic flink-iceberg-topic --partitions 3 --replication-factor 3

After creating the topic, start the job and then produce the following data into the topic:

  [root@node1 bin]# ./kafka-console-producer.sh --topic flink-iceberg-topic --broker-list node1:9092,node2:9092,node3:9092
  1,zs,18,beijing
  2,ls,19,shanghai
  3,ww,20,beijing
  4,ml,21,shanghai

You can see that the data has been written to the corresponding path in HDFS.

4. Query the data stored in Iceberg through Hive

Start Hive and the Hive Metastore, then create an external table in Hive that maps to the Iceberg table:

  CREATE TABLE flink_iceberg_tbl (
    id int,
    name string,
    age int,
    loc string
  )
  STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
  LOCATION 'hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl'
  TBLPROPERTIES ('iceberg.catalog'='location_based_table');

Note: although loc is a partition column, the partition specification can simply be omitted when creating the mapping table (loc is declared as a regular column). In addition, the table's LOCATION must match the path where the Iceberg data is stored.

Querying the Iceberg table through Hive gives the following result:

  hive> select * from flink_iceberg_tbl;
  OK
  2 ls 19 shanghai
  3 ww 20 beijing
  1 zs 18 beijing
  4 ml 21 shanghai

II. Batch/Streaming Reads from an Iceberg Table with the DataStream API

Reading an Iceberg table with the DataStream API can be done either as a batch read or as a streaming read, controlled by the streaming(true/false) method.

1. Batch (full) read

  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.data.RowData;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.flink.TableLoader;
  import org.apache.iceberg.flink.source.FlinkSource;

  /**
   * Read Iceberg data in batch/streaming mode using the DataStream API.
   */
  public class StreamAPIReadIceberg {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          //1. Configure the TableLoader.
          Configuration hadoopConf = new Configuration();
          TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

          //2. Read data from Iceberg (full read here; incremental reads are also possible).
          DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
                  .tableLoader(tableLoader)
                  //Defaults to false (batch read); set to true for streaming read.
                  .streaming(false)
                  .build();

          batchData.map(new MapFunction<RowData, String>() {
              @Override
              public String map(RowData rowData) throws Exception {
                  int id = rowData.getInt(0);
                  String name = rowData.getString(1).toString();
                  int age = rowData.getInt(2);
                  String loc = rowData.getString(3).toString();
                  return id + "," + name + "," + age + "," + loc;
              }
          }).print();

          env.execute("DataStream Api Read Data From Iceberg");
      }
  }

The result is as follows:

2. Streaming read

  //Setting the streaming parameter to true enables streaming (real-time) reads.
  DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
          .tableLoader(tableLoader)
          //Defaults to false (batch read); set to true for streaming read.
          .streaming(true)
          .build();

Modify the code above as shown, restart the job, and then insert two rows into the Iceberg table "flink_iceberg_tbl" through Hive.

Before inserting data into the Iceberg table from Hive, the following two jars need to be added:

  add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
  add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

Insert two rows into the Iceberg table from Hive:

  hive> insert into flink_iceberg_tbl values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');

After the insert completes, the Flink console shows the new rows being read in real time.

III. Incremental Streaming Reads Starting from a Specified Snapshot

In the example above, Flink reads all of the data in the table. We can instead specify a snapshot-id to decide from which point data should be read incrementally.

  DataStream<RowData> batchData = FlinkSource.forRowData().env(env)
          .tableLoader(tableLoader)
          //Stream incrementally starting from a specific snapshot; the snapshot id must be obtained from the table metadata.
          .startSnapshotId(4226332606322964975L)
          //Defaults to false (batch read); set to true for streaming read.
          .streaming(true)
          .build();

As a result, only the data committed after the specified snapshot is read.
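The snapshot id passed to startSnapshotId() has to be taken from the table metadata. A minimal sketch of listing snapshot ids through the Iceberg API is shown below; it assumes the same Hadoop catalog path and table name as the earlier examples:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.Snapshot;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.catalog.TableIdentifier;
  import org.apache.iceberg.hadoop.HadoopCatalog;

  public class ListSnapshots {
      public static void main(String[] args) {
          // Load the table from the same Hadoop catalog used in the write example.
          Configuration hadoopConf = new Configuration();
          HadoopCatalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");
          Table table = catalog.loadTable(TableIdentifier.of("icebergdb", "flink_iceberg_tbl"));

          // Print every snapshot id together with its commit timestamp.
          for (Snapshot snapshot : table.snapshots()) {
              System.out.println(snapshot.snapshotId() + " @ " + snapshot.timestampMillis());
          }

          // The most recent snapshot, if the table already has one.
          if (table.currentSnapshot() != null) {
              System.out.println("current: " + table.currentSnapshot().snapshotId());
          }
      }
  }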

IV. Compacting Data Files

Iceberg provides an API for compacting small files into larger ones, which can be executed as a Flink batch job. Small-file compaction in Flink works exactly the same way as in Spark.

The code is as follows:

  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.actions.RewriteDataFilesActionResult;
  import org.apache.iceberg.catalog.Catalog;
  import org.apache.iceberg.catalog.TableIdentifier;
  import org.apache.iceberg.flink.actions.Actions;
  import org.apache.iceberg.hadoop.HadoopCatalog;

  /**
   * Compact data files by submitting a Flink batch job.
   */
  public class RewriteDataFiles {
      public static void main(String[] args) {
          ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

          //1. Create the Hadoop configuration.
          Configuration hadoopConf = new Configuration();

          //2. Create the Hadoop catalog so the table can be located by its path.
          Catalog catalog = new HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg/");

          //3. Configure the Iceberg database name and table name, then load the table.
          TableIdentifier name = TableIdentifier.of("icebergdb", "flink_iceberg_tbl");
          Table table = catalog.loadTable(name);

          //4. Compact the small data files.
          RewriteDataFilesActionResult result = Actions.forTable(table)
                  .rewriteDataFiles()
                  //Defaults to 512 MB; the target file size can be set explicitly, just as in Spark.
                  .targetSizeInBytes(536870912L)
                  .execute();
      }
  }
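As a follow-up, the returned RewriteDataFilesActionResult can be inspected to see how many data files the compaction rewrote. The lines below are a sketch of such a check, intended to go right after the execute() call; the addedDataFiles()/deletedDataFiles() method names are based on the Iceberg 0.12 actions API and should be verified against the version used in your project:

  // Sketch: inspect the compaction result (assumes the Iceberg 0.12 actions API).
  System.out.println("data files removed by the rewrite: " + result.deletedDataFiles().size());
  System.out.println("data files added by the rewrite:   " + result.addedDataFiles().size());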
