

Data Lake with Hudi (9): Inserting Data into Hudi with Spark

0. Related Article Links

Big Data Fundamentals: Article Index

1. Development Notes

Apache Hudi was originally developed at Uber to provide efficient, low-latency database-style access to data. Hudi introduces the concept of a Hudi table, which supports CRUD operations; reads and writes are performed through the Hudi API on top of the Spark framework.
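To make this concrete, below is a minimal sketch (not taken from the original post) of what writing to and reading from a Hudi table through the Spark DataFrame API looks like; the DataFrame df, the table path, and the field names are hypothetical placeholders:

    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    // Assumes an existing SparkSession `spark` and a DataFrame `df` that has
    // uuid, ts and partitionpath columns (all hypothetical for this sketch).
    def writeAndRead(spark: SparkSession, df: DataFrame): DataFrame = {
      val path = "/hudi-warehouse/some_table" // hypothetical base path on HDFS

      // Write (insert/upsert): Hudi deduplicates records by record key,
      // keeping the row with the largest precombine field value.
      df.write
        .format("hudi")
        .option("hoodie.datasource.write.recordkey.field", "uuid")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
        .option("hoodie.table.name", "some_table")
        .mode(SaveMode.Append)
        .save(path)

      // Read (snapshot query): load the latest committed view of the table
      spark.read.format("hudi").load(path)
    }

The full, concrete version of this pattern is developed step by step in section 4 below.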

2. Environment Setup

2.1. Setting Up the Server Environment

For the server environment needed to write data into Hudi with Spark, see another post in this series; installing HDFS on CentOS 7 is all that is required. Link: Data Lake with Hudi (6): Integrated Installation and Use of Hudi with Spark and HDFS

2.2. Setting Up the Maven Project

Create a Maven project in IDEA, import the core-site.xml and hdfs-site.xml configuration files from the server, and create a log4j.properties file, as shown in the figure below:

The contents of log4j.properties are as follows:

    log4j.rootCategory=WARN, console
    log4j.rootLogger=error,stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.target=System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

Note: since the program runs locally, the hostname-to-IP mappings for the cluster must be configured on the local machine, for example as shown below.
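A minimal example of such a mapping in the local hosts file (the IP address and hostname are hypothetical placeholders; substitute the values of your own HDFS server):

    # Windows: C:\Windows\System32\drivers\etc\hosts    Linux/macOS: /etc/hosts
    192.168.88.100  node1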

3. Maven Dependencies

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.0.0</spark.version>
        <hadoop.version>3.0.0</hadoop.version>
        <hudi.version>0.9.0</hudi.version>
    </properties>

    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- hudi-spark3 -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_2.12</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

4. Core Code

In the package shown in the figure above, create a new Scala object named Demo01_InsertForCOW. It generates mock data and inserts it into a Hudi table, using the COW (copy-on-write) table type.

Requirement: use the official QuickstartUtils to generate mock Trip data, producing 100 simulated ride records, convert them into a DataFrame, and save them into a Hudi table. The code is essentially the same as what you would run in the spark-shell command line.

The complete code is as follows:

    package com.ouyang.hudi.crud

    import org.apache.hudi.QuickstartUtils.DataGenerator
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    /**
     * @date: 2022/2/23
     * @author: yangshibiao
     * @desc: Generate mock data and insert it into a Hudi table using the COW table type.
     *        Use the official QuickstartUtils to generate 100 mock Trip (ride) records,
     *        convert them into a DataFrame, and save them into a Hudi table.
     *        The code is essentially the same as what you would run in spark-shell.
     */
    object Demo01_InsertForCOW {

      def main(args: Array[String]): Unit = {

        // Create the SparkSession instance and set its properties
        val spark: SparkSession = {
          SparkSession.builder()
            .appName(this.getClass.getSimpleName.stripSuffix("$"))
            .master("local[4]")
            // Set the serializer: Kryo
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .getOrCreate()
        }

        // Define variables: table name and save path
        val tableName: String = "tbl_trips_cow"
        val tablePath: String = "/hudi-warehouse/tbl_trips_cow"

        // Build the data generator to simulate business data
        import org.apache.hudi.QuickstartUtils._
        import scala.collection.JavaConverters._
        import spark.implicits._

        // Step 1: generate mock ride data
        val dataGen: DataGenerator = new DataGenerator()
        val inserts = convertToStringList(dataGen.generateInserts(100))

        // Convert the collection into a DataFrame
        val insertDF: DataFrame = spark.read.json(
          spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
        )
        insertDF.printSchema()
        insertDF.show(10, truncate = false)

        // TODO: Step 2: insert the data into the Hudi table
        import org.apache.hudi.DataSourceWriteOptions._
        import org.apache.hudi.config.HoodieWriteConfig._
        insertDF.write
          .mode(SaveMode.Append)
          .format("hudi")
          .option("hoodie.insert.shuffle.parallelism", "2")
          .option("hoodie.upsert.shuffle.parallelism", "2")
          // Hudi table property settings
          .option(PRECOMBINE_FIELD.key(), "ts")
          .option(RECORDKEY_FIELD.key(), "uuid")
          .option(PARTITIONPATH_FIELD.key(), "partitionpath")
          .option(TBL_NAME.key(), tableName)
          .save(tablePath)
      }
    }

When you run the program you may see a "null\bin\winutils.exe in the Hadoop binaries" error. It occurs because the corresponding Hadoop environment is not available when running locally on Windows, and it can be ignored, as shown in the figure below:
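If you would rather get rid of the warning than ignore it, one common workaround (not part of the original post; the directory below is a hypothetical placeholder) is to point Hadoop at a local winutils installation before the SparkSession is created:

    // Hypothetical workaround: hadoop.home.dir must point to a directory that
    // contains bin\winutils.exe (replace the path with your own location).
    System.setProperty("hadoop.home.dir", "D:\\software\\hadoop-3.0.0")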

The code prints the schema and a sample of the data, as shown below:

    root
     |-- begin_lat: double (nullable = true)
     |-- begin_lon: double (nullable = true)
     |-- driver: string (nullable = true)
     |-- end_lat: double (nullable = true)
     |-- end_lon: double (nullable = true)
     |-- fare: double (nullable = true)
     |-- partitionpath: string (nullable = true)
     |-- rider: string (nullable = true)
     |-- ts: long (nullable = true)
     |-- uuid: string (nullable = true)

    +-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
    |begin_lat          |begin_lon          |driver    |end_lat            |end_lon            |fare              |partitionpath                       |rider    |ts           |uuid                                |
    +-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
    |0.4726905879569653 |0.46157858450465483|driver-213|0.754803407008858  |0.9671159942018241 |34.158284716382845|americas/brazil/sao_paulo           |rider-213|1645620263263|550e7186-203c-48a8-9964-edf12e0dfbe3|
    |0.6100070562136587 |0.8779402295427752 |driver-213|0.3407870505929602 |0.5030798142293655 |43.4923811219014  |americas/brazil/sao_paulo           |rider-213|1645074858260|c8d5e237-6589-419e-bef7-221faa4faa13|
    |0.5731835407930634 |0.4923479652912024 |driver-213|0.08988581780930216|0.42520899698713666|64.27696295884016 |americas/united_states/san_francisco|rider-213|1645298902122|d64b94ec-d8e8-44f3-a5c0-e205e034aa5d|
    |0.21624150367601136|0.14285051259466197|driver-213|0.5890949624813784 |0.0966823831927115 |93.56018115236618 |americas/united_states/san_francisco|rider-213|1645132033863|fd8f9051-b5d2-4403-8002-8bb173df5dc8|
    |0.40613510977307   |0.5644092139040959 |driver-213|0.798706304941517  |0.02698359227182834|17.851135255091155|asia/india/chennai                  |rider-213|1645254343160|160c7699-7f5e-4ec3-ba76-9ae63ae815af|
    |0.8742041526408587 |0.7528268153249502 |driver-213|0.9197827128888302 |0.362464770874404  |19.179139106643607|americas/united_states/san_francisco|rider-213|1645452263906|fe9d75c0-f326-4cef-8596-4248a57d1fea|
    |0.1856488085068272 |0.9694586417848392 |driver-213|0.38186367037201974|0.25252652214479043|33.92216483948643 |americas/united_states/san_francisco|rider-213|1645133755620|5d149bc7-78a8-46df-b2b0-a038dc79e378|
    |0.0750588760043035 |0.03844104444445928|driver-213|0.04376353354538354|0.6346040067610669 |66.62084366450246 |americas/brazil/sao_paulo           |rider-213|1645362187498|da2dd8e5-c2d9-45e2-8c96-520927e5458d|
    |0.651058505660742  |0.8192868687714224 |driver-213|0.20714896002914462|0.06224031095826987|41.06290929046368 |asia/india/chennai                  |rider-213|1645575914370|f01e9d28-df30-454c-a780-b56cd5b43ce7|
    |0.11488393157088261|0.6273212202489661 |driver-213|0.7454678537511295 |0.3954939864908973 |27.79478688582596 |americas/united_states/san_francisco|rider-213|1645094601577|bd4ae628-3885-4b26-8a50-c14f8e42a265|
    +-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
    only showing top 10 rows

After running the program, you can see that the data has been written into HDFS, as shown in the figure below:
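As a quick sanity check (not shown in the original post), the table can also be read back from the same path and queried with Spark SQL; this sketch reuses the tablePath value from the code above:

    // Snapshot query of the COW table just written. Depending on the Hudi
    // version, the base path alone may be enough, or a partition glob such as
    // tablePath + "/*/*/*/*" may be required instead.
    val tripsDF = spark.read.format("hudi").load("/hudi-warehouse/tbl_trips_cow")

    tripsDF.createOrReplaceTempView("tbl_trips_cow")
    spark.sql(
      """
        |SELECT uuid, rider, driver, fare, partitionpath, ts
        |FROM tbl_trips_cow
        |WHERE fare > 20.0
        |ORDER BY fare DESC
        |""".stripMargin
    ).show(10, truncate = false)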


Note: the posts in this Hudi series are study notes written while working through the official Hudi documentation, with some personal understanding added; please forgive any shortcomings ☺☺☺

Note: links to other related articles (covering big data topics, Hudi included) can be found here -> Big Data Fundamentals: Article Index


