

Customizing Spark to Read a Remote Tenant Cluster's Hive over JDBC and Land It into the Local Cluster's Hive: Hive2Hive Data Integration [Java]

Background

A superficial SQL Boy may only know that once pyspark has built a sparkSession object (with enableHiveSupport, of course), you write a single SQL statement:

spark.sql("put a SQL string here");

Spark then executes that SQL and takes care of all the selects and the insert overwrites into the result tables. For SQL Boys that is enough; people doing data warehousing and ETL may only ever know, and only ever need, SQL.
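For reference, the same SQL-only pattern in Java looks roughly like the following; this is a minimal sketch, and the table names are placeholders rather than anything from this post:

import org.apache.spark.sql.SparkSession;

public class SqlOnlyDemo {
    public static void main(String[] args) {
        // Minimal sketch of the SQL-only usage described above; table names are hypothetical.
        SparkSession spark = SparkSession.builder()
                .appName("sqlOnlyDemo")
                .master("local[*]")
                .enableHiveSupport()   // binds to the local cluster's Hive metastore (hive-site.xml)
                .getOrCreate();

        spark.sql("insert overwrite table dw.result_tb select * from ods.src_tb"); // one SQL does it all
        spark.stop();
    }
}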

For platform development, however, that level is clearly not enough. The most common scenarios, data integration and data lake ingestion, inevitably involve cross-cluster, multi-cluster, and cross-Kerberos-realm access. SQL is also rather powerless with heterogeneous data sources; for example, I certainly cannot write one SQL statement that processes data from HDFS file blocks and writes it to an FTP server.

Normal data integration has Spark read the local cluster's Hive and push the data to the remote tenant cluster's Hive. But CDP's Spark automatically reads data according to the cluster configuration files, and this "convenience" makes it quite inconvenient when you want to read a remote cluster's Hive. This article is about solving that problem.

Principle Analysis

Do not assume that because Java and pyspark let you construct multiple sparkSession objects, and IDEA raises no error before compilation, you can actually use multiple sparkSession objects to connect to multiple Hives.

A SparkSession can be configured with only one Thrift Server connection for its entire lifetime. In other words, a SparkSession built this way can be connected to only one Hive at a time and cannot read and write Hive across clusters.

Since Hive can be accessed over JDBC just like MySQL, I tried reading and writing Hive via JDBC. That ran into some problems, so I dug into the source code and found this in JdbcDialects.scala:

// org.apache.spark.sql.jdbc.JdbcDialects, around line 222

package org.apache.spark.sql.jdbc

/**
 * :: DeveloperApi ::
 * Registry of dialects that apply to every new jdbc `org.apache.spark.sql.DataFrame`.
 *
 * If multiple matching dialects are registered then all matching ones will be
 * tried in reverse order. A user-added dialect will thus be applied first,
 * overwriting the defaults.
 *
 * @note All new dialects are applied to new jdbc DataFrames only. Make
 * sure to register your dialects first.
 */
@DeveloperApi
object JdbcDialects {

  /**
   * Register a dialect for use on all new matching jdbc `org.apache.spark.sql.DataFrame`.
   * Reading an existing dialect will cause a move-to-front.
   *
   * @param dialect The new dialect.
   */
  def registerDialect(dialect: JdbcDialect): Unit = {
    dialects = dialect :: dialects.filterNot(_ == dialect)
  }

  /**
   * Unregister a dialect. Does nothing if the dialect is not registered.
   *
   * @param dialect The jdbc dialect.
   */
  def unregisterDialect(dialect: JdbcDialect): Unit = {
    dialects = dialects.filterNot(_ == dialect)
  }

  private[this] var dialects = List[JdbcDialect]()

  registerDialect(MySQLDialect)
  registerDialect(PostgresDialect)
  registerDialect(DB2Dialect)
  registerDialect(MsSqlServerDialect)
  registerDialect(DerbyDialect)
  registerDialect(OracleDialect)
  registerDialect(TeradataDialect)

  /**
   * Fetch the JdbcDialect class corresponding to a given database url.
   */
  def get(url: String): JdbcDialect = {
    val matchingDialects = dialects.filter(_.canHandle(url))
    matchingDialects.length match {
      case 0 => NoopDialect
      case 1 => matchingDialects.head
      case _ => new AggregatedDialect(matchingDialects)
    }
  }
}

Spark's JDBC layer ships with dialects for seven databases such as MySQL and SQL Server, yet surprisingly not for Hive! So if you want Spark to access components like Hive, Kylin, Druid, Doris, or ClickHouse over JDBC, you have to do the secondary development yourself and implement the missing piece.

Customizing Spark

The Scala solution:

case object HiveSqlDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String = {
    colName.split('.').map(part => s"`$part`").mkString(".")
  }
}

class RegisterHiveSqlDialect {
  def register(): Unit = {
    JdbcDialects.registerDialect(HiveSqlDialect)
  }
}

However, while Scala may still have been popular around 2018, not many people were using it by 2020, so to reduce the chance of problems I rewrote it in Java:

package com.zhiyong.Hive2Hive;

import org.apache.spark.sql.jdbc.JdbcDialect;

public class HiveDialect extends JdbcDialect {
    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:hive2");
    }

    @Override
    public String quoteIdentifier(String colName) {
        // Needs to return the equivalent of Scala's
        // colName.split('.').map(part => s"`$part`").mkString(".")
        String[] split1 = colName.split("\\."); // split the string first
        String[] split2 = new String[split1.length];
        StringBuilder strb = new StringBuilder();
        String result = null;
        int index = 0;

        // map: wrap each part in backticks
        for (String part : split1) {
            // split2[index] = "`$" + part + "`";
            split2[index] = "`" + part + "`";
            index++;
        }

        // mkString: join the parts with "."
        for (int i = 0; i < split2.length; i++) {
            String cell = split2[i];
            if (i != 0) {
                strb.append(".");
            }
            strb.append(cell);
        }

        result = strb.toString();
        return result;
    }
}

After this customization, it can be used in the main class:

JdbcDialect hiveDialect = new HiveDialect();
JdbcDialects.registerDialect(hiveDialect); // register the custom dialect so identifiers are re-encoded and errors are avoided

Map<String, String> sourceMap = new LinkedHashMap<>();
sourceMap.put("url", "jdbc:hive2://192.168.88.11:10000");
sourceMap.put("driver", "org.apache.hive.jdbc.HiveDriver");
sourceMap.put("user", "root");
sourceMap.put("password", "123456");
sourceMap.put("dbtable", "aaa.test1");

SparkSession sc = SparkSession.builder()
        .appName("aaaaa")
        .master("local[*]")
        .enableHiveSupport() // this only connects to the local cluster's Hive
        .getOrCreate();

Dataset<Row> jdbcReadDF = sc.read().format("jdbc").options(sourceMap).load(); // read the Hive data over JDBC

With that, Hive data can be read over JDBC.
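A quick way to see both the data and the header problem described in the next section is to print the schema and a few rows; a small sketch (the example column name is illustrative):

// Quick sanity check of what the JDBC read returned.
jdbcReadDF.printSchema(); // column names come back prefixed with the table name, e.g. `test1.sid`
jdbcReadDF.show(10, false);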

Test Results

In testing, this also passed Kerberos authentication on the cloud desktop and fetched the data. However, the DataFrame header obtained this way is problematic (the column names carry the table name):

(screenshot: header of the JDBC-read DataFrame, columns prefixed with the table name)

The header obtained by standard Spark SQL (or the DSL style) looks like this:

(screenshot: header from a normal Spark SQL read)

Fixing the Header Problem

With mismatched headers, the data obviously cannot be written.

Fortunately, Spark provides a way to replace the header:

Map<String, String> targetSource = new LinkedHashMap<>();
targetSource.put("url", "jdbc:hive2://192.168.88.11:10000");
targetSource.put("driver", "org.apache.hive.jdbc.HiveDriver");
targetSource.put("user", "root");
targetSource.put("password", "123456");
targetSource.put("dbtable", "aaa.test4");

Dataset<Row> jdbcReadDF2 = sc.read().format("jdbc").options(targetSource).load(); // read the target Hive over JDBC (only to obtain its header)
Dataset<Row> proDF2 = sc.createDataFrame(jdbcReadDF.rdd(), jdbcReadDF2.schema()); // replace the header

Reading the table over JDBC a second time works because the two tables have identical structures, so the fetched header can directly replace the old DataFrame's. A header can also be obtained simply with spark.sql("select * from db1.tb1 limit 1"), so this problem is solved (see the sketch below).
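A minimal sketch of that spark.sql variant, assuming the target table aaa.test4 exists in the Hive that enableHiveSupport connected to; the variable names here are mine, not from the original code:

// Take the header from a native read of the target table instead of a second JDBC read.
// Requires: import org.apache.spark.sql.types.StructType;
StructType targetSchema = sc.sql("select * from aaa.test4 limit 1").schema();
Dataset<Row> proDF = sc.createDataFrame(jdbcReadDF.rdd(), targetSchema); // same header-replacement trick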

The JDBC Write Failure

But even after replacing the header, writing via JDBC still fails:

jdbcReadDF.write().format("jdbc").options(targetSource)
        //.mode(SaveMode.Append) // this line cannot be used: it fails with "Method not supported"
        .save();

As is well known, there are four save modes (sketched below). Without setting one, the default is to throw an error when the table already exists, which is exactly what happened here; there is also ignore, which skips the write silently. Neither of those two helps. But switching to append or overwrite mode just produces other errors (append fails with "Method not supported"), which is rather odd.
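For reference, the four modes map to code like this; the trailing comments summarize what this post observed against the Hive JDBC target (a sketch, not meant to be run all at once):

jdbcReadDF.write().format("jdbc").options(targetSource).mode(SaveMode.ErrorIfExists).save(); // default: AnalysisException, table already exists
jdbcReadDF.write().format("jdbc").options(targetSource).mode(SaveMode.Ignore).save();        // table exists, so it silently does nothing
jdbcReadDF.write().format("jdbc").options(targetSource).mode(SaveMode.Overwrite).save();     // drops the table, then fails while recreating it
jdbcReadDF.write().format("jdbc").options(targetSource).mode(SaveMode.Append).save();        // fails with "Method not supported"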

Without setting a mode:

Exception in thread "main"org.apache.spark.sql.AnalysisException:Table or view 'aaa.test4' already exists. SaveMode:ErrorIfExists.;

In JdbcRelationProvider.scala:

package org.apache.spark.sql.execution.datasources.jdbc

// around line 71
      case SaveMode.ErrorIfExists =>
        throw new AnalysisException(
          s"Table or view '${options.table}' already exists. " +
            s"SaveMode: ErrorIfExists.")

So the default mode is clearly not usable.

With .mode(SaveMode.Overwrite): the target table ends up dropped (visible in beeline), and it still throws:

Exception in thread "main"org.apache.hive.service.cli.HiveSQLException:Errorwhile compiling statement: FAILED:ParseException line 1:31 cannot recognize input near '.''sid' 'INTEGER' in column type
    at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:264)
    at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:250)
    at org.apache.hive.jdbc.HiveStatement.runAsyncOnServer(HiveStatement.java:309)
    at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:250)
    at org.apache.hive.jdbc.HiveStatement.executeUpdate(HiveStatement.java:448)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:863)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:62)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.zhiyong.Hive2Hive.Hive2HiveJDBCDemo.main(Hive2HiveJDBCDemo.java:133)
Caused by: org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: FAILED: ParseException line 1:31 cannot recognize input near '.' 'sid' 'INTEGER' in column type
    at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:387)
    at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:186)
    at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:269)
    at org.apache.hive.service.cli.operation.Operation.run(Operation.java:324)
    at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:460)
    at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:447)
    at sun.reflect.GeneratedMethodAccessor11.invoke(UnknownSource)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
    at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
    at org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
    at java.security.AccessController.doPrivileged(NativeMethod)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
    at com.sun.proxy.$Proxy33.executeStatementAsync(UnknownSource)
    at org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:294)
    at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:497)
    at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1437)
    at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1422)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
    at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.hadoop.hive.ql.parse.ParseException: line 1:31 cannot recognize input near '.' 'sid' 'INTEGER' in column type
    at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:207)
    at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:404)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:329)
    at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1158)
    at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1145)
    at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:184)
    ... 26 more

Process finished with exit code 1

With .mode(SaveMode.Append):

21/10/07 14:26:23 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported

proDF2.write().format("jdbc")
        .mode(SaveMode.Append)
        .options(targetSource)
        .option("isolationLevel", "NONE") // to address the "transactions are unsupported" warning above
        .option("numPartitions", "1")     // to address the "transactions are unsupported" warning above
        .save();

21/10/07 14:26:23 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 11)
java.sql.SQLException: Method not supported
    at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
    // HivePreparedStatement.java, line 75:
    // /*
    //  * (non-Javadoc)
    //  *
    //  * @see java.sql.PreparedStatement#addBatch()
    //  */
    // public void addBatch() throws SQLException {
    //   // TODO Auto-generated method stub
    //   throw new SQLException("Method not supported");
    // }
    
    
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:668)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/10/07 14:26:23 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 11, localhost, executor driver): java.sql.SQLException: Method not supported
    at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:668)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/10/07 14:26:23 ERROR TaskSetManager: Task 0 in stage 11.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 11, localhost, executor driver): java.sql.SQLException: Method not supported
    at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:668)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:980)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:978)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:978)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:838)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.zhiyong.Hive2Hive.Hive2HiveJDBCDemo.main(Hive2HiveJDBCDemo.java:133)
Caused by: java.sql.SQLException: Method not supported
    at org.apache.hive.jdbc.HivePreparedStatement.addBatch(HivePreparedStatement.java:75)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:668)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 1

Following the error down into the source shows that Spark's JDBC integration also ends up calling Hive's own JDBC driver to write the data, but the addBatch method of Hive's JDBC driver simply throws: Method not supported.

In practice, then, reading Hive via JDBC is fine, but writing does not really work (a small stand-alone reproduction is sketched below).
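That behavior can be reproduced without Spark at all; a minimal sketch against the same HiveServer2 endpoint, assuming hive-jdbc is on the classpath (the insert statement and column value are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class AddBatchRepro {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:hive2://192.168.88.11:10000", "root", "123456");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO aaa.test4 VALUES (?)")) { // placeholder statement
            ps.setInt(1, 1);
            ps.addBatch(); // HivePreparedStatement.addBatch() throws SQLException("Method not supported"),
                           // which is exactly what Spark's JdbcUtils.savePartition hits.
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}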

If the header is replaced with one obtained from a normal DataFrame read of the table, the DataFrame operators can write the data into the target table without any trouble:

proDF.write().format("Hive").mode(SaveMode.Append).saveAsTable("aaa.test4");//正常方式写入到Hive目标表

With that, the cross-cluster Hive2Hive function works. The demo was verified both on the dev machine (a self-built Apache VM cluster) and on the cloud desktop (Eclipse + CDP 7.1.6.0); on the cloud desktop it passed Kerberos on the first run and wrote the data. This shows the approach is feasible; the whole flow is summarized in the sketch below.
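Putting the pieces from this post together, the flow looks roughly like this. It is a sketch, not the original demo class: it assumes the remote and local tables already exist with identical structures, and it uses the spark.sql-limit-1 variant to obtain the header.

package com.zhiyong.Hive2Hive;

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialects;

public class Hive2HiveSketch {
    public static void main(String[] args) {
        // 1. Teach Spark's JDBC layer how to quote Hive identifiers.
        JdbcDialects.registerDialect(new HiveDialect());

        // 2. SparkSession bound to the LOCAL cluster's Hive via enableHiveSupport.
        SparkSession spark = SparkSession.builder()
                .appName("hive2hive")
                .enableHiveSupport()
                .getOrCreate();

        // 3. Read the REMOTE cluster's Hive over JDBC.
        Map<String, String> sourceMap = new LinkedHashMap<>();
        sourceMap.put("url", "jdbc:hive2://192.168.88.11:10000");
        sourceMap.put("driver", "org.apache.hive.jdbc.HiveDriver");
        sourceMap.put("user", "root");
        sourceMap.put("password", "123456");
        sourceMap.put("dbtable", "aaa.test1");
        Dataset<Row> remoteDF = spark.read().format("jdbc").options(sourceMap).load();

        // 4. Swap the table-name-prefixed JDBC header for the local target table's header.
        Dataset<Row> fixedDF = spark.createDataFrame(
                remoteDF.rdd(),
                spark.sql("select * from aaa.test4 limit 1").schema());

        // 5. Write into the LOCAL Hive the normal way (not over JDBC), as in the write shown above.
        fixedDF.write().format("Hive").mode(SaveMode.Append).saveAsTable("aaa.test4");

        spark.stop();
    }
}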

Architecture Change

This approach differs from normal data integration: the source-side Hive is effectively remote, while the target-side Hive is effectively local.

In normal data integration that pushes data, the source-side Hive is local (the host cluster) and the target side is remote (the tenant cluster). For example, for Hive → FTP fixed-length dual files, the source Hive is local, the FTP is the remote server, and the jar also runs on the local YARN cluster.

If the Hive2Hive cross-cluster read/write implemented this way is used to push data, the jar instead runs on the remote YARN cluster. Illustrated with diagrams:

(diagram: the normal push architecture)

becomes:

(diagram: the architecture under this approach)

In short, when used to push data, the cross-cluster Spark job (packaged as a jar) has to run on the target side's YARN cluster. This looks more like the tenant cluster actively pulling data from the host cluster.

Still, the requirement of reading and writing Hive across clusters can indeed be met this way.

New Problems Introduced

As can be seen, this approach solves the problem of reading and writing two Hives across clusters, but it also creates new problems. Used for pushing data, it breaks the original architecture and leads to architectural confusion; yet keeping the original architecture makes remote pushing impossible. So this approach is best suited to pulling a remote cluster's Hive data into the local cluster's Hive (this Hive → Hive data integration is a special case of ingesting arbitrary data sources into the local cluster's Hive), not to pushing data. For pushing data across clusters to a tenant cluster, there are in fact plenty of other options.

Since reading Hive data runs MapReduce (or Tez, or Spark) anyway, it inevitably consumes the local cluster's YARN resources, and this approach of course also consumes the remote cluster's YARN resources. But passing a single JDBC connection string as a parameter is far easier than managing a pile of core-site.xml, hdfs-site.xml, and yarn-site.xml configuration files, so the cost is acceptable. A sketch of what that single parameter could look like follows.
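For illustration only, a hedged sketch of that single-string parameterization; the host, database, realm, and principal below are made up, and the ;principal= part is the standard Hive JDBC syntax for a Kerberized HiveServer2. The fragment reuses the spark and sourceMap style from the code above.

// The whole remote source can be described by one connection string passed in as a job argument,
// e.g. (hypothetical values):
//   jdbc:hive2://remote-hs2.example.com:10000/aaa;principal=hive/_HOST@EXAMPLE.COM
String remoteJdbcUrl = args[0];

Map<String, String> sourceMap = new LinkedHashMap<>();
sourceMap.put("url", remoteJdbcUrl);
sourceMap.put("driver", "org.apache.hive.jdbc.HiveDriver");
sourceMap.put("dbtable", args[1]); // e.g. "aaa.test1"

Dataset<Row> remoteDF = spark.read().format("jdbc").options(sourceMap).load();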


Reprinted from: https://blog.csdn.net/qq_41990268/article/details/124575115
Copyright belongs to the original author 虎鲸不是鱼.
