本文主要介绍通过spark导入doris的3种方式。
1.最简单的方式:jdbc
jdbc 方式需要引入mysql-connector-java的依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
代码demo
.....
df.show()
df
.write.format("jdbc").mode(SaveMode.Append).option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://xxxx:xx/xx?rewriteBatchedStatements=true").option("batchsize","10000").option("user","xxxx").option("password","xxxx").option("isolationLevel","NONE").option("dbtable","xxxxxx").save()
注意:
一定要添加?rewriteBatchedStatements=true参数,不然导入速度会很慢。
2.Doris官方推荐的方式:Spark Doris Connector
Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。
代码库地址:https://github.com/apache/doris-spark-connector
- 支持从
Doris
中读取数据 - 支持
Spark DataFrame
批量/流式 写入Doris
- 可以将
Doris
表映射为DataFrame
或者RDD
,推荐使用DataFrame
。 - 支持在
Doris
端完成数据过滤,减少数据传输量。
版本兼容:
ConnectorSparkDorisJavaScala1.2.03.2, 3.1, 2.31.0 +82.12, 2.111.1.03.2, 3.1, 2.31.0 +82.12, 2.111.0.13.1, 2.30.12 - 0.1582.12, 2.11
使用已经编译好的版本
可在https://repo.maven.apache.org/maven2/org/apache/doris/下载需要的jar包 但是可供选择的版本比较少,目前只有下图中的3个。
自行编译
编译步骤:
- 修改
custom_env.sh.tpl
文件,重命名为custom_env.sh
- 在源码目录下执行:
sh build.sh
根据提示输入你需要的 Scala 与 Spark 版本进行编译。
编译成功后,会在
dist
目录生成目标jar包,如:
spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
。 将此文件复制到
Spark
的
ClassPath
中即可使用
Spark-Doris-Connector
。
例如,
Local
模式运行的
Spark
,将此文件放入
jars/
文件夹下。
Yarn
集群模式运行的
Spark
,则将此文件放入预部署包中。
例如将
spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
上传到 hdfs 并在
spark.yarn.jars
参数上添加 hdfs 上的 Jar 包路径
- 上传
spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
到hdfs。
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/
- 在集群中添加
spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
依赖。
spark.yarn.jars=hdfs:///spark-ja
版权归原作者 炼数成器 所有, 如有侵权,请联系我们删除。