0


【Spark 实战】基于spark3.4.2+iceberg1.6.1搭建本地调试环境

基于spark3.4.2+iceberg1.6.1搭建本地调试环境

文章目录

环境准备

  • IntelliJ IDEA 2024.1.2 (Ultimate Edition)
  • JDK 1.8
  • Spark 3.4.2
  • Iceberg 1.6.1

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

使用maven构建sparksql

pom文件

<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.donny.demo</groupId><artifactId>iceberg-demo</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>iceberg-demo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><spark.version>3.4.2</spark.version><iceberg.version>1.6.1</iceberg.version><parquet.version>1.13.1</parquet.version><avro.version>1.11.3</avro.version><parquet.hadoop.bundle.version>1.8.1</parquet.hadoop.bundle.version></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version><exclusions><exclusion><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version><exclusions><exclusion><groupId>org.apache.parquet</groupId><artifactId>parquet-column</artifactId></exclusion><exclusion><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop-bundle</artifactId></exclusion><exclusion><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-core</artifactId><version>${iceberg.version}</version></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-spark-3.4_2.12</artifactId><version>${iceberg.version}</version></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-spark-extensions-3.4_2.12</artifactId><version>${iceberg.version}</version><exclusions><exclusion><groupId>org.antlr</groupId><artifactId>antlr4</artifactId></exclusion><exclusion><groupId>org.antlr</groupId><artifactId>antlr4-runtime</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-column</artifactId><version>${parquet.version}</version></dependency><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>${parquet.version}</version></dependency><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop-bundle</artifactId><version>${parquet.hadoop.bundle.version}</version></dependency><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>${avro.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency></dependencies></project>

在 idea 中 直接使用iceberg 生成好的 runtime jar,无法attach 不上 iceberg 的源码,为了解决这个问题把maven 依赖改成上面的pom文件上的iceberg依赖。

<dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-spark-runtime-3.4_2.12</artifactId><version>1.6.1</version></dependency>

编辑SparkSQL简单任务

  1. 指定了 catalog 类型为 hadoop。可以方便简单的本地调试。
  2. 创建非分区的iceberg原生表
  3. 插入数据
  4. 查询数据(展示数据)
packagecom.donny.demo;importorg.apache.iceberg.expressions.Expressions;importorg.apache.iceberg.spark.Spark3Util;importorg.apache.iceberg.spark.actions.SparkActions;importorg.apache.spark.api.java.function.FilterFunction;importorg.apache.spark.sql.AnalysisException;importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.SparkSession;importorg.apache.spark.sql.catalog.Table;importjava.util.Objects;/**
 * @author [email protected]
 * @version 1.0
 * @since 2024年09月26日
 */publicclassIcebergSparkDemo{publicstaticvoidmain(String[] args)throwsAnalysisException{SparkSession spark =SparkSession.builder().master("local").appName("Iceberg spark example").config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions").config("spark.sql.catalog.local","org.apache.iceberg.spark.SparkCatalog").config("spark.sql.catalog.local.type","hadoop")//指定catalog 类型.config("spark.sql.catalog.local.warehouse","iceberg_warehouse").getOrCreate();

        spark.sql("create database iceberg_db");
        spark.sql("CREATE TABLE local.iceberg_db.table (id bigint, data string) USING iceberg ");
        spark.sql("INSERT INTO local.iceberg_db.table VALUES (1, 'a'), (2, 'b'), (3, 'c')");Dataset<Row> result = spark.sql("select * from local.iceberg_db.table order by data");
        result.show();

        spark.close();}}

附录A iceberg术语

  • Schema – 表中的字段名称和类型
  • Partition spec – 定义如何从数据字段导出分区值。
  • Partition tuple – 分区元组是存储在每个数据文件中的分区数据的元组或结构体
  • Snapshot – 表在某个时间点的状态,包括所有数据文件的集合。
  • Snapshot log – 快照日志是记录表当前快照随时间变化情况的元数据日志。该日志是一个时间戳和ID对的列表:当前快照发生变化的时间和当前快照发生变化的ID。
  • Manifest list – 列出清单文件的文件;每个快照一个。
  • Manifest – 列出数据或删除文件的文件;快照的子集。
  • Data file – 包含表行的文件。
  • Delete file – 对表格中按位置或数据值删除的行进行编码的文件。

参考

Iceberg 源码阅读(一) 搭建本地调试环境


本文转载自: https://blog.csdn.net/weixin_43820556/article/details/142631269
版权归原作者 顧棟 所有, 如有侵权,请联系我们删除。

“【Spark 实战】基于spark3.4.2+iceberg1.6.1搭建本地调试环境”的评论:

还没有评论