

Data Lake Hudi (9): Inserting Data into Hudi with Spark

0. Related Articles

Big Data Fundamentals - Article Index

1. Development Notes

Apache Hudi was originally developed at Uber to provide low-latency, database-style data access with high efficiency. Hudi introduces the concept of Hudi tables, which support CRUD operations; this article uses the Hudi API on top of the Spark framework to read and write such tables.

2. Environment Setup

2.1. Setting Up the Server Environment

For the server environment used to write data into Hudi with Spark, refer to another post in this series; installing HDFS on CentOS 7 is sufficient. See: 数据湖之Hudi(6):Hudi与Spark和HDFS的集成安装使用 (Data Lake Hudi (6): Installing and Using Hudi with Spark and HDFS).

2.2. Creating the Maven Project

Create a Maven project in IDEA, copy the core-site.xml and hdfs-site.xml configuration files from the server into the project, and add a log4j.properties file.

The contents of log4j.properties are as follows:

log4j.rootLogger=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

Note: since the program runs locally, make sure the hostname-to-IP mapping for the HDFS server is configured (e.g., in the local hosts file).
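
For reference, the mapping only needs a hosts entry of the following form; the IP address and hostname below are placeholders and must be replaced with the actual address and hostname of your HDFS server:

# Windows: C:\Windows\System32\drivers\etc\hosts    Linux/macOS: /etc/hosts
# Placeholder values - replace with your HDFS NameNode's IP address and hostname
192.168.100.101   node1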

3. Maven Dependencies

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.0.0</spark.version>
        <hadoop.version>3.0.0</hadoop.version>
        <hudi.version>0.9.0</hudi.version>
    </properties>

    <dependencies>

        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- hudi-spark3 -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_2.12</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>

    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

4. Core Code

In the project package (com.ouyang.hudi.crud), create a new Scala object named Demo01_InsertForCOW, which generates mock data and inserts it into a Hudi table using the COW (copy-on-write) mode.

Requirement: use the data generator provided by the official QuickstartUtils to produce 100 mock Trip ride records, convert them into a DataFrame, and save them to a Hudi table; the code is essentially the same as the spark-shell version.

The code is as follows:

package com.ouyang.hudi.crud

import org.apache.hudi.QuickstartUtils.DataGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * @date 2022/2/23
 * @author yangshibiao
 * @desc Generate mock data and insert it into a Hudi table in COW mode.
 *       Uses the official QuickstartUtils generator to produce 100 mock Trip
 *       ride records, converts them into a DataFrame, and saves them to a
 *       Hudi table; the code is essentially the same as the spark-shell version.
 */
object Demo01_InsertForCOW {

    def main(args: Array[String]): Unit = {

        // Create a SparkSession instance and set its properties
        val spark: SparkSession = {
            SparkSession.builder()
                .appName(this.getClass.getSimpleName.stripSuffix("$"))
                .master("local[4]")
                // Set the serialization method: Kryo
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate()
        }

        // Define variables: table name and storage path
        val tableName: String = "tbl_trips_cow"
        val tablePath: String = "/hudi-warehouse/tbl_trips_cow"

        // Build the data generator used to produce mock business data
        import org.apache.hudi.QuickstartUtils._
        import scala.collection.JavaConverters._
        import spark.implicits._

        // Step 1: generate mock trip data
        val dataGen: DataGenerator = new DataGenerator()
        val inserts = convertToStringList(dataGen.generateInserts(100))

        // Load the generated collection into a DataFrame
        val insertDF: DataFrame = spark.read.json(
            spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
        )
        insertDF.printSchema()
        insertDF.show(10, truncate = false)

        // Step 2: insert the data into the Hudi table
        import org.apache.hudi.DataSourceWriteOptions._
        import org.apache.hudi.config.HoodieWriteConfig._
        insertDF.write
            .mode(SaveMode.Append)
            .format("hudi")
            .option("hoodie.insert.shuffle.parallelism", "2")
            .option("hoodie.upsert.shuffle.parallelism", "2")
            // Hudi table property settings
            .option(PRECOMBINE_FIELD.key(), "ts")
            .option(RECORDKEY_FIELD.key(), "uuid")
            .option(PARTITIONPATH_FIELD.key(), "partitionpath")
            .option(TBL_NAME.key(), tableName)
            .save(tablePath)
    }
}

When you run the program, you may hit a "null\bin\winutils.exe in the Hadoop binaries" error. This happens because the Hadoop native binaries are not available when running locally on Windows; for this demo it can be ignored.
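
If you would rather make the warning go away than ignore it, a common workaround (my own addition, not from the original post) is to download the winutils binaries matching your Hadoop version and point hadoop.home.dir at that directory before the SparkSession is built; the path below is only a placeholder:

// Optional workaround sketch: assumes winutils.exe has been placed under
// D:\hadoop\bin (placeholder path). Put this at the top of main(), before
// the SparkSession is created; setting the HADOOP_HOME environment variable
// to the same directory has the same effect.
System.setProperty("hadoop.home.dir", "D:\\hadoop")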

The schema and a sample of the data printed by the program are shown below:

root
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)

+-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
|begin_lat          |begin_lon          |driver    |end_lat            |end_lon            |fare              |partitionpath                       |rider    |ts           |uuid                                |
+-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
|0.4726905879569653 |0.46157858450465483|driver-213|0.754803407008858  |0.9671159942018241 |34.158284716382845|americas/brazil/sao_paulo           |rider-213|1645620263263|550e7186-203c-48a8-9964-edf12e0dfbe3|
|0.6100070562136587 |0.8779402295427752 |driver-213|0.3407870505929602 |0.5030798142293655 |43.4923811219014  |americas/brazil/sao_paulo           |rider-213|1645074858260|c8d5e237-6589-419e-bef7-221faa4faa13|
|0.5731835407930634 |0.4923479652912024 |driver-213|0.08988581780930216|0.42520899698713666|64.27696295884016 |americas/united_states/san_francisco|rider-213|1645298902122|d64b94ec-d8e8-44f3-a5c0-e205e034aa5d|
|0.21624150367601136|0.14285051259466197|driver-213|0.5890949624813784 |0.0966823831927115 |93.56018115236618 |americas/united_states/san_francisco|rider-213|1645132033863|fd8f9051-b5d2-4403-8002-8bb173df5dc8|
|0.40613510977307   |0.5644092139040959 |driver-213|0.798706304941517  |0.02698359227182834|17.851135255091155|asia/india/chennai                  |rider-213|1645254343160|160c7699-7f5e-4ec3-ba76-9ae63ae815af|
|0.8742041526408587 |0.7528268153249502 |driver-213|0.9197827128888302 |0.362464770874404  |19.179139106643607|americas/united_states/san_francisco|rider-213|1645452263906|fe9d75c0-f326-4cef-8596-4248a57d1fea|
|0.1856488085068272 |0.9694586417848392 |driver-213|0.38186367037201974|0.25252652214479043|33.92216483948643 |americas/united_states/san_francisco|rider-213|1645133755620|5d149bc7-78a8-46df-b2b0-a038dc79e378|
|0.0750588760043035 |0.03844104444445928|driver-213|0.04376353354538354|0.6346040067610669 |66.62084366450246 |americas/brazil/sao_paulo           |rider-213|1645362187498|da2dd8e5-c2d9-45e2-8c96-520927e5458d|
|0.651058505660742  |0.8192868687714224 |driver-213|0.20714896002914462|0.06224031095826987|41.06290929046368 |asia/india/chennai                  |rider-213|1645575914370|f01e9d28-df30-454c-a780-b56cd5b43ce7|
|0.11488393157088261|0.6273212202489661 |driver-213|0.7454678537511295 |0.3954939864908973 |27.79478688582596 |americas/united_states/san_francisco|rider-213|1645094601577|bd4ae628-3885-4b26-8a50-c14f8e42a265|
+-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
only showing top 10 rows

After running the program, you can see that the data has been written to HDFS.
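
To verify from code as well, the following minimal sketch (my own addition, reusing the SparkSession settings and table path from the demo above) reads the table back through the Hudi DataSource and queries it with Spark SQL; the column names come from the schema printed earlier, and the /*/*/*/* glob walks the three-level region/country/city partition layout:

package com.ouyang.hudi.crud

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @desc Verification sketch: read back the tbl_trips_cow table written by
 *       Demo01_InsertForCOW and query it with Spark SQL.
 */
object Demo02_QueryForCOW {

    def main(args: Array[String]): Unit = {

        // Same SparkSession settings as in the insert demo
        val spark: SparkSession = SparkSession.builder()
            .appName(this.getClass.getSimpleName.stripSuffix("$"))
            .master("local[4]")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .getOrCreate()

        val tablePath: String = "/hudi-warehouse/tbl_trips_cow"

        // Hudi tables are read through the "hudi" DataSource format;
        // the glob descends into the region/country/city partition directories
        val tripsDF: DataFrame = spark.read
            .format("hudi")
            .load(tablePath + "/*/*/*/*")

        // Register a temporary view and query it with Spark SQL
        tripsDF.createOrReplaceTempView("tbl_trips_cow")
        spark.sql("SELECT uuid, rider, driver, fare, partitionpath FROM tbl_trips_cow WHERE fare > 20.0").show(truncate = false)

        spark.stop()
    }
}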


Note: The posts in this Hudi series were written while studying the official Hudi documentation and include some of my own interpretation; please bear with any shortcomings ☺☺☺

Note: links to other related articles (big data posts, including those on Hudi) can be found here -> Big Data Fundamentals - Article Index



This article is reposted from: https://blog.csdn.net/yang_shibiao/article/details/123100943
Copyright belongs to the original author, 电光闪烁. If there is any infringement, please contact us for removal.
