0


Seatunnel实战:hive_to_starrocks

一、前言

  • SeaTunnel是一个分布式、高性能、可扩展的数据同步工具,它支持多种数据源之间的数据同步,包括Hive和StarRocks。可以使用SeaTunnel的Hive源连接器从Hive读取外部数据源数据,然后使用StarRocks接收器连接器将数据发送到StarRocks。
  • 通过StarRocks读取外部数据源数据。StarRocks源连接器的内部实现是从前端(FE)获得查询计划,将查询计划作为参数传递给BE节点,然后从BE节点获得数据结果。
    名称版本StarRocks2.4.2SeaTunnel2.3.1Spark3.2.1Flink1.16.1

    二、安装SeaTunnel

  1. 安装并设置Java(Java 8 或 11,其他高于 Java 8 的版本理论上也可以使用)JAVA_HOME。
  2. 进入seatunnel下载页面,下载最新版本的distribute packageseatunnel--bin.tar.gz,或者可以通过终端下载(下载很慢,这边已上传至云盘,可以直接自取)
exportversion="2.3.1"wget"https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz"tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"

链接:https://pan.baidu.com/s/1nT0BgUutW66cyiu2C_jqIg
提取码:acdy

  1. 安装连接器
  • 从2.2.0-beta开始,二进制包默认不提供connector依赖,所以第一次使用时,我们需要执行如下命令安装connector:(当然也可以手动下载connector从(Apache Maven Repository下载,然后手动移动到connectors目录下的seatunnel子目录)。
  • 指定connector的版本,执行
/bin/bash /app/apache-seatunnel-incubating-2.3.1/bin/install-plugin.sh 2.3.1

在这里插入图片描述

  1. 安装完检查connectors目录文件

在这里插入图片描述

  1. 配置 SeaTunnel,更改设置config/seatunnel-env.sh
  • 自行选择合适的spark(要求版本>=2.4.0)、flink 版本(要求版本>=1.12.0)
vim /app/apache-seatunnel-incubating-2.3.1/config/seatunnel-env.sh 

在这里插入图片描述

  1. 运行SeaTunnel,查看是否部署安装成功
-- 本地模式
/app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-3-connector-v2.sh \
-m local[*]\
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/seatunnel.streaming.conf.template 

-- 集群模式
/app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-3-connector-v2.sh \
-m yarn\
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/seatunnel.streaming.conf.template 

在这里插入图片描述

  1. SeaTunnel 控制台会打印一些日志如下

在这里插入图片描述

三、配置Seatunnel config

1、Hive source

  • 配置 Hive源,您需要在SeaTunnel作业配置文件中指定Hive的连接信息,包括metastore_uri、table_name。更多seatunnel source hive 例如:
source{
 
  Hive {#parallelism = 6 
    table_name ="mid.ads_test_hive_starrocks_ds"
    metastore_uri ="thrift://192.168.10.200:9083"
    result_table_name ="hive_starrocks_ds"}}

在这里插入图片描述

2、Seatunnel Transform

  • Transform 是指在数据迁移过程中对数据进行转换的过程。它支持多种转换插件,包括Json、NullRate、Nulltf、Replace、Split、Sql、udf和UUID等,Transform插件具有一些通用参数,可以在SeaTunnel作业配置文件中指定这些参数来控制数据转换的行为。更多seatunnel Transform 例如:
transform {
    Filter {
      source_table_name ="fake"
      fields =[name]
      result_table_name ="fake_name"}
    Filter {
      source_table_name ="fake"
      fields =[age]
      result_table_name ="fake_age"}}

在这里插入图片描述

3、StarRocks sink

  • 配置 StarRocks接收器,您需要在SeaTunnel作业配置文件中指定StarRocks的连接信息,包括JDBC URL、用户名和密码。更多seatunnel sink starrocks 例如:
sink {
        starrocks {
                nodeUrls =["192.168.10.10:8030","192.168.10.11:8030","192.168.10.12:8030"]
                base-url ="jdbc:mysql://192.168.10.10:9030/"
                username = root
                password ="xxxxxxxxx"
                database ="example_db"
                table ="ads_test_hive_starrocks_ds"
                batch_max_rows =500000
                batch_max_bytes =104857600
                batch_interval_ms =30000
                starrocks.config ={format="CSV"
                column_separator ="\\x01"
                row_delimiter ="\\x02"}}}

在这里插入图片描述

四、Run SeaTunnel hive_to_starrocks

cat /app/apache-seatunnel-incubating-2.3.1/config/hive_to_sr2.conf

env{
        spark.app.name ="apache-seatunnel-2.3.1_hive_to_sr"
        spark.yarn.queue ="root.default"
        spark.executor.instances =2
        spark.executor.cores =4
        spark.driver.memory ="3g"
        spark.executor.memory ="4g"
        spark.ui.port =1300
        spark.sql.catalogImplementation ="hive"
        spark.hadoop.hive.exec.dynamic.partition ="true"
        spark.hadoop.hive.exec.dynamic.partition.mode ="nonstrict"
        spark.network.timeout ="1200s"
        spark.sql.sources.partitionOverwriteMode ="dynamic"
        spark.yarn.executor.memoryOverhead = 800m
        spark.kryoserializer.buffer.max = 512m
        spark.task.maxFailures=0
        spark.executor.extraJavaOptions ="-Dfile.encoding=UTF-8"
        spark.driver.extraJavaOptions ="-Dfile.encoding=UTF-8"
        job.name ="apache-seatunnel-2.3.1_hive_to_sr"}source{

  Hive {#parallelism = 6 
    table_name ="mid.ads_test_hive_starrocks_ds"
    metastore_uri ="thrift://192.168.10.200:9083"
    result_table_name ="hive_starrocks_ds_t1"}}

transform {
 sql {
      query  ="select xxx,xxx,xxx,xxx from hive_starrocks_ds_t1  where period_sdate >= '2022-10-31'"
      source_table_name ="hive_starrocks_ds_t1"
      result_table_name ="hive_starrocks_ds_t2"}}

sink {
        starrocks {
                nodeUrls =["192.168.10.10:8030","192.168.10.11:8030","192.168.10.12:8030"]
                base-url ="jdbc:mysql://192.168.10.10:9030/"
                username = root
                password ="xxxxxxxxx"
                database ="example_db"
                table ="ads_test_hive_starrocks_ds"
                batch_max_rows =500000
                batch_max_bytes =104857600
                batch_interval_ms =30000
                starrocks.config ={format="CSV"
                column_separator ="\\x01"
                row_delimiter ="\\x02"}}}
  • spark3.x.x
sudo -u hive /app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-3-connector-v2.sh \
-m yarn\
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/hive_to_sr2.conf 
  • spark2.x.x
sudo -u hive /app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-2-connector-v2.sh \
-m yarn\
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/hive_to_sr2.conf 

五、总结

1、问题总结

  • A. 中文乱码问题

设置环境变量来解决中文乱码问题。可以在env中添加以下参数,这将设置Java虚拟机的编码格式为UTF-8,以便正确处理中文字符。

spark.executor.extraJavaOptions ="-Dfile.encoding=UTF-8"
spark.driver.extraJavaOptions ="-Dfile.encoding=UTF-8"

在这里插入图片描述

  • B. 内存限制而被YARN杀死

Spark 程序在 YARN 集群上运行时,由于超出了内存限制而丢失了一个执行器。考虑增加 spark.yarn.executor.memoryOverhead 用于指定每个执行器保留的用于内部元数据、用户数据结构和其他堆外内存需求的堆外内存量。该参数的值将添加到执行器内存中,以确定每个执行器对 YARN 的完整内存请求。建议不要将此值设置得过高,因为这可能会导致过多的垃圾收集开销和性能降低。

spark.yarn.executor.memoryOverhead = 800M

在这里插入图片描述

  • C. db 2153532 is 100 larger than limit 100?

错误信息表明您在尝试将数据刷新到 StarRocks 时遇到了问题。在 db 2153532 上运行的事务数为 100,超过了限制 100。可以尝试减少并发事务的数量,以减轻集群的压力。另外可以调整相关参数。

-- 修改事务数
ADMIN SHOW FRONTEND CONFIG ('max_running_txn_num_per_db'='300')
-- 查看参数是否调整
ADMIN SHOW FRONTEND CONFIG LIKE'%max_running_txn_nu%';

在这里插入图片描述
在这里插入图片描述

  • D. sink导入starrocks数据量不对

刚开始遇到这个问题挺纳闷,怀疑是Seatunnel设置的任务重试导致starrocks这边数据量变多,看到官方文档有设置重试参数为max_retries,将其设置为0,重跑还是有问题,明明在yarn上面提交的任务application是成功的。
在这里插入图片描述
在这里插入图片描述

后来查看spark ui看到有3个Failed Tasks之后才明白,原来是spark某几个tasks内存不足导致tasks失败重试,导致sink导入的数据量变多。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
解决方案:
1、使用starrocks主键模型进行数据去重特性,保证数据唯一性的(目前官方最新版本2.3.1还不支持sink_starrocks exectly-once保障数据幂等性)

2、将spark参数重试机制设置为0,这样设置后,当任务执行失败时,Spark不再尝试重新执行该任务。

spark.task.maxFailures=0

在这里插入图片描述
3、要确定spark.task.maxFailures=0是否在Spark任务中生效,可以查看Spark任务的日志文件。当Spark任务失败时,应该可以在日志中找到类似于以下内容的行:

23/05/12 15:17:55 ERROR scheduler.TaskSetManager: Task 165in stage 1.0 failed 0times; aborting job

可以看出任务失败了一次,但是并没有重试,因为日志中的错误信息是Task 165 in stage 1.0 failed 0 times。这是因为将spark.task.maxFailures设置为0,表示任务失败后不进行重试。

2、使用总结

  • 本篇文章带大家了解使用Seatunnel将Hive中的数据导入到StarRocks中,除此之外,Seatunnel还有很多种数据源可以支持,也有很多种导入方式,例如DataX 、CloudCanal 等
  • 将数仓中跑完的Hive的相关表每天导入到StarRocks中,可以使用以下场景: 1、不更新历史数据: 如果是分区表,我们增量导入到 StarRocks 中即可。非分区表全量导入。 2、更新历史数据: 这种情况主要存在分区表中,往往会更改前几个月数据或者时间更久的数据,这种情况下,就不得不将该表重新同步一边,使StarRocks中的数据和hive中的数据保持一致。hive中表的元数据发生变化,和StarRocks中的表结构不一致: 这种情况下,就需要我们删除重新建表或者truncate历史分区,重新同步数据。
标签: hive 大数据

本文转载自: https://blog.csdn.net/weixin_45971974/article/details/129991740
版权归原作者 柠檬味的鱼° 所有, 如有侵权,请联系我们删除。

“Seatunnel实战:hive_to_starrocks”的评论:

还没有评论