0


数据湖(四):Hudi与Spark整合

大数据联盟地址:https://bbs.csdn.net/forums/lanson

文章目录

Hudi与Spark整合

一、向Hudi插入数据

默认Spark操作Hudi使用表类型为Copy On Write模式。Hudi与Spark整合时有很多参数配置,可以参照https://hudi.apache.org/docs/configurations.html配置项来查询,此外,整合时有几个需要注意的点,如下:

  • Hudi这里使用的是0.8.0版本,其对应使用的Spark版本是2.4.3+版本
  • Spark2.4.8使用的Scala版本是2.12版本,虽然2.11也是支持的,建议使用2.12。
  • maven导入包中需要保证httpclient、httpcore版本与集群中的Hadoop使用的版本一致,不然会导致通信有问题。检查Hadoop使用以上两个包的版本路径为:$HADOOP_HOME/share/hadoop/common/lib。
  • 在编写代码过程中,指定数据写入到HDFS路径时直接写“/xxdir”不要写“hdfs://mycluster/xxdir”,后期会报错“java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie.temp/2022xxxxxxxxxx/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/2022xxxxxxxxxx”,可以将对应的hdfs-site.xml、core-site.xml放在resources目录下,直接会找HDFS路径。

1、创建项目,修改pom.xml为如下内容

  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <scala.version>2.12.14</scala.version>
  4. <spark.version>2.4.8</spark.version>
  5. </properties>
  6. <dependencies>
  7. <!-- 指定Scala版本,这里使用2.12版本 -->
  8. <dependency>
  9. <groupId>org.scala-lang</groupId>
  10. <artifactId>scala-library</artifactId>
  11. <version>${scala.version}</version>
  12. </dependency>
  13. <!-- 指定HttpClient版本为4.5.2,与Hadoop集群中的版本保持一致($HADOOP_HOME/share/hadoop/common/lib/httpcore-4.4.4.jar),不然通信报错 -->
  14. <dependency>
  15. <groupId>org.apache.httpcomponents</groupId>
  16. <artifactId>httpclient</artifactId>
  17. <version>4.5.2</version>
  18. </dependency>
  19. <!-- 指定HttpCore版本为4.4.4,与Hadoop集群中的版本保持一致($HADOOP_HOME/share/hadoop/common/lib/httpclient-4.5.2.jar) ,不然通信报错-->
  20. <dependency>
  21. <groupId>org.apache.httpcomponents</groupId>
  22. <artifactId>httpcore</artifactId>
  23. <version>4.4.4</version>
  24. </dependency>
  25. <!-- Spark 依赖Jar 包 -->
  26. <dependency>
  27. <groupId>org.apache.spark</groupId>
  28. <artifactId>spark-core_2.12</artifactId>
  29. <version>${spark.version}</version>
  30. <exclusions>
  31. <exclusion>
  32. <groupId>org.apache.httpcomponents</groupId>
  33. <artifactId>httpclient</artifactId>
  34. </exclusion>
  35. <exclusion>
  36. <groupId>org.apache.httpcomponents</groupId>
  37. <artifactId>httpcore</artifactId>
  38. </exclusion>
  39. </exclusions>
  40. </dependency>
  41. <dependency>
  42. <groupId>org.apache.spark</groupId>
  43. <artifactId>spark-sql_2.12</artifactId>
  44. <version>${spark.version}</version>
  45. </dependency>
  46. <dependency>
  47. <groupId>org.apache.spark</groupId>
  48. <artifactId>spark-avro_2.12</artifactId>
  49. <version>${spark.version}</version>
  50. </dependency>
  51. <!--连接Hive 需要的包,同时,读取Hudi parquet格式数据,也需要用到这个包中的parqurt相关类 -->
  52. <dependency>
  53. <groupId>org.apache.spark</groupId>
  54. <artifactId>spark-hive_2.12</artifactId>
  55. <version>${spark.version}</version>
  56. </dependency>
  57. <!-- 连接Hive 驱动包-->
  58. <dependency>
  59. <groupId>org.apache.hive</groupId>
  60. <artifactId>hive-jdbc</artifactId>
  61. <version>1.2.1</version>
  62. </dependency>
  63. <dependency>
  64. <groupId>org.apache.hudi</groupId>
  65. <artifactId>hudi-spark-bundle_2.12</artifactId>
  66. <version>0.8.0</version>
  67. </dependency>
  68. </dependencies>
  69. <build>
  70. <plugins>
  71. <!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
  72. <plugin>
  73. <groupId>org.scala-tools</groupId>
  74. <artifactId>maven-scala-plugin</artifactId>
  75. <version>2.15.2</version>
  76. <executions>
  77. <execution>
  78. <goals>
  79. <goal>compile</goal>
  80. <goal>testCompile</goal>
  81. </goals>
  82. </execution>
  83. </executions>
  84. </plugin>
  85. <!-- maven 打jar包需要插件 -->
  86. <plugin>
  87. <artifactId>maven-assembly-plugin</artifactId>
  88. <version>2.4</version>
  89. <configuration>
  90. <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
  91. <!--<appendAssemblyId>false</appendAssemblyId>-->
  92. <descriptorRefs>
  93. <descriptorRef>jar-with-dependencies</descriptorRef>
  94. </descriptorRefs>
  95. <archive>
  96. <manifest>
  97. <mainClass>com.xxx</mainClass>
  98. </manifest>
  99. </archive>
  100. </configuration>
  101. <executions>
  102. <execution>
  103. <id>make-assembly</id>
  104. <phase>package</phase>
  105. <goals>
  106. <goal>assembly</goal>
  107. </goals>
  108. </execution>
  109. </executions>
  110. </plugin>
  111. </plugins>
  112. </build>

2、编写向Hudi插入数据代码

  1. val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
  2. .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  3. .getOrCreate()
  4. //关闭日志
  5. // session.sparkContext.setLogLevel("Error")
  6. //创建DataFrame
  7. val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
  8. //将结果保存到hudi中
  9. insertDF.write.format("org.apache.hudi")//或者直接写hudi
  10. //设置主键列名称
  11. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
  12. //当数据主键相同时,对比的字段,保存该字段大的数据
  13. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt")S
  14. //并行度设置,默认1500
  15. .option("hoodie.insert.shuffle.parallelism","2")
  16. .option("hoodie.upsert.shuffle.parallelism", "2")
  17. //表名设置
  18. .option(HoodieWriteConfig.TABLE_NAME,"person_infos")
  19. .mode(SaveMode.Overwrite)
  20. //注意:这里要选择hdfs路径存储,不要加上hdfs://mycluster//dir
  21. //将hdfs 中core-site.xml 、hdfs-site.xml放在resource目录下,直接写/dir路径即可,否则会报错:java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie\.temp/20220509164730/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/20220509164730
  22. .save("/hudi_data/person_infos")

二、指定分区向hudi中插入数据****

向Hudi中存储数据时,如果没有指定分区列,那么默认只有一个default分区,我们可以保存数据时指定分区列,可以在写出时指定“DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY”选项来指定分区列,如果涉及到多个分区列,那么需要将多个分区列进行拼接生成新的字段,使用以上参数指定新的字段即可。

1、指定一个分区列

  1. insertDF.write.format("org.apache.hudi")
  2. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  3. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  4. //指定分区列
  5. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
  6. .option("hoodie.insert.shuffle.parallelism", "2")
  7. .option("hoodie.upsert.shuffle.parallelism", "2")
  8. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  9. .mode(SaveMode.Overwrite)
  10. .save("/hudi_data/person_infos")

**2、指定分区为多个列时,可以先拼接,后指定拼接字段当做分区列: **

指定两个分区,需要拼接

  1. //导入函数,拼接列
  2. import org.apache.spark.sql.functions._
  3. val endDF: DataFrame = insertDF.withColumn("partition_key", concat_ws("-", col("data_dt"), col("loc")))
  4. endDF.write.format("org.apache.hudi")
  5. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  6. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  7. //指定分区列,这里是拼接的列
  8. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition_key")
  9. .option("hoodie.insert.shuffle.parallelism", "2")
  10. .option("hoodie.upsert.shuffle.parallelism", "2")
  11. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  12. .mode(SaveMode. )
  13. .save("/hudi_data/person_infos")

三、 ​​​​​​​读取Hudi数据

使用SparkSQL读取Hudi中的数据,无法使用读取表方式来读取,需要指定HDFS对应的路径来加载,指定的路径只需要指定到*.parquet当前路径或者上一层路径即可,路径中可以使用“*”来替代任意目录和数据。

读取数据返回的结果中除了原有的数据之外,还会携带Hudi对应的列数据,例如:hudi的主键、分区、提交时间、对应的parquet名称。

Spark读取Hudi表数据代码如下:

  1. val session: SparkSession = SparkSession.builder().master("local").appName("queryDataFromHudi")
  2. .getOrCreate()
  3. //读取的数据路径下如果有分区,会自动发现分区数据,需要使用 * 代替,指定到parquet格式数据上层目录即可。
  4. val frame: DataFrame = session.read.format("org.apache.hudi").load("/hudi_data/person_infos/*/*")
  5. frame.createTempView("personInfos")
  6. //查询结果
  7. val result = session.sql(
  8. """
  9. | select * from personInfos
  10. """.stripMargin)
  11. result.show(false)

四、​​​​​​​​​​​​​​更新Hudi数据

向Hudi中更新数据有如下几个特点

  • 同一个分区内,向Hudi中更新数据是用主键来判断数据是否需要更新的,这里判断的是相同分区内是否有相同主键,不同分区内允许有相同主键。
  • 更新数据时,如果原来数据有分区,一定要指定分区,不然就相当于是向相同表目录下插入数据,会生成对应的“default”分区。
  • 向Hudi中更新数据时,与向Hudi中插入数据一样,但是写入的模式需要指定成“Append”,如果指定成“overwrite”,那么就是全覆盖了。建议使用时一直使用“Append”模式即可。
  • 当更新完成之后,再一次从Hudi中查询数据时,会看到Hudi提交的时间字段为最新的时间。

这里将原有的三条数据改成如下三条数据:

  1. #修改之前
  2. {"id":1,"name":"zs1","age":18,"loc":"beijing","data_dt":"20210709"}
  3. {"id":2,"name":"zs2","age":19,"loc":"shanghai","data_dt":"20210709"}
  4. {"id":3,"name":"zs3","age":20,"loc":"beijing","data_dt":"20210709"}
  5. #修改之后
  6. {"id":1,"name":"ls1","age":40,"loc":"beijing","data_dt":"20210709"} --更新数据
  7. {"id":2,"name":"ls2","age":50,"loc":"shanghai","data_dt":"20210710"} --更新数据
  8. {"id":3,"name":"ls3","age":60,"loc":"ttt","data_dt":"20210711"} --相当于是新增数据

更新Hudi数据代码如下:

  1. val session: SparkSession = SparkSession.builder().master("local").appName("updataDataToHudi")
  2. .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  3. .getOrCreate()
  4. //读取修改数据
  5. val updateDataDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\updatedata.json")
  6. //向Hudi 更新数据
  7. updateDataDF.write.format("org.apache.hudi") //或者直接写hudi
  8. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  9. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  10. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
  11. .option("hoodie.insert.shuffle.parallelism", "2")
  12. .option("hoodie.upsert.shuffle.parallelism", "2")
  13. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  14. .mode(SaveMode.Append)
  15. .save("/hudi_data/person_infos")
  16. //查询数据
  17. val frame: DataFrame = session.read.format("org.apache.hudi").load("/hudi_data/person_infos/*/*")
  18. frame.createTempView("personInfos")
  19. //查询结果
  20. val result = session.sql(
  21. """
  22. | select * from personInfos
  23. """.stripMargin)
  24. result.show(false)

五、 增量查询Hudi数据

Hudi可以根据我们传入的时间戳查询此时间戳之后的数据,这就是增量查询,需要注意的是增量查询必须通过以下方式在Spark中指定一个时间戳才能正常查询:

option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,timestamp)

例如:原始数据如下:

我们可以查询“20210709220335”之后的数据,查询结果如下:

代码如下:

  1. val session: SparkSession = SparkSession.builder().master("local").appName("updataDataToHudi")
  2. .getOrCreate()
  3. //关闭日志
  4. session.sparkContext.setLogLevel("Error")
  5. //导入隐式转换
  6. import session.implicits._
  7. //查询全量数据,查询对应的提交时间,找出倒数第二个时间
  8. val basePath = "/hudi_data/person_infos"
  9. session.read.format("hudi").load(basePath+"/*/*").createTempView("personInfos")
  10. val df: DataFrame = session.sql("select distinct(_hoodie_commit_time) as commit_time from personInfos order by commit_time desc")
  11. //这里获取由大到小排序的第二个值
  12. val dt: String = df.map(row=>{row.getString(0)}).collect()(1)
  13. //增量查询
  14. val result:DataFrame = session.read.format("hudi")
  15. /**
  16. * 指定数据查询方式,有以下三种:
  17. * val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot" -- 获取最新所有数据 , 默认
  18. * val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental" --获取指定时间戳后的变化数据
  19. * val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized" -- 只查询Base文件中的数据
  20. *
  21. * 1) Snapshot mode (obtain latest view, based on row & columnar data)
  22. * 2) incremental mode (new data since an instantTime)
  23. * 3) Read Optimized mode (obtain latest view, based on columnar data)
  24. *
  25. * Default: snapshot
  26. */
  27. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  28. //必须指定一个开始查询的时间,不指定报错
  29. .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,dt)
  30. .load(basePath+"/*/*")
  31. result.show(false)

六、​​​​​​​​​​​​​​指定时间范围查询Hudi数据

Hudi还可以通过指定开始时间和结束时间来查询时间范围内的数据。如果想要查询最早的时间点到某个结束时刻的数据,开始时间可以指定成“000”。

1、向原有Hudi表“person_infos”中插入两次数据

目前hudi表中的数据如下:

先执行两次新的数据插入,两次插入数据之间的间隔时间至少为1分钟,两次插入数据代码如下:

  1. //以下代码分两次向 HDFS /hudi_data/person_infos 路径中插入数据,两次运行至少1分钟以上
  2. val session: SparkSession = SparkSession.builder().master("local").appName("PointTimeQueryHudi")
  3. .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  4. .getOrCreate()
  5. //读取第一个文件,向Hudi中插入数据
  6. val df1: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\appendData1.json")
  7. val df2: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\appendData2.json")
  8. //向Hudi中插入数据
  9. df2.write.format("hudi")
  10. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  11. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  12. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
  13. .option("hoodie.insert.shuffle.parallelism", "2")
  14. .option("hoodie.upsert.shuffle.parallelism", "2")
  15. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  16. .mode(SaveMode.Append)
  17. .save("/hudi_data/person_infos")
  18. import org.apache.spark.sql.functions._
  19. //查询数据
  20. session.read.format("hudi").load("/hudi_data/person_infos/*/*")
  21. .orderBy(col("_hoodie_commit_time"))
  22. .show(100,false)

此时,数据如下:

2、指定时间段查询Hudi中的数据

代码如下:

  1. val session: SparkSession = SparkSession.builder().master("local").appName("PointTimeQueryHudi")
  2. .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  3. .getOrCreate()
  4. //指定时间段,查询hudi中的数据
  5. // val beginTime = "000"
  6. val beginTime = "20210710002148"
  7. val endTime = "20210710002533"
  8. val result: DataFrame = session.read.format("hudi")
  9. //指定增量查询
  10. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  11. //指定查询开始时间(不包含),“000”指定为最早时间
  12. .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
  13. //指定查询结束时间(包含)
  14. .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, endTime)
  15. .load("/hudi_data/person_infos/*/*")
  16. result.createTempView("temp")
  17. session.sql(
  18. """
  19. | select * from temp order by _hoodie_commit_time
  20. """.stripMargin).show(100,false)

开始时间为“000”,相当于是从头开始查询到endTime的数据:

开始时间为“20210710002148”:

七、​​​​​​​​​​​​​​删除Hudi数据

我们准备对应的主键及分区的数据,将Hudi中对应的主键及分区的数据进行删除,在删除Hudi中的数据时,需要指定option(OPERATION_OPT_KEY,"delete")配置项,并且写入模式只能是Append,不支持其他写入模式,另外,设置下删除执行的并行度,默认为1500个,这里可以设置成2个。

原始数据如下:

准备要删除的数据如下:

  1. {"id":11,"loc":"beijing"}
  2. {"id":12,"loc":"beijing"}
  3. {"id":13,"loc":"beijing"}
  4. {"id":14,"loc":"shenzhen"}
  5. {"id":15,"loc":"tianjian"} --此条数据对应的主键一致,但是分区不一致,不能在Hudi中删除

编写代码如下:

  1. val session: SparkSession = SparkSession.builder().master("local").appName("DeleteHudiData")
  2. .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  3. .getOrCreate()
  4. //读取需要删除的数据,只需要准备对应的主键及分区即可,字段保持与Hudi中需要删除的字段名称一致即可
  5. //读取的文件中准备了一个主键在Hudi中存在但是分区不再Hudi中存在的数据,此主键数据在Hudi中不能被删除,需要分区和主键字段都匹配才能删除
  6. val deleteData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\deleteData.json")
  7. //将删除的数据插入到Hudi中
  8. deleteData.write.format("hudi")
  9. //指定操作模式为delete
  10. .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"delete")
  11. //指定主键
  12. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
  13. //指定分区字段
  14. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
  15. //指定表名,这里的表明需要与之前指定的表名保持一致
  16. .option(HoodieWriteConfig.TABLE_NAME,"person_infos")
  17. //设置删除并行度设置,默认1500并行度
  18. .option("hoodie.delete.shuffle.parallelism", "2")
  19. .mode(SaveMode.Append)
  20. .save("/hudi_data/person_infos")
  21. //执行完成之后,查询结果
  22. import org.apache.spark.sql.functions._
  23. session.read.format("hudi").load("/hudi_data/person_infos/*/*")
  24. .orderBy(col("_hoodie_commit_time")).show(100,false)

结果如下:

八、​​​​​​更新Hudi某个分区数据

如果我们想要更新Hudi某个分区的数据,其他分区数据正常使用,那么可以通过配置option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite")选项,该选项“insert_overwrite”可以直接在元数据层面上操作,直接将写入某分区的新数据替换到该分区内,原有数据会在一定时间内删除,相比upsert更新Hudi速度要快。

1、删除person_infos对应的目录,重新插入数据,代码如下

  1. val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
  2. .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  3. .getOrCreate()
  4. //创建DataFrame
  5. val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
  6. insertDF.write.format("org.apache.hudi")
  7. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  8. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  9. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
  10. .option("hoodie.insert.shuffle.parallelism", "2")
  11. .option("hoodie.upsert.shuffle.parallelism", "2")
  12. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  13. .mode(SaveMode.Append)
  14. .save("/hudi_data/person_infos")
  15. //写入完成之后,查询hudi 数据:
  16. val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
  17. person_infos.show(100,false)

2、读取更新分区数据,插入到Hudi preson_infos表中

读取数据如下:

  1. {"id":1,"name":"s1","age":1,"loc":"beijing","data_dt":"20210710"}
  2. {"id":100,"name":"s2","age":2,"loc":"beijing","data_dt":"20210710"}
  3. {"id":200,"name":"s3","age":3,"loc":"beijing","data_dt":"20210710"}
  4. {"id":8,"name":"w1","age":4,"loc":"chongqing","data_dt":"20210710"}
  5. {"id":300,"name":"w2","age":5,"loc":"chongqing","data_dt":"20210710"}

代码如下:

  1. val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
  2. .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  3. .getOrCreate()
  4. //读取需要替换的数据,将beijing分区数据替换成2条,将chognqing分区数据替换成1条
  5. val overWritePartitionData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\overWrite.json")
  6. //写入hudi表person_infos,替换分区
  7. overWritePartitionData.write.format("hudi")
  8. .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite")
  9. //设置主键列名称
  10. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  11. //当数据主键相同时,对比的字段,保存该字段大的数据
  12. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  13. //指定分区列
  14. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
  15. //并行度设置
  16. .option("hoodie.insert.shuffle.parallelism", "2")
  17. .option("hoodie.upsert.shuffle.parallelism", "2")
  18. //表名设置
  19. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  20. .mode(SaveMode.Append)
  21. .save("/hudi_data/person_infos")
  22. //写入完成之后,查询hudi 数据:
  23. val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
  24. person_infos.show(100,false)

九、覆盖Hudi整个表数据

如果我们想要替换Hudi整个表数据,可以在向Hudi表写入数据时指定配置option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite_table")选项,该选项“insert_overwrite_table”可以直接在元数据层面上操作,直接将数据写入表,原有数据会在一定时间内删除,相比删除原有数据再插入更方便。

1、删除Hudi表person_infos对应的HDFS路径,重新插入数据

  1. val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
  2. .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  3. .getOrCreate()
  4. //创建DataFrame
  5. val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
  6. insertDF.write.format("org.apache.hudi")
  7. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  8. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  9. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
  10. .option("hoodie.insert.shuffle.parallelism", "2")
  11. .option("hoodie.upsert.shuffle.parallelism", "2")
  12. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  13. .mode(SaveMode.Append)
  14. .save("/hudi_data/person_infos")
  15. //写入完成之后,查询hudi 数据:
  16. val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
  17. person_infos.show(100,false)

2、读取新数据,覆盖原有Hudi表数据

覆盖更新的数据如下:

  1. {"id":1,"name":"s1","age":1,"loc":"beijing","data_dt":"20210710"}
  2. {"id":100,"name":"s2","age":2,"loc":"beijing","data_dt":"20210710"}
  3. {"id":200,"name":"s3","age":3,"loc":"beijing","data_dt":"20210710"}
  4. {"id":8,"name":"w1","age":4,"loc":"chongqing","data_dt":"20210710"}
  5. {"id":300,"name":"w2","age":5,"loc":"chongqing","data_dt":"20210710"}

代码如下:

  1. val session: SparkSession = SparkSession.builder().master("local").appName("InsertOverWrite")
  2. .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  3. .getOrCreate()
  4. //读取需要替换的数据,覆盖原有表所有数据
  5. val overWritePartitionData: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\overWrite.json")
  6. //写入hudi表person_infos,替换分区
  7. overWritePartitionData.write.format("hudi")
  8. .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite_table")
  9. //设置主键列名称
  10. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  11. //当数据主键相同时,对比的字段,保存该字段大的数据
  12. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  13. //指定分区列
  14. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "loc")
  15. //并行度设置
  16. .option("hoodie.insert.shuffle.parallelism", "2")
  17. .option("hoodie.upsert.shuffle.parallelism", "2")
  18. //表名设置
  19. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  20. .mode(SaveMode.Append)
  21. .save("/hudi_data/person_infos")
  22. //写入完成之后,查询hudi 数据:
  23. val person_infos: DataFrame = session.read.format("hudi").load("/hudi_data/person_infos/*/*")
  24. person_infos.show(100,false)

十、​​​​​​​Spark操作Hudi Merge On Read 模式

默认Spark操作Hudi使用Copy On Write模式,也可以使用Merge On Read 模式,通过代码中国配置如下配置来指定:

option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)

代码操作如下:

  • 删除原有person_infos对应的HDFS路径
  • 读取数据向Hudi表person_info中插入数据

读取的数据如下:

  1. {"id":1,"name":"zs1","age":18,"loc":"beijing","data_dt":"20210709"}
  2. {"id":2,"name":"zs2","age":19,"loc":"shanghai","data_dt":"20210709"}
  3. {"id":3,"name":"zs3","age":20,"loc":"beijing","data_dt":"20210709"}
  4. {"id":4,"name":"zs4","age":21,"loc":"tianjin","data_dt":"20210709"}
  5. {"id":5,"name":"zs5","age":22,"loc":"shenzhen","data_dt":"20210709"}
  6. {"id":6,"name":"zs6","age":23,"loc":"hainai","data_dt":"20210709"}
  7. {"id":7,"name":"zs7","age":24,"loc":"beijing","data_dt":"20210709"}
  8. {"id":8,"name":"zs8","age":25,"loc":"chongqing","data_dt":"20210709"}
  9. {"id":9,"name":"zs9","age":26,"loc":"shandong","data_dt":"20210709"}
  10. {"id":10,"name":"zs10","age":27,"loc":"hunan","data_dt":"20210709"}

代码如下:

  1. //1.读取json格式数据
  2. val insertDf: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\jsondata.json")
  3. //2.将结果使用Merge on Read 模式写入到Hudi中,并设置分区
  4. insertDf.write.format("hudi")
  5. //设置表模式为 mor
  6. .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  7. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
  8. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
  9. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt")
  10. //并行度设置
  11. .option("hoodie.insert.shuffle.parallelism", "2")
  12. .option("hoodie.upsert.shuffle.parallelism", "2")
  13. //表名设置
  14. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  15. .mode(SaveMode.Append)
  16. .save("/hudi_data/person_infos")

  • 更新Hudi表person_info数据

这里更新“beijing”、“shanghai”、“ttt”分区数据,更新数据如下:

  1. {"id":1,"name":"ls1","age":40,"loc":"beijing","data_dt":"20210709"}
  2. {"id":2,"name":"ls2","age":50,"loc":"shanghai","data_dt":"20210710"}
  3. {"id":3,"name":"ls3","age":60,"loc":"ttt","data_dt":"20210711"}

代码如下:

  1. //3.读取更新数据,并执行插入更新
  2. val updateDf: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\updatedata.json")
  3. updateDf.write.format("hudi")
  4. //设置表模式为 mor
  5. .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  6. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,"id")
  7. .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,"loc")
  8. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,"data_dt")
  9. //并行度设置
  10. .option("hoodie.insert.shuffle.parallelism", "2")
  11. .option("hoodie.upsert.shuffle.parallelism", "2")
  12. //表名设置
  13. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  14. .mode(SaveMode.Append)
  15. .save("/hudi_data/person_infos")

  • 增量查询Hudi表中的数据

Snapshot 模式查询,这种模式对于COW或者MOR模式都是查询到当前时刻全量的数据,如果有更新,那么就是更新之后全量的数据:

  1. //4.使用不同模式查询 MOR 表中的数据
  2. /**
  3. * 指定数据查询方式,有以下三种:
  4. * val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot" -- 获取最新所有数据 , 默认
  5. * val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental" --获取指定时间戳后的变化数据
  6. * val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized" -- 只查询Base文件中的数据
  7. *
  8. * 1) Snapshot mode (obtain latest view, based on row & columnar data)
  9. * 2) incremental mode (new data since an instantTime)
  10. * 3) Read Optimized mode (obtain latest view, based on columnar data)
  11. *
  12. * Default: snapshot
  13. */
  14. //4.1 Snapshot 模式查询
  15. session.read.format("hudi")
  16. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  17. .load("/hudi_data/person_infos/*/*")
  18. .show(100,false)

incremental 模式查询,这种模式需要指定一个时间戳,查询指定时间戳之后的新增数据:

  1. //4.2 incremental 模式查询,查询指定时间戳后的数据
  2. session.read.format("hudi")
  3. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  4. //必须指定一个开始查询的时间,不指定报错
  5. .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,"20210710171240")
  6. .load("/hudi_data/person_infos/*/*")
  7. .show(100,false)

Read Optimized 模式查询,这种模式只查询Base中的数据,不会查询MOR中Log文件中的数据,代码如下:

  1. //4.3 Read Optimized 模式查询,查询Base中的数据,不会查询log中的数据
  2. session.read.format("hudi")
  3. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  4. .load("/hudi_data/person_infos/*/*")
  5. .show(100,false)

十一、​​​​​​测试COW模式parquet文件删除与MOR模式Parquet文件与log文件Compact

COW默认情况下,每次更新数据Commit都会基于之前parquet文件生成一个新的Parquet Base文件数据,默认历史parquet文件数为10,当超过10个后会自动删除旧的版本,可以通过参数“hoodie.cleaner.commits.retained”来控制保留的FileID版本文件数,默认是10。测试代码如下:

  1. val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
  2. .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  3. .getOrCreate()
  4. //创建DataFrame
  5. val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata1.json")
  6. // val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata2.json")
  7. // val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata3.json")
  8. // val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata4.json")
  9. // val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata5.json")
  10. // val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata6.json")
  11. // val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata7.json")
  12. // val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata8.json")
  13. // val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata9.json")
  14. // val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata10.json")
  15. insertDF.write.format("org.apache.hudi")
  16. //设置cow模式
  17. .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  18. //根据commit提交次数计算保留多少个fileID版本文件,默认10。
  19. .option("hoodie.cleaner.commits.retained","3")
  20. //设置主键列名称
  21. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  22. //当数据主键相同时,对比的字段,保存该字段大的数据
  23. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  24. //并行度设置,默认1500并行度
  25. .option("hoodie.insert.shuffle.parallelism", "2")
  26. .option("hoodie.upsert.shuffle.parallelism", "2")
  27. //表名设置
  28. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  29. .mode(SaveMode.Append)
  30. .save("/hudi_data/test_person")
  31. //查询结果数据
  32. session.read.format("hudi")
  33. //全量读取
  34. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  35. .load("/hudi_data/test_person/*/*").show()

测试注意:每次运行代码,读取新的一个数据文件,并查看Hudi表对应的HDFS路径,每次读取都会生成一个新的Parquet文件,当达到指定的3个历史版本时(不包含最新Parquet文件),再插入数据生成新的Parquet文件时,一致会将之前的旧版本删除,保存4个文件。

MOR模式下,如果有新增数据会直接写入Base Parquet文件,这个Parquet文件个数的控制也是由“hoodie.cleaner.commits.retained”控制,默认为10。当对应的每个FlieSlice(Base Parquet文件+log Avro文件)中有数据更新时,会写入对应的log Avro文件,那么这个文件何时与Base Parquet文件进行合并,这个是由参数“hoodie.compact.inline.max.delta.commits”决定的,这个参数意思是在提交多少次commit后触发压缩策略,默认是5,也就是当前FlieSlice中如果有5次数据更新就会两者合并生成全量的数据,当前FlieSlice还是这个FileSlice名称,只不过对应的parquet文件中是全量数据,再有更新数据还是会写入当前FileSlice对应的log日志文件中。使“hoodie.compact.inline.max.delta.commits”参数起作用,默认必须开启“hoodie.compact.inline”,此值代表是否完成提交数据后进行压缩,默认是false。

测试代码如下:

  1. #注意代码中设置参数如下:
  2. //根据commit提交次数计算保留多少个fileID版本文件,默认10。
  3. .option("hoodie.cleaner.commits.retained","3")
  4. //默认false:是否在一个事务完成后内联执行压缩操作
  5. .option("hoodie.compact.inline","true")
  6. //设置提交多少次后触发压缩策略,默认5
  7. .option("hoodie.compact.inline.max.delta.commits","2")
  8. #完整代码如下:
  9. val session: SparkSession = SparkSession.builder().master("local").appName("insertDataToHudi")
  10. .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  11. .getOrCreate()
  12. //创建DataFrame ,新增
  13. // val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\insertdata1.json")
  14. //创建DataFrame ,更新
  15. val insertDF: DataFrame = session.read.json("file:///D:\\2022IDEA_space\\SparkOperateHudi\\data\\test\\update11.json")
  16. insertDF.write.format("org.apache.hudi") //或者直接写hudi
  17. .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  18. //根据commit提交次数计算保留多少个fileID版本文件,默认10。
  19. .option("hoodie.cleaner.commits.retained","3")
  20. //默认false:是否在一个事务完成后内联执行压缩操作
  21. .option("hoodie.compact.inline","true")
  22. //设置提交多少次后触发压缩策略,默认5
  23. .option("hoodie.compact.inline.max.delta.commits","2")
  24. //设置主键列名称
  25. .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  26. //当数据主键相同时,对比的字段,保存该字段大的数据
  27. .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "data_dt")
  28. //并行度设置,默认1500并行度
  29. .option("hoodie.insert.shuffle.parallelism", "2")
  30. .option("hoodie.upsert.shuffle.parallelism", "2")
  31. //表名设置
  32. .option(HoodieWriteConfig.TABLE_NAME, "person_infos")
  33. .mode(SaveMode.Append)
  34. .save("/hudi_data/test_person")
  35. //查询结果数据
  36. session.read.format("hudi")
  37. //全量读取
  38. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  39. .load("/hudi_data/test_person/*/*").show()

第一次运行插入数据,commit,路径对应数据目录如下:

第一次运行更新数据,commit,路径对应数据目录如下:

**第二次运行更新数据,commit,路径对应的数据目录如下: **

**第三次运行更新数据,commit,路径对应的数据目录如下: **

**第四次运行更新数据,commit,路径对应的数据目录如下: **

**第五次运行更新数据,commit,路径对应的目录数据如下: **


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
标签: spark 大数据 Hudi

本文转载自: https://blog.csdn.net/xiaoweite1/article/details/125036677
版权归原作者 Lansonli 所有, 如有侵权,请联系我们删除。

“数据湖(四):Hudi与Spark整合”的评论:

还没有评论