

Spark: reading an Alibaba Cloud MaxCompute data source and writing to Alibaba Cloud OSS and Huawei Cloud OBS

Background: the Spark version used here is 2.4.5, and the goal is to write data to Alibaba Cloud OSS.

1. Writing the Spark code - write to OSS

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SparkODPS2OSS4 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ODPS2OSS")
                // AK/SK with access to OSS
                .config("spark.hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
                .config("spark.hadoop.fs.oss.accessKeyId", "*******************")
                .config("spark.hadoop.fs.oss.accessKeySecret", "**************")
                .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com")
                .getOrCreate();
        try {
            // Query the table via SQL
            Dataset<Row> data = spark.sql("select 字段 from 表名");
            // Show some of the data
            data.show(20);
            // Write the data to OSS
            data.toDF().coalesce(1).write().mode(SaveMode.Append).csv("oss://路径");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            spark.stop();
        }
    }
}
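To verify the write the same way the OBS example below does, the CSV can be read back from OSS within the same session. This is only a sketch meant to sit inside the try block above; it assumes the same SparkSession is still active and that the oss://路径 placeholder is replaced with the real output path.

// Optional verification (not in the original article): read the CSV just written back from OSS.
// Assumes the SparkSession configured above and that "oss://路径" is the actual output path.
Dataset<Row> written = spark.read()
        .option("header", "false") // the write above did not emit a header line
        .csv("oss://路径");
written.show(10);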

Writing the Spark code - write to OBS

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SparkODPS2OBS {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ODPS2OSS")
                // spark.master must be set to local[N] to run this directly; N is the degree of parallelism
                .config("spark.master", "local[4]")
                .config("spark.sql.catalogImplementation", "hive")
                .config("spark.sql.defaultCatalog", "odps")
                .config("spark.hadoop.fs.obs.access.key", "********")
                .config("spark.hadoop.fs.obs.secret.key", "*********")
                .config("spark.hadoop.fs.obs.endpoint", "obs.cn-east-3.myhuaweicloud.com")
                .getOrCreate();
        try {
            // Query the table via SQL
            Dataset<Row> data = spark.sql("select * from 表名");
            // Show some of the data
            data.show(10);
            // Write the data to OBS
            data.write().mode(SaveMode.Overwrite).parquet("obs://路径");
            // Read it back to verify the write
            spark.read().parquet("obs://路径").show();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            spark.stop();
        }
    }
}
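One thing the OBS example leaves implicit: unlike the OSS code, it does not pin a filesystem implementation for the obs:// scheme. If the scheme is not resolved in your environment, the Huawei Cloud Hadoop-OBS connector (hadoop-huaweicloud) needs to be on the classpath, and the implementation class can be set explicitly, mirroring the fs.oss.impl setting in the OSS example. The class name below comes from that connector and is an assumption, not something stated in the original post.

// Hypothetical variant (assumption): pin the OBS filesystem implementation explicitly,
// analogous to spark.hadoop.fs.oss.impl in the OSS example. Requires the hadoop-huaweicloud
// connector jar on the classpath.
SparkSession spark = SparkSession.builder()
        .appName("ODPS2OSS")
        .config("spark.master", "local[4]")
        .config("spark.hadoop.fs.obs.impl", "org.apache.hadoop.fs.obs.OBSFileSystem")
        .config("spark.hadoop.fs.obs.access.key", "********")
        .config("spark.hadoop.fs.obs.secret.key", "*********")
        .config("spark.hadoop.fs.obs.endpoint", "obs.cn-east-3.myhuaweicloud.com")
        .getOrCreate();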

2. Configuring the separate dependencies

2.1 First, download the .gz package from the Alibaba Cloud documentation:

https://help.aliyun.com/zh/maxcompute/user-guide/set-up-a-linux-development-environment#section-jod-09w-n5w

2.2 After downloading, extract the package; the extracted directory contains the client jars.


2.3 After downloading, the jars from the extracted directory need to be added manually as project dependencies in IDEA.

The folder of dependency jars also has to be moved to the correct location in the project; this affects how the dependencies are loaded, and getting it wrong results in errors.

3. Add an odps.conf configuration file under the resources directory

odps.project.name=<your MaxCompute project name>
odps.access.id=*************
odps.access.key=*************
spark.hadoop.odps.cupid.eni.enable=true
spark.hadoop.odps.cupid.eni.info=cn-hangzhou:vpc-bp1rs4et9dv46cx8j2p8k
odps.end.point=http://service.cn-hangzhou.maxcompute.aliyun.com/api
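Because the local run depends on odps.conf being picked up from the classpath, it can help to confirm that the file really is packaged under resources before starting Spark. The following is a minimal optional check, not part of the original workflow; it only assumes the odps.conf file name shown above.

import java.io.InputStream;
import java.util.Properties;

// Optional sanity check (not in the original article): verify odps.conf is on the classpath
// and print its non-secret keys, so a missing or misplaced file is caught early.
public class OdpsConfCheck {
    public static void main(String[] args) throws Exception {
        try (InputStream in = OdpsConfCheck.class.getClassLoader().getResourceAsStream("odps.conf")) {
            if (in == null) {
                throw new IllegalStateException("odps.conf not found on the classpath; is it under src/main/resources?");
            }
            Properties props = new Properties();
            props.load(in);
            System.out.println("odps.project.name = " + props.getProperty("odps.project.name"));
            System.out.println("odps.end.point    = " + props.getProperty("odps.end.point"));
        }
    }
}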

4. The pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <properties>
        <spark.version>2.4.5</spark.version>
        <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <groupId>org.example</groupId>
    <artifactId>spark_example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.woodstox/woodstox-core -->
        <dependency>
            <groupId>com.fasterxml.woodstox</groupId>
            <artifactId>woodstox-core</artifactId>
            <version>6.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scalap</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>cupid-sdk</artifactId>
            <version>${cupid.sdk.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>hadoop-fs-oss</artifactId>
            <version>${cupid.sdk.version}</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
            <version>${cupid.sdk.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-actors</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- DataHub streaming dependencies -->
        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-datahub_${scala.binary.version}</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.datahub</groupId>
            <artifactId>aliyun-sdk-datahub</artifactId>
            <version>2.9.4-public</version>
            <exclusions>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>streaming-lib</artifactId>
            <version>3.3.8-public</version>
            <exclusions>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scalap</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.zeppelin</groupId>
            <artifactId>spark-interpreter</artifactId>
            <version>0.8.1</version>
        </dependency>
        <!-- For usage of log4j2 interface -->
        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.12.4</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <artifactSet>
                                <includes>
                                    <!-- Include here the dependencies you want to be packed in your fat jar -->
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>**/log4j.properties</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

5. Note!

Hadoop 3.1.1 is used here, while the jars provided by Alibaba Cloud are built against Hadoop 2.7, so the Hadoop jars in that folder all need to be replaced, otherwise errors are thrown. After replacing them, one more dependency has to be added to the pom (shown below), and one more jar, commons-configuration2-2.8.0.jar, has to be added to the jars folder.

<!-- https://mvnrepository.com/artifact/com.fasterxml.woodstox/woodstox-core -->
<dependency>
    <groupId>com.fasterxml.woodstox</groupId>
    <artifactId>woodstox-core</artifactId>
    <version>6.4.0</version>
</dependency>
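When the version-conflict errors described above show up, it is useful to confirm which Hadoop version actually ends up on the classpath at runtime. The sketch below uses Hadoop's VersionInfo utility to print it; under the setup described here it should report 3.1.1 rather than 2.7.x. This is only a debugging aid, not part of the original article.

import org.apache.hadoop.util.VersionInfo;

// Debugging aid (not in the original article): print the Hadoop version actually loaded,
// to confirm the 2.7.x jars from the downloaded client have really been replaced.
public class HadoopVersionCheck {
    public static void main(String[] args) {
        System.out.println("Hadoop version: " + VersionInfo.getVersion());
        System.out.println("Build details:  " + VersionInfo.getBuildVersion());
    }
}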

This article is reposted from: https://blog.csdn.net/weixin_43078951/article/details/136739381
Copyright belongs to the original author 烧酒与墨. If there is any infringement, please contact us to have it removed.
